In [None]:
import functools
import math
from datetime import datetime

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Build the notebook's SparkSession; getOrCreate() reuses an already-active
# session instead of starting a second one. The app name and the
# "spark.some.config.option" setting appear to be placeholders carried over
# from Spark's getting-started example and are kept as-is.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Display the SparkSession object created in the previous cell.
spark

In [None]:
# Load the tab-separated clickstream export from HDFS. The first row is the
# header, and column types are inferred (see the dtypes check further down).
clickstream_df = (
    spark.read
    .option('delimiter', '\t')
    .option('header', 'True')
    .option('inferSchema', 'True')
    .csv('hdfs:///data/lsml/sga/clickstream.csv')
)

In [None]:
from pyspark.sql.functions import col, min, collect_list, count, desc, udf

In [None]:
def shape(df):
    """Return ``(row_count, column_count)`` for a DataFrame, mirroring pandas' ``.shape``.

    Intended for Spark DataFrames, which have no ``.shape`` attribute. Note that
    ``df.count()`` is a Spark action and scans the whole DataFrame.
    """
    column_count = len(df.columns)
    row_count = df.count()
    return row_count, column_count

In [None]:
# shape(clickstream_df)

In [None]:
# (column name, Spark SQL type string) pairs for the loaded clickstream data;
# these are the same pairs DataFrame.dtypes builds from the schema.
[(str(field.name), field.dataType.simpleString()) for field in clickstream_df.schema.fields]

[('user_id', 'int'),
 ('session_id', 'int'),
 ('event_type', 'string'),
 ('event_page', 'string'),
 ('timestamp', 'int')]

In [None]:
# Earliest timestamp of any event whose event_type contains 'error', per
# (user_id, session_id). The key columns get an `emt_` prefix so they do not
# clash with clickstream_df's columns in the join below.
error_min_time_df = (
    clickstream_df
    .where(col('event_type').contains('error'))
    .groupBy('user_id', 'session_id')
    .agg(min(clickstream_df.timestamp).alias('emt_timestamp'))
    .withColumnRenamed('user_id', 'emt_user_id')
    .withColumnRenamed('session_id', 'emt_session_id')
)

In [None]:
# shape(error_min_time_df)

In [None]:
# Page-view events of each session up to and including the session's first
# error timestamp. Sessions without any error have a null emt_timestamp after
# the left outer join, so they keep all their page views.
routes_join_condition = [
    clickstream_df.user_id == error_min_time_df.emt_user_id,
    clickstream_df.session_id == error_min_time_df.emt_session_id,
]
is_page_view = col('event_type') == 'page'
not_after_first_error = (
    error_min_time_df.emt_timestamp.isNull()
    | (clickstream_df.timestamp <= error_min_time_df.emt_timestamp)
)
# NOTE(review): this global orderBy does not guarantee the row order that a
# later groupBy + collect_list sees; route order must be enforced per session.
routes_df = (
    clickstream_df
    .join(error_min_time_df, routes_join_condition, 'left_outer')
    .filter(is_page_view & not_after_first_error)
    .orderBy(clickstream_df.timestamp)
)

In [None]:
# shape(routes_df)

In [None]:
# Build one route per (user_id, session_id): the session's page names in
# timestamp order, joined with '-'.
# collect_list has no ordering guarantee once groupBy shuffles the rows, and
# the orderBy on routes_df does not carry through that shuffle. So the
# (timestamp, event_page) pairs are collected and sorted explicitly.
# sort_array orders the structs by timestamp first, and event_page breaks ties.
session_routes_df = (
    routes_df
    .groupBy('user_id', 'session_id')
    .agg(F.sort_array(F.collect_list(F.struct('timestamp', 'event_page'))).alias('steps'))
    # 'steps.event_page' reads that field from every struct in the array.
    # concat_ws joins natively (no Python UDF round-trip) and skips nulls,
    # just as collect_list('event_page') dropped null pages.
    .withColumn('route', F.concat_ws('-', F.col('steps.event_page')))
)

# Count sessions per route, most frequent first. Ties are broken by route
# name so the top-N cut is deterministic. Columns: route (string), count.
final_routes = (
    session_routes_df
    .groupBy('route')
    .agg(F.count('user_id').alias('count'))
    .orderBy(F.desc('count'), F.col('route'))
)

In [None]:
# final_routes.show(30)

In [None]:
# Only the top routes are used downstream, so cut the sorted Spark result down
# before collecting it. toPandas() pulls every row to the driver, and the
# number of distinct routes can be large. limit() applied after orderBy keeps
# the top rows in sorted order.
TOP_N_ROUTES = 30
final_routes_pd = final_routes.limit(TOP_N_ROUTES).toPandas()

In [None]:
# Preview the 30 most frequent routes. The frame is already sorted by
# descending count.
final_routes_pd.iloc[:30]

Unnamed: 0,route,count
0,main,39250
1,main-tariffs,6408
2,main-news,6152
3,main-archive,5739
4,main-family,4773
5,main-digital,4136
6,main-bonus,3427
7,main-tariffs-news,1154
8,main-news-tariffs,1093
9,main-tariffs-archive,997


In [None]:
# Write the submission file: one "<route>\t<count>" line per top-30 route,
# with no header row and no index column.
SUBMISSION_PATH = 'bharani_lsml_sga_spark_df.tsv'
submission_df = final_routes_pd.head(30)
submission_df.to_csv(
    SUBMISSION_PATH,
    sep='\t',
    encoding='utf-8',
    header=False,
    index=False,
)

In [None]:
# Print the submission file so its contents can be checked before it is sent with curl.
! cat bharani_lsml_sga_spark_df.tsv

main	39250
main-tariffs	6408
main-news	6152
main-archive	5739
main-family	4773
main-digital	4136
main-bonus	3427
main-tariffs-news	1154
main-news-tariffs	1093
main-tariffs-archive	997
main-archive-news	971
main-news-archive	967
main-archive-tariffs	954
main-tariffs-family	899
main-family-tariffs	895
main-news-family	893
main-family-news	852
main-archive-family	783
main-news-digital	761
main-family-archive	752
main-tariffs-main	727
main-digital-news	723
main-tariffs-digital	720
main-archive-digital	702
main-digital-tariffs	700
main-spravka	694
main-news-main	665
main-digital-archive	662
main-tariffs-bonus	649
main-archive-main	605


In [None]:
# Send the submission file's contents as the request body (curl -d) to the
# task endpoint. The server's reply is shown in the output below.
# NOTE(review): "$(cat ...)" strips trailing newlines from the file. The
# endpoint accepted this payload, so presumably that is fine; re-check if the
# file format changes.
! curl -d "$(cat bharani_lsml_sga_spark_df.tsv)" hadoop2-00.yandex.ru:8008/sga/task_spark-df

Great job! Secret keyword is 'AwfulDavros'
