In [0]:
# Código para reduzir a base ao invés de aumentar a base minoritária.
## Algumas funções não rodam no databrick community edition e para não deixar muito complexo vou simplesmente reduzir a base e depois rodar o modelo de clusterização
## Para o teste AB vou rodar os testes de hipótese normalmente com o ticket médio e frequência
## Para o modelo de clusterização vou rodar somente com 6 variáveis e com a base reduzida
# Pacotes
from pyspark.sql import functions as F

# Carregando em PySpark
# Seleção de colunas que vou utilizar no modelo. Já desempacotando utilizando o '*'
selected_cols = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class','order_total_amount','is_target','customer_id','order_id'] 
df = spark.table("silver.orders_time_distinct_class").select(*selected_cols)


# Separando as classes
df_major = df.filter(F.col("is_target") == 'target')  # classe majoritária 
df_minor = df.filter(F.col("is_target") == 'control')  # classe minoritária

# Contagens atuais
count_major = df_major.count()
count_minor = df_minor.count()

# Calcule o fator de sobreamostragem para igualar a majoritária
sampling_ratio = count_minor / count_major

# Inserindo a classe minoritária com reposição
df_major_downsampled = df_major.orderBy(F.rand(seed=42)).limit(count_minor)

# Combinando o conjunto com a majoritária original
df_balanced = df_minor.union(df_major_downsampled)

# Verifique o balanceamento final
df_balanced.groupBy("is_target").count().show()
df_balanced.display()

# Salvando
spark.sql("DROP TABLE IF EXISTS gold.orders_model")
df_balanced.write.mode("overwrite").format("delta").saveAsTable("gold.orders_model")

In [0]:
# Código para utilizar o modelo K-Prototype para clusterizar a base

# Pacotes
from sklearn.preprocessing import LabelEncoder, StandardScaler
from kmodes.kprototypes import KPrototypes
import pandas as pd

# Lendo base
df = spark.table("gold.orders_model")
df_pd = df.toPandas()

# Retirando variáveis que não vou utilizar no modelo
var = df_pd[['customer_id','is_target','order_id']]
df_pd = df_pd.drop(columns=['customer_id','is_target','order_id'])

# Variáveis categóricas e categóricas numéricas
cat_col = ['delivery_region','origin_platform']
cat_col_num = ['delivery_time_class','price_range','minimum_order_value_class']
num_col = ['order_total_amount']

# Aplicação do labelEncoder
le_dict = {}
for col in cat_col:
  le = LabelEncoder()
  df_pd[col] = le.fit_transform(df_pd[col])
  le_dict[col] = le

# Padronizando a variável numérica
scaler = StandardScaler()
df_pd[num_col] = scaler.fit_transform(df_pd[num_col])

# Unir todas as variáveis
cat_colls = cat_col + cat_col_num
cat_indices = [df_pd.columns.get_loc(col) for col in cat_colls]

# Aplicação do modelo K-Prototype
X = df_pd.values
kproto = KPrototypes(n_clusters=3, init='Huang',n_init = 2 , verbose = 1, random_state=42) # Huang para variáveis numéricas e categóricas
clusters = kproto.fit_predict(X, categorical=cat_indices)

# Inserindo os clusters e voltando com os dados iniciais
df_pd['cluster'] = clusters
df_final = pd.concat([var.reset_index(drop=True), df_pd], axis = 1)

# Salvando
df_final_spark = spark.createDataFrame(df_final)
df_final_spark.write.mode("overwrite").format("delta").saveAsTable("gold.orders_model_cluster_result")

In [0]:
# Código para voltar com os dados do tipo que spark permite
import numpy as np

def convert_types_for_spark(df):
    for col, dtype in df.dtypes.items():
        if np.issubdtype(dtype, np.integer):
            # Converter inteiros menores para int32 ou int64
            if dtype in [np.int8, np.int16]:
                df[col] = df[col].astype(np.int32)
            else:
                df[col] = df[col].astype(np.int64)
        elif np.issubdtype(dtype, np.floating):
            # Converter floats menores para float64
            if dtype != np.float64:
                df[col] = df[col].astype(np.float64)
        elif dtype == 'object':
            # Pode deixar como está (strings)
            pass
        else:
            # Se tiver outros tipos, converter para string para evitar erros
            df[col] = df[col].astype(str)
    return df

# Usar assim
df_final = convert_types_for_spark(df_final)

df_final_spark = spark.createDataFrame(df_final)
df_final_spark.write.format("delta").mode("overwrite").saveAsTable("gold.orders_segmentados")


In [0]:
# Código para voltar com os dados de valores originais
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# Lendo base
df = spark.table("gold.orders_segmentados")
df_order = spark.table("gold.orders_model")


# Seleção de colunas
colunas_segmentados = ['customer_id','order_id','origin_platform','price_range','minimum_order_value_class','delivery_time_class','delivery_region','is_target','cluster']
coluna_order = ['order_id','order_total_amount']

# Novo df
df_select = df.select(colunas_segmentados)
df_order_select = df_order.select(coluna_order)

# Gerando aliases
order = df_select.alias('order')
df_order = df_order_select.alias('df_order')


# Join
df_final = order \
    .join(df_order, col('order.order_id') == col('df_order.order_id'), 'left') \
    .select(
        col('order.*'),
        col('df_order.order_total_amount')
        )
    
# Salvando  
spark.sql("DROP TABLE IF EXISTS gold.orders_model_cluster_result")  
df_final.write.mode("overwrite").format("delta").saveAsTable("gold.orders_model_cluster_result")

In [0]:
# Código para gerar a base reduzido para a análise do teste AB
# Pacotes
from pyspark.sql import functions as F

# Carregando em PySpark
# Seleção de colunas que vou utilizar no modelo. Já desempacotando utilizando o '*'
selected_cols = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class','order_total_amount','is_target','customer_id','order_id','order_created_at'] 
df = spark.table("silver.orders_time_distinct_class").select(*selected_cols)


# Separando as classes
df_major = df.filter(F.col("is_target") == 'target')  # classe majoritária 
df_minor = df.filter(F.col("is_target") == 'control')  # classe minoritária

# Contagens atuais
count_major = df_major.count()
count_minor = df_minor.count()

# Calcule o fator de sobreamostragem para igualar a majoritária
sampling_ratio = count_minor / count_major

# Inserindo a classe majoritária para a redução
df_major_downsampled = df_major.orderBy(F.rand(seed=42)).limit(count_minor)

# Combinando o conjunto com a minoritária original
df_balanced = df_minor.union(df_major_downsampled)

# Verifique o balanceamento final
#df_balanced.groupBy("is_target").count().show()
df_balanced.display()

# Salvando
df_balanced.write.mode("overwrite").format("delta").saveAsTable("gold.orders_model_abtest")

In [0]:
# Criar subsets para facilitar a utilização do ANOVA - Base com a variável platform

# Lendo a base
df = spark.table("gold.orders_model_abtest")

# Criação da view temporária
df.createOrReplaceTempView("orders_model_abtest")

# SQL
resultado_platform = spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        origin_platform,
        is_target
    FROM orders_model_abtest
    GROUP BY customer_id, origin_platform, is_target
""")

resultado_price_range = spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        price_range,
        is_target
    FROM orders_model_abtest
    GROUP BY customer_id, price_range, is_target
""")

resultado_minimum_value = spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        minimum_order_value_class,
        is_target
    FROM orders_model_abtest
    GROUP BY customer_id, minimum_order_value_class, is_target
""")

resultado_delivery_time = spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        delivery_time_class,
        is_target
    FROM orders_model_abtest
    GROUP BY customer_id, delivery_time_class, is_target
""")

resultado_region = spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        delivery_region,
        is_target
    FROM orders_model_abtest
    GROUP BY customer_id, delivery_region, is_target
""")

spark.sql("DROP TABLE IF EXISTS gold.abtest_platform")
spark.sql("DROP TABLE IF EXISTS gold.abtest_price_range")
spark.sql("DROP TABLE IF EXISTS gold.abtest_minimum_value")
spark.sql("DROP TABLE IF EXISTS gold.abtest_delivery_time")
spark.sql("DROP TABLE IF EXISTS gold.abtest_region")

# Salvando
resultado_platform.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_platform")
resultado_price_range.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_price_range")
resultado_minimum_value.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_minimum_value")
resultado_delivery_time.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_delivery_time")
resultado_region.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_region")

In [0]:
# Criar subsets para facilitar a utilização do ANOVA - Base com a variável platform

# Lendo a base
df = spark.table("gold.orders_model_cluster_result")

# Criação da view temporária
df.createOrReplaceTempView("orders_model_cluster_result")

# SQL
resultado_cluster= spark.sql("""
  SELECT 
        customer_id,
        ROUND(SUM(order_total_amount), 2)                      AS vlr_order,
        ROUND(SUM(order_total_amount) / COUNT(order_id), 3)    AS ticket_medio,
        ROUND(COUNT(order_id) / COUNT(DISTINCT customer_id), 3)  AS frequencia,
        cluster,
        is_target
    FROM orders_model_cluster_result
    GROUP BY customer_id, is_target, cluster
""")


spark.sql("DROP TABLE IF EXISTS gold.abtest_cluster")

# Salvando
resultado_cluster.write.mode("overwrite").format("delta").saveAsTable("gold.abtest_cluster")

In [0]:
# Código para baixo seria para aumentar a base minoritária com SMOTE-NC ou com PySpark, mas decidi seguir com a redução tendo em vista que o método para 
# aumentar a base é simplesmente uma duplicação de dados.

In [0]:
# Código para criar uma base na camada Gold já com informações balanceadas e com variáveis tratadas para a criação do Cluster de clientes
## Para o balanceamento vou utilizar o SMOTE-NC para lidar com variáveis categóricas e numéricas
## Aplica o labelEncoder para transformar as variáveis do tipo string em numéricas inteiras
## Separo o que é variável categórica com numérica e aplico o modelo de balanceamento da variável 'target' com menor representação (aumentar a variável controle do is_target)
### Depois aplico o modelo K-Prototype para criar a clusterização de clientes baseada em 6 variáveis
### Serão 5 variáveis categóricas e 1 numérica. Vou criar 3 clusters
#### Como estou utilizando o community edition não consigo instalar algumas bibliotecas
#### Vou ter que utilizar funções nativas do PySpark para 'balancear' a base repitindo os valores da classe minoritária


# ===============
# CÓDIGO NÂO UTILIZADO
# ===============
# Pacotes
from imblearn.over_sampling import SMOTENC
from sklearn.preprocessing import LabelEncoder
import pandas as pd

# Carregando em PySpark e depois transformando em Python devido a possibilidade de utilizar SMOTENC, mesmo em python sendo processado em RAM
# Seleção de colunas que vou utilizar no modelo. Já desempacotando utilizando o '*'
selected_cols = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class','order_total_amount','is_target'] 
df_spark = spark.table("silver.orders_time_distinct_class").select(*selected_cols)

# Transformando em Pandas
df = df_spark.toPandas()

# Separação de variáveis categóricas e numéricas
cat = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class']
num = ['order_total_amount']
target = ['is_target']

# Variáveis categóricas que precisam passar pelo LabelEncoder
cat_string = ['delivery_region','origin_platform']

# Aplicação do LabelEncoder
le_dict = {}
for col in cat_string:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col])
    le_dict[col] = le

X = df[cat + num]
y = df[target]

# Criando o índice
cat_indices = [X.columns.get_loc(col) for col in cat]

# Aplicação do SMOTE-NC
# random_state = 42 para garantir a reprodutibilidade dos testes
smote = SMOTENC(categorical_features=cat_indices, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Juntando tudo novamente
df_balanced = pd.concat([pd.DataFrame(X_resampled, columns=X.columns), pd.Series(y_resampled, name=target)], axis=1)


# Salvando
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
#spark.sql("DROP TABLE IF EXISTS gold.modeled_orders")
df_balanced.write.mode("overwrite").format("delta").saveAsTable("gold.modeled_orders")

In [0]:
# ===============
# CÓDIGO NÂO UTILIZADO
# ===============
# Salvando uma base do modelo sem alguns campos para verificação de balanceamento
selected_cols = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class','order_total_amount','is_target'] 
df_spark = spark.table("silver.orders_time_distinct_class").select(*selected_cols)

# Salvando
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
#spark.sql("DROP TABLE IF EXISTS gold.modeled_orders")
df_spark.write.mode("overwrite").format("delta").saveAsTable("gold.orders_model")

In [0]:
# ===============
# CÓDIGO NÂO UTILIZADO
# ===============
# Pacotes
from pyspark.sql import functions as F

# Carregando em PySpark
# Seleção de colunas que vou utilizar no modelo. Já desempacotando utilizando o '*'
selected_cols = ['delivery_region','origin_platform','delivery_time_class','price_range','minimum_order_value_class','order_total_amount','is_target'] 
df = spark.table("silver.orders_time_distinct_class").select(*selected_cols)


# Separando as classes
df_major = df.filter(F.col("is_target") == 'target')  # classe majoritária (ajuste o valor conforme)
df_minor = df.filter(F.col("is_target") == 'control')  # classe minoritária

# Contagens atuais
count_major = df_major.count()
count_minor = df_minor.count()

# Calcule o fator de sobreamostragem para igualar a majoritária
sampling_ratio = count_major / count_minor

# Inserindo a classe minoritária com reposição
df_minor_upsampled = df_minor.sample(withReplacement=True, fraction=sampling_ratio, seed=42)

# Combinando o conjunto com a majoritária original
df_balanced = df_major.union(df_minor_upsampled)

# Verifique o balanceamento final
#df_balanced.groupBy("is_target").count().show()
#df_balanced.display()