In [1]:
# pip install pyspark # for creating results from datasets
# pip install pymysql # for creating connection to DB
# pip install pandas # for creating Pandas DF
# pip install matplotlib # for plotting purpose

Import Python Packages

In [2]:
import pymysql
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

Create SQL Engine from sqlalchemy

In [3]:
sqlEngine=create_engine('mysql+mysqlconnector://root:123456@localhost:3306/streamdb', pool_recycle=3600)

Create DB Connection

In [4]:
dbConnection=sqlEngine.connect()

Read MySql Tables [Transactions, Locations]

In [5]:
locations= pd.read_sql("select * from streamdb.locations", dbConnection)
transactions=pd.read_sql("select * from streamdb.transactions", dbConnection)
#print(transactions)
#print(locations)

Import PySpark Packages

In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,desc,count,expr,isnan

Create Spark Session

In [7]:
spark = SparkSession.builder.master("local[*]") \
                    .appName('Query-Visualization') \
                    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

22/12/04 14:17:45 WARN Utils: Your hostname, Anurags-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.2.15 instead (on interface en0)
22/12/04 14:17:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/04 14:17:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/04 14:17:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Transactions Spark DataFrame

In [8]:
transactionsDF = spark.createDataFrame(transactions)

In [9]:
transactionsDF.columns

['UniqueId',
 'TransactionDateUTC',
 'Itinerary',
 'OriginAirportCode',
 'DestinationAirportCode',
 'OneWayOrReturn',
 'DepartureAirportCode',
 'ArrivalAirportCode',
 'SegmentNumber',
 'LegNumber',
 'NumberOfPassengers']

Locations Spark DataFrame

In [10]:
locationsDF = spark.createDataFrame(locations)

In [11]:
locationsDF.columns

['AirportCode', 'CountryName', 'Region']

Query1 Code - Country with Most Transactions Originating

In [12]:
Q1_trxDF = transactionsDF.select("UniqueId","OriginAirportCode").distinct()
Q2_locDF = locationsDF.select("AirportCode","CountryName")
GraphQuery1 = Q1_trxDF \
    .join(Q2_locDF, Q1_trxDF.OriginAirportCode == Q2_locDF.AirportCode) \
    .withColumnRenamed("CountryName","Country") \
    .groupBy("Country") \
    .agg(count("UniqueId").alias("Transactions")) \
    .orderBy(desc("transactions")) \
    .limit(10)

NameError: name 'locDF' is not defined

Query1 Show Results

In [None]:
Query1.show()

Query1 Plot - Country with Most Transactions Originating

In [None]:
Query1.toPandas().plot(kind='barh',x='Country',y='Transactions')

Query2 Code - Split between International and Domestic Flights

In [None]:
Q2_trxDF = transactionsDF \
           .select("UniqueId","Itinerary","OriginAirportCode","DestinationAirportCode") \
           .distinct()
Q2_locDF = locationsDF \
           .select("AirportCode","CountryName")

OriginDF = Q2_trxDF \
           .join(Q2_locDF,
                 Q2_trxDF.OriginAirportCode==Q2_locDF.AirportCode) \
           .selectExpr("UniqueId","Itinerary","OriginAirportCode","CountryName as OriginCountry")

DestDF = Q2_trxDF \
           .join(Q2_locDF,
                 Q2_trxDF.DestinationAirportCode==Q2_locDF.AirportCode) \
           .selectExpr("UniqueId","DestinationAirportCode","CountryName as DestinationCountry")

Query2 = OriginDF \
         .join(DestDF,["UniqueId"]) \
         .select("UniqueId","Itinerary","OriginAirportCode","DestinationAirportCode","OriginCountry","DestinationCountry") \
         .withColumn("FlightType",
         expr("CASE WHEN OriginCountry == DestinationCountry then 'Domestic' ELSE 'International' END"))

GraphQuery2 = Query2.select("UniqueId","FlightType").groupBy("FlightType").agg(count("UniqueId").alias("Flights"))

Query2 Show Results

In [None]:
GraphQuery2.show()

Query2 Plot - Split between International and Domestic Flights

In [None]:
GraphQuery2.toPandas().plot(kind='barh',x='FlightType',y='Flights')

Query3 Code - Distribution of No. of Segments in Transactions

In [None]:
GraphQuery3 = transactionsDF \
.select("UniqueId","Itinerary",'SegmentNumber') \
.filter(~isnan("SegmentNumber")) \
.groupBy("SegmentNumber") \
.agg(count("UniqueId").alias("Transactions")) \
.orderBy("SegmentNumber")

Query3 Show Results

In [None]:
GraphQuery3.show()

Query3 Plot - Distribution of No. of Segments in Transactions

In [None]:
Segments = ['Segment Number 1.0', 'Segment Number 2.0']
data = GraphQuery3.select("Transactions").rdd.flatMap(lambda x: x).collect()
fig = plt.figure(figsize =(4, 4))
plt.pie(data, labels = Segments)
plt.show()

Close mySQL Connection

In [None]:
dbConnection.close()