# GDELT Silver Table 분석 노트북

이 노트북에서는 Silver 버킷의 Delta Table에서 정제된 GDELT 데이터를 분석합니다.

**데이터 플로우:**
```
Raw Producer → Kafka → Spark Processor → Silver Delta Table → Jupyter (여기!)
```

In [None]:
import sys
sys.path.append('/app')

from src.utils.spark_builder import get_spark_session
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 플롯 설정
plt.rcParams['figure.figsize'] = (12, 8)
sns.set_style("whitegrid")

print("📚 라이브러리 로드 완료")

In [None]:
# Spark 세션 생성
spark = get_spark_session("GDELT_Silver_Analysis", "spark://spark-master:7077")
print("✅ Spark 세션 생성 완료")
print(f"🔗 Spark UI: http://localhost:8081")

In [None]:
# Silver Delta Table 읽기
silver_path = "s3a://silver/gdelt_events"

try:
    # Delta Table 로드
    df = spark.read.format("delta").load(silver_path)
    
    total_records = df.count()
    total_columns = len(df.columns)
    
    print(f"🎉 Silver Table 로드 성공!")
    print(f"📊 총 레코드 수: {total_records:,}")
    print(f"📋 총 컬럼 수: {total_columns}")
    print(f"📍 테이블 위치: {silver_path}")
    
except Exception as e:
    print(f"❌ Silver Table 로드 실패: {e}")
    print("먼저 다음 단계를 실행하세요:")
    print("1. Raw Producer 실행")
    print("2. Silver Processor 실행")

In [None]:
# 스키마 확인
if 'df' in locals():
    print("📝 Silver Table 스키마:")
    df.printSchema()
    
    print("\n📋 컬럼 목록:")
    for i, col in enumerate(df.columns, 1):
        print(f"{i:2d}. {col}")

In [None]:
# 샘플 데이터 확인
if 'df' in locals() and df.count() > 0:
    print("🔍 샘플 데이터 (상위 10개 레코드):")
    
    # 주요 컬럼만 선택해서 보기
    key_columns = [
        'global_event_id', 'day', 'year',
        'actor1_name', 'actor1_country_code',
        'actor2_name', 'actor2_country_code', 
        'event_root_code', 'avg_tone',
        'action_geo_country_code'
    ]
    
    sample_df = df.select(*key_columns).limit(10)
    sample_df.show(truncate=False)
    
    print("\n📈 기본 통계:")
    df.select(
        F.count("*").alias("총_레코드"),
        F.countDistinct("global_event_id").alias("고유_이벤트"),
        F.countDistinct("actor1_country_code").alias("고유_국가수"),
        F.min("day").alias("최초_날짜"),
        F.max("day").alias("최신_날짜")
    ).show()

In [None]:
# 국가별 이벤트 분석
if 'df' in locals() and df.count() > 0:
    print("🌍 국가별 이벤트 분석:")
    
    # 주체 국가별 이벤트 수 (상위 20개)
    country_events = df.filter(F.col("actor1_country_code").isNotNull()) \
        .groupBy("actor1_country_code") \
        .agg(
            F.count("*").alias("이벤트_수"),
            F.avg("avg_tone").alias("평균_톤"),
            F.countDistinct("event_root_code").alias("이벤트_유형수")
        ) \
        .orderBy(F.desc("이벤트_수")) \
        .limit(20)
    
    print("\n📊 주체 국가별 이벤트 순위 (Top 20):")
    country_events.show()
    
    # Pandas로 변환하여 시각화
    country_pd = country_events.limit(10).toPandas()
    
    if not country_pd.empty:
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.bar(country_pd['actor1_country_code'], country_pd['이벤트_수'])
        plt.title('국가별 이벤트 수 (Top 10)')
        plt.xlabel('국가 코드')
        plt.ylabel('이벤트 수')
        plt.xticks(rotation=45)
        
        plt.subplot(1, 2, 2)
        plt.bar(country_pd['actor1_country_code'], country_pd['평균_톤'])
        plt.title('국가별 평균 톤 (Top 10)')
        plt.xlabel('국가 코드')
        plt.ylabel('평균 톤')
        plt.xticks(rotation=45)
        plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
        
        plt.tight_layout()
        plt.show()

In [None]:
# 이벤트 타입 분석
if 'df' in locals() and df.count() > 0:
    print("📊 이벤트 타입 분석:")
    
    # 이벤트 루트 코드별 분석
    event_analysis = df.filter(F.col("event_root_code").isNotNull()) \
        .groupBy("event_root_code") \
        .agg(
            F.count("*").alias("이벤트_수"),
            F.avg("avg_tone").alias("평균_톤"),
            F.avg("goldstein_scale").alias("평균_골드스타인")
        ) \
        .orderBy(F.desc("이벤트_수")) \
        .limit(15)
    
    print("\n📈 이벤트 루트 코드별 분석 (Top 15):")
    event_analysis.show()
    
    # QuadClass 분석
    quad_analysis = df.filter(F.col("quad_class").isNotNull()) \
        .groupBy("quad_class") \
        .agg(
            F.count("*").alias("이벤트_수"),
            F.avg("avg_tone").alias("평균_톤")
        ) \
        .orderBy("quad_class")
    
    print("\n🔢 QuadClass별 분석:")
    print("QuadClass: 1=Verbal Cooperation, 2=Material Cooperation, 3=Verbal Conflict, 4=Material Conflict")
    quad_analysis.show()

In [None]:
# 지리적 분석
if 'df' in locals() and df.count() > 0:
    print("🗺️ 지리적 분석:")
    
    # 사건 발생 지역 분석
    geo_analysis = df.filter(
        F.col("action_geo_country_code").isNotNull() &
        F.col("action_geo_lat").isNotNull() &
        F.col("action_geo_long").isNotNull()
    ).groupBy("action_geo_country_code") \
        .agg(
            F.count("*").alias("이벤트_수"),
            F.avg("avg_tone").alias("평균_톤"),
            F.avg("action_geo_lat").alias("평균_위도"),
            F.avg("action_geo_long").alias("평균_경도")
        ) \
        .orderBy(F.desc("이벤트_수")) \
        .limit(15)
    
    print("\n🌐 사건 발생 지역별 분석 (Top 15):")
    geo_analysis.show()
    
    # 지리 정보가 있는 이벤트 비율
    total_events = df.count()
    geo_events = df.filter(
        F.col("action_geo_lat").isNotNull() & 
        F.col("action_geo_long").isNotNull()
    ).count()
    
    geo_ratio = (geo_events / total_events * 100) if total_events > 0 else 0
    print(f"\n📍 지리 정보 포함 비율: {geo_ratio:.2f}% ({geo_events:,}/{total_events:,})")

In [None]:
# 톤(Tone) 분석
if 'df' in locals() and df.count() > 0:
    print("😊 톤(Tone) 분석:")
    
    # 톤 통계
    tone_stats = df.filter(F.col("avg_tone").isNotNull()) \
        .select(
            F.min("avg_tone").alias("최소_톤"),
            F.max("avg_tone").alias("최대_톤"),
            F.avg("avg_tone").alias("평균_톤"),
            F.expr("percentile_approx(avg_tone, 0.5)").alias("중간값_톤"),
            F.stddev("avg_tone").alias("표준편차")
        )
    
    print("\n📊 톤 기본 통계:")
    tone_stats.show()
    
    # 톤 분포 (히스토그램용)
    tone_distribution = df.filter(F.col("avg_tone").isNotNull()) \
        .select("avg_tone") \
        .sample(0.1) \
        .toPandas()
    
    if not tone_distribution.empty:
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.hist(tone_distribution['avg_tone'], bins=50, alpha=0.7)
        plt.title('톤 분포 히스토그램')
        plt.xlabel('평균 톤')
        plt.ylabel('빈도')
        plt.axvline(x=0, color='r', linestyle='--', alpha=0.5, label='중립')
        plt.legend()
        
        plt.subplot(1, 2, 2)
        plt.boxplot(tone_distribution['avg_tone'])
        plt.title('톤 분포 박스플롯')
        plt.ylabel('평균 톤')
        
        plt.tight_layout()
        plt.show()
        
        # 긍정/부정/중립 이벤트 비율
        positive = len(tone_distribution[tone_distribution['avg_tone'] > 0])
        negative = len(tone_distribution[tone_distribution['avg_tone'] < 0])
        neutral = len(tone_distribution[tone_distribution['avg_tone'] == 0])
        total = len(tone_distribution)
        
        print(f"\n📊 톤 분류 (샘플 기준):")
        print(f"긍정적: {positive/total*100:.1f}% ({positive:,})")
        print(f"부정적: {negative/total*100:.1f}% ({negative:,})")
        print(f"중립적: {neutral/total*100:.1f}% ({neutral:,})")

In [None]:
# 데이터 품질 확인
if 'df' in locals() and df.count() > 0:
    print("🔍 데이터 품질 확인:")
    
    # 주요 컬럼별 NULL 비율
    key_columns = [
        'global_event_id', 'day', 'actor1_country_code', 'actor2_country_code',
        'event_root_code', 'avg_tone', 'action_geo_country_code'
    ]
    
    total_count = df.count()
    
    print("\n📊 주요 컬럼별 데이터 완성도:")
    for col in key_columns:
        null_count = df.filter(F.col(col).isNull()).count()
        completeness = ((total_count - null_count) / total_count * 100) if total_count > 0 else 0
        print(f"{col:25s}: {completeness:5.1f}% 완성도 ({total_count - null_count:,}/{total_count:,})")
    
    # 중복 이벤트 확인
    unique_events = df.select("global_event_id").distinct().count()
    duplicate_ratio = ((total_count - unique_events) / total_count * 100) if total_count > 0 else 0
    print(f"\n🔄 중복 이벤트: {duplicate_ratio:.2f}% ({total_count - unique_events:,}/{total_count:,})")

In [None]:
# 임시 뷰 생성 (SQL 쿼리용)
if 'df' in locals():
    df.createOrReplaceTempView("gdelt_silver")
    print("✅ 임시 뷰 'gdelt_silver' 생성 완료")
    print("\n💡 이제 SQL 쿼리를 사용할 수 있습니다:")
    print("spark.sql(\"SELECT * FROM gdelt_silver WHERE actor1_country_code = 'USA' LIMIT 10\").show()")

In [None]:
# SQL 쿼리 예제
if 'df' in locals():
    print("🔍 SQL 쿼리 예제:")
    
    # 예제 1: 특정 국가의 최근 이벤트
    print("\n1️⃣ 미국 관련 최근 이벤트 (Top 5):")
    usa_events = spark.sql("""
        SELECT global_event_id, day, actor1_name, actor2_name, event_root_code, avg_tone
        FROM gdelt_silver 
        WHERE actor1_country_code = 'USA' OR actor2_country_code = 'USA'
        ORDER BY day DESC, global_event_id DESC
        LIMIT 5
    """)
    usa_events.show(truncate=False)
    
    # 예제 2: 가장 부정적인 이벤트
    print("\n2️⃣ 가장 부정적인 이벤트 (Top 5):")
    negative_events = spark.sql("""
        SELECT global_event_id, day, actor1_name, actor1_country_code, 
               actor2_name, actor2_country_code, avg_tone
        FROM gdelt_silver 
        WHERE avg_tone IS NOT NULL 
        ORDER BY avg_tone ASC
        LIMIT 5
    """)
    negative_events.show(truncate=False)
    
    # 예제 3: 국가간 관계 분석
    print("\n3️⃣ 미국-중국 관계 이벤트:")
    us_china = spark.sql("""
        SELECT COUNT(*) as 이벤트_수, AVG(avg_tone) as 평균_톤, 
               MIN(avg_tone) as 최소_톤, MAX(avg_tone) as 최대_톤
        FROM gdelt_silver 
        WHERE (actor1_country_code = 'USA' AND actor2_country_code = 'CHN')
           OR (actor1_country_code = 'CHN' AND actor2_country_code = 'USA')
    """)
    us_china.show()

In [None]:
# 세션 정리
print("🧹 분석 완료 및 세션 정리")
spark.stop()
print("✅ Spark 세션 종료 완료")
print("\n📋 분석 요약:")
print("- Silver Delta Table에서 정제된 GDELT 데이터 분석 완료")
print("- 61개 전체 컬럼 스키마 확인")
print("- 국가별, 이벤트별, 지리적, 톤 분석 수행")
print("- 데이터 품질 및 완성도 검증")
print("- SQL 쿼리 예제 제공")