#### Read data

To use Spark and its DataFrame API, we need a SQLContext. A Spark application starts by creating a SparkContext, and a SQLContext can then be created from that SparkContext. The cells below use a `sqlContext` object that is assumed to be available in the notebook already.

In [0]:
spark_DF = sqlContext.read.format("csv").options(header='true', inferSchema='true').load("/FileStore/tables/nycflights.csv")

Let's display the first 1000 rows.

In [0]:
display(spark_DF)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,6,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,5,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,12,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
2013,5,14,1841,-4,2122,-34,DL,N914DL,2391,JFK,TPA,135,1005,18,41
2013,7,21,1102,-3,1230,-8,9E,N823AY,3652,LGA,ORF,50,296,11,2
2013,1,1,1817,-3,2008,3,AA,N3AXAA,353,LGA,ORD,138,733,18,17
2013,12,9,1259,14,1617,22,WN,N218WN,1428,EWR,HOU,240,1411,12,59
2013,8,13,1920,85,2032,71,B6,N284JB,1407,JFK,IAD,48,228,19,20
2013,9,26,725,-10,1027,-8,AA,N3FSAA,2279,LGA,MIA,148,1096,7,25
2013,4,30,1323,62,1549,60,EV,N12163,4162,EWR,JAX,110,820,13,23


But how many rows do we have in the data frame?

In [0]:
print(spark_DF.count())

32735


Before doing any analysis, let's see the schema. We can also see the type of the DataFrame.

In [0]:
spark_DF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
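
To make the `header` and `inferSchema` options more concrete, here is a rough plain-Python sketch of the idea, using the header and the first three rows shown earlier. It is only an illustration: the `infer_type` helper is hypothetical, it knows just two types, and Spark's real inference handles more types and works on the whole file.

```python
import csv
import io

# Header plus the first three data rows of the table displayed above.
raw = """year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,6,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,5,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,12,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
"""

def infer_type(values):
    """Hypothetical helper: 'integer' if every value parses as an int, else 'string'."""
    try:
        for v in values:
            int(v)
        return "integer"
    except ValueError:
        return "string"

reader = csv.reader(io.StringIO(raw))
header = next(reader)        # header='true': the first line supplies the column names
data = list(reader)
columns = list(zip(*data))   # transpose the rows into per-column tuples of values
schema = {name: infer_type(col) for name, col in zip(header, columns)}

for name, dtype in schema.items():
    print(f" |-- {name}: {dtype}")
```

On these rows the sketch marks `carrier`, `tailnum`, `origin` and `dest` as strings and everything else as integers, which agrees with the schema printed above.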



In [0]:
print(type(spark_DF))

<class 'pyspark.sql.dataframe.DataFrame'>


##### Spark Operations

Now, let's perform various Spark DataFrame operations.

**select** picks one or more columns. `*` means all columns.

In [0]:
m=spark_DF.select("*")

display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,6,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,5,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,12,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
2013,5,14,1841,-4,2122,-34,DL,N914DL,2391,JFK,TPA,135,1005,18,41
2013,7,21,1102,-3,1230,-8,9E,N823AY,3652,LGA,ORF,50,296,11,2
2013,1,1,1817,-3,2008,3,AA,N3AXAA,353,LGA,ORD,138,733,18,17
2013,12,9,1259,14,1617,22,WN,N218WN,1428,EWR,HOU,240,1411,12,59
2013,8,13,1920,85,2032,71,B6,N284JB,1407,JFK,IAD,48,228,19,20
2013,9,26,725,-10,1027,-8,AA,N3FSAA,2279,LGA,MIA,148,1096,7,25
2013,4,30,1323,62,1549,60,EV,N12163,4162,EWR,JAX,110,820,13,23


In [0]:
spark_DF.select('year','month','day').show(10)

+----+-----+---+
|year|month|day|
+----+-----+---+
|2013|    6| 30|
|2013|    5|  7|
|2013|   12|  8|
|2013|    5| 14|
|2013|    7| 21|
|2013|    1|  1|
|2013|   12|  9|
|2013|    8| 13|
|2013|    9| 26|
|2013|    4| 30|
+----+-----+---+
only showing top 10 rows



**drop** removes one or more columns and returns a new DataFrame.

In [0]:
display(m.drop("month"))

year,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
2013,14,1841,-4,2122,-34,DL,N914DL,2391,JFK,TPA,135,1005,18,41
2013,21,1102,-3,1230,-8,9E,N823AY,3652,LGA,ORF,50,296,11,2
2013,1,1817,-3,2008,3,AA,N3AXAA,353,LGA,ORD,138,733,18,17
2013,9,1259,14,1617,22,WN,N218WN,1428,EWR,HOU,240,1411,12,59
2013,13,1920,85,2032,71,B6,N284JB,1407,JFK,IAD,48,228,19,20
2013,26,725,-10,1027,-8,AA,N3FSAA,2279,LGA,MIA,148,1096,7,25
2013,30,1323,62,1549,60,EV,N12163,4162,EWR,JAX,110,820,13,23


We can also change the column names.

In [0]:
m=spark_DF.select(spark_DF.year.alias('Year'), spark_DF.month.alias('Month'), spark_DF.day.alias('Day'))
display(m)

Year,Month,Day
2013,6,30
2013,5,7
2013,12,8
2013,5,14
2013,7,21
2013,1,1
2013,12,9
2013,8,13
2013,9,26
2013,4,30
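
Selecting, dropping and renaming all build a new table and leave the original untouched. The following plain-Python sketch mimics that behaviour on two of the rows above. The helper functions `select`, `drop` and `rename` are hypothetical stand-ins written for this illustration, not Spark calls.

```python
# Two rows from the table above, reduced to four columns for brevity.
rows = [
    {"year": 2013, "month": 6, "day": 30, "carrier": "VX"},
    {"year": 2013, "month": 5, "day": 7, "carrier": "DL"},
]

def select(rows, *cols):
    """Keep only the named columns, in the order given."""
    return [{c: r[c] for c in cols} for r in rows]

def drop(rows, *cols):
    """Keep every column except the named ones."""
    return [{k: v for k, v in r.items() if k not in cols} for r in rows]

def rename(rows, mapping):
    """Rename columns found in `mapping`; other columns keep their names."""
    return [{mapping.get(k, k): v for k, v in r.items()} for r in rows]

selected = select(rows, "year", "month", "day")
dropped = drop(rows, "month")
renamed = rename(selected, {"year": "Year", "month": "Month", "day": "Day"})

print(selected[0])
print(dropped[0])
print(renamed[0])
print(rows[0])   # the original rows still have all four columns
```

The last line is the point of the sketch: each operation returns a new result, which is why the notebook assigns the outcome to `m` instead of modifying `spark_DF`.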


In [0]:
from pyspark.sql.functions import length

m=spark_DF.select(spark_DF.origin,length(spark_DF.origin).alias('Length'))
display(m)


origin,Length
JFK,3
JFK,3
JFK,3
JFK,3
LGA,3
LGA,3
EWR,3
JFK,3
LGA,3
EWR,3


We can use **filter** or **where** to keep only the rows that satisfy a condition. The condition can be a SQL expression string or a column expression.

In [0]:
m=spark_DF.filter("month =1") # selecting January only
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,1,1,1817,-3,2008,3,AA,N3AXAA,353,LGA,ORD,138,733,18,17
2013,1,23,2024,37,2141,29,EV,N17115,4412,EWR,BUF,53,282,20,24
2013,1,15,1626,-3,1941,10,B6,N594JB,369,LGA,PBI,150,1035,16,26
2013,1,17,626,-4,846,3,US,N554UW,1433,LGA,CLT,105,544,6,26
2013,1,8,902,-3,1006,-17,B6,N281JB,56,JFK,BTV,47,266,9,2
2013,1,15,1947,167,2241,171,AA,N5EGAA,575,JFK,EGE,265,1747,19,47
2013,1,1,1454,-4,1554,-21,EV,N11544,4390,EWR,PWM,47,284,14,54
2013,1,30,1306,-9,1430,-1,EV,N13969,4120,EWR,BUF,61,282,13,6
2013,1,4,1942,-3,2249,-40,B6,N637JB,645,JFK,SFO,333,2586,19,42
2013,1,8,1859,-6,2158,-27,AA,N322AA,21,JFK,LAX,337,2475,18,59


In [0]:
m=spark_DF.filter(spark_DF.month > 9)
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,12,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
2013,12,9,1259,14,1617,22,WN,N218WN,1428,EWR,HOU,240,1411,12,59
2013,11,22,1320,5,1628,-2,B6,N526JB,1639,LGA,RSW,161,1080,13,20
2013,10,21,1217,-4,1322,-6,B6,N192JB,34,JFK,BTV,46,266,12,17
2013,10,21,859,-1,1036,11,UA,N57852,1030,EWR,ORD,121,719,8,59
2013,11,5,1126,11,1257,5,EV,N600QX,5273,LGA,PIT,58,335,11,26
2013,12,7,1154,-6,1356,-4,US,N122US,725,JFK,CLT,101,541,11,54
2013,10,8,821,-8,1007,-22,MQ,N528MQ,3478,LGA,DTW,77,502,8,21
2013,11,1,1058,-2,1227,-23,WN,N266WN,1341,LGA,BNA,124,764,10,58
2013,12,24,838,38,1104,32,UA,N556UA,561,LGA,DEN,237,1620,8,38


In [0]:
m=spark_DF.filter(spark_DF.month < 9).filter('month>=5')
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,6,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,5,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,5,14,1841,-4,2122,-34,DL,N914DL,2391,JFK,TPA,135,1005,18,41
2013,7,21,1102,-3,1230,-8,9E,N823AY,3652,LGA,ORF,50,296,11,2
2013,8,13,1920,85,2032,71,B6,N284JB,1407,JFK,IAD,48,228,19,20
2013,6,17,940,5,1050,-4,B6,N351JB,20,JFK,ROC,50,264,9,40
2013,8,5,757,-3,1041,-23,DL,N380DA,1271,JFK,FLL,131,1069,7,57
2013,8,18,1638,8,1942,-17,VX,N849VA,27,JFK,SFO,334,2586,16,38
2013,7,7,2310,105,201,127,B6,N506JB,97,JFK,DEN,223,1626,23,10
2013,8,16,1251,-4,1433,-7,MQ,N720MQ,3388,LGA,CMH,83,479,12,51
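
Chaining two filters keeps the rows that pass both conditions, so it behaves like a single filter with `and`. The plain-Python sketch below checks this on the `month` values of the ten rows displayed at the top of the notebook. In PySpark the combined form would be written with `&` and parentheses, for example `spark_DF.filter((spark_DF.month < 9) & (spark_DF.month >= 5))`.

```python
# month column of the ten rows displayed at the top of the notebook
months = [6, 5, 12, 5, 7, 1, 12, 8, 9, 4]

# Two filters applied one after the other, as in the cell above.
chained = [m for m in months if m < 9]
chained = [m for m in chained if m >= 5]

# One filter with both conditions combined.
combined = [m for m in months if m < 9 and m >= 5]

print(chained)
print(combined)
```

Both lists contain the same months, and they match the first five rows of the filtered output above.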


[`distinct()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct) filters out duplicate rows, and it considers all columns.

In [0]:
m=spark_DF.select('month').filter(spark_DF.month < 9).filter('month>=5').distinct()  # keep m as a DataFrame for display()
display(m)

month
6
5
8
7
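
Because `distinct()` compares whole rows, the number of rows it returns depends on which columns were selected first. Here is a small plain-Python sketch of that effect, using the `(month, origin)` values of the first five filtered rows shown earlier. It removes duplicates with `dict.fromkeys`, which is just a convenient stand-in and not how Spark does it.

```python
# (month, origin) of the first five rows of the May-August output above
rows = [(6, "JFK"), (5, "JFK"), (5, "JFK"), (7, "LGA"), (8, "JFK")]

# distinct over both columns: only the repeated (5, "JFK") row collapses
distinct_rows = list(dict.fromkeys(rows))

# distinct after selecting a single column
distinct_months = list(dict.fromkeys(month for month, _ in rows))
distinct_origins = list(dict.fromkeys(origin for _, origin in rows))

print(distinct_rows)
print(distinct_months)
print(distinct_origins)
```

Note that Spark does not promise any particular order for the result of `distinct()`, which is why the months above come back as 6, 5, 8, 7.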


`where()` is an alias for `filter()`.

In [0]:
m=spark_DF.where("month <= 4")
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,1,1,1817,-3,2008,3,AA,N3AXAA,353,LGA,ORD,138,733,18,17
2013,4,30,1323,62,1549,60,EV,N12163,4162,EWR,JAX,110,820,13,23
2013,4,26,809,-2,1030,22,EV,N16559,5790,EWR,DAY,87,533,8,9
2013,3,25,2054,115,2256,91,FL,N919AT,645,LGA,ATL,104,762,20,54
2013,1,23,2024,37,2141,29,EV,N17115,4412,EWR,BUF,53,282,20,24
2013,2,8,644,-1,817,20,EV,N14916,4241,EWR,DCA,45,199,6,44
2013,1,15,1626,-3,1941,10,B6,N594JB,369,LGA,PBI,150,1035,16,26
2013,2,1,729,9,1018,-5,UA,N36247,1724,EWR,PBI,154,1023,7,29
2013,1,17,626,-4,846,3,US,N554UW,1433,LGA,CLT,105,544,6,26
2013,4,24,2253,123,5,101,EV,N14562,3852,EWR,BUF,53,282,22,53


In [0]:
m=spark_DF.where(spark_DF.month==12)
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,12,8,859,-1,1238,11,DL,N712TW,422,JFK,LAX,376,2475,8,59
2013,12,9,1259,14,1617,22,WN,N218WN,1428,EWR,HOU,240,1411,12,59
2013,12,7,1154,-6,1356,-4,US,N122US,725,JFK,CLT,101,541,11,54
2013,12,24,838,38,1104,32,UA,N556UA,561,LGA,DEN,237,1620,8,38
2013,12,18,730,0,1038,-2,AA,N3KCAA,29,LGA,PBI,140,1035,7,30
2013,12,26,958,-7,1141,-14,MQ,N817MQ,3572,LGA,CLE,78,419,9,58
2013,12,9,1922,-8,2234,-2,DL,N989DL,1485,LGA,MCO,149,950,19,22
2013,12,6,2052,-8,2238,32,US,N957UW,2164,LGA,BOS,40,184,20,52
2013,12,3,2126,-4,2229,-16,MQ,N817MQ,3621,JFK,DCA,39,213,21,26
2013,12,30,1154,9,1338,23,WN,N961WN,172,LGA,MDW,123,725,11,54


We can also filter with user-defined functions (UDFs).

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
even_months = udf(lambda s: s%2==0, BooleanType())  # select even months only
m = spark_DF.filter(even_months(spark_DF.month))
display(m.select('month').distinct())    # want to see the distinct months


month
12
6
4
8
10
2


In [0]:
selected_months = udf(lambda s: s in (6,7,8,12), BooleanType())  # select June-August and December
m = spark_DF.filter(selected_months(spark_DF.month))
display(m.select('month').distinct())

month
12
6
8
7
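
One thing to watch with Python UDFs: the schema says `month` is nullable, and a null reaches the Python function as `None`. The sketch below shows, in plain Python, how the even-month lambda fails on `None` and how a null-safe version could look. The name `is_even_month` and the sample values are made up for this illustration.

```python
# The lambda used in the even-month cell above, without the udf wrapper.
naive = lambda s: s % 2 == 0

def is_even_month(month):
    """Null-safe variant: a missing month is treated as 'not even'."""
    return month is not None and month % 2 == 0

months = [1, 2, None, 6, 7, 12]   # hypothetical values, including a missing one

kept = [m for m in months if is_even_month(m)]
print(kept)

try:
    naive(None)
except TypeError as err:
    print("naive lambda fails on None:", err)
```

The null-safe function can be wrapped the same way as before, with `udf(is_even_month, BooleanType())`.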


[`groupBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) is one of the most powerful transformations. It allows you to perform aggregations on a DataFrame.

Unlike other DataFrame transformations, `groupBy()` does _not_ return a DataFrame. Instead, it returns a special [GroupedData](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object that contains various aggregation functions.

The most commonly used aggregation function is [count()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count),
but there are others (like [sum()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.sum), [max()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.max), and [avg()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.avg)).

These aggregation functions typically create a new column and return a new DataFrame.

In [0]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance')
display(m)

carrier,avg(distance)
UA,1528.4623916811092
AA,1349.525407779172
EV,561.5184753014391
B6,1062.682477678571
DL,1244.7432119553778
OO,615.3333333333334
F9,1620.0
YV,395.4150943396226
US,557.3861042183623
MQ,565.3972875947347
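
The two steps, grouping first and aggregating second, can be sketched in plain Python. This example uses the `carrier` and `distance` values of the ten rows displayed at the top of the notebook, so the averages differ from the full-data output above.

```python
from collections import defaultdict

# (carrier, distance) pairs from the ten rows displayed at the top of the notebook
flights = [("VX", 2475), ("DL", 1598), ("DL", 2475), ("DL", 1005), ("9E", 296),
           ("AA", 733), ("WN", 1411), ("B6", 228), ("AA", 1096), ("EV", 820)]

# Step 1 (groupBy): bucket the values by key. Nothing is aggregated yet.
groups = defaultdict(list)
for carrier, distance in flights:
    groups[carrier].append(distance)

# Step 2 (avg): reduce each bucket to one value, giving one row per key.
avg_distance = {carrier: sum(d) / len(d) for carrier, d in groups.items()}

for carrier, value in avg_distance.items():
    print(carrier, value)
```

The intermediate `groups` dictionary plays the role of the GroupedData object: it holds the grouping, and a result table only appears once an aggregation is applied to it.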


[`orderBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) sorts a DataFrame by one or more columns, producing a new DataFrame. Here we sort the per-carrier averages by carrier.

In [0]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance').orderBy('carrier')
display(m)

carrier,avg(distance)
9E,537.590212264151
AA,1349.525407779172
AS,2402.0
B6,1062.682477678571
DL,1244.7432119553778
EV,561.5184753014391
F9,1620.0
FL,651.2736156351791
HA,4983.0
MQ,565.3972875947347


In `orderBy()`, the default order is ascending, but we can change it to descending by setting `ascending=False`.

In [0]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance').orderBy('carrier',ascending=False)
display(m)

carrier,avg(distance)
YV,395.4150943396226
WN,995.4631245043616
VX,2501.2193158953723
US,557.3861042183623
UA,1528.4623916811092
OO,615.3333333333334
MQ,565.3972875947347
HA,4983.0
FL,651.2736156351791
F9,1620.0


In [0]:
m=spark_DF.groupBy(spark_DF.carrier).avg('distance')
display(m.sort('carrier',ascending=False))

carrier,avg(distance)
YV,395.4150943396226
WN,995.4631245043616
VX,2501.2193158953723
US,557.3861042183623
UA,1528.4623916811092
OO,615.3333333333334
MQ,565.3972875947347
HA,4983.0
FL,651.2736156351791
F9,1620.0
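
The last two cells give the same output, with `orderBy` and `sort` both sorting on the `carrier` key in descending order. As a plain-Python sketch of the difference between sorting on the key and sorting on the aggregated value, here are four of the averages taken from the outputs above.

```python
# (carrier, avg distance) values copied from the outputs above
averages = [("AS", 2402.0), ("F9", 1620.0), ("HA", 4983.0), ("YV", 395.4150943396226)]

# Descending by carrier code, like orderBy('carrier', ascending=False)
by_carrier_desc = sorted(averages, key=lambda row: row[0], reverse=True)

# Descending by the aggregated value instead of the key
by_distance_desc = sorted(averages, key=lambda row: row[1], reverse=True)

print([carrier for carrier, _ in by_carrier_desc])
print([carrier for carrier, _ in by_distance_desc])
```

Sorting on the value puts HA first, since it has the longest average distance, while sorting on the key puts YV first.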


Use the [`count` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count) to find the number of flights for each carrier.

In [0]:
m=spark_DF.groupBy(spark_DF.carrier).count()
display(m)

carrier,count
UA,5770
AA,3188
EV,5142
B6,5376
DL,4751
OO,3
F9,69
YV,53
US,2015
MQ,2507


We can also calculate the maximum, minimum and sum for each group. `max()` and `min()` accept several columns at once.

In [0]:
m=spark_DF.groupBy(spark_DF.carrier).max('distance','hour')
display(m)

carrier,max(distance),max(hour)
UA,4963,23
AA,2586,23
EV,1325,23
B6,2586,24
DL,2586,23
OO,1008,19
F9,1620,20
YV,544,20
US,2153,22
MQ,1147,23


In [0]:
m=spark_DF.groupBy(spark_DF.carrier).min('distance','hour')
display(m)

carrier,min(distance),min(hour)
UA,116,0
AA,187,0
EV,116,0
B6,173,0
DL,187,0
OO,419,14
F9,1620,8
YV,229,11
US,94,0
MQ,184,0


In [0]:
m=spark_DF.groupBy(spark_DF.carrier).sum('distance')
display(m)

carrier,sum(distance)
UA,8819228
AA,4302287
EV,2887328
B6,5712981
DL,5913775
OO,1846
F9,111780
YV,20957
US,1123133
MQ,1417451


### SQL statements in Spark

In [0]:
# SQL statements can be run with the `sql` method of `sqlContext`.
# First, register the DataFrame as a temporary view so that SQL can refer to it by name.
spark_DF.createOrReplaceTempView("spark_DF")




In [0]:
m = sqlContext.sql("SELECT * FROM spark_DF WHERE month >= 5 AND month <= 9")
display(m)

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2013,6,30,940,15,1216,-4,VX,N626VA,407,JFK,LAX,313,2475,9,40
2013,5,7,1657,-3,2104,10,DL,N3760C,329,JFK,SJU,216,1598,16,57
2013,5,14,1841,-4,2122,-34,DL,N914DL,2391,JFK,TPA,135,1005,18,41
2013,7,21,1102,-3,1230,-8,9E,N823AY,3652,LGA,ORF,50,296,11,2
2013,8,13,1920,85,2032,71,B6,N284JB,1407,JFK,IAD,48,228,19,20
2013,9,26,725,-10,1027,-8,AA,N3FSAA,2279,LGA,MIA,148,1096,7,25
2013,6,17,940,5,1050,-4,B6,N351JB,20,JFK,ROC,50,264,9,40
2013,8,5,757,-3,1041,-23,DL,N380DA,1271,JFK,FLL,131,1069,7,57
2013,8,18,1638,8,1942,-17,VX,N849VA,27,JFK,SFO,334,2586,16,38
2013,7,7,2310,105,201,127,B6,N506JB,97,JFK,DEN,223,1626,23,10


In [0]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier ORDER BY Maximum_Distance DESC")
display(m)

carrier,Maximum_Distance
HA,4983
UA,4963
AA,2586
B6,2586
DL,2586
VX,2586
AS,2402
US,2153
WN,2133
F9,1620


In [0]:
m = sqlContext.sql("SELECT DISTINCT(carrier) FROM spark_DF")
display(m)

carrier
UA
AA
EV
B6
DL
OO
F9
YV
US
MQ


In [0]:
m = sqlContext.sql("SELECT DISTINCT(month) FROM spark_DF")
display(m)

month
12
1
6
3
5
9
4
8
7
10


In [0]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month > 3 AND month < 10")
display(m)

month
6
5
9
4
8
7


In [0]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month BETWEEN 4 AND 9")
display(m)

month
6
5
9
4
8
7
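
The last two queries return the same months because `BETWEEN` includes both endpoints, so for integers `month > 3 AND month < 10` and `month BETWEEN 4 AND 9` describe the same range. This can be checked without a cluster using Python's built-in `sqlite3` module. The sketch below is only an illustration: it runs on SQLite rather than Spark SQL, and the one-column table with months 1 to 12 is made up for the test.

```python
import sqlite3

# Hypothetical one-column table holding the months 1..12.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE spark_DF (month INTEGER)")
conn.executemany("INSERT INTO spark_DF VALUES (?)", [(m,) for m in range(1, 13)])

def distinct_months(where):
    """Run the DISTINCT query with the given WHERE clause and return sorted months."""
    sql = f"SELECT DISTINCT month FROM spark_DF WHERE {where} ORDER BY month"
    return [row[0] for row in conn.execute(sql)]

open_range = distinct_months("month > 3 AND month < 10")
between = distinct_months("month BETWEEN 4 AND 9")

print(open_range)
print(between)
```

The `ORDER BY month` is added here only to make the output easy to compare. The Spark queries above do not sort, which is why their months appear in no particular order.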


In [0]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier ORDER BY Maximum_Distance DESC LIMIT 5")
display(m)

carrier,Maximum_Distance
HA,4983
UA,4963
AA,2586
B6,2586
DL,2586


In [0]:
m = sqlContext.sql("SELECT carrier, max(distance) AS Maximum_Distance FROM spark_DF GROUP BY carrier HAVING Maximum_Distance > 2000 ORDER BY Maximum_Distance DESC")

display(m)

carrier,Maximum_Distance
HA,4983
UA,4963
AA,2586
B6,2586
DL,2586
VX,2586
AS,2402
US,2153
WN,2133
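
`HAVING` filters the groups after aggregation, whereas `WHERE` filters individual rows before they are grouped. The sketch below runs the same kind of query with Python's built-in `sqlite3` module on the ten rows displayed at the top of the notebook. It uses SQLite instead of Spark SQL, and the table name `flights` is just a placeholder for this illustration.

```python
import sqlite3

# (carrier, distance) pairs from the ten rows displayed at the top of the notebook
flights = [("VX", 2475), ("DL", 1598), ("DL", 2475), ("DL", 1005), ("9E", 296),
           ("AA", 733), ("WN", 1411), ("B6", 228), ("AA", 1096), ("EV", 820)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (carrier TEXT, distance INTEGER)")
conn.executemany("INSERT INTO flights VALUES (?, ?)", flights)

query = """
    SELECT carrier, max(distance) AS Maximum_Distance
    FROM flights
    GROUP BY carrier
    HAVING max(distance) > 2000
    ORDER BY Maximum_Distance DESC, carrier
"""
result = conn.execute(query).fetchall()

for carrier, maximum in result:
    print(carrier, maximum)
```

Only DL and VX remain, because they are the only carriers in this small sample whose longest flight exceeds 2000. The extra `carrier` sort key just makes the order of ties predictable.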


In [0]:
m = sqlContext.sql("SELECT DISTINCT month FROM spark_DF WHERE month !=2")
display(m)

month
12
1
6
3
5
9
4
8
7
10


In [0]:
m = sqlContext.sql("SELECT DISTINCT carrier FROM spark_DF WHERE month =1 OR month = 12")
display(m)

carrier
UA
AA
EV
B6
DL
F9
YV
US
MQ
HA


In [0]:
m = sqlContext.sql("SELECT DISTINCT carrier FROM spark_DF WHERE carrier LIKE ('A%')")
display(m)

carrier
AA
AS
