# PAXPE - Ingestão de dados para banco de dados SQL do Azure usando serviços do Azure

## Visão geral

Este notebook demonstra como agendar um script Python para ingerir dados em um Banco de Dados SQL do Postgres, orquestrado pelo airflow.

# Documentação do Processo de Criação de Tabelas com Dados do Yahoo Finance

## Objetivo
O objetivo deste processo é obter dados financeiros, de mercado, dividendos, valuation e informações gerais de empresas listadas na bolsa, utilizando a API do Yahoo Finance. Os dados são coletados para um ou mais tickers e organizados em DataFrames utilizando PySpark para garantir performance e escalabilidade, especialmente ao lidar com uma grande quantidade de tickers.

1. **Coleta de Dados**: 
   - Para cada ticker fornecido, as informações relevantes foram extraídas da API do Yahoo Finance utilizando a biblioteca `yfinance`. Os dados foram organizados em dicionários para posterior conversão em DataFrames.

2. **Criação dos DataFrames**:
   - **Tabela Geral** (`df_geral`): Contém informações gerais da empresa, como setor, indústria, número de empregados, localização e resumo das atividades.
   - **Tabela Financeira** (`df_financeira`): Contém dados financeiros da empresa, como capitalização de mercado, receita, lucro líquido, EBITDA, dívida total, entre outros.
   - **Tabela de Mercado** (`df_mercado`): Inclui dados relacionados ao mercado, como preço atual, preço de abertura, volume de negociação, beta, entre outros.
   - **Tabela de Dividendos** (`df_dividendos`): Contém informações sobre dividendos, incluindo taxa de dividendos, data ex-dividendo e índice de distribuição.
   - **Tabela de Valuation** (`df_valuation`): Inclui dados de valuation da empresa, como índices P/E (Price to Earnings), P/B (Price to Book) e PEG (Price/Earnings to Growth).
   - **Tabela de Retorno Mensal** (`df_retorno_mensal`): Retorno mensal da ação com base em preço da ação, dividendos e percentual

## Considerações Finais
Este processo permite a coleta eficiente e escalável de dados financeiros de várias empresas, facilitando análises complexas em grandes volumes de dados. O uso do PySpark garante que mesmo listas extensas de tickers possam ser processadas rapidamente, gerando tabelas estruturadas e prontas para análise.


# Changelog

| Responsável | Data       | Change Log                                                                                      |
|-------------|------------|--------------------------------------------------------------------------------------------------|
| IGOR MENDES | 10-08-24 | Criação do script em spark                   |
| IGOR MENDES | 28-08-24 | Criação da logica de upsert com o postgresSQL                |
| IGOR MENDES | 20-10-24 | Sprint 3 - adicionando indices a um dataframe               |

In [1]:
#fontes - yahoo finance api
!pip install yahoofinance
!pip install yahooquery
!pip install yfinance
!pip install psycopg2-binary

Defaulting to user installation because normal site-packages is not writeable









[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Defaulting to user installation because normal site-packages is not writeable









[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Defaulting to user installation because normal site-packages is not writeable







[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Defaulting to user installation because normal site-packages is not writeable





[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import findspark
findspark.init()  # Inicializa o Spark
findspark.find()  # Verifica se o Spark está corretamente configurado

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("PAXPE")
    .config("spark.sql.session.timeZone", "America/Sao_Paulo")  # Define o fuso horário para São Paulo
    .config("spark.driver.memory", "16g")  # Memória do driver
    .config("spark.executor.memory", "12g")  # Memória para cada executor (ajuste conforme a carga)
    .config("spark.executor.cores", "8")  # Núcleos por executor
    .config("spark.cores.max", "24")  # Total de núcleos disponíveis
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .config("spark.default.parallelism", "24")  # Nível de paralelismo
    .config("spark.memory.fraction", "0.8")  # Memória usada para armazenamento e execução
    .config("spark.memory.storageFraction", "0.5")  # Memória usada para armazenamento
    .config("spark.jars", "/opt/airflow/jars/postgresql-42.7.4.jar")
    .getOrCreate()
)


spark

/home/airflow/.local/lib/python3.8/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


24/10/25 21:39:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
import pandas as pd

import yfinance as yf
from yfinance import Ticker
#api yahoo
from yahooquery import Screener, Ticker


#criar timestamps e automatizar a safra de tempo da análise
# apagar depois que tiver usando a api do spark sql
from datetime import datetime, timedelta

from pyspark.sql.functions import col, lit, when, lag, current_timestamp , date_format, from_utc_timestamp
from pyspark.sql.types import StructType, StructField, StringType, FloatType, LongType, DateType, DoubleType,IntegerType
from pyspark.sql.window import Window


import psycopg2
from psycopg2 import OperationalError
import sys


In [4]:
def obter_empresas_ativas():
    screener = Screener()
    dados = screener.get_screeners('most_actives', count=200)
    # print(dados)  # Linha de depuração para inspecionar a estrutura dos dados retornados
    empresas = dados['most_actives']['quotes']
    
    # Criar um DataFrame a partir dos dados
    df = spark.createDataFrame(empresas)
    
    # Colunas para corresponder ao site
    colunas = [
        'symbol', 'shortName', 'displayName', 'regularMarketPrice', 'regularMarketChange', 
        'regularMarketChangePercent', 'regularMarketVolume', 'marketCap', 
        'fullExchangeName', 'quoteSourceName'
    ]
    df = df.select(*colunas)
    
    # Renomear colunas para português
    df = df.withColumnRenamed('symbol', 'ticker') \
           .withColumnRenamed('shortName', 'nome_curto') \
           .withColumnRenamed('displayName', 'nome_exibicao') \
           .withColumnRenamed('regularMarketPrice', 'preco_mercado_regular') \
           .withColumnRenamed('regularMarketChange', 'mudanca_mercado_regular') \
           .withColumnRenamed('regularMarketChangePercent', 'mudanca_percentual_mercado_regular') \
           .withColumnRenamed('regularMarketVolume', 'volume_mercado_regular') \
           .withColumnRenamed('marketCap', 'capitalizacao_mercado') \
           .withColumnRenamed('fullExchangeName', 'nome_exchange_completa') \
           .withColumnRenamed('quoteSourceName', 'nome_fonte_cotacao')
    
    # Adicionar coluna com data e hora atual
    df = df.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df = df.withColumn('dthr_igtao', current_timestamp())
    
    # Garantir que 'ticker' não tenha valores nulos
    df = df.withColumn('ticker', col('ticker').cast('string'))
    df = df.dropna(subset=['ticker'])

    # Ordenar por capitalizacaoMercado
    df = df.orderBy(col('capitalizacao_mercado').desc())
    
    return df

In [5]:
def obter_dados_historicos(symbols, start_date, end_date):
    dados = {}
    for symbol in symbols:
        ticker = yf.Ticker(symbol)
        historico = ticker.history(start=start_date, end=end_date, interval='1mo')
        dados[symbol] = historico
    
    return dados

In [6]:

from concurrent.futures import ThreadPoolExecutor, as_completed

def obter_dados_historicos_ticker(symbol, start_date, end_date):
    ticker = yf.Ticker(symbol)
    historico = ticker.history(start=start_date, end=end_date, interval='1mo')
    return (symbol, historico)

def obter_dados_historicos(symbols, start_date, end_date):
    dados = {}
    
    # Usar ThreadPoolExecutor para executar as solicitações em paralelo
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Submit tarefas para o executor
        futuros = [executor.submit(obter_dados_historicos_ticker, symbol, start_date, end_date) for symbol in symbols]
        
        # Coletar os resultados conforme as tarefas são concluídas
        for futuro in as_completed(futuros):
            symbol, historico = futuro.result()
            dados[symbol] = historico
    
    return dados

In [7]:
def retorno_mensal(dados):
    # Inicializar uma lista vazia para armazenar dados estruturados
    dados_estruturados = []
    
    # Iterar sobre os dados históricos de cada símbolo
    for symbol, df in dados.items():
        # Converter DataFrame do Pandas para PySpark
        df['symbol'] = symbol

        df_spark = spark.createDataFrame(df.reset_index())
        
        
        # Renomear colunas para português
        df_spark = df_spark.withColumnRenamed('symbol', 'ticker') \
                        .withColumnRenamed('Date', 'data') \
                        .withColumnRenamed('Open', 'abertura') \
                        .withColumnRenamed('High', 'alta') \
                        .withColumnRenamed('Low', 'baixa') \
                        .withColumnRenamed('Close', 'fechamento') \
                        .withColumnRenamed('Volume', 'volume') \
                        .withColumnRenamed('Dividends', 'dividendos') \
                        .withColumnRenamed('Stock Splits', 'desdobramentos')

        janela = Window.partitionBy('ticker').orderBy('Data')


        # Calcular preço de fechamento do mês anterior (deslocar uma linha para cima)
        df_spark = df_spark.withColumn('fechamento_mes_anterior', lag('fechamento').over(janela))

        # Calcular Retorno em valor (diferença absoluta)
        df_spark = df_spark.withColumn('valor_retorno', col('fechamento') - col('fechamento_mes_anterior'))

        # Calcular Retorno em porcentagem
        df_spark = df_spark.withColumn('porcentagem_retorno', (col('valor_retorno') / col('fechamento_mes_anterior')) * 100)

        # Adicionar coluna com data atual no formato desejado
        df_spark = df_spark.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
        df_spark = df_spark.withColumn('dthr_igtao', current_timestamp())

        # Reordenar colunas
        df_spark = df_spark.select(
            'ticker',               # 'symbol' traduzido para 'ticker'
            'data',                 # 'date' traduzido para 'Data'
            'abertura',             # 'open' traduzido para 'Abertura'
            'alta',                 # 'high' traduzido para 'Alta'
            'baixa',                # 'low' traduzido para 'Baixa'
            'fechamento',           # 'close' traduzido para 'Fechamento'
            'volume',               # 'volume' mantido como 'Volume'
            'dividendos',           # 'dividends' traduzido para 'Dividendos'
            'desdobramentos',       # 'splits' traduzido para 'Desdobramentos'
            'fechamento_mes_anterior', # 'Close_Last_Month' traduzido para 'Fechamento_Mes_Anterior'
            'valor_retorno',        # 'Return_Value' traduzido para 'Valor_Retorno'
            'porcentagem_retorno',  # 'Return_Percentage' traduzido para 'Porcentagem_Retorno'
            'dt_ptcao',             # 'dt_ptcao' mantido como está
            'dthr_igtao'            # 'DTHR_IGTAO' mantido como está
        )
        
        # Adicionar o DataFrame à lista de dados estruturados
        dados_estruturados.append(df_spark)

    # Unir todos os DataFrames em um único DataFrame
    df_final = dados_estruturados[0]
    for df in dados_estruturados[1:]:
        df_final = df_final.union(df)
    
    return df_final

# Tabela fato -  maiores empresas segundo a api do yahoo finance

In [8]:
df_ativas = obter_empresas_ativas()

df_ativas.show()
df_ativas.printSchema()

24/10/25 21:39:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 0:>                                                        (0 + 24) / 24][Stage 0:==>                                                      (1 + 23) / 24]

                                                                                

+------+--------------------+--------------------+---------------------+-----------------------+----------------------------------+----------------------+---------------------+----------------------+--------------------+----------+--------------------+
|ticker|          nome_curto|       nome_exibicao|preco_mercado_regular|mudanca_mercado_regular|mudanca_percentual_mercado_regular|volume_mercado_regular|capitalizacao_mercado|nome_exchange_completa|  nome_fonte_cotacao|  dt_ptcao|          dthr_igtao|
+------+--------------------+--------------------+---------------------+-----------------------+----------------------------------+----------------------+---------------------+----------------------+--------------------+----------+--------------------+
|  AAPL|          Apple Inc.|               Apple|               231.41|             0.83999634|                        0.36431292|              37918358|        3521296728064|              NasdaqGS|Nasdaq Real Time ...|2024-10-25|2024-10-25

# Dimensão - Retornos mensais 10 anos

In [9]:
# 10 anos passados
start_date = datetime.today() - timedelta(days=10*365)

# hoje
end_date = datetime.today()

# 'YYYY-MM-DD'
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')

print(f"start_date: {start_date_str}")
print(f"end_date: {end_date_str}")


start_date: 2014-10-28
end_date: 2024-10-25


In [10]:
# Selecionar a coluna 'symbol' e coletar os valores como uma lista
#100 maiores para dimensão das 100 maiores e retornos
symbol_list = df_ativas.select('ticker').rdd.flatMap(lambda x: x).collect()

# Converter a lista para uma tupla
df_tickers = tuple(symbol_list)

print(df_tickers)


('AAPL', 'NVDA', 'MSFT', 'GOOGL', 'GOOG', 'AMZN', 'META', 'TSM', 'TSLA', 'AVGO', 'WMT', 'JPM', 'XOM', 'JNJ', 'BAC', 'KO', 'MRK', 'AMD', 'BABA', 'CSCO', 'WFC', 'IBM', 'MCD', 'ABT', 'TXN', 'MS', 'VZ', 'DIS', 'PDD', 'UBER', 'PFE', 'CMCSA', 'T', 'BSX', 'MU', 'NKE', 'C', 'SBUX', 'LRCX', 'PLTR', 'INTC', 'BA', 'INFY', 'PBR', 'PBR-A', 'APH', 'PYPL', 'CL', 'CARR', 'CRWD', 'CVS', 'MRVL', 'NU', 'FCX', 'NEM', 'CSX', 'WMB', 'JD', 'COF', 'DLR', 'SLB', 'GM', 'TFC', 'ET', 'ITUB', 'KMI', 'KDP', 'COIN', 'OXY', 'VALE', 'BCS', 'LYG', 'KVUE', 'F', 'MSTR', 'KHC', 'VRT', 'EW', 'LVS', 'XEL', 'ABEV', 'GOLD', 'DOW', 'DAL', 'CNC', 'OWL', 'LI', 'ON', 'DXCM', 'FITB', 'CCL', 'ERIC', 'SMCI', 'BEKE', 'NOK', 'DVN', 'BBD', 'DECK', 'HPE', 'HAL', 'UAL', 'WDC', 'HOOD', 'HBAN', 'ASX', 'EQT', 'RF', 'TEVA', 'UMC', 'LUV', 'WBD', 'DKNG', 'CTRA', 'SNAP', 'KEY', 'GRAB', 'ONON', 'AMCR', 'DOC', 'AVTR', 'CNH', 'KGC', 'AES', 'TPR', 'SOFI', 'NIO', 'XPEV', 'ALAB', 'PR', 'RIVN', 'AUR', 'GME', 'BILI', 'FHN', 'SKX', 'VKTX', 'CX', 'AAL', 

In [11]:
historical_data = obter_dados_historicos(df_tickers, start_date_str, end_date_str)
df_retorno_mensal = retorno_mensal(historical_data)

# Mostrar o DataFrame final
df_retorno_mensal.show()



[Stage 13:> (4 + 20) / 24][Stage 14:>  (0 + 8) / 24][Stage 15:>  (0 + 0) / 24]

[Stage 16:> (9 + 15) / 24][Stage 17:>  (0 + 9) / 24][Stage 18:>  (0 + 0) / 24][Stage 18:>(12 + 12) / 24][Stage 19:> (0 + 12) / 24][Stage 20:>  (0 + 0) / 24]

[Stage 20:=>(22 + 2) / 24][Stage 21:> (5 + 19) / 24][Stage 22:>  (0 + 5) / 24]

[Stage 23:=>(21 + 3) / 24][Stage 24:> (0 + 21) / 24][Stage 25:>  (0 + 0) / 24][Stage 25:=>(15 + 9) / 24][Stage 26:> (4 + 19) / 24][Stage 27:>  (0 + 0) / 24]

[Stage 28:>(14 + 10) / 24][Stage 29:> (0 + 13) / 24][Stage 30:>  (0 + 0) / 24]

[Stage 30:=>(17 + 7) / 24][Stage 31:> (0 + 18) / 24][Stage 32:>  (0 + 0) / 24]

[Stage 33:=>(19 + 5) / 24][Stage 34:> (0 + 19) / 24][Stage 35:>  (0 + 0) / 24]

[Stage 35:=>(21 + 3) / 24][Stage 36:> (1 + 23) / 24][Stage 37:>  (0 + 5) / 24][Stage 38:=>(21 + 3) / 24][Stage 39:> (1 + 23) / 24][Stage 40:>  (0 + 0) / 24]

[Stage 42:> (8 + 16) / 24][Stage 43:>  (0 + 8) / 24][Stage 44:>  (0 + 0) / 24]

[Stage 44:=>(18 + 6) / 24][Stage 45:> (0 + 18) / 24][Stage 46:>  (0 + 0) / 24][Stage 47:=>(22 + 2) / 24][Stage 48:> (0 + 22) / 24][Stage 49:>  (0 + 0) / 24]

[Stage 51:>(13 + 11) / 24][Stage 52:> (0 + 13) / 24][Stage 53:>  (0 + 0) / 24][Stage 54:=>(21 + 3) / 24][Stage 55:> (0 + 21) / 24][Stage 56:>  (0 + 0) / 24]

[Stage 58:>(11 + 13) / 24][Stage 59:> (0 + 11) / 24][Stage 60:>  (0 + 0) / 24][Stage 62:> (5 + 19) / 24][Stage 63:>  (0 + 5) / 24][Stage 64:>  (0 + 0) / 24]

[Stage 65:>(11 + 13) / 24][Stage 66:> (0 + 11) / 24][Stage 67:>  (0 + 0) / 24][Stage 69:> (1 + 23) / 24][Stage 70:>  (0 + 1) / 24][Stage 71:>  (0 + 0) / 24]

[Stage 72:> (5 + 19) / 24][Stage 73:>  (0 + 6) / 24][Stage 74:>  (0 + 0) / 24][Stage 75:=>(15 + 9) / 24][Stage 76:> (0 + 17) / 24][Stage 77:>  (0 + 0) / 24]

[Stage 78:=>(22 + 2) / 24][Stage 79:> (4 + 20) / 24][Stage 80:>  (0 + 4) / 24][Stage 82:=>(18 + 6) / 24][Stage 83:> (0 + 18) / 24][Stage 84:>  (0 + 0) / 24]

[Stage 85:> (8 + 16) / 24][Stage 86:> (0 + 10) / 24][Stage 87:>  (0 + 0) / 24][Stage 88:>(12 + 12) / 24][Stage 89:> (0 + 12) / 24][Stage 90:>  (0 + 0) / 24]

[Stage 92:> (0 + 24) / 24][Stage 93:>  (0 + 0) / 24][Stage 94:>  (0 + 0) / 24][Stage 95:>(12 + 12) / 24][Stage 96:> (0 + 12) / 24][Stage 97:>  (0 + 0) / 24]

[Stage 98:=>(19 + 5) / 24][Stage 99:> (0 + 19) / 24][Stage 100:> (0 + 0) / 24][Stage 101:(12 + 12) / 24][Stage 102:>(0 + 19) / 24][Stage 103:> (0 + 0) / 24]

[Stage 105:>(3 + 21) / 24][Stage 106:> (0 + 3) / 24][Stage 107:> (0 + 0) / 24][Stage 108:>(15 + 9) / 24][Stage 109:>(0 + 15) / 24][Stage 110:> (0 + 0) / 24]

[Stage 111:>(20 + 4) / 24][Stage 112:>(5 + 19) / 24][Stage 113:> (0 + 2) / 24][Stage 115:>(3 + 21) / 24][Stage 116:> (0 + 3) / 24][Stage 117:> (0 + 0) / 24]

[Stage 118:(13 + 11) / 24][Stage 119:>(0 + 13) / 24][Stage 120:> (0 + 0) / 24][Stage 121:>(22 + 2) / 24][Stage 122:>(6 + 18) / 24][Stage 123:> (0 + 4) / 24]

[Stage 125:(14 + 10) / 24][Stage 126:>(0 + 14) / 24][Stage 127:> (0 + 0) / 24][Stage 129:>(3 + 21) / 24][Stage 130:> (0 + 3) / 24][Stage 131:> (0 + 0) / 24]

[Stage 133:>(3 + 21) / 24][Stage 134:> (0 + 3) / 24][Stage 135:> (0 + 0) / 24][Stage 137:>(0 + 24) / 24][Stage 138:> (0 + 0) / 24][Stage 139:> (0 + 0) / 24]

[Stage 140:>(20 + 4) / 24][Stage 141:>(0 + 20) / 24][Stage 142:> (0 + 0) / 24][Stage 144:>(20 + 4) / 24][Stage 145:>(0 + 20) / 24][Stage 146:> (0 + 0) / 24]

[Stage 148:>(17 + 7) / 24][Stage 149:>(0 + 17) / 24][Stage 150:> (0 + 0) / 24][Stage 152:>(17 + 7) / 24][Stage 153:>(0 + 19) / 24][Stage 154:> (0 + 0) / 24]

[Stage 156:(13 + 11) / 24][Stage 157:>(0 + 14) / 24][Stage 158:> (0 + 0) / 24][Stage 160:(13 + 11) / 24][Stage 161:>(0 + 15) / 24][Stage 162:> (0 + 0) / 24]

[Stage 164:>(9 + 15) / 24][Stage 165:>(0 + 12) / 24][Stage 166:> (0 + 0) / 24][Stage 168:(12 + 12) / 24][Stage 169:>(0 + 12) / 24][Stage 170:> (0 + 0) / 24]

[Stage 172:>(9 + 15) / 24][Stage 173:>(0 + 10) / 24][Stage 174:> (0 + 0) / 24][Stage 176:>(9 + 15) / 24][Stage 177:> (0 + 9) / 24][Stage 178:> (0 + 0) / 24]

[Stage 180:>(0 + 24) / 24][Stage 181:> (0 + 2) / 24][Stage 182:> (0 + 0) / 24][Stage 183:(12 + 12) / 24][Stage 184:>(0 + 13) / 24][Stage 185:> (0 + 0) / 24]

[Stage 187:>(4 + 20) / 24][Stage 188:> (0 + 4) / 24][Stage 189:> (0 + 0) / 24][Stage 190:>(19 + 5) / 24][Stage 191:>(1 + 19) / 24][Stage 192:> (0 + 0) / 24]

[Stage 193:>(18 + 6) / 24][Stage 194:>(2 + 18) / 24][Stage 195:> (0 + 0) / 24][Stage 197:>(15 + 9) / 24][Stage 198:>(0 + 16) / 24][Stage 199:> (0 + 0) / 24]



+------+-------------------+------------------+------------------+------------------+------------------+----------+----------+--------------+-----------------------+--------------------+--------------------+----------+--------------------+
|ticker|               data|          abertura|              alta|             baixa|        fechamento|    volume|dividendos|desdobramentos|fechamento_mes_anterior|       valor_retorno| porcentagem_retorno|  dt_ptcao|          dthr_igtao|
+------+-------------------+------------------+------------------+------------------+------------------+----------+----------+--------------+-----------------------+--------------------+--------------------+----------+--------------------+
|  TSLA|2014-11-01 02:00:00|16.200000762939453|17.332666397094727|15.233332633972168|16.301332473754883|1615827000|       0.0|           0.0|                   NULL|                NULL|                NULL|2024-10-25|2024-10-25 18:40:...|
|  TSLA|2014-12-01 03:00:00|16.077333450

24/10/25 21:40:37 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
                                                                                

# dimensões gerais 

2. **Criação dos DataFrames**:
   - **Tabela Geral** (`df_financeira`): Contém informações gerais da empresa, como setor, indústria, número de empregados, localização e resumo das atividades.
   - **Tabela Financeira** (`df_financeira`): Contém dados financeiros da empresa, como capitalização de mercado, receita, lucro líquido, EBITDA, dívida total, entre outros.
   - **Tabela de Mercado** (`df_mercado`): Inclui dados relacionados ao mercado, como preço atual, preço de abertura, volume de negociação, beta, entre outros.
   - **Tabela de Dividendos** (`df_dividendos`): Contém informações sobre dividendos, incluindo taxa de dividendos, data ex-dividendo e índice de distribuição.
   - **Tabela de Valuation** (`df_valuation`): Inclui dados de valuation da empresa, como índices P/E (Price to Earnings), P/B (Price to Book) e PEG (Price/Earnings to Growth).
   - **Tabela de Retorno Mensal** (`df_retorno_mensal`): Retorno mensal da ação com base em preço da ação, dividendos e percentual

In [12]:
from pyspark.sql.functions import from_unixtime, col

def criar_tabelas_spark(tickers):
    if isinstance(tickers, str):
        tickers = (tickers,)
    
    # Esquema para a tabela geral
    schema_geral = StructType([
        StructField('ticker', StringType(), False),
        StructField('setor', StringType(), True),
        StructField('industria', StringType(), True),
        StructField('funcionarios', IntegerType(), True),
        StructField('cidade', StringType(), True),
        StructField('estado', StringType(), True),
        StructField('pais', StringType(), True),
        StructField('website', StringType(), True),
        StructField('resumo_negocios', StringType(), True),
        StructField('exchange', StringType(), True)
    ])
    
    # Esquema para a tabela financeira
    schema_financeira = StructType([
        StructField('ticker', StringType(), False),
        StructField('capitalizacao_mercado', LongType(), True),
        StructField('valor_empresa', LongType(), True),
        StructField('receita', LongType(), True),
        StructField('lucros_brutos', LongType(), True),
        StructField('lucro_liquido', LongType(), True),
        StructField('ebitda', LongType(), True),
        StructField('divida_total', LongType(), True),
        StructField('caixa_total', LongType(), True),
        StructField('dividend_yield', DoubleType(), True)
    ])
    
    # Esquema para a tabela de mercado
    schema_mercado = StructType([
        StructField('ticker', StringType(), False),
        StructField('preco_atual', DoubleType(), True),
        StructField('fechamento_anterior', DoubleType(), True),
        StructField('abertura', DoubleType(), True),
        StructField('minimo_dia', DoubleType(), True),
        StructField('maximo_dia', DoubleType(), True),
        StructField('minimo_52_semanas', DoubleType(), True),
        StructField('maximo_52_semanas', DoubleType(), True),
        StructField('volume', LongType(), True),
        StructField('volume_medio', LongType(), True),
        StructField('beta', DoubleType(), True)
    ])
    
    # Esquema para a tabela de dividendos
    schema_dividendos = StructType([
        StructField('ticker', StringType(), False),
        StructField('taxa_dividendo', DoubleType(), True),
        StructField('data_exdividendo', StringType(), True),  # Temporariamente como StringType
        StructField('indice_distribuicao', DoubleType(), True)
    ])
    
    # Esquema para a tabela de valuation
    schema_valuation = StructType([
        StructField('ticker', StringType(), False),
        StructField('pl_futuro', DoubleType(), True),
        StructField('pl_retroativo', DoubleType(), True),
        StructField('preco_booking', DoubleType(), True),
        StructField('indice_preco_lucro_cresc', DoubleType(), True)
    ])
    
    # Inicializa as listas de dicionários para cada tabela
    geral = []
    financeira = []
    mercado = []
    dividendos = []
    valuation = []
    
    for ticker in tickers:
        try:
            empresa = yf.Ticker(ticker)
            info = empresa.info
            
            # Filtra e trata valores infinitos
            def safe_get(key, default=None):
                value = info.get(key)
                if isinstance(value, str) and value in ('Infinity', '-Infinity'):
                    return default
                return value
            
            # Preencher dados da tabela geral
            geral.append({
                'ticker': ticker,
                'setor': info.get('sector'),
                'industria': info.get('industry'),
                'funcionarios': info.get('fullTimeEmployees'),
                'cidade': info.get('city'),
                'estado': info.get('state'),
                'pais': info.get('country'),
                'website': info.get('website'),
                'resumo_negocios': info.get('longBusinessSummary'),
                'exchange': info.get('exchange')
            })
            
            # Preencher dados da tabela financeira
            financeira.append({
                'ticker': ticker,
                'capitalizacao_mercado': safe_get('marketCap', 0),
                'valor_empresa': safe_get('enterpriseValue', 0),
                'receita': safe_get('revenue', 0),
                'lucros_brutos': safe_get('grossProfits', 0),
                'lucro_liquido': safe_get('netIncome', 0),
                'ebitda': safe_get('ebitda', 0),
                'divida_total': safe_get('totalDebt', 0),
                'caixa_total': safe_get('totalCash', 0),
                'dividend_yield': safe_get('dividendYield', 0.0)
            })
            
            # Preencher dados da tabela de mercado
            mercado.append({
                'ticker': ticker,
                'preco_atual': safe_get('currentPrice', 0.0),
                'fechamento_anterior': safe_get('previousClose', 0.0),
                'abertura': safe_get('open', 0.0),
                'minimo_dia': safe_get('dayLow', 0.0),
                'maximo_dia': safe_get('dayHigh', 0.0),
                'minimo_52_semanas': safe_get('fiftyTwoWeekLow', 0.0),
                'maximo_52_semanas': safe_get('fiftyTwoWeekHigh', 0.0),
                'volume': safe_get('volume', 0),
                'volume_medio': safe_get('averageVolume', 0),
                'beta': safe_get('beta', 0.0)
            })
            
            # Preencher dados da tabela de dividendos
            dividendos.append({
                'ticker': ticker,
                'taxa_dividendo': safe_get('dividendRate', 0.0),
                'data_exdividendo': safe_get('exDividendDate'),  # Unix timestamp
                'indice_distribuicao': safe_get('payoutRatio', 0.0)
            })
            
            # Preencher dados da tabela de valuation
            valuation.append({
                'ticker': ticker,
                'pl_futuro': safe_get('forwardPE', 0.0),
                'pl_retroativo': safe_get('trailingPE', 0.0),
                'preco_booking': safe_get('priceToBook', 0.0),
                'indice_preco_lucro_cresc': safe_get('pegRatio', 0.0)
            })
            
        except Exception as e:
            print(f"Erro ao processar o ticker {ticker}: {e}")
    
    # Criar DataFrames Spark com esquema definido
    df_geral = spark.createDataFrame(geral, schema=schema_geral)
    df_financeira = spark.createDataFrame(financeira, schema=schema_financeira)
    df_mercado = spark.createDataFrame(mercado, schema=schema_mercado)
    df_dividendos = spark.createDataFrame(dividendos, schema=schema_dividendos)
    df_valuation = spark.createDataFrame(valuation, schema=schema_valuation)

    # Converter Unix timestamp para data legível
    df_dividendos = df_dividendos.withColumn('data_exdividendo', from_unixtime(col('data_exdividendo').cast('bigint')))
    
    # Adicionar colunas de timestamp
    df_geral = df_geral.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df_geral = df_geral.withColumn('dthr_igtao', current_timestamp())

    df_financeira = df_financeira.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df_financeira = df_financeira.withColumn('dthr_igtao', current_timestamp())

    df_mercado = df_mercado.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df_mercado = df_mercado.withColumn('dthr_igtao', current_timestamp())

    df_dividendos = df_dividendos.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df_dividendos = df_dividendos.withColumn('dthr_igtao', current_timestamp())

    df_valuation = df_valuation.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
    df_valuation = df_valuation.withColumn('dthr_igtao', current_timestamp())
    
    return df_geral, df_financeira, df_mercado, df_dividendos, df_valuation


In [13]:
df_geral, df_financeira, df_mercado, df_dividendos, df_valuation = criar_tabelas_spark(df_tickers)
# Exibir os DataFrames
df_geral.show()
df_financeira.show()
df_mercado.show()
df_dividendos.show()
df_valuation.show()

+------+--------------------+--------------------+------------+-------------+------+-------------+--------------------+--------------------+--------+----------+--------------------+
|ticker|               setor|           industria|funcionarios|       cidade|estado|         pais|             website|     resumo_negocios|exchange|  dt_ptcao|          dthr_igtao|
+------+--------------------+--------------------+------------+-------------+------+-------------+--------------------+--------------------+--------+----------+--------------------+
|  AAPL|          Technology|Consumer Electronics|      161000|    Cupertino|    CA|United States|https://www.apple...|Apple Inc. design...|     NMS|2024-10-25|2024-10-25 18:41:...|
|  NVDA|          Technology|      Semiconductors|       29600|  Santa Clara|    CA|United States|https://www.nvidi...|NVIDIA Corporatio...|     NMS|2024-10-25|2024-10-25 18:41:...|
|  MSFT|          Technology|Software - Infras...|      228000|      Redmond|    WA|United

+------+-----------+-------------------+--------+----------+----------+-----------------+-----------------+---------+------------+-----+----------+--------------------+
|ticker|preco_atual|fechamento_anterior|abertura|minimo_dia|maximo_dia|minimo_52_semanas|maximo_52_semanas|   volume|volume_medio| beta|  dt_ptcao|          dthr_igtao|
+------+-----------+-------------------+--------+----------+----------+-----------------+-----------------+---------+------------+-----+----------+--------------------+
|  AAPL|     231.41|             230.57|  229.74|    229.57|     233.2|           164.08|           237.49| 37918358|    50521101|1.239|2024-10-25|2024-10-25 18:41:...|
|  NVDA|     141.54|             140.41|  140.94|     140.8|    144.06|            39.23|           144.42|203385650|   320637379|1.669|2024-10-25|2024-10-25 18:41:...|
|  MSFT|     428.15|             424.73|   426.9|     426.6|     432.5|            328.4|           468.35| 17007704|    19542334|0.896|2024-10-25|2024-10-

+------+---------+-------------+-------------+------------------------+----------+--------------------+
|ticker|pl_futuro|pl_retroativo|preco_booking|indice_preco_lucro_cresc|  dt_ptcao|          dthr_igtao|
+------+---------+-------------+-------------+------------------------+----------+--------------------+
|  AAPL|30.978582|     35.22222|     52.80922|                    3.11|2024-10-25|2024-10-25 18:41:...|
|  NVDA| 34.86207|      66.4507|    59.771957|                    0.86|2024-10-25|2024-10-25 18:41:...|
|  MSFT|28.149244|    36.253174|    11.855185|                    2.17|2024-10-25|2024-10-25 18:41:...|
| GOOGL|18.996553|     23.74569|    6.7711406|                    1.03|2024-10-25|2024-10-25 18:41:...|
|  GOOG|19.216343|     23.92407|    6.8416095|                    1.04|2024-10-25|2024-10-25 18:41:...|
|  AMZN| 32.49654|    44.828163|     8.333185|                    1.24|2024-10-25|2024-10-25 18:41:...|
|  META|  23.4554|    29.352278|       9.2627|                  

In [14]:
df_retorno_mensal.printSchema()
df_ativas.printSchema()
df_geral.printSchema()
df_financeira.printSchema()
df_mercado.printSchema()
df_dividendos.printSchema()
df_valuation.printSchema()


root
 |-- ticker: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- abertura: double (nullable = true)
 |-- alta: double (nullable = true)
 |-- baixa: double (nullable = true)
 |-- fechamento: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- dividendos: double (nullable = true)
 |-- desdobramentos: double (nullable = true)
 |-- fechamento_mes_anterior: double (nullable = true)
 |-- valor_retorno: double (nullable = true)
 |-- porcentagem_retorno: double (nullable = true)
 |-- dt_ptcao: string (nullable = false)
 |-- dthr_igtao: timestamp (nullable = false)

root
 |-- ticker: string (nullable = true)
 |-- nome_curto: string (nullable = true)
 |-- nome_exibicao: string (nullable = true)
 |-- preco_mercado_regular: double (nullable = true)
 |-- mudanca_mercado_regular: double (nullable = true)
 |-- mudanca_percentual_mercado_regular: double (nullable = true)
 |-- volume_mercado_regular: long (nullable = true)
 |-- capitalizacao_mercado: long (nullable =

# Sprint 3 - Indices

In [15]:
indices = {
    '^BVSP': 'Ibovespa',
    'BOVA11.SA': 'BOVA11',
    '^GSPC': 'S&P 500',
    '^DJI': 'Dow Jones',
    '^IXIC': 'NASDAQ',
    '^FTSE': 'FTSE 100',
    '^GDAXI': 'DAX 30',
    '^N225': 'Nikkei 225'
}

# Definindo o esquema do DataFrame
schema = StructType([
    StructField("indice", StringType(), True),
    StructField("abertura", DoubleType(), True),
    StructField("alta", DoubleType(), True),
    StructField("baixa", DoubleType(), True),
    StructField("fechamento", DoubleType(), True),
    StructField("fechamento_ajustado", DoubleType(), True),
    StructField("volume", LongType(), True),  # Alterado para LongType
    StructField("nome_indice", StringType(), True),
    StructField("data", DateType(), True)
])

# Criar uma lista para armazenar os dados
data_list = []

# Adicionando dados históricos (exemplo)
for ticker, name in indices.items():
    try:
        data = yf.download(ticker, start=start_date_str, end=end_date_str)
        if not data.empty:
            for index, row in data.iterrows():
                # Converter os valores para float ou inteiro antes de adicionar à lista
                data_list.append((ticker, float(row['Open']), float(row['High']), float(row['Low']), 
                                  float(row['Close']), float(row['Adj Close']), int(row['Volume']),  # Converte para inteiro
                                  name, index.date()))  # Usa index.date() para pegar apenas a parte da data
    except Exception as e:
        print(f"Falha ao baixar dados para {ticker}: {e}")

# Criar um DataFrame Spark
df_indices = spark.createDataFrame(data_list, schema)

janela = Window.partitionBy('indice').orderBy('data')

# Calcular preço de fechamento do mês anterior (deslocar uma linha para cima)
df_indices = df_indices.withColumn('fechamento_mes_anterior', lag('fechamento').over(janela))

# Calcular Retorno em valor (diferença absoluta)
df_indices = df_indices.withColumn('valor_retorno', col('fechamento') - col('fechamento_mes_anterior'))

# Calcular Retorno em porcentagem
df_indices = df_indices.withColumn('porcentagem_retorno', (col('valor_retorno') / col('fechamento_mes_anterior')) * 100)

# Adicionar colunas dt_ptcao e dthr_igtao
df_indices = df_indices.withColumn('dt_ptcao', date_format(current_timestamp(), 'yyyy-MM-dd'))
df_indices = df_indices.withColumn('dthr_igtao', current_timestamp())

# Exibir o DataFrame final
df_indices = df_indices.orderBy('indice','data')
df_indices.show()

[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




[*********************100%***********************]  1 of 1 completed




+---------+------------------+------------------+------------------+------------------+-------------------+-------+-----------+----------+-----------------------+--------------------+--------------------+----------+--------------------+
|   indice|          abertura|              alta|             baixa|        fechamento|fechamento_ajustado| volume|nome_indice|      data|fechamento_mes_anterior|       valor_retorno| porcentagem_retorno|  dt_ptcao|          dthr_igtao|
+---------+------------------+------------------+------------------+------------------+-------------------+-------+-----------+----------+-----------------------+--------------------+--------------------+----------+--------------------+
|BOVA11.SA| 49.79999923706055|              51.0|49.790000915527344| 50.81999969482422|  50.81999969482422|3486240|     BOVA11|2014-10-28|                   NULL|                NULL|                NULL|2024-10-25|2024-10-25 18:41:...|
|BOVA11.SA| 50.97999954223633| 50.97999954223633| 49

# Postgres upsert

In [16]:
SRVNAME = "postgres"
USER = "airflow"
PASSWORD = "airflow"
HOST = "postgres"
PORT = "5432"
DBNAME = "paxpedb"

# Parâmetros de conexão usando as variáveis
conn_params = {
    'dbname': DBNAME,
    'user': USER,
    'password': PASSWORD,
    'host': HOST,
    'port': PORT
}

In [17]:
def test_connection():
    try:
        # Connect to the PostgreSQL server using variables
        connection = psycopg2.connect(
            dbname=SRVNAME,
            user=USER,
            password=PASSWORD,
            host=HOST,
            port=PORT
        )
        print("Conexão com postgres sucedida")
        return True
    except OperationalError as e:
        print(f"Error: {e}")
        return False
    finally:
        if connection:
            connection.close()
if not test_connection():
    sys.exit(1)

Conexão com postgres sucedida


### para inserir no postgres é necessário
> 1. alimentar as tabelas em stage --- temp_nometabela
> 2. realizar o upsert comparando as tabelas

In [18]:
def write_to_postgres(df, table_name, schema_name="paxpestg"):
  
    df.write \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{HOST}:{PORT}/{DBNAME}") \
        .option("dbtable", f"{schema_name}.{table_name}") \
        .option("user", USER) \
        .option("password", PASSWORD) \
        .option("driver", "org.postgresql.Driver") \
        .mode("overwrite") \
        .save()

In [19]:
# escrita das tabelas em stage
write_to_postgres(df_retorno_mensal, "temp_retorno_mensal")
write_to_postgres(df_ativas, "temp_captacao_mercado")
write_to_postgres(df_geral, "temp_cadastro")
write_to_postgres(df_financeira, "temp_financas")
write_to_postgres(df_mercado, "temp_mercado")
write_to_postgres(df_dividendos, "temp_dividendos")
write_to_postgres(df_valuation, "temp_valuation")
write_to_postgres(df_indices, "temp_indices")

[Stage 427:(12 + 12) / 24][Stage 428:>(0 + 12) / 24][Stage 429:> (0 + 0) / 24][Stage 430:>(4 + 20) / 24][Stage 431:> (0 + 4) / 24][Stage 432:> (0 + 0) / 24]

[Stage 433:>(15 + 9) / 24][Stage 434:>(0 + 15) / 24][Stage 435:> (0 + 0) / 24]

[Stage 435:>(15 + 9) / 24][Stage 436:>(0 + 19) / 24][Stage 437:> (0 + 0) / 24][Stage 438:>(16 + 8) / 24][Stage 439:>(0 + 16) / 24][Stage 440:> (0 + 0) / 24]

[Stage 442:>(9 + 15) / 24][Stage 443:> (0 + 9) / 24][Stage 444:> (0 + 0) / 24][Stage 445:>(22 + 2) / 24][Stage 446:>(0 + 22) / 24][Stage 447:> (0 + 0) / 24]

[Stage 449:(13 + 11) / 24][Stage 450:>(0 + 13) / 24][Stage 451:> (0 + 0) / 24][Stage 451:>(18 + 6) / 24][Stage 452:>(4 + 18) / 24][Stage 453:> (0 + 0) / 24]

[Stage 454:>(18 + 6) / 24][Stage 455:>(0 + 18) / 24][Stage 456:> (0 + 0) / 24][Stage 458:(11 + 13) / 24][Stage 459:>(0 + 11) / 24][Stage 460:> (0 + 0) / 24]

[Stage 461:>(22 + 2) / 24][Stage 462:>(2 + 22) / 24][Stage 463:> (0 + 1) / 24][Stage 465:>(17 + 7) / 24][Stage 466:>(0 + 17) / 24][Stage 467:> (0 + 0) / 24]

[Stage 469:>(8 + 16) / 24][Stage 470:> (0 + 8) / 24][Stage 471:> (0 + 0) / 24][Stage 472:>(20 + 4) / 24][Stage 473:>(0 + 20) / 24][Stage 474:> (0 + 0) / 24]

[Stage 475:>(23 + 1) / 24][Stage 476:>(9 + 15) / 24][Stage 477:> (0 + 9) / 24][Stage 479:>(20 + 4) / 24][Stage 480:>(0 + 20) / 24][Stage 481:> (0 + 0) / 24]

[Stage 483:>(6 + 18) / 24][Stage 484:> (0 + 6) / 24][Stage 485:> (0 + 0) / 24][Stage 486:>(20 + 4) / 24][Stage 487:>(0 + 20) / 24][Stage 488:> (0 + 0) / 24]

[Stage 490:>(8 + 16) / 24][Stage 491:> (0 + 8) / 24][Stage 492:> (0 + 0) / 24][Stage 493:>(23 + 1) / 24][Stage 494:>(1 + 23) / 24][Stage 495:> (0 + 0) / 24]

[Stage 497:>(9 + 15) / 24][Stage 498:> (0 + 9) / 24][Stage 499:> (0 + 0) / 24][Stage 501:>(2 + 22) / 24][Stage 502:> (0 + 2) / 24][Stage 503:> (0 + 0) / 24]

[Stage 504:>(23 + 1) / 24][Stage 505:>(0 + 24) / 24][Stage 506:> (0 + 0) / 24][Stage 508:>(6 + 18) / 24][Stage 509:> (0 + 6) / 24][Stage 510:> (0 + 0) / 24]

[Stage 512:>(5 + 19) / 24][Stage 513:> (0 + 5) / 24][Stage 514:> (0 + 0) / 24][Stage 515:>(20 + 4) / 24][Stage 516:>(0 + 24) / 24][Stage 517:> (0 + 0) / 24]

[Stage 519:>(9 + 15) / 24][Stage 520:> (0 + 9) / 24][Stage 521:> (0 + 0) / 24][Stage 522:>(15 + 9) / 24][Stage 523:>(0 + 15) / 24][Stage 524:> (0 + 0) / 24]

[Stage 526:>(6 + 18) / 24][Stage 527:> (0 + 6) / 24][Stage 528:> (0 + 0) / 24][Stage 529:>(23 + 1) / 24][Stage 530:>(0 + 23) / 24][Stage 531:> (0 + 0) / 24]

[Stage 533:>(9 + 15) / 24][Stage 534:> (0 + 9) / 24][Stage 535:> (0 + 0) / 24][Stage 536:>(18 + 6) / 24][Stage 537:>(0 + 18) / 24][Stage 538:> (0 + 0) / 24]

[Stage 540:(10 + 14) / 24][Stage 541:>(0 + 12) / 24][Stage 542:> (0 + 0) / 24][Stage 543:>(20 + 4) / 24][Stage 544:>(0 + 20) / 24][Stage 545:> (0 + 0) / 24]

[Stage 547:>(15 + 9) / 24][Stage 548:>(0 + 15) / 24][Stage 549:> (0 + 0) / 24][Stage 550:>(22 + 2) / 24][Stage 551:>(0 + 22) / 24][Stage 552:> (0 + 0) / 24]

[Stage 554:>(19 + 5) / 24][Stage 555:>(0 + 19) / 24][Stage 556:> (0 + 0) / 24][Stage 558:>(1 + 23) / 24][Stage 559:> (0 + 1) / 24][Stage 560:> (0 + 0) / 24]

[Stage 561:>(16 + 8) / 24][Stage 562:>(0 + 16) / 24][Stage 563:> (0 + 0) / 24][Stage 565:>(7 + 17) / 24][Stage 566:> (0 + 7) / 24][Stage 567:> (0 + 0) / 24]

[Stage 568:>(21 + 3) / 24][Stage 569:>(0 + 21) / 24][Stage 570:> (0 + 0) / 24][Stage 572:>(21 + 3) / 24][Stage 573:>(0 + 21) / 24][Stage 574:> (0 + 0) / 24]

[Stage 576:>(2 + 22) / 24][Stage 577:> (0 + 2) / 24][Stage 578:> (0 + 0) / 24][Stage 579:(11 + 13) / 24][Stage 580:>(0 + 11) / 24][Stage 581:> (0 + 0) / 24]

[Stage 583:(10 + 14) / 24][Stage 584:>(0 + 10) / 24][Stage 585:> (0 + 0) / 24][Stage 586:>(19 + 5) / 24][Stage 587:>(0 + 19) / 24][Stage 588:> (0 + 0) / 24]

[Stage 590:>(18 + 6) / 24][Stage 591:>(0 + 18) / 24][Stage 592:> (0 + 0) / 24][Stage 593:>(23 + 1) / 24][Stage 594:>(0 + 23) / 24][Stage 595:> (0 + 0) / 24]

[Stage 597:(10 + 14) / 24][Stage 598:>(0 + 10) / 24][Stage 599:> (0 + 0) / 24][Stage 601:>(3 + 21) / 24][Stage 602:> (0 + 3) / 24][Stage 603:> (0 + 0) / 24]

[Stage 604:>(16 + 8) / 24][Stage 605:>(0 + 16) / 24][Stage 606:> (0 + 0) / 24][Stage 608:>(0 + 24) / 24][Stage 609:> (0 + 0) / 24][Stage 610:> (0 + 0) / 24]

[Stage 611:>(18 + 6) / 24][Stage 612:>(0 + 18) / 24][Stage 613:> (0 + 0) / 24][Stage 614:>(19 + 5) / 24][Stage 615:>(0 + 19) / 24][Stage 616:> (0 + 0) / 24]



24/10/25 21:41:55 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB










                                                                                

In [20]:
def upsert_data(table_name, temp_table_name, key_columns):
    try:
        # Conectar ao PostgreSQL
        connection = psycopg2.connect(**conn_params)
        cursor = connection.cursor()
        
        # Definir o fuso horário da sessão para UTC-3
        cursor.execute("SET TIME ZONE 'America/Sao_Paulo';")

        # Definir esquemas
        schema_fact = "paxpe"
        schema_stg = "paxpestg"
        
        # Obter todas as colunas da tabela
        cursor.execute(f"""
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = '{schema_fact}' AND table_name = '{table_name}'
        """)
        all_columns = [row[0] for row in cursor.fetchall()]

        # Identificar colunas não chave e colunas de referência
        non_key_columns = [col for col in all_columns if col not in key_columns and col not in ['dt_ptcao', 'dthr_igtao']]
        reference_columns = ['dthr_igtao']

        # Construir a declaração SQL de atualização col1, col2
        key_columns_str = ', '.join(key_columns)
        update_set_non_key = ', '.join([
            f"{col} = EXCLUDED.{col}" 
            for col in non_key_columns
        ])
        
        # Atualizar apenas as colunas não-chave se houver uma mudança
        where_condition = ' OR '.join([
            f"target.{col} IS DISTINCT FROM EXCLUDED.{col}" 
            for col in non_key_columns
        ])
        
        # Para atualizar as colunas de referência se houver uma atualização nas colunas não-chave
        update_set_reference = ', '.join([
            f"{col} = EXCLUDED.{col}" 
            for col in reference_columns
        ])
        
        sql = f"""
        INSERT INTO {schema_fact}.{table_name} 
        (SELECT * FROM {schema_stg}.{temp_table_name})
        ON CONFLICT ({key_columns_str}) 
        DO UPDATE SET
        {update_set_non_key},
        {update_set_reference}
        WHERE EXISTS (
            SELECT 1
            FROM {schema_fact}.{table_name} AS target
            WHERE { ' AND '.join([f"target.{key} = EXCLUDED.{key}" for key in key_columns]) }
            AND ({where_condition})
        );
        """
        
        # Executar a declaração SQL de upsert
        cursor.execute(sql)
        connection.commit()
        print(f"Upsert de {schema_stg}.{temp_table_name} realizado para {schema_fact}.{table_name}.")
    
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        if connection:
            cursor.close()
            connection.close()

In [21]:
# colunas chave para cada tabela
key_columns_retorno_mensal = ["ticker", "data"]
key_columns_cadastro = ["ticker"]
key_columns_cap_mercado = ["ticker"]
key_columns_financas = ["ticker"]
key_columns_mercado = ["ticker"]
key_columns_dividendos = ["ticker","data_exdividendo"]
key_columns_valuation = ["ticker"]
key_columns_indices = ["indice","data"]

In [22]:
# realiza o upsert
upsert_data("retorno_mensal", "temp_retorno_mensal", key_columns_retorno_mensal)
upsert_data("cadastro", "temp_cadastro", key_columns_cadastro)
upsert_data("captacao_mercado", "temp_captacao_mercado", key_columns_cap_mercado)
upsert_data("financas", "temp_financas", key_columns_financas)
upsert_data("mercado", "temp_mercado", key_columns_mercado)
upsert_data("dividendos", "temp_dividendos", key_columns_dividendos)
upsert_data("valuation", "temp_valuation", key_columns_valuation)
upsert_data("indices", "temp_indices", key_columns_indices)

Upsert de paxpestg.temp_retorno_mensal realizado para paxpe.retorno_mensal.
Upsert de paxpestg.temp_cadastro realizado para paxpe.cadastro.
Upsert de paxpestg.temp_captacao_mercado realizado para paxpe.captacao_mercado.
Upsert de paxpestg.temp_financas realizado para paxpe.financas.
Upsert de paxpestg.temp_mercado realizado para paxpe.mercado.
Upsert de paxpestg.temp_dividendos realizado para paxpe.dividendos.
Upsert de paxpestg.temp_valuation realizado para paxpe.valuation.
Upsert de paxpestg.temp_indices realizado para paxpe.indices.
