In [1]:
import utils
import numpy as np
import pandas as pd

In [2]:
def is_in_notebook():
    import sys
    return 'ipykernel' in sys.modules
def clear_output():
    """
    clear output for both jupyter notebook and the console
    """
    import os
    os.system('cls' if os.name == 'nt' else 'clear')
    if is_in_notebook():
        from IPython.display import clear_output as clear
        clear()

In [3]:
data_sql = utils.load_sql_file("./sql/xiefei.sql")
data_sql

"select\n    order_id,\n    bubble_trace_id,\n    concat_ws(',', collect_set(product_category)) as product_set,\n    count(distinct product_category) as pl_num\nfrom\n    (\n        select\n            order_id,\n            bubble_trace_id,\n            case\n                when sub_product_line = 9 then 'Luxury'\n                when sub_product_line in (1, 6)\n                and require_level in (100) then 'Premium'\n                when sub_product_line in (1, 6)\n                and require_level in (400) then 'CommercialPremium'\n                when sub_product_line in(3, 7, 314)\n                and require_level in (600)\n                and combo_type in (4)\n                and carpool_type in (1, 2) then 'Carpool'\n                when sub_product_line in (20, 99) then 'YouXiang'\n                when sub_product_line in (30) then 'APlusFlash'\n                when sub_product_line in (3, 7, 314)\n                and require_level in (600)\n                and combo_type 

In [4]:
data = spark.sql(data_sql)
data.cache()
data.head(5)

[Row(order_id=17705223126851, bubble_trace_id=u'c0a801055fc44dfb2cc15f4610100403', product_set=u'Other,SpFlash', pl_num=2),
 Row(order_id=17705257091193, bubble_trace_id=u'6446aa145fc47b70b86f1a53549bcb02', product_set=u'APlusFlash,Flash,Unione', pl_num=3),
 Row(order_id=17705361600391, bubble_trace_id=u'c0a803335fc505121906470f10810303', product_set=u'Premium', pl_num=1),
 Row(order_id=17705362545380, bubble_trace_id=u'0aa44a125fc505ed00001be2d74ab795', product_set=u'Flash,YouXiang', pl_num=2),
 Row(order_id=17705362728264, bubble_trace_id=u'6f01c10c5fc507ba031f171e10bd0303', product_set=u'Unione', pl_num=1)]

In [11]:
x = data.head(1)
x

[Row(order_id=17705223126851, bubble_trace_id=u'c0a801055fc44dfb2cc15f4610100403', product_set=u'Other,SpFlash', pl_num=2)]

In [8]:
data.columns

['order_id', 'bubble_trace_id', 'product_set', 'pl_num']

In [17]:
def map_func(x):
    k, vv = x
    trace_id = k
    rst_key = ""
    for v in vv:
        product_set = sorted(v["product_set"].split(","))
        print(product_set)
        rst_key = ",".join([ x for x in product_set])
        print(rst_key)
    return [(rst_key, [1])]

In [18]:
map_func(("0003ff985fcb9c2f000052546db20735", x))

[u'Other', u'SpFlash']
Other,SpFlash


[(u'Other,SpFlash', [1])]

In [19]:
def reduce_func(v1, v2):
    print("RUN reduce_func")
    return list(map(sum, zip(v1, v2))) 

In [20]:
reduce_func(
    [1],
    [1]
)

RUN reduce_func


[2]

In [21]:
rst = data.rdd \
    .map(lambda x: (x["bubble_trace_id"], x)) \
    .groupByKey() \
    .flatMap(lambda x: map_func(x)) \
    .reduceByKey(reduce_func) \
    .collect()
rst

[(u'CommercialPremium,Luxury,Premium,Unione', [155]),
 (u'Carpool,Luxury,Premium,SpFlash,Unione,YouXiang', [197]),
 (u'APlusFlash,Carpool,CommercialPremium,Flash,Other,SpFlash,Unione,YouXiang',
  [1]),
 (u'CommercialPremium,Flash,Other,Premium,Unione,YouXiang', [1849]),
 (u'CommercialPremium,Flash,Other,Premium,SpFlash,Unione', [287]),
 (u'APlusFlash,Flash,Premium,YouXiang', [79073]),
 (u'APlusFlash,Carpool,CommercialPremium,Flash,Luxury,Other,Premium,SpFlash,YouXiang',
  [1]),
 (u'CommercialPremium,Flash,Other,YouXiang', [250]),
 (u'APlusFlash,Carpool,CommercialPremium,SpFlash', [35]),
 (u'APlusFlash,Carpool,Luxury,YouXiang', [6]),
 (u'APlusFlash,Carpool,CommercialPremium,Flash,Other,Premium,SpFlash,Unione',
  [27]),
 (u'APlusFlash,Carpool,CommercialPremium,Flash,Luxury,Other,Premium,Unione,YouXiang',
  [1]),
 (u'Carpool,CommercialPremium,Luxury,Premium,SpFlash', [6]),
 (u'Carpool,Flash,Luxury,Other,SpFlash,Unione', [4]),
 (u'APlusFlash,Flash,Luxury,Premium,Unione,YouXiang', [154]),
 

In [22]:
len(rst)

867

In [23]:
def to_pandas(data):
    rst = pd.DataFrame([v for v in data])
    rst.columns = [
        "product_set",
        "call_num"
    ]
    return rst

In [24]:
rst_pd = to_pandas(rst)

In [25]:
rst_pd

Unnamed: 0,product_set,call_num
0,"CommercialPremium,Luxury,Premium,Unione",[155]
1,"Carpool,Luxury,Premium,SpFlash,Unione,YouXiang",[197]
2,"APlusFlash,Carpool,CommercialPremium,Flash,Oth...",[1]
3,"CommercialPremium,Flash,Other,Premium,Unione,Y...",[1849]
4,"CommercialPremium,Flash,Other,Premium,SpFlash,...",[287]
5,"APlusFlash,Flash,Premium,YouXiang",[79073]
6,"APlusFlash,Carpool,CommercialPremium,Flash,Lux...",[1]
7,"CommercialPremium,Flash,Other,YouXiang",[250]
8,"APlusFlash,Carpool,CommercialPremium,SpFlash",[35]
9,"APlusFlash,Carpool,Luxury,YouXiang",[6]


In [27]:
call_num_list = []
for i in range(len(rst_pd["call_num"].values)):
    call_num_list.append(rst_pd["call_num"].values[i][0])
call_num_list

[155,
 197,
 1,
 1849,
 287,
 79073,
 1,
 250,
 35,
 6,
 27,
 1,
 6,
 4,
 154,
 158,
 23596,
 144,
 59162,
 32,
 47493,
 1,
 62,
 12934641,
 1179,
 3716,
 2,
 175,
 359,
 242,
 437,
 2,
 3491,
 390,
 4,
 26,
 27,
 28,
 101,
 2528,
 10,
 105,
 1,
 10,
 22,
 15,
 6,
 1643,
 85529,
 65564,
 86,
 396409,
 146592,
 161,
 1,
 5,
 93060,
 118,
 2683,
 11,
 2,
 253422,
 41,
 14,
 28,
 5,
 77,
 2,
 8,
 6,
 2,
 182,
 1,
 577,
 15280,
 96,
 1,
 1,
 15148,
 99,
 12832,
 20,
 4,
 3,
 24196,
 23375,
 8,
 1880065,
 818,
 62,
 3,
 2962,
 3,
 109,
 2327,
 11,
 40,
 1,
 2,
 35,
 240,
 11354,
 43921,
 1,
 2,
 206,
 38,
 163,
 96,
 7052,
 4,
 70,
 32,
 4186,
 401,
 201,
 64,
 5,
 1790,
 365,
 312,
 72,
 4187,
 21,
 19,
 206488,
 2,
 2,
 8785,
 9611,
 3,
 302859,
 526,
 68,
 33,
 25,
 42,
 318,
 16,
 3,
 31,
 4,
 10,
 8547547,
 37,
 3131,
 12,
 1,
 305336,
 255519,
 1403482,
 2069,
 28963,
 34,
 59,
 6,
 36086,
 14,
 1,
 47,
 37295,
 9407,
 22847,
 11,
 178,
 15,
 1,
 11,
 3,
 319863,
 10,
 62,
 40,
 36,
 

In [28]:
rst_pd["call_num"] = call_num_list
rst_pd

Unnamed: 0,product_set,call_num
0,"CommercialPremium,Luxury,Premium,Unione",155
1,"Carpool,Luxury,Premium,SpFlash,Unione,YouXiang",197
2,"APlusFlash,Carpool,CommercialPremium,Flash,Oth...",1
3,"CommercialPremium,Flash,Other,Premium,Unione,Y...",1849
4,"CommercialPremium,Flash,Other,Premium,SpFlash,...",287
5,"APlusFlash,Flash,Premium,YouXiang",79073
6,"APlusFlash,Carpool,CommercialPremium,Flash,Lux...",1
7,"CommercialPremium,Flash,Other,YouXiang",250
8,"APlusFlash,Carpool,CommercialPremium,SpFlash",35
9,"APlusFlash,Carpool,Luxury,YouXiang",6


In [29]:
rst_pd.to_csv("./rst/xiefei.csv", index=False, header=True, sep=",", encoding="utf-8_sig")