## **Feature:** Pattern Matching

**Names:** Tanat

### **What it does**
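Classifies the object (string) columns of a DataFrame by matching their values against regex patterns (multiselect, plain number, percentage, currency, range, range with units, scientific notation, date, time). Columns recognised as numeric-like are then converted: plain numbers, percentages and currency amounts are parsed to floats, ranges are replaced by their midpoint, and values carrying a unit are normalised to a single unit.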

### **Helper Functions**
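- `classify_column_pattern(series, patterns)` – returns the first pattern name that enough of a column's unique values match.
- `classify_object_columns(df)` – runs the classifier over every object column.
- `clean_numeric_plain(val)` – parses a single numeric/percentage/currency string to a float.
- `handle_numeric(col)` – applies `clean_numeric_plain` to a column and reports unparsed values.
- `handle_range(val)` – turns a range string into `"{midpoint} {unit}"`.
- `handle_range_col(col)` – applies `handle_range` to a column, then normalises units.
- `unit_exists(unit, ureg)` / `handle_units(col)` – convert values with units to the first recognised unit in the column.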

In [202]:
# Get API Key
# load_dotenv() runs before the lookup below so that a key supplied through a
# local .env file is visible in os.environ (python-dotenv behaviour).
from dotenv import load_dotenv
load_dotenv()
import os
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Only a warning is printed; the notebook keeps running with the key unset.
    print("OpenAI API Key not found")

# Import libraries
import pandas as pd
import numpy as np
import math
import re
import datetime
from pint import UnitRegistry
from pint.errors import UndefinedUnitError
ureg = UnitRegistry()

# Langchain imports
from langchain.chat_models import ChatOpenAI  
from langchain.schema import HumanMessage, SystemMessage

In [None]:
# Regex table used to classify string columns: pattern name -> regex.
# classify_column_pattern() walks this dict in insertion order and returns the
# FIRST name that reaches the support threshold, so the order of the entries
# is part of the behaviour.
patterns = {
    # Multiselect patterns - delimited lists such as "A; B", "A/B", "A and B",
    # and bracketed lists such as {X, Y, Z} or [1, 2, 3].
    # NOTE(review): "multiselect" is tried first and its separators include
    # "," and "/", so values like "1,234" or "12/05/2020" also satisfy it and
    # can win before the numeric/date patterns are reached - confirm the
    # ordering is intended.
    "multiselect": r"^\s*[^,;/|&]+(?:\s*[,;/|&]\s*[^,;/|&]+|\s+(?:and|AND)\s+[^,;/|&]+)+\s*$",
    "multiselect_structured": r"^\s*[\[\{]\s*[^,\]\}]+(?:\s*,\s*[^,\]\}]+)+\s*[\]\}]\s*$",

    # Numerical values
    "numeric_plain": r"^\s*[-+]?(?:\d{1,3}(?:[,.\s]\d{3})*(?:[,.]\d+)?|\d+(?:[,.]\d+)?)\s*$",
    "percentage": r"^\s*[-+]?\d+(?:[,.]\d+)?\s*%\s*$",
    # Optional leading currency symbol and/or trailing ISO-style currency code.
    "currency": r"^\s*[$€£¥₹¢₽₦₴₪₩]?\s*[-+]?(?:\d{1,3}(?:[,.\s]\d{3})*(?:[,.]\d+)?|\d+(?:[,.]\d+)?)\s*(?:USD|EUR|AUD|GBP|INR|JPY|CAD|CHF|SEK|NOK|DKK|CNY|KRW|RUB|BRL|MXN)?\s*$",
    # Two numbers joined by a dash or a connecting word, with an optional
    # non-digit prefix and an optional trailing alphabetic unit.
    # handle_range() uses the same expression.
    "range": r"^(?:[^\d]+)?([-+]?\d+(?:[.,:]?\d+)?)\s*(?:[-–—]\s*|\s+(?:to|TO|bis|à|and|AND)\s+)\s*([-+]?\d+(?:[.,:]?\d+)?)(?:\s+([a-zA-Z ]+))?\s*$",
    # NOTE(review): despite its name this pattern requires TWO numbers joined
    # by a dash or "to"; a single value with a unit such as "5 kg" does not
    # match - confirm whether a single-value form was intended.
    "numeric_with_units": r"^\s*(?:\w+(?:\s+\w+)*)?\s*[-+]?\d+(?:[,.]\d+)?\s*(?:[-–—]\s*|\s+(?:to|TO)\s+)\s*[-+]?\d+(?:[,.]\d+)?\s*(?:\w+(?:\s+\w+)*)?\s*$",
    "scientific": r"^\s*[-+]?\d+(?:[,.]\d+)?[eE][-+]?\d+\s*$",   
    
    # Date-time patterns (not tested)
    "date_like": r"^\s*(?:\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}|\d{4}[/.-]\d{1,2}[/.-]\d{1,2})\s*$",
    "time_like": r"^\s*(?:\d{1,2}:\d{2}(?::\d{2})?(?:\s*[APap][Mm])?)\s*$",
}

In [232]:
def classify_column_pattern(series, patterns=None):
    """
    Classify a column by the first regex pattern that enough of its values match.

    Args:
        series: pandas Series to inspect. Nulls are dropped and the remaining
            values are compared as stripped strings.
        patterns: Mapping of pattern name -> regex string, tried in iteration
            order. When omitted, the module-level ``patterns`` table is looked
            up at call time, so re-running the cell that defines the table
            takes effect without redefining this function.

    Returns:
        The name of the first pattern whose share of matching values reaches
        the support threshold, or None when the column has no non-null values
        or no pattern qualifies.
    """
    if patterns is None:
        # Resolve lazily instead of binding the dict as a def-time default.
        patterns = globals()["patterns"]

    # Remove null values and convert to string
    vals = series.dropna().astype(str)
    if vals.empty:
        return None

    # Only the first 30 distinct values are inspected; this bounds the regex
    # work and ignores how often each value occurs.
    unique_vals = [val.strip() for val in vals.unique()[:30]]
    total_unique = len(unique_vals)

    # Fewer distinct values -> accept a lower share of matches.
    if total_unique < 10:
        min_support = 0.5
    elif total_unique < 30:
        min_support = 0.6
    else:
        min_support = 0.7

    # Check each pattern in order; the first one with enough support wins.
    for pattern_name, pattern_regex in patterns.items():
        matcher = re.compile(pattern_regex).match
        matches = sum(1 for val in unique_vals if matcher(val))
        if matches / total_unique >= min_support:
            return pattern_name

    return None

In [229]:
def classify_object_columns(df):
    """
    Run classify_column_pattern over every object-dtype column of `df`.

    Returns a dict mapping column name -> detected pattern name; columns for
    which no pattern was detected are left out.
    """
    detected = {}
    for name in df.select_dtypes(include=['object']).columns:
        label = classify_column_pattern(df[name], patterns)
        if not label:
            continue
        detected[name] = label
    return detected

In [140]:
def clean_numeric_plain(val):
    """
    Parse a numeric-looking string (plain number, percentage or currency
    amount) into a float.

    Whitespace, currency symbols, currency codes and '%' are removed, the
    thousands/decimal separators are normalised, and the result is converted
    with float(). Percentages are divided by 100.

    Args:
        val: Value to parse; it is converted with str() first.

    Returns:
        The parsed float, or the cleaned string when it still cannot be
        converted.
    """
    s = str(val).strip()

    # Remove spaces, symbols, currency codes, %
    s = re.sub(r'\s+', '', s)
    s = re.sub(r'[$€£¥₹¢₽₦₴₪₩]', '', s)
    s = re.sub(r'(USD|EUR|AUD|GBP|INR|JPY|CAD|CHF|SEK|NOK|DKK|CNY|KRW|RUB|BRL|MXN)', '', s, flags=re.IGNORECASE)
    is_percent = '%' in s
    if is_percent:
        s = s.replace('%', '')

    # Every format check allows a leading sign; without it a value such as
    # "-1,5" skipped all branches and had its decimal comma deleted (-15.0).
    if re.match(r'^[-+]?\d{1,3}(\.\d{3})+,\d+$', s):
        # European grouping with decimal comma: 1.234,56
        s = s.replace('.', '').replace(',', '.')
    elif re.match(r'^[-+]?\d{1,3}(\.\d{3}){2,}$', s):
        # Two or more dot groups cannot be a decimal point: 1.234.567
        s = s.replace('.', '')
    elif re.match(r'^[-+]?\d{1,3}(,\d{3})+(\.\d+)?$', s):
        # US grouping with optional decimal point: 1,234.56
        s = s.replace(',', '')
    elif re.match(r'^[-+]?\d+,\d+(?:[eE][-+]?\d+)?$', s):
        # Lone decimal comma, optionally followed by an exponent: 1,5 / 1,5e3
        s = s.replace(',', '.')
    # Drop any commas that are still left over.
    s = s.replace(',', '')

    # Convert to float
    try:
        num = float(s)
    except ValueError:
        return s
    return num / 100 if is_percent else num

In [151]:
def handle_numeric(col):
    """
    Convert a column of numeric-looking strings to numbers.

    Each value goes through clean_numeric_plain and the result is passed to
    pd.to_numeric with errors='coerce'. Values that were not null on input but
    came out as NaN are counted per original value.

    Returns:
        (converted, errors): the coerced numeric values and a dict mapping
        each unparsed original value to its number of occurrences.
    """
    parsed = pd.to_numeric([clean_numeric_plain(raw) for raw in col], errors='coerce')

    errors = {}
    for raw, number in zip(col, parsed):
        if pd.notna(raw) and pd.isna(number):
            errors[raw] = errors.get(raw, 0) + 1

    print(f"Unmatched values in '{col.name}': {errors}")
    return parsed, errors

In [234]:
def handle_range(val):
    """
    Extract a numeric range and optional unit from a string and compute the
    midpoint.

    Args:
        val: Value to parse; it is converted with str() and stripped first.

    Returns:
        "{midpoint} {unit}" when a unit follows the range, "{midpoint}" when
        there is none, or the stripped input string when it does not look
        like a range.
    """
    s = str(val).strip()
    # Match the range pattern
    range_regex = r"^(?:[^\d]+)?([-+]?\d+(?:[.,:]?\d+)?)\s*(?:[-–—]\s*|\s+(?:to|TO|bis|à|and|AND)\s+)\s*([-+]?\d+(?:[.,:]?\d+)?)(?:\s+([a-zA-Z ]+))?\s*$"
    match = re.match(range_regex, s)
    if not match:
        return s
    num1, num2, unit = match.group(1), match.group(2), match.group(3)
    # Normalize decimals: ',' and ':' are both read as a decimal point, so
    # "9:30" becomes 9.30.
    num1 = num1.replace(',', '.').replace(':', '.')
    num2 = num2.replace(',', '.').replace(':', '.')
    try:
        midpoint = (float(num1) + float(num2)) / 2
    except ValueError:
        return s
    # The unit group is optional; formatting a missing unit directly would
    # yield the literal text "None" after the number.
    unit = unit.strip() if unit else ""
    return f"{midpoint} {unit}" if unit else f"{midpoint}"

In [264]:
def unit_exists(unit, ureg):
    """
    Report whether `unit` is known to the registry `ureg`.

    Returns True when ureg.Unit(unit) succeeds and False when it raises
    UndefinedUnitError; any other exception propagates.
    """
    try:
        ureg.Unit(unit)
    except UndefinedUnitError:
        return False
    else:
        return True
    
def handle_units(col):
    """
    Normalize all units to the first unit found in the col and return numeric values.

    The first "<number> <letters>" pair whose letters are a unit known to the
    registry defines the target unit. Every value is then converted to it.

    Args:
        col: pandas Series of values such as "5 km".

    Returns:
        (numeric_col, errors). When no recognised unit is found, `col` is
        returned unchanged with an empty error dict. Otherwise `numeric_col`
        is named "<col.name or 'values'>_<target_unit>" and contains the
        converted magnitude, None for nulls and for values without a
        "<number> <letters>" pair, or the original value when the conversion
        itself failed. `errors` maps str(value) -> number of failures.
    """
    errors = {}
    # The module-level `ureg` is reused: building a UnitRegistry is expensive
    # and pint recommends a single registry per application.
    # NOTE(review): the number part has no sign and no decimal comma, so
    # "-5 kg" is read as 5 and "1,5 kg" as 5 - confirm this is acceptable.
    value_unit = re.compile(r'(\d+\.?\d*)\s*([a-zA-Z]+)')

    # Find the first unit in the col
    target_unit = None
    for text in col.dropna():
        match = value_unit.search(str(text))
        if match and unit_exists(match.group(2), ureg):
            target_unit = match.group(2)
            break

    if not target_unit:
        return col, errors

    def record_error(text):
        # Count failures per original value (as a string).
        key = str(text)
        errors[key] = errors.get(key, 0) + 1

    def convert_units(text):
        # Handle no match or null
        if pd.isna(text):
            return None
        match = value_unit.search(str(text))
        if not match:
            record_error(text)
            return None

        value, unit = match.groups()
        try:
            return ureg.Quantity(float(value), unit).to(target_unit).magnitude
        except Exception:
            # Unknown or incompatible unit: keep the original value so the
            # caller can still see it, and log it as an error.
            record_error(text)
            return text

    # Apply conversion
    numeric_col = col.apply(convert_units)
    numeric_col.name = f"{col.name or 'values'}_{target_unit}"
    print(f"Unmatched values in '{col.name}': {errors}")
    return numeric_col, errors

In [277]:
def handle_range_col(col):
    """
    Apply handle_range to a pandas Series and normalise the resulting units.

    Each value that handle_range can parse is replaced by its midpoint
    string; those values are then passed through handle_units, and whatever
    it returns is written back in their place.

    Args:
        col: pandas Series of range strings.

    Returns:
        (result, errors). `result` is an object-dtype Series with the same
        index and name as `col`. None and float values (including NaN) pass
        through untouched, and values that could not be parsed as a range are
        kept as they were. `errors` maps each unparsed value to its number of
        occurrences, merged with the error dict returned by handle_units.
    """
    result = []
    errors = {}
    valid_mask = []
    for val in col:
        # Nulls (None / NaN) and any other float are passed through as-is
        # and are not counted as errors.
        if val is None or isinstance(val, float):
            result.append(val)
            valid_mask.append(False)
            continue
        cleaned = handle_range(val)
        # handle_range hands back the stripped input when nothing matched.
        if isinstance(cleaned, str) and cleaned == str(val).strip():
            errors[val] = errors.get(val, 0) + 1
            result.append(val)
            valid_mask.append(False)
        else:
            result.append(cleaned)
            valid_mask.append(True)

    # Keep the caller's index so that assigning the result back into a
    # DataFrame aligns row by row; object dtype lets strings and floats mix.
    result = pd.Series(result, index=col.index, name=col.name, dtype=object)
    print(f"Unmatched values in '{col.name}': {errors}")

    if any(valid_mask):
        converted, unit_errors = handle_units(result[valid_mask])
        errors.update(unit_errors)
        # Positional write-back: safe even when index labels repeat.
        result.loc[valid_mask] = converted.to_numpy()
    return result, errors

In [278]:
# Ad-hoc check of the range handler on a single column; the error dict is
# discarded. `df` is created by the read_csv cell that appears further down.
col, _ = handle_range_col(df['HoursOutside'])

Unmatched values in 'HoursOutside': {'Less than 30 minutes': 536, 'Over 4 hours': 70}
Unmatched values in 'HoursOutside': {}


In [240]:
# Load the sample data, keeping only the first 5000 rows.
# Alternative dataset (currently disabled):
# df = pd.read_csv("../sample_data/smoke.csv")[0:5000]
df = pd.read_csv("../sample_data/survey_results_public.csv")[0:5000]

  df = pd.read_csv("../sample_data/survey_results_public.csv")[0:5000]


In [241]:
# Map each object column of `df` to the pattern name detected for it;
# columns with no detected pattern are absent from the dict.
col_types = classify_object_columns(df)

In [None]:
# Convert every classified object column to numeric values.
# Work on a copy so the column assignments below never write into a
# selection derived from `df`.
clean_object_df = df.select_dtypes(include=['object']).copy()
log = []           # one error dict per converted column, in processing order
numeric_cols = []  # (column name, pattern name) for every converted column

# Pattern name -> converter. Each converter takes a Series and returns
# (converted values, error dict). Patterns without an entry (e.g. date_like,
# time_like) are left unconverted.
converters = {
    'range': handle_range_col,
    'percentage': handle_numeric,
    'numeric_plain': handle_numeric,
    'currency': handle_numeric,
    'scientific': handle_numeric,
    'numeric_with_units': handle_units,
}

for col, col_type in col_types.items():
    if col_type in {'multiselect', 'multiselect_structured'}:
        # Multiselect columns are not converted yet.
        continue
    converter = converters.get(col_type)
    if converter is None:
        continue
    converted, errors = converter(clean_object_df[col])
    # The column keeps its original label; only the values are replaced.
    clean_object_df[col] = converted
    log.append(errors)
    numeric_cols.append((col, col_type))