# ⚡ Processamento com Apache Spark - Big Data Finance

Este notebook demonstra como usar Apache Spark para processamento distribuído de dados financeiros.

## Objetivos
- Configurar e inicializar Spark Session
- Carregar dados financeiros no Spark
- Realizar transformações distribuídas
- Calcular indicadores técnicos
- Salvar resultados no HDFS
- Demonstrar otimizações de performance

**Autor:** Ana Luiza Pazze (Arquitetura e Infraestrutura) & Equipe Big Data Finance  
**Gestão:** Fabio  
**Processamento Spark:** Ana Luiza Pazze  
**Data:** 2024

In [1]:
# Imports necessários
import sys
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Adicionar src ao path
sys.path.append('../src')

# Imports do Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Imports dos módulos do projeto
from infrastructure.spark_manager import SparkManager
from infrastructure.hdfs_manager import HDFSManager

print("✅ Imports realizados com sucesso!")

✅ Imports realizados com sucesso!


## 1. ⚙️ Configuração do Ambiente Spark

Vamos configurar e inicializar o Spark para processamento distribuído.

In [2]:
# Inicializar Spark Manager
spark_manager = SparkManager(app_name="BigDataFinance_Processing")

# Criar Spark Session otimizada
spark = spark_manager.create_spark_session(
    master="local[*]",
    executor_memory="2g",
    driver_memory="2g"
)

print("✅ Spark Session criada com sucesso!")
print(f"🔧 Spark Version: {spark.version}")
print(f"🔧 Application ID: {spark.sparkContext.applicationId}")
print(f"🔧 Master: {spark.sparkContext.master}")

# Configurações do Spark
print("\n⚙️ Configurações do Spark:")
print("=" * 40)
configs = spark.sparkContext.getConf().getAll()
for key, value in sorted(configs):
    if 'spark.sql' in key or 'spark.executor' in key or 'spark.driver' in key:
        print(f"{key}: {value}")

Erro ao criar sessão Spark: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:588)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2446)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2446)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:339)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:588)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2446)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2446)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:339)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1575)


## 2. 📊 Carregamento de Dados

Vamos carregar os dados financeiros coletados anteriormente no Spark.

In [None]:
# Verificar se existem dados para carregar
data_dir = '../data/raw'
stock_files = [f for f in os.listdir(data_dir) if f.startswith('stock_data_') and f.endswith('.csv')]

if stock_files:
    # Usar o arquivo mais recente
    latest_file = sorted(stock_files)[-1]
    stock_file_path = os.path.join(data_dir, latest_file)
    print(f"📁 Carregando dados de: {latest_file}")
else:
    print("⚠️ Nenhum arquivo de dados encontrado. Execute primeiro o notebook 01_data_collection_example.ipynb")
    # Criar dados de exemplo para demonstração
    print("🔧 Criando dados de exemplo...")
    
    # Gerar dados sintéticos
    dates = pd.date_range(start='2023-01-01', end='2024-01-01', freq='D')
    symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']
    
    data = []
    for symbol in symbols:
        base_price = np.random.uniform(100, 300)
        for date in dates:
            price_change = np.random.normal(0, 0.02)
            base_price *= (1 + price_change)
            
            data.append({
                'date': date.strftime('%Y-%m-%d'),
                'symbol': symbol,
                'open': base_price * np.random.uniform(0.98, 1.02),
                'high': base_price * np.random.uniform(1.00, 1.05),
                'low': base_price * np.random.uniform(0.95, 1.00),
                'close': base_price,
                'volume': np.random.randint(1000000, 10000000)
            })
    
    # Salvar dados sintéticos
    os.makedirs(data_dir, exist_ok=True)
    stock_file_path = os.path.join(data_dir, 'stock_data_synthetic.csv')
    pd.DataFrame(data).to_csv(stock_file_path, index=False)
    print(f"✅ Dados sintéticos criados: {stock_file_path}")

In [None]:
# Carregar dados no Spark
print("📊 Carregando dados no Spark...")

# Definir schema para otimizar carregamento
schema = StructType([
    StructField("date", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", LongType(), True)
])

# Carregar DataFrame
df = spark.read.csv(
    stock_file_path,
    header=True,
    schema=schema
)

# Converter coluna de data
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Cache para melhor performance
df.cache()

print(f"✅ Dados carregados: {df.count():,} registros")
print(f"📋 Schema:")
df.printSchema()

# Visualizar primeiros registros
print("\n📊 Primeiros registros:")
df.show(10)

## 3. 🔄 Transformações Básicas

Vamos realizar transformações básicas nos dados usando Spark SQL.

In [None]:
# Registrar DataFrame como tabela temporária
df.createOrReplaceTempView("stock_data")

# Estatísticas básicas por símbolo
print("📊 Estatísticas por Símbolo:")
stats_df = spark.sql("""
    SELECT 
        symbol,
        COUNT(*) as records,
        MIN(date) as start_date,
        MAX(date) as end_date,
        ROUND(AVG(close), 2) as avg_price,
        ROUND(MIN(close), 2) as min_price,
        ROUND(MAX(close), 2) as max_price,
        ROUND(AVG(volume), 0) as avg_volume
    FROM stock_data 
    GROUP BY symbol
    ORDER BY symbol
""")

stats_df.show()

# Converter para Pandas para visualização
stats_pandas = stats_df.toPandas()
print(f"\n📈 Resumo: {len(stats_pandas)} símbolos analisados")

## 4. 📈 Cálculo de Indicadores Técnicos

Vamos calcular indicadores técnicos usando Window Functions do Spark.

In [None]:
# Definir window specifications
window_spec = Window.partitionBy("symbol").orderBy("date")
window_7d = Window.partitionBy("symbol").orderBy("date").rowsBetween(-6, 0)
window_20d = Window.partitionBy("symbol").orderBy("date").rowsBetween(-19, 0)
window_50d = Window.partitionBy("symbol").orderBy("date").rowsBetween(-49, 0)

print("📊 Calculando indicadores técnicos...")

# Calcular indicadores
df_indicators = df.withColumn(
    "daily_return", 
    (col("close") / lag("close", 1).over(window_spec) - 1) * 100
).withColumn(
    "sma_7", 
    avg("close").over(window_7d)
).withColumn(
    "sma_20", 
    avg("close").over(window_20d)
).withColumn(
    "sma_50", 
    avg("close").over(window_50d)
).withColumn(
    "volatility_7d", 
    stddev("daily_return").over(window_7d)
).withColumn(
    "price_change", 
    col("close") - col("open")
).withColumn(
    "price_range", 
    col("high") - col("low")
).withColumn(
    "volume_sma_20", 
    avg("volume").over(window_20d)
)

# Cache do resultado
df_indicators.cache()

print("✅ Indicadores calculados!")
print("\n📊 Exemplo de dados com indicadores:")
df_indicators.select(
    "date", "symbol", "close", "daily_return", 
    "sma_7", "sma_20", "volatility_7d"
).filter(
    col("symbol") == "AAPL"
).orderBy(
    desc("date")
).show(10)

### 4.1 Análise de Sinais de Trading

In [None]:
# Gerar sinais de trading baseados em médias móveis
print("📈 Gerando sinais de trading...")

df_signals = df_indicators.withColumn(
    "signal",
    when(col("sma_7") > col("sma_20"), "BUY")
    .when(col("sma_7") < col("sma_20"), "SELL")
    .otherwise("HOLD")
).withColumn(
    "trend",
    when(col("close") > col("sma_50"), "UPTREND")
    .when(col("close") < col("sma_50"), "DOWNTREND")
    .otherwise("SIDEWAYS")
).withColumn(
    "volatility_level",
    when(col("volatility_7d") > 3.0, "HIGH")
    .when(col("volatility_7d") > 1.5, "MEDIUM")
    .otherwise("LOW")
)

# Análise de sinais por símbolo
signals_summary = spark.sql("""
    SELECT 
        symbol,
        signal,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY symbol), 2) as percentage
    FROM (
        SELECT symbol, 
               CASE 
                   WHEN sma_7 > sma_20 THEN 'BUY'
                   WHEN sma_7 < sma_20 THEN 'SELL'
                   ELSE 'HOLD'
               END as signal
        FROM stock_data_indicators
        WHERE sma_7 IS NOT NULL AND sma_20 IS NOT NULL
    )
    GROUP BY symbol, signal
    ORDER BY symbol, signal
""")

# Registrar nova tabela
df_signals.createOrReplaceTempView("stock_data_indicators")

print("📊 Distribuição de Sinais por Símbolo:")
signals_summary.show()

## 5. 🔍 Análises Avançadas com Spark SQL

Vamos realizar análises mais complexas usando Spark SQL.

In [None]:
# Análise de performance mensal
print("📅 Análise de Performance Mensal:")

monthly_performance = spark.sql("""
    WITH monthly_data AS (
        SELECT 
            symbol,
            YEAR(date) as year,
            MONTH(date) as month,
            FIRST_VALUE(close) OVER (
                PARTITION BY symbol, YEAR(date), MONTH(date) 
                ORDER BY date
            ) as month_open,
            LAST_VALUE(close) OVER (
                PARTITION BY symbol, YEAR(date), MONTH(date) 
                ORDER BY date 
                ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
            ) as month_close,
            AVG(volume) as avg_volume
        FROM stock_data_indicators
    )
    SELECT DISTINCT
        symbol,
        year,
        month,
        ROUND((month_close / month_open - 1) * 100, 2) as monthly_return,
        ROUND(avg_volume, 0) as avg_volume
    FROM monthly_data
    ORDER BY symbol, year, month
""")

monthly_performance.show(20)

# Estatísticas de performance
print("\n📊 Estatísticas de Performance:")
performance_stats = spark.sql("""
    WITH monthly_returns AS (
        SELECT 
            symbol,
            ROUND((month_close / month_open - 1) * 100, 2) as monthly_return
        FROM (
            SELECT DISTINCT
                symbol,
                YEAR(date) as year,
                MONTH(date) as month,
                FIRST_VALUE(close) OVER (
                    PARTITION BY symbol, YEAR(date), MONTH(date) 
                    ORDER BY date
                ) as month_open,
                LAST_VALUE(close) OVER (
                    PARTITION BY symbol, YEAR(date), MONTH(date) 
                    ORDER BY date 
                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
                ) as month_close
            FROM stock_data_indicators
        )
    )
    SELECT 
        symbol,
        ROUND(AVG(monthly_return), 2) as avg_monthly_return,
        ROUND(STDDEV(monthly_return), 2) as volatility,
        ROUND(MIN(monthly_return), 2) as worst_month,
        ROUND(MAX(monthly_return), 2) as best_month,
        COUNT(*) as months_analyzed
    FROM monthly_returns
    GROUP BY symbol
    ORDER BY avg_monthly_return DESC
""")

performance_stats.show()

### 5.1 Análise de Correlações

In [None]:
# Calcular correlações entre ativos
print("🔗 Análise de Correlações entre Ativos:")

# Pivot dos retornos diários
returns_pivot = spark.sql("""
    SELECT 
        date,
        MAX(CASE WHEN symbol = 'AAPL' THEN daily_return END) as AAPL,
        MAX(CASE WHEN symbol = 'GOOGL' THEN daily_return END) as GOOGL,
        MAX(CASE WHEN symbol = 'MSFT' THEN daily_return END) as MSFT,
        MAX(CASE WHEN symbol = 'TSLA' THEN daily_return END) as TSLA,
        MAX(CASE WHEN symbol = 'AMZN' THEN daily_return END) as AMZN
    FROM stock_data_indicators
    WHERE daily_return IS NOT NULL
    GROUP BY date
    ORDER BY date
""")

returns_pivot.cache()
print("📊 Matriz de retornos criada")
returns_pivot.show(10)

# Converter para Pandas para calcular correlações
returns_pandas = returns_pivot.toPandas().set_index('date')
correlation_matrix = returns_pandas.corr()

print("\n🔗 Matriz de Correlação:")
print(correlation_matrix.round(3))

## 6. 💾 Salvamento no HDFS

Vamos salvar os dados processados no HDFS para uso posterior.

In [None]:
# Inicializar HDFS Manager (simulado para ambiente local)
print("💾 Preparando salvamento dos dados processados...")

# Criar diretório de saída
output_dir = '../data/processed'
os.makedirs(output_dir, exist_ok=True)

# Salvar dados com indicadores
print("📊 Salvando dados com indicadores técnicos...")
indicators_path = f"{output_dir}/stock_indicators"

# Salvar como Parquet (formato otimizado)
df_signals.coalesce(1).write.mode("overwrite").parquet(indicators_path)
print(f"✅ Dados salvos em: {indicators_path}")

# Salvar estatísticas de performance
print("📈 Salvando estatísticas de performance...")
performance_path = f"{output_dir}/performance_stats"
performance_stats.coalesce(1).write.mode("overwrite").parquet(performance_path)
print(f"✅ Estatísticas salvas em: {performance_path}")

# Salvar dados mensais
print("📅 Salvando dados de performance mensal...")
monthly_path = f"{output_dir}/monthly_performance"
monthly_performance.coalesce(1).write.mode("overwrite").parquet(monthly_path)
print(f"✅ Dados mensais salvos em: {monthly_path}")

# Salvar como CSV também para compatibilidade
print("📄 Salvando versões CSV...")
df_signals.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"{output_dir}/stock_indicators_csv")
performance_stats.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"{output_dir}/performance_stats_csv")

print("\n✅ Todos os dados processados foram salvos com sucesso!")

## 7. 📊 Visualizações dos Resultados

Vamos criar algumas visualizações dos dados processados.

In [None]:
# Converter dados para visualização
performance_pandas = performance_stats.toPandas()
monthly_pandas = monthly_performance.toPandas()

# Gráfico de performance média mensal
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
performance_pandas.set_index('symbol')['avg_monthly_return'].plot(kind='bar', color='skyblue')
plt.title('📈 Retorno Médio Mensal por Ativo', fontweight='bold')
plt.ylabel('Retorno (%)')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

# Gráfico de volatilidade
plt.subplot(1, 2, 2)
performance_pandas.set_index('symbol')['volatility'].plot(kind='bar', color='coral')
plt.title('📊 Volatilidade Mensal por Ativo', fontweight='bold')
plt.ylabel('Volatilidade (%)')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Heatmap de correlações
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, 
            annot=True, 
            cmap='RdBu_r', 
            center=0,
            square=True, 
            linewidths=0.5,
            fmt='.3f')
plt.title('🔗 Matriz de Correlação dos Retornos (Spark Processing)', fontweight='bold')
plt.tight_layout()
plt.show()

## 8. ⚡ Otimizações de Performance

Vamos demonstrar algumas otimizações importantes do Spark.

In [None]:
# Análise de performance do Spark
print("⚡ Análise de Performance do Spark")
print("=" * 40)

# Informações sobre cache
print("💾 DataFrames em Cache:")
cached_tables = spark.catalog.listTables()
for table in cached_tables:
    if table.isTemporary:
        print(f"  - {table.name}")

# Estatísticas do Spark Context
sc = spark.sparkContext
print(f"\n📊 Estatísticas do Spark Context:")
print(f"  - Application ID: {sc.applicationId}")
print(f"  - Default Parallelism: {sc.defaultParallelism}")
print(f"  - Status: {sc.statusTracker().getExecutorInfos()}")

# Exemplo de particionamento otimizado
print("\n🔧 Otimização de Particionamento:")
print(f"Partições atuais do DataFrame: {df_signals.rdd.getNumPartitions()}")

# Reparticionamento por símbolo para otimizar operações por grupo
df_optimized = df_signals.repartition(col("symbol"))
print(f"Partições após reparticionamento: {df_optimized.rdd.getNumPartitions()}")

# Exemplo de broadcast join (simulado)
print("\n📡 Exemplo de Broadcast Join:")
# Criar pequena tabela de metadados
metadata = spark.createDataFrame([
    ("AAPL", "Technology", "Apple Inc."),
    ("GOOGL", "Technology", "Alphabet Inc."),
    ("MSFT", "Technology", "Microsoft Corp."),
    ("TSLA", "Automotive", "Tesla Inc."),
    ("AMZN", "E-commerce", "Amazon.com Inc.")
], ["symbol", "sector", "company_name"])

# Broadcast da tabela pequena
from pyspark.sql.functions import broadcast
df_with_metadata = df_signals.join(
    broadcast(metadata), 
    "symbol", 
    "left"
)

print("✅ Broadcast join configurado para otimizar performance")

## 9. 🔍 Monitoramento e Debugging

Vamos ver como monitorar jobs do Spark.

In [None]:
# Informações sobre jobs executados
print("🔍 Informações de Jobs do Spark")
print("=" * 35)

# Status tracker
status_tracker = sc.statusTracker()

# Informações dos executors
executor_infos = status_tracker.getExecutorInfos()
print(f"📊 Número de Executors: {len(executor_infos)}")

for executor in executor_infos:
    print(f"\n🔧 Executor {executor.executorId}:")
    print(f"  - Host: {executor.host}")
    print(f"  - Cores: {executor.totalCores}")
    print(f"  - Memória Máxima: {executor.maxMemory / (1024**3):.2f} GB")
    print(f"  - Tasks Ativas: {executor.activeTasks}")
    print(f"  - Tasks Completadas: {executor.completedTasks}")

# URL da Spark UI
spark_ui_url = spark.sparkContext.uiWebUrl
if spark_ui_url:
    print(f"\n🌐 Spark UI disponível em: {spark_ui_url}")
else:
    print("\n⚠️ Spark UI não disponível (modo local)")

# Exemplo de explain plan
print("\n📋 Plano de Execução (Explain):")
print("=" * 40)
df_signals.filter(col("symbol") == "AAPL").select("date", "close", "sma_20").explain(True)

## 10. 🧹 Limpeza e Finalização

Vamos limpar recursos e finalizar a sessão Spark.

In [None]:
# Limpar cache
print("🧹 Limpando cache...")
spark.catalog.clearCache()

# Unpersist DataFrames
df.unpersist()
df_indicators.unpersist()
returns_pivot.unpersist()

print("✅ Cache limpo")

# Estatísticas finais
print("\n📊 RESUMO DO PROCESSAMENTO SPARK")
print("=" * 45)
print(f"📈 Registros processados: {df.count():,}")
print(f"🔧 Indicadores calculados: 8 (SMA, volatilidade, sinais, etc.)")
print(f"💾 Arquivos salvos: 6 (Parquet + CSV)")
print(f"⚡ Executors utilizados: {len(executor_infos)}")
print(f"🎯 Performance: Otimizada com cache e particionamento")

print("\n🎯 PRÓXIMOS PASSOS:")
print("=" * 25)
print("1. 📊 Análise estatística avançada")
print("2. 🤖 Aplicar modelos de ML")
print("3. 💭 Análise de sentimentos")
print("4. 📈 Dashboards interativos")
print("5. 🔄 Pipeline automatizado")

print("\n✨ Processamento Spark concluído com sucesso!")

In [None]:
# Finalizar Spark Session
print("🔚 Finalizando Spark Session...")
spark.stop()
print("✅ Spark Session finalizada")

print("\n🎉 Notebook concluído com sucesso!")
print("📁 Dados processados disponíveis em: ../data/processed/")
print("🔄 Execute o próximo notebook para análises de ML")

---

## 📚 Referências e Links Úteis

- **Apache Spark**: [spark.apache.org](https://spark.apache.org/)
- **PySpark Documentation**: [spark.apache.org/docs/latest/api/python/](https://spark.apache.org/docs/latest/api/python/)
- **Spark SQL Guide**: [spark.apache.org/docs/latest/sql-programming-guide.html](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- **Performance Tuning**: [spark.apache.org/docs/latest/tuning.html](https://spark.apache.org/docs/latest/tuning.html)

---

**Desenvolvido pela Equipe Big Data Finance**  
**Notebook:** 02_spark_processing_example.ipynb  
**Versão:** 1.0