# River Water Quality Data Cleaning
## DKI Jakarta - Data Integration Pipeline (2015-2024)

This notebook processes and integrates river water quality data from multiple sources:
- **PDF Tables**: 2015-2019 quarterly measurements
- **PDF Table**: 2020 single measurement (October)
- **CSV Files**: 2022-2024 monthly measurements

**Output**: Unified, standardized dataset with daily interpolation

## 1. Setup & Imports

In [1]:
import pandas as pd
import geopandas as gpd
import camelot
import re
import requests
from pathlib import Path
from functools import reduce
import json
from shapely.geometry import Polygon, MultiPolygon

pd.set_option('display.max_columns', None)

## 2. Configuration

### 2.1 Station Mapping

In [2]:
stasiun_mapping = {
    'DKI1': 'Jakarta Pusat',
    'DKI2': 'Jakarta Utara',
    'DKI3': 'Jakarta Selatan',
    'DKI4': 'Jakarta Timur',
    'DKI5': 'Jakarta Barat'
}

### 2.2 Data Source Configuration

In [3]:
# PDF: 2015-2019 data separation (year: number of periods)
data_separation = {
    2015: 3,  # 3 periods (quarterly)
    2016: 2,
    2017: 3,
    2018: 4,
    2019: 4
}

# CSV files
csv_files = [
    ('data/kualitas-air-sungai/sungai2022.csv', ';'),
    ('data/kualitas-air-sungai/sungai2023.csv', ';'),
    ('data/kualitas-air-sungai/data-kualitas-air-sungai-komponen-data.csv', ',')
]

### 2.3 Parameter Standardization Dictionary

In [4]:
river_params_dict = {
    "total_dissolved_solids": {"params1": "TDS", "params2": "TDS", "params3": "ZAT PADAT TERLARUT TDS"},
    "total_suspended_solids": {"params1": "TSS", "params2": "TSS", "params3": "ZAT PADAT TERSUSPENSI TSS"},
    "ph": {"params1": "pH", "params2": "pH", "params3": "PH"},
    "biological_oxygen_demand": {"params1": "BOD", "params2": "BOD", "params3": "BOD"},
    "chemical_oxygen_demand": {"params1": "COD", "params2": "COD", "params3": "COD DICHROMAT"},
    "cadmium": {"params1": "Cd", "params2": "Cd", "params3": "KADMIUM CD"},
    "chromium_vi": {"params1": "Cr6+", "params2": "Cr6", "params3": "CROM HEXAVALEN CR6"},
    "copper": {"params1": "Cu", "params2": "Cu", "params3": "TEMBAGA CU"},
    "lead": {"params1": "Pb", "params2": "Pb", "params3": "TIMAH HITAM PB"},
    "mercury": {"params1": "Hg", "params2": "Hg", "params3": "HG"},
    "zinc": {"params1": "Zn", "params2": "Zn", "params3": "SENG ZN"},
    "oil_and_grease": {"params1": "Minyak dan Lemak", "params2": "Minyak dan Lemak", "params3": "MINYAK DAN LEMAK"},
    "mbas_detergent": {"params1": "MBAS", "params2": "MBAS", "params3": "MBAS"},
    "total_coliform": {"params1": "Bakteri Koli", "params2": "Total Coliform", "params3": "TOTAL COLIFORM"},
    "fecal_coliform": {"params1": "Bakteri Koli Tinja", "params2": "Fecal Coliform", "params3": "FECAL COLIFORM"}
}

## 3. Geographic Data Loading

### 3.1 Helper Functions for Boundary Processing

In [5]:
def parse_boundary_sql(sql_content):
    """Parse SQL file to extract boundary data."""
    lines = sql_content.split('\n')
    data = []
    
    for line in lines:
        line = line.strip()
        if line.startswith("('"):
            try:
                first_quote = line.find("'")
                second_quote = line.find("'", first_quote + 1)
                kode = line[first_quote+1:second_quote]
                
                third_quote = line.find("'", second_quote + 1)
                fourth_quote = line.find("'", third_quote + 1)
                nama = line[third_quote+1:fourth_quote]
                
                after_nama = line[fourth_quote+2:]
                parts = after_nama.split(',', 2)
                
                if len(parts) >= 3:
                    lat = parts[0].strip()
                    lng = parts[1].strip()
                    rest = parts[2]
                    path_start = rest.find("'")
                    path_end = rest.rfind("'")
                    if path_start != -1 and path_end != -1:
                        geom = rest[path_start+1:path_end]
                        data.append([kode, nama, lat, lng, geom])
            except:
                pass
    
    return pd.DataFrame(data, columns=['kode', 'nama', 'lat', 'lng', 'geom'])

In [6]:
def swap_coords(coord_list):
    """Recursively swap [lat,lng] to [lng,lat] for GeoJSON compatibility."""
    if isinstance(coord_list[0], (int, float)):
        return [coord_list[1], coord_list[0]]
    else:
        return [swap_coords(c) for c in coord_list]

In [7]:
def parse_geometry(geom_str):
    """Parse geometry string to Shapely Polygon/MultiPolygon."""
    try:
        coords = json.loads(geom_str)
        coords = swap_coords(coords)
        
        if isinstance(coords[0][0], (int, float)):
            return Polygon(coords)
        elif isinstance(coords[0][0][0], (int, float)):
            exterior = coords[0]
            holes = coords[1:] if len(coords) > 1 else []
            return Polygon(exterior, holes)
        else:
            polygons = []
            for poly_coords in coords:
                exterior = poly_coords[0]
                holes = poly_coords[1:] if len(poly_coords) > 1 else []
                polygons.append(Polygon(exterior, holes))
            return MultiPolygon(polygons)
    except:
        return None

In [8]:
def assign_to_nearest_region(gdf_points, gdf_boundaries, nama_col='kota_kabupaten'):
    """Assign unassigned points to nearest boundary."""
    unassigned_mask = gdf_points[nama_col].isna()
    unassigned_indices = gdf_points[unassigned_mask].index
    
    if len(unassigned_indices) == 0:
        return gdf_points
    
    current_counts = gdf_points[nama_col].value_counts().to_dict()
    
    for idx in unassigned_indices:
        point = gdf_points.loc[idx, 'geometry']
        distances = []
        
        for _, boundary_row in gdf_boundaries.iterrows():
            dist = point.distance(boundary_row['geometry'])
            nama = boundary_row['nama']
            count = current_counts.get(nama, 0)
            distances.append((dist, count, nama))
        
        distances.sort(key=lambda x: (x[0], x[1]))
        _, _, nearest_nama = distances[0]
        
        gdf_points.loc[idx, nama_col] = nearest_nama
        current_counts[nearest_nama] = current_counts.get(nearest_nama, 0) + 1
    
    return gdf_points

In [9]:
def add_stasiun_column(df, stasiun_mapping):
    """Add 'stasiun' column based on kota_kabupaten mapping."""
    reverse_mapping = {v: k for k, v in stasiun_mapping.items()}
    df['stasiun'] = df['kota_kabupaten'].map(reverse_mapping)
    return df

### 3.2 Load DKI Jakarta Administrative Boundaries

In [10]:
url = "https://raw.githubusercontent.com/cahyadsn/wilayah_boundaries/refs/heads/main/db/kab/wilayah_boundaries_kab_31.sql"
response = requests.get(url)
response.raise_for_status()
sql_content = response.text

# Clean SQL
clean_sql = re.sub(r'^/\*.*?\*/\s*', '', sql_content, flags=re.DOTALL | re.MULTILINE).strip()
clean_sql = re.sub(r'--.*?$', '', clean_sql, flags=re.MULTILINE)
clean_sql = re.sub(r'ENGINE=\w+\s*|DEFAULT CHARSET=\w+\s*', '', clean_sql, flags=re.IGNORECASE)
clean_sql = clean_sql.replace('`', '')

In [11]:
# Parse and filter boundaries (exclude Kepulauan Seribu)
df_batas_wilayah = parse_boundary_sql(clean_sql)
df_batas_wilayah = df_batas_wilayah[df_batas_wilayah['kode'] != '31.01'].reset_index(drop=True)

print(f"✓ Loaded {len(df_batas_wilayah)} mainland DKI Jakarta boundaries")

✓ Loaded 5 mainland DKI Jakarta boundaries


In [12]:
# Convert to GeoDataFrame
df_batas_wilayah['geometry'] = df_batas_wilayah['geom'].apply(parse_geometry)
gdf_boundaries = gpd.GeoDataFrame(
    df_batas_wilayah[df_batas_wilayah['geometry'].notna()], 
    geometry='geometry', 
    crs='EPSG:4326'
)

print(f"✓ Created GeoDataFrame with {len(gdf_boundaries)} boundaries")

✓ Created GeoDataFrame with 5 boundaries


## 4. Sampling Locations Processing

### 4.1 Extract Sampling Locations from PDF

In [13]:
sampling_locs = camelot.read_pdf("airsungai_20250702121933.pdf", pages="60-62", flavor='lattice')
print(f"✓ Extracted {len(sampling_locs)} location tables from PDF")

✓ Extracted 3 location tables from PDF


In [14]:
def correct_num_format(df, cols):
    """Convert European number format to standard format."""
    for col in cols:
        if col in df.columns:
            df[col] = (
                df[col].astype(str)
                .str.replace('.', '', regex=False)
                .str.replace(',', '.', regex=False)
                .replace(['', 'nan', 'None'], pd.NA)
            )
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df

In [15]:
sampling_locs_header = ["No","Kode","Sungai","Sub Jaringan","Alamat","DAS","Lintang (DD)","Bujur (DD)","Lintang (DMS)","Bujur (DMS)"]

slocs = []
for sloc in sampling_locs:
    df_sloc = sloc.df.iloc[2:]
    df_sloc.columns = sampling_locs_header
    df_sloc = df_sloc[['Kode', 'Lintang (DD)', 'Bujur (DD)']].rename(columns={'Lintang (DD)': 'lat', 'Bujur (DD)': 'lng'})
    df_sloc = correct_num_format(df_sloc, ['lat', 'lng'])
    slocs.append(df_sloc)

slocs = pd.concat(slocs, ignore_index=True)
print(f"✓ Processed {len(slocs)} sampling locations")

✓ Processed 111 sampling locations


### 4.2 Assign Sampling Locations to Stations

In [16]:
# Convert to GeoDataFrame and perform spatial join
gdf_slocs = gpd.GeoDataFrame(slocs, geometry=gpd.points_from_xy(slocs.lng, slocs.lat), crs='EPSG:4326')

gdf_slocs_with_region = gpd.sjoin(gdf_slocs, gdf_boundaries[['nama', 'geometry']], how='left', predicate='within')

if 'index_right' in gdf_slocs_with_region.columns:
    gdf_slocs_with_region = gdf_slocs_with_region.drop(columns=['index_right'])

gdf_slocs_with_region = gdf_slocs_with_region.rename(columns={'nama': 'kota_kabupaten'})

In [17]:
# Assign to nearest boundary and add stasiun codes
gdf_slocs_with_region = assign_to_nearest_region(gdf_slocs_with_region, gdf_boundaries, nama_col='kota_kabupaten')

slocs_with_stasiun = pd.DataFrame(gdf_slocs_with_region.drop(columns=['lat', 'lng', 'geometry']))
slocs_with_stasiun['kota_kabupaten'] = slocs_with_stasiun['kota_kabupaten'].str.replace('Kota Administrasi ', '')
slocs_with_stasiun = add_stasiun_column(slocs_with_stasiun, stasiun_mapping).drop(columns=['kota_kabupaten'])

print(f"\n✓ All {len(slocs_with_stasiun)} sampling locations assigned to stations")
print(slocs_with_stasiun['stasiun'].value_counts().sort_index())


✓ All 112 sampling locations assigned to stations
stasiun
DKI1    12
DKI2    14
DKI3    27
DKI4    36
DKI5    23
Name: count, dtype: int64


## 5. PDF Table Loading

### 5.1 Load PDF Tables (2015-2019)

In [18]:
tables = camelot.read_pdf("airsungai_20250702121933.pdf", pages="66-113", flavor='lattice')
print(f"✓ Loaded {len(tables)} tables from PDF (2015-2019)")

✓ Loaded 48 tables from PDF (2015-2019)


### 5.2 Load PDF Table (2020)

In [19]:
table2020 = camelot.read_pdf("airsungai_20250702121933.pdf", pages="115", flavor='lattice')
print(f"✓ Loaded 2020 table (split across {len(table2020)} pages)")

✓ Loaded 2020 table (split across 2 pages)


## 6. Helper Functions

### 6.1 Number Format Correction

In [20]:
def handle_range_values(value):
    """Handle range values like '<0.002', '>10', '≤5', etc."""
    if pd.isna(value) or value == '':
        return value
    value_str = str(value)
    value_str = re.sub(r'^[<>≤≥\s]+|[<>=≤≥\s]+$', '', value_str)
    return value_str

In [21]:
def correct_european_number_format(df, cols):
    """Convert European number format and handle range values."""
    for col in cols:
        if col in df.columns:
            df[col] = df[col].apply(handle_range_values)
            df[col] = (
                df[col].astype(str)
                .str.replace('.', '', regex=False)
                .str.replace(',', '.', regex=False)
                .replace(['', 'nan', 'None'], pd.NA)
            )
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df

### 6.2 Data Structure Helpers

In [22]:
def spatial_join_with_boundaries(df, lat_col, lng_col, gdf_boundaries, stasiun_mapping):
    """Perform spatial join to assign stations to coordinates."""
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lng_col], df[lat_col]), crs='EPSG:4326')
    
    gdf_with_region = gpd.sjoin(gdf, gdf_boundaries[['nama', 'geometry']], how='left', predicate='within')
    
    if 'index_right' in gdf_with_region.columns:
        gdf_with_region = gdf_with_region.drop(columns=['index_right'])
    
    gdf_with_region = gdf_with_region.rename(columns={'nama': 'kota_kabupaten'})
    gdf_with_region = assign_to_nearest_region(gdf_with_region, gdf_boundaries, nama_col='kota_kabupaten')
    
    df_with_region = pd.DataFrame(gdf_with_region.drop(columns=['geometry']))
    df_with_region['kota_kabupaten'] = df_with_region['kota_kabupaten'].str.replace('Kota Administrasi ', '')
    df_with_region = add_stasiun_column(df_with_region, stasiun_mapping)
    
    return df_with_region

In [23]:
def pivot_and_merge_parameters(df, index_columns, parameter_column, value_column, param_types):
    """Pivot and merge different parameter types."""
    dfs = []
    for param_type in param_types:
        df_filtered = df[df['jenis_parameter'] == param_type]
        df_pivoted = df_filtered.pivot_table(
            index=index_columns,
            columns=parameter_column,
            values=value_column
        )
        df_pivoted = df_pivoted.reset_index()
        dfs.append(df_pivoted)
    
    df_merged = reduce(lambda left, right: pd.merge(left, right, on=index_columns, how='inner'), dfs)
    df_merged.columns.name = None
    
    return df_merged

In [24]:
def create_date_skeleton(years, stations):
    """Create skeleton dataframe with all date-station combinations."""
    skeleton_data = []
    
    for year in years:
        date_range = pd.date_range(start=f'{year}-01-01', end=f'{year}-12-31', freq='D')
        for date in date_range:
            for stasiun_code in stations:
                skeleton_data.append({'tanggal': date, 'stasiun': stasiun_code})
    
    skeleton_df = pd.DataFrame(skeleton_data)
    return skeleton_df.sort_values(['tanggal', 'stasiun']).reset_index(drop=True)

In [25]:
def expand_temporal_data(df, skeleton, value_columns):
    """Expand sparse data to daily using forward/backward fill."""
    df_expanded = skeleton.merge(df[['tanggal', 'stasiun'] + value_columns], on=['tanggal', 'stasiun'], how='left')
    df_expanded[value_columns] = df_expanded.groupby('stasiun')[value_columns].transform(lambda x: x.bfill().ffill())
    return df_expanded

## 7. Data Processing Functions

### 7.1 PDF Data Processing (2015-2019)

In [26]:
def process_pdf_river_data(tables, data_separation, slocs_with_stasiun, river_params_dict):
    """Process river quality data from PDF tables (2015-2019)."""
    numeric_cols = ["TDS", "TSS", "pH", "BOD", "COD", "Cd", "Cr6+", "Cu", "Pb", "Hg", "Zn", 
                    "Minyak dan Lemak", "MBAS", "Bakteri Koli", "Bakteri Koli Tinja"]
    headers = ["no", "Kode", "Sungai", "DAS", *numeric_cols]
    param_rename_map = {v['params1']: k for k, v in river_params_dict.items()}
    
    processed_tables = 0
    tabs = []
    
    for year, num_periods in data_separation.items():
        for period in range(1, num_periods + 1):
            table_indices = [processed_tables, processed_tables + 1, processed_tables + 2]
            period_dfs = []
            
            for idx in table_indices:
                table = tables[idx].df.iloc[5:].copy()
                table.columns = headers
                table = correct_european_number_format(table.drop(columns=["no"]), numeric_cols)
                table = table.merge(slocs_with_stasiun[['Kode', 'stasiun']], on='Kode', how='left')
                table = table.rename(columns=param_rename_map)
                period_dfs.append(table)
            
            period_combined = pd.concat(period_dfs, ignore_index=True)
            period_agg = period_combined.groupby('stasiun').agg({col: 'mean' for col in param_rename_map.values()}).reset_index()
            period_agg['tahun'] = year
            period_agg['periode'] = period
            tabs.append(period_agg)
            processed_tables += 3
    
    river_quality_all = pd.concat(tabs, ignore_index=True)
    
    # Add date column
    month_map = {1: 3, 2: 6, 3: 9, 4: 12}
    river_quality_all['bulan'] = river_quality_all['periode'].map(month_map)
    river_quality_all['tanggal'] = pd.to_datetime(
        river_quality_all['tahun'].astype(str) + '-' + river_quality_all['bulan'].astype(str) + '-15'
    )
    
    param_cols = list(param_rename_map.values())
    return river_quality_all[['tanggal', 'stasiun'] + param_cols]

### 7.2 2020 Data Processing

In [27]:
def process_2020_river_data(table2020, slocs_with_stasiun, river_params_dict):
    """Process river quality data from 2020 PDF table (single October measurement)."""
    table2020_params = [
        ["Suhu", "TDS", "TSS", "pH", "BOD", "COD", "Total-P", "NO3", "Cd", "Cr6+", "Cu", "Pb"], 
        ["Hg", "Zn", "Flourida", "NO2", "Klorin Bebas", "H2S", "Minyak dan Lemak", "MBAS", "Fenol", "Bakteri Koli Tinja", "Bakteri Koli"]
    ]
    table2020_index = ["No", "Kode", "Sungai", "DAS"]
    
    # Parse both tables
    sungai2020 = []
    for t in range(len(table2020)):
        df = table2020[t].df.iloc[5:].copy()
        df.columns = table2020_index + table2020_params[t]
        sungai2020.append(df)
    
    # Merge and process
    sungai2020_df = reduce(lambda left, right: pd.merge(left, right, on=table2020_index, how='inner'), sungai2020)
    all_params = [col for sublist in table2020_params for col in sublist]
    sungai2020_df = correct_european_number_format(sungai2020_df, all_params)
    sungai2020_df = sungai2020_df.merge(slocs_with_stasiun[['Kode', 'stasiun']], on='Kode', how='left')
    
    # Standardize parameter names
    param_rename_map = {v['params1']: k for k, v in river_params_dict.items()}
    sungai2020_df = sungai2020_df.rename(columns=param_rename_map)
    
    # Aggregate by station
    agg_cols = [col for col in sungai2020_df.columns if col in river_params_dict.keys()]
    sungai2020_agg = sungai2020_df.groupby('stasiun').agg({col: 'mean' for col in agg_cols}).reset_index()
    sungai2020_agg['tanggal'] = pd.to_datetime('2020-10-15')
    
    return sungai2020_agg[['tanggal', 'stasiun'] + agg_cols]

### 7.3 CSV Data Processing (2022-2024)

In [28]:
def process_csv_river_data(csv_file, gdf_boundaries, stasiun_mapping, river_params_dict, delimiter=';'):
    """Process river quality data from CSV file."""
    df = pd.read_csv(csv_file, sep=delimiter)
    
    # Standardize column names
    rename_map = {}
    if 'latitude' in df.columns:
        rename_map['latitude'] = 'lintang'
    elif 'lintang_selatan' in df.columns:
        rename_map['lintang_selatan'] = 'lintang'
    
    if 'longitude' in df.columns:
        rename_map['longitude'] = 'bujur'
    elif 'bujur_timur' in df.columns:
        rename_map['bujur_timur'] = 'bujur'
    
    if 'periode_data' in df.columns and 'tahun' not in df.columns:
        rename_map['periode_data'] = 'tahun'
    
    if rename_map:
        df = df.rename(columns=rename_map)
    
    if 'periode_data' in df.columns and 'tahun' in df.columns:
        df = df.drop(columns=['periode_data'])
    
    # Correct number formats
    df = correct_european_number_format(df, ['lintang', 'bujur', 'hasil_pengukuran'])
    
    required_cols = ['tahun', 'bulan_sampling', 'lintang', 'bujur', 'jenis_parameter', 'parameter', 'hasil_pengukuran']
    df = df[required_cols]
    
    # Aggregate duplicates
    df = df.groupby(['tahun', 'bulan_sampling', 'lintang', 'bujur', 'jenis_parameter', 'parameter'], as_index=False)['hasil_pengukuran'].mean()
    
    # Pivot parameter types
    param_types = ['KIMIA', 'FISIKA', 'BIOLOGI']
    index_cols = ['tahun', 'bulan_sampling', 'lintang', 'bujur']
    df_pivoted = pivot_and_merge_parameters(df, index_cols, 'parameter', 'hasil_pengukuran', param_types)
    
    # Standardize parameter names
    param_rename_map = {v['params3']: k for k, v in river_params_dict.items()}
    df_pivoted = df_pivoted.rename(columns=param_rename_map)
    available_params = [col for col in df_pivoted.columns if col in river_params_dict.keys()]
    df_pivoted = df_pivoted[index_cols + available_params]
    
    # Create date column
    if df_pivoted['bulan_sampling'].astype(str).str.len().max() >= 6:
        df_pivoted['tanggal'] = (
            df_pivoted['bulan_sampling'].astype(str).str[:4] + '-' + 
            df_pivoted['bulan_sampling'].astype(str).str[4:] + '-15'
        )
    else:
        df_pivoted['tanggal'] = (
            df_pivoted['tahun'].astype(str) + '-' + 
            df_pivoted['bulan_sampling'].astype(str).str.zfill(2) + '-15'
        )
    
    df_pivoted['tanggal'] = pd.to_datetime(df_pivoted['tanggal'], format='%Y-%m-%d')
    
    # Spatial join to assign stations
    df_with_stasiun = spatial_join_with_boundaries(df_pivoted, 'lintang', 'bujur', gdf_boundaries, stasiun_mapping)
    
    # Drop unnecessary columns
    cols_to_drop = ['tahun', 'bulan_sampling', 'lintang', 'bujur', 'kota_kabupaten']
    df_with_stasiun = df_with_stasiun.drop(columns=[col for col in cols_to_drop if col in df_with_stasiun.columns])
    
    param_cols = [col for col in df_with_stasiun.columns if col not in ['tanggal', 'stasiun']]
    return df_with_stasiun[['tanggal', 'stasiun'] + param_cols]

## 8. Unified Data Integration Pipeline

In [29]:
print("="*70)
print("RIVER WATER QUALITY DATA INTEGRATION PIPELINE")
print("="*70)

RIVER WATER QUALITY DATA INTEGRATION PIPELINE


### Step 1: Process PDF Data (2015-2019)

In [30]:
print("\n[1/6] Processing PDF data (2015-2019)...")
river_pdf = process_pdf_river_data(tables, data_separation, slocs_with_stasiun, river_params_dict)
print(f"  → Shape: {river_pdf.shape}")
print(f"  → Date range: {river_pdf['tanggal'].min()} to {river_pdf['tanggal'].max()}")


[1/6] Processing PDF data (2015-2019)...
  → Shape: (80, 17)
  → Date range: 2015-03-15 00:00:00 to 2019-12-15 00:00:00


### Step 2: Process 2020 Data

In [31]:
print("\n[2/6] Processing 2020 PDF data...")
river_2020 = process_2020_river_data(table2020, slocs_with_stasiun, river_params_dict)
print(f"  → Shape: {river_2020.shape}")
print(f"  → Date: {river_2020['tanggal'].unique()[0]}")


[2/6] Processing 2020 PDF data...
  → Shape: (3, 17)
  → Date: 2020-10-15 00:00:00


### Step 3: Process CSV Files (2022-2024)

In [32]:
print("\n[3/6] Processing CSV files...")
data_dir = Path('data/kualitas-air-sungai')

csv_files_full = [
    (data_dir / 'sungai2022.csv', ';'),
    (data_dir / 'sungai2023.csv', ';'),
    (data_dir / 'data-kualitas-air-sungai-komponen-data.csv', ',')
]

river_csv_list = []
for csv_file, delimiter in csv_files_full:
    df_csv = process_csv_river_data(str(csv_file), gdf_boundaries, stasiun_mapping, river_params_dict, delimiter)
    river_csv_list.append(df_csv)
    print(f"  → {csv_file.name}: {df_csv.shape}")

river_csv = pd.concat(river_csv_list, ignore_index=True)
print(f"  → Combined: {river_csv.shape}")
print(f"  → Date range: {river_csv['tanggal'].min()} to {river_csv['tanggal'].max()}")


[3/6] Processing CSV files...
  → sungai2022.csv: (478, 17)
  → sungai2023.csv: (459, 9)
  → data-kualitas-air-sungai-komponen-data.csv: (0, 2)
  → Combined: (937, 17)
  → Date range: 2022-03-15 00:00:00 to 2023-09-15 00:00:00


### Step 4: Align Data Structures

In [33]:
print("\n[4/6] Aligning data structures...")

all_params = set(river_params_dict.keys())
pdf_params = set(river_pdf.columns) - {'tanggal', 'stasiun'}
params_2020 = set(river_2020.columns) - {'tanggal', 'stasiun'}
csv_params = set(river_csv.columns) - {'tanggal', 'stasiun'}

print(f"  → PDF (2015-2019): {len(pdf_params)} parameters")
print(f"  → 2020: {len(params_2020)} parameters")
print(f"  → CSV (2022-2024): {len(csv_params)} parameters")
print(f"  → Common across all: {len(pdf_params & params_2020 & csv_params)} parameters")


[4/6] Aligning data structures...
  → PDF (2015-2019): 15 parameters
  → 2020: 15 parameters
  → CSV (2022-2024): 15 parameters
  → Common across all: 15 parameters


In [34]:
# Add missing columns
for param in all_params:
    if param not in river_pdf.columns:
        river_pdf[param] = pd.NA
    if param not in river_2020.columns:
        river_2020[param] = pd.NA
    if param not in river_csv.columns:
        river_csv[param] = pd.NA

# Ensure same column order
column_order = ['tanggal', 'stasiun'] + sorted(all_params)
river_pdf = river_pdf[column_order]
river_2020 = river_2020[column_order]
river_csv = river_csv[column_order]

print("  ✓ Column structures aligned")

  ✓ Column structures aligned


### Step 5: Combine All Data Sources

In [35]:
print("\n[5/6] Combining all data sources...")

river_all = pd.concat([river_pdf, river_2020, river_csv], ignore_index=True)
river_all = river_all.sort_values(['tanggal', 'stasiun']).reset_index(drop=True)

print(f"  → Combined shape: {river_all.shape}")
print(f"  → Date range: {river_all['tanggal'].min()} to {river_all['tanggal'].max()}")
print(f"  → Unique dates: {river_all['tanggal'].nunique()}")
print(f"  → Records by station:")
for stasiun in sorted(river_all['stasiun'].unique()):
    count = (river_all['stasiun'] == stasiun).sum()
    print(f"     {stasiun}: {count}")


[5/6] Combining all data sources...
  → Combined shape: (1020, 17)
  → Date range: 2015-03-15 00:00:00 to 2023-09-15 00:00:00
  → Unique dates: 25
  → Records by station:
     DKI1: 74
     DKI2: 85
     DKI3: 208
     DKI4: 539
     DKI5: 114


### Step 6: Create Expanded Daily Dataset

In [36]:
print("\n[6/6] Creating expanded daily dataset...")

years = river_all['tanggal'].dt.year.unique()
skeleton = create_date_skeleton(years, list(stasiun_mapping.keys()))
print(f"  → Skeleton: {skeleton.shape}")

value_cols = [col for col in river_all.columns if col not in ['tanggal', 'stasiun']]
river_expanded = expand_temporal_data(river_all, skeleton, value_cols)

print(f"  → Expanded: {river_expanded.shape}")
print(f"  → Daily records per station: {len(river_expanded) // len(stasiun_mapping)}")


[6/6] Creating expanded daily dataset...
  → Skeleton: (14610, 2)
  → Expanded: (15511, 17)
  → Daily records per station: 3102


### Pipeline Summary

In [37]:
print("\n" + "="*70)
print("PROCESSING COMPLETE")
print("="*70)
print(f"\nDatasets created:")
print(f"  1. river_all      : {river_all.shape} (sparse, actual measurements)")
print(f"  2. river_expanded : {river_expanded.shape} (daily, filled)")
print(f"\nDate coverage: {river_all['tanggal'].min().date()} to {river_all['tanggal'].max().date()}")
print(f"Stations: {sorted(river_all['stasiun'].unique())}")
print(f"Parameters: {len(value_cols)}")
print("="*70)


PROCESSING COMPLETE

Datasets created:
  1. river_all      : (1020, 17) (sparse, actual measurements)
  2. river_expanded : (15511, 17) (daily, filled)

Date coverage: 2015-03-15 to 2023-09-15
Stations: ['DKI1', 'DKI2', 'DKI3', 'DKI4', 'DKI5']
Parameters: 15


## 9. Data Quality Check

In [38]:
print("="*70)
print("DATA QUALITY CHECK")
print("="*70)

DATA QUALITY CHECK


In [39]:
# Missing values in sparse data
print("\n[1] Missing values in river_all (sparse data):")
missing_counts = river_all[value_cols].isnull().sum()
missing_pct = (missing_counts / len(river_all) * 100).round(2)
missing_df = pd.DataFrame({
    'Parameter': missing_counts.index,
    'Missing Count': missing_counts.values,
    'Missing %': missing_pct.values
}).sort_values('Missing %', ascending=False)

print(missing_df[missing_df['Missing Count'] > 0].head(10).to_string(index=False))


[1] Missing values in river_all (sparse data):
             Parameter  Missing Count  Missing %
                  lead            526      51.57
total_dissolved_solids            465      45.59
total_suspended_solids            460      45.10
                copper            459      45.00
           chromium_vi            459      45.00
chemical_oxygen_demand            459      45.00
               cadmium            459      45.00
                  zinc            459      45.00
        mbas_detergent            119      11.67
                    ph              5       0.49


In [40]:
# Missing values in expanded data
print("\n[2] Missing values in river_expanded (daily data):")
missing_expanded = river_expanded[value_cols].isnull().sum()
if missing_expanded.sum() > 0:
    print(missing_expanded[missing_expanded > 0].head(10))
else:
    print("  ✓ No missing values (forward/backward fill successful)")


[2] Missing values in river_expanded (daily data):
  ✓ No missing values (forward/backward fill successful)


In [41]:
# Data completeness by year
print("\n[3] Data completeness by year:")
river_all['year'] = river_all['tanggal'].dt.year
completeness = river_all.groupby('year').agg({
    'tanggal': 'count',
    'stasiun': lambda x: x.nunique()
}).rename(columns={'tanggal': 'records', 'stasiun': 'stations'})
print(completeness)


[3] Data completeness by year:
      records  stations
year                   
2015       15         5
2016       10         5
2017       15         5
2018       20         5
2019       20         5
2020        3         3
2022      478         5
2023      459         5


In [42]:
# Duplicate records
print("\n[4] Duplicate records check:")
duplicates = river_all[river_all.duplicated(subset=['tanggal', 'stasiun'], keep=False)]
print(f"  → Duplicate date-station pairs: {len(duplicates)}")
if len(duplicates) > 0:
    print("\n  Sample duplicates:")
    print(duplicates[['tanggal', 'stasiun']].head(10).to_string(index=False))

print("\n" + "="*70)


[4] Duplicate records check:
  → Duplicate date-station pairs: 929

  Sample duplicates:
   tanggal stasiun
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1
2022-03-15    DKI1



## 10. Export Results

In [43]:
output_daily = 'river_quality_2015-2024.csv'
river_expanded.to_csv(output_daily, index=False)

print("\n" + "="*70)
print("EXPORT COMPLETE")
print("="*70)
print(f"\n✓ Exported daily data to: {output_daily}")
print(f"  Shape: {river_expanded.shape}")
print(f"  Date range: {river_expanded['tanggal'].min().date()} to {river_expanded['tanggal'].max().date()}")
print(f"  Stations: {sorted(river_expanded['stasiun'].unique())}")
print(f"  Parameters: {len(value_cols)}")
print("\n" + "="*70)


EXPORT COMPLETE

✓ Exported daily data to: river_quality_2015-2024.csv
  Shape: (15511, 17)
  Date range: 2015-01-01 to 2023-12-31
  Stations: ['DKI1', 'DKI2', 'DKI3', 'DKI4', 'DKI5']
  Parameters: 15

