In [0]:

import time
# Pfad zum Verzeichnis, das die Parquet-Dateien enthält
pfad = "/Volumes/workspace/default/flight_data/flight_parquet"

# Lese alle Parquet-Dateien aus dem Verzeichnis und wähle nur bestimmte Spalten aus
spalten = [
    "Year", "Month", "DayofMonth", "DayOfWeek", "FlightDate",
    "Reporting_Airline", "Origin", "Dest", "CRSDepTime", "DepTime",
    "DepDelay", "DepDelayMinutes", "DepDel15", "DepartureDelayGroups",
    "CRSArrTime", "ArrTime", "ArrDelay", "ArrDelayMinutes", "ArrDel15",
    "Cancelled", "CancellationCode", "Diverted", "CRSElapsedTime",
    "ActualElapsedTime", "AirTime", "Distance"
]
t0 = time.time()
df = spark.read.parquet(pfad).select(spalten)

df.count()
t1 = time.time()
print(f"Zeit zum Laden der Daten: {t1 - t0} Sekunden")
tpq = t1 - t0



Zeit zum Laden der Daten: 1.6889567375183105 Sekunden


In [0]:
#Durchschnittliche Abflugverspätung nach Fluggesellschaft
t0 = time.time()
ergebnis1 = spark.sql("""
     SELECT
      Reporting_Airline,
      COUNT(*) AS num_flights,
      COUNT(CASE WHEN DepDelayMinutes > 0.0 THEN 1 END) AS num_flights_with_delay,
      AVG(DepDelayMinutes) AS avg_dep_delay,
      percentile_approx(DepDelayMinutes, 0.90) as p90_dep_delay
    FROM flights
    GROUP BY Reporting_Airline
    ORDER BY avg_dep_delay DESC
""")
display(ergebnis1.toPandas())
t1 = time.time()
print(f"Zeit zum Ausführen der SQL-Abfrage 1: {t1 - t0} Sekunden")
tsql1 = t1 - t0

Reporting_Airline,num_flights,num_flights_with_delay,avg_dep_delay,p90_dep_delay
B6,273058,128314,26.434079299440526,82.0
F9,155482,72339,24.39428242517268,71.0
G4,116877,53631,22.275866348702586,63.0
YV,114779,35642,19.14711391265196,52.0
AA,874145,327182,18.003319316348044,49.0
NK,232400,90219,17.150878064135703,51.0
WN,1307149,713906,15.842448900850414,45.0
UA,626473,242658,15.34419398907104,42.0
OH,214450,61345,14.308438749615505,40.0
OO,733576,194764,14.177725161238389,33.0


Zeit zum Ausführen der SQL-Abfrage 1: 1.654423475265503 Sekunden


In [0]:
# Durchschnittliche Ankunftsverspätung nach Zielort
%%time
t0 = time.time()
ergebnis2 = spark.sql("""
     SELECT
      Dest,
      COUNT(*) AS num_flights,
      COUNT(CASE WHEN ArrDelayMinutes > 0.0 THEN 1 END) AS num_flights_with_delay,
      AVG(ArrDelayMinutes) as avg_arr_delay,
      percentile_approx(ArrDelayMinutes, 0.90) as p90_arr_delay
    FROM flights
    GROUP BY Dest
    HAVING num_flights > 100
    ORDER BY avg_arr_delay DESC
""")
display(ergebnis2.toPandas())
t1 = time.time()
print(f"Zeit zum Ausführen der SQL-Abfrage 2: {t1 - t0} Sekunden")
tsql2 = t1 - t0


Dest,num_flights,num_flights_with_delay,avg_arr_delay,p90_arr_delay
PGD,6152,3863,36.48650970957209,98.0
BQN,2398,1361,31.03270223752152,92.0
BIH,325,149,30.43312101910828,90.0
ASE,6464,2815,28.33727101523712,78.0
MQT,1100,386,27.72944550669216,54.0
DIK,116,18,27.471910112359552,32.0
SFB,9166,5049,27.38532528198701,73.0
PIE,7829,4165,26.67686577091925,76.0
SHD,606,145,25.903878583473865,63.0
PSE,863,416,25.74327628361858,85.0


Zeit zum Ausführen der SQL-Abfrage 2: 1.5228054523468018 Sekunden
CPU times: user 41.3 ms, sys: 13.1 ms, total: 54.4 ms
Wall time: 1.52 s


In [0]:
# Anzahl der Flüge nach Wochentag
t0 = time.time()
ergebnis3 = spark.sql("""
   SELECT
      CASE DayOfWeek
        WHEN 1 THEN 'Monday'
        WHEN 2 THEN 'Tuesday'
        WHEN 3 THEN 'Wednesday'
        WHEN 4 THEN 'Thursday'
        WHEN 5 THEN 'Friday'
        WHEN 6 THEN 'Saturday'
        WHEN 7 THEN 'Sunday'
      END AS weekday,
      COUNT(*) as flight_count
    FROM flights
    GROUP BY DayOfWeek
    ORDER BY DayOfWeek ASC
""")
display(ergebnis3.toPandas())
t1 = time.time()
print(f"Zeit zum Ausführen der SQL-Abfrage 3: {t1 - t0} Sekunden")
tsql3 = t1 - t0

weekday,flight_count
Monday,997219
Tuesday,934061
Wednesday,946037
Thursday,995372
Friday,1001705
Saturday,876949
Sunday,977782


Zeit zum Ausführen der SQL-Abfrage 3: 1.3133609294891357 Sekunden


In [0]:
# Durchschnittliche Flugdauer nach Fluggesellschaft
t0 = time.time()
ergebnis4 = spark.sql("""
   SELECT
      Reporting_Airline,
      AVG(AirTime) as avg_air_time,
      percentile_approx(AirTime, 0.90) as p90_air_time
    FROM flights
    GROUP BY Reporting_Airline
    ORDER BY avg_air_time DESC
""")
display(ergebnis4.toPandas())
t1 = time.time()
print(f"Zeit zum Ausführen der SQL-Abfrage 2: {t1 - t0} Sekunden")
tsql4 = t1 - t0

Reporting_Airline,avg_air_time,p90_air_time
AS,176.91235309786853,319.0
B6,153.86072816979936,294.0
UA,152.2819610139823,277.0
NK,135.15652884923034,217.0
AA,132.7626466273493,239.0
F9,131.75881283748822,210.0
DL,125.03134889511075,236.0
HA,121.80231731005992,324.0
G4,121.71262425977628,172.0
WN,99.8619083657919,170.0


Zeit zum Ausführen der SQL-Abfrage 2: 1.375037431716919 Sekunden


In [0]:
# Anzahl der annullierten Flüge nach Grund
t0 = time.time()
ergebnis5 = spark.sql("""
     SELECT
      CancellationCode,
      COUNT(*) as cancelled_flights
    FROM flights
    WHERE Cancelled = "1.00"
    GROUP BY CancellationCode
    ORDER BY cancelled_flights DESC
""")
display(ergebnis5.toPandas())
t1 = time.time()
print(f"Zeit zum Ausführen der SQL-Abfrage 2: {t1 - t0} Sekunden")
tsql5 = t1 - t0

CancellationCode,cancelled_flights
B,93013
A,71780
C,15469
D,1024


Zeit zum Ausführen der SQL-Abfrage 2: 1.2041640281677246 Sekunden


In [0]:
pfad_csv = "/Volumes/workspace/default/flight_data/"

df_airport = spark.read.csv(f"{pfad_csv}airport_lookup.csv", header=True, inferSchema=True)
df_carrier = spark.read.csv(f"{pfad_csv}carrier_lookup.csv", header=True, inferSchema=True)

display(df_airport.limit(5).toPandas())
display(df_carrier.limit(5).toPandas())
df_airport.createOrReplaceTempView("airport")
df_carrier.createOrReplaceTempView("carrier")

AIRPORT_SEQ_ID,AIRPORT_ID,AIRPORT,DISPLAY_AIRPORT_NAME,DISPLAY_AIRPORT_CITY_NAME_FULL,AIRPORT_COUNTRY_NAME,AIRPORT_COUNTRY_CODE_ISO,AIRPORT_STATE_NAME,AIRPORT_STATE_CODE,AIRPORT_STATE_FIPS,CITY_MARKET_ID,DISPLAY_CITY_MARKET_NAME_FULL,CITY_MARKET_WAC,LAT_DEGREES,LATITUDE,LON_DEGREES,LONGITUDE
1000101,10001,01A,Afognak Lake Airport,"Afognak Lake, AK",United States,US,Alaska,AK,2,30001,"Afognak Lake, AK",1,58,58.10944444,152,-152.90666667
1000301,10003,03A,Bear Creek Mining Strip,"Granite Mountain, AK",United States,US,Alaska,AK,2,30003,"Granite Mountain, AK",1,65,65.54805556,161,-161.07166667
1000401,10004,04A,Lik Mining Camp,"Lik, AK",United States,US,Alaska,AK,2,30004,"Lik, AK",1,68,68.08333333,163,-163.16666667
1000501,10005,05A,Little Squaw Airport,"Little Squaw, AK",United States,US,Alaska,AK,2,30005,"Little Squaw, AK",1,67,67.57,148,-148.18388889
1000601,10006,06A,Kizhuyak Bay,"Kizhuyak, AK",United States,US,Alaska,AK,2,30006,"Kizhuyak, AK",1,57,57.74527778,152,-152.88277778


Code,Description
02Q,Titan Airways
04Q,Tradewind Aviation
05Q,"Comlux Aviation, AG"
06Q,Master Top Linhas Aereas Ltd.
07Q,Flair Airlines Ltd.


In [0]:
# Durchschnittliche Abflugverspätung nach Zielort(Airport State)
t0 = time.time()
ergebnis6 = spark.sql("""
SELECT
      a.AIRPORT_STATE_NAME,
      COUNT(DISTINCT Dest) AS num_airports,
      COUNT(*) AS num_flights,
      COUNT(CASE WHEN ArrDelayMinutes > 0.0 THEN 1 END) AS num_flights_with_delay,
      AVG(ArrDelayMinutes) as avg_arr_delay
    FROM flights f
    JOIN airport a
    ON f.Dest = a.AIRPORT
    WHERE a.AIRPORT_STATE_NAME IS NOT NULL
    GROUP BY a.AIRPORT_STATE_NAME
    ORDER BY avg_arr_delay DESC
""")
display(ergebnis6.toPandas())
t1 = time.time()
tsql6 = t1 - t0
print(f"Zeit zum Ausführen der SQL-Abfrage 6: {t1 - t0} Sekunden")

AIRPORT_STATE_NAME,num_airports,num_flights,num_flights_with_delay,avg_arr_delay
Delaware,1,94,54,29.61904761904762
Puerto Rico,3,156400,71520,21.258731711159392
New Jersey,3,277224,114574,20.92439697861756
Florida,19,1816917,766891,19.410209221066257
New York,15,1674915,633976,19.15923752617429
Massachusetts,5,292608,106655,18.19946537903846
Connecticut,1,164402,67389,17.54810829879178
Maine,2,63778,23773,16.876499436675726
Indiana,4,238938,91024,16.597709296797337
Pennsylvania,9,578572,202171,16.52515629174459


Zeit zum Ausführen der SQL-Abfrage 6: 1.9930164813995361 Sekunden


In [0]:
t0 = time.time()
ergebnis7 = spark.sql("""
    SELECT airport_orig.AIRPORT as Origin, airport_dest.AIRPORT as Dest, AVG(flights.ArrDelay) as avg_arr_delay
    FROM flights
    inner join airport as airport_orig
    on flights.Origin = airport_orig.AIRPORT
    inner join airport as airport_dest
    on flights.Dest = airport_dest.AIRPORT
    INNER JOIN carrier
    on flights.Reporting_Airline = carrier.Code
    WHERE airport_orig.AIRPORT_STATE_CODE = 'TX'
    AND airport_dest.AIRPORT_STATE_CODE <> 'TX'
    GROUP BY ALL
""")
display(ergebnis7.toPandas())
t1 = time.time()
tsql7 = t1 - t0
print(f"Zeit zum Ausführen der SQL-Abfrage 7: {t1 - t0} Sekunden")

Origin,Dest,avg_arr_delay
IAH,PHX,2.670653566851773
MAF,PHX,12.505714285714284
DAL,VPS,5.463806970509383
IAH,LAX,1.5866388308977035
IAH,SFO,-1.5690843452576304
DFW,AVL,3.209621993127148
IAH,BNA,3.085675119945168
DAL,BHM,15.506607929515418
HOU,ONT,-1.6610169491525424
AMA,DEN,7.272912423625255


Zeit zum Ausführen der SQL-Abfrage 7: 1.7015001773834229 Sekunden


In [0]:
import pandas as pd

time_data = {
    "Operation": ["Load Parquet files", "Query 1", "Query 2", "Query 3", "Query 4", "Query 5", "Query 6", "Query 7"],
    "Elapsed Time (s)": [tpq, tsql1, tsql2, tsql3, tsql4, tsql5, tsql6, tsql7]
}

time_df = pd.DataFrame(time_data)
display(time_df)

Operation,Elapsed Time (s)
Load Parquet files,1.6889567375183103
Query 1,1.654423475265503
Query 2,1.5228054523468018
Query 3,1.3133609294891355
Query 4,1.375037431716919
Query 5,1.2041640281677246
Query 6,1.993016481399536
Query 7,1.7015001773834229
