<a href="https://colab.research.google.com/github/NathanyApSalles/analysis_foodtech/blob/main/Case_Tecnico_DataAnalyst_Ifood.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importação das bases

In [110]:
import requests
import tarfile
import os
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

In [111]:
def read_file(url: str, local_path: str, type_file: str) -> DataFrame:
  """Função para ler arquivo e retornar um Dataframe."""
  if not os.path.exists(local_path):
    response = requests.get(url)
    with open(local_path, "wb") as f:
        f.write(response.content)
  if type_file == "json":
    return spark.read.json(local_path, multiLine=False)
  elif type_file == "csv":
    return spark.read.option("header", "true").csv(local_path)
  elif type_file == "tar":

    arquivos_extraidos = "/tmp/ab_test_ref"

    os.makedirs(arquivos_extraidos, exist_ok=True)

    with tarfile.open(local_path, "r:gz") as tar:
        tar.extractall(path=arquivos_extraidos)

    filename = ""
    for root, dirs, files in os.walk(arquivos_extraidos):
        for filename in files:
            print(filename) # print para visualizar todos os arquivos exraídos
    if ".csv" in filename: # se houver algum arquivo csv, junte todos os arquivos deste tipo no dataframe
      return spark.read.option("header", "true").csv(arquivos_extraidos + "/*.csv")
    else: # se houver algum arquivo json, junte todos os arquivos deste tipo no dataframe
      return spark.read.json(arquivos_extraidos + "/*.json")
      #pode acontecer de ter arquivos de diferentes tipos misturados, mas para este estudo vamos assumir que todos são do mesmo tipo


In [None]:
url_pedidos = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/order.json.gz"
url_usuarios = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/consumer.csv.gz"
url_merchants = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/restaurant.csv.gz"
url_test_ab = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz"

local_path_pedidos = "/tmp/order.json.gz"
local_path_usuarios = "/tmp/consumer.csv.gz"
local_path_merchants = "/tmp/restaurant.csv.gz"
local_path_teste_ab = "/tmp/ab_test_ref.tar.gz"

pedidos = read_file(url_pedidos, local_path_pedidos, "json").cache()
usuarios = read_file(url_usuarios, local_path_usuarios, "csv").cache()
merchants = read_file(url_merchants, local_path_merchants, "csv").cache()
teste_ab = read_file(url_test_ab, local_path_teste_ab, "tar").cache()

In [None]:
pedidos.show(5)

In [None]:
usuarios.show(5)

In [None]:
merchants.show(5)

In [None]:
teste_ab.show(5, truncate=False)

In [None]:
from pyspark.sql.functions import col, broadcast, when, count, isnan, count_distinct, sum, avg, row_number
from pyspark.sql.window import Window

# Entendendo os dados

In [None]:
def df_info(df: DataFrame, colunas: list) -> None:
  """Função para auxiliar a identificar tamanho da base, tipo das colunas,
   valores nulos, duplicidade em colunas específicas na base."""
  # validando o tipo das colunas
  df.printSchema()

  # validando o tamanho da base
  print(f"Qtd de linhas: {df.count()}")

  print("\nValores nulos")
  df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

  print("\nValidando valores duplicados")
  for column in colunas:
    num_duplicados = df.groupBy(column).count().where(col("count") > 1).count()
    print(f"\n{column}: {num_duplicados}")

In [None]:
df_info(usuarios, ["customer_id"])

In [None]:
df_info(pedidos, ["order_id"])

In [None]:
# removendo os usuários nulos da base
# na base de pedidos temos um número pequeno de pedidos sem customer_id atribuído,
# porém como o objetivo da análise é validar os resultados do teste a/b, estes pedidos serão removidos visto que
# não participaram do teste a/b
pedidos_validos_ab = pedidos.where(col("customer_id").isNotNull())

In [None]:
# podemos ver que a base de pedidos possui uma grande voliumentria de pedidos duplicados
# foi analisado uma amostra e notou-se que para os pedidos duplicados, os campos que diferente são CPF e data de criação do pedido
# pode ter acontecido algum problema no produto ao gerar o número do pedido, ou até mesmo ao gerar a base,
# diantes disto podemos seguir com algumas tratativas, como criar um novo order_id concatenando com o CPF, ou
# dentro dos duplicados manter o pedido mais atual, ou a mais antigo.
# Como os valores dos pedidos são iguais, merchants, itens também são idênticos, optou-se por manter o pedido mais antigo


pedidos_validos_ab = (pedidos_validos_ab
                      .withColumn("rank", row_number().over(Window.partitionBy("order_id").orderBy("order_created_at")))
                      .where(col("rank") == 1)
                      .drop("rank")
).cache()
df_info(pedidos_validos_ab, ["order_id"])

In [None]:
df_info(merchants, ["id"])

In [None]:
df_info(teste_ab, ["customer_id"])

# EDA

In [None]:
teste_ab.groupBy(col("is_target")).agg(count_distinct("customer_id")).show()

In [None]:
pedidos_final = pedidos_validos_ab.join(broadcast(teste_ab), ['customer_id'], "left").cache()
pedidos_final.show(5)

In [None]:
pedidos_final.where(col("is_target").isNull()).show()

In [None]:
#qtd de pedidos por usuário
df_media_pedidos = pedidos_final.groupBy(col("customer_id"), col("is_target")).agg(count_distinct("order_id"), count("order_id"))
df_media_pedidos.show(truncate=False)

In [None]:
pedidos_final.groupBy(col("is_target")).agg(
                                      count_distinct(col("order_id")).alias("distinct_orders"),
                                      count_distinct(col("customer_id")).alias("distinct_customers"),
                                      count_distinct(col("merchant_id")).alias("distinct_merchants"),
                                      sum(col("order_total_amount")).alias("order_total_amount"),
                                      avg(col("order_total_amount")).alias("order_avg_amount"),
                                      (sum(col("order_total_amount"))/count_distinct(col("order_id"))).alias("ticket_medio"),


                                      ).show()

- tarquet x control

    - qtd de pedidos
    - valor total da compra
    - ticket médio
    - tempo entre compras
    - recorrencia
    - tem diferença entre usuários ativos ou não?
    - expansão geográfica: qtd de estabelecimentos diferentes que compraram
    - restaurantes que mais venderam
    - região que mais vendeu
    - horário das compras
    - qtd de produtos adquiridos

- ideias para teste ab:

    - qtd de usuários que receberam o cupom, para entender conversão, de quem recebeu, quem não comprou;
     

In [None]:
pedidos_final.where((col("customer_id") == "02e1faf7e89415736be3a37c70a2015641a9d652e0fa478b1b2975ce45dfe539") & (col("order_id") == "806884e91080e3519aaf60459c08ec179df5ff5aacab3fde085f7073a4254aa8")).show(30, truncate=False)

In [None]:
duplicados = pedidos_final.groupBy("order_id").count().filter("count > 1").count()

In [None]:
print(duplicados)