# Dataframe

In [1]:
import pandas as pd
from datetime import datetime
import numpy as np
import math
import functools
import pyspark.sql.functions as pyf

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
clickstream = spark.read.options(delimiter='\t', 
                                    header='True', 
                                    inferSchema='True').csv('hdfs:/data/lsml/sga/clickstream.csv')

In [4]:
errors = clickstream[clickstream.event_type.contains("error")]

In [5]:
# there can be more than 1 error for each session. 
# We can reduce the amount of errors as we don't need to consider others after the first error
error_1 = errors.groupBy("user_id", "session_id") \
                .agg(pyf.min(errors.timestamp).alias("error_1")) \
                .withColumnRenamed('user_id','user_id_err') \
                .withColumnRenamed('session_id','session_id_err')

In [6]:
errors_joined = clickstream.join(error_1, 
                                 [clickstream.user_id == error_1.user_id_err,
                                 clickstream.session_id == error_1.session_id_err], how='leftouter')

In [7]:
filtered_errors = errors_joined.filter((errors_joined.error_1.isNull()) |
                                          (errors_joined.timestamp <= errors_joined.error_1))

## Notice board:

* the correct route can be taken by grouping the data by the event_type when it is 'page'. 
* user is anyway needs to access the page (event_type) and after that the status can be different as 'event' or 'error'

### So to achieve the result:
* we need to group by the event_type == 'page' 
* collect the list of routes 
* use concat_ws to join them.

In [8]:
filtered_errors = filtered_errors[filtered_errors.event_type == 'page']

In [9]:
routes = filtered_errors.groupby(
    'user_id', 'session_id').agg( pyf.collect_list('event_page').alias('route')) \
     .groupBy('route') \
       .agg(
           pyf.count('user_id').alias('count')).orderBy(pyf.desc('count'))

In [10]:
final_routes = routes.withColumn('route', pyf.concat_ws("-", routes['route']))

In [11]:
final_routes.toPandas().head(30).to_csv('niiaz_lsml_dataframe.tsv', sep='\t', encoding='utf-8', header=False, index=False)

In [36]:
! curl -d "$(cat niiaz_lsml_dataframe.tsv)" hadoop2-00.yandex.ru:8008/sga/task_spark-df

Great job! Secret keyword is 'AwfulDavros'


# Hive. SQL

In [13]:
clickstream.createOrReplaceTempView('clickstream')

In [14]:
error_1 = spark.sql("select user_id, session_id, \
min(timestamp) error_1 from clickstream where instr(lower(event_type), 'error') != 0 group by user_id, session_id")

In [15]:
error_1.createOrReplaceTempView('error_1')

In [16]:
filtered_data = spark.sql ("select cl.* from clickstream cl left join error_1 err1 on \
cl.user_id = err1.user_id and cl.session_id = err1.session_id \
where (err1.error_1 is null or cl.timestamp <= err1.error_1) and event_type == 'page'")

In [17]:
filtered_data.createOrReplaceTempView('filtered_data')

In [18]:
final_routes = spark.sql("select route, count(*) as total from \
                (select user_id, session_id, concat_ws('-', collect_list(event_page)) as route \
                        from filtered_data group by user_id, session_id)\
                            group by route order by total desc limit 30")

In [19]:
final_routes.show()

+--------------------+-----+
|               route|total|
+--------------------+-----+
|                main|39250|
|        main-tariffs| 6524|
|           main-news| 6264|
|        main-archive| 5841|
|         main-family| 4849|
|        main-digital| 4211|
|          main-bonus| 3489|
|   main-tariffs-news| 1185|
|   main-news-tariffs| 1130|
|main-tariffs-archive| 1037|
|   main-news-archive|  998|
|   main-archive-news|  992|
|main-archive-tariffs|  990|
| main-family-tariffs|  921|
|    main-news-family|  916|
| main-tariffs-family|  913|
|    main-family-news|  874|
| main-archive-family|  814|
|   main-news-digital|  793|
| main-family-archive|  769|
+--------------------+-----+
only showing top 20 rows



In [20]:
final_routes.toPandas().head(30).to_csv('niiaz_lsml_sql.tsv', sep='\t', encoding='utf-8', header=False, index=False)

In [21]:
! curl -d "$(cat niiaz_lsml_sql.tsv)" hadoop2-00.yandex.ru:8008/sga/task_hive

Great job! Secret keyword is 'HiveMind'


# RDD

In [22]:
clickstream = spark.read.option("delimiter", "\t")\
                                    .option("header", "true")\
                                    .csv('/data/lsml/sga/clickstream.csv')

In [23]:
error_1 = clickstream.rdd.map(lambda x: [x[0], x[1], x[2], 'error', x[4]] 
                              if 'error' in str(x[2]).lower() 
                              else [x[0], x[1], x[2], x[3], x[4]])

In [24]:
err = error_1.filter(lambda x: x[2] == 'page' or "error" in x[2].lower())

In [25]:
err2 = err.map(lambda x: [(x[0], x[1]), [x[3]]])

In [26]:
err2.take(5)

[[('562', '507'), ['main']],
 [('562', '507'), ['error']],
 [('562', '507'), ['family']],
 [('562', '507'), ['main']],
 [('562', '507'), ['news']]]

In [27]:
err3 = err2.reduceByKey(lambda x, y: x + y)

In [28]:
err3.take(5)

[(('951', '1235'), ['main', 'bonus', 'main']),
 (('478', '1888'), ['main', 'archive']),
 (('900', '805'), ['main', 'tariffs', 'news', 'main']),
 (('129', '1124'), ['main', 'news', 'family', 'spravka', 'archive', 'bonus']),
 (('717', '1096'),
  ['main',
   'tariffs',
   'news',
   'archive',
   'bonus',
   'archive',
   'news',
   'archive',
   'family',
   'bonus',
   'archive'])]

In [29]:
def route(line):
    route = []
    if len(line) == 1:
        return line[0]
    route.append(line[0])
    for word in range(1,len(line)):
        if line[word] != 'error':
            if line[word] == route[-1]:
                continue
            route.append(line[word])
        else:
            break
    return "-".join(route)

In [30]:
err4 = err3.map(lambda x: (route(x[1]), 1))

In [31]:
routes = err4.reduceByKey(lambda x,y: x + y).sortBy(lambda x: x[1], ascending=False).take(30)
routes

[('main', 39623),
 ('main-tariffs', 6583),
 ('main-news', 6330),
 ('main-archive', 5907),
 ('main-family', 4883),
 ('main-digital', 4252),
 ('main-bonus', 3519),
 ('main-tariffs-news', 1195),
 ('main-news-tariffs', 1149),
 ('main-tariffs-archive', 1039),
 ('main-news-archive', 1018),
 ('main-archive-news', 1003),
 ('main-archive-tariffs', 1001),
 ('main-family-tariffs', 922),
 ('main-news-family', 921),
 ('main-tariffs-family', 911),
 ('main-family-news', 876),
 ('main-archive-family', 819),
 ('main-news-digital', 793),
 ('main-tariffs-main', 786),
 ('main-family-archive', 774),
 ('main-digital-news', 757),
 ('main-tariffs-digital', 756),
 ('main-digital-tariffs', 731),
 ('main-archive-digital', 719),
 ('main-spravka', 709),
 ('main-news-main', 696),
 ('main-digital-archive', 684),
 ('main-tariffs-bonus', 670),
 ('main-archive-main', 623)]

In [32]:
final_routes = ''

In [33]:
for i in routes:
    final_routes += '%s\t%s\n' % (i[0], i[1])

with open('niiaz_lsml_RDD.tsv', 'w') as f:
    f.write(final_routes[:-1])

In [35]:
! curl -d "$(cat niiaz_lsml_RDD.tsv)" hadoop2-00.yandex.ru:8008/sga/task_spark-rdd

Great job! Secret keyword is 'TheSilenceOfPandora'
