### 1. Installation

In [None]:
# Run below commands in google colab
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q http://apache.osuosl.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
# install findspark 
!pip install -q findspark


Download and install Apache Spark 3.2.1, along with its dependencies (Java Development Kit 8+)

Set Environment Variables to make the Spark runtime visible to Linux OS. Please, note that you can manage multiple versions of spark by pointing to the correct version through environment variables. Run below set of commands to point to Apache Spark 3.2.1 version downloaded earlier.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

Test the installation worked successfully.

In [None]:
import findspark
findspark.init()

In [None]:
# Verify the Spark version running on the virtual cluster
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

assert  "3." in sc.version, "Verify that the cluster Spark's version is 3.x"

If the environment is not configured properly, the instruction will show the Spark's version that would not be the 3.0.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fe35a1ca950>


In [None]:
#Collegamento al Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#importing libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()

#Pyspark

In [None]:
with open("/content/drive/MyDrive/KPMG/RegressionDataset.xlsx", 'rb') as fp:
    df = pd.read_excel(fp)


In [None]:
df

Unnamed: 0,AnnoAccademico,Ateneo,PopolazioneP,AteneiP,PopolazioneR,AteneiR,Fuorisede,Immatricolati
0,2010/2011,Aosta,126957,1,126957,1,31,188
1,2011/2012,Aosta,127153,1,127153,1,21,172
2,2012/2013,Aosta,127305,1,127305,1,31,173
3,2013/2014,Aosta,127951,1,127951,1,54,192
4,2014/2015,Aosta,128245,1,128245,1,52,181
...,...,...,...,...,...,...,...,...
644,2016/2017,Verona,918050,1,4890648,4,2509,4553
645,2017/2018,Verona,918069,1,4883373,4,2477,4484
646,2018/2019,Verona,919179,1,4880936,4,2098,3874
647,2019/2020,Verona,922857,1,4884590,4,2082,3785


In [None]:
df=df.to_dict('records')

In [None]:
import json

path = "/content/drive/MyDrive/KPMG/"

with open(path + 'RegressionDataset.json', 'w') as jsonfile:
    json.dump(df, jsonfile)

In [None]:
# Read JSON file into dataframe
path = "/content/drive/MyDrive/KPMG/"

df_spark = spark.read.json(path + 'RegressionDataset.json')
df_spark.printSchema()
df_spark.show()

root
 |-- AnnoAccademico: string (nullable = true)
 |-- AteneiP: long (nullable = true)
 |-- AteneiR: long (nullable = true)
 |-- Ateneo: string (nullable = true)
 |-- Fuorisede: long (nullable = true)
 |-- Immatricolati: long (nullable = true)
 |-- PopolazioneP: long (nullable = true)
 |-- PopolazioneR: long (nullable = true)

+--------------+-------+-------+------+---------+-------------+------------+------------+
|AnnoAccademico|AteneiP|AteneiR|Ateneo|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|
+--------------+-------+-------+------+---------+-------------+------------+------------+
|     2010/2011|      1|      1| Aosta|       31|          188|      126957|      126957|
|     2011/2012|      1|      1| Aosta|       21|          172|      127153|      127153|
|     2012/2013|      1|      1| Aosta|       31|          173|      127305|      127305|
|     2013/2014|      1|      1| Aosta|       54|          192|      127951|      127951|
|     2014/2015|      1|      1| Aosta| 

In [None]:
df_spark=df_spark.na.drop()


In [None]:
from pyspark.sql.functions import col, split
df = df_spark.withColumn("Ateneo_Array", split(col("Ateneo")," "))
df = df.withColumn('AnnoAccademico_Array', split(col("AnnoAccademico")," "))
df.show()

+--------------+-------+-------+------+---------+-------------+------------+------------+------------+--------------------+
|AnnoAccademico|AteneiP|AteneiR|Ateneo|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_Array|AnnoAccademico_Array|
+--------------+-------+-------+------+---------+-------------+------------+------------+------------+--------------------+
|     2010/2011|      1|      1| Aosta|       31|          188|      126957|      126957|     [Aosta]|         [2010/2011]|
|     2011/2012|      1|      1| Aosta|       21|          172|      127153|      127153|     [Aosta]|         [2011/2012]|
|     2012/2013|      1|      1| Aosta|       31|          173|      127305|      127305|     [Aosta]|         [2012/2013]|
|     2013/2014|      1|      1| Aosta|       54|          192|      127951|      127951|     [Aosta]|         [2013/2014]|
|     2014/2015|      1|      1| Aosta|       52|          181|      128245|      128245|     [Aosta]|         [2014/2015]|
|     20

In [None]:
#Import Spark CountVectorizer
from pyspark.ml.feature import CountVectorizer

# Initialize a CountVectorizer.
colorVectorizer = CountVectorizer(inputCol="Ateneo_Array", outputCol="Ateneo_OneHotEncoded", vocabSize=4, minDF=1.0)

#Get a VectorizerModel
colorVectorizer_model = colorVectorizer.fit(df)
df_ohe = colorVectorizer_model.transform(df)

# Initialize a CountVectorizer.
colorVectorizer = CountVectorizer(inputCol="AnnoAccademico_Array", outputCol="AnnoAccademico_OneHotEncoded", vocabSize=4, minDF=1.0)

#Get a VectorizerModel
colorVectorizer_model = colorVectorizer.fit(df)
df_ohe = colorVectorizer_model.transform(df_ohe)
df_ohe.show(truncate=False)


+--------------+-------+-------+------+---------+-------------+------------+------------+------------+--------------------+--------------------+----------------------------+
|AnnoAccademico|AteneiP|AteneiR|Ateneo|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_Array|AnnoAccademico_Array|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|
+--------------+-------+-------+------+---------+-------------+------------+------------+------------+--------------------+--------------------+----------------------------+
|2010/2011     |1      |1      |Aosta |31       |188          |126957      |126957      |[Aosta]     |[2010/2011]         |(4,[],[])           |(4,[1],[1.0])               |
|2011/2012     |1      |1      |Aosta |21       |172          |127153      |127153      |[Aosta]     |[2011/2012]         |(4,[],[])           |(4,[],[])                   |
|2012/2013     |1      |1      |Aosta |31       |173          |127305      |127305      |[Aosta]     |[2012/2013]         |(4,[],[

In [None]:
df_ohe=df_ohe.drop('Ateneo')
df_ohe=df_ohe.drop('AnnoAccademico')
df_ohe=df_ohe.drop('Ateneo_Array')
df_ohe=df_ohe.drop('AnnoAccademico_Array')

In [None]:
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml.feature import StandardScaler

columns=df_ohe.columns
columns.remove('Immatricolati')
assembler=VectorAssembler(inputCols=columns, outputCol='features_prescaled')
df=assembler.transform(df_ohe)

# Normalizing each feature to have unit standard deviation
scaler = StandardScaler(inputCol="features_prescaled", outputCol="features", withStd=True, withMean=False)
scalerModel = scaler.fit(df)
# Normalize each feature to have unit standard deviation.
df = scalerModel.transform(df)


stringIndexer=StringIndexer(inputCol='Immatricolati',outputCol='label')
si_model=stringIndexer.fit(df)
df=si_model.transform(df)

In [None]:
df=df.drop('features_prescaled')
df.show()

+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+
|AteneiP|AteneiR|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|            features|label|
+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+
|      1|      1|       31|          188|      126957|      126957|           (4,[],[])|               (4,[1],[1.0])|(13,[0,1,2,3,4,10...|173.0|
|      1|      1|       21|          172|      127153|      127153|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|157.0|
|      1|      1|       31|          173|      127305|      127305|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|159.0|
|      1|      1|       54|          192|      127951|      127951|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,

#Split

In [None]:
df_ml=df

In [None]:
(trainingData, testData) = df_ml.randomSplit([0.75, 0.25])
(partialTrainingData, validationData) = trainingData.randomSplit([0.75, 0.25])

#Linear regressor


In [None]:
from pyspark.ml.regression import LinearRegression

linear_regressor = LinearRegression(featuresCol = 'features', labelCol = 'label')

#Learn to fit the model from training set
regressor = linear_regressor.fit(partialTrainingData)

#To predict the prices on testing set
pred = regressor.evaluate(validationData)

#Predict the model
pred.predictions.show()



+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|AteneiP|AteneiR|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|            features|label|        prediction|
+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|      1|      1|       31|          173|      127305|      127305|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|159.0|  310.877691403365|
|      1|      1|       54|          192|      127951|      127951|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|  0.0| 310.7237868492553|
|      1|      1|       70|          185|      127972|      127972|           (4,[],[])|               (4,[3],[1.0])|(13,[0,1,2,3,4,12...|  8.0|319.83710241691944|
|      1|      1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)



RMSE: 187.342
MSE: 35097.094
MAE: 159.654
r2: -0.021


test


In [None]:
from pyspark.ml.regression import LinearRegression

linear_regressor = LinearRegression(featuresCol = 'features', labelCol = 'label')

#Learn to fit the model from training set
regressor = linear_regressor.fit(partialTrainingData)

#To predict the prices on testing set
pred = regressor.evaluate(testData)

#Predict the model
pred.predictions.show()

from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)



+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|AteneiP|AteneiR|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|            features|label|        prediction|
+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|      1|      1|       31|          188|      126957|      126957|           (4,[],[])|               (4,[1],[1.0])|(13,[0,1,2,3,4,10...|173.0| 291.4931916150157|
|      1|      1|       47|          166|      127030|      127030|           (4,[],[])|               (4,[0],[1.0])|(13,[0,1,2,3,4,9]...|148.0|357.86035078004204|
|      1|      1|       59|          185|      126677|      126677|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|  8.0| 310.7085623172335|
|      1|      1

Train - Test


In [None]:
from pyspark.ml.regression import LinearRegression
regressor = LinearRegression(featuresCol = 'features', labelCol = 'label')

linear_regressor = LinearRegression(featuresCol = 'features', labelCol = 'label')

#Learn to fit the model from training set
regressor = linear_regressor.fit(trainingData)

#To predict the prices on testing set
pred = regressor.evaluate(validationData)

#Predict the model
pred.predictions.show()



+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|AteneiP|AteneiR|Fuorisede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|            features|label|        prediction|
+-------+-------+---------+-------------+------------+------------+--------------------+----------------------------+--------------------+-----+------------------+
|      1|      1|       31|          188|      126957|      126957|           (4,[],[])|               (4,[1],[1.0])|(13,[0,1,2,3,4,10...|173.0|290.43018123661557|
|      1|      1|       47|          166|      127030|      127030|           (4,[],[])|               (4,[0],[1.0])|(13,[0,1,2,3,4,9]...|148.0|342.85855379670244|
|      1|      1|       59|          185|      126677|      126677|           (4,[],[])|                   (4,[],[])|(13,[0,1,2,3,4],[...|  8.0|292.60501927110386|
|      1|      1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)



RMSE: 189.441
MSE: 35888.034
MAE: 163.081
r2: -0.024


#Random Forest Regressor


In [None]:
import time
start = time.time()

from pyspark.ml.regression import RandomForestRegressor

lr = RandomForestRegressor(featuresCol = 'features', labelCol = 'label', numTrees=1000, maxDepth=3)
lr_model = lr.fit(partialTrainingData)

end = time.time()
print(format(end-start))

predictions = lr_model.transform(validationData)
predictions.select("prediction", "label", "features").show()

2.8430683612823486
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|173.69238278386763|173.0|(13,[0,1,2,3,4,10...|
|221.80013167439577|613.0|(13,[0,1,2,3,4],[...|
|223.70908766424952|610.0|(13,[0,1,2,3,4],[...|
|180.89191235313706|616.0|(13,[0,1,2,3,4],[...|
|180.89191235313706|600.0|(13,[0,1,2,3,4],[...|
|174.28983613442313| 38.0|(13,[0,1,2,3,4],[...|
|278.04577622752214|320.0|(13,[0,1,2,3,4],[...|
|277.93524163226994|400.0|(13,[0,1,2,3,4,11...|
| 286.1207964352287|450.0|(13,[0,1,2,3,4,12...|
|289.52771747365273|401.0|(13,[0,1,2,3,4,10...|
| 292.2062887099926|448.0|(13,[0,1,2,3,4,9]...|
| 296.2296780127857|440.0|(13,[0,1,2,3,4],[...|
| 301.8683585399289|490.0|(13,[0,1,2,3,4,10...|
|271.55529518740076|  6.0|(13,[0,1,2,3,4],[...|
| 272.1492637565533|181.0|(13,[0,1,2,3,4,10...|
|249.58509417588948|586.0|(13,[0,1,2,3,4],[...|
| 247.5485671410971|582.0|(13,[0,1,2,3,4,11...|
| 275.8296383263944|1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

Root Mean Squared Error (RMSE) on test data = 170.279
MSE: 28994.827
MAE: 139.223
r2: 0.127


Test

In [None]:
predictions = lr_model.transform(testData)
predictions.select("prediction", "label", "features").show()

from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|172.78957504324137|168.0|(13,[0,1,2,3,4],[...|
|172.78957504324137|  0.0|(13,[0,1,2,3,4],[...|
|161.27459352914568|175.0|(13,[0,1,2,3,4,11...|
|165.15282186884954|  8.0|(13,[0,1,2,3,4,12...|
|276.95334786621265| 36.0|(13,[0,1,2,3,4,10...|
|275.93291433666656|604.0|(13,[0,1,2,3,4,12...|
| 221.0184239277235|599.0|(13,[0,1,2,3,4,11...|
|183.89616794437455|605.0|(13,[0,1,2,3,4],[...|
|182.43398326187614| 59.0|(13,[0,1,2,3,4,10...|
|174.69678743333574|611.0|(13,[0,1,2,3,4,12...|
| 197.8780377677727|617.0|(13,[0,1,2,3,4,9]...|
| 277.7763163329844|294.0|(13,[0,1,2,3,4,10...|
| 286.2743210536742|444.0|(13,[0,1,2,3,4],[...|
|287.19518242464295|459.0|(13,[0,1,2,3,4,11...|
|  287.398088314558|297.0|(13,[0,1,2,3,4],[...|
|296.55402826223053|453.0|(13,[0,1,2,3,4],[...|
| 296.2296780127857|458.0|(13,[0,1,2,3,4],[...|
| 307.3684990474598|436.0|(13,[0,1,2,3,4

Train - Test

In [None]:
import time
start = time.time()


from pyspark.ml.regression import RandomForestRegressor

lr = RandomForestRegressor(featuresCol = 'features', labelCol = 'label', numTrees=1000, maxDepth=3)
lr_model = lr.fit(trainingData)

end = time.time()
print(format(end-start))

predictions = lr_model.transform(testData)
predictions.select("prediction", "label", "features").show()

from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

4.897110223770142
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|240.28697807014234|168.0|(13,[0,1,2,3,4],[...|
|240.28697807014234|  0.0|(13,[0,1,2,3,4],[...|
|239.50470249908315|175.0|(13,[0,1,2,3,4,11...|
|242.93406133695217|  8.0|(13,[0,1,2,3,4,12...|
|289.99431102598726| 36.0|(13,[0,1,2,3,4,10...|
| 291.0242990601009|604.0|(13,[0,1,2,3,4,12...|
|291.65379333394156|599.0|(13,[0,1,2,3,4,11...|
| 262.6992196413562|605.0|(13,[0,1,2,3,4],[...|
| 258.4743911992622| 59.0|(13,[0,1,2,3,4,10...|
|249.41815290801708|611.0|(13,[0,1,2,3,4,12...|
| 250.2905376704807|617.0|(13,[0,1,2,3,4,9]...|
|291.19052336263394|294.0|(13,[0,1,2,3,4,10...|
|296.38671541388754|444.0|(13,[0,1,2,3,4],[...|
| 301.2702379310147|459.0|(13,[0,1,2,3,4,11...|
|  298.309658584582|297.0|(13,[0,1,2,3,4],[...|
| 306.9126624891273|453.0|(13,[0,1,2,3,4],[...|
| 305.6371420924116|458.0|(13,[0,1,2,3,4],[...|
|316.93590790421194|43

#GBT Regressor

In [None]:
import time
start = time.time()

from pyspark.ml.regression import GBTRegressor

lr = GBTRegressor(featuresCol = "features", labelCol="label", maxIter=100, stepSize=0.01)
lr_model = lr.fit(partialTrainingData)

end = time.time()
print(format(end-start))

predictions = lr_model.transform(validationData)
predictions.select("prediction", "label", "features").show()

29.012925386428833
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|16.138370558288752|173.0|(13,[0,1,2,3,4,10...|
| 273.1133849239007|613.0|(13,[0,1,2,3,4],[...|
| 273.1133849239007|610.0|(13,[0,1,2,3,4],[...|
| 50.41307825760068|616.0|(13,[0,1,2,3,4],[...|
| 50.41307825760068|600.0|(13,[0,1,2,3,4],[...|
| 66.57510107286966| 38.0|(13,[0,1,2,3,4],[...|
|305.06129154248293|320.0|(13,[0,1,2,3,4],[...|
| 263.4387381955056|400.0|(13,[0,1,2,3,4,11...|
|292.10923731714536|450.0|(13,[0,1,2,3,4,12...|
| 270.0312217010542|401.0|(13,[0,1,2,3,4,10...|
|297.06403121037664|448.0|(13,[0,1,2,3,4,9]...|
|251.33943165858898|440.0|(13,[0,1,2,3,4],[...|
| 355.9847384548606|490.0|(13,[0,1,2,3,4,10...|
|177.04953370350998|  6.0|(13,[0,1,2,3,4],[...|
|147.19810673979163|181.0|(13,[0,1,2,3,4,10...|
|262.77521999848955|586.0|(13,[0,1,2,3,4],[...|
|256.03140308439276|582.0|(13,[0,1,2,3,4,11...|
| 173.8560225789993|1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

Root Mean Squared Error (RMSE) on test data = 160.776
MSE: 25848.762
MAE: 104.378
r2: 0.222


Test

In [None]:
predictions = lr_model.transform(testData)
predictions.select("prediction", "label", "features").show()

from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 71.80461393093287|168.0|(13,[0,1,2,3,4],[...|
| 71.80461393093287|  0.0|(13,[0,1,2,3,4],[...|
| 52.38389559831317|175.0|(13,[0,1,2,3,4,11...|
| 59.12771251240994|  8.0|(13,[0,1,2,3,4,12...|
|369.04045291308495| 36.0|(13,[0,1,2,3,4,10...|
| 399.2313766902551|604.0|(13,[0,1,2,3,4,12...|
| 266.3695680098039|599.0|(13,[0,1,2,3,4,11...|
|42.321006588229885|605.0|(13,[0,1,2,3,4],[...|
| 6.111030969358688| 59.0|(13,[0,1,2,3,4,10...|
| 66.57510107286966|611.0|(13,[0,1,2,3,4,12...|
|139.44669588742792|617.0|(13,[0,1,2,3,4,9]...|
| 230.0816914200024|294.0|(13,[0,1,2,3,4,10...|
|292.10923731714536|444.0|(13,[0,1,2,3,4],[...|
|298.80105879492504|459.0|(13,[0,1,2,3,4,11...|
|244.94855782788093|297.0|(13,[0,1,2,3,4],[...|
|251.33943165858898|453.0|(13,[0,1,2,3,4],[...|
|251.33943165858898|458.0|(13,[0,1,2,3,4],[...|
| 409.7920361825685|436.0|(13,[0,1,2,3,4

Retrain

In [None]:
import time
start = time.time()

from pyspark.ml.regression import GBTRegressor

lr = GBTRegressor(featuresCol = "features", labelCol="label", maxIter=100, stepSize=0.01)
lr_model = lr.fit(trainingData)

end = time.time()
print(format(end-start))

predictions = lr_model.transform(testData)
predictions.select("prediction", "label", "features").show()

29.79860782623291
+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|148.86544248297773|168.0|(13,[0,1,2,3,4],[...|
|148.86544248297773|  0.0|(13,[0,1,2,3,4],[...|
|57.341813326368055|175.0|(13,[0,1,2,3,4,11...|
| 72.13322223074542|  8.0|(13,[0,1,2,3,4,12...|
| 357.4599696924289| 36.0|(13,[0,1,2,3,4,10...|
| 383.2016903514941|604.0|(13,[0,1,2,3,4,12...|
|362.52249821408316|599.0|(13,[0,1,2,3,4,11...|
|   413.06875047959|605.0|(13,[0,1,2,3,4],[...|
|164.22659747343113| 59.0|(13,[0,1,2,3,4,10...|
|152.32146174455463|611.0|(13,[0,1,2,3,4,12...|
|152.32146174455463|617.0|(13,[0,1,2,3,4,9]...|
|251.08548699971206|294.0|(13,[0,1,2,3,4,10...|
|223.98985211849543|444.0|(13,[0,1,2,3,4],[...|
| 332.9397672288519|459.0|(13,[0,1,2,3,4,11...|
| 280.2734628999176|297.0|(13,[0,1,2,3,4],[...|
| 332.9397672288519|453.0|(13,[0,1,2,3,4],[...|
| 332.9397672288519|458.0|(13,[0,1,2,3,4],[...|
|446.30180479238595|43

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="label",predictionCol="prediction", metricName="rmse")
rmse = eval.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g"% rmse)

mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

Root Mean Squared Error (RMSE) on test data = 145.937
MSE: 21297.622
MAE: 101.411
r2: 0.391


#TRAINING FINALE


In [None]:
lr = GBTRegressor(featuresCol = "features", labelCol="label", maxIter=100, stepSize=0.01)
lr_model = lr.fit(df)

#Testing

In [None]:
with open("/content/drive/MyDrive/KPMG/RegressionDatasetTest_finale.xlsx", 'rb') as fp:
    df = pd.read_excel(fp)

In [None]:
df=df.to_dict('records')

In [None]:
import json

path = "/content/drive/MyDrive/KPMG/"

with open(path + 'RegressionDataset_Test.json', 'w') as jsonfile:
    json.dump(df, jsonfile)

In [None]:
# Read JSON file into dataframe
path = "/content/drive/MyDrive/KPMG/"

df_spark = spark.read.json(path + 'RegressionDataset_Test.json')
df_spark.printSchema()
df_spark.show()

root
 |-- AnnoAccademico: string (nullable = true)
 |-- AteneiP: long (nullable = true)
 |-- AteneiR: long (nullable = true)
 |-- Ateneo: string (nullable = true)
 |-- FuoriSede: long (nullable = true)
 |-- Immatricolati: long (nullable = true)
 |-- PopolazioneP: long (nullable = true)
 |-- PopolazioneR: long (nullable = true)

+--------------+-------+-------+----------------+---------+-------------+------------+------------+
|AnnoAccademico|AteneiP|AteneiR|          Ateneo|FuoriSede|Immatricolati|PopolazioneP|PopolazioneR|
+--------------+-------+-------+----------------+---------+-------------+------------+------------+
|     2021/2022|      1|      1|           Aosta|       77|          246|      123041|      123704|
|     2022/2023|      1|      1|           Aosta|       81|          248|      122745|      123408|
|     2023/2024|      1|      1|           Aosta|       85|          250|      122449|      123112|
|     2024/2025|      1|      1|           Aosta|       89|          2

In [None]:
df_spark=df_spark.na.drop()

In [None]:
from pyspark.sql.functions import col, split
df = df_spark.withColumn("Ateneo_Array", split(col("Ateneo")," "))
df.show()

+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+
|AnnoAccademico|AteneiP|AteneiR|          Ateneo|FuoriSede|Immatricolati|PopolazioneP|PopolazioneR|       Ateneo_Array|
+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+
|     2021/2022|      1|      1|           Aosta|       77|          246|      123041|      123704|            [Aosta]|
|     2022/2023|      1|      1|           Aosta|       81|          248|      122745|      123408|            [Aosta]|
|     2023/2024|      1|      1|           Aosta|       85|          250|      122449|      123112|            [Aosta]|
|     2024/2025|      1|      1|           Aosta|       89|          252|      122153|      122816|            [Aosta]|
|     2025/2026|      1|      1|           Aosta|       93|          254|      121857|      122520|            [Aosta]|
|     2021/2022|      2|      3|        

In [None]:
from pyspark.sql.functions import col, split
df = df_spark.withColumn("Ateneo_Array", split(col("Ateneo")," "))
df = df.withColumn('AnnoAccademico_Array', split(col("AnnoAccademico")," "))
df.show()

#Import Spark CountVectorizer
from pyspark.ml.feature import CountVectorizer

# Initialize a CountVectorizer.
colorVectorizer = CountVectorizer(inputCol="Ateneo_Array", outputCol="Ateneo_OneHotEncoded", vocabSize=4, minDF=1.0)

#Get a VectorizerModel
colorVectorizer_model = colorVectorizer.fit(df)

df_ohe = colorVectorizer_model.transform(df)
df_ohe.show(truncate=False)



+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+--------------------+
|AnnoAccademico|AteneiP|AteneiR|          Ateneo|FuoriSede|Immatricolati|PopolazioneP|PopolazioneR|       Ateneo_Array|AnnoAccademico_Array|
+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+--------------------+
|     2021/2022|      1|      1|           Aosta|       77|          246|      123041|      123704|            [Aosta]|         [2021/2022]|
|     2022/2023|      1|      1|           Aosta|       81|          248|      122745|      123408|            [Aosta]|         [2022/2023]|
|     2023/2024|      1|      1|           Aosta|       85|          250|      122449|      123112|            [Aosta]|         [2023/2024]|
|     2024/2025|      1|      1|           Aosta|       89|          252|      122153|      122816|            [Aosta]|         [2024/2025]|
|     2025/20

In [None]:
#Import Spark CountVectorizer
from pyspark.ml.feature import CountVectorizer

# Initialize a CountVectorizer.
colorVectorizer = CountVectorizer(inputCol="AnnoAccademico_Array", outputCol="AnnoAccademico_OneHotEncoded", vocabSize=4, minDF=1.0)

#Get a VectorizerModel
colorVectorizer_model = colorVectorizer.fit(df)

df_ohe = colorVectorizer_model.transform(df_ohe)
df_ohe.show(truncate=False)

+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+--------------------+--------------------+----------------------------+
|AnnoAccademico|AteneiP|AteneiR|Ateneo          |FuoriSede|Immatricolati|PopolazioneP|PopolazioneR|Ateneo_Array       |AnnoAccademico_Array|Ateneo_OneHotEncoded|AnnoAccademico_OneHotEncoded|
+--------------+-------+-------+----------------+---------+-------------+------------+------------+-------------------+--------------------+--------------------+----------------------------+
|2021/2022     |1      |1      |Aosta           |77       |246          |123041      |123704      |[Aosta]            |[2021/2022]         |(4,[],[])           |(4,[1],[1.0])               |
|2022/2023     |1      |1      |Aosta           |81       |248          |122745      |123408      |[Aosta]            |[2022/2023]         |(4,[],[])           |(4,[2],[1.0])               |
|2023/2024     |1      |1      |Aosta        

In [None]:
df_ohe=df_ohe.drop('Ateneo')
df_ohe=df_ohe.drop('AnnoAccademico')
df_ohe=df_ohe.drop('Ateneo_Array')
df_ohe=df_ohe.drop('AnnoAccademico_Array')

In [None]:
from pyspark.ml.feature import VectorAssembler,StringIndexer

columns=df_ohe.columns
columns.remove('Immatricolati')
assembler=VectorAssembler(inputCols=columns, outputCol='features_prescaled')
df=assembler.transform(df_ohe)

# Normalizing each feature to have unit standard deviation
scaler = StandardScaler(inputCol="features_prescaled", outputCol="features", withStd=True, withMean=False)
scalerModel = scaler.fit(df)
# Normalize each feature to have unit standard deviation.
df = scalerModel.transform(df)

stringIndexer=StringIndexer(inputCol='Immatricolati',outputCol='label')
si_model=stringIndexer.fit(df)
df=si_model.transform(df)

In [None]:
predictions = lr_model.transform(df)
predictions.select("prediction", "label", "features").show()

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|54.383610330079485| 91.0|(13,[0,1,2,3,4,10...|
| 185.4469393195335| 93.0|(13,[0,1,2,3,4,11...|
| 51.94209601468587| 95.0|(13,[0,1,2,3,4,9]...|
| 51.94209601468587| 96.0|(13,[0,1,2,3,4],[...|
|  52.9577491748624| 98.0|(13,[0,1,2,3,4,12...|
|  529.555755325561|250.0|(13,[0,1,2,3,4,10...|
| 525.2748927221161|246.0|(13,[0,1,2,3,4,11...|
| 526.9773614497258|242.0|(13,[0,1,2,3,4,9]...|
| 525.2748927221161|241.0|(13,[0,1,2,3,4],[...|
| 513.9923196845755|239.0|(13,[0,1,2,3,4,12...|
|184.86776682960377| 63.0|(13,[0,1,2,3,4,8,...|
|200.29802181539995| 65.0|(13,[0,1,2,3,4,8,...|
|203.14213626700317| 67.0|(13,[0,1,2,3,4,8,...|
|212.45349987464326|  0.0|(13,[0,1,2,3,4,8]...|
| 222.8647988864094| 69.0|(13,[0,1,2,3,4,8,...|
| 134.2547326256147|281.0|(13,[0,1,2,3,4,10...|
|233.60602096569392|279.0|(13,[0,1,2,3,4,11...|
|227.15458328400751|278.0|(13,[0,1,2,3,4