In [1]:
import os
import sys
# When this file is run directly for testing, the project root has to be put on
# the module search path first, otherwise the project imports below fail.
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# When several Python versions are installed, leaving the interpreter
# unspecified is very likely to cause errors, so pin it for workers and driver.
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from offline import SparkSessionBase
import pyhdfs
import time


class UpdateUserProfile(SparkSessionBase):
    """Offline processing job that updates the user profiles.

    The upper-case attributes are session settings; they are presumably read by
    ``SparkSessionBase._create_spark_session`` -- confirm in the ``offline``
    package.
    """
    ENABLE_HIVE_SUPPORT = True
    SPARK_EXECUTOR_MEMORY = "3g"
    SPARK_APP_NAME = "updateUser"

    def __init__(self):
        # The session comes from the factory method inherited from the base class.
        session = self._create_spark_session()
        self.spark = session

In [2]:
# Instantiate the job; its constructor creates the Spark session (uup.spark).
uup = UpdateUserProfile()

In [3]:
# Make `profile` the current database for the following SQL statements.
uup.spark.sql('use profile')

DataFrame[]

In [4]:
# 读取日志数据，（关联历史日志数据和HIVE表分区）
import pandas as pd
from datetime import datetime

def datelist(startdate, enddate):
    """Return every calendar day from ``startdate`` to ``enddate`` (inclusive).

    :param startdate: first day, anything ``pd.date_range`` accepts as ``start``
    :param enddate: last day, anything ``pd.date_range`` accepts as ``end``
    :return: list of ``'YYYY-MM-DD'`` strings, empty when the range is empty
    """
    fmt = '%Y-%m-%d'
    days = pd.date_range(start=startdate, end=enddate)
    return [day.strftime(fmt) for day in days]

# One 'YYYY-MM-DD' string per day from 2019-03-05 up to and including today.
dl = datelist('2019-03-05', time.strftime('%Y-%m-%d', time.localtime()))


pydfs = pyhdfs.HdfsClient(hosts="hadoop-master:50070")
# Attach each day's HDFS log directory to the Hive table as a partition.
for d in dl:

    # HDFS directory that holds the logs of day `d`
    _location = '/user/hive/warehouse/profile.db/user_action/' + d
    try:
        if pydfs.exists(_location):
            # "if not exists" makes re-running the cell a no-op for partitions
            # that are already attached, so an exception here is a real failure
            # and no longer the expected "already exists" case.
            uup.spark.sql(
                "alter table user_action add if not exists partition (dt='%s') location '%s'" % (d, _location))
    except Exception as e:
        # Still best effort (one bad day must not stop the others), but report
        # the problem instead of hiding it.
        print("could not attach partition dt=%s: %s" % (d, e))

In [5]:
# If HDFS has no directory for today's date there is no log data for today and
# nothing is attached.
time_str = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + time_str
if pydfs.exists(_localions):
    # The directory exists: attach it as today's partition. "if not exists"
    # turns a repeated attach into a no-op, so anything caught below is a
    # genuine failure and is reported rather than silently dropped.
    try:
        uup.spark.sql(
            "alter table user_action add if not exists partition (dt='%s') location '%s'" % (time_str, _localions))
    except Exception as e:
        print("could not attach partition dt=%s: %s" % (time_str, e))

In [6]:
# First look at the partitions of user_action (prints at most 50 rows)
uup.spark.sql("show partitions user_action").show(50)

+-------------+
|    partition|
+-------------+
|dt=2019-03-05|
|dt=2019-03-06|
|dt=2019-03-07|
|dt=2019-03-08|
|dt=2019-03-09|
|dt=2019-03-10|
|dt=2019-03-11|
|dt=2019-03-12|
|dt=2019-03-13|
|dt=2019-03-14|
|dt=2019-03-15|
|dt=2019-03-16|
|dt=2019-03-17|
|dt=2019-03-18|
|dt=2019-03-19|
|dt=2019-03-20|
|dt=2019-03-21|
|dt=2019-03-22|
|dt=2019-03-23|
|dt=2019-03-24|
|dt=2019-03-25|
|dt=2019-03-26|
|dt=2019-03-27|
|dt=2019-03-28|
|dt=2019-03-29|
|dt=2019-03-30|
|dt=2019-03-31|
|dt=2019-04-01|
|dt=2019-04-02|
|dt=2019-04-03|
|dt=2019-04-04|
|dt=2019-04-05|
|dt=2019-04-06|
|dt=2019-04-07|
|dt=2019-04-08|
|dt=2019-04-09|
|dt=2019-04-10|
+-------------+



In [7]:
# 先删除没有数据的分区，否则直接查询数据会报错
# uup.spark.sql("alter table user_action drop partition(dt>'2019-04-10')")

In [8]:
# Look at the partitions of user_action again, after the partition clean-up step
# (prints at most 50 rows)
uup.spark.sql("show partitions user_action").show(50)

+-------------+
|    partition|
+-------------+
|dt=2019-03-05|
|dt=2019-03-06|
|dt=2019-03-07|
|dt=2019-03-08|
|dt=2019-03-09|
|dt=2019-03-10|
|dt=2019-03-11|
|dt=2019-03-12|
|dt=2019-03-13|
|dt=2019-03-14|
|dt=2019-03-15|
|dt=2019-03-16|
|dt=2019-03-17|
|dt=2019-03-18|
|dt=2019-03-19|
|dt=2019-03-20|
|dt=2019-03-21|
|dt=2019-03-22|
|dt=2019-03-23|
|dt=2019-03-24|
|dt=2019-03-25|
|dt=2019-03-26|
|dt=2019-03-27|
|dt=2019-03-28|
|dt=2019-03-29|
|dt=2019-03-30|
|dt=2019-03-31|
|dt=2019-04-01|
|dt=2019-04-02|
|dt=2019-04-03|
|dt=2019-04-04|
|dt=2019-04-05|
|dt=2019-04-06|
|dt=2019-04-07|
|dt=2019-04-08|
|dt=2019-04-09|
|dt=2019-04-10|
+-------------+



In [9]:
# Select the behaviour-log fields (top-level columns plus fields of the `param`
# struct) from the user_action partitions with dt >= '2019-04-01'.
sqlDF = uup.spark.sql(
"select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>='2019-04-01'")

In [10]:
# Preview the selected log rows
sqlDF.show()

+-------------------+--------+---------+-------------------+----------------+------+-------------------+
|         actionTime|readTime|channelId|          articleId|algorithmCombine|action|             userId|
+-------------------+--------+---------+-------------------+----------------+------+-------------------+
|2019-04-08 08:36:07|        |        7|             141437|              C2| click|1114863941936218112|
|2019-04-08 08:36:13|    5152|        7|             141437|              C2|  read|1114863941936218112|
|2019-04-08 08:36:07|        |        7|             141437|              C2| click|1114863941936218112|
|2019-04-08 08:36:13|    5152|        7|             141437|              C2|  read|1114863941936218112|
|2019-04-08 08:36:07|        |        7|             141437|              C2| click|1114863941936218112|
|2019-04-08 08:36:13|    5152|        7|             141437|              C2|  read|1114863941936218112|
|2019-04-08 08:36:07|        |        7|             14

In [11]:
# 转化格式：["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]

def _compute(row):
    # 进行判断行为类型
    _list = []
    if row.action == "exposure":
        for article_id in eval(row.articleId):
            _list.append(
                [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])
        return _list
    else:
        class Temp(object):
            shared = False
            clicked = False
            collected = False
            read_time = ""

        _tp = Temp()
        if row.action == "share":
            _tp.shared = True
        elif row.action == "click":
            _tp.clicked = True
        elif row.action == "collect":
            _tp.collected = True
        elif row.action == "read":
            _tp.clicked = True
        else:
            pass
        _list.append(
            [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,
             True,
             row.readTime])
        return _list

In [12]:
# Convert the raw user_action rows into the target record format
res = sqlDF.rdd.flatMap(_compute)

In [13]:
# Turn the converted records into a DataFrame with named columns
data = res.toDF(["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"])

In [14]:
# Preview the converted behaviour records
data.show()

+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+
|            user_id|        action_time|         article_id|channel_id|shared|clicked|collected|exposure|read_time|
+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+
|1114094806092480512|2019-04-05 17:17:56|1112608068731928576|         0| false|  false|    false|    true|         |
|1114094806092480512|2019-04-05 17:17:56|1112593242529988608|         0| false|  false|    false|    true|         |
|1114094806092480512|2019-04-05 17:17:56|1112566345800613888|         0| false|  false|    false|    true|         |
|1114094806092480512|2019-04-05 17:17:56|1112593324574769152|         0| false|  false|    false|    true|         |
|1114094806092480512|2019-04-05 17:17:56|1112592065390182400|         0| false|  false|    false|    true|         |
|1114094806092480512|2019-04-05 17:17:56|             141440|   

In [15]:
# Merge with the historical data (to be inserted into the table)
old = uup.spark.sql("select * from user_article_basic")
# The merged result is not unique per (user_id, article_id): one user can
# perform several different actions on the same article.
new_old = old.unionAll(data)

In [16]:
# NOTE(review): registerTempTable is, as far as I know, deprecated since
# Spark 2.0 in favour of createOrReplaceTempView -- confirm for this cluster.
new_old.registerTempTable("temptable")
# Store the merged history grouped by user and article

# uup.spark.sql(
#         "insert overwrite table user_article_basic select user_id, max(action_time) as action_time, "
#         "article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked, "
#         "max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable "
#         "group by user_id, article_id")

# This table tells us which actions a user has performed on an article so far.
# Note: the data of user_article_basic is already available, so it does not need to be written again.

In [17]:
# Get the basic user behaviour data, then merge in the topic words of the article profiles
uup.spark.sql("use profile")

DataFrame[]

In [18]:
# Drop channel_id from the log data, because the channel recorded in the logs is unreliable.
# Channel 0 is the recommendation channel: if an article recommended to a user belongs to channel 18 (Python) but was exposed in channel 0, the action is logged as an action on a channel-0 article.
user_article_basic = uup.spark.sql("select * from user_article_basic").drop('channel_id')

In [19]:
# Read the article profiles; the labels attached to users are all topic words
uup.spark.sql('use article')
article_label = uup.spark.sql("select article_id, channel_id, topics from article_profile")

In [20]:
# Left-join the user behaviour data with the article topic words, taking the correct channel_id from the article profile
user_topics_list = user_article_basic.join(article_label, how='left', on=['article_id'])

In [21]:
# Preview the joined behaviour / topic data
user_topics_list.show()

+-------------------+-------------------+-------------------+------+-------+---------+--------+---------+----------+--------------------+
|         article_id|            user_id|        action_time|shared|clicked|collected|exposure|read_time|channel_id|              topics|
+-------------------+-------------------+-------------------+------+-------+---------+--------+---------+----------+--------------------+
|              13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|[补码, 字符串, 李白, typ...|
|              13401|1106396183141548032|2019-03-28 10:58:20| false|  false|    false|    true|         |        18|[补码, 字符串, 李白, typ...|
|              14805|1105045287866466304|2019-03-11 18:15:48| false|  false|    false|    true|         |        18|[占位符, Code, sep, ...|
|              14805|                  1|2019-03-05 17:34:03| false|  false|    false|    true|         |        18|[占位符, Code, sep, ...|
|              14805|1111524501104

In [22]:
# Explode the topic list into one row per topic, which makes the topic weights easier to compute
import pyspark.sql.functions as F
user_single_topic = user_topics_list.withColumn('topic', F.explode('topics')).drop('topics')

In [23]:
# Preview the one-row-per-topic data
user_single_topic.show()

+----------+-------------------+-------------------+------+-------+---------+--------+---------+----------+--------+
|article_id|            user_id|        action_time|shared|clicked|collected|exposure|read_time|channel_id|   topic|
+----------+-------------------+-------------------+------+-------+---------+--------+---------+----------+--------+
|     13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|      补码|
|     13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|     字符串|
|     13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|      李白|
|     13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|    type|
|     13401|                 10|2019-03-06 10:06:12| false|  false|    false|    true|         |        18|      元素|
|     13401|                 10|2019-03-06 10:06:12| false|  fal

In [27]:
# 计算每个用户对每篇文章的标签的权重

def save_weights(rowpartition):
    """
    Compute the topic weight for every (user, article, topic) row of a partition.

    For each row the weight is ``time_decay * (action_score + read_score)``:

    * ``time_decay = 1 / (ln(days_since_action + 1) + 1)``
    * ``action_score`` sums the ``share`` / ``collect`` / ``click`` weights of
      the flags set on the row
    * ``read_score`` is ``read_middle`` when ``read_time`` is above 1000,
      otherwise ``read_min``

    The HBase write is disabled (kept below as commented-out reference code),
    so the computed weight is currently discarded. No HBase connection is
    opened while the write is disabled.

    :param rowpartition: iterable of rows exposing ``action_time``
        (``'%Y-%m-%d %H:%M:%S'`` string), ``read_time``, ``shared``,
        ``collected`` and ``clicked``
    :return: None
    :raises ValueError: if ``action_time`` does not match the format or
        ``read_time`` is a non-empty, non-numeric string
    """
    # Imported inside the function so it is self-contained where it executes.
    from datetime import datetime
    import numpy as np
    # Needed only by the disabled HBase write below:
    # import happybase
    # import json

    weightsOfaction = {
        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }

    # Connection pool for the disabled HBase write. It used to be created for
    # every partition although nothing used or released it.
    # pool = happybase.ConnectionPool(size=10, host='192.168.19.137', port=9090)

    # One reference time for the whole partition keeps the decay consistent.
    now = datetime.now()

    for row in rowpartition:
        elapsed = now - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        # A timestamp in the future would give negative days and turn the decay
        # into -0.0 or NaN; treat it as "today".
        days = max(elapsed.days, 0)
        # Time decay coefficient
        time_exp = 1 / (np.log(days + 1) + 1)

        # Missing read time (empty string or NULL) counts as 0.
        r_t = int(row.read_time) if row.read_time else 0
        # Reading-time score
        is_read = weightsOfaction['read_middle'] if r_t > 1000 else weightsOfaction['read_min']

        # Weight of this topic for this user
        weights = time_exp * (
            row.shared * weightsOfaction['share']
            + row.collected * weightsOfaction['collect']
            + row.clicked * weightsOfaction['click']
            + is_read)

        # Disabled HBase write, kept for reference.
        # NOTE(review): the original snippet also called conn.close() inside the
        # `with` block; the pool context already hands the connection back.
        # with pool.connection() as conn:
        #     table = conn.table('user_profile')
        #     table.put('user:{}'.format(row.user_id).encode(),
        #               {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
        #                   weights).encode()})

# Run save_weights once for every partition of user_single_topic
user_single_topic.foreachPartition(save_weights)

In [32]:
# Query the stored result
import happybase
# Connection pool used to read the cached results from HBase
pool = happybase.ConnectionPool(size=10, host='192.168.19.137', port=9090)

# The pool context hands the connection back on exit, so it is not closed
# manually here (closing it would defeat the pooling).
with pool.connection() as conn:
    table = conn.table('user_profile')
    # Fetch all columns of the 'partial' family for this row key.
    # Stored under its own name so the Spark DataFrame `data` built earlier in
    # the notebook is not overwritten by a dict.
    user_partial = table.row(b'user:2', columns=[b'partial'])
    print(user_partial)

{b'partial:13:\xe4\xba\xba\xe5\xb7\xa5\xe6\x99\xba\xe8\x83\xbd': b'1.1354814310844472', b'partial:13:\xe4\xba\xba\xe7\xb1\xbb': b'1.1354814310844472', b'partial:13:\xe5\x88\x86\xe6\x94\xaf': b'1.1354814310844472', b'partial:13:\xe5\xad\xa6\xe7\xa7\x91': b'1.1354814310844472', b'partial:13:\xe5\xb0\x96\xe7\xab\xaf\xe6\x8a\x80\xe6\x9c\xaf': b'1.1354814310844472', b'partial:13:\xe6\x95\x99\xe6\x8e\x88': b'1.1354814310844472', b'partial:13:\xe6\x99\xba\xe5\x8a\x9b': b'1.1354814310844472', b'partial:13:\xe6\x99\xba\xe8\x83\xbd': b'1.1354814310844472', b'partial:13:ldquo': b'1.1354814310844472', b'partial:18:\xe4\xb8\x96\xe7\x95\x8c': b'0.9688425326486048', b'partial:18:\xe4\xb8\xaa\xe6\x95\xb0': b'0.9620870418999846', b'partial:18:\xe4\xb8\xad\xe6\x8b\xac\xe5\x8f\xb7': b'0.16007622004474806', b'partial:18:\xe4\xbb\xa3\xe6\x8c\x87': b'1.1510499187169054', b'partial:18:\xe4\xbb\xa3\xe7\xa0\x81\xe8\x87\xaa\xe5\x8a\xa8\xe8\xa1\xa5\xe5\x85\xa8': b'1.1224348822166488', b'partial:18:\xe4\xbc\x9a\x

In [38]:
# 将用户的基础信息更新到用户画像中

def update_user_info():
    """
    Update the basic-information part of the user profiles.

    Reads ``user_id``, ``gender`` and ``birthday`` from ``user_profile`` in the
    ``toutiao`` database and, for every user, writes to row ``user:<user_id>``
    of the HBase table ``user_profile``:

    * ``basic:gender``   -- JSON-encoded gender
    * ``basic:birthday`` -- JSON-encoded age in years (the age, not the date),
      0 when the birthday is unknown

    :return: None
    """
    uup.spark.sql("use toutiao")

    user_basic = uup.spark.sql("select user_id, gender, birthday from user_profile")

    def _update_user_basic(partition):
        """Write gender and age of every user row in ``partition`` to HBase."""
        # Everything the worker needs is imported here, so the function does
        # not depend on names that happen to exist in the notebook namespace.
        import happybase
        import json
        from datetime import date, datetime

        def _age(birthday, today):
            """Age in full years; 0 for a missing birthday (NULL, '' or 'null')."""
            if not birthday or birthday == 'null':
                return 0
            born = datetime.strptime(birthday, '%Y-%m-%d')
            # Subtract one year when this year's birthday has not happened yet.
            return today.year - born.year - ((today.month, today.day) < (born.month, born.day))

        # Connection pool used to write the results to HBase
        pool = happybase.ConnectionPool(size=10, host='192.168.19.137', port=9090)
        today = date.today()
        for row in partition:
            age = _age(row.birthday, today)

            # The pool context hands the connection back on exit; closing it by
            # hand for every row would force a reconnect per user.
            with pool.connection() as conn:
                table = conn.table('user_profile')
                # One put carries both columns of the row.
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:gender'.encode(): json.dumps(row.gender).encode(),
                           'basic:birthday'.encode(): json.dumps(age).encode()})

    user_basic.foreachPartition(_update_user_basic)

In [39]:
# update_user_info()