# Estudo para Desafio Escale

Este <i>notebook</i> apresenta uma <i>storytelling</i> do estudo realizado, descrevendo  um passo a passo de como foram utilizadas as tecnologia Spark e Python para a resolução do desafio propospo pela Escale (https://escaletech.github.io/dataplatform/data-engineer-test).

Para este trabalho foi considerado apenas um dataset (part-00000.json.gz). 

Um outro notebook será construído para o processamento de todos os arquivos para atender o desafio solicitado.

No primeiro momento sera instalado e importadas bibliotecas do Spark

In [1]:
try:
    !pip install pyspark=="2.4.5" --quiet
    !pip install pandas=="1.0.4" --quiet
except:
    print("Running throw py file.")

In [37]:
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark import SparkFiles
from pyspark.sql.types import StringType, FloatType
import pyspark
import json
import pandas as pd
import numpy as np

Criação de uma sessão Spark

In [2]:
spark = SparkSession\
        .builder\
        .appName("Desafio Data Engineer Escale - Fabio Kfouri")\
        .getOrCreate()
spark

Para otimizar a resolução, foram realizados downloads dos datasets, esta lógica é para identificar se este notebook esta rodando máquina do autor, caso positivo, utilizará o dataset local, do contrário, utilizará o dataset da núvem.

In [3]:
import os

dataPath = 'https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/'

if 'E:\\' in os.getcwd() and 'dataEngineerTest_Escale' in os.getcwd():
    dataPath = os.getcwd() + "/data/"

print(dataPath)


E:\Projetos\Jobs\dataEngineerTest_Escale/data/


## Leitura do primeiro dataset
Os códigos seguintes tem o objetivo apenas de conhecer o dataset:
- vizualizar alguns registros
- quantidade de registros
- observar o Schema do datase

In [None]:
df0 = spark.read.json(dataPath + 'part-0000' + str(0) +'.json.gz')
df0.show(3)

In [None]:
df0.count()

In [None]:
df0.printSchema()

## Construção do ClickStream

O código seguinte é para identificar os usuários que mais utilizaram o site. Esta pesquisa é importante para poder criar o <b>Sessionamento</b>.

In [None]:
df = df0.groupBy('anonymous_id').count().sort(F.col("count").desc())
df.show(10,False)

Identificado um <b>anonymous_id</b> com uso intensivo do site, que será utilizado para a modelagem do sessionamento.

In [None]:
df = df0.filter('anonymous_id = "ae59b395-31ac-40e3-b7c0-3d97d00ac6cc"').sort(F.col("device_sent_timestamp"))
df.show()

Para calculo do tempo de sessão será utilizado a função <b>LAG()</b> que tem por objetivo trazer no registro corrente o dado de timestamp do registro anterior.

O autor assumiu como premissa não declarada para definir uma sessão, que além do tempo limite de 30 minutos desde a última utilização, que uma sessão precisaria considerar o <b>device_family</b> e <b>os_family</b>.

Ou seja, mesmo que não tenha excedido o tempo limite de 30 minutos, mas se for caracterizado que houve uma mudança de device_family ou os_family, trata-se de uma sessão nova.

In [None]:
overCategory = Window.partitionBy('anonymous_id','device_family','os_family','browser_family')\
                    .orderBy('anonymous_id','device_family','os_family','browser_family','device_sent_timestamp')

In [None]:
dftemp = df0.withColumn("lag", F.lag('device_sent_timestamp', 1).over(overCategory))
dftemp.show(5)

Identificado que o tempo esta no formato <b>epoch</b>. 

A primeira intuição do autor foi de converter para o formato de data, porém, logo o autor percebeu que poderia usar o própria valor numérico para identificar a tolerancia de 30 minutos.

Para isso, foi convertido o tempo de 30 minutos em milisegundos para o uso simplificado na identificação da sessão.

In [None]:
time_limit = (30*60)*1000
print(time_limit)

In [None]:
dftemp = dftemp.withColumn('delta_seg', (F.col('device_sent_timestamp') - F.col('lag'))/1000)\
        .withColumn('same_section', (F.col('device_sent_timestamp') - F.col('lag')) < time_limit) \
        .withColumn('event_time', (F.col('device_sent_timestamp')/1000).cast('timestamp'))

dftemp.select('device_family','os_family','browser_family','device_sent_timestamp','same_section','event_time')\
        .show(20, False)

Devido a demora de processamento na maquina do autor, optou-se por gerar um dataset menor para analise.

In [None]:
dftemp.coalesce(1).write.format("json").save("analise.json")

In [4]:
dftemp = spark.read.json(os.getcwd() + '/analise.json')

Criacão de uma view chamada clickStream.

In [5]:
dftemp.createOrReplaceTempView("raw_table")

Pimeiro passo foi possível observar as sessoes abertas e fazer um especie de <i>smoke test</i>.

In [None]:
df_sessioned = spark.sql("""

      SELECT anonymous_id, browser_family, device_family, device_sent_timestamp, event, event_time, n, os_family, --
             platform, nvl(same_section,false) same_section, version,
             case when nvl(same_section,false) = false then
                 0
             else
                delta_seg
             end delta_seg,
             case when nvl(same_section,false) = false then
                'session_' || ROW_NUMBER() OVER (PARTITION BY anonymous_id,nvl(same_section,false) ORDER BY device_sent_timestamp )
             else
                null
             end
             as partial_session
        FROM raw_table t --
        order by anonymous_id, device_sent_timestamp 
""")
df_sessioned.select('device_family','device_sent_timestamp','delta_seg','same_section','event_time','partial_session').show(30, False)

In [6]:
df_sessioned = spark.sql("""
with temp as (--
      SELECT anonymous_id, browser_family, device_family, device_sent_timestamp, event, event_time, n, os_family, --
             platform, nvl(same_section,false) same_section, version,
             case when nvl(same_section,false) = false then
                 0
             else
                delta_seg
             end delta_seg,             
             case when nvl(same_section,false) = false then
             'session_' || ROW_NUMBER() OVER (PARTITION BY anonymous_id,nvl(same_section,false) ORDER BY device_sent_timestamp )
             else
              null
             end
             as partial_session
        FROM raw_table t --
), table_section_id as (--
    select anonymous_id, browser_family, delta_seg, device_family, device_sent_timestamp, event, event_time, n, os_family, --
             platform, nvl(same_section,false) same_section, version,
             LAST_VALUE(partial_session,True) OVER (PARTITION BY anonymous_id ORDER BY device_sent_timestamp ) || '_' || anonymous_id session_id
    from temp t
)
select * from table_section_id
order by anonymous_id, device_sent_timestamp 
""")
#df_sessioned.show()

Neste ponto temos efetivamente um ClickStream

In [7]:
df_sessioned.createOrReplaceTempView("clickstream")

In [8]:
spark.catalog.dropTempView('raw_table')

## Desafio 1

Calcular a quantidade total de sessões únicas por arquivo do conjunto de dados e apresentar no formato JSON.

Neste passo, foi realizado o filtro para desconsiderar as sessoes abertas para descobrir o total de sessoes únicas.

In [None]:
df_question_1 = spark.sql("""
select * from clickstream
where same_section = false
order by anonymous_id, device_sent_timestamp 
  
""")
df_question_1.select('device_family','device_sent_timestamp','same_section','event_time','session_id').show(30, False)

No terceiro e passo foi calculada a quantidade de sessoes abertas, neste caso, para um único usuário.

In [None]:
df_question_1 = spark.sql("""
SELECT /*anonymous_id,*/ COUNT(session_id)  qtd_session
FROM clickstream
where same_section = false
--group by anonymous_id
""")
df_question_1.show()

# Desafio 2
Calcular a quantidade de sessões únicas que ocorreram em cada Browser, Sistema Operacional e Dispositivo dentro de todo o conjunto de dados.

No primeiro passo foi identificar da quantidade por <b>browser_family</b>, <b>os_family</b> e <b>device_family</b>.

In [None]:
spark.sql("""

SELECT browser_family, COUNT(session_id)  qtd_session
FROM clickstream
where same_section = false
group by browser_family --

""").show()

In [None]:
spark.sql("""
SELECT os_family, COUNT(session_id)  qtd_session
FROM clickstream
where same_section = false
group by os_family
""").show()

In [31]:
spark.sql("""
SELECT * --'device_family' what, device_family ref, COUNT(session_id)  qtd_session
  FROM clickstream
 where same_section = false
   and (device_family = 'iPhone8' or browser_family = 'iPhone8'  or os_family= 'iPhone8')
 --group by device_family
""").show()

+------------+--------------+---------+-------------+---------------------+-----+----------+---+---------+--------+------------+-------+----------+
|anonymous_id|browser_family|delta_seg|device_family|device_sent_timestamp|event|event_time|  n|os_family|platform|same_section|version|session_id|
+------------+--------------+---------+-------------+---------------------+-----+----------+---+---------+--------+------------+-------+----------+
+------------+--------------+---------+-------------+---------------------+-----+----------+---+---------+--------+------------+-------+----------+



Segundo passo <b>Método 1</b> foi agrupar todos esses resultados em um dataframe

In [10]:
df_question_2 = spark.sql("""
with table_temp as (--
    SELECT *
    FROM clickstream
    where same_section = false
), 
table_union as (--
    SELECT 'device_family' what, device_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
     group by device_family
    union
    SELECT 'os_family' what, os_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
    group by os_family
    union
    SELECT 'browser_family' what, browser_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
     group by browser_family --
),
table_collection as (--
    select what, nvl(ref, 'Not Identified') || ':' || qtd_session collection 
    from table_union
    order by what, ref--
)
select what, array_join(collect_list(trim(collection)),',')  collection
from table_collection
group by what
""")

#df_question_2.show(10, False)

In [12]:
dados = df_question_2.toPandas()

In [14]:
dados.head()

Unnamed: 0,what,collection
0,device_family,":88,1001-G Go:221,2014819:69,4009E:5,4009I:6,4..."
1,os_family,"Android:7247173,Chrome OS:1892,Fedora:266,Free..."
2,browser_family,"AdsBot-Google:11399,Amazon Silk:131,Android:36..."


In [27]:
question2a = {}
print(question2a)

{}


In [32]:

for index, row in dados.iterrows():
    items = str(row["collection"]).split(',')

    
    if not row["what"] in question2a:
        question2a[row["what"]] = {}
    
        
    for item in items:
        element = str(item).split(':')
        if not element[0] in question2a[row["what"]]:
            question2a[row["what"]][element[0]] = 0
        
        if len(element) > 1:
            question2a[row["what"]][element[0]] = float(element[1]) + question2a[row["what"]][element[0]]
        else:
            question2a[row["what"]][element[0]] = float(0) + question2a[row["what"]][element[0]]

print(question2a)

{'device_family': {'': 264.0, '1001-G Go': 663.0, '2014819': 207.0, '4009E': 15.0, '4009I': 18.0, '4017F': 693.0, '4028E': 6.0, '4034E': 4506.0, '4055J': 510.0, '5010E': 4392.0, '5016J': 255.0, '5017E': 27.0, '5026J': 864.0, '5033E': 1185.0, '5033J': 2568.0, '5045J': 1467.0, '5046J': 285.0, '5049E': 78.0, '5051J': 741.0, '5054W': 33.0, '5056N': 87.0, '5085J': 669.0, '5085N': 1545.0, '5090I': 75.0, '5152D': 1080.0, '5159J': 2889.0, '5186D': 468.0, '5199I': 231.0, '6039J': 78.0, '6055B': 672.0, '6060S': 27.0, '62S': 168.0, '7048A': 6.0, '705-G': 66.0, '705-G Go': 1464.0, '71S': 156.0, '8050E': 5151.0, '9008J': 1191.0, '9008N': 2019.0, '91S': 6.0, 'A3_Pro': 24.0, 'ALE-L21': 243.0, 'ATU-LX3': 3.0, 'Advance 4.0 L3': 108.0, 'Advance 4.0M': 126.0, 'Advance 5.2': 48.0, 'Advance L4': 240.0, 'Aquaris X Pro': 54.0, 'Archos 60 Platinum': 63.0, 'Archos Access 50 Color 3G': 78.0, 'Archos Core 50 4G': 210.0, 'Armor_3': 3.0, 'Armor_6': 60.0, 'Asus A001D': 32244.0, 'Asus A007': 5649.0, 'Asus ASUS': 141

Segundo passo <b>Método 2</b> foi agrupar todos esses resultados em um dataframe

In [None]:
df_question_2 = spark.sql("""
with table_temp as (--
    SELECT *
    FROM clickstream
    where same_section = false
), 
table_union as (--
    SELECT 'device_family' what, device_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
     group by device_family
    union
    SELECT 'os_family' what, os_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
    group by os_family
    union
    SELECT 'browser_family' what, browser_family ref, COUNT(session_id)  qtd_session
      FROM table_temp
     group by browser_family --
)
select what, ref || ':' || qtd_session value 
from table_union
order by what, ref
""")
df_question_2.show()

No passo seguinte foi de concatenar a coluna 'ref' com 'qtd_session' em um formato json e agrupar por collecao.

In [None]:
d2 = df_question_2.groupBy('what').agg(F.array_join(F.collect_list('item'), delimiter=',').alias('collection'))
d2.show(10, False)

Finalmente construir a saida Json do desafio

In [None]:
question2 = {}
for row in d2.collect():
    items = str(row["collection"]).replace('"','').split(',')
    obj = {}
    for item in items:
        element = str(item).split(':')
        obj[element[0]] = float(element[1])       
    
    question2[row["what"]] = obj

print(question2)

In [None]:
print(json.dumps(question2))

## Desafio 3

Calcular a mediana da duração (em segundos) entre todas sessões únicas para cada segmento.

Neste exemplo selecionado uma sessao com alguns eventos.

In [None]:
ddf = spark.sql("""

SELECT session_id, delta_seg                
FROM clickstream
where session_id = 'session_31_ae59b395-31ac-40e3-b7c0-3d97d00ac6cc'
group by session_id, delta_seg
order by session_id, delta_seg

""")
ddf.show(300, False)

Metodo 1, usando a função approxQuantile.

In [None]:
ddf.approxQuantile("delta_seg", [0.5], 0.25)

Metodo 2, usando uma função UDF para aplicação da biblioteca Numpy.
Descartado por ser um processamento muito longo.

In [None]:
#https://stackoverflow.com/questions/38743476/how-to-find-the-median-in-apache-spark-with-python-dataframe-api/38743477
def find_median(values_list):
    try:
        median = np.median(values_list) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

In [None]:
ddfa = ddf.groupBy('session_id').agg(F.collect_list('delta_seg').alias('deltas'))
ddfa.show()

In [None]:
ddfa = ddfa.withColumn("median",median_finder("deltas")) 
ddfa.show()

Método 3, usando a função percentile_approx. Parecido com o método 1, porém, dentro da expressao SQL.

In [None]:
spark.sql("""
with t1 as (--
    SELECT  delta_seg                
    FROM clickstream
    where session_id = 'session_31_ae59b395-31ac-40e3-b7c0-3d97d00ac6cc'
    group by  delta_seg
    order by  delta_seg
--
), t2 as (--
    SELECT count( delta_seg)/2 central
    FROM clickstream
    where session_id = 'session_31_ae59b395-31ac-40e3-b7c0-3d97d00ac6cc' --
)
select session_id, percentile_approx(delta_seg , 0.5) median 
 FROM clickstream
group by session_id

""").show(300, False)

Escolha do Metodo 3 e reuso parcial da query do desafio 2. Considerando somente as sessões abertas

In [9]:
df_question_3 = spark.sql("""
with table_temp as (--
    SELECT *
    FROM clickstream
    where same_section = true
), 
table_union as (--
    SELECT 'device_family' what, device_family ref, percentile_approx(delta_seg , 0.5) median   
      FROM table_temp
     group by device_family
    union
    SELECT 'os_family' what, os_family ref, percentile_approx(delta_seg , 0.5) median 
      FROM table_temp
    group by os_family
    union
    SELECT 'browser_family' what, browser_family ref, percentile_approx(delta_seg , 0.5) median 
      FROM table_temp
     group by browser_family --
),
table_collection as (--
    select what, ref || ':' || median collection 
    from table_union
    order by what, ref--
)
select what, array_join(collect_list(trim(collection)),',')  collection
from table_collection
group by what
""")


In [10]:
dados3 = df_question_3.toPandas()

In [16]:
dados3.head()

Unnamed: 0,what,collection
0,device_family,"Generic Smartphone:633.786,Samsung SM-A105FN:8..."
1,os_family,"Android:585.859,Other:360.042,iOS:195.088"
2,browser_family,"Chrome Mobile:806.804,Facebook:800.884,Googleb..."


In [38]:
question3 = {}
for index, row in dados3.iterrows():
    items = row["collection"].split(',')
    
    if not row["what"] in question3:
        question3[row["what"]] = {}

    for item in items:
        element = str(item).split(':')

        if not element[0] in question3[row["what"]]:
            question3[row["what"]][element[0]] = 0


        if len(element) > 1:
            question3[row["what"]][element[0]] = np.median([ float(element[1]), question3[row["what"]][element[0]] ])
        else:
            question3[row["what"]][element[0]] = question3[row["what"]][element[0]]

        
print(question3)

{'device_family': {'Generic Smartphone': 316.893, 'Samsung SM-A105FN': 400.442, 'Samsung SM-G9650': 794.2095, 'Samsung SM-J415G': 403.402, 'Spider': 201.988, 'iPhone': 97.544}, 'os_family': {'Android': 292.9295, 'Other': 180.021, 'iOS': 97.544}, 'browser_family': {'Chrome Mobile': 403.402, 'Facebook': 400.442, 'Googlebot': 201.988, 'Mobile Safari': 97.544}}
