# NAICS-Centered Industry Classification Entity Resolution


## Purpose
This notebook resolves entities across industry classification lookup tables, with NAICS (North American Industry Classification System) as the primary table. It uses Snowflake AI functions (`AI_SIMILARITY` and `AI_CLASSIFY`) to match industry codes across three classification systems:

### Target Tables
- **NAICS** (North American Industry Classification System) - PRIMARY/LEFT TABLE
- **MCC** (Merchant Category Codes)
- **SIC** (Standard Industrial Classification)  

### Approach
1. Load and explore each classification table
2. Create cross joins with NAICS as the leftmost table
3. Apply AI_SIMILARITY to find matching classifications across systems
4. Use AI_CLASSIFY to identify best matches for each NAICS code
5. Create unified NAICS-centered industry mapping for entity resolution

### Expected Output
- One record for each NAICS code
- Ability to look up the corresponding MCC and SIC values for any NAICS code


In [1]:
# =============================================================================
# 🔧 ENTITY RESOLUTION FRAMEWORK
# =============================================================================

# 1. IMPORTS & SETUP
# Python Data 
import pandas as pd
from pydantic import BaseModel, Field
import json

# Python Formatting & Display
import humanize 
from datetime import datetime
from textwrap import dedent

#  Snowpark
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
import snowflake.snowpark.window as W
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session

# Cortex
import snowflake.cortex as C

# Helper Functions
def display_df_info(spdf, name="DataFrame"):
    """
    Display first 10 rows and metrics for a Snowpark DataFrame
    
    Args:
        spdf: Snowpark DataFrame to analyze
        name: Name to display for the DataFrame
    """
    # Get row and column counts
    row_count = spdf.count()
    col_count = len(spdf.columns)
    
    print(f"\n📊 {name} Overview:")
    print(f"  • Rows: {humanize.intword(row_count)} ({humanize.intcomma(row_count)})")
    print(f"  • Columns: {col_count}")
    
    print("\n🔍 First 10 rows:")
    spdf.limit(10).show()

def show_full_df(df, num_rows=10):
    """Display DataFrame with full formatting"""
    return df.limit(num_rows).to_pandas().style.set_properties(**{
        'text-align': 'left',
        'white-space': 'pre-wrap'
    }).set_table_styles([dict(selector='th', props=[('text-align', 'left')])])

def clean_na_values(df, columns_to_select=None):
    """
    Replace 'NA' string values with None in all string columns of a dataframe
    
    Args:
        df: Snowpark DataFrame to clean
        columns_to_select: Optional list of columns to select in output DataFrame
        
    Returns:
        Snowpark DataFrame with 'NA' values replaced with None
    """
    # Get all string columns
    string_columns = [field.name for field in df.schema.fields 
                     if isinstance(field.datatype, T.StringType)]

    # Create list of column transformations
    column_transformations_list = [
        F.when(F.col(column) == 'NA', None)
         .when((F.col(column) == 'x') & (column == 'FEDTAXID'), None)
         .otherwise(F.col(column))
        for column in string_columns
    ]

    # Apply all transformations at once
    cleaned_df = df.with_columns(string_columns, column_transformations_list)
    
    # Select specified columns if provided
    if columns_to_select:
        cleaned_df = cleaned_df.select(columns_to_select)
        
    return cleaned_df

print("✅ All imports and helper functions loaded successfully")


* 'allow_population_by_field_name' has been renamed to 'validate_by_name'


✅ All imports and helper functions loaded successfully


In [2]:
# 2. SESSION INITIALIZATION
def initialize_session():
    """Initialize Snowflake session with fallback options"""
    try:
        # First check for active session
        session = get_active_session()
        print("🔑 Using existing active Snowflake session")
        return session
    except Exception as e:
        print(f"⚠️  No active session found: {e}")
        try:
            # Try to load local credentials
            with open("/Users/jsoliz/.creds/gpn_connection.json", 'r') as f:
                connection_params = json.load(f)
            session = Session.builder.configs(connection_params).create()
            print("🔑 Local session initialized successfully")
            return session
        except Exception as e2:
            print(f"❌ Session initialization failed: {e2}")
            return None

# Initialize session
session = initialize_session()

if session:
    print(f"📊 Session active: {session.get_current_warehouse()}")
    print(f"🏢 Database: {session.get_current_database()}")
    print(f"📁 Schema: {session.get_current_schema()}")
else:
    print("❌ No session available - please check your connection configuration")


⚠️  No active session found: (1403): No default Session is found. Please create a session before you call function 'udf' or use decorator '@udf'.
Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...

### Schemata (Confirm Tables)


In [3]:
# Load the classification tables
print("📥 Loading classification lookup tables...")

# Load all three classification systems
naics_lu_spdf = session.table("sandbox.javier.LU_NAICS")
mcc_lu_spdf = session.table("sandbox.javier.LU_MCC").where(~F.col("MCC").startswith("3"))
sic_lu_spdf = session.table("sandbox.javier.LU_SIC")

# Get record counts
naics_count = naics_lu_spdf.count()
mcc_count = mcc_lu_spdf.count() 
sic_count = sic_lu_spdf.count()

print(f"📋 NAICS Records (Primary): {naics_count:,} total")
print(f"🎯 MCC Records: {mcc_count:,} total")
print(f"🏭 SIC Records: {sic_count:,} total")

print(f"\n🔍 NAICS Table Sample (Primary Table):")
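# show_full_df returns a pandas Styler; it renders only if displayed explicitly
# (display is available as a builtin inside Jupyter/IPython notebooks)
display(show_full_df(naics_lu_spdf, num_rows=3))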

print(f"\n🎯 MCC Table Sample:")
mcc_lu_spdf.limit(3).show()

print(f"\n🏭 SIC Table Sample:")  
sic_lu_spdf.limit(3).show()

# Get schema information
print("\n📊 Schema Summary:")
print("=" * 50)

print(f"\n📋 NAICS Schema (Primary - {len(naics_lu_spdf.schema.names)} columns):")
for field in naics_lu_spdf.schema.fields:
    print(f"  - {field.name}: {field.datatype}")

print(f"\n🎯 MCC Schema ({len(mcc_lu_spdf.schema.names)} columns):")
for field in mcc_lu_spdf.schema.fields:
    print(f"  - {field.name}: {field.datatype}")

print(f"\n🏭 SIC Schema ({len(sic_lu_spdf.schema.names)} columns):")
for field in sic_lu_spdf.schema.fields:
    print(f"  - {field.name}: {field.datatype}")


📥 Loading classification lookup tables...
📋 NAICS Records (Primary): 2,125 total
🎯 MCC Records: 286 total
🏭 SIC Records: 1,005 total

🔍 NAICS Table Sample (Primary Table):

🎯 MCC Table Sample:
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"MCC"  |"MCC_DESCRIPTIVE_TITLE"                 |"INCLUDED_IN_THIS_MCC"                              |"SIMILAR_MCC_CODES"                                 |"SIMILAR_MCC_CODE_READABLE"                         |"MCC_ARRAY"                                         |"MCC_ARR_MAPS"                                      |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
# Create lookup tables with text descriptions for reuse (DRY principle)
print("🔧 Creating reusable lookup tables with text concatenations...")

# NAICS lookup with concatenated text (Primary table)
naics_lu_with_text_spdf = naics_lu_spdf.select(
    "CODE",
    "TITLE", 
    "DESCRIPTION",
    "DESCRIPTION_FULL",
    F.concat(
        F.col("TITLE"),
        F.lit(" - "),
        F.col("DESCRIPTION_FULL")
    ).alias("NAICS_TEXT")
)

# MCC lookup with concatenated text
mcc_lu_with_text_spdf = mcc_lu_spdf.select(
    "MCC",
    "MCC_DESCRIPTIVE_TITLE",
    "INCLUDED_IN_THIS_MCC",
    F.concat(
        F.col("MCC_DESCRIPTIVE_TITLE"),
        F.lit(" - "),
        F.col("INCLUDED_IN_THIS_MCC")
    ).alias("MCC_TEXT")
)

# SIC lookup with concatenated text
sic_lu_with_text_spdf = sic_lu_spdf.select(
    "SIC_INDUSTRY_CODE",
    "SIC_INDUSTRY_DESCRIPTION",
    "SIC_MAJOR_GROUP_DESCRIPTION",
    "SIC_DIVISION_DESCRIPTION",
    F.concat(
        F.col("SIC_INDUSTRY_DESCRIPTION"),
        F.lit(" - "),
        F.col("SIC_MAJOR_GROUP_DESCRIPTION")
    ).alias("SIC_TEXT")
)

print("✅ Reusable lookup tables with text concatenations created!")
print(f"📋 NAICS with text: {naics_lu_with_text_spdf.count():,} records")
print(f"🎯 MCC with text: {mcc_lu_with_text_spdf.count():,} records")
print(f"🏭 SIC with text: {sic_lu_with_text_spdf.count():,} records")


🔧 Creating reusable lookup tables with text concatenations...
✅ Reusable lookup tables with text concatenations created!
📋 NAICS with text: 2,125 records
🎯 MCC with text: 286 records
🏭 SIC with text: 1,005 records
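
One caveat with the concatenated text columns: in Snowflake, `CONCAT` returns NULL when any argument is NULL, so a row with a missing description would end up with no text to score. The sketch below mimics that behavior in plain Python and shows a null-safe variant. `sql_concat` and `null_safe_text` are hypothetical helper names, and whether the lookup tables actually contain NULL descriptions has not been checked here.

```python
from typing import Optional


def sql_concat(*parts: Optional[str]) -> Optional[str]:
    """Mimic SQL CONCAT: the result is NULL (None) if any part is NULL."""
    if any(part is None for part in parts):
        return None
    return "".join(parts)


def null_safe_text(title: Optional[str], description: Optional[str]) -> Optional[str]:
    """Join the non-empty parts with ' - '; return None only if both are missing."""
    parts = [p for p in (title, description) if p]
    return " - ".join(parts) if parts else None


# Illustrative values only; not rows from the real lookup tables.
print(sql_concat("Soybean Farming", " - ", None))   # None: the whole text is lost
print(null_safe_text("Soybean Farming", None))      # Soybean Farming
print(null_safe_text("Soybean Farming", "Growing soybeans"))
```

In Snowpark, a comparable effect could be had by wrapping each column in `F.coalesce(..., F.lit(""))` before concatenating, at the cost of a dangling separator when one side is empty.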


# 📋 NAICS → MCC Mapping


## Objective
Create systematic mappings between NAICS and MCC classification systems using Snowflake's AI functions.
**NAICS is the primary/leftmost table** - we want one record per NAICS code with corresponding MCC lookups.

## Step-by-Step Approach

### Step 1: Cross Join and AI Similarity Calculation
- **Goal**: Cross join NAICS (left) with MCC (right) tables
- **AI Function**: Use `AI_SIMILARITY()` to compare names and descriptions between all NAICS-MCC pairs
- **Output**: Every NAICS record paired with every MCC record, including similarity scores
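
To make the scoring step concrete, here is a minimal, self-contained sketch of a pairwise similarity score computed over a cross join. It is not how `AI_SIMILARITY` works internally: the bag-of-words `embed` function is a toy assumption standing in for whatever model-based representation the real function uses, and the sample descriptions are made up for illustration.

```python
import math
from collections import Counter
from itertools import product


def embed(text: str) -> Counter:
    """Toy stand-in for a text embedding: lowercase token counts."""
    return Counter(text.lower().replace("-", " ").split())


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Illustrative (code, text) rows; the descriptions are invented for this sketch.
naics_rows = [
    ("111110", "Soybean Farming - growing soybeans"),
    ("722511", "Full-Service Restaurants - table service dining"),
]
mcc_rows = [
    ("5812", "Eating Places and Restaurants - dining"),
    ("0763", "Agricultural Cooperatives - farming supplies"),
]

# Cross join: every NAICS row paired with every MCC row, plus a score.
scored_pairs = [
    (naics_code, mcc_code, cosine_similarity(embed(naics_text), embed(mcc_text)))
    for (naics_code, naics_text), (mcc_code, mcc_text) in product(naics_rows, mcc_rows)
]

for naics_code, mcc_code, score in scored_pairs:
    print(f"{naics_code} x {mcc_code}: {score:.3f}")
```

The number of scored pairs is always the product of the two table sizes, which is why the cross join dominates the cost of this step.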

### Step 2: Ranking and Top 3 Selection  
- **Goal**: Use ranking function to keep top 3 MCC matches per NAICS code
- **Method**: `ROW_NUMBER()` partitioned by NAICS code and ordered by similarity score descending (`RANK()` would also work, but can return more than 3 rows when scores tie)
- **Output**: Maximum 3 MCC candidates per NAICS record
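
The top-N-per-group logic can be sketched in plain Python as follows. The sample scores are hypothetical, `top_n_per_group` is an illustrative helper, and the secondary sort on the MCC code is one possible tie-breaker, chosen here so that the result is reproducible.

```python
from collections import defaultdict

# Hypothetical scored pairs: (naics_code, mcc_code, similarity_score).
scored_pairs = [
    ("722511", "5812", 0.91),
    ("722511", "5814", 0.84),
    ("722511", "5813", 0.84),  # ties with 5814 on score
    ("722511", "5811", 0.60),
    ("111110", "0763", 0.72),
    ("111110", "5261", 0.31),
]


def top_n_per_group(pairs, n=3):
    """Emulate ROW_NUMBER() OVER (PARTITION BY naics ORDER BY score DESC, mcc ASC)."""
    groups = defaultdict(list)
    for naics_code, mcc_code, score in pairs:
        groups[naics_code].append((mcc_code, score))

    ranked = []
    for naics_code, candidates in groups.items():
        # Score descending, then code ascending so tied scores rank deterministically.
        candidates.sort(key=lambda c: (-c[1], c[0]))
        for rank, (mcc_code, score) in enumerate(candidates[:n], start=1):
            ranked.append((naics_code, mcc_code, score, rank))
    return ranked


top3 = top_n_per_group(scored_pairs)
for row in top3:
    print(row)
```

Because each candidate gets a unique row number, a group never contributes more than `n` rows, even when scores tie.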

### Step 3: AI Classification for Final Selection
- **Goal**: Use `AI_CLASSIFY()` to pick the best match among the top 3 candidates
- **Method**: Let AI choose the most appropriate MCC match for each NAICS
- **Output**: Final `code_map_naics_mcc` table with exactly one MCC per NAICS code (many-to-one: several NAICS codes can share an MCC)
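
The selection and fallback logic in this step can be sketched in plain Python. This is an illustrative stand-in, not the Snowflake implementation: `pick_best_match` is a hypothetical helper, the candidate titles are sample values, and the result shape (an object with a `labels` array) is assumed from the way the notebook reads `labels[0]`.

```python
import json


def pick_best_match(classify_result, ranked_candidates):
    """Return (chosen, had_return, agrees_with_similarity).

    classify_result: JSON string or dict with a "labels" list, or None.
    ranked_candidates: non-empty list of candidate texts, best similarity first.
    """
    labels = []
    if classify_result:
        parsed = json.loads(classify_result) if isinstance(classify_result, str) else classify_result
        labels = parsed.get("labels") or []

    had_return = bool(labels)
    # Fall back to the most similar candidate when the classifier returns nothing.
    chosen = labels[0] if had_return else ranked_candidates[0]
    agrees_with_similarity = had_return and chosen == ranked_candidates[0]
    return chosen, had_return, agrees_with_similarity


candidates = ["Eating Places and Restaurants", "Fast Food Restaurants", "Bars and Taverns"]

print(pick_best_match('{"labels": ["Fast Food Restaurants"]}', candidates))
print(pick_best_match(None, candidates))
```

Keeping the two flags alongside the chosen value makes it easy to audit how often the classifier returned nothing, and how often it simply confirmed the top similarity match.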

---

## 🚀 Step 1: Cross Join and AI Similarity Calculation


In [5]:
# Step 1: Cross Join NAICS and MCC with AI Similarity Calculation

print("\n🔍 First, let's examine the table structures:")
print("\nNAICS Table Columns (Primary):")
print(naics_lu_with_text_spdf.columns)
print("\nMCC Table Columns:")
print(mcc_lu_with_text_spdf.columns)

print(f"\n📊 Table Sizes:")
print(f"NAICS Records (Primary): {humanize.intcomma(naics_lu_with_text_spdf.count())}")
print(f"MCC Records: {humanize.intcomma(mcc_lu_with_text_spdf.count())}")
print(f"Cross Join Size: {humanize.intcomma(naics_lu_with_text_spdf.count() * mcc_lu_with_text_spdf.count())} total combinations")

# Create cross join with AI similarity calculation
print("\n🤖 Creating cross join with AI_SIMILARITY scores...")

import time
start_time = time.time()

# Build the cross join (lazy: nothing runs in Snowflake until the write below)
naics_mcc_cross_join = (
    naics_lu_with_text_spdf.alias("naics").cross_join(
        mcc_lu_with_text_spdf.alias("mcc")
    )
)

# Select output columns and add the AI similarity score (still lazy)
naics_mcc_cross_join = (
    naics_mcc_cross_join.select(
        # NAICS fields (Primary)
        F.col("CODE").alias("NAICS_CODE"),
        F.col("NAICS_TEXT"),
        
        # MCC fields  
        F.col("MCC").alias("MCC_CODE"),
        F.col("MCC_TEXT"),
        
        # AI Similarity score
        F.call_function("AI_SIMILARITY",
            F.col("NAICS_TEXT"),
            F.col("MCC_TEXT")
        ).alias("SIMILARITY_SCORE")
    )
)

# Write results to Snowflake table
print("\n💾 Writing results to Snowflake table...")
naics_mcc_cross_join.write.mode("overwrite").saveAsTable("sandbox.javier.naics_mcc_cross_join")

print("\n✅ Cross join with AI similarity created and saved successfully!")

end_time = time.time()
print(f"⏱️ Total execution time: {round(end_time - start_time, 2)} seconds")



🔍 First, let's examine the table structures:

NAICS Table Columns (Primary):
['CODE', 'TITLE', 'DESCRIPTION', 'DESCRIPTION_FULL', 'NAICS_TEXT']

MCC Table Columns:
['MCC', 'MCC_DESCRIPTIVE_TITLE', 'INCLUDED_IN_THIS_MCC', 'MCC_TEXT']

📊 Table Sizes:
NAICS Records (Primary): 2,125
MCC Records: 286


DataFrame.alias() is experimental since 1.5.0. Do not use it in production. 


Cross Join Size: 607,750 total combinations

🤖 Creating cross join with AI_SIMILARITY scores...

💾 Writing results to Snowflake table...

✅ Cross join with AI similarity created and saved successfully!
⏱️ Total execution time: 94.54 seconds


In [6]:
# Step 2: Load Cross Join Table and Apply Ranking for NAICS-MCC

print("📂 Loading existing NAICS-MCC cross join table...")
naics_mcc_cross_join = session.table("sandbox.javier.naics_mcc_cross_join")

print("🔍 Examining cross join table structure:")
print("Columns:", naics_mcc_cross_join.columns)
print(f"Total records: {humanize.intcomma(naics_mcc_cross_join.count())}")

print("\n🏆 Applying ranking to get top 3 MCC matches per NAICS...")

# Apply ROW_NUMBER() to rank MCC matches by similarity score.
# MCC_CODE is a secondary sort key so that tied scores rank deterministically.
naics_mcc_top_3_matches = naics_mcc_cross_join.select(
    "*",
    F.row_number().over(
        W.Window.partition_by("NAICS_CODE").order_by(
            F.col("SIMILARITY_SCORE").desc(), F.col("MCC_CODE").asc()
        )
    ).alias("SIMILARITY_RANK")
).filter(
    F.col("SIMILARITY_RANK") <= 3
)

print("✅ Ranking applied! Keeping top 3 MCC matches per NAICS code.")
print(f"📊 Reduced from {humanize.intcomma(naics_mcc_cross_join.count())} to {humanize.intcomma(naics_mcc_top_3_matches.count())} records")

# Write results to table for Step 3
naics_mcc_top_3_matches.write.mode("overwrite").save_as_table("sandbox.javier.naics_mcc_cross_join_top3")
print("\n💾 Results saved to table for NAICS-MCC AI classification!")


📂 Loading existing NAICS-MCC cross join table...
🔍 Examining cross join table structure:
Columns: ['NAICS_CODE', 'NAICS_TEXT', 'MCC_CODE', 'MCC_TEXT', 'SIMILARITY_SCORE']
Total records: 607,750

🏆 Applying ranking to get top 3 MCC matches per NAICS...
✅ Ranking applied! Keeping top 3 MCC matches per NAICS code.
📊 Reduced from 607,750 to 6,375 records

💾 Results saved to table for NAICS-MCC AI classification!


In [7]:
# Step 3: AI Classification for Final NAICS-MCC Selection

print("📂 Loading top 3 NAICS-MCC results table...")
naics_mcc_cross_join_top3_spdf = session.table("sandbox.javier.naics_mcc_cross_join_top3")

print("🤖 Preparing data for AI_CLASSIFY to select best MCC match for each NAICS...")

# Group the top 3 MCC codes per NAICS into arrays for AI_CLASSIFY
naics_with_mcc_categories = naics_mcc_cross_join_top3_spdf.select(
    "NAICS_CODE",
    "NAICS_TEXT", 
    "MCC_CODE",
    "MCC_TEXT",
    "SIMILARITY_SCORE",
    "SIMILARITY_RANK"
).group_by(
    "NAICS_CODE", "NAICS_TEXT"
).agg(
    # Create array of MCC codes as classification categories
    F.array_agg(F.expr("MCC_CODE")).within_group("SIMILARITY_RANK").alias("MCC_CODES"),
    # Also keep MCC titles for reference
    F.array_agg(F.expr("MCC_TEXT")).within_group("SIMILARITY_RANK").alias("MCC_TEXTS"),
    F.array_agg(F.expr("SIMILARITY_RANK")).within_group("SIMILARITY_RANK").alias("SIMILARITY_RANKS")
)

print("✅ MCC categories prepared!")

print(f"\n🤖 Using AI_CLASSIFY to select best MCC matches for {humanize.intcomma(naics_with_mcc_categories.count())} NAICS codes...")

# Use AI_CLASSIFY to classify each NAICS into one of its 3 MCC categories
naics_mcc_classified = naics_with_mcc_categories.select(
    "NAICS_CODE",
    "NAICS_TEXT",
    "MCC_CODES",
    "MCC_TEXTS",
   
    # Use AI_CLASSIFY to select best MCC match from the 3 options
    F.call_function("AI_CLASSIFY",
        F.concat(F.col("NAICS_CODE"), F.lit(". "), F.col("NAICS_TEXT")),    # input
        F.col("MCC_TEXTS"),                                                  # list_of_categories
    ).alias("AI_CLASSIFIED_MCC_TEXT")
)

# Save AI classifications
naics_mcc_classified.selectExpr("""
    NAICS_CODE,
    NAICS_TEXT,
    MCC_TEXTS,
    AI_CLASSIFIED_MCC_TEXT:labels[0]::string AS AI_CLASSIFIED_MCC_TEXT_STR,
    COALESCE(AI_CLASSIFIED_MCC_TEXT_STR, MCC_TEXTS[0]::string) AS MOST_LIKELY_MCC_TEXT,
    AI_CLASSIFIED_MCC_TEXT_STR IS NOT NULL AS CLASSIFY_HAD_RETURN,
    CLASSIFY_HAD_RETURN AND AI_CLASSIFIED_MCC_TEXT_STR = MCC_TEXTS[0]::string AS CLASSIFY_RETURNED_MOST_SIMILAR_MCC
"""
).write.mode("overwrite").save_as_table("sandbox.javier.naics_mcc_classified")

print("✅ NAICS-MCC AI classifications complete and saved!")


📂 Loading top 3 NAICS-MCC results table...
🤖 Preparing data for AI_CLASSIFY to select best MCC match for each NAICS...
✅ MCC categories prepared!

🤖 Using AI_CLASSIFY to select best MCC matches for 2,125 NAICS codes...
✅ NAICS-MCC AI classifications complete and saved!


In [8]:
# Create NAICS to MCC Mapping Table

print("📥 Loading NAICS-MCC AI classifications...")
naics_mcc_classified = session.table("sandbox.javier.naics_mcc_classified")

# Get distinct MCC code-text pairs from the MCC lookup table so codes can be joined back by text
print("📊 Getting distinct MCC code-text pairs...")
mcc_pairs = (
    mcc_lu_with_text_spdf
    .select(
        F.col("MCC").alias("MCC_CODE"),
        F.col("MCC_TEXT")
    )
    .distinct()
)

# Join MCC codes back to AI classifications based on matched text
print("🔄 Joining MCC codes to AI classifications...")
naics_mcc_lookup = (
    naics_mcc_classified.join(
        mcc_pairs,  # already distinct
        naics_mcc_classified["MOST_LIKELY_MCC_TEXT"] == mcc_pairs["MCC_TEXT"],
        "left"
    )
    .select(
        naics_mcc_classified["NAICS_CODE"],
        mcc_pairs["MCC_CODE"].alias("MOST_LIKELY_MCC_CODE")
    )
)

# Create final NAICS to MCC mapping table
code_map_naics_mcc = naics_mcc_lookup.select(
    "NAICS_CODE",
    "MOST_LIKELY_MCC_CODE"
).distinct().orderBy("NAICS_CODE")

code_map_naics_mcc.write.mode("overwrite").save_as_table("sandbox.javier.code_map_naics_mcc")

print(f"✅ NAICS to MCC mapping table created!")
print(f"📊 {humanize.intcomma(code_map_naics_mcc.count())} NAICS codes mapped to MCC")


📥 Loading NAICS-MCC AI classifications...
📊 Getting distinct MCC code-text pairs...
🔄 Joining MCC codes to AI classifications...
✅ NAICS to MCC mapping table created!
📊 2,125 NAICS codes mapped to MCC


In [9]:
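# Verify each NAICS code maps to exactly one MCC code.
# Check the final mapping table: naics_mcc_classified is grouped by NAICS code and text,
# so it has one row per code by construction and cannot reveal duplicate mappings.
print("\n🔍 Verifying NAICS to MCC mapping cardinality...")

naics_mcc_counts = (
    code_map_naics_mcc
    .groupBy("NAICS_CODE")
    .agg(F.count("*").alias("MCC_COUNT"))
    .where(F.col("MCC_COUNT") > 1)
)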

multiple_mappings_count = naics_mcc_counts.count()

if multiple_mappings_count > 0:
    print(f"⚠️ Found {multiple_mappings_count} NAICS codes with multiple MCC mappings:")
    naics_mcc_counts.show()
else:
    print("✅ Verified: Each NAICS code maps to exactly one MCC code")

# Create histogram of MCC mappings per NAICS code from the final mapping table
print("\n📊 Distribution of MCC mappings per NAICS code:")

mapping_distribution = (
    code_map_naics_mcc
    .groupBy("NAICS_CODE")
    .agg(F.count("*").alias("MCC_COUNT"))
    .groupBy("MCC_COUNT")
    .agg(F.count("*").alias("NAICS_COUNT"))
    .orderBy("MCC_COUNT")
)

mapping_distribution.show()

# Compare number of NAICS codes in mapping vs source table
print("\n🔍 Comparing NAICS code coverage...")

mapped_naics_count = code_map_naics_mcc.select("NAICS_CODE").distinct().count()
source_naics_count = naics_lu_with_text_spdf.select("CODE").distinct().count()

print(f"Source NAICS codes: {humanize.intcomma(source_naics_count)}")
print(f"Mapped NAICS codes: {humanize.intcomma(mapped_naics_count)}")

if mapped_naics_count < source_naics_count:
    print(f"⚠️ Missing mappings for {humanize.intcomma(source_naics_count - mapped_naics_count)} NAICS codes")
    
    # Show unmapped NAICS
    print("\nUnmapped NAICS codes:")
    unmapped_naics = (
        naics_lu_with_text_spdf.select("CODE", "TITLE")
        .join(code_map_naics_mcc, naics_lu_with_text_spdf.CODE == code_map_naics_mcc.NAICS_CODE, "left_anti")
    )
    unmapped_naics.show()
else:
    print("✅ All source NAICS codes have been mapped")



🔍 Verifying NAICS to MCC mapping cardinality...
✅ Verified: Each NAICS code maps to exactly one MCC code

📊 Distribution of MCC mappings per NAICS code:
-------------------------------
|"MCC_COUNT"  |"NAICS_COUNT"  |
-------------------------------
|1            |2125           |
-------------------------------


🔍 Comparing NAICS code coverage...
Source NAICS codes: 2,125
Mapped NAICS codes: 2,125
✅ All source NAICS codes have been mapped


# 📋 NAICS → SIC Mapping

## Objective
Create systematic mappings between NAICS and SIC classification systems using Snowflake's AI functions.
**NAICS is the primary/leftmost table** - we want one record per NAICS code with corresponding SIC lookups.

## Step-by-Step Approach
This follows the same methodology as the NAICS → MCC mapping:

1. **Cross Join and AI Similarity**: NAICS (left) with SIC (right) 
2. **Ranking and Top 3 Selection**: Keep best SIC matches per NAICS
3. **AI Classification**: Use AI_CLASSIFY for final selection
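
Because the SIC table is larger than the MCC table, the same cross join scores many more pairs. A rough sizing sketch, using the record counts printed by the loading cell earlier in the notebook, is shown below. `cross_join_plan` is a hypothetical helper, and the sketch says nothing about runtime or cost, which depend on the warehouse and the model.

```python
def cross_join_plan(left_rows: int, right_rows: int, top_n: int = 3) -> dict:
    """Row counts before and after top-N filtering for a left-by-right cross join."""
    return {
        "scored_pairs": left_rows * right_rows,
        "rows_after_top_n": left_rows * min(top_n, right_rows),
    }


# Record counts reported when the lookup tables were loaded.
NAICS_ROWS, MCC_ROWS, SIC_ROWS = 2125, 286, 1005

mcc_plan = cross_join_plan(NAICS_ROWS, MCC_ROWS)
sic_plan = cross_join_plan(NAICS_ROWS, SIC_ROWS)

print(f"NAICS x MCC pairs: {mcc_plan['scored_pairs']:,}")
print(f"NAICS x SIC pairs: {sic_plan['scored_pairs']:,}")
print(f"Rows kept after top-3 filtering: {sic_plan['rows_after_top_n']:,}")
print(f"SIC/MCC pair ratio: {sic_plan['scored_pairs'] / mcc_plan['scored_pairs']:.2f}")
```

The top-3 filter brings both mappings back to the same size (three rows per NAICS code), so only the similarity step grows with the size of the right-hand table.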

---


In [10]:
# NAICS-SIC Mapping Implementation (Following Same Pattern as NAICS-MCC)

print("🏭 Creating NAICS-SIC mappings using the same methodology...")

# Step 1: Cross Join NAICS and SIC with AI Similarity
print("\n📊 Step 1: Creating NAICS-SIC cross join with AI_SIMILARITY...")

naics_sic_cross_join = (
    naics_lu_with_text_spdf.alias("naics").cross_join(sic_lu_with_text_spdf.alias("sic"))
    .select(
        # NAICS fields (Primary)
        F.col("CODE").alias("NAICS_CODE"),
        F.col("NAICS_TEXT"),
        # SIC fields  
        F.col("SIC_INDUSTRY_CODE").alias("SIC_CODE"),
        F.col("SIC_TEXT"),
        # AI Similarity score
        F.call_function("AI_SIMILARITY", F.col("NAICS_TEXT"), F.col("SIC_TEXT")).alias("SIMILARITY_SCORE")
    )
)

naics_sic_cross_join.write.mode("overwrite").saveAsTable("sandbox.javier.naics_sic_cross_join")
print("✅ NAICS-SIC cross join created!")

# Step 2: Ranking - Top 3 SIC matches per NAICS
print("\n🏆 Step 2: Applying ranking to get top 3 SIC matches per NAICS...")

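# Read the saved table back; reusing the lazy DataFrame would recompute every AI_SIMILARITY score
naics_sic_top_3 = session.table("sandbox.javier.naics_sic_cross_join").select(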
    "*",
    F.row_number().over(W.Window.partition_by("NAICS_CODE").orderBy(F.col("SIMILARITY_SCORE").desc())).alias("SIMILARITY_RANK")
).filter(F.col("SIMILARITY_RANK") <= 3)

naics_sic_top_3.write.mode("overwrite").save_as_table("sandbox.javier.naics_sic_cross_join_top3")
print("✅ Top 3 SIC matches per NAICS saved!")


# Step 3: AI Classification
print("\n🤖 Step 3: Using AI_CLASSIFY for final SIC selection...")

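# Group top 3 SIC options for AI_CLASSIFY, reading from the saved top-3 table
# so the cross join and ranking are not recomputed
naics_with_sic_categories = session.table("sandbox.javier.naics_sic_cross_join_top3").group_by("NAICS_CODE", "NAICS_TEXT").agg(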
    F.array_agg(F.expr("SIC_TEXT")).within_group("SIMILARITY_RANK").alias("SIC_TEXTS")
)

# Apply AI_CLASSIFY
naics_sic_classified = naics_with_sic_categories.select(
    "NAICS_CODE", "NAICS_TEXT", "SIC_TEXTS",
    F.call_function("AI_CLASSIFY", F.concat(F.col("NAICS_CODE"), F.lit(". "), F.col("NAICS_TEXT")), F.col("SIC_TEXTS")).alias("AI_CLASSIFIED_SIC_TEXT")
)


🏭 Creating NAICS-SIC mappings using the same methodology...

📊 Step 1: Creating NAICS-SIC cross join with AI_SIMILARITY...
✅ NAICS-SIC cross join created!

🏆 Step 2: Applying ranking to get top 3 SIC matches per NAICS...
✅ Top 3 SIC matches per NAICS saved!

🤖 Step 3: Using AI_CLASSIFY for final SIC selection...


In [11]:
# Save and create mapping table
naics_sic_classified.selectExpr("""NAICS_CODE, NAICS_TEXT, SIC_TEXTS,
                               AI_CLASSIFIED_SIC_TEXT:labels[0]::string as AI_CLASSIFIED_SIC_TEXT_STR,
                               COALESCE(AI_CLASSIFIED_SIC_TEXT_STR, SIC_TEXTS[0]::string) as MOST_LIKELY_SIC_TEXT"""
).write.mode("overwrite").save_as_table("sandbox.javier.naics_sic_classified")

session.table("sandbox.javier.naics_sic_classified").print_schema()

# Create final mapping table by joining SIC codes back on the matched text
sic_pairs = (
    session.table("sandbox.javier.naics_sic_cross_join")
    .select("SIC_CODE", "SIC_TEXT")
    .distinct()
)
naics_sic_lookup = (
    session.table("sandbox.javier.naics_sic_classified")
    .join(sic_pairs, F.col("MOST_LIKELY_SIC_TEXT") == F.col("SIC_TEXT"), "left")
    .select("NAICS_CODE", F.col("SIC_CODE").alias("MOST_LIKELY_SIC_CODE"))
    .distinct()
    .orderBy("NAICS_CODE")
)
naics_sic_lookup.write.mode("overwrite").save_as_table("sandbox.javier.code_map_naics_sic")

print("✅ NAICS-SIC mapping complete!")
print(f"📊 {humanize.intcomma(naics_sic_lookup.count())} NAICS codes mapped to SIC")


root
 |-- "NAICS_CODE": StringType(16777216) (nullable = True)
 |-- "NAICS_TEXT": StringType(33554435) (nullable = True)
 |-- "SIC_TEXTS": ArrayType (nullable = False)
 |   |-- element: StringType()
 |-- "AI_CLASSIFIED_SIC_TEXT_STR": StringType() (nullable = True)
 |-- "MOST_LIKELY_SIC_TEXT": StringType() (nullable = True)
✅ NAICS-SIC mapping complete!
📊 2,125 NAICS codes mapped to SIC


# 📋 Final NAICS-Centered Industry Classification Table

## Objective
Create the final NAICS-centered industry classification table by joining all mappings together.
This will provide one record per NAICS code with corresponding MCC and SIC lookups.

## Final Output Structure
- **NAICS_CODE** (Primary Key)
- **MCC_CODE** (Mapped)
- **SIC_CODE** (Mapped)
- Plus descriptive fields from all three classification systems
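
The shape of the final table, and the coverage figures computed from it, can be illustrated with a small plain-Python sketch. The mappings below are sample values rather than rows from the real tables, and `build_final_table` and `coverage` are hypothetical helpers.

```python
def build_final_table(naics_to_mcc: dict, naics_to_sic: dict) -> list:
    """Full-outer-join two NAICS-keyed mappings into one record per NAICS code."""
    all_codes = sorted(naics_to_mcc.keys() | naics_to_sic.keys())
    return [
        {
            "NAICS_CODE": code,
            "MCC_CODE": naics_to_mcc.get(code),  # None when no MCC was mapped
            "SIC_CODE": naics_to_sic.get(code),  # None when no SIC was mapped
        }
        for code in all_codes
    ]


def coverage(records: list, column: str) -> float:
    """Fraction of records with a non-null value in the given column."""
    if not records:
        return 0.0
    return sum(1 for r in records if r[column] is not None) / len(records)


# Illustrative mappings; the third NAICS code has no SIC match in this sample.
naics_to_mcc = {"111110": "0763", "722511": "5812", "541511": "7372"}
naics_to_sic = {"111110": "0116", "722511": "5812"}

final_table = build_final_table(naics_to_mcc, naics_to_sic)
for record in final_table:
    print(record)
print(f"MCC coverage: {coverage(final_table, 'MCC_CODE'):.1%}")
print(f"SIC coverage: {coverage(final_table, 'SIC_CODE'):.1%}")
```

Keying both mappings on the NAICS code is what keeps the result at one record per code, with gaps showing up as nulls rather than missing rows.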

---


In [12]:
# Join NAICS-MCC and NAICS-SIC lookups to create final classification table
naics_mcc = session.table("sandbox.javier.code_map_naics_mcc").select(
    F.col("NAICS_CODE").alias("NAICS_CODE_MCC_SIDE"),
    "MOST_LIKELY_MCC_CODE"
)
naics_sic = session.table("sandbox.javier.code_map_naics_sic").select(
    F.col("NAICS_CODE").alias("NAICS_CODE_SIC_SIDE"), 
    "MOST_LIKELY_SIC_CODE"
)

# Create final classification table with all descriptive fields
naics_classifications = (
    naics_mcc
    .join(naics_sic, naics_mcc.NAICS_CODE_MCC_SIDE == naics_sic.NAICS_CODE_SIC_SIDE, "full")
    .join(naics_lu_with_text_spdf, F.coalesce(F.col("NAICS_CODE_MCC_SIDE"), F.col("NAICS_CODE_SIC_SIDE")) == naics_lu_with_text_spdf.CODE, "left")
    .join(
        mcc_lu_with_text_spdf.select("MCC", F.col("MCC_DESCRIPTIVE_TITLE").alias("MCC_TEXT")), 
        F.col("MOST_LIKELY_MCC_CODE") == mcc_lu_with_text_spdf.MCC,
        "left"
    )
    .join(
        sic_lu_with_text_spdf,
        F.col("MOST_LIKELY_SIC_CODE") == sic_lu_with_text_spdf.SIC_INDUSTRY_CODE,
        "left"
    )
    .select(
        F.coalesce(F.col("NAICS_CODE_MCC_SIDE"), F.col("NAICS_CODE_SIC_SIDE")).alias("NAICS_CODE"),
        F.col("TITLE").alias("NAICS_TEXT"),
        F.col("MOST_LIKELY_MCC_CODE").alias("MCC_CODE"),
        "MCC_TEXT", 
        F.col("MOST_LIKELY_SIC_CODE").alias("SIC_CODE"),
        "SIC_TEXT"
    )
    .orderBy("NAICS_CODE")
)

# Save final classification table
naics_classifications.write.mode("overwrite").save_as_table("sandbox.javier.naics_industry_classifications")


In [13]:
# Final Statistics and Verification

print("📈 Final Statistics and Verification")
print("=" * 50)


# Verify one record per NAICS code
naics_classifications = session.table("sandbox.javier.naics_industry_classifications")

# Get comprehensive statistics
total_naics_records = naics_classifications.count()
records_with_mcc = naics_classifications.filter(F.col("MCC_CODE").isNotNull()).count()
records_with_sic = naics_classifications.filter(F.col("SIC_CODE").isNotNull()).count()
records_with_both = naics_classifications.filter(
    F.col("MCC_CODE").isNotNull() & F.col("SIC_CODE").isNotNull()
).count()

print(f"\n📋 NAICS-Centered Classification Summary:")
print(f"Total NAICS records:                 {total_naics_records:>6}")
print(f"NAICS records with MCC mapping:      {records_with_mcc:>6} ({(records_with_mcc/total_naics_records*100):>5.1f}%)")
print(f"NAICS records with SIC mapping:      {records_with_sic:>6} ({(records_with_sic/total_naics_records*100):>5.1f}%)")
print(f"NAICS records with both mappings:    {records_with_both:>6} ({(records_with_both/total_naics_records*100):>5.1f}%)")

# Verify exactly one record per NAICS code
print(f"\n🔍 Verification - One Record Per NAICS Code:")
records_per_naics = naics_classifications.group_by("NAICS_CODE").count()
max_records_per_naics = records_per_naics.agg(F.max("count")).collect()[0][0]
min_records_per_naics = records_per_naics.agg(F.min("count")).collect()[0][0]

print(f"Max records per NAICS: {max_records_per_naics}")
print(f"Min records per NAICS: {min_records_per_naics}")

if max_records_per_naics == 1 and min_records_per_naics == 1:
    print("✅ SUCCESS: Exactly one record per NAICS code confirmed!")
else:
    print("⚠️  WARNING: Multiple records found for some NAICS codes")

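# Show unique counts
# Note: distinct() keeps NULL as its own value, so unmapped rows add one to a count
unique_naics = naics_classifications.select("NAICS_CODE").distinct().count()
unique_mcc = naics_classifications.select("MCC_CODE").distinct().count()
unique_sic = naics_classifications.select("SIC_CODE").distinct().count()

print(f"\n📊 Unique Code Counts:")
print(f"Unique NAICS codes:                  {unique_naics:>6}")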
print(f"Unique MCC codes mapped:             {unique_mcc:>6}")
print(f"Unique SIC codes mapped:             {unique_sic:>6}")

print(f"\n✅ NAICS-centered industry classification lookup complete!")
print(f"💡 Table: 'naics_industry_classifications' - One record per NAICS with MCC and SIC lookups")


📈 Final Statistics and Verification

📋 NAICS-Centered Classification Summary:
Total NAICS records:                   2125
NAICS records with MCC mapping:        2125 (100.0%)
NAICS records with SIC mapping:        2123 ( 99.9%)
NAICS records with both mappings:      2123 ( 99.9%)

🔍 Verification - One Record Per NAICS Code:
Max records per NAICS: 1
Min records per NAICS: 1
✅ SUCCESS: Exactly one record per NAICS code confirmed!

📊 Unique Code Counts:
Unique NAICS codes:                    2125
Unique MCC codes mapped:                235
Unique SIC codes mapped:                762

✅ NAICS-centered industry classification lookup complete!
💡 Table: 'naics_industry_classifications' - One record per NAICS with MCC and SIC lookups


In [14]:
# Create comprehensive industry classification table with descriptions

print("🔄 Creating comprehensive NAICS-centered industry classification table...")

# NOTE: This cell reads the saved mapping tables code_map_naics_mcc and code_map_naics_sic
# and overwrites naics_industry_classifications with a wider set of descriptive columns

try:
    # Load final mappings (written by the NAICS-MCC and NAICS-SIC mapping cells)
    code_map_naics_mcc = session.table("sandbox.javier.code_map_naics_mcc")
    code_map_naics_sic = session.table("sandbox.javier.code_map_naics_sic")
    
    # Join the NAICS-MCC and NAICS-SIC mappings
    naics_industry_mappings = code_map_naics_mcc.join(
        code_map_naics_sic,
        on="NAICS_CODE", 
        how="left"
    ).orderBy("NAICS_CODE")

    # Join with lookup tables to get comprehensive descriptions
    naics_industry_classifications = (
        naics_industry_mappings
        .join(naics_lu_spdf, naics_industry_mappings.NAICS_CODE == naics_lu_spdf.CODE, "left")
        .join(mcc_lu_spdf, naics_industry_mappings.MOST_LIKELY_MCC_CODE == mcc_lu_spdf.MCC, "left") 
        .join(sic_lu_spdf, naics_industry_mappings.MOST_LIKELY_SIC_CODE == sic_lu_spdf.SIC_INDUSTRY_CODE, "left")
        .selectExpr(
            """ NAICS_CODE,
                MOST_LIKELY_MCC_CODE AS MCC_CODE,
                MOST_LIKELY_SIC_CODE AS SIC_CODE,
                -- NAICS fields (Primary)
                TITLE AS NAICS_TITLE,
                DESCRIPTION_FULL AS NAICS_DESCRIPTION,
                -- MCC fields  
                MCC_DESCRIPTIVE_TITLE AS MCC_TITLE,
                INCLUDED_IN_THIS_MCC AS MCC_INCLUDED_MERCHANTS,
                -- SIC fields
                SIC_INDUSTRY_DESCRIPTION AS SIC_TITLE,
                SIC_MAJOR_GROUP_DESCRIPTION AS SIC_MAJOR_GROUP_TITLE,
                SIC_DIVISION_DESCRIPTION AS SIC_DIVISION_TITLE
            """
        )
    )

    print("🔍 Sample of NAICS-centered classification table:")
    naics_industry_classifications.limit(5).show()

    # Save the comprehensive table
    print(f"\n💾 Writing comprehensive classifications to 'naics_industry_classifications' table...")
    naics_industry_classifications.orderBy("NAICS_CODE").write.mode("overwrite").saveAsTable("sandbox.javier.naics_industry_classifications")

    print(f"\n📊 Final table contains {humanize.intcomma(naics_industry_classifications.count())} NAICS records")
    print("\n📋 Final NAICS-centered industry classification table created!")
    print("💡 You can now lookup MCC and SIC values for any NAICS code using this table")

except Exception as e:
    # Any failure lands here, not only missing tables, so report the actual error first
    print(f"⚠️ Could not build the classification table: {e}")
    print("   The mapping tables may not exist yet. Complete the full implementation first.")
    print("   This cell shows the expected final outcome structure.")

    # Show expected structure with sample data
    print("\n📋 Expected Final Table Structure:")
    print("NAICS_CODE | MCC_CODE | SIC_CODE | NAICS_TITLE | MCC_TITLE | SIC_TITLE")
    print("-----------|----------|----------|-------------|-----------|----------")
    print("445110     | 5411     | 5411     | Supermarket | Grocery   | Grocery  ")
    print("722511     | 5812     | 5812     | Full-Service| Restaurant| Restaurant")
    print("...")
    print("\n📋 Each NAICS code will have exactly one record with corresponding MCC and SIC lookups")


🔄 Creating comprehensive NAICS-centered industry classification table...
🔍 Sample of NAICS-centered classification table:
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"NAICS_CODE"  |"MCC_CODE"  |"SIC_CODE"  |"NAICS_TITLE"                          |"NAICS_DESCRIPTION"                                 |"MCC_TITLE"                                         |"MCC_INCLUDED_MERCHANTS"                            |"SIC_TITLE"                                         |"SIC_MAJOR_GROUP_TITLE"                 |"SIC_DIVISION_TITLE"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------
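
The chained left joins in the cell above keep one row per NAICS code only if each lookup table has at most one row per join key. A minimal plain-Python sketch of left-join behavior (not Snowpark) shows how a duplicate key on the right side would multiply rows. `left_join` and the sample rows are hypothetical and only illustrate the semantics.

```python
def left_join(left_rows, right_rows, left_key, right_key):
    """Left-join two lists of dicts, mimicking SQL LEFT JOIN row counts."""
    # Group right-hand rows by key so duplicate keys are preserved
    by_key = {}
    for row in right_rows:
        by_key.setdefault(row[right_key], []).append(row)

    joined = []
    for left in left_rows:
        key = left[left_key]
        # A NULL (None) key never matches, as in SQL
        matches = by_key.get(key, []) if key is not None else []
        if not matches:
            # No match: the left row survives; right-hand columns are simply absent here
            joined.append(dict(left))
            continue
        for right in matches:
            joined.append({**left, **right})
    return joined


# Hypothetical sample rows, for illustration only
mappings = [
    {"NAICS_CODE": "445110", "MOST_LIKELY_MCC_CODE": "5411"},
    {"NAICS_CODE": "722511", "MOST_LIKELY_MCC_CODE": "5812"},
]
unique_lookup = [
    {"MCC": "5411", "MCC_DESCRIPTIVE_TITLE": "Grocery"},
    {"MCC": "5812", "MCC_DESCRIPTIVE_TITLE": "Restaurant"},
]
duplicated_lookup = unique_lookup + [
    {"MCC": "5411", "MCC_DESCRIPTIVE_TITLE": "Grocery (duplicate)"},
]

clean = left_join(mappings, unique_lookup, "MOST_LIKELY_MCC_CODE", "MCC")
inflated = left_join(mappings, duplicated_lookup, "MOST_LIKELY_MCC_CODE", "MCC")

print(f"Rows with unique lookup keys:     {len(clean)}")
print(f"Rows with a duplicated lookup key: {len(inflated)}")
```

Comparing the row count before and after each join is a cheap way to catch this kind of inflation before the table is written.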

In [None]:
# Reload and display the saved table
print("\n📋 Displaying saved NAICS industry classifications ordered by NAICS code:")
naics_industry_classifications = session.table("sandbox.javier.naics_industry_classifications")
naics_industry_classifications.orderBy("NAICS_CODE").show()
# Count distinct NAICS codes
distinct_naics_count = naics_industry_classifications.select("NAICS_CODE").distinct().count()
print(f"\n🔢 Total distinct NAICS codes: {humanize.intcomma(distinct_naics_count)}")
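
Once the table holds exactly one row per NAICS code, looking up the corresponding MCC and SIC values is a keyed access. A minimal sketch, assuming the rows have been collected into Python dicts. `build_naics_index` and `lookup_codes` are hypothetical helper names, and the two sample rows reuse the illustrative values from the expected table structure printed in the fallback branch of cell 14.

```python
def build_naics_index(rows):
    """Index rows by NAICS_CODE, failing loudly if a code appears twice."""
    index = {}
    for row in rows:
        code = row["NAICS_CODE"]
        if code in index:
            raise ValueError(f"Duplicate NAICS code: {code}")
        index[code] = row
    return index


def lookup_codes(index, naics_code):
    """Return (MCC_CODE, SIC_CODE) for a NAICS code, or None if it is unknown."""
    row = index.get(naics_code)
    if row is None:
        return None
    return row["MCC_CODE"], row["SIC_CODE"]


# Illustrative rows only; real rows would come from the saved table
rows = [
    {"NAICS_CODE": "445110", "MCC_CODE": "5411", "SIC_CODE": "5411"},
    {"NAICS_CODE": "722511", "MCC_CODE": "5812", "SIC_CODE": "5812"},
]

index = build_naics_index(rows)
print(lookup_codes(index, "445110"))
print(lookup_codes(index, "000000"))
```

Raising on a duplicate code, rather than silently keeping the last row, makes a violation of the one-record-per-NAICS guarantee visible at index-build time.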


In [None]:
# Close session
session.close()
