In [122]:
from itertools import chain,combinations
from pyspark.sql import HiveContext,DataFrameWriter
from pyspark.sql.types import *

import copy
import time
import json
import datetime
import random
hc = HiveContext(sc)

hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hc.setConf("spark.sql.parquet.compression.codec", "uncompressed")
hc.setConf("spark.sql.shuffle.partitions", "128")

In [3]:
df = hc.read.parquet("yyyymmdd=20170123/*")
df.registerTempTable("rowdata_celebrus0123")

## 計算Domain組合

In [4]:
DOMAINLIST = ["Web-PC", "Web-Mobile", "MyBank-PC", "MyBank-Mobile", "B2B", "KOKO"]

In [185]:
domain_combo = []
for i in range(1,7):
    domain_combo.append(list(combinations(DOMAINLIST,i)))
domain_combo = [list(row) for row in list(chain.from_iterable(domain_combo))]

In [186]:
domain_combo

[['Web-PC'],
 ['MyBank-Mobile'],
 ['B2B'],
 ['Web-Mobile'],
 ['MyBank-PC'],
 ['KOKO'],
 ['Web-PC', 'MyBank-Mobile'],
 ['Web-PC', 'B2B'],
 ['Web-PC', 'Web-Mobile'],
 ['Web-PC', 'MyBank-PC'],
 ['Web-PC', 'KOKO'],
 ['MyBank-Mobile', 'B2B'],
 ['MyBank-Mobile', 'Web-Mobile'],
 ['MyBank-Mobile', 'MyBank-PC'],
 ['MyBank-Mobile', 'KOKO'],
 ['B2B', 'Web-Mobile'],
 ['B2B', 'MyBank-PC'],
 ['B2B', 'KOKO'],
 ['Web-Mobile', 'MyBank-PC'],
 ['Web-Mobile', 'KOKO'],
 ['MyBank-PC', 'KOKO'],
 ['Web-PC', 'MyBank-Mobile', 'B2B'],
 ['Web-PC', 'MyBank-Mobile', 'Web-Mobile'],
 ['Web-PC', 'MyBank-Mobile', 'MyBank-PC'],
 ['Web-PC', 'MyBank-Mobile', 'KOKO'],
 ['Web-PC', 'B2B', 'Web-Mobile'],
 ['Web-PC', 'B2B', 'MyBank-PC'],
 ['Web-PC', 'B2B', 'KOKO'],
 ['Web-PC', 'Web-Mobile', 'MyBank-PC'],
 ['Web-PC', 'Web-Mobile', 'KOKO'],
 ['Web-PC', 'MyBank-PC', 'KOKO'],
 ['MyBank-Mobile', 'B2B', 'Web-Mobile'],
 ['MyBank-Mobile', 'B2B', 'MyBank-PC'],
 ['MyBank-Mobile', 'B2B', 'KOKO'],
 ['MyBank-Mobile', 'Web-Mobile', 'MyBank-

## Method 1 直覺作法

In [7]:
startDate = '2017-01-23'
endDate = '2017-01-23'
endDate_add1 = '2017-01-24'
rowRDD= hc.sql('''
               SELECT   A.domainname,
                        A.session_id,
                        A.cookie_id,
                        A.duration,
                        A.individual_id,
                        B.first_visit,
                        CASE WHEN first_visit < '{startDate}' THEN 'N'
                        ELSE 'Y' END AS newUser 
               FROM     rowdata_celebrus0123 A
               JOIN     ( SELECT   cookie_id,min(creation_datetime) as first_visit
                           FROM    rowdata_celebrus0123
                           GROUP BY cookie_id
                        )B
               ON       A.cookie_id = B.cookie_id
               WHERE    A.creation_datetime >='{startDate}' and A.creation_datetime < '{endDate_add1}' 
               '''.format(startDate=startDate,endDate_add1=endDate_add1)).rdd.persist(StorageLevel.MEMORY_AND_DISK)

In [8]:
print datetime.datetime.now()
result = []
for domain in domain_combo:
    domainRDD = rowRDD.filter(lambda line : line["domainname"] in domain).persist()
    sessionCnt = domainRDD.map(lambda line: line["session_id"]).distinct().count()
    if sessionCnt > 0:
        users = domainRDD.map(lambda line: line["cookie_id"]).distinct().count()
        pageViews = domainRDD.count()
        avgPages = float(pageViews)/sessionCnt
        avgDuration = domainRDD.map(lambda line: line["duration"] if line["duration"] else 0).sum()/sessionCnt
        newUserRate = domainRDD.filter(lambda line: line["newUser"]=='Y')\
                            .map(lambda line: line["cookie_id"])\
                            .distinct().count()/float(users)
        recognition = domainRDD.filter(lambda line: line["individual_id"]<>'None')\
                               .map(lambda line: line["cookie_id"])\
                               .distinct().count()/float(users)
        result.append({"startDate":"2017-01-01","endDate":"2017-01-02","domain":domain,\
                       "data":[users, pageViews, recognition, newUserRate, avgPages, avgDuration]})
    else:
        result.append({"startDate":"2017-01-01","endDate":"2017-01-02","domain":domain,\
                       "data":[0, 0, 0, 0, 0, 0]})
    domainRDD.unpersist()
print datetime.datetime.now()


2017-03-07 09:55:01.926447
2017-03-07 12:00:52.417442


## Method 2 合併Domain組合

In [9]:
startDate = '2017-01-23'
endDate = '2017-01-23'
endDate_add1 = '2017-01-24'
rowRDD= hc.sql('''
               SELECT   A.domainname,
                        A.session_id,
                        A.cookie_id,
                        A.duration,
                        A.individual_id,
                        B.first_visit,
                        CASE WHEN first_visit < '{startDate}' THEN 'N'
                        ELSE 'Y' END AS newUser 
               FROM     rowdata_celebrus0123 A
               JOIN     ( SELECT   cookie_id,min(creation_datetime) as first_visit
                           FROM    rowdata_celebrus0123
                           GROUP BY cookie_id
                        )B
               ON       A.cookie_id = B.cookie_id
               WHERE    A.creation_datetime >='{startDate}' and A.creation_datetime < '{endDate_add1}' 
               '''.format(startDate=startDate,endDate_add1=endDate_add1)).rdd.persist(StorageLevel.MEMORY_AND_DISK)

In [10]:
def domainMatch(pageDomain, value, newUserFlag='Y', individual='1'):
    for domain in domain_combo:
        if pageDomain in domain and newUserFlag == 'Y' and individual <> 'None' and 'XXXX' not in individual:
            yield (tuple(row for row in domain),value)

In [11]:
print datetime.datetime.now()
sessionCnt = rowRDD.flatMap(lambda line : domainMatch(line['domainname'], line['session_id']))\
                   .distinct()\
                   .countByKey()

users = rowRDD.flatMap(lambda line : domainMatch(line['domainname'], line["cookie_id"]))\
                   .distinct()\
                   .countByKey()

pageViews = rowRDD.flatMap(lambda line : domainMatch(line['domainname'], 1))\
                   .countByKey()
    
duration = dict(rowRDD.flatMap(lambda line : domainMatch(line['domainname'], line['duration'] if line['duration'] else 0))\
                  .reduceByKey(lambda x,y:x+y)
                  .collect())

newUsers = rowRDD.flatMap(lambda line : domainMatch(line['domainname'], line['cookie_id'], line['newUser']))\
                   .distinct()\
                   .countByKey()  

recognition = rowRDD.flatMap(lambda line : domainMatch(line['domainname'], line['cookie_id'], 'Y', line['individual_id'] if line['individual_id'] else 'None'))\
                   .distinct()\
                   .countByKey()    
        
print datetime.datetime.now()

2017-03-07 13:34:59.791637
2017-03-07 13:57:10.357061


In [7]:
result = []
for domain in domain_combo:
    if sessionCnt[tuple(domain)] > 0:
        recognitionRate = recognition[tuple(domain)]/float(users[tuple(domain)])
        newUserRate = newUsers[tuple(domain)]/float(users[tuple(domain)])
        avgPages = pageViews[tuple(domain)]/float(sessionCnt[tuple(domain)])
        avgDuration = duration[tuple(domain)]/sessionCnt[tuple(domain)]
        result.append({"startDate":startDate,"endDate":endDate,"domain":domain,\
                                  "data":[users[tuple(domain)], pageViews[tuple(domain)], recognitionRate,\
                                  newUserRate, avgPages, avgDuration]})
    else:
        result.append({"startDate":startDate,"endDate":endDate,"domain":domain,\
                       "data":[0, 0, 0.0, 0.0, 0.0, 0]})

In [114]:
result

[{'data': [63967, 257526, 1.0, 1.0, 2.780187630223796, 359807],
  'domain': ['Web-PC'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [36549, 141690, 1.0, 1.0, 2.8259438760246516, 139898],
  'domain': ['Web-Mobile'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [71450, 428264, 1.0, 1.0, 4.646709705419628, 374164],
  'domain': ['MyBank-PC'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [76844, 412429, 1.0, 1.0, 3.967494612898261, 194149],
  'domain': ['MyBank-Mobile'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [10794, 136911, 1.0, 1.0, 6.700812451057165, 1227477],
  'domain': ['B2B'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [805, 3406, 1.0, 1.0, 3.686147186147186, 215456],
  'domain': ['KOKO'],
  'endDate': '2017-01-02',
  'startDate': '2017-01-01'},
 {'data': [100500, 399216, 1.0, 1.0, 2.796256864283313, 282577],
  'domain': ['Web-PC', 'Web-Mobile'],
  'endDate': '2017-0

## 當資料類型為['lucy',3,'aa'] 

In [8]:
schema = StructType([StructField('startDate', StringType(), False),
                     StructField('endDate', StringType(), False),
                     StructField('domain', ArrayType(elementType=StringType()), False),
                     StructField('data', StructType([
                                    StructField('users', IntegerType(), False), 
                                    StructField('pageViews', IntegerType(), False),
                                    StructField('recognition', FloatType(), False),
                                    StructField('newUserRate', FloatType(), False),
                                    StructField('avgPages', FloatType(), False),
                                    StructField('avgDuration', IntegerType(), False)
                                ])
                            , False)
                    ])

In [13]:
resultDF.printSchema()

root
 |-- startDate: string (nullable = false)
 |-- endDate: string (nullable = false)
 |-- domain: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- data: struct (nullable = false)
 |    |-- users: integer (nullable = false)
 |    |-- pageView: integer (nullable = false)
 |    |-- recognition: float (nullable = false)
 |    |-- newUserRate: float (nullable = false)
 |    |-- avgPages: float (nullable = false)
 |    |-- domavgDurationain: integer (nullable = false)



In [None]:
hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hc.setConf("spark.sql.parquet.compression.codec", "uncompressed")
hc.setConf("spark.sql.shuffle.partitions", "1")

resultDF = hc.createDataFrame(result,schema)
hc_writer = DataFrameWriter(resultDF)
hc_writer.saveAsTable('bap_chp.mongoData',mode='append',format='parquet')

## Method3 - AggregateByKey & 直打橫

In [189]:
def phase(domain, key, creation_datetime, first_datetime):
    values = []
   
    #ftime = time.mktime(datetime.datetime.strptime(first_datetime, "%Y-%m-%d").timetuple())
    stime = time.mktime(datetime.datetime.strptime(creation_datetime, "%Y-%m-%d").timetuple())
   
    real_idx = int(stime-sdatetime.value) / sec_day.value #實際上第幾天
    for i, d in enumerate(bc_domain_combo.value):
        values.append([0 for _ in range(int(edatetime.value-sdatetime.value)/sec_day.value)])
        if domain in d:
            for idx in range(real_idx, len(values[i])):
                values[i][idx] = 1
   
    return (key, values)
 
def seq1(t1, t2):
    for i in range(len(t1)):
        for j in range(len(t1[i])):
            t1[i][j] += t2[i][j]
           
    return t1
 
comb1 = seq1
 
def seq2(t1, t2):
    for i in range(len(t1)): #domain
        for j in range(len(t1[i])): #days
            t1[i][j][0] += t2[1][i][j]
            t1[i][j][1] += 1 if t2[1][i][j] > 0 else 0
           
    return t1
 
def comb2(t1, t2):
    for i in range(len(t1)):
        for j in range(len(t1[i])):
            t1[i][j][0] += t2[i][j][0]
            t1[i][j][1] += t2[i][j][1]
           
    return t1

In [160]:
s = "2017-01-23"
e = "2017-01-24"
 
DOMAINLIST = ["Web-PC", "MyBank-Mobile", "B2B", "Web-Mobile", "MyBank-PC", "KOKO"]
 
domain_combo = []
for i in range(1,7):
    domain_combo.append(list(combinations(DOMAINLIST,i)))
domain_combo = [",".join(list(row)) for row in list(chain.from_iterable(domain_combo))]
random.shuffle(domain_combo) #洗牌

sdatetime = sc.broadcast(time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d").timetuple()))
edatetime = sc.broadcast(time.mktime(datetime.datetime.strptime(e, "%Y-%m-%d").timetuple()))
sec_day = sc.broadcast(86400)
 
init_values = sc.broadcast([[0 for _ in domain_combo] for _ in range(int(edatetime.value-sdatetime.value)/sec_day.value)])
 
btimestamp = time.time()
 
batch_size = 3
size = len(domain_combo) / batch_size
for iii in range(0, batch_size):
    siii = iii*size
    eiii = min(len(domain_combo), (iii+1)*size)
    d = domain_combo[siii:eiii]
   
    bc_domain_combo = sc.broadcast(d)
    rowRDD= hc.sql('''
                   SELECT   A.*,
                            B.first_visit,
                            CASE WHEN first_visit < '{s}' THEN 'N'
                            ELSE 'Y' END AS newUser
                   FROM     rowdata_celebrus0123 A
                   JOIN     ( SELECT   cookie_id,min(creation_datetime) as first_visit
                               FROM     rowdata_celebrus0123
                               WHERE domainname != "others"
                               GROUP BY cookie_id
                            )B
                   ON       A.cookie_id = B.cookie_id
                   WHERE    A.domainname != 'others' AND A.creation_datetime >= '{s}' and A.creation_datetime < '{e}'
                   '''.format(s=s, e=e)).rdd
 
    init_values_1 = []
    init_values_2 = []
    for domain in d:
        init_values_1.append([])
        init_values_2.append([])
        for _ in range(int(edatetime.value - sdatetime.value)/sec_day.value):
            init_values_1[-1].append(0)
            init_values_2[-1].append([0, 0])
 
    results = rowRDD.map(lambda x: phase(x["domainname"],
                                         x["session_id"],
                                         x["creation_datetime"].split(" ")[0],
                                         x["first_visit"].split(" ")[0])).\
                     aggregateByKey(init_values_1, seq1, comb1).\
                     #map(lambda x: x[1]).\
                     aggregate(init_values_2, seq2, comb2)
 
    print time.time() - btimestamp   

534.776773214
1109.50695705
1669.64082599


In [195]:
s = "2017-01-23"
e = "2017-01-24"
 
DOMAINLIST = ["Web-PC", "MyBank-Mobile", "B2B", "Web-Mobile", "MyBank-PC", "KOKO"]
 
domain_combo = []
for i in range(1,7):
    domain_combo.append(list(combinations(DOMAINLIST,i)))
domain_combo = [",".join(list(row)) for row in list(chain.from_iterable(domain_combo))]
random.shuffle(domain_combo) #洗牌

sdatetime = sc.broadcast(time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d").timetuple()))
edatetime = sc.broadcast(time.mktime(datetime.datetime.strptime(e, "%Y-%m-%d").timetuple()))
sec_day = sc.broadcast(86400)
 
init_values = sc.broadcast([[0 for _ in domain_combo] for _ in range(int(edatetime.value-sdatetime.value)/sec_day.value)])
 
btimestamp = time.time()
 
batch_size = 1
size = len(domain_combo) / batch_size
for iii in range(0, batch_size):
    siii = iii*size
    eiii = min(len(domain_combo), (iii+1)*size)
    d = domain_combo[siii:eiii]
   
    bc_domain_combo = sc.broadcast(d)
    rowRDD= hc.sql('''
                   SELECT   A.*,
                            B.first_visit,
                            CASE WHEN first_visit < '{s}' THEN 'N'
                            ELSE 'Y' END AS newUser
                   FROM     rowdata_celebrus0123 A
                   JOIN     ( SELECT   cookie_id,min(creation_datetime) as first_visit
                               FROM     rowdata_celebrus0123
                               WHERE domainname != "others"
                               GROUP BY cookie_id
                            )B
                   ON       A.cookie_id = B.cookie_id
                   WHERE    A.domainname != 'others' AND A.creation_datetime >= '{s}' and A.creation_datetime < '{e}'
                   '''.format(s=s, e=e)).rdd
 
    init_values_1 = []
    init_values_2 = []
    for domain in d:
        init_values_1.append([])
        init_values_2.append([])
        for _ in range(int(edatetime.value - sdatetime.value)/sec_day.value):
            init_values_1[-1].append(0)
            init_values_2[-1].append([0, 0])
 
    results = rowRDD.map(lambda x: phase(x["domainname"],
                                         x["session_id"],
                                         x["creation_datetime"].split(" ")[0],
                                         x["first_visit"].split(" ")[0])).\
                     aggregateByKey(init_values_1, seq1, comb1).\
                     aggregate(init_values_2, seq2, comb2)
 
    print time.time() - btimestamp   



1348.73073292


In [198]:
results

[[[394437, 110542]],
 [[145096, 51020]],
 [[806866, 214452]],
 [[415835, 104868]],
 [[844099, 196962]],
 [[1122700, 257646]],
 [[428264, 92165]],
 [[136911, 20432]],
 [[822701, 165462]],
 [[554119, 145640]],
 [[669955, 196581]],
 [[568581, 112144]],
 [[710271, 162158]],
 [[1235130, 269372]],
 [[141690, 50139]],
 [[1239909, 293836]],
 [[981010, 216046]],
 [[826107, 166222]],
 [[827480, 198335]],
 [[685790, 148196]],
 [[140317, 21353]],
 [[815051, 238998]],
 [[964391, 215519]],
 [[260932, 93401]],
 [[573360, 143114]],
 [[948556, 256087]],
 [[811645, 238269]],
 [[1098219, 252148]],
 [[536127, 160599]],
 [[282007, 71367]],
 [[412429, 103952]],
 [[402622, 143497]],
 [[257526, 92629]],
 [[397843, 111314]],
 [[1243315, 294553]],
 [[569954, 142304]],
 [[967797, 216236]],
 [[810272, 215216]],
 [[1380226, 311724]],
 [[431670, 93018]],
 [[689196, 148956]],
 [[278601, 70489]],
 [[549340, 124342]],
 [[985789, 238615]],
 [[982383, 237805]],
 [[539533, 161328]],
 [[1101625, 252900]],
 [[557525, 14652

## partition內做seq, partition間做combine

In [105]:
def seq(a,b):
    return a*b

def combine(a,b):
    return a+b

In [110]:
data = sc.parallelize([(1,3),(1,2),(1,4),\
                       (1,5),(1,6),(2,3),\
                       (2,5),(3,3),(3,6)],3)
data.aggregateByKey(2,seq,combine,10).collect()

[(1, 108), (2, 16), (3, 36)]

In [None]:
data.aggregateByKey(4,seq,combine,2).collect()