This notebook builds a structured dataset for the modeling stage.

# Setting up

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.pandas import merge_asof
from pyspark.pandas import DataFrame as ps
from pyspark.sql import Window

from pathlib import Path

# current repo path 
repo_path = Path().resolve().parent

spark = SparkSession.builder.appName('Spark Demo').master('local[*]').getOrCreate()




# Loading data

In [2]:
transactions_full = spark.read.json((repo_path / 'data' / 'processed' / 'transactions_full').as_posix())

                                                                                

# Building the target

There are several models and targets to choose from. For the target, we could consider:

 - was an offer that was sent completed (purchased)? (binary)
 - was an offer that was sent and viewed completed? (binary)
 - what amount was transacted thanks to that offer? (regression, aiming to increase the amount transacted per offer rather than just the occurrence of a transaction)
 - the uplift (incremental impact) of sending an offer versus not sending it.
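
To make the uplift option concrete, one common way to write it is as a difference of conversion probabilities. The notation here is an assumption for illustration and does not come from the dataset: $Y$ is the conversion indicator, $T$ flags whether the offer was sent, and $x$ is the customer's feature vector.

$$
\tau(x) = P(Y = 1 \mid X = x,\, T = 1) - P(Y = 1 \mid X = x,\, T = 0)
$$

A customer with $\tau(x) \approx 0$ would convert (or not) regardless of the offer, so sending it adds little.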

For the modeling approach, we could consider:

- Multi-class: predict which offer the customer is most likely to convert
- Binary classifier per offer: a separate model for each offer
- Uplift model: a more robust method that identifies which customers actually need an offer in order to transact (the econml library helps here).
- Reinforcement learning / multi-armed bandits: to balance testing new offers (exploration) against using the best-performing one (exploitation), solved with Thompson sampling.
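
As a rough illustration of the bandit option, here is a minimal Beta-Bernoulli Thompson sampling sketch in plain Python. The offer names and conversion rates are made up for the simulation, and the function names are hypothetical. This is not the approach used in the rest of the notebook.

```python
import random


def thompson_pick(successes, failures, rng):
    """Sample each offer's Beta posterior and return the offer with the highest draw."""
    draws = {
        offer: rng.betavariate(1 + successes[offer], 1 + failures[offer])
        for offer in successes
    }
    return max(draws, key=draws.get)


def simulate(true_rates, n_rounds=5000, seed=0):
    """Simulate sending one offer per round and updating the posteriors."""
    rng = random.Random(seed)
    successes = {offer: 0 for offer in true_rates}
    failures = {offer: 0 for offer in true_rates}
    for _ in range(n_rounds):
        offer = thompson_pick(successes, failures, rng)
        # Simulated customer response (assumed Bernoulli with a fixed rate).
        if rng.random() < true_rates[offer]:
            successes[offer] += 1
        else:
            failures[offer] += 1
    return successes, failures


# Made-up conversion rates, only for the simulation.
true_rates = {"offer_a": 0.05, "offer_b": 0.10, "offer_c": 0.20}
successes, failures = simulate(true_rates)
pulls = {offer: successes[offer] + failures[offer] for offer in true_rates}
print(pulls)
```

Early rounds spread sends across all offers (exploration). As evidence accumulates, most sends go to the offer with the highest observed conversion rate (exploitation).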

For simplicity, and to keep this case study short, I will define the target as: given that an offer was sent, was it converted? For the modeling, I will:

 - Build a training dataset with customer features, offer features, and the customer's past transactions (only up to the time of that offer, to avoid leakage)
 - At inference time, vary the offer features to estimate the chance that the customer converts each offer, iterating over all of them
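
The inference step described above could look roughly like the sketch below. `toy_predict_proba` is a hypothetical stand-in for a trained model, and the offer values are invented. Only the feature names (`discount_value`, `min_value`, `duration`, `age`, `credit_card_limit`) come from the dataset.

```python
def rank_offers(customer, offers, predict_proba):
    """Score every candidate offer for one customer and sort best-first."""
    scored = [
        # Customer features stay fixed; only the offer features vary.
        (offer_id, predict_proba({**customer, **offer_features}))
        for offer_id, offer_features in offers.items()
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def toy_predict_proba(row):
    """Hypothetical scoring rule standing in for a trained classifier."""
    score = 0.1 + 0.05 * row["discount_value"] - 0.01 * row["min_value"]
    return min(max(score, 0.0), 1.0)


customer = {"age": 35, "credit_card_limit": 50000.0}
offers = {
    "offer_a": {"discount_value": 2, "min_value": 10, "duration": 7},
    "offer_b": {"discount_value": 5, "min_value": 20, "duration": 10},
    "offer_c": {"discount_value": 10, "min_value": 10, "duration": 5},
}

ranking = rank_offers(customer, offers, toy_predict_proba)
print(ranking)
```

With a real model, the history features (past offers, views, amounts) would also be held fixed at their values as of scoring time.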

Because the data is event-based, we first need to build a dataset indicating whether a given offer sent to a given customer was successful (target = 1) or not (target = 0). We must keep in mind that the same offer can be sent to the same customer several times at different moments, so we need to track the time at which it was sent. The grain of the dataset is customer-offer-time.

In [3]:
df = (
    transactions_full.filter('event = "offer received"')
    .select("account_id", "offer_id", "time_since_test_start", "event")
    .distinct()
    .orderBy("time_since_test_start","account_id")
)

df.show()



+--------------------+--------------------+---------------------+--------------+
|          account_id|            offer_id|time_since_test_start|         event|
+--------------------+--------------------+---------------------+--------------+
|0011e0d4e6b944f99...|3f207df678b143eea...|                  0.0|offer received|
|0020c2b971eb4e918...|fafdcd668e3743c1b...|                  0.0|offer received|
|003d66b6608740288...|5a8bc65990b245e5a...|                  0.0|offer received|
|00426fe3ffde4c6b9...|5a8bc65990b245e5a...|                  0.0|offer received|
|005500a7188546ff8...|ae264e3637204a6fb...|                  0.0|offer received|
|0056df74b63b42988...|9b98b8c7a33c4b65b...|                  0.0|offer received|
|00715b6e55c3431cb...|ae264e3637204a6fb...|                  0.0|offer received|
|0082fd87c18f45f2b...|5a8bc65990b245e5a...|                  0.0|offer received|
|00840a2ca5d2408e9...|2906b810c7d441179...|                  0.0|offer received|
|00857b24b13f4fe0a...|4d5c57

                                                                                

Getting the target: which offers were successfully completed

In [4]:
target1 = (
    transactions_full.filter('event = "offer completed"')
    .select("account_id", "offer_id", "time_since_test_start")
    .distinct()
    .withColumn("target", F.lit(1))
    .orderBy("time_since_test_start", "account_id")
)



In [5]:
dfpd = merge_asof(
    left=ps(df),
    right=ps(target1),
    on="time_since_test_start",
    by=["account_id", "offer_id"],
    direction="forward",
    allow_exact_matches=True
)

In [6]:
dfpd['target'] = dfpd['target'].fillna(0)
df = dfpd.to_spark()
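
To show what the forward as-of match does, here is a small plain-Python sketch of the same idea on toy data. It is not `merge_asof` itself, only an approximation of its `direction="forward"` behavior with exact matches allowed. The account and offer names are invented.

```python
from bisect import bisect_left


def forward_asof_target(received, completed):
    """Label each (account, offer, time) send with 1 if the same account/offer
    has a completion at that time or later, otherwise 0."""
    times_by_key = {}
    for account, offer, t in completed:
        times_by_key.setdefault((account, offer), []).append(t)
    for times in times_by_key.values():
        times.sort()

    labeled = []
    for account, offer, t in received:
        times = times_by_key.get((account, offer), [])
        idx = bisect_left(times, t)  # index of the first completion with time >= t
        labeled.append((account, offer, t, 1 if idx < len(times) else 0))
    return labeled


received = [
    ("acc1", "offer_a", 0.0),
    ("acc1", "offer_a", 14.0),  # same offer sent a second time
    ("acc1", "offer_b", 7.0),
    ("acc2", "offer_a", 0.0),
]
completed = [("acc1", "offer_a", 16.5)]

labels = forward_asof_target(received, completed)
for row in labels:
    print(row)
```

In this toy data, the send at 0.0 is also labeled 1, although the completion at 16.5 comes after the second send at 14.0. The `merge_asof` call above passes no `tolerance`, so it may behave the same way. It may be worth checking whether a completion should only count within the offer's `duration`.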



In [7]:
transactions = spark.read.json((repo_path / 'data' / 'processed' / 'transactions').as_posix())

In [8]:
# Computing the total number of offers previously sent to each customer
window = Window.partitionBy("account_id").orderBy("time_since_test_start")

total_past_offers = (
    transactions_full.filter('event = "offer received"')
    .withColumn(
        "num_past_offers",
        F.count("offer_id").over(window.rangeBetween(Window.unboundedPreceding, -1)),
    )
    .select("account_id", "offer_id", 'event', "time_since_test_start", "num_past_offers")
)

total_past_offers.show()

+--------------------+--------------------+--------------+---------------------+---------------+
|          account_id|            offer_id|         event|time_since_test_start|num_past_offers|
+--------------------+--------------------+--------------+---------------------+---------------+
|0020ccbbb6d84e358...|2298d6c36e964ae4a...|offer received|                  7.0|              0|
|0020ccbbb6d84e358...|f19421c1d4aa40978...|offer received|                 14.0|              1|
|0020ccbbb6d84e358...|5a8bc65990b245e5a...|offer received|                 17.0|              2|
|0020ccbbb6d84e358...|9b98b8c7a33c4b65b...|offer received|                 21.0|              3|
|00426fe3ffde4c6b9...|5a8bc65990b245e5a...|offer received|                  0.0|              0|
|00426fe3ffde4c6b9...|fafdcd668e3743c1b...|offer received|                  7.0|              1|
|00426fe3ffde4c6b9...|0b1e1539f2cc45b7b...|offer received|                 14.0|              2|
|00426fe3ffde4c6b9...|2906b810
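
`rangeBetween(Window.unboundedPreceding, -1)` is a value-based frame. Its bounds are offsets applied to the `ORDER BY` value, so the frame holds rows whose `time_since_test_start` is at most the current value minus 1. The plain-Python sketch below emulates that rule for a single account on made-up timestamps. The helper name is hypothetical.

```python
def range_frame_count(times, offset=-1.0):
    """For each time t, count the values v in the partition with v <= t + offset.

    This mimics a RANGE frame from unbounded preceding up to (current value + offset).
    """
    return [sum(1 for v in times if v <= t + offset) for t in times]


# Offers spaced several time units apart: every earlier offer is counted.
spaced = [0.0, 7.0, 14.0, 17.0]
spaced_counts = range_frame_count(spaced)
print(spaced_counts)

# Events less than one time unit apart: the earlier one falls outside the frame.
close = [6.5, 7.0]
close_counts = range_frame_count(close)
print(close_counts)
```

For the offer counts this makes no difference, since the sends shown above are several units apart. For features computed over all events with the same window (views, amounts, rewards), anything in the last time unit before an offer would be left out. Whether that is acceptable depends on the intended definition of "past".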

In [9]:
# total number of previous offer views
total_past_views = transactions.withColumn(
    "num_past_viewed",
    F.sum(F.when(F.col("event") == "offer viewed", 1).otherwise(0)).over(
        window.rangeBetween(Window.unboundedPreceding, -1)
    ),
).select('account_id', 'offer_id', 'time_since_test_start', 'event', 'num_past_viewed')
total_past_views.show()

+--------------------+--------------------+---------------------+---------------+---------------+
|          account_id|            offer_id|time_since_test_start|          event|num_past_viewed|
+--------------------+--------------------+---------------------+---------------+---------------+
|0020ccbbb6d84e358...|                NULL|                 1.75|    transaction|           NULL|
|0020ccbbb6d84e358...|2298d6c36e964ae4a...|                  7.0| offer received|              0|
|0020ccbbb6d84e358...|2298d6c36e964ae4a...|                  7.0|   offer viewed|              0|
|0020ccbbb6d84e358...|                NULL|                 9.25|    transaction|              1|
|0020ccbbb6d84e358...|2298d6c36e964ae4a...|                 9.25|offer completed|              1|
|0020ccbbb6d84e358...|                NULL|                 10.0|    transaction|              1|
|0020ccbbb6d84e358...|                NULL|                 11.5|    transaction|              1|
|0020ccbbb6d84e358..

In [10]:
df = df.join(
    total_past_offers,
    on=["account_id", "offer_id", "time_since_test_start", "event"],
    how="left",
).join(
    total_past_views,
    on=["account_id", "offer_id", "time_since_test_start", "event"],
    how="left",
)

In [11]:
# sum of past amounts up to that offer
total_past_amount = transactions.withColumn(
    "total_past_amount",
    F.sum(F.col("amount")).over(window.rangeBetween(Window.unboundedPreceding, -1)),
).select(
    "account_id", "offer_id", "time_since_test_start", "event", "total_past_amount"
)

In [14]:
df = df.join(
    total_past_amount,
    on=["account_id", "offer_id", "time_since_test_start", "event"],
    how="left",
)

In [16]:
# sum of past rewards up to that offer
total_past_reward = transactions.withColumn(
    "total_past_reward",
    F.sum(F.col("reward")).over(window.rangeBetween(Window.unboundedPreceding, -1)),
).select(
    "account_id", "offer_id", "time_since_test_start", "event", "total_past_reward"
)

In [18]:
df = df.join(
    total_past_reward,
    on=["account_id", "offer_id", "time_since_test_start", "event"],
    how="left",
)

In [20]:
df.filter('total_past_reward is not null').show()

                                                                                

+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+------------------+-----------------+
|          account_id|            offer_id|time_since_test_start|         event|target|num_past_offers|num_past_viewed| total_past_amount|total_past_reward|
+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+------------------+-----------------+
|0020c2b971eb4e918...|ae264e3637204a6fb...|                  7.0|offer received|     0|              1|              1|             98.33|              2.0|
|016871ea865d43389...|3f207df678b143eea...|                  7.0|offer received|     0|              1|              0|             14.53|              2.0|
|017febbe52e64ac19...|9b98b8c7a33c4b65b...|                  7.0|offer received|     1|              1|              1| 755.2800000000001|             10.0|
|01a5e8b57bc04e029...|f19421c1d4aa40978...|               

In [23]:
# creates a feature flagging whether the same offer was already converted in the past
conversoes = df.filter("target = 1").select(
    "account_id",
    "offer_id",
    "time_since_test_start",
    F.lit(1).alias("past_offer_conversion"),
)

dfpd2 = merge_asof(
    left=ps(df),
    right=ps(conversoes),
    on="time_since_test_start",
    by=["account_id", "offer_id"],
    direction="backward",
    allow_exact_matches=False,
)

df = dfpd2.to_spark()
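
The backward match with `allow_exact_matches=False` asks whether an earlier send of the same offer to the same account had `target = 1`. Here is a plain-Python sketch of that rule on invented rows. It is only an approximation of the `merge_asof` behavior. One difference is deliberate: it returns 0 where the notebook's column stays missing (shown as `NULL` in the output below).

```python
from bisect import bisect_left


def past_offer_conversion(sends):
    """sends: list of (account, offer, time, target).

    Returns one flag per send: 1 if a strictly earlier send of the same offer
    to the same account had target == 1, otherwise 0.
    """
    converted_times = {}
    for account, offer, t, target in sends:
        if target == 1:
            converted_times.setdefault((account, offer), []).append(t)
    for times in converted_times.values():
        times.sort()

    flags = []
    for account, offer, t, _ in sends:
        times = converted_times.get((account, offer), [])
        # bisect_left counts the converted sends with time strictly below t.
        flags.append(1 if bisect_left(times, t) > 0 else 0)
    return flags


sends = [
    ("acc1", "offer_a", 0.0, 1),
    ("acc1", "offer_a", 14.0, 0),
    ("acc1", "offer_b", 7.0, 1),
    ("acc1", "offer_a", 21.0, 1),
]
flags = past_offer_conversion(sends)
print(flags)
```

Excluding exact matches matters here. Without it, every converted send would match itself and leak its own target into the feature.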



In [24]:
df.filter('total_past_reward is not null').show()



+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+------------------+-----------------+---------------------+
|          account_id|            offer_id|time_since_test_start|         event|target|num_past_offers|num_past_viewed| total_past_amount|total_past_reward|past_offer_conversion|
+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+------------------+-----------------+---------------------+
|0020c2b971eb4e918...|ae264e3637204a6fb...|                  7.0|offer received|     0|              1|              1|             98.33|              2.0|                 NULL|
|016871ea865d43389...|3f207df678b143eea...|                  7.0|offer received|     0|              1|              0|             14.53|              2.0|                 NULL|
|017febbe52e64ac19...|9b98b8c7a33c4b65b...|                  7.0|offer received|     1|              1|  

In [30]:
# feature giving the time elapsed since the previous offer
df = df.withColumn(
    "time_since_last_offer",
    F.col("time_since_test_start") - F.lag("time_since_test_start").over(window)
)
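
A plain-Python sketch of what the `lag` over the per-account window computes, on invented rows: for each account, the gap to the previous send in time order, with no value for the first send.

```python
from itertools import groupby


def time_since_last_offer(sends):
    """sends: list of (account, time). Returns (account, time, gap) tuples,
    where gap is None for the first send of each account."""
    result = []
    ordered = sorted(sends)  # sort by account, then by time
    for account, group in groupby(ordered, key=lambda send: send[0]):
        previous = None
        for _, t in group:
            gap = None if previous is None else t - previous
            result.append((account, t, gap))
            previous = t
    return result


sends = [("acc1", 14.0), ("acc1", 7.0), ("acc2", 0.0), ("acc1", 17.0)]
gaps = time_since_last_offer(sends)
for row in gaps:
    print(row)
```

The missing gap on the first send corresponds to the `NULL` that `lag` returns when there is no previous row in the partition.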


In [31]:
df.show()



+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+-----------------+-----------------+---------------------+---------------------+
|          account_id|            offer_id|time_since_test_start|         event|target|num_past_offers|num_past_viewed|total_past_amount|total_past_reward|past_offer_conversion|time_since_last_offer|
+--------------------+--------------------+---------------------+--------------+------+---------------+---------------+-----------------+-----------------+---------------------+---------------------+
|0020ccbbb6d84e358...|2298d6c36e964ae4a...|                  7.0|offer received|     1|              0|              0|            16.27|             NULL|                 NULL|                 NULL|
|0020ccbbb6d84e358...|f19421c1d4aa40978...|                 14.0|offer received|     1|              1|              1|            62.86|              3.0|                 NULL|                  7.0|


Now adding the offer and customer features

In [36]:
transactions_full_features = transactions_full.select(
    "account_id", # join keys
    "offer_id", # join keys
    "event", # join keys
    "time_since_test_start",  # join keys
    "age", # customer features
    "credit_card_limit", # customer features
    "gender", # customer features
    "registered_on",  # customer features
    "discount_value", # offer features
    "channels", # offer features
    "min_value", # offer features
    "offer_type", # offer features
    "duration",  # offer features
)

In [38]:
df = df.join(
    transactions_full_features,
    on=["account_id", "offer_id", "event", "time_since_test_start"],
    how="left",
)
df.show()



+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+-----------------+---------------------+---------------------+----+-----------------+------+-------------+--------------+--------------------+---------+-------------+--------+
|          account_id|            offer_id|         event|time_since_test_start|target|num_past_offers|num_past_viewed| total_past_amount|total_past_reward|past_offer_conversion|time_since_last_offer| age|credit_card_limit|gender|registered_on|discount_value|            channels|min_value|   offer_type|duration|
+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+-----------------+---------------------+---------------------+----+-----------------+------+-------------+--------------+--------------------+---------+-------------+--------+
|00116118485d4dfda...|f19421c1d4aa40978...|offer received|

In [41]:
df.withColumn(
    "registered_on_seno",
    F.sin(F.dayofyear("registered_on") * 2 * F.lit(3.14159) / F.lit(365) )
).show()

+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+-----------------+---------------------+---------------------+----+-----------------+------+-------------+--------------+--------------------+---------+-------------+--------+--------------------+
|          account_id|            offer_id|         event|time_since_test_start|target|num_past_offers|num_past_viewed| total_past_amount|total_past_reward|past_offer_conversion|time_since_last_offer| age|credit_card_limit|gender|registered_on|discount_value|            channels|min_value|   offer_type|duration|  registered_on_seno|
+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+-----------------+---------------------+---------------------+----+-----------------+------+-------------+--------------+--------------------+---------+-------------+--------+-----------------

In [44]:
# Creating cyclical features from registered_on, plus other date features
df = (
    df.withColumn(
        "registered_on_seno",
        F.sin(F.dayofyear("registered_on") * 2 * F.lit(3.14159) / F.lit(365)),
    )
    .withColumn(
        "registered_on_cos",
        F.cos(F.dayofyear("registered_on") * 2 * F.lit(3.14159) / F.lit(365)),
    )
    .withColumn("year_registered", F.year(F.col("registered_on")))
    .withColumn("month_registered", F.month(F.col("registered_on")))
)
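
The reason for the sine/cosine pair is that it places days of the year on a circle, so December 31 ends up next to January 1 instead of 364 units away. A small stdlib sketch of the same transformation follows. It uses `math.pi` instead of the rounded constant, and the helper name is hypothetical.

```python
import math


def cyclic_day_of_year(day_of_year, period=365):
    """Map a day of the year to a (sin, cos) point on the unit circle."""
    angle = 2 * math.pi * day_of_year / period
    return math.sin(angle), math.cos(angle)


jan_1 = cyclic_day_of_year(1)
jul_1 = cyclic_day_of_year(182)
dec_31 = cyclic_day_of_year(365)

# Distance between encodings: neighbors across the year boundary stay close.
print(math.dist(dec_31, jan_1))
print(math.dist(jul_1, jan_1))
```

Both coordinates are needed. With the sine alone, two different days (for example, one in early spring and one in early autumn) can share the same value.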

In [48]:
df.select('channels').distinct().show(100,False)

+----------------------------+
|channels                    |
+----------------------------+
|[web, email]                |
|[email, mobile, social]     |
|[web, email, mobile]        |
|[web, email, mobile, social]|
+----------------------------+



In [52]:
# creating channel features

df = (
    df.withColumn("email", F.array_contains(F.col("channels"), "email"))
    .withColumn("web", F.array_contains(F.col("channels"), "web"))
    .withColumn("mobile", F.array_contains(F.col("channels"), "mobile"))
    .withColumn("social", F.array_contains(F.col("channels"), "social"))
    .withColumn("qtd_canais", F.array_size(F.col("channels")))
)
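
The channel step is a multi-hot encoding of the `channels` array. A plain-Python sketch of the same idea, applied to the four distinct values printed above (the helper name is hypothetical):

```python
CHANNELS = ("email", "web", "mobile", "social")


def channel_flags(channels):
    """Turn a list of channel names into one boolean per known channel plus a count."""
    flags = {name: name in channels for name in CHANNELS}
    flags["qtd_canais"] = len(channels)
    return flags


distinct_values = [
    ["web", "email"],
    ["email", "mobile", "social"],
    ["web", "email", "mobile"],
    ["web", "email", "mobile", "social"],
]
encoded = [channel_flags(value) for value in distinct_values]
for row in encoded:
    print(row)
```

Every one of those four lists contains `email`, so the `email` flag looks constant in this data and would probably carry no signal for a model.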

In [53]:
df.show()

+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+-----------------+---------------------+---------------------+----+-----------------+------+-------------+--------------+--------------------+---------+-------------+--------+--------------------+--------------------+---------------+----------------+-----+-----+------+------+----------+
|          account_id|            offer_id|         event|time_since_test_start|target|num_past_offers|num_past_viewed| total_past_amount|total_past_reward|past_offer_conversion|time_since_last_offer| age|credit_card_limit|gender|registered_on|discount_value|            channels|min_value|   offer_type|duration|  registered_on_seno|   registered_on_cos|year_registered|month_registered|email|  web|mobile|social|qtd_canais|
+--------------------+--------------------+--------------+---------------------+------+---------------+---------------+------------------+----------