In [45]:
import pyspark
from pyspark.sql import SparkSession

In [46]:
# Start (or reuse) the Spark session for this notebook.
# The "local[*]" master runs Spark inside this process on the local machine.
spark = (
    SparkSession.builder
    .appName("MiApp")
    .master("local[*]")
    .getOrCreate()
)

In [47]:
# Load house.csv: the first row supplies the column names and Spark infers
# each column's type from the data.
# NOTE(review): in the preview below, the inferred Time values carry a
# 2024-10-09 date while the Date column says 16/12/2006. It looks like schema
# inference turned a bare clock time into a full timestamp; confirm before
# relying on Time.
df = spark.read.csv("house.csv", header=True, inferSchema=True)

In [48]:
# Preview the first three rows to sanity-check the load.
df.show(n=3)

+---+----------+-------------------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|_c0|      Date|               Time|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|
+---+----------+-------------------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|  0|16/12/2006|2024-10-09 17:24:00|              4.216|                0.418|234.840|          18.400|         0.000|         1.000|          17.0|
|  1|16/12/2006|2024-10-09 17:25:00|              5.360|                0.436|233.630|          23.000|         0.000|         1.000|          16.0|
|  2|16/12/2006|2024-10-09 17:26:00|              5.374|                0.498|233.290|          23.000|         0.000|         2.000|          17.0|
+---+----------+-------------------+-------------------+---------------------+-------+----------------+---

In [49]:
# Discard every row that contains a null in any column.
# NOTE(review): the schema printed by the next cell shows most measurement
# columns are still strings at this point, so non-numeric placeholder text in
# those columns is not treated as null by this step; verify against the data.
df = df.dropna()

In [50]:
# Print column names and inferred types to check what inferSchema produced.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Global_active_power: string (nullable = true)
 |-- Global_reactive_power: string (nullable = true)
 |-- Voltage: string (nullable = true)
 |-- Global_intensity: string (nullable = true)
 |-- Sub_metering_1: string (nullable = true)
 |-- Sub_metering_2: string (nullable = true)
 |-- Sub_metering_3: double (nullable = true)



In [51]:
# Convert the measurement columns that were inferred as strings to double.
# Each converted column is published under "<name>_doub" and the original
# string column is removed; the remaining columns are kept as they are.

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

columns_to_doub = [
    "Global_active_power",
    "Global_reactive_power",
    "Voltage",
    "Global_intensity",
    "Sub_metering_1",
    "Sub_metering_2",
]

# Names of the converted columns, as used by the later cells.
converted_columns = [name + "_doub" for name in columns_to_doub]

# Build the result in a single projection instead of one withColumn per
# column. Only source columns that still exist are cast, so re-running this
# cell after the originals have been removed is a no-op rather than an error.
untouched = [F.col(name) for name in df.columns if name not in columns_to_doub]
casted = [
    F.col(name).cast(DoubleType()).alias(name + "_doub")
    for name in columns_to_doub
    if name in df.columns
]
df = df.select(*untouched, *casted)

# Text that is not a valid number becomes null when cast (unless ANSI mode
# turns it into an error). VectorAssembler rejects nulls by default, so drop
# rows with a null in any converted column before they reach the later cells.
df = df.na.drop(subset=converted_columns)

df.printSchema()



root
 |-- _c0: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Sub_metering_3: double (nullable = true)
 |-- Global_active_power_doub: double (nullable = true)
 |-- Global_reactive_power_doub: double (nullable = true)
 |-- Voltage_doub: double (nullable = true)
 |-- Global_intensity_doub: double (nullable = true)
 |-- Sub_metering_1_doub: double (nullable = true)
 |-- Sub_metering_2_doub: double (nullable = true)



### CORRELATION

In [52]:
from pyspark.ml.feature import VectorAssembler

# Columns that take part in the correlation matrix. Their order here is the
# order in which they are packed into the feature vector.
selected_features = [
    "Global_active_power_doub",
    "Global_reactive_power_doub",
    "Voltage_doub",
    "Global_intensity_doub",
    "Sub_metering_1_doub",
    "Sub_metering_2_doub",
    "Sub_metering_3",
]

# Pack the selected columns into one vector column named "features" and keep
# only that column for the correlation step.
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
numeric_only = df.select(selected_features)
df_vectorized = assembler.transform(numeric_only).select("features")


In [90]:
from pyspark.ml.stat import Correlation

# Correlation.corr returns a DataFrame; the matrix is the first cell of its
# first row.
corr_row = Correlation.corr(df_vectorized, "features", method="pearson").head()
pearsonCorr = corr_row[0]

In [95]:
# Display the Pearson matrix as an array; rows and columns follow the order
# of selected_features.
pearsonCorr.toArray()


array([[ 1.        ,  0.24701705, -0.39976161,  0.9988886 ,  0.48440128,
         0.43456872,  0.63855542],
       [ 0.24701705,  1.        , -0.11224557,  0.26612039,  0.12311057,
         0.13923089,  0.08961653],
       [-0.39976161, -0.11224557,  1.        , -0.41136307, -0.19597555,
        -0.16740476, -0.26817208],
       [ 0.9988886 ,  0.26612039, -0.41136307,  1.        ,  0.48929822,
         0.44034654,  0.62654275],
       [ 0.48440128,  0.12311057, -0.19597555,  0.48929822,  1.        ,
         0.05472086,  0.10257107],
       [ 0.43456872,  0.13923089, -0.16740476,  0.44034654,  0.05472086,
         1.        ,  0.08087205],
       [ 0.63855542,  0.08961653, -0.26817208,  0.62654275,  0.10257107,
         0.08087205,  1.        ]])

### Linear regression 

In [108]:

# Regression target.
dependent_var = "Voltage_doub"

# Predictors for the regression.
# NOTE(review): the correlation matrix above reports ~0.999 between
# Global_active_power_doub and Global_intensity_doub, so these two predictors
# are nearly collinear; consider whether both are needed.
independent_var = [
    "Global_active_power_doub",
    "Global_reactive_power_doub",
    "Global_intensity_doub",
    "Sub_metering_1_doub",
    "Sub_metering_2_doub",
    "Sub_metering_3",
]

# Pack the predictors into a single vector column named "independent".
featureasembler = VectorAssembler(
    inputCols=independent_var,
    outputCol="independent",
)
output = featureasembler.transform(df)

In [109]:
# Keep only what the regression needs: the feature vector and the label.
final_df = output.select(["independent", "Voltage_doub"])

In [138]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 split, and therefore the coefficients and metrics
# below, come out the same on every run instead of changing each time.
# (Previously recorded outputs were produced without a seed and will differ.)
SPLIT_SEED = 42
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator and the fitted model under separate names.
# `regressor` is the fitted model, which the later cells evaluate.
lr_estimator = LinearRegression(
    featuresCol="independent",
    labelCol="Voltage_doub",
    maxIter=10,
)
regressor = lr_estimator.fit(train_data)

# One coefficient per predictor, in the order the features were assembled.
regressor.coefficients

DenseVector([21.2816, 3.6219, -5.3393, 0.0105, 0.0173, -0.0497])

In [139]:
# Prediction statistics: score the held-out split with the fitted model, then
# summarise the scored rows.
pred_results = regressor.evaluate(test_data)
scored = pred_results.predictions
scored.describe().show()

+-------+------------------+------------------+
|summary|      Voltage_doub|        prediction|
+-------+------------------+------------------+
|  count|            511770|            511770|
|   mean|240.83951890497553|240.84040841507687|
| stddev|3.2441211136411416|1.6182189562357407|
|    min|            223.99|219.70258211909382|
|    max|            253.61|243.30580325415545|
+-------+------------------+------------------+



In [140]:
# Metrics: report the test-set summary figures, one per line.
for label, value in (
    ("R2 :", pred_results.r2),
    ("Mean absolute error: ", pred_results.meanAbsoluteError),
    ("Mean squared error :", pred_results.meanSquaredError),
):
    print(label, value)

R2 : 0.24950607839807137
Mean absolute error:  2.125165132884811
Mean squared error : 7.89842410628906
