<a href="https://colab.research.google.com/github/charoo-rumsan/community_tool_research/blob/main/data_validation_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [48]:
!pip install phonenumbers



In [49]:
!pip install fuzzywuzzy python-Levenshtein



In [50]:
import polars as pl
import re
import json
import phonenumbers
from phonenumbers import geocoder
from pathlib import Path
from fuzzywuzzy import fuzz
from Levenshtein import ratio
import unicodedata
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

In [65]:
# Step 1: Fuzzy matcher for column classification
def normalize_text(s):
    """Return a lowercase, whitespace-collapsed form of ``s`` for comparison.

    ``None`` maps to the empty string. Any other value is stringified,
    NFKD-normalized and lower-cased. Every character that is not an ASCII
    letter or digit, a code point in U+0080..U+FFFF, or whitespace is then
    replaced by a space; finally whitespace runs are collapsed to a single
    space and the ends are trimmed.
    """
    if s is None:
        return ""
    text = unicodedata.normalize("NFKD", str(s)).lower()
    # Punctuation becomes a space (rather than being deleted) so that the
    # words on either side of it stay separate tokens.
    text = re.sub(r"[^0-9A-Za-z\u0080-\uFFFF\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

def token_jaccard(a, b):
    """Jaccard overlap of two strings' whitespace-separated tokens, as a percentage.

    Returns a float in 0.0..100.0: the number of distinct tokens common to
    ``a`` and ``b`` divided by the number of distinct tokens in either,
    times 100. When neither string contains a token the result is 0.0.
    """
    tokens_a, tokens_b = set(a.split()), set(b.split())
    either = tokens_a | tokens_b
    if not either:
        # Nothing to compare: avoid dividing by zero.
        return 0.0
    shared = tokens_a & tokens_b
    return (len(shared) / len(either)) * 100.0

def fuzzy_matcher(header, standard_labels, threshold=80):
    """Find the standard label that best matches a column header.

    Both the header and every candidate label are passed through
    ``normalize_text`` and compared with four fuzzywuzzy scores plus the
    token Jaccard percentage. The scores are blended differently depending on
    the average length of the two normalized strings.

    Args:
        header: Raw column header.
        standard_labels: Iterable of candidate label strings.
        threshold: Minimum blended score for a label to be accepted.

    Returns:
        dict with keys ``header`` (the input), ``matched_label`` (the best
        label, or ``"other"`` when the best score is below ``threshold``),
        ``score`` and ``jaccard`` (both rounded to 2 decimals, both describing
        the best-scoring candidate) and ``method`` (name of the blend used).
    """
    header_norm = normalize_text(header)

    # Running winner; these are also the values reported when no candidate
    # scores above zero.
    top_score, top_label, top_method, top_jaccard = 0, "other", "", 0

    for candidate in standard_labels:
        candidate_norm = normalize_text(candidate)

        simple = fuzz.ratio(header_norm, candidate_norm)
        partial = fuzz.partial_ratio(header_norm, candidate_norm)
        token_sort = fuzz.token_sort_ratio(header_norm, candidate_norm)
        token_set = fuzz.token_set_ratio(header_norm, candidate_norm)
        overlap = token_jaccard(header_norm, candidate_norm)

        mean_length = (len(header_norm) + len(candidate_norm)) / 2.0
        if mean_length < 20:
            # Short strings: character-level similarity.
            strategy = "ratio+partial"
            blended = 0.6 * simple + 0.4 * partial
        else:
            # Longer strings: token-level similarity.
            strategy = "token_set/sort+partial"
            blended = 0.6 * token_set + 0.25 * token_sort + 0.15 * partial
            # A perfect token_set score combined with a token overlap below
            # 8 (on the 0-100 Jaccard scale) is capped at 90.
            if token_set == 100 and overlap < 8:
                blended = min(blended, 90)

        # Strict '>' keeps the earliest candidate on ties.
        if blended > top_score:
            top_score, top_label, top_method, top_jaccard = (
                blended, candidate, strategy, overlap
            )

    accepted = top_score >= threshold
    return {
        "header": header,
        "matched_label": top_label if accepted else "other",
        "score": round(top_score, 2),
        "method": top_method,
        "jaccard": round(top_jaccard, 2)
    }

In [66]:
# Step 2: Define scikit-learn transformers
class ColumnClassifier(BaseEstimator, TransformerMixin):
    """Classify column headers against a list of standard labels.

    The transformer does not modify the data. ``transform`` maps every column
    name of ``X`` to its best-matching label via ``fuzzy_matcher``, stores the
    mapping on ``self.column_classifications`` and optionally dumps it to a
    JSON file.

    Parameters:
        standard_labels: List of label strings to match against. ``None``
            (the default) selects the built-in list in ``DEFAULT_LABELS``.
        output_path: File the mapping is written to as JSON. Pass ``None`` to
            skip writing. Defaults to ``'column_classifications.json'`` in the
            current working directory.
    """

    # Built-in label set used when the caller does not supply one.
    DEFAULT_LABELS = (
        "phone number", "citizenship number", "address", "municipality", "ward", "house number",
        "tole", "name", "age", "gender", "family members", "demographics", "gps coordinates",
        "latitude", "longitude", "ethnicity", "mother tongue"
    )

    def __init__(self, standard_labels=None, output_path='column_classifications.json'):
        # A caller-supplied list is stored as-is (not copied); the default is
        # materialized as a fresh list so instances never share one object.
        self.standard_labels = (
            list(self.DEFAULT_LABELS) if standard_labels is None else standard_labels
        )
        self.output_path = output_path
        # Filled by transform(): {column name: matched label}.
        self.column_classifications = {}

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data. Returns ``self``."""
        return self

    def transform(self, X):
        """Classify the columns of ``X`` and return ``X`` unchanged.

        Side effects: overwrites ``self.column_classifications`` and, unless
        ``output_path`` is ``None``, writes the mapping to that file as
        UTF-8 JSON and prints a confirmation line.
        """
        self.column_classifications = {
            col: fuzzy_matcher(col, self.standard_labels)["matched_label"]
            for col in X.columns
        }
        if self.output_path is not None:
            with open(self.output_path, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps non-ASCII header text readable.
                json.dump(self.column_classifications, f, indent=2, ensure_ascii=False)
            print(f"Column classifications saved to '{self.output_path}'")
        return X

class PhoneValidator(BaseEstimator, TransformerMixin):
    """Validate a phone-number column with the ``phonenumbers`` library.

    ``transform`` appends three columns to the frame:

    * ``phone_valid``   - bool, whether the value parsed to a valid number.
    * ``phone_country`` - English country name for valid numbers, else null.
    * ``phone_type``    - ``'nepali'``, ``'international'`` or ``'invalid'``.

    Parameters:
        phone_col: Name of the column holding the raw phone numbers.
    """
    def __init__(self, phone_col='Phone number (फोन नं)'):
        self.phone_col = phone_col

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data. Returns ``self``."""
        return self

    def transform(self, X):
        """Validate phone numbers and add validation columns.

        Raises:
            ValueError: if ``phone_col`` is not a column of ``X``.
        """
        if self.phone_col not in X.columns:
            raise ValueError(f"Column '{self.phone_col}' not found in dataset")

        def check_number(phone_str, default_region="NP"):
            """Return ``{"valid": bool, "country": str | None}`` for one value."""
            invalid = {"valid": False, "country": None}
            if phone_str is None:
                return invalid

            # Keep only digits and '+'.
            cleaned_phone = re.sub(r'[^0-9+]', '', str(phone_str))
            if not cleaned_phone:
                return invalid

            if not cleaned_phone.startswith('+'):
                if cleaned_phone.startswith('977') and len(cleaned_phone) > 10:
                    # Country code already present, only the '+' is missing.
                    # Prepending another 977 here would corrupt the number.
                    # NOTE(review): assumes national numbers are at most
                    # 10 digits long - confirm.
                    cleaned_phone = f"+{cleaned_phone}"
                elif cleaned_phone.startswith('9'):
                    # Bare national number starting with 9: add +977.
                    cleaned_phone = f"+977{cleaned_phone}"

            try:
                num = phonenumbers.parse(cleaned_phone, default_region)
                if not phonenumbers.is_valid_number(num):
                    return invalid
                # country_name_for_number always yields the country, whereas
                # description_for_number may yield a city/area name and would
                # then fail the == 'Nepal' comparison below.
                country = geocoder.country_name_for_number(num, "en")
            except Exception:
                # Deliberate best-effort: any unparseable value is reported
                # as invalid instead of aborting the whole column.
                return invalid
            return {"valid": True, "country": country}

        # Cast first so a column inferred as numeric is handled the same way
        # as a text column; nulls then become empty strings (=> invalid).
        phone_results = X[self.phone_col].cast(pl.Utf8).fill_null("").map_elements(
            check_number,
            return_dtype=pl.Struct([pl.Field("valid", pl.Boolean), pl.Field("country", pl.Utf8)])
        )

        X = X.with_columns([
            phone_results.struct.field('valid').alias('phone_valid'),
            phone_results.struct.field('country').alias('phone_country'),
        ])

        X = X.with_columns(
            pl.when(pl.col('phone_valid'))
            .then(
                pl.when(pl.col('phone_country') == 'Nepal')
                .then(pl.lit('nepali'))
                .otherwise(pl.lit('international'))
            )
            .otherwise(pl.lit('invalid')).alias('phone_type')
        )

        return X


class CitizenshipValidator(BaseEstimator, TransformerMixin):
    """Validate a citizenship-number column against three accepted formats.

    ``transform`` appends two columns to the frame:

    * ``citizenship_valid``  - bool.
    * ``citizenship_reason`` - short text explaining the verdict.

    Parameters:
        citizenship_col: Name of the column holding the raw numbers.
    """
    def __init__(self, citizenship_col='Citizenship Number of House Owner (नागरिकता नं)'):
        self.citizenship_col = citizenship_col

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data. Returns ``self``."""
        return self

    def transform(self, X):
        """Validate citizenship numbers and add validation columns.

        Raises:
            ValueError: if ``citizenship_col`` is not a column of ``X``.
        """
        if self.citizenship_col not in X.columns:
            raise ValueError(f"Column '{self.citizenship_col}' not found in dataset")

        def is_valid_citizenship(num):
            """Return ``{"valid": bool, "reason": str}`` for one value."""
            if num is None or not isinstance(num, str) or num.strip() == '':
                return {"valid": False, "reason": "Empty or null"}

            # Match on the trimmed value so surrounding whitespace does not
            # turn an otherwise well-formed number into "Invalid format".
            num = num.strip()
            num_normalized = num.replace('/', '-')

            # Standard format: XX-XX-XX-XXXXX ('/' accepted as separator too)
            if re.match(r'^\d{1,2}-\d{1,2}-\d{2,3}-\d{4,7}$', num_normalized):
                parts = num_normalized.split('-')
                district = int(parts[0])
                year = int(parts[2])
                # NOTE(review): 77 and 82 are presumably the number of
                # districts and the latest two-digit issue year - confirm.
                if 1 <= district <= 77 and 0 <= year <= 82:
                    return {"valid": True, "reason": "Valid standard format"}
                return {"valid": False, "reason": f"Invalid district or year: {district}, {year}"}

            # Simple / format: XXXX/XXX
            if re.match(r'^\d{4,8}/\d{3,5}$', num):
                # The regex guarantees at least 4 leading digits, so the last
                # two are always available as the year.
                year = int(num.split('/')[0][-2:])
                if 0 <= year <= 82:
                    return {"valid": True, "reason": "Valid slash format"}
                return {"valid": False, "reason": f"Invalid year: {year}"}

            # Plain digits: 10-12 long
            if re.match(r'^\d{10,12}$', num):
                return {"valid": True, "reason": "Valid plain digits"}

            return {"valid": False, "reason": "Invalid format"}

        # Cast first: a column made only of digits may be inferred as numeric,
        # and non-string values would all be rejected by the isinstance check.
        citizenship_results = X[self.citizenship_col].cast(pl.Utf8).fill_null("").map_elements(
            is_valid_citizenship,
            return_dtype=pl.Struct([pl.Field("valid", pl.Boolean), pl.Field("reason", pl.Utf8)])
        )

        X = X.with_columns([
            citizenship_results.struct.field('valid').alias('citizenship_valid'),
            citizenship_results.struct.field('reason').alias('citizenship_reason')
        ])
        return X

In [67]:
# Step 3: Define scikit-learn pipeline
pipeline = Pipeline(steps=[
    # 1) Label the column headers (the data itself passes through unchanged).
    ('column_classifier', ColumnClassifier()),
    # 2) Add phone_valid / phone_country / phone_type.
    (
        'phone_validator',
        PhoneValidator(phone_col='Phone number (फोन नं)'),
    ),
    # 3) Add citizenship_valid / citizenship_reason.
    (
        'citizenship_validator',
        CitizenshipValidator(citizenship_col='Citizenship Number of House Owner (नागरिकता नं)'),
    ),
])


In [68]:
# Step 4: Process dataset
def _validation_chart_config(valid_phones, invalid_phones, valid_citizenships,
                             invalid_citizenships, valid_rows, invalid_rows):
    """Build the bar-chart configuration dict for the six validation counts."""
    palette = ["#36A2EB", "#FF6384", "#90EE90", "#FFCE56", "#4BC0C0", "#9966FF"]
    return {
        "type": "bar",
        "data": {
            "labels": ["Valid Phones", "Invalid Phones", "Valid Citizenships", "Invalid Citizenships", "Valid Rows", "Invalid Rows"],
            "datasets": [{
                "label": "Validation Counts",
                "data": [
                    valid_phones,
                    invalid_phones,
                    valid_citizenships,
                    invalid_citizenships,
                    valid_rows,
                    invalid_rows
                ],
                "backgroundColor": list(palette),
                "borderColor": list(palette),
                "borderWidth": 1
            }]
        },
        "options": {
            "scales": {
                "y": {"beginAtZero": True, "title": {"display": True, "text": "Count"}},
                "x": {"title": {"display": True, "text": "Validation Status"}}
            },
            "plugins": {"title": {"display": True, "text": "Phone and Citizenship Validation Results"}}
        }
    }


def process_dataset(input_path, output_dir="."):
    """Run the validation pipeline on a CSV and write the result files.

    Reads ``input_path`` with polars, applies the module-level ``pipeline``,
    splits the rows into "both phone and citizenship valid" and "everything
    else", and writes:

    * ``valid_rahat_data.csv`` / ``invalid_rahat_data.csv`` - the two splits,
      restricted to the phone/citizenship columns and their verdicts;
    * ``validation_chart.json`` - a bar-chart configuration with the counts.

    A summary is printed to stdout.

    Args:
        input_path: Path of the CSV file to validate.
        output_dir: Directory the three output files are written to; it is
            created if missing. Defaults to the current working directory.

    Returns:
        The full transformed DataFrame (all original columns plus the
        validation columns).
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    valid_path = out_dir / 'valid_rahat_data.csv'
    invalid_path = out_dir / 'invalid_rahat_data.csv'
    chart_path = out_dir / 'validation_chart.json'

    # Load dataset
    df = pl.read_csv(input_path)
    print(f"Dataset shape: {df.shape}")
    print("Dataset head:")
    print(df.head())

    # Apply pipeline
    df_transformed = pipeline.fit_transform(df)

    # Columns kept in the two output CSVs
    output_cols = [
        'Phone number (फोन नं)', 'phone_valid', 'phone_type', 'phone_country',
        'Citizenship Number of House Owner (नागरिकता नं)', 'citizenship_valid', 'citizenship_reason'
    ]

    # Split into valid and invalid data
    both_valid = pl.col('phone_valid') & pl.col('citizenship_valid')
    valid_df = df_transformed.filter(both_valid).select(output_cols)
    invalid_df = df_transformed.filter(~both_valid).select(output_cols)

    # Save CSVs
    valid_df.write_csv(valid_path)
    invalid_df.write_csv(invalid_path)
    print(f"Valid data saved as '{valid_path}'")
    print(f"Invalid data saved as '{invalid_path}'")

    # Compute every count exactly once so the printed summary and the chart
    # can never disagree.
    total_rows = df_transformed.height
    valid_phones = df_transformed.filter(pl.col('phone_valid')).height
    valid_citizenships = df_transformed.filter(pl.col('citizenship_valid')).height
    invalid_phones = total_rows - valid_phones
    invalid_citizenships = total_rows - valid_citizenships
    nepali_phones = df_transformed.filter(pl.col('phone_type') == 'nepali').height
    international_phones = df_transformed.filter(pl.col('phone_type') == 'international').height

    # Print summary
    print("\nValidation Summary:")
    print(f"Total rows: {total_rows}")
    print(f"Valid phone numbers: {valid_phones}")
    print(f"Nepali phones: {nepali_phones}")
    print(f"International phones: {international_phones}")
    print(f"Invalid phones: {invalid_phones}")
    print(f"Valid citizenship numbers: {valid_citizenships}")
    print(f"Invalid or empty citizenships: {invalid_citizenships}")
    print(f"Valid rows (both phone and citizenship valid): {valid_df.height}")
    print(f"Invalid rows (either phone or citizenship invalid): {invalid_df.height}")

    # Generate chart for validation results
    chart_config = _validation_chart_config(
        valid_phones, invalid_phones,
        valid_citizenships, invalid_citizenships,
        valid_df.height, invalid_df.height,
    )
    with open(chart_path, 'w', encoding='utf-8') as f:
        json.dump(chart_config, f, indent=2)
    print(f"Chart configuration saved to '{chart_path}'")

    return df_transformed

In [69]:
# Step 5: Execute
# CSV to validate; a relative path, so it is resolved against the kernel's
# current working directory.
input_path = 'first_100_rows (1) - first_100_rows (1).csv.csv'  # Provided filename
try:
    # Runs the whole pipeline; the call also writes its output files.
    df = process_dataset(input_path)
except FileNotFoundError:
    # NOTE(review): `df` is not assigned on this path, so a `df` left in the
    # kernel by an earlier run would remain visible to later cells.
    print(f"Error: Input file '{input_path}' not found. Please provide the correct path.")

Dataset shape: (100, 353)
Dataset head:
shape: (5, 353)
┌─────┬─────────────┬─────────────┬────────────┬───┬────────┬────────────┬────────────┬────────────┐
│     ┆ start       ┆ end         ┆ today      ┆ … ┆ घरबाट  ┆ (घरबाट     ┆ (नजिकैको    ┆ (Distance  │
│ --- ┆ ---         ┆ ---         ┆ ---        ┆   ┆ स्वास्थ्य ┆ बजारको दुरी ┆ शौचालय र   ┆ to safe    │
│ i64 ┆ str         ┆ str         ┆ str        ┆   ┆ संस्थाको ┆ in METER   ┆ घरको दुरी   ┆ Shelter in │
│     ┆             ┆             ┆            ┆   ┆ दुरी …  ┆ ---        ┆ in…        ┆ M…         │
│     ┆             ┆             ┆            ┆   ┆ ---    ┆ i64        ┆ ---        ┆ ---        │
│     ┆             ┆             ┆            ┆   ┆ i64    ┆            ┆ str        ┆ i64        │
╞═════╪═════════════╪═════════════╪════════════╪═══╪════════╪════════════╪════════════╪════════════╡
│ 0   ┆ 2022-04-13T ┆ 2022-04-13T ┆ 2022-04-13 ┆ … ┆ 2      ┆ 100        ┆ null       ┆ 100        │
│     ┆ 10:14:57.94 ┆ 10:4