# Análise por Eventos lendo direto do HDFS
Este notebook conecta-se ao cluster Spark (standalone) e lê as janelas de eventos direto do HDFS, gerando gráficos por evento com linhas por **categoria**.

In [2]:
import os, json, re
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession, functions as F

# Location of the event definitions file (mounted into the container).
events_cfg = '/opt/config/events.json'

# Plot theme used by every chart in this notebook.
sns.set(style='whitegrid')

# Attach to the standalone Spark master and make HDFS the default filesystem.
spark = SparkSession.builder.appName('EventAnalysisNotebook').master(
    'spark://spark-master:7077'
).config(
    'spark.hadoop.fs.defaultFS', 'hdfs://namenode:8020'
).getOrCreate()

def slugify(name: str) -> str:
    """Return a lowercase slug of *name*.

    Spaces become hyphens, and every remaining character outside
    ``a-z``, ``0-9``, ``_`` and ``-`` is also replaced by a hyphen
    (one hyphen per replaced character; runs are not collapsed).
    """
    lowered = name.lower()
    hyphenated = lowered.replace(' ', '-')
    return re.sub(r'[^a-z0-9_\-]', '-', hyphenated)

def pick_price_column_spark(columns):
    """Choose the price column name to use from *columns*.

    Preference order is ``'AdjClose'``, then ``'Adj Close'``; when neither
    is present ``'Close'`` is returned (without checking that it exists).
    """
    for candidate in ('AdjClose', 'Adj Close'):
        if candidate in columns:
            return candidate
    return 'Close'

def normalize_by_anchor_spark(df, anchor_date: str):
    """Add a per-category price index (baseline = 100) to *df*.

    The ``date`` column is converted with ``to_date`` and the price column is
    chosen by ``pick_price_column_spark``. For each ``category`` the baseline
    is the price on ``anchor_date``; a category with no row on that date
    falls back to the price on its own first available date.

    The fallback is resolved per category. Previously it only applied when
    *no* category had a row on the anchor date, so with partial coverage the
    uncovered categories ended up with a null baseline and a null index.
    Resolving it with ``coalesce`` also avoids triggering an eager Spark job
    just to test whether the anchor lookup is empty.

    Args:
        df: Spark DataFrame with ``category`` and ``date`` columns plus one
            of the price columns ``AdjClose`` / ``Adj Close`` / ``Close``.
        anchor_date: Anchor day, compared against the ``date`` column
            (the caller passes a ``YYYY-MM-DD`` string).

    Returns:
        *df* with two extra columns: ``baseline`` and
        ``index`` (``price / baseline * 100``).
    """
    df = df.withColumn('date', F.to_date('date'))
    price_col = pick_price_column_spark(df.columns)

    # Preferred baseline: value per category on the anchor day.
    anchor_df = (df.filter(F.col('date') == F.lit(anchor_date))
                   .groupBy('category')
                   .agg(F.first(price_col).alias('anchor_baseline')))

    # Fallback baseline: value per category on its first available day.
    first_day = df.groupBy('category').agg(F.min('date').alias('min_date'))
    first_df = (df.join(first_day, ['category'])
                  .filter(F.col('date') == F.col('min_date'))
                  .groupBy('category')
                  .agg(F.first(price_col).alias('first_baseline')))

    # Any category with an anchor row also has a first day, so first_df is
    # the complete side of the join; coalesce picks the anchor value when
    # it exists and the first-day value otherwise.
    baseline_df = (first_df.join(anchor_df, ['category'], 'left')
                     .select('category',
                             F.coalesce(F.col('anchor_baseline'),
                                        F.col('first_baseline')).alias('baseline')))

    out = (df.join(baseline_df, ['category'], 'left')
             .withColumn('index', (F.col(price_col) / F.col('baseline')) * F.lit(100.0)))
    return out

def aggregate_for_plot(df):
    """Average the ``index`` column per (``category``, ``date``) pair.

    Returns a DataFrame with columns ``category``, ``date`` and the mean
    ``index``, ordered by ``date``.
    """
    per_category_day = df.groupBy('category', 'date')
    averaged = per_category_day.agg(F.mean('index').alias('index'))
    return averaged.orderBy('date')

# Load the event definitions. The context manager closes the file handle
# (the bare open() call left it open), and the explicit UTF-8 encoding keeps
# accented names readable regardless of the container's locale.
with open(events_cfg, 'r', encoding='utf-8') as cfg_file:
    events = json.load(cfg_file)
events


ModuleNotFoundError: No module named 'seaborn'

In [None]:
# For each configured event: read its three windows from HDFS, index prices
# against the anchor, and draw one line per category and window.
for ev in events:
    name, anchor = ev['name'], ev['anchor_date']
    slug = slugify(name)
    print(f'Evento: {name} (âncora: {anchor})')

    base = 'hdfs://namenode:8020/datasets/yahoo_finance/curated/event_windows'

    # One parquet directory per window, under the event's slug.
    win_paths = {window: f"{base}/{slug}/{window}"
                 for window in ('pre', 'during', 'post')}

    # Normalise and aggregate on the cluster; only the small per-day
    # aggregate is collected to the driver as a pandas frame.
    dfs = {}
    for window, path in win_paths.items():
        normalized = normalize_by_anchor_spark(spark.read.parquet(path), anchor)
        dfs[window] = aggregate_for_plot(normalized).toPandas()

    fig, ax = plt.subplots(figsize=(10,5))
    for window, frame in dfs.items():
        categories = sorted(frame['category'].unique())
        for cat in categories:
            rows = frame[frame['category']==cat]
            ax.plot(rows['date'], rows['index'], label=f'{cat} ({window})')

    # Dashed vertical marker at the anchor date.
    ax.axvline(
        datetime.strptime(anchor, '%Y-%m-%d'),
        color='black',
        linestyle='--',
        linewidth=1,
        label='âncora',
    )
    ax.set_title(name)
    ax.set_ylabel('Índice (base=100)')
    ax.legend(loc='best', fontsize=8)
    plt.show()

# Release the cluster resources held by this notebook's application.
spark.stop()
