In [4]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext
from utils.spark_utils import insertHive
import uuid

sparkSession = (SparkSession
                    .builder
                    .appName('example-pyspark-read-and-write-from-hive')
                    .config("hive.metastore.uris", "thrift://localhost:9083", conf=SparkConf())
                    .enableHiveSupport()
                    .getOrCreate()
                    )

In [5]:
sparkSession

In [6]:
task_id = uuid.uuid1().__str__()
task_id

'619f3ee2-6a15-11eb-b181-acde48001122'

In [7]:
# get all the records from user visit action
user_action_rdd = sparkSession.sql("select * from user_visit_action_table").rdd

In [8]:
user_action_rdd.collect()

[Row(date='2021-01-27', user_id=69, session_id='a58034ae603b11ebb6eeacde48001122', page_id=70, action_time='2021-01-27 4:41:40', search_keyword=None, click_category_id=90, click_product_id=25, order_category_ids=None, order_product_ids=None, pay_category_ids=None, pay_product_ids=None, city_id=4),
 Row(date='2021-01-27', user_id=69, session_id='a58034ae603b11ebb6eeacde48001122', page_id=35, action_time='2021-01-27 4:09:59', search_keyword=None, click_category_id=-1, click_product_id=-1, order_category_ids=None, order_product_ids=None, pay_category_ids=78, pay_product_ids=88, city_id=2),
 Row(date='2021-01-27', user_id=69, session_id='a58034ae603b11ebb6eeacde48001122', page_id=29, action_time='2021-01-27 4:37:22', search_keyword=None, click_category_id=-1, click_product_id=-1, order_category_ids='1', order_product_ids='63', pay_category_ids=None, pay_product_ids=None, city_id=3),
 Row(date='2021-01-27', user_id=69, session_id='a58034ae603b11ebb6eeacde48001122', page_id=96, action_time='

In [9]:
def print_tuple(x):
    tuple_list = []
    for item in x[1]:
        tuple_list.append(item)
    print((x[0], tuple_list))

def print_rdd(x):
    print(x)


In [10]:
session_to_action_rdd = user_action_rdd.map(lambda x : (x.session_id, x))
session_to_action_group = session_to_action_rdd.groupByKey()

In [11]:
session_to_action_group.collect()

[('a580a84c603b11eb9ef0acde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d29e48>),
 ('a5818e94603b11eb9bf0acde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d29e10>),
 ('a5846a1a603b11eba7ecacde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d36eb8>),
 ('a5848b3a603b11ebbdfaacde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d365f8>),
 ('a5885ecc603b11eb8a19acde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d46a20>),
 ('a588b3b8603b11ebb00aacde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d642e8>),
 ('a553f092603b11eb997facde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d64908>),
 ('a5555568603b11eba45aacde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d29e80>),
 ('a55739f0603b11ebbd5bacde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d8a048>),
 ('a55770a8603b11eba1b3acde48001122',
  <pyspark.resultiterable.ResultIterable at 0x128d8aa20>),
 ('a557855e603b11eba8d0acde480

In [12]:
import datetime
def time_earlier(time1, time2):
    # time1 = '2021-01-27 4:34:30'
    # time2 = '2021-01-27 4:35:06'

    time1 = get_time_tuple(time1)
    time2 = get_time_tuple(time2)

    time1 = datetime.datetime(int(time1[0]), int(time1[1]), int(time1[2]), int(time1[3]), int(time1[4]), int(time1[5]))
    time2 = datetime.datetime(int(time2[0]), int(time2[1]), int(time2[2]), int(time2[3]), int(time2[4]), int(time2[5]))

    return time1 < time2

def get_time_tuple(time):
    time_date =  time.split(' ')[0].split('-')
    time_hour_to_second = time.split(' ')[1].split(':')

    return (time_date[0], time_date[1], time_date[2], time_hour_to_second[0], time_hour_to_second[1], time_hour_to_second[2])

def get_datetime_minus_second(time1, time2):
    d1 = datetime.datetime.strptime(time1, '%Y-%m-%d %H:%M:%S')
    d2 = datetime.datetime.strptime(time2, '%Y-%m-%d %H:%M:%S')

    delta = d2 - d1
    return delta.seconds

In [13]:
"""
1.26: all the date stuff is still pending
"""
def get_userid_to_info(x):
    user_id = -1
    start_time = None
    end_time = None
    step = 0
    search_keywords = ''
    click_categories = ''
    session_id, action_infos = x
    aggr_info = ''
    
    for action in action_infos:
        
        user_id = action.user_id if user_id == -1 else user_id
        
        step = step + 1
        
        # if search kw not in and not none
        search_keywords = search_keywords + action.search_keyword + ',' if action.search_keyword != None and action.search_keyword not in search_keywords else search_keywords
        
        click_categories = click_categories + str(action.click_category_id) + ',' if action.click_category_id != -1 and str(action.click_category_id) not in click_categories else click_categories
        
        # deal with time
        start_time = action.action_time if start_time == None or time_earlier(action.action_time, start_time) else start_time
        end_time = action.action_time if end_time == None or time_earlier(end_time, action.action_time) else end_time
    
    # trim the last ',' of click_categories id and search keywords
    search_keywords = search_keywords[0: len(search_keywords) - 1] if search_keywords.endswith(',') else search_keywords
    click_categories = click_categories[0 : len(click_categories) - 1] if click_categories.endswith(',') else click_categories
    
    visit_length = get_datetime_minus_second(start_time, end_time)
    aggr_info = 'session_id=' + session_id + '|search_keywords=' + search_keywords + '|click_categories=' + click_categories + '|visit_length=' + str(visit_length) + '|step_length=' + str(step) + '|start_time=' + start_time
    return (user_id, aggr_info)

In [14]:
"""
to get a aggregate info
cause we need to join the table with user info table, so the key will be changed to userid
(userid : session_id | search keywords | click categories | visit length | step length | start time)
"""
user_id_to_aggr_info_rdd = session_to_action_group.map(lambda x : get_userid_to_info(x))

In [15]:
user_id_to_aggr_info_rdd.collect()

[(6,
  'session_id=a580a84c603b11eb9ef0acde48001122|search_keywords=Facial Lotions,Lenovo Laptops,Mugs,Machine Learning,Apple,Napkins,Vacuum|click_categories=21,35,66,58,71,98,24,52,27,0,16|visit_length=3373|step_length=38|start_time=2021-01-27 19:03:37'),
 (14,
  'session_id=a5818e94603b11eb9bf0acde48001122|search_keywords=Mugs,Machine Learning,Apple,Vacuum,Facial Lotions,Huawei Cell Phone,Lenovo Laptops,Lamer,Lobsters|click_categories=66,10,72,58,43,30,56,13,29|visit_length=3438|step_length=56|start_time=2021-01-27 7:00:44'),
 (13,
  'session_id=a5846a1a603b11eba7ecacde48001122|search_keywords=Huawei Cell Phone,Mugs,Facial Lotions,Napkins,Machine Learning,Lenovo Laptops,Apple,Lamer,Vacuum,Lobsters|click_categories=31,9,92,83,72,16,82,25,48,77,24,33,58,18,23,99,100,21,37,54,19,98,88|visit_length=3468|step_length=95|start_time=2021-01-27 0:00:11'),
 (13,
  'session_id=a5848b3a603b11ebbdfaacde48001122|search_keywords=Machine Learning,Lenovo Laptops,Mugs,Huawei Cell Phone,Vacuum,Apple,La

In [16]:
"""
to get the user info here, need to join the user table
"""
user_info_rdd = sparkSession.sql("select * from user_info_table").rdd

In [17]:
user_info_rdd.collect()

[Row(user_id=90, username='user90', name='name90', age=30, professional='professional37', city='city29', sex='Female'),
 Row(user_id=91, username='user91', name='name91', age=48, professional='professional56', city='city39', sex='Male'),
 Row(user_id=92, username='user92', name='name92', age=31, professional='professional90', city='city77', sex='Female'),
 Row(user_id=93, username='user93', name='name93', age=16, professional='professional23', city='city28', sex='Male'),
 Row(user_id=94, username='user94', name='name94', age=15, professional='professional100', city='city93', sex='Female'),
 Row(user_id=95, username='user95', name='name95', age=21, professional='professional15', city='city85', sex='Male'),
 Row(user_id=96, username='user96', name='name96', age=47, professional='professional71', city='city10', sex='Female'),
 Row(user_id=97, username='user97', name='name97', age=20, professional='professional2', city='city66', sex='Female'),
 Row(user_id=98, username='user98', name='name

In [18]:
user_id_to_user_info_rdd = user_info_rdd.map(lambda x : (x.user_id, x))

In [19]:
user_id_to_user_info_rdd.collect()

[(90,
  Row(user_id=90, username='user90', name='name90', age=30, professional='professional37', city='city29', sex='Female')),
 (91,
  Row(user_id=91, username='user91', name='name91', age=48, professional='professional56', city='city39', sex='Male')),
 (92,
  Row(user_id=92, username='user92', name='name92', age=31, professional='professional90', city='city77', sex='Female')),
 (93,
  Row(user_id=93, username='user93', name='name93', age=16, professional='professional23', city='city28', sex='Male')),
 (94,
  Row(user_id=94, username='user94', name='name94', age=15, professional='professional100', city='city93', sex='Female')),
 (95,
  Row(user_id=95, username='user95', name='name95', age=21, professional='professional15', city='city85', sex='Male')),
 (96,
  Row(user_id=96, username='user96', name='name96', age=47, professional='professional71', city='city10', sex='Female')),
 (97,
  Row(user_id=97, username='user97', name='name97', age=20, professional='professional2', city='city66'

In [20]:
def get_session_id_from_aggr_info_string(x):
    return x.split('|')[0].split('=')[1]

def aggr_user_and_session(x):
    session_info, user_info = x[1]
    
    age = user_info.age
    professional = user_info.professional
    sex = user_info.sex
    city = user_info.city
    
    full_info = session_info + '|age=' + str(age) + '|professional=' + professional + '|sex=' + sex + '|city=' + city
    session_id = get_session_id_from_aggr_info_string(session_info)
    return (session_id, full_info)

In [21]:
# now join
# user_id_to_user_info_rdd.join(user_id_to_aggr_info_rdd).collect()
session_id_to_aggr_info_and_user_info_rdd = user_id_to_aggr_info_rdd.join(user_id_to_user_info_rdd).map(lambda x : aggr_user_and_session(x))

In [22]:
session_id_to_aggr_info_and_user_info_rdd.collect()

[('a5e95ed4603b11eb83a9acde48001122',
  'session_id=a5e95ed4603b11eb83a9acde48001122|search_keywords=Lobsters,Huawei Cell Phone,Lamer|click_categories=86,28,66,11,85|visit_length=3496|step_length=20|start_time=2021-01-27 21:00:28|age=25|professional=professional50|sex=Female|city=city75'),
 ('a550186e603b11ebb8b2acde48001122',
  'session_id=a550186e603b11ebb8b2acde48001122|search_keywords=Mugs,Lobsters,Vacuum,Facial Lotions,Apple,Lamer,Lenovo Laptops,Napkins,Huawei Cell Phone,Machine Learning|click_categories=81,15,29,18,85,98,55,72,48,44,59,0,27,53,26,100,34,77|visit_length=3530|step_length=84|start_time=2021-01-27 2:00:51|age=25|professional=professional50|sex=Female|city=city75'),
 ('a54f37b4603b11eb82cdacde48001122',
  'session_id=a54f37b4603b11eb82cdacde48001122|search_keywords=Apple,Lamer,Lenovo Laptops,Machine Learning,Lobsters|click_categories=60,86,54,18,91,30,29,67,78|visit_length=3088|step_length=21|start_time=2021-01-27 19:07:14|age=25|professional=professional50|sex=Female

In [23]:
from properties import filter_config
from utils.string_utils import get_value_from_session_user_aggr_info
from op_func.requirement1 import session_accumulator_param
import json

def get_acc_visit_length_key(visit_length):
    visit_range = [i * 300 for i in range(25)]
    if visit_length > visit_range[len(visit_range) - 1]:
        return 'visit_length > 7200'
    else:
        for i in range(len(visit_range)):
            if visit_length <= visit_range[i]:
                return 'visit_' + str(visit_range[i - 1]) + '_' + str(visit_range[i])

def get_acc_step_length_key(step_length):
    
    step_range = [i * 5 for i in range(41)]
    if step_length > step_range[len(step_range) - 1]:
        return 'step_length > 200'
    else:
        for i in range(len(step_range)):
            if step_length <= step_range[i]:
                return 'step_' + str(step_range[i - 1]) + '_' +  str(step_range[i])
            
# so far just take the age
def filter_rdd_by_param(x, accumulator):
    param = filter_config.filter_param
    age = get_value_from_session_user_aggr_info('age', x[1])
    visit_length = int(get_value_from_session_user_aggr_info('visit_length', x[1]))
    step_length = int(get_value_from_session_user_aggr_info('step_length', x[1]))
    param = filter_config.filter_param
    
    visit_length_key = get_acc_visit_length_key(visit_length)
    step_length_key = get_acc_step_length_key(step_length)
    
    # modify accumulator
    if int(age) >= param['start_age'] and int(age) <= param['end_age']:
        accumulator.add('sum_visit')
        accumulator.add('sum_step')
        accumulator.add(visit_length_key)
        accumulator.add(step_length_key)
        
    return int(age) >= param['start_age'] and int(age) <= param['end_age']
    
def get_initialed_count_accumulator():
    acc = sc.accumulator('initial', session_accumulator_param())
    acc.value = json.dumps({}, sort_keys=True)
    return acc
    


In [24]:
acc = get_initialed_count_accumulator()
session_filter_rdd = session_id_to_aggr_info_and_user_info_rdd.filter(lambda x : filter_rdd_by_param(x, acc))

In [25]:
session_filter_rdd.collect()

[('a57406b6603b11eb8e9facde48001122',
  'session_id=a57406b6603b11eb8e9facde48001122|search_keywords=Lamer,Lobsters,Facial Lotions,Vacuum,Mugs,Apple,Napkins,Machine Learning,Lenovo Laptops|click_categories=40,53,7,34,49,25,30,24,23,36,12,90,46,15,20,18,35|visit_length=3586|step_length=77|start_time=2021-01-27 13:00:07|age=32|professional=professional19|sex=Female|city=city28'),
 ('a5742178603b11eb8204acde48001122',
  'session_id=a5742178603b11eb8204acde48001122|search_keywords=Apple,Lenovo Laptops,Mugs,Facial Lotions,Vacuum|click_categories=51,95,34,48,7,80,88,19|visit_length=3234|step_length=33|start_time=2021-01-27 3:01:21|age=32|professional=professional19|sex=Female|city=city28'),
 ('a574fa80603b11ebb8bbacde48001122',
  'session_id=a574fa80603b11ebb8bbacde48001122|search_keywords=Facial Lotions,Lamer,Machine Learning,Vacuum,Mugs,Apple|click_categories=35,73,13,2,61,75,29,37,33,63|visit_length=3243|step_length=46|start_time=2021-01-27 1:00:04|age=32|professional=professional19|sex=F

In [26]:
# session_filter_rdd.count()

In [27]:
res = json.loads(acc.value)

In [28]:
res

{'{"step_15_20": 1, "step_20_25": 1, "step_25_30": 3, "step_30_35": 1, "step_35_40": 1, "step_40_45": 2, "step_45_50": 4, "step_55_60": 1, "step_60_65": 2, "step_65_70": 2, "step_70_75": 3, "step_75_80": 1, "step_80_85": 1, "step_85_90": 1, "step_90_95": 4, "step_95_100": 2, "sum_step": 30, "sum_visit": 30, "visit_3000_3300": 3, "visit_3300_3600": 27}': 1,
 '{"step_15_20": 1, "step_20_25": 2, "step_35_40": 1, "step_40_45": 1, "step_50_55": 2, "step_65_70": 2, "step_95_100": 1, "sum_step": 10, "sum_visit": 10, "visit_3000_3300": 2, "visit_3300_3600": 8}': 1,
 '{"step_15_20": 1, "step_20_25": 4, "step_35_40": 1, "step_40_45": 3, "step_45_50": 1, "step_50_55": 1, "step_55_60": 2, "step_60_65": 1, "step_65_70": 2, "step_75_80": 4, "step_80_85": 1, "step_85_90": 5, "step_90_95": 4, "sum_step": 30, "sum_visit": 30, "visit_2700_3000": 1, "visit_3000_3300": 4, "visit_3300_3600": 25}': 1,
 '{"step_15_20": 2, "step_20_25": 4, "step_25_30": 1, "step_35_40": 1, "step_40_45": 2, "step_45_50": 1, "s

In [29]:
# count the sum ratio
sum_up_count = {}
for item in res.items():
    sub_res = json.loads(item[0])
    
    for sub_item in sub_res.items():
        if sub_item[0] not in sum_up_count.keys():
            sum_up_count[sub_item[0]] = 0
        sum_up_count[sub_item[0]] = sum_up_count[sub_item[0]] + sub_item[1] * item[1]
#     if 'sum_step' in sub_res.keys():
#         count = count + sub_res['sum_step'] * item[1]
#     print(sub_res)

sum_up_count

{'step_15_20': 8,
 'step_20_25': 35,
 'step_25_30': 30,
 'step_30_35': 28,
 'step_35_40': 26,
 'step_40_45': 33,
 'step_45_50': 30,
 'step_55_60': 23,
 'step_60_65': 26,
 'step_65_70': 31,
 'step_70_75': 25,
 'step_75_80': 22,
 'step_80_85': 25,
 'step_85_90': 23,
 'step_90_95': 42,
 'step_95_100': 35,
 'sum_step': 480,
 'sum_visit': 480,
 'visit_3000_3300': 47,
 'visit_3300_3600': 426,
 'step_50_55': 38,
 'visit_2700_3000': 6,
 'visit_2400_2700': 1}

In [30]:
# cache the data, to avoid acc executed again, this is recommended
def calculate_ratio_for_steps_visit(key, dataset, step_or_visit):
    if key not in dataset.keys():
        return 0
    sum_key = 'sum_' + step_or_visit
    return dataset[key] / dataset[sum_key]


In [31]:
from utils.data_model import visit_length_ratio, step_length_ratio

step_ratio = step_length_ratio(task_id,
                               sum_up_count['sum_step'],
        calculate_ratio_for_steps_visit('step_0_5', sum_up_count, 'step'),
                               calculate_ratio_for_steps_visit('step_5_10', sum_up_count, 'step'),
                               calculate_ratio_for_steps_visit('step_10_15', sum_up_count, 'step'),
                               calculate_ratio_for_steps_visit('step_15_20', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_20_25', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_25_30', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_30_35', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_35_40', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_40_45', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_45_50', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_50_55', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_55_60', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_60_65', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_65_70', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_70_75', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_75_80', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_80_85', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_85_90', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_90_95', sum_up_count, 'step'),
                              calculate_ratio_for_steps_visit('step_95_100', sum_up_count, 'step'))

visit_ratio = visit_length_ratio(task_id,
                                 sum_up_count['sum_visit'],
    calculate_ratio_for_steps_visit('visit_0_300', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_300_600', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_600_900', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_900_1200', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_1200_1500', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_1500_1800', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_1800_2100', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_2100_2400', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_2400_2700', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_2700_3000', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_3000_3300', sum_up_count, 'visit'),
                                calculate_ratio_for_steps_visit('visit_3300_3600', sum_up_count, 'visit'))

visit_ratio

visit_length_ratio(task_id='619f3ee2-6a15-11eb-b181-acde48001122', session_count=480, visit_0_300=0, visit_300_600=0, visit_600_900=0, visit_900_1200=0, visit_1200_1500=0, visit_1500_1800=0, visit_1800_2100=0, visit_2100_2400=0, visit_2400_2700=0.0020833333333333333, visit_2700_3000=0.0125, visit_3000_3300=0.09791666666666667, visit_3300_3600=0.8875)

In [32]:
# write into database
step_ratio_df = sparkSession.createDataFrame([step_ratio])
visit_ratio_df = sparkSession.createDataFrame([visit_ratio])
visit_ratio_df, step_ratio_df

(DataFrame[task_id: string, session_count: bigint, visit_0_300: bigint, visit_300_600: bigint, visit_600_900: bigint, visit_900_1200: bigint, visit_1200_1500: bigint, visit_1500_1800: bigint, visit_1800_2100: bigint, visit_2100_2400: bigint, visit_2400_2700: double, visit_2700_3000: double, visit_3000_3300: double, visit_3300_3600: double],
 DataFrame[task_id: string, session_count: bigint, step_0_5: bigint, step_5_10: bigint, step_10_15: bigint, step_15_20: double, step_20_25: double, step_25_30: double, step_30_35: double, step_35_40: double, step_40_45: double, step_45_50: double, step_50_55: double, step_55_60: double, step_60_65: double, step_65_70: double, step_70_75: double, step_75_80: double, step_80_85: double, step_85_90: double, step_90_95: double, step_95_100: double])

In [33]:
# write into database
step_ratio_df.write.mode('append').format('hive').saveAsTable('step_count_ratio')

In [34]:
visit_ratio_df.write.mode('append').format('hive').saveAsTable('visit_length_ratio')

In [35]:
# see if records exist
step_ratio_result = sparkSession.sql("select * from step_count_ratio")
visit_ratio_result = sparkSession.sql("select * from visit_length_ratio")

step_ratio_result.show(), visit_ratio_result.show()

+--------------------+-------------+--------+---------+----------+--------------------+-------------------+----------+--------------------+-------------------+----------+----------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+----------+-------------------+
|             task_id|session_count|step_0_5|step_5_10|step_10_15|          step_15_20|         step_20_25|step_25_30|          step_30_35|         step_35_40|step_40_45|step_45_50|         step_50_55|         step_55_60|         step_60_65|         step_65_70|          step_70_75|         step_75_80|          step_80_85|         step_85_90|step_90_95|        step_95_100|
+--------------------+-------------+--------+---------+----------+--------------------+-------------------+----------+--------------------+-------------------+----------+----------+-------------------+-------------------+-------------------+---------

(None, None)