In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, substring, when, hour, month, dayofyear
from pyspark.sql.types import DoubleType, TimestampType

# Build the SparkSession
spark = SparkSession.builder \
    .appName("GlobalHourlyWeather") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print(spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/21 16:57:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


4.0.1


In [2]:
# Update this path to where your unzipped '2024' folder is
data_path = "2024.tar.gz"

# Load all CSVs, using the first file's header
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Let's see what we've got!
df.printSchema()
df.show(5, truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- 01001099999.csv                                                                                     0100644 0000000 0000000 00010620251 14760163177 011523  0                                                                                                    ustar 00                                                                0000000 0000000                                                                                                                                                                        "STATION": string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS:

25/10/21 17:06:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## Parse and clean the target variable

In [4]:
# ----- THIS IS THE FIX -----
# First, filter out any rows where TMP doesn't have a comma (or is null)
df_with_comma = df.where(col("TMP").contains(","))
# ---------------------------

# 1. Split TMP into its value and quality flag
# Now, we use df_with_comma, not df
df_parsed = df_with_comma.withColumn("tmp_parts", split(col("TMP"), ","))

# 2. Separate the value and flag into their own columns
# This is now safe because we know a comma exists
df_parsed = df_parsed.withColumn("tmp_value", col("tmp_parts")[0]) \
                     .withColumn("tmp_flag", col("tmp_parts")[1])

# 3. Filter out bad data *before* scaling
df_cleaned = df_parsed.where(
    (col("tmp_value") != "+9999") &
    (col("tmp_flag").isin(['1', '5']))
)

# 4. Cast to a number and apply the scaling factor (divide by 10)
df_with_temp = df_cleaned.withColumn(
    "temperature",
    col("tmp_value").cast(DoubleType()) / 10.0
)

# Let's check our work!
print("Original vs. Cleaned Temperature:")
df_with_temp.select("TMP", "temperature").show(10)

# See how many rows we kept
print(f"Total records after cleaning TMP: {df_with_temp.count()}")

Original vs. Cleaned Temperature:
+-------+-----------+
|    TMP|temperature|
+-------+-----------+
|-0070,1|       -7.0|
|-0065,1|       -6.5|
|-0065,1|       -6.5|
|-0064,1|       -6.4|
|-0070,1|       -7.0|
|-0057,1|       -5.7|
|-0052,1|       -5.2|
|-0057,1|       -5.7|
|-0047,1|       -4.7|
|-0042,1|       -4.2|
+-------+-----------+
only showing top 10 rows


[Stage 6:>                                                          (0 + 1) / 1]

Total records after cleaning TMP: 122341374


                                                                                

## Feature engineering

Based on that doc, your main goal is to parse DEW, SLP, and WND in the same way you parsed TMP. These columns are strong predictors of temperature.

Why These Features?

    DEW (Dew Point): This is the single most important predictor after time and location. The dew point is the temperature at which air becomes saturated. The difference between temperature and dew_point (called the "spread") directly relates to relative humidity, which is a massive factor in how much the temperature can change.

    SLP (Sea Level Pressure): High pressure and low pressure systems are what cause large-scale weather and temperature changes. This is a very strong feature.

    WND (Wind Observation): This column contains both wind direction and speed. Wind speed, in particular, has a strong effect on temperature (e.g., wind chill, mixing of air).

### Firstly, parse the date and cast the geographic column

In [6]:
# 1. Parse the DATE column
df_featured = df_with_temp.withColumn("timestamp", col("DATE").cast(TimestampType()))

# 2. Extract time-based features
df_featured = df_featured.withColumn("hour", hour(col("timestamp"))) \
                         .withColumn("month", month(col("timestamp"))) \
                         .withColumn("day_of_year", dayofyear(col("timestamp")))

# 3. Cast geographic features to numeric
df_featured = df_featured.withColumn("latitude", col("LATITUDE").cast(DoubleType())) \
                         .withColumn("longitude", col("LONGITUDE").cast(DoubleType())) \
                         .withColumn("elevation", col("ELEVATION").cast(DoubleType()))

# 4. (Recommended) Parse other weather features
# You must repeat the "Parse and Clean" logic for any other
# weather columns you want to use, like 'DEW' (Dew Point) or 'WND' (Wind).
# Check the dataset documentation for their specific flags and scaling factors.

# Let's check our new features
print("Schema after feature engineering:")
df_featured.printSchema()

print("\nSample of new features:")
df_featured.select("timestamp", "hour", "month", "latitude", "elevation").show(10)

Schema after feature engineering:
root
 |-- 01001099999.csv                                                                                     0100644 0000000 0000000 00010620251 14760163177 011523  0                                                                                                    ustar 00                                                                0000000 0000000                                                                                                                                                                        "STATION": string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: 

### Define the cleaning function

In [7]:
from pyspark.sql.functions import col, split
from pyspark.sql.types import DoubleType

def clean_weather_column(input_df, col_name, missing_code, quality_flags, scale_factor):
    """
    Cleans a NOAA weather column that has a 'value,flag' format.
    
    :param input_df: The DataFrame to transform
    :param col_name: The name of the raw column to clean (e.g., "DEW", "SLP")
    :param missing_code: The string code for missing values (e.g., "+9999", "99999")
    :param quality_flags: A list of good-quality flags to keep (e.g., ['1', '5'])
    :param scale_factor: The number to divide the value by (e.g., 10.0)
    :return: A new DataFrame with a clean column named '<col_name>_clean'
    """
    print(f"Cleaning column: {col_name}...")
    
    # 1. Filter out rows without a comma (like we did for TMP)
    df_with_comma = input_df.where(col(col_name).contains(","))

    # 2. Split into value and flag
    df_parsed = df_with_comma.withColumn(f"{col_name}_parts", split(col(col_name), ","))
    
    df_parsed = df_parsed.withColumn(f"{col_name}_value", col(f"{col_name}_parts")[0]) \
                         .withColumn(f"{col_name}_flag", col(f"{col_name}_parts")[1])

    # 3. Filter out bad data
    df_cleaned = df_parsed.where(
        (col(f"{col_name}_value") != missing_code) &
        (col(f"{col_name}_flag").isin(quality_flags))
    )
    
    # 4. Create the final scaled, numeric column
    clean_col_name = col_name.lower() + "_clean" # e.g., 'dew_clean'
    
    df_final = df_cleaned.withColumn(
        clean_col_name,
        col(f"{col_name}_value").cast(DoubleType()) / scale_factor
    )
    
    # 5. Drop the intermediate columns
    df_final = df_final.drop(col_name, f"{col_name}_parts", f"{col_name}_value", f"{col_name}_flag")
    
    return df_final

### Clean the feature DEW (Dew Point) and SLP (Sea Level Pressure)

In [8]:
# We start with df_featured from the previous step
df_final_features = df_featured

# Clean the DEW column
# Missing code: +9999, Quality flags: '1', '5', Scale: 10.0
df_final_features = clean_weather_column(
    input_df=df_final_features,
    col_name="DEW",
    missing_code="+9999",
    quality_flags=['1', '5'],
    scale_factor=10.0
)

# Clean the SLP column
# Missing code: 99999, Quality flags: '1', '5', Scale: 10.0
df_final_features = clean_weather_column(
    input_df=df_final_features,
    col_name="SLP",
    missing_code="99999",
    quality_flags=['1', '5'],
    scale_factor=10.0
)

# Check our new clean columns
df_final_features.select("temperature", "dew_clean", "slp_clean").show(10)

Cleaning column: DEW...
Cleaning column: SLP...
+-----------+---------+---------+
|temperature|dew_clean|slp_clean|
+-----------+---------+---------+
|       -7.0|    -13.0|   1020.8|
|       -6.5|    -12.4|   1020.4|
|       -6.5|    -11.3|   1020.5|
|       -6.4|    -10.5|   1020.2|
|       -7.0|    -10.6|   1020.0|
|       -5.7|     -9.9|   1019.6|
|       -5.2|     -9.5|   1019.6|
|       -5.7|     -9.9|   1019.5|
|       -4.7|     -8.6|   1019.2|
|       -4.2|     -7.7|   1018.7|
+-----------+---------+---------+
only showing top 10 rows


### Now for the WND column

The WND (Wind) column is more complex. It's not just one value and a flag. It's typically: Direction, DirectionQuality, Type, Speed, SpeedQuality
You'll need a separate, custom-parsing logic for this

In [9]:
# 1. Split WND into its 5 parts
wnd_parts = split(col("WND"), ",")

# 2. Get the Speed (index 3) and SpeedQuality (index 4)
df_wind = df_final_features.withColumn("wind_speed_raw", wnd_parts[3])
df_wind = df_wind.withColumn("wind_speed_flag", wnd_parts[4])

# 3. Clean and scale it
df_wind_cleaned = df_wind.where(
    (col("wind_speed_raw") != "9999") &
    (col("wind_speed_flag").isin(['1', '5']))
)

df_wind_final = df_wind_cleaned.withColumn(
    "wind_speed_clean",
    col("wind_speed_raw").cast(DoubleType()) / 10.0  # Assuming 10.0 scale, check docs!
)

# See the result
df_wind_final.select("WND", "wind_speed_clean").show(10)

+--------------+----------------+
|           WND|wind_speed_clean|
+--------------+----------------+
|318,1,N,0061,1|             6.1|
|330,1,N,0051,1|             5.1|
|348,1,N,0035,1|             3.5|
|357,1,N,0019,1|             1.9|
|241,1,N,0008,1|             0.8|
|076,1,N,0048,1|             4.8|
|084,1,N,0054,1|             5.4|
|040,1,N,0023,1|             2.3|
|098,1,N,0057,1|             5.7|
|086,1,N,0063,1|             6.3|
+--------------+----------------+
only showing top 10 rows


## Checking after cleaning and engineering

In [10]:
# Run this in a new cell
df_final_features.printSchema()

root
 |-- 01001099999.csv                                                                                     0100644 0000000 0000000 00010620251 14760163177 011523  0                                                                                                    ustar 00                                                                0000000 0000000                                                                                                                                                                        "STATION": string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS:

In [11]:
# Check summary statistics

# Define the list of all our final numeric features
numeric_cols = [
    "temperature", 
    "dew_clean", 
    "slp_clean", 
    "latitude", 
    "longitude", 
    "elevation", 
    "hour", 
    "month", 
    "day_of_year"
    # Add 'wind_speed_clean' if you also engineered that
]

# Get the statistics for just those columns
df_final_features.select(numeric_cols).describe().show()

[Stage 12:>                                                         (0 + 1) / 1]

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+
|summary|       temperature|         dew_clean|         slp_clean|          latitude|          longitude|         elevation|              hour|            month|       day_of_year|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+
|  count|          49055736|          49055736|          49055736|          49055736|           49055736|          49055736|          49055736|         49055736|          49055736|
|   mean|12.831891946655992| 7.024021272452314|1014.4458740646248| 32.88926715142788|-14.876760894417405|281.07311849390624|11.364602316842214|6.493168464540008| 182.8215298410771|
| stddev|12.696444601219277|11.624523086464317| 9.060583091053223|29.368480707623316|   89.0963

                                                                                

In [12]:
# Checking for Null or Nan Value
from pyspark.sql.functions import count, when, isnan

# We'll use the same list of columns from before
cols_to_check = numeric_cols 

# This command counts nulls AND NaNs for each column
df_final_features.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c) 
    for c in cols_to_check
]).show()

[Stage 15:>                                                         (0 + 1) / 1]

+-----------+---------+---------+--------+---------+---------+----+-----+-----------+
|temperature|dew_clean|slp_clean|latitude|longitude|elevation|hour|month|day_of_year|
+-----------+---------+---------+--------+---------+---------+----+-----+-----------+
|          0|        0|        0|       0|        0|        0|   0|    0|          0|
+-----------+---------+---------+--------+---------+---------+----+-----+-----------+



                                                                                

In [13]:
# Spot-check the final data
df_final_features.select(numeric_cols).show(20)

+-----------+---------+---------+----------+----------+---------+----+-----+-----------+
|temperature|dew_clean|slp_clean|  latitude| longitude|elevation|hour|month|day_of_year|
+-----------+---------+---------+----------+----------+---------+----+-----+-----------+
|       -7.0|    -13.0|   1020.8|70.9333333|-8.6666667|      9.0|   0|    1|          1|
|       -6.5|    -12.4|   1020.4|70.9333333|-8.6666667|      9.0|   1|    1|          1|
|       -6.5|    -11.3|   1020.5|70.9333333|-8.6666667|      9.0|   2|    1|          1|
|       -6.4|    -10.5|   1020.2|70.9333333|-8.6666667|      9.0|   3|    1|          1|
|       -7.0|    -10.6|   1020.0|70.9333333|-8.6666667|      9.0|   4|    1|          1|
|       -5.7|     -9.9|   1019.6|70.9333333|-8.6666667|      9.0|   5|    1|          1|
|       -5.2|     -9.5|   1019.6|70.9333333|-8.6666667|      9.0|   6|    1|          1|
|       -5.7|     -9.9|   1019.5|70.9333333|-8.6666667|      9.0|   7|    1|          1|
|       -4.7|     -8.

### Final filter and pipeline prep

Look at the describe() output for elevation:

    min: -999.9

This -999.9 is a sentinel value, which is just another code for "missing data" that the documentation didn't explicitly state (it's a common one for this dataset). A real elevation of -999.9 meters is not physically possible (the lowest point on Earth is around -430m).

If we leave this in, it will completely break the StandardScaler and ruin our model.

Here's the code for the next cell. This cell will:

    Create our final, model-ready DataFrame (model_ready_df).

    Define the Spark Pipeline for VectorAssembler and StandardScaler.

    Split the data into training and test sets.

In [14]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# 1. Define our final feature and label columns
feature_cols = [
    "latitude", 
    "longitude", 
    "elevation", 
    "dew_clean", 
    "slp_clean", 
    "hour", 
    "month", 
    "day_of_year"
    # Add 'wind_speed_clean' here if you also built it
]

label_col = "temperature"

# 2. Select only these columns AND filter out the bad elevation data
model_df = df_final_features \
    .select(*feature_cols, label_col) \
    .where(col("elevation") != -999.9)

print(f"Original count: {df_final_features.count()}")
print(f"Count after elevation filter: {model_df.count()}")

# 3. Create the VectorAssembler
# This combines all feature columns into one vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# 4. Create the StandardScaler
# This scales the "features" vector
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

# 5. Define the preprocessing pipeline
preprocessing_pipeline = Pipeline(stages=[assembler, scaler])

# 6. "Fit" the pipeline to the data
# This learns the mean and stddev for the scaler
preprocessing_model = preprocessing_pipeline.fit(model_df)

# 7. "Transform" the data
# This applies the assembly and scaling
processed_df = preprocessing_model.transform(model_df)

# 8. Select the final columns for modeling and rename label
# We only need the scaled features and the label
final_data = processed_df.select(
    col("scaledFeatures").alias("features"),
    col(label_col).alias("label")
)

# 9. Split the dataset (70% train, 30% test)
(train_data, test_data) = final_data.randomSplit([0.7, 0.3], seed=42)

# 10. Cache the data (important for speed!)
train_data.cache()
test_data.cache()

print("\n--- Preprocessing Complete ---")
print(f"Training set count: {train_data.count()}")
print(f"Test set count: {test_data.count()}")
print("\nReady for modeling:")
train_data.show(5, truncate=False)

                                                                                

Original count: 49055736


                                                                                

Count after elevation filter: 49045972


                                                                                


--- Preprocessing Complete ---


25/10/21 18:12:50 WARN MemoryStore: Not enough space to cache rdd_90_0 in memory! (computed 1197.3 MiB so far)
25/10/21 18:12:50 WARN BlockManager: Persisting block rdd_90_0 to disk instead.
25/10/21 18:13:00 WARN MemoryStore: Not enough space to cache rdd_90_0 in memory! (computed 1807.1 MiB so far)
25/10/21 18:13:01 WARN MemoryStore: Not enough space to cache rdd_90_0 in memory! (computed 1807.1 MiB so far)
                                                                                

Training set count: 34334647


25/10/21 18:17:23 WARN MemoryStore: Not enough space to cache rdd_103_0 in memory! (computed 1197.3 MiB so far)
25/10/21 18:17:23 WARN BlockManager: Persisting block rdd_103_0 to disk instead.
                                                                                

Test set count: 14711325

Ready for modeling:


[Stage 36:>                                                         (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                     |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[-3.9380883949570795,0.020745968315379146,4.117809924285261,-6.608937344385862,-0.3804191421648169,1.5469362208613393,0.14653691809229613,0.1432076101427129]|-61.0|
|[-3.9380883949570795,0.020745968315379146,4.117809924285261,-6.608937344385862,-0.3804191421648169,1.5469362208613393,0.14653691809229613,0.1432076101427129]|-60.9|
|[-3.9380883949570795,0.020745968315379146,4.117809924285261,-6.557319306480147,-0.4356014221947958,1.256023131560539,0.14653691809229613,0.1432076101427129] |-60.4|
|[-3

25/10/21 18:17:28 WARN MemoryStore: Not enough space to cache rdd_90_0 in memory! (computed 1807.1 MiB so far)
                                                                                