In [None]:
from pyspark import sql
from pyspark.sql import functions as f, udf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ModuleNotFoundError: ignored

In [None]:
!wget -q  https://s3.amazonaws.com/metcs777/flights.csv.bz2
    
!wget -q  https://s3.amazonaws.com/metcs777/airports.csv.bz2

In [None]:
# Spark context for the text-cleaning job, plus a streaming context on top
# of it whose batch duration is 3.
sc = SparkContext(appName="Text Cleaning")
strc = StreamingContext(sparkContext=sc, batchDuration=3)

In [None]:
# DStream of text lines read from a TCP socket on localhost:8084.
text_data = strc.socketTextStream(hostname="localhost", port=8084)

In [None]:
# Build (or reuse) the session used for every DataFrame query below.
sqlContext = (
    sql.SparkSession.builder
    .master("local")
    .appName("Flight DF")
    .getOrCreate()
)

# Both CSVs are read with a header row and with column types inferred.
flights = (
    sqlContext.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load("flights.csv.bz2")
)

airport = (
    sqlContext.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load("airports.csv.bz2")
)

In [None]:
# Show the inferred schema of each loaded DataFrame, flights first.
for frame in (flights, airport):
    frame.printSchema()

In [None]:

# Unique origin airports appearing in the flights data.
flights.select("ORIGIN_AIRPORT").dropDuplicates().show()

In [None]:

# Unique (origin, destination) airport pairs.
flights.select(["ORIGIN_AIRPORT", "DESTINATION_AIRPORT"]).dropDuplicates().show()

In [None]:

# Origin airport of the MONTH == 1 flight with the largest departure delay.
(
    flights
    .filter(f.col("MONTH") == 1)
    .sort(f.col("DEPARTURE_DELAY").desc())
    .limit(1)
    .select("ORIGIN_AIRPORT")
    .show()
)

In [None]:

# Airline of the flight with the largest departure delay among rows whose
# DAY_OF_WEEK is 6 or 7.
(
    flights
    .where("DAY_OF_WEEK = 6 OR DAY_OF_WEEK = 7")
    .sort(f.col("DEPARTURE_DELAY").desc())
    .limit(1)
    .select("AIRLINE")
    .show()
)

In [None]:

# Origin airport with the most cancelled flights, with its cancellation count.
(
    flights
    .where("CANCELLED = 1")
    .groupBy("ORIGIN_AIRPORT")
    .agg(f.count(f.lit(1)).alias("COUNT"))  # one per row, i.e. rows per airport
    .sort(f.col("COUNT").desc())
    .limit(1)
    .select("ORIGIN_AIRPORT", "COUNT")
    .show()
)

In [None]:

# Per-airline cancellation rate, as a percentage of that airline's rows.
(
    flights
    .groupBy("AIRLINE")
    .agg(
        f.sum("CANCELLED").alias("TOTAL_CANCELLED"),
        f.count(f.lit(1)).alias("TOTAL"),  # every row of the airline
    )
    .withColumn("CANCEL_RATE", f.col("TOTAL_CANCELLED") / f.col("TOTAL") * 100)
    .show()
)

In [None]:

# Largest departure delay recorded for each airline.
(
    flights
    .groupBy("AIRLINE")
    .agg(f.max(f.col("DEPARTURE_DELAY")).alias("MAX_DEPARTURE_DELAY"))
    .show()
)

In [None]:

# Largest departure delay recorded for each (airline, month) pair.
(
    flights
    .groupBy("AIRLINE", "MONTH")
    .agg(f.max(f.col("DEPARTURE_DELAY")).alias("MAX_DEPARTURE_DELAY"))
    .show()
)

In [None]:

# Per-airline departure-delay summary: total delay, number of rows, and the
# average delay.
#
# The average uses f.avg, which divides by the number of rows that actually
# have a DEPARTURE_DELAY value. Dividing the (null-skipping) sum by a count of
# *all* rows would pull the mean toward zero whenever some rows have no
# recorded delay.
# NOTE(review): presumably cancelled flights are the rows with a null delay -
# confirm against the data.
(
    flights
    .groupBy("AIRLINE")
    .agg(
        f.sum("DEPARTURE_DELAY").alias("TOTAL_DEPARTURE_DELAY"),
        f.count(f.lit(1)).alias("TOTAL"),  # all rows, including null delays
        f.avg("DEPARTURE_DELAY").alias("AVG_DEPARTURE_DELAY"),
    )
    .show()
)

In [None]:

# Average departure delay for each (airline, month) pair.
#
# f.avg divides by the number of rows with a non-null DEPARTURE_DELAY, so rows
# without a recorded delay no longer dilute the mean (a sum divided by a count
# of all rows would).
(
    flights
    .groupBy("AIRLINE", "MONTH")
    .agg(f.avg("DEPARTURE_DELAY").alias("AVG_DEPARTURE_DELAY"))
    .show()
)

In [None]:


# Calendar day (YEAR, MONTH, DAY) with the highest share of cancelled flights.
(
    flights
    .groupBy("YEAR", "MONTH", "DAY")
    .agg(
        f.sum("CANCELLED").alias("TOTAL_CANCELLED"),
        f.count(f.lit(1)).alias("TOTAL"),  # every row of that day
    )
    .withColumn("CANCEL_RATE", f.col("TOTAL_CANCELLED") / f.col("TOTAL") * 100)
    .sort(f.col("CANCEL_RATE").desc())
    .limit(1)
    .select("YEAR", "MONTH", "DAY")
    .show()
)


In [None]:

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import udf
# For each airline, the destination state with the highest average departure
# delay.
#
# Changes from the previous version of this cell:
#   * the ranking uses the AVERAGE delay (the output column is named
#     MAX_AVGDELAY_STATE); before, the average was computed and then dropped
#     in favour of the total;
#   * the (delay, state) pair is kept together in one struct and reduced with
#     max(), instead of two parallel collect_list() columns zipped in a Python
#     UDF - collect_list drops nulls independently per column, which could
#     misalign the two lists;
#   * f.avg ignores rows with no recorded delay rather than counting them in
#     the divisor.
# Structs compare field by field, so max() picks the largest average and, on a
# tie, the alphabetically last state - the same tie-break as sorting tuples.
#
# NOTE(review): the average is per (airline, destination airport) and the
# state of the worst airport is reported. If a state-level average is wanted,
# group by ("AIRLINE", "STATE") after the join instead - confirm intent.
(
    flights
    .groupBy("AIRLINE", "DESTINATION_AIRPORT")
    .agg(f.avg("DEPARTURE_DELAY").alias("DEPARTURE_AVG_DELAY"))
    .where(f.col("DEPARTURE_AVG_DELAY").isNotNull())
    .join(airport, flights.DESTINATION_AIRPORT == airport.IATA_CODE)
    .groupBy("AIRLINE")
    .agg(f.max(f.struct("DEPARTURE_AVG_DELAY", "STATE")).alias("WORST"))
    .select("AIRLINE", f.col("WORST.STATE").alias("MAX_AVGDELAY_STATE"))
    .show()
)


In [None]:
import re

from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Shared resources for the text-cleaning function defined in this cell.
# NOTE(review): both rely on NLTK corpus data ('stopwords', and presumably
# 'wordnet' for the lemmatizer) being available locally - confirm.
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
def clean_text(sentence, stopword_set=None, lemmatize=None):
    """Normalize a line of text into lowercase, lemmatized, stop-word-free tokens.

    Steps, in order: lowercase, remove URLs, replace every non-word character
    with a space, drop stop words, lemmatize each remaining token as a verb,
    and re-join the tokens with single spaces.

    Args:
        sentence: The raw text to clean.
        stopword_set: Optional collection of words to drop. Defaults to the
            module-level ``stop_words``.
        lemmatize: Optional callable ``(token, pos) -> str``. Defaults to
            ``lemmatizer.lemmatize`` from the module-level lemmatizer.

    Returns:
        The cleaned text as one space-separated string ("" if nothing is left).
    """
    if stopword_set is None:
        stopword_set = stop_words
    if lemmatize is None:
        lemmatize = lemmatizer.lemmatize

    text = sentence.lower()
    # URLs must go first: once punctuation is replaced, "http://a.b/c" would
    # already be broken into separate words and could no longer be matched.
    text = re.sub(r"http\S+", " ", text)
    # The patterns need their backslashes: a bare "W" or "S" never matches
    # lowercased text, and a bare "s+" would delete the letter "s" from words.
    text = re.sub(r"\W", " ", text)
    # split() collapses any run of whitespace, so no separate \s+ pass is needed.
    tokens = [lemmatize(tok, "v") for tok in text.split() if tok not in stopword_set]
    return " ".join(tokens)

In [None]:
# A streaming context needs at least one output operation registered before
# start(); without one, start() fails because there is nothing to execute.
# Clean every incoming line and print a sample of each batch.
text_data.map(clean_text).pprint()

strc.start()
# Blocks this cell until the streaming job is stopped or fails.
strc.awaitTermination()