# Preprocessing Functions

In [6]:
import inspect
import re
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

from pyspark.sql import functions as F
from pyspark.sql.functions import split, explode, trim, regexp_replace, to_date, datediff, months_between, abs as Fabs, floor, col, lit


# ==============================
# 1. Preprocessing Functions
# ==============================

def preprocess_non_encoded(df):
    """
    Preprocess dataset without one-hot encoding.

    - Fill missing text/categorical values with the most frequent value.
    - Fill missing values in every other column with the column median.
    - If a cell consists solely of space-separated dates
      (e.g., "01/01/2021 01/01/2022"), split them into separate rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data. It is copied; the caller's frame is never modified.

    Returns
    -------
    pandas.DataFrame
        The cleaned frame. The index is reset to 0..n-1 whenever a column
        had cells split into additional rows.

    Notes
    -----
    A text column whose values are all missing has no mode and is left
    untouched instead of raising.
    """
    df = df.copy()

    def _is_text(series):
        # Covers classic object columns as well as pandas' dedicated string dtypes.
        return (pd.api.types.is_object_dtype(series)
                or pd.api.types.is_string_dtype(series))

    # Step 1: Handle missing values
    for name in df.columns:
        series = df[name]
        if not series.isna().any():
            continue  # nothing to fill; also avoids median() on unsuitable dtypes
        if _is_text(series):
            candidates = series.mode(dropna=True)
            if candidates.empty:
                continue  # every value is missing, so there is no mode
            fill_value = candidates.iloc[0]
        else:
            fill_value = series.median()
        # Assign the result back: in-place fillna on a selected column is
        # chained assignment and is not guaranteed to update the frame.
        df[name] = series.fillna(fill_value)

    # Step 2: Split cells that hold several space-separated dates into rows
    date_token = re.compile(r'\d{2}/\d{2}/\d{4}[,;]?')

    def _split_dates(value):
        # Only a cell made up entirely of date tokens is split; values such
        # as "10/12/2023 14:33:22" or free text mentioning a date stay whole.
        if not isinstance(value, str):
            return value
        tokens = value.split()
        if len(tokens) > 1 and all(date_token.fullmatch(t) for t in tokens):
            return tokens
        return value

    text_columns = [name for name in df.columns if _is_text(df[name])]
    for name in text_columns:
        pieces = df[name].map(_split_dates)
        if pieces.map(lambda v: isinstance(v, list)).any():
            df[name] = pieces
            df = df.explode(name).reset_index(drop=True)

    return df



def compute_months_diff(later, earlier):
    """Return the signed number of full months between two dates (later - earlier).

    Only the year, month and day of each argument are used; time of day is
    ignored. A month counts as "full" once the day-of-month of the earlier
    date has been reached again.

    The result is symmetric: swapping the arguments only flips the sign, so
    taking ``abs()`` of the result gives the same value whichever date comes
    first.

    Returns ``pd.NA`` when either argument is missing.
    """
    if pd.isna(later) or pd.isna(earlier):
        return pd.NA

    # When the arguments arrive in reverse order, measure the gap the right
    # way round and negate it. Applying the day-of-month correction directly
    # to a negative total would over-count by one month.
    if (later.year, later.month, later.day) < (earlier.year, earlier.month, earlier.day):
        return -compute_months_diff(earlier, later)

    total = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    # The last month is incomplete if `later` has not yet reached the
    # day-of-month of `earlier`.
    if later.day < earlier.day:
        total -= 1
    return total

def parse_stay(cell):
    """Return (days, location) parsed from a free-text stay description.

    Rules:
       - Missing cell -> (pd.NA, pd.NA)
       - 'No' or 'No Answer' (any case) -> (0, same text)
       - '<n> week(s)' contributes n*7 days, '<n> day(s)' contributes n days;
         every duration mentioned is added up, so '1 week 3 days' -> 10
       - Text left over once the durations and the separators , ; : / - are
         removed is treated as the location
       - Pure duration only (or only a leftover 'and' / '&') => location = pd.NA
       - No duration found => days = pd.NA
    """
    if pd.isna(cell):
        return pd.NA, pd.NA

    text = str(cell).strip()

    # treat common no-values
    if text.lower() in {"no", "no answer"}:
        return 0, text  # days 0, location same string

    week_pattern = r'(\d+)\s*weeks?\b'
    day_pattern = r'(\d+)\s*days?\b'

    # The location below is built from the text with *every* duration
    # removed, so the day count covers every duration as well.
    week_counts = [int(n) for n in re.findall(week_pattern, text, flags=re.IGNORECASE)]
    day_counts = [int(n) for n in re.findall(day_pattern, text, flags=re.IGNORECASE)]
    if week_counts or day_counts:
        days = 7 * sum(week_counts) + sum(day_counts)
    else:
        days = pd.NA

    # Remove all duration patterns (e.g., "2 weeks", "3 days", "2week", "1weeks")
    leftover = re.sub(week_pattern, '', text, flags=re.IGNORECASE)
    leftover = re.sub(day_pattern, '', leftover, flags=re.IGNORECASE)
    # remove common separators left behind, then collapse whitespace
    leftover = re.sub(r'[,;:/\-]+', ' ', leftover)
    leftover = re.sub(r'\s+', ' ', leftover).strip()

    # A bare connector left between two durations ("1 week and 3 days") or a
    # stringified missing value is not a location.
    if leftover and leftover.lower() not in {"nan", "and", "&"}:
        location = leftover
    else:
        location = pd.NA

    return days, location

def explode_and_diff(df, col1, col2, *, severity_col=None, stay_col=None, months_col=None):
    """Explode a multi-date column into rows and derive the analysis columns.

    Steps:
    1. Normalise ``col2`` (commas/semicolons/whitespace runs become a single
       space), split it on spaces and explode so each listed date gets its
       own row. The original row label is kept in ``_orig_index``.
    2. Parse ``col1`` and each exploded date (day-first) into ``<col1>_dt``
       and ``<col2>_dt``; unparsable values become NaT.
    3. Add ``Days_diff`` and ``Months_diff``: absolute differences between
       the two dates, missing when either date is missing.
    4. Split the severity answer "Main (Type)" into ``Severity_Main`` and
       ``Severity_Type``.
    5. Parse the stay answer with ``parse_stay`` into ``Stay_Numbr_Days``
       (nullable Int64) and ``Stay_Location``.
    6. Parse the months answer with ``parse_months`` into ``months_list``,
       ``months_num_list`` and ``season_label``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; it is copied, never modified.
    col1 : str
        Column holding a single date per row.
    col2 : str
        Column holding zero or more dates per row.
    severity_col, stay_col : str, optional
        Column names for steps 4 and 5. They default to the survey question
        texts this function was originally hard-wired to.
    months_col : str, optional
        Column name for step 6. When omitted, the module-level variable
        ``col`` is used, as this function originally did.

    Returns
    -------
    pandas.DataFrame
        The exploded frame with the derived columns appended. Missing values
        are left as missing (the earlier ``fillna(0)`` calls discarded their
        result, so the output was never zero-filled).

    Raises
    ------
    KeyError
        If any of the referenced columns is absent.
    TypeError
        If ``months_col`` resolves to a callable rather than a column label.
    """
    if severity_col is None:
        severity_col = "How would you describe the severity of your malaria symptoms during your most recent diagnosis?"
    if stay_col is None:
        stay_col = 'If Yes, specify the location(s) and duration of stay'
    if months_col is None:
        months_col = col  # legacy fallback: module-level `col`
    if callable(months_col):
        # The name `col` is also imported from pyspark at the top of the
        # file; catch that case with a clear message.
        raise TypeError("months_col must be a column label; pass months_col=... "
                        "or set the module-level `col` to the column name")

    df = df.copy()

    # Normalize separators to single spaces. Missing cells stay missing
    # rather than turning into the text 'nan'.
    raw_dates = df[col2]
    cleaned = raw_dates.astype(str).str.replace(r'[,;\s]+', ' ', regex=True).str.strip()
    df[col2] = cleaned.mask(raw_dates.isna() | (cleaned == ''), pd.NA)

    # Split into lists and explode into separate rows
    list_col = col2 + "_list"
    df[list_col] = df[col2].str.split(' ')
    df_exp = df.explode(list_col).reset_index().rename(columns={'index': '_orig_index'})

    # Parse dates (use dayfirst=True for dd/mm/yyyy)
    left_dt = col1 + "_dt"
    right_dt = col2 + "_dt"
    df_exp[left_dt] = pd.to_datetime(df_exp[col1], dayfirst=True, errors='coerce')
    df_exp[right_dt] = pd.to_datetime(df_exp[list_col], dayfirst=True, errors='coerce')

    # Signed differences (intermediate values only, not stored in the result)
    signed_days = (df_exp[left_dt] - df_exp[right_dt]).dt.days
    signed_months = [
        compute_months_diff(first, second)
        for first, second in zip(df_exp[left_dt], df_exp[right_dt])
    ]

    # Convert negatives to positive while preserving NA
    df_exp['Days_diff'] = signed_days.apply(lambda x: abs(x) if pd.notna(x) else pd.NA)
    df_exp['Months_diff'] = pd.Series(
        [abs(m) if pd.notna(m) else pd.NA for m in signed_months],
        index=df_exp.index,
    )

    # Severity "Main (Type)" -> two columns. Work on the exploded frame so
    # the extracted values line up with df_exp's rows.
    severity = df_exp[severity_col]
    extracted = severity.astype(str).str.extract(r'^(.*?)\s*\((.*?)\)\s*$')
    # Answers without a parenthesised part keep their full text as the main value.
    df_exp['Severity_Main'] = extracted[0].fillna(severity).str.strip()
    severity_type = extracted[1].str.strip()
    df_exp['Severity_Type'] = severity_type.mask(severity_type == '', pd.NA)

    # Stay duration/location
    stay_parsed = df_exp[stay_col].apply(parse_stay)
    df_exp[['Stay_Numbr_Days', 'Stay_Location']] = pd.DataFrame(
        stay_parsed.tolist(),
        index=df_exp.index,
        columns=['Stay_Numbr_Days', 'Stay_Location'],
    )
    # choose integer nullable dtype for days
    df_exp['Stay_Numbr_Days'] = df_exp['Stay_Numbr_Days'].astype('Int64')

    # Months / season. The index must be df_exp's: the exploded frame can
    # have more rows than the input.
    months_parsed = df_exp[months_col].apply(parse_months)
    df_exp[['months_list', 'months_num_list', 'season_label']] = pd.DataFrame(
        months_parsed.tolist(),
        index=df_exp.index,
        columns=['months_list', 'months_num_list', 'season_label'],
    )

    return df_exp

def preprocess_one_hot(df):
    """
    Preprocess dataset and one-hot encode categorical columns.

    Steps:
    - Convert list/tuple/set entries in text columns to comma-separated strings
    - Fill missing text values with the mode (most frequent value), or ''
      when the column has no non-missing value
    - Fill missing numeric values with the median
    - Fill missing datetime values with the earliest date in the column
    - One-hot encode the text columns; numeric and datetime columns are
      passed through unencoded

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; it is copied, never modified.

    Returns
    -------
    pandas.DataFrame
        The non-text columns followed by one indicator column per
        (text column, category) pair. When there are no text columns the
        filled frame is returned as is.

    Notes
    -----
    An integer column whose median is fractional (possible for nullable
    ``Int64`` columns containing missing values) is converted to ``Float64``
    before filling, because an integer column cannot hold the fractional
    fill value.
    """
    import pandas as pd

    df = df.copy()

    def _is_text(series):
        # Covers classic object columns as well as pandas' dedicated string dtypes.
        return (pd.api.types.is_object_dtype(series)
                or pd.api.types.is_string_dtype(series))

    def _flatten(value):
        # Collections cannot be used as categories; render them as text.
        if isinstance(value, (list, tuple, set)):
            return ','.join(map(str, value))
        return value

    # Convert lists/tuples/sets in text columns to strings
    for name in [c for c in df.columns if _is_text(df[c])]:
        df[name] = df[name].apply(_flatten)

    # Fill missing values
    for name in df.columns:
        series = df[name]
        if _is_text(series):
            mode_vals = series.mode(dropna=True)
            fill_val = mode_vals.iloc[0] if not mode_vals.empty else ''
            df[name] = series.fillna(fill_val)
        elif pd.api.types.is_numeric_dtype(series):
            if not series.isna().any():
                continue
            median = series.median()
            if pd.isna(median):
                continue  # column is entirely missing; nothing to fill with
            if pd.api.types.is_integer_dtype(series):
                if float(median).is_integer():
                    median = int(median)
                else:
                    series = series.astype('Float64')
            df[name] = series.fillna(median)
        elif pd.api.types.is_datetime64_any_dtype(series):
            # Fill missing dates with earliest available date
            df[name] = series.fillna(series.min())

    # Identify categorical columns (datetime and numeric columns are excluded)
    categorical_cols = [c for c in df.columns if _is_text(df[c])]

    if not categorical_cols:
        return df

    # Local imports keep the function self-contained; they are deferred to
    # this point because only the encoding step needs them.
    import inspect
    from sklearn.preprocessing import OneHotEncoder

    # Configure OneHotEncoder for sklearn version compatibility: pass whichever
    # of `sparse_output` / `sparse` the installed constructor accepts.
    params = {'handle_unknown': 'ignore'}
    if 'sparse_output' in inspect.signature(OneHotEncoder).parameters:
        params['sparse_output'] = False
    else:
        params['sparse'] = False

    encoder = OneHotEncoder(**params)
    encoded = encoder.fit_transform(df[categorical_cols])

    # Defensive: densify if a sparse matrix came back anyway.
    if hasattr(encoded, "toarray"):
        encoded = encoded.toarray()

    encoded_df = pd.DataFrame(
        encoded,
        columns=encoder.get_feature_names_out(categorical_cols),
        index=df.index
    )

    # Combine with non-categorical data
    return pd.concat([df.drop(columns=categorical_cols), encoded_df], axis=1)


In [2]:
# Column that holds the free-text month answers (replace with your real column name).
col = "If Yes, specify months"

# Month names in calendar order; list position + 1 is the month number.
months = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]

# Lower-cased full month name -> month number (1-12).
name_to_num = {name.lower(): number for number, name in enumerate(months, start=1)}

# Lower-cased abbreviation -> full month name ('sept' is accepted as well as 'sep').
abbr_to_full = {
    'jan': 'January',
    'feb': 'February',
    'mar': 'March',
    'apr': 'April',
    'may': 'May',
    'jun': 'June',
    'jul': 'July',
    'aug': 'August',
    'sep': 'September',
    'sept': 'September',
    'oct': 'October',
    'nov': 'November',
    'dec': 'December',
}

# Whole-word, case-insensitive match for any full month name or abbreviation.
month_pattern = r'\b({})\b'.format('|'.join([*map(str.lower, months), *abbr_to_full]))
month_regex = re.compile(month_pattern, re.IGNORECASE)

def normalize_token(tok):
    """Return the canonical full month name for *tok*, or None if it is not a month.

    Surrounding whitespace and the characters `` ,;:-`` are stripped and the
    comparison ignores case. Both abbreviations (keys of ``abbr_to_full``)
    and full names (entries of ``months``) are recognised.
    """
    cleaned = tok.strip().strip(' ,;:-').lower()

    # Abbreviation lookup comes first ('may' resolves here as well).
    full_name = abbr_to_full.get(cleaned)
    if full_name is not None:
        return full_name

    # Otherwise accept a fully spelled-out month name.
    return next((name for name in months if name.lower() == cleaned), None)

def expand_range(start_name, end_name):
    """Return the month names from start to end, both included.

    When the start month comes later in the calendar than the end month the
    range wraps around the year end (e.g. November..February yields
    November, December, January, February). Names are looked up
    case-insensitively in ``name_to_num``; an unknown name raises KeyError.
    """
    first = name_to_num[start_name.lower()]
    last = name_to_num[end_name.lower()]

    # Number of months covered, counting forward from `first` and wrapping
    # past December when needed.
    span = (last - first) % 12 + 1
    return [months[(first - 1 + offset) % 12] for offset in range(span)]

def parse_months(cell):
    """Parse a free-text months answer into (months_list, months_num_list, season_label).

    - Missing cell -> (pd.NA, pd.NA, pd.NA)
    - A known "don't know"/"no" answer -> (pd.NA, pd.NA, original text)
    - Otherwise every month mentioned is collected in order of first
      appearance, without duplicates. A hyphenated pair of months
      ("Nov-Feb", "June – August") is expanded to the full inclusive range,
      wrapping around the year end when needed. Ranges and individually
      listed months may be mixed ("March, June-August"). The result is
      (month names, month numbers, pd.NA).
    - No month found -> (pd.NA, pd.NA, original text), so season words such
      as "Harmattan" or "rainy season" are kept for inspection.
    """
    if pd.isna(cell):
        return pd.NA, pd.NA, pd.NA  # months_list, months_num_list, season_label

    s = str(cell).strip()

    # treat obvious unknown answers
    if s.lower() in {"i don't know", "dont know", "dont know.", "no", "no answer", "unknown", "i dont know"}:
        return pd.NA, pd.NA, s

    # standardize separators and collapse whitespace
    s_clean = re.sub(r'[,;/]+', ' ', s)
    s_clean = re.sub(r'\s+', ' ', s_clean).strip()

    def _expand(match):
        # Replace "<month>-<month>" with the spelled-out range; leave any
        # other hyphenated words (e.g. "mid-year") exactly as they were.
        start = normalize_token(match.group(1))
        end = normalize_token(match.group(2))
        if start and end:
            return ' '.join(expand_range(start, end))
        return match.group(0)

    # Expand every hyphenated month range in place, so that months listed
    # next to a range are kept and a non-month hyphenated word earlier in
    # the text cannot hide a later range.
    expanded_text = re.sub(r'([A-Za-z]+)\s*[-–]\s*([A-Za-z]+)', _expand, s_clean)

    # Collect month names, unique, in order of first occurrence
    ordered = []
    for token in month_regex.findall(expanded_text):
        name = normalize_token(token)
        if name is not None and name not in ordered:
            ordered.append(name)

    if ordered:
        nums = [name_to_num[name.lower()] for name in ordered]
        return ordered, nums, pd.NA

    # No explicit month names: keep the original text as season_label
    # (covers season words like 'Harmattan' or 'Raining' as well as
    # anything unrecognised).
    return pd.NA, pd.NA, s



# One-hot Encoding

In [3]:
# ==============================
# 2. Load and Process Dataset
# ==============================

# Load dataset (Upload manually in Colab)
from google.colab import files

# Ask for a manual upload; the result is keyed by uploaded file name.
uploaded = files.upload()

# Use the first uploaded file (assumed to be in CSV format).
file_name = list(uploaded)[0]

# Read the dataset into the working DataFrame.
df = pd.read_csv(file_name)

Saving AI4M Raw Data Set - Form Responses 1.csv to AI4M Raw Data Set - Form Responses 1.csv


## PreProcessing and Cleaning data

In [None]:
# Preprocess without encoding: fill missing values first, then explode the
# multi-date column and add the derived columns.
# col1 / col2 are the column names passed to explode_and_diff as the
# single-date column and the multi-date column respectively.
# NOTE(review): the "\n" inside col1 presumably mirrors a line break in the CSV header — confirm.
col1 = "Date of diagnosis\nOn what date were you last diagnosed with malaria?"
col2 = "If diagnosed multiple times, list the dates of diagnosis in the past year"

df_non_encoded = preprocess_non_encoded(df)
sc = explode_and_diff(df_non_encoded,col1,col2)
print("Preprocessed Data (Non-Encoded):")
# `display` is not imported in this file; presumably the notebook-provided helper.
display(sc)

In [12]:
from google.colab import files
# Write the non-encoded result to disk in Colab, then hand the file to the browser.
# The file name previously read 'preproessed_output.csv' (misspelled).
sc.to_csv('preprocessed_output.csv', index=False)
files.download('preprocessed_output.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## One Hot Encoding

In [None]:
# Preprocess with one-hot encoding, starting from the exploded,
# non-encoded frame `sc` produced by the earlier cell.
df_one_hot = preprocess_one_hot(sc)
print("Preprocessed Data (One-Hot Encoded):")
# `display` is not imported in this file; presumably the notebook-provided helper.
display(df_one_hot)

In [8]:
from google.colab import files
# Write the one-hot encoded frame without its index column, then download it.
df_one_hot.to_csv('hotencoded_output.csv', index=False)   # save to disk in Colab
files.download('hotencoded_output.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Unit Testing

In [None]:
# ==============================
# 3. Unit Testing
# ==============================

class TestPreprocessing(unittest.TestCase):
    """
    Unit tests for preprocessing functions.
    - Check missing value handling in non-encoded preprocessing.
    - Validate one-hot encoding transformation.
    """
    def setUp(self):
        """
        Create a sample dataset with missing and categorical data.
        """
        self.data = pd.DataFrame({
            'Category': ['A', 'B', np.nan, 'A', 'C'],
            'Value': [10, 20, 30, np.nan, 50]
        })

    def test_non_encoded_preprocessing(self):
        """
        Test preprocess_non_encoded function.
        - Ensure no missing values remain.
        - Check categorical values are correctly filled.
        """
        processed_df = preprocess_non_encoded(self.data)
        self.assertFalse(processed_df.isnull().values.any(), "There should be no missing values.")
        self.assertIn(processed_df['Category'][2], ['A', 'B', 'C'], "Categorical missing values should be filled.")

    def test_one_hot_encoding(self):
        """
        Test preprocess_one_hot function.
        - Check if categorical columns are transformed correctly.
        - Validate column naming after one-hot encoding.
        """
        processed_df = preprocess_one_hot(self.data)
        self.assertFalse(processed_df.isnull().values.any(), "There should be no missing values.")
        self.assertTrue(all(col.startswith('Category_') for col in processed_df.columns if 'Category_' in col), "One-hot encoding should create correct column names.")

# Run unit tests
unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestPreprocessing))