In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import count
import pandas as pd
import numpy as np

import matplotlib.pylab as plt
%matplotlib inline

import seaborn as sns
from sklearn.model_selection  import train_test_split
from sklearn.cluster import KMeans

from scipy.stats import zscore
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
#from pyspark.sql.functions import col

def get_date_int(in_date):
    """Return the day-of-month of ``in_date`` as an int.

    Accepts date-like objects exposing ``.day`` (``datetime.date``,
    ``datetime.datetime``, ``pandas.Timestamp``) as well as ISO-style strings
    such as ``'2022-01-19'``, ``'2022-01-19 00:00:00'`` or ``'2022-01-19T08:00'``.
    Splitting ``str(in_date)`` on '-' alone fails (ValueError) for values whose
    string form carries a time component, e.g. timestamps coming from Spark.
    """
    day = getattr(in_date, 'day', None)
    if day is not None:
        return int(day)
    # Keep only the 'YYYY-MM-DD' part before splitting off the day field.
    date_part = str(in_date).strip().split()[0].split('T')[0]
    return int(date_part.split('-')[2])
  

def get_date_text(in_date):
    """Return the plain string rendering of ``in_date`` (used for the date_text column)."""
    rendered = "%s" % (in_date,)
    return rendered

# Output column order for rows appended to the test-forecast table.
_LR_FORECAST_COLUMNS = [
    'model_run_type', 'model_run_num', 'state_code_numeric', 'state_code_alpha',
    'daily_rep_pd_date', 'date_text', 'date_int', 'file_type',
    'errors_actual_previous_mo', 'errors_actual_current_mo', 'mo_to_mo_difference',
    'mo_to_mo_pct', 'errors_predicted_current_mo', 'actual_to_predicted_difference',
    'absolute_difference', 'actual_to_predicted_pct', 'model_parent_process',
    'model_name', 'score_type', 'model_score_forecast', 'model_score_train',
    'model_score_validate', 'model_score_test', 'source_notebook_url',
    'best_model_url', 'experiment_info', 'source_data_table_name', 'comments',
]

# Output column order for rows appended to the train-forecast table.
_LR_TRAIN_COLUMNS = [
    'model_run_type', 'model_run_num', 'state_code_numeric', 'state_code_alpha',
    'daily_rep_pd_date', 'date_int', 'file_type', 'comments', 'errs',
]

_LR_MODEL_RUN_TYPE = 'C - LR - Single Model'


def _lr_format_score(score):
    """Format an R^2 score as a string with two decimals.

    Replaces the former ``str(score)[:4]`` slice, which mangled negative or
    large scores (``-12.34`` -> ``'-12.'``) and exponent notation
    (``1.5e-05`` -> ``'1.5e'``).
    """
    return f"{score:.2f}"


def _lr_with_date_int(frame):
    """Return a copy of ``frame`` with a ``date_int`` (day-of-month) column added."""
    return frame.assign(date_int=frame['daily_rep_pd_date'].apply(get_date_int))


def run_liner_regression(train_pdf, train_pdf_work, test_pdf, process_type, state_numeric,
                         state_alpha, file, run_nbr,
                         test_table="datalab_scratch.lr_test_forecast_1_kv",
                         train_table="datalab_scratch.lr_train_forecast_1_kv"):
    """Fit a linear regression of ``errs`` for one state/file and append results to Spark tables.

    The model is trained on ``train_pdf_work`` and scored on ``test_pdf``. Every
    column except ``errs`` and ``daily_rep_pd_date`` is used as a feature,
    including the derived ``date_int`` (day of month).

    Parameters
    ----------
    train_pdf : pandas.DataFrame
        Untreated training rows; written (with run metadata) to ``train_table``.
    train_pdf_work : pandas.DataFrame
        Training rows actually used to fit the model (e.g. outlier-treated).
    test_pdf : pandas.DataFrame
        Test rows; predicted, scored and written to ``test_table``.
    process_type : str
        Feature/processing description, stored in the ``comments`` column.
    state_numeric, state_alpha : str
        State codes stored on every output row.
    file : str
        File type (e.g. 'ELG'), stored in ``file_type``.
    run_nbr : int
        Run number stored in ``model_run_num``.
    test_table, train_table : str
        Spark tables the forecast rows and training rows are appended to.

    The input frames are not modified; all derived columns are added to copies.
    """
    train_base = _lr_with_date_int(train_pdf)
    train_work = _lr_with_date_int(train_pdf_work)
    test_base = _lr_with_date_int(test_pdf)

    non_features = ['errs', 'daily_rep_pd_date']
    x_train = train_work.drop(columns=non_features)
    y_train = train_work[['errs']]
    # Align test features to the training column order; recent scikit-learn
    # versions check feature names against those seen at fit time.
    x_test = test_base.drop(columns=non_features)[x_train.columns]
    y_test = test_base[['errs']]

    regression_model = LinearRegression()
    regression_model.fit(x_train, y_train)
    model_score_train = regression_model.score(x_train, y_train)
    model_score_test = regression_model.score(x_test, y_test)

    # y_train is a one-column frame, so predict() returns shape (n, 1). Flatten it and
    # index it by test_base so each prediction stays paired with its own source row
    # (a fresh 0..n-1 index would misalign if test_pdf is not range-indexed).
    predicted = pd.Series(regression_model.predict(x_test).ravel(), index=test_base.index)

    lr_df = test_base.rename(columns={'errs': 'errors_actual_current_mo'})
    # Round rather than truncate when converting the float prediction to a count.
    lr_df['errors_predicted_current_mo'] = predicted.round().astype('int64')

    constant_columns = {
        'model_run_type': _LR_MODEL_RUN_TYPE,
        'model_run_num': run_nbr,
        'state_code_numeric': state_numeric,
        'state_code_alpha': state_alpha,
        'file_type': file,
        'comments': process_type,
        'model_parent_process': 'Python ML',
        'model_name': 'Linear regression',
        'score_type': 'R2',
        'model_score_train': _lr_format_score(model_score_train),
        'model_score_validate': ' ',
        'model_score_test': _lr_format_score(model_score_test),
        'model_score_forecast': 0,
        'best_model_url': '',
        'experiment_info': ' ',
        'source_notebook_url': ' ',
        'source_data_table_name': ' ',
        # Comparison metrics are zero placeholders at this stage; the final-forecast
        # step of this notebook computes them from the aggregated tables.
        'errors_actual_previous_mo': 0,
        'mo_to_mo_difference': 0,
        'mo_to_mo_pct': 0,
        'actual_to_predicted_difference': 0,
        'absolute_difference': 0,
        'actual_to_predicted_pct': 0,
    }
    lr_df = lr_df.assign(**constant_columns)
    lr_df['date_text'] = lr_df['daily_rep_pd_date'].apply(get_date_text)

    spark.createDataFrame(lr_df[_LR_FORECAST_COLUMNS]).write.saveAsTable(test_table, mode="append")

    train_out = train_base.assign(
        model_run_type=_LR_MODEL_RUN_TYPE,
        file_type=file,
        state_code_numeric=state_numeric,
        state_code_alpha=state_alpha,
        comments=process_type,
        model_run_num=run_nbr,
        errs=train_base['errs'].astype('int64'),
    )
    spark.createDataFrame(train_out[_LR_TRAIN_COLUMNS]).write.saveAsTable(train_table, mode="append")
    
def get_train_and_test_df(table_name, train_start='2022-01-01', train_end='2022-01-18',
                          test_start='2022-01-19', spark_session=None):
    """Load the training and test windows of a daily ML-input table into pandas.

    Parameters
    ----------
    table_name : str
        Fully qualified Spark table name. It is built inside this notebook (not
        taken from user input), so it is interpolated directly into the SQL.
    train_start, train_end : str
        Inclusive 'YYYY-MM-DD' bounds on ``daily_rep_pd_date`` for the training window.
    test_start : str
        Inclusive lower bound of the test window; the test window has no upper bound.
    spark_session : optional
        Session used to run the queries; defaults to the notebook's global ``spark``.

    Returns
    -------
    (train_pdf, test_pdf) : tuple of pandas.DataFrame

    NOTE(review): the final-forecast cell assumes the test window starts the day
    after ``train_end`` (day 19 looks back to training day 18); keep them in sync.
    """
    session = spark if spark_session is None else spark_session

    train_query = (
        f"select * from {table_name} "
        f"where daily_rep_pd_date >= '{train_start}' AND daily_rep_pd_date <= '{train_end}'"
    )
    test_query = f"select * from {table_name} where daily_rep_pd_date >= '{test_start}'"

    train_pdf = session.sql(train_query).toPandas()
    test_pdf = session.sql(test_query).toPandas()
    return train_pdf, test_pdf

def treat_outliers(df, col):
    """
    Cap a numeric column at its Tukey fences (1.5 * IQR beyond Q1 and Q3).

    df: pandas DataFrame; the column is replaced in place and df is also returned
    col: str, name of the numerical column to cap
    """
    series = df[col]
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    spread = q3 - q1  # inter-quartile range

    # Values below Q1 - 1.5*IQR are raised to that fence; values above
    # Q3 + 1.5*IQR are lowered to that fence.
    low_fence, high_fence = q1 - 1.5 * spread, q3 + 1.5 * spread
    df[col] = np.clip(series, low_fence, high_fence)
    return df

#drop result table
# Start every run from empty result tables: run_liner_regression appends to them.
for result_table in ("datalab_scratch.lr_train_forecast_1_kv",
                     "datalab_scratch.lr_test_forecast_1_kv"):
    # IF EXISTS replaces the old try/except, which also hid unrelated failures
    # (e.g. permissions) behind a "table does not exist" message.
    spark.sql(f"drop table if exists {result_table}")

# States to model; the codes are stored as strings in datalab_scratch.state_codes.
state_codes = ('29', '44')
state_code_list = ", ".join(f"'{code}'" for code in state_codes)
state_df_spark = spark.sql(
    "select * from datalab_scratch.state_codes "
    f"where state_code_numeric in ({state_code_list}) order by State_Code_Numeric"
)
state_df = state_df_spark.toPandas()

file_list = ['ELG', 'PRV', 'MCR', 'TPL', 'CIP', 'COT', 'CLT', 'CRX']

process_type_list = [
    'Features/Processing: submission_methods',
    'Features/Processing: submission_methods_with_outlier_treatment',
    'Features/Processing: error_category',
    'Features/Processing: error_category_with_outlier_treatment',
]

# Each processing variant is one model run, numbered from 1.
for run_nbr, process_type in enumerate(process_type_list, start=1):
    print(process_type)
    # Submission-method features live in the "_ss" variant of the input tables.
    if 'submission_methods' in process_type:
        table_suffix = "_ready_ml_input_df_ss"
    else:
        table_suffix = "_ready_ml_input_df"

    for _, row in state_df.iterrows():
        state_numeric = row.State_Code_Numeric
        state_alpha = row.State_Code_Alpha
        print(state_numeric)
        print(state_alpha)
        for file in file_list:
            print(file)
            table_name = f"datalab_scratch.daily_{state_numeric}_{state_alpha}_{file}{table_suffix}"

            train_pdf, test_pdf = get_train_and_test_df(table_name)
            if train_pdf.empty or test_pdf.empty:
                print("No Data for the state " + state_alpha + " and file type " + file)
                continue

            # Outlier treatment only affects the copy used for fitting; the raw
            # training rows are what gets written to the train table.
            train_pdf_work = train_pdf.copy()
            if 'outlier' in process_type:
                train_pdf_work = treat_outliers(train_pdf_work, 'errs')

            run_liner_regression(train_pdf, train_pdf_work, test_pdf, process_type,
                                 state_numeric, state_alpha, file, run_nbr)
   



In [0]:
%sql
-- Aggregate the appended per-row forecasts in lr_test_forecast_1_kv into one row per
-- distinct combination of the non-aggregated columns (run, state, file, day and the
-- model metadata, which run_liner_regression sets to a constant for each run/state/file),
-- summing the error and metric columns.
-- NOTE(review): presumably the input tables hold several rows per day (one per feature
-- category), which this collapses into daily totals — confirm.
-- NOTE(review): ORDER BY in a CTAS does not guarantee the row order of the resulting
-- Delta table; sort again when querying if order matters.
create or replace table datalab_scratch.lr_test_forecast_2_kv using delta
as
select model_run_type
     , model_run_num
     , state_code_numeric
     , state_code_alpha
     , daily_rep_pd_date
     , date_text
     , date_int
     , file_type
     , sum(errors_actual_previous_mo) as errors_actual_previous_mo
     , sum(errors_actual_current_mo) as errors_actual_current_mo
     , sum(mo_to_mo_difference) as mo_to_mo_difference
     , sum(mo_to_mo_pct) as mo_to_mo_pct
     , sum(errors_predicted_current_mo) as errors_predicted_current_mo
     , sum(actual_to_predicted_difference) as actual_to_predicted_difference
     , sum(absolute_difference) as absolute_difference
     , sum(actual_to_predicted_pct) as actual_to_predicted_pct
     , model_parent_process
     , model_name
     , score_type
     , model_score_forecast
     , model_score_train
     , model_score_validate
     , model_score_test
     , source_notebook_url
     , best_model_url
     , experiment_info
     , source_data_table_name
     , comments
  from datalab_scratch.lr_test_forecast_1_kv
-- Optional debug filters (disabled):
-- where model_run_num = 3
--   and state_code_numeric = '44'
--   and file_type = 'COT'
 --  and date_int = 19
 group by model_run_type
        , model_run_num
        , state_code_numeric
        , state_code_alpha
        , daily_rep_pd_date
        , date_text
        , date_int
        , file_type
        , model_parent_process
        , model_name
        , score_type
        , model_score_forecast
        , model_score_train
        , model_score_validate
        , model_score_test
        , source_notebook_url
        , best_model_url
        , experiment_info
        , source_data_table_name
        , comments
 order by model_run_num
        , state_code_numeric
        , state_code_alpha
        , file_type
        , date_int

In [0]:
%sql select * from datalab_scratch.lr_test_forecast_2_kv
-- Preview of the aggregated test-forecast table created by the previous cell.

In [0]:
%sql
-- Aggregate the appended training rows in lr_train_forecast_1_kv into one row per
-- run / state / day / file / comments combination, summing errs.
-- NOTE(review): presumably the input tables hold several rows per day (one per feature
-- category), which this collapses into daily totals — confirm.
-- NOTE(review): ORDER BY in a CTAS does not guarantee the row order of the resulting
-- Delta table; sort again when querying if order matters.
create or replace table datalab_scratch.lr_train_forecast_2_kv using delta
as
select model_run_type
     , model_run_num
     , state_code_numeric
     , state_code_alpha
     , daily_rep_pd_date
     , date_int
     , file_type
     , comments
     , sum(errs) as errs
  from datalab_scratch.lr_train_forecast_1_kv
-- Optional debug filters (disabled):
-- where model_run_num = 3
--   and state_code_numeric = '44'
--   and file_type = 'COT'
--   and date_int = 18
 group by model_run_type
        , model_run_num
        , state_code_numeric
        , state_code_alpha
        , daily_rep_pd_date
        , date_int
        , file_type
        , comments
  order by model_run_num
        , state_code_numeric
        , state_code_alpha
        , file_type
        , date_int

In [0]:
%sql select * from datalab_scratch.lr_train_forecast_2_kv
-- Preview of the aggregated training table created by the previous cell.

In [0]:
import pandas as pd
import numpy as np

# Column order of the final forecast table (same columns as lr_test_forecast_2_kv).
final_columns = [
    'model_run_type', 'model_run_num', 'state_code_numeric', 'state_code_alpha',
    'daily_rep_pd_date', 'date_text', 'date_int', 'file_type',
    'errors_actual_previous_mo', 'errors_actual_current_mo', 'mo_to_mo_difference',
    'mo_to_mo_pct', 'errors_predicted_current_mo', 'actual_to_predicted_difference',
    'absolute_difference', 'actual_to_predicted_pct', 'model_parent_process',
    'model_name', 'score_type', 'model_score_forecast', 'model_score_train',
    'model_score_validate', 'model_score_test', 'source_notebook_url',
    'best_model_url', 'experiment_info', 'source_data_table_name', 'comments',
]

# First day of the test window. Its "previous" value comes from the last training
# day instead of the test table; must match the windows in get_train_and_test_df
# (train through 2022-01-18, test from 2022-01-19).
FIRST_TEST_DAY = 19

# Key identifying one model run / state / file / day in both aggregated tables.
PREVIOUS_DAY_KEY = ['state_code_numeric', 'model_run_num', 'file_type', 'date_int']


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN when the denominator is missing or 0."""
    if pd.isna(denominator) or denominator == 0:
        return float('nan')
    return float(numerator / denominator)


def _errors_by_key(frame, value_col):
    """Map each PREVIOUS_DAY_KEY tuple to ``value_col``, keeping the first row per key."""
    deduped = frame.drop_duplicates(subset=PREVIOUS_DAY_KEY, keep='first')
    keys = deduped[PREVIOUS_DAY_KEY].itertuples(index=False, name=None)
    return dict(zip(keys, deduped[value_col]))


spark.sql("drop table if exists datalab_scratch.lr_final_forecast_kv")

test_df = spark.sql("select * from datalab_scratch.lr_test_forecast_2_kv ").toPandas()
train_df = spark.sql("select * from datalab_scratch.lr_train_forecast_2_kv ").toPandas()

# Dictionary lookups replace a full-frame filter per row (previously O(n^2)).
train_errors = _errors_by_key(train_df, 'errs')
test_errors = _errors_by_key(test_df, 'errors_actual_current_mo')

records = []
for row in test_df.itertuples(index=False):
    key = (row.state_code_numeric, row.model_run_num, row.file_type, row.date_int - 1)
    lookup = train_errors if row.date_int == FIRST_TEST_DAY else test_errors
    previous_errors = lookup.get(key)

    if previous_errors is None:
        # Previously this raised IndexError; keep the row and leave the metrics empty.
        print(f"No previous-day errors for {key}; month-over-month metrics left empty")
        month_to_month_difference = float('nan')
    else:
        month_to_month_difference = row.errors_actual_current_mo - previous_errors

    actual_to_predicted_difference = row.errors_predicted_current_mo - row.errors_actual_current_mo

    records.append({
        **row._asdict(),
        'errors_actual_previous_mo': previous_errors,
        'mo_to_mo_difference': month_to_month_difference,
        # Percentages are NaN instead of dividing by zero.
        'mo_to_mo_pct': _safe_ratio(month_to_month_difference, previous_errors),
        'actual_to_predicted_difference': actual_to_predicted_difference,
        'absolute_difference': abs(actual_to_predicted_difference),
        'actual_to_predicted_pct': _safe_ratio(actual_to_predicted_difference,
                                               row.errors_actual_current_mo),
    })

# Build once from records: DataFrame.append was removed in pandas 2.0 and was quadratic.
final_df = pd.DataFrame(records, columns=final_columns)

if final_df.empty:
    print("No forecast rows to write; datalab_scratch.lr_final_forecast_kv not created")
else:
    print(final_df.head())
    spark_final_df = spark.createDataFrame(final_df)
    spark_final_df.write.saveAsTable("datalab_scratch.lr_final_forecast_kv", mode="overwrite")
    


In [0]:
%sql
-- Review the final forecast rows for state 29 across all runs, by file type and day.
select * from datalab_scratch.lr_final_forecast_kv
 where state_code_numeric = '29' 
 order by file_type,date_int, model_run_num

In [0]:
%sql
-- Stamp every row of the final table with the table's own name as its data-source name
-- (run_liner_regression leaves this column as a blank placeholder).
update datalab_scratch.lr_final_forecast_kv
   set source_data_table_name = 'datalab_scratch.lr_final_forecast_kv'
   

In [0]:
%sql
-- Stamp every row of the final table with a fixed notebook URL
-- (presumably the URL of this notebook — confirm when the notebook moves).
update datalab_scratch.lr_final_forecast_kv
   set source_notebook_url = 'https://databricks-val-data.macbisdw.cmscloud.local/#notebook/2783134/command/2830675'

In [0]:
%sql
-- Review run 1 for state 44, by file type and day.
-- state_code_numeric holds string codes ('29', '44'), so compare against a string
-- literal rather than relying on an implicit string-to-int cast.
select * from datalab_scratch.lr_final_forecast_kv
 where model_run_num = 1
   and state_code_numeric = '44'
 order by file_type,date_int