# 01. Databricks 환경 설정 및 Northwind 데이터베이스 구축

이 노트북은 **Databricks Text-to-SQL RAG 시스템**의 첫 번째 단계로, 실제 Databricks Lakehouse 환경에서 Northwind 샘플 데이터베이스를 구축합니다.

## 🎯 목표
- **Databricks 환경 설정** 및 Spark 세션 초기화
- **Northwind 샘플 데이터베이스** 스키마 설계 및 구축
- **Delta Lake 테이블** 생성 및 데이터 로드
- **데이터 검증** 및 기본 SQL 테스트
- **LangChain Agent 연동** 준비

## 🏗️ Northwind 데이터베이스 구조
```
Northwind Database (8개 테이블)
├── customers      # 고객 정보
├── suppliers      # 공급업체 정보
├── categories     # 상품 카테고리
├── products       # 상품 정보
├── employees      # 직원 정보
├── shippers       # 운송업체 정보
├── orders         # 주문 정보
└── order_details  # 주문 상세 정보
```

## 📋 전제조건
- Databricks Workspace 및 클러스터 준비
- Delta Lake 테이블 생성 권한
- Spark SQL 실행 권한

## 🚀 다음 단계
이 노트북 완료 후 → `02_langchain_agent_text_to_sql.ipynb`에서 LangChain Agent 구현

## 🗄️ Northwind 샘플 데이터베이스 소개

Northwind는 **가상의 무역회사 데이터베이스**로 Text-to-SQL 학습에 이상적입니다.

### 📊 Northwind 데이터베이스 구조
- **Customers** (고객): 회사명, 연락처, 주소 등
- **Orders** (주문): 주문일자, 고객정보, 배송정보 등  
- **Order_Details** (주문상세): 상품별 수량, 가격, 할인 등
- **Products** (상품): 상품명, 카테고리, 재고, 가격 등
- **Categories** (카테고리): 상품 분류 정보
- **Suppliers** (공급업체): 공급업체 정보 및 연락처
- **Employees** (직원): 직원 정보 및 관리자 관계
- **Shippers** (배송업체): 배송 회사 정보

### 🎯 Text-to-SQL에 적합한 이유
1. **실무 친화적**: 실제 비즈니스 시나리오 반영
2. **다양한 관계**: JOIN, 집계, 필터링 등 다양한 SQL 패턴
3. **직관적 데이터**: 도메인 지식이 쉽게 이해 가능
4. **적당한 크기**: 학습 및 테스트에 최적화된 데이터 규모

## 1. Databricks 환경 초기화 및 Spark 세션 설정
1. **CSV 파일 업로드** → Delta Lake 테이블 생성
2. **PostgreSQL 연결** → Databricks로 데이터 이관  
3. **SQL 스크립트 실행** → 직접 테이블 생성
4. **샘플 데이터 생성** → 프로그래밍 방식으로 구축

## 1. 필요한 라이브러리 및 Databricks 서비스 임포트

In [1]:
# Databricks 환경 및 Spark 세션 초기화
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime, date
import warnings
warnings.filterwarnings('ignore')

print("🚀 Databricks Text-to-SQL RAG 시스템 초기화")
print("=" * 50)

# 환경 정보 확인
print("\n📋 환경 정보:")
print(f"Python 버전: {sys.version.split()[0]}")
print(f"Databricks Runtime: {os.environ.get('DATABRICKS_RUNTIME_VERSION', 'N/A')}")

# Spark 세션 초기화
spark = SparkSession.getActiveSession()
if spark is None:
    spark = SparkSession.builder \
        .appName("Northwind-TextToSQL-RAG") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

print(f"Spark 버전: {spark.version}")

# Spark Connect 환경에서 안전한 방식으로 애플리케이션 정보 조회
try:
    # Spark Connect에서는 sparkContext에 직접 접근할 수 없으므로 다른 방법 사용
    app_info = spark.sql("SELECT current_database(), current_user()").collect()[0]
    print(f"현재 데이터베이스: {app_info[0] if app_info[0] else 'default'}")
    print(f"현재 사용자: {app_info[1] if app_info[1] else 'N/A'}")
except Exception as e:
    print(f"애플리케이션 정보 조회 제한됨 (Spark Connect 환경)")

# 현재 카탈로그 및 스키마 확인
try:
    current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
    current_schema = spark.sql("SELECT current_schema()").collect()[0][0]
    print(f"현재 카탈로그: {current_catalog}")
    print(f"현재 스키마: {current_schema}")
except Exception as e:
    print(f"카탈로그/스키마 정보 조회 실패: {e}")

# Spark 설정 확인 (Spark Connect에서 지원되는 방식)
try:
    spark_configs = spark.sql("SET").collect()
    adaptive_enabled = any("spark.sql.adaptive.enabled" in row.key and "true" in str(row.value).lower() 
                          for row in spark_configs if hasattr(row, 'key'))
    print(f"Adaptive Query Execution: {'✅ 활성화됨' if adaptive_enabled else '❌ 비활성화됨'}")
except Exception as e:
    print(f"Spark 설정 확인 제한됨")

print("\n✅ Databricks 환경 및 Spark 세션 준비 완료!")
print(f"📅 초기화 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\n💡 참고: Spark Connect 환경에서 일부 JVM 종속 기능들은 제한될 수 있습니다.")

🚀 Databricks Text-to-SQL RAG 시스템 초기화

📋 환경 정보:
Python 버전: 3.11.6
Databricks Runtime: N/A
Spark 버전: 3.5.2
Spark 버전: 3.5.2
현재 데이터베이스: default
현재 사용자: saintphs@gmail.com
현재 데이터베이스: default
현재 사용자: saintphs@gmail.com
현재 카탈로그: workspace
현재 스키마: default
현재 카탈로그: workspace
현재 스키마: default
Adaptive Query Execution: ❌ 비활성화됨

✅ Databricks 환경 및 Spark 세션 준비 완료!
📅 초기화 시간: 2025-06-30 08:43:26

💡 참고: Spark Connect 환경에서 일부 JVM 종속 기능들은 제한될 수 있습니다.
Adaptive Query Execution: ❌ 비활성화됨

✅ Databricks 환경 및 Spark 세션 준비 완료!
📅 초기화 시간: 2025-06-30 08:43:26

💡 참고: Spark Connect 환경에서 일부 JVM 종속 기능들은 제한될 수 있습니다.


## 2. Northwind 데이터베이스 생성 및 스키마 정의

In [2]:
import os
from typing import Dict

# Databricks 환경 확인 및 기본 설정
def setup_databricks_environment():
    """Databricks 환경 설정 및 확인"""
    try:
        # Spark 세션 확인/생성
        spark = SparkSession.getActiveSession()
        if spark is None:
            spark = SparkSession.builder.appName("Northwind-TextToSQL").getOrCreate()
        
        # 환경 정보 수집
        is_databricks = "DATABRICKS_RUNTIME_VERSION" in os.environ
        
        env_info = {
            "is_databricks": is_databricks,
            "spark_version": spark.version,
            "runtime_version": os.environ.get("DATABRICKS_RUNTIME_VERSION", "로컬"),
            "cluster_id": os.environ.get("SPARK_LOCAL_HOSTNAME", "로컬"),
        }
        
        print("🔧 Databricks 환경 정보:")
        for key, value in env_info.items():
            print(f"   {key}: {value}")
        
        print("✅ Databricks 환경 설정 완료")
        
        # 기본 데이터베이스 설정
        spark.sql("CREATE DATABASE IF NOT EXISTS northwind")
        spark.sql("USE northwind")
        
        print("📂 northwind 데이터베이스 생성/선택 완료")
        
        return spark, env_info
        
    except Exception as e:
        print(f"❌ Databricks 환경 설정 실패: {str(e)}")
        return None, None

# 환경 설정 실행
spark, env_info = setup_databricks_environment()

if spark:
    print("\n🚀 Spark 세션 활성화 성공!")
    # Spark Connect 호환 - sparkContext 사용하지 않음
    print("   Spark Connect 모드로 실행 중")

    # 현재 데이터베이스 확인
    current_db = spark.sql("SELECT current_database()").collect()[0][0]
    print(f"   현재 데이터베이스: {current_db}")
    
    # Spark 세션 정보 (Spark Connect 호환)
    app_info = spark.sql("SELECT 'session_active' as status, current_timestamp() as timestamp").collect()[0]
    print(f"   세션 시작 시간: {app_info['timestamp']}")
    
    # 현재 카탈로그 및 스키마 정보
    current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
    current_schema = spark.sql("SELECT current_schema()").collect()[0][0]
    print(f"   현재 카탈로그: {current_catalog}")
    print(f"   현재 스키마: {current_schema}")
    
    print("\n✅ Spark 세션이 성공적으로 설정되었습니다!")

else:
    print("❌ Spark 세션이 활성화되지 않았습니다.")

# Northwind 데이터베이스 생성 및 스키마 정의

print("🏗️ Northwind 데이터베이스 생성 시작")
print("=" * 40)

# 1. 데이터베이스 생성
try:
    spark.sql("CREATE DATABASE IF NOT EXISTS northwind")
    spark.sql("USE northwind")
    print("✅ northwind 데이터베이스 생성 완료")
    
    # 현재 데이터베이스 확인
    current_db = spark.sql("SELECT current_database()").collect()[0][0]
    print(f"📂 현재 활성 데이터베이스: {current_db}")
    
    # 기존 테이블 확인
    existing_tables = [row.tableName for row in spark.sql("SHOW TABLES").collect()]
    print(f"📊 기존 테이블: {len(existing_tables)}개")
    if existing_tables:
        print("   기존 테이블 목록:")
        for table in existing_tables[:5]:  # 최대 5개만 표시
            print(f"     📋 {table}")
        if len(existing_tables) > 5:
            print(f"     ... 그 외 {len(existing_tables) - 5}개")
    else:
        print("   새로운 데이터베이스입니다. 테이블을 생성할 준비가 되었습니다.")
        
except Exception as e:
    print(f"❌ 데이터베이스 생성 실패: {e}")

# 2. Northwind 테이블 스키마 정의
northwind_schemas = {
    "categories": """
        categoryid INT,
        categoryname STRING,
        description STRING
    """,
    
    "suppliers": """
        supplierid INT,
        companyname STRING,
        contactname STRING,
        contacttitle STRING,
        address STRING,
        city STRING,
        region STRING,
        postalcode STRING,
        country STRING,
        phone STRING,
        fax STRING,
        homepage STRING
    """,
    
    "products": """
        productid INT,
        productname STRING,
        supplierid INT,
        categoryid INT,
        quantityperunit STRING,
        unitprice DECIMAL(10,2),
        unitsinstock INT,
        unitsonorder INT,
        reorderlevel INT,
        discontinued BOOLEAN
    """,
    
    "customers": """
        customerid STRING,
        companyname STRING,
        contactname STRING,
        contacttitle STRING,
        address STRING,
        city STRING,
        region STRING,
        postalcode STRING,
        country STRING,
        phone STRING,
        fax STRING
    """,
    
    "employees": """
        employeeid INT,
        lastname STRING,
        firstname STRING,
        title STRING,
        titleofcourtesy STRING,
        birthdate DATE,
        hiredate DATE,
        address STRING,
        city STRING,
        region STRING,
        postalcode STRING,
        country STRING,
        homephone STRING,
        extension STRING,
        notes STRING,
        reportsto INT
    """,
    
    "shippers": """
        shipperid INT,
        companyname STRING,
        phone STRING
    """,
    
    "orders": """
        orderid INT,
        customerid STRING,
        employeeid INT,
        orderdate DATE,
        requireddate DATE,
        shippeddate DATE,
        shipvia INT,
        freight DECIMAL(10,2),
        shipname STRING,
        shipaddress STRING,
        shipcity STRING,
        shipregion STRING,
        shippostalcode STRING,
        shipcountry STRING
    """,
    
    "order_details": """
        orderid INT,
        productid INT,
        unitprice DECIMAL(10,2),
        quantity INT,
        discount DECIMAL(3,2)
    """
}

print(f"\n📋 총 {len(northwind_schemas)}개 테이블 스키마 정의됨:")
for table_name in northwind_schemas.keys():
    print(f"   📂 {table_name}")

print("\n✅ Northwind 스키마 정의 완료!")

🔧 Databricks 환경 정보:
   is_databricks: False
   spark_version: 3.5.2
   runtime_version: 로컬
   cluster_id: 로컬
✅ Databricks 환경 설정 완료
📂 northwind 데이터베이스 생성/선택 완료

🚀 Spark 세션 활성화 성공!
   Spark Connect 모드로 실행 중
📂 northwind 데이터베이스 생성/선택 완료

🚀 Spark 세션 활성화 성공!
   Spark Connect 모드로 실행 중
   현재 데이터베이스: northwind
   현재 데이터베이스: northwind
   세션 시작 시간: 2025-06-30 08:43:27.854717
   세션 시작 시간: 2025-06-30 08:43:27.854717
   현재 카탈로그: workspace
   현재 스키마: northwind

✅ Spark 세션이 성공적으로 설정되었습니다!
🏗️ Northwind 데이터베이스 생성 시작
   현재 카탈로그: workspace
   현재 스키마: northwind

✅ Spark 세션이 성공적으로 설정되었습니다!
🏗️ Northwind 데이터베이스 생성 시작
✅ northwind 데이터베이스 생성 완료
✅ northwind 데이터베이스 생성 완료
📂 현재 활성 데이터베이스: northwind
📂 현재 활성 데이터베이스: northwind
📊 기존 테이블: 8개
   기존 테이블 목록:
     📋 categories
     📋 customers
     📋 employees
     📋 order_details
     📋 orders
     ... 그 외 3개

📋 총 8개 테이블 스키마 정의됨:
   📂 categories
   📂 suppliers
   📂 products
   📂 customers
   📂 employees
   📂 shippers
   📂 orders
   📂 order_details

✅ Northwind 스키마 정의 완료!
📊 

## 🏗️ Northwind 데이터베이스 구축

## 3. Delta Lake 테이블 생성 및 샘플 데이터 로드

In [3]:
from datetime import datetime, date, timedelta
from typing import List, Dict, Any
import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BooleanType, DateType

class DatabaseSchemaAnalyzer:
    """실제 Databricks 환경에서 스키마 분석 및 벡터화"""
    
    def __init__(self, spark_session, embedding_model=None):
        self.spark = spark_session
        self.embedding_model = embedding_model
        self.schema_cache = {}
        
    def extract_table_schema(self, database_name: str, table_name: str) -> Dict[str, Any]:
        """실제 테이블에서 스키마 정보 추출"""
        try:
            full_table_name = f"{database_name}.{table_name}"
            
            # 테이블 설명 정보 가져오기
            describe_df = self.spark.sql(f"DESCRIBE TABLE EXTENDED {full_table_name}")
            columns_info = []
            table_properties = {}
            
            # 컬럼 정보 처리
            for row in describe_df.collect():
                col_name = row.col_name
                data_type = row.data_type
                comment = row.comment
                
                if col_name and not col_name.startswith('#') and col_name != '':
                    if col_name == '# Detailed Table Information':
                        break
                    
                    columns_info.append({
                        'name': col_name,
                        'type': data_type,
                        'comment': comment or ''
                    })
            
            # 샘플 데이터 가져오기 (처음 3개 행)
            sample_data = []
            try:
                sample_df = self.spark.sql(f"SELECT * FROM {full_table_name} LIMIT 3")
                sample_data = [row.asDict() for row in sample_df.collect()]
            except Exception:
                sample_data = []
            
            # 테이블 통계 정보
            try:
                count_result = self.spark.sql(f"SELECT COUNT(*) as row_count FROM {full_table_name}").collect()
                row_count = count_result[0].row_count if count_result else 0
            except Exception:
                row_count = 0
            
            schema_info = {
                'database_name': database_name,
                'table_name': table_name,
                'full_name': full_table_name,
                'columns': columns_info,
                'sample_data': sample_data,
                'row_count': row_count,
                'extracted_at': datetime.now().isoformat()
            }
            
            # 캐시에 저장
            self.schema_cache[full_table_name] = schema_info
            
            return schema_info
            
        except Exception as e:
            print(f"❌ 테이블 {database_name}.{table_name} 스키마 추출 실패: {str(e)}")
            return None
    
    def analyze_database_schema(self, database_name: str, limit_tables: int = 10) -> List[Dict[str, Any]]:
        """데이터베이스 전체 스키마 분석"""
        if not self.spark:
            print("❌ Spark 세션이 없습니다.")
            return []
        
        try:
            # 데이터베이스의 모든 테이블 가져오기
            tables_df = self.spark.sql(f"SHOW TABLES IN {database_name}")
            tables = [(row.database, row.tableName) for row in tables_df.collect()]
            
            if limit_tables:
                tables = tables[:limit_tables]
            
            print(f"📊 {database_name} 데이터베이스 분석 시작 ({len(tables)}개 테이블)")
            
            schemas = []
            for db, table in tables:
                print(f"   🔍 {table} 분석 중...")
                schema = self.extract_table_schema(db, table)
                if schema:
                    schemas.append(schema)
            
            print(f"✅ 스키마 분석 완료: {len(schemas)}개 테이블")
            return schemas
            
        except Exception as e:
            print(f"❌ 데이터베이스 스키마 분석 실패: {str(e)}")
            return []
    
    def create_schema_text_for_embedding(self, schema_info: Dict[str, Any]) -> str:
        """스키마 정보를 임베딩을 위한 텍스트로 변환"""
        table_name = schema_info['table_name']
        columns = schema_info['columns']
        
        # 테이블 설명 텍스트 생성
        text_parts = [
            f"테이블명: {table_name}",
            f"데이터베이스: {schema_info['database_name']}",
            f"행 수: {schema_info.get('row_count', 0):,}"
        ]
        
        # 컬럼 정보 추가
        text_parts.append("컬럼 정보:")
        for col in columns:
            col_desc = f"- {col['name']} ({col['type']})"
            if col.get('comment'):
                col_desc += f": {col['comment']}"
            text_parts.append(col_desc)
        
        # 샘플 데이터 키 추가 (검색에 도움)
        if schema_info.get('sample_data'):
            sample_keys = list(schema_info['sample_data'][0].keys()) if schema_info['sample_data'] else []
            if sample_keys:
                text_parts.append(f"주요 필드: {', '.join(sample_keys[:5])}")
        
        return "\n".join(text_parts)
    
    def vectorize_schemas(self, schemas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """스키마 정보를 벡터화"""
        if not self.embedding_model:
            print("⚠️ 임베딩 모델이 없어 벡터화를 건너뜁니다.")
            return schemas
        
        vectorized_schemas = []
        
        for schema in schemas:
            try:
                # 텍스트 생성
                schema_text = self.create_schema_text_for_embedding(schema)
                
                # 벡터화
                embedding = self.embedding_model.embed_query(schema_text)
                
                # 벡터 정보 추가
                schema['schema_text'] = schema_text
                schema['embedding'] = embedding
                schema['embedding_dimension'] = len(embedding)
                
                vectorized_schemas.append(schema)
                
            except Exception as e:
                print(f"⚠️ {schema['table_name']} 벡터화 실패: {str(e)}")
                vectorized_schemas.append(schema)  # 벡터 없이라도 추가
        
        print(f"✅ 스키마 벡터화 완료: {len(vectorized_schemas)}개")
        return vectorized_schemas

class NorthwindDataBuilder:
    """Northwind 샘플 데이터베이스 구축 클래스"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
        self.schemas = {}
        self.sample_data = {}
        
    def define_schemas(self):
        """테이블 스키마 정의"""
        
        # Categories (카테고리)
        self.schemas['categories'] = StructType([
            StructField("category_id", IntegerType(), False),
            StructField("category_name", StringType(), False),
            StructField("description", StringType(), True)
        ])
        
        # Suppliers (공급업체)
        self.schemas['suppliers'] = StructType([
            StructField("supplier_id", IntegerType(), False),
            StructField("company_name", StringType(), False),
            StructField("contact_name", StringType(), True),
            StructField("contact_title", StringType(), True),
            StructField("address", StringType(), True),
            StructField("city", StringType(), True),
            StructField("country", StringType(), True),
            StructField("phone", StringType(), True)
        ])
        
        # Products (상품)
        self.schemas['products'] = StructType([
            StructField("product_id", IntegerType(), False),
            StructField("product_name", StringType(), False),
            StructField("supplier_id", IntegerType(), True),
            StructField("category_id", IntegerType(), True),
            StructField("unit_price", DoubleType(), True),
            StructField("units_in_stock", IntegerType(), True),
            StructField("units_on_order", IntegerType(), True),
            StructField("discontinued", BooleanType(), True)
        ])
        
        # Customers (고객)
        self.schemas['customers'] = StructType([
            StructField("customer_id", StringType(), False),
            StructField("company_name", StringType(), False),
            StructField("contact_name", StringType(), True),
            StructField("contact_title", StringType(), True),
            StructField("address", StringType(), True),
            StructField("city", StringType(), True),
            StructField("country", StringType(), True),
            StructField("phone", StringType(), True)
        ])
        
        # Employees (직원)
        self.schemas['employees'] = StructType([
            StructField("employee_id", IntegerType(), False),
            StructField("first_name", StringType(), False),
            StructField("last_name", StringType(), False),
            StructField("title", StringType(), True),
            StructField("birth_date", DateType(), True),
            StructField("hire_date", DateType(), True),
            StructField("city", StringType(), True),
            StructField("country", StringType(), True),
            StructField("reports_to", IntegerType(), True)
        ])
        
        # Shippers (배송업체)
        self.schemas['shippers'] = StructType([
            StructField("shipper_id", IntegerType(), False),
            StructField("company_name", StringType(), False),
            StructField("phone", StringType(), True)
        ])
        
        # Orders (주문)
        self.schemas['orders'] = StructType([
            StructField("order_id", IntegerType(), False),
            StructField("customer_id", StringType(), True),
            StructField("employee_id", IntegerType(), True),
            StructField("order_date", DateType(), True),
            StructField("required_date", DateType(), True),
            StructField("shipped_date", DateType(), True),
            StructField("ship_via", IntegerType(), True),
            StructField("freight", DoubleType(), True),
            StructField("ship_name", StringType(), True),
            StructField("ship_address", StringType(), True),
            StructField("ship_city", StringType(), True),
            StructField("ship_country", StringType(), True)
        ])
        
        # Order Details (주문 상세)
        self.schemas['order_details'] = StructType([
            StructField("order_id", IntegerType(), False),
            StructField("product_id", IntegerType(), False),
            StructField("unit_price", DoubleType(), False),
            StructField("quantity", IntegerType(), False),
            StructField("discount", DoubleType(), True)
        ])
        
        print("✅ 테이블 스키마 정의 완료")
        print(f"   정의된 테이블: {list(self.schemas.keys())}")
        
    def generate_sample_data(self):
        """샘플 데이터 생성"""
        
        # Categories
        self.sample_data['categories'] = [
            (1, "음료", "청량음료, 커피, 차, 맥주, 에일"),
            (2, "조미료", "달콤하고 맛있는 소스, 양념, 조미료, 향신료"),
            (3, "과자류", "디저트, 사탕, 달콤한 빵"),
            (4, "유제품", "치즈"),
            (5, "곡물/시리얼", "빵, 크래커, 파스타, 시리얼"),
            (6, "육류/가금류", "조리된 육류"),
            (7, "농산물", "건조 과일과 콩 두부"),
            (8, "해산물", "해초와 생선")
        ]
        
        # Suppliers
        self.sample_data['suppliers'] = [
            (1, "삼성식품", "김삼성", "영업 담당자", "서울시 강남구", "서울", "한국", "02-123-4567"),
            (2, "LG무역", "이엘지", "구매 관리자", "부산시 해운대구", "부산", "한국", "051-234-5678"),
            (3, "현대물산", "박현대", "마케팅 이사", "인천시 남동구", "인천", "한국", "032-345-6789"),
            (4, "롯데상사", "최롯데", "영업 이사", "대구시 중구", "대구", "한국", "053-456-7890"),
            (5, "SK케미칼", "정에스케이", "품질 관리자", "광주시 서구", "광주", "한국", "062-567-8901")
        ]
        
        # Products
        self.sample_data['products'] = [
            (1, "김치", 1, 2, 15000.0, 100, 0, False),
            (2, "비빔밥", 2, 5, 12000.0, 50, 10, False),
            (3, "불고기", 3, 6, 25000.0, 30, 5, False),
            (4, "된장찌개", 1, 2, 8000.0, 80, 20, False),
            (5, "갈비탕", 3, 6, 18000.0, 25, 0, False),
            (6, "냉면", 2, 5, 10000.0, 60, 15, False),
            (7, "삼겹살", 4, 6, 22000.0, 40, 8, False),
            (8, "해물파전", 5, 8, 16000.0, 35, 12, False),
            (9, "떡볶이", 1, 3, 7000.0, 90, 25, False),
            (10, "순두부찌개", 2, 4, 9000.0, 70, 18, False)
        ]
        
        # Customers  
        self.sample_data['customers'] = [
            ("KOREA", "한국식당", "김한국", "사장", "서울시 종로구", "서울", "한국", "02-111-2222"),
            ("JAPAN", "일본레스토랑", "사토 다카시", "매니저", "도쿄도 시부야구", "도쿄", "일본", "+81-3-1234"),
            ("CHINA", "중국반점", "왕중국", "사장", "베이징시 차오양구", "베이징", "중국", "+86-10-5678"),
            ("USA", "코리안 BBQ", "John Kim", "Owner", "LA California", "로스앤젤레스", "미국", "+1-213-9999"),
            ("CANAD", "한식당", "이민수", "사장", "토론토 온타리오", "토론토", "캐나다", "+1-416-8888")
        ]
        
        # Employees
        today = date.today()
        self.sample_data['employees'] = [
            (1, "김", "사장", "CEO", date(1970, 1, 1), date(2020, 1, 1), "서울", "한국", None),
            (2, "이", "부장", "영업부장", date(1975, 6, 15), date(2020, 3, 1), "서울", "한국", 1),
            (3, "박", "과장", "마케팅과장", date(1980, 3, 20), date(2021, 1, 15), "부산", "한국", 2),
            (4, "최", "대리", "영업대리", date(1985, 9, 10), date(2022, 6, 1), "대구", "한국", 2),
            (5, "정", "사원", "영업사원", date(1990, 12, 5), date(2023, 2, 1), "광주", "한국", 4)
        ]
        
        # Shippers
        self.sample_data['shippers'] = [
            (1, "한진택배", "1588-0011"),
            (2, "CJ대한통운", "1588-1255"),
            (3, "롯데택배", "1588-2121")
        ]
        
        # Orders (최근 6개월 데이터)
        base_date = today - timedelta(days=180)
        self.sample_data['orders'] = []
        for i in range(1, 101):  # 100개 주문
            order_date = base_date + timedelta(days=np.random.randint(0, 180))
            required_date = order_date + timedelta(days=np.random.randint(3, 14))
            shipped_date = order_date + timedelta(days=np.random.randint(1, 10)) if np.random.random() > 0.1 else None
            
            customer_id = np.random.choice(["KOREA", "JAPAN", "CHINA", "USA", "CANAD"])
            employee_id = np.random.randint(1, 6)
            shipper_id = np.random.randint(1, 4)
            
            self.sample_data['orders'].append((
                i, customer_id, employee_id, order_date, required_date, shipped_date,
                shipper_id, np.round(np.random.uniform(5000, 50000), 2),
                f"배송지_{i}", f"주소_{i}", f"도시_{i}", "한국"
            ))
        
        # Order Details
        self.sample_data['order_details'] = []
        for order_id in range(1, 101):
            # 각 주문당 1-5개 상품
            num_products = np.random.randint(1, 6)
            products = np.random.choice(range(1, 11), num_products, replace=False)
            
            for product_id in products:
                # 상품 가격 가져오기
                product_price = next(p[4] for p in self.sample_data['products'] if p[0] == product_id)
                quantity = np.random.randint(1, 10)
                discount = np.round(np.random.choice([0.0, 0.05, 0.1, 0.15], p=[0.7, 0.15, 0.1, 0.05]), 2)
                
                self.sample_data['order_details'].append((
                    order_id, int(product_id), product_price, quantity, discount
                ))
        
        print("✅ 샘플 데이터 생성 완료")
        for table, data in self.sample_data.items():
            print(f"   {table}: {len(data)}개 레코드")

# 스키마 분석기 초기화 (embedding_model은 선택사항)
if spark:
    # 임베딩 모델 없이 초기화 (기본 스키마 생성용)
    schema_analyzer = DatabaseSchemaAnalyzer(spark, embedding_model=None)
    print("✅ 데이터베이스 스키마 분석기 초기화 완료")

    # Northwind 데이터 빌더 초기화 및 샘플 데이터 생성
    northwind_builder = NorthwindDataBuilder(spark)
    northwind_builder.define_schemas()
    northwind_builder.generate_sample_data()
    print("\n🏗️ Northwind 데이터 빌더 준비 완료!")
else:
    schema_analyzer = None
    print("⚠️ Spark 세션이 없어 스키마 분석기를 초기화할 수 없습니다.")

# Delta Lake 테이블 생성 및 샘플 데이터 로드

print("📦 Delta Lake 테이블 생성 및 데이터 로드")
print("=" * 45)

# 1. 기존 테이블 삭제 (초기화)
print("\n🔄 기존 테이블 정리 중...")
for table_name in northwind_builder.schemas.keys():
    try:
        spark.sql(f"DROP TABLE IF EXISTS northwind.{table_name}")
        print(f"   🗑️ {table_name} 테이블 삭제됨")
    except Exception as e:
        print(f"   ⚠️ {table_name} 삭제 중 오류: {e}")

# 2. 샘플 데이터 정의
sample_data = {
    "categories": [
        (1, "Beverages", "Soft drinks, coffees, teas, beers, and ales"),
        (2, "Condiments", "Sweet and savory sauces, relishes, spreads, and seasonings"),
        (3, "Dairy Products", "Cheeses"),
        (4, "Grains/Cereals", "Breads, crackers, pasta, and cereal"),
        (5, "Meat/Poultry", "Prepared meats"),
        (6, "Produce", "Dried fruit and bean curd"),
        (7, "Seafood", "Seaweed and fish"),
        (8, "Confections", "Desserts, candies, and sweet breads")
    ],
    
    "suppliers": [
        (1, "Exotic Liquids", "Charlotte Cooper", "Purchasing Manager", "49 Gilbert St.", "London", None, "EC1 4SD", "UK", "(171) 555-2222", None, None),
        (2, "New Orleans Cajun Delights", "Shelley Burke", "Order Administrator", "P.O. Box 78934", "New Orleans", "LA", "70117", "USA", "(100) 555-4822", None, "#CAJUN.HTM#"),
        (3, "Grandma Kelly's Homestead", "Regina Murphy", "Sales Representative", "707 Oxford Rd.", "Ann Arbor", "MI", "48104", "USA", "(313) 555-5735", "(313) 555-3349", None),
        (4, "Tokyo Traders", "Yoshi Nagase", "Marketing Manager", "9-8 Sekimai Musashino-shi", "Tokyo", None, "100", "Japan", "(03) 3555-5011", None, None),
        (5, "Cooperativa de Quesos 'Las Cabras'", "Antonio del Valle Saavedra", "Export Administrator", "Calle del Rosal 4", "Oviedo", "Asturias", "33007", "Spain", "(98) 598 76 98", None, None)
    ],
    
    "products": [
        (1, "Chai", 1, 1, "10 boxes x 20 bags", 18.00, 39, 0, 10, False),
        (2, "Chang", 1, 1, "24 - 12 oz bottles", 19.00, 17, 40, 25, False),
        (3, "Aniseed Syrup", 1, 2, "12 - 550 ml bottles", 10.00, 13, 70, 25, False),
        (4, "Chef Anton's Cajun Seasoning", 2, 2, "48 - 6 oz jars", 22.00, 53, 0, 0, False),
        (5, "Chef Anton's Gumbo Mix", 2, 2, "36 boxes", 21.35, 0, 0, 0, True),
        (6, "Grandma's Boysenberry Spread", 3, 2, "12 - 8 oz jars", 25.00, 120, 0, 25, False),
        (7, "Uncle Bob's Organic Dried Pears", 3, 7, "12 - 1 lb pkgs.", 30.00, 15, 0, 10, False),
        (8, "Northwoods Cranberry Sauce", 3, 2, "12 - 12 oz jars", 40.00, 6, 0, 0, False),
        (9, "Mishi Kobe Niku", 4, 6, "18 - 500 g pkgs.", 97.00, 29, 0, 0, True),
        (10, "Ikura", 4, 8, "12 - 200 ml jars", 31.00, 31, 0, 0, False)
    ],
    
    "customers": [
        ("ALFKI", "Alfreds Futterkiste", "Maria Anders", "Sales Representative", "Obere Str. 57", "Berlin", None, "12209", "Germany", "030-0074321", "030-0076545"),
        ("ANATR", "Ana Trujillo Emparedados y helados", "Ana Trujillo", "Owner", "Avda. de la Constitución 2222", "México D.F.", None, "05021", "Mexico", "(5) 555-4729", "(5) 555-3745"),
        ("ANTON", "Antonio Moreno Taquería", "Antonio Moreno", "Owner", "Mataderos 2312", "México D.F.", None, "05023", "Mexico", "(5) 555-3932", None),
        ("AROUT", "Around the Horn", "Thomas Hardy", "Sales Representative", "120 Hanover Sq.", "London", None, "WA1 1DP", "UK", "(171) 555-7788", "(171) 555-6750"),
        ("BERGS", "Berglunds snabbköp", "Christina Berglund", "Order Administrator", "Berguvsvägen 8", "Luleå", None, "S-958 22", "Sweden", "0921-12 34 65", "0921-12 34 67"),
        ("BLAUS", "Blauer See Delikatessen", "Hanna Moos", "Sales Representative", "Forsterstr. 57", "Mannheim", None, "68306", "Germany", "0621-08460", "0621-08924"),
        ("BLONP", "Blondesddsl père et fils", "Frédérique Citeaux", "Marketing Manager", "24, place Kléber", "Strasbourg", None, "67000", "France", "88.60.15.31", "88.60.15.32"),
        ("BOLID", "Bólido Comidas preparadas", "Martín Sommer", "Owner", "C/ Araquil, 67", "Madrid", None, "28023", "Spain", "(91) 555 22 82", "(91) 555 91 99"),
        ("BONAP", "Bon app'", "Laurence Lebihan", "Owner", "12, rue des Bouchers", "Marseille", None, "13008", "France", "91.24.45.40", "91.24.45.41"),
        ("BOTTM", "Bottom-Dollar Markets", "Elizabeth Lincoln", "Accounting Manager", "23 Tsawassen Blvd.", "Tsawassen", "BC", "T2F 8M4", "Canada", "(604) 555-4729", "(604) 555-3745")
    ],
    
    "employees": [
        (1, "Davolio", "Nancy", "Sales Representative", "Ms.", date(1948, 12, 8), date(1992, 5, 1), "507 - 20th Ave. E. Apt. 2A", "Seattle", "WA", "98122", "USA", "(206) 555-9857", "5467", "Education includes a BA in psychology from Colorado State University in 1970.", None),
        (2, "Fuller", "Andrew", "Vice President, Sales", "Dr.", date(1952, 2, 19), date(1992, 8, 14), "908 W. Capital Way", "Tacoma", "WA", "98401", "USA", "(206) 555-9482", "3457", "Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981.", None),
        (3, "Leverling", "Janet", "Sales Representative", "Ms.", date(1963, 8, 30), date(1992, 4, 1), "722 Moss Bay Blvd.", "Kirkland", "WA", "98033", "USA", "(206) 555-3412", "3355", "Janet has a BS degree in chemistry from Boston College (1984).", 2),
        (4, "Peacock", "Margaret", "Sales Representative", "Mrs.", date(1937, 9, 19), date(1993, 5, 3), "4110 Old Redmond Rd.", "Redmond", "WA", "98052", "USA", "(206) 555-8122", "5176", "Margaret holds a BA in English literature from Concordia College (1958) and an MA from the American Institute of Culinary Arts (1966).", 2),
        (5, "Buchanan", "Steven", "Sales Manager", "Mr.", date(1955, 3, 4), date(1993, 10, 17), "14 Garrett Hill", "London", None, "SW1 8JR", "UK", "(71) 555-4848", "3453", "Steven Buchanan graduated from St. Andrews University, Scotland, with a BSC degree in 1976.", 2)
    ],
    
    "shippers": [
        (1, "Speedy Express", "(503) 555-9831"),
        (2, "United Package", "(503) 555-3199"),
        (3, "Federal Shipping", "(503) 555-9931")
    ],
    
    "orders": [
        (10248, "VINET", 5, date(1996, 7, 4), date(1996, 8, 1), date(1996, 7, 16), 3, 32.38, "Vins et alcools Chevalier", "59 rue de l'Abbaye", "Reims", None, "51100", "France"),
        (10249, "TOMSP", 6, date(1996, 7, 5), date(1996, 8, 16), date(1996, 7, 10), 1, 11.61, "Toms Spezialitäten", "Luisenstr. 48", "Münster", None, "44087", "Germany"),
        (10250, "HANAR", 4, date(1996, 7, 8), date(1996, 8, 5), date(1996, 7, 12), 2, 65.83, "Hanari Carnes", "Rua do Paço, 67", "Rio de Janeiro", "RJ", "05454-876", "Brazil"),
        (10251, "VICTE", 3, date(1996, 7, 8), date(1996, 8, 5), date(1996, 7, 15), 1, 41.34, "Victuailles en stock", "2, rue du Commerce", "Lyon", None, "69004", "France"),
        (10252, "SUPRD", 4, date(1996, 7, 9), date(1996, 8, 6), date(1996, 7, 11), 2, 51.30, "Suprêmes délices", "Boulevard Tirou, 255", "Charleroi", None, "B-6000", "Belgium"),
        (10253, "HANAR", 3, date(1996, 7, 10), date(1996, 7, 24), date(1996, 7, 16), 2, 58.17, "Hanari Carnes", "Rua do Paço, 67", "Rio de Janeiro", "RJ", "05454-876", "Brazil"),
        (10254, "CHOPS", 5, date(1996, 7, 11), date(1996, 8, 8), date(1996, 7, 23), 2, 22.98, "Chop-suey Chinese", "Hauptstr. 31", "Bern", None, "3012", "Switzerland"),
        (10255, "RICSU", 9, date(1996, 7, 12), date(1996, 8, 9), date(1996, 7, 15), 3, 148.33, "Richter Supermarkt", "Starenweg 5", "Genève", None, "1204", "Switzerland"),
        (10256, "WELLI", 3, date(1996, 7, 15), date(1996, 8, 12), date(1996, 7, 17), 2, 13.97, "Wellington Importadora", "Rua do Mercado, 12", "Resende", "SP", "08737-363", "Brazil"),
        (10257, "HILAA", 4, date(1996, 7, 16), date(1996, 8, 13), date(1996, 7, 22), 3, 81.91, "HILARION-Abastos", "Carrera 22 con Ave. Carlos Soublette #8-35", "San Cristóbal", "Táchira", "5022", "Venezuela")
    ],
    
    "order_details": [
        (10248, 11, 14.00, 12, 0.00),
        (10248, 42, 9.80, 10, 0.00),
        (10248, 72, 34.80, 5, 0.00),
        (10249, 14, 18.60, 9, 0.00),
        (10249, 51, 42.40, 40, 0.00),
        (10250, 41, 7.70, 10, 0.00),
        (10250, 51, 42.40, 35, 0.15),
        (10250, 65, 16.80, 15, 0.15),
        (10251, 22, 16.80, 6, 0.05),
        (10251, 57, 15.60, 15, 0.05),
        (10251, 65, 16.80, 20, 0.00),
        (10252, 20, 64.80, 40, 0.05),
        (10252, 33, 2.00, 25, 0.05),
        (10252, 60, 27.20, 40, 0.00),
        (10253, 31, 10.00, 20, 0.00),
        (10253, 39, 14.40, 42, 0.00),
        (10253, 49, 16.00, 40, 0.00)
    ]
}

print(f"\n📊 샘플 데이터 정의 완료:")
for table_name, data in sample_data.items():
    print(f"   📈 {table_name}: {len(data)}개 레코드")

print("\n✅ 샘플 데이터 준비 완료!")

✅ 데이터베이스 스키마 분석기 초기화 완료
✅ 테이블 스키마 정의 완료
   정의된 테이블: ['categories', 'suppliers', 'products', 'customers', 'employees', 'shippers', 'orders', 'order_details']
✅ 샘플 데이터 생성 완료
   categories: 8개 레코드
   suppliers: 5개 레코드
   products: 10개 레코드
   customers: 5개 레코드
   employees: 5개 레코드
   shippers: 3개 레코드
   orders: 100개 레코드
   order_details: 290개 레코드

🏗️ Northwind 데이터 빌더 준비 완료!
📦 Delta Lake 테이블 생성 및 데이터 로드

🔄 기존 테이블 정리 중...
   🗑️ categories 테이블 삭제됨
   🗑️ categories 테이블 삭제됨
   🗑️ suppliers 테이블 삭제됨
   🗑️ suppliers 테이블 삭제됨
   🗑️ products 테이블 삭제됨
   🗑️ products 테이블 삭제됨
   🗑️ customers 테이블 삭제됨
   🗑️ customers 테이블 삭제됨
   🗑️ employees 테이블 삭제됨
   🗑️ employees 테이블 삭제됨
   🗑️ shippers 테이블 삭제됨
   🗑️ shippers 테이블 삭제됨
   🗑️ orders 테이블 삭제됨
   🗑️ orders 테이블 삭제됨
   🗑️ order_details 테이블 삭제됨

📊 샘플 데이터 정의 완료:
   📈 categories: 8개 레코드
   📈 suppliers: 5개 레코드
   📈 products: 10개 레코드
   📈 customers: 10개 레코드
   📈 employees: 5개 레코드
   📈 shippers: 3개 레코드
   📈 orders: 10개 레코드
   📈 order_details: 17개 레코드

✅ 샘플 데이터 준비 완료!
  

## 4. 테이블 생성 및 데이터 삽입

In [4]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import re
from typing import List, Dict, Any

class RAGSchemaRetriever:
    """RAG 기반 스키마 검색 및 매칭"""
    
    def __init__(self, vectorized_schemas: List[Dict[str, Any]], embedding_model=None):
        self.vectorized_schemas = vectorized_schemas
        self.embedding_model = embedding_model
        self.schema_embeddings = None
        self.schema_texts = None
        
        # 임베딩 매트릭스 구성
        self._build_embedding_matrix()
    
    def _build_embedding_matrix(self):
        """스키마 임베딩 매트릭스 구성"""
        embeddings = []
        texts = []
        
        for schema in self.vectorized_schemas:
            if 'embedding' in schema:
                embeddings.append(schema['embedding'])
                texts.append(schema.get('schema_text', ''))
            
        if embeddings:
            self.schema_embeddings = np.array(embeddings)
            self.schema_texts = texts
            print(f"✅ 임베딩 매트릭스 구성 완료: {self.schema_embeddings.shape}")
        else:
            print("⚠️ 사용 가능한 임베딩이 없습니다.")
    
    def search_relevant_schemas(self, question: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """질문과 관련된 스키마 검색"""
        if not self.embedding_model or self.schema_embeddings is None:
            print("⚠️ 임베딩 기반 검색을 사용할 수 없습니다. 키워드 검색으로 대체합니다.")
            return self._keyword_based_search(question, top_k)
        
        try:
            # 질문 임베딩
            question_embedding = self.embedding_model.embed_query(question)
            question_vector = np.array(question_embedding).reshape(1, -1)
            
            # 코사인 유사도 계산
            similarities = cosine_similarity(question_vector, self.schema_embeddings)[0]
            
            # 상위 k개 인덱스
            top_indices = np.argsort(similarities)[::-1][:top_k]
            
            # 결과 구성
            results = []
            for idx in top_indices:
                schema = self.vectorized_schemas[idx].copy()
                schema['similarity_score'] = float(similarities[idx])
                results.append(schema)
            
            print(f"🔍 벡터 검색 완료: {len(results)}개 스키마 매칭")
            return results
            
        except Exception as e:
            print(f"❌ 벡터 검색 실패: {str(e)}")
            return self._keyword_based_search(question, top_k)
    
    def _keyword_based_search(self, question: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """키워드 기반 스키마 검색 (백업 방법)"""
        question_lower = question.lower()
        
        # 키워드 매칭 스코어 계산
        scored_schemas = []
        
        for schema in self.vectorized_schemas:
            score = 0
            
            # 테이블명 매칭
            if question_lower in schema['table_name'].lower():
                score += 10
            
            # 컬럼명 매칭
            for col in schema['columns']:
                if question_lower in col['name'].lower():
                    score += 5
                if col.get('comment') and question_lower in col['comment'].lower():
                    score += 3
            
            # 샘플 데이터 매칭
            sample_text = str(schema.get('sample_data', '')).lower()
            if question_lower in sample_text:
                score += 2
            
            if score > 0:
                schema_copy = schema.copy()
                schema_copy['similarity_score'] = score
                scored_schemas.append(schema_copy)
        
        # 스코어로 정렬
        scored_schemas.sort(key=lambda x: x['similarity_score'], reverse=True)
        
        print(f"🔍 키워드 검색 완료: {len(scored_schemas[:top_k])}개 스키마 매칭")
        return scored_schemas[:top_k]

class TextToSQLGenerator:
    """Text-to-SQL 생성기"""
    
    def __init__(self, foundation_model, schema_retriever: RAGSchemaRetriever):
        self.foundation_model = foundation_model
        self.schema_retriever = schema_retriever
        
    def generate_sql(self, question: str, top_k_schemas: int = 3) -> Dict[str, Any]:
        """자연어 질문을 SQL로 변환"""
        try:
            # 1. 관련 스키마 검색
            relevant_schemas = self.schema_retriever.search_relevant_schemas(question, top_k_schemas)
            
            if not relevant_schemas:
                return {
                    "success": False,
                    "error": "관련된 테이블을 찾을 수 없습니다.",
                    "sql_query": None
                }
            
            # 2. 프롬프트 생성
            prompt = self._create_prompt(question, relevant_schemas)
            
            # 3. LLM으로 SQL 생성
            if self.foundation_model:
                response = self.foundation_model.predict(prompt)
                sql_query = self._extract_sql_from_response(response)
            else:
                return {
                    "success": False,
                    "error": "언어 모델을 사용할 수 없습니다.",
                    "sql_query": None
                }
            
            # 4. 결과 구성
            return {
                "success": True,
                "sql_query": sql_query,
                "relevant_schemas": relevant_schemas,
                "raw_response": response,
                "prompt_used": prompt
            }
            
        except Exception as e:
            return {
                "success": False,
                "error": f"SQL 생성 실패: {str(e)}",
                "sql_query": None
            }
    
    def _create_prompt(self, question: str, schemas: List[Dict[str, Any]]) -> str:
        """RAG 기반 프롬프트 생성"""
        schema_context = ""
        
        for i, schema in enumerate(schemas, 1):
            schema_context += f"\n=== 테이블 {i}: {schema['full_name']} ===\n"
            schema_context += f"행 수: {schema.get('row_count', 0):,}\n"
            schema_context += "컬럼:\n"
            
            for col in schema['columns']:
                comment = f" -- {col['comment']}" if col.get('comment') else ""
                schema_context += f"  - {col['name']} ({col['type']}){comment}\n"
            
            # 샘플 데이터 일부 포함
            if schema.get('sample_data'):
                schema_context += "샘플 데이터 (일부):\n"
                for sample in schema['sample_data'][:2]:
                    # 중요한 필드만 표시
                    key_fields = list(sample.keys())[:3]
                    sample_str = {k: sample[k] for k in key_fields}
                    schema_context += f"  {sample_str}\n"
            
            if 'similarity_score' in schema:
                schema_context += f"매칭 점수: {schema['similarity_score']:.3f}\n"
        
        prompt = f"""당신은 Databricks 환경의 SQL 전문가입니다. 주어진 데이터베이스 스키마를 바탕으로 자연어 질문을 정확한 SQL 쿼리로 변환하세요.

=== 데이터베이스 스키마 정보 ===
{schema_context}

=== 변환 규칙 ===
1. 위에 제공된 테이블과 컬럼명만 사용하세요
2. Databricks SQL 문법을 사용하세요
3. 적절한 JOIN, WHERE, GROUP BY, ORDER BY를 활용하세요
4. 결과는 실행 가능한 SQL 쿼리만 반환하세요
5. 쿼리를 ```sql과 ``` 사이에 작성하세요

=== 예시 ===
질문: "고객별 주문 건수를 보여주세요"
```sql
SELECT customer_id, COUNT(*) as order_count
FROM orders
GROUP BY customer_id
ORDER BY order_count DESC;
```

=== 질문 ===
{question}

=== 답변 ===
"""
        return prompt
    
    def _extract_sql_from_response(self, response: str) -> str:
        """LLM 응답에서 SQL 쿼리 추출"""
        # ```sql과 ``` 사이의 내용 추출
        sql_pattern = r'```sql\s*(.*?)\s*```'
        matches = re.findall(sql_pattern, response, re.DOTALL | re.IGNORECASE)
        
        if matches:
            return matches[0].strip()
        
        # SQL 키워드로 시작하는 부분 찾기
        lines = response.strip().split('\n')
        sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'WITH', 'CREATE']
        
        for line in lines:
            line_upper = line.strip().upper()
            if any(line_upper.startswith(keyword) for keyword in sql_keywords):
                return line.strip()
        
        return response.strip()

print("✅ RAG 스키마 검색 및 SQL 생성기 정의 완료")

def create_delta_tables():
    """Delta Lake 테이블 생성 및 데이터 로드"""
    
    if not spark or not northwind_builder:
        print("❌ Spark 세션 또는 데이터 빌더가 필요합니다.")
        return False
    
    created_tables = []
    failed_tables = []
    
    try:
        print("🔧 Delta Lake 테이블 생성 시작...")
        
        # 각 테이블별로 생성
        for table_name, schema in northwind_builder.schemas.items():
            try:
                print(f"\n📋 {table_name} 테이블 생성 중...")
                
                # 데이터프레임 생성
                data = northwind_builder.sample_data[table_name]
                df = spark.createDataFrame(data, schema)
                
                # Delta 테이블로 저장 (덮어쓰기)
                df.write \
                  .format("delta") \
                  .mode("overwrite") \
                  .option("mergeSchema", "true") \
                  .saveAsTable(f"northwind.{table_name}")
                
                # 데이터 확인
                count = spark.table(f"northwind.{table_name}").count()
                print(f"   ✅ {table_name}: {count}개 레코드 로드")
                
                created_tables.append(table_name)
            
            except Exception as e:
                print(f"   ❌ {table_name} 테이블 생성/삽입 실패: {e}")
                failed_tables.append((table_name, str(e)))

        print(f"\n📈 테이블 생성 결과:")
        print(f"   ✅ 성공: {len(created_tables)}개 테이블")
        print(f"   ❌ 실패: {len(failed_tables)}개 테이블")

        if created_tables:
            print(f"\n✅ 생성된 테이블:")
            for table in created_tables:
                print(f"   📂 northwind.{table}")

        if failed_tables:
            print(f"\n❌ 실패한 테이블:")
            for table, error in failed_tables:
                print(f"   📂 {table}: {error}")

        print(f"\n🎉 총 {len(created_tables)}개 Northwind 테이블이 Delta Lake에 생성되었습니다!")
        
    except Exception as e:
        print(f"❌ 테이블 생성 실패: {str(e)}")
        return False

def verify_northwind_database():
    """Northwind 데이터베이스 검증"""
    
    if not spark:
        print("❌ Spark 세션이 필요합니다.")
        return False
    
    try:
        print("🔍 Northwind 데이터베이스 검증 중...")
        
        # 테이블 목록 확인
        tables = spark.sql("SHOW TABLES IN northwind").collect()
        table_names = [row.tableName for row in tables]
        
        print(f"\n📊 생성된 테이블 ({len(table_names)}개):")
        
        verification_results = {}
        
        for table_name in sorted(table_names):
            # 테이블 정보 조회
            df = spark.table(f"northwind.{table_name}")
            count = df.count()
            columns = len(df.columns)
            
            print(f"   📋 {table_name}:")
            print(f"      - 레코드 수: {count:,}")
            print(f"      - 컬럼 수: {columns}")
            
            # 샘플 데이터 미리보기
            sample = df.limit(3).toPandas()
            print(f"      - 샘플: {sample.iloc[0].to_dict() if len(sample) > 0 else 'No data'}")
            
            verification_results[table_name] = {
                "record_count": count,
                "column_count": columns,
                "status": "OK" if count > 0 else "EMPTY"
            }
        
        # 관계 검증 (기본적인 JOIN 테스트)
        print(f"\n🔗 관계 검증:")
        
        # 주문-고객 관계
        join_test = spark.sql("""
            SELECT COUNT(*) as order_count 
            FROM northwind.orders o 
            INNER JOIN northwind.customers c ON o.customer_id = c.customer_id
        """).collect()[0].order_count
        
        print(f"   ✅ 주문-고객 JOIN: {join_test}개 매칭")
        
        # 주문상세-상품 관계  
        join_test2 = spark.sql("""
            SELECT COUNT(*) as detail_count
            FROM northwind.order_details od
            INNER JOIN northwind.products p ON od.product_id = p.product_id
        """).collect()[0].detail_count
        
        print(f"   ✅ 주문상세-상품 JOIN: {join_test2}개 매칭")
        
        print(f"\n🎯 검증 완료! Text-to-SQL 테스트 준비됨")
        return verification_results
        
    except Exception as e:
        print(f"❌ 검증 실패: {str(e)}")
        return False

# 테이블 생성 실행
if 'northwind_builder' in locals():
    print("🚀 Northwind 데이터베이스 구축 시작...")
    
    success = create_delta_tables()
    
    if success:
        print("\n" + "="*50)
        verification_results = verify_northwind_database()
        
        if verification_results:
            print("\n✅ Northwind 데이터베이스 구축 성공!")
            print("🔗 다음 노트북에서 LangChain Agent를 활용한 Text-to-SQL을 구현합니다.")
        else:
            print("❌ 데이터베이스 검증 실패")
    else:
        print("❌ 테이블 생성 실패")
else:
    print("❌ 데이터 빌더가 초기화되지 않았습니다.")

✅ RAG 스키마 검색 및 SQL 생성기 정의 완료
🚀 Northwind 데이터베이스 구축 시작...
🔧 Delta Lake 테이블 생성 시작...

📋 categories 테이블 생성 중...
   ✅ categories: 8개 레코드 로드

📋 suppliers 테이블 생성 중...
   ✅ categories: 8개 레코드 로드

📋 suppliers 테이블 생성 중...
   ✅ suppliers: 5개 레코드 로드

📋 products 테이블 생성 중...
   ✅ suppliers: 5개 레코드 로드

📋 products 테이블 생성 중...
   ✅ products: 10개 레코드 로드

📋 customers 테이블 생성 중...
   ✅ products: 10개 레코드 로드

📋 customers 테이블 생성 중...
   ✅ customers: 5개 레코드 로드

📋 employees 테이블 생성 중...
   ✅ customers: 5개 레코드 로드

📋 employees 테이블 생성 중...
   ✅ employees: 5개 레코드 로드

📋 shippers 테이블 생성 중...
   ✅ employees: 5개 레코드 로드

📋 shippers 테이블 생성 중...
   ✅ shippers: 3개 레코드 로드

📋 orders 테이블 생성 중...
   ✅ shippers: 3개 레코드 로드

📋 orders 테이블 생성 중...
   ✅ orders: 100개 레코드 로드

📋 order_details 테이블 생성 중...
   ✅ orders: 100개 레코드 로드

📋 order_details 테이블 생성 중...
   ✅ order_details: 290개 레코드 로드

📈 테이블 생성 결과:
   ✅ 성공: 8개 테이블
   ❌ 실패: 0개 테이블

✅ 생성된 테이블:
   📂 northwind.categories
   📂 northwind.suppliers
   📂 northwind.products
   📂 northwind.c

## 5. 데이터 검증 및 기본 SQL 테스트

In [5]:
# 기본 Text-to-SQL 프롬프트 템플릿
basic_sql_prompt_template = """
당신은 SQL 쿼리 생성 전문가입니다. 주어진 데이터베이스 스키마를 바탕으로 자연어 질문을 정확한 SQL 쿼리로 변환하세요.

{schema_context}

규칙:
1. 정확한 테이블명과 컬럼명을 사용하세요
2. 적절한 JOIN을 사용하세요
3. WHERE 절을 적절히 활용하세요
4. 결과는 SQL 쿼리만 반환하세요 (설명 없이)
5. 쿼리는 ```sql과 ``` 사이에 작성하세요

예시:
질문: "모든 고객의 이름과 이메일을 보여주세요"
답변:
```sql
SELECT name, email FROM customers;
```

질문: "2024년 1월에 주문한 고객들의 총 주문 금액을 계산해주세요"
답변:
```sql
SELECT c.name, SUM(o.amount) as total_amount
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= '2024-01-01' AND o.order_date < '2024-02-01'
GROUP BY c.customer_id, c.name;
```

질문: {question}
답변:
"""

print("✅ 기본 Text-to-SQL 프롬프트 템플릿 정의 완료")

class SQLExecutor:
    """안전한 SQL 실행 및 결과 검증"""
    
    def __init__(self, spark_session, max_rows: int = 100):
        self.spark = spark_session
        self.max_rows = max_rows
        self.execution_history = []
    
    def validate_sql_safety(self, sql_query: str) -> Tuple[bool, str]:
        """SQL 쿼리 안전성 검증"""
        sql_upper = sql_query.upper().strip()
        
        # 위험한 키워드 확인
        dangerous_keywords = [
            'DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 
            'INSERT', 'UPDATE', 'MERGE', 'COPY'
        ]
        
        for keyword in dangerous_keywords:
            if keyword in sql_upper:
                return False, f"위험한 키워드 '{keyword}'가 포함되어 있습니다."
        
        # 기본적인 구문 검증
        if not any(keyword in sql_upper for keyword in ['SELECT', 'WITH', 'SHOW', 'DESCRIBE']):
            return False, "허용된 SQL 키워드가 없습니다."
        
        # 괄호 매칭 확인
        if sql_query.count('(') != sql_query.count(')'):
            return False, "괄호가 일치하지 않습니다."
        
        return True, "안전한 쿼리입니다."
    
    def execute_sql(self, sql_query: str, dry_run: bool = False) -> Dict[str, Any]:
        """SQL 쿼리 실행"""
        execution_start = datetime.now()
        
        try:
            # 1. 안전성 검증
            is_safe, safety_message = self.validate_sql_safety(sql_query)
            if not is_safe:
                return {
                    "success": False,
                    "error": f"안전성 검증 실패: {safety_message}",
                    "results": None,
                    "execution_time": 0
                }
            
            # 2. Dry run (쿼리 파싱 검증)
            if dry_run:
                try:
                    self.spark.sql(f"EXPLAIN {sql_query}")
                    return {
                        "success": True,
                        "message": "쿼리 구문이 유효합니다.",
                        "dry_run": True
                    }
                except Exception as e:
                    return {
                        "success": False,
                        "error": f"쿼리 구문 오류: {str(e)}",
                        "dry_run": True
                    }
            
            # 3. 실제 실행
            if not self.spark:
                return {
                    "success": False,
                    "error": "Spark 세션이 없습니다.",
                    "results": None
                }
            
            # LIMIT 추가 (안전성을 위해)
            if "LIMIT" not in sql_query.upper():
                sql_query += f" LIMIT {self.max_rows}"
            
            # 쿼리 실행
            result_df = self.spark.sql(sql_query)
            results = result_df.collect()
            
            # 결과 처리
            execution_time = (datetime.now() - execution_start).total_seconds()
            
            # 결과를 딕셔너리 리스트로 변환
            results_data = [row.asDict() for row in results]
            
            # 실행 기록 저장
            execution_record = {
                "timestamp": execution_start.isoformat(),
                "sql_query": sql_query,
                "success": True,
                "row_count": len(results_data),
                "execution_time": execution_time
            }
            self.execution_history.append(execution_record)
            
            return {
                "success": True,
                "results": results_data,
                "row_count": len(results_data),
                "columns": result_df.columns,
                "execution_time": execution_time,
                "sql_query": sql_query
            }
            
        except Exception as e:
            execution_time = (datetime.now() - execution_start).total_seconds()
            
            # 오류 기록 저장
            error_record = {
                "timestamp": execution_start.isoformat(),
                "sql_query": sql_query,
                "success": False,
                "error": str(e),
                "execution_time": execution_time
            }
            self.execution_history.append(error_record)
            
            return {
                "success": False,
                "error": f"SQL 실행 오류: {str(e)}",
                "results": None,
                "execution_time": execution_time,
                "sql_query": sql_query
            }
    
    def format_results(self, execution_result: Dict[str, Any]) -> str:
        """실행 결과를 사용자 친화적으로 포맷팅"""
        if not execution_result["success"]:
            return f"❌ 실행 실패: {execution_result['error']}"
        
        results = execution_result["results"]
        row_count = execution_result["row_count"]
        execution_time = execution_result["execution_time"]
        
        if not results:
            return f"✅ 쿼리 실행 성공 (결과 없음) - 실행시간: {execution_time:.3f}초"
        
        # 테이블 형태로 포맷팅
        if row_count <= 10:
            # 적은 결과는 전체 표시
            df = pd.DataFrame(results)
            formatted_table = df.to_string(index=False)
        else:
            # 많은 결과는 일부만 표시
            df = pd.DataFrame(results[:10])
            formatted_table = df.to_string(index=False)
            formatted_table += f"\n... 그 외 {row_count - 10}개 행 (총 {row_count}개)"
        
        return f"""✅ 쿼리 실행 성공
📊 결과: {row_count}개 행
⏱️ 실행시간: {execution_time:.3f}초

{formatted_table}"""

    def get_execution_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """최근 실행 기록 조회"""
        return self.execution_history[-limit:]

# SQL 실행기 초기화
if spark:
    sql_executor = SQLExecutor(spark, max_rows=100)
    print("✅ SQL 실행기 초기화 완료")
else:
    sql_executor = None
    print("⚠️ Spark 세션이 없어 SQL 실행기를 초기화할 수 없습니다.")

# Northwind 데이터베이스 기본 쿼리 테스트
def test_northwind_queries():
    """Northwind 데이터베이스 기본 쿼리 테스트"""
    
    if not spark:
        print("❌ Spark 세션이 필요합니다.")
        return
    
    print("🧪 Northwind 데이터베이스 쿼리 테스트 시작...\n")
    
    test_queries = [
        {
            "name": "전체 고객 수",
            "sql": "SELECT COUNT(*) as customer_count FROM northwind.customers",
            "description": "등록된 고객의 총 개수"
        },
        {
            "name": "상품 카테고리별 개수", 
            "sql": """
                SELECT c.category_name, COUNT(p.product_id) as product_count
                FROM northwind.categories c
                LEFT JOIN northwind.products p ON c.category_id = p.category_id
                GROUP BY c.category_id, c.category_name
                ORDER BY product_count DESC
            """,
            "description": "각 카테고리별 상품 개수"
        },
        {
            "name": "월별 주문 현황",
            "sql": """
                SELECT 
                    YEAR(order_date) as year,
                    MONTH(order_date) as month,
                    COUNT(*) as order_count,
                    ROUND(AVG(freight), 2) as avg_freight
                FROM northwind.orders 
                WHERE order_date IS NOT NULL
                GROUP BY YEAR(order_date), MONTH(order_date)
                ORDER BY year DESC, month DESC
                LIMIT 10
            """,
            "description": "최근 월별 주문 건수 및 평균 배송비"
        },
        {
            "name": "베스트셀러 상품 TOP 5",
            "sql": """
                SELECT 
                    p.product_name,
                    SUM(od.quantity) as total_quantity,
                    ROUND(SUM(od.quantity * od.unit_price * (1 - od.discount)), 2) as total_revenue
                FROM northwind.order_details od
                JOIN northwind.products p ON od.product_id = p.product_id
                GROUP BY p.product_id, p.product_name
                ORDER BY total_quantity DESC
                LIMIT 5
            """,
            "description": "판매량 기준 베스트셀러 상품"
        },
        {
            "name": "고객별 주문 통계",
            "sql": """
                SELECT 
                    c.company_name,
                    COUNT(o.order_id) as order_count,
                    ROUND(SUM(od.quantity * od.unit_price * (1 - od.discount)), 2) as total_spent
                FROM northwind.customers c
                LEFT JOIN northwind.orders o ON c.customer_id = o.customer_id
                LEFT JOIN northwind.order_details od ON o.order_id = od.order_id
                GROUP BY c.customer_id, c.company_name
                HAVING COUNT(o.order_id) > 0
                ORDER BY total_spent DESC
                LIMIT 5
            """,
            "description": "구매 금액 기준 상위 고객"
        }
    ]
    
    # 각 쿼리 실행 및 결과 출력
    for i, query in enumerate(test_queries, 1):
        print(f"🔍 테스트 {i}: {query['name']}")
        print(f"📝 설명: {query['description']}")
        
        try:
            # 쿼리 실행
            result_df = spark.sql(query['sql'])
            result_pandas = result_df.toPandas()
            
            print(f"✅ 실행 성공 ({len(result_pandas)}개 행)")
            
            # 결과 출력 (처음 5개 행만)
            if len(result_pandas) > 0:
                print("📊 결과:")
                display_df = result_pandas.head(5)
                for idx, row in display_df.iterrows():
                    row_str = ", ".join([f"{col}: {val}" for col, val in row.items()])
                    print(f"   {idx+1}. {row_str}")
                
                if len(result_pandas) > 5:
                    print(f"   ... 그 외 {len(result_pandas) - 5}개 행")
            else:
                print("   (결과 없음)")
                
        except Exception as e:
            print(f"❌ 실행 실패: {str(e)}")
        
        print("-" * 50)
    
    print("🎉 기본 쿼리 테스트 완료!")

def analyze_schema_for_langchain():
    """LangChain Agent에서 사용할 스키마 정보 추출"""
    
    if not spark:
        print("❌ Spark 세션이 필요합니다.")
        return None
    
    print("📋 LangChain Agent용 스키마 분석 중...")
    
    schema_info = {}
    
    try:
        # 각 테이블의 스키마 정보 수집
        tables = spark.sql("SHOW TABLES IN northwind").collect()
        
        for table_row in tables:
            table_name = table_row.tableName
            
            # 테이블 스키마 조회
            describe_result = spark.sql(f"DESCRIBE TABLE northwind.{table_name}").collect()
            
            columns = []
            for row in describe_result:
                if row.col_name and not row.col_name.startswith('#'):
                    columns.append({
                        "name": row.col_name,
                        "type": row.data_type,
                        "nullable": row.comment != "NOT NULL" if row.comment else True
                    })
            
            # 샘플 데이터 몇 개 가져오기
            sample_df = spark.table(f"northwind.{table_name}").limit(3).toPandas()
            sample_data = sample_df.to_dict('records') if len(sample_df) > 0 else []
            
            # 테이블 통계
            row_count = spark.table(f"northwind.{table_name}").count()
            
            schema_info[table_name] = {
                "table_name": table_name,
                "full_name": f"northwind.{table_name}",
                "columns": columns,
                "sample_data": sample_data,
                "row_count": row_count,
                "description": get_table_description(table_name)
            }
        
        print(f"✅ 스키마 분석 완료: {len(schema_info)}개 테이블")
        
        # LangChain Agent용 요약 정보 생성
        schema_summary = {
            "database_name": "northwind",
            "tables": list(schema_info.keys()),
            "total_tables": len(schema_info),
            "relationships": get_table_relationships(),
            "schema_details": schema_info
        }
        
        return schema_summary
        
    except Exception as e:
        print(f"❌ 스키마 분석 실패: {str(e)}")
        return None

def get_table_description(table_name):
    """테이블별 설명 반환"""
    descriptions = {
        "categories": "상품 카테고리 정보 (음료, 조미료, 과자류 등)",
        "suppliers": "공급업체 정보 및 연락처",
        "products": "상품 정보 (이름, 가격, 재고, 카테고리, 공급업체)",
        "customers": "고객 정보 및 연락처",
        "employees": "직원 정보 및 조직 구조",
        "shippers": "배송업체 정보",
        "orders": "주문 정보 (고객, 직원, 날짜, 배송)",
        "order_details": "주문 상세 정보 (상품, 수량, 가격, 할인)"
    }
    return descriptions.get(table_name, "")

def get_table_relationships():
    """테이블 간 관계 정보 반환"""
    return {
        "orders_customers": "orders.customer_id -> customers.customer_id",
        "orders_employees": "orders.employee_id -> employees.employee_id", 
        "orders_shippers": "orders.ship_via -> shippers.shipper_id",
        "order_details_orders": "order_details.order_id -> orders.order_id",
        "order_details_products": "order_details.product_id -> products.product_id",
        "products_categories": "products.category_id -> categories.category_id",
        "products_suppliers": "products.supplier_id -> suppliers.supplier_id",
        "employees_manager": "employees.reports_to -> employees.employee_id"
    }

# 테스트 실행
try:
    test_northwind_queries()
    print("\n" + "="*60)
    schema_info = analyze_schema_for_langchain()
    
    if schema_info:
        print(f"\n📋 LangChain Agent용 스키마 요약:")
        print(f"   데이터베이스: {schema_info['database_name']}")
        print(f"   테이블 수: {schema_info['total_tables']}")
        print(f"   관계 수: {len(schema_info['relationships'])}")
        
        print(f"\n🎯 다음 단계: LangChain Agent 구현")
        print(f"   노트북: 02_langchain_agent_text_to_sql.ipynb")
        
        # 스키마 정보를 다음 노트북에서 사용할 수 있도록 저장
        globals()['northwind_schema'] = schema_info
        
except Exception as e:
    print(f"❌ 테스트 실행 실패: {str(e)}")

print(f"\n✅ 01단계 완료: Northwind 데이터베이스 구축 및 검증")

# 데이터 검증 및 기본 SQL 테스트

print("🔍 Northwind 데이터베이스 검증 및 테스트")
print("=" * 45)

# 1. 데이터베이스 및 테이블 확인
print("\n📋 데이터베이스 현황:")
databases = spark.sql("SHOW DATABASES").collect()
db_names = [row.databaseName for row in databases]
print(f"   📂 전체 데이터베이스: {', '.join(db_names)}")

if "northwind" in db_names:
    print("   ✅ northwind 데이터베이스 확인됨")
    
    # 테이블 목록 조회
    tables = spark.sql("SHOW TABLES IN northwind").collect()
    table_names = [row.tableName for row in tables]
    print(f"   📊 northwind 테이블 수: {len(table_names)}개")
    
    # 각 테이블의 레코드 수 확인
    print(f"\n📈 테이블별 데이터 현황:")
    total_records = 0
    
    for table_name in sorted(table_names):
        try:
            count = spark.table(f"northwind.{table_name}").count()
            total_records += count
            print(f"   📂 {table_name}: {count:,}개 레코드")
        except Exception as e:
            print(f"   ❌ {table_name}: 조회 실패 ({e})")
    
    print(f"\n📊 전체 레코드 수: {total_records:,}개")
    
else:
    print("   ❌ northwind 데이터베이스를 찾을 수 없습니다!")

# 2. 기본 SQL 쿼리 테스트
print(f"\n🧪 기본 SQL 쿼리 테스트:")

test_queries = [
    {
        "name": "고객 수 조회",
        "sql": "SELECT COUNT(*) as customer_count FROM northwind.customers",
        "description": "전체 고객 수 확인"
    },
    {
        "name": "상품 카테고리별 개수",
        "sql": """
        SELECT c.categoryname, COUNT(p.productid) as product_count
        FROM northwind.categories c
        LEFT JOIN northwind.products p ON c.categoryid = p.categoryid
        GROUP BY c.categoryname
        ORDER BY product_count DESC
        """,
        "description": "카테고리별 상품 수 집계"
    },
    {
        "name": "가격이 가장 비싼 상품 TOP 3",
        "sql": """
        SELECT productname, unitprice, categoryid
        FROM northwind.products
        WHERE unitprice IS NOT NULL
        ORDER BY unitprice DESC
        LIMIT 3
        """,
        "description": "최고가 상품 조회"
    },
    {
        "name": "국가별 고객 분포",
        "sql": """
        SELECT country, COUNT(*) as customer_count
        FROM northwind.customers
        GROUP BY country
        ORDER BY customer_count DESC
        LIMIT 5
        """,
        "description": "주요 국가별 고객 수"
    }
]

successful_tests = 0
failed_tests = 0

for i, test in enumerate(test_queries, 1):
    try:
        print(f"\n   🧪 테스트 {i}: {test['name']}")
        print(f"      📝 {test['description']}")
        
        result_df = spark.sql(test['sql'])
        results = result_df.collect()
        
        if results:
            print(f"      ✅ 성공 ({len(results)}개 결과)")
            
            # 결과 샘플 표시 (처음 3개)
            for j, row in enumerate(results[:3]):
                row_data = [f"{col}={row[col]}" for col in result_df.columns]
                print(f"         {j+1}. {', '.join(row_data)}")
            
            if len(results) > 3:
                print(f"         ... 그 외 {len(results)-3}개 결과")
        else:
            print(f"      ⚠️ 결과 없음")
            
        successful_tests += 1
        
    except Exception as e:
        print(f"      ❌ 실패: {str(e)}")
        failed_tests += 1

print(f"\n📊 SQL 테스트 결과:")
print(f"   ✅ 성공: {successful_tests}개")
print(f"   ❌ 실패: {failed_tests}개")

if successful_tests == len(test_queries):
    print(f"\n🎉 모든 SQL 테스트가 성공했습니다!")
    print(f"✅ Northwind 데이터베이스가 정상적으로 구축되었습니다!")
else:
    print(f"\n⚠️ 일부 테스트가 실패했습니다. 데이터를 확인해주세요.")

✅ 기본 Text-to-SQL 프롬프트 템플릿 정의 완료
✅ SQL 실행기 초기화 완료
🧪 Northwind 데이터베이스 쿼리 테스트 시작...

🔍 테스트 1: 전체 고객 수
📝 설명: 등록된 고객의 총 개수
✅ 실행 성공 (1개 행)
📊 결과:
   1. customer_count: 5
--------------------------------------------------
🔍 테스트 2: 상품 카테고리별 개수
📝 설명: 각 카테고리별 상품 개수
✅ 실행 성공 (1개 행)
📊 결과:
   1. customer_count: 5
--------------------------------------------------
🔍 테스트 2: 상품 카테고리별 개수
📝 설명: 각 카테고리별 상품 개수
✅ 실행 성공 (8개 행)
📊 결과:
   1. category_name: 육류/가금류, product_count: 3
   2. category_name: 곡물/시리얼, product_count: 2
   3. category_name: 조미료, product_count: 2
   4. category_name: 과자류, product_count: 1
   5. category_name: 해산물, product_count: 1
   ... 그 외 3개 행
--------------------------------------------------
🔍 테스트 3: 월별 주문 현황
📝 설명: 최근 월별 주문 건수 및 평균 배송비
✅ 실행 성공 (8개 행)
📊 결과:
   1. category_name: 육류/가금류, product_count: 3
   2. category_name: 곡물/시리얼, product_count: 2
   3. category_name: 조미료, product_count: 2
   4. category_name: 과자류, product_count: 1
   5. category_name: 해산물, product_count: 1
   ... 그 외 3

## 🔗 다음 단계: LangChain Agent 구현

이제 Northwind 데이터베이스가 성공적으로 구축되었습니다! 

### 📝 완료된 작업
✅ **Databricks 환경 설정** - Spark 세션 및 northwind 데이터베이스 생성  
✅ **Northwind 샘플 데이터** - 8개 테이블, 실제 비즈니스 시나리오 반영  
✅ **Delta Lake 테이블** - 고성능 분석을 위한 최적화된 저장소  
✅ **스키마 분석** - LangChain Agent에서 활용할 메타데이터 추출  
✅ **기본 쿼리 테스트** - 데이터 무결성 및 관계 검증 완료  

### 🚀 다음 노트북에서 구현할 내용

**`02_langchain_agent_text_to_sql.ipynb`**
- 🤖 **LangChain Agent 아키텍처** 설계
- 🔧 **Function Tools** 구현 (스키마 조회, SQL 실행, 결과 검증)
- 🧠 **추론 모델 연동** (Databricks Foundation Models)
- 💬 **자연어 → SQL 변환** 파이프라인
- 🔍 **동적 스키마 검색** 및 컨텍스트 구성
- 🛡️ **안전성 검증** 및 오류 처리

## 6. 스키마 정보 추출 및 LangChain Agent 연동 준비

In [6]:
# 한국어 특화 고급 프롬프트 템플릿
advanced_korean_sql_prompt_template = """
당신은 한국어를 이해하는 SQL 전문가입니다. 주어진 데이터베이스 스키마를 바탕으로 한국어 질문을 정확한 SQL 쿼리로 변환하세요.

{schema_context}

변환 규칙:
1. 테이블명과 컬럼명을 정확히 사용하세요
2. 한국어 숫자 표현을 숫자로 변환하세요 (예: "열 개" → 10)
3. 날짜 표현을 SQL 형식으로 변환하세요 (예: "작년" → 해당 연도)
4. 집계 함수를 적절히 사용하세요
5. 결과는 SQL 쿼리만 반환하고 ```sql 블록으로 감싸세요

한국어 질문 예시:

질문: "고객 수를 세어주세요"
```sql
SELECT COUNT(*) as customer_count FROM customers;
```

질문: "가장 많이 주문한 고객 다섯 명을 찾아주세요"
```sql
SELECT c.name, COUNT(o.order_id) as order_count
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name
ORDER BY order_count DESC
LIMIT 5;
```

질문: "월별 매출을 보여주세요"
```sql
SELECT 
    DATE_FORMAT(order_date, '%Y-%m') as month,
    SUM(amount) as monthly_revenue
FROM orders
GROUP BY DATE_FORMAT(order_date, '%Y-%m')
ORDER BY month;
```

질문: "{question}"
답변:
"""

print("✅ 한국어 특화 고급 프롬프트 템플릿 정의 완료")

# 간단한 환경 클래스 정의 (LangChain Agent 노트북에서 더 자세히 구현됨)
class DatabricksEnvironment:
    """기본 Databricks 환경 클래스"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
        self.database_name = "northwind"

class TextToSQLRAGApp:
    """통합 Text-to-SQL RAG 애플리케이션"""
    
    def __init__(self, databricks_env: DatabricksEnvironment):
        self.env = databricks_env
        self.schema_analyzer = None
        self.schema_retriever = None
        self.sql_generator = None
        self.sql_executor = None
        self.vectorized_schemas = []
        
        # 컴포넌트 초기화
        self._initialize_components()
    
    def _initialize_components(self):
        """애플리케이션 컴포넌트 초기화"""
        try:
            # 스키마 분석기
            if self.env.spark:
                self.schema_analyzer = DatabaseSchemaAnalyzer(
                    self.env.spark, 
                    self.env.embedding_model
                )
                print("✅ 스키마 분석기 초기화")
            
            # SQL 실행기
            if self.env.spark:
                self.sql_executor = SQLExecutor(self.env.spark)
                print("✅ SQL 실행기 초기화")
                
        except Exception as e:
            print(f"⚠️ 컴포넌트 초기화 중 오류: {str(e)}")
    
    def setup_database(self, database_name: str = "default", limit_tables: int = 10):
        """데이터베이스 설정 및 스키마 분석"""
        if not self.schema_analyzer:
            return False, "스키마 분석기가 없습니다."
        
        try:
            print(f"🔧 데이터베이스 '{database_name}' 설정 중...")
            
            # 1. 스키마 분석
            schemas = self.schema_analyzer.analyze_database_schema(database_name, limit_tables)
            
            if not schemas:
                return False, f"데이터베이스 '{database_name}'에서 스키마를 찾을 수 없습니다."
            
            # 2. 스키마 벡터화
            self.vectorized_schemas = self.schema_analyzer.vectorize_schemas(schemas)
            
            # 3. RAG 검색기 초기화
            self.schema_retriever = RAGSchemaRetriever(
                self.vectorized_schemas, 
                self.env.embedding_model
            )
            
            # 4. SQL 생성기 초기화
            if self.env.foundation_model:
                self.sql_generator = TextToSQLGenerator(
                    self.env.foundation_model,
                    self.schema_retriever
                )
            
            print(f"✅ 데이터베이스 설정 완료: {len(schemas)}개 테이블")
            return True, f"{len(schemas)}개 테이블 설정 완료"
            
        except Exception as e:
            return False, f"데이터베이스 설정 실패: {str(e)}"
    
    def ask_question(self, question: str, execute_sql: bool = True, dry_run: bool = False) -> Dict[str, Any]:
        """자연어 질문을 SQL로 변환하고 실행"""
        if not self.sql_generator:
            return {
                "success": False,
                "error": "SQL 생성기가 초기화되지 않았습니다. setup_database()를 먼저 실행하세요.",
                "question": question
            }
        
        try:
            print(f"🤔 질문 분석 중: '{question}'")
            
            # 1. SQL 생성
            sql_result = self.sql_generator.generate_sql(question)
            
            if not sql_result["success"]:
                return {
                    "success": False,
                    "error": sql_result["error"],
                    "question": question
                }
            
            sql_query = sql_result["sql_query"]
            relevant_schemas = sql_result["relevant_schemas"]
            
            print(f"🔍 관련 테이블 {len(relevant_schemas)}개 발견")
            print(f"⚡ SQL 생성 완료")
            
            result = {
                "success": True,
                "question": question,
                "sql_query": sql_query,
                "relevant_schemas": [
                    {
                        "table": schema["full_name"],
                        "score": schema.get("similarity_score", 0)
                    }
                    for schema in relevant_schemas
                ],
                "execution_result": None
            }
            
            # 2. SQL 실행 (옵션)
            if execute_sql and self.sql_executor:
                print("🚀 SQL 실행 중...")
                execution_result = self.sql_executor.execute_sql(sql_query, dry_run=dry_run)
                result["execution_result"] = execution_result
                
                if execution_result["success"]:
                    print("✅ SQL 실행 성공")
                else:
                    print(f"❌ SQL 실행 실패: {execution_result['error']}")
            
            return result
            
        except Exception as e:
            return {
                "success": False,
                "error": f"질문 처리 실패: {str(e)}",
                "question": question
            }
    
    def get_database_summary(self) -> Dict[str, Any]:
        """현재 설정된 데이터베이스 요약 정보"""
        if not self.vectorized_schemas:
            return {"message": "설정된 데이터베이스가 없습니다."}
        
        summary = {
            "total_tables": len(self.vectorized_schemas),
            "tables": []
        }
        
        for schema in self.vectorized_schemas:
            table_info = {
                "name": schema["full_name"],
                "row_count": schema.get("row_count", 0),
                "column_count": len(schema["columns"]),
                "has_embedding": "embedding" in schema
            }
            summary["tables"].append(table_info)
        
        return summary
    
    def interactive_session(self):
        """대화형 세션 시작"""
        print("🎯 Text-to-SQL RAG 대화형 세션 시작")
        print("   종료하려면 'quit' 또는 'exit'를 입력하세요.")
        print("   도움말은 'help'를 입력하세요.")
        
        while True:
            try:
                question = input("\n💬 질문을 입력하세요: ").strip()
                
                if question.lower() in ['quit', 'exit', '종료']:
                    print("👋 세션을 종료합니다.")
                    break
                
                if question.lower() in ['help', '도움말']:
                    self._show_help()
                    continue
                
                if question.lower() in ['summary', '요약']:
                    summary = self.get_database_summary()
                    print(f"\n📊 데이터베이스 요약: {summary['total_tables']}개 테이블")
                    for table in summary['tables'][:5]:
                        print(f"   📋 {table['name']}: {table['row_count']:,}행, {table['column_count']}컬럼")
                    continue
                
                if not question:
                    continue
                
                # 질문 처리
                result = self.ask_question(question)
                self._display_result(result)
                
            except KeyboardInterrupt:
                print("\n👋 세션을 종료합니다.")
                break
            except Exception as e:
                print(f"❌ 오류 발생: {str(e)}")
    
    def _show_help(self):
        """도움말 표시"""
        help_text = """
🔧 사용 가능한 명령어:
- help, 도움말: 이 도움말 표시
- summary, 요약: 데이터베이스 요약 정보
- quit, exit, 종료: 세션 종료

💡 질문 예시:
- "고객 수를 알려주세요"
- "월별 매출을 보여주세요"
- "가장 많이 팔린 상품 10개는?"
- "최근 일주일 주문 현황은?"
"""
        print(help_text)
    
    def _display_result(self, result: Dict[str, Any]):
        """결과 표시"""
        if not result["success"]:
            print(f"\n❌ 오류: {result['error']}")
            return
        
        print(f"\n🔍 SQL 쿼리:")
        print(f"```sql\n{result['sql_query']}\n```")
        
        if result["relevant_schemas"]:
            print(f"\n📋 사용된 테이블:")
            for schema in result["relevant_schemas"]:
                score = schema["score"]
                print(f"   • {schema['table']} (매칭점수: {score:.3f})")
        
        if result["execution_result"]:
            execution = result["execution_result"]
            if execution["success"]:
                print(f"\n✅ 실행 결과 ({execution['row_count']}개 행):")
                if execution["results"]:
                    df = pd.DataFrame(execution["results"][:10])  # 처음 10개만
                    print(df.to_string(index=False))
                    if execution["row_count"] > 10:
                        print(f"... 그 외 {execution['row_count'] - 10}개 행")
            else:
                print(f"\n❌ 실행 실패: {execution['error']}")

print("✅ Text-to-SQL RAG 애플리케이션 클래스 정의 완료")

# 📋 01단계 완료 요약 및 다음 단계 준비

print("🎉 Northwind 데이터베이스 구축 완료!")
print("\n" + "="*60)

# 현재 상태 확인
if spark and 'northwind_schema' in globals():
    
    print("✅ 구축 완료 상태:")
    schema = northwind_schema
    
    print(f"   📊 데이터베이스: {schema['database_name']}")
    print(f"   📋 테이블 수: {schema['total_tables']}개")
    
    # 각 테이블 상태
    print(f"\n📋 테이블별 데이터 현황:")
    for table_name, info in schema['schema_details'].items():
        print(f"   📄 {table_name}: {info['row_count']:,}개 레코드, {len(info['columns'])}개 컬럼")
    
    # 총 데이터 규모
    import builtins
    total_records = builtins.sum(info['row_count'] for info in schema['schema_details'].values())
    print(f"\n📈 총 데이터: {total_records:,}개 레코드")
    
    # 관계 확인
    print(f"🔗 테이블 관계: {len(schema['relationships'])}개")
    for rel_name, rel_desc in list(schema['relationships'].items())[:3]:
        print(f"   • {rel_desc}")
    if len(schema['relationships']) > 3:
        print(f"   ... 그 외 {len(schema['relationships']) - 3}개")
    
    print(f"\n🎯 Text-to-SQL 테스트 준비 완료!")
    
    # 간단한 테스트 질문들
    sample_questions = [
        "전체 고객 수는 몇 명인가요?",
        "가장 많이 팔린 상품 5개를 보여주세요",
        "월별 주문 현황을 알려주세요", 
        "고객별 총 구매 금액을 계산해주세요",
        "카테고리별 상품 개수는?",
        "직원별 담당 주문 건수는?",
        "배송비가 가장 비싼 주문들을 찾아주세요",
        "할인이 적용된 주문 상세를 보여주세요"
    ]
    
    print(f"\n💬 테스트할 수 있는 자연어 질문 예시:")
    for i, question in enumerate(sample_questions, 1):
        print(f"   {i}. {question}")
    
    print(f"\n🔧 다음 노트북 실행 방법:")
    print(f"   1. 새 노트북 생성: 02_langchain_agent_text_to_sql.ipynb")
    print(f"   2. 이 노트북의 변수들 활용:")
    print(f"      - spark: Spark 세션")
    print(f"      - northwind_schema: 스키마 메타데이터")
    print(f"   3. LangChain Agent 구현 시작")

else:
    print("❌ 설정이 완료되지 않았습니다.")
    print("   위의 모든 셀을 순서대로 실행해주세요.")

# 현재 노트북에서 바로 사용할 수 있는 도구들
print(f"\n🛠️ 현재 노트북에서 사용 가능한 함수들:")
available_functions = [
    "test_northwind_queries() - 기본 쿼리 테스트 재실행",
    "verify_northwind_database() - 데이터베이스 상태 재확인", 
    "analyze_schema_for_langchain() - 스키마 정보 재분석",
    "spark.sql('YOUR_SQL_QUERY') - 직접 SQL 실행"
]

for func in available_functions:
    print(f"   • {func}")

print(f"\n🎯 목표 달성!")
print(f"   ✅ Databricks에 실제 사용 가능한 샘플 데이터베이스 구축")
print(f"   ✅ LangChain Agent 구현을 위한 기반 환경 완료")
print(f"   ✅ Text-to-SQL 시스템의 첫 번째 단계 성공")

print(f"\n🚀 다음: LangChain Agent 기반 Text-to-SQL 구현!")
print(f"   노트북: 02_langchain_agent_text_to_sql.ipynb")

# 스키마 정보 추출 및 LangChain Agent 연동 준비

print("🔗 LangChain Agent 연동을 위한 스키마 분석")
print("=" * 45)

# 1. 상세 스키마 정보 추출
def extract_table_schema(table_name):
    """테이블의 상세 스키마 정보 추출"""
    try:
        # 컬럼 정보 조회
        describe_result = spark.sql(f"DESCRIBE TABLE northwind.{table_name}").collect()
        
        columns = []
        for row in describe_result:
            if row.col_name and not row.col_name.startswith('#'):
                columns.append({
                    "name": row.col_name,
                    "type": row.data_type,
                    "nullable": True  # Delta Lake는 기본적으로 nullable
                })
        
        # 샘플 데이터 조회
        sample_df = spark.table(f"northwind.{table_name}").limit(3)
        sample_data = [row.asDict() for row in sample_df.collect()]
        
        # 테이블 통계
        total_count = spark.table(f"northwind.{table_name}").count()
        
        return {
            "table_name": table_name,
            "columns": columns,
            "sample_data": sample_data,
            "total_records": total_count,
            "status": "success"
        }
        
    except Exception as e:
        return {
            "table_name": table_name,
            "error": str(e),
            "status": "failed"
        }

# 2. 모든 테이블의 스키마 추출
print("\n📊 테이블 스키마 추출 중...")
northwind_schema_info = {}

tables = spark.sql("SHOW TABLES IN northwind").collect()
table_names = [row.tableName for row in tables]

for table_name in sorted(table_names):
    print(f"   🔍 {table_name} 분석 중...")
    schema_info = extract_table_schema(table_name)
    northwind_schema_info[table_name] = schema_info
    
    if schema_info["status"] == "success":
        col_count = len(schema_info["columns"])
        record_count = schema_info["total_records"]
        print(f"      ✅ {col_count}개 컬럼, {record_count:,}개 레코드")
    else:
        print(f"      ❌ 실패: {schema_info['error']}")

# 3. 테이블 간 관계 정의 (Northwind 표준 관계)
table_relationships = {
    "customers": {
        "primary_key": "customerid",
        "related_tables": ["orders"],
        "description": "고객 정보 (주문과 1:N 관계)"
    },
    "suppliers": {
        "primary_key": "supplierid", 
        "related_tables": ["products"],
        "description": "공급업체 정보 (상품과 1:N 관계)"
    },
    "categories": {
        "primary_key": "categoryid",
        "related_tables": ["products"],
        "description": "상품 카테고리 (상품과 1:N 관계)"
    },
    "products": {
        "primary_key": "productid",
        "foreign_keys": ["supplierid", "categoryid"],
        "related_tables": ["order_details", "suppliers", "categories"],
        "description": "상품 정보 (주문상세와 1:N, 공급업체/카테고리와 N:1 관계)"
    },
    "employees": {
        "primary_key": "employeeid",
        "related_tables": ["orders"],
        "description": "직원 정보 (주문과 1:N 관계)"
    },
    "shippers": {
        "primary_key": "shipperid",
        "related_tables": ["orders"],
        "description": "운송업체 정보 (주문과 1:N 관계)"
    },
    "orders": {
        "primary_key": "orderid",
        "foreign_keys": ["customerid", "employeeid", "shipvia"],
        "related_tables": ["order_details", "customers", "employees", "shippers"],
        "description": "주문 정보 (주문상세와 1:N, 고객/직원/운송업체와 N:1 관계)"
    },
    "order_details": {
        "primary_key": ["orderid", "productid"],
        "foreign_keys": ["orderid", "productid"],
        "related_tables": ["orders", "products"],
        "description": "주문 상세 정보 (주문/상품과 N:1 관계)"
    }
}

# 4. Agent용 스키마 요약 생성
agent_schema_summary = {
    "database_name": "northwind",
    "description": "Northwind 샘플 무역 회사 데이터베이스 - 고객, 주문, 상품, 직원 등의 비즈니스 데이터",
    "total_tables": len(table_names),
    "tables": {},
    "relationships": table_relationships,
    "common_queries": [
        "고객별 주문 통계",
        "상품별 매출 분석", 
        "직원별 성과 분석",
        "카테고리별 상품 현황",
        "국가별 고객 분포",
        "월별 매출 추이"
    ]
}

# 각 테이블 정보를 Agent용으로 요약
for table_name, schema_info in northwind_schema_info.items():
    if schema_info["status"] == "success":
        agent_schema_summary["tables"][table_name] = {
            "description": table_relationships.get(table_name, {}).get("description", f"{table_name} 테이블"),
            "columns": [f"{col['name']} ({col['type']})" for col in schema_info["columns"]],
            "primary_key": table_relationships.get(table_name, {}).get("primary_key"),
            "foreign_keys": table_relationships.get(table_name, {}).get("foreign_keys", []),
            "record_count": schema_info["total_records"],
            "sample_data": schema_info["sample_data"][:1]  # 첫 번째 레코드만
        }

print(f"\n📋 LangChain Agent용 스키마 요약:")
print(f"   📂 데이터베이스: {agent_schema_summary['database_name']}")
print(f"   📊 테이블 수: {agent_schema_summary['total_tables']}개")
print(f"   🔗 관계 정의: {len(table_relationships)}개 테이블")

total_columns = builtins.sum(len(info.get('columns', [])) for info in agent_schema_summary['tables'].values())
total_records = builtins.sum(info.get('record_count', 0) for info in agent_schema_summary['tables'].values())

print(f"   📈 전체 컬럼 수: {total_columns}개")
print(f"   📊 전체 레코드 수: {total_records:,}개")

# 5. 연동 준비 완료 상태 확인
print(f"\n✅ LangChain Agent 연동 준비 완료!")
print(f"📋 다음 노트북에서 사용할 수 있는 데이터:")
print(f"   🗄️ northwind_schema_info: 상세 스키마 정보")
print(f"   📊 agent_schema_summary: Agent용 요약 정보")
print(f"   🔗 table_relationships: 테이블 관계 정보")

print(f"\n🎯 다음 단계:")
print(f"   1. 02_langchain_agent_text_to_sql.ipynb 노트북 열기")
print(f"   2. 위 변수들을 활용하여 LangChain Agent 구현")
print(f"   3. Text-to-SQL 기능 테스트 및 데모")

print(f"\n🎉 Northwind 데이터베이스 구축 및 Agent 연동 준비 완료!")

✅ 한국어 특화 고급 프롬프트 템플릿 정의 완료
✅ Text-to-SQL RAG 애플리케이션 클래스 정의 완료
🎉 Northwind 데이터베이스 구축 완료!

✅ 구축 완료 상태:
   📊 데이터베이스: northwind
   📋 테이블 수: 8개

📋 테이블별 데이터 현황:
   📄 categories: 8개 레코드, 3개 컬럼
   📄 customers: 5개 레코드, 8개 컬럼
   📄 employees: 5개 레코드, 9개 컬럼
   📄 order_details: 290개 레코드, 5개 컬럼
   📄 orders: 100개 레코드, 12개 컬럼
   📄 products: 10개 레코드, 8개 컬럼
   📄 shippers: 3개 레코드, 3개 컬럼
   📄 suppliers: 5개 레코드, 8개 컬럼

📈 총 데이터: 426개 레코드
🔗 테이블 관계: 8개
   • orders.customer_id -> customers.customer_id
   • orders.employee_id -> employees.employee_id
   • orders.ship_via -> shippers.shipper_id
   ... 그 외 5개

🎯 Text-to-SQL 테스트 준비 완료!

💬 테스트할 수 있는 자연어 질문 예시:
   1. 전체 고객 수는 몇 명인가요?
   2. 가장 많이 팔린 상품 5개를 보여주세요
   3. 월별 주문 현황을 알려주세요
   4. 고객별 총 구매 금액을 계산해주세요
   5. 카테고리별 상품 개수는?
   6. 직원별 담당 주문 건수는?
   7. 배송비가 가장 비싼 주문들을 찾아주세요
   8. 할인이 적용된 주문 상세를 보여주세요

🔧 다음 노트북 실행 방법:
   1. 새 노트북 생성: 02_langchain_agent_text_to_sql.ipynb
   2. 이 노트북의 변수들 활용:
      - spark: Spark 세션
      - northwind_schema: 스키마 메타데이터
   3. LangChain