In [None]:
"""
Pipeline para carregar posição mensal de carteira de investimentos
Abordagem funcional
"""

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def extrair_operacoes_mes(spark: SparkSession, mes_ref: str, cpf: str = None) -> DataFrame:
    """
    Extrai operações do mês da tabela silver
    
    Args:
        spark: SparkSession
        mes_ref: Mês de referência no formato 'YYYY-MM'
        cpf: CPF do investidor (opcional)
    """
    query = f"""
    SELECT 
        mes_ref,
        cpf,
        cotista,
        cd_ativo,
        dt_operacao,
        cd_tipo_operacao,
        qt_operacao,
        vl_preco_ativo,
        vl_custo_total,
        vl_liquido,
        qt_estoque,
        vl_pmedio,
        vl_ganho_perda,
        ir_mes,
        vl_vendas_mes,
        ts_insercao
    FROM silver.stg_controle_ativo
    WHERE mes_ref = '{mes_ref}'
    """
    
    if cpf:
        query += f" AND cpf = '{cpf}'"
        
    return spark.sql(query)


def calcular_posicao_final(df_operacoes: DataFrame) -> DataFrame:
    """
    Calcula a posição final de cada ativo no mês
    Pega a última operação de cada ativo
    """
    window_spec = Window.partitionBy("cpf", "cd_ativo", "mes_ref") \
                        .orderBy(F.desc("dt_operacao"), F.desc("ts_insercao"))
    
    return df_operacoes \
        .withColumn("rank", F.row_number().over(window_spec)) \
        .filter(F.col("rank") == 1) \
        .select(
            "mes_ref",
            "cpf",
            "cotista",
            "cd_ativo",
            "qt_estoque",
            "vl_pmedio",
            F.expr("qt_estoque * vl_pmedio").alias("vl_posicao"),
            "vl_ganho_perda",
            "ir_mes",
            "vl_vendas_mes"
        )


def agregar_por_cotista(df_posicao: DataFrame) -> DataFrame:
    """
    Agrega métricas por cotista
    """
    return df_posicao.groupBy("mes_ref", "cpf", "cotista").agg(
        F.count("cd_ativo").alias("qt_ativos_carteira"),
        F.sum("qt_estoque").alias("qt_total_acoes"),
        F.sum("vl_posicao").alias("vl_total_carteira"),
        F.sum("vl_ganho_perda").alias("vl_total_ganho_perda"),
        F.sum("ir_mes").alias("vl_total_ir_mes"),
        F.sum("vl_vendas_mes").alias("vl_total_vendas_mes"),
        F.avg("vl_pmedio").alias("vl_pmedio_ponderado")
    )


def calcular_metricas_financeiras(df_agregado: DataFrame) -> DataFrame:
    """
    Adiciona métricas financeiras calculadas
    """
    return df_agregado \
        .withColumn(
            "pc_rentabilidade_mes",
            F.when(F.col("vl_total_carteira") > 0,
                   (F.col("vl_total_ganho_perda") / F.col("vl_total_carteira")) * 100
            ).otherwise(0)
        ) \
        .withColumn(
            "pc_ir_sobre_ganho",
            F.when(F.col("vl_total_ganho_perda") > 0,
                   (F.col("vl_total_ir_mes") / F.col("vl_total_ganho_perda")) * 100
            ).otherwise(0)
        ) \
        .withColumn(
            "vl_liquido_apos_ir",
            F.col("vl_total_ganho_perda") - F.col("vl_total_ir_mes")
        ) \
        .withColumn(
            "ts_processamento",
            F.current_timestamp()
        )


def salvar_posicao_mensal(df_gold: DataFrame, modo: str = "append"):
    """
    Salva posição mensal agregada na camada gold
    """
    df_gold.write \
        .format("delta") \
        .mode(modo) \
        .option("mergeSchema", "true") \
        .saveAsTable("gold.posicao_mensal_carteira")
    
    print(f"✓ Posição mensal salva em gold.posicao_mensal_carteira")


def salvar_posicao_detalhada(df_posicao: DataFrame, modo: str = "append"):
    """
    Salva posição detalhada por ativo na camada gold
    """
    df_detalhada = df_posicao.withColumn("ts_processamento", F.current_timestamp())
    
    df_detalhada.write \
        .format("delta") \
        .mode(modo) \
        .option("mergeSchema", "true") \
        .saveAsTable("gold.posicao_detalhada_ativo")
    
    print(f"✓ Posição detalhada salva em gold.posicao_detalhada_ativo")


def executar_pipeline(spark: SparkSession, mes_ref: str, cpf: str = None):
    """
    Executa a pipeline completa de carga da posição mensal
    
    Args:
        spark: SparkSession
        mes_ref: Mês de referência no formato 'YYYY-MM'
        cpf: CPF do investidor (opcional)
        
    Returns:
        tuple: (df_gold, df_posicao)
    """
    print(f"Iniciando pipeline para {mes_ref}...")
    
    # 1. Extrair operações
    print("1. Extraindo operações da silver...")
    df_operacoes = extrair_operacoes_mes(spark, mes_ref, cpf)
    count_operacoes = df_operacoes.count()
    print(f"   → {count_operacoes} operações encontradas")
    
    if count_operacoes == 0:
        print("⚠ Nenhuma operação encontrada para o período")
        return None, None
    
    # 2. Calcular posição final por ativo
    print("2. Calculando posição final por ativo...")
    df_posicao = calcular_posicao_final(df_operacoes)
    print(f"   → {df_posicao.count()} ativos na carteira")
    
    # 3. Agregar por cotista
    print("3. Agregando métricas por cotista...")
    df_agregado = agregar_por_cotista(df_posicao)
    
    # 4. Calcular métricas financeiras
    print("4. Calculando métricas financeiras...")
    df_gold = calcular_metricas_financeiras(df_agregado)
    
    # 5. Salvar na camada gold
    print("5. Salvando na camada gold...")
    salvar_posicao_mensal(df_gold)
    salvar_posicao_detalhada(df_posicao)
    
    # 6. Exibir resumo
    print("\n" + "="*60)
    print("RESUMO DA POSIÇÃO MENSAL")
    print("="*60)
    df_gold.select(
        "cotista",
        "qt_ativos_carteira",
        "vl_total_carteira",
        "vl_total_ganho_perda",
        "pc_rentabilidade_mes",
        "vl_total_ir_mes"
    ).show(truncate=False)
    
    print("\n✓ Pipeline executada com sucesso!")
    
    return df_gold, df_posicao


# =============================================================================
# DDLs DAS TABELAS GOLD
# =============================================================================

def criar_tabelas_gold(spark: SparkSession):
    """
    Cria as tabelas da camada gold se não existirem
    """
    
    # Tabela de posição mensal agregada
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.posicao_mensal_carteira (
        mes_ref STRING COMMENT 'Mês de referência (YYYY-MM)',
        cpf STRING COMMENT 'CPF do investidor',
        cotista STRING COMMENT 'Nome do cotista',
        qt_ativos_carteira INT COMMENT 'Quantidade de ativos diferentes na carteira',
        qt_total_acoes BIGINT COMMENT 'Quantidade total de ações/cotas',
        vl_total_carteira DECIMAL(15,2) COMMENT 'Valor total da carteira',
        vl_total_ganho_perda DECIMAL(15,2) COMMENT 'Ganho/perda total no mês',
        vl_total_ir_mes DECIMAL(15,2) COMMENT 'IR total do mês',
        vl_total_vendas_mes DECIMAL(15,2) COMMENT 'Volume total de vendas',
        vl_pmedio_ponderado DECIMAL(11,2) COMMENT 'Preço médio ponderado geral',
        pc_rentabilidade_mes DECIMAL(8,4) COMMENT 'Rentabilidade % do mês',
        pc_ir_sobre_ganho DECIMAL(8,4) COMMENT '% de IR sobre o ganho',
        vl_liquido_apos_ir DECIMAL(15,2) COMMENT 'Valor líquido após IR',
        ts_processamento TIMESTAMP COMMENT 'Timestamp do processamento'
    )
    USING DELTA
    PARTITIONED BY (mes_ref)
    COMMENT 'Posição mensal agregada da carteira por cotista'
    """)
    
    # Tabela de posição detalhada por ativo
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.posicao_detalhada_ativo (
        mes_ref STRING COMMENT 'Mês de referência (YYYY-MM)',
        cpf STRING COMMENT 'CPF do investidor',
        cotista STRING COMMENT 'Nome do cotista',
        cd_ativo STRING COMMENT 'Código do ativo',
        qt_estoque INT COMMENT 'Quantidade em estoque no fim do mês',
        vl_pmedio DECIMAL(11,2) COMMENT 'Preço médio do ativo',
        vl_posicao DECIMAL(15,2) COMMENT 'Valor da posição (qt * preço médio)',
        vl_ganho_perda DECIMAL(15,2) COMMENT 'Ganho/perda do ativo no mês',
        ir_mes DECIMAL(11,2) COMMENT 'IR do ativo no mês',
        vl_vendas_mes DECIMAL(11,2) COMMENT 'Volume de vendas do ativo',
        ts_processamento TIMESTAMP COMMENT 'Timestamp do processamento'
    )
    USING DELTA
    PARTITIONED BY (mes_ref)
    COMMENT 'Posição detalhada por ativo no mês'
    """)
    
    print("✓ Tabelas gold criadas com sucesso")


# =============================================================================
# EXEMPLO DE USO
# =============================================================================

"""
# Criar SparkSession
spark = SparkSession.builder.appName("PosicaoMensal").getOrCreate()

# Criar tabelas gold (primeira execução)
criar_tabelas_gold(spark)

# Executar pipeline para outubro/2025
df_gold, df_detalhada = executar_pipeline(spark, "2025-10")

# Ou processar para um CPF específico
df_gold, df_detalhada = executar_pipeline(spark, "2025-10", cpf="12345678900")

# Consultar resultados
spark.sql(\"\"\"
    SELECT 
        mes_ref,
        cotista,
        qt_ativos_carteira,
        vl_total_carteira,
        pc_rentabilidade_mes,
        vl_liquido_apos_ir
    FROM gold.posicao_mensal_carteira
    WHERE mes_ref = '2025-10'
    ORDER BY vl_total_carteira DESC
\"\"\").show()

# Consultar posição detalhada de um cotista
spark.sql(\"\"\"
    SELECT 
        cd_ativo,
        qt_estoque,
        vl_pmedio,
        vl_posicao,
        vl_ganho_perda
    FROM gold.posicao_detalhada_ativo
    WHERE mes_ref = '2025-10' 
      AND cpf = '12345678900'
    ORDER BY vl_posicao DESC
\"\"\").show()
"""