# Initialize SparkSession

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Attach to the already-running SparkSession, or start one if none exists.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
# Evaluating the session as the cell's last expression renders its summary.
spark

In [None]:
#helper function
import pandas as pd
def show(df, no=10):
    """Return the first ``no`` rows of a Spark DataFrame as a pandas DataFrame.

    Used in this notebook as a nicer-looking preview than ``DataFrame.show()``.
    Only ``no`` rows are brought to the driver (via ``take``).

    Args:
        df: Spark DataFrame to preview.
        no: number of rows to fetch (default 10).

    Returns:
        pandas.DataFrame with the same column names as ``df``.
    """
    first_rows = df.take(no)
    header = df.columns
    return pd.DataFrame(first_rows, columns=header)

# Read the dataset

In [None]:
# Load the raw flights CSV. inferSchema is off, so every column is read as a string.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .csv("data/Airports2.csv")
)

In [None]:
# Column names taken from the CSV header.
df.schema.names

In [None]:
# Preview the first 10 raw rows.
show(df, no=10)

## Save flights as one file per month

In [None]:
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql import functions as sf
# Build a "<year>_<month>" key (e.g. "1999_10"). The month part comes from an
# integer, so it is not zero-padded. The key decides which monthly file each
# flight is exported to.
month_key = sf.concat(year("Fly_date"), sf.lit("_"), month("Fly_date"))
df = df.withColumn("month", month_key)
show(df)

In [None]:
# Number of flights recorded for each year_month key.
flights_per_month = df.groupBy("month").count()
show(flights_per_month)

In [None]:
# Peek at the flights of a single month key: 1999_10 (October 1999).
df1 = df.filter(df["month"] == "1999_10")
show(df1)

In [None]:
# Distinct year_month keys, collected to the driver as Rows.
# distinct() alone returns rows in no guaranteed order, so sort them
# chronologically (year, then month, numerically). Slicing such as
# months[:2] then picks the same months on every run. Null keys (rows
# whose Fly_date could not be parsed) are dropped because they cannot
# name a month file.
months = (
    df.select("month")
    .where(F.col("month").isNotNull())
    .distinct()
    .orderBy(
        F.split("month", "_")[0].cast("int"),
        F.split("month", "_")[1].cast("int"),
    )
    .collect()
)

In [None]:
# Processing every month is slow, so only the first 2 entries of `months` are exported.
# The loop variable is deliberately NOT named `month`: that would shadow the
# pyspark `month()` function imported for the month-key cell, and re-running
# that cell would then fail.
for month_row in months[:2]:
    month_key = month_row[0]
    # A column comparison instead of an f-string SQL predicate avoids quoting issues.
    df2 = df.where(F.col("month") == month_key)
    # coalesce(1) collapses the data to one partition, so each month is written
    # as a single CSV part file. mode("append") means re-running this cell adds
    # further files rather than replacing the existing ones.
    df2.coalesce(1).write.mode("append").option("header", "true").csv("data/traffic_per_months")

In [None]:
# IPython shell escape: list the files written to data/traffic_per_months/.
!cd data/traffic_per_months/ && ls

In [None]:
# Load one of the exported monthly files to inspect it and reuse its schema.
# Spark embeds a per-write UUID in part-file names, so a hard-coded name
# breaks after every re-export. Look the file up instead.
import glob

part_files = sorted(glob.glob("data/traffic_per_months/part-*.csv"))
if not part_files:
    raise FileNotFoundError(
        "No part-*.csv files found in data/traffic_per_months; run the export cell first."
    )
part = spark.read.csv(
    part_files[0],
    header=True,
    inferSchema=True,
)

In [None]:
# Row count per year_month key in this single file.
rows_per_month = part.groupBy("month").count()
rows_per_month.show()

# Create a stream of events

In [None]:
# Print the schema Spark inferred for the sample file, then keep it.
# File-based streaming sources need a user-provided schema by default, so
# reusing this one gives the stream the same column types.
part.printSchema()
dataSchema = part.schema

Let's define our streaming source.

- *maxFilesPerTrigger* caps how many new files Spark reads in each micro-batch (trigger), which controls how quickly it works through all of the files in the folder.

In [None]:
# Streaming file source over the monthly CSV exports.
# - schema: reuse the one inferred from the sample file.
# - header: every exported file starts with a header line (it was written with
#   option("header", "true")). Without this option that line would be parsed
#   as a data row, adding a bogus "Origin_city"/"month" record per file.
# - maxFilesPerTrigger=1: read at most one new file per micro-batch.
streaming = (
    spark.readStream.schema(dataSchema)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .csv("data/traffic_per_months/")
)

Let's set up a transformation.

In [None]:
# Append ", US" to each origin city, then count flights per
# (origin city, month) pair, busiest first.
airports_count = (
    streaming
    .withColumn("Origin_city", F.concat("Origin_city", F.lit(", US")))
    .groupBy("Origin_city", "month")
    .count()
    .orderBy(F.col("count").desc())
)

Now that we have our transformation, we need to specify an output sink for the results. We're going to write to a memory sink which keeps the results in memory.

We also need to define how Spark will output that data. We'll use the complete output mode (rewriting all of the keys along with their counts after every trigger).

We won't call `activityQuery.awaitTermination()` here: it blocks until the query terminates. It is only needed in a standalone application, to keep the driver process alive while the stream is running.


In [None]:
# Sink: keep the complete aggregated table in memory so spark.sql can query it
# under the name "aiports". The spelling is a typo, but the polling cell uses
# this exact name. "complete" mode re-emits every key with its current count
# after each trigger.
memory_writer = (
    airports_count.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("aiports")
)
activityQuery = memory_writer.start()

# In a standalone application (not a notebook), block here so the driver
# process stays alive while the stream runs:
# activityQuery.awaitTermination()

In [None]:
# Poll the in-memory result table 25 times, 0.5 s apart, while the stream
# works through the files. Progress is also visible at
# http://localhost:4040/StreamingQuery
import time

for _ in range(25):
    df3 = spark.sql(
        "SELECT * FROM aiports where count > 2"
    )
    # take(1) needs only one row to decide whether there is anything to show.
    # count() would scan the whole table on every poll.
    if df3.take(1):
        df3.show(10)
    time.sleep(0.5)

Check if stream is active:

In [None]:
# Ask the query handle directly. spark.streams.active[0] raises IndexError
# once no stream is running (e.g. after stop()), and with several streams
# it is not necessarily this query.
activityQuery.isActive

In [None]:
# Current query status (message, whether data is available, whether a trigger is active).
current_status = activityQuery.status
current_status

To turn off the stream (for example, to reset the query while testing), run `activityQuery.stop()`.

In [None]:
# Stop the streaming query started above.
activityQuery.stop()

# Individual work

1. Process at least 10 months worth of data (to have more files in the traffic_per_months folder)

2. Using a new stream that reads 2 files per trigger (`maxFilesPerTrigger` = 2), implement the same functionality for `Destination_city`.