In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Homes').getOrCreate()

In [4]:
!ls

[34mAutoML551a6fc0840[m[m              Untitled.ipynb
Context_v_Session.ipynb        house_data.csv
MLR_in_R.pdf                   influencial_outliers_notes.pdf
README.md


In [6]:
df_pyspark = spark.read.option('header', 'true').csv('house_data.csv', inferSchema=True)

In [8]:
df_pyspark.printSchema()

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)
 |-- num: integer (nullable = true)



In [10]:
df_pyspark.show(2)

+----------+---------------+--------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+---+
|        id|           date|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|    lat|    long|sqft_living15|sqft_lot15|num|
+----------+---------------+--------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+---+
|7129300520|20141013T000000|221900.0|       3|      1.0|       1180|    5650|   1.0|         0|   0|        3|    7|      1180|            0|    1955|           0|  98178|47.5112|-122.257|         1340|      5650|  1|
|6414100192|20141209T000000|538000.0|       3|     2.25|       2570|    7242|   2.0|         0|   0|        3|    7|      2170| 

In [12]:
df_pyspark.select(['bedrooms','bedrooms']).show(2)

+--------+--------+
|bedrooms|bedrooms|
+--------+--------+
|       3|       3|
|       3|       3|
+--------+--------+
only showing top 2 rows



In [16]:
df_pyspark.describe(['price', 'bedrooms', 'bathrooms']).show()

+-------+------------------+-----------------+------------------+
|summary|             price|         bedrooms|         bathrooms|
+-------+------------------+-----------------+------------------+
|  count|             21613|            21613|             21613|
|   mean| 540182.1587933188| 3.37084162309721|2.1147573219821405|
| stddev|367362.23171800876|0.930061831147451| 0.770163157217741|
|    min|           75000.0|                0|               0.0|
|    max|         7700000.0|               33|               8.0|
+-------+------------------+-----------------+------------------+



In [46]:
from pyspark.sql.functions import col
from pyspark.sql.functions import log

df_pyspark = df_pyspark.withColumn("logvalue", log("price"))

#df.withColumn("bla", log("double_col")).show()

In [50]:
df_pyspark.select(['price', 'logvalue']).show(2)

+--------+------------------+
|   price|          logvalue|
+--------+------------------+
|221900.0|12.309982108920686|
|538000.0|13.195613839143922|
+--------+------------------+
only showing top 2 rows



In [52]:
df_pyspark.drop('date', 'id', 'lat', 'long')

DataFrame[price: double, bedrooms: int, bathrooms: double, sqft_living: int, sqft_lot: int, floors: double, waterfront: int, view: int, condition: int, grade: int, sqft_above: int, sqft_basement: int, yr_built: int, yr_renovated: int, zipcode: int, sqft_living15: int, sqft_lot15: int, num: int, logvalue: double]

In [58]:
selection = df_pyspark.filter('bedrooms > 11')

In [60]:
selection.show()

+----------+---------------+--------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+---+------------------+
|        id|           date|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|    lat|    long|sqft_living15|sqft_lot15|num|          logvalue|
+----------+---------------+--------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+---+------------------+
|2402100895|20140625T000000|640000.0|      33|     1.75|       1620|    6000|   1.0|         0|   0|        5|    7|      1040|          580|    1947|           0|  98103|47.6878|-122.331|         1330|      4700|  1|13.369223455335854|
+----------+---------------+--------+--------+------

In [67]:
df_pyspark.select(['price', 'bedrooms']).groupBy('bedrooms').max().show()

+--------+----------+-------------+
|bedrooms|max(price)|max(bedrooms)|
+--------+----------+-------------+
|       1| 1250000.0|            1|
|       6| 7700000.0|            6|
|       3| 3800000.0|            3|
|       5| 7060000.0|            5|
|       9| 1400000.0|            9|
|       4| 4490000.0|            4|
|       8| 3300000.0|            8|
|       7| 3200000.0|            7|
|      10| 1150000.0|           10|
|      11|  520000.0|           11|
|      33|  640000.0|           33|
|       2| 3280000.0|            2|
|       0| 1300000.0|            0|
+--------+----------+-------------+



In [70]:
df_pyspark.columns

['id',
 'date',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'zipcode',
 'lat',
 'long',
 'sqft_living15',
 'sqft_lot15',
 'num',
 'logvalue']

In [72]:
from pyspark.ml.feature import VectorAssembler

In [73]:
featureAssembler = VectorAssembler(inputCols=['bedrooms', 'bathrooms', 'floors'], outputCol="Independent Features")

In [78]:
output = featureAssembler.transform(df_pyspark)

In [86]:
output.select("Independent Features").show(3)

+--------------------+
|Independent Features|
+--------------------+
|       [3.0,1.0,1.0]|
|      [3.0,2.25,2.0]|
|       [2.0,1.0,1.0]|
+--------------------+
only showing top 3 rows



In [85]:
finalized_dataset = output.select(['logvalue', 'Independent Features'])
finalized_dataset.show(3)

+------------------+--------------------+
|          logvalue|Independent Features|
+------------------+--------------------+
|12.309982108920686|       [3.0,1.0,1.0]|
|13.195613839143922|      [3.0,2.25,2.0]|
|12.100712129872347|       [2.0,1.0,1.0]|
+------------------+--------------------+
only showing top 3 rows



In [87]:
from pyspark.ml.regression import LinearRegression 
train_data, test_data = finalized_dataset.randomSplit([0.75, 0.25])

In [90]:
regr = LinearRegression(featuresCol="Independent Features", labelCol="logvalue")
regr = regr.fit(train_data)

Exception ignored in: <function JavaWrapper.__del__ at 0x7f852c2e6310>
Traceback (most recent call last):
  File "/Users/dmitrymikhaylov/opt/anaconda3/lib/python3.8/site-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LinearRegression' object has no attribute '_java_obj'


In [91]:
regr.coefficients

DenseVector([0.0498, 0.3233, 0.0559])

In [92]:
regr.intercept

12.112741435876528

In [96]:
pred_results = regr.evaluate(test_data)
pred_results.predictions.show(10)

+------------------+--------------------+------------------+
|          logvalue|Independent Features|        prediction|
+------------------+--------------------+------------------+
| 11.26446410567173|       [2.0,1.0,1.0]| 12.59150574718085|
|11.289781913656018|      [1.0,0.75,1.0]|12.460902813068595|
|11.326595886778735|       [2.0,1.0,1.0]| 12.59150574718085|
|11.350406535472453|       [2.0,1.0,1.0]| 12.59150574718085|
| 11.36789969291997|       [3.0,1.0,1.0]|12.641289364067557|
|11.396391648714276|       [3.0,1.0,1.0]|12.641289364067557|
|11.407564949312402|       [1.0,1.0,1.0]|12.541722130294144|
|11.477298287327077|       [3.0,1.0,1.0]|12.641289364067557|
|11.502875129116727|       [2.0,1.0,1.0]| 12.59150574718085|
|11.512925464970229|       [2.0,1.0,1.0]| 12.59150574718085|
+------------------+--------------------+------------------+
only showing top 10 rows





In [97]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(0.34453702107444745, 0.18589940619284742)

In [98]:
pred_results.r2

0.328485851990698