In [23]:
import json
import pandas as pd
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
import joblib
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [24]:
spark = SparkSession.builder.appName("Driving_Declinometer").getOrCreate()

kafka_config = {
        "bootstrap_servers": 'localhost:9092',
        "value_deserializer": lambda v: json.loads(v.decode("utf-8")),
    }

KAFKA_TOPIC_NAME_CONS = "Driving_Declinometer_test"

In [25]:
schema = StructType([
    StructField("Epm_nEng_100ms", DoubleType(), True),
    StructField("VehV_v_100ms", DoubleType(), True),
    StructField("ActMod_trqInr_100ms", DoubleType(), True),
    StructField("RngMod_trqCrSmin_100ms", DoubleType(), True),
    StructField("CoVeh_trqAcs_100ms", DoubleType(), True),
    StructField("Clth_st_100ms", IntegerType(), True),
    StructField("CoEng_st_100ms", IntegerType(), True),
    StructField("Com_rTSC1VRVCURtdrTq_100ms", IntegerType(), True),
    StructField("Com_rTSC1VRRDTrqReq_100ms", IntegerType(), True),
])

In [26]:
def preprocess(received_data): 
    scaler = joblib.load('./scaler.joblib')
    driving_data = pd.DataFrame([received_data])
    columns_to_remove = ['CoVeh_trqAcs_100ms', 'Clth_st_100ms', 'CoEng_st_100ms', 'Com_rTSC1VRVCURtdrTq_100ms', 'Com_rTSC1VRRDTrqReq_100ms']
    driving_data.drop(columns=columns_to_remove, inplace=True)
    for column in driving_data.columns:
        if(driving_data[column].dtype=='object'):
            driving_data[column]=scaler.fit_transform(driving_data[column])
    return driving_data.values


In [27]:
def predict(data): 
    preproccessed_data = preprocess(data)
    loaded_regressor = joblib.load('./linear_regression_model.joblib')
    result = loaded_regressor.predict(preproccessed_data)
    return result

In [28]:
print("Kafka Consumer Application Started ... ")
consumer = KafkaConsumer(KAFKA_TOPIC_NAME_CONS,
                             bootstrap_servers=kafka_config["bootstrap_servers"])

Kafka Consumer Application Started ... 


In [29]:
for message in consumer:
    
    message_value = message.value.decode('utf-8')


    message_dict = json.loads(message_value)

    print("Received Message:")
    print(message_dict)
    predict(message_dict)
        
    print("-----------------------")
    print("result ", predict(message_dict))
    print("-----------------------")

Received Message:
{'Epm_nEng_100ms': 1301.5, 'VehV_v_100ms': 52.96, 'ActMod_trqInr_100ms': 0.0, 'RngMod_trqCrSmin_100ms': -140.0, 'CoVeh_trqAcs_100ms': 9.999747, 'Clth_st_100ms': 0, 'CoEng_st_100ms': 3, 'Com_rTSC1VRVCURtdrTq_100ms': 0, 'Com_rTSC1VRRDTrqReq_100ms': 0}
-----------------------
result  [113.03661295]
-----------------------
Received Message:
{'Epm_nEng_100ms': 912.0, 'VehV_v_100ms': 67.82, 'ActMod_trqInr_100ms': 1963.5, 'RngMod_trqCrSmin_100ms': -168.0, 'CoVeh_trqAcs_100ms': 9.999747, 'Clth_st_100ms': 0, 'CoEng_st_100ms': 3, 'Com_rTSC1VRVCURtdrTq_100ms': 0, 'Com_rTSC1VRRDTrqReq_100ms': 0}
-----------------------
result  [4487.95476539]
-----------------------
Received Message:
{'Epm_nEng_100ms': 1213.0, 'VehV_v_100ms': 74.94, 'ActMod_trqInr_100ms': 1603.812, 'RngMod_trqCrSmin_100ms': -196.0, 'CoVeh_trqAcs_100ms': 9.999747, 'Clth_st_100ms': 0, 'CoEng_st_100ms': 3, 'Com_rTSC1VRVCURtdrTq_100ms': 0, 'Com_rTSC1VRRDTrqReq_100ms': 0}
-----------------------
result  [3706.23639924

KeyboardInterrupt: 