# Automobile Dataset Description

#### This dataset consists of data from the 1985 Ward's Automotive Yearbook.

#### This data set consists of three types of entities: 
 - (a) The specification of an auto in terms of various characteristics
 - (b) Its assigned insurance risk rating
 - (c) Its normalized losses in use as compared to other cars. 

#### The second rating corresponds to the degree to which the auto is more risky than its price indicates. Cars are initially assigned a risk factor symbol associated with their price. Then, if a car is more (or less) risky, this symbol is adjusted by moving it up (or down) the scale. Actuaries call this process "symboling". A value of +3 indicates that the auto is risky; -3 indicates that it is probably pretty safe.

#### The third factor is the relative average loss payment per insured vehicle year. This value is normalized for all autos within a particular size classification (two-door small, station wagons, sports/specialty, etc.), and represents the average loss per car per year.

## Importing the Libraries

In [1]:
# # !pip install CUDA 
# !conda install tensorflow-gpu cudatoolkit=10.1
# !sudo apt-get install libcudart10.1
#!pip install alibi

In [2]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
#import seaborn as sns
#from matplotlib import pyplot as plt
#from matplotlib import style

from sklearn import linear_model
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import Perceptron
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
#from alibi.explainers import AnchorTabular
import joblib
from pyspark.sql import SparkSession
# from retrieve_minio import *
import uuid
import os
import shutil
import pickle
from datetime import datetime
# from numba import jit, cuda

# from minio import Minio
# from minio.error import ResponseError

from subprocess import run, Popen, PIPE
#from ctypes import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import col, expr, monotonically_increasing_id, row_number,current_timestamp
# from data_io import *
import pandas as pd
import numpy as np
from typing import  List
from datetime import datetime
#from minio import Minio
#from minio.error import ResponseError
import uuid
import os
import shutil

In [3]:
# --- MinIO object-store settings ---------------------------------------
MINIO_HOST = "minio-service.kubeflow:9000"
# Credentials are read from the environment when present. The literals are
# kept only as fallbacks so the notebook behaves as before when nothing is
# exported.
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio123")
MINIO_MODEL_BUCKET = "seldon"

DEPLOY_NAMESPACE = "kubeflow"

# Column names used for the feature-store timestamp columns.
EVENT_TIMESTAMP_ALIAS = "event_timestamp"
CREATED_TIMESTAMP_ALIAS = "created_timestamp"

# --- Azure Blob Storage settings ---------------------------------------
STORAGEACCOUNTNAME = "katonicusecases"
# NOTE(review): this account key is committed in plain text. Export
# STORAGEACCOUNTKEY instead, then rotate the key and delete the fallback.
STORAGEACCOUNTKEY = os.environ.get(
    "STORAGEACCOUNTKEY",
    "kxGVhR3tKmJoNdEFbhauyHOvBaNMEJR8/uIH+4NKX9QLbHEsEhmo5YQmuiUmSaW2g/96Fq3RrV9f3FeMyizzgg==",
)
CONTAINERNAME = "modelbuilding"
BLOBNAME = "automobile_data.csv"

In [4]:
def save_to_feature_store(spark,pandas_df,feaurestore_name,STORAGEACCOUNTNAME,CONTAINERNAME):
    """Stamp ``pandas_df`` with an id and timestamps, then write it as parquet.

    The frame passed in is modified in place: it gains a ``Unique_id`` column
    (a random permutation of ``0 .. len-1``) plus ``event_timestamp`` and
    ``created_timestamp`` columns holding the current local time.

    Args:
        spark: Session object providing ``createDataFrame``.
        pandas_df: pandas DataFrame to persist (mutated, see above).
        feaurestore_name: Name of the feature store; used as the last path
            segment under ``stagging/``.
        STORAGEACCOUNTNAME: Azure storage account name.
        CONTAINERNAME: Blob container name.

    Returns:
        None. Any existing data at the target path is overwritten.
    """
    row_count = len(pandas_df)
    pandas_df['Unique_id'] = np.random.choice(row_count, size=row_count, replace=False)
    for stamp_column in ('event_timestamp', 'created_timestamp'):
        pandas_df[stamp_column] = pd.to_datetime(datetime.now())

    spark_frame = spark.createDataFrame(pandas_df)

    relative_path = "/".join(("stagging", feaurestore_name))
    target = "".join(
        ("wasbs://", CONTAINERNAME, "@", STORAGEACCOUNTNAME, ".blob.core.windows.net/", relative_path)
    )

    spark_frame.write.mode("overwrite").parquet(target)


def create_entity_df(spark,unique_id,feaurestore_name,STORAGEACCOUNTNAME,CONTAINERNAME):
    """Build an entity frame from a stored feature table.

    Reads the parquet data saved under ``stagging/<feaurestore_name>`` in the
    given container, keeps only the ``unique_id`` column and attaches an
    ``event_timestamp`` column set to the current timestamp.

    Args:
        spark: Session object providing ``read.parquet``.
        unique_id: Column (or columns) to keep; passed straight to ``select``.
        feaurestore_name: Name of the feature store to read.
        STORAGEACCOUNTNAME: Azure storage account name.
        CONTAINERNAME: Blob container name.

    Returns:
        The resulting entity DataFrame.
    """
    relative_path = "/".join(("stagging", feaurestore_name))
    location = "".join(
        ("wasbs://", CONTAINERNAME, "@", STORAGEACCOUNTNAME, ".blob.core.windows.net/", relative_path)
    )

    stored = spark.read.parquet(location)
    id_only = stored.select(unique_id)
    return id_only.withColumn('event_timestamp', current_timestamp())

def fetch_df(spark,feaurestore_name,STORAGEACCOUNTNAME,CONTAINERNAME):
    """Read a feature store's parquet data from blob storage.

    Args:
        spark: Session object providing ``read.parquet``.
        feaurestore_name: Name of the feature store; the data is read from
            ``stagging/<feaurestore_name>`` inside the container.
        STORAGEACCOUNTNAME: Azure storage account name.
        CONTAINERNAME: Blob container name.

    Returns:
        Whatever ``spark.read.parquet`` returns for the assembled wasbs URL.
    """
    relative_path = "/".join(("stagging", feaurestore_name))
    location = "".join(
        ("wasbs://", CONTAINERNAME, "@", STORAGEACCOUNTNAME, ".blob.core.windows.net/", relative_path)
    )
    return spark.read.parquet(location)

def create_df_feature(spark,path_dict,feature_dict,STORAGEACCOUNTNAME,CONTAINERNAME):
    """Load every feature store named in ``path_dict`` and pair it with its features.

    ``path_dict`` and ``feature_dict`` are walked in lockstep, so the n-th
    feature store is paired with the n-th entry of ``feature_dict``; iteration
    stops at the shorter of the two.

    Args:
        spark: Session object forwarded to ``fetch_df``.
        path_dict: Mapping whose keys are feature-store names. The values are
            not used: ``fetch_df`` derives the location from the name, the
            storage account and the container.
        feature_dict: Mapping whose values are the feature lists, in the same
            order as ``path_dict``.
        STORAGEACCOUNTNAME: Azure storage account name.
        CONTAINERNAME: Blob container name.

    Returns:
        ``[df_list, f_list]`` - the loaded frames and their feature lists.
    """
    df_list = []
    f_list = []

    # fetch_df takes (spark, name, account, container). The previous call also
    # passed the path_dict value, which raised TypeError on every call.
    for fs, features in zip(path_dict, feature_dict.values()):
        df_list.append(fetch_df(spark, fs, STORAGEACCOUNTNAME, CONTAINERNAME))
        f_list.append(features)
    return [df_list, f_list]

def as_of_join(
    entity_df: DataFrame,
    feature_table_entity_names : list,
    feature_table_df : DataFrame,
    feature_list : list,
    feature_table_name : str,
    max_age = None,
    entity_event_timestamp_column = 'event_timestamp'

    ) -> DataFrame:
    """Join each entity row with its most recent matching feature row.

    Feature columns are renamed to ``<feature_table_name>__<column>`` and
    left-outer-joined to the entity rows on all entity keys, restricted to
    feature rows whose event timestamp is not later than the entity row's
    timestamp. When several feature rows match, the one with the latest
    event timestamp (ties broken by latest created timestamp) is kept.

    Args:
        entity_df: Entity rows; must contain the entity key columns and
            ``entity_event_timestamp_column``.
        feature_table_entity_names: Entity key column names shared by both
            frames. Must not be empty.
        feature_table_df: Feature rows, including the key columns and the
            ``event_timestamp`` / ``created_timestamp`` columns.
        feature_list: Feature column names to bring over.
        feature_table_name: Prefix applied to the feature columns.
        max_age: Optional sequence whose first item is the maximum age of a
            feature row in seconds; falsy (``None`` or empty) disables the
            limit.
        entity_event_timestamp_column: Timestamp column in ``entity_df``.

    Returns:
        A DataFrame with the columns of ``entity_df`` followed by one
        ``<feature_table_name>__<feature>`` column per requested feature.

    Raises:
        IndexError: If ``feature_table_entity_names`` is empty (the exception
            type the previous implementation produced for this input).
    """
    if not feature_table_entity_names:
        raise IndexError("feature_table_entity_names must contain at least one entity key")

    # Select every entity key, not just the first one: the join condition
    # below references each of them on the feature side.
    feature_table_df = feature_table_df.select(
        feature_list
        + [EVENT_TIMESTAMP_ALIAS, CREATED_TIMESTAMP_ALIAS]
        + list(feature_table_entity_names)
    )
    # Tag each entity row so the window below ranks matches per entity row.
    entity_with_id = entity_df.withColumn("_row_nr", monotonically_increasing_id())
    feature_event_timestamp_column_with_prefix = (
        f"{feature_table_name}__{EVENT_TIMESTAMP_ALIAS}"
    )
    feature_created_timestamp_column_with_prefix = (
        f"{feature_table_name}__{CREATED_TIMESTAMP_ALIAS}"
    )

    # Prefix every feature-side column to avoid name clashes with entity_df.
    projection = [
        col(col_name).alias(f"{feature_table_name}__{col_name}")
        for col_name in feature_table_df.columns
    ]
    aliased_feature_table_df = feature_table_df.select(projection)

    # Feature rows must not be newer than the entity row ...
    join_cond = (
        entity_with_id[entity_event_timestamp_column]
        >= aliased_feature_table_df[feature_event_timestamp_column_with_prefix]
    )
    # ... and, if requested, not older than max_age[0] seconds.
    if max_age:
        join_cond = join_cond & (
            aliased_feature_table_df[feature_event_timestamp_column_with_prefix]
            >= entity_with_id[entity_event_timestamp_column]
            - expr(f"INTERVAL {max_age[0]} seconds")
        )
    for key in feature_table_entity_names:
        join_cond = join_cond & (
            entity_with_id[key]
            == aliased_feature_table_df[f"{feature_table_name}__{key}"]
        )

    conditional_join = entity_with_id.join(
        aliased_feature_table_df, join_cond, "leftOuter"
    )
    # The prefixed key columns duplicate the entity-side keys; drop them.
    for key in feature_table_entity_names:
        conditional_join = conditional_join.drop(
            aliased_feature_table_df[f"{feature_table_name}__{key}"]
        )

    # Rank the matches of each entity row, newest first, and keep the top one.
    window = Window.partitionBy("_row_nr", *feature_table_entity_names).orderBy(
        col(feature_event_timestamp_column_with_prefix).desc(),
        col(feature_created_timestamp_column_with_prefix).desc(),
    )
    filter_most_recent_feature_timestamp = conditional_join.withColumn(
        "_rank", row_number().over(window)
    ).filter(col("_rank") == 1)

    return filter_most_recent_feature_timestamp.select(
        entity_df.columns
        + [
            f"{feature_table_name}__{feature}"
            for feature in feature_list
        ]
    )
    

def retrieve_feature(
    entity_df: DataFrame,
    feature_table_dfs:List[DataFrame],
    feature_lists :List[list],
    feature_table_names:list,
    feature_table_entity_names : List[str],
    max_age=None,
    entity_event_timestamp_column='event_timestamp',
    ) -> DataFrame :
    """Enrich ``entity_df`` with features from several feature tables.

    The three per-table sequences are walked in lockstep and each table is
    joined onto the running result with ``as_of_join``. Iteration stops at
    the shortest of the three sequences.

    Args:
        entity_df: Starting entity rows.
        feature_table_dfs: One feature DataFrame per table.
        feature_lists: One list of feature column names per table.
        feature_table_names: One column prefix per table.
        feature_table_entity_names: Entity key columns, shared by all tables.
        max_age: Forwarded unchanged to ``as_of_join``; ``None`` (default) or
            an empty sequence means no age limit.
        entity_event_timestamp_column: Timestamp column in ``entity_df``.

    Returns:
        ``entity_df`` itself when no tables are given, otherwise the result of
        the chained joins.
    """
    joined_df = entity_df

    for feature_table_df, feature_list, feature_table_name in zip(
        feature_table_dfs, feature_lists, feature_table_names
    ):
        joined_df = as_of_join(
            joined_df,
            feature_table_entity_names,
            feature_table_df,
            feature_list,
            feature_table_name,
            max_age=max_age,
            entity_event_timestamp_column=entity_event_timestamp_column,
        )

    return joined_df




In [5]:
# Read the (already modified) automobile data from the mounted CSV file
# into a pandas DataFrame.
automobile = pd.read_csv(r'/mnt/automobile.csv')

In [6]:
# Set the submit arguments before the session is built; presumably this is
# what makes the Azure/AWS Hadoop connector packages available - TODO confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-azure:2.7.3,com.microsoft.azure:azure-storage:2.2.0,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
# Reuse the active Spark session if there is one, otherwise create it.
spark =  SparkSession.builder.getOrCreate()
# Register the native Azure file system implementation, both on the Hadoop
# configuration and for the wasbs:// scheme used by the feature-store helpers.
# NOTE(review): this goes through the private `_jsc` attribute.
spark._jsc.hadoopConfiguration().set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
spark.conf.set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
# Provide the account key for the configured storage account.
spark.conf.set('fs.azure.account.key.' + STORAGEACCOUNTNAME + '.blob.core.windows.net', STORAGEACCOUNTKEY)

In [7]:
# Persist the automobile data as the "featurestore" feature store. The call
# also adds Unique_id, event_timestamp and created_timestamp columns to
# `automobile` in place.
save_to_feature_store(spark,automobile,"featurestore", STORAGEACCOUNTNAME,CONTAINERNAME)