In [61]:
import findspark

# Locate the local Spark installation and make pyspark importable.
# This has to run before the pyspark imports in the following cells.
findspark.init()

# Spark program: linear regression model application 

In [62]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook: local master,
# a descriptive application name and 1 GB of executor memory.
# The property key must be spelled "spark.executor.memory"; Spark does not
# reject unknown keys, so a misspelled key is silently ignored.
spark = SparkSession.builder \
    .master("local") \
    .appName("Linear Regression Model") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()

In [63]:
# Handle on the session's SparkContext; used below for the RDD API (sc.textFile)
sc = spark.sparkContext

In [64]:
import os

# Directory that holds the California Housing files. It defaults to the
# original location, so existing runs are unaffected; set the CAL_HOUSING_DIR
# environment variable to run the notebook on another machine.
data_dir = os.environ.get(
    "CAL_HOUSING_DIR",
    "/Users/amirkotobi/VSProjects/spark_ml_regression/dataset/CaliforniaHousing",
)

# One text line per record (.data) and one line per column description (.domain)
rdd = sc.textFile(os.path.join(data_dir, "cal_housing.data"))
header = sc.textFile(os.path.join(data_dir, "cal_housing.domain"))

In [65]:
# Bring every line of the .domain file to the driver to read the column names
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [66]:
# Peek at the first two raw records: each one is a single comma-separated line
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [67]:
# Tokenise every raw text record into the list of its comma-separated fields
rdd = rdd.map(lambda record: record.split(","))

In [68]:
# Confirm the split: each record is now a list of nine string fields
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Inspect the data

In [69]:
# Only the first record (the same element that take(2) returned first)
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [70]:
# The two largest records in descending order. The records are lists of
# strings, so they are compared lexicographically as text, not numerically.
rdd.top(2)

[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]

## Convert RDD to data frame

In [71]:
from pyspark.sql import Row

In [72]:
# Column names, listed in the same order as the fields of each parsed record
field_names = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedRooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]


def _to_row(fields):
    """Wrap one parsed record in a Row, pairing each field with its column name."""
    return Row(**{name: fields[position] for position, name in enumerate(field_names)})


# Map every record to a named Row, then let Spark infer the DataFrame schema
df = rdd.map(_to_row).toDF()

In [73]:
# Preview the DataFrame as a text table (show() prints the top 20 rows)
df.show()

[Stage 31:>                                                         (0 + 1) / 1]

+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|  longitude| latitude|housingMedianAge| totalRooms|totalBedRooms| population| households|medianIncome|medianHouseValue|
+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|-122.230000|37.880000|       41.000000| 880.000000|   129.000000| 322.000000| 126.000000|    8.325200|   452600.000000|
|-122.220000|37.860000|       21.000000|7099.000000|  1106.000000|2401.000000|1138.000000|    8.301400|   358500.000000|
|-122.240000|37.850000|       52.000000|1467.000000|   190.000000| 496.000000| 177.000000|    7.257400|   352100.000000|
|-122.250000|37.850000|       52.000000|1274.000000|   235.000000| 558.000000| 219.000000|    5.643100|   341300.000000|
|-122.250000|37.850000|       52.000000|1627.000000|   280.000000| 565.000000| 259.000000|    3.846200|   342200.000000|
|-122.250000|37.850000|       52

23/10/20 17:01:16 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 31 (TID 27): Attempting to kill Python Worker
                                                                                

In [74]:
# First row as a Row object; every value is still a string at this point
df.head()

23/10/20 17:01:20 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 32 (TID 28): Attempting to kill Python Worker
                                                                                

Row(longitude='-122.230000', latitude='37.880000', housingMedianAge='41.000000', totalRooms='880.000000', totalBedRooms='129.000000', population='322.000000', households='126.000000', medianIncome='8.325200', medianHouseValue='452600.000000')

In [75]:
# List (column name, data type) pairs; every column is a string because the
# values came from splitting text lines
df.dtypes

[('longitude', 'string'),
 ('latitude', 'string'),
 ('housingMedianAge', 'string'),
 ('totalRooms', 'string'),
 ('totalBedRooms', 'string'),
 ('population', 'string'),
 ('households', 'string'),
 ('medianIncome', 'string'),
 ('medianHouseValue', 'string')]

In [76]:
# Print the schema as a tree: column name, data type and nullability
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- totalRooms: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)



## Casting each column to the float data type

In [77]:
from pyspark.sql.types import *

In [78]:
# Re-project every column through a cast to FloatType, keeping each column's
# name and position (all nine columns of the frame are string-typed here)
df = df.select([df[name].cast(FloatType()).alias(name) for name in df.columns])

In [79]:
# Verify the cast: every column should now be reported as float
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



In [80]:
# Confirm that the object is still a Spark SQL DataFrame after the casts
type(df)

pyspark.sql.dataframe.DataFrame

In [81]:
# Helper that casts a set of columns to a single data type
def convert_column_dtype(df, col_names, new_dtype):
    """Return a DataFrame in which every column in `col_names` is cast to `new_dtype`.

    Args:
        df: DataFrame-like object supporting `df[name]` and `withColumn`.
        col_names: iterable of column names to convert.
        new_dtype: target type handed to each column's `cast`.

    Returns:
        The frame produced by applying the casts one after another; the
        input object itself when `col_names` is empty.
    """
    converted = df
    for column in col_names:
        # withColumn returns a new frame, so carry the result forward
        converted = converted.withColumn(column, converted[column].cast(new_dtype))
    return converted

In [82]:
# Columns to convert with the helper
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 
             'medianHouseValue', 'medianIncome', 'population', 
             'totalBedRooms', 'totalRooms']

# convert_column_dtype returns a new DataFrame rather than modifying its
# argument, so the result must be bound to a name; otherwise the conversion
# is silently discarded.
df = convert_column_dtype(df, columns, FloatType())

# Keep the DataFrame summary as the cell output
df

DataFrame[longitude: float, latitude: float, housingMedianAge: float, totalRooms: float, totalBedRooms: float, population: float, households: float, medianIncome: float, medianHouseValue: float]

In [83]:
# Spot-check two of the converted columns (first 20 rows)
df.select('population', 'totalRooms').show(20)

[Stage 33:>                                                         (0 + 1) / 1]

+----------+----------+
|population|totalRooms|
+----------+----------+
|     322.0|     880.0|
|    2401.0|    7099.0|
|     496.0|    1467.0|
|     558.0|    1274.0|
|     565.0|    1627.0|
|     413.0|     919.0|
|    1094.0|    2535.0|
|    1157.0|    3104.0|
|    1206.0|    2555.0|
|    1551.0|    3549.0|
|     910.0|    2202.0|
|    1504.0|    3503.0|
|    1098.0|    2491.0|
|     345.0|     696.0|
|    1212.0|    2643.0|
|     697.0|    1120.0|
|     793.0|    1966.0|
|     648.0|    1228.0|
|     990.0|    2239.0|
|     690.0|    1503.0|
+----------+----------+
only showing top 20 rows



23/10/20 17:01:25 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 33 (TID 29): Attempting to kill Python Worker
                                                                                

In [84]:
# Count the rows for each housingMedianAge value and list the highest ages first
age_counts = df.groupBy("housingMedianAge").count()
age_counts.orderBy("housingMedianAge", ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



In [85]:
# Summary statistics (count, mean, stddev, ...) for every column
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|         latitude|  housingMedianAge|        totalRooms|    totalBedRooms|        population|       households|      medianIncome|  medianHouseValue|
+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|            20640|             20640|            20640|             20640|             20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465|499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev|  2.003531742932898|2.135952380602968| 12.58555761211163|2181.6152515827944| 421.247905943133|  

Because the variables span very different ranges, normalisation is needed.

## processing the target values

In [86]:
from pyspark.sql.functions import *

In [87]:
# Rescale the target so that house values are expressed in units of 100,000
target_in_100k = df["medianHouseValue"] / 100000
df = df.withColumn("medianHouseValue", target_in_100k)

In [88]:
# Check the rescaled target: medianHouseValue is now in units of 100,000
df.take(2)

[Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526),
 Row(longitude=-122.22000122070312, latitude=37.86000061035156, housingMedianAge=21.0, totalRooms=7099.0, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, medianHouseValue=3.585)]

## Feature engineering 

Add new features to the data:

- rooms per household
- population per household
- bedrooms per room

In [89]:
# Each derived feature is the ratio of two existing columns:
# new column name -> (numerator column, denominator column)
ratio_features = {
    "roomsPerHousehold": ("totalRooms", "households"),
    "popPerHousehold": ("population", "households"),
    "bedroomsPerRoom": ("totalBedRooms", "totalRooms"),
}

# Append the ratio columns one at a time, in the order declared above
for feature, (numerator, denominator) in ratio_features.items():
    df = df.withColumn(feature, col(numerator) / col(denominator))

In [90]:
# Preview the first 10 rows, including the three derived ratio columns
df.show(10)

[Stage 41:>                                                         (0 + 1) / 1]

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+------------------+------------------+-------------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedRooms|population|households|medianIncome|medianHouseValue| roomsPerHousehold|   popPerHousehold|    bedroomsPerRoom|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+------------------+------------------+-------------------+
|  -122.23|   37.88|            41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|           4.526| 6.984126984126984|2.5555555555555554|0.14659090909090908|
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|           3.585| 6.238137082601054| 2.109841827768014|0.15579659106916466|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|           3.521| 8.288135593220339|2.

23/10/20 17:01:29 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 41 (TID 35): Attempting to kill Python Worker
                                                                                

Selecting and reordering the columns (target first) for later standardization

In [91]:
# Columns kept for modelling: the target first, then the predictors
model_columns = [
    "medianHouseValue",
    "totalBedRooms",
    "population",
    "households",
    "medianIncome",
    "roomsPerHousehold",
    "popPerHousehold",
    "bedroomsPerRoom",
]

# Project the frame onto exactly these columns, in this order
df = df.select(*model_columns)

In [92]:
# First row of the trimmed frame; the target medianHouseValue comes first
df.first()

23/10/20 17:01:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 42 (TID 36): Attempting to kill Python Worker
                                                                                

Row(medianHouseValue=4.526, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, roomsPerHousehold=6.984126984126984, popPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

# Standardization

Separate the target variable from the rest

In [93]:
from pyspark.ml.linalg import DenseVector

In [95]:
def _label_and_features(row):
    """Split a row into (first value, DenseVector of the remaining values)."""
    return row[0], DenseVector(row[1:])


# The first column is the target; everything after it becomes the feature vector
input_data = df.rdd.map(_label_and_features)

# Rebuild a two-column DataFrame with the names "label" and "features"
df = spark.createDataFrame(input_data, schema=["label", "features"])

In [96]:
# Preview the first 10 rows of the two-column (label, features) frame
df.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
+-----+--------------------+
only showing top 10 rows

