In [6]:
import os
os.environ['JAVA_HOME'] = "C:\\Program Files\\Java\\jre1.8.0_251"

In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Linear Regression Model") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
    
sc = spark.sparkContext

In [9]:
rdd = sc.textFile(".\\CaliforniaHousing\\cal_housing.data")
header = sc.textFile(".\\CaliforniaHousing\\cal_housing.domain")

In [10]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [11]:
rdd = rdd.map(lambda line: line.split(","))

In [12]:
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [13]:
rdd.top(2)

[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]

In [14]:
from pyspark.sql import Row

df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [15]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [16]:
from pyspark.sql.types import *

df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
    .withColumn("latitude", df["latitude"].cast(FloatType())) \
    .withColumn("housingMedianAge", df["housingMedianAge"].cast(FloatType())) \
    .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
    .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
    .withColumn("population", df["population"].cast(FloatType())) \
    .withColumn("households", df["households"].cast(FloatType())) \
    .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
    .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

In [17]:
from pyspark.sql.functions import *

df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

In [18]:
from pyspark.sql.functions import *

roomsPerHousehold = df.select(col("totalRooms")/col("households"))
populationPerHousehold = df.select(col("population")/col("households"))
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
    .withColumn("populationPerHousehold", col("population")/col("households")) \
    .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))

df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [19]:
df = df.select("medianHouseValue",
              "totalBedRooms",
              "population",
              "households",
              "medianIncome",
              "roomsPerHousehold",
              "populationPerHousehold",
              "bedroomsPerRoom")

In [20]:
from pyspark.ml.linalg import DenseVector

input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df = spark.createDataFrame(input_data, ["label", "features"])

In [21]:
from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df)
scaled_df = scaler.transform(df)

scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [22]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [23]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

linearModel = lr.fit(train_data)

In [24]:
predicted = linearModel.transform(test_data)

predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

predictionAndLabel = predictions.zip(labels).collect()

predictionAndLabel[:5]

[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366)]

In [25]:
linearModel.coefficients
linearModel.intercept

0.9841344205626824

In [26]:
linearModel.summary.rootMeanSquaredError
linearModel.summary.r2

0.42282227755911483

In [27]:
spark.stop()