In [1]:
import os
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
os.environ['HADOOP_HOME'] = "C:\\spark-3.2.0-bin-hadoop3.2"

In [3]:
spark = SparkSession.builder.appName("App Driver...").master("local[4]").getOrCreate()

In [4]:
spark.sparkContext.setLogLevel("ERROR")
print(spark.sparkContext.getConf().getAll())

[('spark.master', 'local[4]'), ('spark.rdd.compress', 'True'), ('spark.driver.port', '52604'), ('spark.serializer.objectStreamReset', '100'), ('spark.app.startTime', '1635189732027'), ('spark.submit.pyFiles', ''), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.driver.host', 'LAPTOP-CA8SB239'), ('spark.sql.warehouse.dir', 'file:/C:/Users/Sid%20Ousmane/spark-warehouse'), ('spark.app.id', 'local-1635189732975'), ('spark.app.name', 'App Driver...'), ('spark.ui.showConsoleProgress', 'true')]


In [5]:
input_file = "C:/Users/Sid Ousmane/Documents/IAA/séminaire 1/kc_house_data.csv"

In [6]:
raw_data_df = spark.read.option("header","true").csv(input_file)

In [7]:
raw_data_df.printSchema

<bound method DataFrame.printSchema of DataFrame[id: string, date: string, price: string, bedrooms: string, bathrooms: string, sqft_living: string, sqft_lot: string, floors: string, waterfront: string, view: string, condition: string, grade: string, sqft_above: string, sqft_basement: string, yr_built: string, yr_renovated: string, zipcode: string, lat: string, long: string, sqft_living15: string, sqft_lot15: string]>

In [8]:
print('Number of rows: ', raw_data_df.count())

Number of rows:  21613


In [9]:
raw_data_df.show(5, truncate=False)

+----------+---------------+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|id        |date           |price |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|lat    |long    |sqft_living15|sqft_lot15|
+----------+---------------+------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|7129300520|20141013T000000|221900|3       |1        |1180       |5650    |1     |0         |0   |3        |7    |1180      |0            |1955    |0           |98178  |47.5112|-122.257|1340         |5650      |
|6414100192|20141209T000000|538000|3       |2.25     |2570       |7242    |2     |0         |0   |3        |7    |2170      |400          |1951    |1991

In [10]:
raw_data_df.createOrReplaceTempView("KingCountyHouses")

In [11]:
query = 'SELECT zipcode, avg(price) avgPrice '\
        'FROM KingCountyHouses '\
        'GROUP BY zipcode ' \
        'ORDER BY avgPrice DESC'

In [12]:
result = spark.sql(query)

In [13]:
result.show()

+-------+------------------+
|zipcode|          avgPrice|
+-------+------------------+
|  98039|         2160606.6|
|  98004|1355927.0820189274|
|  98040|1194230.0212765958|
|  98112| 1095499.342007435|
|  98102| 901258.2666666667|
|  98109| 879623.6238532111|
|  98105| 862825.2314410481|
|  98006| 859684.7791164658|
|  98119| 849448.0163043478|
|  98005|        810164.875|
|  98033| 803719.5231481482|
|  98199|  791820.807570978|
|  98075| 790576.6545961003|
|  98074|  685605.775510204|
|  98077| 682774.8787878788|
|  98053| 678163.0592592593|
|  98177| 676185.3921568628|
|  98008| 645507.3780918728|
|  98052|  645231.456445993|
|  98122| 634360.1793103449|
+-------+------------------+
only showing top 20 rows



In [14]:
fact_table_query = 'SELECT bedrooms, bathrooms, sqft_living, sqft_lot, price' \
                    ' FROM  KingCountyHouses'

In [15]:
fact_table = spark.sql(fact_table_query)
fact_table = fact_table.select(*(F.col(c).cast('float').alias(c) for c in fact_table.columns))
fact_table.show()

+--------+---------+-----------+--------+---------+
|bedrooms|bathrooms|sqft_living|sqft_lot|    price|
+--------+---------+-----------+--------+---------+
|     3.0|      1.0|     1180.0|  5650.0| 221900.0|
|     3.0|     2.25|     2570.0|  7242.0| 538000.0|
|     2.0|      1.0|      770.0| 10000.0| 180000.0|
|     4.0|      3.0|     1960.0|  5000.0| 604000.0|
|     3.0|      2.0|     1680.0|  8080.0| 510000.0|
|     4.0|      4.5|     5420.0|101930.0|1225000.0|
|     3.0|     2.25|     1715.0|  6819.0| 257500.0|
|     3.0|      1.5|     1060.0|  9711.0| 291850.0|
|     3.0|      1.0|     1780.0|  7470.0| 229500.0|
|     3.0|      2.5|     1890.0|  6560.0| 323000.0|
|     3.0|      2.5|     3560.0|  9796.0| 662500.0|
|     2.0|      1.0|     1160.0|  6000.0| 468000.0|
|     3.0|      1.0|     1430.0| 19901.0| 310000.0|
|     3.0|     1.75|     1370.0|  9680.0| 400000.0|
|     5.0|      2.0|     1810.0|  4850.0| 530000.0|
|     4.0|      3.0|     2950.0|  5000.0| 650000.0|
|     3.0|  

# Regression lineaire avec PySpark

In [16]:
# Mettons les données sous la forme de d'un couple (labelo, features)
features = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot']
vectorAssembler = VectorAssembler(inputCols=features, outputCol='features')
fact_df = vectorAssembler.transform(fact_table)
fact_df = fact_df.select(['features', 'price'])

In [17]:
# Etudions l'évolution des prix des maisons
train_data, test_data = fact_df.randomSplit([0.75, 0.25])

lr = LinearRegression(featuresCol='features', labelCol='price',
                     maxIter=10, regParam=0.3, elasticNetParam=0.8)

lrModel = lr.fit(train_data)


# Affichons les coefficients et l'intercept pour la regression logistique
trainingSummary = lrModel.summary
print("nombre d'iteration: ", trainingSummary.totalIterations)
print("objective historique: ", str(trainingSummary.objectiveHistory))

nombre d'iteration:  10
objective historique:  [0.5, 0.45618203070336844, 0.33210399191215556, 0.29229958639810133, 0.2763400517547125, 0.2668392840264362, 0.2601244734641173, 0.25295248749683713, 0.2506322455641146, 0.24883007039356209, 0.24755509067246323]


In [18]:
# test du modèle
predictions =lrModel.transform(test_data)
x = ((predictions['price'] - predictions['prediction'])/predictions['price'])*100
predictions = predictions.withColumn('Accuracy', x)
predictions.select("Prediction", "price", "Accuracy", "features").show()

+------------------+---------+--------------------+--------------------+
|        Prediction|    price|            Accuracy|            features|
+------------------+---------+--------------------+--------------------+
|180438.68843722742| 142000.0|  -27.06949889945593|[0.0,0.0,290.0,20...|
| 557422.2025759039| 380000.0|  -46.69005330944838|[0.0,0.0,1470.0,9...|
| 862508.3390181202| 355000.0| -142.96009549806203|[0.0,0.0,2460.0,8...|
|1585205.8804395886|1295650.0| -22.348310148542318|[0.0,0.0,4810.0,2...|
| 219204.2097058549|  75000.0| -192.27227960780655|[1.0,0.0,670.0,43...|
|234921.36246798674| 484000.0|   51.46252841570522|[1.0,0.0,690.0,23...|
|187116.38628554292| 145000.0|  -29.04578364520201|[1.0,0.75,480.0,9...|
|202812.42694310925| 310000.0|  34.576636469964754|[1.0,0.75,520.0,2...|
| 223316.8777861139| 202000.0| -10.552909795105895|[1.0,0.75,590.0,5...|
|248604.47067526786| 190000.0| -30.844458250140978|[1.0,0.75,780.0,7...|
|  328790.393751844| 351000.0|   6.327523147622791|

In [19]:
# La valeur du R carré pour la base de test
predictions_eveluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="r2")
print("R carré (R2) sur les données de test: ", predictions_eveluator.evaluate(predictions))

R carré (R2) sur les données de test:  0.5161835960714858


In [20]:
spark.stop()