In [105]:
!pip install pyspark
!pip install plotly
!head metrics_df.csv

	id	vendor_id	pickup_datetime	dropoff_datetime	passenger_count	pickup_longitude	pickup_latitude	dropoff_longitude	dropoff_latitude	store_and_fwd_flag	trip_duration	metres	speed	longitude_dif	latitude_dif
0	id1080784	2	2016-02-29 16:40:21	2016-02-29 16:47:01	1	-73.95391845703125	40.77887344360352	-73.96387481689453	40.77116394042969	N	400	24.43799807447412	0.0610949951861853	-0.00995635986328125	-0.00770950317383523
1	id0889885	1	2016-03-11 23:35:37	2016-03-11 23:53:57	2	-73.98831176757811	40.73174285888672	-73.9947509765625	40.69493103027344	N	1100	72.52924603024766	0.06593567820931606	-0.006439208984389211	-0.03681182861328125
2	id0857912	2	2016-02-21 17:59:33	2016-02-21 18:26:48	2	-73.997314453125	40.721458435058594	-73.94802856445312	40.774917602539055	N	1635	141.11418608474608	0.0863083706940343	0.049285888671875	0.053459167480461645
3	id3744273	2	2016-01-05 09:44:31	2016-01-05 10:03:32	6	-73.961669921875	40.75971984863281	-73.95677947998048	40.780628204345696	N	1141	41.67413202697

In [115]:
from datetime import datetime, timedelta
import pandas as pd
import plotly.express as px
from prophet import Prophet
import math

def convert_to_datetime(str_datetime):
  """Parse a fixed-width 'YYYY-MM-DD HH:MM:SS' string into a datetime.

  Fields are read purely by character position, so the separator characters
  are never inspected and anything after position 19 is ignored. int() raises
  ValueError when a field is not numeric.
  """
  field_bounds = ((0, 4), (5, 7), (8, 10), (11, 13), (14, 16), (17, 19))
  year, month, day, hour, minute, second = (
      int(str_datetime[start:stop]) for start, stop in field_bounds
  )
  return datetime(year, month, day, hour, minute, second)

def del_title(line):
  """Split one tab-separated line and key it by its pickup hour.

  Returns None for the header line (the one whose fourth field is the literal
  'pickup_datetime'). For any other line returns a pair
  (pickup time truncated to the hour, fields without the first column), where
  the pickup field inside the returned list has been replaced by its parsed
  datetime.
  """
  fields = line.split('\t')
  if fields[3] == 'pickup_datetime':
    return None
  pickup = convert_to_datetime(fields[3])
  fields[3] = pickup
  # Truncate to the start of the hour so records group into hourly buckets.
  hour_bucket = pickup.replace(minute=0, second=0, microsecond=0)
  return hour_bucket, fields[1:]

def passenger_count(row):
  """Turn a (key, fields) pair into (key, (passenger count, 1)).

  The count is read as an int from position 4 of the fields sequence; the
  trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  passengers = int(fields[4])
  return key, (passengers, 1)

def pickup_longitude(row):
  """Turn a (key, fields) pair into (key, (pickup longitude, 1)).

  The longitude is read as a float from position 5 of the fields sequence;
  the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  longitude = float(fields[5])
  return key, (longitude, 1)

def pickup_latitude(row):
  """Turn a (key, fields) pair into (key, (pickup latitude, 1)).

  The latitude is read as a float from position 6 of the fields sequence;
  the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  latitude = float(fields[6])
  return key, (latitude, 1)

def duration(row):
  """Turn a (key, fields) pair into (key, (trip duration, 1)).

  The duration is read as an int from position 10 of the fields sequence;
  the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  trip_duration = int(fields[10])
  return key, (trip_duration, 1)

def distance(row):
  """Turn a (key, fields) pair into (key, (trip distance, 1)).

  The distance is read as a float from position 11 of the fields sequence;
  the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  metres = float(fields[11])
  return key, (metres, 1)

def speed(row):
  """Turn a (key, fields) pair into (key, (speed, 1)).

  The speed is read as a float from position 12 of the fields sequence;
  the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  value = float(fields[12])
  return key, (value, 1)

def dif_longitude(row):
  """Turn a (key, fields) pair into (key, (|longitude difference|, 1)).

  The difference is read as a float from position 13 of the fields sequence
  and its sign is discarded; the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  magnitude = abs(float(fields[13]))
  return key, (magnitude, 1)

def dif_latitude(row):
  """Turn a (key, fields) pair into (key, (|latitude difference|, 1)).

  The difference is read as a float from position 14 of the fields sequence
  and its sign is discarded; the trailing 1 is a per-record tally.
  """
  key, fields = row[0], row[1]
  magnitude = abs(float(fields[14]))
  return key, (magnitude, 1)

def trip_count(data):
  """Count the records that fall under each key produced by del_title.

  `data` is mapped through del_title, the None results (header line) are
  dropped, and every remaining record contributes 1 to its key's total.
  Returns the resulting (key, count) collection.
  """
  parsed = data.map(del_title).filter(lambda record: record is not None)
  ones = parsed.map(lambda record: (record[0], 1))
  return ones.reduceByKey(lambda left, right: left + right)

def time_series(data, func):
  """Average one metric per key produced by del_title.

  `func` must turn a parsed (key, fields) record into (key, (value, 1)).
  Values and tallies are summed per key and the sum is divided by the tally,
  yielding a collection of (key, mean value) pairs.
  """
  parsed = data.map(del_title).filter(lambda record: record is not None)
  totals = parsed.map(func).reduceByKey(
      lambda left, right: (left[0] + right[0], left[1] + right[1]))
  return totals.map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))

In [2]:
from pyspark import SparkContext, SparkConf

# Spark configuration for this notebook: app name "test", master "local".
conf = SparkConf().setAppName("test").setMaster("local")
# getOrCreate reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Read the metrics file as a collection of text lines.
data = sc.textFile("metrics_df.csv")

In [116]:
def detect_anomalies(forecast):
    """Flag observations that fall outside the forecast's uncertainty band.

    Args:
        forecast: DataFrame with the columns 'ds', 'trend', 'yhat',
            'yhat_lower', 'yhat_upper' and 'fact' (the observed value).
            It is not modified.

    Returns:
        A copy of those six columns plus:
          * 'anomaly': 1 where fact > yhat_upper, -1 where fact < yhat_lower,
            0 otherwise.
          * 'importance': distance of fact from the violated bound divided by
            fact; 0.0 for non-anomalous rows. A fact of 0 therefore yields an
            infinite or NaN importance.
    """
    forecasted = forecast[['ds','trend', 'yhat', 'yhat_lower', 'yhat_upper', 'fact']].copy()

    above = forecasted['fact'] > forecasted['yhat_upper']
    below = forecasted['fact'] < forecasted['yhat_lower']

    forecasted['anomaly'] = 0
    forecasted.loc[above, 'anomaly'] = 1
    forecasted.loc[below, 'anomaly'] = -1

    # Start from a float column: writing fractional values into an integer
    # column is a deprecated incompatible-dtype assignment in pandas.
    forecasted['importance'] = 0.0
    over = (forecasted['fact'] - forecasted['yhat_upper']) / forecasted['fact']
    under = (forecasted['yhat_lower'] - forecasted['fact']) / forecasted['fact']
    forecasted.loc[above, 'importance'] = over[above]
    forecasted.loc[below, 'importance'] = under[below]

    return forecasted

def fit_predict_model(dataframe, interval_width = 0.99, changepoint_range = 0.8):
    """Fit Prophet on a series and return its in-sample forecast with actuals.

    Args:
        dataframe: DataFrame with a 'ds' (timestamp) and a 'y' (observed
            value) column. It is not modified.
        interval_width: forwarded to Prophet's `interval_width` argument.
        changepoint_range: forwarded to Prophet's `changepoint_range` argument.

    Returns:
        The DataFrame returned by Prophet.predict for the same timestamps,
        with an added 'fact' column holding the observed 'y' values.
    """
    # Prophet.predict returns its rows ordered by 'ds' on a fresh index, and
    # 'fact' is attached by position. Order the input the same way so actuals
    # cannot be paired with the wrong timestamps when the caller passes
    # unsorted data (a stable sort keeps ties in their original order).
    ordered = dataframe.sort_values('ds', kind = 'mergesort').reset_index(drop = True)

    # All built-in seasonalities are switched off.
    m = Prophet(daily_seasonality = False, yearly_seasonality = False, weekly_seasonality = False,
                seasonality_mode = 'multiplicative',
                interval_width = interval_width,
                changepoint_range = changepoint_range)
    m = m.fit(ordered)

    forecast = m.predict(ordered)
    forecast['fact'] = ordered['y']
    return forecast

def process(result):
  """Collect a (ds, y) series, score it for anomalies and plot it.

  `result` must expose collect() returning (timestamp, value) pairs. The
  pairs are loaded into a DataFrame ordered by 'ds', passed through
  fit_predict_model and detect_anomalies, and drawn as a scatter of 'fact'
  over 'ds' coloured by the anomaly flag.

  Returns a pair: (rows whose anomaly flag is non-zero, the plotly figure).
  """
  series = pd.DataFrame(result.collect(), columns=['ds', 'y']).sort_values(by=['ds'])
  scored = detect_anomalies(fit_predict_model(series))
  fig = px.scatter(
      scored,
      x='ds',
      y='fact',
      color='anomaly',
      color_continuous_scale=["orange", "blue", "red"],
  )
  flagged = scored[scored['anomaly'] != 0]
  return flagged, fig

# Metric extractors to analyse; time_series averages each one per key.
func_list = [duration, distance, speed, dif_longitude, dif_latitude]
fig_list = []  # one scatter figure per metric, in func_list order
df_anomaly_list = []  # one DataFrame of anomalous rows per metric, in func_list order

# Build each series, score it, and collect the results before showing anything.
for x in func_list:
  result = time_series(data, x)
  pred, fig = process(result)
  fig_list.append(fig)
  df_anomaly_list.append(pred)

# Render the collected figures in order.
for x in fig_list:
  x.show()

DEBUG:cmdstanpy:input tempfile: /tmp/tmpkfrgw88d/d94mp8yc.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpkfrgw88d/z5un2fpy.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/usr/local/lib/python3.10/dist-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=85405', 'data', 'file=/tmp/tmpkfrgw88d/d94mp8yc.json', 'init=/tmp/tmpkfrgw88d/z5un2fpy.json', 'output', 'file=/tmp/tmpkfrgw88d/prophet_modelbsnysoaq/prophet_model-20231215190938.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
19:09:38 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
19:09:39 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing
DEBUG:cmdstanpy:input tempfile: /tmp/tmpkfrgw88d/rmrq1v1x.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpkfrgw88d/kgc3dvmp.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/usr/local/