The business case for using the Boston housing dataset with MLlib (Spark's machine learning library) is to predict the median value of owner-occupied homes (MEDV) in Boston suburbs from features of the homes and their neighborhoods.

Each column is described below:

CRIM -> Per capita crime rate by town.
ZN -> Proportion of residential land zoned for lots over 25,000 sq. ft.
INDUS -> Proportion of non-retail business acres per town.
CHAS -> Charles River dummy variable: 1 if the tract bounds the river, 0 otherwise.
NOX -> Nitric oxide concentration (parts per 10 million), a measure of air pollution.
RM -> Average number of rooms per dwelling.
AGE -> Proportion of owner-occupied units built before 1940, reflecting the age of the housing stock.
DIS -> Weighted distance to five Boston employment centers.
RAD -> Index of accessibility to radial highways.
TAX -> Full-value property tax rate per $10,000.
PTRATIO -> Pupil-teacher ratio by town.
B -> 1000(Bk - 0.63)^2, where Bk is the proportion of Black residents by town (this column is named black in the CSV file).
LSTAT -> Percentage of the population with lower socioeconomic status.
MEDV -> The target variable: median value of owner-occupied homes, in $1000s.


In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     317.3/317.3 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=835b73c1bf6541b9895921b79325f8bec757ce1a8905067065d0db420acd08f9
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


"pip install pyspark" is used to install the PySpark library using the Python package manager pip.

In [2]:
import os
# Spark runs on the JVM: tell PySpark which Java installation to launch
os.environ["JAVA_HOME"] = "/lib/jvm/java-11-openjdk-amd64"

The 'os' module provides an interface to the operating system. It covers file-system access, the process environment, and more.

'os.environ' is a dictionary-like mapping of the current process's environment variables. Setting JAVA_HOME tells PySpark which Java installation to use when it starts the JVM that Spark runs on. It must therefore be set before the SparkContext is created.
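A wrong JAVA_HOME usually surfaces only as an error when the SparkContext starts. The sketch below checks the path beforehand.

- java_home_is_valid is a hypothetical helper. It is not part of PySpark or the os module.
- It assumes a Linux-style JDK layout, with the launcher at bin/java.

```python
import os


def java_home_is_valid(path):
    """Return True if `path` looks like a Java home directory.

    Hypothetical helper: assumes a Linux-style JDK/JRE layout where the
    launcher lives at <path>/bin/java.
    """
    java_bin = os.path.join(path, "bin", "java")
    # The launcher must exist and be executable by the current user
    return os.path.isfile(java_bin) and os.access(java_bin, os.X_OK)


if __name__ == "__main__":
    home = os.environ.get("JAVA_HOME", "")
    if not java_home_is_valid(home):
        print("JAVA_HOME does not point to a Java installation:", repr(home))
```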

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

SparkConf: A class for configuring a Spark application. It sets parameters such as the application name, the master URL, and other settings.

SparkContext: The main entry point for Spark functionality. It represents the connection to a Spark cluster and is used to create RDDs (Resilient Distributed Datasets) and run operations on them.

SQLContext: The legacy entry point for DataFrames and SQL, used below to read the CSV file. Since Spark 2.0 it has been superseded by SparkSession, and it is deprecated in Spark 3.x, although it still works.


In [4]:
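# Run Spark locally with 2 threads and serve the Spark UI on port 4050
conf = SparkConf().set('spark.ui.port', '4050').setAppName("films").setMaster("local[2]")
# Reuse a running SparkContext if there is one, otherwise create it from conf
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
# sc.stop()  # uncomment to shut the context down when finished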



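SparkConf(): Creates a new SparkConf object, which holds the configuration settings for the Spark application.

.set('spark.ui.port', '4050'): Serves the Spark UI on port 4050 instead of the default 4040. The Spark UI is a web interface for monitoring Spark jobs.

.setAppName("films") and .setMaster("local[2]"): Name the application (the name appears in the Spark UI) and run Spark locally with two worker threads.

SparkContext.getOrCreate(conf=conf): Returns the active SparkContext if one exists, otherwise creates one from conf. SQLContext(sc) then wraps it so DataFrames can be read.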



In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
# Spark 2+ has a built-in CSV reader; 'com.databricks.spark.csv' survives only as an alias for it
house_df = sqlContext.read.csv('/content/drive/MyDrive/SME@DM/BDF/Boston.csv',
                               header=True, inferSchema=True)
house_df.show()

+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|_c0|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|
+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|  1|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|  2|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|  3|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|  4|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|  5|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|  6|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|  7|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|  8|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|  9|0.21124|12.5| 7.

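The code reads the CSV file into a Spark DataFrame.

- The header option takes the column names from the first line of the file.
- The inferSchema option makes an extra pass over the data to infer each column's type. Without it, every column would be read as a string.
- The first column has no name in the file's header, so Spark assigns the default name _c0. It is only a row number and is not used as a feature.

show() then prints the first 20 rows as a preview of the data.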
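To illustrate what the inferSchema option does, here is a deliberately simplified pure-Python sketch. It is not Spark's actual algorithm, which considers more types such as long, timestamp and boolean.

Each column takes the narrowest type that fits its values. It is widened from integer to double to string as later values require. The sample values come from the first two rows shown above.

```python
# Widening order: a column's type can only move to the right
ORDER = ["integer", "double", "string"]


def infer_value_type(text):
    """Return the narrowest of 'integer', 'double', 'string' that parses `text`."""
    try:
        int(text)
        return "integer"
    except ValueError:
        pass
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"


def infer_schema(header, rows):
    """Infer one type per column by widening across all rows."""
    types = [None] * len(header)
    for row in rows:
        for i, text in enumerate(row):
            t = infer_value_type(text)
            if types[i] is None or ORDER.index(t) > ORDER.index(types[i]):
                types[i] = t
    return list(zip(header, types))


header = ["_c0", "crim", "chas", "rad", "medv"]
rows = [["1", "0.00632", "0", "1", "24.0"],
        ["2", "0.02731", "0", "2", "21.6"]]
print(infer_schema(header, rows))
```

For these columns the result agrees with the printSchema() output below: _c0, chas and rad are integers, and crim and medv are doubles.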

In [9]:
## Printing schema
house_df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [10]:
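## Descriptive analysis
# toPandas() collects every row to the driver as a pandas DataFrame; fine for a small table like this one
df = house_df.toPandas()
df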

     _c0     crim    zn  indus  chas    nox     rm   age     dis  rad  tax  ptratio   black  lstat  medv
0      1  0.00632  18.0   2.31     0  0.538  6.575  65.2  4.0900    1  296     15.3  396.90   4.98  24.0
1      2  0.02731   0.0   7.07     0  0.469  6.421  78.9  4.9671    2  242     17.8  396.90   9.14  21.6
2      3  0.02729   0.0   7.07     0  0.469  7.185  61.1  4.9671    2  242     17.8  392.83   4.03  34.7
3      4  0.03237   0.0   2.18     0  0.458  6.998  45.8  6.0622    3  222     18.7  394.63   2.94  33.4
4      5  0.06905   0.0   2.18     0  0.458  7.147  54.2  6.0622    3  222     18.7  396.90   5.33  36.2
..   ...      ...   ...    ...   ...    ...    ...   ...     ...  ...  ...      ...     ...    ...   ...
501  502  0.06263   0.0  11.93     0  0.573  6.593  69.1  2.4786    1  273     21.0  391.99   9.67  22.4
502  503  0.04527   0.0  11.93     0  0.573  6.120  76.7  2.2875    1  273     21.0  396.90   9.08  20.6
503  504  0.06076   0.0  11.93     0  0.573  6.976  91.0  2.1675    1  273     21.0  396.90   5.64  23.9
504  505  0.10959   0.0  11.93     0  0.573  6.794  89.3  2.3889    1  273     21.0  393.45   6.48  22.0


In [11]:
type(df)

In [12]:
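from pyspark.ml.feature import VectorAssembler
# Combine the 13 predictors (everything except the _c0 row index and the medv label) into one vector column
vectorAssembler = VectorAssembler(
    inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
               'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'],
    outputCol='features')
vhouse_df = vectorAssembler.transform(house_df)
# Keep only the feature vector and the label
vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)  # show() prints the rows and returns None, so nothing needs to be assigned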

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows



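VectorAssembler: Combines the 13 feature columns into a single vector column, features, which is the input format MLlib estimators expect. The _c0 row index and the medv label are deliberately left out.

Transform: Applies the assembler to the DataFrame, appending the features column.

Select: Keeps only the features and medv columns.

Show: Displays the first 3 rows of the resulting DataFrame as a preview.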
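Conceptually, VectorAssembler concatenates the listed columns of each row, in inputCols order, into one numeric vector. Below is a minimal pure-Python sketch of that idea, with these simplifications:

- Each row is a plain dict, and the result is a list of floats. Spark uses Row objects and vector types, and may store the result as a sparse vector.
- assemble is a hypothetical helper.
- The row values are the first row of the data shown above.

```python
INPUT_COLS = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
              'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat']


def assemble(row, input_cols=INPUT_COLS):
    """Return the feature vector for one row: its input columns as floats, in order."""
    return [float(row[c]) for c in input_cols]


# First row of the dataset; _c0 and medv are present but not assembled
row = {'_c0': 1, 'crim': 0.00632, 'zn': 18.0, 'indus': 2.31, 'chas': 0,
       'nox': 0.538, 'rm': 6.575, 'age': 65.2, 'dis': 4.09, 'rad': 1,
       'tax': 296, 'ptratio': 15.3, 'black': 396.9, 'lstat': 4.98, 'medv': 24.0}

features = assemble(row)
print(features[:3])  # [0.00632, 18.0, 2.31], matching the start of the first features vector
```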

In [13]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
#train_df,test_df=vhouse_df.randomSplit([0.7,0.3])

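Random Split: Divides the DataFrame into training and test sets with weights 0.7 and 0.3.

- Each row is assigned at random, so the set sizes are only approximately 70% and 30%.
- No seed is passed, so the split, and every metric below, changes from run to run.
- Calling randomSplit([0.7, 0.3], seed=42) would make the split reproducible.

train_df: Roughly 70% of the rows, used to train the models.

test_df: Roughly 30% of the rows, held out to evaluate the models.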
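The proportions are only approximate because each row is assigned to a split independently at random. The pure-Python sketch below illustrates this and shows that fixing the seed makes the assignment reproducible.

random_split is a hypothetical helper. Spark's randomSplit is organized differently: it samples partition by partition.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(506), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 70% / 30% of the 506 rows, rarely exactly
```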

In [15]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='medv', maxIter=10)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.08576559038365843,0.05583354691529264,0.002950680839091643,1.8838603098446742,-14.003135975765161,3.2534792316909993,0.002708247744231599,-1.6194946794409517,0.3508543393564254,-0.017730329429780388,-1.0147881584165894,0.008262789796814083,-0.5772866271116525]
Intercept: 42.463522473909215


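LinearRegression: Configures a least-squares linear regression with features as the input column and medv as the label. maxIter caps the number of iterations of the iterative solver.

Fit: Trains the model on the training dataset.

Print: Outputs the 13 coefficients (one per feature, in the same order as inputCols) and the intercept. Together they define the linear relationship between the features and the target variable.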
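A fitted linear model predicts the intercept plus the sum of coefficient × feature. As a check, the sketch below applies the coefficients and intercept printed above to the first row of the dataset.

- The result matches the prediction Spark reports for that row in the test-set output below (30.866615531715723), up to floating-point rounding.
- The numbers belong to this particular unseeded training run only.

```python
# Coefficients and intercept printed by lr_model above (this run only)
coefficients = [-0.08576559038365843, 0.05583354691529264, 0.002950680839091643,
                1.8838603098446742, -14.003135975765161, 3.2534792316909993,
                0.002708247744231599, -1.6194946794409517, 0.3508543393564254,
                -0.017730329429780388, -1.0147881584165894, 0.008262789796814083,
                -0.5772866271116525]
intercept = 42.463522473909215

# First row: crim, zn, indus, chas, nox, rm, age, dis, rad, tax, ptratio, black, lstat
x = [0.00632, 18.0, 2.31, 0, 0.538, 6.575, 65.2, 4.09, 1, 296, 15.3, 396.9, 4.98]


def predict(coefs, b, features):
    """Linear model: intercept plus the dot product of coefficients and features."""
    return b + sum(c * v for c, v in zip(coefs, features))


print(predict(coefficients, intercept, x))  # about 30.8666; the actual medv is 24.0
```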

In [16]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.640095
r2: 0.747581


In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "medv", "features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="medv", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|30.866615531715723|24.0|[0.00632,18.0,2.3...|
| 32.07567114486248|29.1|[0.01439,60.0,2.9...|
| 25.25087367927878|23.1|[0.0187,85.0,4.15...|
| 32.45111254102451|31.1|[0.02187,60.0,2.9...|
|27.889120460067645|23.9|[0.02543,55.0,3.7...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.708224


Predict: Generates predictions for the test dataset using the trained model.

Display: Shows the predictions alongside the actual values and features.

Evaluate: Computes the R-squared metric on the test data. The test value (0.708) is a little below the training r2 (0.748), as is typical for data the model has not seen.
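RegressionEvaluator's "rmse" and "r2" metrics, like the RMSE and r2 in the training summary, follow the standard definitions:

- RMSE is the square root of the mean squared error.
- R² = 1 − SS_res / SS_tot. SS_res is the sum of squared residuals, and SS_tot is the sum of squared deviations of the labels from their mean.

Below is a minimal pure-Python sketch of both, checked on a toy example. The function names are illustrative, not Spark APIs.

```python
import math


def rmse(labels, preds):
    """Root mean squared error."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / n)


def r2(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Toy check: always predicting the mean label (2.0)
print(rmse([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))  # sqrt(2/3), about 0.8165
print(r2([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))    # 0.0
```

An R² of 0 means the model does no better than always predicting the mean label. R² can even be negative for a model that does worse than that.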

In [18]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()


numIterations: 0
objectiveHistory: [0.0]
+--------------------+
|           residuals|
+--------------------+
|   0.520627338083159|
|  -5.355723948877202|
|   2.495627747378471|
|  3.9153156134731404|
|    4.48776028044702|
|    8.60655292352051|
|  -2.355858289034572|
|  -3.457526230440724|
|   5.443815193055848|
|    5.94805615416481|
|   4.233651884667875|
|   2.636126874123846|
|  10.096422731862287|
|-0.04403452752777426|
|   6.037654468153008|
| -0.2977201546108148|
|   5.304375239690273|
|  -4.965709545617379|
| -1.4217397217718037|
|   5.237813583558129|
+--------------------+
only showing top 20 rows



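numIterations: The number of iterations the solver ran.

- It is 0, and objectiveHistory is [0.0], because these values are only populated by the iterative "l-bfgs" solver.
- With the default solver="auto" and only 13 features, Spark instead uses its "normal" solver, which solves the least-squares problem directly.
- As a result, maxIter=10 has no effect here.

objectiveHistory: The value of the objective function (scaled loss plus regularization) at each iteration of the "l-bfgs" solver.

residuals: The residuals (label minus prediction) on the training data, useful for checking the model fit.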
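The numIterations: 0 output is consistent with a closed-form fit. Least squares can be solved without iterating over the data, by solving the normal equations (AᵀA)β = Aᵀy. Here A is the feature matrix with a leading column of ones, so β[0] is the intercept.

The pure-Python sketch below does this with Gaussian elimination.

- It illustrates the idea only. Spark's solver factorizes the system differently and also handles weights and regularization.
- The data is made up and lies exactly on y = 1 + 2x.

```python
def solve_normal_equations(X, y):
    """Least-squares fit with an intercept: solve (A^T A) beta = A^T y.

    A is X with a leading column of ones, so beta[0] is the intercept.
    Uses Gaussian elimination with partial pivoting; no iterative optimization.
    """
    A = [[1.0] + [float(v) for v in row] for row in X]
    k = len(A[0])
    # Build the k x k system M beta = r, with M = A^T A and r = A^T y
    M = [[sum(a[i] * a[j] for a in A) for j in range(k)] for i in range(k)]
    r = [sum(a[i] * t for a, t in zip(A, y)) for i in range(k)]
    # Forward elimination with partial pivoting
    for col in range(k):
        pivot = max(range(col, k), key=lambda i: abs(M[i][col]))
        M[col], M[pivot] = M[pivot], M[col]
        r[col], r[pivot] = r[pivot], r[col]
        for i in range(col + 1, k):
            f = M[i][col] / M[col][col]
            for j in range(col, k):
                M[i][j] -= f * M[col][j]
            r[i] -= f * r[col]
    # Back substitution
    beta = [0.0] * k
    for i in range(k - 1, -1, -1):
        s = sum(M[i][j] * beta[j] for j in range(i + 1, k))
        beta[i] = (r[i] - s) / M[i][i]
    return beta


# Made-up data on the line y = 1 + 2x
X = [[0.0], [1.0], [2.0], [3.0]]
y = [1.0, 3.0, 5.0, 7.0]
print(solve_normal_equations(X, y))  # approximately [1.0, 2.0]
```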

In [19]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|30.866615531715723|24.0|[0.00632,18.0,2.3...|
| 32.07567114486248|29.1|[0.01439,60.0,2.9...|
| 25.25087367927878|23.1|[0.0187,85.0,4.15...|
| 32.45111254102451|31.1|[0.02187,60.0,2.9...|
|27.889120460067645|23.9|[0.02543,55.0,3.7...|
|30.679249011980616|34.7|[0.02729,0.0,7.07...|
|25.325490863436695|21.6|[0.02731,0.0,7.07...|
|29.756966950368323|25.0|[0.02875,28.0,15....|
|15.845315284805256|17.5|[0.03113,0.0,4.39...|
|  22.9293715366828|20.6|[0.03306,0.0,5.19...|
|29.837795228233936|24.1|[0.03445,82.5,2.0...|
| 29.03989247064443|22.0|[0.03537,34.0,6.0...|
| 25.38217008447745|22.9|[0.03551,25.0,4.8...|
|39.232649840480676|45.4|[0.03578,20.0,3.3...|
| 34.70796807534117|34.6|[0.03768,80.0,1.5...|
| 36.24382339935137|33.3|[0.04011,80.0,1.5...|
|28.501188957698524|28.0|[0.04113,25.0,4.8...|
|29.437659599731653|22.9|[0.04203,28.0,15....|
|27.569087980

Transform: Applies the trained model to the test set. This produces the same predictions as lr_predictions above.

Select & Show: Displays the predicted values, actual values, and features. show() with no argument prints the first 20 rows.

## Decision tree regression

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol='features', labelCol='medv')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
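For regression, Spark's DecisionTreeRegressor chooses each split by variance impurity: the split that most reduces the weighted variance of the label. The pure-Python sketch below finds the best single threshold on one feature.

- best_split is a hypothetical helper.
- Spark additionally bins continuous features into at most maxBins candidate thresholds and searches all features.
- The rm and medv values are the first five rows of the dataset.

```python
def variance(values):
    """Population variance of a list of numbers (0.0 for an empty list)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values) / len(values)


def best_split(xs, ys):
    """Return (threshold, weighted_variance) of the best split 'x <= threshold'."""
    n = len(ys)
    best = (None, variance(ys))  # no split: variance of the whole node
    # The largest value is skipped because it would leave the right side empty
    for t in sorted(set(xs))[:-1]:
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        score = (len(left) * variance(left) + len(right) * variance(right)) / n
        if score < best[1]:
            best = (t, score)
    return best


rm = [6.575, 6.421, 7.185, 6.998, 7.147]
medv = [24.0, 21.6, 34.7, 33.4, 36.2]
print(best_split(rm, medv))  # splits the two smaller homes from the three larger ones
```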

In [None]:
# Reuse dt_predictions from the previous cell instead of refitting the same tree
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="r2")
r2 = dt_evaluator.evaluate(dt_predictions)
print("R2 on test data = %g" % r2)

In [None]:
# lr_evaluator was created with metricName="r2", so this is the decision tree's test R2 again
lr_evaluator.evaluate(dt_predictions)

In [None]:
# One importance per feature, in inputCols order, normalized to sum to 1
dt_model.featureImportances
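featureImportances holds one entry per feature, in the same order as the VectorAssembler inputCols. To read it, pair each entry with its column name. In PySpark, the vector's toArray() method returns an array that could be passed to a helper like the one below.

- rank_importances is a hypothetical helper.
- The importance values in the example are made up for illustration. They are not the output of this notebook.

```python
FEATURES = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
            'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat']


def rank_importances(importances, names=FEATURES):
    """Return (name, importance) pairs sorted from most to least important."""
    if len(importances) != len(names):
        raise ValueError("expected one importance per feature")
    return sorted(zip(names, importances), key=lambda p: p[1], reverse=True)


# Made-up values for illustration only (they sum to 1, like Spark's output)
example = [0.05, 0.0, 0.0, 0.0, 0.05, 0.4, 0.0, 0.1, 0.0, 0.0, 0.0, 0.0, 0.4]
for name, score in rank_importances(example)[:3]:
    print(name, score)
```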

In [None]:
house_df.take(1)

## Gradient-boosted tree regression

In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol='features', labelCol='medv', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'medv', 'features').show(5)
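GBTRegressor with maxIter=10 trains 10 trees in sequence. With the default squared-error loss:

- Each new tree is fit to the negative gradient of the loss, which for squared error is proportional to the current residuals.
- Its output is added to the ensemble, scaled by the learning rate (stepSize, default 0.1).

The pure-Python sketch below shows that loop on a single feature. It simplifies Spark's implementation in three ways:

- It uses depth-1 trees (stumps) instead of Spark's default maxDepth=5.
- It fits the residuals directly.
- It starts from the mean label, whereas Spark initializes differently.

fit_stump, fit_gbt and mse are hypothetical helper names.

```python
def fit_stump(xs, targets):
    """Fit a depth-1 regression tree: one threshold, a mean on each side."""
    best = None
    for t in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, targets) if x <= t]
        right = [r for x, r in zip(xs, targets) if x > t]
        lmean, rmean = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - lmean) ** 2 for r in left) + sum((r - rmean) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lmean, rmean)
    if best is None:  # all x equal: predict the overall mean
        m = sum(targets) / len(targets)
        return lambda x: m
    _, t, lmean, rmean = best
    return lambda x: lmean if x <= t else rmean


def fit_gbt(xs, ys, n_trees=10, step_size=0.1):
    """Gradient boosting for squared error: each stump fits the current residuals."""
    base = sum(ys) / len(ys)
    stumps = []

    def predict(x):
        return base + step_size * sum(s(x) for s in stumps)

    for _ in range(n_trees):
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))
    return predict


def mse(xs, ys, model):
    """Mean squared error of `model` on (xs, ys)."""
    return sum((y - model(x)) ** 2 for x, y in zip(xs, ys)) / len(ys)


# rm and medv for the first five rows of the dataset (a toy training set)
rm = [6.575, 6.421, 7.185, 6.998, 7.147]
medv = [24.0, 21.6, 34.7, 33.4, 36.2]
for n in (1, 10, 50):
    print(n, round(mse(rm, medv, fit_gbt(rm, medv, n_trees=n)), 3))
```

Training MSE falls as trees are added. On held-out data, too many trees or too large a stepSize can overfit instead, which is why the cells below evaluate on test_df.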

In [None]:
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="r2")
r2 = gbt_evaluator.evaluate(gbt_predictions)
print("R2 Score on test data = %g" % r2)