In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col
import pandas as pd

In [9]:
spark = SparkSession.builder.appName("ml_project").master("local[*]").getOrCreate()

In [37]:
data = spark.read.csv("Cleaned.csv", sep=",", header=True, inferSchema=True)
data.cache()
data.show(10)

+--------+-------------------+-------------------+-------------------+--------------------+------------------+----------+----+---------+-------------------+--------------------+--------------------+------------------+----+-----+-----------+-----------------------+-------------+------------------+------------+-----------+--------------+---------------+--------------+---------------+-----------+-------------+--------------+----------------+-------------+------------+---------+-------------+---------------------+-----------------+-----------+------------------+-----------+--------------+------------------+---------------+------------+------------+---------------+------------+-----------+--------------+-----------+------------+--------------+--------------+---------------+--------------------+------------+-----------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+--------------

In [106]:
#data = data.dropna() # drop rows with missing values
#exprs = [col(column).alias(column.replace(' ', '_')) for column in data.columns]

In [38]:
# Stworzenie wektora cech
feature_columns = [col for col in data.columns if col != 'price']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Podział danych na zbiór treningowy i testowy
train_data, test_data = data.randomSplit([0.8, 0.2])

In [39]:
test_data.show(10)

+-------------+-------------------+-------------------+-------------------+--------------------+------+----------+----+---------+-------------------+-------------+-------------------+------------------+----+-----+-----------+-----------------------+-------------+------------------+------------+-----------+--------------+---------------+--------------+---------------+-----------+-------------+--------------+----------------+-------------+------------+---------+-------------+---------------------+-----------------+-----------+------------------+-----------+--------------+------------------+---------------+------------+------------+---------------+------------+-----------+--------------+-----------+------------+--------------+--------------+---------------+--------------------+------------+-----------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------

In [40]:
train_data.select('features').show()

+--------------------+
|            features|
+--------------------+
|(132,[0,2,3,7,8,1...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,6,7...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[1,2,3,7,8,1...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,7,8...|
|(132,[1,2,3,7,8,1...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,4,7...|
|(132,[0,1,2,3,7,8...|
|(132,[0,1,2,3,4,7...|
|(132,[0,1,2,3,4,7...|
|(132,[0,1,2,3,4,7...|
+--------------------+
only showing top 20 rows



# Linear Regression

In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

# Tworzenie modelu i przeprowadzenie Cross-Validation
lr = LinearRegression(featuresCol="features", labelCol="price")

# Siatka parametrów (Grid Search)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.5, 0.2, 0.1, 0.05 ,0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-Validation
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="price", metricName="rmse"),
                          numFolds=5)

# Trening
lr_model = crossval.fit(train_data)

# Wyświetlenie najlepszych parametrów
best_model = lr_model.bestModel

print("Best Model Parameters:")
print(f"  regParam: {best_model._java_obj.getRegParam()}")
print(f"  elasticNetParam: {best_model._java_obj.getElasticNetParam()}")

Best Model Parameters:
  regParam: 0.5
  elasticNetParam: 0.0


In [42]:
pred = lr_model.transform(train_data)
pred.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
evalutor = RegressionEvaluator(metricName='r2', labelCol='price', predictionCol='prediction')
evalutor.evaluate(pred)

+-------------+-------------------+-------------------+-------------------+--------------------+------+----------+----+---------+-------------------+-------------------+-------------------+------------------+----+-----+-----------+-----------------------+-------------+------------------+------------+-----------+--------------+---------------+--------------+---------------+-----------+-------------+--------------+----------------+-------------+------------+---------+-------------+---------------------+-----------------+-----------+------------------+-----------+--------------+------------------+---------------+------------+------------+---------------+------------+-----------+--------------+-----------+------------+--------------+--------------+---------------+--------------------+------------+-----------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----

0.7938843796204281

# Decision Tree Regressor

In [43]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Tworzenie modelu DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="price")

# Grid Search - hiperparametry dla DecisionTreeRegressor
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 10]) \
    .addGrid(dt.maxBins, [32, 64, 128]) \
    .addGrid(dt.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Ewaluator regresji
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")  # Root Mean Squared Error

# Cross-Validation
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # Liczba podziałów dla walidacji krzyżowej

# Trening modelu
dt_model = crossval.fit(train_data)

# Wyświetlenie najlepszych parametrów
best_model = dt_model.bestModel

print("Best Model Parameters:")
print(f"  maxDepth: {best_model._java_obj.getMaxDepth()}")
print(f"  maxBins: {best_model._java_obj.getMaxBins()}")
print(f"  minInstancesPerNode: {best_model._java_obj.getMinInstancesPerNode()}")

Best Model Parameters:
  maxDepth: 10
  maxBins: 32
  minInstancesPerNode: 5


In [44]:
pred = dt_model.transform(test_data)
pred.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
evalutor = RegressionEvaluator(metricName='r2', labelCol='price', predictionCol='prediction')
evalutor.evaluate(pred)

+-------------+-------------------+-------------------+-------------------+--------------------+------+----------+----+---------+-------------------+-------------+-------------------+------------------+----+-----+-----------+-----------------------+-------------+------------------+------------+-----------+--------------+---------------+--------------+---------------+-----------+-------------+--------------+----------------+-------------+------------+---------+-------------+---------------------+-----------------+-----------+------------------+-----------+--------------+------------------+---------------+------------+------------+---------------+------------+-----------+--------------+-----------+------------+--------------+--------------+---------------+--------------------+------------+-----------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------

0.5644422833092169

# Random Forest Regressor

In [85]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features", labelCol="price")

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [32, 64]) \
    .addGrid(rf.minInstancesPerNode, [1, 2]) \
    .addGrid(rf.featureSubsetStrategy, ["auto", "sqrt", "log2"]) \
    .build()

# Cross-Validation
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="price", metricName="accuracy"),
                          numFolds=5)

# Trening
rf_model = crossval.fit(train_data)

# Wyświetlenie najlepszych parametrów
best_model = rf_model.bestModel

print("Best Model Parameters:")
print(f"  numTrees: {best_model.getNumTrees}")
print(f"  maxDepth: {best_model.getOrDefault('maxDepth')}")
print(f"  maxBins: {best_model.getOrDefault('maxBins')}")
print(f"  featureSubsetStrategy: {best_model.getOrDefault('featureSubsetStrategy')}")

In [86]:
pred = rf_model.transform(test_data)
pred.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
evalutor = RegressionEvaluator(metricName='r2', labelCol='price', predictionCol='prediction')
evalutor.evaluate(pred)

+--------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+------------------+
|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|sqft_above|sqft_basement|yr_built|yr_renovated|            features|        prediction|
+--------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+------------------+
|110700.0|     2.0|      1.0|        680|    8064|   1.0|         0|   0|        3|       680|            0|    1941|        1994|[2.0,1.0,680.0,80...| 305893.7774438728|
|155000.0|     2.0|      1.0|        700|    5200|   1.0|         0|   0|        5|       700|            0|    1952|        1998|[2.0,1.0,700.0,52...|307432.07651081856|
|156000.0|     3.0|      1.0|        970|    8580|   1.0|         0|   0|        3|       970|            0|    1959|        1989|[3.0,1.0,970.0,

0.19897292777452602

# Gradient Based Tree Regressor

In [87]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features", labelCol="price")

# Grid Search (opcjonalne)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-Validation
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="price", metricName="accuracy"),
                          numFolds=5)

# Trening
gbt_model = gbt.fit(train_data)

In [88]:
pred = gbt_model.transform(test_data)
pred.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
evalutor = RegressionEvaluator(metricName='r2', labelCol='price', predictionCol='prediction')
evalutor.evaluate(pred)

+--------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+------------------+
|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|sqft_above|sqft_basement|yr_built|yr_renovated|            features|        prediction|
+--------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+------------------+
|110700.0|     2.0|      1.0|        680|    8064|   1.0|         0|   0|        3|       680|            0|    1941|        1994|[2.0,1.0,680.0,80...|207423.92746863712|
|155000.0|     2.0|      1.0|        700|    5200|   1.0|         0|   0|        5|       700|            0|    1952|        1998|[2.0,1.0,700.0,52...| 361955.7889082445|
|156000.0|     3.0|      1.0|        970|    8580|   1.0|         0|   0|        3|       970|            0|    1959|        1989|[3.0,1.0,970.0,

0.142654704619797

In [89]:
from pyspark.ml.regression import RidgeRegression

rrm = RidgeRegression(featuresCol="features", labelCol="price")

# Grid Search (opcjonalne)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Cross-Validation
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="price", metricName="accuracy"),
                          numFolds=5)

# Trening
gbt_model = rrm.fit(train_data)

TypeError: LinearModel.__init__() got an unexpected keyword argument 'featuresCol'

In [None]:
pred = gbt_model.transform(test_data)
pred.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
evalutor = RegressionEvaluator(metricName='r2', labelCol='price', predictionCol='prediction')
evalutor.evaluate(pred)