In [1]:
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, lag, avg, abs, lit, min, udf, dense_rank
from pyspark.sql.window import Window
from haversine import haversine, Unit
from pyspark.sql.types import FloatType

In [2]:
spark = SparkSession.builder.appName('finalproject')\
        .config('spark.driver.extraClassPath','/usr/lib/jvm/java-17-openjdk-amd64/lib/postgresql-42.6.0.jar')\
        .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/09/13 17:26:36 WARN Utils: Your hostname, SUSHAN resolves to a loopback address: 127.0.1.1; using 172.22.230.162 instead (on interface eth0)
23/09/13 17:26:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/13 17:26:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
stationInfo_df = spark.read.csv("data/Fuel_Station_Information.csv", header=True, inferSchema=True)
hourlyPrices_df = spark.read.csv("data/Hourly_Gasoline_Prices.csv", header=True, inferSchema=True)
hourlyPrices_df = hourlyPrices_df.dropDuplicates()
stationInfo_df = stationInfo_df.dropDuplicates()
hourlyPrices_df = hourlyPrices_df.dropna()
stationInfo_df = stationInfo_df.dropna()
hourlyPrices_df = hourlyPrices_df.withColumn("Date", to_timestamp(col("Date"), "yyyy-MM-dd HH:mm:ss"))
hourlyPrices_df.write.parquet("data/cleaned_fuel_prices.parquet", mode= "overwrite", compression= "snappy")
stationInfo_df.write.parquet("data/cleaned_station_info.parquet", mode= "overwrite", compression= "snappy")

23/09/13 17:27:16 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [4]:
yaml_file_path = 'credentials.yaml'

with open(yaml_file_path, 'r') as yaml_file:
    config = yaml.safe_load(yaml_file)

In [5]:
# Read the Parquet files
hourlyPrices_df = spark.read.parquet("data/cleaned_fuel_prices.parquet")
stationInfo_df = spark.read.parquet("data/cleaned_station_info.parquet")
stationInfo_df = stationInfo_df.withColumn("Latitude", stationInfo_df["Latitude"].cast(FloatType()))
stationInfo_df = stationInfo_df.withColumn("Longitudine", stationInfo_df["Longitudine"].cast(FloatType()))
# Define the JDBC connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/sparkProject"
jdbc_properties = {
    "user": config['postgres']["user"],
    "password": str(config['postgres']["password"]),
    "driver": "org.postgresql.Driver"
}
# Save DataFrames to PostgreSQL tables
hourlyPrices_df.write.jdbc(url=jdbc_url, table="hourly_gasoline_prices", mode="overwrite", properties=jdbc_properties)
stationInfo_df.write.jdbc(url=jdbc_url, table="fuel_station_information", mode="overwrite", properties=jdbc_properties)

                                                                                

In [6]:
# Read DataFrames to PostgreSQL tables
hourlyPrices_df = spark.read.format('jdbc').options(url="jdbc:postgresql://localhost:5432/sparkProject", driver = 'org.postgresql.Driver', dbtable='hourly_gasoline_prices', user=config['postgres']["user"],password=str(config['postgres']["password"])).load()
stationInfo_df = spark.read.format('jdbc').options(url="jdbc:postgresql://localhost:5432/sparkProject", driver = 'org.postgresql.Driver', dbtable='fuel_station_information', user=config['postgres']["user"],password=str(config['postgres']["password"])).load()

### Q1. Find the `Most Stable Company` and `Most Volatile Company` based on `Average Daily Price Variation`


In [7]:
df_forAvg = hourlyPrices_df.join(stationInfo_df.select("Id", "Petrol_company"), "Id")

window_spec = Window.partitionBy("Petrol_company").orderBy("Date")

daily_var = df_forAvg.withColumn("Previous_Price", lag("Price").over(window_spec))
daily_var = daily_var.withColumn("Price_Variation", abs(col("Price") - col("Previous_Price")))
daily_var = daily_var.filter(col("Previous_Price").isNotNull())
avg_variation_df = daily_var.groupBy("Petrol_company").agg(avg("Price_Variation").alias("Avg_Daily_Variation"))

min_variation_company = avg_variation_df.orderBy("Avg_Daily_Variation").first()
max_variation_company = avg_variation_df.orderBy(avg_variation_df["Avg_Daily_Variation"].desc()).first()

print("Most Stable Company:")
print("Company:", min_variation_company["Petrol_company"])
print("Average Daily Price Variation:", min_variation_company["Avg_Daily_Variation"])

print("_*"*33)

print("Most Volatile Company:")
print("Company:", max_variation_company["Petrol_company"])
print("Average Daily Price Variation:", max_variation_company["Avg_Daily_Variation"])




Most Stable Company:
Company: Blanco petroli
Average Daily Price Variation: 0.008190127970749542
_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*_*
Most Volatile Company:
Company: COLAGROSSI CARBURANTI
Average Daily Price Variation: 0.2734666666666667


                                                                                

In [8]:
data_t1 = [
    ("Most Stable Company", min_variation_company["Petrol_company"], min_variation_company["Avg_Daily_Variation"]),
    ("Most Volatile Company", max_variation_company["Petrol_company"],  max_variation_company["Avg_Daily_Variation"])
]

schema_t1 = ["Category", "Company", "Average_Daily_Price_Variation"]
df_t1 = spark.createDataFrame(data_t1, schema_t1)

jdbc_url = "jdbc:postgresql://localhost:5432/sparkProject"
jdbc_properties = {
    "user": config['postgres']["user"],
    "password": str(config['postgres']["password"]),
    "driver": "org.postgresql.Driver"
}
df_t1.write.jdbc(url=jdbc_url, table="stable_volatile_company", mode="overwrite", properties=jdbc_properties)

                                                                                

### Q2. `Find 9 nearest stations to a certain reference station and calculate the price difference between the fuel station and its nearest competitor`

In [9]:
ref_latitude = 40.7160385
ref_longitude = 14.9413282

In [10]:
@udf(FloatType())
def calculate_distance(lat1, lon1, lat2, lon2):
    return haversine((lat1, lon1), (lat2, lon2), unit=Unit.KILOMETERS)

stationInfo_df = stationInfo_df.filter((col("Latitude").isNotNull()) & (col("Longitudine").isNotNull()))
dis_stationInfo_df = stationInfo_df.withColumn("distance",calculate_distance(lit(ref_latitude),
                     lit(ref_longitude),col("Latitude"),col("Longitudine")))
min_distance_df = dis_stationInfo_df.groupBy("Id").agg(min("distance").alias("min_distance"))
price_diff_df = hourlyPrices_df.join(min_distance_df, "Id", "inner")

window_spec = Window.orderBy(col("min_distance"))
ranked_stations_df = price_diff_df.withColumn("rank", dense_rank().over(window_spec))
deduplicated_stations_df = ranked_stations_df.dropDuplicates(['Id']).drop("Date")
filtered_stations_df = deduplicated_stations_df.filter(col("rank") <= 9).orderBy("rank")
filtered_stations_df = filtered_stations_df.drop('rank')

window_spec = Window.orderBy("min_distance")
filtered_stations_df = filtered_stations_df.withColumn("Price_of_Nearest_Competitor", lag("Price").over(window_spec))
filtered_stations_df = filtered_stations_df.withColumn(
    "PriceDifferenceFromNearestCompetitor",
    col("Price") - col("Price_of_Nearest_Competitor")
)
filtered_stations_df.show()

23/09/13 17:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 1

+-----+------+-----+------------+---------------------------+------------------------------------+
|   Id|isSelf|Price|min_distance|Price_of_Nearest_Competitor|PriceDifferenceFromNearestCompetitor|
+-----+------+-----+------------+---------------------------+------------------------------------+
|38792|     1| 1.82|  0.22640005|                       null|                                null|
|52830|     1|1.929|   0.3241125|                       1.82|                 0.10899999999999999|
|17152|     0|2.014|    2.730627|                      1.929|                 0.08499999999999974|
|17409|     0|1.979|    2.848063|                      2.014|                 -0.0349999999999997|
|51180|     1|1.759|   2.9082253|                      1.979|                 -0.2200000000000002|
|27109|     1|1.709|   4.2238717|                      1.759|                -0.04999999999999982|
|46547|     1|1.799|    4.730566|                      1.709|                 0.08999999999999986|
|23047|   

                                                                                

In [11]:
jdbc_url = "jdbc:postgresql://localhost:5432/sparkProject"
jdbc_properties = {
    "user": config['postgres']["user"],
    "password": str(config['postgres']["password"]),
    "driver": "org.postgresql.Driver"
}
# Save DataFrames to PostgreSQL tables
filtered_stations_df.write.jdbc(url=jdbc_url, table="Nearest_competitor_comparision", mode="overwrite", properties=jdbc_properties)

23/09/13 17:29:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 17:29:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 1

In [12]:
# Stop the SparkSession
spark.stop()