# Function Notebook

In this notebook we define all of our project functions, relevant imports and global parameters. Every other notebook imports these functions using the `libify` library.

In [2]:
### Imports ###

from sklearn.neighbors import KNeighborsClassifier

from pyspark.sql.types import *
import pyspark.sql.functions as F
import pickle
from pyspark.sql import SQLContext
from scipy.stats import beta 
from sklearn import preprocessing
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.elasticsearch:elasticsearch-hadoop:7.9.3 pyspark-shell'

!python -m pip install elasticsearch
from elasticsearch import Elasticsearch

from pyspark.sql.functions import date_format

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import pandas as pd # for small table fancy display only

import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator, Normalizer, StringIndexerModel, OneHotEncoderModel, VectorSizeHint
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.linalg import SparseVector, DenseVector,Vectors, VectorUDT
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import RegressionEvaluator,  BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from IPython.display import Image
from IPython.core.display import HTML

import matplotlib.pyplot as plt 
import numpy as np

import sys

import random
import string

In [3]:
## Global Parameters ##

def _es_index_body(properties=None):
    """Build a fresh Elasticsearch index-creation body.

    Every index uses one shard, no replicas and refresh disabled (-1).
    When ``properties`` is given it becomes the ``mappings.properties``
    section. A new dict is returned on each call, so the bodies below never
    share nested objects.
    """
    body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            # -1 disables periodic refresh (faster bulk loads).
            # NOTE(review): documents only become searchable after a refresh —
            # confirm the writer triggers one.
            "refresh_interval": -1,
        }
    }
    if properties is not None:
        body["mappings"] = {"properties": properties}
    return body


# Plain index: settings only, dynamic mapping.
elastic_settings_mappings_basic = _es_index_body()

# Index whose 'centroids' field is mapped as a geo_point.
elastic_settings_mappings_centorids = _es_index_body({"centroids": {"type": "geo_point"}})
# Correctly spelled alias; the original (misspelled) name is kept for existing callers.
elastic_settings_mappings_centroids = elastic_settings_mappings_centorids

# Index whose 'date' field is mapped as an Elasticsearch date.
elastic_settings_mappings_date = _es_index_body({"date": {"type": "date"}})

# Elasticsearch host; override with the ES_HOST environment variable.
# Public alternative used previously: 'da2020w-0001.eastus.cloudapp.azure.com'
ES_HOST = os.environ.get("ES_HOST", "10.0.0.5")
# NOTE(review): elasticsearch-py interprets `timeout` in seconds, so 600000 is
# roughly a week — confirm this is intended.
es = Elasticsearch([{'host': ES_HOST}], timeout=600000)

# Shared DBFS folder holding the saved indexers, encoders and model used below;
# override with the BASE_PATH environment variable (keep the trailing slash).
base_path = os.environ.get("BASE_PATH", "/dbfs/FileStore/shared_uploads/shubi@campus.technion.ac.il/")

In [4]:
### Helper Functions ###
def create_warehouse_df(streaming=False, USE_TAKE=False, min_date = None, max_date = None, path_ip=None):
    """Load raw bus records (batch JSON or Kafka stream) and normalise their columns.

    Parameters
    ----------
    streaming : bool
        False: read JSON from ``path_ip`` with ``spark.read.json``.
        True: read a Kafka stream whose bootstrap servers are ``path_ip``,
        starting from the earliest offset.
    USE_TAKE : bool
        Batch: keep only the first 100000 rows (cached).
        Streaming: subscribe to four fixed vehicle topics instead of every
        ``vehicleId_*`` topic.
        In both modes it also enables the ``min_date``/``max_date`` filter.
    min_date, max_date
        Bounds for ``date.between(min_date, max_date)``; applied only when
        ``USE_TAKE`` is true and both are truthy.
    path_ip : str
        JSON input path (batch) or Kafka bootstrap servers (streaming).

    Returns
    -------
    The normalised DataFrame, or the string ``"problem"`` if building the
    reader failed (``raw_data_to_predictions`` checks for this sentinel).
    """
    ### Schema ###
    # NOTE(review): pickle.load can execute arbitrary code; this file must stay trusted.
    with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
        schema = pickle.load(schema_file)

    try:
        if not streaming:
            ### Load and transform df ###
            df = spark.read.json(path_ip, schema=schema)
            if USE_TAKE:
                df = spark.createDataFrame(df.take(100000)).cache()
        else:
            # Kafka source: bootstrap servers from path_ip, read from the earliest offset.
            kafka_reader = spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", path_ip) \
                .option("startingOffsets", "earliest")
            if USE_TAKE:
                # Subscribe to a few topics: # 28051 = 23,057 records, vehicle_id - 28052 - 14,692
                kafka_reader = kafka_reader.option(
                    "subscribe", "vehicleId_28051, vehicleId_28052, vehicleId_33317, vehicleId_33318")
            else:
                # Subscribe to every per-vehicle topic.
                kafka_reader = kafka_reader.option("subscribePattern", "vehicleId_.*")
            kafka_raw_df = kafka_reader.load()

            # Message values are JSON strings: parse them with the pickled schema and flatten.
            df = kafka_raw_df.selectExpr("CAST(value AS STRING)") \
                .select(F.from_json(F.col("value"), schema=schema).alias('json')) \
                .select("json.*")
    except Exception as exc:
        # Report the cause instead of silently discarding it; the sentinel
        # return value is kept for existing callers.
        print(f"create_warehouse_df: could not create the input DataFrame: {exc!r}", file=sys.stderr)
        return "problem"

    # timestamp / calendar are read through their '$numberLong' field as bigint.
    df = df.withColumn('timestamp', df['timestamp']['$numberLong'].cast('bigint'))\
         .withColumn('calendar', df['calendar']['$numberLong'].cast('bigint'))

    # 'date' = timestamp / 1000 as epoch seconds (timestamp presumably in ms — confirm).
    df = df.withColumn('date', F.from_unixtime((df['timestamp'])/1000, 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))\
         .withColumn('loc', df['loc']['coordinates'])\
         .drop("timestamp")\
         .withColumn('calendar', F.from_unixtime((df['calendar'])/1000000, 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))\
         .withColumn('systemTimestamp', F.to_timestamp(df['systemTimestamp']))

    if USE_TAKE and min_date and max_date:
        df = df.where(df.date.between(min_date, max_date))

    # Drop columns not used downstream ('currentHour' is recomputed from 'date' below).
    df = df.drop("_id","calendar", "systemTimestamp", "probability", "poiId", "poiId2", "longitude", "latitude", "direction", "dateTypeEnum", "dateType", "currentHour", "angle", "filteredActualDelay", 'anomaly', 'gridID', 'actualDelay', 'delay', 'justLeftStop', 'vehicleSpeed')

    # 1 for Saturday/Sunday, else 0.
    df = df.withColumn("is_weekend", date_format("date", 'EEE').isin(["Sat", "Sun"]).cast("int"))

    df = df.withColumn("currentHour", F.hour("date"))

    df = df.withColumn('ellapsedTime',df['ellapsedTime']/1000)

    df = df.drop('lineId')

    return df

@F.udf
def hour_group(hour):
    """Map an hour of day to one of six 4-hour buckets (Spark UDF).

    0: '0-3', 1: '4-7', 2: '8-11', 3: '12-15', 4: '16-19', 5: '20-23'.
    Values below 0 fall into bucket 0 and values above 23 into bucket 5, as
    with the original if/elif chain. A null hour (e.g. from a null date)
    yields null instead of raising inside the Spark worker.

    Declared with bare ``@F.udf`` (default return type), so callers cast the
    result to int.
    """
    if hour is None:
        return None
    # Clamp negatives to 0, bucket by 4 hours, cap at the last bucket.
    return min(max(int(hour), 0) // 4, 5)
      
@F.udf
def number_to_hour_group(hour):
    """Map a bucket index produced by ``hour_group`` (0-5) to its label, e.g. 2 -> '8-11'.

    Null input yields null; any other value outside 0-5 raises KeyError.
    """
    if hour is None:
        return None
    return {0:'0-3', 1:'4-7', 2:'8-11',3:'12-15',4:'16-19',5:'20-23'}[hour]


def column_to_index_with_pre_index(df, indexer_path):
    """Apply a previously fitted and saved StringIndexerModel to ``df``.

    :param df: DataFrame containing the indexer's input column.
    :param indexer_path: path the model was saved to (see ``save_indexer``).
    :return: ``df`` with the indexer's output column added.
    """
    return StringIndexerModel.load(indexer_path).transform(df)

def column_to_ine_hot_with_pre_encoder(df_indexed, encoder_path):
    """Apply a previously fitted and saved OneHotEncoderModel to ``df_indexed``.

    :param df_indexed: DataFrame containing the encoder's (indexed) input column.
    :param encoder_path: path the model was saved to (see ``save_encoder``).
    :return: ``df_indexed`` with the encoder's vector output column added.
    """
    return OneHotEncoderModel.load(encoder_path).transform(df_indexed)


# Correctly spelled alias; the original (misspelled) name is kept for existing callers.
column_to_one_hot_with_pre_encoder = column_to_ine_hot_with_pre_encoder

def sparse_to_array(v):
    """Convert an ML vector (sparse or dense) to a plain list of Python floats.

    Inputs with a ``toArray`` method (SparseVector / DenseVector) are
    densified directly instead of being read element by element. Any other
    input is converted through ``DenseVector`` as before. ``None`` (a null
    cell in Spark) yields ``None``.
    """
    if v is None:
        return None
    dense = v.toArray() if hasattr(v, "toArray") else DenseVector(v).toArray()
    return [float(x) for x in dense]

# Spark UDF: ML vector column -> array<float> column (via sparse_to_array).
sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))

# Spark UDF: array column -> ML DenseVector column; a null array stays null.
# Uses F.udf: a bare `udf` name is never imported in this notebook.
to_vector = F.udf(lambda a: None if a is None else Vectors.dense(a), VectorUDT())

def read_elastic(index, query="", scroll_size="10000", array_field=""):
    """Load an Elasticsearch index into a Spark DataFrame through ES-Hadoop.

    :param index: index name; must already exist on ``ES_HOST``.
    :param query: value for ``es.query`` (empty string by default).
    :param scroll_size: value for ``es.scroll.size``.
    :param array_field: value for ``es.read.field.as.array.include``.
    :raises Exception: if the index does not exist.
    """
    # Keyword argument: elasticsearch-py 8.x APIs no longer accept positional
    # parameters (7.x accepts both).
    if not es.indices.exists(index=index):
        raise Exception("Index doesn't exist")

    return spark.read\
                .format("org.elasticsearch.spark.sql")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.nodes",ES_HOST)\
                .option("es.nodes.client.only", "false")\
                .option("pushdown", "true")\
                .option("es.query", query)\
                .option("es.scroll.size", scroll_size)\
                .option("es.scroll.keepalive", "120m")\
                .option("es.read.field.as.array.include", array_field)\
                .load(index)

def get_random_string(length):
    """Return ``length`` random lowercase ASCII letters.

    Uses the ``random`` module (not cryptographically secure); a length of
    zero or less gives an empty string.
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)

def write_to_elastic(df_to_upload, index_name: str, settings, overwrite=False, streaming=True, checkpoint_location=None):
    """Create (or recreate) an Elasticsearch index and write a DataFrame into it.

    :param df_to_upload: DataFrame to write; streaming or batch per ``streaming``.
    :param index_name: target index (also used as the streaming query name).
    :param settings: index-creation body passed to ``es.indices.create``.
    :param overwrite: if the index exists, delete and recreate it; otherwise
        print a message and write nothing.
    :param streaming: start an append-mode streaming query instead of a batch save.
    :param checkpoint_location: streaming checkpoint directory; defaults to a
        new random directory under /home/vmadmin, so a restart does not resume.
    :return: the started StreamingQuery when streaming, otherwise None.
    """
    # Keyword arguments work with both elasticsearch-py 7.x and 8.x.
    if es.indices.exists(index=index_name):
        if not overwrite:
            print("Index already exists and overwrite flag is false")
            return None
        es.indices.delete(index=index_name)
    es.indices.create(index=index_name, ignore=400, body=settings)

    if streaming:
        if checkpoint_location is None:
            checkpoint_location = "/home/vmadmin/stream" + get_random_string(8)
        # Return the query handle so callers can monitor or stop it.
        return df_to_upload.writeStream.format("org.elasticsearch.spark.sql")\
            .option("es.resource", index_name)\
            .option("es.nodes.wan.only","true")\
            .queryName(index_name)\
            .option("es.port","9200")\
            .option("es.nodes",ES_HOST)\
            .option("es.nodes.client.only", "false")\
            .outputMode('append')\
            .option("checkpointLocation", checkpoint_location)\
            .start()

    df_to_upload.write.format("org.elasticsearch.spark.sql")\
        .option("es.resource", index_name)\
        .option("es.nodes.wan.only","true")\
        .option("es.port","9200")\
        .option("es.nodes",ES_HOST)\
        .option("es.nodes.client.only", "false")\
        .save()
    return None

 

@F.udf
def checkLocation(areaId, locationList):
    """Spark UDF: 1 if ``areaId`` is an element of ``locationList``, else 0.

    Declared with bare ``@F.udf`` (default return type); downstream code
    casts the aggregated value to int.
    """
    return int(areaId in locationList)
  
@F.udf
def checkDate(date, event_dates_list):
    """Spark UDF: 1 if ``date`` is one of ``event_dates_list``, else 0.

    ``hydrate_data`` passes the row's 'yyyy-MM-dd' day string and the list
    of event days in the same format.
    """
    return 1 if date in event_dates_list else 0

def KNN(XTrain, yTrain, XTest, n_neighbors=1):
    """Fit a k-nearest-neighbours classifier and predict labels for ``XTest``.

    :param XTrain: training features, shape (n_samples, n_features).
    :param yTrain: training labels, 1-D or a single column (e.g. a one-column
        pandas DataFrame), which is flattened to 1-D.
    :param XTest: features to classify, same number of columns as ``XTrain``.
    :param n_neighbors: number of neighbours (default 1, the original behaviour).
    :return: predicted labels, one per row of ``XTest``.
    """
    # A column-vector y makes scikit-learn emit DataConversionWarning;
    # flatten it to the 1-D shape the estimator expects.
    if getattr(yTrain, "ndim", 1) == 2 and yTrain.shape[1] == 1:
        yTrain = np.ravel(yTrain)
    knn = KNeighborsClassifier(n_neighbors=n_neighbors)
    knn.fit(XTrain, yTrain)
    return knn.predict(XTest)

def recreate_locations_centroids():
    """Assign every row of the 'locations' index to its nearest area centroid.

    Trains a 1-nearest-neighbour classifier on the (lon_mean, lat_mean) ->
    areaId1 rows of the 'areasidscentroids' index and applies it to each
    location's (lon, lat).

    :return: DataFrame with columns ``locationNames``, ``areaId1`` (int) and
        ``centroids`` (the location's ``[lon, lat]``).
    """
    # Collect each source exactly once: features, labels and names must come
    # from the same rows, and separate Spark jobs do not guarantee row order.
    centroid_rows = read_elastic('areasidscentroids').select("lon_mean", "lat_mean", "areaId1").collect()
    location_rows = read_elastic('locations').select("locationName", "lon", "lat").collect()

    # Plain arrays (no column names), so scikit-learn does not compare the
    # training feature names (lon_mean, lat_mean) with the test ones (lon, lat).
    XTrain = np.array([[row["lon_mean"], row["lat_mean"]] for row in centroid_rows], dtype=float)
    yTrain = np.array([row["areaId1"] for row in centroid_rows])
    XTest = np.array([[row["lon"], row["lat"]] for row in location_rows], dtype=float)

    yPred = KNN(XTrain, yTrain, XTest)

    return spark.createDataFrame(
        [[row["locationName"], int(area), [row["lon"], row["lat"]]]
         for row, area in zip(location_rows, yPred)],
        schema=["locationNames", "areaId1", "centroids"],
    )
    
def hydrate_data(data):
    """Enrich records with event-day and passed-location flags from Elasticsearch.

    Reads the distinct ``areaId1`` values of the 'locations' index and the
    days of the 'events' index, then adds these columns in order:
    ``eventsList``, ``day`` ('yyyy-MM-dd'), ``TimestampDay``, ``eventDate``
    (``checkDate``), ``locationList`` and ``locationAread1`` (``checkLocation``).
    """
    def _as_list(single_column_df):
        # Flatten a one-column DataFrame into a Python list on the driver.
        return list(single_column_df.rdd.flatMap(lambda row: row).collect())

    # Distinct areaId1 values present in the 'locations' index.
    area_ids = _as_list(read_elastic('locations').select("areaId1").distinct())

    # Event days as 'yyyy-MM-dd' strings, the same format as the 'day' column below.
    events_df = read_elastic('events').withColumn("date", F.to_date(F.col("date")).cast("string"))
    event_days = _as_list(events_df.select("date"))

    event_days_literal = F.array([F.lit(day) for day in event_days])
    area_ids_literal = F.array([F.lit(int(area)) for area in area_ids])

    return data.withColumn("eventsList", event_days_literal)\
               .withColumn('day', date_format("date", 'yyyy-MM-dd'))\
               .withColumn('TimestampDay', F.to_timestamp(F.col('day')))\
               .withColumn("eventDate", checkDate(F.col('day'), F.col('eventsList')))\
               .withColumn("locationList", area_ids_literal)\
               .withColumn("locationAread1", checkLocation(F.col('areaId1'), F.col("locationList")))


def filter_data(df, params):
    """Filter aggregated rows by the options selected in ``params``.

    ``params`` must contain:
      - 'isWeekend', 'isEventDate', 'isPassedLocation': the value each
        same-named column must equal.
      - 'hourGroup': comma-separated labels from '0-3', '4-7', '8-11',
        '12-15', '16-19', '20-23'. Whitespace around labels and empty items
        are ignored, and repeated labels count once. If all six groups are
        selected no hour filter is applied; an empty selection keeps no rows.

    :raises KeyError: on a missing key or an unknown hour-group label.
    """
    for option in ('isWeekend', 'isEventDate', 'isPassedLocation'):
        df = df.where(df[option] == params[option])

    hour_group_index = {'0-3': 0, '4-7': 1, '8-11': 2, '12-15': 3, '16-19': 4, '20-23': 5}
    selected = []
    for label in params['hourGroup'].split(','):
        label = label.strip()
        if not label:
            continue
        group = hour_group_index[label]
        if group not in selected:
            selected.append(group)

    # Selecting all six groups would be a no-op filter. Deduplicating first
    # stops repeated labels from looking like a full selection.
    if len(selected) < len(hour_group_index):
        df = df.where(df['hourGroup'].isin(selected))

    return df
    
    
    
# Create Aggregate data for prediction Task
def aggregate_data(data_frame, streaming=False, min_ellapsed_time=0, max_ellapsed_time=600, record_count_threshold=20):
    """Aggregate hydrated records per (journeyPatternId, vehicleId, day, hourGroup).

    :param data_frame: output of ``hydrate_data``.
    :param streaming: group on ``TimestampDay`` with a 1-day watermark
        (streaming) instead of the ``day`` string (batch).
    :param min_ellapsed_time, max_ellapsed_time: records whose ``ellapsedTime``
        is outside this inclusive range are discarded as uncertain data.
    :param record_count_threshold: only groups with MORE than this many
        records are kept.
    :return: one row per group with ``areaIds`` (max of each one-hot slot),
        ``TotalDistance``, ``TotalTime``, ``isEventDate``, ``isPassedLocation``,
        ``congestionPercentage``, ``isWeekend`` and the label ``isCongestion``
        (1.0 if any congestion was recorded, else 0.0).
    """
    processed_df = data_frame.withColumn('hourGroup', hour_group(F.col('currentHour')).cast("int"))

    # Treat uncertain data.
    processed_df = processed_df.where(processed_df.ellapsedTime.between(min_ellapsed_time, max_ellapsed_time))

    ### Select Relevant Columns ###
    processed_df = processed_df.select("day", "areaId1", "distanceCovered", "ellapsedTime", "journeyPatternId", "vehicleId", 'busStop', "hourGroup", "congestion", "locationAread1", "eventDate", 'TimestampDay')

    df_indexed = column_to_index_with_pre_index(processed_df, f"{base_path}indexer_areaId1")
    df_encoded = column_to_ine_hot_with_pre_encoder(df_indexed, f"{base_path}encoder_areaId1Index")\
        .withColumn('areaIds', sparse_to_array_udf('areaId1IndexVec'))\
        .drop("areaId1IndexVec", "areaId1")

    # Length of the areaIds one-hot array; must match the areaIds size hint in encode_and_assemble.
    num_area_id = 12

    # Same aggregations in both modes; only the day key and the watermark differ.
    aggregations = [
        F.array(*[F.max(F.col("areaIds")[i]) for i in range(num_area_id)]).alias("areaIds"),
        F.sum("distanceCovered").alias("TotalDistance"),
        F.sum("ellapsedTime").alias("TotalTime"),
        F.max('eventDate').alias("isEventDate"),
        F.max("locationAread1").alias("isPassedLocation"),
        (F.sum(F.col("congestion").cast("int")) / F.count("congestion")).alias("congestionPercentage"),
        F.count("*").alias("recordCount"),
    ]

    if streaming:
        # Bound the streaming aggregation state with a watermark on the event day.
        day_column = "TimestampDay"
        source_df = df_encoded.withWatermark("TimestampDay", "1 days")
    else:
        day_column = "day"
        source_df = df_encoded

    ### Groupby & Aggregate ###
    grouped_df = source_df.groupBy("journeyPatternId", "vehicleId", day_column, "hourGroup")\
        .agg(*aggregations)\
        .withColumn("isWeekend", date_format(day_column, 'EEE').isin(["Sat", "Sun"]).cast("int"))

    grouped_df = grouped_df.where(grouped_df['recordCount'] > record_count_threshold).drop("recordCount")

    grouped_df = grouped_df.select("*", F.when(F.col("congestionPercentage") > 0, 1.0).otherwise(0.0).alias('isCongestion'))

    return grouped_df



# Process data for prediction task  - 1. OneHot Encode journeyPatternId & hourGroup; 2. Assemble as Vector
def encode_and_assemble(data_frame):
    """One-hot encode journeyPatternId and hourGroup with the saved models and
    assemble all model inputs into a single 'features' vector column.

    Rows whose journeyPatternIds / hourGroups / areaIds vectors do not have
    the expected size (583 / 5 / 12) are skipped by ``VectorSizeHint``.
    """
    fixed_df = data_frame\
        .withColumn('isEventDate', F.col('isEventDate').cast('int'))\
        .withColumn('isPassedLocation', F.col('isPassedLocation').cast('int'))

    def _one_hot_as_array(df, column, array_column):
        # Apply the saved indexer + encoder for `column`, expose the one-hot
        # vector as array<float> in `array_column`, drop the intermediates.
        index_column = column + "Index"
        vector_column = index_column + "Vec"
        indexed = column_to_index_with_pre_index(df, f"{base_path}indexer_{column}")
        encoded_df = column_to_ine_hot_with_pre_encoder(indexed, f"{base_path}encoder_{index_column}")
        return encoded_df.withColumn(array_column, sparse_to_array_udf(vector_column))\
                         .drop(vector_column, index_column)

    with_arrays = _one_hot_as_array(fixed_df, "journeyPatternId", "journeyPatternIds")
    with_arrays = _one_hot_as_array(with_arrays, "hourGroup", "hourGroups")

    # Turn the array columns into ML vectors for the assembler.
    with_vectors = with_arrays
    for vector_column in ("areaIds", "journeyPatternIds", "hourGroups"):
        with_vectors = with_vectors.withColumn(vector_column, to_vector(vector_column))

    # Declare each vector's size (applied in this order).
    sized_df = with_vectors
    for column, size in (("journeyPatternIds", 583), ("hourGroups", 5), ("areaIds", 12)):
        sized_df = VectorSizeHint(inputCol=column, handleInvalid="skip", size=size).transform(sized_df)

    assembler = VectorAssembler(
        inputCols=["areaIds", "TotalDistance", "isWeekend", "journeyPatternIds", "hourGroups", "TotalTime", 'isEventDate', 'isPassedLocation'],
        outputCol="features",
    )
    return assembler.transform(sized_df)

def train_model(data, maxIter):
    """Fit a logistic-regression classifier of 'isCongestion' on the 'features' column.

    :param data: DataFrame produced by ``encode_and_assemble``.
    :param maxIter: maximum number of optimiser iterations.
    :return: the fitted LogisticRegressionModel.
    """
    classifier = LogisticRegression(featuresCol='features', labelCol='isCongestion')
    classifier.setMaxIter(maxIter)
    return classifier.fit(data)

def evaluate_model_wrapper(data, model=None, split_name="Test"):
    """Score ``data`` with a fitted model and report it through ``evaluate_model``.

    :param data: assembled DataFrame (see ``encode_and_assemble``).
    :param model: fitted model; defaults to the global ``lrModel`` as before.
    :param split_name: label passed to ``evaluate_model`` (default "Test").
    :return: the predictions DataFrame.

    NOTE(review): ``evaluate_model`` is not defined in this notebook; it must be
    provided by the calling notebook — confirm.
    """
    if model is None:
        model = lrModel
    predictions = model.transform(data)
    # Evaluate the predictions just computed for `data`.
    evaluate_model(predictions, split_name)
    return predictions
    
    
def save_indexer(c , batch_df):
    """Fit a StringIndexer on column ``c`` of ``batch_df`` and save it under ``base_path``.

    The model writes ``c + "Index"`` with handleInvalid='skip' and is saved to
    ``{base_path}indexer_{c}``, where ``column_to_index_with_pre_index`` loads it.
    NOTE(review): a plain ``save`` fails if that path already exists.
    """
    estimator = StringIndexer(inputCol=c, outputCol= c + "Index", handleInvalid='skip')
    fitted = estimator.fit(batch_df.select(c))
    fitted.save(f"{base_path}indexer_{c}")
    
def save_encoder(c , batch_df_indexed):
    """Fit a one-hot encoder on the indexed column ``c`` and save it under ``base_path``.

    The model writes ``c + "Vec"`` and is saved to ``{base_path}encoder_{c}``,
    where ``column_to_ine_hot_with_pre_encoder`` loads it.
    NOTE(review): a plain ``save`` fails if that path already exists.
    """
    output_column = c + "Vec"
    fitted = OneHotEncoderEstimator(inputCols=[c], outputCols=[output_column]).fit(batch_df_indexed)
    fitted.save(f"{base_path}encoder_{c}")
    
def raw_data_to_predictions(streaming = True, USE_TAKE=False, min_date='2018-07-01', max_date='2018-09-11', filter_params=None, path_ip=None, model_path=None):
    """Run the full pipeline from raw records to congestion predictions.

    Steps: create_warehouse_df -> hydrate_data -> aggregate_data ->
    (optional) filter_data -> encode_and_assemble -> saved logistic model.

    :param model_path: saved LogisticRegressionModel; defaults to
        ``{base_path}model_new_new``.
    :return: predictions DataFrame, or the string 'problem' if the input
        DataFrame could not be created.
    """
    df = create_warehouse_df(streaming=streaming, USE_TAKE=USE_TAKE, min_date=min_date, max_date=max_date, path_ip=path_ip)
    # create_warehouse_df signals failure by returning the string 'problem'.
    if isinstance(df, str):
        return 'problem'

    # Load the model only once the input is known to be usable.
    lr_model = LogisticRegressionModel.load(model_path or f"{base_path}model_new_new")

    df = hydrate_data(df)
    df = aggregate_data(df, streaming=streaming)
    if filter_params:
        df = filter_data(df, filter_params)
    df = encode_and_assemble(df)
    return lr_model.transform(df)


In [5]:
!pip install libify

In [6]:
# Export every name defined in this notebook (functions, imports, globals)
# through libify so other notebooks can import them.
import libify
libify.exporter(globals())