# Structure: Water Pump Preprocessor
The class below wraps the entire preprocessing logic in one method per step, plus a `run_all()` method that executes the full pipeline in order:

In [None]:
import pandas as pd
import numpy as np
import geopandas as gpd
from sklearn.neighbors import NearestNeighbors
from rapidfuzz import process, fuzz
import re

class WaterPumpPreprocessor:
    """Step-by-step feature preprocessing for a water-pump DataFrame.

    The instance works on a private copy of the input frame (``self.df``).
    Every public method mutates ``self.df`` in place and returns ``None``;
    ``run_all()`` chains the steps in dependency order and returns the frame.
    Row order and index of the input frame are preserved by every step.
    """

    # Organisation names that are always offered as fuzzy-match targets,
    # even when they are not among the most frequent values of a column.
    _FORCED_ORGANISATIONS = (
        "hesawa", "government", "unicef", "jica", "rc church", "danida",
    )

    def __init__(self, df, gadm_path=None):
        """Store a copy of ``df`` and the optional path to the GADM shapefile."""
        self.df = df.copy()
        self.gadm_path = gadm_path

    def impute_coords_from_shapefile(self, score_threshold=80):
        """Fill missing coordinates with the centroid of the fuzzy-matched ward.

        Placeholder coordinates (latitude ``-2e-08`` / ``0.0``, longitude
        ``0.0``) are turned into NaN.  For every row where either coordinate is
        missing, the ward name is fuzzy-matched against the shapefile's
        ``NAME_3`` column and, if the score reaches ``score_threshold``, both
        coordinates are replaced by the matched polygon's centroid.  Rows
        without an acceptable match keep NaN coordinates.

        Adds the helper column ``ward_clean`` (lower-cased ward name).

        Raises:
            ValueError: if no shapefile path was given to the constructor.
        """
        if self.gadm_path is None:
            raise ValueError("GADM shapefile path must be provided.")

        self.df['ward_clean'] = self.df['ward'].astype(str).str.lower()

        # np.nan (not pd.NA) keeps the columns float64, which the later
        # qcut / NearestNeighbors steps require.
        self.df['latitude'] = self.df['latitude'].replace([-2e-08, 0.0], np.nan)
        self.df['longitude'] = self.df['longitude'].replace([0.0], np.nan)

        missing = self.df['latitude'].isna() | self.df['longitude'].isna()
        if not missing.any():
            # Nothing to impute; avoids building an empty match table.
            return

        wards_gdf = gpd.read_file(self.gadm_path)
        wards_gdf = wards_gdf.rename(columns={'NAME_3': 'ward', 'NAME_2': 'district'})
        wards_gdf['ward_clean'] = wards_gdf['ward'].astype(str).str.lower()
        # When several polygons share a ward name, the first one wins.
        wards_unique = wards_gdf.drop_duplicates(subset=['ward_clean'])

        centroid_by_ward = {}
        for name, geom in zip(wards_unique['ward_clean'], wards_unique['geometry']):
            if geom is not None and not geom.is_empty:
                centroid = geom.centroid
                centroid_by_ward[name] = (centroid.y, centroid.x)

        choices = wards_unique['ward_clean'].tolist()
        missing_wards = self.df.loc[missing, 'ward_clean']

        lat_by_ward = {}
        lon_by_ward = {}
        for ward in missing_wards.unique():
            result = process.extractOne(
                query=ward, choices=choices, scorer=fuzz.token_sort_ratio
            )
            if result is None or result[1] < score_threshold:
                continue
            coords = centroid_by_ward.get(result[0])
            if coords is not None:
                lat_by_ward[ward], lon_by_ward[ward] = coords

        # Both coordinates are overwritten for every incomplete row, so a row
        # never ends up with one original and one centroid coordinate.
        self.df.loc[missing, 'latitude'] = (
            missing_wards.map(lat_by_ward).astype(float).to_numpy()
        )
        self.df.loc[missing, 'longitude'] = (
            missing_wards.map(lon_by_ward).astype(float).to_numpy()
        )

    def impute_subvillage_by_location(self):
        """Copy the subvillage of the nearest located pump into rows lacking one.

        Only rows with both coordinates present take part; rows without
        coordinates are neither used as neighbours nor imputed.
        """
        coords = self.df[['latitude', 'longitude']]
        has_coords = coords.notna().all(axis=1)
        known_mask = self.df['subvillage'].notna() & has_coords
        unknown_mask = self.df['subvillage'].isna() & has_coords

        if not known_mask.any() or not unknown_mask.any():
            return

        nn = NearestNeighbors(n_neighbors=1, algorithm='ball_tree')
        nn.fit(coords.loc[known_mask].to_numpy(dtype=float))
        _, indices = nn.kneighbors(coords.loc[unknown_mask].to_numpy(dtype=float))

        known_names = self.df.loc[known_mask, 'subvillage'].to_numpy()
        self.df.loc[unknown_mask, 'subvillage'] = known_names[indices.ravel()]

    def fuzzy_clean_column(self, column, top_n=50, threshold=75, force_include=None):
        """Normalise a free-text column and fold near-duplicates together.

        Creates ``<column>_clean`` (lower-cased, stripped, characters other
        than ``a-z``, ``0-9`` and whitespace removed) and ``<column>_grouped``,
        in which each cleaned value is replaced by its best fuzzy match among
        the ``top_n`` most frequent cleaned values plus ``force_include`` when
        the match score is at least ``threshold``; otherwise the cleaned value
        is kept.  Missing values in ``column`` are replaced by ``"missing"``.
        """
        col_clean = f"{column}_clean"
        col_grouped = f"{column}_grouped"

        self.df[column] = self.df[column].fillna("missing")
        self.df[col_clean] = (
            self.df[column].astype(str)
            .str.lower()
            .str.strip()
            .str.replace(r'[^a-z0-9\s]', '', regex=True)
        )

        top_values = self.df[col_clean].value_counts().head(top_n).index.tolist()
        if force_include:
            # Order-preserving de-duplication keeps tie-breaking between
            # equally scored candidates reproducible across runs.
            top_values = list(dict.fromkeys(top_values + list(force_include)))

        # Match each distinct value once instead of once per row.
        mapping = {}
        for val in self.df[col_clean].unique():
            result = process.extractOne(val, top_values, scorer=fuzz.ratio)
            if result and result[1] >= threshold:
                mapping[val] = result[0]
            else:
                mapping[val] = val

        self.df[col_grouped] = self.df[col_clean].map(mapping)

    @staticmethod
    def _group_names(x):
        """Collapse keyword variants of an organisation name to one label."""
        x = str(x).lower()
        if 'gov' in x:
            return 'government'
        if 'japan' in x or x in ['jica', 'jaica']:
            return 'japan'
        if 'german' in x:
            return 'germany'
        if 'village' in x:
            return 'village'
        return x

    def combine_funder_installer_grouped(self, min_pair_count=10):
        """Build ``funder_installer_pair_grouped`` from cleaned funder/installer.

        Both columns are fuzzy-cleaned, keyword-grouped and patched with a few
        manual corrections, then joined as ``"<funder>_<installer>"``.  Pairs
        occurring fewer than ``min_pair_count`` times become ``'other'``.
        The raw and ``*_clean`` funder/installer columns are dropped;
        ``funder_grouped`` and ``installer_grouped`` are kept.
        """
        forced = list(self._FORCED_ORGANISATIONS)
        self.fuzzy_clean_column("installer", force_include=forced)
        self.fuzzy_clean_column("funder", force_include=forced)

        self.df['installer_grouped'] = self.df['installer_grouped'].apply(self._group_names)
        self.df['funder_grouped'] = self.df['funder_grouped'].apply(self._group_names)

        self.df['installer_grouped'] = self.df['installer_grouped'].replace(
            {'danid': 'danida', 'commu': 'community', '0': 'unknown'}
        )
        self.df['funder_grouped'] = self.df['funder_grouped'].replace({
            'fini water': 'ministry of water',
            '0': 'unknown',
            'germany republi': 'germany',
            'adb': 'african development bank'
        })

        pairs = self.df['funder_grouped'] + "_" + self.df['installer_grouped']
        pair_counts = pairs.value_counts()
        rare = pair_counts[pair_counts < min_pair_count].index
        self.df['funder_installer_pair_grouped'] = pairs.where(~pairs.isin(rare), 'other')

        self.df.drop(
            columns=['funder_clean', 'installer_clean', 'funder', 'installer'],
            inplace=True,
        )

    def encode_frequency(self):
        """Add the subvillage/funder-installer key and its occurrence count."""
        key = (
            self.df['subvillage'].astype(str)
            + "_"
            + self.df['funder_installer_pair_grouped'].astype(str)
        )
        self.df['subvillage_funder_installer'] = key
        self.df['subvillage_funder_installer_freq'] = key.map(key.value_counts())

    def impute_construction_year(self):
        """Impute ``construction_year`` (0 means unknown) by cascading medians.

        Fallback order: median within ``subvillage_funder_installer``, then
        within ``region``, then the overall median.  Adds the boolean flag
        ``unknown_construction_year`` marking rows that were originally 0/NaN.
        """
        year = self.df['construction_year'].replace(0, np.nan)
        self.df['unknown_construction_year'] = year.isna()

        # Each level sees the values filled by the previous one, and results
        # are assigned back explicitly (no chained in-place fillna).
        for key in ('subvillage_funder_installer', 'region'):
            group_median = year.groupby(self.df[key]).transform('median')
            year = year.fillna(group_median)

        self.df['construction_year'] = year.fillna(year.median())

    def calculate_pump_age(self):
        """Add ``pump_age`` = recording year minus construction year, floored at 0.

        ``date_recorded`` is converted to datetime (unparseable values become
        NaT).  The age is NaN when either input is missing.
        """
        recorded = pd.to_datetime(self.df['date_recorded'], errors='coerce')
        self.df['date_recorded'] = recorded
        age = recorded.dt.year - self.df['construction_year']
        self.df['pump_age'] = age.clip(lower=0)

    def compute_water_scores(self):
        """Add ordinal quantity/quality scores and two derived features.

        Unmapped or missing ``quantity`` values score 1; unmapped or missing
        ``water_quality`` values score 0.
        """
        quantity_map = {'dry': 1, 'insufficient': 2, 'seasonal': 3, 'enough': 4, 'unknown': 1}
        quality_map = {
            'fluoride abandoned': 0, 'salty abandoned': 0,
            'fluoride': 1, 'salty': 1,
            'coloured': 2, 'milky': 2,
            'soft': 3, 'unknown': 0
        }
        self.df['water_quantity_score'] = self.df['quantity'].map(quantity_map).fillna(1).astype(int)
        self.df['water_quality_score'] = self.df['water_quality'].map(quality_map).fillna(0).astype(int)
        # NOTE(review): in run_all() this runs before population is imputed,
        # so the index is NaN wherever population is 0 - confirm that is intended.
        self.df['water_availability_index'] = (
            self.df['water_quantity_score'] / self.df['population'].replace(0, np.nan)
        )
        self.df['water_safety_sum'] = self.df['water_quality_score'] + self.df['water_quantity_score']

    def impute_population_by_median(self):
        """Impute ``population`` (0 means unknown) from progressively coarser groups.

        Adds the helper columns ``gps_height_bin`` (quintiles of
        ``gps_height``) and ``lat_lon_bin`` (deciles of latitude * longitude),
        which remain on the frame.  Values still missing after all groupings
        receive the overall median.
        """
        # np.nan keeps the column numeric so group-wise medians can be computed.
        population = self.df['population'].replace(0, np.nan)
        self.df['gps_height_bin'] = pd.qcut(self.df['gps_height'], q=5, duplicates='drop')
        self.df['lat_lon_bin'] = pd.qcut(
            self.df['latitude'] * self.df['longitude'], q=10, duplicates='drop'
        )
        self.df['population'] = population

        groupings = [
            ['region', 'water_quantity_score', 'waterpoint_type'],
            ['region', 'gps_height_bin'],
            ['region', 'lat_lon_bin'],
            ['region', 'waterpoint_type'],
            ['region']
        ]
        for group in groupings:
            # observed=True: only combinations present in the data form groups.
            med = self.df.groupby(group, observed=True)['population'].transform('median')
            self.df['population'] = self.df['population'].fillna(med)

        self.df['population'] = self.df['population'].fillna(self.df['population'].median())

    def impute_gps_height_by_ward(self):
        """Treat ``gps_height`` <= 0 as missing; fill by ward median, then overall."""
        height = self.df['gps_height']
        # mask() upcasts to float instead of writing NaN into an integer column.
        height = height.mask(height <= 0)
        ward_median = height.groupby(self.df['ward']).transform('median')
        height = height.fillna(ward_median)
        self.df['gps_height'] = height.fillna(height.median())

    def generate_interaction_features(self):
        """Add location buckets and pairwise interaction features.

        ``extraction_type_class`` is replaced by its integer factor codes.
        """
        self.df['lat_bin'] = (self.df['latitude'] * 10).round(0)
        self.df['lon_bin'] = (self.df['longitude'] * 10).round(0)
        self.df['location_bucket'] = self.df['lat_bin'].astype(str) + "_" + self.df['lon_bin'].astype(str)
        self.df['lat_long_interaction'] = self.df['latitude'] + self.df['longitude']
        self.df['pumpage_safety_inter'] = self.df['pump_age'] * self.df['water_safety_sum']
        self.df['extraction_type_class'], _ = pd.factorize(self.df['extraction_type_class'])
        self.df['quantity_extraction_inter'] = self.df['water_quantity_score'] * self.df['extraction_type_class']

    def run_all(self):
        """Run every preprocessing step in order and return the processed frame."""
        self.impute_coords_from_shapefile()
        self.impute_subvillage_by_location()
        self.combine_funder_installer_grouped()
        self.encode_frequency()
        self.impute_construction_year()
        self.calculate_pump_age()
        self.compute_water_scores()
        self.impute_population_by_median()
        self.impute_gps_height_by_ward()
        self.generate_interaction_features()
        return self.df


In [None]:
# Load the merged training set and push it through the full preprocessing
# pipeline; paths are relative to the notebook's working directory.
raw_df = pd.read_csv("../data/processed/Merged_Training_Set.csv")

gadm_shapefile = "../data/external/gadm41_TZA_shp/gadm41_TZA_3.shp"
preprocessor = WaterPumpPreprocessor(raw_df, gadm_path=gadm_shapefile)

final_df = preprocessor.run_all()
