In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
from utils import Utils
from utils.Utils import Transformer
from utils.spark_utils import Spark_utils
import logging

In [2]:
## Setting up logging and the Spark session
# Environment / application name passed to the project's Spark_utils helper.
SPARK_ENV = "DEV"
SPARK_APP_NAME = "Meli_Pipeline"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## Spark_utils builds the SparkSession for the given environment and app name
## (set_spark_session creates it, get_spark_session returns it).
spark_util = Spark_utils(SPARK_ENV, SPARK_APP_NAME)
spark_util.set_spark_session()
spark = spark_util.get_spark_session()

Environment:  DEV
24/08/06 08:42:32 WARN Utils: Your hostname, EPCOBOGW1343 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/08/06 08:42:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/08/06 08:42:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
## Initiating data reads from the original sources into DataFrames
logger.info("Reading Data Sources from Local Storage...")

# Layout of prints.json: day (date), event_data{position, value_prop}, user_id.
# An explicit schema avoids a JSON schema-inference pass and pins the column types.
logger.info("Defining prints.json schema ...")
event_data_type = StructType([
    StructField("position", IntegerType(), True),
    StructField("value_prop", StringType(), True),
])
prints_schema = StructType([
    StructField("day", DateType(), True),
    StructField("event_data", event_data_type, True),
    StructField("user_id", IntegerType(), True),
])

## Load "prints.json" into a DataFrame using the schema above
logger.info("Read prints.json with as dataset with proper schema")
df_prints = spark.read.schema(prints_schema).json("data_sources/prints.json")

logger.info("Prints's dataframe schema...")
df_prints.printSchema()

INFO:__main__:Reading Data Sources from Local Storage...
INFO:__main__:Defining prints.json schema ...
INFO:__main__:Read prints.json with as dataset with proper schema
INFO:__main__:Prints's dataframe schema...


root
 |-- day: date (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- position: integer (nullable = true)
 |    |-- value_prop: string (nullable = true)
 |-- user_id: integer (nullable = true)



In [4]:
## Reading "taps.json" data to dataframe
# taps.json has exactly the same layout as prints.json
# (day, event_data{position, value_prop}, user_id), so reuse that schema
# instead of maintaining a second, identical copy.
taps_schema = prints_schema

df_taps = spark.read.json("data_sources/taps.json", schema=taps_schema)

## Reading "pays.csv" data to dataframe
pays_schema = StructType([
    StructField("pay_date", DateType(), True),
    StructField("total", DoubleType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("value_prop", StringType(), True),
])

# NOTE(review): with an explicit schema and the default enforceSchema=true, Spark
# applies the schema to CSV columns by position and ignores the header names, so
# the field order above must match the column order in pays.csv — confirm.
df_pays = spark.read.csv("data_sources/pays.csv", schema=pays_schema, header=True)
logger.info("3 New Datasets were created")

INFO:__main__:3 New Datasets were created


In [5]:
#################### Adding week_of_year to each source DataFrame ##########################

def add_week_of_year(df, date_col, source_name):
    """Return `df` with a `week_of_year` column derived from `date_col`.

    Logs progress under `source_name`, prints the resulting schema and shows
    rows so the transformation can be inspected in the notebook.

    Args:
        df: Spark DataFrame to transform.
        date_col: name of the date column the week is computed from.
        source_name: label used in log messages ("prints", "taps", "pays").

    Returns:
        The DataFrame produced by Transformer.calcWeekOfTheYear.
    """
    logger.info("Calculating week of year for column %s in %s", date_col, source_name)
    df = Transformer.calcWeekOfTheYear(df, date_col, "week_of_year")
    df.printSchema()
    # TODO: per-user rank / row_number over week_of_year (Transformer.calcRank,
    # Transformer.calcRowNumber) are currently disabled; add them here if needed.
    logger.info("Showing processed data for %s", source_name)
    df.show(truncate=False)
    return df


df_prints = add_week_of_year(df_prints, "day", "prints")
df_taps = add_week_of_year(df_taps, "day", "taps")
df_pays = add_week_of_year(df_pays, "pay_date", "pays")

INFO:__main__:Calculating week of year for column day in prints
INFO:__main__:Calculating Rank for column week_of_year in prints
INFO:__main__:Calculating Row_Number for column week_of_year in prints
INFO:__main__:Showing processed data for Prints


root
 |-- day: date (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- position: integer (nullable = true)
 |    |-- value_prop: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)

root
 |-- day: date (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- position: integer (nullable = true)
 |    |-- value_prop: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)



INFO:__main__:Calculating week of year for column day in taps                   
INFO:__main__:Calculating Rank for column week_of_year in taps
INFO:__main__:Calculating Row_Number for column week_of_year in prints
INFO:__main__:Showing processed data for Prints


+----------+-----------------------+-------+------------+
|day       |event_data             |user_id|week_of_year|
+----------+-----------------------+-------+------------+
|2020-11-01|{0, cellphone_recharge}|98702  |44          |
|2020-11-01|{1, prepaid}           |98702  |44          |
|2020-11-01|{0, prepaid}           |63252  |44          |
|2020-11-01|{0, cellphone_recharge}|24728  |44          |
|2020-11-01|{1, link_cobro}        |24728  |44          |
|2020-11-01|{2, credits_consumer}  |24728  |44          |
|2020-11-01|{3, point}             |24728  |44          |
|2020-11-01|{0, point}             |25517  |44          |
|2020-11-01|{1, credits_consumer}  |25517  |44          |
|2020-11-01|{2, transport}         |25517  |44          |
|2020-11-01|{0, point}             |57587  |44          |
|2020-11-01|{0, transport}         |13609  |44          |
|2020-11-01|{0, cellphone_recharge}|3708   |44          |
|2020-11-01|{1, prepaid}           |3708   |44          |
|2020-11-01|{2

INFO:__main__:Calculating week of year for column day in pays
INFO:__main__:Calculating Rank for column week_of_year in pays
INFO:__main__:Calculating Row_Number for column week_of_year in prints
INFO:__main__:Showing processed data for Prints


+----------+-----------------------+-------+------------+
|day       |event_data             |user_id|week_of_year|
+----------+-----------------------+-------+------------+
|2020-11-01|{0, cellphone_recharge}|98702  |44          |
|2020-11-01|{2, point}             |3708   |44          |
|2020-11-01|{3, send_money}        |3708   |44          |
|2020-11-01|{0, transport}         |93963  |44          |
|2020-11-01|{1, cellphone_recharge}|93963  |44          |
|2020-11-01|{0, link_cobro}        |94945  |44          |
|2020-11-01|{1, cellphone_recharge}|94945  |44          |
|2020-11-01|{2, prepaid}           |89026  |44          |
|2020-11-01|{0, link_cobro}        |7616   |44          |
|2020-11-01|{0, link_cobro}        |63471  |44          |
|2020-11-01|{1, send_money}        |98277  |44          |
|2020-11-01|{2, cellphone_recharge}|83634  |44          |
|2020-11-01|{0, cellphone_recharge}|2367   |44          |
|2020-11-01|{0, credits_consumer}  |14472  |44          |
|2020-11-01|{2

In [6]:
prints_row_count = df_prints.count()  # rows currently in df_prints
prints_row_count

                                                                                

508617

In [7]:
taps_row_count = df_taps.count()  # rows currently in df_taps
taps_row_count

50859

In [8]:
pays_row_count = df_pays.count()  # rows currently in df_pays
pays_row_count

756483

In [9]:
# Expose the three sources to spark.sql. Each view captures the DataFrame's plan
# as it is at this point; later reassignments of df_prints etc. are not reflected.
for view_name, view_df in (("prints", df_prints), ("taps", df_taps), ("pays", df_pays)):
    view_df.createOrReplaceTempView(view_name)

In [10]:
# Prints per user with the first and last week_of_year seen, most active users first.
prints_by_user_query = """
select user_id,
count(1) q ,
min(week_of_year),
max(week_of_year)
from prints
group by user_id
order by q desc
"""
spark.sql(prints_by_user_query).show(truncate = False)

[Stage 12:>                                                         (0 + 1) / 1]

+-------+---+-----------------+-----------------+
|user_id|q  |min(week_of_year)|max(week_of_year)|
+-------+---+-----------------+-----------------+
|35156  |27 |44               |48               |
|9704   |27 |45               |49               |
|61554  |26 |44               |47               |
|88770  |26 |44               |47               |
|30781  |25 |45               |48               |
|51870  |25 |45               |48               |
|5352   |25 |45               |48               |
|48782  |25 |45               |48               |
|64536  |24 |45               |48               |
|38850  |24 |45               |48               |
|33842  |24 |45               |48               |
|56578  |24 |44               |49               |
|59876  |24 |45               |48               |
|65036  |24 |45               |49               |
|41940  |24 |44               |48               |
|88191  |24 |45               |48               |
|20457  |24 |45               |48               |


                                                                                

In [11]:
# Global summary of prints: total rows plus min / mean / max week_of_year.
# This aggregate always yields a single row, so no ORDER BY is needed.
spark.sql("""
select count(1) q ,
min(week_of_year),
avg(week_of_year),
max(week_of_year)
from prints
""").show(truncate = False)

[Stage 15:>                                                         (0 + 1) / 1]

+------+-----------------+-----------------+-----------------+
|q     |min(week_of_year)|avg(week_of_year)|max(week_of_year)|
+------+-----------------+-----------------+-----------------+
|508617|44               |46.46289447659044|49               |
+------+-----------------+-----------------+-----------------+



                                                                                

In [12]:
# User 1's prints ranked by week, most recent first. Note: `rown` is a rank()
# (ties share a value, gaps follow), not a row_number().
ranked_user_prints_query = """
select x.*
from
(select * ,
rank() over (partition by user_id order by week_of_year desc) rown
from prints
where user_id = 1) x
"""
spark.sql(ranked_user_prints_query).show(truncate = False)

[Stage 18:>                                                         (0 + 1) / 1]

+----------+-----------------------+-------+------------+----+
|day       |event_data             |user_id|week_of_year|rown|
+----------+-----------------------+-------+------------+----+
|2020-11-30|{0, transport}         |1      |49          |1   |
|2020-11-30|{1, point}             |1      |49          |1   |
|2020-11-30|{2, cellphone_recharge}|1      |49          |1   |
|2020-11-30|{3, link_cobro}        |1      |49          |1   |
|2020-11-23|{0, send_money}        |1      |48          |5   |
|2020-11-23|{1, transport}         |1      |48          |5   |
|2020-11-23|{2, cellphone_recharge}|1      |48          |5   |
|2020-11-17|{0, transport}         |1      |47          |8   |
|2020-11-17|{1, point}             |1      |47          |8   |
|2020-11-12|{0, link_cobro}        |1      |46          |10  |
|2020-11-14|{0, link_cobro}        |1      |46          |10  |
|2020-11-14|{1, prepaid}           |1      |46          |10  |
|2020-11-14|{2, credits_consumer}  |1      |46         

                                                                                

In [13]:
# Number of prints per (user, week), newest week first within each user.
prints_per_user_week_query = """
select user_id
,week_of_year
,count(1) q
from prints
group by user_id
,week_of_year
order by user_id , week_of_year desc
"""
spark.sql(prints_per_user_week_query).show(truncate = False)

[Stage 21:>                                                         (0 + 1) / 1]

+-------+------------+---+
|user_id|week_of_year|q  |
+-------+------------+---+
|1      |49          |4  |
|1      |48          |3  |
|1      |47          |2  |
|1      |46          |4  |
|1      |45          |4  |
|2      |48          |3  |
|2      |45          |2  |
|3      |48          |3  |
|3      |47          |3  |
|4      |49          |2  |
|4      |47          |4  |
|4      |45          |3  |
|5      |47          |2  |
|6      |48          |3  |
|6      |47          |2  |
|7      |47          |7  |
|8      |46          |3  |
|8      |45          |3  |
|9      |48          |1  |
|9      |47          |4  |
+-------+------------+---+
only showing top 20 rows



                                                                                

In [14]:
# Users ordered by their total number of prints.
prints_per_user_query = """
select user_id
,count(1) q
from prints
group by user_id
order by q desc  
"""
spark.sql(prints_per_user_query).show(truncate = False)

[Stage 24:>                                                         (0 + 1) / 1]

+-------+---+
|user_id|q  |
+-------+---+
|35156  |27 |
|9704   |27 |
|61554  |26 |
|88770  |26 |
|30781  |25 |
|51870  |25 |
|5352   |25 |
|48782  |25 |
|64536  |24 |
|38850  |24 |
|33842  |24 |
|56578  |24 |
|59876  |24 |
|65036  |24 |
|41940  |24 |
|88191  |24 |
|20457  |24 |
|50813  |24 |
|95769  |24 |
|19656  |24 |
+-------+---+
only showing top 20 rows



                                                                                

In [15]:
# Spot-check user 3708: mark each print "y" when a tap with the same user,
# event_data and day exists, otherwise "n".
click_spot_check_query = """
select a.*
,case when b.user_id is not null then "y" else "n" end click
from prints a
left join taps b
on a.user_id = b.user_id
and a.event_data = b.event_data
and a.day = b.day
where a.user_id = 3708
"""
spark.sql(click_spot_check_query).show(30, truncate = False)

[Stage 28:>                                                         (0 + 1) / 1]

+----------+-----------------------+-------+------------+-----+
|day       |event_data             |user_id|week_of_year|click|
+----------+-----------------------+-------+------------+-----+
|2020-11-01|{0, cellphone_recharge}|3708   |44          |n    |
|2020-11-01|{1, prepaid}           |3708   |44          |n    |
|2020-11-01|{2, point}             |3708   |44          |y    |
|2020-11-01|{3, send_money}        |3708   |44          |y    |
|2020-11-09|{0, credits_consumer}  |3708   |46          |n    |
|2020-11-13|{0, link_cobro}        |3708   |46          |n    |
|2020-11-25|{0, link_cobro}        |3708   |48          |n    |
|2020-11-25|{1, transport}         |3708   |48          |n    |
+----------+-----------------------+-------+------------+-----+



                                                                                

In [85]:
# Latest print day per user (dict-form agg yields the same `max(day)` column).
df_prints.groupBy("user_id").agg({"day": "max"}).show(10)

[Stage 203:>                                                        (0 + 1) / 1]

+-------+----------+
|user_id|  max(day)|
+-------+----------+
|  25517|2020-11-28|
|  97186|2020-11-15|
|  16503|2020-11-21|
|   5803|2020-11-29|
|  38868|2020-11-29|
|  73683|2020-11-15|
|  49331|2020-11-17|
|  38311|2020-11-27|
|  45341|2020-11-30|
|  80006|2020-11-28|
+-------+----------+
only showing top 10 rows



                                                                                

In [17]:
# `date`: epoch seconds of the print day's midnight in the session time zone.
# `print_label`: the value prop shown, lifted out of the event_data struct.
df_prints = (
    df_prints
    .withColumn("date", to_timestamp(col("day")).cast("bigint"))
    .withColumn("print_label", col("event_data.value_prop"))
)
df_prints.printSchema()

root
 |-- day: date (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- position: integer (nullable = true)
 |    |-- value_prop: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- date: long (nullable = true)
 |-- print_label: string (nullable = true)



In [54]:
from pyspark.sql.window import Window

# Trailing window shared by the print / click / pay metrics: within each
# (user_id, print_label) partition, rows whose day is 21..1 days before the
# current row's day (the current day itself is excluded).
# Ordering by a day index (days since 1970-01-01) instead of epoch seconds makes
# the frame exactly LOOKBACK_DAYS calendar days wide, independent of the session
# time zone and of DST shifts (where a day is not always 86400 seconds).
# Columns are qualified with alias "a", so the DataFrame this is applied to must
# expose that alias.
LOOKBACK_DAYS = 21
day_index = datediff(col("a.day"), to_date(lit("1970-01-01")))
window_spec = (
    Window.partitionBy("a.user_id", "print_label")
    .orderBy(day_index)
    .rangeBetween(-LOOKBACK_DAYS, -1)
)

In [55]:
# For every print, count the same user's prints of the same value prop that fall
# inside window_spec's trailing frame. The "a" alias is what window_spec refers to.
df_prints = (
    df_prints
    .alias("a")
    .withColumn("count_print_last_3_weeks", count("a.user_id").over(window_spec))
)

In [56]:
# Inspect the trailing-window print counts for one of the most-printed users.
sample_user_id = 35156
df_prints.filter(col("user_id") == sample_user_id).show(40, truncate=False)

[Stage 38:>                                                         (0 + 1) / 1]

+----------+-----------------------+-------+------------+----------+------------------+------------------------+
|day       |event_data             |user_id|week_of_year|date      |print_label       |count_print_last_3_weeks|
+----------+-----------------------+-------+------------+----------+------------------+------------------------+
|2020-11-01|{3, cellphone_recharge}|35156  |44          |1604206800|cellphone_recharge|0                       |
|2020-11-11|{2, cellphone_recharge}|35156  |46          |1605070800|cellphone_recharge|1                       |
|2020-11-14|{0, cellphone_recharge}|35156  |46          |1605330000|cellphone_recharge|2                       |
|2020-11-01|{2, credits_consumer}  |35156  |44          |1604206800|credits_consumer  |0                       |
|2020-11-06|{1, credits_consumer}  |35156  |45          |1604638800|credits_consumer  |1                       |
|2020-11-13|{1, credits_consumer}  |35156  |46          |1605243600|credits_consumer  |2        

                                                                                

In [63]:
# Left-join prints (a) with taps (b) on user, day and the full event_data struct.
# A print with a matching tap is flagged clicked = "Y", otherwise "N".
tap_match = (
    (col("a.user_id") == col("b.user_id"))
    & (col("a.event_data") == col("b.event_data"))
    & (col("a.day") == col("b.day"))
)
df_join = (
    df_prints.alias("a")
    .join(df_taps.alias("b"), tap_match, "leftouter")
    .withColumn("clicked", when(col("b.user_id").isNotNull(), "Y").otherwise("N"))
    # Non-null b.user_id rows inside the trailing frame = matched taps on earlier days.
    .withColumn("count_click_last_3_weeks", count("b.user_id").over(window_spec))
    .select("a.*", "clicked", "count_click_last_3_weeks")
)

In [64]:
df_join.show(n=10, truncate=False)  # first rows of prints enriched with click metrics

                                                                                

+----------+-----------------------+-------+------------+----------+------------------+------------------------+-------+------------------------+
|day       |event_data             |user_id|week_of_year|date      |print_label       |count_print_last_3_weeks|clicked|count_click_last_3_weeks|
+----------+-----------------------+-------+------------+----------+------------------+------------------------+-------+------------------------+
|2020-11-23|{2, cellphone_recharge}|1      |48          |1606107600|cellphone_recharge|0                       |N      |0                       |
|2020-11-30|{2, cellphone_recharge}|1      |49          |1606712400|cellphone_recharge|1                       |N      |0                       |
|2020-11-03|{3, credits_consumer}  |1      |45          |1604379600|credits_consumer  |0                       |N      |0                       |
|2020-11-14|{2, credits_consumer}  |1      |46          |1605330000|credits_consumer  |1                       |N      |0   

In [65]:
join_row_count = df_join.count()  # should equal the prints count if taps add no duplicates
join_row_count

                                                                                

508617

In [80]:
# Defining the result dataset: attach to every print the number and total amount
# of the user's pays for the same value prop during the 21 days before the print
# day (day-21 .. day-1, the same span as the print/click windows).
#
# Pays are matched to the distinct (user_id, value_prop, day) print keys with a
# date-range join. This counts pays made on any day in the span, not only on days
# that also have a print. Joining the aggregate back on that unique key keeps
# exactly one output row per print.
PAYS_LOOKBACK_DAYS = 21

print_keys = (
    df_join
    .select(
        col("a.user_id").alias("user_id"),
        col("a.print_label").alias("value_prop"),
        col("a.day").alias("print_day"),
    )
    .distinct()
)

pays_last_3_weeks = (
    print_keys.alias("k")
    .join(
        df_pays.alias("p"),
        (col("k.user_id") == col("p.user_id"))
        & (col("k.value_prop") == col("p.value_prop"))
        & (col("p.pay_date") >= date_sub(col("k.print_day"), PAYS_LOOKBACK_DAYS))
        & (col("p.pay_date") <= date_sub(col("k.print_day"), 1)),
        "inner",
    )
    .groupBy("k.user_id", "k.value_prop", "k.print_day")
    .agg(
        count("p.user_id").alias("count_pays_last_3_weeks"),
        sum("p.total").alias("sum_pays_last_3_weeks"),
    )
)

# Prints with no pays in the span get null metrics from this left join.
dataset = df_join.join(
    pays_last_3_weeks.alias("c"),
    (col("a.user_id") == col("c.user_id"))
    & (col("a.print_label") == col("c.value_prop"))
    & (col("a.day") == col("c.print_day")),
    "leftouter",
)

In [81]:
dataset_row_count = dataset.count()  # one row per print is expected
dataset_row_count

                                                                                

508617

In [83]:
# Retrieving results. Only the metric columns are null-filled (prints with no
# pays in the window have null pay metrics). Key/attribute columns such as
# user_id, date or week_of_year are left as-is so genuinely missing values stay visible.
metric_cols = [
    "count_print_last_3_weeks",
    "count_click_last_3_weeks",
    "count_pays_last_3_weeks",
    "sum_pays_last_3_weeks",
]
dataset.select("a.*", "clicked", "count_click_last_3_weeks",
               "count_pays_last_3_weeks", "sum_pays_last_3_weeks") \
    .na.fill(0, subset=metric_cols) \
    .show(100, truncate = False)

[Stage 197:>                                                        (0 + 1) / 1]

+----------+-----------------------+-------+------------+----------+------------------+------------------------+-------+------------------------+-----------------------+---------------------+
|day       |event_data             |user_id|week_of_year|date      |print_label       |count_print_last_3_weeks|clicked|count_click_last_3_weeks|count_pays_last_3_weeks|sum_pays_last_3_weeks|
+----------+-----------------------+-------+------------+----------+------------------+------------------------+-------+------------------------+-----------------------+---------------------+
|2020-11-23|{2, cellphone_recharge}|1      |48          |1606107600|cellphone_recharge|0                       |N      |0                       |0                      |0.0                  |
|2020-11-30|{2, cellphone_recharge}|1      |49          |1606712400|cellphone_recharge|1                       |N      |0                       |0                      |0.0                  |
|2020-11-03|{3, credits_consumer}  |1   

                                                                                