### Imports

In [0]:
import requests
import os
import tarfile
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, IntegerType, StringType
from scipy.stats import ttest_ind, chi2_contingency
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from scipy.stats import skew, kurtosis
import seaborn as sns

### Funções

In [0]:
def baixar_arquivos(urls, dbfs_dir="/FileStore/tables/", timeout=60):
    """
    Download each URL to the driver's local /tmp and copy the file into DBFS.

    Parameters:
        urls (list): URLs of the files to download. The text after the last
            "/" of each URL is used as the file name.
        dbfs_dir (str, optional): Destination directory in DBFS. A trailing
            slash is accepted and ignored when building the destination path.
        timeout (float, optional): Seconds passed to ``requests.get`` as the
            connect/read timeout, so a stalled server cannot hang the cell.

    Returns:
        None. Progress and errors are reported with ``print``; a URL answering
        with a status other than 200 is reported and skipped.
    """
    dbutils.fs.mkdirs(dbfs_dir)
    # Drop trailing slashes so the joined path never contains "//"
    # (the default value itself ends with "/").
    dbfs_base = dbfs_dir.rstrip("/")

    for url in urls:
        nome_arquivo = url.split("/")[-1]
        local_tmp_path = f"/tmp/{nome_arquivo}"
        dbfs_path = f"{dbfs_base}/{nome_arquivo}"
        print(f"Baixando: {url}")
        # The context manager releases the streamed connection even when the
        # download is skipped or fails midway.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Erro ao baixar {url} - Status code: {response.status_code}")
                continue
            with open(local_tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"Salvo em: {local_tmp_path}")
        # "file:" makes dbutils read from the driver's local filesystem.
        dbutils.fs.cp(f"file:{local_tmp_path}", dbfs_path)
        print(f"Copiado para DBFS: {dbfs_path}\n")


def metricas_iqr(df, coluna, fator_iqr=1.5, erro_relativo=0.01):
    """
    Compute IQR-based outlier metrics for a numeric column of a Spark DataFrame.

    The quartiles (Q1, median, Q3) are obtained with ``approxQuantile``, the
    outlier fences are derived from the interquartile range, and the rows
    falling outside the fences are returned.

    Parameters:
        df (DataFrame): Spark DataFrame containing the column to analyse.
        coluna (str): Name of the numeric column.
        fator_iqr (float, optional): Multiplier applied to the IQR to build the
            lower/upper fences. Defaults to 1.5.
        erro_relativo (float, optional): Relative error passed to
            ``approxQuantile``. Defaults to 0.01.

    Returns:
        DataFrame: Rows of ``df`` whose ``coluna`` value is below the lower
        fence or above the upper fence.

    Raises:
        ValueError: If the quartiles cannot be computed because
            ``approxQuantile`` did not return three values (e.g. the column
            has no non-null values).

    Prints:
        Q1, median, Q3, the IQR, both fences and the number of outlier rows.
    """
    quantis = df.approxQuantile(coluna, [0.25, 0.5, 0.75], erro_relativo)
    # approxQuantile yields an empty list when there is nothing to summarise;
    # fail with a clear message instead of an opaque unpacking error.
    if len(quantis) != 3:
        raise ValueError(
            f"Não foi possível calcular os quartis de '{coluna}': "
            "coluna vazia ou apenas com valores nulos/NaN."
        )
    q1, median, q3 = quantis
    iqr = q3 - q1

    lower_bound = q1 - fator_iqr * iqr
    upper_bound = q3 + fator_iqr * iqr

    df_outliers = df.filter((col(coluna) < lower_bound) | (col(coluna) > upper_bound))

    # Note: count() triggers a Spark job over the filtered DataFrame.
    print(
        f"Q1: {q1}\n"
        f"Median: {median}\n"
        f"Q3: {q3}\n"
        f"IQR: {iqr}\n"
        f"Limites: inferior={lower_bound:.2f}, superior={upper_bound:.2f}\n"
        f"Número de outliers: {df_outliers.count()}"
    )

    return df_outliers


def extract_all_files(tar_file_path, extract_to):
    """
    Extract every file of a .tar.gz stored in DBFS into a DBFS directory.

    The archive is copied from DBFS to a fresh temporary directory on the
    driver, extracted there, and the extracted tree is copied back to DBFS.
    A fresh directory is used on every call so files left over from a previous
    extraction are never copied along, and the temporary data is removed
    afterwards.

    Parameters:
        tar_file_path (str): Full path of the .tar.gz file in DBFS.
        extract_to (str): DBFS directory that receives the extracted files.

    Raises:
        ValueError: If an archive member would be written outside the
            extraction directory (path traversal).
    """
    import shutil
    import tempfile

    work_dir = tempfile.mkdtemp(prefix="extract_all_files_")
    try:
        local_archive = os.path.join(work_dir, "temp_file.tar.gz")
        local_extract_dir = os.path.realpath(os.path.join(work_dir, "extracted"))
        os.makedirs(local_extract_dir)

        dbutils.fs.cp(tar_file_path, f"file:{local_archive}")
        with tarfile.open(local_archive, 'r:gz') as tar:
            # The archive comes from an external download: refuse members whose
            # resolved path escapes the extraction directory.
            for membro in tar.getmembers():
                alvo = os.path.realpath(os.path.join(local_extract_dir, membro.name))
                if os.path.commonpath([local_extract_dir, alvo]) != local_extract_dir:
                    raise ValueError(f"Entrada insegura no arquivo tar: {membro.name}")
            # Use the stdlib "data" extraction filter when this Python has it.
            extra_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
            tar.extractall(local_extract_dir, **extra_kwargs)
        dbutils.fs.cp(f"file:{local_extract_dir}/", extract_to, recurse=True)
    finally:
        # Always clean the driver's temporary files, even on failure.
        shutil.rmtree(work_dir, ignore_errors=True)

    print(f"Arquivos extraídos para: {extract_to}")


### Downloads

In [0]:
# Bucket that hosts the raw dataset files used throughout the notebook
base_url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com"
nomes_arquivos = (
    "order.json.gz",
    "ab_test_ref.tar.gz",
    "consumer.csv.gz",
    "restaurant.csv.gz",
)
urls = [f"{base_url}/{nome}" for nome in nomes_arquivos]

# Download everything into the project's dataset folder in DBFS
baixar_arquivos(urls, dbfs_dir="/FileStore/iFood/dataset")

Baixando: https://data-architect-test-source.s3-sa-east-1.amazonaws.com/order.json.gz
Salvo em: /tmp/order.json.gz
Copiado para DBFS: /FileStore/iFood/dataset/order.json.gz

Baixando: https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz
Salvo em: /tmp/ab_test_ref.tar.gz
Copiado para DBFS: /FileStore/iFood/dataset/ab_test_ref.tar.gz

Baixando: https://data-architect-test-source.s3-sa-east-1.amazonaws.com/consumer.csv.gz
Salvo em: /tmp/consumer.csv.gz
Copiado para DBFS: /FileStore/iFood/dataset/consumer.csv.gz

Baixando: https://data-architect-test-source.s3-sa-east-1.amazonaws.com/restaurant.csv.gz
Salvo em: /tmp/restaurant.csv.gz
Copiado para DBFS: /FileStore/iFood/dataset/restaurant.csv.gz



In [0]:
# Unpack the A/B test reference archive into its own DBFS folder
extract_all_files(
    "/FileStore/iFood/dataset/ab_test_ref.tar.gz",
    "/FileStore/iFood/dataset/extracted/",
)

Arquivos extraídos para: /FileStore/iFood/dataset/extracted/


### Reads

In [0]:
# Consumers: gzip-compressed CSV with header, column types inferred by Spark
consumer_raw = (
    spark.read
    .options(compression="gzip", header=True, inferSchema=True)
    .csv("/FileStore/iFood/dataset/consumer.csv.gz")
)

# Restaurants: same layout as the consumer file
restaurant = (
    spark.read
    .options(compression="gzip", header=True, inferSchema=True)
    .csv("/FileStore/iFood/dataset/restaurant.csv.gz")
)

# Orders: gzip-compressed JSON
order_raw = (
    spark.read
    .options(compression="gzip")
    .json("/FileStore/iFood/dataset/order.json.gz")
)

# A/B test reference: plain CSV extracted from the tar archive
ab_test = spark.read.csv(
    "/FileStore/iFood/dataset/extracted/ab_test_ref.csv",
    header=True,
    inferSchema=True,
)

### Tratamento dos dados

#### Dados do teste AB

In [0]:
# Check whether any customer appears in control and treatment at the same time,
# i.e. has more than one row in the A/B reference table
linhas_por_cliente_ab = ab_test.groupBy("customer_id").agg(count("*").alias("n_rows"))
(
    linhas_por_cliente_ab
    .orderBy(col("n_rows").desc())
    .where(col("n_rows") > 1)
    .display()
)

customer_id,n_rows


#### Dados de Pedidos

##### Checagem dos dados 

In [0]:
# Count distinct orders whose customer_id is null
pedidos_sem_cliente = order_raw.where(col("customer_id").isNull())
pedidos_sem_cliente.agg(countDistinct("order_id").alias("total_orders")).display()

total_orders
5559


In [0]:
# Find order_id values that occur more than once in the raw orders table
check_orders_duplicadas = (
    order_raw
    .groupBy("order_id")
    .agg(count("*").alias("n_rows"))
    .where(col("n_rows") > 1)
)

# Summarise how many orders are repeated, by number of repetitions
(
    check_orders_duplicadas
    .groupBy("n_rows")
    .agg(countDistinct("order_id").alias("total_orders"))
    .display()
)

n_rows,total_orders
2,1237852


In [0]:
# Count distinct orders whose total amount is zero
pedidos_valor_zero = order_raw.where(col("order_total_amount") == 0)
pedidos_valor_zero.agg(countDistinct("order_id").alias("total_orders")).display()


total_orders
102


##### Remoção de duplicatas, informações sensíveis e orders com customer_id nulo ou valor total zerado

In [0]:
# Rank the rows of each order_id from newest to oldest creation timestamp so
# that only the most recent one is kept.
# NOTE(review): rows of the same order that tie on the timestamp are ranked
# arbitrarily by row_number(); add a tie-breaker column if that matters.
wind_spec = Window.partitionBy("order_id").orderBy(col("order_reference_timestamp").desc())

# ISO-8601 pattern for the raw timestamp strings. `X` parses the trailing "Z"
# as a UTC offset; quoting it as a literal ('Z') would silently interpret the
# values in the Spark session time zone instead.
formato_ts_utc = "yyyy-MM-dd'T'HH:mm:ss.SSSX"

orders = (
    order_raw.alias("o")
    # Remove sensitive personal data before any further processing
    .drop("cpf", "customer_name")
    # Collapse rows that are exact copies of each other
    .distinct()
    # Discard orders without a customer and orders with a zero total
    .filter(
        (col("customer_id").isNotNull()) &
        (col("order_total_amount") != 0)
    )
    # Keep only orders from customers that took part in the A/B test
    .join(
        ab_test.alias("ab"),
        col("ab.customer_id") == col("o.customer_id"),
        "inner"
    )
    .withColumns({
        "order_reference_timestamp": to_timestamp("order_created_at", formato_ts_utc),
        "order_scheduled_date_ts": to_timestamp("order_scheduled_date", formato_ts_utc)
    })
    # De-duplicate: one row per order_id, the most recently created
    .withColumn("rn", row_number().over(wind_spec))
    .filter(col("rn") == 1)
    .select(
        # Qualified with the orders alias because both join sides have customer_id
        "o.customer_id",
        "is_target",
        "delivery_address_city",
        "delivery_address_country",
        "delivery_address_district",
        "delivery_address_external_id",
        "delivery_address_latitude",
        "delivery_address_longitude",
        "delivery_address_state",
        "delivery_address_zip_code",
        "items",
        "merchant_id",
        "merchant_latitude",
        "merchant_longitude",
        "merchant_timezone",
        "order_reference_timestamp",
        "order_id",
        "order_scheduled",
        "order_total_amount",
        "origin_platform",
        # Expose the parsed timestamp under the original column name
        col("order_scheduled_date_ts").alias("order_scheduled_date"))
)

##### Checagem de outliers

In [0]:
# Summary statistics of the order value after cleaning
orders.select("order_total_amount").describe().display()

summary,order_total_amount
count,2427313.0
mean,47.85628797775041
stddev,101.3985272559612
min,0.01
max,138750.9


In [0]:
# Orders flagged as outliers by the IQR rule on the order value
orders_outliers = metricas_iqr(orders, "order_total_amount")

Q1: 27.0
Median: 39.8
Q3: 57.4
IQR: 30.4
Limites: inferior=-18.60, superior=103.00
Número de outliers: 139572


O método IQR considera como outliers as orders com valor acima de R$103,00. No entanto, pedidos acima desse valor ainda são factíveis. Por isso, analisei separadamente o comportamento do grupo de outliers.

In [0]:
# Summary statistics restricted to the orders flagged as outliers
orders_outliers.select("order_total_amount").describe().display()

summary,order_total_amount
count,139572.0
mean,149.56600822514426
stddev,401.3824459386856
min,103.01
max,138750.9


In [0]:
# Re-apply the IQR rule inside the outlier group to see where its own fences fall
metricas_iqr(orders_outliers, "order_total_amount")

Q1: 113.6
Median: 129.5
Q3: 158.0
IQR: 44.400000000000006
Limites: inferior=47.00, superior=224.60
Número de outliers: 9916
Out[38]: DataFrame[customer_id: string, is_target: string, delivery_address_city: string, delivery_address_country: string, delivery_address_district: string, delivery_address_external_id: string, delivery_address_latitude: string, delivery_address_longitude: string, delivery_address_state: string, delivery_address_zip_code: string, items: string, merchant_id: string, merchant_latitude: string, merchant_longitude: string, merchant_timezone: string, order_reference_timestamp: timestamp, order_id: string, order_scheduled: boolean, order_total_amount: double, origin_platform: string, order_scheduled_date: timestamp]

Ao observar as métricas do grupo de outliers, o limite superior pelo método IQR (R$224,60) continua baixo quando comparado a valores factíveis de pedidos em delivery de restaurantes. \
Desse modo, optou-se por retirar da base os pedidos cujo order_total_amount é superior a R$1.000,00 (59 orders).


In [0]:
# Orders above this total (in R$) are dropped as unrealistic outliers
valor_maximo_pedido = 1000
df_orders = orders.where(col("order_total_amount") <= valor_maximo_pedido)

#### Dados de Restaurantes

In [0]:
# List restaurant ids that occur in more than one row
linhas_por_restaurante = restaurant.groupBy("id").agg(count("*").alias("n_rows"))
linhas_por_restaurante.where(col("n_rows") > 1).display()

id,n_rows


#### Dados de usuários

In [0]:
# List customer ids that occur in more than one row
linhas_por_consumidor = consumer_raw.groupBy("customer_id").agg(count("*").alias("n_rows"))
linhas_por_consumidor.where(col("n_rows") > 1).display()

customer_id,n_rows


In [0]:
# Drop the customer's name and phone number before persisting the table
df_consumer = consumer_raw.drop("customer_name", "customer_phone_number")

### Escritas

In [0]:
# Each treated table and the Delta location it is written to (overwritten on re-run)
destinos_delta = [
    (df_orders, "/FileStore/iFood/dataset/orders_tratado/"),
    (df_consumer, "/FileStore/iFood/dataset/consumer_tratado/"),
    (ab_test, "/FileStore/iFood/dataset/ab_tratado"),
    (restaurant, "/FileStore/iFood/dataset/restaurant_tratado"),
]

for tabela, caminho in destinos_delta:
    tabela.write.format("delta").mode("overwrite").save(caminho)

Obs.: Ao longo do código deixei alguns displays que não seriam necessários em um ambiente produtivo. 