案例介绍
- 案例背景：以某大型电商平台的用户行为数据为数据集，使用大数据处理技术分析海量数据下的用户行为特征，并通过建立分类模型对用户行为做出预测；
- 案例思路：
    - 使用大数据处理技术读取海量数据
    - 海量数据预处理
    - 抽取部分数据调试模型
    - 使用海量数据搭建模型

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')


Mounted at /content/gdrive


数据字典：  
U_Id：the serialized ID that represents a user  
T_Id：the serialized ID that represents an item  
C_Id：the serialized ID that represents the category which the corresponding item belongs to  
Ts：the timestamp of the behavior  
Be_type：enum-type from ('pv', 'buy', 'cart', 'fav')  
> pv: Page view of an item's detail page, equivalent to an item click  
> buy: Purchase an item  
> cart: Add an item to shopping cart  
> fav: Favor an item

# 读取数据

这里关键是使用dask库来处理海量数据，它的大多数操作的运行速度比常规pandas等库快十倍左右。

pandas在分析结构化数据方面非常的流行和强大，但是它最大的限制就在于设计时没有考虑到可伸缩性。pandas特别适合处理小型结构化数据，并且经过高度优化，可以对存储在内存中的数据执行快速高效的操作。然而随着数据量的大幅度增加，单机肯定会读取不下的，通过集群的方式来处理是最好的选择。这就是Dask DataFrame API发挥作用的地方：通过为pandas提供一个包装器，可以智能的将巨大的DataFrame分隔成更小的片段，并将它们分散到多个worker(帧)中，**并存储在磁盘中而不是RAM中**。

Dask DataFrame会被分割成多个部门，每个部分称之为一个分区，每个分区都是一个相对较小的DataFrame，可以分配给任意的worker，并在需要复制时维护其完整数据。具体操作就是**对每个分区并行**或单独操作(多个机器的话也可以并行)，然后再将结果合并，其实从直观上也能推出Dask肯定是这么做的。

In [None]:
# 安装库（清华镜像）
# pip install dask -i https://pypi.tuna.tsinghua.edu.cn/simple

In [2]:
!python -m pip install 'fsspec>=0.3.3'
!python -m pip install pyecharts

Collecting fsspec>=0.3.3
  Downloading fsspec-2022.3.0-py3-none-any.whl (136 kB)
[?25l[K     |██▍                             | 10 kB 25.4 MB/s eta 0:00:01[K     |████▉                           | 20 kB 26.4 MB/s eta 0:00:01[K     |███████▎                        | 30 kB 23.1 MB/s eta 0:00:01[K     |█████████▋                      | 40 kB 16.7 MB/s eta 0:00:01[K     |████████████                    | 51 kB 15.1 MB/s eta 0:00:01[K     |██████████████▌                 | 61 kB 17.1 MB/s eta 0:00:01[K     |████████████████▉               | 71 kB 16.8 MB/s eta 0:00:01[K     |███████████████████▎            | 81 kB 15.8 MB/s eta 0:00:01[K     |█████████████████████▊          | 92 kB 17.2 MB/s eta 0:00:01[K     |████████████████████████        | 102 kB 18.4 MB/s eta 0:00:01[K     |██████████████████████████▌     | 112 kB 18.4 MB/s eta 0:00:01[K     |█████████████████████████████   | 122 kB 18.4 MB/s eta 0:00:01[K     |███████████████████████████████▎| 133 kB 18.4 MB

In [3]:
import os
import gc
import pandas as pd
import numpy as np
from tqdm import tqdm
from numba import jit
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
# from dask.distributed import Client
from pyecharts import options as opts
from pyecharts.charts import HeatMap, Pie, Line, HeatMap
from pyecharts.globals import CurrentConfig, NotebookType, ThemeType
import sys

面对海量数据，跑完一个模块的代码就可以加一行gc.collect()来做内存碎片回收  

In [4]:
gc.collect()

17

Dask Dataframes与Pandas Dataframes具有相同的API

与pandas不同，这里我们仅获取数据框的结构，而不是实际数据框。Dask已将数据帧分为几块加载，这些块存在于磁盘上，而不存在于RAM中。如果必须输出数据帧，则首先需要将所有数据帧都放入RAM，将它们缝合在一起，然后展示最终的数据帧。使用.compute()强迫它这样做，否则它不.compute() 。其实dask使用了一种延迟数据加载机制，这种延迟机制类似于python的迭代器组件，只有当需要使用数据的时候才会去真正加载数据。

In [5]:
import os
os.chdir('/content/gdrive/MyDrive/用户行为')

In [5]:

# 忽略警告提示
import warnings
warnings.filterwarnings('ignore')

CurrentConfig.NOTEBOOK_TYPE = NotebookType.JUPYTER_LAB

fileName = './UserBehavior_all.csv'

dtypes = {'U_Id':'uint32',
         'T_Id':'uint32',
         'C_Id':'uint32',
         'Be_type':'object',
         'Ts':'int64'
         }


switchDict = {
    0 : 'TEST',
    1 : 'ALL'
}

# 编译数据量状态开关 0为测试（约170万条数据），1为全量（约1亿条数据）
status = switchDict[0]


def importData(fileName):
    if status == 'TEST':
        df = dd.read_csv(fileName, header=0, blocksize="64MB", dtype=dtypes).head(2000000)
    else:
        df = dd.read_csv(fileName, header=0, blocksize="64MB", dtype=dtypes).compute()
    df.index = pd.RangeIndex(start=0, stop=len(df))
    return df
with ProgressBar():
    userBehaviorDf = importData(fileName)
# userBehaviorDf = dd.read_csv(fileName, header=None, blocksize="64MB", dtype=dtypes)
# userBehaviorDf.head()

[########################################] | 100% Completed |  0.8s


In [6]:
userBehaviorDf.U_Id

0               1
1               1
2               1
3               1
4               1
            ...  
1735282    162205
1735283    162205
1735284    162205
1735285    162205
1735286    162205
Name: U_Id, Length: 1735287, dtype: uint32

In [7]:
# userBehaviorDf = userBehaviorDf.drop(1, axis=1)

columnsDict = {'U_Id':'用户ID', 'C_Id':'商品类目ID', 'Be_type':'行为类型', 'Ts':'时间'}
userBehaviorDf = userBehaviorDf.rename(columns=columnsDict)

In [8]:
userBehaviorDf.drop(columns="T_Id",axis=1,inplace=True)

In [9]:
userBehaviorDf.head()

Unnamed: 0,用户ID,商品类目ID,行为类型,时间
0,1,2520377,pv,1511544070
1,1,2520771,pv,1511561733
2,1,149192,pv,1511572885
3,1,4181361,pv,1511593493
4,1,2520377,pv,1511596146


# 数据预处理

## 数据压缩

In [None]:
types = {'U_Id':'uint32',
         'T_Id':'uint32',
         'C_Id':'uint32',
         'Be_type':'object',
         'Ts':'int64'
}


## 缺失值

In [None]:
userBehaviorDf.isnull().sum()

Dask Series Structure:
npartitions=1
商品类目ID    int64
行为类型        ...
dtype: int64
Dask Name: dataframe-sum-agg, 465 tasks

In [None]:

def computeIsNA(df, colNameList):
    tmpDf = pd.DataFrame(columns=colNameList)
    for i in range(0, len(colNameList)):
        s = df[colNameList[i]].isna()
        tmpDf[colNameList[i]] = s.loc[s == True]
        print(colNameList[i] + '列缺失值数目为%d'%(tmpDf[colNameList[i]].count()))
    return tmpDf        
computeIsNA(userBehaviorDf, ['用户ID', '商品类目ID', '行为类型', '时间'])


用户ID列缺失值数目为0
商品类目ID列缺失值数目为0
行为类型列缺失值数目为0
时间列缺失值数目为0


Unnamed: 0,用户ID,商品类目ID,行为类型,时间


## 数据探索与可视化

除了matplotlib和seaborn，我们还可以使用pyecharts库。  
pyecharts是一款将python与百度开源的echarts结合的数据可视化工具。新版的1.X和旧版的0.5.X版本代码规则大不相同，新版详见官方文档https://gallery.pyecharts.org/#/README

In [None]:
!pip install pyecharts 

Collecting pyecharts
  Downloading pyecharts-1.9.1-py3-none-any.whl (135 kB)
[?25l[K     |██▍                             | 10 kB 18.4 MB/s eta 0:00:01[K     |████▉                           | 20 kB 24.8 MB/s eta 0:00:01[K     |███████▎                        | 30 kB 13.2 MB/s eta 0:00:01[K     |█████████▋                      | 40 kB 10.0 MB/s eta 0:00:01[K     |████████████                    | 51 kB 6.4 MB/s eta 0:00:01[K     |██████████████▌                 | 61 kB 7.6 MB/s eta 0:00:01[K     |█████████████████               | 71 kB 7.1 MB/s eta 0:00:01[K     |███████████████████▎            | 81 kB 6.8 MB/s eta 0:00:01[K     |█████████████████████▊          | 92 kB 7.6 MB/s eta 0:00:01[K     |████████████████████████▏       | 102 kB 8.2 MB/s eta 0:00:01[K     |██████████████████████████▋     | 112 kB 8.2 MB/s eta 0:00:01[K     |█████████████████████████████   | 122 kB 8.2 MB/s eta 0:00:01[K     |███████████████████████████████▍| 133 kB 8.2 MB/s eta 0:00:0

### 饼图

In [10]:
gc.collect()

81

In [11]:
userBehaviorDf.head()

Unnamed: 0,用户ID,商品类目ID,行为类型,时间
0,1,2520377,pv,1511544070
1,1,2520771,pv,1511561733
2,1,149192,pv,1511572885
3,1,4181361,pv,1511593493
4,1,2520377,pv,1511596146


In [12]:
'''
使用get_dummies进行one-hot编码，产生虚拟变量（dummy variables），列名前缀是行为类型
'''
behaviorDf = dd.from_pandas(pd.DataFrame(), npartitions=4)
behaviorDf = dd.get_dummies(userBehaviorDf.行为类型, prefix='行为类型')

# 

In [13]:
behaviorDf.head()

Unnamed: 0,行为类型_buy,行为类型_cart,行为类型_fav,行为类型_pv
0,0,0,0,1
1,0,0,0,1
2,0,0,0,1
3,0,0,0,1
4,0,0,0,1


In [14]:
userBehaviorDf = dd.concat([userBehaviorDf, behaviorDf], axis=1)

In [15]:
userBehaviorDf

Unnamed: 0_level_0,用户ID,商品类目ID,行为类型,时间,行为类型_buy,行为类型_cart,行为类型_fav,行为类型_pv
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,uint32,uint32,object,int64,uint8,uint8,uint8,uint8
1735286,...,...,...,...,...,...,...,...


In [16]:
del behaviorDf
gc.collect()

34

In [17]:


with ProgressBar():
    catGb = userBehaviorDf.groupby('商品类目ID').行为类型_buy.sum().compute()

catGb.sort_values(ascending=False, inplace=True, na_position='first', kind='mergesort')

display_num = 20
catBuyIndexList = catGb.index.values[0:display_num].tolist()
catBuyCountList = np.array(catGb.values[0:display_num]).tolist()
lst = [list(z) for z in zip(catBuyIndexList, catBuyCountList)]

pie = Pie(init_opts=opts.InitOpts(theme=ThemeType.DARK))
pie.add("", lst, radius=["30%", "75%"], center=[
        "50%", "50%"], rosetype="radius", label_opts=opts.LabelOpts(is_show=False))
pie.set_global_opts(title_opts=opts.TitleOpts(title="不同类目商品的订单数", pos_top='1%', pos_left='2%'),
                    legend_opts=opts.LegendOpts(
    orient="vertical", pos_top="15%", pos_left="2%"
    ))
pie.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
pie.load_javascript()

pie.render_notebook()

[########################################] | 100% Completed |  0.1s


In [18]:
# 申请回收无用变量
del catGb
gc.collect()

84

### 漏斗图

### 涟漪散点图

### 象形柱图

### 多维散点图

### 时间轴

### 词云图

# 时间戳问题

dask对于时间戳的支持非常不友好

In [19]:
userBehaviorDf.时间

Dask Series Structure:
npartitions=1
0          int64
1735286      ...
Name: 时间, dtype: int64
Dask Name: getitem, 8 tasks

In [20]:
# 筛选出数据集实际时间区间的数据
orderTimeDf = userBehaviorDf.copy()
orderTimeDf = userBehaviorDf.loc[(userBehaviorDf.时间 <= 1512316799) 
                         & (userBehaviorDf.时间 >= 1511539200)]

# 申请回收无用变量
# del userBehaviorDf
# gc.collect()

def conv(timestamp):
    if timestamp >= 0:
        return pd.datetime.fromtimestamp(timestamp)
    else:
        return pd.datetime(1970, 1, 1, 8, 0, 0) - (pd.datetime.fromtimestamp(-timestamp) - pd.datetime(1970, 1, 1, 8, 0, 0))

orderTimeDf.时间 = orderTimeDf.时间.map(conv)

orderTimeDf.时间 = (orderTimeDf.时间.astype(str)).map(lambda x: x.split(':')[0])

timeDf = orderTimeDf.groupby(orderTimeDf.时间).行为类型_buy.sum().compute()
dateList = timeDf.index.values.tolist()
line = (Line(init_opts=opts.InitOpts(theme=ThemeType.DARK))
        .add_xaxis(dateList)
        .add_yaxis("淘宝用户", np.array(timeDf.values).tolist(), areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
                   markpoint_opts=opts.MarkPointOpts(
                       data=[opts.MarkPointItem(type_="max"), opts.MarkPointItem(type_="min")]),
                   )
        .set_global_opts(title_opts=opts.TitleOpts(title="订单数与时间关系图", pos_top='1%', pos_left='2%'),
                         xaxis_opts=opts.AxisOpts(
            axislabel_opts=opts.LabelOpts(rotate=-45)),
    datazoom_opts=opts.DataZoomOpts(),
))
line.load_javascript()

line.render_notebook()

In [70]:
orderTimeDf.columns

Index(['用户ID', '商品类目ID', '行为类型', '时间', '行为类型_buy', '行为类型_cart', '行为类型_fav',
       '行为类型_pv'],
      dtype='object')

# 特征工程

思路：不考虑时间窗口，只以用户的点击和收藏等行为来预测是否购买  
流程：以用户ID(U_Id)为分组键，将每位用户的点击、收藏、加购物车的行为统计出来，分别为
> 是否点击，点击次数；是否收藏，收藏次数；是否加购物车，加购物车次数  

以此来预测最终是否购买

In [None]:
# 去掉时间戳


## 行为类型

In [71]:
from collections import Counter   # 计数器包

In [72]:
orderTimeDf['小时'] = (orderTimeDf.时间.astype(str)).map(lambda x: int(x.split(' ')[1]), meta=('x', str))

moment_mapDict = {
                    '0' : "0-2",
                    '1' : "0-2",
                    '2' : "0-2",
                    '3' : "3-5",
                    '4' : "3-5",
                    '5' : "3-5",
                    '6' : "6-8",
                    '7' : "6-8",
                    '8' : "6-8",
                    '9' : "9-11",
                    '10' : "9-11",
                    '11' : "9-11",
                    '12' : "12-14",
                    '13' : "12-14",
                    '14' : "12-14",
                    '15' : "15-17",
                    '16' : "15-17",
                    '17' : "15-17",
                    '18' : "18-20",
                    '19' : "18-20",
                    '20' : "18-20",
                    '21' : "21-23",
                    '22' : "21-23",
                    '23' : "21-23",
                    }

orderTimeDf['时间段'] = (orderTimeDf.小时.astype(str)).map(moment_mapDict)

momentDf = dd.from_pandas(pd.Series(np.array(orderTimeDf.时间段).tolist(), dtype='category'), npartitions=4)
momentDf = dd.get_dummies(momentDf, prefix='时间段')

orderTimeDf = dd.concat([orderTimeDf, momentDf], axis=1)

del momentDf
gc.collect()

#删掉时间段这一列
orderTimeDf = orderTimeDf.drop('时间段',axis=1)

with ProgressBar():
    buyDf = orderTimeDf.groupby(['用户ID', '商品类目ID'])['时间段_0-2', '时间段_3-5', '时间段_6-8', '时间段_9-11', 
    '时间段_12-14', '时间段_15-17', '时间段_18-20', '时间段_21-23', '行为类型_buy', '行为类型_cart', 
    '行为类型_fav', '行为类型_pv'].sum().compute()

corrDf = buyDf.corr()
corrDf

[########################################] | 100% Completed | 14.8s


Unnamed: 0,时间段_0-2,时间段_3-5,时间段_6-8,时间段_9-11,时间段_12-14,时间段_15-17,时间段_18-20,时间段_21-23,行为类型_buy,行为类型_cart,行为类型_fav,行为类型_pv
时间段_0-2,1.0,0.307661,0.285823,0.250273,0.270312,0.197723,0.067791,0.172921,0.119839,0.274915,0.190883,0.573921
时间段_3-5,0.307661,1.0,0.33614,0.291728,0.305168,0.215149,0.084771,0.147179,0.124658,0.294231,0.197398,0.627827
时间段_6-8,0.285823,0.33614,1.0,0.315593,0.302606,0.205705,0.085029,0.153476,0.128325,0.293736,0.198981,0.638137
时间段_9-11,0.250273,0.291728,0.315593,1.0,0.339844,0.197155,0.081636,0.15749,0.1309,0.287179,0.20408,0.622774
时间段_12-14,0.270312,0.305168,0.302606,0.339844,1.0,0.266352,0.08914,0.181829,0.149322,0.338782,0.220977,0.704764
时间段_15-17,0.197723,0.215149,0.205705,0.197155,0.266352,1.0,0.116057,0.109532,0.11329,0.237902,0.171825,0.514759
时间段_18-20,0.067791,0.084771,0.085029,0.081636,0.08914,0.116057,1.0,0.123386,0.042665,0.10937,0.066429,0.234137
时间段_21-23,0.172921,0.147179,0.153476,0.15749,0.181829,0.109532,0.123386,1.0,0.075064,0.176343,0.127835,0.357005
行为类型_buy,0.119839,0.124658,0.128325,0.1309,0.149322,0.11329,0.042665,0.075064,1.0,0.12772,0.04201,0.160692
行为类型_cart,0.274915,0.294231,0.293736,0.287179,0.338782,0.237902,0.10937,0.176343,0.12772,1.0,0.03537,0.396577


### 点击次数

In [73]:
value = [[i, j, round(corrDf.iloc[i][j] * 100, 2)] for i in range(12) for j in range(12)]

heatMap = (HeatMap(init_opts=opts.InitOpts(theme=ThemeType.DARK)).add_xaxis(corrDf.index.values.tolist())
        .add_yaxis("淘宝用户", corrDf.index.values.tolist(), value)
        .set_global_opts(
            title_opts=opts.TitleOpts(title="时间段与行为相关性热力图"),
            visualmap_opts=opts.VisualMapOpts(),
            xaxis_opts=opts.AxisOpts(
            axislabel_opts=opts.LabelOpts(rotate=-45))
        ))

heatMap.load_javascript()

heatMap.render_notebook()

### 加购次数

### 收藏次数

## 相关分析

是否加购与加购次数、是否收藏与收藏次数之间存在一定相关性，但经验证剔除其中之一与纳入全部变量效果基本一致，故之后使用全部变量建模。

## 数据标签

In [None]:
import seaborn as sns

In [None]:
#是否购买（Label）


# 建立模型

## 划分数据集

In [74]:
from sklearn.model_selection import train_test_split

In [76]:
buyDf_X = buyDf.drop('行为类型_buy',axis=1)

sourceRow = len(buyDf)

#原始数据集：特征
source_X = buyDf_X.iloc[0 : (round(sourceRow * 0.8) - 1), :]
#原始数据集：标签
source_y = buyDf.iloc[0 : (round(sourceRow * 0.8) - 1)].行为类型_buy

#预测数据集：特征
pred_X = buyDf_X.iloc[round(sourceRow * 0.8):, :]

del buyDf
del buyDf_X
gc.collect()

from sklearn.model_selection import train_test_split 

#建立模型用的训练数据集和测试数据集
train_X, test_X, train_y, test_y = train_test_split(source_X ,
                                                    source_y,
                                                    train_size=.8)


## 分类模型

### 模型建立

In [77]:
from sklearn.linear_model import LogisticRegression

In [78]:
model = LogisticRegression()

In [79]:
#简单测试
model.fit(train_X, train_y)

LogisticRegression()

### 模型评估

In [None]:
from sklearn import metrics
from sklearn.metrics import classification_report
from sklearn.metrics import auc,roc_curve

In [80]:
#分类报告
model.score(test_X, test_y)

0.9915092782886545

In [None]:
#roc曲线
