# Simple linear regression in PySpark

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=70fe92c508df8eec41a87855320ffe24b28e103300bc3fa3b9fc85c2d2e29141
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
# Obtain the Spark session for this notebook: reuse an existing one if present,
# otherwise create it.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)


In [5]:
# Load the California housing training CSV.
# "header" takes column names from the first row; "inferSchema" asks Spark to
# work out the column types from the data.
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/content/sample_data/california_housing_train.csv")
)

# Print the column names and inferred types as a sanity check on the load.
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [6]:
# Preview the first rows of the loaded DataFrame as a text table.
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [7]:
# 70/30 random split of the raw frame with a fixed seed.
# NOTE(review): `train` and `test` are reassigned from `data` in a later cell
# before they are first read, so this split appears to be unused - confirm and
# consider removing this cell.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=4)

In [32]:
# Predictor columns: every column of `df` except the regression target.
features = [column for column in df.columns if column != "median_house_value"]
features

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

Vector assembling: combine the feature columns into a single vector column

In [33]:
from pyspark.ml.feature import VectorAssembler


In [34]:
# Feed every predictor column to the assembler; `list(...)` makes a shallow
# copy so `assemblerInput` is a separate list object from `features`.
assemblerInput = list(features)

# Packs the input columns into one vector column named "VectorAssembler_features".
vector_assembler = VectorAssembler(
    outputCol="VectorAssembler_features",
    inputCols=assemblerInput,
)

In [38]:
from pyspark.ml import Pipeline
# The assembler is applied directly with `transform`; no Pipeline is fitted
# here, so the `Pipeline` import above is currently unused in this cell.
# NOTE(review): despite its name, `model` holds the transformed DataFrame
# (the columns of `df` plus "VectorAssembler_features"), not a fitted model.
# The same name is later rebound to the fitted LinearRegression model.
model = vector_assembler.transform(df)

Linear regression

In [39]:
from pyspark.ml.regression import LinearRegression

# Keep only what the regressor needs, renamed to "features" and "label".
feature_column = F.col("VectorAssembler_features").alias('features')
label_column = F.col("median_house_value").alias("label")
data = model.select(feature_column, label_column)

In [44]:
# 70% of rows for training, 30% held out for evaluation; the seed is fixed so
# the split is repeatable.
train, test = data.randomSplit(weights=[0.7, 0.3], seed=4)

In [45]:
# Preview the (features, label) rows; truncate=False prints the full feature
# vector instead of cutting long cells short.
data.show(truncate = False)

+-------------------------------------------------------+-------+
|features                                               |label  |
+-------------------------------------------------------+-------+
|[-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936] |66900.0|
|[-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82]    |80100.0|
|[-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509]    |85700.0|
|[-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917]   |73400.0|
|[-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925]    |65500.0|
|[-114.58,33.63,29.0,1387.0,236.0,671.0,239.0,3.3438]   |74000.0|
|[-114.58,33.61,25.0,2907.0,680.0,1841.0,633.0,2.6768]  |82400.0|
|[-114.59,34.83,41.0,812.0,168.0,375.0,158.0,1.7083]    |48500.0|
|[-114.59,33.61,34.0,4789.0,1175.0,3134.0,1056.0,2.1782]|58400.0|
|[-114.6,34.83,46.0,1497.0,309.0,787.0,271.0,2.1908]    |48100.0|
|[-114.6,33.62,16.0,3741.0,801.0,2434.0,824.0,2.6797]   |86500.0|
|[-114.6,33.6,21.0,1988.0,483.0,1182.0,437.0,1.625]     |62000.0|
|[-114.61,

In [46]:
# Fit a linear regression on the training split. The column names are spelled
# out (they equal the estimator's defaults) to tie them to the aliases chosen
# when `data` was built. Note that this rebinds `model`, which previously held
# the assembled DataFrame.
regressor = LinearRegression(featuresCol="features", labelCol="label")
model = regressor.fit(train)

In [48]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split. `model.evaluate` returns a summary object; its
# `.predictions` DataFrame is what the evaluator below reads.
pred = model.evaluate(test)
predictions = pred.predictions

# Named `evaluator` rather than `eval` so the Python builtin `eval` is not
# shadowed for the rest of the notebook. The default metric is RMSE; the other
# metrics are requested by overriding `metricName` per call.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 70334.427
MSE: 4946931660.000
MAE: 51693.045
r2: 0.633
