# Configuração do ambiente

## Import das bibliotecas

In [0]:
import os
import requests
import subprocess

from loguru import logger
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from urllib import request

## Definição dos links e diretórios

In [0]:
# Mapeia o diretório que está no Bucket S3 com os arquivos do projeto
dir_s3_dados = r's3a://databricks-workspace-stack-edf2c-bucket/unity-catalog/3805457818561400/case-tecnico-analise-dados-ifood/dados'
dir_s3_dados_brutos = f'{dir_s3_dados}/brutos'
dir_s3_dados_processados = f'{dir_s3_dados}/processados'


# Dados

## Download

Primeiro é feita uma verificação se o dataset já foi salvo no diretório de dados brutos.
Caso o arquivo já esteja lá, não é feito o download novamente.

### Pedidos

In [0]:
# Verificação se os arquivos já estão no diretório de dados brutos
if any(f for f in dbutils.fs.ls(dir_s3_dados_processados) if "pedidos" in f.path):
    logger.info(f'Arquivo processado de pedidos já existe')
else:
    logger.info('Arquivo processado de pedidos não existe, baixando do diretório de dados brutos')

    # Tipos de dados da tabela de pedidos
    estrutura_pedidos = StructType([
        StructField('cpf', StringType(), True),
        StructField('customer_id', StringType(), True),
        StructField('customer_name', StringType(), True),

        StructField('delivery_address_city', StringType(), True),
        StructField('delivery_address_country', StringType(), True),
        StructField('delivery_address_district', StringType(), True),
        StructField('delivery_address_external_id', StringType(), True),
        StructField('delivery_address_latitude', StringType(), True),
        StructField('delivery_address_longitude', StringType(), True),
        StructField('delivery_address_state', StringType(), True),
        StructField('delivery_address_zip_code', StringType(), True),

        StructField('items', StringType(), True),

        StructField('merchant_id', StringType(), True),
        StructField('merchant_latitude', StringType(), True),
        StructField('merchant_longitude', StringType(), True),
        StructField('merchant_timezone', StringType(), True),

        StructField('order_created_at', TimestampType(), True),
        StructField('order_id', StringType(), True),
        StructField('order_scheduled', BooleanType(), True),
        StructField('order_total_amount', DoubleType(), True),
        StructField('origin_platform', StringType(), True),
        StructField('order_scheduled_date', TimestampType(), True)
    ])

    # Lê o arquivo do diretório de dados brutos
    pedidos = (
      spark.read
      .schema(estrutura_pedidos)
    #   .option("multiline", True)
      .json(f"{dir_s3_dados_brutos}/order.json.gz")
    )
    logger.info(f'Foram lidas {pedidos.count()} linhas do arquivo de dados brutos')

    # Converte campos de latitude e longitude para flot
    pedidos = (
        pedidos
        .withColumn("delivery_address_latitude", col("delivery_address_latitude").cast("double"))
        .withColumn("delivery_address_longitude", col("delivery_address_longitude").cast("double"))
        .withColumn("merchant_latitude", col("merchant_latitude").cast("double")) 
        .withColumn("merchant_longitude", col("merchant_longitude").cast("double"))
    )

    # Escreve o arquivo .parquet no diretório de dados processados
    (
        pedidos
        .write
        .mode("overwrite")
        .parquet(f'{dir_s3_dados_processados}/pedidos/')
    )
    logger.success(f'Arquivo processado de pedidos criado com sucesso')


### Usuários

In [0]:
# Verificação se os arquivos já estão no diretório de dados brutos
if any(f for f in dbutils.fs.ls(dir_s3_dados_processados) if "usuarios" in f.path):
    logger.info(f'Arquivo processado de usuários já existe')
else:
    logger.info('Arquivo processado de usuários não existe, baixando do diretório de dados brutos')

    # Tipos de dados da tabela de usuários
    estrutura_usuarios = StructType([
        StructField('customer_id', StringType(), True),
        StructField('language', StringType(), True),
        StructField('created_at', TimestampType(), True),
        StructField('active', BooleanType(), True),
        StructField('customer_name', StringType(), True),
        StructField('customer_phone_area', StringType(), True),
        StructField('customer_phone_number', StringType(), True),
    ])

    # Lê o arquivo do diretório de dados brutos
    usuarios = (
      spark.read
      .option("header", True)
      .schema(estrutura_usuarios)
      .csv(f"{dir_s3_dados_brutos}/consumer.csv.gz")
    )
    logger.info(f'Foram lidas {usuarios.count()} linhas do arquivo de dados brutos')

    # Escreve o arquivo .parquet no diretório de dados processados
    (
        usuarios
        .write
        .mode("overwrite")
        .parquet(f'{dir_s3_dados_processados}/usuarios/')
    )

    logger.success(f'Arquivo processado de usuários criado com sucesso')


### Restaurantes

In [0]:
# Verificação se os arquivos já estão no diretório de dados brutos
if any(f for f in dbutils.fs.ls(dir_s3_dados_processados) if "restaurantes" in f.path):
    logger.info(f'Arquivo processado de restaurantes já existe')
else:
    logger.info('Arquivo processado de restaurantes não existe, baixando do diretório de dados brutos')

    # Tipos de dados da tabela de restaurantes
    estrutura_restaurantes = StructType([
        StructField('id', StringType(), True),
        StructField('created_at', TimestampType(), True),
        StructField('enabled', BooleanType(), True),
        StructField('price_range', IntegerType(), True),
        StructField('average_ticket', DoubleType(), True),
        StructField('takeout_time', DoubleType(), True),
        StructField('delivery_time', DoubleType(), True),
        StructField('minimum_order_value', DoubleType(), True),
        StructField('merchant_zip_code', StringType(), True),
        StructField('merchant_city', StringType(), True),
        StructField('merchant_state', StringType(), True),
        StructField('merchant_country', StringType(), True)
    ])

    # Lê o arquivo do diretório de dados brutos
    restaurantes = (
      spark.read
      .option("header", True)
      .schema(estrutura_restaurantes)
      .csv(f"{dir_s3_dados_brutos}/restaurant.csv.gz")
    )
    logger.info(f'Foram lidas {restaurantes.count()} linhas do arquivo de dados brutos')

    # Escreve o arquivo .parquet no diretório de dados processados
    (
        restaurantes
        .write
        .mode("overwrite")
        .parquet(f'{dir_s3_dados_processados}/restaurantes/')
    )

    logger.success(f'Arquivo processado de restaurantes criado com sucesso')


In [0]:
display(restaurantes.limit(100))

### Usuários teste A/B

In [0]:
# Verificação se os arquivos já estão no diretório de dados brutos
if any(f for f in dbutils.fs.ls(dir_s3_dados_processados) if "usuarios_teste_ab" in f.path):
    logger.info(f'Arquivo processado de usuários do teste A/B já existe')
else:
    logger.info('Arquivo processado de usuários do teste A/B não existe, baixando do diretório de dados brutos')

    # Tipos de dados da tabela de usuários do teste A/B
    estrutura_usuarios_teste_ab = StructType([
        StructField("customer_id", StringType(), True),
        StructField("is_target", StringType(), True)
    ])

    # Lê o CSV extraído
    usuarios_teste_ab = (
        spark.read
        .option("header", True)
        .schema(estrutura_usuarios_teste_ab)
        .csv(f"{dir_s3_dados_brutos}/ab_test_ref.csv")
    )
    logger.info(f'Foram lidas {usuarios_teste_ab.count()} linhas do arquivo de dados brutos')

    # Escreve o arquivo .parquet no diretório de dados processados
    (
        usuarios_teste_ab
        .write
        .mode("overwrite")
        .parquet(f'{dir_s3_dados_processados}/usuarios_teste_ab/')
    )

    logger.success(f'Arquivo processado de usuários do teste A/B criado com sucesso')