In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from datetime import datetime

# Inicializar Spark
spark = SparkSession.builder.appName("Bitcoin Price Integration").getOrCreate()

# Configurações
API_URL = "https://api.coinbase.com/v2/prices/spot?currency=USD"
DELTA_PATH = "/mnt/delta/bitcoin_price"

def fetch_bitcoin_price():
    """Obtém o preço atual do Bitcoin da API Coinbase."""
    try:
        response = requests.get(API_URL)
        response.raise_for_status()
        data = response.json()['data']
        return {
            "amount": float(data['amount']),
            "base": data['base'],
            "currency": data['currency']
        }
    except requests.exceptions.RequestException as e:
        raise Exception(f"Erro ao acessar a API: {e}")

def save_to_table(data):
    """Salva os dados no Delta Lake."""
    # Schema para DataFrame
    schema = StructType([
        StructField("amount", FloatType(), True),
        StructField("base", StringType(), True),
        StructField("currency", StringType(), True)
    ])
    
    # Converter para DataFrame
    data_df = spark.createDataFrame([data], schema=schema)
    
   # Salvar diretamente como tabela gerenciada no Unity Catalog
    table_name = "bronze.bitcoin_price" # Substitua 'default' pelo schema correto se necessário
    data_df.write.format("delta").mode("append").saveAsTable(table_name)
    print(f"Dados salvos na table {table_name}") 

if __name__ == "__main__":
    # Obter dados da API
    print("Obtendo dados da API Coinbase...")
    bitcoin_price = fetch_bitcoin_price()
    print(f"Dados obtidos:", bitcoin_price)
   
   # Salvar na tabela gerenciada
    print("Salvando dados na tabela gerenciada...")
    save_to_table(bitcoin_price)
    print("Pipeline concluído com sucesso.")

Obtendo dados da API Coinbase...
Dados obtidos: {'amount': 96304.995, 'base': 'BTC', 'currency': 'USD'}
Salvando dados na tabela gerenciada...
Dados salvos na table bronze.bitcoin_price
Pipeline concluído com sucesso.
