In [9]:
import pyspark
import time
from pyspark.streaming import StreamingContext
from socket import *
from threading import Thread
from opensky_api import OpenSkyApi
import json
from pyspark.sql import Row, SparkSession
import matplotlib.pyplot as plt
from IPython import display

# Reuse the already-running SparkContext when this cell is executed again:
# constructing a second SparkContext in the same kernel raises a ValueError.
# Master ("local[4]") and application name ("app") are unchanged.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster("local[4]").setAppName("app")
)

## Spark SQL functions

In [10]:
def getSparkSessionInstance(sparkConf):
    """Return the notebook-wide SparkSession, building it on first use.

    The session is cached in this module's globals under the key
    'sparkSessionSingletonInstance'. ``sparkConf`` is only consulted when the
    session has to be created; later calls return the cached object as-is.
    """
    namespace = globals()
    key = 'sparkSessionSingletonInstance'
    if key in namespace:
        return namespace[key]
    builder = SparkSession.builder.config(conf=sparkConf)
    namespace[key] = builder.getOrCreate()
    return namespace[key]
        
def process_airlines(time, rdd):
    """Publish one micro-batch of (airline, count) pairs as the "airlines" temp view.

    Written to be used as a ``foreachRDD`` callback. Each element ``w`` of
    ``rdd`` is turned into ``Row(airline=w[0], count=w[1])`` and the resulting
    DataFrame replaces the "airlines" temporary view.

    Args:
        time: Batch time passed in by ``foreachRDD``; only used in the error
            message. Note that it shadows the ``time`` module inside this
            function.
        rdd: RDD of indexable pairs for the current batch.

    Returns:
        None. Failures are reported on stdout and never propagate, so one bad
        batch cannot stop the streaming job.
    """
    try:
        # An empty batch has nothing to infer a schema from (createDataFrame
        # fails with "RDD is empty"); skip it and keep the previous view.
        if rdd.isEmpty():
            return
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(airline=w[0], count=w[1]))
        airlinesDataFrame = spark.createDataFrame(rowRdd)
        airlinesDataFrame.createOrReplaceTempView("airlines")
    except Exception as e:
        # Stay best-effort, but make the failure visible instead of hiding it.
        print("process_airlines: batch %s failed: %r" % (time, e))
    
def get_airlines():
    """Return the rows of the "airlines" view as a pandas DataFrame.

    Only rows whose ``airline`` value is longer than two characters are kept.
    The SparkSession is read from this module's globals, so it must already
    have been stored there under 'sparkSessionSingletonInstance'.
    """
    query = "select * from airlines where length(airline) > 2"
    session = globals()["sparkSessionSingletonInstance"]
    result = session.sql(query)
    return result.toPandas()

## Input socket stream

In [4]:
# Execute the companion script ads-server.py inside this kernel.
# NOTE(review): presumably this starts the socket server that the stream on
# localhost:5555 connects to; the script is not visible here, so confirm.
%run ads-server.py

In [11]:
# Streaming context with a micro-batch length of `batchInterval` seconds.
batchInterval = 3
ssc = StreamingContext(sc, batchDuration=batchInterval)

# Read text lines from the local socket and decode every line as JSON.
socketDstream = ssc.socketTextStream(hostname="localhost", port=5555)
jsonMsg = socketDstream.map(lambda line: json.loads(line))

## Airline codes

In [12]:
# Flatten every decoded message into its elements and key each element by the
# first three characters of its 'callsign', with a count of 1.
# NOTE(review): assumes each message is an iterable of dicts that contain a
# string 'callsign' -- confirm against the server's output format.
flightRecords = jsonMsg.flatMap(lambda message: message)
jsonFlights = flightRecords.map(lambda flight: (flight['callsign'][0:3], 1))

# Sum the counts per prefix within each batch, then order by descending count.
countsPerAirline = jsonFlights.reduceByKey(lambda left, right: left + right)
airlinesCodes = countsPerAirline.transform(
    lambda batch: batch.sortBy(lambda pair: pair[1], ascending=False)
)

# Publish each batch through process_airlines.
airlinesCodes.foreachRDD(process_airlines)

In [13]:
# Start the streaming computation defined in the cells above.
ssc.start()
# To run for a fixed time and then shut the stream down, uncomment:
#time.sleep(120)
#ssc.stop()

In [9]:
# Keep a handle on the shared session for the ad-hoc queries in later cells.
spark = globals()["sparkSessionSingletonInstance"]
# Take a pandas snapshot of the current "airlines" view via the helper.
df_airlines = get_airlines()

In [24]:
# Run the same query as get_airlines(), but keep the result as a Spark
# DataFrame so that .show() can print it as a text table.
# NOTE(review): this rebinds df_airlines to a Spark DataFrame, whereas the
# other cells bind it to a pandas DataFrame.
df_airlines = spark.sql("select * from airlines where length(airline) > 2")
df_airlines.show()

+-------+-----+
|airline|count|
+-------+-----+
|    SWR|    8|
|    DLH|    4|
|    TAR|    2|
|    BAW|    2|
|    RYR|    2|
|    EZS|    2|
|    VOE|    1|
|    TOM|    1|
|    UAE|    1|
|    AUA|    1|
|    AFR|    1|
|    GSW|    1|
|    LMU|    1|
|    IAW|    1|
|    EWG|    1|
|    FBR|    1|
|    MSR|    1|
|    SVA|    1|
|    FEG|    1|
|    THY|    1|
+-------+-----+
only showing top 20 rows

RDD is empty
RDD is empty


In [20]:
# Fetch the current "airlines" view as a pandas DataFrame and inspect the
# column dtypes.
df_airlines = get_airlines()
df_airlines.dtypes

airline    object
count       int64
dtype: object

In [1]:
# Live bar chart of flights per airline code, redrawn every 10 seconds.
# Interrupt the kernel to stop the loop; the interrupt is caught so the cell
# ends cleanly instead of leaving a traceback.
try:
    while True:
        df_airlines = get_airlines()
        x = list(range(df_airlines.shape[0]))

        fig, ax = plt.subplots(figsize=(14, 6))
        ax.bar(x, df_airlines['count'])
        ax.set_xticks(x)
        ax.set_xticklabels(df_airlines.airline, rotation='vertical')

        # Replace the previous frame in the output area with the new figure.
        display.clear_output(wait=True)
        display.display(fig)

        # Every iteration creates a new figure; close it once shown so that
        # figures do not accumulate in memory while the loop keeps running.
        plt.close(fig)

        time.sleep(10)
except KeyboardInterrupt:
    pass

In [14]:
# Display the active SparkContext as this cell's output.
sc

In [2]:
import cartopy
import cartopy.feature as cpf
from matplotlib.pyplot import figure, show

# Build the GeoAxes with add_subplot: passing `projection=` to Figure.gca()
# is rejected by recent Matplotlib releases.
fig = figure(figsize=(14, 6))
ax = fig.add_subplot(1, 1, 1, projection=cartopy.crs.PlateCarree())

ax.add_feature(cpf.LAND)
ax.add_feature(cpf.BORDERS, linestyle='-')
# Optional extra layers:
#ax.add_feature(cpf.OCEAN)
#ax.add_feature(cpf.COASTLINE)
#ax.add_feature(cpf.LAKES, alpha=0.5)
#ax.add_feature(cpf.RIVERS)

# Extent is [lon_min, lon_max, lat_min, lat_max].
# NOTE(review): the values look like a bounding box around Switzerland -- confirm.
ax.set_extent([5.9962, 10.5226, 45.8389, 47.8229])

# End points of the demo line. These names were never defined in the notebook
# (NameError); the values are approximate lon/lat for New York and Delhi.
# NOTE(review): a New York-Delhi great circle most likely lies outside the
# extent set above, so the line may not be visible -- replace the end points
# with positions inside the extent if a visible track is wanted.
ny_lon, ny_lat = -75, 43
delhi_lon, delhi_lat = 77.23, 28.61

ax.plot([ny_lon, delhi_lon], [ny_lat, delhi_lat],
        color='blue', linewidth=2, marker='o',
        transform=cartopy.crs.Geodetic())

show()