## **Data Format**
* Data is taken from ERA5 website from 2013 to 2025. The data available is not in csv, json format it is in grib format
* GRIB is a machine-optimized format designed to efficiently store huge, multi-dimensional weather datasets such as temperature, wind, pressure, humidity, rainfall, etc., over latitude–longitude grids, multiple heights, and many time steps.

## **Data Preparation**
* The dataset initially contained 7 columns, out of which number, step, surface, and valid_time were not relevant for the analysis and were therefore removed.
* Temperature values were converted from Kelvin to Celsius.
* To optimize memory usage, latitude and longitude were converted from float64 to float32, and temperature was converted from float32 to float16.

## **Creating Unique IDS**
* In the context of a time series model, an ID typically refers to an identifier that distinguishes different time series within the same dataset or model, especially when working with panel data or multiple time series simultaneously.
* Latitude and longitude pairs can serve as potential IDs. To validate this, the number of unique (latitude, longitude) pairs was computed, resulting in 14,625 unique pairs.
* Each latitude–longitude pair contains 8,760 data points, corresponding to hourly observations over one year.

In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import gc
from pathlib import Path
warnings.filterwarnings('ignore')

In [2]:
def load_grib_dataset(file_path):
    print(f"Loading GRIB file: {file_path}")
    ds = xr.open_dataset(
        file_path,
        engine="cfgrib",
        backend_kwargs={"indexpath": ""}
    )
    return ds

In [3]:
def is_year_processed(year, base_name="weather_data"):
    year_dir = Path(f"{year}_{base_name}")
    if not year_dir.exists():
        return False

    parquet_files = list(year_dir.glob("weather_part_*.parquet"))
    return len(parquet_files) > 0

In [4]:
def extract_year_dataframe_lazy(ds, year):
    print(f"Extracting {year} safely...")
    ds_year = ds.sel(time=slice(f"{year}-01-01", f"{year}-12-31"))

    df = ds_year.to_dataframe().reset_index()

    return df

In [5]:
def clean_weather_dataframe(df):
    df = df.drop(columns=["number", "step", "surface", "valid_time"], errors="ignore")
    df = df.rename(columns={"t2m": "temperature"})

    df["temperature"] = (df["temperature"] - 273.15).astype("float16")
    df["latitude"] = df["latitude"].astype("float32")
    df["longitude"] = df["longitude"].astype("float32")

    return df

In [6]:
def create_series_id(df, batch_size=1_000_000):
    df["series_id"] = None
    n = len(df)

    for start in range(0, n, batch_size):
        end = min(start + batch_size, n)

        lat = df.iloc[start:end]["latitude"].round(2).astype("float32")
        lon = df.iloc[start:end]["longitude"].round(2).astype("float32")

        df.iloc[start:end, df.columns.get_loc("series_id")] = (
            "<" + lat.astype(str) + ", " + lon.astype(str) + ">"
        ).values

    return df

In [7]:
def save_series_chunks_to_parquet(df, year, n_files=5):
    output_dir = Path(f"{year}_weather_data")
    output_dir.mkdir(exist_ok=True)

    series_ids = df["series_id"].unique()
    series_id_chunks = np.array_split(series_ids, n_files)

    for idx, chunk_ids in enumerate(series_id_chunks, 1):
        chunk_df = df[df["series_id"].isin(chunk_ids)]
        chunk_df.to_parquet(output_dir / f"weather_part_{idx}.parquet", index=False)

        del chunk_df
        gc.collect()

    print(f"Saved parquet files for {year}")

In [8]:
def process_single_year(file_path, year):
    if is_year_processed(year):
        print(f"Skipping {year} — already processed.")
        return

    print(f"\nProcessing year {year}")

    ds = load_grib_dataset(file_path)
    df = extract_year_dataframe_lazy(ds, year)

    del ds
    gc.collect()

    df = clean_weather_dataframe(df)
    df = create_series_id(df)
    df = df.drop(columns=["latitude", "longitude"])

    save_series_chunks_to_parquet(df, year)

    del df
    gc.collect()

    print(f"Finished processing {year}\n")

In [9]:
def process_multiple_years(file_path, years):
    print("Starting multi-year processing...\n")

    for year in years:
        process_single_year(file_path, year)

    print("\nAll requested years checked/processed successfully.")

In [None]:
process_multiple_years(
    file_path="data.grib",
    years=[2013,2014,2015,2016,2017,2018,2019,2020,2021,2022,2023,2024,2025]
)

In [11]:
def build_series_id_index_for_year(year, base_name="weather_data"):
    year_dir = Path(f"{year}_{base_name}")

    parquet_files = sorted(year_dir.glob("weather_part_*.parquet"))

    if not parquet_files:
        print(f"No parquet files found for {year}")
        return

    index_records = []

    print(f"\nBuilding series_id index for {year}...")

    for file_path in parquet_files:
        print(f"Scanning {file_path.name}")

        df = pd.read_parquet(file_path, columns=["series_id"])
        unique_ids = df["series_id"].unique()

        index_records.extend(
            {"series_id": sid, "file_name": file_path.name}
            for sid in unique_ids
        )

        del df

    index_df = pd.DataFrame(index_records)

    index_path = year_dir / "series_id_index.parquet"
    index_df.to_parquet(index_path, index=False)

    print(f"Index saved: {index_path}")

In [12]:
def build_indices_for_years(years):
    for year in years:
        build_series_id_index_for_year(year)

In [13]:
build_indices_for_years([2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025])


Building series_id index for 2013...
Scanning weather_part_1.parquet
Scanning weather_part_2.parquet
Scanning weather_part_3.parquet
Scanning weather_part_4.parquet
Scanning weather_part_5.parquet
Index saved: 2013_weather_data\series_id_index.parquet

Building series_id index for 2014...
Scanning weather_part_1.parquet
Scanning weather_part_2.parquet
Scanning weather_part_3.parquet
Scanning weather_part_4.parquet
Scanning weather_part_5.parquet
Index saved: 2014_weather_data\series_id_index.parquet

Building series_id index for 2015...
Scanning weather_part_1.parquet
Scanning weather_part_2.parquet
Scanning weather_part_3.parquet
Scanning weather_part_4.parquet
Scanning weather_part_5.parquet
Index saved: 2015_weather_data\series_id_index.parquet

Building series_id index for 2016...
Scanning weather_part_1.parquet
Scanning weather_part_2.parquet
Scanning weather_part_3.parquet
Scanning weather_part_4.parquet
Scanning weather_part_5.parquet
Index saved: 2016_weather_data\series_id_in

In [14]:
def lookup_series_file(year, series_id, base_name="weather_data"):
    index_path = Path(f"{year}_{base_name}") / "series_id_index.parquet"

    if not index_path.exists():
        raise FileNotFoundError(f"Index not found for year {year}. Build it first.")

    index_df = pd.read_parquet(index_path)

    match = index_df[index_df["series_id"] == series_id]

    if match.empty:
        print(f"Series ID {series_id} not found for {year}")
        return None

    file_name = match.iloc[0]["file_name"]
    print(f"Load this file: {file_name}")
    return file_name

## **Anomalies Data preparation**
* Points that do not fall within the IQR range are considered anomalies.
* The number of anomalies is very small; the percentage of non-anomalous points is significantly higher.
* series_anomaly_summary is a nested dictionary that tracks, for each series_id, the number of anomalies in each respective year.
* We restricted the analysis to series_ids that are present across all years, as some series appear in only one year and are missing in others.

In [2]:
def extract_iqr_anomalies(group, temp_col, time_col):
    q1 = group[temp_col].quantile(0.25)
    q3 = group[temp_col].quantile(0.75)
    iqr = q3 - q1

    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    mask = (group[temp_col] < lower) | (group[temp_col] > upper)

    return pd.Series({
        "anomaly_count": int(mask.sum()),
        "anomaly_times": group.loc[mask, time_col].tolist(),
        "anomaly_values": group.loc[mask, temp_col].tolist()
    })

In [3]:
def process_weather_file_for_anomalies(
    file_path,
    temp_col="temperature",
    time_col="time",
    series_col="series_id"
):
    print(f"Processing: {file_path.name}")

    df = pd.read_parquet(file_path)
    df[time_col] = pd.to_datetime(df[time_col])

    df["year"] = df[time_col].dt.year
    df["month"] = df[time_col].dt.month

    result = (
        df
        .groupby(series_col, sort=False)
        .apply(extract_iqr_anomalies, temp_col=temp_col, time_col=time_col)
        .reset_index()
    )

    return result

In [4]:
def process_year_for_anomalies(
    year,
    base_dir=".",
    file_pattern="weather_part_*.parquet",
    temp_col="temperature",
    time_col="time",
    series_col="series_id"
):
    data_dir = Path(base_dir) / f"{year}_weather_data"
    anomaly_file = data_dir / f"anomalies_{year}.parquet"

    # Skip if year folder missing
    if not data_dir.exists():
        print(f"Skipping {year} — folder not found.")
        return None

    if anomaly_file.exists():
        print(f"Skipping {year} — anomalies file already exists.")
        return None

    print(f"\nProcessing anomalies for {year} in {data_dir}")

    all_files = sorted(data_dir.glob(file_pattern))
    if not all_files:
        print(f"Skipping {year} — no parquet files found.")
        return None

    all_results = []

    for file_path in all_files:
        file_result = process_weather_file_for_anomalies(
            file_path,
            temp_col=temp_col,
            time_col=time_col,
            series_col=series_col
        )
        all_results.append(file_result)

    final_df = pd.concat(all_results, ignore_index=True)

    # Save result
    final_df.to_parquet(anomaly_file, index=False)

    print(f"Saved anomalies file: {anomaly_file}")
    print(f"Finished {year}. Rows: {len(final_df)}")

    return final_df

In [5]:
def run_iqr_anomaly_pipeline_for_years(years, base_dir="."):
    combined_results = []

    for year in years:
        year_result = process_year_for_anomalies(year, base_dir=base_dir)

        if year_result is not None:
            combined_results.append(year_result)

    if not combined_results:
        print("No anomaly data generated.")
        return None

    final_df = pd.concat(combined_results, ignore_index=True)

    print(f"\nMulti-year anomaly detection complete. Total rows: {len(final_df)}")

    return final_df

In [29]:
all_anomalies = run_iqr_anomaly_pipeline_for_years(range(2013, 2026))


Processing anomalies for 2013 in 2013_weather_data
Processing: weather_part_1.parquet
Processing: weather_part_2.parquet
Processing: weather_part_3.parquet
Processing: weather_part_4.parquet
Processing: weather_part_5.parquet
Saved anomalies file: 2013_weather_data\anomalies_2013.parquet
Finished 2013. Rows: 14625

Processing anomalies for 2014 in 2014_weather_data
Processing: weather_part_1.parquet
Processing: weather_part_2.parquet
Processing: weather_part_3.parquet
Processing: weather_part_4.parquet
Processing: weather_part_5.parquet
Saved anomalies file: 2014_weather_data\anomalies_2014.parquet
Finished 2014. Rows: 14625

Processing anomalies for 2015 in 2015_weather_data
Processing: weather_part_1.parquet
Processing: weather_part_2.parquet
Processing: weather_part_3.parquet
Processing: weather_part_4.parquet
Processing: weather_part_5.parquet
Saved anomalies file: 2015_weather_data\anomalies_2015.parquet
Finished 2015. Rows: 14625

Processing anomalies for 2016 in 2016_weather_da

In [30]:
anomalies_2013 = pd.read_parquet('2013_weather_data/anomalies_2013.parquet')
anomalies_2013.head()

Unnamed: 0,series_id,anomaly_count,anomaly_times,anomaly_values
0,"<37.0, 68.0>",0,[],[]
1,"<37.0, 68.25>",0,[],[]
2,"<37.0, 68.5>",0,[],[]
3,"<37.0, 68.75>",0,[],[]
4,"<37.0, 69.0>",0,[],[]


In [31]:
anomalies_2013[anomalies_2013['anomaly_count'] > 0]

Unnamed: 0,series_id,anomaly_count,anomaly_times,anomaly_values
36,"<37.0, 77.0>",8,"[2013-01-23T22:00:00.000000, 2013-02-06T23:00:...","[-30.859375, -30.71875, -31.046875, -31.015625..."
68,"<37.0, 85.0>",1,[2013-01-06T00:00:00.000000],[-37.0]
69,"<37.0, 85.25>",4,"[2013-01-05T23:00:00.000000, 2013-01-06T00:00:...","[-39.15625, -38.625, -38.625, -39.125]"
74,"<37.0, 86.5>",8,"[2013-01-05T23:00:00.000000, 2013-01-06T00:00:...","[-41.75, -41.59375, -41.5625, -41.78125, -42.0..."
153,"<36.75, 77.0>",3,"[2013-02-07T03:00:00.000000, 2013-02-08T01:00:...","[-37.15625, -36.6875, -37.15625]"
...,...,...,...,...
14604,"<6.0, 92.0>",5,"[2013-04-30T10:00:00.000000, 2013-04-30T11:00:...","[29.9375, 30.0, 29.984375, 29.984375, 29.90625]"
14605,"<6.0, 92.25>",4,"[2013-04-30T10:00:00.000000, 2013-04-30T11:00:...","[29.921875, 29.984375, 30.0625, 30.015625]"
14606,"<6.0, 92.5>",6,"[2013-04-29T14:00:00.000000, 2013-04-29T16:00:...","[29.90625, 29.984375, 29.90625, 30.015625, 30...."
14607,"<6.0, 92.75>",3,"[2013-04-29T14:00:00.000000, 2013-04-29T15:00:...","[29.90625, 29.90625, 29.953125]"


In [2]:
## Run this code once when you get all the years anomalies data currently i am having data till 2020
from collections import defaultdict

def load_year_anomaly_counts(year, base_dir="."):
    year_dir = Path(base_dir) / f"{year}_weather_data"
    anomaly_file = year_dir / f"anomalies_{year}.parquet"

    if not anomaly_file.exists():
        print(f"Skipping {year} — anomaly file not found.")
        return None

    print(f"Reading anomalies for {year}")

    # Load only needed columns
    df = pd.read_parquet(anomaly_file, columns=["series_id", "anomaly_count"])

    df = df[df["anomaly_count"] > 0]

    if df.empty:
        print(f"No anomalies recorded for {year}")
        del df
        gc.collect()
        return None

    # Aggregate per series_id
    year_counts = df.groupby("series_id")["anomaly_count"].sum().to_dict()

    del df
    gc.collect()

    return year_counts

In [3]:
def build_series_anomaly_dictionary(years, base_dir="."):
    series_anomaly_dict = defaultdict(dict)

    for year in years:
        year_counts = load_year_anomaly_counts(year, base_dir=base_dir)

        if year_counts is None:
            continue

        for series_id, count in year_counts.items():
            series_anomaly_dict[series_id][year] = int(count)

        gc.collect()

    print("Finished building anomaly dictionary")
    return dict(series_anomaly_dict)

In [4]:
series_anomaly_summary = build_series_anomaly_dictionary(range(2013, 2026))

Reading anomalies for 2013
Reading anomalies for 2014
Reading anomalies for 2015
Reading anomalies for 2016
Reading anomalies for 2017
Reading anomalies for 2018
Reading anomalies for 2019
Skipping 2020 — anomaly file not found.
Skipping 2021 — anomaly file not found.
Skipping 2022 — anomaly file not found.
Skipping 2023 — anomaly file not found.
Skipping 2024 — anomaly file not found.
Skipping 2025 — anomaly file not found.
Finished building anomaly dictionary


In [5]:
len(series_anomaly_summary)

11385

In [5]:
series_id_fluctuations = []
for k,v in series_anomaly_summary.items():
    if len(v.keys()) == 7:
        series_id_fluctuations.append(k)
print(f'Series ids which are anomalies in all 7 years {len(series_id_fluctuations)}')

Series ids which are anomalies in all 7 years 5278


In [6]:
from pathlib import Path
import pandas as pd

def get_series_file_for_year(year, series_id, base_dir="."):
    index_path = Path(base_dir) / f"{year}_weather_data" / "series_id_index.parquet"

    if not index_path.exists():
        return None

    idx_df = pd.read_parquet(index_path)
    match = idx_df[idx_df["series_id"] == series_id]

    if match.empty:
        return None

    return match.iloc[0]["file_name"]

In [20]:
from pathlib import Path
import pandas as pd
import gc

def save_filtered_series_dataset_chunked(series_id_list, years, base_dir=".", output_dir="filtered_anomaly_dataset"):
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    for year in years:
        print(f"\nProcessing year {year}")
        year_dir = Path(base_dir) / f"{year}_weather_data"

        if not year_dir.exists():
            print("  Folder missing, skipping")
            continue

        relevant_series = set(series_id_list)
        parquet_files = sorted(year_dir.glob("weather_part_*.parquet"))

        for file_path in parquet_files:
            print(f"  Reading file {file_path.name}")
            df = pd.read_parquet(file_path, columns=["series_id", "temperature"])

            df = df[df["series_id"].isin(relevant_series)]
            if df.empty:
                continue

            # --- GROUPED IQR ---
            def label_group(group):
                q1 = group["temperature"].quantile(0.25)
                q3 = group["temperature"].quantile(0.75)
                iqr = q3 - q1
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                group["anomaly_label"] = ((group["temperature"] < lower) | (group["temperature"] > upper)).astype("int8")
                return group[["series_id", "temperature", "anomaly_label"]]

            df = df.groupby("series_id", group_keys=False).apply(label_group)

            anomalies_count = df["anomaly_label"].sum()
            print(f"    anomalies in chunk: {anomalies_count}")

            save_file = output_path / f"{year}_{file_path.stem}_anomalies.parquet"
            df.to_parquet(save_file, index=False)

            print(f"    Saved chunk → {save_file}")

            del df
            gc.collect()

    print("\nAll chunks saved successfully.")

In [21]:
years = range(2013, 2020)

save_filtered_series_dataset_chunked(
    series_id_fluctuations,
    years,
    output_dir="filtered_anomaly_dataset"
)


Processing year 2013
  Reading file weather_part_1.parquet
    anomalies in chunk: 55
    Saved chunk → filtered_anomaly_dataset\2013_weather_part_1_anomalies.parquet
  Reading file weather_part_2.parquet
    anomalies in chunk: 25822
    Saved chunk → filtered_anomaly_dataset\2013_weather_part_2_anomalies.parquet
  Reading file weather_part_3.parquet
    anomalies in chunk: 788429
    Saved chunk → filtered_anomaly_dataset\2013_weather_part_3_anomalies.parquet
  Reading file weather_part_4.parquet
    anomalies in chunk: 241022
    Saved chunk → filtered_anomaly_dataset\2013_weather_part_4_anomalies.parquet
  Reading file weather_part_5.parquet
    anomalies in chunk: 106402
    Saved chunk → filtered_anomaly_dataset\2013_weather_part_5_anomalies.parquet

Processing year 2014
  Reading file weather_part_1.parquet
    anomalies in chunk: 28
    Saved chunk → filtered_anomaly_dataset\2014_weather_part_1_anomalies.parquet
  Reading file weather_part_2.parquet
    anomalies in chunk: 107