In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, count, year
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pandas as pd

# Start (or reuse) the Spark session used by the whole analysis
spark = (
    SparkSession.builder
    .appName("Real Estate Analysis")
    .getOrCreate()
)

# Load the CSV: first row is the header, column types are inferred
csv_options = {"header": True, "inferSchema": True, "sep": ","}
df = spark.read.csv("real_estate.csv", **csv_options)

# First look at the data: five sample rows, then summary statistics
df.show(5)
summary_stats = df.describe()
summary_stats.show()

# Average sale amount per list year, sorted by year
avg_sale_amount = avg("Sale Amount").alias("Average Sale Amount")
sales_by_year = df.groupBy("List Year").agg(avg_sale_amount)
sales_by_year.orderBy("List Year").show()

# Average sale amount per property type
by_property_type = df.groupBy("Property Type")
by_property_type.agg(avg_sale_amount).show()

# Average sales ratio per property type
avg_sales_ratio = avg("Sales Ratio").alias("Average Sales Ratio")
by_property_type.agg(avg_sales_ratio).show()

# Linear regression: predict "Sale Amount" from the assessed value, the
# list year and the location.
#
# "Sales Ratio" is intentionally NOT used as a feature. In the sample rows
# printed by df.show() it matches Assessed Value / Sale Amount
# (e.g. 133000 / 248400 = 0.5354), i.e. it is computed from the label.
# Training on it leaks the target and inflates the reported metrics.
label_col = "Sale Amount"
feature_cols = ["Assessed Value", "List Year", "Longitude", "Latitude"]

# Keep only the columns the model uses and drop rows with NULL in any of
# them (VectorAssembler rejects NULL inputs by default).
df_ml = df.select(*feature_cols, label_col).na.drop()

# Pack the feature columns into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_ml = assembler.transform(df_ml).select("features", label_col)

# 80/20 train/test split; the fixed seed keeps the split stable across runs
train_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=42)

# Fit the linear regression model on the training set
lr = LinearRegression(labelCol=label_col)
lr_model = lr.fit(train_data)

# Model parameters and fit quality on the TRAINING data
training_summary = lr_model.summary
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)
print("RMSE: ", training_summary.rootMeanSquaredError)
print("R2: ", training_summary.r2)

# Fit quality on the held-out TEST data; this is the figure that shows how
# well the model generalises, the training metrics above are optimistic.
test_summary = lr_model.evaluate(test_data)
print("Test RMSE: ", test_summary.rootMeanSquaredError)
print("Test R2: ", test_summary.r2)

# Predictions for the test set, shown next to the true sale amount
predictions = lr_model.transform(test_data)
predictions.select("prediction", "Sale Amount", "features").show()

# K-means clustering of properties on (Assessed Value, Sale Amount).
# NOTE: this rebinds `feature_cols` and `assembler`.
feature_cols = ["Assessed Value", "Sale Amount"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Drop rows with NULL in either clustering column before assembling
df_ml_kmeans = assembler.transform(df.select(*feature_cols).na.drop())

# Fit a 3-cluster model; the explicit seed makes the centroid
# initialisation, and therefore the cluster ids and centers, repeatable
# from run to run.
kmeans = KMeans(featuresCol="features", k=3, seed=42)
model = kmeans.fit(df_ml_kmeans)

# Assign every row to its cluster ("prediction" holds the cluster index)
predictions_kmeans = model.transform(df_ml_kmeans)
predictions_kmeans.select("Assessed Value", "Sale Amount", "prediction").show()

# Print the cluster centers, in the same order as feature_cols
centers = model.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)

+-------------+---------+-------+---------------+--------------+-----------+-----------+-------------+----------------+---------+--------+
|Date Recorded|List Year|   Town|        Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Longitude|Latitude|
+-------------+---------+-------+---------------+--------------+-----------+-----------+-------------+----------------+---------+--------+
|   2021-04-14|     2020|Ansonia|  323 BEAVER ST|      133000.0|   248400.0|     0.5354|  Residential|   Single Family|-73.06822|41.35014|
|   2021-05-26|     2020|Ansonia| 152 JACKSON ST|      110500.0|   239900.0|     0.4606|  Residential|    Three Family|     NULL|    NULL|
|   2021-09-13|     2020|Ansonia|230 WAKELEE AVE|      150500.0|   325000.0|      0.463|   Commercial|            NULL|     NULL|    NULL|
|   2020-12-14|     2020|Ansonia|    57 PLATT ST|      127400.0|   202500.0|     0.6291|  Residential|      Two Family|     NULL|    NULL|
|   2021-09-07|     2020|  