In [108]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import mean
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField

import pandas as pd

In [35]:
# Start (or reuse) a single-machine Spark session for this notebook.
spark = SparkSession.builder.master("local").appName("PySpark Lection 2").getOrCreate()

In [75]:
# Columns present in both files; train.csv additionally carries the SalePrice target.
# `usecols` only selects columns — pandas still returns them in file order.
feature_columns = ['MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea',
                   'Street', '1stFlrSF', '2ndFlrSF', 'OverallQual']

train_df = pd.read_csv('data/train.csv', usecols=feature_columns + ['SalePrice'])
test_df = pd.read_csv('data/test.csv', usecols=feature_columns)

In [76]:
# Preview the raw pandas training frame (the rendered output is truncated).
train_df

Unnamed: 0,MSSubClass,MSZoning,LotFrontage,LotArea,Street,OverallQual,1stFlrSF,2ndFlrSF,SalePrice
0,60,RL,65.0,8450,Pave,7,856,854,208500
1,20,RL,80.0,9600,Pave,6,1262,0,181500
2,60,RL,68.0,11250,Pave,7,920,866,223500
3,70,RL,60.0,9550,Pave,7,961,756,140000
4,60,RL,84.0,14260,Pave,8,1145,1053,250000
...,...,...,...,...,...,...,...,...,...
1455,60,RL,62.0,7917,Pave,6,953,694,175000
1456,20,RL,85.0,13175,Pave,6,2073,0,210000
1457,70,RL,66.0,9042,Pave,7,1188,1152,266500
1458,20,RL,68.0,9717,Pave,5,1078,0,142125


In [77]:
# Explicit schemas for the train/test frames. They are currently NOT passed to
# `spark.createDataFrame`, which infers column types from the pandas frames.
#
# An earlier version of this cell failed with an `Index out of range` error.
# Defects fixed here:
#   * `MSZoning` holds strings ("RL", "RM", ...) but was declared as an empty
#     `StructType()`; it is now `StringType()`.
#   * `SalePrice` holds integer values (e.g. 208500) but was declared as
#     `FloatType()`; PySpark's schema verification expects Python floats for
#     FloatType, so it is now `IntegerType()`.
# NOTE(review): values missing in the CSV arrive from pandas as NaN floats,
# which a StringType field may reject during verification — confirm before
# passing these schemas to createDataFrame.

train_schema = StructType(fields=[
    StructField("MSSubClass", IntegerType()),
    StructField("MSZoning", StringType()),
    StructField("LotFrontage", FloatType(), nullable=True),
    StructField("LotArea", IntegerType()),
    StructField("Street", StringType()),
    StructField("OverallQual", IntegerType()),
    StructField("1stFlrSF", IntegerType()),
    StructField("2ndFlrSF", IntegerType()),
    StructField("SalePrice", IntegerType()),
])

# The test frame has the same columns in the same order, minus the target.
test_schema = StructType(fields=[field for field in train_schema.fields if field.name != "SalePrice"])

In [78]:
# Convert both pandas frames to Spark DataFrames; column types are inferred
# from the pandas data because no schema is passed.
train, test = (spark.createDataFrame(frame) for frame in (train_df, test_df))

## Показать датафрейм

In [79]:
# Print the first 20 rows of the Spark training frame.
train.show(n=20, truncate=True)

+----------+--------+-----------+-------+------+-----------+--------+--------+---------+
|MSSubClass|MSZoning|LotFrontage|LotArea|Street|OverallQual|1stFlrSF|2ndFlrSF|SalePrice|
+----------+--------+-----------+-------+------+-----------+--------+--------+---------+
|        60|      RL|       65.0|   8450|  Pave|          7|     856|     854|   208500|
|        20|      RL|       80.0|   9600|  Pave|          6|    1262|       0|   181500|
|        60|      RL|       68.0|  11250|  Pave|          7|     920|     866|   223500|
|        70|      RL|       60.0|   9550|  Pave|          7|     961|     756|   140000|
|        60|      RL|       84.0|  14260|  Pave|          8|    1145|    1053|   250000|
|        50|      RL|       85.0|  14115|  Pave|          5|     796|     566|   143000|
|        20|      RL|       75.0|  10084|  Pave|          8|    1694|       0|   307000|
|        60|      RL|        NaN|  10382|  Pave|          7|    1107|     983|   200000|
|        50|      RM|

## Заполнить NaN в колонке `LotFrontage` средним значением

### Удалить таргет из трейна (1-е преобразование)

In [80]:
# Feature-only view of train: every column except the SalePrice target, so its
# columns line up with the unlabelled test frame.
train_without_target = train.select([name for name in train.columns if name != "SalePrice"])

### Объединить датафреймы (2-е преобразование)

In [81]:
# Stack train (without the target) and test so that statistics such as the
# LotFrontage mean are computed over both sets.
# unionByName matches columns by name; plain union() matches them by position
# and would silently misalign data if the two frames' column order differed.
combined_df = train_without_target.unionByName(test)

### Получить среднее для колонки (3-е преобразование)

In [82]:
# Mean of LotFrontage over the rows where it is present.
# pandas NaNs arrive in Spark as NaN (see the train.show() output above), and
# Spark's mean() propagates NaN, so missing values must be dropped first.
# Drop them in this column only: a bare dropna() would also discard rows that
# are missing some *other* column and bias the mean.
mean_value = (
    combined_df
    .dropna(subset=["LotFrontage"])
    .select(mean("LotFrontage"))
    .first()[0]
)

In [83]:
# Impute missing LotFrontage in both frames with the same rounded mean.
# For float columns Spark's fillna also replaces NaN (not only null).
lot_frontage_fill = {'LotFrontage': round(mean_value, 1)}
train = train.fillna(lot_frontage_fill)
test = test.fillna(lot_frontage_fill)

In [84]:
# Check the imputation: rows that had NaN LotFrontage now hold the mean.
train.show(n=20, truncate=True)

+----------+--------+-----------+-------+------+-----------+--------+--------+---------+
|MSSubClass|MSZoning|LotFrontage|LotArea|Street|OverallQual|1stFlrSF|2ndFlrSF|SalePrice|
+----------+--------+-----------+-------+------+-----------+--------+--------+---------+
|        60|      RL|       65.0|   8450|  Pave|          7|     856|     854|   208500|
|        20|      RL|       80.0|   9600|  Pave|          6|    1262|       0|   181500|
|        60|      RL|       68.0|  11250|  Pave|          7|     920|     866|   223500|
|        70|      RL|       60.0|   9550|  Pave|          7|     961|     756|   140000|
|        60|      RL|       84.0|  14260|  Pave|          8|    1145|    1053|   250000|
|        50|      RL|       85.0|  14115|  Pave|          5|     796|     566|   143000|
|        20|      RL|       75.0|  10084|  Pave|          8|    1694|       0|   307000|
|        60|      RL|       69.3|  10382|  Pave|          7|    1107|     983|   200000|
|        50|      RM|

## Удалить колонку `MSZoning` (4-е преобразование)

In [85]:
# MSZoning is not among the model's features; remove it from both frames.
train, test = (frame.drop("MSZoning") for frame in (train, test))

## Переименуем колонку `MSSubClass` в `DwellingClass` (5-е преобразование)

In [88]:
# Give MSSubClass a more descriptive name in both frames.
train, test = (
    frame.withColumnRenamed("MSSubClass", "DwellingClass") for frame in (train, test)
)

## Покажем в train только те дома, у которых `OverallQual` = 10 (6-е преобразование)

In [91]:
# Display only the houses with the highest overall quality score (10).
train.where(train["OverallQual"] == 10).show()

+-------------+-----------+-------+------+-----------+--------+--------+---------+
|DwellingClass|LotFrontage|LotArea|Street|OverallQual|1stFlrSF|2ndFlrSF|SalePrice|
+-------------+-----------+-------+------+-----------+--------+--------+---------+
|           60|       66.0|  13682|  Pave|         10|    1426|    1519|   438780|
|           75|       90.0|  22950|  Pave|         10|    1518|    1518|   475000|
|           20|      103.0|  13472|  Pave|         10|    2392|       0|   386250|
|           60|       96.0|  12474|  Pave|         10|    1742|     590|   426000|
|           20|      105.0|  15431|  Pave|         10|    2402|       0|   555000|
|           20|       94.0|  12220|  Pave|         10|    2020|       0|   402861|
|           60|      130.0|  40094|  Pave|         10|    3138|    1538|   184750|
|           75|       75.0|  13500|  Pave|         10|    1521|    1254|   325000|
|           60|       97.0|  13478|  Pave|         10|    1728|     568|   451950|
|   

## One-Hot Encoding (7-е и последующие преобразования)

In [97]:
# Street -> numeric index -> one-hot vector.
categorical_col = "Street"
indexed_col = f"indexed_{categorical_col}"
onehot_col = f"onehot_{categorical_col}"

string_indexer = StringIndexer(inputCol=categorical_col, outputCol=indexed_col)
encoder = OneHotEncoder(inputCol=indexed_col, outputCol=onehot_col)

In [98]:
# Fit indexer + encoder on train only, then encode train with the fitted model.
pipeline = Pipeline().setStages([string_indexer, encoder])
pipeline_model = pipeline.fit(dataset=train)
train = pipeline_model.transform(dataset=train)

In [99]:
# Reuse the encoding fitted on train so test gets the same category indices.
test = pipeline_model.transform(dataset=test)

In [107]:
# Only the one-hot vector is kept; drop the raw and indexed Street columns.
train, test = (frame.drop("Street", "indexed_Street") for frame in (train, test))

In [109]:
# Inspect the encoded training frame.
train.show(n=20, truncate=True)

+-------------+-----------+-------+-----------+--------+--------+---------+-------------+
|DwellingClass|LotFrontage|LotArea|OverallQual|1stFlrSF|2ndFlrSF|SalePrice|onehot_Street|
+-------------+-----------+-------+-----------+--------+--------+---------+-------------+
|           60|       65.0|   8450|          7|     856|     854|   208500|(1,[0],[1.0])|
|           20|       80.0|   9600|          6|    1262|       0|   181500|(1,[0],[1.0])|
|           60|       68.0|  11250|          7|     920|     866|   223500|(1,[0],[1.0])|
|           70|       60.0|   9550|          7|     961|     756|   140000|(1,[0],[1.0])|
|           60|       84.0|  14260|          8|    1145|    1053|   250000|(1,[0],[1.0])|
|           50|       85.0|  14115|          5|     796|     566|   143000|(1,[0],[1.0])|
|           20|       75.0|  10084|          8|    1694|       0|   307000|(1,[0],[1.0])|
|           60|       69.3|  10382|          7|    1107|     983|   200000|(1,[0],[1.0])|
|         

In [110]:
# Pack the numeric columns and the one-hot Street vector into a single
# "features" vector column for the regression estimator.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "DwellingClass",
        "LotFrontage",
        "LotArea",
        "OverallQual",
        "1stFlrSF",
        "2ndFlrSF",
        "onehot_Street",
    ],
)

In [111]:
# Add the "features" vector column to both frames.
train_features, test_features = (assembler.transform(frame) for frame in (train, test))

In [113]:
# Ordinary least-squares regression of SalePrice on the assembled features.
lr = LinearRegression(labelCol="SalePrice", featuresCol="features")
lr_model = lr.fit(dataset=train_features)

24/04/04 00:48:19 WARN Instrumentation: [4f0bd5fc] regParam is zero, which might cause numerical instability and overfitting.
24/04/04 00:48:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/04 00:48:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [114]:
# Predict SalePrice for the (unlabelled) test set.
predictions = lr_model.transform(dataset=test_features)

In [122]:
# In-sample predictions on the training set (used for the metrics below).
train_preds = lr_model.transform(dataset=train_features)

In [123]:
# Optional: eyeball test-set predictions. They cannot be scored because the
# test frame has no SalePrice label column.
# predictions.select("features", "prediction").show()

In [124]:
# Score the model with several regression metrics.
# NOTE: the metrics are computed on `train_preds`, i.e. on the same rows the
# model was fit on, so they are in-sample (optimistic) estimates. The test
# frame has no SalePrice column and cannot be scored; hold out part of `train`
# (e.g. with `train.randomSplit`) for an honest estimate of generalization error.
metric_names = ["mse", "rmse", "mae", "r2"]

# One evaluator re-targeted per metric instead of four copy-pasted instances;
# setMetricName returns the evaluator itself, so the calls chain.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SalePrice")
metrics = {name: evaluator.setMetricName(name).evaluate(train_preds) for name in metric_names}
mse, rmse, mae, r2 = (metrics[name] for name in metric_names)

print("Mean Squared Error (MSE):", mse)
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R²):", r2)

Mean Squared Error (MSE): 1567029928.6011755
Root Mean Squared Error (RMSE): 39585.728850195184
Mean Absolute Error (MAE): 25983.82098196202
R-squared (R²): 0.751532827302727


In [None]:
# Stop the Spark session to release its resources now that the notebook is done.
spark.stop()