Following this tutorial and modifying some parts: https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [41]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

sc = spark.sparkContext


Download the Data here: https://www.kaggle.com/camnugent/california-housing-prices

In [42]:
df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').\
                load("./sample_data/housing.csv",header=True)


In [43]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [44]:
print("schema:")
df.printSchema()
print(f"Data type: {type(df)}")

schema:
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

Data type: <class 'pyspark.sql.dataframe.DataFrame'>


In [45]:
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

In [46]:
df.groupby('ocean_proximity').count().sort('count', ascending=False).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



In [47]:
df.groupBy("housing_median_age").count().sort("housing_median_age",ascending=False).show()

+------------------+-----+
|housing_median_age|count|
+------------------+-----+
|              52.0| 1273|
|              51.0|   48|
|              50.0|  136|
|              49.0|  134|
|              48.0|  177|
|              47.0|  198|
|              46.0|  245|
|              45.0|  294|
|              44.0|  356|
|              43.0|  353|
|              42.0|  368|
|              41.0|  296|
|              40.0|  304|
|              39.0|  369|
|              38.0|  394|
|              37.0|  537|
|              36.0|  862|
|              35.0|  824|
|              34.0|  689|
|              33.0|  615|
+------------------+-----+
only showing top 20 rows



Add some features

In [48]:
rooms_per_house = df.select(col("total_rooms")/col("households"))
pop_per_house = df.select(col("population")/col("households"))
bedrooms_per_house = df.select(col("total_bedrooms")/col("total_rooms"))

In [49]:
# Add the new columns to `df`
df = df.withColumn("rooms_per_house", col("total_rooms")/col("households")) \
   .withColumn("pop_per_house", col("population")/col("households")) \
   .withColumn("bedrooms_per_house", col("total_bedrooms")/col("total_rooms"))

In [50]:
df.show(5)


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------+------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|   rooms_per_house|     pop_per_house| bedrooms_per_house|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------+------------------+-------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| 6.984126984126984|2.5555555555555554|0.14659090909090908|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY| 6.238137082601054| 2.109841827768014|0.15579659106916466|
|  -122.24|   37.85|

In [51]:
columns_to_drop = ['longitude', 'latitude', 'total_rooms', 'housing_median_age', 'ocean_proximity']

df = df.drop(*columns_to_drop)

In [52]:
df.show(5)

+--------------+----------+----------+-------------+------------------+------------------+------------------+-------------------+
|total_bedrooms|population|households|median_income|median_house_value|   rooms_per_house|     pop_per_house| bedrooms_per_house|
+--------------+----------+----------+-------------+------------------+------------------+------------------+-------------------+
|         129.0|     322.0|     126.0|       8.3252|          452600.0| 6.984126984126984|2.5555555555555554|0.14659090909090908|
|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0| 6.238137082601054| 2.109841827768014|0.15579659106916466|
|         190.0|     496.0|     177.0|       7.2574|          352100.0| 8.288135593220339|2.8022598870056497|0.12951601908657123|
|         235.0|     558.0|     219.0|       5.6431|          341300.0|5.8173515981735155| 2.547945205479452|0.18445839874411302|
|         280.0|     565.0|     259.0|       3.8462|          342200.0| 6.281853281853282|

Scale Down the House Values 

In [53]:
df = df.withColumn("median_house_value", col("median_house_value")/100000)

In [54]:
df.show(5)

+--------------+----------+----------+-------------+------------------+------------------+------------------+-------------------+
|total_bedrooms|population|households|median_income|median_house_value|   rooms_per_house|     pop_per_house| bedrooms_per_house|
+--------------+----------+----------+-------------+------------------+------------------+------------------+-------------------+
|         129.0|     322.0|     126.0|       8.3252|             4.526| 6.984126984126984|2.5555555555555554|0.14659090909090908|
|        1106.0|    2401.0|    1138.0|       8.3014|             3.585| 6.238137082601054| 2.109841827768014|0.15579659106916466|
|         190.0|     496.0|     177.0|       7.2574|             3.521| 8.288135593220339|2.8022598870056497|0.12951601908657123|
|         235.0|     558.0|     219.0|       5.6431|             3.413|5.8173515981735155| 2.547945205479452|0.18445839874411302|
|         280.0|     565.0|     259.0|       3.8462|             3.422| 6.281853281853282|

In [55]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [56]:
df.show()

+------+--------------------+
| label|            features|
+------+--------------------+
| 129.0|[322.0,126.0,8.32...|
|1106.0|[2401.0,1138.0,8....|
| 190.0|[496.0,177.0,7.25...|
| 235.0|[558.0,219.0,5.64...|
| 280.0|[565.0,259.0,3.84...|
| 213.0|[413.0,193.0,4.03...|
| 489.0|[1094.0,514.0,3.6...|
| 687.0|[1157.0,647.0,3.1...|
| 665.0|[1206.0,595.0,2.0...|
| 707.0|[1551.0,714.0,3.6...|
| 434.0|[910.0,402.0,3.20...|
| 752.0|[1504.0,734.0,3.2...|
| 474.0|[1098.0,468.0,3.0...|
| 191.0|[345.0,174.0,2.67...|
| 626.0|[1212.0,620.0,1.9...|
| 283.0|[697.0,264.0,2.12...|
| 347.0|[793.0,331.0,2.77...|
| 293.0|[648.0,303.0,2.12...|
| 455.0|[990.0,419.0,1.99...|
| 298.0|[690.0,275.0,2.60...|
+------+--------------------+
only showing top 20 rows



In [57]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=129.0, features=DenseVector([322.0, 126.0, 8.3252, 4.526, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.2843, 0.3296, 4.3821, 3.9222, 2.8228, 0.2461, nan])),
 Row(label=1106.0, features=DenseVector([2401.0, 1138.0, 8.3014, 3.585, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.1202, 2.9765, 4.3696, 3.1067, 2.5213, 0.2031, nan]))]

In [58]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [60]:
# need to fix this.

# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
# linearModel = lr.fit(train_data)