<h1 align="center">
  <a href="https://escale.com.br/"><img src="https://www.kaszek.com/wp-content/uploads/2018/04/logo-escale-black.png" alt="Escale" width="200"></a>
  <br>
  Clickstream Events Analysis
</h1>

<h4 align="center">An analytical approach to clickstream and session data using <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html" target="_blank">PySpark</a>.</h4>

## Author

2020 - Eduardo Trevisani

Developed from a case supplied by Escale, described in this [online document](https://escaletech.github.io/dataplatform/data-engineer-test).

## Requirements
---
***Python >= 3.6***

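* PySpark >= 2.4.3
  - Set the environment variable before starting: `export PYSPARK_PYTHON=${replace_for_your_python3_home_location}`
* requests
* pandas (required by `toPandas()`)
* shutil (part of the Python standard library, no installation needed)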

## Initial settings: clickstream data download
---

In [1]:
import os
import requests
import shutil

In [2]:
files = ["https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00000.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00001.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00002.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00003.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00004.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00005.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00006.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00007.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00008.json.gz",
        "https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test/part-00009.json.gz"]

In [3]:
# Replace with a convenient file system path to store the downloaded files
FILES_FOLDER_PATH = '/tmp/clickstream_data/'

os.makedirs(FILES_FOLDER_PATH, exist_ok=True)

In [4]:
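for url in files:
    # Use the last URL segment (e.g. part-00000.json.gz) as the local file name
    filename = os.path.join(FILES_FOLDER_PATH, url.split("/")[-1])

    r = requests.get(url, stream=True)
    r.raise_for_status()  # fail loudly instead of silently skipping a missing file

    with open(filename, 'wb') as f:
        r.raw.decode_content = True  # just in case transport encoding was applied
        shutil.copyfileobj(r.raw, f)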
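
Before starting Spark, it can be useful to confirm that a downloaded file is a readable gzip archive with one JSON record per line, which is the layout `spark.read.json` expects by default. The sketch below is a minimal check under that assumption; `peek_json_lines` is a hypothetical helper and is not part of the original notebook.

```python
import gzip
import json


def peek_json_lines(path, limit=3):
    """Return up to `limit` records from a gzipped JSON-lines file as dicts."""
    records = []
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            records.append(json.loads(line))
            if len(records) >= limit:
                break
    return records
```

For example, `peek_json_lines(os.path.join(FILES_FOLDER_PATH, "part-00000.json.gz"))` should return a short list of dictionaries if the download succeeded, and raise an error if the file is truncated or is not valid gzip or JSON.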

## Initial settings: Spark initialization
---

#### References
* https://www.knowru.com/blog/2-tunings-you-should-make-spark-applications-running-emr/
* https://stackoverflow.com/questions/39868263/spark-load-data-and-add-filename-as-dataframe-column

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, split, reverse
from operator import add

# Use the Kryo serializer to speed up data serialization
spark = SparkSession.builder \
                   .master("local")\
                   .appName("ClickstreamEventsAnalysis")\
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
                   .getOrCreate()

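# Read every gzipped JSON part file in the download folder
df = spark.read.json(os.path.join(FILES_FOLDER_PATH, "*.json.gz"))
# Tag each row with the base name of its source file (the last path segment)
df = df.withColumn("filename", reverse(split(input_file_name(), '/'))[0])
# Co-locate each user's events and order them by timestamp within each partition
df = df.repartition("anonymous_id").sortWithinPartitions("anonymous_id", "device_sent_timestamp")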
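
The `filename` column is built by splitting the input path on `/`, reversing the pieces, and taking the first one. The plain-Python sketch below mirrors that expression on a single string. The example path is an assumption about the shape of the value returned by `input_file_name()`, and `base_name` is a hypothetical helper used only for illustration.

```python
def base_name(path):
    """Mirror reverse(split(path, '/'))[0]: the last '/'-separated segment."""
    return path.split("/")[::-1][0]


print(base_name("file:///tmp/clickstream_data/part-00000.json.gz"))
# → part-00000.json.gz
```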

## Step 1: unique sessions analysis
---

#### References
* https://spark.apache.org/docs/latest/api/python/pyspark.html?#pyspark.RDD.aggregateByKey
* https://spark.apache.org/docs/latest/api/python/pyspark.html?#pyspark.RDD.reduceByKey
* https://sparkbyexamples.com/pyspark/pyspark-reducebykey-usage-with-examples/
* https://medium.com/@mukeshkumar_46704/apache-spark-rdd-api-using-pyspark-91a1f17507c
* https://medium.com/@yesilliali/apache-spark-understanding-zerovalue-in-aggregatebykey-function-3d7df62567ae
* https://b.tdhopper.com/blog/pysparks-aggregatebykey-method/

In [6]:
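# Key every row by anonymous_id; the value holds the remaining columns of the row
kv_pairs = df.rdd.map(lambda x: (x.anonymous_id, (x[1:])))

SESSION_TIMEOUT_MS = 1800000  # 30 minutes, assuming timestamps are in milliseconds

# Accumulator layout: (last_timestamp, filename, session_count)
def seqFunc(x, y):
    # y[2] is used as the event timestamp and y[8] as the source filename
    timestamp = int(y[2])
    is_new_session = 1 if timestamp >= x[0] + SESSION_TIMEOUT_MS else 0
    return (timestamp, y[8], x[2] + is_new_session)

def combFunc(x, y):
    # Merge two partial results for the same user: latest timestamp, summed counts
    last_timestamp = x[0] if x[0] > y[0] else y[0]
    return (last_timestamp, x[1], x[2] + y[2])

agg_kv_pairs = kv_pairs.aggregateByKey((0,'',0), seqFunc, combFunc)
# Re-key by filename and sum the per-user session counts for each file
agg_filename_pairs = agg_kv_pairs.map(lambda x: (x[1][1], (x[1][2])))
agg_unique_sessions = agg_filename_pairs.reduceByKey(add)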
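
To make the session rule easier to follow, the sketch below replays the same fold in plain Python for a single user, without Spark. It assumes events arrive sorted by timestamp, that timestamps are in milliseconds, and that a gap of 30 minutes or more starts a new session. The sample events and the names `seq_func` and `events` are made up for illustration.

```python
from functools import reduce

SESSION_TIMEOUT_MS = 30 * 60 * 1000  # 1,800,000 ms


def seq_func(acc, event):
    """Fold one (timestamp, filename) event into (last_ts, filename, sessions)."""
    last_ts, _, sessions = acc
    ts, filename = event
    is_new_session = 1 if ts >= last_ts + SESSION_TIMEOUT_MS else 0
    return (ts, filename, sessions + is_new_session)


t0 = 1_600_000_000_000
events = [
    (t0, "part-00000.json.gz"),              # first event: opens session 1
    (t0 + 600_000, "part-00000.json.gz"),    # 10 min later: same session
    (t0 + 2_400_000, "part-00001.json.gz"),  # exactly 30 min later: session 2
    (t0 + 4_199_999, "part-00001.json.gz"),  # 1 ms short of 30 min: same session
]

last_ts, filename, sessions = reduce(seq_func, events, (0, "", 0))
print(sessions)
# → 2
```

Because the accumulator keeps only the filename of the most recent event, all of this user's sessions end up attributed to `part-00001.json.gz` in the sketch, which matches how the accumulator is defined in the cell above.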

In [7]:
# Convert the Spark result to a pandas DataFrame
# Estimated time to completion: 35 min (local execution on a dual-core i5 processor)

pd_step_1 = agg_unique_sessions.toDF(["filename","unique_sessions"])\
                               .toPandas()\
                               .sort_values('filename')
print(pd_step_1)

             filename  unique_sessions
7  part-00000.json.gz         10234861
4  part-00001.json.gz         10230776
0  part-00002.json.gz         10227177
1  part-00003.json.gz         10233329
3  part-00004.json.gz         10233306
8  part-00005.json.gz         10235200
2  part-00006.json.gz         10232658
5  part-00007.json.gz         10236335
6  part-00008.json.gz         10228150
9  part-00009.json.gz         10229042


In [8]:
# Convert the pandas DataFrame to a dictionary keyed by filename
result_step_1 = pd_step_1.sort_values('filename')\
                         .set_index('filename')\
                         .to_dict()['unique_sessions']
print(result_step_1)

{'part-00000.json.gz': 10234861, 'part-00001.json.gz': 10230776, 'part-00002.json.gz': 10227177, 'part-00003.json.gz': 10233329, 'part-00004.json.gz': 10233306, 'part-00005.json.gz': 10235200, 'part-00006.json.gz': 10232658, 'part-00007.json.gz': 10236335, 'part-00008.json.gz': 10228150, 'part-00009.json.gz': 10229042}
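
The value printed above is a Python dictionary. If an actual JSON string is needed, one possible approach is to pass it through `json.dumps`. The sketch below uses two of the entries shown above and casts each count with `int()`, as a precaution in case the counts come back as NumPy integers, which `json.dumps` cannot serialize. The names `result` and `json_string` are illustrative only.

```python
import json

# Two entries copied from the result above, for illustration
result = {
    "part-00000.json.gz": 10234861,
    "part-00001.json.gz": 10230776,
}

# int() guards against NumPy integer types, which json.dumps rejects
json_string = json.dumps(
    {name: int(count) for name, count in result.items()},
    indent=2,
)
print(json_string)
```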
