In [29]:
import findspark
from pyspark.sql.functions import *

findspark.init("G:/hadoop/Github/Hadoop-Fundamentals/venv/Lib/site-packages/pyspark")

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.master("local").appName("Linear Regression Model").config("spark.executor.memory", "1gb").getOrCreate()

sc = spark.sparkContext

rdd = sc.textFile('C:/Users/lemon/Downloads/CaliforniaHousing/cal_housing.data')

header = sc.textFile('C:Users/lemon/Downloads/CaliforniaHousing/cal_housing.domain')


rdd = rdd.map(lambda line: line.split(","))

rdd.first()
rdd.take(2)


df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

df = df.withColumn("longitude", df["longitude"].cast(FloatType()))\
    .withColumn("latitude", df["latitude"].cast(FloatType()))\
    .withColumn("housingMedianAge", df["housingMedianAge"].cast(FloatType()))\
    .withColumn("totalRooms", df["totalRooms"].cast(FloatType()))\
    .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType()))\
    .withColumn("population", df["population"].cast(FloatType()))\
    .withColumn("households", df["households"].cast(FloatType()))\
    .withColumn("medianIncome", df["medianIncome"].cast(FloatType()))\
    .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

roomsPerHousehold = df.select(col("totalRooms")/col("households"))
populationPerHousehold = df.select(col("population")/col("households"))
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

df = df.withColumn("roomsPerHousehold",col("totalRooms")/col("households"))\
    .withColumn("populationPerHousehold", col("population")/col("households"))\
    .withColumn("bedroomsPerRoom",col("totalBedRooms")/col("totalRooms"))

df = df.select("medianHouseValue",
               "totalBedRooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")

input_data = df.rdd.map(lambda x:(x[0], DenseVector(x[1:])))

df = spark.createDataFrame(input_data, ["label", "features"])

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

scaler = standardScaler.fit(df)

scaled_df = scaler.transform(df)

scaled_df.take(2)




df.first()

df.take(2)

df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
|2.815|[434.0,910.0,402....|
|2.418|[752.0,1504.0,734...|
|2.135|[474.0,1098.0,468...|
|1.913|[191.0,345.0,174....|
|1.592|[626.0,1212.0,620...|
|  1.4|[283.0,697.0,264....|
|1.525|[347.0,793.0,331....|
|1.555|[293.0,648.0,303....|
|1.587|[455.0,990.0,419....|
|1.629|[298.0,690.0,275....|
+-----+--------------------+
only showing top 20 rows



In [30]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=1234)
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

linearModel = lr.fit(train_data)
predicted = linearModel.transform(test_data)

predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x:x[0])

predictionAndLabel = predictions.zip(labels).collect()

predictionAndLabel[:5]

print(linearModel.coefficients)

print(linearModel.intercept)

[0.0,0.0,0.0,0.27670678884943006,0.0,0.0,0.0]
0.9947076240544817


In [31]:
print(linearModel.summary.rootMeanSquaredError)

print(linearModel.summary.r2)

spark.stop()

0.8794333071050418
0.4192463677898063
