In [0]:
import requests
import pandas as pd
from datetime import datetime

def extrair_dados_bitcoin() -> pd.DataFrame:
    """ Extrai o valor do bitcoin em tempo real """
    # URL para requisitar o valor do bitcoin
    url_btc = "https://api.coinbase.com/v2/prices/spot"

    # Requisitando, ela retorna em formato JSON
    response = requests.get(url_btc)
    return response.json()

In [0]:
def extrair_cotacao_usd_brl() -> pd.DataFrame:
    """ Extrai a cotação USD-BRL da API CurrencyFreaks """
    # Chave da API
    api_key = '5a82cf6b79e14fb6a26cd353bf70f1d5'
    # URL de requisição
    url_cot = f"https://api.currencyfreaks.com/v2.0/rates/latest?apikey={api_key}"

    response = requests.get(url_cot)
    return response.json()

In [0]:
def tratar_dados_bitcoin(dados_json, taxa_usd_brl):
    """ Transforma os dados brutos da API, renomeia colunas, adiciona timestamp e converte para BRL """
    
    valor_usd = float(dados_json["data"]["amount"])
    criptomoeda = dados_json["data"]["base"]
    moeda_original = dados_json["data"]["currency"]
    
    # Convertendo de USD para BRL
    valor_brl = valor_usd * taxa_usd_brl

    # Adicionando timestamp como datetime object
    timestamp = datetime.now()

    dados_tratados = [{
        'valor_usd': valor_usd,
        'valor_brl': valor_brl,
        'criptomoeda': criptomoeda,
        'moeda_original': moeda_original,
        'taxa_conversao_usd_brl': taxa_usd_brl,
        'timestamps': timestamp
    }]

    return dados_tratados


In [0]:
# Extraindo dados
dados_bitcoin = extrair_dados_bitcoin()
dados_cotacao = extrair_cotacao_usd_brl()

# Extraindo a taxa de conversão USD-BRL
taxas_usd_brl = float(dados_cotacao["rates"]["BRL"])

# Tratando os dados e convertendo para BRL
dados_bitcoin_tratado = tratar_dados_bitcoin(dados_bitcoin, taxas_usd_brl)

print(dados_bitcoin_tratado)


In [0]:
%sql
CREATE CATALOG IF NOT EXISTS pipeline_api_bitcoin
COMMENT 'Catálogo para armazenamento de dados da API Bitcoin';

CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.lakehouse
COMMENT 'Schema para armazenamento de dados da API Bitcoin';

CREATE VOLUME IF NOT EXISTS pipeline_api_bitcoin.lakehouse.raw_files
COMMENT 'Volume para arquivos brutos de ingestão inicial'

In [0]:
# Criar dataframe do pandas dos dados retornados da API
import pandas as pd

df_pandas = pd.DataFrame(dados_bitcoin_tratado)

print(df_pandas)

In [0]:
# Salvando os dados em arquivo JSON
"""
Extraindo timestamp para nomear arquivo json, gerando um novo arquivoo a cada run
"""

events_ts = dados_bitcoin_tratado[0]['timestamps'] # Extraindo o ts
ts = events_ts.strftime("%Y%m%d_%H%M%S_%f") # Transformando o ts em string

# path do arquivo
json_file = f"/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.json"

# Salvando como JSON usando Pandas
df_pandas.to_json(json_file, orient='records', date_format='iso', indent=2)
print(f"Arquivo JSON salvo: {json_file}")

In [0]:
%sql

-- Criando SCHEMA separado para a Delta Table

CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.bitcoin_data
COMMENT 'Schema para armazenamento de dados da API Bitcoin'

In [0]:
# Criando DataFrame spark
df_spark = spark.createDataFrame(df_pandas)

# Mostrar schem
df_spark.printSchema()

# Salvar DataFrame spark em Delta-Table
df_spark.write.format("delta").mode("append").saveAsTable('pipeline_api_bitcoin.bitcoin_data.bitcoin_data')