In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import pandas as pd

In [2]:
#Creating sparkContext and sparkSession
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession(sc)

In [3]:
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("./datasets/training.csv"))

In [4]:
# inputCols for StringIndexer
string_col = [
    'DataSource','DataValueUnit','DataValueTypeID',
    'QuestionID','LocationID','StratificationCategoryID1',
    'StratificationID1','TopicID'
]

#outputCols for StringIndexer
string_col_output = [
    'DataSourceIndex','DataValueUnitIndex','DataValueTypeIDIndex',
    'QuestionIDIndex','LocationIDIndex','StratificationCategoryID1Index',
    'StratificationID1Index','TopicIDIndex'
]

#inputCols for OneHotEncoder
string_col_encode_input = [
    'DataSourceIndex','DataValueUnitIndex','DataValueTypeIDIndex',
    'QuestionIDIndex','LocationIDIndex','StratificationCategoryID1Index',
    'StratificationID1Index'
]

#outputCols for OneHotEncoder
string_col_encoded = [
    'DataSourceVec','DataValueUnitVec','DataValueTypeIDVec',
    'QuestionIDVec','LocationIDVec','StratificationCategoryID1Vec',
    'StratificationID1Vec'
]

#inputCols for VectorAssembler
features_to_assemble = string_col_encoded + ['DataValue','LowConfidenceLimit','HighConfidenceLimit','Geo_lat','Geo_lon']

In [5]:
columns  = df.columns

dataset = df.select(col(columns[0]).cast('string'),
                    col(columns[1]).cast('string'),
                    col(columns[2]).cast('string'),
                    col(columns[3]).cast('float'),
                    col(columns[4]).cast('float'),
                    col(columns[5]).cast('float'),
                    col(columns[6]).cast('string'),
                    col(columns[7]).cast('string'),
                    col(columns[8]).cast('string'),
                    col(columns[9]).cast('string'),
                    col(columns[10]).cast('string'),
                    col(columns[11]).cast('float'),
                    col(columns[12]).cast('float'),
                   )

In [6]:
indexer = StringIndexer(inputCols= string_col, outputCols=string_col_output)
encoder = OneHotEncoder(inputCols=string_col_encode_input, outputCols=string_col_encoded)
vectorAssembler = VectorAssembler(inputCols=features_to_assemble,
                                  outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
scaler = StandardScaler(inputCol="features_norm", outputCol="features_norm_scaled")
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer,scaler])
model = pipeline.fit(dataset)
dd = model.transform(dataset)

In [7]:
path= r'./SavedModels/lr_model'
lr_model = CrossValidatorModel.read().load(path)
lr = lr_model.bestModel

In [8]:
label = []
prediction = []

In [9]:
def classification_report(metrics,dataset) : 
    print('Accuracy : {} %'.format(metrics.evaluate(dataset,{metrics.metricName: "accuracy"})*100))
    print('f1-score : {}'.format(metrics.evaluate(dataset,{metrics.metricName: "f1"})))
    print('precision-score : {}'.format(metrics.evaluate(dataset,{metrics.metricName: "precisionByLabel"})))
    print('recall-score : {}'.format(metrics.evaluate(dataset,{metrics.metricName: "recallByLabel"})))

In [10]:
def test_train(text) :
    global dtc,prediction,label
    if text.collect() != [] :
        for data in text.collect() :
            data = data.split('|')
            feature = [(data[0]),data[1],data[2],float(data[3]),float(data[4]),float(data[5]),data[6],data[7],data[8],data[9],data[10],float(data[11]),float(data[12])]
            #print(feature)
            Dframe = sc.parallelize([feature]).toDF(('DataSource','DataValueUnit','DataValueTypeID','DataValue','LowConfidenceLimit','HighConfidenceLimit','TopicID','QuestionID','LocationID','StratificationCategoryID1','StratificationID1','Geo_lat','Geo_lon'))
            #print(Dframe.collect())
            df = model.transform(Dframe)
            #print(df.collect())
            tempdf = lr.transform(df)
            
            prediction.append(float(tempdf.toPandas().pred_lr.values[0]))
            label.append(float(tempdf.toPandas().TopicIDIndex.values[0]))
    else :
        data = spark.createDataFrame(list(zip(prediction,label)),['prediction','label'])
        metrics = MulticlassClassificationEvaluator()
        metrics = metrics.setPredictionCol('prediction')
        classification_report(metrics,data)
        try :
            ssc.stop()
        except Py4JJavaError() as err:
            print(err)
            print('Streaming Stopped')

In [11]:
ssc = StreamingContext(sc, 2)
lines = ssc.socketTextStream('localhost', 9991)
lines.foreachRDD(test_train)

In [12]:
ssc.start()             # Start the streaming process
ssc.awaitTermination()  # Wait for the streaming to end

Accuracy : 45.83333333333333 %
f1-score : 0.4313949938949939
precision-score : 1.0
recall-score : 0.5
