# Table of Contents

### 1. [Setup](#Setup)


# Setup

#### Imports

In [15]:
import os
import csv
import time
import pandas as pd
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import pyarrow.parquet as pq


In [5]:
# Try to import parquet engines
try:
    import pyarrow
    PARQUET_ENGINE = 'pyarrow'
except ImportError:
    try:
        import fastparquet
        PARQUET_ENGINE = 'fastparquet'
    except ImportError:
        print("Warning: No parquet engine found. Installing pyarrow recommended.")
        print("Run: pip install pyarrow")
        PARQUET_ENGINE = None

#### Mkdirs

In [None]:
# os.makedirs('data/processed', exist_ok=True)

#### Consts and Vars

In [34]:
PARAMS = [
    'AvgSurfT_inst',
    'CanopInt_inst',
    'LWdown_f_tavg',
    'Psurf_f_inst',
    'Qair_f_inst',
    'SnowDepth_inst',
    'SWdown_f_tavg',
    'Tair_f_inst',
    'TVeg_tavg',
    'Wind_f_inst',
    'Rainf_tavg'
]  

AGGREGATION = {
    'Rainf_tavg': 'sum',        # Rain accumulates over time
    'SnowDepth_inst': 'sum',    # Snow accumulation over time  
    'CanopInt_inst': 'sum',     # Water accumulation over time
    'Tair_f_inst': 'mean',      # Daily average temperature
    'AvgSurfT_inst': 'mean',    # Daily average surface temperature
    'Psurf_f_inst': 'mean',     # Daily average pressure
    'Qair_f_inst': 'mean',      # Daily average humidity
    'Wind_f_inst': 'mean',      # Daily average wind speed
    'LWdown_f_tavg': 'mean',    # Daily average longwave radiation
    'SWdown_f_tavg': 'mean',    # Daily average shortwave radiation
    'TVeg_tavg': 'mean'         # Daily average transpiration
}

DATES = [
    '2024_March',
    '2024_April',
    '2024_May',
    '2024_June',
    '2024_July',
    '2024_Aug',
    '2024_Sept',
    '2024_Oct',
    '2024_Nov',
    '2024_Dec',
    '2025_Jan',
    '2025_Feb',
]

# ReUsable Functions

#### Global Functions

In [7]:
def read_raw_csv(filename: str) -> pd.DataFrame:
    """Read a CSV file and return a pandas DataFrame."""
    file_path = Path('data/raw') / filename
    return pd.read_csv(file_path)

In [8]:
def read_csv_to_numpy(filename: str, subfolder: str) -> np.ndarray:
    filename = f"{filename}.csv"
    file_path = os.path.join('data', subfolder, filename)

    data_as_list = []

    with open(file_path) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')

        next(csv_reader)  # Skip header
        
        for row in csv_reader:
            data_as_list.append([float(val) for val in row])
    
    data = np.array(data_as_list)
    return data

In [57]:
def save_as_parquet(arr, filename: str, subfolder: str):
    filename = f"{filename}.parquet"
    file_path = os.path.join('data', subfolder, filename)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    df = pd.DataFrame(arr, columns=['longitude', 'latitude', 'year', 'month', 'day', 'value'])
    df.to_parquet(file_path, index=False, engine=PARQUET_ENGINE)

    print(f"\n✅ Data saved successfully!")
    print(f"📁 Location: {file_path}")
    print(f"�� Shape: {arr.shape[0]:,} rows × {arr.shape[1]} columns")
    print(f"💾 File size: {os.path.getsize(file_path) / (1024*1024):.2f} MB")

#### Complex Functions

In [10]:
def aggregate_3hourly_to_daily_pandas(arr, parameter):
    # Convert array to pandas DataFrame
    df = pd.DataFrame(arr, columns=['year', 'month', 'day', 'hour', 'longitude', 'latitude', 'value'])
    
    # Get the aggregation method for this parameter
    agg_method = AGGREGATION.get(parameter, 'mean')  # Default to 'mean' if not found
    
    # Aggregate to daily using the appropriate method
    daily_df = df.groupby(['longitude', 'latitude', 'year', 'month', 'day'])['value'].agg(agg_method).reset_index()
    
    # Convert back to numpy array
    daily_arr = daily_df.values
    
    return daily_arr

In [11]:
def print_basic_stats(arr):

    print(f"Total rows: {arr.shape[0]}")
    print(f"Missing values in final column: {np.isnan(arr[:, -1]).sum()}")
    print(f"Missing percentage: {np.isnan(arr[:, -1]).sum() / arr.shape[0] * 100:.2f}%")

    print(f"Range: {arr[:, -1].min():.2f} to {arr[:, -1].max():.2f}")
    print(f"Average: {arr[:, -1].mean():.2f}")

    print(f"Longitude range: {arr[:, 4].min():.2f} to {arr[:, 4].max():.2f}")
    print(f"Latitude range: {arr[:, 5].min():.2f} to {arr[:, 5].max():.2f}")


# Raw Data Proccess

- Loop through and read monthly parameter CSVs
- Run basic checks: missing data
- Select data types so can be held as numpy array
- Aggregate to daily
- Concat monthlys into a single array
- Sort by long, lat, year, month, day
- Save as parqets in aggregated folder

#### Implementation

In [40]:
def consolidate_raw_data(parameter_list: list, date_list: list):    
    start_time = time.time()

    for parameter in parameter_list:
        print(f'Parameter: {parameter}')

        monthly = []

        for date in date_list:
            print(f"\n===\n")

            # File to be processed:
            filename = f"{parameter}_data_{date}"
            print(f"date: {date}")
            print(filename + '.csv')

            # Load CSV to numpy array:
            try:
                arr = read_csv_to_numpy(filename, 'raw')
            except Exception as e:
                print(f"Error: {e}")
            
            # Basic stats/checks:
            print_basic_stats(arr)

            # Aggregation from 3hourly to daily:
            daily_arr_numpy = aggregate_3hourly_to_daily_pandas(arr, parameter)
            print(f"Original 3-hourly data: {arr.shape}")
            print(f"Aggregated daily data: {daily_arr_numpy.shape}")

            # Add to monthly list
            monthly.append(daily_arr_numpy)
            print(f"Monthly list now contains {len(monthly)} datasets")
        
        print(f"\n===\n")

        if monthly:
            print(f"\nStacking {len(monthly)} monthly datasets...")
        
            # Stack all monthly arrays vertically (one on top of the other)
            consolidated_arr = np.vstack(monthly)
            
            print(f"Consolidated array shape: {consolidated_arr.shape}")
            print(f"Total rows: {consolidated_arr.shape[0]:,}")
            print(f"Total columns: {consolidated_arr.shape[1]}")
            print(f"Memory usage: {consolidated_arr.nbytes / (1024*1024):.2f} MB")

            print(consolidated_arr[0])

            # Basic stats/checks:
            print_basic_stats(consolidated_arr)
            save_as_parquet(consolidated_arr, parameter, 'daily')

    end_time = time.time()
    print(f"Time taken: {end_time - start_time:.2f} seconds")

# Reference Table

In [44]:
def references_tables(parameters: list):

    references = pd.DataFrame()

    for parameter in parameters:
        df = pd.read_parquet(
            f'data/daily/{parameter}.parquet', 
            columns=['longitude', 'latitude', 'year', 'month', 'day']
        )
        unique_combinations = df.drop_duplicates()

        if parameter == 'Rainf_tavg':
            label_combinations = unique_combinations.copy()

        if references.empty:
            references = unique_combinations.copy()
        else:
            references = pd.concat([references, unique_combinations], ignore_index=True)

            references = references.drop_duplicates(subset=['longitude', 'latitude', 'year', 'month', 'day'])

    save_as_parquet(references, "total_references", "references")
    save_as_parquet(label_combinations, "label_references", "references")

# Spitting

In [56]:
def split_dataset_train_test_valid(parameter_name):
    """
    Split a parameter dataset into train/test/valid sets.
    - Test: February data
    - Valid: January data  
    - Train: All other months
    """

    label_reference = pd.read_parquet('data/references/label_references.parquet')
    
    print(f"🔄 Processing {parameter_name}...")
    
    df = pd.read_parquet(f'data/daily/{parameter_name}.parquet')
    print(f"   Loaded: {len(df):,} rows")

    print(df.head())

    # print(f"Month distribution:\n{df['month'].value_counts().sort_index().to_string()}")
    
    print(f"   Filtering to label reference combinations...")
    df_filtered = df.merge(label_reference, 
                          on=['longitude', 'latitude', 'year', 'month', 'day'], 
                          how='inner')
    print(f"   After filtering: {len(df_filtered):,} rows")
    
    # Split by month
    print(f"   Splitting into train/test/valid...")
    
    test_df = df_filtered[df_filtered['month'] == 2].copy()
    valid_df = df_filtered[df_filtered['month'] == 1].copy()
    train_df = df_filtered[~df_filtered['month'].isin([1, 2])].copy()
    
    # Print split statistics
    print(f"   Train set: {len(train_df):,} rows")
    print(f"   Test set: {len(test_df):,} rows") 
    print(f"   Valid set: {len(valid_df):,} rows")
    
    # Save the split datasets
    # os.makedirs('data/splits', exist_ok=True)
    
    # Save train set
    # train_path = f'data/splits/{parameter_name}_train.parquet'
    # train_df.to_parquet(train_path, index=False)
    
    # Save test set
    # test_path = f'data/splits/{parameter_name}_test.parquet'
    # test_df.to_parquet(test_path, index=False)
    
    # Save validation set
    # valid_path = f'data/splits/{parameter_name}_valid.parquet'
    # valid_df.to_parquet(valid_path, index=False)
    
    # print(f"   ✅ Saved to data/splits/")
    
    # return train_df, test_df, valid_df

# Process all parameters
print("🚀 Starting dataset splitting process...")
print("="*60)

split_results = {}

for parameter in PARAMS:
    print(f"\n📊 Processing {parameter}...")
    split_dataset_train_test_valid(parameter)
    
    # try:
    #     train_df, test_df, valid_df = split_dataset_train_test_valid(parameter, label_reference)
        
    #     # Store results
    #     split_results[parameter] = {
    #         'train': train_df,
    #         'test': test_df, 
    #         'valid': valid_df
    #     }
        
    # except Exception as e:
    #     print(f"   ❌ Error processing {parameter}: {e}")
    #     continue

# # Final summary
# print(f"\n�� SPLITTING COMPLETE!")
# print("="*60)

# for param, splits in split_results.items():
#     print(f"{param}:")
#     print(f"   Train: {len(splits['train']):,} rows")
#     print(f"   Test: {len(splits['test']):,} rows")
#     print(f"   Valid: {len(splits['valid']):,} rows")
#     print(f"   Total: {len(splits['train']) + len(splits['test']) + len(splits['valid']):,} rows")
#     print()

🚀 Starting dataset splitting process...

📊 Processing AvgSurfT_inst...
🔄 Processing AvgSurfT_inst...
   Loaded: 5,533,035 rows
    year  month     day  longitude  latitude       value
0 -179.5   66.5  2024.0        3.0       1.0  240.669425
1 -179.5   66.5  2024.0        3.0       2.0  254.237700
2 -179.5   66.5  2024.0        3.0       3.0  251.013837
3 -179.5   66.5  2024.0        3.0       4.0  252.279575
4 -179.5   66.5  2024.0        3.0       5.0  250.055550
   Filtering to label reference combinations...
   After filtering: 5,533,035 rows
   Splitting into train/test/valid...
   Train set: 5,533,035 rows
   Test set: 0 rows
   Valid set: 0 rows

📊 Processing CanopInt_inst...
🔄 Processing CanopInt_inst...
   Loaded: 5,533,035 rows
    year  month     day  longitude  latitude    value
0 -179.5   66.5  2024.0        3.0       1.0  0.08007
1 -179.5   66.5  2024.0        3.0       2.0  0.08007
2 -179.5   66.5  2024.0        3.0       3.0  0.08007
3 -179.5   66.5  2024.0        3.0   

KeyboardInterrupt: 

# Main 

In [None]:
consolidate_raw_data(PARAMS, DATES)
references_tables(PARAMS)


Parameter: AvgSurfT_inst

===

date: 2024_March
AvgSurfT_inst_data_2024_March.csv
Total rows: 3759432
Missing values in final column: 0
Missing percentage: 0.00%
Range: 206.82 to 349.63
Average: 277.67
Longitude range: -179.50 to 179.50
Latitude range: -54.50 to 83.50
Original 3-hourly data: (3759432, 7)
Aggregated daily data: (469929, 6)
Monthly list now contains 1 datasets

===

date: 2024_April
AvgSurfT_inst_data_2024_April.csv
Total rows: 3638160
Missing values in final column: 0
Missing percentage: 0.00%
Range: 215.17 to 346.24
Average: 283.46
Longitude range: -179.50 to 179.50
Latitude range: -54.50 to 83.50
Original 3-hourly data: (3638160, 7)
Aggregated daily data: (454770, 6)
Monthly list now contains 2 datasets

===

date: 2024_May
AvgSurfT_inst_data_2024_May.csv
Total rows: 3759432
Missing values in final column: 0
Missing percentage: 0.00%
Range: 223.41 to 345.40
Average: 287.95
Longitude range: -179.50 to 179.50
Latitude range: -54.50 to 83.50
Original 3-hourly data: (3759