In [1]:
from pyspark import SparkContext                                   
from pyspark.sql import SparkSession, Window, Row                 
from pyspark.sql import functions as F                             
from pyspark.sql.functions import col, isnan, when, count           
from pyspark.sql.functions import *                                
from pyspark.sql.types import *                                  

from pyspark.ml.feature import VectorAssembler                      
from pyspark.ml.regression import LinearRegression   

## **Initializing a SparkSession**

In [2]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("DataPreparation") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .getOrCreate()

# Read data from MinIO
parsed_df = spark.read.format("csv") \
    .option("header", "true") \
    .load("s3a://compteurbucket/compteurs.csv")


In [3]:
#printing the dataframe
parsed_df.show()

+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+
|           Timestamp|voltage|compteur_id|current|power_factor|consumption_KW| price|id_Machine|id_consumer|Nbr_Person|Nbr_machine|
+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+
|2023-05-24 12:58:...| 225.72|        996|     -6|        0.95|        794.15|959.02|       746|        698|       545|        218|
|2023-05-24 12:58:...| 220.14|        576|     -4|        0.99|        231.47|337.32|       440|        420|       189|        108|
|2023-05-24 12:58:...| 228.65|        955|      3|        0.83|        271.62| 202.1|       758|        142|       558|        388|
|2023-05-24 12:58:...| 226.94|        267|      1|        0.87|        218.09|593.35|       625|        670|       753|        453|
|2023-05-24 12:58:...| 232.79|        597|     -2|        0.85|        149.9

## **Data Pre-processing**

In [4]:
# Creating a dataframe to check null value counts
null_df = parsed_df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '') | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in parsed_df.columns])

# Displaying the null value counts dataframe
null_df.show()

+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+
|Timestamp|voltage|compteur_id|current|power_factor|consumption_KW|price|id_Machine|id_consumer|Nbr_Person|Nbr_machine|
+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+
|        0|      0|          0|    194|         194|             0|    0|         0|          0|         0|          0|
+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+



In [5]:
# Creating a dataframe to check negative value counts
null_df = parsed_df.select([count(when((col(c) == 'None') | \
                                      (col(c) == 'NULL') | \
                                      (col(c) == '') | \
                                      (col(c).isNull()) | \
                                      (isnan(col(c))) | \
                                      (col(c) < 0), c)).alias(c)
                           for c in parsed_df.columns])

# Displaying the negative value counts dataframe
null_df.show()

+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+
|Timestamp|voltage|compteur_id|current|power_factor|consumption_KW|price|id_Machine|id_consumer|Nbr_Person|Nbr_machine|
+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+
|        0|      0|          0|    567|         194|             0|    0|         0|          0|         0|          0|
+---------+-------+-----------+-------+------------+--------------+-----+----------+-----------+----------+-----------+



In [6]:

preprocessed_df =parsed_df.withColumn('power_factor', when(parsed_df['power_factor'] >= 0, parsed_df['power_factor']).otherwise(0)) \
    .withColumn('compteur_id', when(parsed_df['compteur_id'].isNull(), 0).otherwise(parsed_df['compteur_id'])) \
    .withColumn('voltage', when(parsed_df['voltage'].isNull(), 0).otherwise(parsed_df['voltage'])) \
    .withColumn('current', when(parsed_df['current'] >= 0, parsed_df['current']).otherwise(0)) \
    .withColumn('consumption_KW', when(parsed_df['consumption_KW'].isNull(), 0).otherwise(parsed_df['consumption_KW'])) \
    .withColumn('price', when(parsed_df['price'].isNull(), 0).otherwise(parsed_df['price'])) \
    .withColumn('id_Machine', when(parsed_df['id_Machine'].isNull(), 0).otherwise(parsed_df['id_Machine'])) \
    .withColumn('id_consumer', when(parsed_df['id_consumer'].isNull(), 0).otherwise(parsed_df['id_consumer'])) \
    .withColumn('Nbr_Person', when(parsed_df['Nbr_Person'].isNull(), 0).otherwise(parsed_df['Nbr_Person'])) \
    .withColumn('Nbr_machine', when(parsed_df['Nbr_machine'].isNull(), 0).otherwise(parsed_df['Nbr_machine']))
# Write data to MinIO

In [7]:
#printing the dataframe
preprocessed_df.show()

+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+
|           Timestamp|voltage|compteur_id|current|power_factor|consumption_KW| price|id_Machine|id_consumer|Nbr_Person|Nbr_machine|
+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+
|2023-05-24 12:58:...| 225.72|        996|      0|        0.95|        794.15|959.02|       746|        698|       545|        218|
|2023-05-24 12:58:...| 220.14|        576|      0|        0.99|        231.47|337.32|       440|        420|       189|        108|
|2023-05-24 12:58:...| 228.65|        955|      3|        0.83|        271.62| 202.1|       758|        142|       558|        388|
|2023-05-24 12:58:...| 226.94|        267|      1|        0.87|        218.09|593.35|       625|        670|       753|        453|
|2023-05-24 12:58:...| 232.79|        597|      0|        0.85|        149.9


# **Data Post-Processing**


- We wll use a **VectorAssembler**.

- VectorAssember from **Spark ML library** is a module that allows to convert **numerical features into a single vector** that is used by the machine learning models.

- It takes a **list of columns** (features) and **combines it** into a single **vector column** (feature vector).

- It is then used as an **input** into the **machine learning models** in Spark ML.

In [8]:
#This performs additional data type conversions on the DataFrame parsed_df
parsed_df = parsed_df.withColumn('consumption_KW', round(col('consumption_KW'), 2).cast(DoubleType()))
parsed_df = parsed_df.withColumn('price', round(col('price'), 2).cast(DoubleType()))
parsed_df = parsed_df.withColumn('Nbr_Person', col('Nbr_Person').cast(IntegerType()))
parsed_df = parsed_df.withColumn('Nbr_machine', col('Nbr_machine').cast(IntegerType()))

# Define the VectorAssembler
assembler = VectorAssembler(inputCols=['consumption_KW', 'price', 'Nbr_Person', 'Nbr_machine'],
                            outputCol='Independent')

- Using this assembler, we can transform the original dataset and take a look at the result:

In [9]:
output = assembler.transform(parsed_df)
output.show()

+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+--------------------+
|           Timestamp|voltage|compteur_id|current|power_factor|consumption_KW| price|id_Machine|id_consumer|Nbr_Person|Nbr_machine|         Independent|
+--------------------+-------+-----------+-------+------------+--------------+------+----------+-----------+----------+-----------+--------------------+
|2023-05-24 12:58:...| 225.72|        996|     -6|        0.95|        794.15|959.02|       746|        698|       545|        218|[794.15,959.02,54...|
|2023-05-24 12:58:...| 220.14|        576|     -4|        0.99|        231.47|337.32|       440|        420|       189|        108|[231.47,337.32,18...|
|2023-05-24 12:58:...| 228.65|        955|      3|        0.83|        271.62| 202.1|       758|        142|       558|        388|[271.62,202.1,558...|
|2023-05-24 12:58:...| 226.94|        267|      1|        0.87|        218.09|593.

- This DataFrame can now be used for training models available in Spark ML by passing `Independent` vector column as your **input variable** and `Seller_Type` as your **target variable**.

In [10]:
output.columns

['Timestamp',
 'voltage',
 'compteur_id',
 'current',
 'power_factor',
 'consumption_KW',
 'price',
 'id_Machine',
 'id_consumer',
 'Nbr_Person',
 'Nbr_machine',
 'Independent']

In [11]:
final_data = output.select("Independent", "Price")
final_data.show()

+--------------------+------+
|         Independent| Price|
+--------------------+------+
|[794.15,959.02,54...|959.02|
|[231.47,337.32,18...|337.32|
|[271.62,202.1,558...| 202.1|
|[218.09,593.35,75...|593.35|
|[149.93,697.82,72...|697.82|
|[269.2,316.38,968...|316.38|
|[796.25,291.41,47...|291.41|
|[621.05,134.95,72...|134.95|
|[930.95,617.27,73...|617.27|
|[360.87,515.0,491...| 515.0|
|[327.44,720.55,11...|720.55|
|[842.15,193.87,15...|193.87|
|[623.95,909.93,22...|909.93|
|[263.53,372.77,19...|372.77|
|[527.93,847.73,24...|847.73|
|[691.8,591.9,616....| 591.9|
|[251.97,175.25,50...|175.25|
|[697.94,686.71,83...|686.71|
|[667.08,129.58,48...|129.58|
|[115.34,335.66,61...|335.66|
+--------------------+------+
only showing top 20 rows



#### **Train Test Split**:

In [12]:
train_data, validation_data = final_data.randomSplit(weights=[0.7, 0.3], seed=49)

# **Linear Regression Using PySpark**

### **Model Initialization and Training**

In [13]:
# Initializing a Linear Regression model
ss = LinearRegression(featuresCol='Independent', labelCol='Price')

# Training the model
ss = ss.fit(train_data)

In [14]:
ss.intercept

1.5400415741225712e-13

In [15]:
ss.coefficients

DenseVector([0.0, 1.0, 0.0, -0.0])

In [16]:
pred_train = ss.evaluate(train_data)
pred_train.predictions.show()

+--------------------+------+------------------+
|         Independent| Price|        prediction|
+--------------------+------+------------------+
|[1.2,754.65,270.0...|754.65|            754.65|
|[1.26,483.65,565....|483.65| 483.6500000000001|
|[3.16,606.31,290....|606.31| 606.3099999999998|
|[4.51,794.91,707....|794.91| 794.9099999999999|
|[4.57,631.01,762....|631.01|            631.01|
|[4.81,111.71,308....|111.71|111.71000000000012|
|[5.51,571.43,896....|571.43| 571.4299999999998|
|[7.01,807.8,402.0...| 807.8| 807.7999999999998|
|[7.02,378.04,806....|378.04| 378.0400000000001|
|[8.37,628.89,351....|628.89| 628.8899999999999|
|[8.79,324.03,80.0...|324.03|            324.03|
|[9.32,564.03,299....|564.03|            564.03|
|[12.98,108.12,221...|108.12|108.12000000000012|
|[21.65,47.27,499....| 47.27|  47.2700000000001|
|[23.95,73.34,126....| 73.34| 73.34000000000015|
|[25.28,853.19,261...|853.19|            853.19|
|[26.66,813.09,197...|813.09|            813.09|
|[28.23,27.18,500...

### **Model Evaluation on Test Set**

In [17]:
pred = ss.evaluate(validation_data)
pred.predictions.show()

+--------------------+------+------------------+
|         Independent| Price|        prediction|
+--------------------+------+------------------+
|[2.53,850.95,224....|850.95|            850.95|
|[9.66,943.01,847....|943.01|            943.01|
|[17.75,731.51,894...|731.51| 731.5099999999999|
|[18.44,918.3,235....| 918.3|             918.3|
|[20.03,819.09,530...|819.09| 819.0899999999999|
|[22.0,840.67,654....|840.67| 840.6699999999998|
|[25.12,565.54,250...|565.54|            565.54|
|[25.77,279.02,382...|279.02|            279.02|
|[27.16,843.26,629...|843.26| 843.2599999999999|
|[34.75,913.23,474...|913.23|            913.23|
|[40.87,295.07,514...|295.07| 295.0700000000001|
|[47.54,270.62,445...|270.62|270.62000000000006|
|[48.33,965.46,596...|965.46|            965.46|
|[53.02,320.0,330....| 320.0|320.00000000000006|
|[57.25,695.7,575....| 695.7| 695.6999999999999|
|[60.06,27.48,275....| 27.48| 27.48000000000015|
|[64.4,948.44,169....|948.44|            948.44|
|[65.76,469.96,367..

In [18]:
preprocessed_df.write.format("csv") \
    .mode("overwrite") \
    .save("s3a://compteurbucket/output-data-ML.csv")

# Stop the Spark session
spark.stop()

- We used **VectorAssembler** for **preparing our data** for machine learning model.

- This was proceeded by a **linear regression** training and evaluation which observed **a good fit** of the model with the current constraints of data.