<a href="https://colab.research.google.com/github/Ricardojnf33/Spark_actionAD1/blob/main/Exercicio_Spark_Ricardo_Fernandes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing the dependencies

In [1]:
# install the dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# Creating environment variables

In [2]:
import os
import findspark

# set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# make pyspark importable (must run before any pyspark import)
findspark.init(os.environ["SPARK_HOME"])

# Importing libraries


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, asc, desc, dayofweek, month, lag, lead, when, unix_timestamp
from pyspark.sql.window import Window
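The two environment variables set above only help if they point at directories that actually exist on the machine. As a minimal sketch (the helper name `missing_dirs` is hypothetical and not part of findspark or Spark), the check below reports which of the variables are unset or point at a missing directory:

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point at a directory."""
    return [n for n in names if not os.path.isdir(env.get(n, ""))]

# Hypothetical usage: report problems before starting Spark.
problems = missing_dirs(os.environ)
if problems:
    print("Check these variables:", ", ".join(problems))
```

An empty result suggests the paths are in place; it does not verify that the Java or Spark installation itself is complete.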

# Starting the Spark session

In [4]:
spark = SparkSession.builder.getOrCreate()

# Feature Engineering

In [5]:
from pyspark.sql.types import StructType, IntegerType, TimestampType, StringType, StructField

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("adId", StringType()),
    StructField("campaignId", StringType())
])

# Loading the CSV file

In [6]:
!git clone https://github.com/Ricardojnf33/Spark_actionAD1.git

fatal: destination path 'Spark_actionAD1' already exists and is not an empty directory.


In [7]:
df_ad = spark.read.csv('/content/Spark_actionAD1/ad_action.csv', header=False, schema=schema)
df_ad.show(5)

+-------------------+--------------------+------+-------+-------------+
|          timestamp|             user_id|action|   adId|   campaignId|
+-------------------+--------------------+------+-------+-------------+
|2016-09-21 22:11:00|7c74953c-66cc-48b...| click|adId_09|campaignId_01|
|2016-06-25 18:29:00|676a083e-2f8e-4ff...|  view|adId_09|campaignId_02|
|2016-02-14 19:03:00|77158997-0dfa-48b...| click|adId_02|campaignId_02|
|2016-03-26 06:27:00|78aa2467-b502-413...| click|adId_07|campaignId_03|
|2016-01-02 04:57:00|fef9a98c-d73e-48e...|  view|adId_02|campaignId_02|
+-------------------+--------------------+------+-------+-------------+
only showing top 5 rows



# Questions

## 1) Which are the top 3 campaigns that generated the most events? Order by number of events (2.5 points)

In [8]:
df_ad.groupby('campaignId').count().orderBy(col('count').desc()).show(3)

+-------------+-----+
|   campaignId|count|
+-------------+-----+
|campaignId_02|91216|
|campaignId_03|87036|
|campaignId_01|76461|
+-------------+-----+



## Answer:


*   campaignId_02: 91216 events
*   campaignId_03: 87036 events
*   campaignId_01: 76461 events
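The `groupby(...).count().orderBy(...).show(3)` pattern used above counts rows per key, sorts by count in descending order, and only then keeps the first three. As a rough plain-Python analogue on hypothetical data (the short IDs below are made up for illustration), `collections.Counter` follows the same three steps:

```python
from collections import Counter

# Hypothetical campaignId values, one entry per event.
campaign_ids = ["c2"] * 3 + ["c3"] * 2 + ["c1"] * 4 + ["c4"]

# Count events per campaign.
counts = Counter(campaign_ids)

# most_common sorts by count, descending, before truncating to the top 3.
top3 = counts.most_common(3)
print(top3)
```

A check like this on a small sample of the CSV can be a quick way to confirm that the Spark query is counting what it is expected to count.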


## 2) Which campaign had the most clicks? (2.5 points)

In [9]:
df_ad.where(df_ad.action=='click').groupby('campaignId').count().orderBy(col('count').desc()).show(1)

+-------------+-----+
|   campaignId|count|
+-------------+-----+
|campaignId_02|63983|
+-------------+-----+
only showing top 1 row



## Answer:


*   campaignId_02: 63983 clicks

## 3) Of the 12 months of the year, which had the largest total of events accumulated over the years? (2.5 points)

In [10]:
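# Sort by count, descending, so that show(1) returns the month with the most events;
# without the orderBy, show(1) returns whichever group Spark emits first.
df_ad.select('timestamp', 'action').withColumn('timestamp', month(df_ad.timestamp))\
     .groupBy('timestamp').count().orderBy(col('count').desc()).show(1)

+---------+-----+
|timestamp|count|
+---------+-----+
|       12|20297|
+---------+-----+
only showing top 1 row



## Answer:


*   December (12): 20297 events in the output recorded above. That output comes from a run without the sort, so it is the first group Spark returned, not a confirmed maximum; re-running the sorted query gives the actual top month.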
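The December figure can be sanity-checked against the totals reported in question 1. The sketch below uses only numbers that appear in the outputs above, and assumes every event has a valid timestamp, so that each event falls in exactly one of the 12 months. Under that assumption the busiest month must hold at least the monthly average:

```python
# Event counts copied from the question 1 output (top 3 campaigns only).
top3_counts = {"campaignId_02": 91216, "campaignId_03": 87036, "campaignId_01": 76461}
december_count = 20297  # copied from the question 3 output

total_events = sum(top3_counts.values())

# Assumption: every event has a valid timestamp, so the 12 monthly counts
# add up to at least total_events and the largest one is >= the average.
min_busiest_month = total_events / 12

print(total_events)                        # 254713
print(round(min_busiest_month, 1))         # 21226.1
print(december_count < min_busiest_month)  # True
```

Since 20297 is below that lower bound, at least one other month has more events than December, which suggests the month counts need to be sorted before the top row is taken.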

## 4) In situations where a view event is followed by a click event created by the same user on the same ad and campaign, which are the 5 ad/campaign pairs with the lowest average time between the two events? (2.5 points)

In [11]:
# Partition events by user, campaign and ad; order by time and, on ties,
# put 'view' before 'click' (descending action order).
w = Window.partitionBy(col('user_id'), col('campaignId'), col('adId'))\
          .orderBy(col('timestamp').asc(), col('action').desc())

# For each event, fetch the action and the timestamp of the next event in its partition.
df_ad_lead_time_copy = df_ad.alias('df_ad_lead_time_copy')\
                            .withColumn('lead', lead(col('action'), 1).over(w))\
                            .withColumn('lead_time', lead(col('timestamp'), 1).over(w))

# Keep views whose next event has a different action, compute the gap in seconds,
# and average it per (user_id, campaignId, adId).
df_ad_lead_time_copy.where((col('action') == 'view') & (col('action') != col('lead')))\
    .select(col('user_id'), col('campaignId'), col('adId'),
            (unix_timestamp(col('lead_time')) - unix_timestamp(col('timestamp'))).alias('lead_sec'))\
    .groupBy(col('user_id'), col('campaignId'), col('adId'))\
    .avg().orderBy(col('avg(lead_sec)')).show(5)

+--------------------+-------------+-------+-------------+
|             user_id|   campaignId|   adId|avg(lead_sec)|
+--------------------+-------------+-------+-------------+
|848eb13c-6881-45a...|campaignId_02|adId_05|        480.0|
|f4db1c5e-7780-4be...|campaignId_02|adId_05|        660.0|
|e929e33e-791a-4b7...|campaignId_03|adId_10|        900.0|
|28c5def5-3ede-4ea...|campaignId_01|adId_01|        960.0|
|c67d4a61-a911-498...|campaignId_02|adId_06|       1140.0|
+--------------------+-------------+-------+-------------+
only showing top 5 rows



## Answer:

The query groups by `user_id` as well, so each row is a (user, campaign, ad) combination rather than an ad/campaign pair. Averages are in seconds.

*   campaignId_02, adId_05 (user 848eb13c-6881-45a...): 480.0
*   campaignId_02, adId_05 (user f4db1c5e-7780-4be...): 660.0
*   campaignId_03, adId_10 (user e929e33e-791a-4b7...): 900.0
*   campaignId_01, adId_01 (user 28c5def5-3ede-4ea...): 960.0
*   campaignId_02, adId_06 (user c67d4a61-a911-498...): 1140.0
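The question asks for averages per ad/campaign pair, while the query above also keeps the user in the grouping key. As a minimal plain-Python sketch of the per-pair variant (the events, the short user IDs and the function name `avg_view_to_click_seconds` are all hypothetical), the code below pairs each view with the next click inside a (user, campaign, ad) partition, as `Window.partitionBy` does, and then averages the gaps per (campaign, ad) only:

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical toy events: (timestamp, user_id, action, adId, campaignId).
events = [
    ("2016-01-01 10:00:00", "u1", "view",  "adId_01", "campaignId_01"),
    ("2016-01-01 10:05:00", "u1", "click", "adId_01", "campaignId_01"),
    ("2016-01-01 11:00:00", "u2", "view",  "adId_01", "campaignId_01"),
    ("2016-01-01 11:15:00", "u2", "click", "adId_01", "campaignId_01"),
    ("2016-01-02 09:00:00", "u1", "view",  "adId_02", "campaignId_01"),
    ("2016-01-02 09:02:00", "u1", "click", "adId_02", "campaignId_01"),
    ("2016-01-03 09:00:00", "u3", "click", "adId_02", "campaignId_01"),
]

def avg_view_to_click_seconds(events):
    # Partition by (user, campaign, ad), mirroring the window partition.
    partitions = defaultdict(list)
    for ts, user, action, ad, campaign in events:
        when = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        partitions[(user, campaign, ad)].append((when, action))

    # Collect gaps per (campaign, ad) pair, dropping the user from the key.
    gaps = defaultdict(list)
    for (user, campaign, ad), rows in partitions.items():
        # Sort by time; on ties a 'view' sorts before any other action.
        rows.sort(key=lambda r: (r[0], r[1] != "view"))
        for (t0, a0), (t1, a1) in zip(rows, rows[1:]):
            if a0 == "view" and a1 == "click":
                gaps[(campaign, ad)].append((t1 - t0).total_seconds())

    averages = {pair: sum(v) / len(v) for pair, v in gaps.items()}
    # Smallest average first.
    return sorted(averages.items(), key=lambda kv: kv[1])

result = avg_view_to_click_seconds(events)
for pair, avg in result[:5]:
    print(pair, avg)
```

In the Spark query, the equivalent change would presumably be to drop `user_id` from the final `groupBy` while keeping it in the window partition, so that view/click pairing still happens per user but the average is taken per pair.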