-
Notifications
You must be signed in to change notification settings - Fork 1
/
route_delay.py
49 lines (43 loc) · 1.61 KB
/
route_delay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import pyspark
import datetime
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from geopy.distance import vincenty
from pyspark.sql import Row
import csv
from pyspark.sql import SQLContext
import pyspark.sql.functions as sf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
import csv
def parseCSV(idx, part):
    """Parse one partition of raw CSV lines into Rows describing each flight.

    Intended to be passed to ``RDD.mapPartitionsWithIndex``.

    Args:
        idx: Index of the partition being processed.
        part: Iterator over the partition's raw text lines.

    Yields:
        Row with ORIGIN (CSV column 14), DEST (column 23), ROUTE (the
        ``(ORIGIN, DEST)`` pair) and ARR_DEL15 (column 44, kept as the raw
        string read from the file).
    """
    if idx == 0:
        # Drop the first line of partition 0 (presumably the CSV header).
        # The builtin next() works on both Python 2 and 3 iterators, and the
        # default keeps an empty partition from raising StopIteration inside
        # this generator.
        next(part, None)
    for p in csv.reader(part):
        yield Row(ORIGIN=p[14],
                  DEST=p[23],
                  ROUTE=(p[14], p[23]),
                  ARR_DEL15=p[44])
def main(sc):
    """Rank routes by their share of delayed arrivals and export the result.

    Reads the on-time CSV files, counts flights with ``ARR_DEL15 == 1`` and
    all flights per ROUTE, and writes one line per route that has at least
    one such flight to ``Output/route_delayone.csv``, ordered by
    ``delay_count / count`` descending.

    Args:
        sc: Active SparkContext used to read the input files.
    """
    sqlContext = HiveContext(sc)
    rows = sc.textFile('Flight_Project/Data/864625436_T_ONTIME_*2.csv').mapPartitionsWithIndex(parseCSV)
    flights = sqlContext.createDataFrame(rows)

    # ARR_DEL15 arrives as text; after the cast, rows containing nulls are
    # dropped. NOTE(review): this presumably also removes header lines from
    # files other than the first and records with an empty ARR_DEL15, since
    # such values cannot be cast to int - confirm against the Spark cast mode.
    flights = flights.withColumn('ARR_DEL15', flights['ARR_DEL15'].cast('int')).na.drop()

    # Number of flights flagged ARR_DEL15 == 1 per route.
    delayed = (flights.filter(flights.ARR_DEL15 == 1)
               .groupby('ROUTE')
               .count()
               .withColumnRenamed('count', 'delay_count'))
    # Number of all remaining flights per route.
    totals = flights.groupby('ROUTE').count()

    # Inner join: routes without any flagged flight are left out.
    per_route = delayed.join(totals, 'ROUTE')

    # Only the final ordering matters for the exported file, so the frame is
    # sorted once, after the ratio column has been computed.
    ranked = per_route.select(
        'ROUTE',
        per_route['delay_count'] / per_route['count'],
        'delay_count',
        'count',
    ).sort(desc('(delay_count / count)'))

    ranked.toPandas().to_csv('Output/route_delayone.csv')
# Script entry point: create the SparkContext, run the job, and always stop
# the context afterwards so it is released even when the job fails.
if __name__ == "__main__":
    sc = SparkContext()
    try:
        main(sc)
    finally:
        sc.stop()