# Bandwidth per location

In [56]:
from pyspark.sql import SQLContext
import pandas as pd
import pymysql
import warnings
warnings.filterwarnings("ignore")
from datetime import datetime, timedelta
import logging
from tqdm import tqdm
from fbprophet import Prophet
from sklearn.metrics import mean_squared_error as mse
import math

## Logging ##

import os
import sys


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import os
import pandas as pd

from pyspark.sql import SparkSession
import datetime
# Timestamp taken at the very start so the total notebook runtime can be derived later.
full_t1 = datetime.datetime.now()

# Create (or reuse) the Spark session for this notebook.
# NOTE(review): with master('local') the executor-memory and cores.max settings
# may not have the intended effect -- confirm against the Spark configuration docs.
spark1 = (
    SparkSession.builder
    .master('local')
    .appName('p7_sample')
    .config('spark.executor.memory', '24gb')
    .config("spark.cores.max", "16")
    .getOrCreate()
)
sc = spark1.sparkContext

# SQLContext wrapper around the same SparkContext; used below to read parquet
# files and to run SQL queries.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

def _null_outliers(frame):
    """Set extreme ``y`` values of *frame* to ``None`` in place and return it.

    Masking is applied only when the distance between the maximum and the
    median of ``y`` is at least twice the 75th percentile; in that case every
    value greater than or equal to ``2 * q75`` is blanked.
    """
    q50 = frame.y.median()
    q100 = frame.y.quantile(1)
    q75 = frame.y.quantile(.75)
    if (q100 - q50) >= (2 * q75):
        frame.loc[frame.y >= (2 * q75), 'y'] = None
    return frame


def _fit_hourly_forecast(history, periods):
    """Fit Prophet on *history* and predict *periods* extra hourly steps.

    Returns ``(model, forecast)`` where ``forecast`` is indexed by its ``ds``
    column.
    """
    model = Prophet(yearly_seasonality=False, changepoint_prior_scale=.2)
    model.fit(history)
    future = model.make_future_dataframe(periods=periods, freq='H')
    forecast = model.predict(future)
    forecast.index = forecast['ds']
    return model, forecast


def create_prophet_m(app_name,z1,delay=24):
    """Fit Prophet on a bandwidth series and report hold-out accuracy.

    Two models are fitted:

    * a "realtime" model on all rows of ``z1.bw``, used to forecast the
      ``delay`` steps that follow the data;
    * a back-test model on everything except the last ``delay`` rows, whose
      predictions are compared with those held-out rows.

    Args:
        app_name: Label used as the index of the returned metrics frame.
        z1: DataFrame with a ``bw`` column. Its index supplies the ``ds``
            values (assumed to be hourly timestamps in ascending order --
            TODO confirm with the caller).
        delay: Number of trailing rows held out for testing, and number of
            future steps forecast by the realtime model.

    Returns:
        Tuple ``(df, model, forecast, pred_df, pred_r)``:
        ``df`` is a one-row metrics frame (``length``, ``test_rmse``,
        ``test_mape``, ``test_mape_98``) indexed by ``app_name``, or an empty
        frame when no test rows have both an actual and a predicted value;
        ``model`` / ``forecast`` are the back-test Prophet model and its full
        forecast; ``pred_df`` holds the test rows with ``y``, ``yhat`` and
        error columns; ``pred_r`` holds the realtime ``ds`` / ``yhat``
        forecast.
    """
    n_obs = len(z1)

    # --- Realtime prediction: train on everything, forecast `delay` steps ---
    full_df = z1.bw.iloc[0:n_obs].reset_index()
    full_df.columns = ['ds', 'y']
    _null_outliers(full_df)
    _, forecast_r = _fit_hourly_forecast(full_df, delay)
    # Rows past the history are the future steps; .iloc makes the positional
    # slice on the date-indexed series explicit.
    pred_r = pd.DataFrame(forecast_r['yhat'].iloc[n_obs:(n_obs + delay)])
    pred_r = pred_r.reset_index()

    # --- Back-test: hold out the last `delay` rows ---
    train_end_index = len(z1.bw) - delay
    train_df = z1.bw.iloc[0:train_end_index].reset_index()
    test_df = z1.bw.iloc[train_end_index:n_obs].reset_index()
    train_df.columns = ['ds', 'y']
    test_df.columns = ['ds', 'y']

    # Only the training set is cleaned; the test set keeps its raw values.
    _null_outliers(train_df)

    model, forecast = _fit_hourly_forecast(train_df, len(test_df))
    pred = pd.DataFrame(forecast['yhat'].iloc[train_end_index:n_obs])
    pred = pred.reset_index()
    pred_df = pd.merge(test_df, pred, on='ds', how='left')
    pred_df.dropna(inplace=True)

    df = pd.DataFrame()

    if len(pred_df) > 0:
        pred_df['error_test'] = (pred_df.y - pred_df.yhat)

        MSE = mse(pred_df.y, pred_df.yhat)
        RMSE = math.sqrt(MSE)

        pred_df['APE'] = abs(pred_df.error_test * 100 / pred_df.y)
        MAPE = pred_df.APE.mean()

        # MAPE restricted to errors strictly below the 98th percentile, so a
        # few extreme percentage errors do not dominate the score.
        ape = pred_df['APE']
        mape_q98 = ape[ape < ape.quantile(0.98)].mean()

        df = pd.DataFrame({'length': n_obs,
                           'test_rmse': RMSE,
                           'test_mape': MAPE,
                           'test_mape_98': mape_q98},
                          index=[app_name])

    return(df,model,forecast,pred_df,pred_r)


#### Reading the data and merging

In [20]:
import datetime
# Datapoints table, read from parquet.
df = sqlContext.read.parquet('appid_datapoint_parquet1')
# Attributes table, trimmed to the four columns used below.
df2 = sqlContext.read.parquet('appid_attribute_parquet').select(
    'attribute_id', 'source', 'target_address', 'location'
)

In [21]:
from pyspark.sql.functions import col

# Rename the attribute columns so the frame can be joined on
# target_attribute_id; each pair is (current name, new name).
df2 = df2.select(*[
    col(current).alias(renamed)
    for current, renamed in (
        ("attribute_id", "target_attribute_id"),
        ("source", "source_y"),
        ("target_address", "target_address_y"),
        ("location", "location"),
    )
])
df2.head()

Row(target_attribute_id=219824640, source_y='134.141.121.91', target_address_y='134.141.122.114/maahmed-PC', location='Toronto')

In [22]:
# Left join: every datapoint row is kept and gains the attribute columns
# matched on target_attribute_id.
df_merge = df.join(df2, 'target_attribute_id', 'left')
#df_merge.dropna()
#df_merge.head()

In [23]:
# Time the aggregation and the transfer to pandas.
t1 = datetime.datetime.now()
# Expose the merged frame to SQL under the name 'dummy'.
# createOrReplaceTempView replaces the deprecated registerTempTable; it returns
# None, so its result is no longer bound to `data`.
df_merge.createOrReplaceTempView('dummy')
# Total bytes per (location, time_stamp) pair.
data = sqlContext.sql('select sum(byte_count) as byte_count_sum  , time_stamp, location from dummy group by location, time_stamp')
# data cleaning: keep only rows with a positive byte total
data = data[data.byte_count_sum > 0]
bw_df=data.toPandas()
t2 = datetime.datetime.now()
str(t2-t1)

'0:03:10.430196'

In [24]:
# Derive the bandwidth column from the summed byte counts.
# NOTE(review): this divides by 8*3600; if bits per second over an hourly
# bucket was intended, the factor would be *8/3600 -- confirm the unit.
bw_df['bw'] = bw_df['byte_count_sum'] / (8 * 3600)
bw_df = bw_df.sort_values(by='location')
# time_stamp is interpreted as epoch milliseconds and becomes the index.
dates_outlook = pd.to_datetime(bw_df['time_stamp'], unit='ms')
bw_df.index = dates_outlook
# Final ordering is by time.
bw_df = bw_df.sort_values(by='time_stamp')

In [26]:
# Preview the first three parsed timestamps.
dates_outlook[0:3]

37789   2018-01-02 08:00:00
46282   2017-11-28 04:00:00
58982   2017-12-29 21:00:00
Name: time_stamp, dtype: datetime64[ns]

In [31]:
# Show the last rows of the time-sorted bandwidth frame.
bw_df.tail()

Unnamed: 0_level_0,byte_count_sum,time_stamp,location,bw
time_stamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2018-02-09 15:00:00,1234845,1518188400000,IT labs,42.87656
2018-02-09 15:00:00,3292110,1518188400000,Madrid,114.3094
2018-02-09 16:00:00,317935069610,1518192000000,,11039410.0
2018-02-09 17:00:00,286295475791,1518195600000,,9940815.0
2018-02-09 18:00:00,275526040185,1518199200000,,9566876.0


In [32]:
# Count the distinct values in the location column.
len(bw_df.location.unique())

43

In [33]:
def forcomb(l,temp,min_points=1400,delay=24):
    """Run the Prophet back-test and forecast for a single location.

    Args:
        l: Location value to select from ``temp.location``.
        temp: DataFrame with at least ``location`` and ``bw`` columns.
        min_points: The location is modelled only when it has strictly more
            rows than this; otherwise three empty frames are returned.
        delay: Hold-out size / forecast horizon passed to ``create_prophet_m``.

    Returns:
        Tuple ``(prophet_df, prophet_analysis_df, prophet_future_df)``: the
        test-set predictions, the one-row metrics frame and the future
        forecast, each tagged with a ``location`` column. All three are empty
        DataFrames when the location has too few rows.
    """
    temp2 = temp[temp.location == l]
    prophet_future_df = pd.DataFrame()
    prophet_analysis_df = pd.DataFrame()
    prophet_df = pd.DataFrame()

    if len(temp2) > min_points:
        # The fitted model and the full forecast are not needed by callers.
        (prophet_analysis_df, _model, _forecast,
         prophet_df, prophet_future_df) = create_prophet_m(l, temp2, delay)

        # Tag every output so results from many locations can be stacked.
        prophet_future_df['location'] = l
        prophet_analysis_df['location'] = l
        prophet_df['location'] = l

    return  prophet_df, prophet_analysis_df, prophet_future_df

In [36]:
# Try forcomb on a single location (the second distinct value).
l_list = bw_df.location.unique()
l=l_list[1]
a,b,c = forcomb(l,bw_df)
# b is the second frame returned by forcomb (prophet_analysis_df).
b

Unnamed: 0,length,test_mape,test_mape_98,test_rmse,location
San Jose,2144,45.712112,43.353237,109151.344707,San Jose


In [57]:
from joblib import Parallel, delayed
# Running for all combinations

# Start of the timed section.
qt1 = datetime.datetime.now()

l_list = bw_df.location.unique()
# Empty accumulators; the per-location results are appended to these later.
prophet_df = pd.DataFrame()
prophet_future_df = pd.DataFrame()
prophet_analysis_df = pd.DataFrame()

# One forcomb task per location; per joblib's documentation n_jobs=-1 uses all
# available CPUs. Each element of r0 is the 3-tuple returned by forcomb.
pool = Parallel(n_jobs=-1,verbose=5,pre_dispatch='all')
r0  = pool(delayed(forcomb)(l,bw_df) for l in l_list) 

qt2 = datetime.datetime.now()
# Elapsed wall-clock time of the parallel run, shown as the cell output.
str(qt2-qt1)

[Parallel(n_jobs=-1)]: Done   8 out of  43 | elapsed:  1.2min remaining:  5.1min
[Parallel(n_jobs=-1)]: Done  17 out of  43 | elapsed:  2.2min remaining:  3.4min
[Parallel(n_jobs=-1)]: Done  26 out of  43 | elapsed:  2.4min remaining:  1.6min
[Parallel(n_jobs=-1)]: Done  35 out of  43 | elapsed:  2.5min remaining:   34.5s
[Parallel(n_jobs=-1)]: Done  43 out of  43 | elapsed:  2.8min finished


'0:02:49.784870'

In [58]:
# Stack the per-location results onto the accumulators.
# Each element of r0 is (prophet_df, prophet_analysis_df, prophet_future_df).
# A single pd.concat per frame replaces repeated DataFrame.append calls, which
# copy the accumulated frame on every iteration and were removed in pandas 2.0.
prophet_df = pd.concat([prophet_df] + [result[0] for result in r0])
prophet_analysis_df = pd.concat([prophet_analysis_df] + [result[1] for result in r0])
prophet_future_df = pd.concat([prophet_future_df] + [result[2] for result in r0])
  
 

In [59]:
# Rank locations by test_mape_98, lowest first.
prophet_analysis_df.sort_values(by='test_mape_98')

Unnamed: 0,length,test_mape,test_mape_98,test_rmse,location
Chennai,2170,18.13329,17.155413,114439.8,Chennai
Reading,2170,24.815404,17.693835,3064.75,Reading
Reston,1930,18.548508,18.02416,0.04605776,Reston
IT labs,2170,284.724885,18.329385,954.9686,IT labs
World Wide Router,2168,20.048259,19.322505,105.0413,World Wide Router
RDU Guest Wireless,2144,21.812223,19.682059,29.70031,RDU Guest Wireless
Saudi,2082,76.013393,20.770538,48.62794,Saudi
Other Corporate Labs,2170,23.703101,21.140813,788955.1,Other Corporate Labs
Ultrech,2049,25.168722,21.886115,2.797069,Ultrech
Singapore,2146,25.893555,23.478484,2648.195,Singapore


In [71]:
# Load per-location metrics from CSV and show them sorted by test_mape_98.
test  = pd.read_csv('p7/bw_analysis_per_location_data.csv')
test.sort_values(by='test_mape_98')

Unnamed: 0,length,test_mape,test_mape_98,test_rmse,location
23,2170,18.13329,17.155413,114439.8,Chennai
28,2170,24.815404,17.693835,3064.75,Reading
19,1930,18.548508,18.02416,0.04605776,Reston
12,2170,284.724885,18.329385,954.9686,IT labs
3,2168,20.048259,19.322505,105.0413,World Wide Router
29,2144,21.812223,19.682059,29.70031,RDU Guest Wireless
32,2082,76.013393,20.770538,48.62794,Saudi
24,2170,23.703101,21.140813,788955.1,Other Corporate Labs
8,2049,25.168722,21.886115,2.797069,Ultrech
9,2146,25.893555,23.478484,2648.195,Singapore


In [60]:
import numpy as np
import plotly.offline as py
import plotly.graph_objs as go
# Initialise plotly's offline notebook mode before any py.iplot call below.
py.init_notebook_mode(connected=True)

def plot_test(x,y,title1,y_title='app_count'):
    """Draw a single interactive line-and-marker series in the notebook.

    Args:
        x: Values for the x axis (labelled 'Date').
        y: Values for the y axis; converted with ``np.array`` before plotting.
        title1: Figure title.
        y_title: Label of the y axis. Defaults to the previously hard-coded
            'app_count' so existing calls render unchanged.

    Returns:
        None. The figure is rendered through ``py.iplot``.
    """
    trace1 = go.Scatter(
        x=x,
        y=np.array(y),
        name='Actual',
        mode='lines+markers',
    )

    layout = dict(
        title=title1,
        xaxis=dict(title='Date'),
        yaxis=dict(title=y_title),
    )

    # Build the figure once; the earlier plain-dict figure was overwritten
    # immediately and has been dropped.
    fig = go.Figure(data=[trace1], layout=layout)
    py.iplot(fig, filename='basic-line')
  

In [61]:
def plot_test1(x,y,y2,title1,y_title='app_count'):
    """Draw actual and predicted series on one interactive notebook figure.

    Args:
        x: Shared x-axis values for both series (axis labelled 'Date').
        y: Values plotted as the 'Actual' trace.
        y2: Values plotted as the 'Predicted' trace.
        title1: Figure title.
        y_title: Label of the y axis. Defaults to the previously hard-coded
            'app_count' so existing calls render unchanged.

    Returns:
        None. The figure is rendered through ``py.iplot``.
    """
    trace1 = go.Scatter(
        x=x,
        y=np.array(y),
        name='Actual',
        mode='lines+markers',
    )
    trace2 = go.Scatter(
        x=x,
        y=np.array(y2),
        name='Predicted',
        mode='lines+markers',
    )

    layout = dict(
        title=title1,
        xaxis=dict(title='Date'),
        yaxis=dict(title=y_title),
    )

    # Build the figure once; the earlier plain-dict figure was overwritten
    # immediately and has been dropped.
    fig = go.Figure(data=[trace1, trace2], layout=layout)
    py.iplot(fig, filename='basic-line')

In [62]:
# Plot the bw series of one location against its time index.
l = 'Sflow Salem test'
bw_df_1s = bw_df[bw_df.location==l]
plot_test(bw_df_1s.index,bw_df_1s.bw,l)

In [63]:
# Actual (y) versus predicted (yhat) test rows for the location currently in `l`.
tt = prophet_df[prophet_df.location==l]
plot_test1(tt.ds,tt.y,tt.yhat,l)

In [64]:
# Plot the bw series of one location against its time index.
l = 'Madrid'
bw_df_1s = bw_df[bw_df.location==l]
plot_test(bw_df_1s.index,bw_df_1s.bw,l)

In [65]:
# Actual (y) versus predicted (yhat) test rows for the location currently in `l`.
tt = prophet_df[prophet_df.location==l]
plot_test1(tt.ds,tt.y,tt.yhat,l)