# Objetivo

A partir dos dados apresentados o objetivo deste notebook é apresentar o passo a passo da limpeza e manipulação dos dados. 
Para realizar a limpeza dos dados e ter maior compreensão destes análises pontuais foram realizadas.

# Datasets

**offers.jsons** - Dataset contém os IDs das ofertas e metadados de cada uma delas:

*  ```id``` [string]: ID da oferta

* ```offertype``` [string]: Tipo da oferta, i.e., BOGO, discount, informational

* ```minvalue``` [int]: Valor mínimo que precisa ser gasto para que a oferta seja ativada

* ```discountvalue``` [int]: Valor do desconto a ser aplicado, caso a oferta seja ativada

* ```duration``` [int]: Tempo durante o qual a oferta está disponível para o cliente agir, i.e., prazo para que o cliente utilize a oferta recebida


**customers.jsons** - Dataset contém informações sobre aproximadamente 17k clientes:

* ```id``` [string]: ID do cliente

* ```age``` [int]: Idade do cliente no momento da criação da conta

* ```registered_on``` [string]: Data em que o cliente criou a conta

* ```gender``` [string]: gênero do cliente (algumas entradas podem conter '0' para outras opções além de M ou F)

* ```credit_card_limit``` [float]: Limite do cartão de crédito do cliente registrado no momento da criação da conta.

**transactions.jsons** - Dataset contém informações sobre aproximadamente 300k eventos:

* ```account_id``` [string]: ID do cliente

* ```event``` [string]: Descrição do evento (transação, oferta recebida, oferta visualizada, oferta concluída)

* ```time_since_test_start``` [int]: Tempo passado desde o começo do teste, em dias. Os dados começam em t=0

* ```value ``` [json]: Pode registrar o offer_id de oferta, o desconto concedido (reward) ou o valor da transação, dependendo do tipo de evento
    * ```offer id``` [string]
    * ```offer_id``` [string]
    * ```amount``` [float]
    * ```reward``` [float]

**Importante:** Os descontos de uma oferta (key reward do campo value) são concedidos quando uma transação é realizada após a visualização de uma oferta. Nesse caso, o reward recebido será automaticamente aplicado à próxima transação. Logo, o valor da transação seguinte ao recebimento do desconto de uma oferta é o valor com o desconto já aplicado.

# Modelagem do dataset de transação

Dado o comentário:

*Os descontos de uma oferta (key reward do campo value) são concedidos quando uma transação é realizada após a visualização de uma oferta. Nesse caso, o reward recebido será automaticamente aplicado à próxima transação. Logo, o valor da transação seguinte ao recebimento do desconto de uma oferta é o valor com o desconto já aplicado.*

Podemos inferir que a ordem ideal que os eventos podem acontecer é: 

```offer_received``` >> ```offer_viewed``` >> ```transaction``` >> ```offer_completed```

Partindo dessa ordem ideal, tem-se as seguintes informações: 

- ```offer_received```: O cliente recebe a oferta, mas nsse ponto não há desconto aplicado. O ```reward``` ainda não foi conquistado, apenas a oferta foi disponibilizada ao cliente.

- ```offer_viewed```: Quando o cliente visualiza a oferta, cria-se a condição para que ele possa ganhar o ```reward```. Entretanto, após a visualização, o desconto ainda não está aplicado a nenhuma transação.

- ```transaction``` (após a oferta ter sido vista): Quando o cliente realiza uma transação depois de ter visto a oferta, esse ato faz com que o ```reward``` seja efetivamente “ganho”. Neste momento, a oferta é considerada pronta para ser concluída, mas o desconto ainda não é aplicado nesta transação atual. O ato de comprar após visualizar a oferta é o passo necessário para que o sistema marque essa oferta como utilizável.

- ```offer_completed```: Com a realização da transação subsequente à visualização, a oferta é finalizada, o ```reward``` é concedido ao cliente. Significa que, agora, o sistema disponibiliza o desconto para ser utilizado.

In [0]:
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pyspark.pandas as ps
import seaborn as sns
from pyspark.pandas.config import set_option
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, isnull, explode, struct

# Enable operations on different frames
set_option("compute.ops_on_diff_frames", True)

# 0 | Helper functions

In [0]:
def check_offer_ids(df: DataFrame) -> DataFrame:
    """
    Adds a new column 'offer_flag' to the DataFrame to check the condition between 'offer id' and 'offer_id'.
    The function is using pyspark dataframe sintax

    Args:
        df (DataFrame): PySpark DataFrame with columns 'offer id' and 'offer_id'.

    Returns:
        DataFrame: DataFrame with a new column 'offer_flag'.
    """
    # Create the logic for the 'offer_flag' column using when/otherwise
    df = df.withColumn(
        "offer_flag",
        when(
            isnull(col("offer_id_1")) & isnull(col("offer_id_2")), lit("flag_both_null")
        )
        .when(
            ~isnull(col("offer_id_1")) & ~isnull(col("offer_id_2")),
            when(
                col("offer_id_1") == col("offer_id_2"), lit("flag_both_not_null")
            ).otherwise(lit("flag_different")),
        )
        .when(~isnull(col("offer_id_1")), lit("flag_id_1_not_null"))
        .otherwise(lit("flag_id_2_not_null")),
    )

    return df

In [0]:
def associate_transactions_to_offers(
    transactions: ps.DataFrame, offers_duration: ps.DataFrame
) -> ps.DataFrame:
    """Associates transactions with offers based on customer activity and offer durations.

    This function merges transaction data with offer duration information, sorts the data,
    and associates transactions occurring within the influence window of each offer received.
    It processes each customer individually, ensuring proper mapping of transactions to 
    offers based on the time constraints of the offers.

    Args:
        transactions (ps.DataFrame): A PySpark pandas DataFrame containing transaction data 
            with columns:
            - "customer_id": The unique identifier for each customer.
            - "offer_id": The identifier for the offer, which may initially be null.
            - "event": The type of event (e.g., "offer received", "transaction").
            - "time_since_test_start": The timestamp of the event relative to the test start.
        
        offers_duration (ps.DataFrame): A PySpark pandas DataFrame containing offer duration data 
            with columns:
            - "offer_id": The identifier for the offer.
            - "duration": The duration of the offer's influence window.

    Returns:
        ps.DataFrame: A PySpark pandas DataFrame with the same structure as `transactions` but with 
        updated "offer_id" values for transactions that occurred within the influence window of 
        an offer.

    Steps:
        1. Merge `transactions` with `offers_duration` to include offer durations.
        2. Sort the merged DataFrame by "customer_id", "offer_id", and "time_since_test_start".
        3. Process each customer group:
           - Identify offers received and their influence windows.
           - Update "offer_id" for transactions falling within the influence window of each offer.
        4. Return the updated DataFrame with associated transactions.
    """
    
    # Step 1: Merge transactions with offers_duration
    transactions = transactions.merge(offers_duration, on="offer_id", how="left")
    
    # Step 2: Sort transactions
    transactions = transactions.sort_values(
        by=["customer_id", "offer_id", "time_since_test_start"]
    )

    # Step 3: Define a function to process each customer group
    def process_customer(customer_df):
        customer_df = customer_df.copy()
        received_offers = customer_df[customer_df["event"] == "offer received"]

        for _, offer in received_offers.iterrows():
            offer_id = offer["offer_id"]
            start_time = offer["time_since_test_start"]
            duration = offer["duration"]
            end_time = start_time + duration

            # Associate transactions within the influence window
            transactions_idx = customer_df[
                (customer_df["event"] == "transaction")
                & (customer_df["time_since_test_start"] >= start_time)
                & (customer_df["time_since_test_start"] <= end_time)
                & (customer_df["offer_id"].isna())
            ].index

            customer_df.loc[transactions_idx, "offer_id"] = offer_id

        return customer_df

    # Step 4: Apply the function to each group of customers
    transactions = (
        transactions.groupby("customer_id")
        .apply(process_customer)
        .reset_index(drop=True)
    )

    return transactions

In [0]:
def calculate_time(transactions: ps.DataFrame) -> ps.DataFrame:
    """
    Calculate the time to view and complete an offer using PySpark pandas.

    Args:
        transactions (ps.DataFrame): Transaction data with columns:
            - customer_id: Identifier for the customer.
            - offer_id: Identifier for the offer.
            - event: Type of event (e.g., "offer received", "offer viewed", "offer completed").
            - time_since_test_start: Time relative to the test start.

    Returns:
        ps.DataFrame: Processed data with calculated `time_to_view` and `time_to_complete`.
    """
    # Sort transactions by customer_id, offer_id, and time_since_test_start
    transactions = transactions.sort_values(
        by=["customer_id", "offer_id", "time_since_test_start"]
    )

    # Filter rows for "offer received"
    received_offers = transactions[transactions["event"] == "offer received"]

    # Define a function to calculate time_to_view and time_to_complete
    def calculate_times(row, transactions):
        offer_id = row["offer_id"]
        customer_id = row["customer_id"]
        start_time = row["time_since_test_start"]

        # Filter related events for the customer and after offer start time
        related_events = transactions[
            (transactions["customer_id"] == customer_id) &
            (transactions["time_since_test_start"] >= start_time)
        ]

        # Time to view
        viewed_event = related_events[
            (related_events["event"] == "offer viewed") &
            (related_events["offer_id"] == offer_id)
        ]
        if not viewed_event.empty:
            time_to_view = viewed_event.iloc[0]["time_since_test_start"] - start_time

            # Time to complete
            completed_event = related_events[
                (related_events["event"] == "offer completed") &
                (related_events["offer_id"] == offer_id)
            ]
            if not completed_event.empty:
                time_to_complete = (
                    completed_event.iloc[0]["time_since_test_start"]
                    - viewed_event.iloc[0]["time_since_test_start"]
                )
                # Ensure no negative time_to_complete
                if time_to_complete >= 0:
                    return time_to_view, time_to_complete
            return time_to_view, None

        # If no "offer viewed" event, set both times to None
        return None, None

    # Apply the function to calculate times for each received offer
    received_offers[["time_to_view", "time_to_complete"]] = received_offers.apply(
        lambda row: calculate_times(row, transactions),
        axis=1,
        result_type="expand"
    )

    # Return only the relevant columns
    return received_offers[["customer_id", "offer_id", "time_to_view", "time_to_complete"]]

# 1 | Dataset offers

In [0]:
file_location_offers = "/FileStore/tables/offers.json"

offers_raw = (
    spark.read.format("json")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load(file_location_offers)
)

display(offers_raw)

In [0]:
# Get the dtypes of the DataFrame
dtypes = offers_raw.dtypes

In [0]:
# Get the shape of the DataFrame (number of rows and columns)
num_rows = offers_raw.count()
num_cols = len(offers_raw.columns)
print("DataFrame shape:", (num_rows, num_cols))

In [0]:
# To make easy the quick exploration of the data, the pyspark.DataFrame will be converted into pypark.pandas
offers_raw_df = offers_raw.toPandas()

In [0]:
# Quick stats
offers_raw_df.describe().round(4)

In [0]:
# Get unique channels
all_channels = offers_raw_df["channels"].explode().to_list()
unique_channels = set(all_channels)

print(f"Channels: {unique_channels}")

**Observações:**

- Dados não precisam de limpeza, visto que não há missings, outliers, etc

- ```channels```: Lista de strings, que conté os canais: 'web', 'social', 'mobile', 'email'

A partir da análise serão criadas variáveis dummies para ```channels``` e ```offer_type```

In [0]:
# Rename the column
offers_aux_df = ps.DataFrame(offers_raw_df.rename(columns={"id": "offer_id"}))

In [0]:
# Get dummies for offer type column
offer_type_dummies = ps.get_dummies(offers_aux_df["offer_type"])
offer_type_dummies

In [0]:
# Transform channels in dummies columns
for channel in unique_channels:
    offers_aux_df[channel] = offers_aux_df["channels"].apply(
        lambda x: 1 if channel in x else 0
    )

In [0]:
# Concat the channels dummies with the offers
offers_psdf = ps.concat(
    [offers_aux_df.drop(columns="channels"), offer_type_dummies], axis=1
)
offers_psdf.head()

In [0]:
# Transform pyspark.pandas dataframe to pyspark.dataframe to download the data
offers_pyspark_df = offers_aux_df.to_spark()
display(offers_pyspark_df)

# 2 | Dataset customers

In [0]:
file_location_customers = "/FileStore/tables/profile.json"

customers_raw = (
    spark.read.format("json")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load(file_location_customers)
)

display(customers_raw)

In [0]:
# Get the dtypes of the dataframe
dtypes = customers_raw.dtypes
print(dtypes)

In [0]:
# Get the shape of the DataFrame (number of rows and columns)
num_rows = customers_raw.count()
num_cols = len(customers_raw.columns)
print("DataFrame shape:", (num_rows, num_cols))

In [0]:
# To make easy the quick exploration of the data, the pyspark.DataFrame will be converted into pypark.pandas
customers_raw_df = customers_raw.toPandas()

In [0]:
duplicated_ids = customers_raw_df.duplicated(subset=["id"]).sum()
print(f"Número de IDs duplicados: {duplicated_ids}")

In [0]:
# Quick stats
customers_raw_df.describe().round(4)

In [0]:
# Quick stats removing age = 118
customers_raw_df[customers_raw_df["age"] != 118].describe().round(4)

Removendo a idade igual a 118 anos, observa-se que a média e a mediana das variáveis ```age``` e ```credit_card_limit``` são próximas, o que indica que a distribuição dos dados é simétrica.

- ```age```: média = 54.39 e mediana = 55

- ```credit_card_limit```: média = 65405 e mediana = 64000

In [0]:
# Get the percentage of missings
missing_percent = customers_raw_df.isna().mean() * 100
missing_percent = missing_percent.round(4)
print(missing_percent)

In [0]:
# Get the percentage of clients with age equals to 118
age_118_percent = (
    customers_raw_df[customers_raw_df["age"] == 118]["age"].count()
    / len(customers_raw_df)
    * 100
)
age_118_percent = round(age_118_percent, 4)
print(f"Percentual de registros com idade igual a 118: {age_118_percent}%")

O que se observa é que filtrando ```age = 118``` e checando o percentual de missings na base, o percentual de missings se mantém. 

O que leva a hipótese de que os clientes com idade igual 118 tem os campos ```gender``` e ```credit_card_limit``` faltantes (únicos campos com missings na base)

In [0]:
# Customers with age equals to 118 and missings values
customers_noinfo_df = customers_raw_df[
    (customers_raw_df["age"] == 118)
    & (customers_raw_df["gender"].isna())
    & (customers_raw_df["credit_card_limit"].isna())
]

customers_noinfo_df.info()

In [0]:
# Customers with age equals to 118 and without missing values in gender and credit_card_limit
customers_age_not_118_df = customers_raw_df[
    (customers_raw_df["age"] != 118)
    & (customers_raw_df["gender"].notna())
    & (customers_raw_df["credit_card_limit"].notna())
]
customers_age_not_118_df.info()

In [0]:
# Percentage of clients per gender, withou NA data
customers_raw_df[(customers_raw_df["age"] != 118)]["gender"].value_counts(
    normalize=True
) * 100

In [0]:
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

sns.boxplot(
    data=customers_raw_df[(customers_raw_df["age"] != 118)],
    x="age",
    y="gender",
    ax=axes[0],
)
axes[0].set_title("Age")
axes[0].set_xlabel("Age")
axes[0].set_ylabel("Gender")

sns.boxplot(
    data=customers_raw_df[(customers_raw_df["age"] != 118)],
    x="credit_card_limit",
    y="gender",
    ax=axes[1],
)
axes[1].set_title("Credit card limit")
axes[1].set_xlabel("Credit card limit")
axes[1].set_ylabel("Gender")

# Adjust layout
plt.tight_layout()
plt.show()

**Observações:**

- Removendo a idade igual a 118 anos, observa-se que a média e a mediana das variáveis ```age``` e ```credit_card_limit``` são próximas

- Para os clientes que a idade é igual a 118 anos, os campos  ```gender``` e ```credit_card_limit``` são faltantes

- O percentual de missings nos campos  ```gender``` e ```credit_card_limit``` é de 12.7941%, assim como o percentual de clientes com idade igual a 118 anos

A partir da análise serão realizadas as seguintes transformações nos dados:

- ```registered_on``` será convertido para datetime

- ```gender``` será transformado para o tipo inteiro. Os casos de missing serão atribuídos a categoria "Outros"

- Para os valores missings em ```gender``` e ```credit_card_limit``` a média será imputada. A média imputada será calculada sem os valores missings

- O campo  ```id``` será renomeado para ```customer_id```, afim de facilitar os futuros joins

- A partir de ```registered_on``` pode-se criar uma nova variável, o tempo (em dias) que o cliente se registrou

- ```flag_infos``` feature para indicar se os dados do cliente originalmente são missings ou a idade é igual a 118. O intuito desta feature é utilizar os dados posteriormente para a modelagem

Abaixo estão os passos de manipulação de ```customers.json```

In [0]:
# Add a column flag to indicate the customers with age equal to 118
customers_raw_df["flag_info"] = np.where(customers_raw_df["age"] != 118, 1, 0)

# Quick stats
customers_raw_df[["flag_info", "id", "age"]].groupby("flag_info").agg(
    {"id": "count", "age": "mean"}
).reset_index()

In [0]:
mean_age = customers_raw_df[customers_raw_df["age"] != 118]["age"].mean()
mean_credit_limit = customers_raw_df[customers_raw_df["age"] != 118][
    "credit_card_limit"
].mean()

In [0]:
customers_df = customers_raw_df.rename(columns={"id": "customer_id"})

In [0]:
# Convert registered_on to datetime
customers_df["registered_on"] = ps.to_datetime(
    customers_df["registered_on"], format="%Y%m%d", errors="coerce"
)

In [0]:
# Convert 'gender' to numeric type with default for missing/invalid values
gender_mapping = {"F": 0, "M": 1, "O": 2, None: 2}
customers_df["gender"] = (
    customers_df["gender"].map(gender_mapping).fillna(2).astype(int)
)

In [0]:
# Replace NaN in credit_card_limit with the mean of credit card limit
customers_df["credit_card_limit"] = customers_df["credit_card_limit"].fillna(
    mean_credit_limit
)

In [0]:
# Add new feature: number of days have been a customer
current_date = datetime.now()
customers_df["days_as_customer"] = (
    current_date - customers_df["registered_on"]
).dt.days

In [0]:
# Display the data to download
customers_psdf = ps.DataFrame(customers_df)
customers_pyspark_df = customers_df
display(customers_pyspark_df)

# 3 | Dataset transactions

In [0]:
file_location_transactions = "/FileStore/tables/transactions.json"

transactions_raw = (
    spark.read.format("json")
    .option("inferSchema", "false")
    .option("header", "false")
    .option("sep", ",")
    .load(file_location_transactions)
)

display(transactions_raw)

In [0]:
# Get the dtypes of the dataframe
transactions_raw.dtypes

In [0]:
# Get the shape of the DataFrame (number of rows and columns)
num_rows = transactions_raw.count()
num_cols = len(transactions_raw.columns)
print("DataFrame shape:", (num_rows, num_cols))

In [0]:
# Check the unique keys in value
set(transactions_raw.schema["value"].dataType.fieldNames())

In [0]:
# Extract the fields from the 'value' struct as separate columns
value_details = transactions_raw.select(
    col("value.amount").alias("amount"),
    col("value.offer id").alias("offer_id_1"),
    col("value.offer_id").alias("offer_id_2"),
    col("value.reward").alias("reward"),
)

# Show the resulting DataFrame
display(value_details)

No dataset acima observa-se dois campos que referenciam ao ID da oferta, ```offer id``` e ```offer_id```. O que leva a hipótese de que esses campos são iguais, mas podem ter sido extraídos de maneira inadequado. 

Para checar se eles são iguais será verficado se essas features são mutuamente exclusivas, considerando o seguinte racional:

- Ambas as features são nulas

- Uma das features não é nula

- Ambas as features não são nulas e diferentes

O último ponto é o mais crítico, pois indica que uma observação contém IDs de ofertas distintos, o que irá levar ao descarte da observação. Abaixo é realizada a análise para verificar a hipótese.

In [0]:
# Prepare data for concatenation 
value_details_psdf = ps.DataFrame(value_details)
transactions_raw_psdf = ps.DataFrame(transactions_raw)
transactions_raw_psdf = transactions_raw_psdf.drop(columns=["value"])

In [0]:
# Get the expanded dataset of transaction to have only one dataset to clean
transactions_value_details_psdf = ps.concat(
    [transactions_raw_psdf, value_details_psdf], axis=1
)
transactions_value_details_psdf.head()

In [0]:
# Get the shape
transactions_value_details_psdf.shape

In [0]:
# Check the offer id and offer_id are equal or not
transactions_value_details_pyspark_df = transactions_value_details_psdf.to_spark()
transactions_value_details_pyspark_df = check_offer_ids(
    transactions_value_details_pyspark_df
)

In [0]:
# Check the offer id and offer_id are equal or not
transactions_value_details_psdf = ps.DataFrame(transactions_value_details_pyspark_df)
grouped_counts = (
    transactions_value_details_psdf[["account_id", "offer_flag"]]
    .groupby(by="offer_flag")
    .count()
)
grouped_counts

A partir da tabela observa-se que nenhuma observação contem IDs de ofertas distintos. Dado isso pode-se combinar os campos, de forma a criar um único ```offer_id```. Para criar essa nova feature as features serão combinadas, i.e.:

- Quando ambas as features estiverem nulas o seu valor se mantém nulo

- Quando uma das features for não nula esse valor será atribuído a nova feature ```offer_id```

In [0]:
# Create the column with correct offer_id
transactions_value_details_psdf[
    "combined_offer_id"
] = transactions_value_details_psdf.apply(
    lambda row: row["offer_id_1"] if ps.notna(row["offer_id_1"]) else row["offer_id_2"],
    axis=1,
)
transactions_value_details_psdf.head(3)

In [0]:
# Check the percentage of missings
transactions_value_details_psdf.isna().mean() * 100

In [0]:
# Transform data to pypark dataframe to download
transactions_value_details_pyspark_df = transactions_value_details_psdf.to_spark()
display(transactions_value_details_pyspark_df)

Observa-se que o número de missings na feature ```combined_offer_id``` se mantém alto. Antes de investigar a possível causa, algumas variáveis serão descartadas (pois não são mais necessárias) e outras serão renomeadas.

In [0]:
# Drop unnecessary columns
transactions_value_details_psdf = transactions_value_details_psdf.drop(
    columns={"offer_id_1", "offer_id_2", "offer_flag"}
)

In [0]:
# Rename columns
transactions_value_details_psdf.rename(
    columns={"combined_offer_id": "offer_id", "account_id": "customer_id"}, inplace=True
)

In [0]:
# Percentage of events
transactions_value_details_psdf["event"].value_counts(normalize=True) * 100

Após os passos anteriores de manipulação, serão visualizados os dados de alguns clientes. O objetivo é compreender o que leva o ```offer_id``` ter um alto percentual de missings.

In [0]:
# Customer "78afa995795e4d85b5d9ceeca43f5fef"
transactions_value_details_psdf[
    transactions_value_details_psdf["customer_id"] == "78afa995795e4d85b5d9ceeca43f5fef"
].sort_values(by=["offer_id", "time_since_test_start"])

Para o cliente ```78afa995795e4d85b5d9ceeca43f5fef``` observa-se que a feature ```offer_id``` é nula quando o evento é de transação. E quando o ID da oferta não é nulo a feature ```amount``` é.

In [0]:
# Customer "a03223e636434f42ac4c3df47e8bac43"
transactions_value_details_psdf[
    transactions_value_details_psdf["customer_id"] == "a03223e636434f42ac4c3df47e8bac43"
].sort_values(by=["offer_id", "time_since_test_start"])

Para o cliente ```a03223e636434f42ac4c3df47e8bac43``` observa-se que a feature ```offer_id``` é nula quando o event é de transação. E quando o ID da oferta não é nulo a feature ```amount``` é.

Observa-se também que o cliente recebeu a oferta ```0b1e1539f2cc45b7b9fa7c272da2e1d7``` mais de uma vez (a feature ```time_since_test_start``` indica o tempo que o cliente recebeu, visualizou, etc a oferta). Dado isso pode-se inferir que um cliente pode ser impactado por uma oferta mais de uma vez e receber mais de uma oferta ao menos tempo.


In [0]:
# Customer "e2127556f4f64592b11af22de27a7932"
transactions_value_details_psdf[
    transactions_value_details_psdf["customer_id"] == "e2127556f4f64592b11af22de27a7932"
].sort_values(by=["offer_id", "time_since_test_start"])

Para o cliente ```e2127556f4f64592b11af22de27a7932``` observa-se que a feature ```offer_id``` é nula quando o event é de transação. E quando o ID da oferta não é nulo a feature ```amount``` é.

A análise dos dados de 3 clientes leva a hipótese de que os eventos de transação não estão diretamente associados a uma oferta. Além disso a partir da análise as features ```amount``` pode ser definida como o valor gasto em uma transação.

Abaixo será checada a hipótese de que os eventos de transação não estão associados a uma oferta.

In [0]:
transactions_value_details_psdf["event"].value_counts()

In [0]:
event_transactions = transactions_value_details_psdf[
    transactions_value_details_psdf["event"] == "transaction"
]

associated_with_offer = event_transactions[event_transactions["offer_id"].notna()]
not_associated_with_offer = event_transactions[event_transactions["offer_id"].isna()]

print(f"Transações associadas a ofertas: {len(associated_with_offer)}")
print(f"Transações não associadas a ofertas: {len(not_associated_with_offer)}")

Observa-se que o total de eventos de transação é de 138953 e o total de eventos de transações com o ID da oferta nulo é de 138953. O que corrobora a hipótese de que os dados relativos a transações não estão diretamente associados a uma oferta.

Dado isso será necessário desenvolver uma proxy para associar os dados de transações aos IDs de ofertas. Para tal serão consideradas as seguintes features:

- `time_since_test_start`: Do dataset `transactions`, indica quando cada evento ocorreu em relação ao início do teste

- `duration`: Do dataset `offers`, indica o tempo de duração de uma oferta, i.e., o tempo que o cliente tem para utilizar a oferta

Vale ressaltar que a definição dada a `duration` ocorre porque uma oferta deve ter um tempo de vida útil, i.e., uma oferta tem um prazo de validade.

Para associar uma transação a uma oferta, considera-se uma janela de tempo que define o período em que a oferta pode ter influenciado a transação. As regras são:

- Início do período de influência:

    - Quando uma oferta é recebida (event = "offer_received"), o tempo de início é: 
      start_time = time_since_test_start

-	Fim do período de influência:

    - O prazo de validade da oferta (duration) é contado a partir de start_time: 
      end_time = time_since_test_start + duration

- Associação de transações:

    - Se uma transação (event = "transaction") ocorrer dentro do período de influência da oferta (start_time <= time_since_test_start <= end_time), a transação será associada ao ID da oferta.

Esse processo é repetido para cada ID de oferta, considerando o período de duração específico de cada uma.

Resumo: Para cada ID de oferta e eventos do cliente, verifica-se se a transação ocorreu no período de influência da oferta.

In [0]:
# Quick stats
offers_psdf["duration"].describe()

In [0]:
# Quick stats
transactions_value_details_psdf["time_since_test_start"].describe()

In [0]:
transactions_value_details_psdf.head()

In [0]:
# Prepare data to apply function
transactions_value_details_psdf = transactions_value_details_psdf.sort_values(by=["customer_id"])
offers_duration_psdf = offers_psdf[["offer_id", "duration"]]

In [0]:
# Apply the proxy
transactions_with_offers_psdf = associate_transactions_to_offers(
    transactions_value_details_psdf, offers_duration_psdf
)

In [0]:
transactions_with_offers_psdf.head()

In [0]:
transactions_with_offers_pyspark_df = transactions_with_offers_psdf.to_spark()
display(transactions_with_offers_pyspark_df)

Após realizar a associação dos eventos de transação com uma oferta, será realizado o processo de feature engineering. 

As seguintes features serão criadas:

- ```time_to_view```: tempo que o cliente leva para visualizar uma oferta, i.e., tempo entre receber uma oferta e visualizá-la

- ```time_to_open```: tempo que o cliente leva para completar uma oferta, i.e., tempo entre visualizar uma oferta e completá-la

- ```event```: serão criadas variáveis dummies para essa feature

In [0]:
transactions_with_offers_to_time_psdf = transactions_with_offers_psdf.copy()
transactions_with_offers_to_time_psdf.event.value_counts()

In [0]:
transactions_with_offers_to_time_psdf.head()

In [0]:
# Apply function to get times
transactions_time_psdf = calculate_time(transactions_with_offers_to_time_psdf)
transactions_time_psdf.head()

In [0]:

# Transform data to pypark dataframe to download
transactions_time_pyspark_df = transactions_time_psdf.to_spark()
display(transactions_time_pyspark_df)

In [0]:
transactions_with_offers_psdf.head(3)

In [0]:
# Get dummies for events
unique_events = ps.get_dummies(transactions_with_offers_psdf["event"])
unique_events.head()


In [0]:
transactions_with_dummies_psdf = ps.concat([transactions_with_offers_psdf, unique_events])
transactions_with_dummies_psdf.head()

In [0]:
# Get transactions with associated offers to transactions and the time features
transactions_processed_psdf = transactions_with_dummies_psdf.merge(
    transactions_time_psdf, on=["customer_id", "offer_id"], how="inner"
)
transactions_processed_psdf.head()

In [0]:
transactions_processed_psdf = transactions_processed_psdf.rename(columns={
    "event_offer completed": "event_offer_completed",
    "event_offer received": "event_offer_received",
    "event_offer viewed": "event_offer_viewed",
})
transactions_processed_psdf.head(3)

In [0]:
# Transform data to pypark dataframe to download
transactions_processed_pyspark_df = transactions_processed_psdf.to_spark()
display(transactions_processed_pyspark_df)

# 4 | Merge datasets

Dois datasets serão criados:

- ```customers_offers_agg```: informações agregadas a nível de  ```offers``` e ```customers```

- ```customers_agg```: informações agregadas a nível de ```customers```

In [0]:
customers_transactions_psdf = customers_psdf.merge(
    transactions_processed_psdf, on=["customer_id"], how="left"
)
customers_transactions_psdf.head()

In [0]:
customers_transactions_offers_psdf = customers_transactions_psdf.merge(
    offers_psdf.drop(columns="duration"), on="offer_id", how="left"
)

Da tabela acima observa-se que o cliente pode receber uma mesma oferta mais de uma vez.

In [0]:
# Fill NA with zeros
customers_transactions_offers_psdf["time_to_view"] = customers_transactions_offers_psdf[
    "time_to_view"
].fillna(0)
customers_transactions_offers_psdf["time_to_complete"] = customers_transactions_offers_psdf[
    "time_to_complete"
].fillna(0)

In [0]:
customers_transactions_offers_psdf.head()

Sumarizando os dados para cada cliente

In [0]:
customer_offers_agg_psdf = (
    customers_transactions_offers_psdf.groupby(["customer_id", "offer_id"])
    .agg(
        total_amount=("amount", "sum"),
        total_reward=("reward", "sum"),
        total_offer_completed=("event_offer_completed", "sum"),
        total_offer_received=("event_offer_received", "sum"),
        total_offer_viewed=("event_offer_viewed", "sum"),
        total_transaction=("event_transaction", "sum"),
        total_social=("social", "sum"),
        total_mobile=("mobile", "sum"),
        total_email=("email", "sum"),
        total_web=("web", "sum"),
        total_bogo=("bogo", "sum"),
        total_discount=("discount", "sum"),
        total_informational=("informational", "sum"),
        agv_time_to_view=("time_to_view", "mean"),
        avg_time_to_complete=("time_to_complete", "mean"),
        credit_card_limit=("credit_card_limit", "mean"),
        days_as_customer=("days_as_customer", "mean"),
        avg_amount=("amount", "mean"),
        min_value=("min_value", "mean")
    )
    .reset_index()
    .sort_values(by=["customer_id", "offer_id"])
)

In [0]:
customer_offers_agg_psdf["viewed_ratio"] = (
    customer_offers_agg_psdf["total_offer_viewed"]
    / customer_offers_agg_psdf["total_offer_received"]
)
customer_offers_agg_psdf["completed_ratio"] = (
    customer_offers_agg_psdf["total_offer_completed"]
    / customer_offers_agg_psdf["total_offer_viewed"]
)
customer_offers_agg_psdf["completed_ratio"] = customer_offers_agg_psdf[
    "completed_ratio"
].replace(np.inf, 0)
customer_offers_agg_psdf.head()

In [0]:
customers_gender = customers_transactions_offers_psdf[["customer_id", "gender", "flag_info"]]
customers_gender.drop_duplicates(inplace=True)

In [0]:
customer_offers_agg_psdf_final = customer_offers_agg_psdf.merge(
    customers_gender,
    on="customer_id",
    how="inner",
)
customer_offers_agg_psdf_final.info()

In [0]:
customer_offers_agg_psdf_final.head(3)

In [0]:
# Transform data to pypark dataframe to download
customer_offers_agg_pyspark_final = customer_offers_agg_psdf_final.to_spark()
display(customer_offers_agg_pyspark_final)

Sumarizando os dados a nível de cliente

In [0]:
customer_agg_psdf = (
    customer_offers_agg_psdf_final.groupby(by=["customer_id"])
    .agg(
        total_amount=("total_amount", "sum"),
        total_reward=("total_reward", "sum"),
        total_offer_completed=("total_offer_completed", "sum"),
        total_offer_received=("total_offer_received", "sum"),
        total_offer_viewed=("total_offer_viewed", "sum"),
        total_transaction=("total_transaction", "sum"),
        total_social=("total_social", "sum"),
        total_mobile=("total_mobile", "sum"),
        total_email=("total_email", "sum"),
        total_web=("total_web", "sum"),
        total_bogo=("total_bogo", "sum"),
        total_discount=("total_discount", "sum"),
        total_informational=("total_informational", "sum"),
        agv_time_to_view=("agv_time_to_view", "mean"),
        avg_time_to_complete=("avg_time_to_complete", "mean"),
        credit_card_limit=("credit_card_limit", "mean"),
        days_as_customer=("days_as_customer", "mean"),
        avg_amount=("avg_amount", "mean"),
        min_value=("min_value", "mean")
    )
    .reset_index()
    .sort_values(by="customer_id")
)
customer_agg_psdf.head()

In [0]:
customer_agg_psdf_final = customer_agg_psdf.merge(
    customers_gender,
    on="customer_id",
    how="inner",
)
customer_agg_psdf_final.info()

In [0]:
customer_agg_psdf_final.sort_values(by="customer_id").head()

In [0]:
# Transform data to pypark dataframe to download
customer_agg_pyspark_final = customer_agg_psdf_final.to_spark()
display(customer_agg_pyspark_final)