Process Extra Data

1. Import required libraries

In [None]:
import os, re, warnings
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import joblib


# Ignore every warning category for the rest of the session. This also hides
# pandas FutureWarning/DeprecationWarning messages, so API deprecations used
# below will not be reported. NOTE(review): consider narrowing to specific
# categories/modules.
warnings.filterwarnings('ignore')

2. Configure input/output paths and processing options (the file names are read and paired in the next cell)

In [None]:
# -----------------------------
# Config
# -----------------------------
INPUT_DIR  = '../dataset/extraData'
OUTPUT_DIR = '../dataset/processedData'
SCALER_TYPE = 'standard'     # 'standard' or 'minmax'
# 2-minute resampling. 'min' is the minute alias; the older 'T' alias
# ('2T') is deprecated since pandas 2.2 and scheduled for removal.
FREQ = '2min'
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(os.path.join(OUTPUT_DIR, 'scalers'), exist_ok=True)

In [None]:

# -----------------------------
# Helpers
# -----------------------------
def extract_area_date(fname):
    """Return ``(area, yyyymmdd)`` parsed from a transmission file name.

    Names look like ``RIY0329_DL_DataTransmission20250813171446.csv`` and
    yield ``('RIY0329', '20250813')``: the ``RIY<digits>`` prefix and the
    first eight digits of the trailing 14-digit timestamp. Any directory
    part of *fname* is ignored. ``(None, None)`` is returned when the name
    does not have that shape.
    """
    found = re.match(r'^(RIY\d+)_.*?(\d{8})\d{6}\.csv$', os.path.basename(fname))
    if found is None:
        return None, None
    # Both groups are mandatory, so groups() never contains None here.
    return found.groups()

def normalize_date(yyyymmdd):
    """Rewrite a compact ``YYYYMMDD`` string as ``YYYY-MM-DD``."""
    year, month, day = yyyymmdd[:4], yyyymmdd[4:6], yyyymmdd[6:]
    return f'{year}-{month}-{day}'

def load_and_timestamp(path, yyyymmdd):
    """Load a per-day CSV and attach an absolute ``time`` column.

    The first column of the file holds the time of day. It is renamed to
    ``'HH:MM:SS'`` and prefixed with the ``YYYY-MM-DD`` date derived from
    *yyyymmdd* to build ``time``. Values that cannot be parsed become NaT,
    and those rows are dropped.
    """
    frame = pd.read_csv(path)
    frame = frame.rename(columns={frame.columns[0]: 'HH:MM:SS'})
    stamp_text = normalize_date(yyyymmdd) + ' ' + frame['HH:MM:SS'].astype(str)
    frame['time'] = pd.to_datetime(stamp_text, errors='coerce')
    return frame.dropna(subset=['time'])

def aggregate_dl(dl, freq=None):
    """Resample downlink samples into fixed-width time bins.

    Parameters
    ----------
    dl : pandas.DataFrame
        Downlink samples with a datetime ``time`` column plus vendor metric
        columns.
    freq : str, optional
        Pandas resampling rule. ``None`` (the default) uses the module-level
        ``FREQ`` setting.

    Returns
    -------
    pandas.DataFrame
        One row per bin, indexed by bin start time, with columns
        ``down`` (mean throughput multiplied by 1e6, i.e. bps if the source
        column is in Mbps), ``mcs_down`` / ``mcs_down_var`` (mean / sample
        variance of ``DLSchMcs``), ``rb_down`` / ``rb_down_var`` (same for
        the RB column) and ``rnti_count_dl`` (sum of ``DLSchUserNum``).

    Raises
    ------
    KeyError
        If the throughput, RB, MCS or user-count column is missing.
    """
    if freq is None:
        freq = FREQ

    # Column name options (vendors differ). Throughput is matched by
    # substring. The RB column is taken from the candidate list in its
    # preference order (used RBs before available RBs), not file order.
    down_col = next((c for c in dl.columns if 'DLMACTHP' in c), None)
    rb_candidates = ('DLRb_Used', 'DLRb_Use', 'DLAvailRb')
    rb_col = next((c for c in rb_candidates if c in dl.columns), None)
    mcs_col = 'DLSchMcs'
    users_col = 'DLSchUserNum'

    if down_col is None:
        raise KeyError("no downlink throughput column containing 'DLMACTHP'")
    if rb_col is None:
        raise KeyError(f'no downlink RB column among {list(rb_candidates)}')
    metrics = [down_col, mcs_col, rb_col, users_col]
    missing = [c for c in metrics if c not in dl.columns]
    if missing:
        raise KeyError(f'downlink data is missing columns: {missing}')

    # Work on the needed columns only. Coerce metrics to numbers so stray
    # text values become NaN instead of breaking mean/var.
    data = dl[['time'] + metrics].copy()
    data[metrics] = data[metrics].apply(pd.to_numeric, errors='coerce')

    agg = data.resample(freq, on='time').agg({
        down_col: 'mean',
        mcs_col: ['mean', 'var'],
        rb_col: ['mean', 'var'],
        users_col: 'sum',
    })
    # Flatten the (column, func) MultiIndex; the order follows the dict above.
    agg.columns = ['down', 'mcs_down', 'mcs_down_var', 'rb_down', 'rb_down_var', 'rnti_count_dl']
    # Convert down from Mbps -> bps (the source is assumed to be Mbps).
    agg['down'] = agg['down'] * 1_000_000
    return agg

def aggregate_ul(ul, freq=None):
    """Resample uplink samples into fixed-width time bins.

    Parameters
    ----------
    ul : pandas.DataFrame
        Uplink samples with a datetime ``time`` column plus vendor metric
        columns.
    freq : str, optional
        Pandas resampling rule. ``None`` (the default) uses the module-level
        ``FREQ`` setting.

    Returns
    -------
    pandas.DataFrame
        One row per bin, indexed by bin start time, with columns
        ``up`` (mean throughput multiplied by 1e6, i.e. bps if the source
        column is in Mbps), ``mcs_up`` / ``mcs_up_var`` (mean / sample
        variance of ``ULSchMcs``), ``rb_up`` / ``rb_up_var`` (same for
        ``UplinkRb``) and ``rnti_count_ul`` (sum of ``ULSchUserNum``).

    Raises
    ------
    KeyError
        If the throughput, MCS, RB or user-count column is missing.
    """
    if freq is None:
        freq = FREQ

    up_col = next((c for c in ul.columns if 'UplinkMACTHP' in c), None)
    mcs_col = 'ULSchMcs'
    rb_col = 'UplinkRb'
    users_col = 'ULSchUserNum'

    if up_col is None:
        raise KeyError("no uplink throughput column containing 'UplinkMACTHP'")
    metrics = [up_col, mcs_col, rb_col, users_col]
    missing = [c for c in metrics if c not in ul.columns]
    if missing:
        raise KeyError(f'uplink data is missing columns: {missing}')

    # Work on the needed columns only. Coerce metrics to numbers so stray
    # text values become NaN instead of breaking mean/var.
    data = ul[['time'] + metrics].copy()
    data[metrics] = data[metrics].apply(pd.to_numeric, errors='coerce')

    agg = data.resample(freq, on='time').agg({
        up_col: 'mean',
        mcs_col: ['mean', 'var'],
        rb_col: ['mean', 'var'],
        users_col: 'sum',
    })
    # Flatten the (column, func) MultiIndex; the order follows the dict above.
    agg.columns = ['up', 'mcs_up', 'mcs_up_var', 'rb_up', 'rb_up_var', 'rnti_count_ul']

    # Convert up from Mbps -> bps (the source is assumed to be Mbps).
    agg['up'] = agg['up'] * 1_000_000
    return agg

def fix_zeros_and_nans(df, cols_to_clean):
    """Treat zeros as missing for continuous metrics, then fill the gaps.

    For each name in *cols_to_clean* that is a column of *df*:

    * if the column holds at least one real (non-NaN) nonzero value, every
      zero in it is replaced with NaN. A column with no real nonzero values
      (all zeros, or zeros and NaN) keeps its zeros;
    * remaining gaps are forward-filled, then back-filled.

    A column that is entirely NaN after this stays NaN. Names that are not
    columns of *df* are ignored.

    Returns a new DataFrame; *df* itself is not modified.
    """
    out = df.copy()
    for c in cols_to_clean:
        if c not in out.columns:
            continue
        col = out[c]
        is_zero = col == 0
        # NaN != 0 is True, so NaN must be excluded explicitly here.
        # Otherwise a zeros-plus-NaN column would be wiped out entirely.
        has_real_nonzero = (col.notna() & ~is_zero).any()
        if is_zero.any() and has_real_nonzero:
            # mask() upcasts int columns to float cleanly, unlike assigning
            # NaN into an int column via .loc.
            col = col.mask(is_zero)
        # After ffill+bfill, NaN can only remain if the whole column is NaN.
        # A median fallback would also be NaN then, so none is applied.
        out[c] = col.ffill().bfill()
    return out

def get_scaler(scaler_type='standard'):
    """Return a new, unfitted scaler for *scaler_type*.

    ``'standard'`` gives ``StandardScaler`` and ``'minmax'`` gives
    ``MinMaxScaler``.

    Raises
    ------
    ValueError
        For any other value, so a typo in the config cannot silently
        select min-max scaling.
    """
    if scaler_type == 'standard':
        return StandardScaler()
    if scaler_type == 'minmax':
        return MinMaxScaler()
    raise ValueError(f"unknown scaler_type {scaler_type!r}; expected 'standard' or 'minmax'")

def scale_safe(df, feature_cols, scaler_type, scaler_path):
    """Scale the varying feature columns of *df* and persist the scaler.

    A column counts as constant when its NaN-skipping standard deviation is
    not greater than 1e-12 (this includes all-NaN columns). Constant columns
    pass through unchanged. The other columns are fitted and transformed
    together with ``get_scaler(scaler_type)``.

    A dict with keys ``'scaler'`` (the fitted scaler, or ``None`` when no
    column varied), ``'columns'`` (scaled column names), ``'constant_cols'``
    and ``'type'`` is written to *scaler_path* via ``joblib.dump``.

    Returns a copy of *df* with the scaled values substituted.
    """
    varying = []
    fixed = []
    for col in feature_cols:
        if df[col].std(skipna=True) > 1e-12:
            varying.append(col)
        else:
            fixed.append(col)

    scaler = get_scaler(scaler_type)
    result = df.copy()
    if varying:
        result[varying] = scaler.fit_transform(df[varying].values)

    # When nothing varied, an identity config (no scaler) is still saved.
    joblib.dump({'scaler': scaler if varying else None,
                 'columns': varying,
                 'constant_cols': fixed,
                 'type': scaler_type}, scaler_path)

    # Constant columns keep their original values.
    for col in fixed:
        result[col] = df[col]
    return result

# -----------------------------
# Pair UL/DL files by area+date
# -----------------------------
# The listing is sorted so that the choice among several files for the same
# area/date/direction is deterministic: the last one in name order wins. For
# names that differ only in the trailing timestamp, that is the latest one.
paths = sorted(
    os.path.join(INPUT_DIR, f)
    for f in os.listdir(INPUT_DIR)
    if f.startswith('RIY') and f.endswith('.csv')
)
pairs = {}  # {(area, date): {'DL': path, 'UL': path}}
for p in paths:
    area, date = extract_area_date(p)
    if not area or not date:
        continue
    key = (area, date)
    d = pairs.setdefault(key, {})
    # Classify on the file name only, so a directory name containing
    # '_DL_' / '_UL_' cannot misroute a file.
    fname = os.path.basename(p)
    if '_DL_' in fname:
        direction = 'DL'
    elif '_UL_' in fname:
        direction = 'UL'
    else:
        continue
    if direction in d:
        print(f'Warning: several {direction} files for {area} {date}; using {fname}')
    d[direction] = p

# -----------------------------
# Process each (area, date)
# -----------------------------
# For every (area, date) key with both a DL and a UL file: resample each side,
# outer-join them, enforce the output schema, clean the continuous metrics,
# then write a raw CSV, a scaled CSV and the fitted scaler config.
for (area, date), files in sorted(pairs.items()):
    if 'DL' not in files or 'UL' not in files:
        print(f'Skipping {area} {date}: missing DL or UL file')
        continue

    print(f'Processing {area} {date} …')

    dl = load_and_timestamp(files['DL'], date)
    ul = load_and_timestamp(files['UL'], date)

    dl_agg = aggregate_dl(dl)
    ul_agg = aggregate_ul(ul)

    # Outer join keeps bins present on either side; reset_index turns the
    # resample bin index (named 'time') back into a 'time' column.
    combined = dl_agg.join(ul_agg, how='outer').reset_index()  # has 'time'
    # A side missing from a bin contributes 0 users (not NaN) to the total.
    combined['rnti_count'] = combined['rnti_count_dl'].fillna(0) + combined['rnti_count_ul'].fillna(0)

    # Keep exactly the forecasting schema; any absent column is added as all-NaN
    cols_final = [
        'time', 'down', 'up', 'rnti_count',
        'mcs_down', 'mcs_down_var',
        'mcs_up', 'mcs_up_var',
        'rb_down', 'rb_down_var',
        'rb_up', 'rb_up_var'
    ]
    for c in cols_final:
        if c not in combined.columns:
            combined[c] = np.nan
    combined = combined[cols_final].sort_values('time')

    # Clean zeros / fill NaNs for continuous metrics.
    # NOTE(review): the *_var columns are included, so a genuine zero variance
    # (all samples in a bin equal) is treated as missing and overwritten by a
    # neighbouring bin's value. Confirm this is intended.
    cont_cols = ['down','up','mcs_down','mcs_down_var','mcs_up','mcs_up_var',
                 'rb_down','rb_down_var','rb_up','rb_up_var']
    combined = fix_zeros_and_nans(combined, cont_cols)

    # Save RAW (unscaled)
    raw_out = os.path.join(OUTPUT_DIR, f'{area}_Processed_{date}.csv')
    combined.to_csv(raw_out, index=False)
    print(f'  -> wrote raw: {raw_out}')

    # SCALE + NORMALIZE (save as separate file).
    # A scaler is fitted per (area, date) file and saved next to the outputs.
    scaler_path = os.path.join(OUTPUT_DIR, 'scalers', f'{area}_{date}_{SCALER_TYPE}.joblib')
    # NOTE(review): scale_safe as written copies its input itself, so this
    # copy looks redundant.
    scaled = combined.copy()
    scaled_features = cont_cols + ['rnti_count']  # include rnti_count too; drop if you prefer it unscaled
    scaled = scale_safe(scaled, scaled_features, SCALER_TYPE, scaler_path)

    norm_out = os.path.join(OUTPUT_DIR, f'{area}_Processed_{date}_scaled_{SCALER_TYPE}.csv')
    scaled.to_csv(norm_out, index=False)
    print(f'  -> wrote scaled: {norm_out}')
    print(f'  -> saved scaler: {scaler_path}')


  from pandas.core import (


Processing RIY0329 20250813 …
  -> wrote raw: ../dataset/processedData\RIY0329_Processed_20250813.csv
  -> wrote scaled: ../dataset/processedData\RIY0329_Processed_20250813_scaled_standard.csv
  -> saved scaler: ../dataset/processedData\scalers\RIY0329_20250813_standard.joblib
Processing RIY0443 20250813 …
  -> wrote raw: ../dataset/processedData\RIY0443_Processed_20250813.csv
  -> wrote scaled: ../dataset/processedData\RIY0443_Processed_20250813_scaled_standard.csv
  -> saved scaler: ../dataset/processedData\scalers\RIY0443_20250813_standard.joblib
Processing RIY2110 20250813 …
  -> wrote raw: ../dataset/processedData\RIY2110_Processed_20250813.csv
  -> wrote scaled: ../dataset/processedData\RIY2110_Processed_20250813_scaled_standard.csv
  -> saved scaler: ../dataset/processedData\scalers\RIY2110_20250813_standard.joblib
Processing RIY2227 20250813 …
  -> wrote raw: ../dataset/processedData\RIY2227_Processed_20250813.csv
  -> wrote scaled: ../dataset/processedData\RIY2227_Processed_20