# Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, regexp_replace, stddev, min, max, count, expr, substring
from pyspark import SparkConf, SparkContext
import time
import pandas as pd

import os
os.environ["PYSPARK_PYTHON"] = "C:/Python/python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:/Python/python.exe"


In [2]:

# Initialize Spark session
spark = SparkSession.builder.appName("IrelandPropertyPrices").getOrCreate()

In [3]:
# Load dataset
data_path = "House Prices.csv"
data_path_2 = "House Prices Large.csv"
df = spark.read.csv(data_path_2, header=True, inferSchema=True)
df.head()


Row(Date='28/01/2010', Price='250,000.00', Address='18 Wolseley Court, Tullow', County='Carlow')

# Data Cleanup

In [4]:
# Convert date string to standard SQL format
df = df.withColumn("Date", to_date(col("Date"), "dd/MM/yyyy"))
# Remove commas from Price column and convert to float
df = df.withColumn("Price", regexp_replace(col("Price"), ",", "").cast("float"))

df = df.filter(col("Price").isNotNull())
df.head()

Row(Date=datetime.date(2010, 1, 28), Price=250000.0, Address='18 Wolseley Court, Tullow', County='Carlow')

# Section 3

In [5]:
start_time = time.time()
df_count = df.groupBy("County").agg(count("*").alias("transaction_count"))
df_count.show()
print("Transaction count computation time: {:.2f} seconds".format(time.time() - start_time))

+---------+-----------------+
|   County|transaction_count|
+---------+-----------------+
|    Clare|           230098|
|  Wexford|           342342|
|  Leitrim|            90750|
|Roscommon|           143726|
|   Dublin|          3225112|
| Limerick|           376618|
|  Donegal|           275968|
|   Galway|           507606|
|   Offaly|           120252|
|     Cork|          1103102|
|  Kildare|           506660|
|Tipperary|           270094|
| Monaghan|            78166|
|  Wicklow|           313940|
|     Mayo|           238304|
|    Meath|           401214|
|   Carlow|           109296|
|Westmeath|           194040|
|    Louth|           268378|
|    Sligo|           149050|
+---------+-----------------+
only showing top 20 rows

Transaction count computation time: 4.25 seconds


In [6]:
start_time = time.time()
df_stats = df.groupBy("County").agg(
    avg("Price").alias("avg_price"),
    stddev("Price").alias("std_price"),
    min("Price").alias("min_price"),
    max("Price").alias("max_price")
)
df_stats.show()
print("Price statistics computation time: {:.2f} seconds".format(time.time() - start_time))


+---------+------------------+------------------+---------+------------+
|   County|         avg_price|         std_price|min_price|   max_price|
+---------+------------------+------------------+---------+------------+
|    Clare|157858.54780366342| 417076.8922726006|   5987.0|     3.318E7|
|  Wexford|157917.46489019023| 152657.1161820317|   5500.0|    1.3995E7|
|  Leitrim|100152.79677462121|  73509.4532566325|   6000.0|   1234513.0|
|Roscommon|102819.36880758026| 80877.64824711277|   5179.0|   1670000.0|
|   Dublin|407106.22332968016|1366344.2071813613|   5250.0|1.42256576E8|
| Limerick|167566.64301940074|176922.32066084692|   6000.0| 1.1159572E7|
|  Donegal|116489.85552402418| 96869.32158033803|   5079.0|   2904000.0|
|   Galway|202985.68767926714| 325728.7438527414|   5864.0|    3.4781E7|
|   Offaly| 137048.4111739046| 91011.57060714523|   6200.0|   1460000.0|
|     Cork| 226722.7130350809| 609174.3794845189|  5030.53|  6.987348E7|
|  Kildare|264110.19415758655|329899.73468304885|  

In [7]:
start_time = time.time()
df_percentiles = df.groupBy("County").agg(
    expr("percentile_approx(Price, 0.25) as price_25"),
    expr("percentile_approx(Price, 0.5) as median_price"),
    expr("percentile_approx(Price, 0.75) as price_75")
)
df_percentiles.show()
print("Price percentiles computation time: {:.2f} seconds".format(time.time() - start_time))


+---------+--------+------------+---------+
|   County|price_25|median_price| price_75|
+---------+--------+------------+---------+
|    Clare| 80000.0|    133000.0| 195000.0|
|  Wexford| 90000.0|    144000.0| 199000.0|
|  Leitrim| 51300.0|     83000.0| 130000.0|
|Roscommon| 50000.0|     83500.0| 140000.0|
|   Dublin|210000.0|    300000.0| 430000.0|
| Limerick| 80500.0|    145000.0| 220000.0|
|  Donegal| 60000.0|    100000.0| 150000.0|
|   Galway|105000.0|    172000.0| 251580.0|
|   Offaly| 75000.0|    123000.0| 175000.0|
|     Cork|120000.0|    190000.0| 272000.0|
|  Kildare|170000.0|    243000.0|317180.62|
|Tipperary| 67000.0|    120000.0| 175000.0|
| Monaghan| 64709.0|    115000.0| 170000.0|
|  Wicklow|181500.0|    263000.0| 375000.0|
|     Mayo| 60000.0|    105000.0| 161000.0|
|    Meath|155000.0|    225000.0| 288986.0|
|   Carlow| 87000.0|    135000.0| 186000.0|
|Westmeath| 85000.0|    130000.0| 188000.0|
|    Louth|105000.0|    163000.0|220264.31|
|    Sligo| 65000.0|    105000.0

In [8]:
start_time = time.time()
# Create a Month column in the format 'YYYY-MM'
df = df.withColumn("Month", substring(col("Date").cast("string"), 1, 7))
df_monthly_avg = df.groupBy("County", "Month").agg(avg("Price").alias("avg_price"))
df_monthly_avg.show()
print("Monthly average price computation time: {:.2f} seconds".format(time.time() - start_time))

+---------+-------+------------------+
|   County|  Month|         avg_price|
+---------+-------+------------------+
|  Donegal|2017-10|111062.43911047149|
|  Donegal|2018-01|123682.99894425676|
|   Galway|2012-06| 156283.1541498656|
|   Galway|2019-05|216289.01270933016|
|    Kerry|2012-02|143969.69850852274|
|    Kerry|2014-02| 115519.3711622807|
|  Kildare|2020-04| 370937.2877731822|
|  Kildare|2020-10| 308641.8259715545|
| Kilkenny|2017-07|185898.83333333334|
| Limerick|2019-05|179867.83138420247|
|    Louth|2020-04|205108.65651939655|
|     Mayo|2015-03|107395.60106382979|
| Monaghan|2014-09|     109111.460625|
|   Offaly|2014-03|112511.11111111111|
|   Offaly|2018-03| 126440.1689453125|
|Waterford|2013-02|         114253.24|
|Waterford|2020-02|191644.66349003234|
|Waterford|2020-06|196748.84053938356|
|Westmeath|2018-01|206035.15735035212|
|  Wexford|2010-12|161412.25137061405|
+---------+-------+------------------+
only showing top 20 rows

Monthly average price computation time

# Section 4
<br> parallel

In [9]:
# Convert DataFrame to RDD for MapReduce processing
rdd = df.select("County", "Price").rdd.map(lambda row: (row.County, (row.Price, 1)))

# Reducer function to sum prices and counts
def reducer(val1, val2):
    return (val1[0] + val2[0], val1[1] + val2[1])

start_time = time.time()
reduced_rdd = rdd.reduceByKey(reducer)
final_rdd = reduced_rdd.mapValues(lambda x: x[0] / x[1])
result = final_rdd.collect()

print("MapReduce average price per county:")
for county, avg_price in result:
    print(f"{county}: {avg_price:.2f}")
print("MapReduce computation time: {:.2f} seconds".format(time.time() - start_time))


MapReduce average price per county:
Cavan: 117502.10
Clare: 157858.55
Laois: 147404.01
Carlow: 150984.66
Leitrim: 100152.80
Offaly: 137048.41
Waterford: 156911.61
Wexford: 157917.46
Meath: 238104.12
Dublin: 407106.22
Kildare: 264110.19
Kilkenny: 180548.36
Sligo: 133207.24
Monaghan: 129981.76
Kerry: 159410.14
Donegal: 116489.86
Louth: 182448.91
Roscommon: 102819.37
Wicklow: 309327.62
Cork: 226722.71
Galway: 202985.69
Westmeath: 150854.49
Limerick: 167566.64
Tipperary: 137579.83
Longford: 95573.43
Mayo: 126441.33
MapReduce computation time: 36.77 seconds


In [None]:
start_time = time.time()
df_monthly_metrics = df.groupBy("County", "Month").agg(
    avg("Price").alias("avg_price"),
    count("*").alias("transaction_count")
)
df_monthly_metrics.show()
print("Monthly metrics (average price and count) computation time: {:.2f} seconds".format(time.time() - start_time))

non parallel

In [10]:
df_non_parallel = df.coalesce(1)

start_time = time.time()
avg_price_np_df = df_non_parallel.groupBy("County").agg(avg("Price").alias("avg_price"))
avg_price_np_df.show()
print("Non-parallel (single partition) average price per county computation time: {:.2f} seconds".format(time.time() - start_time))

+---------+------------------+
|   County|         avg_price|
+---------+------------------+
|   Carlow| 150984.6575803895|
|    Cavan| 117502.1000625127|
|    Clare|157858.54780366342|
|     Cork| 226722.7130350809|
|  Donegal|116489.85552402418|
|   Dublin|407106.22332968016|
|   Galway|202985.68767926714|
|    Kerry|159410.14217606955|
|  Kildare|264110.19415758655|
| Kilkenny|180548.35543866633|
|    Laois|147404.00575155977|
|  Leitrim|100152.79677462121|
| Limerick|167566.64301940074|
| Longford| 95573.42759475826|
|    Louth|182448.90702370903|
|     Mayo| 126441.3280941218|
|    Meath|238104.12462376882|
| Monaghan|129981.76315844445|
|   Offaly| 137048.4111739046|
|Roscommon|102819.36880758026|
+---------+------------------+
only showing top 20 rows

Non-parallel (single partition) average price per county computation time: 21.38 seconds


In [11]:
# Force the DataFrame to a single partition to simulate non-parallel execution
df_non_parallel = df.coalesce(1)

start_time = time.time()
monthly_metrics_np_df = df_non_parallel.groupBy("County", "Month").agg(
    avg("Price").alias("avg_price"),
    count("*").alias("transaction_count")
)
monthly_metrics_np_df.show()
print("Non-parallel (single partition) monthly metrics computation time: {:.2f} seconds".format(time.time() - start_time))


+------+-------+------------------+-----------------+
|County|  Month|         avg_price|transaction_count|
+------+-------+------------------+-----------------+
|Carlow|2010-01|169445.59479166666|              330|
|Carlow|2010-02|177205.77083333334|              264|
|Carlow|2010-03|169528.29447115384|              286|
|Carlow|2010-04|         161593.75|              352|
|Carlow|2010-05|205699.17647058822|              374|
|Carlow|2010-06|          165359.0|              242|
|Carlow|2010-07|153316.91974431818|              484|
|Carlow|2010-08|212958.17310855264|              418|
|Carlow|2010-09| 178012.7762784091|              484|
|Carlow|2010-10|178776.13454861112|              396|
|Carlow|2010-11|160224.91623263888|              792|
|Carlow|2010-12|171669.46510416668|              660|
|Carlow|2011-01|170409.58353365384|              286|
|Carlow|2011-02|168266.91517857142|              308|
|Carlow|2011-03|154126.38461538462|              286|
|Carlow|2011-04|127955.54616