<a href="https://colab.research.google.com/github/carlosdcorona/PySpark/blob/main/CurveFittingWithPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Classwork 08: Curve Fitting with pySpark

### Bernardo Samuel Salazar de Hoyos A00825722
### Carlos Daniel Corona Cortez A01656002
### Sebastian Andres Saldaña Cárdenas A01570274
### Sergio Noé Torres Rodríguez A00825706
### Raúl V. Ramírez Velarde
### October 11, 2022

# Boston House Market Dataset Curve Fitting

## Install and Import pySpark Modules

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz
!tar xf spark-3.1.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Export the Java and Spark install locations as environment variables.
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.3-bin-hadoop2.7",
    }
)

In [None]:
import findspark

# Keep this call ahead of the pyspark import below; the original cell relies on
# that order (presumably findspark is what makes pyspark importable — confirm
# against the findspark docs before reordering).
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) a session with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

## Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive so the dataset, which was previously
# uploaded to Drive, can be read through an ordinary file path.
from google.colab import drive
drive.mount("/content/drive")
# List the current working directory (the mount point should show up here).
%ls

Mounted at /content/drive
[0m[01;34mdrive[0m/  [01;34msample_data[0m/  [01;34mspark-3.1.3-bin-hadoop2.7[0m/  spark-3.1.3-bin-hadoop2.7.tgz


## Load Dataset (body fat measurements, `fat.csv`)

In [None]:
# CSV location on the mounted shared drive.
path_csv = "/content/drive/Shareddrives/Infra DS Equipo 4/Classwork08/fat.csv"

# Treat the first row as column names and let Spark infer each column's type.
dataset = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(path_csv)
)

# Preview the first 10 rows.
dataset.show(10)

+---+------+----+-------+---+------+------+------+-----+----+-----+-----+-----+-----+----+-----+------+-------+-----+
|_c0|brozek|siri|density|age|weight|height|adipos| free|neck|chest|abdom|  hip|thigh|knee|ankle|biceps|forearm|wrist|
+---+------+----+-------+---+------+------+------+-----+----+-----+-----+-----+-----+----+-----+------+-------+-----+
|  1|  12.6|12.3| 1.0708| 23|154.25| 67.75|  23.7|134.9|36.2| 93.1| 85.2| 94.5| 59.0|37.3| 21.9|  32.0|   27.4| 17.1|
|  2|   6.9| 6.1| 1.0853| 22|173.25| 72.25|  23.4|161.3|38.5| 93.6| 83.0| 98.7| 58.7|37.3| 23.4|  30.5|   28.9| 18.2|
|  3|  24.6|25.3| 1.0414| 22| 154.0| 66.25|  24.7|116.0|34.0| 95.8| 87.9| 99.2| 59.6|38.9| 24.0|  28.8|   25.2| 16.6|
|  4|  10.9|10.4| 1.0751| 26|184.75| 72.25|  24.9|164.7|37.4|101.8| 86.4|101.2| 60.1|37.3| 22.8|  32.4|   29.4| 18.2|
|  5|  27.8|28.7|  1.034| 24|184.25| 71.25|  25.6|133.1|34.4| 97.3|100.0|101.9| 63.2|42.2| 24.0|  32.2|   27.7| 17.7|
|  6|  20.6|20.9| 1.0502| 24|210.25| 74.75|  26.5|167.0|

## Fit the dataset to a linear regression model

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Print every column's name, type and nullability as read from the CSV.
dataset.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- brozek: double (nullable = true)
 |-- siri: double (nullable = true)
 |-- density: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)
 |-- adipos: double (nullable = true)
 |-- free: double (nullable = true)
 |-- neck: double (nullable = true)
 |-- chest: double (nullable = true)
 |-- abdom: double (nullable = true)
 |-- hip: double (nullable = true)
 |-- thigh: double (nullable = true)
 |-- knee: double (nullable = true)
 |-- ankle: double (nullable = true)
 |-- biceps: double (nullable = true)
 |-- forearm: double (nullable = true)
 |-- wrist: double (nullable = true)



In [None]:
# Wrap the single predictor column ('wrist') in a vector column named 'Attributes'.
assembler = VectorAssembler(
    outputCol="Attributes",
    inputCols=["wrist"],
)
output = assembler.transform(dataset)

# Keep only the feature vector (input) and 'weight' (the value to predict).
finalized_data = output.select("Attributes", "weight")
finalized_data.show(10)

+----------+------+
|Attributes|weight|
+----------+------+
|    [17.1]|154.25|
|    [18.2]|173.25|
|    [16.6]| 154.0|
|    [18.2]|184.75|
|    [17.7]|184.25|
|    [18.8]|210.25|
|    [17.7]| 181.0|
|    [18.8]| 176.0|
|    [18.2]| 191.0|
|    [19.2]|198.25|
+----------+------+
only showing top 10 rows



In [None]:
# Split the rows roughly 80/20 into training and test sets. The fixed seed makes
# the split repeatable, so the predictions and metrics do not change from one
# run of the notebook to the next.
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=42)

# Fit a linear regression of 'weight' on the 'Attributes' vector using the
# training rows only. `regressor` holds the fitted model from here on.
regressor = LinearRegression(featuresCol="Attributes", labelCol="weight").fit(train_data)

# Predict the weight for the held-out test rows; the result carries a
# 'prediction' column next to the original ones.
pred_lr = regressor.transform(test_data)
pred_lr.select("Attributes", "weight", "prediction").show(15)

+----------+------+------------------+
|Attributes|weight|        prediction|
+----------+------+------------------+
|    [16.5]| 131.5|138.46022923469417|
|    [16.5]|151.25|138.46022923469417|
|    [16.7]| 151.0|143.14944152907378|
|    [16.8]| 127.5|145.49404767626362|
|    [16.9]| 125.0| 147.8386538234534|
|    [16.9]|137.25| 147.8386538234534|
|    [16.9]|145.25| 147.8386538234534|
|    [16.9]|147.75| 147.8386538234534|
|    [16.9]|162.75| 147.8386538234534|
|    [16.9]| 168.0| 147.8386538234534|
|    [17.0]|191.75|150.18325997064323|
|    [17.3]| 156.0|157.21707841221274|
|    [17.3]|195.75|157.21707841221274|
|    [17.4]|172.75|159.56168455940247|
|    [17.6]|159.75| 164.2508968537822|
+----------+------+------------------+
only showing top 15 rows



In [None]:
import numpy as np

# Table header. The coefficient rows come first and the intercept row is last.
print("Note: the last rows are the information for Intercept")
print("##", "-------------------------------------------------")
print("##", "  Estimate   |   Std.Error | t Values  |  P-value")

# Estimates in the same order as the statistics below: coefficients, then intercept.
coef = np.append(list(regressor.coefficients), regressor.intercept)
Summary = regressor.summary

# One row per estimate: value, standard error, t value and p-value.
stat_rows = zip(
    coef,
    Summary.coefficientStandardErrors,
    Summary.tValues,
    Summary.pValues,
)
for estimate, std_error, t_value, p_value in stat_rows:
    print(
        "##",
        "{:10.6f}".format(estimate),
        "{:10.6f}".format(std_error),
        "{:8.3f}".format(t_value),
        "{:10.6f}".format(p_value),
    )

# Overall fit metrics taken from the training summary. The run of spaces after
# each comma is built explicitly so the printed lines keep their existing layout.
print("##", "---")
print(
    "##",
    "Mean squared error: % .6f" % Summary.meanSquaredError,
    (", " + " " * 8 + "RMSE: % .6f") % Summary.rootMeanSquaredError,
)
print(
    "##",
    "Multiple R-squared: %f" % Summary.r2,
    (", " + " " * 8 + "Total iterations: %i") % Summary.totalIterations,
)

Note: the last rows are the information for Intercept
## -------------------------------------------------
##   Estimate   |   Std.Error | t Values  |  P-value
##  23.446061   1.597387   14.678   0.000000
## -248.399785  29.180445   -8.513   0.000000
## ---
## Mean squared error:  413.236140 ,         RMSE:  20.328210
## Multiple R-squared: 0.531370 ,         Total iterations: 0


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator comparing the 'prediction' column with the true 'weight' values;
# its default metric is RMSE.
eval_lr = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="weight",
)
print("Linear regression model\n")

# Root mean squared error on the test-set predictions.
rmse = eval_lr.evaluate(pred_lr)
print("RMSE: %.3f" % rmse)

# Coefficient of determination: same evaluator, metric overridden for this call only.
r2 = eval_lr.evaluate(pred_lr, params={eval_lr.metricName: "r2"})
print("r2: %.3f" %r2)


Linear regression model

RMSE: 19.168
r2: 0.534


# Fat Dataset Curve Fitting