Analytics
=========

## 1.

Implementé varios WITH clauses: 
1. En "aggregate" se cuenta la cantidad de compras por usuario por mes. 
2. En "ranked" se genera un ranking porcentual ordenado por la cantidad de compras y tomando como denominador al total de usuarios.
3. En "lagged", para cada registro de compra se obtiene el event_date de la compra anterior y se filtra a aquellos usuarios que pertenecen al 5% de los que mas compras realizaron en el mes.
4. La query final termina aplicando el promedio de la diferencia de cada compra contra la anterior agrupado por usuario y por mes. Se filtran las primeras compras de cada mes y usuario. El Time delta estará en segundos.

In [None]:
spark.sql("""WITH aggregate AS (
             SELECT DEVICE_ID, 
                    MONTH(EVENT_DATE) as month, 
                    sum(case when EVENT_ID = 2 then 1 else 0 end) as purchases
             FROM log_table
             GROUP BY 1, 2),
             
             ranked AS (
             SELECT DEVICE_ID,
                    month,
                    purchases,
                    percent_rank() OVER(PARTITION BY month ORDER BY purchases DESC) AS rank
             FROM aggregate
             ORDER BY rank),
             
             lagged AS (
             SELECT a.DEVICE_ID, 
                    MONTH(EVENT_DATE) AS month, 
                    EVENT_DATE, 
                    LAG(EVENT_DATE, 1) OVER (PARTITION BY a.DEVICE_ID, month ORDER BY EVENT_DATE) AS last_event
             FROM log_table a
             INNER JOIN ranked b
             ON a.DEVICE_ID = b.DEVICE_ID AND MONTH(a.EVENT_DATE) = b.month
             WHERE EVENT_DATE BETWEEN '2019-05-01' AND '2019-05-31'
             AND EVENT_ID = 2
             AND rank =< 0.05)
             
             SELECT DEVICE_ID, month, MEAN(EVENT_DATE - last_event) AS time_delta
             FROM lagged
             WHERE last_event IS NOT NULL
             GROUP BY 1, 2
             """)

### 2.

Si bien se aplica el promedio de predicted_ctr, lo ideal sería calcular SUM(PREDICTED_CLICK)/SUM(PREDICTED_IMPRESSION). Debido a que el promedio de proporciones no sería correcto matemáticamente.
Usé LEFT JOIN, ya que no sé si todo Ad tiene aparejado un costo, de esta forma me aseguraba de incluir todos los ads en la cuenta. De otra forma hubiese usado INNER JOIN.

In [None]:
spark.sql("""SELECT VARIANT, 
                    SUM(IMPRESSION) AS IMPRESSIONS, 
                    SUM(CLICK) AS CLICKS, 
                    SUM(IMPRESSION)/SUM(CLICK) AS CTR,
                    MEAN(PREDICTED_CTR) AS MEAN_PREDICTED_CTR
                    SUM(COST) AS COST
                    FROM IMPRESSIONS A
                    LEFT JOIN COSTS B
                    ON A.ID = B.ID
                    GROUP BY VARIANT
                    HAVING VARIANT != 'c'
""")