## About the data

The CSV file contains timestamped 'click' or 'view' events from users on ads belonging to specific campaigns.

**Column description:**  
timestamp,user_id,action,adId,campaignId 

**Sample:**  
2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01  
2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02  
2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02  
2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03

**CSV file name:**  
ad_action.csv
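
As a minimal sketch of this layout, the standard-library snippet below parses one line into typed fields. It assumes, as the notebook code does, that the file has no header row and that timestamps use the `YYYY-MM-DD HH:MM:SS` format; `AdEvent` and `parse_event` are hypothetical names introduced only for illustration.

```python
import csv
from datetime import datetime
from typing import NamedTuple


class AdEvent(NamedTuple):
    """One row of ad_action.csv (hypothetical helper type)."""
    timestamp: datetime
    user_id: str
    action: str      # 'click' or 'view'
    adId: str
    campaignId: str


def parse_event(line: str) -> AdEvent:
    # The file has no header row, so fields are read by position
    ts, user_id, action, ad_id, campaign_id = next(csv.reader([line]))
    return AdEvent(
        timestamp=datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"),
        user_id=user_id,
        action=action,
        adId=ad_id,
        campaignId=campaign_id,
    )


event = parse_event(
    "2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01"
)
print(event.timestamp.month, event.action, event.campaignId)  # 9 click campaignId_01
```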

## About the questions

The questions must be answered using the technologies indicated by the tags below:

- [Dask]: use Dask code in the answer
- [Spark]: use Spark code in the answer. Either Map-Reduce or Spark SQL may be used
- [SparkStreaming]: use Spark Streaming code in the answer. Please do not change the code that generates the inputStream, so that all answers follow the same pattern

Even if you cannot finish a question, please submit it anyway, since partial code may still earn points.

## ----------------------

In [1]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:39305, Dashboard: http://127.0.0.1:8787/status | Workers: 4, Cores: 4, Memory: 8.23 GB |


In [2]:
data_dask = dd.read_csv('ad_action.csv', header=None, parse_dates=[0])
data_dask.columns = ['timestamp','user_id','action','adId','campaignId']
data_dask

| npartitions=1 | timestamp | user_id | action | adId | campaignId |
|---|---|---|---|---|---|
| | datetime64[ns] | object | object | object | object |
| | ... | ... | ... | ... | ... |


In [3]:
data_dask.head(5)

| | timestamp | user_id | action | adId | campaignId |
|---|---|---|---|---|---|
| 0 | 2016-09-21 22:11:00 | 7c74953c-66cc-48bd-9d02-a02bf039cf3f | click | adId_09 | campaignId_01 |
| 1 | 2016-06-25 18:29:00 | 676a083e-2f8e-4ff2-9ec2-270f7f9d6033 | view | adId_09 | campaignId_02 |
| 2 | 2016-02-14 19:03:00 | 77158997-0dfa-48b7-9149-973dc151ef8d | click | adId_02 | campaignId_02 |
| 3 | 2016-03-26 06:27:00 | 78aa2467-b502-413b-94e9-04ec8210bd13 | click | adId_07 | campaignId_03 |
| 4 | 2016-01-02 04:57:00 | fef9a98c-d73e-48ef-b2cb-766ba85dc3e3 | view | adId_02 | campaignId_02 |


## 1) [Dask] What are the top 3 campaigns that generated the most events? Sort by number of events (2 points)

In [4]:
# WRITE YOUR CODE HERE

most_popular_campaign = data_dask[['campaignId', 'timestamp']].groupby('campaignId')\
    ['timestamp']\
    .count()\
    .nlargest(3)\
    .compute()

print(most_popular_campaign)

campaignId
campaignId_02    91216
campaignId_03    87036
campaignId_01    76461
Name: timestamp, dtype: int64
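
As a plain-Python cross-check of the same aggregation (group by `campaignId`, count, keep the 3 largest), the sketch below runs it on the five rows shown by `head(5)`. `top_campaigns` is a hypothetical helper, and its counts describe only this tiny sample, not the full file. Within Dask itself, `data_dask['campaignId'].value_counts().nlargest(3).compute()` should be an equivalent, shorter formulation.

```python
from collections import Counter

# First five rows of ad_action.csv, as shown by data_dask.head(5)
SAMPLE = [
    "2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01",
    "2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02",
    "2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02",
    "2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03",
    "2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02",
]


def top_campaigns(lines, n=3):
    # campaignId is the 5th comma-separated field (index 4); every row counts as one event
    counts = Counter(line.split(",")[4] for line in lines)
    # most_common sorts by count, keeping first-seen order for ties
    return counts.most_common(n)


print(top_campaigns(SAMPLE))
# [('campaignId_02', 3), ('campaignId_01', 1), ('campaignId_03', 1)]
```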


## 2) [Dask] Which campaign had the most clicks? (2 points)

In [5]:
# WRITE YOUR CODE HERE

# Keep only click events; boolean row filtering avoids the NaN placeholders that .where() creates
clicks_campaign = data_dask[data_dask['action'] == 'click'][['campaignId', 'action']]

# Count clicks per campaign and keep the campaign with the largest count
most_clicks_campaign = clicks_campaign.groupby('campaignId')['action']\
    .count()\
    .nlargest(1)\
    .compute()

print(f'Campaign {most_clicks_campaign.index[0]} had the most clicks')


Campaign campaignId_02 had the most clicks
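
The idea behind question 2 is "filter first, then group". A minimal plain-Python sketch of that idea on nine rows of the file (the first nine complete lines printed in the streaming section) is shown below; `count_by_campaign` is a hypothetical helper, and on such a small sample the winner differs from the full-file answer above.

```python
from collections import Counter

# First nine complete rows of ad_action.csv
SAMPLE = [
    "2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01",
    "2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02",
    "2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02",
    "2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03",
    "2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02",
    "2016-03-04 09:14:00,6ba65af9-4d83-4567-b580-a34f177bb788,view,adId_09,campaignId_01",
    "2016-07-09 21:42:00,be3befb9-ee08-4311-89f4-430d23ee63f1,click,adId_09,campaignId_01",
    "2016-07-18 21:15:00,d37832ae-546f-4b9b-94f1-ab2ddfc0f49d,click,adId_05,campaignId_02",
    "2016-04-20 13:30:00,1e5b575e-19e7-44fa-b5c2-f9aa6953de8a,click,adId_03,campaignId_01",
]


def count_by_campaign(lines, action):
    # Keep rows whose action field (index 2) matches, then count per campaignId (index 4)
    return Counter(
        fields[4]
        for fields in (line.split(",") for line in lines)
        if fields[2] == action
    )


clicks = count_by_campaign(SAMPLE, "click")
campaign, n_clicks = clicks.most_common(1)[0]
print(f"Campaign {campaign} had the most clicks ({n_clicks}) in this sample")
# Campaign campaignId_01 had the most clicks (3) in this sample
```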


## ----------------------

In [6]:
# So that Jupyter can load Spark correctly in the notebook
import sysconfig
import findspark
# sysconfig locates site-packages; distutils.sysconfig is unavailable on Python 3.12+
findspark.init(f"{sysconfig.get_paths()['purelib']}/pyspark")

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [7]:
# For those using Spark with Map-Reduce
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)
data_sc = sc.textFile('ad_action.csv')
data_sc.take(5)

['2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01',
 '2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02',
 '2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02',
 '2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03',
 '2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02']

In [None]:
# For those using Spark SQL
from pyspark.sql.functions import to_timestamp
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()
data_spark = spark.read.csv('ad_action.csv', header=False, inferSchema=True)
data_spark = data_spark.withColumnRenamed('_c0', 'timestamp')
data_spark = data_spark.withColumnRenamed('_c1', 'user_id')
data_spark = data_spark.withColumnRenamed('_c2', 'action')
data_spark = data_spark.withColumnRenamed('_c3', 'adId')
data_spark = data_spark.withColumnRenamed('_c4', 'campaignId')
data_spark = data_spark.withColumn('timestamp', to_timestamp('timestamp'))
data_spark.printSchema()

In [None]:
data_spark.show(5)

## 3) [Spark] Of the 12 months of the year, which one had the highest total number of events accumulated across the years? (2 points)

In [8]:
# Professor, I was unsure whether this question asked for the month of the year with the most events
# or for the campaign with the most events, so I implemented both options.


# --------------  Grouping by month: finding the month with the most events ----------------------
def parse_action(line):
    fields = line.split(",")
    date = fields[0].split()[0]        # "2016-09-21 22:11:00" -> "2016-09-21"
    year, month, _ = date.split("-")
    year_month = f"{year}/{month}"     # key such as "2016/09"
    return (year_month, 1)

dataCont = data_sc.map(parse_action)\
            .reduceByKey(lambda x, y: x + y)

# Swap to (count, month) so that sortByKey orders by count, largest first
month_count = dataCont.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

top_month = month_count.take(1)

for count, month in top_month:
    print(f"{month} had the most events: {count}")

2016/01 had the most events: 25800
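
Note that `parse_action` keys by `YYYY/MM`, so if the file spanned several years, each year's January would be counted separately, whereas the question asks for totals accumulated across years. A minimal sketch of the two key functions in plain Python (the names `year_month_key` and `month_key` are hypothetical) makes the difference explicit:

```python
def year_month_key(line):
    # Same key as parse_action above: "YYYY/MM" keeps each year in its own bucket
    year, month, _ = line.split(",")[0].split()[0].split("-")
    return f"{year}/{month}"


def month_key(line):
    # Month number only (1-12): the same month of different years shares one bucket
    return int(line.split(",")[0].split("-")[1])


LINE = "2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01"
print(year_month_key(LINE), month_key(LINE))  # 2016/09 9
```

In the Spark cell, `data_sc.map(lambda line: (month_key(line), 1)).reduceByKey(lambda x, y: x + y)` would then accumulate each calendar month over all years. If all the data falls within 2016, both keys yield the same counts.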


In [None]:
# --------------  Grouping by campaign ----------------

def parse_action(line):
    fields = line.split(",")
    return (fields[4], 1)   # campaignId is the 5th field

campaign = data_sc.map(parse_action)\
        .reduceByKey(lambda x, y: x + y)

# Swap to (count, campaignId) so that sortByKey orders by count, largest first
campaign_count = campaign.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

top_campaign = campaign_count.take(1)

for count, campaign_id in top_campaign:
    print(f"{campaign_id} had the most events: {count}")

## 4) [Spark] Which campaign had the most views? (2 points)

In [9]:
def parse_action(line):
    fields = line.split(",")
    return (fields[2], (fields[4], 1))   # (action, (campaignId, 1))

# Keep only views, drop the action key and count per campaign
campaign = data_sc.map(parse_action)\
        .filter(lambda x: x[0] == 'view')\
        .map(lambda x: x[1])\
        .reduceByKey(lambda x, y: x + y)

# Swap to (count, campaignId) so that sortByKey orders by count, largest first
campaign_count = campaign.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

top_campaign = campaign_count.take(1)

for count, campaign_id in top_campaign:
    print(f"{campaign_id} had the most views: {count}")

campaignId_02 had the most views: 27233
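
Questions 2 and 4 can also be answered from a single pass by keying each event on the pair `(campaignId, action)`; in Spark that would mean mapping each line to `((campaignId, action), 1)` before `reduceByKey`. The plain-Python sketch below shows the idea on the five `head(5)` rows; the variable names are illustrative only.

```python
from collections import Counter

# First five rows of ad_action.csv, as shown by data_dask.head(5)
SAMPLE = [
    "2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01",
    "2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02",
    "2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02",
    "2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03",
    "2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02",
]

# Count events per (campaignId, action) pair in one pass
pair_counts = Counter(
    (fields[4], fields[2]) for fields in (line.split(",") for line in SAMPLE)
)

# Keep only the view counts and pick the campaign with the largest one
views = {campaign: n for (campaign, action), n in pair_counts.items() if action == "view"}
top_campaign = max(views, key=views.get)
print(top_campaign, views[top_campaign])  # campaignId_02 2
```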


## ----------------------

In [10]:
# Repeating the imports

import numpy

# So that Jupyter can load Spark correctly in the notebook
import sysconfig
import findspark
# sysconfig locates site-packages; distutils.sysconfig is unavailable on Python 3.12+
findspark.init(f"{sysconfig.get_paths()['purelib']}/pyspark")

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

In [None]:
# Creating a local cluster with 1 executor and as many threads as there are available CPU cores

conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)
sc

#### Configuring Streaming to split the data into 25471 mini-batches and send 1 per second

In [11]:
ad_action_rdd = sc.textFile("ad_action.csv")
ad_action = ad_action_rdd.collect()
print(f"Total records: {len(ad_action)}")
ad_action_stream = numpy.array_split(ad_action, 25471)
rddQueue = [sc.parallelize(mini_batch) for mini_batch in ad_action_stream]
print(f"Total mini-batches: {len(rddQueue)}")
print(f"Records in the first mini-batch: {len(ad_action_stream[0])}")
print(ad_action_stream[:3])

Total records: 254713
Total mini-batches: 25471
Records in the first mini-batch: 11
[array(['2016-09-21 22:11:00,7c74953c-66cc-48bd-9d02-a02bf039cf3f,click,adId_09,campaignId_01',
       '2016-06-25 18:29:00,676a083e-2f8e-4ff2-9ec2-270f7f9d6033,view,adId_09,campaignId_02',
       '2016-02-14 19:03:00,77158997-0dfa-48b7-9149-973dc151ef8d,click,adId_02,campaignId_02',
       '2016-03-26 06:27:00,78aa2467-b502-413b-94e9-04ec8210bd13,click,adId_07,campaignId_03',
       '2016-01-02 04:57:00,fef9a98c-d73e-48ef-b2cb-766ba85dc3e3,view,adId_02,campaignId_02',
       '2016-03-04 09:14:00,6ba65af9-4d83-4567-b580-a34f177bb788,view,adId_09,campaignId_01',
       '2016-07-09 21:42:00,be3befb9-ee08-4311-89f4-430d23ee63f1,click,adId_09,campaignId_01',
       '2016-07-18 21:15:00,d37832ae-546f-4b9b-94f1-ab2ddfc0f49d,click,adId_05,campaignId_02',
       '2016-04-20 13:30:00,1e5b575e-19e7-44fa-b5c2-f9aa6953de8a,click,adId_03,campaignId_01',
       '2016-02-23 19:55:00,ed1b4467-8581-459
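
The first mini-batch has 11 records rather than 10 because of how `numpy.array_split` handles a remainder: for `l` items split into `n` parts, it returns `l % n` parts of size `l // n + 1` and the rest of size `l // n`. The arithmetic, reproduced in plain Python with a hypothetical `array_split_sizes` helper, is:

```python
def array_split_sizes(total, parts):
    # Mirrors numpy.array_split: the first (total % parts) chunks get one extra element
    base, extra = divmod(total, parts)
    return [base + 1] * extra + [base] * (parts - extra)


sizes = array_split_sizes(254713, 25471)
print(sizes[:4])               # [11, 11, 11, 10]
print(sum(sizes), len(sizes))  # 254713 25471
print(f"{len(sizes) / 3600:.1f} hours to stream at 1 batch per second")  # 7.1 hours ...
```

Since 254713 = 25471 * 10 + 3, only the first three mini-batches carry 11 records, and at one batch per second the whole queue takes roughly seven hours to drain.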

## 5) [SparkStreaming] What are the top 3 campaigns that generated the most events in the last 10 seconds? Sort by number of events and compute every 3 seconds. (2 points)

In [12]:
ssc = StreamingContext(sc, 1)
inputStream = ssc.queueStream(rddQueue)

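# WRITE YOUR CODE HERE

def parse_action(line):
    fields = line.split(",")
    return (fields[4], 1)   # (campaignId, 1)

def take_top_campaigns(rdd):
    # top() returns the 3 (campaignId, count) pairs with the largest counts, in descending order
    top_campaigns = rdd.top(3, key=lambda x: x[1])
    print(top_campaigns)

# 10-second window recomputed every 3 seconds; foreachRDD is an output operation that
# returns None, so its result is not assigned back to inputStream
inputStream.window(10, 3)\
    .map(parse_action)\
    .reduceByKey(lambda x, y: x + y)\
    .foreachRDD(take_top_campaigns)

ssc.start()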

[('campaignId_01', 14), ('campaignId_02', 11), ('campaignId_03', 8)]
[('campaignId_01', 24), ('campaignId_03', 21), ('campaignId_02', 18)]
[('campaignId_01', 33), ('campaignId_02', 33), ('campaignId_03', 27)]
[('campaignId_02', 36), ('campaignId_01', 34), ('campaignId_03', 31)]
[('campaignId_02', 43), ('campaignId_03', 29), ('campaignId_01', 28)]
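
The printed counts can be checked against the window parameters. With a 1-second batch interval, each window covers at most the last 10 mini-batches, and the three counts on each line above sum to 33, 63, 93, 101 and 100. The sketch below assumes the first window fires after three batches (as the first printed line suggests) and redefines the hypothetical `array_split_sizes` helper; it reproduces those totals. That the three top counts add up to the whole window total suggests the stream contains only these three campaigns.

```python
def array_split_sizes(total, parts):
    # Same rule as numpy.array_split: the first (total % parts) chunks get one extra element
    base, extra = divmod(total, parts)
    return [base + 1] * extra + [base] * (parts - extra)


def window_totals(batch_sizes, window=10, slide=3, n_windows=5):
    # With a 1-second batch interval, the window evaluated at second t holds the batches
    # received during the last `window` seconds, i.e. batch_sizes[t - window : t],
    # clipped at the start of the stream. A new window is evaluated every `slide` seconds.
    totals = []
    for k in range(1, n_windows + 1):
        t = k * slide
        start = max(0, t - window)
        totals.append(sum(batch_sizes[start:t]))
    return totals


sizes = array_split_sizes(254713, 25471)
print(window_totals(sizes))  # [33, 63, 93, 101, 100]
```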


In [None]:
# Stop job
ssc.stop(stopSparkContext=False, stopGraceFully=True)

[('campaignId_02', 46), ('campaignId_01', 30), ('campaignId_03', 24)]
