In [0]:
from pyspark.sql import SparkSession

client_id = dbutils.secrets.get('bix_data_scope', 'client-id')
client_secret = dbutils.secrets.get('bix_data_scope', 'client-secret')
endpoint_id = dbutils.secrets.get('bix_data_scope', 'endpoint-id')

def configure_mounts(spark, configs, mounts):
    """
    Configura os pontos de montagem no Databricks, desmontando se já estiverem montados e montando novamente com as novas configurações.

    Args:
        spark (SparkSession): Sessão ativa do Spark.
        configs (dict): Dicionário com as configurações de autenticação para o Azure Data Lake Storage.
        mounts (dict): Dicionário contendo os pontos de montagem e seus respectivos caminhos de origem no Azure.
    """
    # Obter os pontos de montagem atuais
    mounts_df = spark.createDataFrame(dbutils.fs.mounts())
    
    # Iterar sobre os pontos de montagem e configurar cada um
    for mount_point, source in mounts.items():
        if mounts_df.filter(mounts_df.mountPoint == mount_point).count() > 0:
            # Desmontar se já estiver montado
            dbutils.fs.unmount(mount_point)
        
        # Montar novamente com as configurações fornecidas
        dbutils.fs.mount(
            source=source,
            mount_point=mount_point,
            extra_configs=configs
        )
    print("Configurações de montagem concluídas.")

# Sessão do Spark
spark = SparkSession.builder.appName("BixDataPipeline").getOrCreate()

# Configurações de autenticação para o Azure Data Lake Storage
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": f"{client_id}", 
    "fs.azure.account.oauth2.client.secret": f"{client_secret}",
    "fs.azure.account.oauth2.client.endpoint": f"{endpoint_id}"
}

# Dicionário com as informações dos containers e pontos de montagem
mounts = {
    "/mnt/landing": "abfss://landing@projbixarmazenamento.dfs.core.windows.net/",
    "/mnt/processing": "abfss://processing@projbixarmazenamento.dfs.core.windows.net/",
    "/mnt/curated": "abfss://curated@projbixarmazenamento.dfs.core.windows.net/"
}

# Configurar as montagens
configure_mounts(spark, configs, mounts)


In [0]:
import requests
import json


# URL da API dos funcionários
api_url = "https://us-central1-bix-tecnologia-prd.cloudfunctions.net/api_challenge_junior"

def extract_employees(api_url, start_id, end_id):
    """
    Extrai os dados dos funcionários a partir da API.

    Args:
        api_url (str): URL da API para extração dos dados.
        start_id (int): ID inicial do funcionário.
        end_id (int): ID final do funcionário.
    """
    all_data = []
    for employee_id in range(start_id, end_id + 1):
        url = f"{api_url}?id={employee_id}"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.text.strip()
            all_data.append({"id": employee_id, "name": data})
        else:
            print(f"Falha na obtenção de dados para ID {employee_id}: {response.status_code}")
            response.raise_for_status()
    return all_data

def save_employees_to_landing(file_name, employees_data):
    """
    Salva os dados dos funcionários em formato JSON na camada 'landing'.

    Args:
        file_name (str): Nome do arquivo JSON a ser salvo.
        employees_data (list): Lista de dicionários contendo os dados dos funcionários.
    """
    file_path = f"/mnt/landing/{file_name}"
    
    # Converte os dados para formato JSON
    json_data = json.dumps(employees_data, indent=4)
    
    # Salva os dados em formato JSON no DBFS
    try:
        dbutils.fs.put(file_path, json_data, overwrite=True)
        print(f"Arquivo JSON '{file_name}' salvo com sucesso em /mnt/landing.")
    except Exception as e:
        print(f"Erro ao salvar o arquivo JSON: {e}")


employees_data = extract_employees(api_url, 1, 9)
save_employees_to_landing("employees_data.json", employees_data)

In [0]:
import os
import requests
from pyspark.sql import SparkSession

# URL do arquivo Parquet e nome do arquivo
parquet_url = "https://storage.googleapis.com/challenge_junior/categoria.parquet"
file_name = "categoria.parquet"

# Inicializar a sessão do Spark
spark = SparkSession.builder.appName("DownloadParquet").getOrCreate()

def save_parquet_to_landing(parquet_url, file_name):
    """
    Baixa um arquivo Parquet da URL fornecida e o salva na pasta /mnt/landing.

    Args:
        parquet_url (str): URL do arquivo Parquet.
        file_name (str): Nome do arquivo Parquet a ser salvo.

    Returns:
        None
    """
    landing_dir = "/mnt/landing"
    temp_file_path = f"/tmp/{file_name}"
    dbfs_temp_path = f"dbfs:/tmp/{file_name}"
    destination_path = os.path.join(landing_dir, file_name)
    
    try:
        # Baixa o arquivo Parquet da URL
        response = requests.get(parquet_url)
        if response.status_code == 200:
            # Salva o arquivo Parquet temporariamente no sistema local
            with open(temp_file_path, 'wb') as file:
                file.write(response.content)
            print(f"Arquivo Parquet baixado e salvo temporariamente em {temp_file_path}")
            
            # Copia o arquivo Parquet para o DBFS
            dbutils.fs.cp(f"file:{temp_file_path}", dbfs_temp_path)
            print(f"Arquivo Parquet copiado para o DBFS em {dbfs_temp_path}")
            
            # Lê o arquivo Parquet no DBFS em um DataFrame do PySpark
            df = spark.read.parquet(dbfs_temp_path)
            
            # Salva o DataFrame como Parquet no DBFS na pasta /mnt/landing
            df.write.mode("overwrite").parquet(destination_path)
            print(f"Arquivo Parquet salvo em {destination_path}")
            
            # Remove o arquivo temporário tanto no local quanto no DBFS
            os.remove(temp_file_path)
            dbutils.fs.rm(dbfs_temp_path)
        else:
            print(f"Falha ao baixar o arquivo Parquet: {response.status_code}")
            response.raise_for_status()
    except Exception as e:
        print(f"Erro ao processar o arquivo Parquet: {e}")


save_parquet_to_landing(parquet_url, file_name)


In [0]:
import psycopg2
from psycopg2 import sql
import pandas as pd

# Configurações para conectar ao banco de dados PostgreSQL
postgres_host = dbutils.secrets.get('bix_data_scope', 'postgres-host')
postgres_user = dbutils.secrets.get('bix_data_scope', 'postgres-user')
postgres_password = dbutils.secrets.get('bix_data_scope', 'postgres-password')
postgres_port = dbutils.secrets.get('bix_data_scope', 'postgres-port')
postgres_db = dbutils.secrets.get('bix_data_scope', 'postgres-db')

postgres_conn = {
    "host": postgres_host,
    "user": postgres_user,
    "password": postgres_password,
    "port": postgres_port,
    "dbname": postgres_db
}


def extract_and_save_sales_data(postgres_conn, csv_path):
    """
    Extrai dados da tabela de vendas do banco de dados PostgreSQL e os salva como um arquivo CSV.

    Args:
        postgres_conn (dict): Dicionário com os parâmetros de conexão ao banco de dados PostgreSQL.
        csv_path (str): Caminho no DBFS onde o arquivo CSV será salvo.

    Returns:
        None
    """
    conn = None
    cursor = None
    try:
        # Conectar ao banco de dados PostgreSQL
        conn = psycopg2.connect(**postgres_conn)
        cursor = conn.cursor()
        
        # Executar consulta SQL para extrair dados
        query_sales = sql.SQL("SELECT * FROM public.venda")
        cursor.execute(query_sales)
        rows = cursor.fetchall()
        col_names = [desc[0] for desc in cursor.description]
        
        # Criar DataFrame com os dados extraídos
        df = pd.DataFrame(rows, columns=col_names)
        
        # Salvar o DataFrame como um arquivo CSV localmente
        local_csv_path = "/tmp/sales_data.csv"
        df.to_csv(local_csv_path, index=False, header=True)
        print(f"Dados salvos como CSV localmente em {local_csv_path}")

        # Copiar o arquivo CSV para o DBFS
        dbfs_csv_path = f"{csv_path}"
        dbutils.fs.cp(f"file:{local_csv_path}", dbfs_csv_path)
        print(f"Arquivo CSV copiado para o DBFS em {dbfs_csv_path}")
        
    except Exception as e:
        print(f"Erro ao extrair e salvar dados do banco de dados: {e}")
    finally:
        # Fechar o cursor e a conexão com o banco de dados
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# Caminho do arquivo CSV na pasta /mnt/landing
csv_path = "/mnt/landing/sales_data.csv"


extract_and_save_sales_data(postgres_conn, csv_path)

In [0]:
# Listar arquivos no container 'landing'
files = dbutils.fs.ls("/mnt/landing/")
for file in files:
    print(file.path)