## 1. Creating the Spark Session

In [1]:
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Spark settings. Memory options must be supplied *before* the SparkContext /
# JVM is started: calling SparkContext.setSystemProperty(...) after
# getOrCreate() has already built the context does not change it.
# NOTE: getOrCreate() returns an already-running session unchanged, so restart
# the kernel if these values are edited.
SPARK_CONF = {
    # executors run inside the driver JVM with master local[*], so this one is
    # presumably of little practical effect here — kept from the original setup
    'spark.executor.memory': '4g',
    'spark.driver.memory': '20g',
    'spark.driver.maxResultSize': '5g',
}

builder = SparkSession.builder.master("local[*]")
for conf_key, conf_value in SPARK_CONF.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

sc = spark.sparkContext
# Legacy entry point, kept for cells that read through `sqlContext.read`;
# `spark.read` is the modern equivalent.
sqlContext = SQLContext(sc)

## 2. Load the Data from a File into a DataFrame

In [2]:
import pandas as pd

# Eager, in-memory pandas copy of the flights file, used below to compare
# pandas operations with their PySpark counterparts.
FLIGHTS_CSV = 'data/flight.csv'
dff = pd.read_csv(FLIGHTS_CSV)

In [3]:
# Summary of the pandas copy: dtype and non-null count per column
# (a non-null count below the row count means that column has missing values).
dff.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 336776 entries, 0 to 336775
Data columns (total 13 columns):
dep_time          328521 non-null float64
sched_dep_time    336776 non-null int64
dep_delay         328521 non-null float64
arr_time          328063 non-null float64
sched_arr_time    336776 non-null int64
arr_delay         327346 non-null float64
carrier           336776 non-null object
flight            336776 non-null int64
tailnum           334264 non-null object
origin            336776 non-null object
dest              336776 non-null object
distance          336776 non-null int64
time_hour         336776 non-null object
dtypes: float64(4), int64(4), object(5)
memory usage: 33.4+ MB


In [4]:
# Load the same file with Spark's built-in CSV source (available since
# Spark 2.0; the external 'com.databricks.spark.csv' name is only a legacy
# alias). inferSchema costs an extra pass over the file but yields typed
# columns (e.g. time_hour -> timestamp) instead of all-string columns.
df = spark.read.csv('data/flight.csv', header=True, inferSchema=True)
rows = df.count()
columns = len(df.columns)
print("The dataset contains {0} rows and {1} columns.".format(rows, columns))
df.dtypes

# pandas equivalent:
#df = pd.read_csv('data/flight.csv')
#df.info()

The dataset contains 336776 rows and 13 columns.


[('dep_time', 'double'),
 ('sched_dep_time', 'int'),
 ('dep_delay', 'double'),
 ('arr_time', 'double'),
 ('sched_arr_time', 'int'),
 ('arr_delay', 'double'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('tailnum', 'string'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('distance', 'int'),
 ('time_hour', 'timestamp')]

In [5]:
# Bring the first five rows back to the driver as pandas for rich display.
first_rows = df.limit(5)
first_rows.toPandas()

Unnamed: 0,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,distance,time_hour
0,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,1400,2013-01-01 05:00:00
1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,1416,2013-01-01 05:00:00
2,542.0,540,2.0,923.0,850,33.0,AA,1141,N619AA,JFK,MIA,1089,2013-01-01 05:00:00
3,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,N804JB,JFK,BQN,1576,2013-01-01 05:00:00
4,554.0,600,-6.0,812.0,837,-25.0,DL,461,N668DN,LGA,ATL,762,2013-01-01 06:00:00


In [6]:
# pandas: select a subset of columns and show the leading rows.
subset_cols = ['dep_delay', 'arr_delay', 'carrier', 'origin',
               'dest', 'distance', 'time_hour']
dff.loc[:, subset_cols].head(5)

Unnamed: 0,dep_delay,arr_delay,carrier,origin,dest,distance,time_hour
0,2.0,11.0,UA,EWR,IAH,1400,2013-01-01 05:00:00
1,4.0,20.0,UA,LGA,IAH,1416,2013-01-01 05:00:00
2,2.0,33.0,AA,JFK,MIA,1089,2013-01-01 05:00:00
3,-1.0,-18.0,B6,JFK,BQN,1576,2013-01-01 05:00:00
4,-6.0,-25.0,DL,LGA,ATL,762,2013-01-01 06:00:00


In [7]:
# PySpark version of the same column selection, first five rows as pandas.
subset_cols = ['dep_delay', 'arr_delay', 'carrier', 'origin',
               'dest', 'distance', 'time_hour']
df.select(*subset_cols).limit(5).toPandas()

# pandas equivalent:
#df[['dep_delay','arr_delay','carrier','origin',
# 'dest','distance','time_hour']].head()

Unnamed: 0,dep_delay,arr_delay,carrier,origin,dest,distance,time_hour
0,2.0,11.0,UA,EWR,IAH,1400,2013-01-01 05:00:00
1,4.0,20.0,UA,LGA,IAH,1416,2013-01-01 05:00:00
2,2.0,33.0,AA,JFK,MIA,1089,2013-01-01 05:00:00
3,-1.0,-18.0,B6,JFK,BQN,1576,2013-01-01 05:00:00
4,-6.0,-25.0,DL,LGA,ATL,762,2013-01-01 06:00:00


In [8]:
# Cast distance from int to a 64-bit float. In Spark, 'float' means the
# 32-bit FloatType, whereas pandas' astype('float') produces float64;
# 'double' is the matching type and agrees with the other numeric columns
# (dep_delay, arr_delay, ... are already double).
df = df.withColumn('distance', df.distance.cast('double'))
# tailnum is not used by any of the remaining cells.
df = df.drop('tailnum')
df.limit(5).toPandas()

# pandas equivalent:
#df['distance'] = df['distance'].astype('float')
#df = df.drop(columns=['tailnum'])

Unnamed: 0,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,origin,dest,distance,time_hour
0,517.0,515,2.0,830.0,819,11.0,UA,1545,EWR,IAH,1400.0,2013-01-01 05:00:00
1,533.0,529,4.0,850.0,830,20.0,UA,1714,LGA,IAH,1416.0,2013-01-01 05:00:00
2,542.0,540,2.0,923.0,850,33.0,AA,1141,JFK,MIA,1089.0,2013-01-01 05:00:00
3,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,JFK,BQN,1576.0,2013-01-01 05:00:00
4,554.0,600,-6.0,812.0,837,-25.0,DL,461,LGA,ATL,762.0,2013-01-01 06:00:00


In [9]:
# Summary statistics. describe() only summarises numeric and string columns,
# so a timestamp column such as time_hour would be silently dropped from the
# result; select just the numeric columns to make that explicit.
df.select('dep_delay', 'arr_delay', 'distance').describe().toPandas()

# pandas equivalent:
# df[['dep_delay','arr_delay','distance']].describe()

Unnamed: 0,summary,dep_delay,arr_delay,distance
0,count,328521.0,327346.0,336776.0
1,mean,12.639070257304708,6.89537675731489,1039.9126036297123
2,stddev,40.21006089213004,44.633291690194,733.2330333236779
3,min,-43.0,-86.0,17.0
4,max,1301.0,1272.0,4983.0


In [10]:
# Flights whose time_hour falls after '2013-05-01' and up to '2013-05-05'.
# The upper bound is a bare date, so flights later on May 5 are excluded
# (use < '2013-05-06' if the whole of May 5 is wanted). Exactly-midnight rows
# may compare differently across Spark versions — NOTE(review): confirm intent.
may_window = (col('time_hour') > '2013-05-01') & (col('time_hour') <= '2013-05-05')
df.filter(may_window).limit(5).toPandas()

# pandas equivalent:
#df[(df.time_hour > '2013-05-01') & (df.time_hour <= '2013-05-05')].head(5)

Unnamed: 0,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,origin,dest,distance,time_hour
0,9.0,1655,434.0,308.0,2020,408.0,VX,413,JFK,LAX,2475.0,2013-05-01 16:00:00
1,451.0,500,-9.0,641.0,640,1.0,US,1219,EWR,CLT,529.0,2013-05-01 05:00:00
2,537.0,540,-3.0,836.0,840,-4.0,AA,701,JFK,MIA,1089.0,2013-05-01 05:00:00
3,544.0,545,-1.0,818.0,827,-9.0,UA,450,LGA,IAH,1416.0,2013-05-01 05:00:00
4,548.0,600,-12.0,831.0,854,-23.0,B6,371,LGA,FLL,1076.0,2013-05-01 06:00:00


In [11]:
# Rename the EWR airport code to lower case, leaving other origins untouched,
# then count flights per origin (sorted in pandas, busiest first).
ewr_lowered = when(df['origin'] == 'EWR', 'ewr').otherwise(col('origin'))
df = df.withColumn('origin', ewr_lowered)
origin_counts = df.groupBy('origin').count().toPandas()
origin_counts.sort_values(by='count', ascending=False)

# pandas equivalent (needs `import numpy as np`):
#df['origin'] = np.where(df['origin']== 'EWR', 'ewr', df['origin'])
#df.groupby('origin').origin.count().to_frame().sort_values(by='origin',
#                                                            ascending=False)

Unnamed: 0,origin,count
1,ewr,120835
2,JFK,111279
0,LGA,104662


In [12]:
# Derive calendar/clock parts from time_hour, in this column order.
# NOTE: Spark's dayofweek() numbers days 1 = Sunday ... 7 = Saturday
# (2013-01-01, a Tuesday, gives 3 in the output below), unlike pandas'
# dt.dayofweek, which uses 0 = Monday ... 6 = Sunday.
time_parts = [
    ('year', year),
    ('month', month),
    ('day', dayofmonth),
    ('week_day', dayofweek),
    ('hour', hour),
    ('minute', minute),
    ('second', second),
    ('date', lambda ts: ts.cast(DateType())),
    ('unix_time', unix_timestamp),
]
for part_name, extract in time_parts:
    df = df.withColumn(part_name, extract(col('time_hour')))
df.limit(5).toPandas().iloc[:, 6:]

# pandas equivalent (matching the Spark columns above):
#df['time_hour'] = pd.to_datetime(df['time_hour'])
#df['year'] = df['time_hour'].dt.year
#df['month'] = df['time_hour'].dt.month
#df['day'] = df['time_hour'].dt.day
#df['week_day'] = (df['time_hour'].dt.dayofweek + 1) % 7 + 1  # Spark numbering
#df['hour'] = df['time_hour'].dt.hour
#df['minute'] = df['time_hour'].dt.minute
#df['second'] = df['time_hour'].dt.second
#df['date'] = df['time_hour'].dt.date
#df['unix_time'] = df['time_hour'].astype('int64') // 10**9  # integer seconds

Unnamed: 0,carrier,flight,origin,dest,distance,time_hour,year,month,day,week_day,hour,minute,second,date,unix_time
0,UA,1545,ewr,IAH,1400.0,2013-01-01 05:00:00,2013,1,1,3,5,0,0,2013-01-01,1357016400
1,UA,1714,LGA,IAH,1416.0,2013-01-01 05:00:00,2013,1,1,3,5,0,0,2013-01-01,1357016400
2,AA,1141,JFK,MIA,1089.0,2013-01-01 05:00:00,2013,1,1,3,5,0,0,2013-01-01,1357016400
3,B6,725,JFK,BQN,1576.0,2013-01-01 05:00:00,2013,1,1,3,5,0,0,2013-01-01,1357016400
4,DL,461,LGA,ATL,762.0,2013-01-01 06:00:00,2013,1,1,3,6,0,0,2013-01-01,1357020000


In [13]:
# Mean and total distance per (origin, date). No ordering is applied, so
# limit(5) shows an arbitrary five groups.
distance_aggs = [
    mean('distance').alias('mean_distance'),
    sum('distance').alias('sum_distance'),  # pyspark sum (star import), not builtin
]
df.groupBy('origin', 'date').agg(*distance_aggs).limit(5).toPandas()

# pandas equivalent:
#df1 = df.groupby(['origin','date'])
#df1['distance'].agg([np.mean,np.sum]).head(10)

Unnamed: 0,origin,date,mean_distance,sum_distance
0,JFK,2013-12-06,1302.83959,381732.0
1,LGA,2013-02-17,816.751055,193570.0
2,ewr,2013-08-09,1098.631285,393310.0
3,LGA,2013-10-02,751.6,251786.0
4,ewr,2013-11-27,1077.449591,395424.0


In [14]:
# Save the result as CSV with a header row. Spark writes a *directory* named
# data/xxx.csv holding a single part-*.csv file (one partition), not a plain
# file. mode('overwrite') keeps Restart & Run All working: the default save
# mode ('errorifexists') fails once the directory exists from a previous run.
(df.repartition(1)
   .write
   .mode('overwrite')
   .option('header', 'true')
   .csv('data/xxx.csv'))

# pandas equivalent:
#df.to_csv('data/xxx.csv', index=False)