# Data Cleaning Notebook for Dengue Transfer Learning Project

## Task

Conduct EDA for TensorFlow transfer learning pipeline to forecast **weekly dengue cases** (`total_cases`) from 22 multivariate weather/environmental features.

### Dataset
Dengue ML datasets track environmental and temporal factors influencing Aedes mosquito breeding and virus transmission in tropical regions like San Juan and Iquitos.

- #### Temporal Features
    - **city**: Location identifier (e.g., 'sj' for San Juan, 'iq' for Iquitos)—captures city-specific mosquito/dengue patterns.
    - **year, weekofyear, week_start_date**: Time granularity for seasonality; dengue peaks during rainy seasons (weekofyear critical for lagged effects).

- #### Vegetation Indices (NDVI)
    - **ndvi_ne, ndvi_nw, ndvi_se, ndvi_sw**: Normalized Difference Vegetation Index by city quadrant. Higher NDVI indicates lush vegetation providing mosquito shade/breeding sites; key for Aedes habitat detection via satellite.

- #### Precipitation \& Water
    - **precipitation_amt_mm**: Rainfall amount—creates standing water breeding sites.
    - **reanalysis_precip_amt_kg_per_m2, reanalysis_sat_precip_amt_mm**: Reanalysis (modeled) precipitation variants confirming observed rain.
    - **station_precip_mm**: Ground station measurements—most direct rain proxy.

- #### Temperature Metrics
    - **reanalysis_air_temp_k, reanalysis_avg_temp_k, reanalysis_max_air_temp_k, reanalysis_min_air_temp_k**: Reanalysis temps in Kelvin; optimal Aedes range 26-32°C accelerates larval development/virus replication.
    - **station_avg_temp_c, station_max_temp_c, station_min_temp_c**: Station temps in Celsius—ground truth validation.
    - **station_diur_temp_rng_c**: Diurnal range; wider swings stress mosquitoes.
    - **reanalysis_tdtr_k**: Temperature diurnal temperature range (reanalysis).

- #### Humidity \& Moisture
    - **reanalysis_dew_point_temp_k**: Dew point—direct humidity proxy; high values (>20°C) favor mosquito survival.
    - **reanalysis_relative_humidity_percent**: Relative humidity %—critical for egg/larval viability.
    - **reanalysis_specific_humidity_g_per_kg**: Absolute moisture content.


### Notebook sections for the second project notebook (Data Cleaning)
1. Get Data
2. Data Cleaning

In [1]:
import sys
import os
from pathlib import Path
from typing import List, Tuple, Any, Dict, Union
import gc
import itertools
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)

# Set one level up as project root|
if os.path.abspath("..") not in sys.path:
    sys.path.insert(0, os.path.abspath(".."))
    
from src.config import ProjectConfig  # project config file parser
from src.utils.eda import value_streaks, top_correlations
from src.utils.visualizations import compute_correlations_matrix, \
                display_distributions, random_color, random_colormap, \
                display_timeseries

import pandas as pd
import numpy as np
import random
import time
from datetime import timedelta

from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

from src.utils.eda import top_correlations, top_vif
from src.utils.utils import _check_feature_presence, load_file, save_file
from src.preprocessing.clean import cap_outliers, drop_nan_rows, \
                                    median_groupwise_impute, pipe_clean
from src.preprocessing.engineer import reduce_features, remove_features

from IPython.display import display
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
# from matplotlib.axis import Axis
# from matplotlib.dates import MonthLocator, YearLocator, DateFormatter
import seaborn as sns

In [2]:
cnfg = ProjectConfig.load_configuration()
PATH_TO_RAW_DATA = cnfg.data.dirs["raw"]
FILE_TRAIN_RAW= cnfg.data.files["features_train"]
FILE_TEST_RAW = cnfg.data.files["features_test"]
FILE_LABELS_RAW = cnfg.data.files["labels_train"]

PATH_TO_INTERMEDIATE_DATA = cnfg.data.dirs["intermediate"]
FILE_NAN_CLEAN = cnfg.data.files["nan_mask"]
FILE_TRAIN_CLEAN = cnfg.data.files["features_clean"]
FILE_LABELS_CLEAN = cnfg.data.files["labels_clean"]

TARGET = cnfg.preprocess.feature_groups["target"]
ENV_FEAT_PREFIX = cnfg.preprocess.feature_groups["env_prefixes"]
CITYGROUP_FEAT = cnfg.preprocess.feature_groups["city"]
WEEK_FEAT = cnfg.preprocess.feature_groups["week"]
DATETIME_FEAT = cnfg.preprocess.feature_groups["datetime"]

### Get Data

In [3]:
type(FILE_TRAIN_RAW)

pathlib.PosixPath

In [4]:
df_train_raw = load_file(path=PATH_TO_RAW_DATA / FILE_TRAIN_RAW, datetime_col=DATETIME_FEAT)
df_test_raw = load_file(path=PATH_TO_RAW_DATA / FILE_TEST_RAW, datetime_col=DATETIME_FEAT)
df_labels_raw = load_file(path=PATH_TO_RAW_DATA / FILE_LABELS_RAW)
list_raw_df = [df_train_raw, df_test_raw, df_labels_raw]
env_features = [f for f in df_train_raw if f.startswith(tuple(ENV_FEAT_PREFIX))]

***To reduce data snooping, slice last entries for both dataset cities***

In [5]:
holdout_pct = 0.05
cities_first_i = df_train_raw.groupby(by=CITYGROUP_FEAT)["week_start_date"].idxmin()  # Series w Start indices
cities_last_i = df_train_raw.groupby(by=CITYGROUP_FEAT)["week_start_date"].idxmax()  # Series w end indices
cities_last_i = (cities_last_i - (cities_last_i - cities_first_i) * holdout_pct).astype(int)  # indice math with Series
period = tuple(slice(cities_first_i[city], cities_last_i[city], 1) for city in cities_last_i.index[::-1])  # Create tuple of slices from 2 Series
df_train_raw_eda = df_train_raw.iloc[np.r_[period]].reset_index(drop=True)  # apply defuned slices
df_labels_raw_eda = df_labels_raw.iloc[np.r_[period]].reset_index(drop=True)

## Data Cleaning

# TODO:
- [x] Cap BEFORE median computation (Winsorization - cap extreme values at specified threshold):
    - 1%/99% threshold for both models
    - do not cap targets
- [X] Remove rows with over 50% of NaN
- [X] Impute NaNs with the groupwise median.
- [ ] (OPTIONAL, if outliers still there) Reapply outlier handling AFTER imputation with 5%/95% threshold:
    - larger threshold preserves more of original distribution shape than tail-focussed 1%/99% threshold
    - should not cap much of the data at this stage (CHECK FOR AFFECTED DATAPOINT COUNT)

### Outlier handling

In [6]:
# TODO remove after cleaning

# def cap_outliers(data: pd.DataFrame, features: List[str]=None,
#                  group_keys: List[str]=None,
#                  lower_cap:float=None, upper_cap:float=None,
#                 output_stats:bool=True) -> Dict[str, Any]:
#     """
#     Perform groupwise Winsorization (percentile clipping) on specified features to handle outliers.
#     Automatically filter environmental features from config prefixes if not specified.
#     :param data: Input pandas DataFrame.
#     :param features: List of column names to clip. Default None auto-selects env features 
#            from prefixes defined in config.yaml.
#     :param group_keys: List of columns to group by for quantile calculation. Default None uses 
#            config.yaml 'city' grouping.
#     :param lower_cap: Lower percentile for clipping (0-1). Default None uses config.yaml 
#            'outlier_perc.lower' (originally 0.01).
#     :param upper_cap: Upper percentile for clipping (0-1). Default None uses config.yaml 
#            'outlier_perc.upper' (originally 0.99).
#     :param output_stats: If True, returns % rows changed per feature. Default True.
#     :return: Dict containing:
#            - 'data': Clipped DataFrame copy (original unchanged)
#            - 'capped_row_prc': Series of % rows clipped per feature (if output_stats=True)
#     """
#     if features is None:
#         features = [f for f in data.columns if f.startswith(
#             tuple(cnfg.preprocess.feature_groups["env_prefixes"]))]
#     if group_keys is None:
#         group_keys = cnfg.preprocess.feature_groups["city"]
#     if lower_cap is None:
#         lower_cap = cnfg.preprocess.outlier_perc["lower"]
#     if upper_cap is None:
#         upper_cap = cnfg.preprocess.outlier_perc["upper"]

#     data_no_outliers = data.copy()
#     data_no_outliers[features] = data_no_outliers.groupby(by=group_keys)[features].transform(
#         lambda group: group.clip(
#             lower=group.quantile(lower_cap), upper=group.quantile(upper_cap)))

#     if output_stats:
#         capped_row_percent = round(
#             ((data[features] != data_no_outliers[features]).sum() / len(data) * 100), 2)
#         return {"data": data_no_outliers,
#                 "capped_row_prc": capped_row_percent}
#     return {"data": data_no_outliers}

In [7]:
intermediate_output = cap_outliers(data=df_train_raw)
df_train_clean = intermediate_output["data"]

In [8]:
intermediate_output["capped_row_prc"]

ndvi_ne                                  15.25
ndvi_nw                                   5.63
ndvi_se                                   3.71
ndvi_sw                                   3.71
precipitation_amt_mm                      2.40
reanalysis_air_temp_k                     2.82
reanalysis_avg_temp_k                     2.82
reanalysis_dew_point_temp_k               2.88
reanalysis_max_air_temp_k                 2.68
reanalysis_min_air_temp_k                 2.61
reanalysis_precip_amt_kg_per_m2           2.88
reanalysis_relative_humidity_percent      2.88
reanalysis_sat_precip_amt_mm              2.40
reanalysis_specific_humidity_g_per_kg     2.88
reanalysis_tdtr_k                         2.82
station_avg_temp_c                        5.01
station_diur_temp_rng_c                   5.01
station_max_temp_c                        3.09
station_min_temp_c                        2.68
station_precip_mm                         2.61
dtype: float64

In [9]:
# # Uncomment cell for visual check on new distributions for selected features after outlier cliping/Winsorization 

# selected_distro_EDA_features = ["ndvi_ne", "precipitation_amt_mm", "station_diur_temp_rng_c", "station_max_temp_c",
#        "station_min_temp_c", "station_precip_mm"]
# # selected_distro_EDA_features = [feature for feature in df_train_raw_eda.select_dtypes("float") if not feature.startswith("reanalysis")]  # used for outlier check

# for numeric_feature in selected_distro_EDA_features:
#     display_distributions(data=df_train_clean[selected_distro_EDA_features],
#                           features=[numeric_feature],
#                           title_prefix=numeric_feature)

**Conclusion**:
- Extreme outliers are removed
- Data ranges and variations seam to be credible for tropical climate
- No need to adjust 1%/99% clipping percentailes or conduct second round of cliping/Winsorization
- Adjusted row percentaga is rasonable from ~2-15% (does not exceed 20%)

### NaN handling

- Remove rows with > 50% NaN values:
    - also removes all rowws for `wekofyear` # 53 that do not have observational or analytical data (a likely data collection bug)

In [10]:
# TODO remove after cleaning

# def drop_nan_rows(X: pd.DataFrame, y: pd.Series | None = None,
#                   threshold_percent: float = 0.5):
#     """
#     Drop rows with NaN values exceeding threshold_percent of columns.
    
#     :param X: pandas DataFrame of features.
#     :param y: Optional target array/series. Default None.
#     :param threshold_percent: Min non-null fraction required [0,1]. Default 0.5.
#     :return: Filtered X (and y if provided), both with reset_index().
#     """
#     row_drop_threshold = int(len(X.columns) * threshold_percent)
#     result = X.dropna(thresh=row_drop_threshold)
#     if y is not None:
#         return result.reset_index(), y.iloc[result.index].reset_index()
#     return result.reset_index()

In [11]:
df_train_clean, df_labels_clean = drop_nan_rows(X=df_train_clean, y=df_labels_raw)

In [12]:
# TODO remove after cleaning

# def median_groupwise_impute(X: pd.DataFrame,
#                             group_keys: List[str] = ['city', 'weekofyear']):
#     """
#     Impute NaN values in numeric columns using median within specified group keys.
    
#     :param X: pandas DataFrame containing grouping columns and features to impute.
#     :param group_keys: List of column names for grouping. Default ['city', 'weekofyear'].
#     :return: Copy of input DataFrame with NaNs filled by group-wise medians.
#     """
#     missing_keys = set(group_keys) - set(X.columns)
#     if missing_keys:
#         raise ValueError(f"Missing group keys {missing_keys}")

#     X_no_nan = X.copy()
#     cols_with_nan = X_no_nan.select_dtypes(include="number")\
#         .columns[X_no_nan.select_dtypes(include="number").isna().sum() > 0].to_list()

#     if len(cols_with_nan) > 0:
#         X_no_nan[cols_with_nan] = X_no_nan[cols_with_nan + group_keys]\
#             .groupby(by=group_keys)[cols_with_nan]\
#             .transform(lambda group: group.fillna(group.median()))
#     return X_no_nan

In [13]:
df_train_clean, df_nan_mask = median_groupwise_impute(X=df_train_clean)

In [14]:
df_labels_clean.columns

Index(['city', 'year', 'weekofyear', 'total_cases'], dtype='object')

In [15]:
# Check if any NaNs left
df_train_clean.isna().sum().sum()

0

In [16]:
# TODO remove after cleaning

# plt.figure(figsize=(18, 6))  # df_train_raw.shape[1]
# sns.heatmap(
#     # median_groupwise_impute(df_train_clean).isna(),
#     df_train_clean.isna(),
#     cmap='plasma', cbar=False)
# plt.title("Post-clean NaN location check.\n", 
#           fontsize=13, fontweight="bold")
# plt.xticks(rotation=45, ha='right', rotation_mode='anchor')
# plt.show()

**Conclusions**
- primary imputation method: median of all other weekly data from subset of same city data:
    - median because data has outliers
    - data misigness is acceptable - even in worst cases (after removing week 53 that has no data) thera are more than 50% and at least 6 data pints available to produce `city`+`weekofyear` medians (see EDA table).
    - Features in the datset have strong seasonality (rain, temperature, humidity, NDVI). Same week median can handle this.
    - moderately well handles different issues with the dataset - scattered NaNs, long streaks (entire season of 15-weeks for `NDVI`)
    - simple to implement
- Other imputation methods considered:
    - Data reconstruction (eg station average or range features from station_max and station_min):
        - discarded as performing same calculations on non-nan data show significant discrepancies between calculated and original data
    - horizontal imputation from potentially related Reanalysis data:
        - discarded: top correlations for station mesurement and reanalysis data do differ accross city data subsets. San Juan has more promissing correlation ranges from ~0.5 (`station_precip_mm`) to ~0.88 (`station_avg_temp_c`) while Iquitos respective ranges are from below 0.4 (`station_precip_mm`) to ~0.6 (`station_avg_temp_c`). Considering that almost all of the missing data for station measurements are in Iquitos, the correlations do not explain enough variance (R^2) and thus median imputation is potentially better tool.
    - Temporal interpolation (np.interp, splines):
        -  discarded: destroys temporal patterns for long NaN streaks (eg line pattern for entire season or month)
    -  KNN/multi-feature models:
        - discarded: Complexity vs expected gains. Too much effort and bug risk versus potentially minimal model improvements when simple median imputation used.  
    

<!-- ### Remove selected multicolinear features
- [X] Remove initial `config.yaml` milticolinear features form dataframe
- [X] Assess city-wise VIF to EDA instead of correlation matrix for one-vs-all relationships.
- [ ] Remove features with VIF > 10
- [X] always prefer station_* over reanalysis_* -->

### Cleaning pipeline
- [X] run cleaning steps in sequence
- [X] save clean data and nan mask to the disc

In [25]:
# TODO - remove after cleanimng

# def save_file(df: pd.DataFrame, path: Union[str, Path], overwrite=False):
#     """
#     Save pandas DataFrame to CSV or Parquet file with automatic directory creation 
#     and timestamped to avoid overwrites.
#     :param df: DataFrame to save.
#     :param path: File path as string or Path object (.csv, .parquet, or .pqt).
#     :param overwrite: If True, overwrite existing file. If False (default), 
#                       create timestamped version like `file_20260201_1947.csv`.
#     :return: Final Path object where file was saved.
#     :raises ValueError: If DataFrame is empty or file format unsupported.
#     """
#     path=Path(path)
#     if df.empty:
#         raise ValueError("Attemting to save empty DataFrame")
#     if not path.parent.is_dir():
#         logging.info("No directory for provided path. Creating one.")
#         path.parent.mkdir(parents=True)
#     if path.is_file():
#         if overwrite:
#             logging.info("Path file present, overwriting.")
#         else:
#             timestamp = datetime.now().strftime("%Y%m%d_%H%M")
#             path = path.with_stem(f"{path.stem}_{timestamp}")
#             logging.info(f"Path file present, creating new one: {path.name}.")
            
#     if path.suffix.lower() == ".csv":
#         df.to_csv(path, index=False)
#     elif path.suffix.lower() in [".parquet", ".pqt"]:
#         df.to_parquet(path, index=False, engine='fastparquet')
#     else:
#         raise ValueError(f"Unsupported file format: {path.suffix}")
#     logging.info(f"Saved {df.shape} shaped data to {path}")
#     return path
                

In [26]:
save_file(df=df_nan_mask, path=PATH_TO_INTERMEDIATE_DATA / FILE_NAN_CLEAN)

INFO:root:Path file present, creating new one: dengue_nan_mask_20260202_1412.parquet.
INFO:root:Saved (1446, 11) shaped data as dengue_nan_mask_20260202_1412.parquet


PosixPath('/home/butros/Documents/kursi/pasmaciba/PashPrjekts/finetune2025/dengue-transfer-learn/data/intermediate/dengue_nan_mask_20260202_1412.parquet')

In [27]:
# # TODO - remove after cleanimng

# def pipe_clean(manual_dirs: Dict[str, Path] | None=None,
#                manual_files: Dict[str, Path] | None=None,
#                datetime_col: str=None,
#                overwrite_files: bool=False) -> Dict[str, Any]:
#     """
#     Data cleaning pipeline for features and targets.
#     Handle outlier capping, NaN row dropping, and groupwise median imputation in sequence.
#     :param manual_dirs: Dict of directory paths to override config.yaml dirs. 
#         Keys: 'raw', 'intermediate'. Default None uses config.yaml data.dirs.
#     :param manual_files: Dict of filenames to override config.yaml files. 
#         Keys: 'features_train', 'labels_train', 'features_clean', 'labels_clean', 'nan_mask'. 
#         Default None uses config.yaml data.files.
#     :param datetime_col: Name of datetime column for parsing. Default None uses
#                             config.yaml preprocess.feature_groups["datetime"].
#     :param overwrite_files: If True, overwrites existing saved files. Default False.
    
#     :return: Dict containing:
#         - 'X_clean_save_path': Path where cleaned features saved
#         - 'X_clean_data': Cleaned features DataFrame
#         - 'X_capped_rows_prc': Float % of rows affected by outlier capping
#         - 'y_clean_save_path': Path where cleaned targets saved  
#         - 'y_clean_data': Cleaned targets DataFrame
#         - 'nan_mask_save_path': Path where NaN mask saved
#         - 'nan_mask_data': DataFrame tracking imputed locations (for feature engineering)
#     """
#     dirs = cnfg.data.dirs
#     filenames = cnfg.data.files
#     if manual_dirs is not None:
#         dirs = manual_dirs
#     if manual_files is not None:
#         filenames = manual_files

#     X_clean = load_file(path=dirs["raw"] / filenames["features_train"],
#                         datetime_col=datetime_col)
#     y_clean = load_file(path=dirs["raw"] / filenames["labels_train"],
#                         datetime_col=datetime_col)
    
#     caping_output = cap_outliers(data=X_clean)
#     X_clean = caping_output["data"]
#     X_clean, y_clean = drop_nan_rows(X=X_clean, y=y_clean)
#     X_clean, df_nan_mask = median_groupwise_impute(X=X_clean)

#     X_save_path = save_file(df=X_clean, path=dirs["intermediate"] / filenames["features_clean"],
#                             overwrite=overwrite_files)
#     y_save_path = save_file(df=y_clean, path=dirs["intermediate"] / filenames["labels_clean"],
#                             overwrite=overwrite_files)
#     nan_mask_save_path = save_file(df=df_nan_mask, path=dirs["intermediate"] / filenames["nan_mask"],
#                                    overwrite=overwrite_files)
    
#     return {"X_clean_save_path": X_save_path,
#             "X_clean_data": X_clean,
#             "X_capped_rows_prc": caping_output["capped_row_prc"],
#             "y_clean_save_path": y_save_path,
#             "y_clean_data": y_clean,
#             "nan_mask_save_path": nan_mask_save_path,
#             "nan_mask_data": df_nan_mask,            
#            }

In [28]:
%%time
test_pipe_output = pipe_clean(overwrite_files=True)

INFO:root:Path file present, overwriting.
INFO:root:Saved (1446, 24) shaped data as dengue_features_clean.parquet
INFO:root:Saved (1446, 4) shaped data as dengue_labels_clean.parquet
INFO:root:Path file present, overwriting.
INFO:root:Saved (1446, 11) shaped data as dengue_nan_mask.parquet


CPU times: user 530 ms, sys: 8.05 ms, total: 538 ms
Wall time: 537 ms


In [32]:
test_pipe_output["X_clean_data"].sample(1)

Unnamed: 0,city,year,weekofyear,week_start_date,ndvi_ne,ndvi_nw,ndvi_se,ndvi_sw,precipitation_amt_mm,reanalysis_air_temp_k,...,reanalysis_precip_amt_kg_per_m2,reanalysis_relative_humidity_percent,reanalysis_sat_precip_amt_mm,reanalysis_specific_humidity_g_per_kg,reanalysis_tdtr_k,station_avg_temp_c,station_diur_temp_rng_c,station_max_temp_c,station_min_temp_c,station_precip_mm
977,iq,2001,22,2001-05-28,0.299943,0.302943,0.318714,0.387443,90.07,296.845714,...,20.3,91.465714,90.07,16.694286,8.8,27.25,12.65,34.8,20.5,32.0


In [33]:
test_pipe_output["y_clean_data"].sample(1)

Unnamed: 0,city,year,weekofyear,total_cases
834,sj,2006,25,6


In [34]:
test_pipe_output["nan_mask_data"].sample(1)

Unnamed: 0,ndvi_ne,ndvi_nw,ndvi_se,ndvi_sw,precipitation_amt_mm,reanalysis_sat_precip_amt_mm,station_avg_temp_c,station_diur_temp_rng_c,station_max_temp_c,station_min_temp_c,station_precip_mm
1018,False,False,False,False,False,False,False,False,False,False,False


In [35]:
test_pipe_output["X_capped_rows_prc"]

ndvi_ne                                  15.25
ndvi_nw                                   5.63
ndvi_se                                   3.71
ndvi_sw                                   3.71
precipitation_amt_mm                      2.40
reanalysis_air_temp_k                     2.82
reanalysis_avg_temp_k                     2.82
reanalysis_dew_point_temp_k               2.88
reanalysis_max_air_temp_k                 2.68
reanalysis_min_air_temp_k                 2.61
reanalysis_precip_amt_kg_per_m2           2.88
reanalysis_relative_humidity_percent      2.88
reanalysis_sat_precip_amt_mm              2.40
reanalysis_specific_humidity_g_per_kg     2.88
reanalysis_tdtr_k                         2.82
station_avg_temp_c                        5.01
station_diur_temp_rng_c                   5.01
station_max_temp_c                        3.09
station_min_temp_c                        2.68
station_precip_mm                         2.61
dtype: float64