In [88]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit , avg,desc,from_utc_timestamp, hour, max, min,year, month
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType



# Create a Spark session
spark = SparkSession.builder.appName("project").getOrCreate()


23/09/05 13:12:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [89]:
data_path = "./Hourly_Gasoline_Prices.csv"
hourly_gasoline_df = spark.read.csv(data_path, header=True, inferSchema=True)

data_path = "./Fuel_Station_Information.csv"  # Replace with the actual path
fuel_station_info_df = spark.read.csv(data_path, header=True, inferSchema=True)


In [90]:
merged_df = hourly_gasoline_df.join(fuel_station_info_df, on="Id", how="inner")

merged_df.show()


+-----+------+-----+-------------------+--------------------+--------------------+------------+--------------------+--------------------+------------------+------------------+
|   Id|isSelf|Price|               Date|Fuel_station_manager|      Petrol_company|        Type|        Station_name|                City|          Latitude|       Longitudine|
+-----+------+-----+-------------------+--------------------+--------------------+------------+--------------------+--------------------+------------------+------------------+
|51169|     1|1.943|2022-01-01 11:45:53|AURA PETROLSERVIC...|              Api-Ip|Autostradale|     BAUDUCCHI OVEST|          MONCALIERI|44.977977829079535| 7.700215696606636|
|44566|     1|1.725|2022-01-02 11:15:08|BOVE SOCIETA' A R...|                Esso|    Stradale|                EASY|           GALLIPOLI|  40.0591184592996|18.032322694481195|
|44566|     0|1.775|2022-01-02 11:15:08|BOVE SOCIETA' A R...|                Esso|    Stradale|                EASY|    

In [91]:
fuel_station_info_df = fuel_station_info_df.withColumn("Id", fuel_station_info_df["Id"].cast("int"))


In [92]:
from pyspark.sql.window import Window
# Import necessary PySpark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, hour
import pyspark.sql.functions as F

# Create a Spark session
spark = SparkSession.builder.appName("Total Revenue").getOrCreate()

# Assuming the DataFrames are named 'hourly_gasoline_df' and 'fuel_station_info_df'

# Merge the two datasets based on the common "Id" column
merged_df = hourly_gasoline_df.join(fuel_station_info_df, on="Id", how="inner")

# Extract the hour from the "Date" timestamp and add it as a new column "Hour"
merged_df = merged_df.withColumn("Hour", hour(merged_df["Date"]))

# Filter the data to include only self-service fuel stations in specific cities (Calcinate, Osio Sopra, Treviglio, Biella) and with Petrol_company Q8
cities_to_include = ["AGRIGENTO", "OSIO SOPRA"]
filtered_df = merged_df.filter((merged_df["isSelf"] == 1) & (merged_df["City"].isin(cities_to_include)) & (merged_df["Petrol_company"] == "Q8"))

print("Filtered Data Count:", filtered_df.count())

# Define a window specification to partition data by 'Petrol_company' and 'Hour'
window_spec = Window.partitionBy('Petrol_company', 'Hour')

# Calculate the total revenue for each group
total_revenue_df = filtered_df.withColumn('Total_Revenue', sum('Price').over(window_spec))

# Display the resulting DataFrame
total_revenue_df.show()


23/09/05 13:12:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Filtered Data Count: 648
+-----+------+-----+-------------------+--------------------+--------------+--------+--------------------+---------+-----------------+------------------+----+------------------+
|   Id|isSelf|Price|               Date|Fuel_station_manager|Petrol_company|    Type|        Station_name|     City|         Latitude|       Longitudine|Hour|     Total_Revenue|
+-----+------+-----+-------------------+--------------------+--------------+--------+--------------------+---------+-----------------+------------------+----+------------------+
|25614|     1|1.619|2022-03-10 00:36:30|SERVIZI & GESTION...|            Q8|Stradale|AGRIGENTO VIA FOS...|AGRIGENTO|37.29582883251122|13.547611503041026|   0|100.88299999999997|
|50463|     1|1.619|2022-03-10 00:36:30|SERVIZI & GESTION...|            Q8|Stradale|VIA LEONARDO SCIA...|AGRIGENTO|37.27432157330476|13.614238371163946|   0|100.88299999999997|
|28277|     1|1.609|2022-03-10 00:36:30|SERVIZI & GESTION...|            Q8|Stradale|

In [108]:
from pyspark.sql.functions import from_utc_timestamp, hour, max, min, array

# Assuming hourly_gasoline_df and fuel_station_info_df are your two DataFrames

# Join the two DataFrames on the common key "Id"
merged_df = hourly_gasoline_df.join(fuel_station_info_df, "Id", "inner")

# Convert the timestamp to the specified timezone (America/New_York) and extract the hour
hour_df = merged_df.withColumn("Hour", hour(from_utc_timestamp(merged_df["Date"], "America/New_York")))

# Calculate hourly price fluctuations for each city
price_fluctuations_df = hour_df.groupBy("City", "Hour").agg((max("Price") - min("Price")).alias("Price_Fluctuation"))

# Calculate max and min prices separately
max_prices_df = hour_df.groupBy("City", "Hour").agg(max("Price").alias("Max_Price"))
min_prices_df = hour_df.groupBy("City", "Hour").agg(min("Price").alias("Min_Price"))

# Join max and min prices together
fluctuation_with_max_min_df = price_fluctuations_df.join(max_prices_df, ["City", "Hour"], "inner").join(min_prices_df, ["City", "Hour"], "inner")

# Create an array column combining max and min prices
price_array_df = fluctuation_with_max_min_df.withColumn("Prices", array("Max_Price", "Min_Price"))

# Find the top 10 cities with the highest price fluctuations
top_cities_fluctuations = price_array_df.groupBy("City").agg(avg("Price_Fluctuation").alias("Avg_Fluctuation")).orderBy("Avg_Fluctuation", ascending=False).limit(10)

# Join the DataFrames on the "City" column
result_df = top_cities_fluctuations.join(price_array_df, ["City"], "inner")

# Select the desired columns and order by "Avg_Fluctuation" in descending order
result_df = result_df.select("Prices", "Avg_Fluctuation", "City").distinct().orderBy(result_df["Avg_Fluctuation"].desc())

# Show the result
result_df.show(truncate=False)




                                                                                

+--------------+------------------+-------------------+
|Prices        |Avg_Fluctuation   |City               |
+--------------+------------------+-------------------+
|[4.0, 1.669]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.789]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.699]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 4.0]    |1.5020909090909091|SAN DEMETRIO CORONE|
|[1.639, 1.639]|1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.659]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[1.955, 1.629]|1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.679]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.639]  |1.5020909090909091|SAN DEMETRIO CORONE|
|[4.0, 1.579]  |1.4498260869565216|TARANTO            |
|[2.484, 1.589]|1.4498260869565216|TARANTO            |
|[2.298, 1.565]|1.4498260869565216|TARANTO            |
|[2.089, 1.645]|1.4498260869565216|TARANTO            |
|[4.0, 1.559]  |1.4498260869565216|TARANTO            |
|[2.497, 1.559]|1.4498260869565216|TARANTO      

In [94]:
hourly_gasoline_df.printSchema()


root
 |-- Id: integer (nullable = true)
 |-- isSelf: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Date: timestamp (nullable = true)



In [95]:
fuel_station_info_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Fuel_station_manager: string (nullable = true)
 |-- Petrol_company: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Station_name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitudine: string (nullable = true)



In [96]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Define the window specification for the rolling average
rolling_window_spec = Window.partitionBy("City").orderBy("Date").rowsBetween(-6, 0)

# Calculate the rolling 7-hour average price
rolling_avg_price = filtered_df.withColumn("7_Hour_Avg_Price", F.avg("Price").over(rolling_window_spec))

# Determine the row with the maximum 7-hour average price within each city
max_avg_price_row = rolling_avg_price.withColumn("max_7_Hour_Avg_Price", F.max("7_Hour_Avg_Price").over(Window.partitionBy("City")))
result_df = max_avg_price_row.filter(F.col("7_Hour_Avg_Price") == F.col("max_7_Hour_Avg_Price")).drop("max_7_Hour_Avg_Price")

# Show the result
result_df.show()


+-----+------+-----+-------------------+--------------------+--------------+------------+------------+----------+-----------------+------------------+----+------------------+
|   Id|isSelf|Price|               Date|Fuel_station_manager|Petrol_company|        Type|Station_name|      City|         Latitude|       Longitudine|Hour|  7_Hour_Avg_Price|
+-----+------+-----+-------------------+--------------------+--------------+------------+------------+----------+-----------------+------------------+----+------------------+
|49460|     1|2.319|2022-10-06 12:38:53|EOS SERVICES S.R....|            Q8|    Stradale|       AG023| AGRIGENTO|37.32612049037331|13.591820001602168|  12|2.0947142857142858|
|25613|     1|1.839|2022-07-04 14:30:07| NUOVA SIDAP  S.R.L.|            Q8|Autostradale|  BREMBO SUD|OSIO SOPRA|45.63188736814508| 9.600096659217913|  14|2.0618571428571433|
|25613|     1|2.099|2022-07-06 14:30:45| NUOVA SIDAP  S.R.L.|            Q8|Autostradale|  BREMBO SUD|OSIO SOPRA|45.631887368

Determine the month within this combination ('fuel station type' and 'city') that had the lowest number of hourly records.


In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, min

# Initialize a Spark session
spark = SparkSession.builder.appName("LowestHourlyRecords").getOrCreate()

# Assuming 'gasoline_df' and 'fuel_station_df' are your DataFrames

# Define the specific 'fuel station type' and 'city' you want to analyze
selected_fuel_station_type = 'Stradale'
selected_city = 'AGRIGENTO'  # Replace with the actual city name

# Filter 'fuel_station_df' for the specific combination
filtered_fuel_station_df = fuel_station_info_df.filter((col('Type') == selected_fuel_station_type) & (col('City') == selected_city))

# Join 'filtered_fuel_station_df' with 'gasoline_df' on the 'Id' column
joined_df = hourly_gasoline_df.join(filtered_fuel_station_df, on='Id', how='inner')

# Extract the month from the 'Date' column
joined_df = joined_df.withColumn('YearMonth', year(col('Date')) * 100 + month(col('Date')))

# Group by 'YearMonth' and find the minimum timestamp and minimum price within each group
min_date_price_per_month = joined_df.groupBy('YearMonth').agg(
    min('Date').cast('timestamp').alias('LowestRecordTimestamp'),
    min('Price').alias('LowestPrice')
)

# Find the month with the minimum timestamp and its corresponding lowest price
lowest_timestamp_month = min_date_price_per_month.orderBy('LowestRecordTimestamp').first()

# Display the result (if a lowest timestamp month is found)
if lowest_timestamp_month:
    print(f"The date with the lowest price for '{selected_fuel_station_type}' fuel stations in '{selected_city}' is: {lowest_timestamp_month['LowestRecordTimestamp']}")
    print(f"The lowest price for that month is: {lowest_timestamp_month['LowestPrice']}")
else:
    print(f"No data found for '{selected_fuel_station_type}' fuel stations in '{selected_city}'.")


23/09/05 13:12:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


The date with the lowest price for 'Stradale' fuel stations in 'AGRIGENTO' is: 2022-01-04 12:56:17
The lowest price for that month is: 1.608
