<a href="https://colab.research.google.com/github/Akshata4/data_engineering/blob/main/pyspark/PySpark_Working_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Print the installed Java version via the shell. Presumably a sanity check
# that a Java runtime is available before Spark is started.
!java -version

openjdk version "11.0.26" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26+4-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.26+4-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


In [None]:
# Call findspark.init() with no arguments before the pyspark imports in the
# cells below. Presumably this locates the Spark installation and makes
# pyspark importable -- verify against the findspark documentation.
import findspark
findspark.init()

In [None]:
# Make /content the working directory: the dataset is later downloaded into,
# and read from, the current directory using a relative file name.
%cd /content/

/content


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Build the SparkSession used by every later cell, with the master URL set to
# "local[*]"; getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
!wget https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/refs/heads/master/BostonHousing.csv

--2025-04-28 05:31:44--  https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/refs/heads/master/BostonHousing.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv’


2025-04-28 05:31:44 (3.34 MB/s) - ‘BostonHousing.csv’ saved [35735/35735]



In [None]:
# Load the downloaded CSV: the first row supplies the column names and the
# column types are inferred from the data rather than read as strings.
csv_path = 'BostonHousing.csv'
dataset = spark.read.csv(csv_path, header=True, inferSchema=True)

# Print the inferred schema so the column names and types can be checked.
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [None]:
# Every column except the target 'medv' is used as a model input.
feature_columns = [
    'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
    'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat',
]

# Pack the individual feature columns into a single vector column, 'Attributes'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='Attributes')
output = assembler.transform(dataset)

# Keep only the model input (feature vector) and the target column.
finalized_data = output.select("Attributes", "medv")
finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



In [None]:
# Split into training (80%) and test (20%) sets. The fixed seed makes the split
# -- and therefore the fitted model and every metric derived from it --
# repeatable across Restart & Run All instead of changing on each execution.
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=42)

# Unfitted estimator: linear regression of 'medv' on the 'Attributes' vector.
# It gets its own name so that `regressor` always refers to the fitted model.
lr_estimator = LinearRegression(featuresCol='Attributes', labelCol='medv')

# Fit on the training set; `regressor` is the fitted model used by later cells.
regressor = lr_estimator.fit(train_data)

# Evaluate the fitted model on the held-out test set.
pred = regressor.evaluate(test_data)

# Show the test rows alongside the model's predictions.
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.00632,18.0,2.3...|24.0|29.904650666609765|
|[0.01301,35.0,1.5...|32.7|30.203349615990472|
|[0.01311,90.0,1.2...|35.4|31.070544103588183|
|[0.01501,80.0,2.0...|24.5| 27.28099638851585|
|[0.01501,90.0,1.2...|50.0|44.454123661919155|
|[0.01538,90.0,3.7...|44.0| 36.99040555914256|
|[0.01951,17.5,1.3...|33.0| 23.34436048725287|
|[0.02498,0.0,1.89...|16.5|22.300349403443565|
|[0.0351,95.0,2.68...|48.5|  41.7235728930852|
|[0.03578,20.0,3.3...|45.4|38.407271625637364|
|[0.03615,80.0,4.9...|27.9| 31.76486875105664|
|[0.03932,0.0,3.41...|22.0|27.455318187555182|
|[0.04203,28.0,15....|22.9|28.473308840385805|
|[0.04462,25.0,4.8...|23.9| 26.63151640124533|
|[0.04527,0.0,11.9...|20.6|22.712158033154786|
|[0.04741,0.0,11.9...|11.9|22.490913456322026|
|[0.04932,33.0,2.1...|28.2| 32.65470919141108|
|[0.05059,0.0,4.49...|23.9| 25.06777349412058|
|[0.05083,0.0

In [None]:
# Pull the fitted parameters off the model: the coefficient vector (one entry
# per input feature) and the single intercept term.
coeff = regressor.coefficients
intr = regressor.intercept

# Explicit one-element tuples make the operand of `%` unambiguous.
print("The coefficient of the model is : %a" % (coeff,))
print("The Intercept of the model is : %f" % (intr,))

The coefficient of the model is : DenseVector([-0.1146, 0.0417, -0.0283, 2.7084, -13.675, 4.1931, -0.0154, -1.5275, 0.2733, -0.0119, -0.832, 0.0097, -0.4597])
The Intercept of the model is : 30.675181


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` rather than `eval` so Python's built-in eval() is not
# shadowed for the rest of the kernel session.
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error (the evaluator's configured metric)
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# The remaining metrics reuse the same evaluator, overriding metricName for a
# single call through the param map passed as the second argument.

# Mean Square Error
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 5.345
MSE: 28.571
MAE: 3.796
r2: 0.727
