<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Spark-Data-Frame-Basics-Part-One" data-toc-modified-id="Spark-Data-Frame-Basics-Part-One-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Spark Data Frame Basics Part One</a></span><ul class="toc-item"><li><span><a href="#Create-Session" data-toc-modified-id="Create-Session-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Create Session</a></span></li><li><span><a href="#Explore-data" data-toc-modified-id="Explore-data-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Explore data</a></span></li><li><span><a href="#Change-all-columns-types" data-toc-modified-id="Change-all-columns-types-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Change all columns types</a></span></li></ul></li><li><span><a href="##-Spark-Data-Frame-Basics-Part-Two" data-toc-modified-id="#-Spark-Data-Frame-Basics-Part-Two-2"><span class="toc-item-num">2&nbsp;&nbsp;</span># Spark Data Frame Basics Part Two</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Spark-data-frame-basic-operations" data-toc-modified-id="Spark-data-frame-basic-operations-2.0.1"><span class="toc-item-num">2.0.1&nbsp;&nbsp;</span>Spark data frame basic operations</a></span></li></ul></li></ul></li></ul></div>

# Spark Data Frame Basics Part One

## Create Session

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Basic").getOrCreate()

In [4]:
df = spark.read.json('people.json')

## Explore data

In [5]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
df.columns

['age', 'name']

In [10]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [8]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



## Change all columns types

In [12]:
from pyspark.sql.types import(StructField,StructType,
                              IntegerType,StringType)

In [13]:
data_schema = [StructField("age",IntegerType(),True),StructField("name",StringType(),True)]

In [15]:
#<True: Can it have null values, if False then for null values it gives error>
    

In [16]:
final_struc = StructType(fields = data_schema)

In [17]:
df = spark.read.json("people.json",schema=final_struc)

In [18]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



# # Spark Data Frame Basics Part Two

In [20]:
 df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [21]:
type(df['age'])

pyspark.sql.column.Column

In [23]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [25]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [28]:
df['age']

Column<age>

In [31]:
type(df.head(2)[0])

pyspark.sql.types.Row

In [32]:
df.withColumn('extra',df.select('age')+99)

TypeError: unsupported operand type(s) for +: 'DataFrame' and 'int'

### Spark data frame basic operations

In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [4]:
import pandas as pd

In [5]:
a=pd.DataFrame()

In [7]:
df=spark.read.csv('appl_stock.csv',inferSchema=True,header=True)

In [8]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [13]:
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [21]:
df.filter('Open>500').show()

+-------------------+------------------+------------------+------------------+-----------------+---------+-----------------+
|               Date|              Open|              High|               Low|            Close|   Volume|        Adj Close|
+-------------------+------------------+------------------+------------------+-----------------+---------+-----------------+
|2012-02-14 00:00:00|        504.659988|         509.56002|        502.000008|       509.459991|115099600|        66.005408|
|2012-02-15 00:00:00|        514.259995|        526.290016|496.88998399999997|       497.669975|376530000|        64.477899|
|2012-02-17 00:00:00|        503.109993|507.77002000000005|        500.299995|        502.12001|133951300|        65.054443|
|2012-02-21 00:00:00|506.88001299999996|        514.850021|504.12000300000005|       514.850021|151398800|        66.703738|
|2012-02-22 00:00:00|        513.079994|        515.489983|509.07002300000005|       513.039993|120825600|66.46923100000001|


In [25]:

df.filter('Open>500').select(['Volume']).head(3)

[Row(Volume=115099600), Row(Volume=376530000), Row(Volume=133951300)]

In [28]:
df.filter((df['Open']>200)&(df['Volume']<376530000)).head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [29]:
df.filter('Open>200 and Volume<376530000').head(1)


[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [37]:
df.filter('Low==197.16').collect()

Py4JJavaError: An error occurred while calling o329.collectToPython.
: java.lang.IllegalArgumentException
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
	at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.base/java.lang.Thread.run(Thread.java:844)


In [46]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("appName")
sc = SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=ops, master=local[*]) created by getOrCreate at <ipython-input-3-2b2c18717ec1>:1 