In [50]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master('local') \
        .appName('Capstone') \
        .getOrCreate()

In [51]:
spark.version

'3.1.2'

In [52]:
from pyspark.sql.types import StructType, StructField, StringType, MapType, TimestampType, DoubleType, BooleanType

clickstream_schema = StructType([
    StructField("userId", StringType(), True),
    StructField("eventId", StringType(), True),
    StructField("eventType", StringType(), True),
    StructField("eventTime", TimestampType(), True),
    StructField("attributes", StringType(), True)
  ])
purchases_schema = StructType([
    StructField("purchaseId", StringType(), True),
    StructField("purchaseTime", TimestampType(),True),
    StructField("billingCost", DoubleType(), True),
    StructField("isConfirmed", BooleanType(), True)
  ])

clickstream_raw_df = spark.read \
                    .option("header", "true") \
                    .schema(clickstream_schema) \
                    .csv("capstone-dataset/mobile_app_clickstream/*.gz")
purchases_df = spark.read \
                    .option("header", "true") \
                    .schema(purchases_schema) \
                    .csv("capstone-dataset/user_purchases/*.gz")

In [53]:
import json
from pyspark.sql.functions import udf

def to_map_type(value):
    if value:
        try:
            return json.loads(value.replace('\'', '"'))
        except json.JSONDecodeError:
            return None
    return None

to_map_udf = udf(to_map_type, MapType(StringType(), StringType()))

clickstream_df = clickstream_raw_df.withColumn('attributes', to_map_udf('attributes'))

clickstream_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [54]:
from pyspark.sql.window import Window
from pyspark.sql.functions import last

def generate_session_id(user_id, event_time, event_type, attributes):
    if event_type == 'app_open':
        try:
            session_id = f'{user_id}_{event_time.timestamp()}'
            campaign_id = attributes['campaign_id']
            channel_id = attributes['channel_id']
            return session_id, campaign_id, channel_id
        except Exception:
            return None
    return None

udf_schema = StructType([
    StructField('sessionId', StringType(), False),
    StructField('campaignId', StringType(), False),
    StructField('channelId', StringType(), False)
])
generate_session_udf = udf(generate_session_id, udf_schema)

def add_session_column(input_df):
    w = Window.partitionBy('userId').orderBy('eventTime')
    
    return input_df \
            .withColumn('sessionInfo', generate_session_udf('userId', 'eventTime', 'eventType', 'attributes')) \
            .withColumn('sessionInfo', last('sessionInfo', True).over(w))

clicks_sess_df = add_session_column(clickstream_df)
clicks_sess_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- sessionInfo: struct (nullable = true)
 |    |-- sessionId: string (nullable = false)
 |    |-- campaignId: string (nullable = false)
 |    |-- channelId: string (nullable = false)



In [57]:
purchases_clicks_df = clicks_sess_df.filter(clicks_sess_df.eventType == 'purchase')

purchases_projection_df = purchases_df \
                            .join(purchases_clicks_df, 
                                  purchases_df.purchaseId == purchases_clicks_df.attributes['purchase_id']) \
                            .select(purchases_df.purchaseId, 
                                    purchases_df.purchaseTime,
                                    purchases_df.billingCost,
                                    purchases_df.isConfirmed,
                                    purchases_clicks_df.sessionInfo.sessionId.alias('sessionId'),
                                    purchases_clicks_df.sessionInfo.campaignId.alias('campaignId'),
                                    purchases_clicks_df.sessionInfo.channelId.alias('channelId'))

purchases_projection_df.printSchema()
purchases_projection_df.createOrReplaceTempView('purchases_projection')
spark.catalog.cacheTable('purchases_projection')

root
 |-- purchaseId: string (nullable = true)
 |-- purchaseTime: timestamp (nullable = true)
 |-- billingCost: double (nullable = true)
 |-- isConfirmed: boolean (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- campaignId: string (nullable = true)
 |-- channelId: string (nullable = true)



In [56]:
top_campaigns_df = spark.sql("""
    SELECT campaignId, sum(billingCost) as revenue FROM purchases_projection
    WHERE isConfirmed
    GROUP BY campaignId
    ORDER BY sum(billingCost) DESC
    LIMIT 10
""")

top_campaigns_df.show()

+----------+------------------+
|campaignId|           revenue|
+----------+------------------+
|       190|2041060.8400000012|
|       528|1510378.1599999997|
|       325|1470622.3199999996|
|       585|        1047838.02|
|       779|1031561.1399999999|
|       650|1031403.1300000004|
|       859|1024151.6600000001|
|       610|1020675.5800000003|
|       669|1019272.5699999997|
|       461|1018971.0200000006|
+----------+------------------+



In [47]:
most_popular_channels_df = spark.sql("""
    WITH channelStatistics as (
        SELECT campaignId, channelId, count(DISTINCT sessionId) as uniqueSessions FROM purchases_projection
        GROUP BY campaignId, channelId
    )
    SELECT cs.campaignId, cs.channelId, maxcs.uniqueSessions FROM channelStatistics as cs
    INNER JOIN (
        SELECT campaignId, max(uniqueSessions) as uniqueSessions FROM channelStatistics GROUP BY campaignId
    ) as maxcs
    ON cs.uniqueSessions = maxcs.uniqueSessions 
        AND cs.campaignId = maxcs.campaignId
""")

most_popular_channels_df.show()

+----------+------------+--------------+
|campaignId|   channelId|uniqueSessions|
+----------+------------+--------------+
|       766|      VK Ads|           419|
|       350|      VK Ads|           425|
|       823|Facebook Ads|           428|
|       451|  Yandex Ads|           821|
|       271|  Yandex Ads|           417|
|       747| Twitter Ads|           409|
|       888|Facebook Ads|           399|
|       963|  Google Ads|           406|
|       604|  Google Ads|           408|
|       924| Twitter Ads|           432|
|       833|  Yandex Ads|           457|
|       538| Twitter Ads|           422|
|       502|Facebook Ads|           408|
|       734|  Yandex Ads|           418|
|       262| Twitter Ads|           400|
|       998|  Yandex Ads|           415|
|       911|Facebook Ads|           414|
|       611|  Google Ads|           421|
|       298|      VK Ads|           815|
|       695| Twitter Ads|           400|
+----------+------------+--------------+
only showing top

In [48]:
purchases_projection_df.write.parquet("results/purchases_projection.parquet")
top_campaigns_df.write.parquet("results/top_campaigns.parquet")
most_popular_channels_df.write.parquet("results/most_popular_channels.parquet")

In [26]:
def test_to_map_type_correct_input_single_quotes():
    test_value = "{'a': 'b'}"
    result = to_map_type(test_value)
    expected = {'a': 'b'}
    assert result == expected
    
test_to_map_type_correct_input_single_quotes()

In [27]:
def test_to_map_type_correct_input_double_quotes():
    test_value = '{"a": "b"}'
    result = to_map_type(test_value)
    expected = {'a': 'b'}
    assert result == expected
    
test_to_map_type_correct_input_double_quotes()

In [23]:
def test_to_map_type_null_input():
    test_value = None
    result = to_map_type(test_value)
    expected = None
    assert result == expected
    
test_to_map_type_null_input()

In [24]:
def test_to_map_type_incorrect_input():
    test_value = "{'a': }"
    result = to_map_type(test_value)
    expected = None
    assert result == expected
    
test_to_map_type_incorrect_input()

In [38]:
def test_generate_session_id_correct_input():
    from datetime import datetime
    test_user_id = 1
    test_event_time = datetime(2021, 1, 1)
    test_event_type = 'app_open'
    test_attributes = {'campaign_id': 1, 'channel_id': 2}
    result = generate_session_id(test_user_id, test_event_time, test_event_type, test_attributes)
    expected = f'1_{test_event_time.timestamp()}', 1, 2
    assert result == expected
    
test_generate_session_id_correct_input()

In [39]:
def test_generate_session_id_incorrect_input():
    from datetime import datetime
    test_user_id = 1
    test_event_time = None
    test_event_type = 'app_open'
    test_attributes = None
    result = generate_session_id(test_user_id, test_event_time, test_event_type, test_attributes)
    expected = None
    assert result == expected
    
test_generate_session_id_incorrect_input()

In [40]:
def test_generate_session_id_wrong_event_type():
    from datetime import datetime
    test_user_id = 1
    test_event_time = datetime(2021, 1, 1)
    test_event_type = 'app_close'
    test_attributes = {'campaign_id': 1, 'channel_id': 2}
    result = generate_session_id(test_user_id, test_event_time, test_event_type, test_attributes)
    expected = None
    assert result == expected
    
test_generate_session_id_wrong_event_type()

In [78]:
def assert_frame_equal_with_sort(results, expected, keycolumns):
    from pandas.testing import assert_frame_equal
    
    results_sorted = results.sort_values(by=keycolumns).reset_index(drop=True)
    expected_sorted = expected.sort_values(by=keycolumns).reset_index(drop=True)
    assert_frame_equal(results_sorted, expected_sorted)

In [81]:
def test_add_session_column():
    import pandas as pd
    from datetime import datetime

    test_data = {
            'userId': [1, 1, 2, 2, 2, 2],
            'eventId': [1, 2, 3, 4, 5, 6],
            'eventType': ['app_open', 'app_close'] * 3,
            'eventTime': [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3),
                         datetime(2020, 1, 4), datetime(2020, 1, 5), datetime(2020, 1, 6)],
            'attributes': [{'campaign_id': '332', 'channel_id': 'Facebook Ads'}, None] * 3
    }
    test_pd_df = pd.DataFrame(test_data)

    test_df = spark.createDataFrame(test_pd_df)
    result_df = add_session_column(test_df)
    result_pd_df = result_df.toPandas()

    expected_data = test_data.copy()
    expected_data['sessionInfo'] = [
        ('1_1577836800.0', '332', 'Facebook Ads'),
        ('1_1577836800.0', '332', 'Facebook Ads'),
        ('2_1578009600.0', '332', 'Facebook Ads'),
        ('2_1578009600.0', '332', 'Facebook Ads'),
        ('2_1578182400.0', '332', 'Facebook Ads'),
        ('2_1578182400.0', '332', 'Facebook Ads'),
    ]
    expected_pd_df = pd.DataFrame(expected_data)
    assert_frame_equal_with_sort(result_pd_df, expected_pd_df, ['eventTime'])
    
test_add_session_column()

In [37]:
spark.stop()