In [0]:
from pyspark.sql import functions as F

In [0]:
# Dataset variant; it is substituted into the flights file name only.
size = "small"
# Common stem shared by the three CSV file names.
prefix = "ontimeperformance"
# Directory prefix under which all input CSVs are looked up.
dbfs_fileStore_prefix = "/FileStore/tables"

In [0]:
# Full paths of the three input CSVs, assembled from the directory prefix, file
# stem and size defined in the configuration cell. Only the flights file name
# carries the size suffix.
flights = f"{dbfs_fileStore_prefix}/{prefix}_flights_{size}.csv"
aircrafts = f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv"
airlines = f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv"

In [0]:
# Read each CSV with a header row and inferred column types, and mark the
# resulting DataFrame for caching so the cells below can reuse it.
flights_df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv(flights)
    .cache()
)
aircrafts_df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv(aircrafts)
    .cache()
)
airlines_df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv(airlines)
    .cache()
)

In [0]:
# Peek at the first five flight rows (returned as a list of Row objects).
flights_df.take(5)

In [0]:
# Peek at the last five aircraft rows.
aircrafts_df.tail(num=5)

In [0]:
# Display the aircraft rows whose manufacturer is exactly "CESSNA".
aircrafts_df.where(F.col("manufacturer") == "CESSNA").show()

In [0]:
# Print the column names and the types inferred for the aircraft CSV.
aircrafts_df.printSchema()

In [0]:
# Same CESSNA filter as above, narrowed to six columns of interest.
(
    aircrafts_df
    .where(F.col("manufacturer") == "CESSNA")
    .select("manufacturer", "model", "type", "engine_type", "tailnum", "aircraft_type")
    .show()
)

In [0]:
# Number of CESSNA aircraft rows per model, largest group first.
(
    aircrafts_df
    .where(F.col("manufacturer") == "CESSNA")
    .groupBy("model")
    .count()
    .sort(F.col("count").desc())
    .show()
)

In [0]:
# Build a sparkmeasure StageMetrics object on the notebook's Spark session.
# Later cells call begin(), end() and print_report() on it.
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

In [0]:
import re

def trim(df):
  """Return ``df`` with leading/trailing spaces and tabs stripped from column names.

  Only the names change: the result is ``df.toDF(...)`` called with the cleaned
  names in their original order.
  """
  edge_blanks = re.compile('^[ \t]+|[ \t]+$')
  cleaned_names = []
  for name in df.columns:
    cleaned_names.append(edge_blanks.sub('', name))
  return df.toDF(*cleaned_names)

In [0]:
def top_cessna_models(spark_session, flights_path, aircrafts_path):
    """Print the three Cessna models with the most flights, reading both CSVs.

    NOTE: a later cell defines another ``top_cessna_models`` that takes
    already-loaded DataFrames; once that cell runs it replaces this definition.

    Args:
        spark_session: SparkSession used to read both CSV files.
        flights_path: Path of the flights CSV (with header row). Column names
            are stripped of surrounding spaces/tabs; after stripping a
            ``tail_number`` column must exist.
        aircrafts_path: Path of the aircrafts CSV (with header row). The
            ``tailnum``, ``manufacturer`` and ``model`` columns are used.

    Returns:
        None. For each of the top three models, prints ``Cessna <model>`` and
        the flight count separated by a tab.
    """
    aircrafts_df = (
        spark_session.read.csv(aircrafts_path, inferSchema=True, header=True)
        .select(F.col('tailnum').alias('tail_number'), F.col('manufacturer'), F.col('model'))
    )

    # Reuse the notebook's trim() helper instead of repeating its regex here.
    flights_df = trim(
        spark_session.read.csv(flights_path, inferSchema=True, header=True)
    )

    # Keep only Cessna aircraft and reduce the model to its first 3-digit run.
    # Raw string: "\d" in a normal literal is an invalid escape sequence.
    cessna_models = aircrafts_df.filter(
        F.col('manufacturer') == "CESSNA"
    ).withColumn('model', F.regexp_extract(F.col('model'), r"\d{3}", 0))

    # Inner join: a left join from the aircraft side would emit one null-padded
    # row for every Cessna without flights and count it as a flight. An inner
    # join also lets Spark honour the broadcast hint on the small side.
    cessna_flights_count = (
        F.broadcast(cessna_models).join(
            flights_df,
            on="tail_number",
            how="inner"
        )
        .groupBy('model')
        .count()
        .orderBy('count', ascending=False)
    )

    models = cessna_flights_count.take(3)

    # print() already writes to standard output; the original passed
    # file=sys.stdout without importing sys, which raised NameError.
    for (model, count) in models:
        print("Cessna %s\t%i" % (model, count))


In [0]:
def top_cessna_models(spark_session, flights_df, aircrafts_df):
  """Print the three Cessna models with the most flights from loaded DataFrames.

  Args:
    spark_session: Not used by the body; kept so the signature stays unchanged.
    flights_df: Flights DataFrame. Its column names are passed through
      ``trim``; after trimming a ``tail_number`` column must exist.
    aircrafts_df: Aircraft DataFrame providing the ``tailnum``,
      ``manufacturer`` and ``model`` columns.

  Returns:
    None. For each of the top three models, prints ``Cessna <model>`` and the
    flight count separated by a tab.
  """
  aircrafts_df = aircrafts_df \
    .select(F.col('tailnum').alias('tail_number'), F.col('manufacturer'), F.col('model'))
  flights_df = trim(flights_df)

  # Keep only Cessna aircraft and reduce the model to its first 3-digit run.
  # Raw string: "\d" in a normal literal is an invalid escape sequence.
  cessna_models = aircrafts_df.filter(
    F.col('manufacturer') == "CESSNA"
  ).withColumn('model', F.regexp_extract(F.col('model'), r"\d{3}", 0))

  # Inner join: a left join from the aircraft side would emit one null-padded
  # row for every Cessna without flights and count it as a flight. An inner
  # join also lets Spark honour the broadcast hint on the small side.
  cessna_flights_count = (
    F.broadcast(cessna_models).join(
      flights_df,
      on="tail_number",
      how="inner"
    )
    .groupBy('model')
    .count()
    .orderBy('count', ascending=False)
  )

  models = cessna_flights_count.take(3)

  for (model, count) in models:
    print("Cessna %s\t%i" % (model, count))

In [0]:
# Wrap only the query under test between begin() and end(); the order of these
# three statements matters. DataFrames (not paths) are passed, so this relies on
# the DataFrame-based definition of top_cessna_models.
# NOTE(review): presumably begin()/end() delimit the measured window — confirm
# against the sparkmeasure documentation.
stagemetrics.begin()
top_cessna_models(spark, flights_df, aircrafts_df)
stagemetrics.end()

In [0]:
# Print sparkmeasure's report; presumably it covers the begin()/end() window of
# the previous cell — confirm against the sparkmeasure documentation.
stagemetrics.print_report()