
# 1. Imports

In [0]:
import datetime
import os
import sys
import subprocess
import tempfile


# 2. Kaggle Autenticação

> Teste Prático -> Escolha um conjunto de dados do Kaggle relacionado a vendas. Certifique-se de que o conjunto de dados inclui informações como datas, produtos, quantidades vendidas, etc.

> Teste Prático -> Carregue o conjunto de dados no Databricks.

Devido a pequena quantidade de dados deste DataSet, preferi colocar os dados no próprio dbfs. Em outras situações, com grande volumetria de dados, poderia ter utilizado um serviço de storage em cloud mais adequado [eg: Blob Storage (Azure) ou S3 (AWS)]

In [0]:
json_file_path = 'dbfs:/FileStore/KaggleToken/kaggle.json'

spark_json_df = spark.read.format('json').option('header', 'true').option('inferschema', 'true').load(json_file_path)

KAGGLE_USERNAME = spark_json_df.select(spark_json_df.username).take(1)[0]['username']
KAGGLE_KEY= spark_json_df.select(spark_json_df.key).take(1)[0]['key']

KAGGLE_DATASET_NAME = "olistbr/brazilian-ecommerce"
DBFS_DEST_PATH = "dbfs:/FileStore/Datum/KaggleOlistData/bronze"
DBFS_LAST_UPDATE_OLIST_DATASET = "dbfs:/FileStore/Datum/KaggleOlistLastUpdate/last_update.txt"
PREVIOUS_UPDATE = datetime.datetime(2020, 10, 1, 19, 8, 27)

In [0]:
def kaggle_auth(KAGGLE_USERNAME, KAGGLE_KEY):
    os.environ["KAGGLE_USERNAME"] = KAGGLE_USERNAME
    os.environ["KAGGLE_KEY"] = KAGGLE_KEY

    from kaggle.api.kaggle_api_extended import KaggleApi
    
    api = KaggleApi()
    api.authenticate()

    print("Auth success")

    return api

In [0]:
try:
    api = kaggle_auth(KAGGLE_USERNAME, KAGGLE_KEY)
except Exception as e:
    print(e)
    print("Installing kaggle module...")

    subprocess.check_call([sys.executable, "-m", "pip", "install", "kaggle"])
    api = kaggle_auth(KAGGLE_USERNAME, KAGGLE_KEY)

No module named 'kaggle'
Installing kaggle module...
Collecting kaggle
  Using cached kaggle-1.6.3-py3-none-any.whl
Collecting tqdm
  Using cached tqdm-4.66.1-py3-none-any.whl (78 kB)
Collecting python-slugify
  Using cached python_slugify-8.0.2-py2.py3-none-any.whl (10 kB)
Collecting text-unidecode>=1.3
  Using cached text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
Installing collected packages: text-unidecode, tqdm, python-slugify, kaggle
Successfully installed kaggle-1.6.3 python-slugify-8.0.2 text-unidecode-1.3 tqdm-4.66.1



[notice] A new release of pip available: 22.3.1 -> 23.3.2
[notice] To update, run: pip install --upgrade pip


Auth success


In [0]:
del spark_json_df


# 3. Funções Auxiliares

In [0]:
def obter_informacoes_dataset(api, nome_completo_dataset):
    try:
        datasets = api.dataset_list(search=nome_completo_dataset)
        for dataset in datasets:
            if dataset.ref == nome_completo_dataset:
                return dataset
        print(f"Dataset {nome_completo_dataset} não encontrado.")
        return None
    except Exception as e:
        print(f"Erro ao recuperar informações do dataset: {e}")
        return None

In [0]:
def verificar_existencia_arquivos(caminho_dbfs):
    try:
        if dbutils.fs.ls(caminho_dbfs):
            print(f'--> Camada bronze "{caminho_dbfs}" existe.')
            return True
    except Exception as e:
        print(f'--> Camada bronze "{caminho_dbfs}" não existe.')
        return False

In [0]:
def salvar_data_ultima_atualizacao(caminho_arquivo, data_atualizacao):
    caminho = caminho_arquivo.replace("dbfs:/", "/dbfs/")
    with open(caminho, "w") as arquivo:
        arquivo.write(data_atualizacao.strftime("%Y-%m-%d %H:%M:%S"))

In [0]:
def ler_data_atualizacao(caminho_arquivo, data_padrao):
    caminho = caminho_arquivo.replace("dbfs:/", "/dbfs/")
    try:
        with open(caminho, "r") as arquivo:
            data_str = arquivo.read()
            print(
                f'--> Watermark existe e possui valor {datetime.datetime.strptime(data_str, "%Y-%m-%d %H:%M:%S")}.'
            )
            return datetime.datetime.strptime(data_str, "%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(
            f'--> Watermark com última atualização do DataSet da Olist no Kaggle não encontrado em "{caminho}". Novo watermark será criado com o datetime {PREVIOUS_UPDATE}.'
        )
        salvar_data_ultima_atualizacao(caminho_arquivo, data_padrao)
        return data_padrao

In [0]:
def baixar_e_salvar_dataset(api, nome_dataset, caminho_dbfs):
    with tempfile.TemporaryDirectory() as tmpdir:
        api.dataset_download_files(nome_dataset, path=tmpdir, unzip=True)

        arquivos = os.listdir(tmpdir)
        for arquivo in arquivos:
            caminho_arquivo = os.path.join(tmpdir, arquivo)
            caminho_destino = os.path.join(caminho_dbfs, arquivo)
            dbutils.fs.cp(f"file:{caminho_arquivo}", caminho_destino)
            print(f"* Arquivo {arquivo} salvo em {caminho_destino}")

In [0]:
def verificar_atualizacao_e_download(api, nome_dataset, caminho_dbfs, data_anterior):
    dataset = obter_informacoes_dataset(api, nome_dataset)

    if dataset:
        data_ultima_atualizacao = dataset.lastUpdated
        arquivos_existem = verificar_existencia_arquivos(caminho_dbfs)

        data_anterior = ler_data_atualizacao(
            DBFS_LAST_UPDATE_OLIST_DATASET, data_anterior
        )
        necessita_atualizacao = data_ultima_atualizacao > data_anterior

        if necessita_atualizacao or not arquivos_existem:
            if necessita_atualizacao:
                print(
                    f"--> Dataset atualizado após {data_anterior}, iniciando download..."
                )
            else:
                print(
                    f"--> Iniciando download dos arquivos do DataSet {KAGGLE_DATASET_NAME}..."
                )
            baixar_e_salvar_dataset(api, nome_dataset, caminho_dbfs)

            salvar_data_ultima_atualizacao(
                DBFS_LAST_UPDATE_OLIST_DATASET, data_ultima_atualizacao
            )
        else:
            raise Exception(
                "--> Dataset está atualizado e todos os arquivos da bronze já existem no DBFS. Esta Exception é customizada para parar o Workflow."
            )
    else:
        print("--> Não foi possível obter as informações do dataset.")


# 4. Verificando Atualizações

Se houver atualização no dataset, faremos o download dos novos arquivos e atualizaremos a watermark.

In [0]:
verificar_atualizacao_e_download(api, KAGGLE_DATASET_NAME, DBFS_DEST_PATH, PREVIOUS_UPDATE)

--> Camada bronze "dbfs:/FileStore/Datum/KaggleOlistData/bronze" não existe.
--> Watermark existe e possui valor 2021-10-01 19:08:27.
--> Iniciando download dos arquivos do DataSet olistbr/brazilian-ecommerce...
* Arquivo olist_sellers_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_sellers_dataset.csv
* Arquivo olist_geolocation_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_geolocation_dataset.csv
* Arquivo olist_products_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_products_dataset.csv
* Arquivo olist_order_items_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_order_items_dataset.csv
* Arquivo olist_orders_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_orders_dataset.csv
* Arquivo olist_customers_dataset.csv salvo em dbfs:/FileStore/Datum/KaggleOlistData/bronze/olist_customers_dataset.csv
* Arquivo olist_order_reviews_dataset.csv salvo em dbfs:/FileStore/Datum