In [1]:
from pyspark import SparkContext, SparkConf
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
import pandas as pd 

In [2]:
from pyspark.sql import SparkSession

# One SparkSession for the whole notebook. The original cell called
# getOrCreate() twice; the second call only returned the session created by
# the first, so its appName could not rename the already-running application.
# All builder options now go through a single chain. (The dropped
# "spark.some.config.option" setting was the placeholder from the Spark docs
# example and configured nothing.)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [3]:
path = '/data/lsml/sga/clickstream.csv'
# Tab-separated clickstream export with a header row. No schema inference is
# requested, so every column is loaded as a string.
df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    .csv(path)
)

In [4]:
df.head(5)  # peek at the first raw rows (list of Row objects)

[Row(user_id=u'562', session_id=u'507', event_type=u'page', event_page=u'main', timestamp=u'1620494781'),
 Row(user_id=u'562', session_id=u'507', event_type=u'event', event_page=u'main', timestamp=u'1620494788'),
 Row(user_id=u'562', session_id=u'507', event_type=u'event', event_page=u'main', timestamp=u'1620494798'),
 Row(user_id=u'562', session_id=u'507', event_type=u'event', event_page=u'main', timestamp=u'1620494801'),
 Row(user_id=u'562', session_id=u'507', event_type=u'wNaxLlerrorU', event_page=u'main', timestamp=u'1620494808')]

In [11]:
from pyspark.sql import Row

def check_error(row):
    """Normalize one clickstream record, turning error events into a route stop marker.

    Parameters
    ----------
    row : sequence
        A record whose first five fields are
        ``(user_id, session_id, event_type, event_page, timestamp)``.

    Returns
    -------
    list
        The first five fields as a list. If ``event_type`` contains "error"
        (case-insensitive), event_type is replaced by ``u'page'`` and
        event_page by the marker ``u'None'``, which ``cut_route`` later treats
        as "stop the route here".
    """
    # Format through a unicode literal instead of str(): on Python 2, str() of
    # a non-ASCII unicode value raises UnicodeEncodeError.
    event_type = u'%s' % (row[2],)
    if u'ERROR' in event_type.upper():
        return [row[0], row[1], u'page', u'None', row[4]]
    return [row[i] for i in range(5)]

In [12]:
df_err = df.rdd.map(check_error)  # per-row normalization; errors become stop markers

In [13]:
df_err.take(num=8)  # inspect normalized rows around the first error marker

[[u'562', u'507', u'page', u'main', u'1620494781'],
 [u'562', u'507', u'event', u'main', u'1620494788'],
 [u'562', u'507', u'event', u'main', u'1620494798'],
 [u'562', u'507', u'event', u'main', u'1620494801'],
 [u'562', u'507', u'page', u'None', u'1620494808'],
 [u'562', u'507', u'event', u'main', u'1620494808'],
 [u'562', u'507', u'event', u'main', u'1620494808'],
 [u'562', u'507', u'event', u'main', u'1620494814']]

In [14]:
# Build one '-'-joined route string per "user_id, session_id, event_type" key.
# The previous reduceByKey(lambda a, b: a + '-' + b) was order-dependent.
# Spark expects reduceByKey's function to be associative and commutative, and
# it merges per-partition partial results in no guaranteed order, so a session
# split across partitions could come out with its pages scrambled. Instead,
# tag every record with its position in the input, group by key, and join the
# pages back in input order.
# NOTE(review): this assumes the file is already time-ordered within a session
# (as the df.take(5) sample suggests); sort on the timestamp field instead if
# that is not guaranteed.


def _keyed_page(row_and_position):
    """Map ``(row, position)`` to ``(key, (position, event_page))``."""
    row, position = row_and_position
    key = row[0] + ', ' + row[1] + ', ' + row[2]
    return key, (position, row[3])


def _join_in_input_order(positioned_pages):
    """Join ``(position, page)`` pairs with '-' in ascending position order."""
    ordered = sorted(positioned_pages, key=lambda item: item[0])
    return '-'.join(page for _, page in ordered)


df3 = (
    df_err
    .zipWithIndex()  # index = partition order, then order within partition
    .map(_keyed_page)
    .groupByKey()
    .mapValues(_join_in_input_order)
)

In [15]:
df3.take(num=5)  # (key, raw '-'-joined route) pairs before collapsing

[(u'146, 1055, page',
  u'main-bonus-news-bonus-main-family-news-digital-family-news-digital-news'),
 (u'934, 751, event',
  u'bonus-news-news-tariffs-family-family-family-family-family-archive-archive-archive-archive-tariffs-tariffs-tariffs-tariffs-tariffs-tariffs-news-news-news-digital-bonus-bonus-bonus-bonus-bonus-tariffs-tariffs-digital-digital-digital-tariffs-tariffs-spravka-spravka-tariffs-tariffs-archive-archive'),
 (u'442, 658, page',
  u'main-news-archive-spravka-family-bonus-digital-tariffs-bonus-news-tariffs-digital-news-digital'),
 (u'22, 1187, page', u'main-spravka'),
 (u'215, 2156, page', u'main-bonus-news-family-main-archive-bonus')]

In [16]:
def cut_route(row):
    """Collapse consecutive repeated pages and truncate the route at the first error.

    Parameters
    ----------
    row : pair
        ``(key, route)`` where ``route`` is a '-'-joined string of page names.

    Returns
    -------
    list
        ``[key, collapsed_route]``. Only pages before the first ``'None'``
        marker (inserted by ``check_error`` for error events) are kept, and
        each run of the same page is reduced to a single entry.
    """
    # Unicode formatting instead of str(): on Python 2, str() of a non-ASCII
    # unicode value raises UnicodeEncodeError.
    pages = (u'%s' % (row[1],)).split(u'-')
    collapsed = []
    previous = u''  # also drops leading empty segments, as before
    for page in pages:
        if page == u'None':
            break
        if page != previous:
            collapsed.append(page)
        previous = page
    return [row[0], u'-'.join(collapsed)]

In [17]:
df4 = df3.map(cut_route)  # collapse repeats, cut at the first error marker

In [18]:
df4.take(num=5)  # collapsed routes per key

[[u'146, 1055, page',
  'main-bonus-news-bonus-main-family-news-digital-family-news-digital-news'],
 [u'934, 751, event',
  'bonus-news-tariffs-family-archive-tariffs-news-digital-bonus-tariffs-digital-tariffs-spravka-tariffs-archive'],
 [u'442, 658, page',
  'main-news-archive-spravka-family-bonus-digital-tariffs-bonus-news-tariffs-digital-news-digital'],
 [u'22, 1187, page', 'main-spravka'],
 [u'215, 2156, page', 'main-bonus-news-family-main-archive-bonus']]

In [19]:
def count_page_route(row):
    """Turn a ``(key, route)`` pair into ``[route, weight]`` for counting.

    The weight is 1 when the key text contains ``'page'`` (i.e. the route was
    built from page-type events) and 0 otherwise.

    NOTE(review): routes from other event types are kept with weight 0, so
    they still appear as keys after aggregation. Confirm that excluding them
    from the counts is the intended definition of a route.
    """
    key, route = row[0], row[1]
    from_page_events = 'page' in str(key)
    return [route, 1 if from_page_events else 0]

In [20]:
df5 = df4.map(count_page_route)  # (route, 1 for page routes / 0 otherwise)

In [21]:
df5.take(num=10)  # sample of weighted routes

[['main-bonus-news-bonus-main-family-news-digital-family-news-digital-news',
  1],
 ['bonus-news-tariffs-family-archive-tariffs-news-digital-bonus-tariffs-digital-tariffs-spravka-tariffs-archive',
  0],
 ['main-news-archive-spravka-family-bonus-digital-tariffs-bonus-news-tariffs-digital-news-digital',
  1],
 ['main-spravka', 1],
 ['main-bonus-news-family-main-archive-bonus', 1],
 ['main-tariffs-bonus-archive-digital-archive-family-digital-bonus', 0],
 ['archive-news-family-news-tariffs-archive', 0],
 ['main', 1],
 ['family', 0],
 ['digital-tariffs', 0]]

In [22]:
# Total page-route counts per route, most frequent first. Ties are broken
# alphabetically by route so the top-N listing is reproducible across runs.
# Sorting on the count alone does not define an order among equal counts.
df6 = (
    df5
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: (-pair[1], pair[0]))
)

In [23]:
df6.take(num=30)  # top-30 routes by count

[('main', 39756),
 ('main-tariffs', 6613),
 ('main-news', 6350),
 ('main-archive', 5928),
 ('main-family', 4914),
 ('main-digital', 4282),
 ('main-bonus', 3534),
 ('main-tariffs-news', 1202),
 ('main-news-tariffs', 1148),
 ('main-tariffs-archive', 1050),
 ('main-news-archive', 1026),
 ('main-archive-tariffs', 1011),
 ('main-archive-news', 1008),
 ('main-news-family', 931),
 ('main-family-tariffs', 929),
 ('main-tariffs-family', 923),
 ('main-family-news', 892),
 ('main-archive-family', 826),
 ('main-news-digital', 805),
 ('main-tariffs-main', 790),
 ('main-family-archive', 780),
 ('main-tariffs-digital', 760),
 ('main-digital-news', 756),
 ('main-digital-tariffs', 731),
 ('main-archive-digital', 725),
 ('main-spravka', 714),
 ('main-news-main', 701),
 ('main-digital-archive', 688),
 ('main-tariffs-bonus', 673),
 ('main-archive-main', 630)]