In [30]:
from pyspark.sql.functions import col,to_timestamp,concat,lit,year,month,hour,to_date,weekofyear,date_format,when,rank,collect_list,udf,explode
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, DoubleType,FloatType,IntegerType,ArrayType,StructField,StructType
from pyspark.sql import SparkSession
from configparser import ConfigParser, ExtendedInterpolation
from datetime import datetime
from dateutil import relativedelta
from sklearn.linear_model import LinearRegression
import stldecompose
from stldecompose.forecast_funcs import drift,mean
from statsmodels.tsa.stattools import adfuller
from pandas import read_csv
from pandas import datetime
from pandas import *
from matplotlib import pyplot
from pandas.tools.plotting import autocorrelation_plot
from statsmodels.tsa.arima_model import ARIMA
import statsmodels.api as sm
from sklearn.metrics import mean_squared_error
import math
import numpy
import warnings

import pandas as pd
import numpy as np 
%autosave 0

# Initialize spark session
spark = SparkSession.builder.appName("prediction_4g_hw").getOrCreate()

# JDBC URL plus the PostgreSQL tables used by this notebook:
#   weekly_bh        - per-cell weekly measurements (cell_name, week_number, years, prb)
#                      read by the pipeline cell
#   weekly_timestamp - week_number -> date lookup joined onto weekly_bh
#   predict          - output table, overwritten on every run
#   daily_bh         - not referenced in the code shown here
# SECURITY NOTE(review): the database user and password are embedded in the URL and
# leak through version control and notebook exports; load them from the environment
# or a secrets store instead.
pg_jdbc_conf = {'url':'jdbc:postgresql://10.55.56.46/myanmar?user=postgres&password=password123',
               'daily_bh':'oss.nokia_4g_cell_master_ml_daily',
               'weekly_bh':'oss.arima_test',
                'weekly_timestamp':'oss.weekly_timestamp',
               'predict':'oss.nokia_4g_monthly_prediction_arima'}


# Column names used when grouping and collecting each cell's weekly series.
cell_col = 'cell_name'
week_number_col = 'week_number'
years_col = 'years'
measurement_col = 'prb'
date_col = 'date'
# Name of a duplicate of the 'date' column that the pipeline creates and collects.
date_col1 = 'date1'


def evaluate_arima_model(X, arima_order):
    """Score one ARIMA order with walk-forward (expanding window) validation.

    The first 66% of ``X`` seeds the history. Each remaining observation is
    forecast one step ahead from a model refit on everything seen so far;
    the true observation is then appended to the history before the next step.

    Parameters
    ----------
    X : 1-D sequence of observations (the visible caller passes a NumPy array).
    arima_order : (p, d, q) tuple passed to ``ARIMA``.

    Returns
    -------
    float
        Mean squared error of the one-step-ahead forecasts over the held-out part.
    """
    split_at = int(len(X) * 0.66)
    seen = list(X[0:split_at])
    held_out = X[split_at:]

    forecasts = []
    for actual in held_out:
        fitted = ARIMA(seen, order=arima_order).fit(disp=0)
        forecasts.append(fitted.forecast()[0])
        # expanding window: the real value (not the forecast) joins the history
        seen.append(actual)

    return mean_squared_error(held_out, forecasts)
 
# evaluate combinations of p, d and q values for an ARIMA model
def evaluate_models(dataset, p_values, d_values, q_values):
    """Grid-search ARIMA(p, d, q) orders and return the one with the lowest MSE.

    Every combination is scored with ``evaluate_arima_model``. A combination
    whose evaluation raises is skipped; one MSE line is printed per successful
    combination and a summary line at the end.

    Parameters
    ----------
    dataset : NumPy-like array; it is converted with ``astype('float32')``.
    p_values, d_values, q_values : iterables of candidate orders.

    Returns
    -------
    tuple or None
        The best ``(p, d, q)`` order, or ``None`` when every combination failed.
    """
    dataset = dataset.astype('float32')
    best_score, best_cfg = float("inf"), None
    for p in p_values:
        for d in d_values:
            for q in q_values:
                order = (p, d, q)
                try:
                    mse = evaluate_arima_model(dataset, order)
                except Exception:
                    # Best-effort search: an order that cannot be fitted is skipped.
                    # (A bare except here would also swallow KeyboardInterrupt.)
                    continue
                if mse < best_score:
                    best_score, best_cfg = mse, order
                print('ARIMA%s MSE=%.3f' % (order, mse))
    print('Best ARIMA%s MSE=%.3f' % (best_cfg, best_score))
    return best_cfg


def get_predictions(df, steps, prediction_period):
    """Forecast ``prediction_period`` future values of ``df`` with ARIMA.

    An ARIMA order is chosen by grid search on the first 66% of the series.
    The model is then refit on the full history once per future step, and
    each new forecast is appended to the history before the next refit. When
    no order could be fitted, the last observed value is repeated instead.

    Parameters
    ----------
    df : pandas Series of observations in time order.
    steps : horizon passed to ``forecast``; only the first forecast value of
        each refit is used.
    prediction_period : number of future values to produce.

    Returns
    -------
    list of float
        ``prediction_period`` forecasts. On any failure a list of
        ``prediction_period`` zeros is returned (best effort, so that one bad
        series does not fail the whole Spark job).
    """
    try:
        # Silence the statsmodels warnings only for this call, instead of
        # installing a process-wide "ignore" filter.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")

            train_size = int(len(df) * 0.66)
            p_values = [0, 1, 2]
            d_values = range(0, 3)
            q_values = range(0, 5)
            best_cfg = evaluate_models(df.iloc[:train_size].values,
                                       p_values, d_values, q_values)

            history = list(df.values)
            forecasts = []
            for _ in range(prediction_period):
                if best_cfg is None:
                    # No usable order: naive persistence forecast.
                    next_value = history[-1]
                else:
                    model_fit = ARIMA(history, order=best_cfg).fit()
                    # forecast() returns (forecast, stderr, conf_int); keep step 1.
                    next_value = model_fit.forecast(steps=steps)[0][0]
                history.append(next_value)
                forecasts.append(float(next_value))
            return forecasts
    except Exception:
        # Previously a hard-coded 12 zeros, which disagreed with the
        # requested period length.
        return [0.0] * prediction_period
            
        

def predict_arima_func(ts, predict_weeks=6, start='2019-01-07'):
    """Forecast ``predict_weeks`` weekly values of ``ts`` and attach dates.

    Parameters
    ----------
    ts : pandas Series of observations in time order.
    predict_weeks : number of weekly forecasts (and dates) to produce.
    start : first date passed to ``pd.date_range``. With ``freq='W'`` the
        dates fall on Sundays, starting at the first Sunday on or after
        ``start``.
        NOTE(review): defaults to a fixed date; presumably this should follow
        the last observation in ``ts`` — confirm.

    Returns
    -------
    list
        ``[dates, values]``, where ``dates`` are 'YYYY-MM-DD' strings and
        ``values`` come from ``get_predictions``.
    """
    values = get_predictions(ts, 1, predict_weeks)
    # periods follows predict_weeks (it was hard-coded to 6 before).
    dates = pd.date_range(start=start, periods=predict_weeks, freq='W')
    return [[str(d.date()) for d in dates], values]
   

def predict_linear_func(ts, predict_months=12):
    """Extrapolate ``ts`` with a straight line fitted over sample positions.

    The regressor is the 0-based position of each sample, so the series is
    treated as evenly spaced.

    Parameters
    ----------
    ts : pandas Series of observations in the desired order.
    predict_months : number of future points (and dates) to produce.

    Returns
    -------
    list
        ``[dates, values]``, where ``dates`` are ``predict_months`` month-end
        ('M') dates starting from now, as 'YYYY-MM-DD' strings, and ``values``
        are the fitted line evaluated at positions ``len(ts) .. len(ts) + predict_months - 1``.
    """
    n_obs = ts.shape[0]
    positions = np.arange(n_obs).reshape(-1, 1)

    lr = LinearRegression()
    lr.fit(positions, ts.values)

    # Use plain integers: passing 1-element arrays to np.arange is deprecated.
    future_positions = np.arange(n_obs, n_obs + predict_months).reshape(-1, 1)
    predict = lr.predict(future_positions)

    # pd.Timestamp.now() avoids relying on the module-level `datetime` name,
    # which is shadowed by `from pandas import datetime` (removed in pandas 2).
    dates = pd.date_range(start=pd.Timestamp.now(), periods=predict_months, freq='M')
    return [[str(d.date()) for d in dates], predict.tolist()]


def predict_py(years, week_number, prb):
    """Linear-regression forecast for one cell's weekly PRB series.

    ``years``, ``week_number`` and ``prb`` are aligned position by position.
    Samples are ordered by (year, week) and the PRB column is handed to
    ``predict_linear_func``.
    """
    ordered = (
        pd.DataFrame([week_number, years, prb])
        .T
        .rename(columns={0: 'week', 1: 'year', 2: 'prb'})
        .sort_values(by=['year', 'week'])
        .set_index('week')
    )
    # After set_index('week') the remaining columns are ['year', 'prb'].
    return predict_linear_func(ordered['prb'])

def predict_arima(cell_name, date, prb):
    """UDF body: ARIMA-forecast one cell's series.

    Parameters
    ----------
    cell_name : cell identifier; not used in the computation (kept so the
        UDF call signature stays unchanged).
    date : list of date strings, one per value in ``prb``.
    prb : list of measurement values aligned with ``date``.

    Returns
    -------
    list
        ``[dates, values]`` as produced by ``predict_arima_func``.
    """
    df = pd.DataFrame(prb, index=date, columns=['prb'])
    df.index = df.index.map(pd.to_datetime)
    df = df.sort_index()
    # Call the forecaster once: it runs a full ARIMA grid search, and the
    # previous version computed it twice and discarded the first result.
    return predict_arima_func(df.iloc[:, 0])
    
# UDF creation
udf_predict_py = udf(predict_py,ArrayType(ArrayType(StringType())))
udf_predict_arima = udf(predict_arima,ArrayType(ArrayType(StringType())))

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("date", StringType()),
      StructField("predict_prb", StringType())
  ]))
)

# zip_ = udf(
#   lambda x, y: list(zip(x, y)),
#   ArrayType(StructType([
#       Adjust types to reflect data types
#       StructField("date", StringType()),
#       StructField("predict_prb", StringType())
#   ]))
# )


Autosave disabled


In [31]:
# Load the week_number -> date lookup and the weekly per-cell measurements.
df_timestamp = spark.read.format('jdbc').options(url=pg_jdbc_conf['url'],
                           dbtable=pg_jdbc_conf['weekly_timestamp']).load()
df = spark.read.format('jdbc').options(url=pg_jdbc_conf['url'],
                           dbtable=pg_jdbc_conf['weekly_bh']).load()

# Attach a calendar date to every week_number.
# NOTE(review): an "outer" join also keeps weeks present in only one table. Timestamp-only
# weeks get a null cell_name and end up in a null group after groupBy; measurement-only
# weeks get a null date. Confirm whether an inner/left join was intended.
df = df.join(df_timestamp, ['week_number'], "outer")
df = df.withColumn('date',col('date').cast(StringType()))
# date1 is a plain copy of date; it is collected below but never read afterwards.
df = df.withColumn('date1', col('date'))

# df.head(5)


# One row per cell, with each column collapsed into a list.
# NOTE(review): collect_list skips nulls, so the 'date' and 'prb' lists of a cell can
# differ in length or lose alignment when either column has nulls (see the outer join).
df = df.groupBy(cell_col).agg({week_number_col:'collect_list',
                            years_col:'collect_list',
                            date_col:'collect_list',
                            date_col1:'collect_list',
                            measurement_col:'collect_list'})

# Dict-style agg names its outputs 'collect_list(<col>)'; rename them back to the plain names.
df = df.withColumnRenamed('collect_list({})'.format(measurement_col),measurement_col).withColumnRenamed('collect_list({})'.format(week_number_col),week_number_col).\
    withColumnRenamed('collect_list({})'.format(years_col),years_col)


df = df.withColumnRenamed('collect_list({})'.format(date_col),date_col)
df = df.withColumnRenamed('collect_list({})'.format(date_col1),date_col1)

# Debug filter for a single cell:
# df = df.filter(col('cell_name') == 'BAL5396_22')




# ARIMA forecast per cell: predict_list is [[forecast dates], [forecast values]].
df = df.withColumn('predict_list',udf_predict_arima(col('cell_name'),col('date'), col('prb')))
# Alternative linear-regression model:
# df = df.withColumn('predict_list',udf_predict_py(col('years'),col('week_number'),col('prb')))
df = df.withColumn('date',col('predict_list')[0]).withColumn('predict_prb',col('predict_list')[1])
df = df.select([cell_col,'date','predict_prb'])

# Pair each forecast date with its value and emit one row per (cell, date).
df = df.withColumn('temp',zip_(col('date'),col('predict_prb')))
df = df.withColumn('temp',explode('temp'))
df = df.select(cell_col,col('temp.date'),col('temp.predict_prb'))


df = df.withColumn('date',to_date(col('date'))).withColumn('predict_prb',col('predict_prb').cast(FloatType()))
# NOTE(review): the 'date' parsed on the previous line is dropped here, so the written
# table cannot tell which week each prediction belongs to. Confirm whether 'date'
# should be kept in this select.
df = df.select('cell_name','predict_prb')

# Replace the prediction table with this run's output (mode 'overwrite').
df.write.format('jdbc').options(url=pg_jdbc_conf['url'],
                           dbtable='{}'.format(pg_jdbc_conf['predict'])).mode('overwrite').save()

In [5]:
# Preview the first 6 rows of the prediction DataFrame.
df.show(6)

+----------+-----------+
| cell_name|predict_prb|
+----------+-----------+
|BAL5008_23|   3.160381|
|BAL5008_23|  2.9987497|
|BAL5008_23|  2.9311094|
|BAL5008_23|   2.902451|
|BAL5008_23|  2.8901658|
|BAL5008_23|  2.8848317|
+----------+-----------+
only showing top 6 rows



In [18]:
# Debug: load one cell's series saved for offline inspection (the commented-out
# df.to_pickle('error-cell.pkl') in predict_arima wrote it).
# NOTE(review): unpickling can execute arbitrary code; only load files you created yourself.
error_cell = pd.read_pickle('error-cell.pkl')

In [19]:
# Debug: re-run the ARIMA forecast on the saved series outside Spark. In the captured
# output every order failed ("Best ARIMANone"), so the last observed value was repeated.
predict_arima_func(error_cell.iloc[:,0])

Best ARIMANone MSE=inf


[['2019-01-13',
  '2019-01-20',
  '2019-01-27',
  '2019-02-03',
  '2019-02-10',
  '2019-02-17'],
 [12.403666496276855,
  12.403666496276855,
  12.403666496276855,
  12.403666496276855,
  12.403666496276855,
  12.403666496276855]]