# Spark Machine Learning - Regression: HOUSE PRICE

In this project, Spark ML is used to run a regression analysis that predicts house prices.

## Read Data 

Checking whether our dataset is available on HDFS:

In [1]:
! hdfs dfs -ls /user/train/datasets

Found 5 items
-rw-r--r--   1 train supergroup       4556 2020-09-23 20:56 /user/train/datasets/Advertising.csv
drwxr-xr-x   - train supergroup          0 2020-11-19 21:02 /user/train/datasets/churn-telecom
-rw-r--r--   1 train supergroup   19589922 2022-06-07 20:09 /user/train/datasets/flo100k.csv
drwxr-xr-x   - train supergroup          0 2022-06-12 20:10 /user/train/datasets/house-prices
drwxr-xr-x   - train supergroup          0 2020-11-21 11:16 /user/train/datasets/retail_db


Uploading the dataset to HDFS if it is not already there (the `File exists` messages below show that the files had already been uploaded):

In [2]:
! hdfs dfs -put /home/train/datasets/house-prices/ /user/train/datasets

put: `/user/train/datasets/house-prices/data_description.txt': File exists
put: `/user/train/datasets/house-prices/sample_submission.csv': File exists
put: `/user/train/datasets/house-prices/test.csv': File exists
put: `/user/train/datasets/house-prices/train.csv': File exists


Importing libraries and setting up Spark:

In [3]:
import findspark

In [4]:
findspark.init("/opt/manual/spark")

# This initializes `findspark` and sets the path to the Spark installation so that Spark
# can be found and used from Python.

In [5]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

Creating and configuring a SparkSession:

In [6]:
spark = (
    SparkSession.builder
    .appName("House Price Regression")
    .master("yarn")
    # Fewer shuffle partitions than the default of 200, since the dataset is small
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

2022-06-12 22:40:49,050 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-06-12 22:40:50,627 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Reading the CSV file into a DataFrame (using the header row and schema inference):

In [7]:
df = (spark.read.format("csv")
      .option("header",True)
      .option("inferSchema",True)
      .load("/user/train/datasets/house-prices/train.csv")
     )

                                                                                

## Data Exploration

Persisting (caching) the DataFrame so that later actions do not re-read the CSV file from HDFS:

In [8]:
df.persist()

2022-06-12 22:41:15,989 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[Id: int, MSSubClass: int, MSZoning: string, LotFrontage: string, LotArea: int, Street: string, Alley: string, LotShape: string, LandContour: string, Utilities: string, LotConfig: string, LandSlope: string, Neighborhood: string, Condition1: string, Condition2: string, BldgType: string, HouseStyle: string, OverallQual: int, OverallCond: int, YearBuilt: int, YearRemodAdd: int, RoofStyle: string, RoofMatl: string, Exterior1st: string, Exterior2nd: string, MasVnrType: string, MasVnrArea: string, ExterQual: string, ExterCond: string, Foundation: string, BsmtQual: string, BsmtCond: string, BsmtExposure: string, BsmtFinType1: string, BsmtFinSF1: int, BsmtFinType2: string, BsmtFinSF2: int, BsmtUnfSF: int, TotalBsmtSF: int, Heating: string, HeatingQC: string, CentralAir: string, Electrical: string, 1stFlrSF: int, 2ndFlrSF: int, LowQualFinSF: int, GrLivArea: int, BsmtFullBath: int, BsmtHalfBath: int, FullBath: int, HalfBath: int, BedroomAbvGr: int, KitchenAbvGr: int, KitchenQual: string

## Schema Check

In [14]:
df_count = df.count()

In [15]:
print(df_count)

1460


In [16]:
print(len(df.columns))

81


In [17]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [18]:
df.limit(5).toPandas()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Neighborhood,Condition1,Condition2,BldgType,HouseStyle,OverallQual,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,1,60,RL,65,8450,Pave,,Reg,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2003,2003,Gable,CompShg,VinylSd,VinylSd,BrkFace,196,Gd,TA,PConc,Gd,TA,No,GLQ,706,Unf,0,150,856,GasA,Ex,Y,SBrkr,856,854,0,1710,1,0,2,1,3,1,Gd,8,Typ,0,,Attchd,2003,RFn,2,548,TA,TA,Y,0,61,0,0,0,0,,,,0,2,2008,WD,Normal,208500
1,2,20,RL,80,9600,Pave,,Reg,Lvl,AllPub,FR2,Gtl,Veenker,Feedr,Norm,1Fam,1Story,6,8,1976,1976,Gable,CompShg,MetalSd,MetalSd,,0,TA,TA,CBlock,Gd,TA,Gd,ALQ,978,Unf,0,284,1262,GasA,Ex,Y,SBrkr,1262,0,0,1262,0,1,2,0,3,1,TA,6,Typ,1,TA,Attchd,1976,RFn,2,460,TA,TA,Y,298,0,0,0,0,0,,,,0,5,2007,WD,Normal,181500
2,3,60,RL,68,11250,Pave,,IR1,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2001,2002,Gable,CompShg,VinylSd,VinylSd,BrkFace,162,Gd,TA,PConc,Gd,TA,Mn,GLQ,486,Unf,0,434,920,GasA,Ex,Y,SBrkr,920,866,0,1786,1,0,2,1,3,1,Gd,6,Typ,1,TA,Attchd,2001,RFn,2,608,TA,TA,Y,0,42,0,0,0,0,,,,0,9,2008,WD,Normal,223500
3,4,70,RL,60,9550,Pave,,IR1,Lvl,AllPub,Corner,Gtl,Crawfor,Norm,Norm,1Fam,2Story,7,5,1915,1970,Gable,CompShg,Wd Sdng,Wd Shng,,0,TA,TA,BrkTil,TA,Gd,No,ALQ,216,Unf,0,540,756,GasA,Gd,Y,SBrkr,961,756,0,1717,1,0,1,0,3,1,Gd,7,Typ,1,Gd,Detchd,1998,Unf,3,642,TA,TA,Y,0,35,272,0,0,0,,,,0,2,2006,WD,Abnorml,140000
4,5,60,RL,84,14260,Pave,,IR1,Lvl,AllPub,FR2,Gtl,NoRidge,Norm,Norm,1Fam,2Story,8,5,2000,2000,Gable,CompShg,VinylSd,VinylSd,BrkFace,350,Gd,TA,PConc,Gd,TA,Av,GLQ,655,Unf,0,490,1145,GasA,Ex,Y,SBrkr,1145,1053,0,2198,1,0,2,1,4,1,Gd,9,Typ,1,TA,Attchd,2000,RFn,3,836,TA,TA,Y,192,84,0,0,0,0,,,,0,12,2008,WD,Normal,250000


In [19]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

An alternative way to display the schema:

In [20]:
df.dtypes

[('Id', 'int'),
 ('MSSubClass', 'int'),
 ('MSZoning', 'string'),
 ('LotFrontage', 'string'),
 ('LotArea', 'int'),
 ('Street', 'string'),
 ('Alley', 'string'),
 ('LotShape', 'string'),
 ('LandContour', 'string'),
 ('Utilities', 'string'),
 ('LotConfig', 'string'),
 ('LandSlope', 'string'),
 ('Neighborhood', 'string'),
 ('Condition1', 'string'),
 ('Condition2', 'string'),
 ('BldgType', 'string'),
 ('HouseStyle', 'string'),
 ('OverallQual', 'int'),
 ('OverallCond', 'int'),
 ('YearBuilt', 'int'),
 ('YearRemodAdd', 'int'),
 ('RoofStyle', 'string'),
 ('RoofMatl', 'string'),
 ('Exterior1st', 'string'),
 ('Exterior2nd', 'string'),
 ('MasVnrType', 'string'),
 ('MasVnrArea', 'string'),
 ('ExterQual', 'string'),
 ('ExterCond', 'string'),
 ('Foundation', 'string'),
 ('BsmtQual', 'string'),
 ('BsmtCond', 'string'),
 ('BsmtExposure', 'string'),
 ('BsmtFinType1', 'string'),
 ('BsmtFinSF1', 'int'),
 ('BsmtFinType2', 'string'),
 ('BsmtFinSF2', 'int'),
 ('BsmtUnfSF', 'int'),
 ('TotalBsmtSF', 'int'),
 ('

Columns whose data type was inferred incorrectly (inferred type --> expected type):

('LotFrontage', 'string') --> int

('MasVnrArea', 'string') --> int

('GarageYrBlt', 'string') --> int


These columns should contain integers, but Spark inferred them as strings because they contain 'NA' values, which schema inference does not recognize as missing values.
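To see why a single 'NA' token turns a numeric column into a string column, here is a simplified, hypothetical model of per-column type inference in plain Python. It is not Spark's actual inference code; `infer_column_type` and `null_tokens` are names made up for this sketch. In Spark, the CSV reader's `nullValue` option declares which string is read as null, so `.option("nullValue", "NA")` should have an effect comparable to passing `null_tokens={"NA"}` here (note that it would apply to every column, including categorical ones where 'NA' means "not present").

```python
# Hypothetical, simplified illustration of schema inference for ONE CSV column.
# Not Spark's implementation: it only mimics the effect observed above, where
# one non-numeric token such as "NA" forces the whole column to "string".

def infer_column_type(values, null_tokens=frozenset()):
    """Return "int", "double" or "string" for a list of raw CSV strings."""
    inferred = "int"
    for raw in values:
        if raw in null_tokens:
            continue  # treated as null: does not influence the type
        try:
            int(raw)
            continue  # still compatible with the current type
        except ValueError:
            pass
        try:
            float(raw)
            inferred = "double"  # widen int -> double
        except ValueError:
            return "string"  # a single unparsable value is enough


    return inferred


lot_frontage = ["65", "80", "68", "NA", "84"]
print(infer_column_type(lot_frontage))                      # string
print(infer_column_type(lot_frontage, null_tokens={"NA"}))  # int
```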

The following code checks whether these columns contain 'NA' values:

## Schema Adjustments

Filtering the MasVnrArea column to show its missing values:

In [21]:
df.select('MasVnrArea').filter("MasVnrArea == 'NA'").show()

+----------+
|MasVnrArea|
+----------+
|        NA|
|        NA|
|        NA|
|        NA|
|        NA|
|        NA|
|        NA|
|        NA|
+----------+



In [22]:
df.select('GarageYrBlt').filter("GarageYrBlt == 'NA'").show()

+-----------+
|GarageYrBlt|
+-----------+
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
+-----------+
only showing top 20 rows



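## Cast mis-inferred dtypes

Computing the mean of each affected column. Spark implicitly casts the strings to DOUBLE; the 'NA' strings cannot be converted, become null, and are ignored by AVG: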

In [23]:
df.selectExpr("AVG(LotFrontage)").show()

+--------------------------------+
|avg(CAST(LotFrontage AS DOUBLE))|
+--------------------------------+
|               70.04995836802665|
+--------------------------------+



In [24]:
df.selectExpr("AVG(MasVnrArea)").show()

+-------------------------------+
|avg(CAST(MasVnrArea AS DOUBLE))|
+-------------------------------+
|             103.68526170798899|
+-------------------------------+



In [25]:
df.selectExpr("AVG(GarageYrBlt)").show()

+--------------------------------+
|avg(CAST(GarageYrBlt AS DOUBLE))|
+--------------------------------+
|              1978.5061638868744|
+--------------------------------+



Replacing the 'NA' values with fixed defaults close to the column means computed above (70, 104 and 1978) and converting the columns to integers:

In [26]:
# For each column: replace 'NA' with an approximate column mean, then cast to int
df1 = (
    df.withColumn("LotFrontage", F.when(F.col("LotFrontage") == "NA", 70).otherwise(F.col("LotFrontage")))
      .withColumn("LotFrontage", F.col("LotFrontage").cast("int"))
      .withColumn("MasVnrArea", F.when(F.col("MasVnrArea") == "NA", 104).otherwise(F.col("MasVnrArea")))
      .withColumn("MasVnrArea", F.col("MasVnrArea").cast("int"))
      .withColumn("GarageYrBlt", F.when(F.col("GarageYrBlt") == "NA", 1978).otherwise(F.col("GarageYrBlt")))
      .withColumn("GarageYrBlt", F.col("GarageYrBlt").cast("int"))
)
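The imputation above can be sketched in plain Python: compute the mean of the parsable values, replace each 'NA' with the rounded mean and cast to int. The helper `impute_na_with_mean` and its rounding rule are assumptions for illustration; the notebook instead hard-codes 70, 104 and 1978, which are close to, but not all exactly equal to, the rounded means (for example, 1978.506 rounds to 1979).

```python
# Plain-Python sketch (no Spark) of mean imputation for a string column.
# Hypothetical helper: "NA" values are ignored when computing the mean,
# then replaced by the rounded mean; all values are returned as ints.

def impute_na_with_mean(values, na_token="NA"):
    numbers = [float(v) for v in values if v != na_token]
    mean = sum(numbers) / len(numbers)  # like AVG, which skips nulls
    fill = round(mean)                  # hypothetical rounding rule for this sketch
    imputed = [fill if v == na_token else int(float(v)) for v in values]
    return imputed, mean


imputed, mean = impute_na_with_mean(["60", "80", "NA", "70"])
print(mean)     # 70.0
print(imputed)  # [60, 80, 70, 70]
```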

In [27]:
df.select("LotFrontage", "MasVnrArea", "GarageYrBlt").printSchema()


root
 |-- LotFrontage: string (nullable = true)
 |-- MasVnrArea: string (nullable = true)
 |-- GarageYrBlt: string (nullable = true)



In [28]:
df1.select("LotFrontage", "MasVnrArea", "GarageYrBlt").printSchema()

root
 |-- LotFrontage: integer (nullable = true)
 |-- MasVnrArea: integer (nullable = true)
 |-- GarageYrBlt: integer (nullable = true)



The adjustments are complete: the three columns were successfully converted to integers, and the schema errors are fixed.

## Handling Missing Values

### Null Check

Manual query:

In [29]:
df1.select("FireplaceQu").filter(F.col("FireplaceQu") == "NA").show()

+-----------+
|FireplaceQu|
+-----------+
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
|         NA|
+-----------+
only showing top 20 rows



A function that counts missing values:

In [30]:
def null_count(df, col_name):
    # Count the rows of `df` in which col_name is 'NA', an empty string, or null
    nc = df.select(col_name).filter(
        (F.col(col_name) == "NA") |
        (F.col(col_name) == "") |
        (F.col(col_name).isNull())
    ).count()
    return nc
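The same definition of a missing value ('NA', empty string, or null) can be expressed in plain Python over a list of row dictionaries. The hypothetical `missing_report` helper below also returns the percentage per column, which the following cells compute with Spark; the sample rows are made up for illustration.

```python
# Plain-Python sketch: count missing values per column in a list of dict rows.
# Hypothetical helpers; "missing" uses the same three conditions as null_count.

def is_missing(value):
    return value is None or value == "NA" or value == ""


def missing_report(rows, columns):
    """Return {column: (missing_count, missing_percent)} for columns with gaps."""
    total = len(rows)
    report = {}
    for col in columns:
        count = sum(1 for row in rows if is_missing(row.get(col)))
        if count > 0:
            report[col] = (count, count / total * 100)
    return report


rows = [  # hypothetical sample rows
    {"FireplaceQu": "NA", "LotFrontage": 65},
    {"FireplaceQu": "TA", "LotFrontage": 80},
    {"FireplaceQu": None, "LotFrontage": 68},
    {"FireplaceQu": "Gd", "LotFrontage": 60},
]
print(missing_report(rows, ["FireplaceQu", "LotFrontage"]))  # {'FireplaceQu': (2, 50.0)}
```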

In [31]:
nnn_cc = null_count(df1, "FireplaceQu")
print(nnn_cc)

690


In [32]:
nnn_cc = null_count(df1, "LotFrontage")
print(nnn_cc)

0


Checking all columns in a loop:

In [33]:
for col_name in df1.dtypes:
    nc = null_count(df1, col_name[0])

    if nc > 0:
        print("{} has {}.".format(col_name[0], nc))

Alley has 1369.
MasVnrType has 8.
BsmtQual has 37.
BsmtCond has 37.
BsmtExposure has 38.
BsmtFinType1 has 37.
BsmtFinType2 has 38.
Electrical has 1.
FireplaceQu has 690.
GarageType has 81.
GarageFinish has 81.
GarageQual has 81.
GarageCond has 81.
PoolQC has 1453.
Fence has 1179.
MiscFeature has 1406.


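Missing values per column as a percentage of all rows (output format: `<column> has <count> % <percentage>`):

In [34]: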
for col_name in df1.dtypes:
    nc = null_count(df1, col_name[0])

    if nc > 0:
        print("{} has {} % {}".format(col_name[0], nc, (nc/df_count)*100))


Alley has 1369 % 93.76712328767123
MasVnrType has 8 % 0.547945205479452
BsmtQual has 37 % 2.5342465753424657
BsmtCond has 37 % 2.5342465753424657
BsmtExposure has 38 % 2.6027397260273974
BsmtFinType1 has 37 % 2.5342465753424657
BsmtFinType2 has 38 % 2.6027397260273974
Electrical has 1 % 0.0684931506849315
FireplaceQu has 690 % 47.26027397260274
GarageType has 81 % 5.5479452054794525
GarageFinish has 81 % 5.5479452054794525
GarageQual has 81 % 5.5479452054794525
GarageCond has 81 % 5.5479452054794525
PoolQC has 1453 % 99.52054794520548
Fence has 1179 % 80.75342465753424
MiscFeature has 1406 % 96.30136986301369
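The decision to drop columns with too many missing values can be made explicit with a threshold. The sketch below reuses the counts printed above and `df_count` = 1460; the 40% cut-off is a hypothetical choice, since the notebook does not state one. Any cut-off between about 6% (above the Garage columns at 5.5%) and 47% (below FireplaceQu at 47.3%) selects the same five columns.

```python
# Missing-value counts copied from the output above; 1460 rows in total.
missing_counts = {
    "Alley": 1369, "MasVnrType": 8, "BsmtQual": 37, "BsmtCond": 37,
    "BsmtExposure": 38, "BsmtFinType1": 37, "BsmtFinType2": 38,
    "Electrical": 1, "FireplaceQu": 690, "GarageType": 81,
    "GarageFinish": 81, "GarageQual": 81, "GarageCond": 81,
    "PoolQC": 1453, "Fence": 1179, "MiscFeature": 1406,
}
TOTAL_ROWS = 1460
THRESHOLD = 0.40  # hypothetical cut-off, not stated in the notebook


def columns_to_drop(counts, total, threshold):
    """Columns whose share of missing values exceeds the threshold (sorted)."""
    return sorted(col for col, n in counts.items() if n / total > threshold)


print(columns_to_drop(missing_counts, TOTAL_ROWS, THRESHOLD))
# ['Alley', 'Fence', 'FireplaceQu', 'MiscFeature', 'PoolQC']
```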


## Grouping Attributes by Data Type

### Group Columns

In [35]:
categoric_cols = []

numeric_cols = []

label_col = ['SalePrice']

discarted_cols = ['Id', 'Alley', 'FireplaceQu', 'PoolQC', 'Fence', 'MiscFeature', 'Utilities', 'Condition2', 'RoofMatl', 'Heating']

# Alley, FireplaceQu, PoolQC, Fence and MiscFeature contain too many missing values.
# Id has a distinct value for every row (too many categories) and carries no predictive information.
# Utilities, Condition2, RoofMatl and Heating consist of weakly represented categories and are nearly binary, so they are discarded.


In [36]:
for col_name in df1.dtypes:
    if (col_name[0] not in label_col + discarted_cols):
        if col_name[1] == 'string':
            categoric_cols.append(col_name[0])
        else:
            numeric_cols.append(col_name[0])


In [37]:
print(categoric_cols)
print(len(categoric_cols))

['MSZoning', 'Street', 'LotShape', 'LandContour', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'BldgType', 'HouseStyle', 'RoofStyle', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'GarageType', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'SaleType', 'SaleCondition']
34


In [38]:
print(numeric_cols)
print(len(numeric_cols))

['MSSubClass', 'LotFrontage', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageYrBlt', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold']
36


In [39]:
print(discarted_cols)
print(len(discarted_cols))

['Id', 'Alley', 'FireplaceQu', 'PoolQC', 'Fence', 'MiscFeature', 'Utilities', 'Condition2', 'RoofMatl', 'Heating']
10


In [40]:
print(label_col)
print(len(label_col))

['SalePrice']
1


### Column Check

In [41]:
len(df1.columns) == (len(label_col) + len(discarted_cols) + len(numeric_cols) + len(categoric_cols))

True

In [42]:
if len(df1.columns) == len(label_col) + len(discarted_cols) + len(numeric_cols) + len(categoric_cols):
    print("column check is True")
else:
    print("there is a problem with the column check")

column check is True


## Examining Categorical Attributes

### Examine categoricals

In [43]:
for cat_col in categoric_cols:
    print(cat_col)
    df1.groupBy(cat_col).count().orderBy(F.desc("count")).show()


MSZoning
+--------+-----+
|MSZoning|count|
+--------+-----+
|      RL| 1151|
|      RM|  218|
|      FV|   65|
|      RH|   16|
| C (all)|   10|
+--------+-----+

Street
+------+-----+
|Street|count|
+------+-----+
|  Pave| 1454|
|  Grvl|    6|
+------+-----+

LotShape
+--------+-----+
|LotShape|count|
+--------+-----+
|     Reg|  925|
|     IR1|  484|
|     IR2|   41|
|     IR3|   10|
+--------+-----+

LandContour
+-----------+-----+
|LandContour|count|
+-----------+-----+
|        Lvl| 1311|
|        Bnk|   63|
|        HLS|   50|
|        Low|   36|
+-----------+-----+

LotConfig
+---------+-----+
|LotConfig|count|
+---------+-----+
|   Inside| 1052|
|   Corner|  263|
|  CulDSac|   94|
|      FR2|   47|
|      FR3|    4|
+---------+-----+

LandSlope
+---------+-----+
|LandSlope|count|
+---------+-----+
|      Gtl| 1382|
|      Mod|   65|
|      Sev|   13|
+---------+-----+

Neighborhood
+------------+-----+
|Neighborhood|count|
+------------+-----+
|       NAmes|  225|
|     CollgCr

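`Street` (Pave/Grvl) and `CentralAir` (Y/N) have only two categories each. Their StringIndexer output (0.0/1.0) already works as an indicator, so they are excluded from one-hot encoding:

In [44]: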
binary_cat_cols = ['Street', 'CentralAir']
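Instead of listing binary columns by hand, they could be detected by counting distinct values. A plain-Python sketch with a hypothetical `binary_columns` helper; the sample rows are made up, with values modelled on this dataset:

```python
# Hypothetical helper: find categorical columns with exactly two distinct values.

def binary_columns(rows, columns):
    """Return the columns that have exactly two distinct non-null values."""
    result = []
    for col in columns:
        distinct = {row[col] for row in rows if row.get(col) is not None}
        if len(distinct) == 2:
            result.append(col)
    return result


sample_rows = [  # hypothetical rows
    {"MSZoning": "RL", "Street": "Pave", "CentralAir": "Y"},
    {"MSZoning": "RM", "Street": "Pave", "CentralAir": "N"},
    {"MSZoning": "FV", "Street": "Grvl", "CentralAir": "Y"},
]
print(binary_columns(sample_rows, ["MSZoning", "Street", "CentralAir"]))
# ['Street', 'CentralAir']
```

In Spark, the same check could be run on `df1` by counting distinct values per categorical column before building the encoder stages.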

## Using StringIndexer and OneHotEncoder to Process Categorical Attributes

### StringIndexer

In [45]:
from pyspark.ml.feature import StringIndexer

In [46]:
# A dictionary stores and configures one `StringIndexer` object per categorical column.
# The objects are collected in a list and their output column names are recorded. For
# non-binary categorical columns, the `OneHotEncoder` input and output column names are prepared as well.

my_dict = {}

string_indexer_objs = []

string_indexer_output_names = []

ohe_input_names = []

ohe_output_names = []

for col_name in categoric_cols:
    my_dict[col_name+"_index_obj"] = StringIndexer() \
    .setHandleInvalid("skip") \
    .setInputCol(col_name) \
    .setOutputCol(col_name+"_indexed")


    string_indexer_objs.append(my_dict.get(col_name+"_index_obj"))
    string_indexer_output_names.append(col_name+"_indexed")

    if col_name not in binary_cat_cols:
        ohe_input_names.append(col_name+"_indexed")
        ohe_output_names.append(col_name+"_ohe")
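How a StringIndexer assigns indices can be sketched in plain Python. With the default ordering (`stringOrderType="frequencyDesc"`), the most frequent label gets index 0.0. With `handleInvalid="skip"`, rows whose label was not seen during fitting are dropped, which in Spark means such test rows silently disappear from the evaluation. The helpers below are hypothetical, and the alphabetical tie-break is an assumption of this sketch:

```python
from collections import Counter


def fit_string_indexer(labels):
    """Most frequent label -> 0.0, next -> 1.0, ...
    Ties are broken alphabetically in this sketch (an assumption)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_skip(labels, mapping):
    """Index labels; labels unseen during fitting are dropped (like handleInvalid="skip")."""
    return [mapping[label] for label in labels if label in mapping]


mapping = fit_string_indexer(["RL", "RL", "RM", "FV", "RL", "RM"])
print(mapping)                                          # {'RL': 0.0, 'RM': 1.0, 'FV': 2.0}
print(transform_skip(["RM", "C (all)", "FV"], mapping))  # [1.0, 2.0]
```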


In [47]:
print(string_indexer_objs)
print(len(string_indexer_objs))

[StringIndexer_0fcc050c2601, StringIndexer_48a2dd793dd2, StringIndexer_71abfda13e8e, StringIndexer_572296267594, StringIndexer_f41d4dd82039, StringIndexer_b1508ed0b0b0, StringIndexer_ee9a25d70d15, StringIndexer_c394744e0780, StringIndexer_6eddde81e07c, StringIndexer_3f022c532919, StringIndexer_6fda725b30d4, StringIndexer_e4a0a0c0c46b, StringIndexer_3a33da3aa41f, StringIndexer_459203042149, StringIndexer_68415a12e9d2, StringIndexer_35a148433f7a, StringIndexer_e43927b34b7e, StringIndexer_f0c87e3c0ffe, StringIndexer_ec865f938690, StringIndexer_345651355bf3, StringIndexer_26cd4f0bd910, StringIndexer_6db088f029b3, StringIndexer_8a4b0fadbc08, StringIndexer_2442453c045b, StringIndexer_0e9557d768c9, StringIndexer_f572158a090e, StringIndexer_554133e9c131, StringIndexer_bde2a211ca4f, StringIndexer_21329ad7935d, StringIndexer_ba2563965118, StringIndexer_c096f5cd066a, StringIndexer_c4075795db5f, StringIndexer_f165284ee71c, StringIndexer_ef2311557641]
34


In [48]:
print(string_indexer_output_names)
print(len(string_indexer_output_names))

['MSZoning_indexed', 'Street_indexed', 'LotShape_indexed', 'LandContour_indexed', 'LotConfig_indexed', 'LandSlope_indexed', 'Neighborhood_indexed', 'Condition1_indexed', 'BldgType_indexed', 'HouseStyle_indexed', 'RoofStyle_indexed', 'Exterior1st_indexed', 'Exterior2nd_indexed', 'MasVnrType_indexed', 'ExterQual_indexed', 'ExterCond_indexed', 'Foundation_indexed', 'BsmtQual_indexed', 'BsmtCond_indexed', 'BsmtExposure_indexed', 'BsmtFinType1_indexed', 'BsmtFinType2_indexed', 'HeatingQC_indexed', 'CentralAir_indexed', 'Electrical_indexed', 'KitchenQual_indexed', 'Functional_indexed', 'GarageType_indexed', 'GarageFinish_indexed', 'GarageQual_indexed', 'GarageCond_indexed', 'PavedDrive_indexed', 'SaleType_indexed', 'SaleCondition_indexed']
34


In [49]:
print(ohe_input_names)
print(len(ohe_input_names))

['MSZoning_indexed', 'LotShape_indexed', 'LandContour_indexed', 'LotConfig_indexed', 'LandSlope_indexed', 'Neighborhood_indexed', 'Condition1_indexed', 'BldgType_indexed', 'HouseStyle_indexed', 'RoofStyle_indexed', 'Exterior1st_indexed', 'Exterior2nd_indexed', 'MasVnrType_indexed', 'ExterQual_indexed', 'ExterCond_indexed', 'Foundation_indexed', 'BsmtQual_indexed', 'BsmtCond_indexed', 'BsmtExposure_indexed', 'BsmtFinType1_indexed', 'BsmtFinType2_indexed', 'HeatingQC_indexed', 'Electrical_indexed', 'KitchenQual_indexed', 'Functional_indexed', 'GarageType_indexed', 'GarageFinish_indexed', 'GarageQual_indexed', 'GarageCond_indexed', 'PavedDrive_indexed', 'SaleType_indexed', 'SaleCondition_indexed']
32


In [50]:
print(ohe_output_names)
print(len(ohe_output_names))

['MSZoning_ohe', 'LotShape_ohe', 'LandContour_ohe', 'LotConfig_ohe', 'LandSlope_ohe', 'Neighborhood_ohe', 'Condition1_ohe', 'BldgType_ohe', 'HouseStyle_ohe', 'RoofStyle_ohe', 'Exterior1st_ohe', 'Exterior2nd_ohe', 'MasVnrType_ohe', 'ExterQual_ohe', 'ExterCond_ohe', 'Foundation_ohe', 'BsmtQual_ohe', 'BsmtCond_ohe', 'BsmtExposure_ohe', 'BsmtFinType1_ohe', 'BsmtFinType2_ohe', 'HeatingQC_ohe', 'Electrical_ohe', 'KitchenQual_ohe', 'Functional_ohe', 'GarageType_ohe', 'GarageFinish_ohe', 'GarageQual_ohe', 'GarageCond_ohe', 'PavedDrive_ohe', 'SaleType_ohe', 'SaleCondition_ohe']
32


## Combining the Input Features with VectorAssembler, Scaler and Estimator

### OneHotEncoder

In [51]:
from pyspark.ml.feature import OneHotEncoder

Configuring the OneHotEncoder's input and output columns:

In [52]:
encoder = OneHotEncoder() \
.setInputCols(ohe_input_names) \
.setOutputCols(ohe_output_names)
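Spark's OneHotEncoder drops the last category by default (`dropLast=True`), so a column with k categories becomes a vector of length k - 1 and the last category is encoded as all zeros. A dense plain-Python sketch of that behaviour (Spark itself produces sparse vectors); `one_hot` is a hypothetical helper:

```python
# Hypothetical helper: dense one-hot encoding of a StringIndexer output value.

def one_hot(index, num_categories, drop_last=True):
    """With drop_last=True the vector has num_categories - 1 slots and the
    last category maps to an all-zero vector."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


print(one_hot(0.0, 3))                   # [1.0, 0.0]
print(one_hot(2.0, 3))                   # [0.0, 0.0]
print(one_hot(2.0, 3, drop_last=False))  # [0.0, 0.0, 1.0]
```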

In [53]:
# Two columns were processed by StringIndexer but not one-hot encoded.
# They are collected first so that they can be passed to the VectorAssembler.
# A list comprehension keeps the original column order (a set difference has no guaranteed order).
not_to_hot_coded = [c for c in string_indexer_output_names if c not in ohe_input_names]
print(not_to_hot_coded)

['Street_indexed', 'CentralAir_indexed']


### VectorAssembler

In [54]:
from pyspark.ml.feature import VectorAssembler

Configuring the VectorAssembler to combine the features into a single vector:

In [55]:
assembler = VectorAssembler() \
.setHandleInvalid("skip") \
.setInputCols(numeric_cols+ not_to_hot_coded+ ohe_output_names) \
.setOutputCol("unscaled_features")
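VectorAssembler concatenates its input columns, in the order given, into one vector; with `setHandleInvalid("skip")`, rows containing null or NaN values are filtered out. Because positions in the feature vector follow the input order, a deterministic column order keeps the features reproducible across runs. A plain-Python sketch with hypothetical rows and helper names:

```python
import math


def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns (in input_cols order).
    Returns None if a scalar input is None or NaN, so the row can be skipped."""
    features = []
    for col in input_cols:
        value = row[col]
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return None  # invalid input: the row will be skipped
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # e.g. a one-hot vector
        else:
            features.append(float(value))  # numeric or indexed column
    return features


def assemble_all(rows, input_cols):
    """Keep only rows without invalid inputs, mimicking handleInvalid="skip"."""
    vectors = (assemble(row, input_cols) for row in rows)
    return [vec for vec in vectors if vec is not None]


rows = [  # hypothetical rows; MSZoning has 5 categories -> 4 slots with dropLast
    {"LotArea": 8450, "Street_indexed": 0.0, "MSZoning_ohe": [1.0, 0.0, 0.0, 0.0]},
    {"LotArea": None, "Street_indexed": 0.0, "MSZoning_ohe": [0.0, 1.0, 0.0, 0.0]},
]
cols = ["LotArea", "Street_indexed", "MSZoning_ohe"]
print(assemble_all(rows, cols))  # [[8450.0, 0.0, 1.0, 0.0, 0.0, 0.0]]
```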

### Feature Scaling

In [56]:
from pyspark.ml.feature import StandardScaler

Configuring the StandardScaler to scale the features:

In [57]:
scaler = StandardScaler() \
.setInputCol("unscaled_features") \
.setOutputCol("features")
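By default, Spark's StandardScaler uses `withStd=True` and `withMean=False`: each feature is divided by its corrected sample standard deviation but not centered. A plain-Python sketch with hypothetical helpers; the zero-std handling is a choice of this sketch. For tree ensembles such as GBTRegressor, scaling a feature by a positive constant generally does not change which splits are found, so this stage matters mostly if the estimator is swapped for a scale-sensitive model.

```python
import statistics


def fit_std_scaler(vectors):
    """One corrected sample standard deviation per feature (column-wise)."""
    return [statistics.stdev(column) for column in zip(*vectors)]


def scale(vector, stds):
    # withMean=False: divide by the std without subtracting the mean.
    # In this sketch, a feature with zero std is mapped to 0.0.
    return [v / s if s != 0 else 0.0 for v, s in zip(vector, stds)]


train_vectors = [[2.0, 1.0], [4.0, 1.0], [6.0, 1.0]]
stds = fit_std_scaler(train_vectors)
print(stds)                     # [2.0, 0.0]
print(scale([4.0, 1.0], stds))  # [2.0, 0.0]
```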

### Estimator

In [58]:
from pyspark.ml.regression import GBTRegressor


# Configure the GBTRegressor with the label column
# (featuresCol defaults to "features", the StandardScaler's output column)
estimator = GBTRegressor(labelCol=label_col[0])
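GBTRegressor trains an ensemble of decision trees with gradient boosting: each new tree is fitted to the residuals of the current ensemble, and its prediction is added with a small step size. The sketch below shows that core idea for squared loss with one feature and depth-1 trees (stumps). It is a simplified illustration, not Spark's algorithm: Spark uses deeper trees, binned split candidates and its own initialization. `n_iter=20` and `step_size=0.1` are chosen to mirror Spark's `maxIter` and `stepSize` defaults, and all names and data are hypothetical.

```python
def fit_stump(xs, residuals):
    """Best single split (threshold, left_value, right_value) under squared error."""
    best = None
    points = sorted(set(xs))
    for lo, hi in zip(points, points[1:]):
        t = (lo + hi) / 2
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        lv, rv = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - lv) ** 2 for r in left) + sum((r - rv) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lv, rv)
    if best is None:  # all xs equal: predict the mean residual everywhere
        m = sum(residuals) / len(residuals)
        return (float("inf"), m, m)
    return best[1:]


def predict_stump(stump, x):
    t, lv, rv = stump
    return lv if x <= t else rv


def fit_gbt(xs, ys, n_iter=20, step_size=0.1):
    """Gradient boosting for squared loss: each stump fits the current residuals."""
    base = sum(ys) / len(ys)
    preds = [base] * len(ys)
    stumps = []
    for _ in range(n_iter):
        # Residuals = negative gradient of the squared loss (up to a constant factor)
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        preds = [p + step_size * predict_stump(stump, x) for p, x in zip(preds, xs)]
    return base, step_size, stumps


def predict_gbt(model, x):
    base, step_size, stumps = model
    return base + step_size * sum(predict_stump(s, x) for s in stumps)


def mse(model, xs, ys):
    return sum((y - predict_gbt(model, x)) ** 2 for x, y in zip(xs, ys)) / len(ys)


xs = [1, 2, 3, 4, 5, 6]
ys = [100.0, 110.0, 105.0, 200.0, 210.0, 205.0]
for n in (1, 20, 50):
    # Training error shrinks as more stumps are added
    print(n, round(mse(fit_gbt(xs, ys, n_iter=n), xs, ys), 1))
```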

## Building the ML Pipeline, Training, Testing and Evaluating the Model

### Pipeline

In [59]:
from pyspark.ml import Pipeline

Creating the pipeline from the defined stages:

In [60]:
pipeline_obj = Pipeline().setStages(string_indexer_objs + [encoder, assembler, scaler, estimator])

### Split Dataset

In [61]:
train_df, test_df = df1.randomSplit([.8, .2], seed =142)

In [62]:
train_df.count()

1163

In [63]:
test_df.count() 

297
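`randomSplit` assigns rows to the splits by random sampling, so the sizes are only approximately 80/20: 1163/1460 ≈ 79.7% and 297/1460 ≈ 20.3%. The sketch below mimics the idea with Python's `random` module; `random_split` is a hypothetical helper, and using the same seed does not reproduce Spark's split.

```python
import random


def random_split(rows, weights=(0.8, 0.2), seed=142):
    """Assign each row independently to one of two splits.
    The resulting sizes are only approximately proportional to the weights."""
    cut = weights[0] / sum(weights)
    rng = random.Random(seed)
    first, second = [], []
    for row in rows:
        (first if rng.random() < cut else second).append(row)
    return first, second


train, test = random_split(list(range(1460)))
print(len(train), len(test))  # close to, but usually not exactly, 1168 and 292
```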

### Train Model

In [64]:
pipeline_model = pipeline_obj.fit(train_df)

                                                                                

### Prediction

In [65]:
transformed_df = pipeline_model.transform(test_df)

transformed_df.select("SalePrice", "prediction").show()

+---------+------------------+
|SalePrice|        prediction|
+---------+------------------+
|   140000|166418.00194302486|
|   129900| 186938.8536276622|
|   129500| 145038.8401122167|
|   157000|150866.06806781224|
|   154000| 156686.9315937667|
|   207500|189485.31450506533|
|   165500|158250.67025043225|
|   160000|  152051.549367772|
|   239686| 339009.4308244648|
|   127000|120698.47175147035|
|   110000| 94470.79060351106|
|   385000|  794776.825576351|
|   101000| 83558.52498552404|
|   245000| 231726.6914173506|
|   260000| 284801.2000961671|
|   204750|195432.63382324998|
|   214000|230876.43925416976|
|   198900|194057.01214842233|
|   169500| 177679.2237630042|
|   180000|150103.03444576386|
+---------+------------------+
only showing top 20 rows
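Besides R², error metrics in the label's unit can help interpret these predictions. MAE averages the absolute errors, while RMSE squares them first and therefore weights large misses more heavily (such as 385000 vs. about 794777 in the table above). A plain-Python sketch using only the first three rows shown, purely as an illustration; the helper names are hypothetical, and in Spark the same metrics are available through RegressionEvaluator with `metricName="mae"` or `"rmse"` over the whole test set.

```python
import math


def mae(labels, predictions):
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)


def rmse(labels, predictions):
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels))


# First three (SalePrice, prediction) pairs from the table above; illustration only.
labels = [140000, 129900, 129500]
predictions = [166418.00194302486, 186938.8536276622, 145038.8401122167]
print(round(mae(labels, predictions), 2))
print(round(rmse(labels, predictions), 2))
```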



### Evaluate Model

In [66]:
from pyspark.ml.evaluation import RegressionEvaluator

Configuring the RegressionEvaluator to evaluate the model with the R² score:

In [67]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName='r2')

evaluator.evaluate(transformed_df)

0.7133529989253724
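R² compares the model's squared errors with those of always predicting the mean label: R² = 1 - SS_res / SS_tot. A value of about 0.713 therefore means the model explains roughly 71% of the variance of SalePrice in the test set. RegressionEvaluator uses `rmse` as its default metric, so `metricName='r2'` has to be set to obtain R². A plain-Python sketch with a hypothetical `r2_score` helper:

```python
def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


print(r2_score([1, 2, 3], [1, 2, 3]))  # 1.0  (perfect predictions)
print(r2_score([1, 2, 3], [2, 2, 2]))  # 0.0  (no better than the mean)
print(r2_score([1, 2, 3], [3, 2, 1]))  # -3.0 (worse than the mean)
```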