Importing the Libraries

In [1]:
pip install pyspark==3.2.1

Collecting pyspark==3.2.1
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.0/199.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853627 sha256=25bc97f58507099b7743d7879ca8b62c9f43379f641becba6d2d5c664d4728d3
  Stored in directory: /home/jovyan/.cache/pip/wheels/15/97/bd/52908574a60b5f8e3dc4dc5a0b5be8a59ac20986ee51c2611b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing install

In [2]:
import time
import pyspark.sql.functions as f

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *

from pyspark.sql.functions import udf
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import desc

from itertools import islice

Reading the file from HDFS

!spark-shell --conf spark.ui.port=14058 --master yarn --deploy-mode client
!set spark.port.maxRetries=30


spark = SparkSession \
    .builder \
    .appName("Python Spark Assignment") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [31]:
spark = SparkSession \
    .builder \
    .appName("Python Spark Assignment") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext
rdd = sc.textFile('hdfs:/data/clickstream.csv')

Removing header from the file and parsing it

In [32]:
rdd = rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
rdd = rdd.map(lambda l: l.split('\t'))
rdd.take(5)

                                                                                

[['562', '507', 'page', 'main', '1695584127'],
 ['562', '507', 'event', 'main', '1695584134'],
 ['562', '507', 'event', 'main', '1695584144'],
 ['562', '507', 'event', 'main', '1695584147'],
 ['562', '507', 'wNaxLlerrorU', 'main', '1695584154']]

Schema for the data

In [33]:
rdd = rdd.map(lambda p: Row(session_id=int(str(p[1]) + str(p[0])), event_type=str(p[2]), event_page=str(p[3]), timestamp=int(p[4])))
rdd.take(5)

[Row(session_id=507562, event_type='page', event_page='main', timestamp=1695584127),
 Row(session_id=507562, event_type='event', event_page='main', timestamp=1695584134),
 Row(session_id=507562, event_type='event', event_page='main', timestamp=1695584144),
 Row(session_id=507562, event_type='event', event_page='main', timestamp=1695584147),
 Row(session_id=507562, event_type='wNaxLlerrorU', event_page='main', timestamp=1695584154)]

Creating DataFrame

In [34]:
df = spark.createDataFrame(rdd)
df.createOrReplaceTempView('clickstream')
df.cache()
df.show(5)
df.count()

                                                                                

+----------+------------+----------+----------+
|session_id|  event_type|event_page| timestamp|
+----------+------------+----------+----------+
|    507562|        page|      main|1695584127|
|    507562|       event|      main|1695584134|
|    507562|       event|      main|1695584144|
|    507562|       event|      main|1695584147|
|    507562|wNaxLlerrorU|      main|1695584154|
+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

1000000

Removing Events

In [35]:
df = df[df['event_type'] != 'event']
df.show(5)
df.count()

+----------+------------+----------+----------+
|session_id|  event_type|event_page| timestamp|
+----------+------------+----------+----------+
|    507562|        page|      main|1695584127|
|    507562|wNaxLlerrorU|      main|1695584154|
|    507562|        page|    rabota|1695584166|
|    507562|        page|      main|1695584194|
|    507562|        page|     bonus|1695584221|
+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

421610

In [36]:
df = df.orderBy(['session_id', 'timestamp'], ascending=True)
df.show(15)
df.count()

                                                                                

+----------+----------+----------+----------+
|session_id|event_type|event_page| timestamp|
+----------+----------+----------+----------+
|       412|      page|      main|1695726743|
|       412|      page|   digital|1695728906|
|       412|      page|      news|1695729251|
|       412|      page|     vklad|1695730809|
|       412|      page|    rabota|1695733354|
|       412|      page|     bonus|1695734278|
|       539|      page|      main|1695794508|
|       539|      page|    rabota|1695795361|
|       539|      page|  internet|1695796728|
|       539|      page|      main|1695796979|
|       539|      page|  internet|1695797328|
|       539|      page|      news|1695799269|
|       539|      page|     vklad|1695800281|
|       579|      page|      main|1696560729|
|       579|      page|   archive|1696561214|
+----------+----------+----------+----------+
only showing top 15 rows



421610

In [37]:
df.groupby('session_id').count().count()

                                                                                

48489

In [38]:
df.groupby('session_id').count().orderBy('count', ascending=False).show(10)

[Stage 23:>                                                         (0 + 1) / 1]

+----------+-----+
|session_id|count|
+----------+-----+
|    333645|   92|
|   1953050|   86|
|   8434802|   82|
|    498217|   79|
|   9523932|   78|
|    781018|   77|
|   9494382|   73|
|   7364134|   72|
|    414869|   68|
|   8942108|   67|
+----------+-----+
only showing top 10 rows



                                                                                

In [39]:
df[df['session_id'] == 1367635].count()

0

Calculating the result

In [40]:
dataCollect = df.collect()
len(dataCollect)

                                                                                

421610

In [41]:
result = {}

In [42]:
for row in dataCollect:
    session_id = row['session_id']
    event_type = row['event_type']
    event_page = row['event_page']
    
    if session_id not in result:
        result[session_id] = []
        
    if event_type == 'page':
        result[session_id].append(event_page)
    else:
        result[session_id].append('error')

In [43]:
len(result)

48489

In [44]:
rows = []

In [45]:
for k, v in result.items():
    # create route
    route = []
    for page in v:
        if page == 'error': break
        if len(route) > 0:
            if route[-1] == page:
                continue
        route.append(page)
        
    route = '-'.join(route)
        
    # Add session_id, route to DataFrame
    rows.append(Row(k, route))

In [46]:
len(rows)

48489

In [47]:
df2 = spark.createDataFrame(rows, ['session_id', 'route'])
df2.show(5, False)

+----------+--------------------------------------------------------+
|session_id|route                                                   |
+----------+--------------------------------------------------------+
|412       |main-digital-news-vklad-rabota-bonus                    |
|539       |main-rabota-internet-main-internet-news-vklad           |
|579       |main-archive-rabota-bonus-archive-tariffs-bonus-internet|
|629       |main-internet-main-online                               |
|709       |main-vklad-internet-main-rabota-news-rabota-main        |
+----------+--------------------------------------------------------+
only showing top 5 rows



In [48]:
df2[df2['session_id'] == 507].show()



+----------+-----+
|session_id|route|
+----------+-----+
+----------+-----+



                                                                                

Proof-Check

In [49]:
df[df['session_id'] == 514].show(50, False)

+----------+----------+----------+---------+
|session_id|event_type|event_page|timestamp|
+----------+----------+----------+---------+
+----------+----------+----------+---------+



In [50]:
df3 = df2.groupby('route').count().orderBy('count', ascending=False).limit(30)

In [51]:
df3.show(100, truncate=False)



+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8177 |
|main-archive         |1111 |
|main-rabota          |1046 |
|main-internet        |896  |
|main-bonus           |868  |
|main-news            |768  |
|main-tariffs         |676  |
|main-online          |587  |
|main-vklad           |517  |
|main-rabota-archive  |170  |
|main-archive-rabota  |166  |
|main-bonus-archive   |142  |
|main-rabota-bonus    |139  |
|main-news-rabota     |135  |
|main-bonus-rabota    |134  |
|main-archive-internet|130  |
|main-rabota-news     |129  |
|main-internet-rabota |128  |
|main-archive-news    |126  |
|main-rabota-internet |124  |
|main-internet-archive|123  |
|main-archive-bonus   |117  |
|main-internet-bonus  |115  |
|main-tariffs-internet|113  |
|main-news-archive    |112  |
|main-news-internet   |109  |
|main-archive-tariffs |104  |
|main-tariffs-archive |103  |
|main-internet-news   |103  |
|main-rabota-main     |94   |
+---------

                                                                                

In [52]:
for row in df3.collect():
    print(row)
    break



Row(route='main', count=8177)


                                                                                

In [53]:
print(df3.limit(30).toPandas().to_string(index=False, header=False))



                 main 8177
         main-archive 1111
          main-rabota 1046
        main-internet  896
           main-bonus  868
            main-news  768
         main-tariffs  676
          main-online  587
           main-vklad  517
  main-rabota-archive  170
  main-archive-rabota  166
   main-bonus-archive  142
    main-rabota-bonus  139
     main-news-rabota  135
    main-bonus-rabota  134
main-archive-internet  130
     main-rabota-news  129
 main-internet-rabota  128
    main-archive-news  126
 main-rabota-internet  124
main-internet-archive  123
   main-archive-bonus  117
  main-internet-bonus  115
main-tariffs-internet  113
    main-news-archive  112
   main-news-internet  109
 main-archive-tariffs  104
   main-internet-news  103
 main-tariffs-archive  103
     main-rabota-main   94


                                                                                