# Chargement et analyse de prix de maisons sur un cluster Spark

## Preparation de l'environnement Spark

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import findspark
findspark.init() 

In [3]:
import pyspark

number_cores = 8
memory_gb = 24
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
sc = pyspark.SparkContext(conf=conf)

In [4]:
from pyspark.sql import *
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

In [5]:
df = spark.read.csv("houses.csv")

## Renommage des colonnes

In [6]:
df = df.selectExpr("_c0 as size", "_c1 as nb_rooms","_c2 as garden","_c3 as orientation","_c4 as price")


## Conversions de  types

In [7]:
df = df.withColumn("nb_rooms", df["nb_rooms"].cast("int"))
df = df.withColumn("price", df["price"].cast("double"))
df = df.withColumn("garden", df["garden"].cast("boolean"))
df = df.withColumn("size", df["size"].cast("double"))

## Affichages

In [8]:
df.createOrReplaceTempView("houses")


### Prix moyen par nombre de chambres

In [9]:
display(spark.sql("SELECT nb_rooms,avg(price) FROM houses group by nb_rooms "))

DataFrame[nb_rooms: int, avg(price): double]

### Prix moyen par orientation(Nord, Sud, Est,Ouest)

In [10]:
display(spark.sql("SELECT orientation,avg(price) FROM houses group by orientation "))

DataFrame[orientation: string, avg(price): double]

### Prix moyen selon la présence d'un jardin

In [11]:
display(spark.sql("SELECT garden,avg(price) FROM houses group by garden "))

DataFrame[garden: boolean, avg(price): double]

 ### Corrélation entre prix et taille des maisons

In [12]:
display(spark.sql("SELECT size,price FROM houses order by price "))

DataFrame[size: double, price: double]

### Filtre des maisons de plus de 2 chambres

In [13]:
more_2_rooms=df.filter(df.nb_rooms>= 2)

In [14]:
more_2_rooms.describe().show()


+-------+--------------------+------------------+-----------+------------------+
|summary|                size|          nb_rooms|orientation|             price|
+-------+--------------------+------------------+-----------+------------------+
|  count|            66656777|          66656777|   66656777|          66656777|
|   mean|  150.03015945270818|2.5000093238831513|       null|277154.00704793504|
| stddev|  49.881609517985765|0.5000000036636225|       null| 35747.05594084833|
|    min|4.878437778188527E-5|                 2|        Est| 82.00769678005963|
|    max|  423.18622475256666|                 3|        Sud|380365.10005544615|
+-------+--------------------+------------------+-----------+------------------+



## Pipeline for Machine learning

In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [16]:
assembler = VectorAssembler(
    inputCols=["size"],
    outputCol="size_vector").setHandleInvalid("skip")

In [17]:
from pyspark.ml.feature import Normalizer

In [18]:
normalizer=Normalizer(inputCol="size_vector",outputCol="size_vector_normalized",p=1.0)

In [27]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="orientation", outputCol="orientation_index")

In [19]:
from pyspark.ml.feature import OneHotEncoder

In [20]:
encoder = OneHotEncoder(inputCols=["orientation_index"],
                        outputCols=["orientation_encoding"])

In [21]:
feature_assembler = VectorAssembler(
    inputCols=["size_vector_normalized","nb_rooms","garden","orientation_encoding"],
    outputCol="features")

In [22]:
from pyspark.ml.regression import LinearRegression

In [23]:
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr.setLabelCol("price")
lr.setFeaturesCol("features")

LinearRegression_84479b530832

In [28]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, normalizer,indexer,encoder,feature_assembler,lr])

### Création du modèle

In [29]:
lrModel = pipeline.fit(df)

In [31]:
test=spark.read.csv("houses2.csv")

AnalysisException: Path does not exist: file:/home/ngoupandecedric/houses2.csv;

### Affichage des predictions

In [32]:
#predictions=lrModel.transform(test)
predictions=spark.read.csv("predictions.csv")

In [33]:
predictions.head(20)

[Row(_c0=None, _c1='size', _c2='nb_rooms', _c3='garden', _c4='orientation', _c5='price', _c6='orientation_index', _c7='size_vector', _c8='size_vector_normalized', _c9='orientation_encoding', _c10='features'),
 Row(_c0='0', _c1='154.83230012308735', _c2='1', _c3='False', _c4='Nord', _c5='207195.11934156134', _c6='3.0', _c7='[154.83230012308735]', _c8='[1.0]', _c9='(4,[3],[1.0])', _c10='(7,[0,1,6],[1.0,1.0,1.0])'),
 Row(_c0='1', _c1='92.77258540453674', _c2='2', _c3='True', _c4='Nord', _c5='254339.26460680345', _c6='3.0', _c7='[92.77258540453674]', _c8='[1.0]', _c9='(4,[3],[1.0])', _c10='[1.0,2.0,1.0,0.0,0.0,0.0,1.0]'),
 Row(_c0='2', _c1='157.49511991833157', _c2='1', _c3='False', _c4='Est', _c5='224957.5711658047', _c6='0.0', _c7='[157.49511991833157]', _c8='[1.0]', _c9='(4,[0],[1.0])', _c10='(7,[0,1,3],[1.0,1.0,1.0])'),
 Row(_c0='3', _c1='202.89541456241577', _c2='3', _c3='True', _c4='Sud', _c5='349292.323490081', _c6='2.0', _c7='[202.89541456241577]', _c8='[1.0]', _c9='(4,[2],[1.0])',