
# **1. Setting Up the Environment**

In [None]:
!pip install pyspark



In [None]:
import pyspark

# **2. Loading the Dataset**

* The dataset used is California Housing Prices (from Kaggle).
* It is loaded into a PySpark DataFrame for distributed processing:

In [3]:
from pyspark.sql import SparkSession

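# Create (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName("CaliforniaHousing").getOrCreate()
# Assumes housing.csv is in the working directory; header=True reads column
# names from the first row, inferSchema=True detects the column types
df = spark.read.csv("housing.csv", header=True, inferSchema=True)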

In [4]:
df.printSchema()
df.show(5)

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR B

# **3. Data Preprocessing**

* Handling missing values (every row with a null or NaN value is dropped):



In [5]:
df = df.dropna()
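With its defaults (`how="any"`, no `subset`), `dropna()` removes every row in which at least one column is null or NaN. The plain-Python sketch below mimics that rule on a list of dicts; `is_missing`, `drop_incomplete_rows`, and the last two sample rows are hypothetical, not taken from the dataset:

```python
import math
from typing import Any, Dict, Iterable, List, Optional, Sequence


def is_missing(value: Any) -> bool:
    # Spark's dropna treats both null (None here) and floating-point NaN as missing
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_incomplete_rows(
    rows: Iterable[Dict[str, Any]], subset: Optional[Sequence[str]] = None
) -> List[Dict[str, Any]]:
    """Keep only rows whose checked columns are all present (like how="any")."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        if not any(is_missing(row[c]) for c in columns):
            kept.append(row)
    return kept


rows = [
    {"total_rooms": 880.0, "total_bedrooms": 129.0},
    {"total_rooms": 1500.0, "total_bedrooms": None},          # hypothetical null
    {"total_rooms": 2000.0, "total_bedrooms": float("nan")},  # hypothetical NaN
]
print(len(drop_incomplete_rows(rows)))  # 1
```

Passing `subset` limits which columns are checked; the PySpark counterpart would be `df.dropna(subset=["total_bedrooms"])`.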

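* Selecting the eight numeric feature columns (the categorical `ocean_proximity` is left out) and combining them into a single `features` vector column; the target, `median_house_value`, stays in its own column: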

In [6]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
 'total_bedrooms', 'population', 'households', 'median_income']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_transformed = assembler.transform(df)
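Conceptually, `VectorAssembler` concatenates the listed input columns, in order, into one vector per row. A plain-Python sketch of that per-row step, using the first row printed by `df.show(5)`; `assemble_row` is a hypothetical helper. The real transformer may also emit sparse vectors, and by default (`handleInvalid="error"`) it fails on null values, which is one reason missing values are dropped first:

```python
from typing import Dict, List, Optional, Sequence

feature_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                'total_bedrooms', 'population', 'households', 'median_income']


def assemble_row(row: Dict[str, Optional[float]], input_cols: Sequence[str]) -> List[float]:
    """Concatenate the chosen columns, in the given order, into one feature vector."""
    values = []
    for col in input_cols:
        value = row[col]
        if value is None:
            # Mirrors handleInvalid="error": refuse to build a vector with a gap
            raise ValueError(f"null value in column {col!r}")
        values.append(float(value))
    return values


# First row from df.show(5); the label is present but is not a feature
first_row = {
    "longitude": -122.23, "latitude": 37.88, "housing_median_age": 41.0,
    "total_rooms": 880.0, "total_bedrooms": 129.0, "population": 322.0,
    "households": 126.0, "median_income": 8.3252, "median_house_value": 452600.0,
}
print(assemble_row(first_row, feature_cols))
# [-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252]
```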

# **4. Splitting Data for Training and Testing**
* The dataset is randomly split into training (80%) and test (20%) sets, with a fixed seed so the split can be repeated:



In [7]:
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)
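`randomSplit` normalizes the weights and assigns each row to a split at random, so the 80/20 proportions hold only approximately and the exact row counts vary. The sketch below illustrates that idea in plain Python with `random.Random`; `random_split` is a hypothetical helper and a simplification, since Spark samples per partition and its actual assignment also depends on how the data is partitioned:

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.8, 0.2] -> [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # seeded, so the same input gives the same split
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guards against rounding leaving the last bound < 1.0
    return splits


rows = list(range(10_000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 8000 / 2000, but not exactly
```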

# **5. Building a Machine Learning Model**
* Using Linear Regression from PySpark MLlib:

In [8]:
from pyspark.ml.regression import LinearRegression

In [9]:
lr = LinearRegression(featuresCol="features", labelCol="median_house_value")
model = lr.fit(train_data)
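With its default `regParam=0.0`, `LinearRegression` fits ordinary least squares: it chooses an intercept and one coefficient per feature that minimize the squared error on the training rows, and `model.transform` then predicts `intercept + coefficients · features`. A plain-Python sketch of the one-feature case, which has a closed-form solution; the data is synthetic (y = 2x + 1) and the helper names are hypothetical. For the eight-feature model above, the fitted values can be inspected through `model.coefficients` and `model.intercept`:

```python
from typing import Sequence, Tuple


def fit_simple_ols(xs: Sequence[float], ys: Sequence[float]) -> Tuple[float, float]:
    """Least-squares slope and intercept for y approx. intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # slope = cov(x, y) / var(x); the 1/n factors cancel
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


def predict(x: float, slope: float, intercept: float) -> float:
    # Same form as the model's prediction, restricted to one feature
    return intercept + slope * x


xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]  # synthetic: exactly y = 2x + 1
slope, intercept = fit_simple_ols(xs, ys)
print(slope, intercept)                # 2.0 1.0
print(predict(5.0, slope, intercept))  # 11.0
```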

# **6. Evaluating the Model**
* Generating predictions on the test data:

In [10]:
predictions = model.transform(test_data)
predictions.select("median_house_value", "prediction").show(15)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|102138.32319061086|
|          106700.0|190038.23036733456|
|           73200.0| 77197.94015233312|
|           90100.0| 165461.8792793178|
|           67000.0|120982.11327198846|
|           86400.0|  155596.987874303|
|           70500.0|128682.11129985936|
|           85100.0| 148049.0329841054|
|           80500.0| 148573.9335442488|
|           96000.0|  133456.888130059|
|           75500.0| 97768.32878634613|
|           75000.0| 50460.39456075849|
|          100600.0|156059.88453309098|
|           74100.0|122045.03343804413|
|           66800.0| 99964.70208527287|
+------------------+------------------+
only showing top 15 rows
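One way to read this table is through the residuals, `median_house_value - prediction`. The sketch below copies the 15 displayed rows into plain Python and counts how many are over-predicted; it describes only these rows (all homes valued under $110,000), not the full test set. In PySpark the same column could be added with `predictions.withColumn("residual", F.col("median_house_value") - F.col("prediction"))`, where `F` is `pyspark.sql.functions`:

```python
# (label, prediction) pairs copied from the predictions.show(15) output above
rows = [
    (103600.0, 102138.32319061086),
    (106700.0, 190038.23036733456),
    (73200.0, 77197.94015233312),
    (90100.0, 165461.8792793178),
    (67000.0, 120982.11327198846),
    (86400.0, 155596.987874303),
    (70500.0, 128682.11129985936),
    (85100.0, 148049.0329841054),
    (80500.0, 148573.9335442488),
    (96000.0, 133456.888130059),
    (75500.0, 97768.32878634613),
    (75000.0, 50460.39456075849),
    (100600.0, 156059.88453309098),
    (74100.0, 122045.03343804413),
    (66800.0, 99964.70208527287),
]

# Residual = actual - predicted; a negative residual means the model over-predicted
residuals = [label - pred for label, pred in rows]
over = sum(1 for r in residuals if r < 0)
print(f"{over} of {len(rows)} rows over-predicted")        # 13 of 15 rows over-predicted
print(f"largest over-prediction: {-min(residuals):,.0f}")  # 83,338
```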


* Checking model performance with the root mean squared error (RMSE):

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="median_house_value",
predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 69685.92905618671
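RMSE is the square root of the mean squared difference between label and prediction, so it is in the same units as `median_house_value` (dollars): the value above means test-set predictions are off by roughly $70,000 in this root-mean-square sense. `RegressionEvaluator` also accepts other `metricName` values such as `"mae"` and `"r2"`. The plain-Python sketch below spells out those three formulas on a small hypothetical example (the numbers are illustrative, not from the dataset):

```python
import math
from typing import Sequence


def rmse(y: Sequence[float], y_hat: Sequence[float]) -> float:
    # sqrt( (1/n) * sum (y_i - y_hat_i)^2 )
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(y, y_hat)) / len(y))


def mae(y: Sequence[float], y_hat: Sequence[float]) -> float:
    # (1/n) * sum |y_i - y_hat_i|
    return sum(abs(a - b) for a, b in zip(y, y_hat)) / len(y)


def r2(y: Sequence[float], y_hat: Sequence[float]) -> float:
    # 1 - SS_res / SS_tot, where SS_tot is measured around the mean label
    mean_y = sum(y) / len(y)
    ss_res = sum((a - b) ** 2 for a, b in zip(y, y_hat))
    ss_tot = sum((a - mean_y) ** 2 for a in y)
    return 1.0 - ss_res / ss_tot


labels = [100_000.0, 200_000.0, 300_000.0]  # hypothetical house values
preds = [110_000.0, 190_000.0, 330_000.0]   # hypothetical predictions
print(f"RMSE: {rmse(labels, preds):,.0f}")  # 19,149
print(f"MAE:  {mae(labels, preds):,.0f}")   # 16,667
print(f"R2:   {r2(labels, preds):.3f}")     # 0.945
```

RMSE is never smaller than MAE; the gap grows when a few predictions miss by a lot, because squaring weights large errors more heavily.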
