In [17]:
import pandas as pd
from pyspark.ml.feature import OneHotEncoderEstimator

# Show Data

In [84]:
file_path = "/nfs/project/DIEN/dien_tf2/taobao_data/"

In [4]:
behavior_log = pd.read_csv(file_path + "behavior_log.csv", nrows=100)
set(behavior_log["btag"].values)
behavior_log

Unnamed: 0,user,time_stamp,btag,cate,brand
0,558157,1493741625,pv,6250,91286
1,558157,1493741626,pv,6250,91286
2,558157,1493741627,pv,6250,91286
3,728690,1493776998,pv,11800,62353
4,332634,1493809895,pv,1101,365477
...,...,...,...,...,...
95,1093365,1493824491,pv,6056,101331
96,1093365,1493824549,pv,6056,101331
97,885384,1493795275,pv,6056,101331
98,796539,1493797778,pv,6056,101331


In [5]:
ad_feature = pd.read_csv(file_path + "ad_feature.csv", nrows=100)
ad_feature


Unnamed: 0,adgroup_id,cate_id,campaign_id,customer,brand,price
0,63133,6406,83237,1,95471.0,170.00
1,313401,6406,83237,1,87331.0,199.00
2,248909,392,83237,1,32233.0,38.00
3,208458,392,83237,1,174374.0,139.00
4,110847,7211,135256,2,145952.0,32.99
...,...,...,...,...,...,...
95,8105,6814,13914,72,,532.00
96,8331,10885,13914,72,205431.0,138.00
97,63603,6814,83371,72,,532.00
98,338430,6142,140936,74,,130.00


In [6]:
raw_sample = pd.read_csv(file_path + "raw_sample.csv", nrows=100)
raw_sample

Unnamed: 0,user,time_stamp,adgroup_id,pid,nonclk,clk
0,581738,1494137644,1,430548_1007,1,0
1,449818,1494638778,3,430548_1007,1,0
2,914836,1494650879,4,430548_1007,1,0
3,914836,1494651029,5,430548_1007,1,0
4,399907,1494302958,8,430548_1007,1,0
...,...,...,...,...,...,...
95,443793,1494496413,102,430539_1007,1,0
96,872834,1494303087,102,430539_1007,1,0
97,644063,1494632747,102,430548_1007,1,0
98,618501,1494655235,102,430539_1007,1,0


In [7]:
user_profile = pd.read_csv(file_path + "user_profile.csv", nrows=100)
user_profile

Unnamed: 0,userid,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,new_user_class_level
0,234,0,5,2,5,,3,0,3.0
1,523,5,2,2,2,1.0,3,1,2.0
2,612,0,8,1,2,2.0,3,0,
3,1670,0,4,2,4,,1,0,
4,2545,0,10,1,4,,3,0,
...,...,...,...,...,...,...,...,...,...
95,41143,7,2,2,2,1.0,3,0,2.0
96,41148,0,1,2,1,,3,1,3.0
97,42150,0,2,2,2,,1,0,
98,42180,0,5,2,5,,1,0,


# Process Data With Spark

In [7]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [8]:
hdfs_path = '输入自己的hdfs路径'
spark_format = 'com.databricks.spark.csv'

In [9]:
ad_feature = spark.read.format(spark_format).options(header='true', inferschema='true').load(hdfs_path + 'ad_feature.csv')
print(ad_feature.dtypes)

[('adgroup_id', 'int'), ('cate_id', 'int'), ('campaign_id', 'int'), ('customer', 'int'), ('brand', 'string'), ('price', 'double')]


In [10]:
behavior_log = spark.read.format(spark_format).options(header='true', inferschema='true').load(hdfs_path + 'behavior_log.csv')
print(behavior_log.dtypes)

[('user', 'int'), ('time_stamp', 'int'), ('btag', 'string'), ('cate', 'int'), ('brand', 'int')]


In [11]:
raw_sample = spark.read.format(spark_format).options(header='true', inferschema='true').load(hdfs_path + 'raw_sample.csv')
print(raw_sample.dtypes)

[('user', 'int'), ('time_stamp', 'int'), ('adgroup_id', 'int'), ('pid', 'string'), ('nonclk', 'int'), ('clk', 'int')]


In [12]:
user_profile = spark.read.format(spark_format).options(header='true', inferschema='true').load(hdfs_path + 'user_profile.csv')
print(user_profile.dtypes)

[('userid', 'int'), ('cms_segid', 'int'), ('cms_group_id', 'int'), ('final_gender_code', 'int'), ('age_level', 'int'), ('pvalue_level', 'int'), ('shopping_level', 'int'), ('occupation', 'int'), ('new_user_class_level ', 'int')]


## Spark Test

In [20]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="brand", outputCol="brandIndex")
label_encoder = indexer.fit(behavior_log)
indexed = label_encoder.transform(behavior_log)
indexed.show()

+------+----------+----+-----+------+----------+
|  user|time_stamp|btag| cate| brand|brandIndex|
+------+----------+----+-----+------+----------+
|558157|1493741625|  pv| 6250| 91286|   92462.0|
|558157|1493741626|  pv| 6250| 91286|   92462.0|
|558157|1493741627|  pv| 6250| 91286|   92462.0|
|728690|1493776998|  pv|11800| 62353|     640.0|
|332634|1493809895|  pv| 1101|365477|   13805.0|
|857237|1493816945|  pv| 1043|110616|   52295.0|
|619381|1493774638|  pv|  385|428950|    3866.0|
|467042|1493772641|  pv| 8237|301299|     257.0|
|467042|1493772644|  pv| 8237|301299|     257.0|
|991528|1493780710|  pv| 7270|274795|   82042.0|
|991528|1493780712|  pv| 7270|274795|   82042.0|
|991528|1493780712|  pv| 7270|274795|   82042.0|
|991528|1493780712|  pv| 7270|274795|   82042.0|
|991528|1493780714|  pv| 7270|274795|   82042.0|
|991528|1493780765|  pv| 7270|274795|   82042.0|
|991528|1493780714|  pv| 7270|274795|   82042.0|
|991528|1493780765|  pv| 7270|274795|   82042.0|
|991528|1493780764| 

In [22]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="brand", outputCol="brandIndex")
label_encoder = indexer.fit(ad_feature)
indexed_1 = label_encoder.transform(ad_feature)
indexed_1.show()

+----------+-------+-----------+--------+------+-----+----------+
|adgroup_id|cate_id|campaign_id|customer| brand|price|brandIndex|
+----------+-------+-----------+--------+------+-----+----------+
|     63133|   6406|      83237|       1| 95471|170.0|   12569.0|
|    313401|   6406|      83237|       1| 87331|199.0|   30045.0|
|    248909|    392|      83237|       1| 32233| 38.0|   39839.0|
|    208458|    392|      83237|       1|174374|139.0|   30020.0|
|    110847|   7211|     135256|       2|145952|32.99|   75386.0|
|    607788|   6261|     387991|       6|207800|199.0|      65.0|
|    375706|   4520|     387991|       6|  NULL| 99.0|       0.0|
|     11115|   7213|     139747|       9|186847| 33.0|   46009.0|
|     24484|   7207|     139744|       9|186847| 19.0|   46009.0|
|     28589|   5953|     395195|      13|  NULL|428.0|       0.0|
|     23236|   5953|     395195|      13|  NULL|368.0|       0.0|
|    300556|   5953|     395195|      13|  NULL|639.0|       0.0|
|     9256

In [23]:
indexed.where("brand = 95471" ).show()

+-------+----------+----+----+-----+----------+
|   user|time_stamp|btag|cate|brand|brandIndex|
+-------+----------+----+----+-----+----------+
| 502199|1493807392|  pv|6406|95471|   13573.0|
|  44764|1493821582|  pv|6406|95471|   13573.0|
|1100859|1493803302|  pv|6406|95471|   13573.0|
| 983112|1493765238|  pv|6406|95471|   13573.0|
| 618579|1493811839|  pv|6406|95471|   13573.0|
| 618579|1493811837|  pv|6406|95471|   13573.0|
| 618579|1493811799|  pv|6406|95471|   13573.0|
| 618579|1493812379|  pv|6406|95471|   13573.0|
| 379426|1493808443|  pv|6406|95471|   13573.0|
| 536016|1493776270|  pv|6406|95471|   13573.0|
| 536016|1493776277|  pv|6406|95471|   13573.0|
| 361091|1493791416|  pv|6406|95471|   13573.0|
| 361091|1493790591|  pv|6406|95471|   13573.0|
| 361091|1493791416|  pv|6406|95471|   13573.0|
| 361091|1493790586|  pv|6406|95471|   13573.0|
| 724724|1493742342|  pv|6405|95471|   13573.0|
| 893762|1493810436|  pv|6406|95471|   13573.0|
| 807734|1493773789|  pv|6406|95471|   1

In [24]:
indexed_1.where("brand = 95471" ).show()

+----------+-------+-----------+--------+-----+-----+----------+
|adgroup_id|cate_id|campaign_id|customer|brand|price|brandIndex|
+----------+-------+-----------+--------+-----+-----+----------+
|     63133|   6406|      83237|       1|95471|170.0|   12569.0|
|    569665|   6406|      76716|   53089|95471|170.0|   12569.0|
|    587832|   6406|      76716|   53089|95471| 95.0|   12569.0|
|     54106|   6406|     130986|    7740|95471| 95.0|   12569.0|
|    465209|   6406|      26722|  106644|95471| 90.0|   12569.0|
|    347430|   6406|     323643|  106644|95471|170.0|   12569.0|
|    134638|   6406|      47525|  123525|95471|178.0|   12569.0|
|    495053|   6406|     362969|  225438|95471|108.0|   12569.0|
|    632046|   6406|     353670|   53972|95471|180.0|   12569.0|
+----------+-------+-----------+--------+-----+-----+----------+



## (1) 将user_profile中的用户特征与训练数据raw_sample做关联,得到训练数据的用户画像特征

In [42]:
raw_sample.show()

+------+----------+----------+-----------+------+---+
|  user|time_stamp|adgroup_id|        pid|nonclk|clk|
+------+----------+----------+-----------+------+---+
|581738|1494137644|         1|430548_1007|     1|  0|
|449818|1494638778|         3|430548_1007|     1|  0|
|914836|1494650879|         4|430548_1007|     1|  0|
|914836|1494651029|         5|430548_1007|     1|  0|
|399907|1494302958|         8|430548_1007|     1|  0|
|628137|1494524935|         9|430548_1007|     1|  0|
|298139|1494462593|         9|430539_1007|     1|  0|
|775475|1494561036|         9|430548_1007|     1|  0|
|555266|1494307136|        11|430539_1007|     1|  0|
|117840|1494036743|        11|430548_1007|     1|  0|
|739815|1494115387|        11|430539_1007|     1|  0|
|623911|1494625301|        11|430548_1007|     1|  0|
|623911|1494451608|        11|430548_1007|     1|  0|
|421590|1494034144|        11|430548_1007|     1|  0|
|976358|1494156949|        13|430548_1007|     1|  0|
|286630|1494218579|        1

In [43]:
user_profile.show()

+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |
+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|   234|        0|           5|                2|        5|        null|             3|         0|                    3|
|   523|        5|           2|                2|        2|           1|             3|         1|                    2|
|   612|        0|           8|                1|        2|           2|             3|         0|                 null|
|  1670|        0|           4|                2|        4|        null|             1|         0|                 null|
|  2545|        0|          10|                1|        4|        null|             3|         0|                 null|
|  3644|       49|           6| 

In [44]:
rst = raw_sample.join(user_profile, raw_sample.user == user_profile.userid, how='left') 
rst.dtypes

[('user', 'int'),
 ('time_stamp', 'int'),
 ('adgroup_id', 'int'),
 ('pid', 'string'),
 ('nonclk', 'int'),
 ('clk', 'int'),
 ('userid', 'int'),
 ('cms_segid', 'int'),
 ('cms_group_id', 'int'),
 ('final_gender_code', 'int'),
 ('age_level', 'int'),
 ('pvalue_level', 'int'),
 ('shopping_level', 'int'),
 ('occupation', 'int'),
 ('new_user_class_level ', 'int')]

In [45]:
rst.agg({"*": "count"}).show()

+--------+
|count(1)|
+--------+
|26557961|
+--------+



In [46]:
raw_sample.agg({"*": "count"}).show()

+--------+
|count(1)|
+--------+
|26557961|
+--------+



In [47]:
rst.show()

+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|user|time_stamp|adgroup_id|        pid|nonclk|clk|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |
+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|   1|1494571525|    133190|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|         0|                    3|
|   1|1494478236|    752759|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|         0|                    3|
|   1|1494571525|    769066|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|        

In [48]:
#rst.write.mode("overwrite").options(header="true").csv(hdfs_path + "train_data_user_profile")

## (2) 将ad_feature与rst[raw_sample.csv与user_profile.csv的关联结果]做关联 

In [49]:
ad_feature.show()

+----------+-------+-----------+--------+------+-----+
|adgroup_id|cate_id|campaign_id|customer| brand|price|
+----------+-------+-----------+--------+------+-----+
|     63133|   6406|      83237|       1| 95471|170.0|
|    313401|   6406|      83237|       1| 87331|199.0|
|    248909|    392|      83237|       1| 32233| 38.0|
|    208458|    392|      83237|       1|174374|139.0|
|    110847|   7211|     135256|       2|145952|32.99|
|    607788|   6261|     387991|       6|207800|199.0|
|    375706|   4520|     387991|       6|  NULL| 99.0|
|     11115|   7213|     139747|       9|186847| 33.0|
|     24484|   7207|     139744|       9|186847| 19.0|
|     28589|   5953|     395195|      13|  NULL|428.0|
|     23236|   5953|     395195|      13|  NULL|368.0|
|    300556|   5953|     395195|      13|  NULL|639.0|
|     92560|   5953|     395195|      13|  NULL|368.0|
|    590965|   4284|      28145|      14|454237|249.0|
|    529913|   4284|      70206|      14|  NULL|249.0|
|    54693

In [50]:
rst.show()

+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|user|time_stamp|adgroup_id|        pid|nonclk|clk|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |
+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+
|   1|1494571525|    133190|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|         0|                    3|
|   1|1494571525|    142774|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|         0|                    3|
|   1|1494571525|    769066|430548_1007|     1|  0|     1|       92|          11|                1|        5|           2|             3|        

In [51]:
rst = rst.join(ad_feature, "adgroup_id", how='left') 
rst.dtypes

[('adgroup_id', 'int'),
 ('user', 'int'),
 ('time_stamp', 'int'),
 ('pid', 'string'),
 ('nonclk', 'int'),
 ('clk', 'int'),
 ('userid', 'int'),
 ('cms_segid', 'int'),
 ('cms_group_id', 'int'),
 ('final_gender_code', 'int'),
 ('age_level', 'int'),
 ('pvalue_level', 'int'),
 ('shopping_level', 'int'),
 ('occupation', 'int'),
 ('new_user_class_level ', 'int'),
 ('cate_id', 'int'),
 ('campaign_id', 'int'),
 ('customer', 'int'),
 ('brand', 'string'),
 ('price', 'double')]

In [52]:
for col in rst.columns:
    print("'" + col + "',", end = "")

'adgroup_id','user','time_stamp','pid','nonclk','clk','userid','cms_segid','cms_group_id','final_gender_code','age_level','pvalue_level','shopping_level','occupation','new_user_class_level ','cate_id','campaign_id','customer','brand','price',

In [65]:
rst.where(rst["cate_id"].isNotNull()).select("cate_id").count()

26557961

In [66]:
rst.select("cate_id").count()

26557961

In [55]:
rst.write.mode("overwrite").options(header="true").csv(hdfs_path + "train_data")

## (3) 将behavior_log用户行为序列按user_id进行group by分组后按时间戳由小到大排序

In [56]:
behavior_log.show()

+------+----------+----+-----+------+
|  user|time_stamp|btag| cate| brand|
+------+----------+----+-----+------+
|558157|1493741625|  pv| 6250| 91286|
|558157|1493741626|  pv| 6250| 91286|
|558157|1493741627|  pv| 6250| 91286|
|728690|1493776998|  pv|11800| 62353|
|332634|1493809895|  pv| 1101|365477|
|857237|1493816945|  pv| 1043|110616|
|619381|1493774638|  pv|  385|428950|
|467042|1493772641|  pv| 8237|301299|
|467042|1493772644|  pv| 8237|301299|
|991528|1493780710|  pv| 7270|274795|
|991528|1493780712|  pv| 7270|274795|
|991528|1493780712|  pv| 7270|274795|
|991528|1493780712|  pv| 7270|274795|
|991528|1493780714|  pv| 7270|274795|
|991528|1493780765|  pv| 7270|274795|
|991528|1493780714|  pv| 7270|274795|
|991528|1493780765|  pv| 7270|274795|
|991528|1493780764|  pv| 7270|274795|
|991528|1493780633|  pv| 7270|274795|
|991528|1493780764|  pv| 7270|274795|
+------+----------+----+-----+------+
only showing top 20 rows



In [57]:
behavior_log.count()

723268134

In [58]:
behavior_log.select("user", "time_stamp").show()

+------+----------+
|  user|time_stamp|
+------+----------+
|558157|1493741625|
|558157|1493741626|
|558157|1493741627|
|728690|1493776998|
|332634|1493809895|
|857237|1493816945|
|619381|1493774638|
|467042|1493772641|
|467042|1493772644|
|991528|1493780710|
|991528|1493780712|
|991528|1493780712|
|991528|1493780712|
|991528|1493780714|
|991528|1493780765|
|991528|1493780714|
|991528|1493780765|
|991528|1493780764|
|991528|1493780633|
|991528|1493780764|
+------+----------+
only showing top 20 rows



In [67]:
behavior_log.sort('time_stamp',ascending=True).show()

+-------+-----------+----+----+------+
|   user| time_stamp|btag|cate| brand|
+-------+-----------+----+----+------+
| 672029|-2142988435|  pv|4287|364892|
| 561996|-2104520662|  pv|6160|247789|
| 561996|-2104520653|  pv|6160|247789|
| 561996|-2104520651|  pv|6160|247789|
| 561996|-2104520630|  pv|6160|247789|
| 561996|-2104520605|  pv|6160|247789|
| 561996|-2104520539|  pv|6160|247789|
| 561996|-2104520422|  pv|4384|312716|
| 561996|-2104520420|  pv|4384|312716|
| 561996|-2104520415|  pv|4384|312716|
| 561996|-2104520388|  pv|4384|312716|
| 561996|-2104520387|  pv|4384|312716|
|1013803|-2101140178|  pv|1483| 14127|
|1013803|-2101140173|  pv|1483| 14127|
|1013803|-2101140160|  pv|1483| 14127|
|1013803|-2101140159|  pv|1483| 14127|
|1013803|-2101140159|  pv|1483| 14127|
|1013803|-2101139349|  pv|1483|270347|
|1013803|-2101139347|  pv|1483|270347|
|1013803|-2101139292|  pv|1483|270347|
+-------+-----------+----+----+------+
only showing top 20 rows



In [60]:
behavior_log.orderBy('time_stamp','user').show()

+-------+-----------+----+----+------+
|   user| time_stamp|btag|cate| brand|
+-------+-----------+----+----+------+
| 672029|-2142988435|  pv|4287|364892|
| 561996|-2104520662|  pv|6160|247789|
| 561996|-2104520653|  pv|6160|247789|
| 561996|-2104520651|  pv|6160|247789|
| 561996|-2104520630|  pv|6160|247789|
| 561996|-2104520605|  pv|6160|247789|
| 561996|-2104520539|  pv|6160|247789|
| 561996|-2104520422|  pv|4384|312716|
| 561996|-2104520420|  pv|4384|312716|
| 561996|-2104520415|  pv|4384|312716|
| 561996|-2104520388|  pv|4384|312716|
| 561996|-2104520387|  pv|4384|312716|
|1013803|-2101140178|  pv|1483| 14127|
|1013803|-2101140173|  pv|1483| 14127|
|1013803|-2101140160|  pv|1483| 14127|
|1013803|-2101140159|  pv|1483| 14127|
|1013803|-2101140159|  pv|1483| 14127|
|1013803|-2101139349|  pv|1483|270347|
|1013803|-2101139347|  pv|1483|270347|
|1013803|-2101139292|  pv|1483|270347|
+-------+-----------+----+----+------+
only showing top 20 rows



In [None]:
-------

### a. 将数据存入hive表

In [96]:
from pyspark.sql import SparkSession, HiveContext
def save_hive(data, table_name):
    data.registerTempTable('test_hive')
    sqlContext.sql("create table " + table_name + " select * from test_hive")
    return

In [69]:
behavior_log.registerTempTable('test_hive')

In [74]:
behavior_log.registerTempTable('test_hive')
table_name = "guide_dien_user_behavior"
sqlContext.sql("create table " + table_name + " select * from test_hive")

DataFrame[]

In [89]:
raw_sample.registerTempTable('test_hive')
table_name = "guide_dien_raw_sample"
sqlContext.sql("create table " + table_name + " select * from test_hive")

DataFrame[]

In [90]:
user_profile.registerTempTable('test_hive')
table_name = "guide_dien_user_profile"
sqlContext.sql("create table " + table_name + " select * from test_hive")

DataFrame[]

In [91]:
ad_feature.registerTempTable('test_hive')
table_name = "guide_dien_ad_feature"
sqlContext.sql("create table " + table_name + " select * from test_hive")

DataFrame[]

### b. 使用sql处理behavior_log用户行为序列数据

In [93]:
def load_sql_file(sql_file):
    with open(sql_file, 'r') as isf:
        sql_txt = isf.readlines()
        return "".join(sql_txt)

#### user_id分组后按timestamp升序排序并保存到hive

In [103]:
sql = load_sql_file(file_path + "../get_behavior.sql")
print(sql)
df = spark.sql(sql)
df.show()

select 
    user,
    time_stamp,
    btag,
    cate,
    brand,
    RANK() OVER (PARTITION BY user ORDER BY time_stamp ASC) RK
from 
    stg_gs.guide_dien_user_behavior


+----+----------+----+----+------+---+
|user|time_stamp|btag|cate| brand| RK|
+----+----------+----+----+------+---+
|  65|1492900231|  pv|6423|202844|  1|
|  65|1493001655|  pv|1535|  1933|  2|
|  65|1493001696|  pv|5144|221012|  3|
|  65|1493001718|  pv|5144|221012|  4|
|  65|1493001718|  pv|5144|221012|  4|
|  65|1493163026|  pv|4384| 83700|  6|
|  65|1493163115|  pv|4384|268509|  7|
|  65|1493163153|cart|4384|268509|  8|
|  65|1493163157|  pv|4384|268509|  9|
|  65|1493284569|  pv| 859|102030| 10|
|  65|1493337810|  pv|4384|268509| 11|
|  65|1493337870|  pv|4384|268509| 12|
|  65|1493383822|  pv|8233|211132| 13|
|  65|1493383863|  pv|8233|211132| 14|
|  65|1493383881|  pv|8233|211132| 15|
|  65|1493383893|  pv|8233|211132| 16|
|  65|1493383931|  pv|8233|211132| 17|
|  65|1493383933|  pv|8233|211132| 18|
|  65|149

In [None]:
save_hive(df, "guide_dien_behavior_list")

#### 点击数据behavior list序列特征

In [112]:
sql = load_sql_file(file_path + "../click_behavior_data.sql")
print(sql)
df = spark.sql(sql)
df.show()

select
    *
from 
   stg_gs.guide_dien_behavior_list
where
    btag in ("cart", "fav", "buy")
+----+----------+----+----+------+---+
|user|time_stamp|btag|cate| brand| RK|
+----+----------+----+----+------+---+
|  65|1493163153|cart|4384|268509|  8|
|  65|1493533129|cart|8867|111917| 87|
|  65|1493941650|cart|8233|211132|114|
|  65|1493958804| buy|8233|211132|121|
|  65|1493958826| buy|8233|211132|123|
|  65|1494074722|cart|8214| 10655|168|
|  65|1494379922|cart|9095|254674|176|
|  65|1494579211| buy|6172|194766|210|
| 133|1493389745| fav|6251|151773| 39|
| 133|1493390266|cart|4267|400399| 90|
| 133|1493391032| fav|4384|115267|126|
| 133|1493392029| fav|6408| 80872|154|
| 133|1493392869| buy|6251|151773|173|
| 133|1493392869| buy|6251|151773|173|
| 133|1493573034| fav|4838| 49903|242|
| 133|1493573608| fav|2063|374661|264|
| 133|1493573836|cart|2063|374661|278|
| 133|1493574249|cart|4413| 32904|299|
| 133|1493619678| fav|6407| 55596|330|
| 133|1493622340| fav|6261|318028|387|
+----+--

In [115]:
import pyspark.sql.functions as f
behavior_list_cart = df.groupby('user').agg(f.collect_list('cate'), f.collect_list('brand'),f.collect_list('RK'))
behavior_list_cart.show()

+----+--------------------+--------------------+--------------------+
|user|  collect_list(cate)| collect_list(brand)|    collect_list(RK)|
+----+--------------------+--------------------+--------------------+
|   1|[7971, 7971, 7971...|[353787, 245773, ...|[2, 11, 33, 37, 4...|
|   3|[4603, 6432, 6432...|[151043, 253841, ...|[2, 48, 50, 166, ...|
|   4|[5467, 5467, 5467...|[172369, 172369, ...|[80, 83, 85, 95, ...|
|   6|[6806, 6806, 6251...|[310648, 310648, ...|[2, 10, 18, 38, 5...|
|   8|[1226, 6421, 856,...|[363857, 126738, ...|[8, 30, 48, 50, 6...|
|  11|[6180, 4262, 4262...|[450785, 370203, ...|[32, 50, 51, 54, 92]|
|  12|[531, 8979, 8979,...|[348644, 37621, 3...|[36, 134, 134, 13...|
|  33|[6408, 11203, 112...|[45669, 57959, 57...|[19, 24, 41, 43, ...|
|  35|[4262, 4262, 6428...|[370203, 370203, ...|[1, 35, 59, 311, ...|
|  37|[4281, 6274, 6059...|[314496, 43945, 6...|[35, 77, 161, 166...|
|  60|[6428, 6428, 6428...|[3130, 150395, 10...|[2, 9, 20, 22, 26...|
|  61|[4520, 4283, 8

In [116]:
behavior_list_cart = behavior_list_cart.withColumnRenamed('collect_list(cate)', 'click_cate')
behavior_list_cart = behavior_list_cart.withColumnRenamed('collect_list(brand)', 'click_brand')
behavior_list_cart = behavior_list_cart.withColumnRenamed('collect_list(RK)', 'click_RK')
behavior_list_cart.show()

+----+--------------------+--------------------+--------------------+
|user|          click_cate|         click_brand|            click_RK|
+----+--------------------+--------------------+--------------------+
|   1|[7971, 7971, 7971...|[353787, 245773, ...|[2, 11, 33, 37, 4...|
|   3|[4603, 6432, 6432...|[151043, 253841, ...|[2, 48, 50, 166, ...|
|   4|[5467, 5467, 5467...|[172369, 172369, ...|[80, 83, 85, 95, ...|
|   6|[6806, 6806, 6251...|[310648, 310648, ...|[2, 10, 18, 38, 5...|
|   8|[1226, 6421, 856,...|[363857, 126738, ...|[8, 30, 48, 50, 6...|
|  11|[6180, 4262, 4262...|[450785, 370203, ...|[32, 50, 51, 54, 92]|
|  12|[531, 8979, 8979,...|[348644, 37621, 3...|[36, 134, 134, 13...|
|  33|[6408, 11203, 112...|[45669, 57959, 57...|[19, 24, 41, 43, ...|
|  35|[4262, 4262, 6428...|[370203, 370203, ...|[1, 35, 59, 311, ...|
|  37|[4281, 6274, 6059...|[314496, 43945, 6...|[35, 77, 161, 166...|
|  60|[6428, 6428, 6428...|[3130, 150395, 10...|[2, 9, 20, 22, 26...|
|  61|[4520, 4283, 8

In [117]:
save_hive(behavior_list_cart, "guide_dien_click_behavior_features")

#### 展现数据behavior list序列特征

In [121]:
sql = load_sql_file(file_path + "../show_behavior_feature.sql")
print(sql)
df = spark.sql(sql)
df.show()

select
    *
from 
   stg_gs.guide_dien_behavior_list
where
    btag in ("pv")
+----+----------+----+----+------+---+
|user|time_stamp|btag|cate| brand| RK|
+----+----------+----+----+------+---+
|  65|1492900231|  pv|6423|202844|  1|
|  65|1493001655|  pv|1535|  1933|  2|
|  65|1493001696|  pv|5144|221012|  3|
|  65|1493001718|  pv|5144|221012|  4|
|  65|1493001718|  pv|5144|221012|  4|
|  65|1493163026|  pv|4384| 83700|  6|
|  65|1493163115|  pv|4384|268509|  7|
|  65|1493163157|  pv|4384|268509|  9|
|  65|1493284569|  pv| 859|102030| 10|
|  65|1493337810|  pv|4384|268509| 11|
|  65|1493337870|  pv|4384|268509| 12|
|  65|1493383822|  pv|8233|211132| 13|
|  65|1493383863|  pv|8233|211132| 14|
|  65|1493383881|  pv|8233|211132| 15|
|  65|1493383893|  pv|8233|211132| 16|
|  65|1493383931|  pv|8233|211132| 17|
|  65|1493383933|  pv|8233|211132| 18|
|  65|1493383935|  pv|8233|211132| 19|
|  65|1493383969|  pv|8213|271534| 20|
|  65|1493384102|  pv|8233|115717| 21|
+----+----------+----+--

In [122]:
behavior_list_show = df.groupby('user').agg(f.collect_list('cate'), f.collect_list('brand'),f.collect_list('RK'))
behavior_list_show.show()

+----+--------------------+--------------------+--------------------+
|user|  collect_list(cate)| collect_list(brand)|    collect_list(RK)|
+----+--------------------+--------------------+--------------------+
|  65|[6423, 1535, 5144...|[202844, 1933, 22...|[1, 2, 3, 4, 4, 6...|
|  81|[4520, 4282, 3776...|[320394, 320394, ...|[1, 2, 3, 4, 5, 6...|
| 126|[3018, 5944, 7214...|[211795, 193905, ...|[1, 2, 3, 4, 5, 6...|
| 133|[4282, 5480, 6157...|[226577, 301781, ...|[1, 2, 3, 4, 5, 6...|
| 148|[6152, 6152, 6152...|[424202, 252916, ...|[1, 2, 3, 4, 5, 6...|
| 243|[2513, 2513, 2513...|[194103, 194103, ...|[2, 4, 5, 7, 8, 9...|
| 300|[6617, 6617, 6617...|[12389, 12389, 12...|[1, 2, 3, 4, 5, 6...|
| 406|[6423, 6142, 4520...|[323274, 392617, ...|[1, 2, 3, 4, 5, 6...|
| 481|[6300, 978, 856, ...|[233067, 231986, ...|[1, 3, 4, 5, 6, 7...|
| 496|[4520, 4520, 6433...|[21803, 21803, 12...|[1, 2, 3, 4, 5, 6...|
| 540|[5329, 7146, 6177...|[236578, 430125, ...|[1, 2, 3, 4, 5, 6...|
| 597|[6621, 6621, 4

In [123]:
behavior_list_show = behavior_list_show.withColumnRenamed('collect_list(cate)', 'show_cate')
behavior_list_show = behavior_list_show.withColumnRenamed('collect_list(brand)', 'show_brand')
behavior_list_show = behavior_list_show.withColumnRenamed('collect_list(RK)', 'show_RK')
behavior_list_show.show()

+----+--------------------+--------------------+--------------------+
|user|           show_cate|          show_brand|             show_RK|
+----+--------------------+--------------------+--------------------+
|  65|[6423, 1535, 5144...|[202844, 1933, 22...|[1, 2, 3, 4, 4, 6...|
|  81|[4520, 4282, 3776...|[320394, 320394, ...|[1, 2, 3, 4, 5, 6...|
| 126|[3018, 5944, 7214...|[211795, 193905, ...|[1, 2, 3, 4, 5, 6...|
| 133|[4282, 5480, 6157...|[226577, 301781, ...|[1, 2, 3, 4, 5, 6...|
| 148|[6152, 6152, 6152...|[424202, 252916, ...|[1, 2, 3, 4, 5, 6...|
| 243|[2513, 2513, 2513...|[194103, 194103, ...|[2, 4, 5, 7, 8, 9...|
| 300|[6617, 6617, 6617...|[12389, 12389, 12...|[1, 2, 3, 4, 5, 6...|
| 406|[6423, 6142, 4520...|[323274, 392617, ...|[1, 2, 3, 4, 5, 6...|
| 481|[6300, 978, 856, ...|[233067, 231986, ...|[1, 3, 4, 5, 6, 7...|
| 496|[4520, 4520, 6433...|[21803, 21803, 12...|[1, 2, 3, 4, 5, 6...|
| 540|[5329, 7146, 6177...|[236578, 430125, ...|[1, 2, 3, 4, 5, 6...|
| 597|[6621, 6621, 4

In [127]:
save_hive(behavior_list_show, "guide_dien_noclick_behavior_features")

# 生成最终训练数据

In [128]:
rst.show()

+----------+-------+----------+-----------+------+---+-------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+
|adgroup_id|   user|time_stamp|        pid|nonclk|clk| userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|
+----------+-------+----------+-----------+------+---+-------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+
|         1| 581738|1494137644|430548_1007|     1|  0| 581738|        0|           8|                1|        2|        null|             3|         0|                 null|   9025|     108570|    1337| 20169|  17.0|
|         3| 449818|1494638778|430548_1007|     1|  0|   null|     null|        null|             null|     null|        null|  

In [129]:
behavior_list_show.show()

+----+--------------------+--------------------+--------------------+
|user|           show_cate|          show_brand|             show_RK|
+----+--------------------+--------------------+--------------------+
|  65|[6423, 1535, 5144...|[202844, 1933, 22...|[1, 2, 3, 4, 4, 6...|
|  81|[4520, 4282, 3776...|[320394, 320394, ...|[1, 2, 3, 4, 5, 6...|
| 126|[3018, 5944, 7214...|[211795, 193905, ...|[1, 2, 3, 4, 5, 6...|
| 133|[4282, 5480, 6157...|[226577, 301781, ...|[1, 2, 3, 4, 5, 6...|
| 148|[6152, 6152, 6152...|[424202, 252916, ...|[1, 2, 3, 4, 5, 6...|
| 243|[2513, 2513, 2513...|[194103, 194103, ...|[2, 4, 5, 7, 8, 9...|
| 300|[6617, 6617, 6617...|[12389, 12389, 12...|[1, 2, 3, 4, 5, 6...|
| 406|[6423, 6142, 4520...|[323274, 392617, ...|[1, 2, 3, 4, 5, 6...|
| 481|[6300, 978, 856, ...|[233067, 231986, ...|[1, 3, 4, 5, 6, 7...|
| 496|[4520, 4520, 6433...|[21803, 21803, 12...|[1, 2, 3, 4, 5, 6...|
| 540|[5329, 7146, 6177...|[236578, 430125, ...|[1, 2, 3, 4, 5, 6...|
| 597|[6621, 6621, 4

In [130]:
behavior_list_cart.show()

+----+--------------------+--------------------+--------------------+
|user|          click_cate|         click_brand|            click_RK|
+----+--------------------+--------------------+--------------------+
|   1|[7971, 7971, 7971...|[353787, 245773, ...|[2, 11, 33, 37, 4...|
|   3|[4603, 6432, 6432...|[151043, 253841, ...|[2, 48, 50, 166, ...|
|   4|[5467, 5467, 5467...|[172369, 172369, ...|[80, 83, 85, 95, ...|
|   6|[6806, 6806, 6251...|[310648, 310648, ...|[2, 10, 18, 38, 5...|
|   8|[1226, 6421, 856,...|[363857, 126738, ...|[8, 30, 48, 50, 6...|
|  11|[6180, 4262, 4262...|[450785, 370203, ...|[32, 50, 51, 54, 92]|
|  12|[531, 8979, 8979,...|[348644, 37621, 3...|[36, 134, 134, 13...|
|  33|[6408, 11203, 112...|[45669, 57959, 57...|[19, 24, 41, 43, ...|
|  35|[4262, 4262, 6428...|[370203, 370203, ...|[1, 35, 59, 311, ...|
|  37|[4281, 6274, 6059...|[314496, 43945, 6...|[35, 77, 161, 166...|
|  60|[6428, 6428, 6428...|[3130, 150395, 10...|[2, 9, 20, 22, 26...|
|  61|[4520, 4283, 8

In [132]:
train_data = rst.join(behavior_list_show, "user", how='left') 
train_data.dtypes

[('user', 'int'),
 ('adgroup_id', 'int'),
 ('time_stamp', 'int'),
 ('pid', 'string'),
 ('nonclk', 'int'),
 ('clk', 'int'),
 ('userid', 'int'),
 ('cms_segid', 'int'),
 ('cms_group_id', 'int'),
 ('final_gender_code', 'int'),
 ('age_level', 'int'),
 ('pvalue_level', 'int'),
 ('shopping_level', 'int'),
 ('occupation', 'int'),
 ('new_user_class_level ', 'int'),
 ('cate_id', 'int'),
 ('campaign_id', 'int'),
 ('customer', 'int'),
 ('brand', 'string'),
 ('price', 'double'),
 ('show_cate', 'array<int>'),
 ('show_brand', 'array<int>'),
 ('show_RK', 'array<int>')]

In [133]:
train_data = train_data.join(behavior_list_cart, "user", how='left') 
train_data.dtypes

[('user', 'int'),
 ('adgroup_id', 'int'),
 ('time_stamp', 'int'),
 ('pid', 'string'),
 ('nonclk', 'int'),
 ('clk', 'int'),
 ('userid', 'int'),
 ('cms_segid', 'int'),
 ('cms_group_id', 'int'),
 ('final_gender_code', 'int'),
 ('age_level', 'int'),
 ('pvalue_level', 'int'),
 ('shopping_level', 'int'),
 ('occupation', 'int'),
 ('new_user_class_level ', 'int'),
 ('cate_id', 'int'),
 ('campaign_id', 'int'),
 ('customer', 'int'),
 ('brand', 'string'),
 ('price', 'double'),
 ('show_cate', 'array<int>'),
 ('show_brand', 'array<int>'),
 ('show_RK', 'array<int>'),
 ('click_cate', 'array<int>'),
 ('click_brand', 'array<int>'),
 ('click_RK', 'array<int>')]

In [134]:
train_data.show()

+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|user|adgroup_id|time_stamp|        pid|nonclk|clk|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|new_user_class_level |cate_id|campaign_id|customer| brand| price|           show_cate|          show_brand|             show_RK|          click_cate|         click_brand|            click_RK|
+----+----------+----------+-----------+------+---+------+---------+------------+-----------------+---------+------------+--------------+----------+---------------------+-------+-----------+--------+------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--

In [135]:
save_hive(train_data, "guide_dien_final_train_data")