# Montagem do dataset de concentração de impressões e cliques na lista

Início da sessão spark:

In [37]:
import pyspark.sql.functions as F

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Leitura da tabela de performance no datalake do mês de fevereiro de 2021, usaremos estes dados como base da análise:

In [43]:
performance = spark.read.parquet('s3://datalake.icarros/raw/performance/ano=2021/mes=01/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
performance.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------+------------------+--------+--------+---------+-----+-----+--------------------+-----------+--------+------------+-----------------------+---------------+-----------+--------------------+------+------------------+-----------+-------------+----------------+--------------------+---------------+-------+-----------------+-------------------+--------------+--------------------+------------+--------------+--------------------+-----------------------+--------------------+-----------------+-------------+--------+------------+-----------------+------+---------+----------+----------+-------------+-----+----------+----+-----------------+--------------------+---------------+-------+-------------+---------------+------+-------+-----------+---------+--------------------+----------+-----+----+--------------------+---+
|           dateevent|           conversao|conversions|valorParcelaMaxima|parceiro|mesmoddd|   visita|midia|leads|                

Reformulação da tabela para o que vamos utilizar:
> A partir de "orderBy" as transformações são para criar a coluna "lista", onde apresenta um array de id de anuncio na ordem em que foi apresentado ao usuário na navegação.

In [44]:
from pyspark.sql.window import Window

windowanuncios = Window.partitionBy('visita').orderBy('len_anuncios')

df = performance \
    .select(
        (F.to_timestamp('dateevent', "yyyy-MM-dd'T'HH:mm:ssX") - F.expr("INTERVAL 3 HOURS")).alias('timestamp'),
        (F.split('conversao', u'\.'))[2].alias('tag'),
        'visita',
        'interfaceOrigem',
        'anuncios',
        'anuncio',
        'finalOrder',
        F.when(F.size('anuncios') == -1, 0).otherwise(F.size('anuncios')).alias('len_anuncios')
    ) \
    .orderBy(F.col('visita').asc(), F.col('timestamp').asc(), F.col('len_anuncios').asc()) \
    .withColumn('lag', F.lag('len_anuncios', 1).over(windowanuncios)) \
    .withColumn('start', F.col('lag') + 1) \
    .withColumn('len_lista', F.col('len_anuncios') - F.col('lag')) \
    .withColumn('lista', F.when(F.col('tag') == 'busca', F.expr('slice(anuncios, start, len_lista)'))) \
    .drop('lag', 'start')

df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- timestamp: timestamp (nullable = true)
 |-- tag: string (nullable = true)
 |-- visita: long (nullable = true)
 |-- interfaceOrigem: integer (nullable = true)
 |-- anuncios: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- anuncio: integer (nullable = true)
 |-- finalOrder: integer (nullable = true)
 |-- len_anuncios: integer (nullable = false)
 |-- len_lista: integer (nullable = true)
 |-- lista: array (nullable = true)
 |    |-- element: integer (containsNull = true)

In [45]:
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----+---------+---------------+--------------------+-------+----------+------------+---------+-----+
|          timestamp|  tag|   visita|interfaceOrigem|            anuncios|anuncio|finalOrder|len_anuncios|len_lista|lista|
+-------------------+-----+---------+---------------+--------------------+-------+----------+------------+---------+-----+
|2021-01-01 00:03:07|busca|447804729|              1|[31577260, 318229...|   null|        14|           5|     null| null|
|2021-01-01 00:03:36|busca|447807393|              0|[31867588, 318147...|   null|         3|          16|     null| null|
|2021-01-01 00:04:22|busca|447807926|              0|[31766251, 313972...|   null|         3|          20|     null| null|
|2021-01-01 00:04:45|busca|447808617|              0|[31891416, 318913...|   null|         3|          10|     null| null|
|2021-01-01 00:06:25|busca|447808920|              1|[31950197, 319501...|   null|        14|           3|     null| null|
+---------------

Transformação para um dataframe onde busca qual foi o id do anúncio em cada posição da lista:

In [46]:
posicoes = df \
    .where(F.col('tag') == 'busca') \
    .select(
        F.to_date('timestamp').alias('data'),
        'visita',
        'finalOrder',
        'interfaceOrigem',
        F.col('lista').getItem(0).alias('01'),
        F.col('lista').getItem(1).alias('02'),
        F.col('lista').getItem(2).alias('03'),
        F.col('lista').getItem(3).alias('04'),
        F.col('lista').getItem(4).alias('05'),
        F.col('lista').getItem(5).alias('06'),
        F.col('lista').getItem(6).alias('07'),
        F.col('lista').getItem(7).alias('08'),
        F.col('lista').getItem(8).alias('09'),
        F.col('lista').getItem(9).alias('10'),
        F.col('lista').getItem(10).alias('11'),
        F.col('lista').getItem(11).alias('12'),
        F.col('lista').getItem(12).alias('13'),
        F.col('lista').getItem(13).alias('14'),
        F.col('lista').getItem(14).alias('15'),
        F.col('lista').getItem(15).alias('16'),
        F.col('lista').getItem(16).alias('17'),
        F.col('lista').getItem(17).alias('18'),
        F.col('lista').getItem(18).alias('19'),
        F.col('lista').getItem(19).alias('20')
    )

posicoes.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data: date (nullable = true)
 |-- visita: long (nullable = true)
 |-- finalOrder: integer (nullable = true)
 |-- interfaceOrigem: integer (nullable = true)
 |-- 01: integer (nullable = true)
 |-- 02: integer (nullable = true)
 |-- 03: integer (nullable = true)
 |-- 04: integer (nullable = true)
 |-- 05: integer (nullable = true)
 |-- 06: integer (nullable = true)
 |-- 07: integer (nullable = true)
 |-- 08: integer (nullable = true)
 |-- 09: integer (nullable = true)
 |-- 10: integer (nullable = true)
 |-- 11: integer (nullable = true)
 |-- 12: integer (nullable = true)
 |-- 13: integer (nullable = true)
 |-- 14: integer (nullable = true)
 |-- 15: integer (nullable = true)
 |-- 16: integer (nullable = true)
 |-- 17: integer (nullable = true)
 |-- 18: integer (nullable = true)
 |-- 19: integer (nullable = true)
 |-- 20: integer (nullable = true)

In [47]:
posicoes.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------+----------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|data      |visita   |finalOrder|interfaceOrigem|01  |02  |03  |04  |05  |06  |07  |08  |09  |10  |11  |12  |13  |14  |15  |16  |17  |18  |19  |20  |
+----------+---------+----------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|2021-01-01|447804729|14        |1              |null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
|2021-01-01|447807393|3         |0              |null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
|2021-01-01|447807926|3         |0              |null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
|2021-01-01|447808617|3         |0              |null|null|null|null|null|null|null|null|null|null|n

Um segundo dataframe onde cria uma coluna que é uma array de id de anúncios em que houve a visualização (página detalhe) por visita:

In [48]:
detalhes = df \
    .where(F.col('tag') == 'detalhe') \
    .groupby('visita') \
    .agg(
        F.collect_list('anuncio').alias('detalhe_lista')
    )

detalhes.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- visita: long (nullable = true)
 |-- detalhe_lista: array (nullable = true)
 |    |-- element: integer (containsNull = true)

In [49]:
detalhes.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------------------------------------+
|visita   |detalhe_lista                                     |
+---------+--------------------------------------------------+
|447810896|[31616208]                                        |
|447815206|[31613493, 30508066]                              |
|447817303|[31232002, 30919106, 31814249, 31233153, 31954663]|
|447817668|[31726748]                                        |
|447818326|[31829832]                                        |
+---------+--------------------------------------------------+
only showing top 5 rows

Definição de uma função que irá passar as colunas de posição para linhas e a usando no dataframe `posicoes`:

In [50]:
from pyspark.sql import DataFrame
from typing import Iterable

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = F.array(*(
        F.struct(F.lit(c).alias(var_name), F.col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", F.explode(_vars_and_vals))

    cols = id_vars + [
            F.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

posicoes_melted = melt(posicoes, id_vars=['data', 'visita', 'finalOrder', 'interfaceOrigem'], value_vars=['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20'])

posicoes_melted.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data: date (nullable = true)
 |-- visita: long (nullable = true)
 |-- finalOrder: integer (nullable = true)
 |-- interfaceOrigem: integer (nullable = true)
 |-- variable: string (nullable = false)
 |-- value: integer (nullable = true)

In [51]:
posicoes_melted.where(F.col('value').isNotNull()).show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------+----------+---------------+--------+--------+
|data      |visita   |finalOrder|interfaceOrigem|variable|value   |
+----------+---------+----------+---------------+--------+--------+
|2021-01-01|447817303|4         |0              |01      |31527535|
|2021-01-01|447817303|4         |0              |02      |30999100|
|2021-01-01|447817303|4         |0              |03      |31871271|
|2021-01-01|447817303|4         |0              |04      |31588874|
|2021-01-01|447817303|4         |0              |05      |31769255|
+----------+---------+----------+---------------+--------+--------+
only showing top 5 rows

Criação de um dataframe onde tem como base o dataframe `posicoes_melted` join com o dataframe `detalhes` e a criação de uma nova coluna chama `houve_detalhe`, onde assume os valores de 0, para quando não foi identificado o anúncio impresso na lista de anúncios visualizados na visita e 1 para quando foi identificado, ou seja, o usuário clicou no anúncio para visualizar os detalhes:
> como é processado linha a linha, pode acontecer do usuário ter visto várias listas de anúncios onde um anúncio específico apareceu em posições diferentes, e se o usuário clicou para visualizar os detalhes deste anúncio específico, será contabilizado em todas as posições.

In [52]:
lista_detalhes_full = posicoes_melted \
    .join(detalhes, on='visita', how='left') \
    .withColumnRenamed('variable', 'posicaolista') \
    .withColumnRenamed('value', 'anuncio_id') \
    .withColumn('houve_detalhe', F.when(F.arrays_overlap(F.array(F.col('anuncio_id')), F.col('detalhe_lista')), 1).otherwise(0))

lista_detalhes_full.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- visita: long (nullable = true)
 |-- data: date (nullable = true)
 |-- finalOrder: integer (nullable = true)
 |-- interfaceOrigem: integer (nullable = true)
 |-- posicaolista: string (nullable = false)
 |-- anuncio_id: integer (nullable = true)
 |-- detalhe_lista: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- houve_detalhe: integer (nullable = false)

In [53]:
lista_detalhes_full \
    .where(F.col('visita') == 496974023) \
    .show(40, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----+----------+---------------+------------+----------+-------------+-------------+
|visita|data|finalOrder|interfaceOrigem|posicaolista|anuncio_id|detalhe_lista|houve_detalhe|
+------+----+----------+---------------+------------+----------+-------------+-------------+
+------+----+----------+---------------+------------+----------+-------------+-------------+

Agrupamento do dataframe `lista_detalhe_full` ao nível de anúncio:

In [54]:
lista_detalhe_por_anuncio = lista_detalhes_full \
    .groupby('data', 'finalOrder', 'interfaceOrigem', 'anuncio_id', 'posicaolista') \
    .agg(
        F.sum('houve_detalhe').alias('detalhes'),
        F.count('anuncio_id').alias('impressoes')
    ) \
    .withColumn('taxa_para_detalhe', F.when(F.col('detalhes') > 0, F.round((F.col('detalhes') / F.col('impressoes'))*100, 2)))

lista_detalhe_por_anuncio.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- data: date (nullable = true)
 |-- finalOrder: integer (nullable = true)
 |-- interfaceOrigem: integer (nullable = true)
 |-- anuncio_id: integer (nullable = true)
 |-- posicaolista: string (nullable = false)
 |-- detalhes: long (nullable = true)
 |-- impressoes: long (nullable = false)
 |-- taxa_para_detalhe: double (nullable = true)

In [55]:
lista_detalhe_por_anuncio.show(10, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+---------------+----------+------------+--------+----------+-----------------+
|data      |finalOrder|interfaceOrigem|anuncio_id|posicaolista|detalhes|impressoes|taxa_para_detalhe|
+----------+----------+---------------+----------+------------+--------+----------+-----------------+
|2021-01-01|14        |1              |31406226  |03          |2       |128       |1.56             |
|2021-01-01|14        |1              |31375898  |13          |0       |13        |null             |
|2021-01-01|14        |1              |31097984  |14          |0       |58        |null             |
|2021-01-01|14        |1              |31687525  |12          |0       |9         |null             |
|2021-01-01|3         |1              |31606923  |11          |0       |3         |null             |
|2021-01-01|14        |1              |30310912  |17          |1       |240       |0.42             |
|2021-01-01|3         |1              |31878797  |01          |2       |39        

Gravação do dataframe `lista_detalhe_por_anuncio` em parquet no datalake:

In [56]:
lista_detalhe_por_anuncio.coalesce(1).write.mode('append').parquet('s3://datalake.icarros/score_anuncio/base/estudo_posicao_lista/janeiro_2022/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…