In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, row_number, from_unixtime, to_timestamp, to_date, hour, year, month, date_format, mean, desc, stddev, struct, avg, when, window
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from elasticsearch import Elasticsearch, helpers
import pickle
import requests
import numpy as np

# Session time zone used by Spark SQL datetime functions (from_unixtime, hour, to_date, ...).
spark.conf.set("spark.sql.session.timeZone", "GMT")

ES_HOST = '10.0.0.13'
# Low-level Elasticsearch client used for index management (timeout is in seconds).
es = Elasticsearch([{'host': ES_HOST}], timeout=60000)

# spark.network.timeout, spark.executor.heartbeatInterval and spark.driver.maxResultSize are
# Spark *core* settings read when the cluster/SparkContext starts. spark.conf.set on them has
# no effect on a running session (Spark 2.x) or raises "Cannot modify the value of a Spark
# config" (Spark 3.x), so they must be set in the cluster's Spark config instead, e.g.:
#   spark.network.timeout 10000000
#   spark.executor.heartbeatInterval 10000000
#   spark.driver.maxResultSize 4g
spark.conf.set('spark.sql.streaming.stopActiveRunOnRestart', True)
spark.conf.set("spark.sql.broadcastTimeout", 3600)

def exists(path):
  """Return True if ``dbfs:/<path>`` exists, False if DBFS reports it as missing.

  Any error from ``dbutils.fs.ls`` other than a missing-file error is re-raised.
  """
  target = f'dbfs:/{path}'
  try:
    dbutils.fs.ls(target)
  except Exception as err:
    # dbutils surfaces the JVM exception as text; only "file not found" means "absent".
    if 'java.io.FileNotFoundException' not in str(err):
      raise
    return False
  return True

def remove(path, recursive=False):
  """Delete ``dbfs:/<path>`` if it exists; do nothing when it is absent.

  ``recursive`` is passed straight through to ``dbutils.fs.rm``.
  """
  if not exists(path):
    return
  dbutils.fs.rm(f'dbfs:/{path}', recursive)

In [0]:
# Explicit schema for the raw bus-event JSON. The wrapper objects ({"$oid": ...},
# {"$numberLong": ...}) -- presumably MongoDB extended JSON -- are read as single-field
# string structs; transform_data flattens and renames them later.
_bus_fields = [
    ('_id',                 StructType([StructField('$oid', StringType(), True)])),
    ('actualDelay',         LongType()),
    ('angle',               DoubleType()),
    ('anomaly',             BooleanType()),
    ('areaId',              LongType()),
    ('areaId1',             LongType()),
    ('areaId2',             LongType()),
    ('areaId3',             LongType()),
    ('atStop',              BooleanType()),
    ('busStop',             LongType()),
    ('calendar',            StructType([StructField('$numberLong', StringType(), True)])),
    ('congestion',          BooleanType()),
    ('currentHour',         LongType()),
    ('dateType',            LongType()),
    ('dateTypeEnum',        StringType()),
    ('delay',               LongType()),
    ('direction',           LongType()),
    ('distanceCovered',     DoubleType()),
    ('ellapsedTime',        LongType()),
    ('filteredActualDelay', LongType()),
    ('gridID',              StringType()),
    ('journeyPatternId',    StringType()),
    ('justLeftStop',        BooleanType()),
    ('justStopped',         BooleanType()),
    ('latitude',            FloatType()),
    ('lineId',              StringType()),
    ('loc',                 StringType()),
    ('longitude',           FloatType()),
    ('poiId',               LongType()),
    ('poiId2',              LongType()),
    ('probability',         FloatType()),
    # NOTE(review): if this holds epoch milliseconds, float32 cannot represent it exactly -- confirm it is unused.
    ('systemTimestamp',     FloatType()),
    ('timestamp',           StructType([StructField('$numberLong', StringType(), True)])),
    ('vehicleId',           LongType()),
    ('vehicleSpeed',        LongType()),
]
# Every field is nullable, exactly as in a hand-written StructField(..., True) list.
schema = StructType([StructField(name, dtype, True) for name, dtype in _bus_fields])

data = spark.read.json('/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile', schema)

In [0]:
kafka_server = '10.0.0.30:9091'

# Subscribe to every topic whose name matches vehicleId_*, starting from the earliest offset.
kafka_raw_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_server) \
  .option("subscribePattern", "vehicleId_.*") \
  .option("startingOffsets", "earliest") \
  .load()

# Kafka delivers the payload as binary; cast it to a string so it can be parsed as JSON.
kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# The message schema is loaded from a pickled Spark schema object.
# NOTE(review): pickle.load can execute arbitrary code -- only load schema.pkl from a trusted location.
# `with` guarantees the file handle is closed (the bare open() leaked it).
with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
  schema = pickle.load(schema_file)

kafka_df = kafka_value_df.select(F.from_json(F.col("value"), schema=schema).alias('json')).select("json.*")
stream_data = kafka_df

In [0]:
# Hourly Irish weather observations, restricted below to county Dublin and years after 2010.
schema = StructType([
    StructField('county', StringType(), True),
    StructField('station', StringType(), True),
    StructField('latitude', FloatType(), True),
    StructField('longitude', FloatType(), True),
    StructField('date', StringType(), True),
    StructField('rain', FloatType(), True),
    StructField('temp', FloatType(), True),
    StructField('wetb', FloatType(), True),
    StructField('dewpt', FloatType(), True),
    StructField('vappr', FloatType(), True),
    StructField('rhum', IntegerType(), True),
    StructField('msl', FloatType(), True),
    StructField('wdsp', IntegerType(), True),
    StructField('wddir', IntegerType(), True),
    StructField('sun', FloatType(), True),
    StructField('vis', IntegerType(), True),
    StructField('clht', IntegerType(), True),
    StructField('clamt', IntegerType(), True),
])
# The schema has no timestamp-typed columns, so a reader timestampFormat option would be unused;
# the 'date' string is parsed explicitly with its real pattern below.
weather = spark.read.csv('/FileStore/tables/hrly_Irish_weather.csv', schema=schema, header='true')
weather = weather.filter(weather['county'] == 'Dublin').withColumn('timestamp', to_timestamp('date', 'dd-MMM-yyyy HH:mm'))
weather = weather.withColumn('calendar', to_date('timestamp')).withColumn('hour', hour('timestamp')).withColumn('year', year('timestamp'))
weather = weather.filter(weather['year'] > 2010)
# Fill missing readings with the mean over all rows sharing the same raw 'date' string.
# That string includes the hour, so this is a same-hour average across the Dublin rows.
# avg() yields a double, so these columns become DoubleType.
w = Window().partitionBy('date')
for c in ['wdsp', 'wddir', 'sun', 'vis', 'clht', 'clamt']:
  weather = weather.withColumn(c, when(col(c).isNull(), avg(col(c)).over(w)).otherwise(col(c)))
# Prefix every weather column whose name also exists on the transformed bus data so the join
# in transform_data yields no duplicate column names (timestamp and year were previously missed).
weather = (weather
  .withColumnRenamed('calendar', 'w_calendar')
  .withColumnRenamed('hour', 'w_hour')
  .withColumnRenamed('station', 'w_station')
  .withColumnRenamed('latitude', 'w_latitude')
  .withColumnRenamed('longitude', 'w_longitude')
  .withColumnRenamed('timestamp', 'w_timestamp')
  .withColumnRenamed('year', 'w_year'))

In [0]:
# NOTE(review): dbutils.library.installPyPI is deprecated (removed on newer Databricks runtimes per
# Databricks docs) and installs an unpinned version; consider a pinned `%pip install holidays==...`.
dbutils.library.installPyPI("holidays")
import holidays
import datetime
import pandas

# Daily Irish public-holiday flags (1 = holiday, 0 = not) for 2010-01-01 .. 2019-12-31,
# exposed as a Spark DataFrame with columns h_date and holiday.
h = holidays.Ireland()
start = datetime.date(2010, 1, 1)
end = datetime.date(2020, 1, 1)
delta = datetime.timedelta(days=1)
ir_dates = []
while start < end:
  ir_dates.append(start)
  start += delta
holiday_flags = [int(day in h) for day in ir_dates]
ir_holidays = spark.createDataFrame(
  pandas.DataFrame({'date': ir_dates, 'holiday': holiday_flags})
).withColumnRenamed('date', 'h_date')

In [0]:
import json
import math

# (latitude, longitude) in degrees of the weather stations used for the weather join.
weather_stations = {
  'CASEMENT': (53.306, -6.439),
  'DUBLIN AIRPORT': (53.428, -6.241),
  'PHOENIX PARK': (53.364, -6.35)
}
def weather_station(lat, lon):
  """Return the name of the station in ``weather_stations`` nearest to (lat, lon).

  Uses an equirectangular approximation: the longitude difference is scaled by the
  cosine of the mean latitude. At Dublin's latitude (~53.4 deg) a degree of longitude
  covers only ~60% of the ground distance of a degree of latitude, so comparing raw
  degree differences over-weights east-west offsets and can pick the wrong station.
  Ties resolve to the first station in dict order (``min`` semantics).

  Args:
    lat: latitude in degrees.
    lon: longitude in degrees.

  Returns:
    The station name (a key of ``weather_stations``).
  """
  def squared_distance(station):
    slat, slon = weather_stations[station]
    dx = (lon - slon) * math.cos(math.radians((lat + slat) / 2.0))
    dy = lat - slat
    return dx * dx + dy * dy
  return min(weather_stations, key=squared_distance)

# Spark UDF wrapper around weather_station: (latitude, longitude) -> station name.
weather_station_udf = udf(weather_station, StringType())
# Called as geo_udf(latitude, longitude); returns [longitude, latitude], the array order
# Elasticsearch uses for a geo_point given as an array.
geo_udf = udf(lambda lat, lon: [lon, lat], "array<float>")

def flatten_df(nested_df):
    """Expand every top-level struct column of ``nested_df`` into plain columns.

    Non-struct columns come first, in their original order. Each field ``f`` of a
    struct column ``s`` then becomes a column named ``s_f`` (struct columns in their
    original order, fields in schema order). Only one level of nesting is expanded,
    which makes the frame easier to read and save.
    """
    plain_cols, struct_cols = [], []
    for name, type_name in nested_df.dtypes:
        if type_name.startswith('struct'):
            struct_cols.append(name)
        else:
            plain_cols.append(name)

    expanded = []
    for parent in struct_cols:
        for child in nested_df.select(f'{parent}.*').columns:
            expanded.append(F.col(f'{parent}.{child}').alias(f'{parent}_{child}'))

    return nested_df.select(plain_cols + expanded)

def transform_data(df):
  """Clean raw bus events and enrich them with weather and holiday data.

  Steps:
    1. drop rows containing any null, then drop unused columns;
    2. flatten the struct columns and rename the wrapped ``_id``/``calendar``/``timestamp``;
    3. derive datetime (from epoch-millisecond ``timestamp``, truncated to whole seconds),
       calendar date, geo point, nearest weather station, month, hour, year and weekday;
    4. left-join the module-level ``weather`` frame on (date, hour, station) and
       ``ir_holidays`` on date.

  Only column-level operations and joins against static frames are used, so the same
  function is applied to the batch DataFrame and the Kafka stream.
  """
  wrapped_renames = {
    '_id_$oid': '_id',
    'calendar_$numberLong': 'calendar',
    'timestamp_$numberLong': 'timestamp',
  }
  bus = flatten_df(
    df.na.drop().drop('loc', 'anomaly', 'dateType', 'direction', 'poiId', 'poiId2', 'probability')
  )
  for wrapped_name, plain_name in wrapped_renames.items():
    bus = bus.withColumnRenamed(wrapped_name, plain_name)

  # from_unixtime works on whole seconds and uses the session time zone.
  bus = bus.withColumn('datetime', to_timestamp(from_unixtime(col('timestamp') / 1000)))
  bus = bus.withColumn('calendar', to_date('datetime'))
  bus = bus.withColumn('geo', geo_udf(bus.latitude, bus.longitude))
  bus = bus.withColumn('station', weather_station_udf('latitude', 'longitude'))
  bus = (bus
    .withColumn('month', month('datetime'))
    .withColumn('hour', hour('datetime').cast('bigint'))
    .withColumn('year', year('datetime').cast('bigint'))
    .withColumn('weekday', date_format('calendar', 'E'))
    .drop('_id'))

  same_hour_and_station = ((bus['calendar'] == weather['w_calendar'])
                           & (bus['hour'] == weather['w_hour'])
                           & (bus['station'] == weather['w_station']))
  with_weather = bus.join(weather, same_hour_and_station, how='left')
  return with_weather.join(ir_holidays, bus['calendar'] == ir_holidays['h_date'], how='left')

In [0]:
# Apply identical cleaning/enrichment to the static history and the live Kafka stream.
# NOTE: this rebinds the input names, so re-running this cell on already-transformed
# frames is not supported -- re-run the loading cells first.
data, stream_data = transform_data(data), transform_data(stream_data)

In [0]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator

def dummy_transform(df, categoricalCols, continuousCols, labelCol):
  """Fit a feature pipeline that one-hot encodes categoricals and assembles ``features``.

  Args:
    df: DataFrame the pipeline is fitted on.
    categoricalCols: columns string-indexed into "<c>_indexed" and then one-hot
      encoded into "<c>_indexed_encoded".
    continuousCols: list of numeric columns appended after the encoded columns.
    labelCol: accepted for call-site readability; not used here.

  Returns:
    The fitted PipelineModel (all indexers, then all encoders, then the assembler).
  """
  indexed_cols = ["{0}_indexed".format(c) for c in categoricalCols]
  encoded_cols = ["{0}_encoded".format(c) for c in indexed_cols]
  indexers = [StringIndexer(inputCol=src, outputCol=dst) for src, dst in zip(categoricalCols, indexed_cols)]
  encoders = [OneHotEncoder(inputCol=src, outputCol=dst) for src, dst in zip(indexed_cols, encoded_cols)]
  assembler = VectorAssembler(inputCols=encoded_cols + continuousCols, outputCol="features")
  return Pipeline(stages=indexers + encoders + [assembler]).fit(df)

def delay_regression(train_data):
  """Fit the feature pipeline and a linear model predicting ``delay``.

  Args:
    train_data: transformed bus DataFrame holding the columns listed below plus ``delay``.

  Returns:
    (regression_transform, regression): the fitted feature PipelineModel and the
    LinearRegressionModel trained on its ``features`` with ``delay`` as ``label``.
  """
  categorical_cols = ['weekday', 'holiday']
  continuous_cols = ['distanceCovered', 'longitude', 'latitude', 'areaId', 'hour', 'month',
                     'vehicleSpeed', 'rain', 'temp', 'wetb', 'dewpt', 'vappr', 'rhum',
                     'msl', 'wdsp', 'wddir', 'sun', 'vis', 'clht', 'clamt']
  regression_transform = dummy_transform(train_data, categorical_cols, continuous_cols, 'delay')

  labelled = regression_transform.transform(train_data)
  labelled = labelled.withColumn('label', train_data.delay).select('features', 'label')

  # NOTE(review): maxIter=1 -- Spark's default "auto" solver may choose the non-iterative
  # normal-equation solver for this feature count, in which case maxIter is ignored; confirm intent.
  regression = LinearRegression(maxIter=1).fit(labelled)
  return regression_transform, regression

def evaluator(regression_transform, regression):
  """Build a scoring closure around a fitted feature pipeline and regression model.

  The returned ``evaluate(df)``:
    * featurizes ``df`` with ``regression_transform`` and copies ``delay`` into ``label``,
    * appends the model's ``prediction`` column,
    * prints the RMSE of ``prediction`` against ``label``, and
    * returns the scored frame without the intermediate feature columns.
  """
  rmse_metric = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
  helper_cols = ('weekday_indexed', 'holiday_indexed', 'weekday_indexed_encoded', 'holiday_indexed_encoded', 'features')

  def evaluate(df):
    featurized = regression_transform.transform(df).withColumn('label', df.delay)
    scored = regression.transform(featurized).drop(*helper_cols)
    print(rmse_metric.evaluate(scored))
    return scored

  return evaluate

In [0]:
# Fit once and cache both fitted models on DBFS; later runs reload them instead of refitting.
# NOTE(review): the model is fitted on all of `data`, and the batch cell later scores that
# same `data`, so the printed RMSE is training error, not held-out error.
_models_cached = exists('fben/transform') and exists('fben/regression')
if _models_cached:
  regression_transform = PipelineModel.load('fben/transform')
  regression = LinearRegressionModel.load('fben/regression')
else:
  regression_transform, regression = delay_regression(data)
  regression_transform.write().overwrite().save('fben/transform')
  regression.write().overwrite().save('fben/regression')
evaluate = evaluator(regression_transform, regression)
dbutils.fs.ls('dbfs:/fben')

In [0]:
# Index settings and explicit field mapping used when creating batch_index and stream_index.
# Field types follow the Spark column types: FloatType -> float, DoubleType -> double
# (including weather columns turned into doubles by avg() imputation, and the model's
# prediction), integral types -> long. Mapping fractional values as "long" makes
# Elasticsearch truncate them (e.g. latitude 53.35 would be indexed as 53).
settings_with_mapping = {
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    # Periodic refresh disabled; presumably relies on the ES-Spark connector's
    # post-bulk refresh (es.batch.write.refresh) for visibility -- confirm.
    "refresh_interval" : -1
  },
  "mappings": {
    "properties": {
      "actualDelay" : {"type": "long"},
      "areaId" : {"type": "long" },
      "areaId1" : {"type": "long" },
      "areaId2" : {"type": "long" },
      "areaId3" : {"type": "long" },
      "atStop" : {"type": "boolean" },
      "busStop" : {"type": "long" },
      "congestion" : {"type": "boolean" },
      "currentHour" : {"type": "long"},
      "delay" : {"type": "long"},
      "distanceCovered" : {"type": "double"},
      "ellapsedTime" : {"type": "long"},
      "filteredActualDelay" : {"type": "long"},
      "journeyPatternId" : {"type": "keyword" },
      "justLeftStop" : {"type": "boolean"},
      "justStopped" : {"type": "boolean"},
      "latitude" : {"type": "float"},
      "lineId" : {"type": "keyword" },
      "geo": {"type": "geo_point"},
      "longitude" : {"type": "float"},
      "vehicleId" : {"type": "long" },
      "vehicleSpeed" : {"type": "long" },
      "calendar" : {"type": "date"},
      "datetime" : {"type": "date"},
      "month" : {"type": "long"},
      "hour" : {"type": "long"},
      "weekday" : {"type": "keyword"},
      "station": {'type': 'keyword'},
      'rain': {'type': 'float'},
      'temp': {'type': 'float'},
      'wetb': {'type': 'float'},
      'dewpt': {'type': 'float'},
      'vappr': {'type': 'float'},
      'rhum': {'type': 'long'},
      'msl': {'type': 'float'},
      'wdsp': {'type': 'double'},
      'wddir': {'type': 'double'},
      'sun': {'type': 'double'},
      'vis': {'type': 'double'},
      'clht': {'type': 'double'},
      'clamt': {'type': 'double'},
      'holiday': {'type': 'long'},
      'label': {'type': 'long'},
      'prediction': {'type': 'double'}
    }
  }
}

# Columns shipped to Elasticsearch; the batch and streaming frames expose the same set, in the same order.
es_columns = [
  "actualDelay", "areaId", "areaId1", "areaId2", "areaId3", "atStop", "busStop", "congestion",
  "currentHour", "delay", "distanceCovered", "ellapsedTime", "filteredActualDelay",
  "journeyPatternId", "justLeftStop", "justStopped", "latitude", "lineId", "geo", "longitude",
  "vehicleId", "vehicleSpeed", "calendar", "datetime", "month", "hour", "weekday", "station",
  'rain', 'temp', 'wetb', 'dewpt', 'vappr', 'rhum', 'msl', 'wdsp', 'wddir', 'sun', 'vis',
  'clht', 'clamt', 'holiday',
]
data = data.select(*es_columns)
stream_data = stream_data.select(*es_columns)

In [0]:
# One-off bulk load: only when batch_index does not exist yet is it created (with the
# explicit mapping) and filled with the scored historical data, so re-running this cell
# does not append the same documents again.
# NOTE(review): if the write fails after the index was created, later runs skip the load.
if not es.indices.exists(index='batch_index'):
  es.indices.create(index='batch_index', ignore=400, body=settings_with_mapping)
  batch_writer_options = {
    'es.nodes.wan.only': 'true',
    # NOTE(review): checkpointLocation has no effect on a batch (non-streaming) write.
    'checkpointLocation': '/tmp/fben/batch_cp/',
    'es.resource': 'batch_index',
    'es.nodes': ES_HOST,
    'es.port': '9200',
  }
  scored_batch = evaluate(data)
  scored_batch.write.mode('append').format('org.elasticsearch.spark.sql').options(**batch_writer_options).save()

In [0]:
def evaluate_batch(batch, batchId):
  """foreachBatch sink: score one micro-batch with ``evaluate`` and append it to ``stream_index``.

  ``batchId`` is required by the foreachBatch signature but unused. foreachBatch gives
  at-least-once delivery, so a micro-batch retried after a failure is written again.
  """
  evaluate(batch).write.mode("append").format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.resource", 'stream_index') \
    .option("es.nodes", ES_HOST) \
    .option("es.port", "9200") \
    .save()

# Create the index with the explicit mapping on first run only.
if not es.indices.exists(index='stream_index'):
  es.indices.create(index='stream_index', ignore=400, body=settings_with_mapping)

# Start (or resume) the query on every run. The checkpoint belongs on the streaming query,
# not on the per-batch writer where it was ignored. With it, a restarted query resumes from
# its committed Kafka offsets instead of replaying from "earliest", and
# spark.sql.streaming.stopActiveRunOnRestart can recognise and replace a previous run.
stream_query = stream_data.writeStream \
  .foreachBatch(evaluate_batch) \
  .outputMode('append') \
  .option('checkpointLocation', '/tmp/fben/stream_cp/') \
  .start()