In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, substring, when, lit
from pyspark.sql.types import DoubleType, IntegerType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("NOAA Snow Data Processing") \
    .config("spark.hadoop.fs.s3a.connection.maximum", "200") \
    .config("spark.hadoop.fs.s3a.threads.max", "200") \
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.speculation", "true") \
    .getOrCreate()

# Define S3 path
s3_path = "s3a://noaa-isd-pds/data/2025/"
#s3_path = "s3a://noaa-isd-pds/data/2025/010010-99999-2025.gz"

# Helper function for substring extraction
def extract_field(value_col, marker, offset, length):
    """
    Extracts a substring from the value column based on a marker, offset, and length.

    Args:
        value_col (str): The column name or expression to extract from.
        marker (str): The string marker to locate within the value.
        offset (int): The position offset from the marker.
        length (int): The length of the substring to extract.

    Returns:
        Column: A PySpark Column object with the extracted substring.
    """
    return expr(f"substring({value_col}, instr({value_col}, '{marker}') + {offset}, {length})")

# Read and process the data in a single pipeline
df = spark.read.text(s3_path).filter(
    col("value").rlike("AJ1|AK1|AL1|AL2|AL3|AL4|AM1|AN1")
).select(
    # Mandatory fields
    substring(col("value"), 5, 6).alias("weather_station_usaf_id"),
    substring(col("value"), 11, 5).alias("weather_station_wban_id"),
    substring(col("value"), 16, 8).alias("observation_date"),
    substring(col("value"), 24, 4).alias("observation_time"),
    (substring(col("value"), 29, 6).cast(DoubleType()) / 1000).alias("latitude_dec"),
    (substring(col("value"), 35, 7).cast(DoubleType()) / 1000).alias("longitude_dec"),
    substring(col("value"), 47, 5).cast(IntegerType()).alias("elevation"),

    # Snow-related fields
    when(col("value").contains("AJ1"), lit("AJ1"))
    .when(col("value").contains("AK1"), lit("AK1"))
    .when(col("value").contains("AL1"), lit("AL1"))
    .when(col("value").contains("AL2"), lit("AL2"))
    .when(col("value").contains("AL3"), lit("AL3"))
    .when(col("value").contains("AL4"), lit("AL4"))
    .when(col("value").contains("AM1"), lit("AM1"))
    .when(col("value").contains("AN1"), lit("AN1"))
    .alias("identifier"),

    # AJ1 Fields
    extract_field("value", "AJ1", 3, 4).alias("snow_depth"),
    extract_field("value", "AJ1", 7, 1).alias("snow_condition_code"),
    extract_field("value", "AJ1", 8, 1).alias("snow_quality_code"),
    extract_field("value", "AJ1", 9, 6).alias("equiv_water_depth"),
    extract_field("value", "AJ1", 15, 1).alias("equiv_water_condition_code"),
    extract_field("value", "AJ1", 16, 1).alias("equiv_water_quality_code"),

    # AK1 Fields
    extract_field("value", "AK1", 3, 4).alias("greatest_snow_depth"),
    extract_field("value", "AK1", 7, 1).alias("greatest_condition_code"),
    extract_field("value", "AK1", 8, 6).alias("greatest_dates_of_occurrence"),
    extract_field("value", "AK1", 14, 1).alias("greatest_quality_code"),

    # AL1 to AL4 Fields
    extract_field("value", "AL1", 3, 2).alias("al1_accumulation_period_quantity"),
    extract_field("value", "AL1", 5, 3).alias("al1_accumulation_depth"),
    extract_field("value", "AL1", 8, 1).alias("al1_accumulation_condition_code"),
    extract_field("value", "AL1", 9, 1).alias("al1_accumulation_quality_code"),
    extract_field("value", "AL2", 3, 2).alias("al2_accumulation_period_quantity"),
    extract_field("value", "AL2", 5, 3).alias("al2_accumulation_depth"),
    extract_field("value", "AL2", 8, 1).alias("al2_accumulation_condition_code"),
    extract_field("value", "AL2", 9, 1).alias("al2_accumulation_quality_code"),
    extract_field("value", "AL3", 3, 2).alias("al3_accumulation_period_quantity"),
    extract_field("value", "AL3", 5, 3).alias("al3_accumulation_depth"),
    extract_field("value", "AL3", 8, 1).alias("al3_accumulation_condition_code"),
    extract_field("value", "AL3", 9, 1).alias("al3_accumulation_quality_code"),
    extract_field("value", "AL4", 3, 2).alias("al4_accumulation_period_quantity"),
    extract_field("value", "AL4", 5, 3).alias("al4_accumulation_depth"),
    extract_field("value", "AL4", 8, 1).alias("al4_accumulation_condition_code"),
    extract_field("value", "AL4", 9, 1).alias("al4_accumulation_quality_code"),

    # AM1 Fields
    extract_field("value", "AM1", 3, 4).alias("greatest_accumulation_depth"),
    extract_field("value", "AM1", 7, 1).alias("am1_accumulation_condition_code"),
    extract_field("value", "AM1", 8, 4).alias("am1_dates_of_occurrence_1"),
    extract_field("value", "AM1", 12, 4).alias("am1_dates_of_occurrence_2"),
    extract_field("value", "AM1", 16, 4).alias("am1_dates_of_occurrence_3"),
    extract_field("value", "AM1", 20, 1).alias("am1_accumulation_quality_code"),

    # AN1 Fields
    extract_field("value", "AN1", 3, 3).alias("monthly_period_quantity"),
    extract_field("value", "AN1", 6, 4).alias("monthly_accumulation_depth"),
    extract_field("value", "AN1", 10, 1).alias("monthly_condition_code"),
    extract_field("value", "AN1", 11, 1).alias("monthly_quality_code")
).select(
    # Cast all fields to strings for consistency
    *(col(field).cast("string").alias(field) for field in [
        "weather_station_usaf_id", "weather_station_wban_id", "observation_date", "observation_time",
        "latitude_dec", "longitude_dec", "elevation", "identifier", "snow_depth", "snow_condition_code",
        "snow_quality_code", "equiv_water_depth", "equiv_water_condition_code", "equiv_water_quality_code",
        "greatest_snow_depth", "greatest_condition_code", "greatest_dates_of_occurrence", "greatest_quality_code",
        "al1_accumulation_period_quantity", "al1_accumulation_depth", "al1_accumulation_condition_code",
        "al1_accumulation_quality_code", "al2_accumulation_period_quantity", "al2_accumulation_depth",
        "al2_accumulation_condition_code", "al2_accumulation_quality_code", "al3_accumulation_period_quantity",
        "al3_accumulation_depth", "al3_accumulation_condition_code", "al3_accumulation_quality_code",
        "al4_accumulation_period_quantity", "al4_accumulation_depth", "al4_accumulation_condition_code",
        "al4_accumulation_quality_code", "greatest_accumulation_depth", "am1_accumulation_condition_code",
        "am1_dates_of_occurrence_1", "am1_dates_of_occurrence_2", "am1_dates_of_occurrence_3",
        "am1_accumulation_quality_code", "monthly_period_quantity", "monthly_accumulation_depth",
        "monthly_condition_code", "monthly_quality_code"
    ])
)

# Register the DataFrame as a temporary view in Spark
df.createOrReplaceTempView("df_temp")

# Create or replace a Delta table in the Databricks metastore
spark.sql("""
CREATE OR REPLACE TABLE df_delta
USING DELTA
AS SELECT * FROM df_temp
""")





Out[1]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Optimize Delta table
spark.sql("OPTIMIZE df_delta ZORDER BY (observation_date, weather_station_usaf_id)")


# Confirm data in Delta table
spark.sql("SELECT * FROM df_delta LIMIT 10").show()

+-----------------------+-----------------------+----------------+----------------+------------+-------------+---------+----------+----------+-------------------+-----------------+-----------------+--------------------------+------------------------+-------------------+-----------------------+----------------------------+---------------------+--------------------------------+----------------------+-------------------------------+-----------------------------+--------------------------------+----------------------+-------------------------------+-----------------------------+--------------------------------+----------------------+-------------------------------+-----------------------------+--------------------------------+----------------------+-------------------------------+-----------------------------+---------------------------+-------------------------------+-------------------------+-------------------------+-------------------------+-----------------------------+---------------

In [0]:
%sql
select * from df_delta limit 10


weather_station_usaf_id,weather_station_wban_id,observation_date,observation_time,latitude_dec,longitude_dec,elevation,identifier,snow_depth,snow_condition_code,snow_quality_code,equiv_water_depth,equiv_water_condition_code,equiv_water_quality_code,greatest_snow_depth,greatest_condition_code,greatest_dates_of_occurrence,greatest_quality_code,al1_accumulation_period_quantity,al1_accumulation_depth,al1_accumulation_condition_code,al1_accumulation_quality_code,al2_accumulation_period_quantity,al2_accumulation_depth,al2_accumulation_condition_code,al2_accumulation_quality_code,al3_accumulation_period_quantity,al3_accumulation_depth,al3_accumulation_condition_code,al3_accumulation_quality_code,al4_accumulation_period_quantity,al4_accumulation_depth,al4_accumulation_condition_code,al4_accumulation_quality_code,greatest_accumulation_depth,am1_accumulation_condition_code,am1_dates_of_occurrence_1,am1_dates_of_occurrence_2,am1_dates_of_occurrence_3,am1_accumulation_quality_code,monthly_period_quantity,monthly_accumulation_depth,monthly_condition_code,monthly_quality_code
27870,99999,20250101,0,63.083,24.267,171,AJ1,17,3,1,1700,9,9,9502,7,870999,9,95,27,8,7,95,27,8,7,95,27,8,7,95,27,8,7,9502,7,8709,9999,2025,0,950,2787,0,9
27870,99999,20250101,100,63.083,24.267,171,AJ1,17,3,1,1700,9,9,9502,7,870999,9,95,27,8,7,95,27,8,7,95,27,8,7,95,27,8,7,9502,7,8709,9999,2025,0,950,2787,0,9
27870,99999,20250101,200,63.083,24.267,171,AJ1,17,3,1,1700,9,9,9502,7,870999,9,95,27,8,7,95,27,8,7,95,27,8,7,95,27,8,7,9502,7,8709,9999,2025,0,950,2787,0,9
27870,99999,20250101,300,63.083,24.267,171,AJ1,17,3,1,1700,9,9,9502,7,870999,9,95,27,8,7,95,27,8,7,95,27,8,7,95,27,8,7,9502,7,8709,9999,2025,0,950,2787,0,9
27870,99999,20250101,400,63.083,24.267,171,AJ1,17,3,1,1700,9,9,102,7,870999,9,1,27,8,7,1,27,8,7,1,27,8,7,1,27,8,7,102,7,8709,9999,2025,0,10,2787,0,9
27870,99999,20250101,500,63.083,24.267,171,AJ1,17,3,1,1700,9,9,9502,7,870999,9,95,27,8,7,95,27,8,7,95,27,8,7,95,27,8,7,9502,7,8709,9999,2025,0,950,2787,0,9
27870,99999,20250101,600,63.083,24.267,171,AJ1,17,3,1,1700,9,9,5602,7,870999,9,56,27,8,7,56,27,8,7,56,27,8,7,56,27,8,7,5602,7,8709,9999,2025,0,560,2787,0,9
27870,99999,20250101,700,63.083,24.267,171,AJ1,17,3,1,1700,9,9,102,7,870999,9,1,27,8,7,1,27,8,7,1,27,8,7,1,27,8,7,102,7,8709,9999,2025,0,10,2787,0,9
27870,99999,20250101,800,63.083,24.267,171,AJ1,17,3,1,1700,9,9,2302,7,870999,9,23,27,8,7,23,27,8,7,23,27,8,7,23,27,8,7,2302,7,8709,9999,2025,0,230,2787,0,9
27870,99999,20250101,900,63.083,24.267,171,AJ1,17,3,1,1700,9,9,102,7,870999,9,1,27,8,7,1,27,8,7,1,27,8,7,1,27,8,7,102,7,8709,9999,2025,0,10,2787,0,9


In [0]:
%sql
select distinct latitude_dec, longitude_dec
from df_delta


latitude_dec,longitude_dec
49.217,4.15
73.217,-119.533
56.65,90.55
42.9,133.9
46.4,96.25
59.967,30.3
49.8,19.0
46.667,-60.4
44.633,-63.517
47.333,28.083


In [0]:
# Query the dataset for the first 14 days of 2025
filtered_df = spark.sql("""
    SELECT 
        latitude_dec, 
        longitude_dec, 
        snow_depth, 
        observation_date
    FROM 
        df_delta
    WHERE 
        observation_date BETWEEN '20250101' AND '20250114'
        AND snow_depth <> 9999
""")

# Convert to Pandas DataFrame for visualization
pandas_df = filtered_df.toPandas()

# Ensure snow_depth and observation_date are correctly formatted
pandas_df['snow_depth'] = pandas_df['snow_depth'].astype(float)
pandas_df['observation_date'] = pandas_df['observation_date'].astype(str)

# Ensure latitude and longitude are float values
pandas_df['latitude_dec'] = pandas_df['latitude_dec'].astype(float)
pandas_df['longitude_dec'] = pandas_df['longitude_dec'].astype(float)



In [0]:
filtered_df.createOrReplaceTempView("filtered_df_temp")


In [0]:
%sql
select distinct * from filtered_df_temp

latitude_dec,longitude_dec,snow_depth,observation_date
69.75,27.0,36,20250107
67.75,29.617,49,20250106
67.367,26.633,34,20250105
64.667,28.05,35,20250111
67.65,24.9,45,20250110
67.167,29.183,45,20250109
61.833,22.467,23,20250107
64.05,24.717,24,20250106
61.2,26.05,20,20250110
60.467,23.65,10,20250106


In [0]:
%pip install plotly


Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import plotly.express as px

# Create the map with Plotly
fig = px.scatter_geo(
    pandas_df,  # Data source
    lat="latitude_dec",  # Latitude column
    lon="longitude_dec",  # Longitude column
    title="Geographic Distribution of Points",  # Title of the map
    projection="natural earth"  # Map projection
)

# Display the map
fig.show()


In [0]:
# Calculate dynamic center based on mean latitude and longitude
center_lat = pandas_df['latitude_dec'].mean()
center_lon = pandas_df['longitude_dec'].mean()

print(f"Center Latitude: {center_lat}, Center Longitude: {center_lon}")


Center Latitude: 55.59815540885097, Center Longitude: 18.750928594070867


In [0]:
import plotly.express as px

# Create the animated density map
fig = px.density_mapbox(
    pandas_df,
    lat="latitude_dec",
    lon="longitude_dec",
    z="snow_depth",
    radius=10,  # Adjust radius for smoother heatmaps
    animation_frame="observation_date",  # Timeline animation based on date
    center={"lat": center_lat, "lon": center_lon},  # Dynamic centering
    zoom=2,  # Global view
    mapbox_style="carto-positron",
    title="Snow Depth Timeline - First 14 Days of 2025"
)

# Show the interactive map
fig.show()
