In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, struct, from_json
from pyspark.sql.types import DateType, TimestampType, FloatType
import pickle as pkl
import numpy as np
import pandas as pd

In [0]:
import numpy as np
import pandas as pd

def sigmoid(z):
  """Element-wise logistic function 1 / (1 + exp(-z)).

  Args:
    z: scalar or array-like of numbers; converted to a float64 array.

  Returns:
    A float64 scalar for scalar input, otherwise an array shaped like `z`,
    with every value in [0, 1].
  """
  z = np.array(z, dtype=np.float64)
  # exp(-|z|) is always <= 1, so neither branch below can overflow; the naive
  # 1/(1+exp(-z)) overflows (with a RuntimeWarning) once z is below about -709.
  decay = np.exp(-np.abs(z))
  stable = np.where(z >= 0, 1/(1+decay), decay/(1+decay))
  # [()] turns a 0-d result back into a scalar and leaves real arrays untouched.
  return stable[()]

class OurModel:
  """Two-layer fully connected network with sigmoid activations.

  The network computes y_hat = sigmoid(w2 . sigmoid(w1 . x^T)) and is trained
  with plain gradient descent on the mean squared error.

  Attributes set by the constructor:
    w1: (hidden_size, input_size) weights of the hidden layer.
    w2: (output_size, hidden_size) weights of the output layer.
    loss: scalar placeholder initialised to 0 (not updated by this class).
  """

  def __init__(self, input_size, output_size, hidden_size = 512, w1 = None, w2 = None):
    """Create the model, drawing small random weights unless w1/w2 are supplied.

    Args:
      input_size: number of features per sample.
      output_size: number of output units.
      hidden_size: number of hidden units.
      w1: optional pre-trained (hidden_size, input_size) array; used as-is.
      w2: optional pre-trained (output_size, hidden_size) array; used as-is.
    """
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.w1 = w1
    if w1 is None:
      self.w1 = np.random.normal(size=(hidden_size,input_size),scale = 0.01)
    self.w2 = w2
    if w2 is None:
      self.w2 = np.random.normal(size=(output_size,hidden_size),scale = 0.01)  # one row per output unit
    self.loss = 0 # at the start of every epoch should be set to 0

  def forward(self, x):
    """Compute predictions for a batch and cache the activations for backward().

    Args:
      x: array-like of shape (batch, input_size), or a single 1-D sample.

    Returns:
      Array of shape (output_size, batch) with sigmoid outputs.
    """
    x = np.array(x, ndmin=2, dtype=np.float64)
    self.z1 = np.dot(self.w1,x.T)
    self.hidden = sigmoid(self.z1)
    self.z2 = np.dot(self.w2,self.hidden)
    y_hat = sigmoid(self.z2)
    return y_hat


  def backward(self, x, y, y_hat, lr = 0.005):
    """Take one gradient-descent step on the mean squared error.

    Must be called after forward() on the same batch, because it reads the
    cached `self.hidden`. Both weight matrices are updated in place.

    Args:
      x: the batch passed to forward(), shape (batch, input_size) or one 1-D sample.
      y: targets of shape (batch, output_size).
      y_hat: the output of forward(), shape (output_size, batch).
      lr: learning rate.
    """
    X = np.array(x, ndmin=2, dtype=np.float64)
    y = np.array(y, ndmin=2).T
    batch_size = y.shape[1]

    # Output layer: dL/dz2 = dL/dy_hat * sigmoid'(z2), with sigmoid' = s * (1 - s).
    dl_dy_hat = (2/batch_size)*(y_hat - y)
    dy_hat_dz2 = y_hat * (1 - y_hat)
    dl_dz2 = dl_dy_hat * dy_hat_dz2
    dl_dw2 = np.dot(dl_dz2, self.hidden.T) # hidden.T is dz2_dw2

    # Hidden layer: the error must be propagated through z2 (i.e. include the
    # output sigmoid derivative) before going through w2.
    dl_dh = np.dot(self.w2.T, dl_dz2)
    dh_dz1 = self.hidden * (1 - self.hidden)
    dl_dz1 = dl_dh * dh_dz1
    dl_dw1 = np.dot(dl_dz1, X) # X is dz1_dw1; the 2-D copy also handles a single 1-D sample

    # Both gradients were computed from the pre-update weights; apply them together.
    self.w1 -= lr * dl_dw1
    self.w2 -= lr * dl_dw2

# Per-batch bookkeeping filled in by prep_forward_n_back: one entry is appended
# to each list for every processed micro-batch. Seven independent empty lists.
losses, accuracies, ys, y_hats, recalls, labels_sum, labels_tables = ([] for _ in range(7))

def prep_forward_n_back(kafka_raw_df,y):
  """Run one online training step of the global `model` on a Kafka micro-batch.

  Written to be used as a `foreachBatch` callback. The batch is parsed from
  JSON, the non-congested class is down-sampled, hourly weather averages from
  the global `temp` table are joined in, metrics are appended to the
  module-level tracking lists, the model takes one gradient step and the
  updated weights are pickled. Micro-batches that end up with no rows after
  sampling are skipped without touching the model or the tracking lists.

  Args:
    kafka_raw_df: micro-batch DataFrame whose Kafka `value` column holds the JSON record.
    y: second positional argument supplied by `foreachBatch` (the batch id); unused.

  Returns:
    None.
  """
  is_c = udf(lambda x: 1 if x==True else 0)
  get_hour_date = udf(lambda x: str(x)[:13])

  # Preprocessing: parse the JSON payload, then derive an hourly join key from
  # the millisecond epoch timestamp.
  df_value = (
    kafka_raw_df
    .withColumn("value", col("value").cast("string"))
    .select("value")
    .withColumn("json", from_json( col("value"), schema=schema))
    .select("json.*")
  )
  df_value = (
    df_value
    .withColumn("timestamp", col("timestamp.$numberLong").cast("bigint"))
    .withColumn("timestamp", col("timestamp")/1000)
    .withColumn("date_timestamp", col("timestamp").cast(TimestampType()))
    .withColumn("hourRounded", get_hour_date(col("date_timestamp")))
  )
  true_values = df_value.filter(col("congestion") == True)

  # Balancing classes: keep every congested record, about 1% of the others.
  sample = df_value.filter(col("congestion") == False).sample(withReplacement=False, fraction=0.01)
  df_value = true_values.union(sample).withColumn("is_congestion", is_c(col("congestion")))
  labels_table = df_value.select("is_congestion")

  # Collect features and label with a single Spark action so every feature row
  # is guaranteed to sit next to its own label.
  batch_pd = df_value.select(["currentHour","latitude","longitude","hourRounded","is_congestion"]).toPandas()
  if batch_pd.empty:
    # Nothing survived the sampling; the 1/batch_size terms below would divide by zero.
    return
  labels_tables.append(labels_table)

  # Adding weather. how='left' keeps every record; hours missing from the
  # weather table produce NaN weather columns.
  temp_pd = temp.toPandas()
  merged = batch_pd.merge(right=temp_pd, on='hourRounded', how='left')
  y_true = np.array(merged[["is_congestion"]]).astype('float')  # shape (batch, 1)
  ys.append(y_true)
  # NOTE(review): reset_index() without drop=True inserts the row position as a
  # leading feature column named `index`. It is kept because the feature count
  # (index + 3 record fields + 8 weather averages = 12) is what a 12-input model
  # expects, but congested rows come first in the union, so this column
  # correlates with the label. Confirm and retire it together with input_size.
  X = np.array(merged.drop(['hourRounded', 'is_congestion'], axis=1).reset_index())
  batch_size = len(X)

  # Just for keeping track on number of records from each label to see that it's balanced
  labels = y_true[:, 0]
  congests = int(np.sum(labels == 1))
  non_congests = int(np.sum(labels == 0))
  labels_sum.append((congests,non_congests))

  # Metrics are computed from the prediction made before this batch's update.
  y_hat = model.forward(X)
  y_hats.append(y_hat)
  predicted = np.round(y_hat[0])
  batch_acc = float(np.sum(predicted == labels))/batch_size
  batch_recall = float(np.sum((labels == 1) & (predicted == 1)))/(congests if congests!=0 else 1)
  recalls.append(batch_recall)
  accuracies.append(batch_acc)
  loss = (1/batch_size)*np.linalg.norm(y_true.T-y_hat)
  losses.append(loss)

  # Training step, then checkpoint the updated weights (saving before the step
  # would leave the files one update behind). `with` closes the file handles.
  model.backward(X, y_true, y_hat)
  with open("/dbfs/FileStore/avi_maxim_models/w1.pkl",'wb') as w1_file:
    pkl.dump(model.w1, w1_file)
  with open("/dbfs/FileStore/avi_maxim_models/w2.pkl",'wb') as w2_file:
    pkl.dump(model.w2, w2_file)

In [0]:
# 12 inputs = the `index` column added by reset_index(), currentHour, latitude,
# longitude and the 8 hourly weather averages built in prep_forward_n_back.
model = OurModel(
  input_size=12,
  output_size=1,
  hidden_size=20,
)

In [0]:
# Schema of the JSON payload carried in the Kafka `value` column.
# NOTE: unpickling can execute arbitrary code; only load this file from a trusted location.
with open("/dbfs/FileStore/tables/schema.pkl", "rb") as f:
  schema = pkl.load(f)

kafka_server = '10.0.0.30:9091'

# Streaming source: every topic matching vehicleId_.*, read from the earliest
# offset, at most 50 offsets per trigger.
kafka_raw_df = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("subscribePattern", "vehicleId_.*")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 50)
  .load()
)


In [0]:
# Join key: the first 13 characters of the value's string form
# (presumably "YYYY-MM-DD HH" for a timestamp; confirm against the CSV's `date` column).
get_hour_date = udf(lambda x: str(x)[:13])
weather_cols = ["rain", "temp", "wetb", "dewpt", "vappr", "rhum", "msl", "vis"]

w = (
  spark.read
  .option("inferSchema", True)
  .option("header", True)
  .csv("/FileStore/tables/relevant_weather_updated.csv")
)
w = w.withColumn("hourRounded", get_hour_date(col("date")))

# One row per hour holding the average of each weather measurement; the
# generated "avg(<name>)" columns are renamed back to the plain measurement name.
temp = (
  w.select("hourRounded", *weather_cols)
  .groupby("hourRounded")
  .agg({weather_col: "avg" for weather_col in weather_cols})
)
for weather_col in weather_cols:
  temp = temp.withColumnRenamed("avg(" + weather_col + ")", weather_col)
temp.show(5)

In [0]:
# Start the streaming query; each micro-batch is handed to prep_forward_n_back.
(
  kafka_raw_df.writeStream
  .foreachBatch(prep_forward_n_back)
  .start()
)