In [1]:
import os
import pandas as pd
import fetchData
import numpy as np
import glob
import Timestamp

In [2]:
# Observation family handled by this notebook and the dataset splits to walk.
data_type = 'METAR'
dataset_purposes = ['train', 'test']

# Sub-directory names: part_1 ... part_5.
path_levels = [f'part_{part_no}' for part_no in range(1, 6)]

# Calendar components as zero-padded strings. Every month is paired with
# days 01-31, so this grid also contains combinations that are not real dates.
years = [str(year_no) for year_no in (2022, 2023)]
monthes = [f'{month_no:02d}' for month_no in range(1, 13)]
days = [f'{day_no:02d}' for day_no in range(1, 32)]

In [3]:
def find_forecast_files(directory, date):
    """Return the METAR text files stored in ``directory`` for ``date``.

    Args:
        directory: Folder to search. Only its direct children are considered.
        date: Object exposing ``to_string(fmt)``; it is called with the
            ``"%Y"``, ``"%m"`` and ``"%d"`` format codes.

    Returns:
        Sorted list of paths matching ``metar.<year><month><day>.??Z.txt``.
        The list is empty when nothing matches or the directory is missing.
    """
    year = date.to_string("%Y")
    month = date.to_string("%m")
    day = date.to_string("%d")

    # Escape the directory part so glob metacharacters ("[", "]", "*", "?")
    # that happen to be in the path are matched literally, not as wildcards.
    pattern = os.path.join(glob.escape(directory), f"metar.{year}{month}{day}.??Z.txt")

    # glob.glob gives no ordering guarantee; sort so callers that depend on
    # file order (e.g. "first"-style aggregations) behave reproducibly.
    return sorted(glob.glob(pattern))

In [None]:
def preprocess_metar_data(df, complete_df):
    """Clean and extract necessary features from METAR data.

    The observations are left-joined onto the time grid in ``complete_df``,
    the first entry of ``cloud_layers`` is expanded into scalar columns, the
    dew point spread is derived, and the result is resampled to 15-minute
    bins and forward-filled.

    Args:
        df: Observations with the columns ``date_time``, ``cloud_layers``,
            ``temperature``, ``dewpoint``, ``wind_speed_mps``, ``pressure``
            and ``visibility_meters``. The frame is not modified.
        complete_df: Frame holding a ``timestamp_15mins`` column; only
            observations whose ``date_time`` equals one of these stamps
            survive the left join.

    Returns:
        DataFrame with ``timestamp_15mins`` followed by the aggregated
        feature columns, one row per 15-minute bin.
    """
    def _first_layer_value(layers, field):
        # Only a non-empty list carries cloud information; anything else
        # (NaN from the left join, an empty list) becomes NaN.
        if isinstance(layers, list) and layers:
            return layers[0][field]
        return np.nan

    # Rename on a copy: renaming in place would silently change the caller's frame.
    observations = df.rename(columns={'date_time': 'timestamp_15mins'})
    merged = complete_df.merge(observations, on='timestamp_15mins', how='left')

    # Expand cloud_layers column into sky_cover, altitude_ft, and cumulonimbus
    for field in ('sky_cover', 'altitude_ft', 'cumulonimbus'):
        merged[field] = merged['cloud_layers'].apply(
            lambda layers, field=field: _first_layer_value(layers, field)
        ).astype(float)

    # Drop the cloud_layers column as it has been expanded
    merged = merged.drop(columns=['cloud_layers'])

    # Calculate dew point spread
    merged['dew_point_spread'] = merged['temperature'] - merged['dewpoint']

    # Aggregate data every 15 minutes
    preprocessed_df = merged.resample('15min', on='timestamp_15mins').agg({
        'temperature': 'mean',
        'dewpoint': 'mean',
        'dew_point_spread': 'mean',
        'wind_speed_mps': 'mean',
        'pressure': 'mean',
        'visibility_meters': 'mean',
        'sky_cover': 'first',
        'altitude_ft': 'first',
        'cumulonimbus': 'first'
    }).reset_index()

    # Carry the last known value forward into bins that have no observation.
    preprocessed_df = preprocessed_df.ffill().infer_objects(copy=False)
    return preprocessed_df

In [5]:
def get_final_df(file_data):
    """Stack per-file frames and collapse them to one row per ``timestamp_15mins``.

    Args:
        file_data: Sequence of DataFrames that each carry ``timestamp_15mins``
            plus the feature columns aggregated below.

    Returns:
        DataFrame with ``timestamp_15mins`` as a regular column. Numeric
        measurements are averaged per timestamp; the cloud columns keep the
        first non-null value per timestamp.
    """
    # Columns averaged within each 15-minute interval.
    averaged = ('temperature', 'dewpoint', 'dew_point_spread',
                'wind_speed_mps', 'pressure', 'visibility_meters')
    # Columns for which the first recorded value in each interval is kept.
    first_seen = ('sky_cover', 'altitude_ft', 'cumulonimbus')
    how = {**dict.fromkeys(averaged, 'mean'), **dict.fromkeys(first_seen, 'first')}

    stacked = pd.concat(file_data, ignore_index=True)
    per_slot = stacked.groupby("timestamp_15mins").agg(how)
    return per_slot.reset_index()

In [7]:
# Build one CSV per calendar day and dataset split under ./data/<data_type>/<split>/.
# Days whose CSV already exists are skipped, so the cell can be re-run to resume.
for dataset_purpose in dataset_purposes:
    is_train = dataset_purpose == 'train'

    # Only the 'train' split has a path_level component in its directory; for
    # any other split the directory is the same for every path_level, so a
    # single pass is enough (the extra passes only re-visited finished days).
    levels = path_levels if is_train else [None]

    for path_level in levels:
        # Loop-invariant for all dates below.
        # NOTE(review): assumes get_defult_base_dir() returns the same value on every call.
        dir_parts = [fetchData.get_defult_base_dir(), data_type, dataset_purpose]
        if is_train:
            dir_parts.append(path_level)
        directory = os.path.join(*dir_parts)

        for year in years:
            for month in monthes:
                for day in days:
                    # The month/day grid contains impossible dates (e.g. 02-30).
                    # Reject them before touching the project Timestamp type, so
                    # the skip does not depend on how that type treats bad input.
                    try:
                        pd.Timestamp(year=int(year), month=int(month), day=int(day))
                    except ValueError:
                        continue

                    date = Timestamp.Timestamp(year=int(year), month=int(month), day=int(day))
                    data_range = date.to_string("%Y-%m-%d")

                    file_path = os.path.join('./', 'data', data_type, dataset_purpose, f'{data_range}.csv')
                    if os.path.exists(file_path):
                        # Nothing is written here, so do not report it as saved.
                        print(file_path, "already exists, skipping")
                        continue

                    forecast_files = find_forecast_files(directory, date)
                    if not forecast_files:
                        continue

                    # Time grid for the day at 15-minute steps. date_range is
                    # end-inclusive, so it also holds 00:00 of the following
                    # day (97 stamps in total).
                    start_date = pd.to_datetime(data_range).tz_localize('UTC')
                    end_date = start_date + pd.Timedelta(days=1)
                    complete_times = pd.date_range(start=start_date, end=end_date, freq='15min')
                    complete_df = pd.DataFrame({'timestamp_15mins': complete_times})

                    df_list = []
                    for forecast_file in forecast_files:
                        # load_data takes the bare file name without the ".txt" suffix.
                        file_name = os.path.splitext(os.path.basename(forecast_file))[0]
                        load_kwargs = {
                            'data_type': data_type,
                            'dataset_purpose': dataset_purpose,
                            'file_name': file_name,
                        }
                        if is_train:
                            load_kwargs['path_level'] = path_level
                        raw_df = fetchData.load_data(**load_kwargs)
                        df_list.append(preprocess_metar_data(df=raw_df, complete_df=complete_df))

                    final_df = get_final_df(df_list)
                    final_df = final_df.ffill().bfill()
                    # NOTE(review): after ffill().bfill() only columns that are
                    # entirely NaN still contain gaps, so this call rarely changes
                    # anything; kept to leave the produced data unchanged.
                    final_df = final_df.interpolate(method='linear')

                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                    final_df.to_csv(file_path, index=False)
                    print(file_path, "saved!")
                           
                            

./data/METAR/train/2022-09-01.csv saved!
./data/METAR/train/2022-09-02.csv saved!
./data/METAR/train/2022-09-03.csv saved!
./data/METAR/train/2022-09-04.csv saved!
./data/METAR/train/2022-09-05.csv saved!
./data/METAR/train/2022-09-06.csv saved!
./data/METAR/train/2022-09-07.csv saved!
./data/METAR/train/2022-09-08.csv saved!
./data/METAR/train/2022-09-09.csv saved!
./data/METAR/train/2022-09-10.csv saved!
./data/METAR/train/2022-09-11.csv saved!
./data/METAR/train/2022-09-12.csv saved!
./data/METAR/train/2022-09-13.csv saved!
./data/METAR/train/2022-09-14.csv saved!
./data/METAR/train/2022-09-15.csv saved!
./data/METAR/train/2022-09-16.csv saved!
./data/METAR/train/2022-09-17.csv saved!
./data/METAR/train/2022-09-18.csv saved!
./data/METAR/train/2022-09-19.csv saved!
./data/METAR/train/2022-09-20.csv saved!
./data/METAR/train/2022-09-21.csv saved!
./data/METAR/train/2022-09-22.csv saved!
./data/METAR/train/2022-09-23.csv saved!
./data/METAR/train/2022-09-24.csv saved!
./data/METAR/tra