## 1. Running Pyspark in Jupyter Notebook

In [1]:
# spark-3.2.2-bin-hadoop3.2
# install findspark using pip
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1




In [3]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## 2. Analysis and Regression on Boston Housing Dataset

### 2.1 Importing the dataset

In [4]:
# import the boston housing dataset
dataset = spark.read.csv('data/BostonHousing.csv', header=True, inferSchema=True)

### 2.2 Data Exploration : Transformations

In [5]:
# Transformations
# Convert all the features from different columns into a single column
# Let's call this new vector column as 'Attributes' in the outputCol parameter
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol='Attributes')

# Use the assembler object to transform the dataset
output = assembler.transform(dataset)

# Input vs Output
finalized_data = output.select('Attributes', 'medv')

finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



### 2.3 Splitting the dataset

In [6]:
# split training and testing data
train_data, test_data = finalized_data.randomSplit([0.8, 0.2])

### 2.4 Learn and predict the Linear Regression

In [7]:
regressor = LinearRegression(featuresCol='Attributes', labelCol='medv')

# Fit the model from the training data
regressor = regressor.fit(train_data)

# Predict the output for the test data
pred_results = regressor.evaluate(test_data)

# Predict the model
pred_results.predictions.show()



+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.01311,90.0,1.2...|35.4|30.821593745716687|
|[0.01439,60.0,2.9...|29.1|31.437539965084618|
|[0.01501,90.0,1.2...|50.0| 44.22391666316062|
|[0.02763,75.0,2.9...|30.8| 31.13315749669333|
|[0.02875,28.0,15....|25.0| 29.22304824205157|
|[0.03049,55.0,3.7...|31.2| 28.43219932672701|
|[0.03306,0.0,5.19...|20.6|  22.2272722519759|
|[0.0351,95.0,2.68...|48.5| 41.40237453294464|
|[0.03705,20.0,3.3...|35.4| 34.25571742639445|
|[0.03738,0.0,5.19...|20.7|  21.6785733482478|
|[0.04379,80.0,3.3...|19.4|25.580952485457537|
|[0.04544,0.0,3.24...|19.8| 21.74344974781248|
|[0.0456,0.0,13.89...|23.3|26.939480648525834|
|[0.04684,0.0,3.41...|22.6|27.038073621716606|
|[0.05372,0.0,13.9...|27.1| 27.67283738153981|
|[0.05602,0.0,2.46...|50.0| 35.38451242537469|
|[0.05646,0.0,12.8...|21.2| 21.44889986808647|
|[0.06162,0.0,4.39...|17.2|15.025809064293554|
|[0.06617,0.0

### 2.5 Print the coefficients and intercept for linear regression

In [8]:
# Coefficient of the regression model
coeff = regressor.coefficients

# X and Y intercept
intercept = regressor.intercept

print('Coefficients: %s' % str(coeff))
print('Intercept: %s' % str(intercept))


Coefficients: [-0.1157945336358715,0.0432073246799142,0.03809939606334178,2.882274497724834,-18.168634075393037,3.454920153795976,0.003904420309647196,-1.4186836397036897,0.31902440449205643,-0.011921183209813328,-0.9718708057963296,0.010273693350855977,-0.559439071902172]
Intercept: 38.536223335602166


### 2.6 Evaluating the model

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol='medv', predictionCol='prediction', metricName='rmse')

# Root Mean Square Error
rmse = eval.evaluate(pred_results.predictions, {eval.metricName: "rmse"})
print("Root Mean Square Error (RMSE) on test data = %g" % rmse)

# Mean Square Error
mse = eval.evaluate(pred_results.predictions, {eval.metricName: "mse"})
print('Root Mean Square Error: ', rmse)

# Mean Absolute Error
mae = eval.evaluate(pred_results.predictions, {eval.metricName: "mae"})
print('Mean Absolute Error: ', mae)

# R2
r2 = eval.evaluate(pred_results.predictions, {eval.metricName: "r2"})
print('R2: ', r2)

Root Mean Square Error (RMSE) on test data = 3.70757
Root Mean Square Error:  3.7075683189104818
Mean Absolute Error:  2.660222890548023
R2:  0.8284633487334137


### 2.7 Clustering the dataset

In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans(featuresCol='Attributes').setK(2).setSeed(1)
model = kmeans.fit(finalized_data)

# Make predictions
predictions = model.transform(finalized_data)
predictions.show(500)

+--------------------+----+----------+
|          Attributes|medv|prediction|
+--------------------+----+----------+
|[0.00632,18.0,2.3...|24.0|         0|
|[0.02731,0.0,7.07...|21.6|         0|
|[0.02729,0.0,7.07...|34.7|         0|
|[0.03237,0.0,2.18...|33.4|         0|
|[0.06905,0.0,2.18...|36.2|         0|
|[0.02985,0.0,2.18...|28.7|         0|
|[0.08829,12.5,7.8...|22.9|         0|
|[0.14455,12.5,7.8...|27.1|         0|
|[0.21124,12.5,7.8...|16.5|         0|
|[0.17004,12.5,7.8...|18.9|         0|
|[0.22489,12.5,7.8...|15.0|         0|
|[0.11747,12.5,7.8...|18.9|         0|
|[0.09378,12.5,7.8...|21.7|         0|
|[0.62976,0.0,8.14...|20.4|         0|
|[0.63796,0.0,8.14...|18.2|         0|
|[0.62739,0.0,8.14...|19.9|         0|
|[1.05393,0.0,8.14...|23.1|         0|
|[0.7842,0.0,8.14,...|17.5|         0|
|[0.80271,0.0,8.14...|20.2|         0|
|[0.7258,0.0,8.14,...|18.2|         0|
|[1.25179,0.0,8.14...|13.6|         0|
|[0.85204,0.0,8.14...|19.6|         0|
|[1.23247,0.0,8.14...|15.

### 3. Churn(change and turn) analysis in Spark
#### 3.1 Loading the dataset



In [12]:
datasetcalls = spark.read.csv('data/callsData.csv', header=True, inferSchema=True)
datasetContract = spark.read.csv('data/contractData.csv', header=True, inferSchema=True)

datasetcalls.show()
datasetContract.show()



+-------------+--------+--------+----------+---------+--------------+---------+----------+---------+----------+-----------+------------+----------+-----------+---------+--------+
|VMail Message|Day Mins|Eve Mins|Night Mins|Intl Mins|CustServ Calls|Day Calls|Day Charge|Eve Calls|Eve Charge|Night Calls|Night Charge|Intl Calls|Intl Charge|Area Code|   Phone|
+-------------+--------+--------+----------+---------+--------------+---------+----------+---------+----------+-----------+------------+----------+-----------+---------+--------+
|           25|   265.1|   197.4|     244.7|     10.0|             1|      110|     45.07|       99|     16.78|         91|       11.01|         3|        2.7|      415|382-4657|
|           26|   161.6|   195.5|     254.4|     13.7|             1|      123|     27.47|      103|     16.62|        103|       11.45|         3|        3.7|      415|371-7191|
|            0|   243.4|   121.2|     162.6|     12.2|             0|      114|     41.38|      110|     