In [1]:
import pandas as pd
import numpy as np
import datetime as dt

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *#avg, count, expr
from pyspark.sql.types import *

In [2]:
# Create (or reuse) the Spark session for this notebook.
# getOrCreate() makes the cell safe to re-run in a live kernel: constructing
# pyspark.SparkContext() a second time raises
# "Cannot run multiple SparkContexts at once".
ss = SparkSession.builder.getOrCreate()
# Keep the context handle under its original name; the last cell calls sc.stop().
sc = ss.sparkContext

## Access the <a href="http://localhost:4040/jobs/">Spark GUI</a>

In [3]:
# Ask whether the optional .show()/count() previews in the later cells should run.
# Any non-zero integer turns them on. A blank or non-numeric reply (ValueError)
# or a closed stdin (EOFError) falls back to 0 instead of aborting the notebook.
try:
    shows = int(input('show progress?'))
except (ValueError, EOFError):
    shows = 0

show progress? 0


In [4]:
# Record the notebook start time; the closing timing cell subtracts it
# to report the total elapsed time.
stt = dt.datetime.now()
print(str(stt))

2021-03-29 12:45:17.963974


In [5]:
# Load both data sets with explicit schemas and expose them as temp views.

# paths
filDelay = './LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv'
filAport = './LearningSparkV2-master/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt'

# delay data: 'date' is kept as a string so the LIKE filter used later works on it
delay_fields = [('date', StringType()),
                ('delay', IntegerType()),
                ('distance', IntegerType()),
                ('origin', StringType()),
                ('destination', StringType())]
schema = StructType([StructField(name, dtype) for name, dtype in delay_fields])
delay_reader = ss.read.format('csv').options(header=True, inferschema=False).schema(schema)
dfDelay = delay_reader.load(filDelay).cache()

# airports data: tab-delimited, every column read as a string
schema = StructType([StructField(name, StringType())
                     for name in ('City', 'State', 'Country', 'IATA')])
airport_reader = (ss.read.format('csv')
                  .options(header=True, inferschema=False)
                  .schema(schema)
                  .option("delimiter", "\t"))
dfAport = airport_reader.load(filAport).cache()

# register the views used by the SQL queries
dfDelay.createOrReplaceTempView('departureDelays')
dfAport.createOrReplaceTempView('airports_na')

# preview the first rows of each view
if shows:
    for query in ('select * from departureDelays limit 10;',
                  'select * from airports_na limit 10;'):
        ss.sql(query).show()

In [6]:
# Delayed flights (delay > 0) from Seattle to San Francisco whose date string
# matches the '01010%' pattern; cached and exposed as the temp view 'foo'.
sea_to_sfo_late = expr("""origin == 'SEA' and destination == 'SFO' and date like '01010%' and delay > 0""")
foo = dfDelay.filter(sea_to_sfo_late).cache()
foo.createOrReplaceTempView('foo')
if shows:
    ss.sql('select * from foo;').show()

In [7]:
# demonstrate union: DataFrame.union behaves like UNION ALL (duplicates kept),
# so a trailing distinct() is needed to drop the repeated rows
if shows:
    stacked = dfDelay.distinct().union(foo)
    print(stacked.count())             # rows of foo counted a second time
    print(stacked.distinct().count())  # duplicates removed again

In [8]:
# inner join - attach city/state for both the origin and the destination airport.
# Spark chooses a broadcast join here without being asked; wrapping the lookup
# in broadcast(...) would request that strategy explicitly.
airport_lookup = dfAport.select('IATA', 'City', 'State')

# step 1: origin airport details
with_origin = (
    foo.join(airport_lookup, on=[foo['origin'] == dfAport['IATA']], how='inner')
       .withColumnRenamed('City', 'Origin_City')
       .withColumnRenamed('State', 'Origin_State')
       .drop('IATA')
)

# step 2: destination airport details
jnd = (
    with_origin.join(airport_lookup, on=[foo['destination'] == dfAport['IATA']], how='inner')
               .withColumnRenamed('City', 'Dest_City')
               .withColumnRenamed('State', 'Dest_State')
               .drop('IATA')
)

if shows:
    jnd.show()

In [9]:
# print only the physical plan of the double airport join
jnd.explain(mode='simple')

== Physical Plan ==
*(3) Project [date#0, delay#1, distance#2, origin#3, destination#4, Origin_City#190, Origin_State#199, City#35 AS Dest_City#248, State#36 AS Dest_State#259]
+- *(3) BroadcastHashJoin [destination#4], [IATA#38], Inner, BuildRight
   :- *(3) Project [date#0, delay#1, distance#2, origin#3, destination#4, City#35 AS Origin_City#190, State#36 AS Origin_State#199]
   :  +- *(3) BroadcastHashJoin [origin#3], [IATA#38], Inner, BuildRight
   :     :- *(3) Filter (isnotnull(origin#3) AND isnotnull(destination#4))
   :     :  +- InMemoryTableScan [date#0, delay#1, distance#2, origin#3, destination#4], [isnotnull(origin#3), isnotnull(destination#4)]
   :     :        +- InMemoryRelation [date#0, delay#1, distance#2, origin#3, destination#4], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :              +- *(1) Filter (((((((isnotnull(origin#3) AND isnotnull(destination#4)) AND isnotnull(delay#1)) AND isnotnull(date#0)) AND (origin#3 = SEA)) AND (destination#4 = S

In [10]:
# outer join: per-origin mean and max of the positive delays, attached to every
# airport row. The aggregate side is broadcast explicitly (original note:
# broadcast is slightly faster than the un-hinted join). The same statistics
# could also be built as two separate aggregations joined on origin.
# NOTE(review): unless `shows` is truthy no action runs between the two
# timestamps, so the printed duration presumably covers only building the
# lazy plan - confirm before comparing join strategies with it.
timA = dt.datetime.now()

late_only = dfDelay.select('origin', 'delay').filter(col('delay') > 0)
mnmx = late_only.groupBy('origin').agg(
    mean(col('delay')).alias('delay_mean'),
    # `max` is the Spark column function from the star import, not the builtin
    max(col('delay')).alias('delay_max'),
)

iata_matches_origin = dfAport['IATA'] == mnmx['origin']
avgDelay = dfAport.join(broadcast(mnmx), on=[iata_matches_origin], how='left_outer').drop('origin')
if shows:
    avgDelay.show()

timB = dt.datetime.now()
print(timB - timA)

0:00:00.142621


In [11]:
# print only the physical plan of the broadcast left outer join
avgDelay.explain(mode='simple')

== Physical Plan ==
*(3) Project [City#35, State#36, Country#37, IATA#38, delay_mean#519, delay_max#521]
+- *(3) BroadcastHashJoin [IATA#38], [origin#3], LeftOuter, BuildRight
   :- InMemoryTableScan [City#35, State#36, Country#37, IATA#38]
   :     +- InMemoryRelation [City#35, State#36, Country#37, IATA#38], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           +- FileScan csv [City#35,State#36,Country#37,IATA#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/howean/Documents/spark_learning/LearningSparkV2-master/databrick..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<City:string,State:string,Country:string,IATA:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#136]
      +- *(2) HashAggregate(keys=[origin#3], functions=[avg(cast(delay#1 as bigint)), max(delay#1)])
         +- Exchange hashpartitioning(origin#3, 200), true, [id=#132]
            +- *(1) HashAggrega

In [12]:
# Label each airport from its mean positive delay using Column expressions
# (when/otherwise) instead of a hard-coded SQL CASE string.
ON_TIME_MAX_MEAN_DELAY = 30  # delay_mean at or below this value counts as 'On-time'

status_col = when(col('delay_mean') <= ON_TIME_MAX_MEAN_DELAY, 'On-time').otherwise('Delayed')

# delay_mean > 0 also removes airports without any matching delay rows:
# their delay_mean is null after the left outer join.
avgDelay.filter(col('delay_mean') > 0).withColumn('Status', status_col).show()

+-----------+-----+-------+----+------------------+---------+-------+
|       City|State|Country|IATA|        delay_mean|delay_max| Status|
+-----------+-----+-------+----+------------------+---------+-------+
|    Abilene|   TX|    USA| ABI| 50.04494382022472|      395|Delayed|
|      Akron|   OH|    USA| CAK| 42.40041067761807|      425|Delayed|
|     Albany|   GA|    USA| ABY| 37.09230769230769|      236|Delayed|
|     Albany|   NY|    USA| ALB| 36.91700680272109|      491|Delayed|
|Albuquerque|   NM|    USA| ABQ|  31.5553772070626|     1305|Delayed|
| Alexandria|   LA|    USA| AEX|50.634328358208954|      580|Delayed|
|  Allentown|   PA|    USA| ABE|        52.8828125|      333|Delayed|
|   Amarillo|   TX|    USA| AMA|38.015068493150686|      624|Delayed|
|  Anchorage|   AK|    USA| ANC|30.401273885350317|     1033|Delayed|
|   Appleton|   WI|    USA| ATW| 47.13942307692308|      522|Delayed|
|  Asheville|   NC|    USA| AVL|46.645348837209305|      332|Delayed|
|    Atlanta|   GA| 

In [13]:
# Report the finish time and the total elapsed time since `stt` was recorded.
# Original author's timing note: time if shows is false 0:00:42; time if shows is true 0:00.47
stp = dt.datetime.now()
elapsed = stp - stt
print(stp)
print(elapsed)

2021-03-29 12:45:27.028735
0:00:09.064761


In [14]:
# Shut down the SparkContext that backs this notebook's session.
sc.stop()