In [1]:
# Standard library imports
import re

# Third-party imports
from Levenshtein import ratio as levenshtein_ratio

# PySpark core
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import *

# PySpark functions
from pyspark.sql.functions import (col, lit, when, expr, trim, upper, isnull, broadcast,
    substring, length, concat_ws, regexp_replace, regexp_extract, greatest, lower,
    coalesce, soundex, levenshtein
)

# PySpark types
from pyspark.sql.types import (StringType, IntegerType, StructType, StructField, 
    BooleanType, FloatType
)

# UDF
from pyspark.sql.functions import udf

In [2]:
# Initialize Spark with optimizations for large datasets
spark = SparkSession.builder.getOrCreate()

In [3]:
# Define dynamic paths
input_pattern = "/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Data/ndsu_split_file_*.csv"
sic_code_path = "/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/cleaned_SIC_code_list.csv"
cities_path = "/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/cities.csv"
fortune_path = "/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/fortune1000_2024.csv"

# Load the datasets
df = spark.read.csv(input_pattern, header=True, inferSchema=True)
sic_code_df = spark.read.csv(sic_code_path, header=True, inferSchema=True)
cities_df = spark.read.csv(cities_path, header=True, inferSchema=True)

# Load and prepare Fortune table with Company and Alt_name
fortune_df = spark.read.csv(fortune_path, header=True) \
    .select(
        col("Company").alias("fortune_company"),
        col("Alt_name").alias("fortune_alt_name")
    ) \
    .filter(col("fortune_company").isNotNull()) \
    .distinct() \
    .cache()


print(f"Total imput rows: {df.count()}")

Total imput rows: 10208188


In [4]:
# Define validation sets for geographic validation
valid_provinces = {'AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE', 'QC', 'SK', 'YT'}
valid_states = {
    'PR', 'DC', 'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 
    'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 
    'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 
    'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 
    'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'
}
valid_countries = {
    'AF', 'AL', 'DZ', 'AS', 'AD', 'AO', 'AI', 'AQ', 'AG', 'AR', 
    'AM', 'AW', 'AU', 'AT', 'AZ', 'BS', 'BH', 'BD', 'BB', 'BY', 
    'BE', 'BZ', 'BJ', 'BM', 'BT', 'BO', 'BA', 'BW', 'BV', 'BR', 
    'IO', 'BN', 'BG', 'BF', 'BI', 'CV', 'KH', 'CM', 'CA', 'KY', 
    'CF', 'TD', 'CL', 'CN', 'CX', 'CC', 'CO', 'KM', 'CG', 'CD', 
    'CK', 'CR', 'CI', 'HR', 'CU', 'CY', 'CZ', 'DK', 'DJ', 'DM', 
    'DO', 'EC', 'EG', 'SV', 'GQ', 'ER', 'EE', 'ET', 'FK', 'FO', 
    'FJ', 'FI', 'FR', 'GF', 'PF', 'TF', 'GA', 'GM', 'GE', 'DE', 
    'GH', 'GI', 'GR', 'GL', 'GD', 'GP', 'GU', 'GT', 'GG', 'GN', 
    'GW', 'GY', 'HT', 'HM', 'VA', 'HN', 'HK', 'HU', 'IS', 'IN', 
    'ID', 'IR', 'IQ', 'IE', 'IM', 'IL', 'IT', 'JM', 'JP', 'JE', 
    'JO', 'KZ', 'KE', 'KI', 'KP', 'KR', 'KW', 'KG', 'LA', 'LV', 
    'LB', 'LS', 'LR', 'LY', 'LI', 'LT', 'LU', 'MO', 'MK', 'MG', 
    'MW', 'MY', 'MV', 'ML', 'MT', 'MH', 'MQ', 'MR', 'MU', 'YT', 
    'MX', 'FM', 'MD', 'MC', 'MN', 'ME', 'MS', 'MA', 'MZ', 'MM', 
    'NA', 'NR', 'NP', 'NL', 'NC', 'NZ', 'NI', 'NE', 'NG', 'NU', 
    'NF', 'MP', 'NO', 'OM', 'PK', 'PW', 'PS', 'PA', 'PG', 'PY', 
    'PE', 'PH', 'PN', 'PL', 'PT', 'PR', 'QA', 'RE', 'RO', 'RU', 
    'RW', 'BL', 'SH', 'KN', 'LC', 'MF', 'PM', 'VC', 'WS', 'SM', 
    'ST', 'SA', 'SN', 'RS', 'SC', 'SL', 'SG', 'SX', 'SK', 'SI', 
    'SB', 'SO', 'ZA', 'GS', 'SS', 'ES', 'LK', 'SD', 'SR', 'SJ', 
    'SZ', 'SE', 'CH', 'SY', 'TW', 'TJ', 'TZ', 'TH', 'TL', 'TG', 
    'TK', 'TO', 'TT', 'TN', 'TR', 'TM', 'TC', 'TV', 'UG', 'UA', 
    'AE', 'GB', 'US', 'UM', 'UY', 'UZ', 'VU', 'VE', 'VN', 'VG', 
    'VI', 'WF', 'EH', 'YE', 'ZM', 'ZW'
}

In [5]:
# Standardize abbreviations (e.g., "ST." → "ST")
abbrev_replacements = {
    r"\bST\.\b": "ST",
    r"\bRD\.\b": "RD",
    r"\bAVE\.\b": "AVE",
    r"\bBLVD\.\b": "BLVD"
}

In [6]:
# 1. Merge with SIC codes
deduped_sic = sic_code_df.dropDuplicates(["Related SIC Code"])

merged_df = df.join(
    deduped_sic.select(
        col("Related SIC Code").alias("SICSUBCD"), 
        col("Related SIC Code Description")
    ),
    on="SICSUBCD",
    how="left"
)
print(f"Total rows: {merged_df.count()}")


Total rows: 10208188


In [7]:
# 2. Add Online_status (1/0)
# Define online-exclusive SIC codes (4-digit)
online_sic_codes = {
    # Software/Internet Services
    "7371", "7372", "7373", "7374", "7375", "7376", "7377", "7378", "7379",
    
    # E-Commerce/Digital Content
    "5961",  # Mail-order/catalog retail
    "5962",  # Automatic merchandising (digital vending)
    "5944",  # Computer software stores (digital distribution)
    "4841",  # Video streaming
    "7822",  # Motion picture and video distribution
    
    # Digital Financial Services
    "6099",  # Cryptocurrency exchanges
    "6211",  # Online brokerages
    
    # Telecommunications
    "4812",  # Mobile apps
    "4813",  # VoIP services
    
    # Gaming/Entertainment
    "7999",  # Online gaming platforms
    "7841"   # Video streaming rental
    
    # Digital Services
    "5045",  # Computers and software wholesale (mostly online)
    "5963",  # Direct selling establishments (online)
    "7370",  # Computer programming and data processing
    "8999",  # Services, not elsewhere classified (many digital services)
            
    # E-commerce
    "5961",  # Catalog and mail-order houses
    "5944",  # Jewelry, watch, and silverware stores (online)
    "5734",  # Computer and software stores (online)
    "5999",  # Miscellaneous retail (many online)
    
    # Digital Education
    "8299",  # Schools and educational services (online learning)
    
    # Digital Travel
    "4724",  # Travel agencies (online booking)
    "4725",  # Tour operators (online)
    
    # Digital Advertising
    "7311",  # Advertising agencies (digital ads)
    "7319",  # Advertising, not elsewhere classified
     
}

# Define patterns that indicate online transactions in both TXN_DESCRIPTION and TERM_ADDR
online_patterns = r"""(?i)(  # Case insensitive flag
    www\.|\.com|\.net|\.org|\.io|\.co\b|\.ai|\.tech|  # Common TLDs
    online|internet|web|digital|e[\s-]?commerce|  # Common terms
    AliExpress|amazon|ebay|alibaba|shopify|etsy|walmart\.com|target\.com|  # Major e-commerce
    Peer-to-Peer|payrange|paypal|SQ|square|venmo|apple\s?pay|google\s?pay|zelle|  # Payment processors
    cloud|streaming|app\b|api\b|saas|platform|# Tech terms
    FACEBK|facebook|TWITTER|LinkedIn|TikTok|OnlyFans|Patreon|GoFundMe| # social networks
    zoom|webex|gotomeeting|teams|skype|  # Video conferencing
    uber|lyft|doordash|grubhub|instacart|  # On-demand services
    twitch|netflix|hulu|HLU|disney\+|hbo\s?max|peacock|  # Streaming
    spotify|apple\s?music|pandora|  # Music streaming
    audible|kindle|  # Digital books
    netmarble|Blizzard|steam|epic\s?games|playstation\s?network|xbox\s?live|  # Gaming
    dropbox|google\s?drive|onedrive|box\s?dot\s?com|  # Cloud storage
    quickbooks\s?online|xero|freshbooks|  # Online accounting
    salesforce|hubspot|zoho|  # SaaS platforms
    airbnb|vrbo|expedia|priceline|  # Online travel
    \*ecomm\*|\*online\*|\*digital\*  # Common asterisk patterns
)"""



# logic to check both description and terminal address
merged_df = merged_df.withColumn(
    "Online_status",
    when(
        (regexp_extract(col("TXN_DESCRIPTION"), r"\*", 0) != "") |  # Asterisk in description
        (col("SICSUBCD").isin(online_sic_codes)) |  # Online SIC code
        (regexp_extract(lower(col("TXN_DESCRIPTION")), online_patterns, 0) != "") |  # Patterns in description
        (regexp_extract(lower(col("TERM_ADDR")), online_patterns, 0) != ""),  # New: Patterns in terminal address
        1
    ).otherwise(0))

In [8]:
# 3. Extract State and Country from TXN_DESCRIPTION with validation
merged_df = merged_df.withColumn(
    "state_country",
    regexp_extract(trim(col("TXN_DESCRIPTION")), r'.*\s{2,}([A-Za-z]{4})$', 1)
)

# Split and validate State (first 2 chars)
merged_df = merged_df.withColumn(
    "State",
    when(
        upper(substring(col("state_country"), 1, 2)).isin(list(valid_states) + list(valid_provinces)),
        upper(substring(col("state_country"), 1, 2))
    ).otherwise(None)
)

# Split and validate Country (last 2 chars)
merged_df = merged_df.withColumn(
    "Country",
    when(
        upper(substring(col("state_country"), 3, 2)).isin(list(valid_countries)),
        upper(substring(col("state_country"), 3, 2))
    ).otherwise(None)
).drop("state_country")

# Additional validation for US states
merged_df = merged_df.withColumn(
    "State",
    when(
        (col("Country") == "US") & 
        (~col("State").isin(list(valid_states))),
        None
    ).otherwise(col("State"))
)

# Additional validation for CA provinces
merged_df = merged_df.withColumn(
    "State",
    when(
        (col("Country") == "CA") & 
        (~col("State").isin(list(valid_provinces))),
        None
    ).otherwise(col("State"))
)

In [9]:
# 4. Company Name Extraction with Fortune 1000 matching

# 4.1. Clean Fortune table data (preserve original case)
fortune_df = spark.read.csv(fortune_path, header=True) \
    .select(
        trim(col("Company")).alias("fortune_company"),  # Keep original case
        trim(col("Alt_name")).alias("fortune_alt_name")  # Keep original case
    ) \
    .filter(col("fortune_company").isNotNull()) \
    .distinct()

# 4.2. Merchant name extraction function with title case conversion
def extract_merchant_name(txn_description):
    """Extracts the likely merchant name and converts to title case"""
    if not txn_description:
        return None
    
    # Remove everything after double spaces (state/country info)
    name = re.sub(r"\s{2,}.*", "", str(txn_description))
    
    # Remove common suffixes and special characters (but keep dots)
    name = re.sub(r"\b(FACEBK|AMZ|TST|ABC|GOOGLE|SQ|PAYPAL|LLC|INC|LTD|CORP|CO|#\d+|\*\w+)\b|[^A-Za-z0-9.\s]", "", name)
    
    # Convert to title case (first letter uppercase, rest lowercase)
    name = ' '.join([w.capitalize() for w in name.split()])
    
    # Take first 3 words
    return ' '.join(name.split()[:3]).strip()

extract_merchant_udf = udf(extract_merchant_name, StringType())

# 4.3. Fortune matching function (case-insensitive comparison)
def match_fortune_company(merchant_name, fortune_broadcast):
    """Matches against Fortune table (case-insensitive) but returns proper case"""
    if not merchant_name:
        return None
    
    merchant_name_upper = str(merchant_name).upper()
    
    for row in fortune_broadcast.value:
        # 1. Check exact match with company name (case-insensitive)
        if row['fortune_company'] and merchant_name_upper == row['fortune_company'].upper():
            return row['fortune_company']  # Return original case from Fortune table
        
        # 2. Check exact match with alt name (case-insensitive)
        if row['fortune_alt_name'] and merchant_name_upper == row['fortune_alt_name'].upper():
            return row['fortune_company']  # Return company name in original case
        
        # 3. Check if merchant name starts with company name (case-insensitive)
        if row['fortune_company'] and merchant_name_upper.startswith(row['fortune_company'].upper()):
            return row['fortune_company']
        
        # 4. Check if merchant name starts with alt name (case-insensitive)
        if row['fortune_alt_name'] and merchant_name_upper.startswith(row['fortune_alt_name'].upper()):
            return row['fortune_company']
    
    return None

fortune_broadcast = spark.sparkContext.broadcast(fortune_df.collect())
fortune_match_udf = udf(lambda x: match_fortune_company(x, fortune_broadcast), StringType())

# 4.4. Apply to dataframe
merged_df = merged_df.withColumn(
    "Cleaned_Merchant",
    extract_merchant_udf(col("TXN_DESCRIPTION"))
)

# First try matching with Fortune table
merged_df = merged_df.withColumn(
    "Fortune_Match",
    fortune_match_udf(col("Cleaned_Merchant"))
)

# Final Parent_Company: Use Fortune match if found (with original case), 
# otherwise use cleaned merchant name (in title case)
merged_df = merged_df.withColumn(
    "Parent_Company",
    coalesce(col("Fortune_Match"), col("Cleaned_Merchant"))
)

# 4.5. Add Parent_Company_Flag

merged_df = merged_df.withColumn(
    "Parent_Company_Flag",
    when(
        col("Fortune_Match").isNotNull(),  # If we got a match from Fortune table
        1
    ).when(
        col("Parent_Company").isNotNull(),  # If we have a parent company but not from Fortune
        0
    ).otherwise(  # If Parent_Company is null
        None
    )
)

# Cleanup
merged_df = merged_df.drop("Fortune_Match")

In [10]:
#print(merged_df.columns)

In [11]:
# Show sample results
#merged_df.select("TXN_DESCRIPTION", "Parent_Company").show(30, truncate=False)

In [12]:
# 5. Enhanced City Extraction from TXN_DESCRIPTION (Handles multi-word cities)
valid_cities = cities_df.select(
    upper(col("city")).alias("city"),
    col("country_id")
).filter(col("city").isNotNull()).collect()

# Create a dictionary of cities by country, sorted by length (longest first)
valid_cities_dict = {}
for row in valid_cities:
    country = row["country_id"]
    city = row["city"]
    if country not in valid_cities_dict:
        valid_cities_dict[country] = []
    valid_cities_dict[country].append(city)

# Sort city lists by length (longest first) to prioritize multi-word matches
for country in valid_cities_dict:
    valid_cities_dict[country].sort(key=len, reverse=True)

def extract_city(txn_description, country, parent_company, online_status):
    if online_status == 1:
        return "Online"
    if txn_description is None or country is None:
        return None
    
    country = str(country).upper()
    txn_description = str(txn_description).strip()
    
    # Remove parent company name if present
    if parent_company is not None:
        parent_company = str(parent_company)
        txn_description = txn_description.replace(parent_company, '').strip()
    
    # Remove state/country suffix if present (e.g., "MNUS")
    state_country_pattern = r"[A-Z]{2}[A-Z]{2}$"
    txn_description = re.sub(state_country_pattern, "", txn_description).strip()
    
    # Get valid cities for this country (already sorted longest first)
    country_cities = valid_cities_dict.get(country, [])
    
    # Try to find the longest possible city name match first
    for city in country_cities:
        # Look for city at end of string (after stripping whitespace)
        if txn_description.rstrip().endswith(city):
            return city
        
        # Look for city followed by multiple spaces (common pattern)
        if f"{city}  " in txn_description:
            return city
    
    # Fallback: Split by multiple spaces and check last components
    parts = [p.strip() for p in re.split(r"\s{2,}", txn_description) if p.strip()]
    if parts:
        # Check last part (most likely city location)
        last_part = parts[-1].upper()
        for city in country_cities:
            if last_part == city:
                return city
            if last_part.endswith(city):
                return city
    
    return None

extract_city_udf = F.udf(extract_city, StringType())

# Apply city extraction first
merged_df = merged_df.withColumn(
    "City",
    extract_city_udf(
        col("TXN_DESCRIPTION"),
        col("Country"), 
        col("Parent_Company"),
        col("Online_status")
    )
)

In [13]:
# 6. Address Extraction
def extract_address_only(txn_description, parent_company, city, state, country):
    if txn_description is None or state is None or country is None:
        return None
    
    txn_description = txn_description.strip()
    state_country = f"{state}{country}"
    
    if state_country in txn_description:
        remaining = txn_description.replace(state_country, '').strip()
    else:
        remaining = txn_description
    
    if parent_company is not None:
        remaining = remaining.replace(str(parent_company), '').strip()
    
    if city is not None:
        remaining = remaining.replace(str(city), '').strip()
    
    return remaining if remaining else None

extract_address_only_udf = F.udf(extract_address_only, StringType())

# 6.1 Identification Flag
def validate_address(address):
    if address is None or not isinstance(address, str):
        return 0
    address_suffixes = r'\b(AV|AVE|ST|EASTERN|Street|RD|Road|BLVD|Boulevard|DR|Drive|LN|Lane|South|North|East|West|S|N|E|W)\b'
    return 1 if re.search(address_suffixes, address, flags=re.IGNORECASE) else 0

validate_address_udf = F.udf(validate_address, IntegerType())

# Apply both stages sequentially
merged_df = merged_df.withColumn(
    "Address_Candidate",
    extract_address_only_udf(
        col("TXN_DESCRIPTION"),
        col("Parent_Company"), 
        col("City"),
        col("State"),
        col("Country")
    )
)

# Then validate and flag
merged_df = merged_df.withColumn(
    "Address_Flag",
    validate_address_udf(col("Address_Candidate"))
)

# Final clean address (None if not valid)
merged_df = merged_df.withColumn(
    "Address",
    when(col("Address_Flag") == 1, col("Address_Candidate")).otherwise(None)
)

# Drop the temporary candidate column
merged_df = merged_df.drop("Address_Candidate")

# Verify the Address column was created
merged_df = merged_df.withColumn("Address_debug", 
    when(col("Address").isNotNull(), lit("Valid")))
    
print(f"Total rows: {merged_df.count()}")

Total rows: 10208188


In [14]:
# 7. Extract State from TERM_ADDR
merged_df = (
    merged_df
    # Extract first 2 characters (uppercased)
    .withColumn(
        "TERM_State_Raw",
        upper(substring(trim(col("TERM_ADDR")), 1, 2))
    )
    
    # Validate against states/provinces
    .withColumn(
        "TERM_State",
        when(
            col("TERM_State_Raw").isin(list(valid_states) + list(valid_provinces)),
            col("TERM_State_Raw")
        ).otherwise(None)
    )
    
    # Additional country-specific validation
    .withColumn(
        "TERM_State",
        when(
            (col("Country") == "US") & 
            (~col("TERM_State").isin(list(valid_states))),
            None
        )
        .when(
            (col("Country") == "CA") & 
            (~col("TERM_State").isin(list(valid_provinces))),
            None
        )
        .otherwise(col("TERM_State"))
    )
    
    # Cleanup temporary column
    .drop("TERM_State_Raw")
)

In [15]:
# 8. Extract City from TERM_ADDR
state_column_name = 'state_id'

us_cities = cities_df.filter(col("country_id") == "US") \
    .select(
        upper(col("city")).alias("city"),
        col(state_column_name)
    ).filter(col("city").isNotNull()).collect()

valid_cities_by_state = {}
for row in us_cities:
    state = row[state_column_name]
    city = row["city"]
    if state not in valid_cities_by_state:
        valid_cities_by_state[state] = set()
    valid_cities_by_state[state].add(city)

def extract_term_city(term_addr, term_state):
    if term_addr is None or term_state is None:
        return None
    
    term_state = str(term_state).upper()
    term_addr = str(term_addr).strip().upper()
    state_cities = valid_cities_by_state.get(term_state, set())
    
    space_indices = [m.start() for m in re.finditer(r'  +', term_addr)]
    
    if not space_indices:
        for city in state_cities:
            city = str(city)
            if city in term_addr:
                return city
        return None
    
    if len(space_indices) == 1:
        city_candidate = term_addr[:space_indices[0]].strip()
        city_candidate = str(city_candidate)
        for city in state_cities:
            city = str(city)
            if city in city_candidate:
                return city
        
        for city in state_cities:
            city = str(city)
            if city in term_addr:
                return city
    
    else:
        city_candidate = term_addr[space_indices[0]:space_indices[1]].strip()
        city_candidate = str(city_candidate)
        for city in state_cities:
            city = str(city)
            if city in city_candidate:
                return city
    
    return None

extract_term_city_udf = F.udf(extract_term_city, StringType())
merged_df = merged_df.withColumn(
    "TERM_City",
    extract_term_city_udf(
        col("TERM_ADDR"),
        col("TERM_State")
    )
)

In [16]:
# 9. Extract Address from TERM_ADDR (State first, then City)
from pyspark.sql.functions import regexp_replace, trim, when, col, expr

# First create a cleaned version
merged_df = merged_df.withColumn(
    "term_street", 
    trim(col("TERM_ADDR"))
)

# Remove phone numbers in formats like 307-222-0026 or (307) 222-0026
merged_df = merged_df.withColumn(
    "term_street",
    regexp_replace(
        col("term_street"),
        r"\b(\d{3}[-\.\s]??\d{3}[-\.\s]??\d{4}|\(\d{3}\)\s*\d{3}[-\.\s]??\d{4}|\d{3}[-\.\s]??\d{4})\b",
        ""
    )
)

# Handle cases where both state and city exist in order: STATE CITY STREET
merged_df = merged_df.withColumn(
    "term_street",
    when(
        (col("TERM_State").isNotNull() & col("TERM_City").isNotNull()),
        expr("""
            regexp_replace(
                regexp_replace(
                    term_street,
                    concat('^\\\\s*', TERM_State, '\\\\s+', TERM_City, '\\\\s+'),
                    ''
                ),
                concat('^\\\\s*', TERM_State, '\\\\s*$'),
                ''
            )
        """)
    ).otherwise(col("term_street"))
)

# Handle cases where only state exists
merged_df = merged_df.withColumn(
    "term_street",
    when(
        (col("TERM_State").isNotNull() & col("TERM_City").isNull()),
        expr("regexp_replace(term_street, concat('^\\\\s*', TERM_State, '\\\\s+'), '')")
    ).otherwise(col("term_street"))
)

# Final cleanup - remove extra spaces and trim
merged_df = merged_df.withColumn(
    "term_street",
    trim(regexp_replace(col("term_street"), r'\\s{2,}', ' '))
)

In [17]:
# 9.1 Validation flag
merged_df = merged_df.withColumn(
    "term_street_flag",
    when(
        (col("TERM_State").isNotNull()) & 
        (col("TERM_City").isNotNull()) & 
        (col("term_street").isNotNull()) & 
        (col("term_street") != ""),
        1  # Valid if all three fields exist
    ).otherwise(0)  # Otherwise, flag as 0
)

In [18]:
# 10. Create Company column by combining Parent_Company + Address + City + State (skip nulls)
merged_df = merged_df.withColumn(
    "Company",
    concat_ws(" ",
        *[when(col(c).isNotNull(), col(c)).otherwise("") 
         for c in ["Parent_Company", "Address", "City", "State"]]
    )
)

# Clean up the Company column to remove extra spaces
merged_df = merged_df.withColumn(
    "Company",
    regexp_replace(trim(col("Company")), r" +", " ")
)

In [19]:
# 11. Unique Company Key Generation
from pyspark.sql.functions import col, when, lit, concat_ws, upper, regexp_replace, coalesce, regexp_extract, substring

# Step 1: Clean and standardize company name
merged_df = merged_df.withColumn(
    "clean_company",
    coalesce(
        upper(trim(col("Parent_Company"))),  # rim and uppercase
        lit("UNKNOWN")
    )
)

# Step 2: Extract address component
merged_df = merged_df.withColumn(
    "clean_address_part",
    when(
        col("Address").isNotNull(),
        upper(
            coalesce(
                regexp_extract(col("Address"), r".*?(\d+).*", 1),  # Extract ANY digits
                regexp_extract(col("Address"), r"^(\w+)", 1),       # Fallback: first word
                lit("")
            )
        )
    ).otherwise(lit("NA"))
)

# Step 3: Build key components with validation
merged_df = merged_df.withColumn(
    "merchant_component",
    coalesce(col("HASHED_MERCH_ID"), lit("NA"))
)

# Location-based key (for physical stores)
merged_df = merged_df.withColumn(
    "location_component",
    when(
        (col("Address_Flag") == 1) &
        (col("City").isNotNull()) &
        (col("State").isNotNull()),
        concat_ws("_",
            col("clean_company"),
            substring(col("clean_address_part"), 1, 10),  # Use first 10 chars of address part
            upper(regexp_replace(col("City"), r"[^A-Z]", "")),
            col("State")
        )
    ).otherwise(lit("NA"))
)

# Online key (for e-commerce/online transactions)
merged_df = merged_df.withColumn(
    "online_component",
    when(
        col("Online_status") == 1,
        concat_ws("_",
            col("clean_company"),
            lit("ONLINE"),
            coalesce(col("Country"), lit("UNK"))
        )
    ).otherwise(lit("NA"))
)

# Step 4: Combine into final composite key
merged_df = merged_df.withColumn(
    "COMPANY_KEY",
    concat_ws("|",
        col("merchant_component"),
        col("location_component"),
        col("online_component")
    )
)

# Step 5: Cleanup temporary columns
merged_df = merged_df.drop(
    "clean_company", "clean_address_part",
    "merchant_component", "location_component", "online_component"
)

In [20]:
# 12. Final Cleanup and Output
# First ensure we have the term_street column
if 'term_street' not in merged_df.columns:
    merged_df = merged_df.withColumn(
        "term_street",
        when(col("TERM_State").isNotNull(),
            regexp_replace(
                regexp_extract(col("TERM_ADDR"), r'^(.*?)(?=\\\\s+[A-Z]{2}\\\\s+[A-Z]{2}$)', 0),
                r'\\\\d+$', ''
            )
        ).otherwise(None)
    )

# Cache DataFrame for performance
merged_df.cache()

# Add match quality flag based on extracted components only
validated_df = merged_df.withColumn(
    "term_street_flag",
    when(col("TERM_State").isNull(), 0)  # No state extracted
    .when(col("term_street").isNull(), 1)  # State but no address extracted
    .otherwise(2)  # Valid address components extracted
)

# Create final output DataFrame
#final_df = validated_df.drop("term_street")
final_df = validated_df.cache()

# Clean column names
for col_name in final_df.columns:
    clean_name = (col_name.replace(" ", "_")
                        .replace("-", "_")
                        .replace("(", "")
                        .replace(")", "")
                        .replace("%", "pct")
                        .replace(".", "_"))
    final_df = final_df.withColumnRenamed(col_name, clean_name)

print("\nFinal dataset columns:")
print(final_df.columns)

# Clean up cached DataFrame
merged_df.unpersist()


Final dataset columns:
['SICSUBCD', 'TXN_DESCRIPTION', 'TERM_ADDR', 'HASHED_MERCH_ID', 'Related_SIC_Code_Description', 'Online_status', 'State', 'Country', 'Cleaned_Merchant', 'Parent_Company', 'Parent_Company_Flag', 'City', 'Address_Flag', 'Address', 'Address_debug', 'TERM_State', 'TERM_City', 'term_street', 'term_street_flag', 'Company', 'COMPANY_KEY']


DataFrame[SICSUBCD: string, TXN_DESCRIPTION: string, TERM_ADDR: string, HASHED_MERCH_ID: string, Related SIC Code Description: string, Online_status: int, State: string, Country: string, Cleaned_Merchant: string, Parent_Company: string, Parent_Company_Flag: int, City: string, Address_Flag: int, Address: string, Address_debug: string, TERM_State: string, TERM_City: string, term_street: string, term_street_flag: int, Company: string, COMPANY_KEY: string]

In [21]:
# 13. Save the final processed dataset
# First drop innecesary columns
merged_df = merged_df.drop("Address_debug","Cleaned_Merchant")

for col_name in merged_df.columns:
    clean_name = (col_name.replace(" ", "_")
                        .replace("-", "_")
                        .replace("(", "")
                        .replace(")", "")
                        .replace("%", "pct")
                        .replace(".", "_"))
    merged_df = merged_df.withColumnRenamed(col_name, clean_name)
    
#merged_df.show()

output_path = '/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/Gate_City_processed_completed_TEST.csv'
#output_path = '/mmfs1/home/a.patronigranda/MIS795 Capstone Project/Output'
merged_df.write.csv(output_path, header=True, mode="overwrite")
print(f"Final dataset also saved to CSV: {output_path}")

# Optional: Save as Parquet
#parquet_path = '/mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/Gate_City_processed_completed.parquet'
#merged_df.write.parquet(parquet_path, mode="overwrite")
#print(f"Final dataset saved as Parquet to: {parquet_path}")

# Stop Spark session
spark.stop()

Final dataset also saved to CSV: /mmfs1/projects/f8d7c0/2024-25/Gate City Bank/Agustin/Gate_City_processed_completed_TEST.csv
