<a href="https://colab.research.google.com/github/bodadaniel/Spark/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Pyspark

In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m18.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=0a2faaf0f52fc72cb3a20145be7d261ccd58770cb7316e9da16e6b93018a3780
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Load packages

In [3]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession

import os
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.feature import VectorAssembler, StandardScaler

from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

# Some basic operations


In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
df = spark.createDataFrame([
    Row(a=1, b=2., c='a b', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='a b', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='a b', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()

+---+---+---+----------+-------------------+
|  a|  b|  c|         d|                  e|
+---+---+---+----------+-------------------+
|  1|2.0|a b|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|a b|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|a b|2000-03-01|2000-01-03 12:00:00|
+---+---+---+----------+-------------------+



In [6]:
df.select(F.split(col('c'), ' ').getItem(1).alias('last')).show()

+----+
|last|
+----+
|   b|
|   b|
|   b|
+----+



In [7]:
df = spark.createDataFrame([
    Row(a=1, b=2., c=['a', 'b', 'c'], d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c=['a', 'c'], d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c=['a', 'b', 'c'], d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()

+---+---+---------+----------+-------------------+
|  a|  b|        c|         d|                  e|
+---+---+---------+----------+-------------------+
|  1|2.0|[a, b, c]|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|   [a, c]|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|[a, b, c]|2000-03-01|2000-01-03 12:00:00|
+---+---+---------+----------+-------------------+



In [8]:
df.withColumn('size', F.size(col('c'))).show()

+---+---+---------+----------+-------------------+----+
|  a|  b|        c|         d|                  e|size|
+---+---+---------+----------+-------------------+----+
|  1|2.0|[a, b, c]|2000-01-01|2000-01-01 12:00:00|   3|
|  2|3.0|   [a, c]|2000-02-01|2000-01-02 12:00:00|   2|
|  4|5.0|[a, b, c]|2000-03-01|2000-01-03 12:00:00|   3|
+---+---+---------+----------+-------------------+----+



In [9]:
df.select(df.a, df.b, 
  F.when(F.size(df.c) > 2, "Long")
  .otherwise("Short").alias('Desc')).show()

+---+---+-----+
|  a|  b| Desc|
+---+---+-----+
|  1|2.0| Long|
|  2|3.0|Short|
|  4|5.0| Long|
+---+---+-----+



In [10]:
df.withColumn('random_val',
              F.when(df.b == 2, F.rand())
              .when(df.b < 2, 1)
              .otherwise(0)).show()

+---+---+---------+----------+-------------------+------------------+
|  a|  b|        c|         d|                  e|        random_val|
+---+---+---------+----------+-------------------+------------------+
|  1|2.0|[a, b, c]|2000-01-01|2000-01-01 12:00:00|0.5666190129827455|
|  2|3.0|   [a, c]|2000-02-01|2000-01-02 12:00:00|               0.0|
|  4|5.0|[a, b, c]|2000-03-01|2000-01-03 12:00:00|               0.0|
+---+---+---------+----------+-------------------+------------------+



In [11]:
df.orderBy(df.b.desc()).show()

+---+---+---------+----------+-------------------+
|  a|  b|        c|         d|                  e|
+---+---+---------+----------+-------------------+
|  4|5.0|[a, b, c]|2000-03-01|2000-01-03 12:00:00|
|  2|3.0|   [a, c]|2000-02-01|2000-01-02 12:00:00|
|  1|2.0|[a, b, c]|2000-01-01|2000-01-01 12:00:00|
+---+---+---------+----------+-------------------+



In [12]:
df.select('b').rdd.max()[0]

5.0

In [13]:
def add_together(x, y):
  return x + y

udf_add_together = udf(add_together, FloatType())
df.select(df.a, df.b, udf_add_together(df.a, df.b).alias('a+b') ).show()

+---+---+---+
|  a|  b|a+b|
+---+---+---+
|  1|2.0|3.0|
|  2|3.0|5.0|
|  4|5.0|9.0|
+---+---+---+



In [14]:
df = spark.createDataFrame([
    Row(b=['1', '2'], c=['a', 'b', 'c'] ),
    Row(b=['1', '4'], c=['a', 'c']),
    Row(b=['1', '2'], c=['a', 'b', 'c'])
])
df.show()

+------+---------+
|     b|        c|
+------+---------+
|[1, 2]|[a, b, c]|
|[1, 4]|   [a, c]|
|[1, 2]|[a, b, c]|
+------+---------+



In [15]:
df.select(df.b).distinct().show()

+------+
|     b|
+------+
|[1, 2]|
|[1, 4]|
+------+



In [16]:
df.select( [col for col in df.columns  ] ).distinct().show()

+------+---------+
|     b|        c|
+------+---------+
|[1, 2]|[a, b, c]|
|[1, 4]|   [a, c]|
+------+---------+



In [18]:
df.select( df.b, df.c ).distinct().show()

+------+---------+
|     b|        c|
+------+---------+
|[1, 2]|[a, b, c]|
|[1, 4]|   [a, c]|
+------+---------+



In [19]:
spark.stop()

# Load data

In [4]:
from google.colab import drive 
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [5]:
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)

In [6]:
HOUSING_DATA = '/content/gdrive/MyDrive/Colab Notebooks/Spark/housing.csv'

In [7]:
spark = SparkSession.builder.getOrCreate()

In [12]:
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()
#housing_df = spark.read.csv(path=HOUSING_DATA).cache()

# Examine data

In [29]:
housing_df.columns

['long',
 'lat',
 'medage',
 'totrooms',
 'totbdrms',
 'pop',
 'houshlds',
 'medinc',
 'medhv']

In [30]:
type(housing_df)

pyspark.sql.dataframe.DataFrame

In [31]:
housing_df.show(5)

+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|   null| null|  null|    null|    null|  null|    null|  null|    null|
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|
+-------+-----+------+--------+--------+------+--------+------+--------+
only showing top 5 rows



In [32]:
housing_df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|               long|              lat|            medage|          totrooms|          totbdrms|               pop|         houshlds|            medinc|             medhv|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev|  2.003531742932898|2.135952380602968| 12.58555761211163|2181.6152515827944|421.38507007403

In [33]:
housing_df.printSchema()

root
 |-- long: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- medage: float (nullable = true)
 |-- totrooms: float (nullable = true)
 |-- totbdrms: float (nullable = true)
 |-- pop: float (nullable = true)
 |-- houshlds: float (nullable = true)
 |-- medinc: float (nullable = true)
 |-- medhv: float (nullable = true)



In [38]:
housing_df.columns[0]

'long'

In [39]:
housing_df.describe().select(
                    "summary",
                    F.round("medage", 4).alias("medage"),
                    F.round("totrooms", 4).alias("totrooms"),
                    F.round("totbdrms", 4).alias("totbdrms"),
                    F.round("pop", 4).alias("pop"),
                    F.round("houshlds", 4).alias("houshlds"),
                    F.round("medinc", 4).alias("medinc"),
                    F.round("medhv", 4).alias("medhv")).show()

+-------+-------+---------+--------+---------+--------+-------+-----------+
|summary| medage| totrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+---------+--------+---------+--------+-------+-----------+
|  count|20640.0|  20640.0| 20433.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395|2635.7631|537.8706|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856|2181.6153|421.3851|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|      2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|  39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+---------+--------+---------+--------+-------+-----------+



In [41]:
housing_df.groupBy("medage").count().sort("count", ascending=False).show()

+------+-----+
|medage|count|
+------+-----+
|  52.0| 1273|
|  36.0|  862|
|  35.0|  824|
|  16.0|  771|
|  17.0|  698|
|  34.0|  689|
|  26.0|  619|
|  33.0|  615|
|  18.0|  570|
|  25.0|  566|
|  32.0|  565|
|  37.0|  537|
|  15.0|  512|
|  19.0|  502|
|  27.0|  488|
|  24.0|  478|
|  30.0|  476|
|  28.0|  471|
|  20.0|  465|
|  29.0|  461|
+------+-----+
only showing top 20 rows



In [53]:
housing_df.where(col('medage') > 50).describe().show()

+-------+-------------------+------------------+-------------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+
|summary|               long|               lat|             medage|         totrooms|          totbdrms|              pop|          houshlds|           medinc|            medhv|
+-------+-------------------+------------------+-------------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+
|  count|               1321|              1321|               1321|             1321|              1312|             1321|              1321|             1321|             1321|
|   mean|-120.87149896845503| 36.59622257356838| 51.963663890991675|1858.430734292203| 403.4169207317073|934.5268735806208| 378.3436790310371|3.872136638053523|273687.5647236942|
| stddev| 1.9709576459240237|1.9193302506626395|0.18719594896468292|957.0860779154284|242.61603881783026|

In [52]:
housing_df.filter(col('medage') > 50).describe().select('summary',*[F.round(col , 4).alias(col) for col in housing_df.columns] ).show()

+-------+---------+-------+-------+---------+--------+--------+--------+-------+-----------+
|summary|     long|    lat| medage| totrooms|totbdrms|     pop|houshlds| medinc|      medhv|
+-------+---------+-------+-------+---------+--------+--------+--------+-------+-----------+
|  count|   1321.0| 1321.0| 1321.0|   1321.0|  1312.0|  1321.0|  1321.0| 1321.0|     1321.0|
|   mean|-120.8715|36.5962|51.9637|1858.4307|403.4169|934.5269|378.3437| 3.8721|273687.5647|
| stddev|    1.971| 1.9193| 0.1872| 957.0861| 242.616|524.6449|221.4431| 2.2938| 139266.334|
|    min|  -124.35|  32.66|   51.0|      8.0|     1.0|     8.0|     1.0| 0.4999|    14999.0|
|    max|  -116.88|  41.32|   52.0|   6186.0|  2747.0|  6675.0|  2538.0|15.0001|   500001.0|
+-------+---------+-------+-------+---------+--------+--------+--------+-------+-----------+



In [54]:
housing_df.withColumn('totrooms_int', col('totrooms').cast(IntegerType())).show(5)

+-------+-----+------+--------+--------+------+--------+------+--------+------------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|totrooms_int|
+-------+-----+------+--------+--------+------+--------+------+--------+------------+
|   null| null|  null|    null|    null|  null|    null|  null|    null|        null|
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|         880|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|        7099|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|        1467|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|        1274|
+-------+-----+------+--------+--------+------+--------+------+--------+------------+
only showing top 5 rows



In [60]:
housing_df.select( [F.count(F.when(F.isnan(c) | col(c).isNull(), c)).alias(c) for c in housing_df.columns] ).show()

+----+---+------+--------+--------+---+--------+------+-----+
|long|lat|medage|totrooms|totbdrms|pop|houshlds|medinc|medhv|
+----+---+------+--------+--------+---+--------+------+-----+
|   1|  1|     1|       1|     208|  1|       1|     1|    1|
+----+---+------+--------+--------+---+--------+------+-----+



In [55]:
housing_df.where(F.greatest(*[F.col(coll_i).isNull() for coll_i in housing_df.columns])).show()

+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|   null| null|  null|    null|    null|  null|    null|  null|    null|
|-122.16|37.77|  47.0|  1256.0|    null| 570.0|   218.0| 4.375|161900.0|
|-122.17|37.75|  38.0|   992.0|    null| 732.0|   259.0|1.6196| 85100.0|
|-122.28|37.78|  29.0|  5154.0|    null|3741.0|  1273.0|2.5762|173400.0|
|-122.24|37.75|  45.0|   891.0|    null| 384.0|   146.0|4.9489|247100.0|
| -122.1|37.69|  41.0|   746.0|    null| 387.0|   161.0|3.9063|178400.0|
|-122.14|37.67|  37.0|  3342.0|    null|1635.0|   557.0|4.7933|186900.0|
|-121.77|39.66|  20.0|  3759.0|    null|1705.0|   600.0| 4.712|158600.0|
|-121.95|38.03|   5.0|  5526.0|    null|3207.0|  1012.0|4.0767|143100.0|
|-121.98|37.96|  22.0|  2987.0|    null|1420.0|   540.0|  3.65|204100.0|
|-122.01|37.94|  23.0|  3741.0|    null|1339.0|   4

In [67]:
housing_df.filter( F.isnan(col('totrooms')) | col('totrooms').isNull()  == True).show()

+----+----+------+--------+--------+----+--------+------+-----+
|long| lat|medage|totrooms|totbdrms| pop|houshlds|medinc|medhv|
+----+----+------+--------+--------+----+--------+------+-----+
|null|null|  null|    null|    null|null|    null|  null| null|
+----+----+------+--------+--------+----+--------+------+-----+



# Filtering and data imputation

In [13]:
housing_df = housing_df.filter( F.isnan(col('totrooms')) | col('totrooms').isNull() == False)

In [14]:
housing_df.filter( F.isnan(col('totbdrms')) | col('totbdrms').isNull() == True).show()

+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|-122.16|37.77|  47.0|  1256.0|    null| 570.0|   218.0| 4.375|161900.0|
|-122.17|37.75|  38.0|   992.0|    null| 732.0|   259.0|1.6196| 85100.0|
|-122.28|37.78|  29.0|  5154.0|    null|3741.0|  1273.0|2.5762|173400.0|
|-122.24|37.75|  45.0|   891.0|    null| 384.0|   146.0|4.9489|247100.0|
| -122.1|37.69|  41.0|   746.0|    null| 387.0|   161.0|3.9063|178400.0|
|-122.14|37.67|  37.0|  3342.0|    null|1635.0|   557.0|4.7933|186900.0|
|-121.77|39.66|  20.0|  3759.0|    null|1705.0|   600.0| 4.712|158600.0|
|-121.95|38.03|   5.0|  5526.0|    null|3207.0|  1012.0|4.0767|143100.0|
|-121.98|37.96|  22.0|  2987.0|    null|1420.0|   540.0|  3.65|204100.0|
|-122.01|37.94|  23.0|  3741.0|    null|1339.0|   499.0|6.7061|322300.0|
|-122.08|37.88|  26.0|  2947.0|    null| 825.0|   6

In [15]:
avg_totbdrms = housing_df.select(F.mean(col('totbdrms'))).rdd.max()[0]

housing_df = housing_df.na.fill(avg_totbdrms,["totbdrms"])

In [16]:
housing_df.select( [F.count(F.when(F.isnan(c) | col(c).isNull(), c)).alias(c) for c in housing_df.columns] ).show()

+----+---+------+--------+--------+---+--------+------+-----+
|long|lat|medage|totrooms|totbdrms|pop|houshlds|medinc|medhv|
+----+---+------+--------+--------+---+--------+------+-----+
|   0|  0|     0|       0|       0|  0|       0|     0|    0|
+----+---+------+--------+--------+---+--------+------+-----+



In [64]:
housing_df.createOrReplaceTempView('housing')

In [65]:
spark.sql('SELECT * from housing').collect()[1]

Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0)

In [66]:
spark.catalog.listTables('default')

[Table(name='housing', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

# Transformation

In [19]:
featureCols = ["medage", "totrooms", "totbdrms", "pop", "houshlds", "medinc"]

In [20]:
assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 

In [21]:
assembled_df = assembler.transform(housing_df)

In [22]:
assembled_df.show(10, truncate = False)

+-------+-----+------+--------+--------+------+--------+------+--------+----------------------------------------------------+
|long   |lat  |medage|totrooms|totbdrms|pop   |houshlds|medinc|medhv   |features                                            |
+-------+-----+------+--------+--------+------+--------+------+--------+----------------------------------------------------+
|-122.23|37.88|41.0  |880.0   |129.0   |322.0 |126.0   |8.3252|452600.0|[41.0,880.0,129.0,322.0,126.0,8.325200080871582]    |
|-122.22|37.86|21.0  |7099.0  |1106.0  |2401.0|1138.0  |8.3014|358500.0|[21.0,7099.0,1106.0,2401.0,1138.0,8.301400184631348]|
|-122.24|37.85|52.0  |1467.0  |190.0   |496.0 |177.0   |7.2574|352100.0|[52.0,1467.0,190.0,496.0,177.0,7.257400035858154]   |
|-122.25|37.85|52.0  |1274.0  |235.0   |558.0 |219.0   |5.6431|341300.0|[52.0,1274.0,235.0,558.0,219.0,5.643099784851074]   |
|-122.25|37.85|52.0  |1627.0  |280.0   |565.0 |259.0   |3.8462|342200.0|[52.0,1627.0,280.0,565.0,259.0,3.8461999893188

In [23]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

In [24]:
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)

In [25]:
scaled_df.show(10, truncate = False)

+-------+-----+------+--------+--------+------+--------+------+--------+----------------------------------------------------+------------------------------------------------------------------------------------------------------------------+
|long   |lat  |medage|totrooms|totbdrms|pop   |houshlds|medinc|medhv   |features                                            |features_scaled                                                                                                   |
+-------+-----+------+--------+--------+------+--------+------+--------+----------------------------------------------------+------------------------------------------------------------------------------------------------------------------+
|-122.23|37.88|41.0  |880.0   |129.0   |322.0 |126.0   |8.3252|452600.0|[41.0,880.0,129.0,322.0,126.0,8.325200080871582]    |[3.2577023016083064,0.40337085073160667,0.3076801308792145,0.2843362208866199,0.3295584480852433,4.38209543579743]|
|-122.22|37.86|21.0  |7099.0  |1106.

# Split

In [26]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=123456789)

#Linear regression

## Train

In [27]:
lr = LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False)
linearModel = lr.fit(train_data)

In [28]:
print(f' intercept: {linearModel.intercept}, coeff: {linearModel.coefficients}')

 intercept: -47749.48274656101, coeff: [25415.946341808318,-20607.597995951917,35841.50225519154,-43191.066160920374,39616.00389458362,87443.15307026169]


In [29]:
print(type(linearModel.coefficients))
type(linearModel.coefficients.toArray())

<class 'pyspark.ml.linalg.DenseVector'>


numpy.ndarray

In [30]:
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + featureCols, "Coefficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df

Unnamed: 0,Feature,Coefficients
0,Intercept,-47749.482747
1,medage,25415.946342
2,totrooms,-20607.597996
3,totbdrms,35841.502255
4,pop,-43191.066161
5,houshlds,39616.003895
6,medinc,87443.15307


## Test

In [31]:
predictions = linearModel.transform(test_data)

In [32]:
predictions.show()

+-------+-----+------+--------+--------+------+--------+------+--------+--------------------+--------------------+------------------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|            features|     features_scaled|         predmedhv|
+-------+-----+------+--------+--------+------+--------+------+--------+--------------------+--------------------+------------------+
|-124.27|40.69|  36.0|  2349.0|   528.0|1194.0|   465.0|2.5179| 79000.0|[36.0,2349.0,528....|[2.86042153311948...|166434.29919072316|
|-124.26|40.58|  52.0|  2217.0|   394.0| 907.0|   369.0|2.3571|111400.0|[52.0,2217.0,394....|[4.13171999228370...|182134.78257328103|
|-124.21|40.75|  32.0|  1218.0|   331.0| 620.0|   268.0|1.6528| 58100.0|[32.0,1218.0,331....|[2.54259691832843...| 113860.3897806863|
|-124.21|41.75|  20.0|  3810.0|   787.0|1993.0|   721.0|2.0074| 66900.0|[20.0,3810.0,787....|[1.58912307395527...| 115019.4447429098|
|-124.18|40.79|  40.0|  1398.0|   311.0| 788.0|   279.0|1.4668

In [33]:
predandlabels = predictions.select("predmedhv", "medhv")

In [34]:
print("Train RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))
print("Train MAE: {0}".format(linearModel.summary.meanAbsoluteError))
print("Train R2: {0}".format(linearModel.summary.r2))

Train RMSE: 76125.67017789836
Train MAE: 56060.110530257036
Train R2: 0.5591032979158966


In [35]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse')
print("Test RMSE: {0}".format(evaluator.evaluate(predandlabels)))

evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='mae')
print("Test MAE: {0}".format(evaluator.evaluate(predandlabels)))

evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='r2')
print("Test R2: {0}".format(evaluator.evaluate(predandlabels)))

Test RMSE: 77635.4221529499
Test MAE: 57598.66503478673
Test R2: 0.5687809177941909


In [36]:
metrics = RegressionMetrics(predandlabels.rdd)



In [37]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))
print("MAE: {0}".format(metrics.meanAbsoluteError))
print("R2: {0}".format(metrics.r2))

RMSE: 77635.4221529499
MAE: 57598.66503478673
R2: 0.5687809177941909


## Paramgrid builder

In [38]:
lr = LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', standardization=False)

In [39]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.maxIter, (5, 10))\
                              .addGrid(lr.tol, (1e-4, 1e-5))\
                              .addGrid(lr.elasticNetParam, (0.25,0.75))\
                              .build()

In [40]:
len(paramGrid)

16

In [41]:
tvs = TrainValidationSplit( estimator=lr
                           ,estimatorParamMaps=paramGrid
                           ,evaluator=RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse')
                           ,trainRatio=0.8)

In [42]:
model = tvs.fit(train_data)
model_predictions= model.transform(test_data)

print('Train RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model.transform(train_data)))
print('Test RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model_predictions))

Train RMSE:  76630.04647668403
Test RMSE:  78258.07237425014


In [44]:
cv = CrossValidator( estimator=lr
                    ,estimatorParamMaps=paramGrid
                    ,evaluator=RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse'))

In [45]:
model = cv.fit(train_data)
model_predictions= model.transform(test_data)

print('Train RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model.transform(train_data)))
print('Test RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model_predictions))

Train RMSE:  77197.76388718792
Test RMSE:  78716.86561614335


In [46]:
model.getNumFolds()

3

In [47]:
model.avgMetrics, len(model.avgMetrics)

([78344.25448076906,
  78344.25504600893,
  78344.25448076906,
  78344.25504600893,
  76721.56449966664,
  76721.56434105495,
  76721.56449966664,
  76721.56434105495,
  78344.25991835706,
  78309.22468039529,
  78344.25991835706,
  78309.22468039529,
  76968.70112839884,
  76611.74835314082,
  76968.70112839884,
  76611.74835314082],
 16)

In [48]:
model.getEstimator()
len(model.getEstimatorParamMaps())
model.getEvaluator()
model.getNumFolds()
#model.getParam('')
model.explainParams()

"estimator: estimator to be cross-validated (current: LinearRegression_a0eabf8fa93d)\nestimatorParamMaps: estimator param maps (current: [{Param(parent='LinearRegression_a0eabf8fa93d', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LinearRegression_a0eabf8fa93d', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearRegression_a0eabf8fa93d', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.0001, Param(parent='LinearRegression_a0eabf8fa93d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.25}, {Param(parent='LinearRegression_a0eabf8fa93d', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LinearRegression_a0eabf8fa93d', name='maxIter', doc='max number of iterations (>= 0).'): 5, Param(parent='LinearRegression_a0eabf8fa93d', name='tol', doc='the convergence 

In [49]:
model.bestModel

LinearRegressionModel: uid=LinearRegression_a0eabf8fa93d, numFeatures=6

In [50]:
model.getEstimatorParamMaps()[ np.argmax(model.avgMetrics) ]

{Param(parent='LinearRegression_a0eabf8fa93d', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='LinearRegression_a0eabf8fa93d', name='maxIter', doc='max number of iterations (>= 0).'): 5,
 Param(parent='LinearRegression_a0eabf8fa93d', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.0001,
 Param(parent='LinearRegression_a0eabf8fa93d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.25}

# Random Forest

##Train

In [51]:
rf = RandomForestRegressor(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', numTrees=50)

In [52]:
model = rf.fit(train_data)

## Test

In [53]:
predictions = model.transform(test_data)

In [54]:
predictions.select("predmedhv", "medhv", "features_scaled").show(5)

+------------------+--------+--------------------+
|         predmedhv|   medhv|     features_scaled|
+------------------+--------+--------------------+
|150573.14640285523| 79000.0|[2.86042153311948...|
| 192949.6226532538|111400.0|[4.13171999228370...|
| 143066.1209227725| 58100.0|[2.54259691832843...|
|161533.24859331062| 66900.0|[1.58912307395527...|
|147887.67512998977| 64600.0|[3.17824614791054...|
+------------------+--------+--------------------+
only showing top 5 rows



In [56]:
evaluator = RegressionEvaluator(labelCol="medhv", predictionCol="predmedhv", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = ", rmse)

Root Mean Squared Error (RMSE) on test data =  82708.94529873368


## Paramgrid builder

In [57]:
rf = RandomForestRegressor(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv')

In [64]:
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, (2, 3, 5))\
                              .addGrid(rf.numTrees, (10, 20, 50))\
                              .build()

In [65]:
cv = CrossValidator( estimator=rf
                    ,estimatorParamMaps=paramGrid
                    ,evaluator=RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse'))

In [66]:
model = cv.fit(train_data)
model_predictions= model.transform(test_data)

print('Train RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model.transform(train_data)))
print('Test RMSE: ', RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse').evaluate(model_predictions))

Train RMSE:  79923.10890011917
Test RMSE:  83281.57845203881


In [None]:
spark.stop()

#RDD

In [None]:
housing_rdd = housing_df.rdd

In [None]:
type(housing_rdd) 

pyspark.rdd.RDD

In [None]:
housing_rdd.collect()[1]

Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0)

In [None]:
housing_rdd.take(5)

[Row(long=None, lat=None, medage=None, totrooms=None, totbdrms=None, pop=None, houshlds=None, medinc=None, medhv=None),
 Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0),
 Row(long=-122.22000122070312, lat=37.86000061035156, medage=21.0, totrooms=7099.0, totbdrms=1106.0, pop=2401.0, houshlds=1138.0, medinc=8.301400184631348, medhv=358500.0),
 Row(long=-122.23999786376953, lat=37.849998474121094, medage=52.0, totrooms=1467.0, totbdrms=190.0, pop=496.0, houshlds=177.0, medinc=7.257400035858154, medhv=352100.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totrooms=1274.0, totbdrms=235.0, pop=558.0, houshlds=219.0, medinc=5.643099784851074, medhv=341300.0)]

In [None]:
from pyspark.context import SparkContext

sc = SparkContext('local', 'test')

In [None]:
data = sc.textFile(HOUSING_DATA)

In [None]:
data.take(5)

['longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity',
 '-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY',
 '-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY',
 '-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY',
 '-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY']

In [None]:
from itertools import islice

data = data.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)

In [None]:
data.take(211)

In [None]:
data2 = data.map(lambda l: l.strip(' "'))
data2.take(5)

['-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY',
 '-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY',
 '-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY',
 '-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY',
 '-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY']

In [None]:
data2 = data2.map(lambda l: l.split(','))
data2.take(211)

In [None]:
data3 = data2.map(lambda line: [float(line[0]), float(line[1]), float(line[2]), float(line[3]), float(line[4]), float(line[5]), float(line[6]), float(line[7]), float(line[8]), str(line[9])] )
data3.take(2)

[[-122.23,
  37.88,
  41.0,
  880.0,
  129.0,
  322.0,
  126.0,
  8.3252,
  452600.0,
  'NEAR BAY'],
 [-122.22,
  37.86,
  21.0,
  7099.0,
  1106.0,
  2401.0,
  1138.0,
  8.3014,
  358500.0,
  'NEAR BAY']]

In [None]:
training_data, test_data = data3.randomSplit([0.8, 0.2])

In [None]:
sample = data2.map(lambda line: [float(line[0]), float(line[1])])
sample.take(2)

[[-122.23, 37.88], [-122.22, 37.86]]

In [None]:
sample.count()

20640

In [None]:
sample.collect()

In [None]:
sample.map(lambda e: e[0] + e[1]).take(2)

[-84.35, -84.36]

In [None]:
sample.map(lambda e: e[0] + e[1]).mean()

-83.93784302325572

In [None]:
sample.map(lambda e: e[0] + e[1]).sum()

-1732477.0799999735

In [None]:
x = sample.reduce(lambda x, y: x + y)

list

In [None]:
len(sample.reduce(lambda x, y: x + y))/2

20640.0

In [None]:
type(sample.reduce(lambda x, y: x + y)[1:10])

list

In [None]:
x = [1,3,4,6]
x = [[1,3],[4,6]]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)

[1, 3, 4, 6]

In [None]:
sum(sample.reduce(lambda x, y: x + y))

-1732477.0799999489

In [None]:
data3.filter(lambda line: line[2] > 40).collect()

In [None]:
d = data2.map(lambda line: [str(line[9]), float(line[1])]).groupByKey().collect()

In [None]:
for cont, air in d:
  print(cont, sum(air))

NEAR BAY 86564.41999999991
<1H OCEAN 315745.4300000009
INLAND 240630.20999999953
NEAR OCEAN 92334.77000000002
ISLAND 166.79000000000002


In [None]:
for cont, air in d:
  print(cont, np.mean(list(air)))

NEAR BAY 37.80105676855895
<1H OCEAN 34.56057683887916
INLAND 36.73182872843841
NEAR OCEAN 34.73843867569601
ISLAND 33.358000000000004
