In [None]:
import os
# Must be set before the first SparkSession is created so that spark-submit
# pulls in the elasticsearch-hadoop connector.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.elasticsearch:elasticsearch-hadoop:7.4.2 pyspark-shell'

# Elasticsearch host (override with the ES_HOST environment variable).
# Public alternative: 'da2019w-1019.eastus.cloudapp.azure.com'
IP = os.environ.get('ES_HOST', '10.0.0.25')

# HERE reverse-geocoding API key, read from the environment so that no
# credential is committed with the notebook.
# NOTE(review): a key was previously hard-coded here; treat it as leaked and rotate it.
HERE_API_KEY = os.environ.get('HERE_API_KEY', '')

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from elasticsearch import Elasticsearch, helpers
from pyspark.sql.window import Window
import requests


# Spark session with 2g executor and 1g driver memory.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .getOrCreate()
)

# Interpret and render timestamps in GMT regardless of the host's local zone.
spark.conf.set("spark.sql.session.timeZone", "GMT")

# Low-level Elasticsearch client, used for index management and aggregations;
# DataFrame reads/writes go through the elasticsearch-hadoop connector.
es = Elasticsearch([{'host': IP}])

## Helper Functions:

In [None]:

def read_elastic(index, query="", scroll_size="10000", array_field=""):
    """Load an Elasticsearch index into a Spark DataFrame via elasticsearch-hadoop.

    Args:
        index: Name of the index to read.
        query: Value passed to ``es.query`` (URI or query-DSL string).
        scroll_size: Documents fetched per scroll request (``es.scroll.size``).
        array_field: Comma-separated fields to always read as arrays
            (``es.read.field.as.array.include``).

    Returns:
        A lazily evaluated DataFrame backed by the index.

    Raises:
        ValueError: If the index does not exist. This is a subclass of
            ``Exception``, so existing ``except Exception`` handlers still work.
    """
    # Keyword argument: elasticsearch-py 8 rejects positional API parameters.
    if not es.indices.exists(index=index):
        raise ValueError(f"Index {index!r} doesn't exist!")

    return spark.read\
                .format("org.elasticsearch.spark.sql")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.nodes",IP)\
                .option("es.nodes.client.only", "false")\
                .option("pushdown", "true")\
                .option("es.query", query)\
                .option("es.scroll.size", scroll_size)\
                .option("es.scroll.keepalive", "120m")\
                .option("es.read.field.as.array.include", array_field)\
                .load(index)

        
# Default index settings and mappings for bus-telemetry indices: a single
# shard, no replicas, keyword IDs, a geo_point `coordinates` field and an
# epoch-millis `timestamp`.
DEFUALT_SCEHMA = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "actualDelay" : { "type": "long" },
            "filteredActualDelay" : { "type": "long" },
            "delay" : { "type": "long" },
            "areaId" : { "type": "long" },
            "areaId1" : { "type": "long" },
            "areaId2" : { "type": "long" },
            "areaId3" : { "type": "long" },
            "atStop" : { "type": "boolean" },
            "busStop" : { "type": "long" },
            "congestion" : { "type": "boolean" },
            "gridID" : { "type": "keyword" },
            "journeyPatternId" : { "type": "keyword" },
            "lineId" : { "type": "keyword" },
            "coordinates" : { "type": "geo_point" },
            "timestamp" : { "type": "date", "format" : "epoch_millis" },
            "vehicleId" : { "type": "long" },
            "dateTime" : { "type": "date" }
        }
    }
}

# Correctly spelled alias; the misspelled name is kept for existing callers.
DEFAULT_SCHEMA = DEFUALT_SCEHMA

def write_to_elastic(df, index: str, settings=DEFUALT_SCEHMA, append=True):
    """Write a Spark DataFrame to an Elasticsearch index.

    Args:
        df: DataFrame to index.
        index: Target index name.
        settings: Index settings/mappings used when the index is created.
            They have no effect if the index already exists.
        append: If True, add documents to the index (creating it if needed).
            If False, delete and recreate the index first, replacing its contents.
    """
    if not append and es.indices.exists(index=index):
        es.indices.delete(index=index)

    # ignore=400 suppresses the 400 returned when the index already exists
    # (note: it would also hide other 400s, e.g. an invalid mapping).
    es.indices.create(index=index, ignore=400, body=settings)

    # Spark's default save mode is ErrorIfExists, which elasticsearch-hadoop
    # rejects when the target index already contains documents, so
    # append=True used to fail on any populated index. When append=False, the
    # index has just been recreated empty, so "append" is correct in both cases.
    df.write.format("org.elasticsearch.spark.sql")\
        .mode("append")\
        .option("es.resource", index)\
        .option("es.nodes.wan.only","true")\
        .option("es.port","9200")\
        .option("es.nodes",IP)\
        .option("es.nodes.client.only", "false")\
        .save()

@F.udf(ArrayType(DoubleType()))
def get_station_coord(stopNumber):
    """Fetch a bus stop's coordinates from the Dublin RTPI ``busstopinformation`` API.

    Args:
        stopNumber: RTPI stop id.

    Returns:
        ``[longitude, latitude]`` as floats, or ``[0.0, 0.0]`` when any of these happen:
        the request fails, the body is not JSON, the API reports an error code,
        or no result is returned.
    """
    try:
        res = requests.get(
            f"https://data.smartdublin.ie/cgi-bin/rtpi/busstopinformation?stopid={stopNumber}",
            timeout=10,  # seconds; don't let one slow lookup stall a Spark task
        ).json()
    except (requests.RequestException, ValueError):
        return [0.0, 0.0]
    # The sentinel must be floats: Spark converts Python ints returned for a
    # DoubleType element to null, so the old [0, 0] came back as [null, null].
    if res.get('errorcode') != '0' or not res.get('results'):
        return [0.0, 0.0]
    first = res['results'][0]
    return [float(first['longitude']), float(first['latitude'])]


def calculate_centroids(df):
    """Compute each bus stop's centroid as the mean of its recorded coordinates.

    Args:
        df: DataFrame with ``busStop`` and ``coordinates`` columns, where
            ``coordinates[0]`` is longitude and ``coordinates[1]`` is latitude.

    Returns:
        DataFrame with one row per ``busStop`` and a ``coordinates`` array of
        ``[mean longitude, mean latitude]``.
    """
    # Arithmetic means of lon/lat; adequate when a stop's points are close together.
    mean_lon = F.mean(df['coordinates'][0]).alias('centroid_longitude')
    mean_lat = F.mean(df['coordinates'][1]).alias('centroid_latitude')
    per_stop = df.groupBy('busStop').agg(mean_lon, mean_lat)
    return (
        per_stop
        .withColumn("coordinates", F.array('centroid_longitude', 'centroid_latitude'))
        .drop('centroid_longitude', 'centroid_latitude')
    )

from math import radians, cos, sin, asin, sqrt

@F.udf("float")
def get_distance(coord_a, coord_b):
    """Great-circle (haversine) distance in kilometres between two points.

    Args:
        coord_a: ``[longitude, latitude]`` of the first point, in degrees.
        coord_b: ``[longitude, latitude]`` of the second point, in degrees.

    Returns:
        Distance in km rounded to 4 decimals, or ``9999.0`` as an
        "unknown / far away" sentinel when a point is missing or incomplete.
    """
    # Float sentinel on purpose: Spark converts a Python int returned from a
    # "float" UDF to null, so the previous `9999` silently became null.
    unknown = 9999.0
    if coord_a is None or coord_b is None or len(coord_a) != 2 or len(coord_b) != 2:
        return unknown
    longit_a, latit_a = coord_a
    longit_b, latit_b = coord_b
    if None in (longit_a, latit_a, longit_b, latit_b):
        return unknown
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a, latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Haversine of the central angle
    area = sin(dist_latit / 2) ** 2 + cos(latit_a) * cos(latit_b) * sin(dist_longit / 2) ** 2
    # Clamp: floating-point rounding can push sqrt(area) slightly above 1 for
    # near-antipodal points, which would make asin raise a math domain error.
    central_angle = 2 * asin(min(1.0, sqrt(area)))
    radius = 6371  # mean Earth radius, km
    # central_angle >= 0, so the distance is already non-negative.
    return round(central_angle * radius, 4)

def add_distance_to_centroid(centroid_df, stop_df, drop_centroid_col=True):
    """Attach each stop record's distance (km) to its stop's centroid.

    Args:
        centroid_df: DataFrame with ``busStop`` and centroid ``coordinates``.
        stop_df: DataFrame of records with ``busStop`` and ``coordinates``.
        drop_centroid_col: If True, drop the joined ``c_coordinates`` column.

    Returns:
        ``stop_df`` rows whose ``busStop`` has a centroid (inner join), plus a
        ``distance`` column computed by ``get_distance``.
    """
    centroids = centroid_df.selectExpr("coordinates as c_coordinates", "busStop as c_busStop")
    joined = stop_df.join(centroids, stop_df['busStop'] == centroids['c_busStop'], how='inner')
    with_distance = joined\
        .withColumn('distance', get_distance(joined.c_coordinates, joined.coordinates))\
        .drop('c_busStop')
    return with_distance.drop('c_coordinates') if drop_centroid_col else with_distance

@F.udf(ArrayType(DoubleType()))
def merge_coordinates(longitude, latitude):
    """Combine separate longitude/latitude values into a ``[longitude, latitude]`` array.

    Returns None (a null array) when either component is null, instead of
    failing the Spark task with ``float(None)``.
    """
    if longitude is None or latitude is None:
        return None
    return [float(longitude), float(latitude)]

@F.udf("float")
def normalize_text_distance(name1, name2, distance):
    """Scale an edit distance by the length of the longer of the two strings.

    Returns:
        ``distance / max(len(name1), len(name2))``; None if any input is null;
        0.0 when both strings are empty (avoids ZeroDivisionError).
    """
    if name1 is None or name2 is None or distance is None:
        return None
    longest = max(len(name1), len(name2))
    if longest == 0:
        return 0.0
    return distance / longest


def lev_distance(s1,s2):
    """Return the Levenshtein (edit) distance between two sequences.

    Classic dynamic-programming recurrence that keeps only one row of the
    table, sized by the shorter input (O(len(s1) * len(s2)) time).
    """
    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)
    previous_row = list(range(len(shorter) + 1))
    for row_idx, long_char in enumerate(longer, start=1):
        current_row = [row_idx]
        for col_idx, short_char in enumerate(shorter):
            if short_char == long_char:
                cell = previous_row[col_idx]
            else:
                # 1 + cheapest of substitution, deletion, insertion.
                cell = 1 + min(previous_row[col_idx], previous_row[col_idx + 1], current_row[-1])
            current_row.append(cell)
        previous_row = current_row
    return previous_row[-1]

@F.udf('float')
def get_text_distance(station_name, reverse_gecode):
    """Smallest length-normalized Levenshtein distance from a station name to candidate addresses.

    Args:
        station_name: Stop name to compare.
        reverse_gecode: Candidate address strings; empty/null entries are skipped.

    Returns:
        The minimum of ``lev_distance / max(len(name), len(address))`` over the
        candidates, or ``1024.0`` ("no match") when the name or candidate list
        is empty, or no candidate is non-empty.
    """
    # Float sentinel on purpose: Spark converts a Python int returned from a
    # 'float' UDF to null, so the previous `2**10` silently became null.
    no_match = float(2**10)
    if not station_name or not reverse_gecode:
        return no_match
    # default=: min() of an empty sequence (all addresses empty) raised ValueError.
    return min(
        (lev_distance(station_name, address) / max(len(station_name), len(address))
         for address in reverse_gecode if address),
        default=no_match,
    )


# Street-type words and abbreviations that extract_address strips from
# reverse-geocoded addresses (order preserved from the original list).
STOPWORDS = """
    avenue ave blvd boulevard box cir court ct drive dr lane ln loop lp pl place
    po pob pt rd road route rr rte rural sq st ste street suit trl way wy
""".split()

def extract_address(result):
    """Return the cleaned street name of one HERE reverse-geocoding result.

    Uses ``Location.Address.Street`` and falls back to ``Location.Address.Label``
    when the street is missing or empty. Words in ``STOPWORDS`` are dropped.
    The comparison is lower-cased with a trailing '.' removed, so "Rd." matches "rd".

    Raises:
        KeyError: If the result has no ``Location.Address``, or has neither a
            street nor a label.
    """
    address = result['Location']['Address']
    # `.get` replaces the former bare `except:` fallback, which also swallowed
    # unrelated errors (including KeyboardInterrupt).
    text = address.get('Street') or address['Label']
    kept = [word for word in text.split() if word.lower().rstrip('.') not in STOPWORDS]
    return ' '.join(kept)

@F.udf(ArrayType(StringType()))
def reverse_gecode(coords):
    """Reverse-geocode a ``[longitude, latitude]`` pair with the HERE 6.2 API.

    Returns:
        De-duplicated street names (cleaned by ``extract_address``) from the
        results of the first returned view, in arbitrary order. Returns an
        empty list when ``coords`` is null/empty or HERE returns no view.

    Raises:
        requests.HTTPError: On a non-2xx response (e.g. an invalid API key).
    """
    if not coords:
        return []
    lng, lat = coords
    # prox = "<lat>, <lng>, <radius>" with a radius of 5 (metres, per the HERE 6.2 docs).
    params = {'prox' : f"{lat}, {lng}, 5", 'mode' : 'retrieveAddresses', 'apiKey' : HERE_API_KEY}
    response = requests.get("https://reverse.geocoder.ls.hereapi.com/6.2/reversegeocode.json",
                            params=params, timeout=10)
    # Fail with a clear HTTP error instead of a KeyError on an error body.
    response.raise_for_status()
    views = response.json()['Response']['View']
    # `View` can be empty when nothing is found; indexing [0] raised IndexError
    # and failed the whole Spark task.
    if not views:
        return []
    addresses = list(set(map(extract_address, views[0]['Result'])))

    return addresses


@F.udf(BooleanType())
def is_approx_near(coords_a, coords_b, decimal=5):
    """True when two ``[lng, lat]`` pairs agree after rounding each value to ``decimal`` places."""
    (lng_a, lat_a), (lng_b, lat_b) = coords_a, coords_b
    same_lng = round(lng_a, decimal) == round(lng_b, decimal)
    # Latitude is only compared when longitudes already match (short-circuit).
    return same_lng and (round(lat_a, decimal) == round(lat_b, decimal))

# ETL

In [None]:
# Schema of the raw bus-telemetry JSON records. The `{"$oid": ...}` and
# `{"$numberLong": ...}` wrappers look like a MongoDB extended-JSON export.
# Every top-level and nested field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('_id', StructType([StructField('$oid', StringType(), True)])),
        ('actualDelay', LongType()),
        ('angle', DoubleType()),
        ('anomaly', BooleanType()),
        ('areaId', LongType()),
        ('areaId1', LongType()),
        ('areaId2', LongType()),
        ('areaId3', LongType()),
        ('atStop', BooleanType()),
        ('busStop', LongType()),
        ('calendar', StructType([StructField('$numberLong', StringType(), True)])),
        ('congestion', BooleanType()),
        ('currentHour', LongType()),
        ('dateType', LongType()),
        ('dateTypeEnum', StringType()),
        ('delay', LongType()),
        ('direction', LongType()),
        ('distanceCovered', DoubleType()),
        ('ellapsedTime', LongType()),
        ('filteredActualDelay', LongType()),
        ('gridID', StringType()),
        ('journeyPatternId', StringType()),
        ('justLeftStop', BooleanType()),
        ('justStopped', BooleanType()),
        ('latitude', DoubleType()),
        ('lineId', StringType()),
        ('loc', StructType([
            StructField('coordinates', ArrayType(DoubleType(), True), True),
            StructField('type', StringType(), True),
        ])),
        ('longitude', DoubleType()),
        ('poiId', LongType()),
        ('poiId2', LongType()),
        ('probability', DoubleType()),
        ('systemTimestamp', DoubleType()),
        ('timestamp', StructType([StructField('$numberLong', StringType(), True)])),
        ('vehicleId', LongType()),
        ('vehicleSpeed', LongType()),
    ]
])

## Read File:

In [None]:
# Parse the raw file, unwrap the epoch-millis timestamp and the coordinate
# array, and drop the columns not used downstream.
# Columns are referenced by name (F.col): `df` does not exist yet on a fresh
# kernel (NameError), and on a re-run it would point at the *previous* DataFrame.
df = spark.read.json('/datashare/busFile', schema=schema)\
        .withColumn('timestamp', F.col('timestamp')['$numberLong'].cast('bigint'))\
        .withColumn('dateTime', F.to_timestamp(F.col('timestamp') / 1000))\
        .withColumn('coordinates', F.col('loc')['coordinates'])\
        .drop(
                '_id',
                'loc',
                'calendar',
                'currentHour',
                'systemTimestamp',
                'dateType',
                'dateTypeEnum',
                'direction',
                'poiId',
                'poiId2',
                'anomaly',
                'probability',
                'distanceCovered',
                'ellapsedTime',
                'vehicleSpeed',
                'justLeftStop',
                'justStopped',
                'angle')

## Read from Stream:

In [None]:
StreamingPath = "/datashare/busFile"

# Streaming counterpart of the batch read: same parsing and column pruning.
# Columns are referenced by name (F.col). The previous code used the *batch*
# `df`, whose column references do not belong to this streaming plan, so
# analysis fails (and `df` may not exist at all on a fresh kernel).
inputDf = spark.readStream\
                .json(StreamingPath, schema=schema)\
                .withColumn('timestamp', F.col('timestamp')['$numberLong'].cast('bigint'))\
                .withColumn('dateTime', F.to_timestamp(F.col('timestamp') / 1000))\
                .withColumn('coordinates', F.col('loc')['coordinates'])\
                .drop(
                        '_id',
                        'loc',
                        'calendar',
                        'currentHour',
                        'systemTimestamp',
                        'dateType',
                        'dateTypeEnum',
                        'direction',
                        'poiId',
                        'poiId2',
                        'anomaly',
                        'probability',
                        'distanceCovered',
                        'ellapsedTime',
                        'vehicleSpeed',
                        'justLeftStop',
                        'justStopped',
                        'angle')

## Save to DWH:

In [None]:
# Store the cleaned dataset; append=False recreates the index so that a re-run
# does not duplicate documents.
write_to_elastic(df, index="dublin-bus-full", append=False)

## Calculate station centroids:

In [None]:
# Pings recorded while the bus was at a stop.
dublin_bus_at_stop = read_elastic('dublin-bus-full').where(F.col('atStop'))
# To persist this subset as its own index:
# write_to_elastic(dublin_bus_at_stop, "dublin-bus-at-stop", append=False)

In [None]:
# Query DSL selecting documents that have a `label` field.
# The `filtered` query previously used here was removed in Elasticsearch 5.0.
# The connector pinned above is 7.4.2, so this presumably targets a 7.x
# cluster that rejects it. `bool` with `must: match_all` plus `filter` is the
# documented replacement.
# NOTE(review): none of the index mappings in this notebook define `label`,
# and `q` is not used in the visible cells. Confirm the intended field and use.
q = """{
  "query": {
    "bool": {
      "must": {
        "match_all": {}
      },
      "filter": {
        "exists": {
          "field": "label"
        }
      }
    }
  }
}"""


In [None]:
# Pings recorded while the bus was at a stop (input for the centroid computation).
dublin_bus_at_stop = read_elastic('dublin-bus-full').filter("atStop")

# TODO: Calculate centroids through Elastic query, e.g. in Kibana Dev Tools:
#
# POST /dublin-bus-full/_search?size=1000
# {
#   "query": {
#       "bool": {
#         "must": {
#             "term": {
#                 "atStop": "true"
#             }
#         }
#       }
#     },
#     "aggs" : {
#         "busStop" : {
#             "terms" : { "field" : "busStop" },
#             "aggs" : {
#                 "centroid" : {
#                     "geo_centroid" : { "field" : "coordinates" }
#                 }
#             }
#         }
#     }
# }

# Exploratory aggregation: the line IDs seen at each bus stop (only the top
# 10 stops, the default terms size).
index = 'stop-index'

search = {
    "size": 0,
    "aggs" : {
        "busStop" : {
            "terms" : {
                     "field" : "busStop"
            },
            "aggregations" : {
                "lineId": {
                    "terms": {"field": "lineId"}
                }
            }
        }
    }
}
# NOTE(review): 'stop-index' is not created anywhere in this notebook, and
# `res` is not used in the visible cells. The call is guarded so that Restart &
# Run All does not fail with index_not_found_exception on a fresh cluster.
res = es.search(index=index, body=search) if es.indices.exists(index=index) else None

# Station centroid = mean [longitude, latitude] of each stop's at-stop pings.
centroid_df = calculate_centroids(dublin_bus_at_stop.select("busStop", "coordinates"))

settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "busStop" : { "type": "long" },
            "coordinates" : { "type": "geo_point" },
        }
    }
}

write_to_elastic(centroid_df, index="dublin-bus-at-stop-station-centroids", settings=settings, append=False)

## Filter out records 0.5 km or more from their stop centroid:

In [None]:
# Station centroids written by the "Calculate station centroids" step.
# NOTE(review): this cell used to read "stop-centroid-index", which nothing in
# this notebook creates. The centroids are written to
# "dublin-bus-at-stop-station-centroids".
es_centroid_df = read_elastic("dublin-bus-at-stop-station-centroids")

# Records at least this far (km) from their stop's centroid are dropped.
MAX_CENTROID_DISTANCE_KM = 0.5

# NOTE(review): this previously used `es_df`, which is never defined (NameError
# on a fresh kernel). The at-stop records the centroids were computed from
# look like the intended input; confirm.
filter_stop = add_distance_to_centroid(es_centroid_df, dublin_bus_at_stop)\
    .filter(F.col("distance") < MAX_CENTROID_DISTANCE_KM)\
    .drop('distance')

settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "actualDelay" : { "type": "long" },
            "areaId" : { "type": "long" },
            "areaId1" : { "type": "long" },
            "areaId2" : { "type": "long" },
            "areaId3" : { "type": "long" },
            "atStop" : { "type": "boolean" },
            "busStop" : { "type": "long" },
            "congestion" : { "type": "boolean" },
            "gridID" : { "type": "keyword" },
            "journeyPatternId" : { "type": "keyword" },
            "lineId" : { "type": "keyword" },
            "coordinates" : { "type": "geo_point" },
            "timestamp" : { "type": "date", "format" : "epoch_millis" },
            "vehicleId" : { "type": "long" },
            "dateTime" : { "type": "date" }
        }
    }
}

write_to_elastic(filter_stop, 'filter-500-index', settings=settings, append=False)

In [None]:
def calculate_cenroids(index, num_partitions=10, partition_size=5000):
    """Compute per-bus-stop centroids with an Elasticsearch ``geo_centroid`` aggregation.

    Only documents with ``atStop`` true are aggregated. The ``busStop`` terms
    aggregation is paged with terms ``include`` partitioning, one request per
    partition, so that no single response has to hold every stop.

    Args:
        index: Index to aggregate over; it needs ``atStop``, ``busStop`` and a
            geo_point ``coordinates`` field.
        num_partitions: Number of terms partitions to request.
        partition_size: Maximum buckets returned per partition. If a partition
            holds more distinct stops than this, the extra stops are omitted.

    Returns:
        Spark DataFrame with ``busStop`` (long) and ``coordinates``
        (``[longitude, latitude]``) columns.
    """
    centroids = []

    for partition in range(num_partitions):
        search = {
            # Only the aggregation is needed; don't fetch any hits.
            "size" : 0,
            "query": {
                "bool": {
                    "must": {
                        "term": {
                            "atStop": "true"
                        }
                    }
                }
            },
            "aggs" : {
                "busStop" : {
                    "terms" : {
                        "field" : "busStop",
                        "size" : partition_size,
                        "include": {
                            "partition": partition,
                            "num_partitions": num_partitions
                        }
                    },
                    "aggs" : {
                        "centroid" : {
                            "geo_centroid" : { "field" : "coordinates" }
                        }
                    }
                }
            }
        }
        res = es.search(index=index, body=search)
        for bucket in res['aggregations']['busStop']['buckets']:
            location = bucket['centroid'].get('location')
            if location is None:
                # geo_centroid may omit "location" when no document in the
                # bucket has a coordinates value; skip instead of KeyError.
                continue
            centroids.append((bucket['key'], [location['lon'], location['lat']]))

    centroid_schema = StructType([StructField('busStop',LongType(),True),
                                  StructField('coordinates',ArrayType(DoubleType(),True))])
    return spark.createDataFrame(centroids, centroid_schema)