# Semantic Duplicate Detection Test Data Setup

This notebook creates schemas, tables, and sample data for testing the duplicate detection feature.

## Structure
- **hurcy.bronze**: Source tables (10)
- **hurcy.silver**: Cleaned tables (10) - derived from bronze
- **hurcy.gold**: Aggregated tables (5) - derived from silver
- **hurcy.legacy**: Duplicate tables (15) - similar tables derived from the same sources
- **hurcy.mart**: Additional tables (5)
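
The layout above can also be written down as expected table counts, which makes the later verification step easy to automate. The snippet below is a minimal sketch: `EXPECTED_TABLE_COUNTS` and `find_count_mismatches` are hypothetical names introduced for illustration and are not part of the notebook.

```python
# Expected number of tables per schema, copied from the Structure list.
EXPECTED_TABLE_COUNTS = {
    "bronze": 10,
    "silver": 10,
    "gold": 5,
    "legacy": 15,
    "mart": 5,
}


def find_count_mismatches(actual_counts: dict) -> dict:
    """Return {schema: (expected, actual)} for each schema whose count differs.

    A schema missing from actual_counts is treated as having 0 tables.
    """
    return {
        schema: (expected, actual_counts.get(schema, 0))
        for schema, expected in EXPECTED_TABLE_COUNTS.items()
        if actual_counts.get(schema, 0) != expected
    }


# Hypothetical usage inside the notebook (requires a Spark session):
# actual = {
#     s: len(spark.sql(f"SHOW TABLES IN hurcy.{s}").collect())
#     for s in EXPECTED_TABLE_COUNTS
# }
# print(find_count_mismatches(actual))
```

An empty result means every schema holds the expected number of tables.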

## 1. Create Schemas

In [0]:
%sql
-- Create schemas
CREATE SCHEMA IF NOT EXISTS hurcy.bronze COMMENT 'Raw data layer';
CREATE SCHEMA IF NOT EXISTS hurcy.silver COMMENT 'Cleaned and enriched data';
CREATE SCHEMA IF NOT EXISTS hurcy.gold COMMENT 'Business-level aggregates';
CREATE SCHEMA IF NOT EXISTS hurcy.legacy COMMENT 'Legacy tables - candidates for dedup';
CREATE SCHEMA IF NOT EXISTS hurcy.mart COMMENT 'Data mart tables';

## 2. Create Bronze Tables and Load Sample Data

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime, timedelta
import random
import string

# Helper functions for sample data generation
def random_string(length: int = 8) -> str:
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def random_email() -> str:
    return f"{random_string(8)}@{random.choice(['gmail.com', 'yahoo.com', 'hotmail.com'])}"

def random_date(start_days_ago: int = 365, end_days_ago: int = 0) -> datetime:
    days = random.randint(end_days_ago, start_days_ago)
    return datetime.now() - timedelta(days=days)
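
The helpers above draw from Python's global `random` generator, so each run produces different sample data. If repeatable test data is wanted, seeding the generator before generating rows is one option. The sketch below repeats `random_string` so that it is self-contained; the seed value `42` is an arbitrary assumption. Values derived from `datetime.now()` will still shift between runs.

```python
import random
import string


def random_string(length: int = 8) -> str:
    # Same helper as in the notebook cell, repeated so this block runs alone.
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))


# Seeding resets the generator, so the same sequence is produced each time.
random.seed(42)
first = [random_string() for _ in range(3)]

random.seed(42)
second = [random_string() for _ in range(3)]

print(first == second)  # identical lists after re-seeding
```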

### 2.1 user_events Table

In [0]:
# Generate user events sample data
num_events = 10000
num_users = 500
num_sessions = 2000

event_types = ['page_view', 'click', 'add_to_cart', 'purchase', 'search']
device_types = ['desktop', 'mobile', 'tablet']
browsers = ['Chrome', 'Safari', 'Firefox', 'Edge']
page_urls = ['/home', '/product/123', '/product/456', '/cart', '/checkout', '/category/electronics']

events_data = []
for i in range(num_events):
    event_time = random_date(90, 0)
    events_data.append((
        f"evt_{random_string(12)}",
        f"user_{random.randint(1, num_users)}",
        random.choice(event_types),
        event_time,
        random.choice(page_urls),
        f"sess_{random.randint(1, num_sessions)}",
        random.choice(device_types),
        random.choice(browsers),
        f"192.168.{random.randint(1, 255)}.{random.randint(1, 255)}",
        '{"data": "sample"}',
        datetime.now()
    ))

events_schema = T.StructType([
    T.StructField("event_id", T.StringType()),
    T.StructField("user_id", T.StringType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("event_timestamp", T.TimestampType()),
    T.StructField("page_url", T.StringType()),
    T.StructField("session_id", T.StringType()),
    T.StructField("device_type", T.StringType()),
    T.StructField("browser", T.StringType()),
    T.StructField("ip_address", T.StringType()),
    T.StructField("raw_payload", T.StringType()),
    T.StructField("ingested_at", T.TimestampType())
])

events_df = spark.createDataFrame(events_data, events_schema)
events_df.write.mode("overwrite").saveAsTable("hurcy.bronze.user_events")
print(f"Created hurcy.bronze.user_events with {num_events} rows")

Created hurcy.bronze.user_events with 10000 rows


### 2.2 customers Table

In [0]:
# Customer sample data
num_customers = 1000
genders = ['M', 'F', 'Other']
countries = ['KR', 'US', 'JP', 'CN', 'UK']
cities = ['Seoul', 'New York', 'Tokyo', 'Beijing', 'London']

customers_data = []
for i in range(num_customers):
    birth_date = random_date(365*60, 365*18)  # 18-60 years old
    reg_date = random_date(365*3, 0)
    customers_data.append((
        f"cust_{i+1:05d}",
        random_email(),
        f"010-{random.randint(1000, 9999)}-{random.randint(1000, 9999)}",
        random.choice(['John', 'Jane', 'Mike', 'Sarah', 'Kim', 'Park']),
        random.choice(['Smith', 'Johnson', 'Lee', 'Kim', 'Park']),
        birth_date.date(),
        random.choice(genders),
        reg_date.date(),
        random.choice(countries),
        random.choice(cities)
    ))

customers_schema = T.StructType([
    T.StructField("customer_id", T.StringType()),
    T.StructField("email", T.StringType()),
    T.StructField("phone", T.StringType()),
    T.StructField("first_name", T.StringType()),
    T.StructField("last_name", T.StringType()),
    T.StructField("birth_date", T.DateType()),
    T.StructField("gender", T.StringType()),
    T.StructField("registration_date", T.DateType()),
    T.StructField("country", T.StringType()),
    T.StructField("city", T.StringType())
])

customers_df = spark.createDataFrame(customers_data, customers_schema)
customers_df.write.mode("overwrite").saveAsTable("hurcy.bronze.customers")
print(f"Created hurcy.bronze.customers with {num_customers} rows")

Created hurcy.bronze.customers with 1000 rows


### 2.3 products_master Table

In [0]:
# Product sample data
num_products = 200
categories = ['Electronics', 'Clothing', 'Home', 'Sports', 'Beauty']
brands = ['BrandA', 'BrandB', 'BrandC', 'BrandD', 'BrandE']
statuses = ['ACTIVE', 'INACTIVE', 'DISCONTINUED']

import decimal

products_data = []
for i in range(num_products):
    price = decimal.Decimal(str(round(random.uniform(10, 1000), 2)))
    cost = decimal.Decimal(str(round(float(price) * random.uniform(0.4, 0.7), 2)))
    products_data.append((
        f"prod_{i+1:05d}",
        f"Product {i+1}",
        f"cat_{random.randint(1, 5)}",
        f"subcat_{random.randint(1, 20)}",
        random.choice(brands),
        price,
        cost,
        f"SKU{random_string(8).upper()}",
        random.choice(statuses),
        random_date(365*2, 0).date()
    ))

products_schema = T.StructType([
    T.StructField("product_id", T.StringType()),
    T.StructField("product_name", T.StringType()),
    T.StructField("category_id", T.StringType()),
    T.StructField("subcategory_id", T.StringType()),
    T.StructField("brand", T.StringType()),
    T.StructField("price", T.DecimalType(18, 2)),
    T.StructField("cost", T.DecimalType(18, 2)),
    T.StructField("sku", T.StringType()),
    T.StructField("status", T.StringType()),
    T.StructField("created_date", T.DateType())
])

products_df = spark.createDataFrame(products_data, products_schema)
products_df.write.mode("overwrite").saveAsTable("hurcy.bronze.products_master")
print(f"Created hurcy.bronze.products_master with {num_products} rows")

Created hurcy.bronze.products_master with 200 rows


### 2.4 orders_raw Table

In [0]:
# Order sample data
num_orders = 5000
order_statuses = ['PENDING', 'CONFIRMED', 'SHIPPED', 'COMPLETED', 'CANCELLED']
currencies = ['USD', 'KRW', 'JPY']

import decimal

orders_data = []
for i in range(num_orders):
    order_date = random_date(180, 0)
    total_amount = decimal.Decimal(str(round(random.uniform(50, 2000), 2)))
    orders_data.append((
        f"ord_{i+1:06d}",
        f"cust_{random.randint(1, num_customers):05d}",
        order_date.date(),
        total_amount,
        random.choice(currencies),
        random.choice(order_statuses),
        f"{random.randint(1, 999)} Main St, City {random.randint(1, 100)}",
        f"{random.randint(1, 999)} Billing Ave, City {random.randint(1, 100)}",
        order_date,
        order_date + timedelta(hours=random.randint(1, 48))
    ))

orders_schema = T.StructType([
    T.StructField("order_id", T.StringType()),
    T.StructField("customer_id", T.StringType()),
    T.StructField("order_date", T.DateType()),
    T.StructField("total_amount", T.DecimalType(18, 2)),
    T.StructField("currency", T.StringType()),
    T.StructField("status", T.StringType()),
    T.StructField("shipping_address", T.StringType()),
    T.StructField("billing_address", T.StringType()),
    T.StructField("created_at", T.TimestampType()),
    T.StructField("updated_at", T.TimestampType())
])

orders_df = spark.createDataFrame(orders_data, orders_schema)
orders_df.write.mode("overwrite").saveAsTable("hurcy.bronze.orders_raw")
print(f"Created hurcy.bronze.orders_raw with {num_orders} rows")

Created hurcy.bronze.orders_raw with 5000 rows


### 2.5 payments Table

In [0]:
import decimal

# Payment sample data
payment_methods = ['VISA', 'MASTERCARD', 'AMEX', 'PAYPAL', 'APPLEPAY']
payment_statuses = ['SUCCESS', 'FAILED', 'PENDING']

payments_data = []
for i in range(num_orders):
    processed_at = random_date(180, 0)
    payments_data.append((
        f"pay_{i+1:06d}",
        f"ord_{i+1:06d}",
        random.choice(payment_methods),
        decimal.Decimal(str(round(random.uniform(50, 2000), 2))),
        random.choice(currencies),
        random.choices(payment_statuses, weights=[0.85, 0.1, 0.05])[0],
        f"txn_{random_string(16)}",
        processed_at
    ))

payments_schema = T.StructType([
    T.StructField("payment_id", T.StringType()),
    T.StructField("order_id", T.StringType()),
    T.StructField("payment_method", T.StringType()),
    T.StructField("amount", T.DecimalType(18, 2)),
    T.StructField("currency", T.StringType()),
    T.StructField("status", T.StringType()),
    T.StructField("transaction_id", T.StringType()),
    T.StructField("processed_at", T.TimestampType())
])

payments_df = spark.createDataFrame(payments_data, payments_schema)
payments_df.write.mode("overwrite").saveAsTable("hurcy.bronze.payments")
print(f"Created hurcy.bronze.payments with {num_orders} rows")

Created hurcy.bronze.payments with 5000 rows


### 2.6 reviews Table

In [0]:
# Review sample data
num_reviews = 3000

reviews_data = []
for i in range(num_reviews):
    review_date = random_date(365, 0)
    rating = random.choices([1, 2, 3, 4, 5], weights=[0.05, 0.1, 0.15, 0.35, 0.35])[0]
    reviews_data.append((
        f"rev_{i+1:06d}",
        f"prod_{random.randint(1, num_products):05d}",
        f"cust_{random.randint(1, num_customers):05d}",
        rating,
        f"This is a sample review text for rating {rating}. " * random.randint(1, 5),
        review_date.date(),
        random.randint(0, 100),
        random.choice([True, False])
    ))

reviews_schema = T.StructType([
    T.StructField("review_id", T.StringType()),
    T.StructField("product_id", T.StringType()),
    T.StructField("customer_id", T.StringType()),
    T.StructField("rating", T.IntegerType()),
    T.StructField("review_text", T.StringType()),
    T.StructField("review_date", T.DateType()),
    T.StructField("helpful_votes", T.IntegerType()),
    T.StructField("verified_purchase", T.BooleanType())
])

reviews_df = spark.createDataFrame(reviews_data, reviews_schema)
reviews_df.write.mode("overwrite").saveAsTable("hurcy.bronze.reviews")
print(f"Created hurcy.bronze.reviews with {num_reviews} rows")

Created hurcy.bronze.reviews with 3000 rows


### 2.7 sessions Table

In [0]:
# Session sample data
sessions_data = []
for i in range(num_sessions):
    start_time = random_date(90, 0)
    duration = random.randint(30, 3600)
    sessions_data.append((
        f"sess_{i+1}",
        f"user_{random.randint(1, num_users)}",
        start_time,
        start_time + timedelta(seconds=duration),
        random.choice(device_types),
        random.choice(['Windows', 'macOS', 'iOS', 'Android']),
        random.choice(browsers),
        random.choice(countries),
        random.choice(cities)
    ))

sessions_schema = T.StructType([
    T.StructField("session_id", T.StringType()),
    T.StructField("user_id", T.StringType()),
    T.StructField("start_time", T.TimestampType()),
    T.StructField("end_time", T.TimestampType()),
    T.StructField("device_type", T.StringType()),
    T.StructField("os", T.StringType()),
    T.StructField("browser", T.StringType()),
    T.StructField("country", T.StringType()),
    T.StructField("city", T.StringType())
])

sessions_df = spark.createDataFrame(sessions_data, sessions_schema)
sessions_df.write.mode("overwrite").saveAsTable("hurcy.bronze.sessions")
print(f"Created hurcy.bronze.sessions with {num_sessions} rows")

Created hurcy.bronze.sessions with 2000 rows


### 2.8 inventory Table

In [0]:
# Inventory sample data
num_warehouses = 5
inventory_data = []
inv_id = 0
for p in range(1, num_products + 1):
    for w in range(1, num_warehouses + 1):
        if random.random() > 0.3:  # 70% chance of inventory existing
            inv_id += 1
            qty = random.randint(0, 500)
            reserved = min(qty, random.randint(0, 50))
            inventory_data.append((
                f"inv_{inv_id:06d}",
                f"prod_{p:05d}",
                f"wh_{w:02d}",
                qty,
                reserved,
                datetime.now() - timedelta(hours=random.randint(1, 168))
            ))

inventory_schema = T.StructType([
    T.StructField("inventory_id", T.StringType()),
    T.StructField("product_id", T.StringType()),
    T.StructField("warehouse_id", T.StringType()),
    T.StructField("quantity", T.IntegerType()),
    T.StructField("reserved_quantity", T.IntegerType()),
    T.StructField("last_updated", T.TimestampType())
])

inventory_df = spark.createDataFrame(inventory_data, inventory_schema)
inventory_df.write.mode("overwrite").saveAsTable("hurcy.bronze.inventory")
print(f"Created hurcy.bronze.inventory with {len(inventory_data)} rows")

Created hurcy.bronze.inventory with 683 rows


### 2.9 shipments Table

In [0]:
# Shipment sample data
carriers = ['FedEx', 'UPS', 'USPS', 'DHL', 'Amazon']
ship_statuses = ['PENDING', 'SHIPPED', 'IN_TRANSIT', 'DELIVERED', 'RETURNED']

shipments_data = []
for i in range(num_orders):
    shipped_date = random_date(180, 0)
    est_delivery = shipped_date + timedelta(days=random.randint(3, 10))
    actual_delivery = est_delivery + timedelta(days=random.randint(-2, 3))
    shipments_data.append((
        f"ship_{i+1:06d}",
        f"ord_{i+1:06d}",
        random.choice(carriers),
        f"1Z{random_string(16).upper()}",
        random.choice(ship_statuses),
        shipped_date.date(),
        est_delivery.date(),
        actual_delivery.date() if random.random() > 0.2 else None
    ))

shipments_schema = T.StructType([
    T.StructField("shipment_id", T.StringType()),
    T.StructField("order_id", T.StringType()),
    T.StructField("carrier", T.StringType()),
    T.StructField("tracking_number", T.StringType()),
    T.StructField("status", T.StringType()),
    T.StructField("shipped_date", T.DateType()),
    T.StructField("estimated_delivery", T.DateType()),
    T.StructField("actual_delivery", T.DateType())
])

shipments_df = spark.createDataFrame(shipments_data, shipments_schema)
shipments_df.write.mode("overwrite").saveAsTable("hurcy.bronze.shipments")
print(f"Created hurcy.bronze.shipments with {num_orders} rows")

Created hurcy.bronze.shipments with 5000 rows


### 2.10 campaigns Table

In [0]:
import decimal

# Campaign sample data
num_campaigns = 50
campaign_types = ['Email', 'Display', 'Search', 'Social', 'Affiliate']
channels = ['Google', 'Facebook', 'Instagram', 'Email', 'Twitter']

campaigns_data = []
for i in range(num_campaigns):
    start_date = random_date(180, 30)
    end_date = start_date + timedelta(days=random.randint(7, 60))
    campaigns_data.append((
        f"camp_{i+1:04d}",
        f"Campaign {i+1}",
        random.choice(campaign_types),
        start_date.date(),
        end_date.date(),
        decimal.Decimal(str(round(random.uniform(1000, 50000), 2))),
        random.choice(channels),
        f"Audience Segment {random.randint(1, 10)}"
    ))

campaigns_schema = T.StructType([
    T.StructField("campaign_id", T.StringType()),
    T.StructField("campaign_name", T.StringType()),
    T.StructField("campaign_type", T.StringType()),
    T.StructField("start_date", T.DateType()),
    T.StructField("end_date", T.DateType()),
    T.StructField("budget", T.DecimalType(18, 2)),
    T.StructField("channel", T.StringType()),
    T.StructField("target_audience", T.StringType())
])

campaigns_df = spark.createDataFrame(campaigns_data, campaigns_schema)
campaigns_df.write.mode("overwrite").saveAsTable("hurcy.bronze.campaigns")
print(f"Created hurcy.bronze.campaigns with {num_campaigns} rows")

Created hurcy.bronze.campaigns with 50 rows


## 3. Create Silver, Gold, Legacy, Mart Tables and Run Lineage Pipelines

Run the `test_data_lineage` notebook below to create the derived tables and record their lineage.

In [0]:
%run ./test_data_lineage

## 4. Verify Created Tables

In [0]:
# Verify created table list
schemas = ['bronze', 'silver', 'gold', 'legacy', 'mart']
for schema in schemas:
    tables = spark.sql(f"SHOW TABLES IN hurcy.{schema}").collect()
    print(f"\n=== hurcy.{schema} ({len(tables)} tables) ===")
    for t in tables:
        count = spark.sql(f"SELECT COUNT(*) FROM hurcy.{schema}.{t.tableName}").first()[0]
        print(f"  - {t.tableName}: {count:,} rows")


=== hurcy.bronze (10 tables) ===
  - campaigns: 50 rows
  - customers: 1,000 rows
  - inventory: 683 rows
  - orders_raw: 5,000 rows
  - payments: 5,000 rows
  - products_master: 200 rows
  - reviews: 3,000 rows
  - sessions: 2,000 rows
  - shipments: 5,000 rows
  - user_events: 10,000 rows

=== hurcy.silver (10 tables) ===
  - customer_profiles: 1,000 rows
  - inventory_status: 683 rows
  - order_items: 0 rows
  - orders_cleaned: 5,000 rows
  - payment_details: 5,000 rows
  - products_enriched: 200 rows
  - review_analysis: 3,000 rows
  - session_analysis: 2,000 rows
  - shipment_tracking: 5,000 rows
  - user_events_cleaned: 10,000 rows

=== hurcy.gold (5 tables) ===
  - channel_performance: 273 rows
  - customer_segments: 1,000 rows
  - daily_sales_summary: 181 rows
  - monthly_kpi: 7 rows
  - product_performance: 200 rows

=== hurcy.legacy (15 tables) ===
  - customer_master_old: 1,000 rows
  - customer_segmentation_old: 1,000 rows
  - daily_revenue_report: 181 rows
  - delivery_tr

## 5. Verify Lineage

- Lineage takes approximately 30 minutes to appear in the system tables after table creation, so an empty result immediately after setup is expected.
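
Because of this delay, it can be convenient to poll rather than re-run the check by hand. The helper below is a generic sketch and not part of the notebook: `wait_until` is a hypothetical name, and the Spark query in the trailing comment is only an assumption about how the check might be wired up.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout_s: float,
    interval_s: float,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call check() repeatedly until it returns True or timeout_s has elapsed.

    Returns True if check() succeeded, False if the timeout was reached.
    check() is always called at least once.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


# Hypothetical usage inside the notebook (requires a Spark session):
# def lineage_ready() -> bool:
#     rows = spark.sql(
#         "SELECT 1 FROM system.access.table_lineage "
#         "WHERE target_table_full_name LIKE 'hurcy.%' LIMIT 1"
#     ).collect()
#     return len(rows) > 0
#
# wait_until(lineage_ready, timeout_s=45 * 60, interval_s=5 * 60)
```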

In [0]:
# Check lineage information (recent data)
lineage_df = spark.sql("""
SELECT 
    source_table_full_name,
    target_table_full_name,
    COUNT(*) AS event_count,
    MAX(event_time) AS last_event
FROM system.access.table_lineage
WHERE (source_table_full_name LIKE 'hurcy.%' OR target_table_full_name LIKE 'hurcy.%')
    AND event_time >= current_date() - INTERVAL 1 DAY
    AND source_table_full_name IS NOT NULL
    AND target_table_full_name IS NOT NULL
GROUP BY source_table_full_name, target_table_full_name
ORDER BY event_count DESC
""")

# Note: after the GROUP BY, count() is the number of distinct source/target pairs, not raw events
print(f"Lineage events found: {lineage_df.count()}")
display(lineage_df)

Lineage events found: 51


source_table_full_name,target_table_full_name,event_count,last_event
hurcy.information_schema.columns,,4,2025-12-11T12:49:39.447Z
,hurcy.gold.daily_sales,1,2025-12-11T12:42:32.004Z
,hurcy.bronze.user_activity_log,1,2025-12-11T12:42:27.573Z
,hurcy.silver.purchase_orders,1,2025-12-11T12:42:20.067Z
,hurcy.analytics.customer_behavior_log,1,2025-12-11T12:42:29.009Z
,hurcy.bronze.users,1,2025-12-11T12:42:14.025Z
,hurcy.marketing.customer_orders,1,2025-12-11T12:42:21.523Z
,hurcy.raw.web_access_log,1,2025-12-11T12:42:30.571Z
,hurcy.legacy.tb_user_info,1,2025-12-11T12:42:16.978Z
,hurcy.legacy.tb_product_master,1,2025-12-11T12:42:26.025Z


## 6. Duplicate Candidate Table Summary

Below are the intentionally created duplicate table pairs:

| Silver/Gold Table | Legacy Duplicate Table | Similarity |
|---|---|---|
| silver.user_events_cleaned | legacy.tb_user_activity_log | ~99% |
| silver.orders_cleaned | legacy.order_transaction_history | ~95% |
| silver.customer_profiles | legacy.customer_master_old | ~90% |
| gold.daily_sales_summary | legacy.daily_revenue_report | ~95% |
| silver.products_enriched | legacy.product_catalog_v1 | ~95% |
| silver.payment_details | legacy.payment_transaction_log | ~90% |
| gold.customer_segments | legacy.customer_segmentation_old | ~85% |
| silver.review_analysis | legacy.product_review_stats | ~90% |
| gold.product_performance | legacy.item_sales_analytics | ~85% |
| silver.shipment_tracking | legacy.delivery_tracking_old | ~90% |
| silver.session_analysis | legacy.web_session_metrics | ~90% |
| silver.inventory_status | legacy.stock_levels | ~85% |
| gold.monthly_kpi | legacy.monthly_business_metrics | ~90% |
| gold.channel_performance | legacy.marketing_channel_stats | ~90% |
| silver.order_items | legacy.order_line_details | ~85% |
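
The notebook does not define how the similarity figures above are measured. As a rough sanity check on a candidate pair, one simple option is the Jaccard overlap of the two tables' column names. The sketch below makes that assumption explicit: `column_jaccard` is a hypothetical helper, the column lists are made up for illustration, and this is not necessarily the metric used by the duplicate detection feature.

```python
from typing import Iterable


def column_jaccard(cols_a: Iterable[str], cols_b: Iterable[str]) -> float:
    """Jaccard similarity of two column-name sets, compared case-insensitively."""
    a = {c.lower() for c in cols_a}
    b = {c.lower() for c in cols_b}
    if not a and not b:
        return 1.0  # two empty schemas are treated as identical
    return len(a & b) / len(a | b)


# Illustrative column lists (hypothetical, not read from the real tables).
cleaned_cols = ["order_id", "customer_id", "order_date", "total_amount"]
legacy_cols = ["ORDER_ID", "customer_id", "order_date", "total_amount", "legacy_flag"]

score = column_jaccard(cleaned_cols, legacy_cols)
print(f"{score:.2f}")  # 4 shared names out of 5 distinct names -> 0.80
```

In the notebook, the column lists could be taken from `spark.table(name).columns` for each table in a pair.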
