<a href="https://colab.research.google.com/github/SanzogniCarlo/Algorithms-For-Massive-Data/blob/main/AlgorithmForMassiveData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Tell findspark / pyspark where the JDK and the unpacked Spark distribution live.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop2.7",
})

In [None]:
import findspark
findspark.init()  # make the Spark install at SPARK_HOME importable as pyspark
from pyspark import SparkContext

# "local[*]" runs one worker thread per available core; plain "local" would execute
# every task on a single thread and leave the VM's other cores idle.
sc = SparkContext("local[*]", "Models")

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() wraps the SparkContext that is already running (started in the cell
# above), so the context keeps its original settings. Memory therefore cannot be tuned
# here: an executor-memory option would be silently ignored, and in local mode tasks
# run inside the driver process anyway. Set "spark.driver.memory" where the context is
# first created if more heap is needed.
spark = SparkSession \
    .builder \
    .appName("American Survey Project") \
    .getOrCreate()

In [None]:
### upload the Kaggle API token (kaggle.json)
# Opens Colab's browser upload widget; the chosen file is saved into the working directory.
from google.colab import files

uploaded = files.upload()

Saving kaggle.json to kaggle.json


In [None]:
!mkdir ~/.kaggle/
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Authenticate the Kaggle API client; it reads the token from ~/.kaggle/kaggle.json.
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

In [None]:
###download dataset from kaggle
dataset=api.dataset_download_file(dataset='census/2013-american-community-survey', file_name='ss13husa.csv',quiet=False,force=True)
dataset=api.dataset_download_file(dataset='census/2013-american-community-survey', file_name='ss13husb.csv',quiet=False,force=True)
!unzip ss13husa.csv
!unzip ss13husb.csv

  0%|          | 0.00/134M [00:00<?, ?B/s]

Downloading ss13husa.csv.zip to /content


100%|██████████| 134M/134M [00:03<00:00, 41.5MB/s]





  0%|          | 0.00/128M [00:00<?, ?B/s]

Downloading ss13husb.csv.zip to /content


100%|██████████| 128M/128M [00:05<00:00, 26.2MB/s]



Archive:  ss13husa.csv.zip
  inflating: ss13husa.csv            
Archive:  ss13husb.csv.zip
  inflating: ss13husb.csv            


In [None]:
### read both halves of the housing file and stack them into one DataFrame
data = spark.read.csv('ss13husa.csv', header=True)
data1 = spark.read.csv('ss13husb.csv', header=True)
# unionByName aligns columns by header name rather than by position, so a difference in
# column order between the two files cannot silently misalign values (a header mismatch
# raises instead).
data = data.unionByName(data1)

In [None]:
import pyspark.sql.functions as sf
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler 
import numpy as np
from pyspark.ml.feature import PCA
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import MaxAbsScaler
from pyspark.sql.functions import col
from pyspark.sql.functions import count

In [None]:
# Keep columns 4..150 of the raw file (per the original note this cut drops the WGTP
# weight columns — TODO confirm the exact boundary against the data dictionary), then
# remove the adjustment factors ADJHSG/ADJINC, the state code ST and the other candidate
# target HINCP.
df = data.select(data.columns[4:151]).drop('ST', 'HINCP', 'ADJHSG', 'ADJINC')

In [None]:
# Show, for every column, how many values are non-null (count(col) skips nulls).
def my_count(df_in):
    non_null_counts = [count(c).alias(c) for c in df_in.columns]
    df_in.agg(*non_null_counts).show()

my_count(df)

+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+-------+--------+-----+------+------+-------+-------+-------+-------+-------+--------+-------+------+-------+-----+------+------+------+------+------+--------+-------+-------+------+------+-------+-------+---------+-------+-----+-------+-------+-------+-------+------+------+-------+-------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+------+------+-------+-------+-------+------+------+-------+--------+--------+-------+-------+-------+-------+-------+----------+-------+----------+-------+--------+-------+-------+---------+-------+-------+-------+----------+-------+-------+-------+--------+-------+-------+-------+-------+-------+-------+-------+----------+-------+-------+-------+-------+-------

In [None]:
## remove rows whose target variable FINCP is missing, then re-check the counts
df = df.dropna(subset=['FINCP'])
my_count(df)

+------+------+------+------+------+------+------+------+------+------+--------+------+--------+-----+------+------+------+-------+------+------+------+--------+------+------+------+-----+------+------+------+------+------+--------+------+------+------+------+------+------+---------+------+-----+------+------+------+------+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+------+------+------+------+------+-------+------+------+------+------+------+------+------+-------+--------+--------+------+------+------+------+------+----------+------+----------+------+--------+------+------+---------+------+------+------+----------+------+------+------+--------+------+-------+------+------+------+------+------+----------+------+------+------+------+------+------+--------+-----------+------+------+------+------+------+------+------+--

In [None]:
# Cast every column from string to integer (all values are whole numbers).
# A single select() builds one projection; calling withColumn() once per column
# (~140 times) nests a projection per call and bloats the query plan.
# Values that were missing or fail to parse end up null and are then replaced by 0.
# NOTE(review): 0 may also be a valid code in some survey variables — confirm that
# filling with 0 does not conflate "missing" with a real category.
df = df.select([col(c).cast(IntegerType()).alias(c) for c in df.columns])
df = df.na.fill(0)
df.describe().show()

+-------+------------------+-----------------+------------------+------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+-------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+-------------------+------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+------------------+

In [None]:
# Pack every column except the target FINCP into one 'features' vector and keep only
# the (features, FINCP) pair for modelling.
names = [c for c in df.columns if c != 'FINCP']
vectorAssembler = VectorAssembler(inputCols=names, outputCol='features')
data = vectorAssembler.transform(df).select(['features', 'FINCP'])
data.show(10)

+--------------------+------+
|            features| FINCP|
+--------------------+------+
|(142,[0,1,2,3,4,5...|151000|
|(142,[0,1,2,3,4,5...| 11400|
|(142,[0,1,2,3,4,5...|136000|
|(142,[0,1,2,3,4,5...| 52600|
|(142,[0,1,2,3,4,5...| 81600|
|(142,[0,1,2,3,4,5...| 26300|
|(142,[0,1,2,3,4,5...| 35000|
|(142,[0,1,2,3,4,5...| 60000|
|(142,[0,1,2,3,4,5...|108500|
|(142,[0,1,2,3,4,5...|175000|
+--------------------+------+
only showing top 10 rows



In [None]:
# Treat vector slots with at most 5 distinct values as categorical: VectorIndexer
# re-encodes them as category indices and leaves the remaining slots continuous.
# NOTE(review): the indexer is fitted on all rows, before the train/test split.
featureIndexer = (
    VectorIndexer(inputCol="features", outputCol="IndexedFeatures", maxCategories=5)
    .fit(data)
)
data = featureIndexer.transform(data)


In [None]:
### preprocessing for linear / ridge / lasso regression:
# project the indexed features onto their first 20 principal components.
# NOTE(review): the inputs are not standardised, so columns with large numeric ranges
# dominate the components (first component values in the output are in the 1e4..1e6
# range), and PCA is fitted on all rows before the train/test split.
pca = PCA(k=20, inputCol="IndexedFeatures", outputCol="PCA_Features")
model = pca.fit(data)
data_pca = (model
            .transform(data)
            .select(['IndexedFeatures', 'FINCP', 'PCA_Features']))
data_pca.show(10)

+--------------------+------+--------------------+
|     IndexedFeatures| FINCP|        PCA_Features|
+--------------------+------+--------------------+
|(142,[0,1,2,3,4,5...|151000|[-25001.462468135...|
|(142,[0,1,2,3,4,5...| 11400|[0.08293104694177...|
|(142,[0,1,2,3,4,5...|136000|[-390006.15851081...|
|(142,[0,1,2,3,4,5...| 52600|[-120002.79847907...|
|(142,[0,1,2,3,4,5...| 81600|[-160003.36067697...|
|(142,[0,1,2,3,4,5...| 26300|[1.10189965855541...|
|(142,[0,1,2,3,4,5...| 35000|[-40001.520012774...|
|(142,[0,1,2,3,4,5...| 60000|[-60003.360772212...|
|(142,[0,1,2,3,4,5...|108500|[0.57439156180636...|
|(142,[0,1,2,3,4,5...|175000|[-250005.72899708...|
+--------------------+------+--------------------+
only showing top 10 rows



In [None]:
# Split dataset 70/30. A fixed seed makes the split, and therefore every metric
# computed downstream, reproducible across runs.
SPLIT_SEED = 42
(trainingData, testData) = data_pca.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [None]:
# Scale each PCA component by its largest absolute value on the training split
# (training values land in [-1, 1]). The scaler is fitted on the training split only
# and the same fitted scaler is applied to both splits, so no test information leaks.
ABSscaler = MaxAbsScaler(inputCol="PCA_Features", outputCol="scaled_PCA_Features")
scalerModel = ABSscaler.fit(trainingData)
data_input_train, data_input_test = (scalerModel.transform(split)
                                     for split in (trainingData, testData))

data_input_train.show(5)

+--------------------+------+--------------------+--------------------+
|     IndexedFeatures| FINCP|        PCA_Features| scaled_PCA_Features|
+--------------------+------+--------------------+--------------------+
|(142,[0,1,2,3,4,5...| 68500|[-351.97028034589...|[-7.3710714675661...|
|(142,[0,1,2,3,4,5...|161200|[-479006.91542431...|[-0.1003151232990...|
|(142,[0,1,2,3,4,5...| 76000|[-217010.50401744...|[-0.0454470170820...|
|(142,[0,1,2,3,4,5...|102000|[-190003.92910934...|[-0.0397912158721...|
|(142,[0,1,2,3,4,5...|136264|[-500008.22604167...|[-0.1047132833176...|
+--------------------+------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Min-max scale the target FINCP using statistics from the training split only; the
# test split reuses the training min/max so no test information leaks into the scaling.
# One aggregation job computes both statistics.
target_min, target_max = data_input_train.agg(sf.min('FINCP'), sf.max('FINCP')).first()

# Divide by the range (max - min): (x - min) / max only spans [0, 1] when min == 0.
target_range = target_max - target_min
if target_range == 0:
    raise ValueError("FINCP is constant on the training split; cannot min-max scale it")

# scaled train
data_input_train = data_input_train.withColumn('scaled_FINCP', (col('FINCP') - target_min) / target_range)
# scaled test, computed with the training statistics
data_input_test = data_input_test.withColumn('scaled_FINCP', (col('FINCP') - target_min) / target_range)


data_input_test.show(5)

+--------------------+------+--------------------+--------------------+-------------------+
|     IndexedFeatures| FINCP|        PCA_Features| scaled_PCA_Features|       scaled_FINCP|
+--------------------+------+--------------------+--------------------+-------------------+
|(142,[0,1,2,3,4,5...| 51200|[-4727016.3490513...|[-0.9899465177278...|0.03445145631067961|
|(142,[0,1,2,3,4,5...|180000|[-4727002.1819880...|[-0.9899435508172...|0.09697572815533981|
|(142,[0,1,2,3,4,5...|130200|[-4727008.7124058...|[-0.9899449184376...|0.07280097087378641|
|(142,[0,1,2,3,4,5...|429600|[-4726998.1756776...|[-0.9899427118031...|0.21814077669902912|
|(142,[0,1,2,3,4,5...| 30000|[-300006.91405946...|[-0.0628283843204...|0.02416019417475728|
+--------------------+------+--------------------+--------------------+-------------------+
only showing top 5 rows



In [None]:
### linear regression / ridge / lasso: grid search over regularisation settings

from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit


model = LinearRegression(featuresCol='scaled_PCA_Features',
                         labelCol='scaled_FINCP')
# 2 x 5 x 5 = 50 candidate models. elasticNetParam mixes the penalties:
# 0 = pure L2 (ridge), 1 = pure L1 (lasso), in between = elastic net.
param = ParamGridBuilder() \
  .addGrid(model.fitIntercept, [True, False]) \
  .addGrid(model.regParam, [0.0001, 0.001, 0.01, 0.05, 0.1]) \
  .addGrid(model.elasticNetParam, [0, 0.25, 0.5, 0.75, 1]) \
  .build()
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='scaled_FINCP')
# TrainValidationSplit holds out part of data_input_train to rank the grid points by
# RMSE, then refits the best point on all of data_input_train. The fixed seed makes
# that internal train/validation split reproducible.
validation = TrainValidationSplit(estimator=model, estimatorParamMaps=param,
                                  evaluator=evaluator, parallelism=2, seed=42)
validation_Model = validation.fit(data_input_train)



In [None]:
# Report the hyper-parameters of the winning grid point. Fitted PySpark models carry
# the estimator's param values, so the public getters work and there is no need to
# reach into the private py4j handle `_java_obj`.
bestModel = validation_Model.bestModel
print ('Best Param (regParam): ', bestModel.getRegParam())
print ('Best Param (elasticNetParam): ', bestModel.getElasticNetParam())
print ('Best Param (Intercept): ', bestModel.getFitIntercept())

Best Param (regParam):  0.0001
Best Param (elasticNetParam):  0.0
Best Param (Intercept):  True


In [None]:
print ('Number of models to be tested: ', len(param))  # size of the hyper-parameter grid


Number of models to be tested:  50


In [None]:
evaluator.evaluate(validation_Model.transform(data_input_train))  # RMSE on the TRAINING split (in-sample)

0.028923378015497447

In [None]:
# R^2 of the tuned model, again measured on the training split (in-sample).
evaluato = RegressionEvaluator(labelCol='scaled_FINCP', predictionCol='prediction',
                               metricName='r2')
evaluato.evaluate(validation_Model.transform(data_input_train))

0.5529906522793915

In [None]:
# Score the test split with the tuned model (in the recorded run: regParam=1e-4,
# elasticNetParam=0, i.e. a lightly regularised ridge fit).
# NOTE: the lower-case names rely on Spark's default case-insensitive column resolution;
# the resulting columns keep these spellings, which the evaluators below reference.
Ridge = (bestModel
         .transform(data_input_test)
         .select('scaled_pca_Features', 'scaled_fincp', 'prediction'))
Ridge.show()

+--------------------+--------------------+--------------------+
| scaled_pca_Features|        scaled_fincp|          prediction|
+--------------------+--------------------+--------------------+
|[-0.9899465177278...| 0.03445145631067961| 0.21442034612579838|
|[-0.9899435508172...| 0.09697572815533981| 0.24078772650038519|
|[-0.9899449184376...| 0.07280097087378641|  0.2568684060921241|
|[-0.9899427118031...| 0.21814077669902912|  0.2515444946390748|
|[-0.0628283843204...| 0.02416019417475728|0.004503438202366927|
|[-0.0178017255266...| 0.04964563106796117| 0.05523359668631297|
|[-0.0628278541581...|  0.0853252427184466| 0.07389086161540696|
|[-0.2513118741065...| 0.10634466019417475| 0.19697028977792813|
|[-0.1308904357954...| 0.03789805825242718| 0.08077050785880471|
|[-0.0837709462490...| 0.10061650485436893| 0.09950622802711212|
|[-0.0557076419604...|  0.0785388349514563|  0.0776622864866936|
|[-0.0240849670418...|0.020674757281553397|-6.20632742235810...|
|[-0.0083775268634...|0.0

In [None]:
# Test-split RMSE and R^2 of the tuned (ridge) model.
Ridge_evaluator, Ridge_evaluator2 = (
    RegressionEvaluator(labelCol="scaled_fincp", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "r2")
)
rmse = Ridge_evaluator.evaluate(Ridge)
r2 = Ridge_evaluator2.evaluate(Ridge)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("R Squared (R2) on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 0.0287423
R Squared (R2) on test data = 0.547936


In [None]:
# Cross-check Spark's test R^2 with scikit-learn on the same predictions.
import sklearn.metrics

# Collect label and prediction in ONE query: two separate toPandas() calls run the
# plan twice and nothing guarantees both results come back in the same row order.
ridge_pdf = Ridge.select("scaled_FINCP", "prediction").toPandas()
y_true = ridge_pdf.iloc[:, [0]]
y_pred = ridge_pdf.iloc[:, [1]]

r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

r2_score: 0.5479361821082294


In [None]:
# Unregularised least-squares baseline (LinearRegression with its default parameters)
# fitted on the same scaled inputs and target as the grid search.
from pyspark.ml.regression import LinearRegression

Linear = LinearRegression(labelCol='scaled_FINCP', featuresCol='scaled_PCA_Features')
Linear_Model = Linear.fit(data_input_train)

In [None]:
predictions = Linear_Model.transform(data_input_test)  # baseline predictions on the test split

In [None]:
def modelsummary(model):
    """Print an R-style coefficient table for a fitted linear regression model.

    Prints one row per entry of ``model.summary.pValues`` (estimate, standard error,
    t statistic, p-value), followed by the training MSE, RMSE, R^2 and the number of
    solver iterations.

    Args:
        model: a fitted model exposing ``coefficients``, ``intercept`` and a training
            ``summary`` with ``pValues``, ``coefficientStandardErrors``, ``tValues``,
            ``meanSquaredError``, ``rootMeanSquaredError``, ``r2`` and
            ``totalIterations`` (e.g. a Spark LinearRegressionModel).
            NOTE(review): Spark only provides the per-coefficient statistics for some
            solver/regularisation settings — confirm before using on regularised models.
    """
    import numpy as np

    summary = model.summary
    # The summary arrays list the intercept last, so append it after the coefficients.
    estimates = np.append(list(model.coefficients), model.intercept)

    print("Note: the last row is the information for the Intercept")
    print("##", "-------------------------------------------------")
    print("##", "  Estimate   |   Std.Error | t Values  |  P-value")
    for i in range(len(summary.pValues)):
        print("##",
              "{:10.6f}".format(estimates[i]),
              "{:10.6f}".format(summary.coefficientStandardErrors[i]),
              "{:8.3f}".format(summary.tValues[i]),
              "{:10.6f}".format(summary.pValues[i]))

    print("##", "---")
    # Build each summary line with a single format string: the previous backslash
    # continuation inside a string literal leaked the next line's indentation into
    # the printed text.
    print("##", "Mean squared error: %.6f, RMSE: %.6f"
          % (summary.meanSquaredError, summary.rootMeanSquaredError))
    print("##", "Multiple R-squared: %f, Total iterations: %i"
          % (summary.r2, summary.totalIterations))

In [None]:
modelsummary(Linear_Model)  # coefficient table + training-set fit statistics of the baseline

Note: the last rows are the information for Intercept
## -------------------------------------------------
##   Estimate   |   Std.Error | t Values  |  P-value
##  -0.293458   0.000557 -526.429   0.000000
##   0.117629   0.000428  274.640   0.000000
##  -0.009889   0.000371  -26.661   0.000000
##   0.051093   0.000338  151.115   0.000000
##   0.012596   0.000475   26.508   0.000000
##   0.000638   0.000386    1.651   0.098731
##  -0.019506   0.000902  -21.615   0.000000
##   0.054697   0.000637   85.825   0.000000
##  -0.070098   0.000561 -125.001   0.000000
##  -0.004243   0.000269  -15.745   0.000000
##  -0.005091   0.000464  -10.961   0.000000
##   0.026574   0.000880   30.210   0.000000
##  -0.034786   0.000835  -41.652   0.000000
##   0.005724   0.000385   14.866   0.000000
##   0.126482   0.000280  451.619   0.000000
##   0.091511   0.000307  298.432   0.000000
##   0.006491   0.000270   24.052   0.000000
##  -0.015738   0.000255  -61.824   0.000000
##  -0.001969   0.000278   -7.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Test-split error of the baseline: compare each prediction with its true scaled label.
evaluator, evaluator1 = (
    RegressionEvaluator(labelCol="scaled_FINCP", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "mse")
)
rmse, mse = evaluator.evaluate(predictions), evaluator1.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Mean Squared Error (MSE) on test data = %g" % mse)

Root Mean Squared Error (RMSE) on test data = 0.0287433
Mean Squared Error (MSE) on test data = 0.000826176


In [None]:
# Cross-check the baseline's test R^2 with scikit-learn.
import sklearn.metrics

# Collect label and prediction in ONE query so every label stays paired with its own
# prediction (two toPandas() calls re-run the plan with no row-order guarantee).
baseline_pdf = predictions.select("scaled_FINCP", "prediction").toPandas()
y_true = baseline_pdf.iloc[:, [0]]
y_pred = baseline_pdf.iloc[:, [1]]

r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

r2_score: 0.5479054201903762


In [None]:
# Elastic-net model with hand-picked settings (regParam=0.01, elasticNetParam=0.2,
# at most 10 iterations), scored on both splits to compare train vs. test error.
lr = LinearRegression(featuresCol='scaled_PCA_Features', labelCol='scaled_FINCP',
                      regParam=0.01, elasticNetParam=0.2, maxIter=10)
lrModel = lr.fit(data_input_train)
pred, pred1 = (lrModel.transform(split) for split in (data_input_test, data_input_train))

In [None]:
# RMSE / MSE / R^2 of the elastic-net model: first on the training split (pred1),
# then on the test split (pred).
eva1, eva2, eva3 = (
    RegressionEvaluator(labelCol="scaled_FINCP", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "mse", "r2")
)

rmse, mse, r2 = (ev.evaluate(pred1) for ev in (eva1, eva2, eva3))
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)
print("Mean Squared Error (MSE) on train data = %g" % mse)
print("R2 on train data = %g" % r2)

testrmse, testmse, testr2 = (ev.evaluate(pred) for ev in (eva1, eva2, eva3))
print("Root Mean Squared Error (RMSE) on test data = %g" % testrmse)
print("Mean Squared Error (MSE) on test data = %g" % testmse)
print("R2 on test data = %g" % testr2)

Root Mean Squared Error (RMSE) on train data = 0.0305368
Mean Squared Error (MSE) on train data = 0.000932494
R2 on train data = 0.50173
Root Mean Squared Error (RMSE) on test data = 0.0302671
Mean Squared Error (MSE) on test data = 0.000916099
R2 on test data = 0.498698
