<a href="https://colab.research.google.com/github/L-Chemelli/ALB_SEB_2020-1/blob/master/Teachable_hotmart_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparação do ambiente

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=80112ec5d1bebf658b7faf58e99b492be7f2e0af37336dda6c9ad3e9bee3c64b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
from datetime import date

In [None]:
# Spark session used by every cell below; getOrCreate() returns an active session if one exists.
spark = (
    SparkSession.builder
    .appName("hotmart_teachable_ae_test")
    .getOrCreate()
)

# Pergunta 1

## Preparação do código

In [107]:
# Question-1 `purchase` table: each purchase points to a product item via
# (prod_item_id, prod_item_partition). release_date may be NULL (purchase 58);
# the queries below exclude such rows.
schema = (
    StructType()
    .add("purchase_id", IntegerType(), True)
    .add("buyer_id", IntegerType(), True)
    .add("prod_item_id", IntegerType(), True)
    .add("order_date", DateType(), True)
    .add("release_date", DateType(), True)
    .add("producer_id", IntegerType(), True)
    .add("purchase_partition", IntegerType(), True)
    .add("prod_item_partition", IntegerType(), True)
)

# Sample rows: (purchase_id, buyer_id, prod_item_id, order_date, release_date,
#               producer_id, purchase_partition, prod_item_partition)
data = [
    (55, 15947, 5, date(2022, 12, 1), date(2022, 12, 1), 852852, 5, 5),
    (56, 369798, 746520, date(2022, 12, 25), date(2022, 12, 25), 963963, 6, 0),
    (57, 147, 98736, date(2021, 7, 3), date(2021, 7, 3), 963963, 7, 6),
    (58, 986533, 6565, date(2021, 10, 12), None, 200478, 8, 5),
]

purchase = spark.createDataFrame(data, schema=schema)

In [108]:
# Question-1 `product_item` table.
# prod_item_partition is IntegerType to match purchase.prod_item_partition and the
# integer sample values; a StringType here would silently stringify them and force an
# implicit cast in any join on the partition column.
schema = StructType([
    StructField("prod_item_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("item_quantity", IntegerType(), True),
    StructField("purchase_value", DoubleType(), True),
    StructField("prod_item_partition", IntegerType(), True),
])

# Sample rows: (prod_item_id, product_id, item_quantity, purchase_value, prod_item_partition)
data = [
    (1, 69, 5, 500.00, 5),
    (5, 69, 120, 1.00, 0),
    (98736, 37, 69, 25.00, 6),
    (3, 96, 369, 140.00, 5),
]

product_item = spark.createDataFrame(data, schema)

In [109]:
# Expose the question-1 DataFrames to spark.sql under their table names.
for view_name, frame in (("purchase", purchase), ("product_item", product_item)):
    frame.createOrReplaceTempView(view_name)

## Quais são os 50 maiores produtores em faturamento ($) de 2021?

In [110]:
# Top 50 producers by revenue among purchases released in 2021.
# - No DISTINCT: GROUP BY producer_id already yields one row per producer.
# - producer_id is a secondary sort key so LIMIT 50 is deterministic when revenues tie.
# - Revenue = purchase_value * item_quantity (assumes purchase_value is a unit price, as
#   everywhere else in this notebook); rows with NULL release_date are excluded.
resultado_1 = spark.sql('''
select pch.producer_id,
       sum(pdi.purchase_value * pdi.item_quantity) faturamento
from purchase pch
  inner join product_item pdi on pch.prod_item_id = pdi.prod_item_id
where pch.release_date is not null
  and year(pch.release_date) = 2021
group by pch.producer_id
order by faturamento desc, producer_id
limit 50
''')

resultado_1.show()

+-----------+-----------+
|producer_id|faturamento|
+-----------+-----------+
|     963963|     1725.0|
+-----------+-----------+



## Quais são os 2 produtos que mais faturaram ($) de cada produtor?


In [112]:
# Top 2 products by revenue for each producer.
# - product_id breaks ties inside row_number(), so the top-2 choice is deterministic.
# - No DISTINCT: GROUP BY already makes (producer_id, product_id) unique.
# NOTE(review): the question states no year; this keeps question 1's 2021 release_date
# filter — confirm that scope is intended.
resultado_2 = spark.sql('''
with faturamento_produto_produtor as (
select pch.producer_id,
       pdi.product_id,
       sum(pdi.purchase_value * pdi.item_quantity) faturamento
from purchase pch
  inner join product_item pdi on pch.prod_item_id = pdi.prod_item_id
where pch.release_date is not null
  and year(pch.release_date) = 2021
group by pch.producer_id, pdi.product_id
),
rank_faturamento as (
select producer_id,
       product_id,
       faturamento,
       row_number() over (partition by producer_id order by faturamento desc, product_id) posicao_faturamento
from faturamento_produto_produtor
)
select producer_id,
       product_id,
       faturamento
from rank_faturamento
where posicao_faturamento <= 2
order by producer_id, faturamento desc
''')

resultado_2.show()

+-----------+----------+-----------+
|producer_id|product_id|faturamento|
+-----------+----------+-----------+
|     963963|        37|     1725.0|
+-----------+----------+-----------+



# Pergunta 2

## Preparação do código

In [94]:
# Self-contained imports for this cell: DoubleType is needed below and was previously
# only available because the question-1 import cell had run first.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DateType, DecimalType, DoubleType
from datetime import datetime


def to_datetime(date_str):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime."""
    return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")


def to_date(date_str):
    """Parse a 'YYYY-MM-DD' string into a date."""
    return datetime.strptime(date_str, "%Y-%m-%d").date()


# Schema of the first source: purchase (one row per update of a purchase)
schema1 = StructType([
    StructField("transaction_datetime", TimestampType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("purchase_id", IntegerType(), True),
    StructField("buyer_id", IntegerType(), True),
    StructField("prod_item_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("release_date", DateType(), True),
    StructField("producer_id", IntegerType(), True)
])

# Sample rows for the first source, already converted to Python date/datetime values
purchase = [
    (to_datetime("2023-01-20 22:00:00"), to_date("2023-01-20"), 55, 15947, 5, to_date("2023-01-20"), to_date("2023-01-20"), 852852),
    (to_datetime("2023-01-26 00:01:00"), to_date("2023-01-26"), 56, 369798, 746520, to_date("2023-01-25"), None, 963963),
    (to_datetime("2023-02-05 10:00:00"), to_date("2023-02-05"), 55, 160001, 5, to_date("2023-01-20"), to_date("2023-01-20"), 852852),
    (to_datetime("2023-02-26 03:00:00"), to_date("2023-02-26"), 69, 160001, 18, to_date("2023-02-26"), to_date("2023-02-28"), 96967),
    (to_datetime("2023-07-15 09:00:00"), to_date("2023-07-15"), 55, 160001, 5, to_date("2023-01-20"), to_date("2023-03-01"), 852852)
]

df_purchase = spark.createDataFrame(purchase, schema=schema1)

# Schema of the second source: product_item
schema2 = StructType([
    StructField("transaction_datetime", TimestampType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("purchase_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("item_quantity", IntegerType(), True),
    StructField("purchase_value", DoubleType(), True)
])

# Sample rows for the second source
product_item = [
    (to_datetime("2023-01-20 22:02:00"), to_date("2023-01-20"), 55, 696969, 10, 50.00),
    (to_datetime("2023-01-25 23:59:59"), to_date("2023-01-25"), 56, 808080, 120, 2400.00),
    (to_datetime("2023-02-26 03:00:00"), to_date("2023-02-26"), 69, 373737, 2, 2000.00),
    (to_datetime("2023-07-12 09:00:00"), to_date("2023-07-12"), 55, 696969, 10, 55.00)
]

df_product_item = spark.createDataFrame(product_item, schema=schema2)

# Schema of the third source: purchase_extra_info
schema3 = StructType([
    StructField("transaction_datetime", TimestampType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("purchase_id", IntegerType(), True),
    StructField("subsidiary", StringType(), True)
])

# Sample rows for the third source
purchase_extra_info = [
    (to_datetime("2023-01-23 00:05:00"), to_date("2023-01-23"), 55, "nacional"),
    (to_datetime("2023-01-25 23:59:59"), to_date("2023-01-25"), 56, "internacional"),
    (to_datetime("2023-02-28 01:10:00"), to_date("2023-02-28"), 69, "nacional"),
    (to_datetime("2023-03-12 07:00:00"), to_date("2023-03-12"), 69, "internacional")
]

df_purchase_extra_info = spark.createDataFrame(purchase_extra_info, schema=schema3)

# Display the three sources
df_purchase.show(truncate=False)
df_product_item.show(truncate=False)
df_purchase_extra_info.show(truncate=False)


+--------------------+----------------+-----------+--------+------------+----------+------------+-----------+
|transaction_datetime|transaction_date|purchase_id|buyer_id|prod_item_id|order_date|release_date|producer_id|
+--------------------+----------------+-----------+--------+------------+----------+------------+-----------+
|2023-01-20 22:00:00 |2023-01-20      |55         |15947   |5           |2023-01-20|2023-01-20  |852852     |
|2023-01-26 00:01:00 |2023-01-26      |56         |369798  |746520      |2023-01-25|NULL        |963963     |
|2023-02-05 10:00:00 |2023-02-05      |55         |160001  |5           |2023-01-20|2023-01-20  |852852     |
|2023-02-26 03:00:00 |2023-02-26      |69         |160001  |18          |2023-02-26|2023-02-28  |96967      |
|2023-07-15 09:00:00 |2023-07-15      |55         |160001  |5           |2023-01-20|2023-03-01  |852852     |
+--------------------+----------------+-----------+--------+------------+----------+------------+-----------+

+--------

## ETL

In [95]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Extract example (in production the three sources would be read from tables):
# df_purchase = spark.read.table('purchase')
# df_product_item = spark.read.table('product_item')
# df_purchase_extra_info = spark.read.table('purchase_extra_info')

# Every source is keyed by (transaction_date, purchase_id).
KEY_COLUMNS = ["transaction_date", "purchase_id"]


def latest_update_per_day(df):
    """Keep only the last update (by transaction_datetime) of each purchase_id per transaction_date.

    Returns a new DataFrame without the transaction_datetime column. The input DataFrame
    is not reassigned, so this cell can be re-run safely.
    """
    daily_window = (
        Window.partitionBy("purchase_id", "transaction_date")
        .orderBy(F.col("transaction_datetime").desc())
    )
    return (
        df.withColumn("rank", F.row_number().over(daily_window))
          .filter(F.col("rank") == 1)
          .drop("rank", "transaction_datetime")
    )


def fill_nulls(df, presence_col=None):
    """Forward-fill values per purchase_id, ordered by transaction_date.

    Args:
        df: DataFrame with transaction_date, purchase_id and value columns.
        presence_col: optional boolean column that is True on rows where the source really
            had a record. When given, the whole record from the most recent such row is
            carried forward. A NULL written by the source itself (e.g. release_date not yet
            set) is therefore kept instead of being replaced by an older value. presence_col
            is dropped from the result. When None, each column is filled independently with
            its last non-null value.

    Returns:
        A new DataFrame with the key columns followed by the value columns.
    """
    value_columns = [c for c in df.columns if c not in KEY_COLUMNS and c != presence_col]
    fill_window = (
        Window.partitionBy("purchase_id")
        .orderBy("transaction_date")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    if presence_col is None:
        df_filled = df
        for column in value_columns:
            df_filled = df_filled.withColumn(column, F.last(column, ignorenulls=True).over(fill_window))
        return df_filled

    # The struct is non-null whenever the record exists, even if some of its fields are NULL,
    # so last(..., ignorenulls=True) selects the latest existing record as a whole.
    last_record = F.last(
        F.when(F.col(presence_col), F.struct(*value_columns)),
        ignorenulls=True,
    ).over(fill_window)
    return (
        df.withColumn("_last_record", last_record)
          .select(*KEY_COLUMNS, *[F.col(f"_last_record.{c}").alias(c) for c in value_columns])
    )


def align_to_dates(all_dates, df):
    """Left-join df onto every (transaction_date, purchase_id) in all_dates and forward-fill it."""
    join_condition = [F.col('a.transaction_date') == F.col('b.transaction_date'),
                      F.col('a.purchase_id') == F.col('b.purchase_id')]
    joined = (
        all_dates.alias('a')
        # _has_record stays True where the source had a row and becomes NULL for filled-in dates
        .join(df.withColumn("_has_record", F.lit(True)).alias('b'), join_condition, 'left')
        .drop(F.col('b.purchase_id'), F.col('b.transaction_date'))
    )
    return fill_nulls(joined, presence_col="_has_record")


# 1) Last update of each purchase per day, per source (new names: the raw inputs stay intact)
purchase_daily = latest_update_per_day(df_purchase)
product_item_daily = latest_update_per_day(df_product_item)
purchase_extra_info_daily = latest_update_per_day(df_purchase_extra_info)

# 2) Every (transaction_date, purchase_id) seen in any of the three sources
df_purchase_all_dates = (
    purchase_daily.select(*KEY_COLUMNS)
    .union(product_item_daily.select(*KEY_COLUMNS))
    .union(purchase_extra_info_daily.select(*KEY_COLUMNS))
    .distinct()
)

# 3) Each source laid over the full date range, carrying its previous record forward
purchase_filled = align_to_dates(df_purchase_all_dates, purchase_daily)
product_item_filled = align_to_dates(df_purchase_all_dates, product_item_daily)
purchase_extra_info_filled = align_to_dates(df_purchase_all_dates, purchase_extra_info_daily)

# 4) Combine the sources, flag the latest snapshot of each purchase and build the surrogate key
latest_per_purchase = Window.partitionBy("purchase_id")

df_output = (
    purchase_filled
    .join(product_item_filled, on=KEY_COLUMNS, how="left")
    .join(purchase_extra_info_filled, on=KEY_COLUMNS, how="left")
    .withColumn(
        "is_most_recent_transaction",
        F.coalesce(
            F.col("transaction_date") == F.max("transaction_date").over(latest_per_purchase),
            F.lit(False),
        ),
    )
    .withColumn("unique_id", F.sha2(F.concat(F.col("transaction_date"), F.lit("_"), F.col("purchase_id")), 256))
    .select(
        "unique_id",
        "transaction_date",
        "purchase_id",
        "buyer_id",
        "prod_item_id",
        "order_date",
        "release_date",
        "producer_id",
        "product_id",
        "item_quantity",
        "purchase_value",
        "subsidiary",
        "is_most_recent_transaction",
    )
    .orderBy("purchase_id", "transaction_date")
)

df_output.show()

+--------------------+----------------+-----------+--------+------------+----------+------------+-----------+----------+-------------+--------------+-------------+--------------------------+
|           unique_id|transaction_date|purchase_id|buyer_id|prod_item_id|order_date|release_date|producer_id|product_id|item_quantity|purchase_value|   subsidiary|is_most_recent_transaction|
+--------------------+----------------+-----------+--------+------------+----------+------------+-----------+----------+-------------+--------------+-------------+--------------------------+
|41acafb0a9249af43...|      2023-01-20|         55|   15947|           5|2023-01-20|  2023-01-20|     852852|    696969|           10|          50.0|         NULL|                     false|
|a2a61420bbe783a30...|      2023-01-23|         55|   15947|           5|2023-01-20|  2023-01-20|     852852|    696969|           10|          50.0|     nacional|                     false|
|0e5dbfcb3a4ed1a48...|      2023-02-05|      

## Criação da tabela (CREATE TABLE)

In [None]:

# DDL for the table that stores df_output.
# - With a data-source table (USING parquet), the partition column is declared once in the
#   column list and only *named* in PARTITIONED BY. Declaring transaction_date in both places
#   with a type makes Spark reject it as a duplicate column.
# - spark.sql() executes a single statement per call, and Spark SQL has no
#   CREATE [UNIQUE] INDEX for regular tables. Uniqueness of unique_id comes from how it is
#   built (sha2 of transaction_date + purchase_id). Filters on transaction_date rely on
#   partition pruning instead of an index.
# - IF NOT EXISTS keeps the cell safe to re-run.
# With the PySpark API, the same table could be created straight from the ETL output:
#   df_output.write.partitionBy("transaction_date").saveAsTable("purchases_historical")
spark.sql('''
CREATE TABLE IF NOT EXISTS purchases_historical (
    unique_id STRING,
    transaction_date DATE,
    purchase_id INT,
    buyer_id INT,
    prod_item_id INT,
    order_date DATE,
    release_date DATE,
    producer_id INT,
    product_id INT,
    item_quantity INT,
    purchase_value DECIMAL(10, 2),
    subsidiary STRING,
    is_most_recent_transaction BOOLEAN
)
USING parquet
PARTITIONED BY (transaction_date)
''')

In [98]:
# Register the ETL output under the name the GMV queries read from.
df_output.createOrReplaceTempView(name='purchases_historical')

## GMV Diário Atual

In [99]:
# Current daily GMV: latest snapshot of each purchase, released purchases only,
# revenue summed per (release_date, subsidiary), most recent release_date first.
resultado = (
    spark.table('purchases_historical')
    .where(F.col('release_date').isNotNull())
    .where(F.col('is_most_recent_transaction') == F.lit(True))
    .groupBy('release_date', 'subsidiary')
    .agg(F.sum(F.col('purchase_value') * F.col('item_quantity')).alias('faturamento'))
    .orderBy(F.col('release_date').desc())
)

resultado.show()

+------------+-------------+-----------+
|release_date|   subsidiary|faturamento|
+------------+-------------+-----------+
|  2023-03-01|     nacional|      550.0|
|  2023-02-28|internacional|     4000.0|
+------------+-------------+-----------+



## GMV Diário em uma data de referência (posicionamento no tempo)

In [106]:
from datetime import date

# Daily GMV as it looked on a reference date. For each purchase, take its latest snapshot
# with transaction_date <= DATA_REFERENCIA, keep released purchases and sum revenue per
# (release_date, subsidiary).
# - The reference date is a named constant rendered as a DATE literal; the original compared
#   a DATE column with a string literal.
# - The window is applied directly to purchases_historical. This avoids the original
#   self-join on unique_id, whose `select *` also duplicated the unique_id column.
DATA_REFERENCIA = date(2023, 2, 27)

# isoformat() of a date yields only digits and dashes, so interpolating it is safe.
resultado = spark.sql(f'''
with posicionamento as (
select *,
       row_number() over (partition by purchase_id order by transaction_date desc) posicao_registro
from purchases_historical
where transaction_date <= DATE'{DATA_REFERENCIA.isoformat()}'
)
select release_date,
       subsidiary,
       sum(purchase_value*item_quantity) faturamento
from posicionamento
where posicao_registro = 1
  and release_date is not null
group by release_date, subsidiary
order by release_date desc
''')

resultado.show()

+------------+----------+-----------+
|release_date|subsidiary|faturamento|
+------------+----------+-----------+
|  2023-02-28|      NULL|     4000.0|
|  2023-01-20|  nacional|      500.0|
+------------+----------+-----------+

