In [0]:
import requests
import pandas as pd
import datetime
import os

In [0]:
def extrair_dados_bitcoin():
    """Extrair o json completo da API da coinbase."""
    url = f'https://api.coinbase.com/v2/prices/spot'
    resposta = requests.get(url)
    return resposta.json()


In [0]:
dados_bitcoin = extrair_dados_bitcoin()
print(dados_bitcoin)

{'data': {'amount': '76658.055', 'base': 'BTC', 'currency': 'USD'}}


In [0]:
def extrair_cotacao_usd_brl():
    """Extrair o json completo da API da coinbase."""
    api_key = os.getenv("API_KEY")

    url = url = f'https://api.currencyfreaks.com/v2.0/rates/latest?apikey={api_key}'

    resposta = requests.get(url)
    return resposta.json()




In [0]:
dados_cotacao = extrair_cotacao_usd_brl()
print(dados_cotacao['rates']['BRL'])



5.23915


In [0]:
def tratar_dados_bitcoin(dados_json, taxa_usd_brl):
    """Transforma os dados brutos da API, renomeia colunas, adiciona timestamp e converte para BRL."""
    
    valor_usd = float(dados_json['data']['amount'])
    criptomoeda = dados_json['data']['base']
    moeda_original = dados_json['data']['currency']

    # convertendo de USD para BRL (seguro)
    if taxa_usd_brl is None:
        valor_brl = None
    else:
        valor_brl = valor_usd * taxa_usd_brl

    # adicionando timestamp como datetime object 
    timestamp = datetime.datetime.now()

    dados_tratados = [{
        'valor_usd': valor_usd,
        'valor_brl': valor_brl,
        'criptomoeda': criptomoeda,
        'moeda_original': moeda_original,
        'taxa_conversao_usd_brl': taxa_usd_brl,
        'timestamp': timestamp
    }]

    return dados_tratados




In [0]:
#extraindo dados 
dados_bitcoin = extrair_dados_bitcoin()
dados_cotacao = extrair_cotacao_usd_brl()

# extraindo a taa de conversao USD-BRL
taxa_usd_brl = float(dados_cotacao['rates']['BRL'])

#tratando os dados
dados_bitcoin_tratado = tratar_dados_bitcoin(dados_bitcoin, taxa_usd_brl)
print(dados_bitcoin_tratado)




[{'valor_usd': 76658.055, 'valor_brl': 401623.04885325, 'criptomoeda': 'BTC', 'moeda_original': 'USD', 'taxa_conversao_usd_brl': 5.23915, 'timestamp': datetime.datetime(2026, 2, 4, 1, 36, 24, 893085)}]


In [0]:
%sql
CREATE CATALOG IF NOT EXISTS pipeline_api_bitcoin
COMMENT 'Catalogo de demonstraçao criado para workshop de pipeline_api_bitcoin';

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.lakehouse
COMMENT 'Schema Lakehouse para salvar dados ';

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS pipeline_api_bitcoin.lakehouse.raw_files
COMMENT 'VOLUME PARA ARQUIVOS BRUTOS DE INGESTÃO INICIAL'

In [0]:
# CRIANDO DATAFRAME PANDASA PARTIR DOS DADOS TRATADOS
df_pandas = pd.DataFrame(dados_bitcoin_tratado)

#mostrar os dados
print(df_pandas)

   valor_usd      valor_brl  ... taxa_conversao_usd_brl                  timestamp
0  76658.055  401623.048853  ...                5.23915 2026-02-04 01:36:24.893085

[1 rows x 6 columns]


In [0]:
print(type(df_pandas))


<class 'pandas.core.frame.DataFrame'>


In [0]:
# pega o timestamp do proprio evento 
event_ts = dados_bitcoin_tratado[0]['timestamp']

# converte para formato seuro para nome de arquivo
ts = event_ts.strftime('%Y-%m-%d_%H-%M-%S')

# caminho do json
json_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.json'

#salvar como json usando pandas
df_pandas.to_json(json_file, orient='records', date_format='iso', indent=2)
print(f'Arquivo gravado em {json_file}')


Arquivo gravado em /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-02-04_01-36-24.json


In [0]:

# pega o timestamp do proprio evento 
event_ts = dados_bitcoin_tratado[0]['timestamp']

# converte para formato seguro para nome de arquivo
ts = event_ts.strftime('%Y-%m-%d_%H-%M-%S')

# caminho do CSV
csv_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.csv'
df_pandas.to_csv(csv_file, index=False)
print(f'Arquivo gravado em {csv_file}')

# caminho do Parquet
parquet_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.parquet'
df_pandas.to_parquet(parquet_file, engine='pyarrow', index=False)
print(f'Arquivo gravado em {parquet_file}')


Arquivo gravado em /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-02-04_01-36-24.csv
Arquivo gravado em /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-02-04_01-36-24.parquet


In [0]:
# Converter pandas para Spark DataFrame
spark_df = spark.createDataFrame(df_pandas)

# Inserir na Delta Table
spark_df.write.format("delta").mode("append").saveAsTable(
    "pipeline_api_bitcoin.lakehouse.cotacao_bitcoin"
)


In [0]:
%sql
DESCRIBE HISTORY pipeline_api_bitcoin.lakehouse.cotacao_bitcoin
LIMIT 2;


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
388,2026-02-04T01:36:30.000Z,77304007769752,brunoguim2025@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(755803724818793),0203-203028-4ui6u446-v2n,387,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1737)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
387,2026-02-04T01:35:49.000Z,77304007769752,brunoguim2025@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(755803724818793),0203-203028-4ui6u446-v2n,386,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1737)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
