In [1]:
import json
import numpy as np
import pandas as pd
from pyspark import *
from pyspark.ml import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression

In [2]:
###################################################
###                Transformers                 ###
###################################################
class ColumnRenamer(Estimator, Transformer):
    """Pipeline stage that renames a single DataFrame column.

    The stage is stateless: fitting returns the stage itself, and
    transforming renames ``columnsNameOld`` to ``columnsNameNew``.

    Args:
        columnsNameOld: Current name of the column to rename.
        columnsNameNew: Name the column should carry afterwards.
    """
    def __init__(self, columnsNameOld = "", columnsNameNew = ""):
        # Run the pyspark.ml Params/Identifiable initialisers so the stage gets
        # its uid and param maps; without them repr(), copy() and
        # Pipeline.fit(df, params) fail with AttributeError.
        super().__init__()
        self.columnsNameOld = columnsNameOld
        self.columnsNameNew = columnsNameNew
    
    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self
    
    def _transform(self, X):
        """Return ``X`` with ``columnsNameOld`` renamed to ``columnsNameNew``."""
        return X.withColumnRenamed(self.columnsNameOld, self.columnsNameNew)

class ColumnDropper(Estimator, Transformer):
    """Pipeline stage that removes a list of columns from a DataFrame.

    The stage is stateless: fitting returns the stage itself.

    Args:
        columnsName: Iterable of column names to drop. The default list is
            only read, never mutated, so sharing it between instances is safe.
    """
    def __init__(self, columnsName = ["id_region"]):
        # Run the pyspark.ml Params/Identifiable initialisers so the stage gets
        # its uid and param maps; without them repr(), copy() and
        # Pipeline.fit(df, params) fail with AttributeError.
        super().__init__()
        self.columnsName = columnsName
    
    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self
    
    def _transform(self, X):
        """Return ``X`` without the columns listed in ``columnsName``."""
        trimmed = X
        for column_name in self.columnsName:
            trimmed = trimmed.drop(column_name)
        return trimmed

class ColumnTransformer(Estimator, Transformer):
    """Pipeline stage that recodes and casts three columns.

    Working on ``object_type``, ``rooms`` and ``kitchen_area``:

    * ``object_type`` values ending in '2' get every '2' replaced by '1';
    * ``rooms`` values ending in '-1' get '-1' replaced by '0';
    * ``kitchen_area`` values equal to '-100.0' get '-100' replaced by '0'.

    Afterwards the columns are cast to boolean, integer and float respectively.
    """

    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self

    def _transform(self, X):
        """Return ``X`` with the three columns recoded and cast."""
        # Step 1: rewrite the special values. Each step reads the column from
        # the frame produced by the previous step.
        recoded = X.withColumn(
            'object_type',
            when(
                X.object_type.endswith('2'),
                regexp_replace(X.object_type, '2', '1'),
            ).otherwise(X.object_type),
        )
        recoded = recoded.withColumn(
            'rooms',
            when(
                recoded.rooms.endswith('-1'),
                regexp_replace(recoded.rooms, '-1', '0'),
            ).otherwise(recoded.rooms),
        )
        recoded = recoded.withColumn(
            'kitchen_area',
            when(
                recoded.kitchen_area == '-100.0',
                regexp_replace(recoded.kitchen_area, '-100', '0'),
            ).otherwise(recoded.kitchen_area),
        )

        # Step 2: cast to the final types, in the same order as before.
        # (object_type is cast to boolean; an IntegerType cast was the
        # alternative considered earlier.)
        target_types = (
            ('object_type', BooleanType()),
            ('rooms', IntegerType()),
            ('kitchen_area', FloatType()),
        )
        typed = recoded
        for column_name, spark_type in target_types:
            typed = typed.withColumn(column_name, typed[column_name].cast(spark_type))
        return typed

class RecoveryDataTransformer(Estimator,Transformer):
    """Pipeline stage that fills missing values with the column median.

    For every column in ``columnsName`` the non-null values are collected to
    the driver, their median is computed with NumPy, and nulls in that column
    are replaced by the median. The stage is stateless: fitting returns the
    stage itself and the median is recomputed on each ``transform`` call.

    Args:
        columnsName: Iterable of column names to impute. Empty names (such as
            the ``""`` in the default) are skipped.
    """
    def __init__(self, columnsName = [""]):
        # Run the pyspark.ml Params/Identifiable initialisers so the stage gets
        # its uid and param maps; without them repr(), copy() and
        # Pipeline.fit(df, params) fail with AttributeError.
        super().__init__()
        self.columnsName = columnsName

    def find_median(self, values_list) -> float:
        """Return the median of ``values_list`` as a float, ignoring NaN entries."""
        result_median = np.nanmedian(np.array(values_list, dtype=float))
        return float(result_median)

    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self
    
    def _transform(self, X):
        """Return ``X`` with nulls in the configured columns replaced by the median."""
        Xaux = X

        for field in self.columnsName:
            if not field:
                # An empty name cannot refer to a real column.
                continue

            # NOTE: collects the whole column to the driver, so this is only
            # suitable for data that fits in driver memory.
            rows = Xaux.select(field).collect()
            # Skip nulls (the values being recovered) and keep fractions:
            # int(None) raises TypeError and int() would truncate e.g. areas.
            observed = [float(row[0]) for row in rows if row[0] is not None]
            if not observed:
                # No value to derive a median from; leave the column untouched.
                continue

            median_result = self.find_median(observed)
            if np.isnan(median_result):
                # Every observed value was NaN; there is no usable median.
                continue

            # DataFrames are immutable: fillna returns a new frame that must
            # be kept, otherwise the imputation is silently lost.
            Xaux = Xaux.fillna(value=median_result, subset=[field])

        return Xaux

class LineDistincter(Estimator, Transformer):
    """Pipeline stage that removes duplicate rows from a DataFrame."""

    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self

    def _transform(self, X):
        """Return the distinct rows of ``X``."""
        deduplicated = X.distinct()
        return deduplicated

class LineDropper(Estimator, Transformer):
    """Pipeline stage that removes every row containing a null value."""

    def _fit(self, X, y = None):
        """Nothing is learned from the data; return this stage unchanged."""
        return self

    def _transform(self, X):
        """Return ``X`` without the rows that contain at least one null."""
        # DataFrame.dropna() is the alias of DataFrame.na.drop().
        complete_rows = X.dropna()
        return complete_rows

In [6]:
# Column groups. to_remove_col_names and to_recover_col_names feed the cleaning
# pipeline; to_encode_col_names is not referenced in the cells shown here.
to_encode_col_names = [
    'price', 'level', 'levels', 'rooms', 'area',
    'kitchen_area', 'object_type', 'building_type', 'id_region',
]
to_remove_col_names = [
    'date', 'house_id', 'street_id', 'postal_code', 'geo_lat', 'geo_lon',
]
to_recover_col_names = [
    'price', 'level', 'levels', 'rooms', 'area', 'kitchen_area', 'building_type',
]

###################################################
###                   Dataset                   ###
###################################################
hdfsurl="hdfs://10.84.128.47:9000"

spark = (
    SparkSession.builder
    .appName("House Prices")
    .getOrCreate()
)

# Semicolon-separated CSV with a header row; column types are inferred by Spark.
df = (
    spark.read
    .format("csv")
    .options(sep=";", inferSchema="true", header="true")
    .load(hdfsurl + "/grupo8/input_data_1000_lines.csv")
)
# The regression label must be a double regardless of the inferred type.
df = df.withColumn("price", df["price"].cast(DoubleType()))
# Fetches the first row; being mid-cell, its value is not displayed.
df.head()


###################################################
###                  Pipeline                   ###
###################################################
# > Stages:
#   $ - ColumnDropper: removes the columns listed in to_remove_col_names
#   $ - RecoveryDataTransformer: meant to impute missing values of
#       to_recover_col_names with the column median
#   $ - LineDistincter: removes duplicate rows
#   $ - LineDropper: removes rows that still contain nulls
#   $ - ColumnTransformer: recodes special values and casts object_type,
#       rooms and kitchen_area
cleaning_pipeline = Pipeline(stages=[
    ColumnDropper(to_remove_col_names),
    RecoveryDataTransformer(to_recover_col_names),
    LineDistincter(),
    LineDropper(),
    ColumnTransformer(),
])
df_clean = cleaning_pipeline.fit(df).transform(df)

# Data preparation for model training: every cleaned column except the label
# becomes part of the feature vector.
vectorAssembler = VectorAssembler(inputCols = df_clean.drop("price").columns, outputCol = 'features')
train_vector = vectorAssembler.transform(df_clean)

# A fixed seed keeps the 70/30 split (and therefore the fitted model)
# repeatable across kernel restarts.
SPLIT_SEED = 42
splits = train_vector.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
val = splits[1]

lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=100, regParam=0.8, elasticNetParam=0.1)
# Alternative model that was tried:
#rfr= RandomForestRegressor(featuresCol = 'features', labelCol = 'price', maxDepth = 3)

###################################################
###                  Training                   ###
###################################################
lr_model = lr.fit(train)

# Predictions on the held-out split (adds a 'prediction' column).
lr_predictions = lr_model.transform(val)
#lr_predictions.select("prediction","price").show(20)


In [None]:
###################################################
###            Streaming Preparation            ###
###################################################
kafkaServer='172.25.27.157:9092'

# Kafka Consumer (Topic: streaming-bd)
consumer = KafkaConsumer(
    'streaming-bd',
    bootstrap_servers=kafkaServer,
    auto_offset_reset='earliest'
)

def serializer(message):
    """Encode ``message`` as UTF-8 JSON bytes for the Kafka producer."""
    return json.dumps(message).encode('utf-8')

# Kafka Producer (Topic: streaming-feedback-bd)
producer = KafkaProducer(
    bootstrap_servers=kafkaServer,
    value_serializer=serializer
)

# The cleaning stages hold no per-message state, so the pipeline is built once
# instead of on every incoming message.
streaming_pipeline = Pipeline(stages=[ColumnDropper(to_remove_col_names), ColumnTransformer()])

###################################################
###               Streaming Loop                ###
###################################################
for message in consumer:
    try:
        payload = json.loads(message.value)
    except ValueError as decode_error:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. One malformed message must not stop the consumer.
        print('Skipping message that is not valid JSON: ' + str(decode_error))
        continue

    new_data = pd.json_normalize(payload)

    # Messages without an 'id' field are ignored.
    if 'id' not in new_data.columns:
        continue

    # Look fields up by name so the result does not depend on JSON key order.
    # If no 'price' key exists, fall back to the third column, which is where
    # the price was read from previously.
    price_column = 'price' if 'price' in new_data.columns else new_data.columns[2]
    listing_id = new_data['id'].to_string(index=False)
    atual_price = new_data[price_column].to_string(index=False)

    sparkDF = spark.createDataFrame(new_data)

    spark_clean = streaming_pipeline.fit(sparkDF).transform(sparkDF)
    spark_clean = spark_clean.drop('id')

    # Local name: the training cell's train_vector is left untouched.
    feature_vector = vectorAssembler.transform(spark_clean)

    predPrice = lr_model.transform(feature_vector)

    # 'prediction' is the default output column of the fitted model; reading
    # it by name avoids depending on the number of input columns.
    pred = predPrice.first()['prediction']
    # NOTE(review): the absolute value hides negative predictions; confirm
    # this is intended.
    pred = format(np.abs(pred), '.1f')

    feedback = 'ID: ' + listing_id

    # A listing priced below the model's estimate is reported as a good price.
    if float(pred) > float(atual_price):
        feedback += ' has a Good Price!'
    else:
        feedback += ' has a Bad Price!'

    feedback += ' House price: ' + atual_price + ' | Predition: ' + pred
    print(feedback)
    producer.send('streaming-feedback-bd', feedback)