In [6]:
import requests
from datetime import date

# Get today's date in YYYY-MM-DD format
today_str = date.today().isoformat()

# URL to download
url = "https://www.shell.co.uk/fuel-prices-data.html"
response = requests.get(url)

# Create filename with today's date
filename = f"./data/shell/shell_fuel_prices_{today_str}.json"

# Write to file
with open(filename, "wb") as file:
    file.write(response.content)

print(f"Download complete: {filename}")


Download complete: ./data/shell/shell_fuel_prices_2025-04-14.json


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read Shell Fuel Prices JSON").getOrCreate()

25/04/14 10:25:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
from datetime import date
# Get today's date
today_str = date.today().isoformat()

# File path
json_path = f"data/shell/shell_fuel_prices_{today_str}.json"

# Read JSON file
df = spark.read.json(json_path)

df.show()

+-------------------+--------------------+
|       last_updated|            stations|
+-------------------+--------------------+
|13/04/2025 00:00:00|[{CARWIN RISE, SH...|
+-------------------+--------------------+



In [None]:
from pyspark.sql import functions as F

df_unpacked = df.select(
    "last_updated",
    F.explode("stations").alias("stations")
).select(
    F.col("last_updated"),
    F.col("stations")["site_id"].alias("site_id"),
    F.col("stations")["brand"].alias("brand"),
    F.col("stations")["address"].alias("address"),
    F.col("stations")["postcode"].alias("postcode"),
    F.col("stations")["location"]["latitude"].alias("lat"),
    F.col("stations")["location"]["longitude"].alias("lon"),
    F.col("stations")["prices"]["B7"].alias("B7"),
    F.col("stations")["prices"]["E10"].alias("E10")
)

df_unpacked.show()

+-------------------+------------+-----+--------------------+--------+---------+---------+-----+-----+
|       last_updated|     site_id|brand|             address|postcode|      lat|      lon|   B7|  E10|
+-------------------+------------+-----+--------------------+--------+---------+---------+-----+-----+
|13/04/2025 00:00:00|gbujt4qe0k18|SHELL|         CARWIN RISE|TR27 5DG|50.198443|-5.396334|149.9|141.9|
|13/04/2025 00:00:00|gbumu87qeccz|SHELL|       PLAYING PLACE| TR3 6HA|50.231992|-5.071057|143.9|138.9|
|13/04/2025 00:00:00|gbumvnm9yuyg|SHELL|       TREGOLLS ROAD| TR1 1PU|50.264151|-5.045859|143.9|138.9|
|13/04/2025 00:00:00|gbuqmtk5sfy5|SHELL|                 A30| TR8 5AY|50.346838|-5.026033|147.9|141.9|
|13/04/2025 00:00:00|gbuqv45v3fzr|SHELL|         HENVER ROAD| TR7 3EH|50.417177|-5.048476|143.9|136.9|
|13/04/2025 00:00:00|gbuqxe1kw2vj|SHELL|    KINGSLEY VILLAGE| TR9 6NA|50.378585|-4.941856|147.9|141.9|
|13/04/2025 00:00:00|gbuwf7snkt92|SHELL|A30 VICTORIA JUNC...|PL26 8UF|50.

In [2]:
from sedona.spark import *

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-3.3_2.12:1.7.1,"
        "org.datasyslab:geotools-wrapper:1.7.1-28.5",
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    .getOrCreate()
)
sedona = SedonaContext.create(config)
sc = sedona.sparkContext

25/04/14 10:20:53 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
25/04/14 10:20:53 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/04/14 10:20:53 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/04/14 10:20:53 WARN UDTRegistration: Cannot register UDT for org.apache.sedona.common.geometryObjects.Geography, which is already registered.
25/04/14 10:20:53 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/04/14 10:20:53 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/04/14 10:20:53 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
25/04/14 10:20:53 WARN SimpleFunctionRegistry: The function st_union_aggr replaced a previously

In [None]:
df_unpacked = df_unpacked.withColumn(
    "geom",
    F.expr("ST_Point(cast(lon as double), cast(lat as double))")
).withColumn("geom", F.col("geom").cast(GeometryType()))

df_unpacked.show()

In [24]:
from pyspark.sql import functions as F

# Define input point (longitude, latitude)
lon = -0.3346
lat = 51.5943

# Create a reference geometry point
point_wkt = f"POINT({lon} {lat})"
reference_point_df = spark.sql(f"SELECT ST_GeomFromText('{point_wkt}') AS ref_geom")
reference_point = reference_point_df.first()['ref_geom']

# Add distance column to df_geo using ST_Distance
df_with_distance = df_unpacked.withColumn(
    "distance_meters",
    F.expr(f"ST_Distance(geom, ST_GeomFromText('{point_wkt}'))")
)

# Sort and get the closest
closest_station = df_with_distance.orderBy("distance_meters").limit(1)

closest_station.select(
    "site_id", "brand", "postcode", "address", "lat", "lon", "distance_meters"
).show(truncate=False)


+------------+-----+--------+-----------------+--------+---------+--------------------+
|site_id     |brand|postcode|address          |lat     |lon      |distance_meters     |
+------------+-----+--------+-----------------+--------+---------+--------------------+
|gcpv258c1pxf|SHELL|HA2 0EG |140 NORTHOLT ROAD|51.56729|-0.350479|0.031331816752302157|
+------------+-----+--------+-----------------+--------+---------+--------------------+

