In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression
x = spark.read.csv('sample 2.csv', inferSchema=True, header=True)
df = spark.read.csv('initial_data1.csv', inferSchema=True, header=True)
m1 = spark.read.csv('initial_data2.csv', inferSchema=True, header=True)
df.columns

['Suburb',
 'Address',
 'Rooms',
 'Type',
 'Price',
 'SellerG',
 'Date',
 'Postcode',
 'Region name ',
 '_c9',
 'Property Count',
 'Distance',
 'CouncilArea']

In [2]:
df.show()
m1.columns
x.printSchema()

+--------------+------------------+-----+----+-------+-------------+----------+--------+--------------------+----+--------------+--------+--------------------+
|        Suburb|           Address|Rooms|Type|  Price|      SellerG|      Date|Postcode|        Region name | _c9|Property Count|Distance|         CouncilArea|
+--------------+------------------+-----+----+-------+-------------+----------+--------+--------------------+----+--------------+--------+--------------------+
|    Abbotsford|     49 Lithgow St|    3|   h|1490000|       Jellis|01-04-2017|    3067|Northern Metropol...|null|          4019|     3.0|  Yarra City Council|
|    Abbotsford|     59A Turner St|    3|   h|1220000|     Marshall|01-04-2017|    3067|Northern Metropol...|null|          4019|     3.0|  Yarra City Council|
|    Abbotsford|     119B Yarra St|    3|   h|1420000|       Nelson|01-04-2017|    3067|Northern Metropol...|null|          4019|     3.0|  Yarra City Council|
|    Aberfeldie|        68 Vida St|    3

In [3]:
# Print the schema of the DataFrame. You can see potential features as well as the predictor.
df.printSchema()

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Postcode: integer (nullable = true)
 |-- Region name : string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- Property Count: integer (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CouncilArea: string (nullable = true)



In [4]:
#a= df.drop('_c9')
for item in df.head():
    print(item)

Abbotsford
49 Lithgow St
3
h
1490000
Jellis
01-04-2017
3067
Northern Metropolitan
None
4019
3.0
Yarra City Council


In [5]:
m= df.union(m1)
m.count()

1498

In [6]:
a = m.na.drop()
a.count()

375

In [7]:
final = a.drop('_c9')
final.count()

375

In [8]:
final.show()
final.printSchema()

+--------------+------------------+-----+----+-------+-------+-------------+----------+------------+--------------+--------+--------------------+
|        Suburb|           Address|Rooms|Type|  Price|SellerG|         Date|  Postcode|Region name |Property Count|Distance|         CouncilArea|
+--------------+------------------+-----+----+-------+-------+-------------+----------+------------+--------------+--------+--------------------+
|      Parkdale|        13B Elm Gr|    3|   t|1000000|      S|       Obrien|01-07-2017|        3195|          5087|    21.5|Kingston City Cou...|
|      Parkdale|  13 Lawborough Av|    3|   h| 985000|      S|     Maitland|01-07-2017|        3195|          5087|    21.5|Kingston City Cou...|
|      Parkdale|     44 Melrose St|    3|   h| 820000|      S|      Thomson|01-07-2017|        3195|          5087|    21.5|Kingston City Cou...|
|   Pascoe Vale| 123 Cumberland Rd|    3|   h| 870000|     PI|          Ray|01-07-2017|        3044|          7485|     8.5|

In [9]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [10]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(
    inputCols=["Price","Rooms","Distance"],
    outputCol="features")

In [11]:
# Now that we've created the assembler variable, let's actually transform the data.
output = assembler.transform(final)

In [12]:
# Using print schema, you see that the features output column has been added. 
output.printSchema()

# You can see that the features column is a dense vector that combines the various features as expected.
output.head(1)

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Region name : string (nullable = true)
 |-- Property Count: integer (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- features: vector (nullable = true)



[Row(Suburb='Parkdale', Address='13B Elm Gr', Rooms=3, Type='t', Price=1000000, SellerG='S', Date='Obrien', Postcode='01-07-2017', Region name ='3195', Property Count=5087, Distance=21.5, CouncilArea='Kingston City Council', features=DenseVector([1000000.0, 3.0, 21.5]))]

In [13]:
# Let's select two columns (the feature and predictor).
# This is now in the appropriate format to be processed by Spark.
final_data = output.select("features",'Price')
final_data.show()

+--------------------+-------+
|            features|  Price|
+--------------------+-------+
|[1000000.0,3.0,21.5]|1000000|
| [985000.0,3.0,21.5]| 985000|
| [820000.0,3.0,21.5]| 820000|
|  [870000.0,3.0,8.5]| 870000|
|  [800000.0,3.0,8.5]| 800000|
| [1501000.0,5.0,8.5]|1501000|
| [1663000.0,3.0,8.5]|1663000|
|  [780000.0,4.0,8.5]| 780000|
| [660000.0,5.0,14.7]| 660000|
| [663500.0,4.0,14.7]| 663500|
| [1631000.0,3.0,3.5]|1631000|
| [1665000.0,3.0,3.5]|1665000|
| [2450000.0,4.0,3.5]|2450000|
|  [399000.0,1.0,4.6]| 399000|
| [1240000.0,2.0,4.6]|1240000|
|  [727500.0,3.0,8.4]| 727500|
|  [965000.0,3.0,8.4]| 965000|
| [2530000.0,2.0,3.5]|2530000|
| [1930000.0,3.0,3.5]|1930000|
| [840000.0,3.0,12.0]| 840000|
+--------------------+-------+
only showing top 20 rows



In [14]:
# Let's do a randomised 70/30 split. 
# Remember, you can use other splits depending on how easy/difficult it is to train your model.
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [15]:
# Let's see our training data.
train_data.describe().show()

# And our testing data.
test_data.describe().show()

+-------+-----------------+
|summary|            Price|
+-------+-----------------+
|  count|              250|
|   mean|         993940.8|
| stddev|547454.2046602262|
|    min|           340000|
|    max|          5000000|
+-------+-----------------+

+-------+-----------------+
|summary|            Price|
+-------+-----------------+
|  count|              125|
|   mean|        1024012.0|
| stddev|546847.4157061697|
|    min|           377000|
|    max|          2850000|
+-------+-----------------+



In [16]:
lr = LinearRegression(labelCol='Price')

In [17]:
# Fit the model to the data.
lrModel = lr.fit(train_data)

In [18]:
# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [0.9999999999999983,6.655285360639536e-10,-5.271961956991881e-11] Intercept: 3.585523874941904e-10


In [19]:
# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test_data)

In [20]:
# Interesting results! This shows the difference between the predicted value and the test data.
test_results.residuals.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))

+--------------------+
|           residuals|
+--------------------+
|5.820766091346741...|
|-5.82076609134674...|
|-1.16415321826934...|
|5.820766091346741...|
|1.164153218269348...|
|-5.23868948221206...|
|-1.74622982740402...|
|                 0.0|
|-5.82076609134674...|
|-9.31322574615478...|
|-6.40284270048141...|
|-5.23868948221206...|
|-4.65661287307739...|
|-5.82076609134674...|
|-1.22236087918281...|
|-1.16415321826934...|
|-5.23868948221206...|
|-4.65661287307739...|
|-8.14907252788543...|
|-6.98491930961608...|
+--------------------+
only showing top 20 rows

RSME: 7.450944385923186e-10


In [21]:
# We can also get the R2 value. 
print("R2: {}".format(test_results.r2))

R2: 1.0


In [22]:
final_data.describe().show()

+-------+------------------+
|summary|             Price|
+-------+------------------+
|  count|               375|
|   mean|1003964.5333333333|
| stddev| 546704.7536307834|
|    min|            340000|
|    max|           5000000|
+-------+------------------+



In [23]:
# Let's just select the features column (removing the label column).
unlabeled_data = test_data.select('features')
unlabeled_data.show()

+-------------------+
|           features|
+-------------------+
| [377000.0,1.0,7.3]|
|[385000.0,2.0,18.4]|
| [399000.0,1.0,4.6]|
|[413000.0,3.0,31.7]|
| [420000.0,1.0,7.2]|
|[420000.0,3.0,20.6]|
|[445000.0,2.0,14.0]|
|[450000.0,2.0,17.9]|
|[451500.0,3.0,18.4]|
| [452000.0,2.0,0.0]|
| [460000.0,2.0,5.0]|
| [460000.0,2.0,7.5]|
|[460000.0,3.0,20.6]|
|[480000.0,3.0,27.2]|
|[510000.0,4.0,17.6]|
|[515000.0,3.0,25.9]|
| [520000.0,2.0,5.7]|
| [525000.0,2.0,5.0]|
|[525000.0,3.0,10.5]|
|[525000.0,4.0,25.5]|
+-------------------+
only showing top 20 rows



In [24]:
# Now we can transform the unlabeled data.
predictions = lrModel.transform(unlabeled_data)

In [25]:
# It worked! Feeding the unlabeled data features into the model results in a prediction, 
# which is the amount someone with those features is likely to spend in a year.
predictions.show()
predictions.head(1)

+-------------------+------------------+
|           features|        prediction|
+-------------------+------------------+
| [377000.0,1.0,7.3]|376999.99999999994|
|[385000.0,2.0,18.4]|385000.00000000006|
| [399000.0,1.0,4.6]| 399000.0000000001|
|[413000.0,3.0,31.7]|412999.99999999994|
| [420000.0,1.0,7.2]| 419999.9999999999|
|[420000.0,3.0,20.6]| 420000.0000000005|
|[445000.0,2.0,14.0]| 445000.0000000002|
|[450000.0,2.0,17.9]|          450000.0|
|[451500.0,3.0,18.4]| 451500.0000000006|
| [452000.0,2.0,0.0]|452000.00000000093|
| [460000.0,2.0,5.0]|460000.00000000064|
| [460000.0,2.0,7.5]| 460000.0000000005|
|[460000.0,3.0,20.6]|460000.00000000047|
|[480000.0,3.0,27.2]|480000.00000000006|
|[510000.0,4.0,17.6]| 510000.0000000012|
|[515000.0,3.0,25.9]| 515000.0000000001|
| [520000.0,2.0,5.7]| 520000.0000000005|
| [525000.0,2.0,5.0]| 525000.0000000005|
|[525000.0,3.0,10.5]| 525000.0000000008|
|[525000.0,4.0,25.5]| 525000.0000000007|
+-------------------+------------------+
only showing top

[Row(features=DenseVector([377000.0, 1.0, 7.3]), prediction=376999.99999999994)]