In [25]:
# !pip install findspark
import findspark
findspark.init() 

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import explode, split, col, from_json, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType
import kafka

from time import sleep

import os
# Environment consumed by PySpark: the Kafka connector packages to load,
# the Python interpreter used by driver and workers, and the Spark install
# location. All values are machine-specific absolute paths.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell',
    'PYSPARK_PYTHON': '/Users/aptroost/.pyenv/versions/3.6.13/bin/python',
    'PYSPARK_DRIVER_PYTHON': '/Users/aptroost/.pyenv/versions/3.6.13/bin/python',
    'SPARK_HOME': '/usr/local/Cellar/apache-spark/3.1.1/libexec',
    'PYTHONPATH': '/usr/local/Cellar/apache-spark/3.1.1/libexec/python',
})


In [26]:
# Build (or fetch the already running) Spark session for this notebook,
# requesting the Kafka SQL connector package.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1')
    .appName("routeData")
    .getOrCreate()
)

In [27]:
# Streaming source: subscribe to the `routes_data` Kafka topic on the local
# broker, reading from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "routes_data")
    .option("startingOffsets", "earliest")
    .load()
)

In [28]:
# Print out the schema of the raw Kafka dataframe
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [29]:
# Keep only the Kafka `value` column, cast from binary to a string
string_df = df.selectExpr("CAST(value AS STRING)")

In [30]:
# Print out the new dataframe schema
string_df.printSchema()

root
 |-- value: string (nullable = true)



In [31]:
# Schema of one route record as encoded in the JSON message value:
# (field name, Spark type) pairs, in column order.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("id", StringType()),
        ("datetime", TimestampType()),
        ("airline", StringType()),
        ("airline_id", IntegerType()),
        ("source_airport", StringType()),
        ("source_airport_id", IntegerType()),
        ("destination_airport", StringType()),
        ("destination_airport_id", IntegerType()),
        ("codeshare", BooleanType()),
        ("stops", IntegerType()),
        ("equipment", StringType()),
    )
])

In [32]:
# Parse the JSON text in `value` with the route schema, then flatten the
# parsed struct into top-level columns. The struct column is referenced with
# the exact name it was created under, so the lookup does not depend on
# Spark's case-insensitive column resolution.
json_df = (
    string_df
    .withColumn("jsonData", from_json(col("value"), schema))
    .select("jsonData.*")
)

In [33]:
# Print out the schema of the parsed dataframe
json_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_id: integer (nullable = true)
 |-- source_airport: string (nullable = true)
 |-- source_airport_id: integer (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- destination_airport_id: integer (nullable = true)
 |-- codeshare: boolean (nullable = true)
 |-- stops: integer (nullable = true)
 |-- equipment: string (nullable = true)



In [138]:
def run_query(q, name, outputMode, outputFormat):
    """Start `q` as a streaming query and return its first non-empty result.

    The stream is started under a generated, unique query name. Once per
    second the query status is displayed and `SELECT * FROM <query name>` is
    evaluated through the module-level `spark` session. As soon as that
    returns at least one row, the rows are written to `<name>.csv` and
    returned. The streaming query is always stopped before this function
    exits, whether it returns, raises, or is interrupted.

    Args:
        q: Streaming DataFrame to start.
        name: Base name (without extension) of the CSV file to write.
        outputMode: Passed to `writeStream.outputMode(...)`.
        outputFormat: Passed to `writeStream.format(...)`. The result is read
            back as a table named after the query, so this presumably has to
            be "memory" (the only value used in this notebook).

    Returns:
        pandas.DataFrame holding the first non-empty snapshot of the result.

    Raises:
        RuntimeError: If the streaming query terminates before any row has
            been produced (chained to the query's own exception, if any).
    """
    import uuid
    from IPython.display import clear_output, display

    # Letter prefix so the generated table name never starts with a digit.
    unique_query_name = 'q' + uuid.uuid4().hex
    query = (
        q.writeStream
        .outputMode(outputMode)
        .format(outputFormat)
        .queryName(unique_query_name)
        .start()
    )

    try:
        while True:
            sleep(1)
            clear_output(wait=True)
            display(query.status)

            df_pandas = spark.sql('SELECT * FROM ' + unique_query_name).toPandas()
            if len(df_pandas.index) > 0:
                # Persist only the snapshot that is actually returned.
                df_pandas.to_csv(name + '.csv', index=False)
                return df_pandas

            # Without this check a failed query would be polled forever.
            if not query.isActive:
                raise RuntimeError(
                    'Streaming query terminated before producing any rows'
                ) from query.exception()
    finally:
        # Do not leave the stream running in the background after we are done.
        query.stop()

In [139]:
# Dump every parsed route record seen by the stream
q = json_df.select("*")

display(run_query(q, name="dump_table", outputMode="update", outputFormat="memory"))

{'message': 'Getting offsets from KafkaV2[Subscribe[routes_data]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

Unnamed: 0,id,datetime,airline,airline_id,source_airport,source_airport_id,destination_airport,destination_airport_id,codeshare,stops,equipment
0,38cd7668-cf94-4f81-90ea-58890fcc4665,2021-05-02 13:39:11.629770,2B,410,AER,2965,KZN,2990,False,0,CR2
1,786f4614-26ca-48c3-ac10-9ccf1d4619de,2021-05-02 13:39:11.688416,2B,410,ASF,2966,KZN,2990,False,0,CR2
2,712760c7-ff18-4f2d-ac1a-367af194e890,2021-05-02 13:39:11.689681,2B,410,ASF,2966,MRV,2962,False,0,CR2
3,f6f20a5b-5786-4482-80ea-37f812a3b952,2021-05-02 13:39:11.691060,2B,410,CEK,2968,KZN,2990,False,0,CR2
4,84e3e872-0f31-49ec-ad07-715291e1b0f8,2021-05-02 13:39:11.691636,2B,410,CEK,2968,OVB,4078,False,0,CR2
...,...,...,...,...,...,...,...,...,...,...,...
133525,3b03e32f-e284-4d47-8a5d-ffeb532a0cec,2021-05-02 13:53:46.017126,ZL,4178,WYA,6334,ADL,3341,False,0,SF3
133526,232f6875-a1ea-4a5a-8a72-3884249846dd,2021-05-02 13:53:46.017438,ZM,19016,DME,4029,FRU,2912,False,0,734
133527,3f42bc6c-231f-4f11-ada2-818f80e2af04,2021-05-02 13:53:46.017748,ZM,19016,FRU,2912,DME,4029,False,0,734
133528,c9c6cfaa-b3f7-4001-aafe-6eca9a7d6d22,2021-05-02 13:53:46.018306,ZM,19016,FRU,2912,OSS,2913,False,0,734


In [140]:
# Ten source airports with the highest number of routes
q = (
    json_df
    .select("*")
    .groupBy(json_df.source_airport)
    .count()
    .orderBy(col('count').desc())
    .limit(10)
)

display(run_query(q, "top10_source_airport", "complete", "memory"))

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

Unnamed: 0,source_airport,count
0,ATL,1830
1,ORD,1116
2,PEK,1070
3,LHR,1050
4,CDG,1048
5,FRA,994
6,LAX,984
7,DFW,938
8,JFK,912
9,AMS,906


In [143]:
# Count routes per equipment type within 30-second windows over `datetime`
# (window length and slide are both 30 seconds), keeping the ten largest
# counts; ties are ordered by equipment, descending.
q = (
    json_df
    .select("*")
    .groupBy(
        window(json_df.datetime, "30 seconds", "30 seconds"),
        json_df.equipment,
    )
    .count()
    .orderBy(col('count').desc(), col('equipment').desc())
    .limit(10)
)

# Run the windowed query, then replace the `window` struct column with
# formatted `start` / `end` string columns. The result is built as a new
# frame in one pass instead of being filled in row by row, so re-running the
# cell never sees a half-converted frame.
# NOTE(review): the CSV name says "30min" while the query `q` uses 30-second
# windows; the name is kept so the output file does not change.
top10_equipment_raw = run_query(q, "top10_equipment_30min_window", "complete", "memory")

TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
df_pandas = (
    top10_equipment_raw
    .assign(
        # Each `window` value is indexable: [0] is the start, [1] the end.
        start=top10_equipment_raw['window'].map(lambda w: w[0].strftime(TIMESTAMP_FORMAT)),
        end=top10_equipment_raw['window'].map(lambda w: w[1].strftime(TIMESTAMP_FORMAT)),
    )
    .drop(columns='window')
)

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

0    (2021-05-02 13:39:30, 2021-05-02 13:40:00)
1    (2021-05-02 13:39:30, 2021-05-02 13:40:00)
2    (2021-05-02 13:40:00, 2021-05-02 13:40:30)
3    (2021-05-02 13:52:00, 2021-05-02 13:52:30)
4    (2021-05-02 13:52:30, 2021-05-02 13:53:00)
5    (2021-05-02 13:39:00, 2021-05-02 13:39:30)
6    (2021-05-02 13:52:30, 2021-05-02 13:53:00)
7    (2021-05-02 13:53:00, 2021-05-02 13:53:30)
8    (2021-05-02 13:51:30, 2021-05-02 13:52:00)
9    (2021-05-02 13:53:30, 2021-05-02 13:54:00)
Name: window, dtype: object

In [155]:
# Serialize the result as a JSON array with one object per row
df_pandas.to_json(orient='records')

'[{"equipment":"738","count":4338,"start":"2021-05-02 13:39:30","end":"2021-05-02 13:40:00"},{"equipment":"320","count":3763,"start":"2021-05-02 13:39:30","end":"2021-05-02 13:40:00"},{"equipment":"320","count":3054,"start":"2021-05-02 13:40:00","end":"2021-05-02 13:40:30"},{"equipment":"738","count":2748,"start":"2021-05-02 13:52:00","end":"2021-05-02 13:52:30"},{"equipment":"320","count":2694,"start":"2021-05-02 13:52:30","end":"2021-05-02 13:53:00"},{"equipment":"320","count":2298,"start":"2021-05-02 13:39:00","end":"2021-05-02 13:39:30"},{"equipment":"738","count":2105,"start":"2021-05-02 13:52:30","end":"2021-05-02 13:53:00"},{"equipment":"320","count":1799,"start":"2021-05-02 13:53:00","end":"2021-05-02 13:53:30"},{"equipment":"320","count":1555,"start":"2021-05-02 13:51:30","end":"2021-05-02 13:52:00"},{"equipment":"320","count":1546,"start":"2021-05-02 13:53:30","end":"2021-05-02 13:54:00"}]'