## Task 1 - Build Purchases Attribution Projection

In [1]:
from pyspark.sql import SparkSession
from pytest import fixture


pyspark = SparkSession.builder.master("local[2]").getOrCreate()

click_stream = pyspark.read.csv('tests/fixtures/clickstream_sample.tsv', sep=r'\t', header=True)
purchases = pyspark.read.csv('tests/fixtures/purchases_sample.tsv', sep=r'\t', header=True)


### Task 1.1 - Implement it by utilizing default Spark SQL capabilities.

In [2]:
%%writefile etl/purchase_attribution.py

from pyspark.sql import Window
import pyspark.sql.functions as psf 

def get_sessions(click_stream):

    click_session_use = click_stream.where(psf.col('eventType').isin({'app_open', 'app_close'}))

    # FIXME: maybe add another window to remove join
    w0 = Window.partitionBy('userId')
    sessions = (click_session_use.select(
            psf.col('userId'),
            psf.col('eventTime').alias('sessionStart').cast('timestamp'),
            psf.lead('eventTime').over(
                w0.orderBy(psf.col('eventTime').asc())
            ).alias('sessionEnd').cast('timestamp'),
            psf.get_json_object(psf.col('attributes'), '$.channel_id').alias('channelId'),
            psf.get_json_object(psf.col('attributes'), '$.campaign_id').alias('campaignId'),
            psf.col('eventId').alias('sessionId')
        ).where(psf.col('eventType') == 'app_open')
        .orderBy(psf.col('userId').asc(), psf.col('sessionStart'))
    )

    return sessions

Overwriting etl/purchase_attribution.py


In [3]:
%%writefile -a etl/purchase_attribution.py

def get_purchase_attribution(click_stream, sessions, purchases):

    click_purchases = click_stream.where(
        psf.col('eventType') == "purchase"
    ).withColumnRenamed(
        'userId', 'purchaseUserId'
    ).withColumn(
        'purchaseStreamId', psf.get_json_object(psf.col('attributes'), '$.purchase_id')
    ).drop('attributes')

    purchases_stream = click_purchases.join(sessions,
        [
            click_purchases.purchaseUserId == sessions.userId,
            sessions.sessionStart < click_purchases.eventTime, 
            sessions.sessionEnd > click_purchases.eventTime
        ]
    ).select(
        'purchaseStreamId', 'sessionId', 'campaignId', 'channelId'
    )

    purchase_attribution = purchases_stream.join(
        purchases, purchases.purchaseId == purchases_stream.purchaseStreamId
    ).select(
        'purchaseId', 'purchaseTime', 'billingCost', 'isConfirmed', 'sessionId', 'campaignId', 'channelId'
    ).orderBy(
        'purchaseTime'
    )

    return purchase_attribution


Appending to etl/purchase_attribution.py


In [4]:
from etl.purchase_attribution import get_sessions, get_purchase_attribution

sessions = get_sessions(click_stream)
sessions.show()

purchase_attribution = get_purchase_attribution(click_stream, sessions, purchases)
purchase_attribution.show()

+------+-------------------+-------------------+------------+----------+---------+
|userId|       sessionStart|         sessionEnd|   channelId|campaignId|sessionId|
+------+-------------------+-------------------+------------+----------+---------+
|    u1|2019-01-01 00:00:00|2019-01-01 00:02:00|  Google Ads|      cmp1|    u1_e1|
|    u2|2019-01-01 00:00:00|2019-01-01 00:04:00|  Yandex Ads|      cmp1|    u2_e1|
|    u2|2019-01-02 00:00:00|2019-01-02 00:04:00|  Yandex Ads|      cmp2|    u2_e6|
|    u3|2019-01-01 00:00:00|2019-01-01 00:02:00|Facebook Ads|      cmp2|    u3_e1|
|    u3|2019-01-01 01:11:11|2019-01-01 01:12:30|  Google Ads|      cmp1|    u3_e5|
|    u3|2019-01-02 02:00:00|2019-01-02 02:15:40|  Yandex Ads|      cmp2|   u3_e10|
|    u3|2019-01-02 13:00:10|2019-01-02 13:06:00|  Yandex Ads|      cmp2|   u3_e19|
+------+-------------------+-------------------+------------+----------+---------+

+----------+-------------------+-----------+-----------+---------+----------+---------

### Task 1.2 - Implement it by using a custom Aggregator or UDAF.

In [5]:
%%writefile etl/purchase_attribution_udf.py

import json
import operator
from pyspark.sql import types as T
import pyspark.sql.functions as psf 


def collect_purchases(click_stream):
    click_stream = sorted(click_stream, key=operator.itemgetter(1, 0))

    purchases = []
    current_sesh = None, None, None

    for meta in click_stream:
        event_id, _, event_type, event_attr = meta

        if event_attr:
            event_attr = json.loads(event_attr)

        if event_type == 'app_close':
            current_sesh = None, None, None

        elif event_type == 'app_open':
            try:
                current_sesh = [
                    event_id,
                    event_attr['campaign_id'],
                    event_attr['channel_id']
                ]
            except KeyError:
                pass

        elif event_type == 'purchase':
            try:
                purchase_id = event_attr['purchase_id']

                purchases.append([
                    purchase_id,
                    *current_sesh
                ])
            except KeyError:
                pass
    
    return purchases

collect_purchases_udf = psf.udf(collect_purchases, T.ArrayType(T.ArrayType(T.StringType())))

Overwriting etl/purchase_attribution_udf.py


In [6]:
%%writefile -a etl/purchase_attribution_udf.py

def get_purchase_attribution_udf(click_stream, purchases):
    purchases_stream = click_stream.where(
        psf.col('eventType').isin({'app_open', 'app_close', 'purchase'})
    ).withColumn(
        'purchase_meta',
        psf.array(psf.col('eventId'), psf.col('eventTime'), psf.col('eventType'), psf.col('attributes'))
    ).groupBy(
        psf.col('userId')
    ).agg(
        collect_purchases_udf(psf.collect_list('purchase_meta')).alias('purchases')
    ).withColumn(
        'purchase',
        psf.explode(psf.col('purchases'))
    ).select(
        psf.col('purchase')[0].alias('purchaseStreamId'),
        psf.col('purchase')[1].alias('sessionId'),
        psf.col('purchase')[2].alias('campaignId'),
        psf.col('purchase')[3].alias('channelId')
    )

    purchase_attribution = purchases_stream.join(
        purchases, purchases.purchaseId == purchases_stream.purchaseStreamId
    ).select(
        'purchaseId', 'purchaseTime', 'billingCost', 'isConfirmed', 'sessionId', 'campaignId', 'channelId'
    ).orderBy(
        'purchaseTime'
    )

    return purchase_attribution

Appending to etl/purchase_attribution_udf.py


In [7]:
from etl.purchase_attribution_udf import get_purchase_attribution_udf

purchase_attribution_udf = get_purchase_attribution_udf(click_stream, purchases)
purchase_attribution_udf.show()

+----------+-------------------+-----------+-----------+---------+----------+----------+
|purchaseId|       purchaseTime|billingCost|isConfirmed|sessionId|campaignId| channelId|
+----------+-------------------+-----------+-----------+---------+----------+----------+
|        p1| 2019-01-01 0:01:05|      100.5|       TRUE|    u1_e1|      cmp1|Google Ads|
|        p2| 2019-01-01 0:03:10|        200|       TRUE|    u2_e1|      cmp1|Yandex Ads|
|        p3| 2019-01-01 1:12:15|        300|      FALSE|    u3_e5|      cmp1|Google Ads|
|        p4| 2019-01-01 2:13:05|       50.2|       TRUE|   u3_e10|      cmp2|Yandex Ads|
|        p5| 2019-01-01 2:15:05|         75|       TRUE|   u3_e10|      cmp2|Yandex Ads|
|        p6|2019-01-02 13:03:00|         99|      FALSE|   u3_e19|      cmp2|Yandex Ads|
+----------+-------------------+-----------+-----------+---------+----------+----------+



### Complete solution for the first task:

In [8]:
%%writefile task1.py

import argparse

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf 

from etl.purchase_attribution import get_sessions, get_purchase_attribution

def main(click_stream_input, purchases_input, format_):
    pyspark = SparkSession.builder.master("local[2]").getOrCreate()

    options = dict()
    tsv_options = {'sep': r'\t', 'header': True}
    if format_ == 'tsv':
        format_ = 'csv'
        options = {**tsv_options}

    click_stream = pyspark.read.format(format_).load(click_stream_input, **options)
    purchases = pyspark.read.format(format_).load(purchases_input, **options)  

    sessions = get_sessions(click_stream)
    purchase_attribution = get_purchase_attribution(click_stream, sessions, purchases)

    for df, timestamp_f, file_name in (
            (purchase_attribution, 'purchaseTime', 'purchase_attribution'),
            (sessions, 'sessionStart', 'sessions')
        ):

        df.withColumn(
            'year', psf.year(timestamp_f)
        ).withColumn(
            'month', psf.month(timestamp_f)
        ).withColumn(
            'day', psf.dayofmonth(timestamp_f)
        ).write.partitionBy(
            'year', 'month', 'day'
        ).mode("overwrite").parquet(f'warehouse/{file_name}.parquet')

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('click_stream_input', type=str)
    parser.add_argument('purchases_input', type=str)
    parser.add_argument('format', choices=('parquet', 'tsv'))

    args = parser.parse_args()

    main(args.click_stream_input, args.purchases_input, args.format)


Overwriting task1.py


## Task 2 - Calculate Marketing Campaigns And Channels Statistics

### Task 2.1 - Top Campaigns: What are the Top 10 marketing campaigns that bring the biggest revenue (based on billingCost of confirmed purchases)?

In [9]:
%%writefile etl/stats.py

def get_top_campaigns(pyspark, purchase_attribution):

    purchase_attribution.createOrReplaceTempView('purchase_attribution')

    top_campaigns = pyspark.sql(
        'SELECT campaignId, SUM(billingCost) as revenue '
        'FROM purchase_attribution '
        'WHERE isConfirmed = "TRUE" '
        'GROUP BY campaignId '
        'ORDER BY revenue DESC '
        'LIMIT 10'
    )

    return top_campaigns

Overwriting etl/stats.py


### Task 2.2 - Channels engagement performance: What is the most popular (i.e. Top) channel that drives the highest amount of unique sessions (engagements) with the App in each campaign?

In [10]:
%%writefile -a etl/stats.py

def get_top_channels(pyspark, sessions):

    sessions.createOrReplaceTempView('sessions')

    top_channels = pyspark.sql(
        'SELECT campaignId, channelId, performance '
        'FROM ( '
            'SELECT '
                'ROW_NUMBER() OVER (PARTITION BY campaignId ORDER BY performance DESC) as rn, '
                'campaignId, channelId, performance '
            'FROM ( '
                'SELECT '
                    'campaignId, '
                    'channelId, '
                    'COUNT(DISTINCT(userId)) as performance ' 
                'FROM sessions '
                'GROUP BY campaignId, channelId '
                'ORDER BY performance DESC '
            ') '
        ') '
        'WHERE rn=1 '
        'ORDER BY campaignId, channelId'
    )

    return top_channels

Appending to etl/stats.py


In [11]:
from etl.stats import get_top_campaigns, get_top_channels

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.show()

top_channels = get_top_channels(pyspark, sessions)
top_channels.show()

+----------+-------+
|campaignId|revenue|
+----------+-------+
|      cmp1|  300.5|
|      cmp2|  125.2|
+----------+-------+

+----------+----------+-----------+
|campaignId| channelId|performance|
+----------+----------+-----------+
|      cmp1|Google Ads|          2|
|      cmp2|Yandex Ads|          2|
+----------+----------+-----------+



### Complete solution for the second task:

In [32]:
%%writefile task2.py

import argparse

from pyspark.sql import SparkSession

from etl.stats import get_top_campaigns, get_top_channels

def main(purchase_attribution_input, sessions_input, format_):
    pyspark = SparkSession.builder.master("local[2]").getOrCreate()

    options = dict()
    tsv_options = {'sep': r'\t', 'header': True}
    if format_ == 'tsv':
        format_ = 'csv'
        options = {**tsv_options}

    purchase_attribution = pyspark.read.format(format_).load(purchase_attribution_input, **options)
    sessions = pyspark.read.format(format_).load(sessions_input, **options)  

    top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
    top_channels = get_top_channels(pyspark, sessions)

    top_campaigns.show()
    top_channels.show()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('purchase_attribution_input', type=str)
    parser.add_argument('sessions_input', type=str)
    parser.add_argument('format', choices=('parquet', 'tsv'))

    args = parser.parse_args()

    main(args.purchase_attribution_input, args.sessions_input, args.format)


Overwriting task2.py


## Task 3 - Organize data warehouse and calculate metrics for time period.


### Task 3.1 - Convert input dataset to parquet. Think about partitioning. Compare performance on top CSV input and parquet input. Save output for Task #1 as parquet as well.

In [13]:

import pyspark.sql.functions as psf 

for format_ in ['parquet', 'tsv']:
    for df, timestamp_f, file_name in (
            (click_stream, 'eventTime', 'clickstream_sample'),
            (purchases, 'purchaseTime', 'purchases_sample'),
            (purchase_attribution, 'purchaseTime', 'purchase_attribution'),
            (sessions, 'sessionStart', 'sessions')
        ):

        real_format = format_
        options = dict()
        tsv_options = {'sep': r'\t', 'header': True}
        if format_ == 'tsv':
            real_format = 'csv'
            options = {**tsv_options}


        df = df.withColumn(
            'year', psf.year(timestamp_f)
        ).withColumn(
            'month', psf.month(timestamp_f)
        ).withColumn(
            'day', psf.dayofmonth(timestamp_f)
        )
        writer = df.write
        
        if format_ == 'parquet':
            df = writer.partitionBy('year', 'month', 'day')
            
        writer.mode("overwrite").format(real_format).save(f'warehouse/{file_name}.{format_}', **options)


### CSV - Task 1 integrational test

In [14]:
%%bash

time ./.venv/bin/python3 task1.py warehouse/clickstream_sample.tsv warehouse/purchases_sample.tsv tsv


20/12/28 00:10:30 WARN Utils: Your hostname, C8119 resolves to a loopback address: 127.0.0.1; using 192.168.31.239 instead (on interface en0)
20/12/28 00:10:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/12/28 00:10:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/12/28 00:10:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                
real	0m13.370s
user	0m0.175s
sys	0m0.050s


### CSV - Task 1 unit test

In [15]:
%%timeit

click_stream = pyspark.read.csv('warehouse/clickstream_sample.tsv', sep=r'\t', header=True)
purchases = pyspark.read.csv('warehouse/purchases_sample.tsv', sep=r'\t', header=True)

sessions = get_sessions(click_stream)
purchase_attribution = get_purchase_attribution(click_stream, sessions, purchases)
purchase_attribution.count()

824 ms ± 52.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Parquet - Task 1 integrational test

In [16]:
%%bash

time ./.venv/bin/python3 task1.py warehouse/clickstream_sample.parquet warehouse/purchases_sample.parquet parquet


20/12/28 00:10:51 WARN Utils: Your hostname, C8119 resolves to a loopback address: 127.0.0.1; using 192.168.31.239 instead (on interface en0)
20/12/28 00:10:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/12/28 00:10:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/12/28 00:10:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                
real	0m12.943s
user	0m0.174s
sys	0m0.053s


### Parquet - Task 1 unit test

In [17]:
%%timeit

click_stream_parquet = pyspark.read.parquet('warehouse/clickstream_sample.parquet')
purchases_parquet = pyspark.read.parquet('warehouse/purchases_sample.parquet')

sessions_parquet = get_sessions(click_stream_parquet)
purchase_attribution = get_purchase_attribution(click_stream_parquet, sessions_parquet, purchases_parquet)
purchase_attribution.count()

777 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Task 3.2 - Calculate metrics from Task #2 for different time periods:

For September 2020

For 2020-11-11

Compare performance on top csv input and partitioned parquet input. Print and analyze query plans (logical and phisical) for both inputs. 


In [18]:
def get_purchase_attribution_tsv():
    purchase_attribution = pyspark.read.csv('warehouse/purchase_attribution.tsv', sep=r'\t', header=True)
    sessions = pyspark.read.csv('warehouse/sessions.tsv', sep=r'\t', header=True)

    return purchase_attribution, sessions

def get_purchase_attribution_parquet():
    purchase_attribution_parquet = pyspark.read.parquet('warehouse/purchase_attribution.parquet')
    sessions_parquet = pyspark.read.parquet('warehouse/sessions.parquet')

    return purchase_attribution_parquet, sessions_parquet

def get_2020_9(purchase_attribution, sessions):
    purchase_attribution = purchase_attribution.where(psf.col('year') == 2020).where(psf.col('month') == 9)
    sessions = sessions.where(psf.col('year') == 2020).where(psf.col('month') == 9)

    return purchase_attribution, sessions

def get_2020_11_11(purchase_attribution, sessions):
    purchase_attribution = purchase_attribution.where(
        psf.col('year') == 2020
    ).where(
        psf.col('month') == 11
    ).where(
        psf.col('day') == 11
    )
    sessions = sessions.where(
        psf.col('year') == 2020
    ).where(
        psf.col('month') == 11
    ).where(
        psf.col('day') == 11
    )

    return purchase_attribution, sessions


### CSV 2020-09

In [19]:
%%timeit

purchase_attribution, sessions = get_2020_9(*get_purchase_attribution_tsv())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.count()
top_channels = get_top_channels(pyspark, sessions)
top_channels.count()

2.37 s ± 78.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### CSV 2020-11-11

In [20]:
%%timeit

purchase_attribution, sessions = get_2020_11_11(*get_purchase_attribution_tsv())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.count()
top_channels = get_top_channels(pyspark, sessions)
top_channels.count()

2.19 s ± 32.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [21]:
%%capture cap --no-stderr
purchase_attribution, sessions = get_2020_11_11(*get_purchase_attribution_tsv())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.explain(extended=True)

In [22]:
with open('plans/csv_top_campaigns_2020_11_11.md', 'w') as f:
     f.write(cap.stdout)


In [23]:
%%capture cap --no-stderr

top_channels = get_top_channels(pyspark, sessions)
top_channels.explain(extended=True)

In [24]:
with open('plans/csv_top_channels_2020_11_11.md', 'w') as f:
     f.write(cap.stdout)

### Parquet 2020-09

In [25]:
%%timeit

purchase_attribution, sessions = get_2020_9(*get_purchase_attribution_parquet())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.count()
top_channels = get_top_channels(pyspark, sessions)
top_channels.count()

2.15 s ± 196 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Parquet 2020-11-11

In [26]:
%%timeit

purchase_attribution, sessions = get_2020_11_11(*get_purchase_attribution_parquet())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.count()
top_channels = get_top_channels(pyspark, sessions)
top_channels.count()

1.98 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
%%capture cap --no-stderr
purchase_attribution, sessions = get_2020_11_11(*get_purchase_attribution_tsv())

top_campaigns = get_top_campaigns(pyspark, purchase_attribution)
top_campaigns.explain(extended=True)

In [28]:
with open('plans/parquet_top_campaigns_2020_11_11.md', 'w') as f:
     f.write(cap.stdout)


In [29]:
%%capture cap --no-stderr

top_channels = get_top_channels(pyspark, sessions)
top_channels.explain(extended=True)

In [30]:
with open('plans/parquet_top_channels_2020_11_11.md', 'w') as f:
     f.write(cap.stdout)