In [None]:
import sys

import pandas as pd
from confluent_kafka import KafkaError, KafkaException
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType
from sklearn.model_selection import train_test_split

ModuleNotFoundError: ignored

In [None]:
def prepro(data):
    """One-hot encode ``StateHoliday`` and drop closed-store rows, in place.

    Adds the integer indicator columns ``'0 holiday'``, ``'a holiday'``,
    ``'b holiday'`` and ``'c holiday'`` and removes every row whose ``Open``
    value is not 1. The frame is modified in place because the caller
    (``prego``) discards return values.

    Args:
        data: pandas DataFrame with ``StateHoliday`` and ``Open`` columns.

    Returns:
        None.
    """
    data['StateHoliday'] = data.StateHoliday.astype('category')

    # Compare on the string form so a level missing from this particular
    # frame yields an all-zero column instead of a KeyError, and so the
    # indicators are plain ints regardless of what get_dummies would return.
    holiday_codes = data['StateHoliday'].astype(str)
    for code in ('0', 'a', 'b', 'c'):
        data[code + ' holiday'] = (holiday_codes == code).astype(int)

    # Rebinding ``data`` to a filtered copy would only change the local name;
    # drop the closed-store rows from the caller's frame instead.
    closed_rows = data.index[data['Open'] != 1]
    data.drop(index=closed_rows, inplace=True)

def datesplitandcomdura(data):
    """Split ``Date`` into calendar parts and derive ``CompetitionOpenTime``.

    Parses ``Date`` to datetimes, writes ``Year``, ``Month``, ``Day`` and the
    ISO ``WeekOfYear``, then stores the number of months between the row's
    date and the competitor's opening (``CompetitionOpenSinceYear`` /
    ``CompetitionOpenSinceMonth``). Negative durations are replaced by 0 and
    missing ones are filled with 0. The frame is modified in place.

    Args:
        data: pandas DataFrame with ``Date``, ``CompetitionOpenSinceYear``
            and ``CompetitionOpenSinceMonth`` columns.

    Returns:
        None.
    """
    def _floor_at_zero(months):
        # NaN compares False against 0 and is passed through for fillna below.
        return 0 if months < 0 else months

    data["Date"] = pd.to_datetime(data["Date"])
    when = data["Date"].dt
    data["Year"] = when.year
    data["Month"] = when.month
    data["Day"] = when.day
    data["WeekOfYear"] = when.isocalendar().week

    whole_years = data.Year - data.CompetitionOpenSinceYear
    month_offset = data.Month - data.CompetitionOpenSinceMonth
    months_open = 12 * whole_years + month_offset
    data['CompetitionOpenTime'] = months_open.apply(_floor_at_zero).fillna(0)

def nalfil(data):
    """Sanitise the promo columns in place.

    ``Promo`` and ``Promo2`` have negative values replaced by 0 and missing
    values filled with 0; missing ``PromoInterval`` entries are filled with 0.

    Args:
        data: pandas DataFrame with ``Promo``, ``Promo2`` and
            ``PromoInterval`` columns.

    Returns:
        None.
    """
    def _non_negative(value):
        if value < 0:
            return 0
        return value

    for flag_col in ('Promo', 'Promo2'):
        data[flag_col] = data[flag_col].apply(_non_negative).fillna(0)

    data['PromoInterval'] = data['PromoInterval'].fillna(0)

def check_promo_month(row):  # check if promo is given in the particular month
    """Return 1 when the row's month is listed in its ``PromoInterval``.

    Args:
        row: mapping-like object (a DataFrame row or a dict) providing
            ``PromoInterval``, ``Promo2Open`` and ``Month``.

    Returns:
        1 if ``Promo2Open`` is truthy and the abbreviated name of ``Month``
        appears in the comma-separated ``PromoInterval``; otherwise 0.
        Rows that cannot be evaluated (missing key, non-string interval,
        month outside 1-12, ...) also yield 0.
    """
    month2str = {1: 'Jan', 2: 'Feb', 3: 'Mar', 4: 'Apr', 5: 'May', 6: 'Jun',
                 7: 'Jul', 8: 'Aug', 9: 'Sept', 10: 'Oct', 11: 'Nov', 12: 'Dec'}
    try:
        # A falsy interval (e.g. the 0 used as fill value) means "no promo months".
        months = (row['PromoInterval'] or '').split(',')
        return 1 if row['Promo2Open'] and month2str[row['Month']] in months else 0
    except Exception:
        # Best-effort per-row check: bad rows count as "no promo". Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate so a long DataFrame.apply can still be interrupted.
        return 0

def promo_cols(data):  # calculate duration of promotion(in months)
    """Derive ``Promo2Open`` and ``IsPromo2Month`` in place.

    ``Promo2Open`` is the time since ``Promo2SinceYear`` /
    ``Promo2SinceWeek`` expressed in months (weeks are converted with 7/30),
    with negative values replaced by 0, missing values filled with 0, and the
    result multiplied by ``Promo2`` so stores without the promo get 0.
    ``IsPromo2Month`` is ``check_promo_month`` applied row-wise, also
    multiplied by ``Promo2``.

    Args:
        data: pandas DataFrame with ``Year``, ``Month``, ``WeekOfYear``,
            ``Promo2``, ``Promo2SinceYear``, ``Promo2SinceWeek`` and
            ``PromoInterval`` columns.

    Returns:
        None.
    """
    def _clamp_negative(months):
        return 0 if months < 0 else months

    months_from_years = 12 * (data.Year - data.Promo2SinceYear)
    months_from_weeks = (data.WeekOfYear - data.Promo2SinceWeek) * 7 / 30
    elapsed = months_from_years + months_from_weeks

    # only when there is promo
    data['Promo2Open'] = elapsed.apply(_clamp_negative).fillna(0) * data['Promo2']

    # whether a new round of promotion started in the current month
    in_promo_month = data.apply(check_promo_month, axis=1)
    data['IsPromo2Month'] = in_promo_month * data['Promo2']

def compdis(data):
    """Fill missing ``CompetitionDistance`` values with the column maximum, in place.

    Args:
        data: pandas DataFrame with a ``CompetitionDistance`` column.

    Returns:
        None.
    """
    distances = data['CompetitionDistance']
    data['CompetitionDistance'] = distances.fillna(distances.max())

def assandstoty(data):
    """One-hot encode ``Assortment`` and ``StoreType`` in place.

    Adds the integer indicator columns ``'a ass'``, ``'b ass'``, ``'c ass'``
    and ``'type a'`` .. ``'type d'``. A level that does not occur in the
    frame produces an all-zero column rather than a ``KeyError``.

    Args:
        data: pandas DataFrame with ``Assortment`` and ``StoreType`` columns.

    Returns:
        None.
    """
    data['Assortment'] = data.Assortment.astype('category')
    data['StoreType'] = data.StoreType.astype('category')

    # Build the indicators by direct comparison so the set of output columns
    # and their int dtype do not depend on which levels happen to be present.
    assortment_codes = data['Assortment'].astype(str)
    for level in ('a', 'b', 'c'):
        data[level + ' ass'] = (assortment_codes == level).astype(int)

    store_codes = data['StoreType'].astype(str)
    for level in ('a', 'b', 'c', 'd'):
        data['type ' + level] = (store_codes == level).astype(int)

def popo(data):
    """Remove the columns that must not reach the model, in place.

    Columns are popped one by one in a fixed order, so a missing column
    raises ``KeyError`` after the preceding ones have already been removed.

    Args:
        data: pandas DataFrame produced by the earlier preprocessing steps.

    Returns:
        None.
    """
    # Already replaced by derived/encoded columns.
    preprocessed = ('Assortment', 'StateHoliday', 'StoreType', 'PromoInterval', 'Date')
    # More than 50% NaN.
    mostly_missing = ('CompetitionOpenSinceMonth', 'CompetitionOpenSinceYear',
                      'Promo2SinceYear', 'Promo2SinceWeek')
    remaining = ('Promo2', 'type b', 'Customers')

    for column in preprocessed + mostly_missing + remaining:
        data.pop(column)

def prego(data):
    """Run the full pandas preprocessing pipeline on ``data`` in place.

    The steps run in a fixed order because later ones read columns created
    by earlier ones (e.g. ``promo_cols`` uses ``Year`` and ``WeekOfYear``
    written by ``datesplitandcomdura``).

    Args:
        data: raw pandas DataFrame as loaded from the training CSV.

    Returns:
        None.
    """
    pipeline = (
        prepro,
        datesplitandcomdura,
        nalfil,
        promo_cols,
        compdis,
        assandstoty,
        popo,
    )
    for step in pipeline:
        step(data)
       
def modbuil(data,  train, val):
    """Assemble a ``features`` vector column and split the data 80/20.

    Every column of ``data`` except ``Sales`` becomes an input of the
    ``VectorAssembler`` (invalid values are kept).

    Args:
        data: Spark DataFrame containing the feature columns and ``Sales``.
        train: ignored; the name is overwritten with the training split.
        val: ignored; the name is overwritten with the validation split.

    Returns:
        Tuple ``(train, val)`` of Spark DataFrames from ``randomSplit``.
    """
    feature_list = [name for name in data.columns if name != 'Sales']

    assembler = VectorAssembler(inputCols=feature_list, outputCol='features')
    assembler = assembler.setHandleInvalid("keep")
    assembled = assembler.transform(data)

    train, val = assembled.randomSplit([0.8, 0.2])
    return train, val

def linreg(train, val, lr):
    """Tune a linear regression and report its R² on the held-out split.

    A ``TrainValidationSplit`` (80/20 inside ``train``) searches over
    ``regParam`` and ``elasticNetParam``; the fitted model is then scored on
    ``val`` so the reported R² is not a training-set score.

    Args:
        train: Spark DataFrame with ``features`` and ``Sales`` used for fitting.
        val: held-out Spark DataFrame used for the reported score. If ``None``
            the score falls back to ``train``.
        lr: ``LinearRegression`` estimator to tune.

    Returns:
        The R² value computed by ``RegressionEvaluator``.
    """
    param_grid = (
        ParamGridBuilder()
        .addGrid(lr.regParam, [0.5, 0.1, 0.05, 0.01])
        .addGrid(lr.elasticNetParam, [0.3, 0.1, 0.05])
        .build()
    )
    r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sales", metricName="r2")
    tvs = TrainValidationSplit(estimator=lr,
                               estimatorParamMaps=param_grid,
                               evaluator=r2_evaluator,
                               # 80% of the data will be used for training, 20% for validation.
                               trainRatio=0.8)

    lr_model = tvs.fit(train)
    holdout = train if val is None else val
    return r2_evaluator.evaluate(lr_model.transform(holdout))

def renforeg(train, val, rf):
    """Tune a random-forest regressor and report its R² on the held-out split.

    A ``TrainValidationSplit`` (80/20 inside ``train``) searches over
    ``maxDepth``, ``minInstancesPerNode`` and ``bootstrap``; the fitted model
    is then scored on ``val`` so the reported R² is not a training-set score.

    Args:
        train: Spark DataFrame with ``features`` and ``Sales`` used for fitting.
        val: held-out Spark DataFrame used for the reported score. If ``None``
            the score falls back to ``train``.
        rf: ``RandomForestRegressor`` estimator to tune.

    Returns:
        The R² value computed by ``RegressionEvaluator``.
    """
    param_grid = (
        ParamGridBuilder()
        .addGrid(rf.maxDepth, [10, 20, 30])
        .addGrid(rf.minInstancesPerNode, [1, 2, 3])
        .addGrid(rf.bootstrap, [True, False])
        .build()
    )
    r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sales", metricName="r2")
    tvs = TrainValidationSplit(estimator=rf,
                               estimatorParamMaps=param_grid,
                               evaluator=r2_evaluator,
                               # 80% of the data will be used for training, 20% for validation.
                               trainRatio=0.8)

    rf_model = tvs.fit(train)
    holdout = train if val is None else val
    return r2_evaluator.evaluate(rf_model.transform(holdout))

def dtreg(train, val, dt):
    """Tune a decision-tree regressor and report its R² on the held-out split.

    A ``TrainValidationSplit`` (80/20 inside ``train``) searches over
    ``maxDepth`` and ``minInstancesPerNode``; the fitted model is then scored
    on ``val`` so the reported R² is not a training-set score.

    Args:
        train: Spark DataFrame with ``features`` and ``Sales`` used for fitting.
        val: held-out Spark DataFrame used for the reported score. If ``None``
            the score falls back to ``train``.
        dt: ``DecisionTreeRegressor`` estimator to tune.

    Returns:
        The R² value computed by ``RegressionEvaluator``.
    """
    param_grid = (
        ParamGridBuilder()
        .addGrid(dt.maxDepth, [10, 20, 30])
        .addGrid(dt.minInstancesPerNode, [1, 2, 3])
        .build()
    )
    r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sales", metricName="r2")
    tvs = TrainValidationSplit(estimator=dt,
                               estimatorParamMaps=param_grid,
                               evaluator=r2_evaluator,
                               # 80% of the data will be used for training, 20% for validation.
                               trainRatio=0.8)

    dt_model = tvs.fit(train)
    holdout = train if val is None else val
    return r2_evaluator.evaluate(dt_model.transform(holdout))

def gbtreg(train, val, gbt):
    """Tune a gradient-boosted-tree regressor and score it on the held-out split.

    A ``TrainValidationSplit`` (80/20 inside ``train``) searches over
    ``maxDepth`` and ``minInstancesPerNode``; the fitted model is then scored
    on ``val`` so the reported R² is not a training-set score.

    NOTE: a later cell of this notebook defines another ``gbtreg`` with a
    different signature, which replaces this one once that cell has run.

    Args:
        train: Spark DataFrame with ``features`` and ``Sales`` used for fitting.
        val: held-out Spark DataFrame used for the reported score. If ``None``
            the score falls back to ``train``.
        gbt: ``GBTRegressor`` estimator to tune.

    Returns:
        A two-element list ``[r2, fitted_model]``.
    """
    param_grid = (
        ParamGridBuilder()
        .addGrid(gbt.maxDepth, [10, 20, 30])
        .addGrid(gbt.minInstancesPerNode, [1, 2, 3])
        .build()
    )
    r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sales", metricName="r2")
    tvs = TrainValidationSplit(estimator=gbt,
                               estimatorParamMaps=param_grid,
                               evaluator=r2_evaluator,
                               # 80% of the data will be used for training, 20% for validation.
                               trainRatio=0.8)

    gbt_model = tvs.fit(train)
    holdout = train if val is None else val
    gbt_evaluator = r2_evaluator.evaluate(gbt_model.transform(holdout))

    return [gbt_evaluator, gbt_model]


def mogo(train_in):
    """Build, tune and score the four regressors on ``train_in``.

    Args:
        train_in: Spark DataFrame with the feature columns and ``Sales``.

    Returns:
        Tuple ``(l, r, d, g)`` holding the results of ``linreg``,
        ``renforeg``, ``dtreg`` and ``gbtreg`` in that order.
    """
    # Column wiring shared by every estimator.
    io_cols = dict(featuresCol='features', labelCol='Sales')
    # Starting tree settings; the tuning grids override them during search.
    tree_defaults = dict(maxDepth=20, minInstancesPerNode=2)

    lr = LinearRegression(maxIter=10, regParam=0.8, elasticNetParam=0.1, **io_cols)
    rf = RandomForestRegressor(bootstrap=True, **io_cols, **tree_defaults)
    dt = DecisionTreeRegressor(**io_cols, **tree_defaults)
    gbt = GBTRegressor(**io_cols, **tree_defaults)

    # modbuil ignores its two placeholder arguments and returns fresh splits.
    t, v = modbuil(train_in, [], [])

    l = linreg(t, v, lr)
    r = renforeg(t, v, rf)
    d = dtreg(t, v, dt)
    g = gbtreg(t, v, gbt)
    return l, r, d, g
    

In [None]:
def spgo(xtrain):
    """Convert the preprocessed pandas frame into a Spark DataFrame.

    ``CompetitionDistance``, ``CompetitionOpenTime`` and ``Promo2Open`` are
    cast to ``int`` on a copy, so the caller's frame (possibly a slice of a
    larger frame) is left untouched. The Spark schema declares every column
    as a nullable ``IntegerType``.

    Args:
        xtrain: pandas DataFrame whose columns are expected to line up with
            the schema fields listed below — NOTE(review): the schema is
            applied by ``createDataFrame``; confirm column order matches.

    Returns:
        The Spark DataFrame created from ``xtrain``.
    """
    int_columns = ('CompetitionDistance', 'CompetitionOpenTime', 'Promo2Open')
    xtrain = xtrain.astype({name: int for name in int_columns})

    field_names = [
        "Store", "DayOfWeek", "Sales", "Open", "Promo", "SchoolHoliday",
        "CompetitionDistance",
        "0 holiday", "a holiday", "b holiday", "c holiday",
        "Year", "Month", "Day", "WeekOfYear",
        "CompetitionOpenTime", "Promo2Open", "IsPromo2Month",
        "a ass", "b ass", "c ass",
        "type a", "type c", "type d",
    ]
    mySchema = StructType([StructField(name, IntegerType(), True) for name in field_names])

    # getOrCreate() reuses an already running session if there is one.
    spark = SparkSession.builder.master("spark://master:7077").getOrCreate()
    trdf = spark.createDataFrame(xtrain, schema=mySchema)

    print("--------------made a spark df from the data------------")

    return trdf

In [None]:
# Load the raw training data; 'x_train.csv' is resolved relative to the
# kernel's working directory.
x = pd.read_csv('x_train.csv')
# Target column kept under its own name; it is not referenced again in the
# cells visible here.
y = x['Sales']


# Run the whole pandas preprocessing pipeline; prego works on x in place.
prego(x)

# Keep only the first 100 rows so the Spark model search below stays small.
x = x.iloc[:100]

# Convert to a Spark DataFrame, then tune and score the four regressors.
# g is the two-element list returned by gbtreg.
spdf = spgo(x)
l, r, d, g= mogo(spdf)


# print(l)
# print(' ')
# print(r)
# print(' ')
# print(d)
# print(' ')
# print(g[0])

FileNotFoundError: ignored

In [None]:
import matplotlib.pyplot as plt


def addlabels(x, y):
    """Write each value of ``y`` (rounded to 5 decimals) above its bar slot.

    Args:
        x: sequence of bar labels; only its length is used.
        y: sequence of numeric bar heights, indexed by bar position.
    """
    for position in range(len(x)):
        height = round(y[position], 5)
        plt.text(position, height, height)




# Bar labels, one per regressor, in the same order as the scores in y_bar.
x_bar = ['rl', 'rf', 'dt', 'gbt']
# l, r and d are the values returned by linreg/renforeg/dtreg; g is the list
# returned by gbtreg, whose first element is the score.
y_bar = [l,r,d,g[0]]                

# Annotate every bar position with its rounded value.
addlabels(x_bar, y_bar)

plt.bar(x_bar,y_bar)

In [None]:
# import pyspark.sql.functions as F
from confluent_kafka import Producer, Consumer
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
import matplotlib.pyplot as plt
import time

# Unfitted GBT estimator (despite the name); calculateNewModel passes it to
# gbtreg, which tunes and fits it on every batch.
gbt_model= GBTRegressor(featuresCol='features', labelCol='Sales', maxDepth=20,
                                    minInstancesPerNode=2)

# Module-level Kafka producer used by gbtreg to publish predictions.
# NOTE(review): the broker string starts with a space, unlike the consumer
# configuration further down — confirm the client tolerates it.
P1 = Producer({'bootstrap.servers':' localhost:9092'})

def forPred(new_data):
    """Call ``new_data.collect()`` once and discard what it returns.

    Args:
        new_data: object exposing a ``collect()`` method (a Spark DataFrame
            in this notebook).

    Returns:
        None.
    """
    _discarded = new_data.collect()
    del _discarded
    return None


def gbtreg(train,gbt_model,new_data):
    """Retrain the GBT model and publish predictions for the newest samples.

    The estimator is tuned with a ``TrainValidationSplit`` (80/20) on
    ``train``. The fitted model then scores ``new_data`` in one pass; for each
    of the first three rows a message ``"prediction for next day  <value>"``
    is produced to the Kafka topic ``"<Store>a"`` through the module-level
    producer ``P1``.

    NOTE: this definition replaces the earlier three-argument ``gbtreg``
    (train, val, gbt) defined in a previous cell of this notebook.

    Args:
        train: Spark DataFrame with the feature columns and ``Sales``.
        gbt_model: unfitted ``GBTRegressor`` estimator to tune.
        new_data: Spark DataFrame with the same columns holding the freshly
            received samples.

    Returns:
        The mean signed error ``Sales - prediction`` over the scored rows.

    Raises:
        ValueError: if ``new_data`` contains no rows.
    """
    feature_cols = ['Store', 'DayOfWeek',
                    'Open', 'Promo', 'SchoolHoliday',
                    'CompetitionDistance', '0 holiday',
                    'a holiday', 'b holiday', 'c holiday',
                    'Year', 'Month', 'Day', 'WeekOfYear', 'CompetitionOpenTime',
                    'Promo2Open', 'IsPromo2Month', 'a ass', 'b ass', 'c ass',
                    'type a', 'type c', 'type d']

    print("------- vector assembling START-----------")
    print(" ")

    vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    train = vectorAssembler.transform(train)
    new_data = vectorAssembler.transform(new_data)

    print("------- vector assembling OK-----------")
    print(" ")

    paramGrid = (
        ParamGridBuilder()
        .addGrid(gbt_model.maxDepth, [10, 20, 30])
        .addGrid(gbt_model.minInstancesPerNode, [1, 2, 3])
        .build()
    )
    evaluatorr = RegressionEvaluator(predictionCol="prediction", labelCol="Sales", metricName="r2")
    tvs = TrainValidationSplit(estimator=gbt_model,
                               estimatorParamMaps=paramGrid,
                               evaluator=evaluatorr,
                               # 80% of the data will be used for training, 20% for validation.
                               trainRatio=0.8)

    print("------------------starting to train a new model-------------------------")
    gbt_new = tvs.fit(train)

    # Score the already-assembled new samples in a single job. This replaces
    # rebuilding a one-row DataFrame per sample through a global ``spark``
    # session that is not defined anywhere in the notebook, and avoids
    # re-collecting the data and re-running the model several times per row.
    scored_rows = (gbt_new.transform(new_data)
                   .select('Store', 'Sales', 'prediction')
                   .take(3))
    if not scored_rows:
        raise ValueError("new_data contains no rows to score")

    mse_arr = []  # signed errors (Sales - prediction), one per scored row

    for scored in scored_rows:
        Sale = scored['Sales']
        model_pred = scored['prediction']
        store_id = str(int(scored['Store']))
        topic_name = store_id + "a"
        message = "prediction for next day " + " " + str(model_pred)

        # Pause before each publish, as the original loop did.
        time.sleep(10)

        P1.produce(topic=topic_name, key='', value=message.encode())

        print("topic is ", topic_name)
        print("values is -------------- " + message)

        print("-----sent-------------------" + store_id)

        print(Sale , " - ", model_pred)
        mse_arr.append(Sale - model_pred)
        print("msearr -----------")
        print(mse_arr)

    time.sleep(10)

    meanErr = sum(mse_arr) / len(mse_arr)

    return meanErr




def calculateNewModel(train_data,consumer,topic):
    """Consume samples from Kafka and retrain after every third new sample.

    Each message value is decoded as UTF-8 and split on commas. Messages
    whose first field is ``"Sample"`` contribute one row: the fields from
    index 3 onward are parsed as numbers, truncated to ``int`` and matched
    positionally to ``train_data``'s columns. After every third accepted
    sample the accumulated training data and the three new rows are handed to
    ``gbtreg`` and the mean errors collected so far are drawn as a bar chart.

    The loop runs until an exception (including ``KeyboardInterrupt``) leaves
    it; the consumer is always closed on the way out. The caller's
    ``train_data`` frame is not modified.

    Args:
        train_data: pandas DataFrame with the preprocessed training rows.
        consumer: Kafka consumer providing ``subscribe``, ``poll`` and ``close``.
        topic: list of topic names to subscribe to.

    Returns:
        None.

    Raises:
        KafkaException: for any consumer error other than end-of-partition.
    """
    running = True

    def appendRow(row1,train_data):
        """Return ``train_data`` extended by one row parsed from ``row1``."""
        names = list(train_data.columns)
        if len(row1) < len(names):
            raise ValueError("expected %d fields, got %d" % (len(names), len(row1)))
        values = [int(float(row1[i])) for i in range(len(names))]
        new_row = pd.DataFrame([values], columns=names)
        if train_data.empty:
            # Nothing to concatenate with; this also keeps integer dtypes
            # instead of inheriting the object dtype of an empty frame.
            return new_row
        # DataFrame.append was removed in pandas 2.0; concat is the replacement.
        return pd.concat([train_data, new_row], ignore_index=True)

    def addlabels(x, y):
        for i in range(len(x)):
            plt.text(i, round(y[i],5), round(y[i],5))

    def basic_consume_loop(consumer, topic, train_data):
        print("Starting basic consume loop")
        res = []

        # Same columns as the training frame (not the notebook-global ``x``).
        new_data = pd.DataFrame(columns=train_data.columns)

        try:
            consumer.subscribe(topic)

            while running:
                msg = consumer.poll(timeout=10.0)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                        continue
                    raise KafkaException(msg.error())

                m = msg.value().decode('utf-8').split(",")
                if m[0] != "Sample":
                    continue

                print("got new msg")
                try:
                    # Parse for both frames before committing either, so a
                    # bad message cannot leave them out of step.
                    extended_new = appendRow(m[3:], new_data)
                    extended_train = appendRow(m[3:], train_data)
                except (ValueError, OverflowError) as exc:
                    # A malformed sample should not stop the consumer.
                    sys.stderr.write("skipping malformed sample %r: %s\n" % (m, exc))
                    continue
                new_data = extended_new
                train_data = extended_train

                print('====ADDED====')
                print(m)
                print('=============')

                if len(new_data) % 3 == 0:
                    res.append(gbtreg(spgo(train_data), gbt_model, spgo(new_data)))
                    new_data = new_data[0:0]

                    x_bar = range(len(res))
                    y_bar = res
                    addlabels(x_bar, y_bar)
                    plt.bar(x_bar, y_bar)
                    plt.show()

        finally:
            # Close down consumer to commit final offsets.
            consumer.close()

    def shutdown():
        # Without ``nonlocal`` the assignment would create a new local and
        # the loop flag would never change.
        nonlocal running
        running = False

    basic_consume_loop(consumer, topic, train_data)



In [None]:
from confluent_kafka import Consumer

# Kafka consumer settings: broker address, consumer group id and the offset
# policy used when the group has no committed offset.
# NOTE(review): 'smallest' is presumably the legacy spelling of 'earliest' —
# confirm against the client version in use.
conf = {'bootstrap.servers': "localhost:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest'}

# Consumer instance handed to calculateNewModel in the next cell.
consumer1 = Consumer(conf)

In [None]:
# Start the consume/retrain loop on topics '1', '2' and '3', seeded with the
# preprocessed frame x. The loop inside calculateNewModel has no exit
# condition of its own, so this cell runs until it is interrupted or fails.
calculateNewModel(x,consumer1,['1','2','3'])