# 离线数据获取

In [None]:
import pandas as pd
import numpy as np
from importlib import reload
from sklearn.utils import shuffle
from pyspark.sql import SparkSession
import gensim
from pyspark.sql.types import *
from pyspark.sql.types import LongType, ArrayType
from pyspark.sql.functions import udf
from pyspark.sql import Row
import json
from time import time
import subprocess
from pyspark.sql.types import StructType, StructField, LongType, StringType, FloatType
import os
from pyspark.sql.functions import monotonically_increasing_id
import datetime
import redis


pd.set_option('display.max_columns', None)

os.environ['PYSPARK_PYTHON'] = '/usr/local/anaconda3/bin/python3.6'

spark = (SparkSession
         .builder
         .appName('pi')
         .enableHiveSupport()
         .config('spark.sql.adaptive.enabled', False)
         .config('spark.driver.allowMultipleContexts', True)
         .config('spark.executor.instances', '500')
         .config('spark.executor.memory', '16g')
         .config('spark.executor.cores', '5')
         .config('spark.driver.memory', '20g')
         .config('spark.driver.maxResultSize', '32g')
         .config('spark.sql.shuffle.partitions', '1000')
         .config('spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class', 'DockerLinuxContainer')
         .config('spark.executorEnv.yarn.nodemanager.container-executor.class', 'DockerLinuxContainer')
         .config('spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name',
                 'bdp-docker.jd.com:5000/wise_mart_bag:latest')
         .config('spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name',
                 'bdp-docker.jd.com:5000/wise_mart_bag:latest')
         .config('spark.sql.execution.arrow.enabled', 'true')
         .config('spark.rpc.message.maxSize', '512')
         .config('spark.default.parallelism', '2000')
         .config('spark.sql.orc.impl', 'hive')
         .config('spark.sql.orc.enableVectorizedReader', 'true')
         .config('spark.sql.hive.convertMetastoreOrc', 'true')
         .config('spark.jars','hdfs://ns1005/user/mart_cd_shtech_base/geshenyang/spark-tensorflow-connector_2.11-1.10.0.jar')
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")
print(spark.sparkContext.applicationId)

In [6]:
sql = '''
SELECT
    *,
    ROW_NUMBER() over(partition BY sid order by jdqwritetime ASC) AS rns
FROM
    (
        SELECT
            t0.erp_acct_no,
            t0.entry_dt,
            t0.emply_name,
            t0.gender_desc,
            t0.dept_full_name,
            t1.im_num
        FROM
            (
                SELECT
                    erp_acct_no,
                    entry_dt,
                    emply_name,
                    gender_desc,
                    dept_full_name
                FROM
                    gdm.gdm_m02_emply_org_da
                WHERE
                    dt = sysdate( - 1)
                    AND emply_status_cd = '1'
                    AND work_type_cd = 'J'
                    AND entry_dt < '2022-06-01'
                    AND (dept_full_name LIKE '京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务成都交付部-手机通讯在线服务部%' --  145人
                    OR dept_full_name LIKE '京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部%' -- 405人
                    OR dept_full_name LIKE '京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务成都交付部-家电在线服务部%' --  174人
                    OR dept_full_name LIKE '京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-时尚居家在线服务部%' --  150人
                    OR dept_full_name LIKE '京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务成都交付部-电脑办公在线服务部%') --  260人
                    -- 共计 1134 人
            )
            t0
        INNER JOIN
            (
                SELECT
                    im_1 AS im_num,
                    erp
                FROM
                    app.mart_dim_agent_information_sys
                WHERE
                    dt = sysdate( - 1)

                UNION ALL

                SELECT
                    im_2 AS im_num,
                    erp
                FROM
                    app.mart_dim_agent_information_sys
                WHERE
                    dt = sysdate( - 1)

                UNION ALL

                SELECT
                    im_3 AS im_num,
                    erp
                FROM
                    app.mart_dim_agent_information_sys
                WHERE
                    dt = sysdate( - 1)
            )
            t1
        ON
            t0.erp_acct_no = t1.erp
    )
    n0
INNER JOIN
    (
        SELECT
            chattype, -- 聊天类型
            content, -- 聊天内容
            customer, -- 顾客pin
            -- groupid, -- 技能组id
            sid, -- 会话ID
            waiter, -- 客服pin
            waitersend, -- 是否客服或顾客发送
            jdqwritetime -- jdq写入时间 
        FROM
            app.app_chat_message
        WHERE
            -- dt = '2022-11-11'
            dt = '2023-01-30'
            -- dt >= '2022-08-01' 
            -- AND dt <= '2022-08-31' 
            -- dt >= '2022-12-20' 
            -- AND dt <= '2022-12-25' 
            AND chattype = 'SERVICE' -- SERVICE人工/JIMI机器人
            -- AND waitersend = 'WAITER' -- WAITER客服 / CUSTOMER客户
            AND type = 'TEXT' -- TEXT/TEMPLATE2
            AND content <> ''
    )
    n1
ON
    n0.im_num = n1.waiter
'''
spark_data = spark.sql(sql)

In [7]:
df_data = spark_data.toPandas()

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


In [8]:
df_data.shape

(597308, 14)

In [14]:
sid_list = list(set(list(df_data.sid)))
len(sid_list)

33950

In [23]:
# 求分位数函数：
import numpy as np

In [28]:
print(np.percentile(list(df_data.sid.value_counts().values),10)) # 一通对话的对话长度的中位数是2
print(np.percentile(list(df_data.sid.value_counts().values),15)) # 一通对话的对话长度的中位数是3
print(np.percentile(list(df_data.sid.value_counts().values),20)) # 一通对话的对话长度的中位数是5
print(np.percentile(list(df_data.sid.value_counts().values),25)) # 一通对话的对话长度的中位数是6
print(np.percentile(list(df_data.sid.value_counts().values),50)) # 一通对话的对话长度的中位数是14
print(np.percentile(list(df_data.sid.value_counts().values),75)) # 一通对话的对话长度的中位数是24
print(np.percentile(list(df_data.sid.value_counts().values),90)) # 一通对话的对话长度的中位数是36

2.0
3.0
5.0
6.0
14.0
24.0
36.0


In [63]:
sid_message_list = []

for sid in sid_list:
    temp = df_data[df_data.sid == sid]
    temp_len = temp.shape[0]
    if temp_len >= 4 and temp_len <= 36:
        sid_message_list.append(temp)

## 数据清洗

* 已实现：只有客户会话的数据剔除。
* 未实现：包含订单号的会话剔除。例如：咨询订单号：260508534633 商品ID：100052268905。

In [64]:
sid_message_list_last = []
invalid_sid = []
for sid_message in sid_message_list:
    if len(sid_message.waitersend.value_counts().keys()) == 2:
        sid_message_list_last.append(sid_message)
    else:
        invalid_sid.append(sid_message)
print(len(sid_message_list))
print(len(sid_message_list_last))

25538
24795


In [71]:
len(invalid_sid)

743

In [75]:
invalid_sid[-100]

Unnamed: 0,erp_acct_no,entry_dt,emply_name,gender_desc,dept_full_name,im_num,chattype,content,customer,sid,waiter,waitersend,jdqwritetime,rns
55036,wangzilong31,2020-04-08,王子龙,男,京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部-...,sq-243483-wangzilong,SERVICE,用户发起转人工,小歪kama,8d6afd84-cc9b-4d0b-a00b-c3c349f5b26b,sq-243483-wangzilong,CUSTOMER,2023-01-30 21:29:14.917,1
55037,wangzilong31,2020-04-08,王子龙,男,京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部-...,sq-243483-wangzilong,SERVICE,12个灭火器,小歪kama,8d6afd84-cc9b-4d0b-a00b-c3c349f5b26b,sq-243483-wangzilong,CUSTOMER,2023-01-30 21:30:14.860,2
55038,wangzilong31,2020-04-08,王子龙,男,京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部-...,sq-243483-wangzilong,SERVICE,全给我放菜鸟,小歪kama,8d6afd84-cc9b-4d0b-a00b-c3c349f5b26b,sq-243483-wangzilong,CUSTOMER,2023-01-30 21:30:15.127,3
55039,wangzilong31,2020-04-08,王子龙,男,京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部-...,sq-243483-wangzilong,SERVICE,客服也没有的吗,小歪kama,8d6afd84-cc9b-4d0b-a00b-c3c349f5b26b,sq-243483-wangzilong,CUSTOMER,2023-01-30 21:31:44.969,4
55040,wangzilong31,2020-04-08,王子龙,男,京东集团-京东零售-客户体验与服务部-标准业务服务部-标准业务宿迁交付部-消费品在线服务部-...,sq-243483-wangzilong,SERVICE,还专属客服,小歪kama,8d6afd84-cc9b-4d0b-a00b-c3c349f5b26b,sq-243483-wangzilong,CUSTOMER,2023-01-30 21:32:14.895,5


# ES数据存储/删除/查询

In [36]:
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import sys
import time
from datetime import datetime,timedelta
import json

## ES配置

In [37]:
# 集群名称
user = "jiesi-jdos-cs-rec"
# 集群密码
pss = "227DB7650798709B"
# 链接信息
hostArr = ["prod-4-40000-jiesi-jdos-cs-rec.jd.com:40000","prod-1-40000-jiesi-jdos-cs-rec.jd.com:40000","prod-2-40000-jiesi-jdos-cs-rec.jd.com:40000"]

In [38]:
es = Elasticsearch(
    hostArr,
    http_auth=(user, pss)
)

## 创建索引

In [303]:
# 索引名称
index_name='speechcraft_recommend'

In [304]:
mapping = '''
{
  "settings": {
        "index": {
            "number_of_shards": "4",
            "analysis": {
                "analyzer": {
                    "semi_analyzer": {
                        "pattern": ";",
                        "type": "pattern"
                    },
                    "comma_analyzer": {
                        "pattern": "@",
                        "type": "pattern"
                    }
                }
            },
            "number_of_replicas": "1"
        }
    },
  "mappings": {
    "properties": {
        "preMessage":{"type":"text","analyzer": "comma_analyzer"},
        "lastText":{"type":"text","analyzer": "comma_analyzer"},
        "preMessageString":{"type":"text"},
        "lastTextString":{"type":"text"},
        "nextTextString":{"type":"text"},
        "waiter":{"type":"text"}
    }
  }
}
'''
es.indices.create(index=index_name, ignore=400, body=mapping)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'speechcraft_recommend'}

In [305]:
# 分片名称
alias_name = 'cs_speechcraft_recommend'

In [306]:
es.indices.put_alias(index=index_name, name=alias_name)

{'acknowledged': True}

## 删除索引

In [302]:
es.indices.delete(index=index_name, ignore=[400, 404])

{'acknowledged': True}

## 单条写入数据

In [234]:
import jieba
import warnings
warnings.filterwarnings("ignore")

In [235]:
def process_data_to_db(df):
    content_list = list(df.content)
    waitersend_list = list(df.waitersend)
    sid_len = len(waitersend_list)
    pre_list = []
    cur_list = []
    pre_string_list = []
    cur_string_list = []
    next_string_list = []
    
    for i in range(1,sid_len):
        if waitersend_list[i] == 'WAITER' and waitersend_list[i-1] != 'WAITER':
            pre_list.append('@'.join(list(jieba.cut(''.join(content_list[:i-1])))))
            cur_list.append('@'.join(list(jieba.cut(content_list[i-1]))))
            pre_string_list.append(';'.join(content_list[:i-1]))
            cur_string_list.append(content_list[i-1])
            next_string_list.append(content_list[i])
    
    df_new = pd.DataFrame()
    df_new['preMessage'] = pre_list
    df_new['lastText'] = cur_list
    df_new['preMessageString'] = pre_string_list
    df_new['lastTextString'] = cur_string_list
    df_new['nextTextString'] = next_string_list
    df_new['waiter'] = list(df.waiter)[0]
    return df_new

In [236]:
len(sid_message_list_last)

24795

In [307]:
df_db_list = []
for sid_message in sid_message_list_last:
    df_db_list.append(process_data_to_db(sid_message))

In [308]:
df_db = pd.concat(df_db_list,axis = 0)
df_db.shape

(121411, 6)

In [309]:
df_db.head()

Unnamed: 0,preMessage,lastText,preMessageString,lastTextString,nextTextString,waiter
0,,我要@转@人工,,我要转人工,您好，很高兴为您服务，小妹会尽力帮助您的(*^▽^*),cd-268123-guoyao13
1,我要@转@人工@您好@，@很@高兴@为您服务@，@小妹@会@尽力@帮助@您@的@(@*@^@...,我@的@问题@还是@没@解决@啊,我要转人工;您好，很高兴为您服务，小妹会尽力帮助您的(*^▽^*),我的问题还是没解决啊,辛苦您将购物时的订单号提供给小妹一下哦，小妹查询一下解答您的问题哦,cd-268123-guoyao13
2,我要@转@人工@您好@，@很@高兴@为您服务@，@小妹@会@尽力@帮助@您@的@(@*@^@...,（@258352391623@）,我要转人工;您好，很高兴为您服务，小妹会尽力帮助您的(*^▽^*) ;我的问题还是没解决啊;...,（258352391623）,小妹马上为您查询~,cd-268123-guoyao13
3,我要@转@人工@您好@，@很@高兴@为您服务@，@小妹@会@尽力@帮助@您@的@(@*@^@...,对,我要转人工;您好，很高兴为您服务，小妹会尽力帮助您的(*^▽^*) ;我的问题还是没解决啊;...,对,是保温杯吗,cd-268123-guoyao13
4,我要@转@人工@您好@，@很@高兴@为您服务@，@小妹@会@尽力@帮助@您@的@(@*@^@...,嗯,我要转人工;您好，很高兴为您服务，小妹会尽力帮助您的(*^▽^*) ;我的问题还是没解决啊;...,嗯,有联系店铺客服晒单登记了吗,cd-268123-guoyao13


In [310]:
df_db.head().lastText

0             我要@转@人工
1    我@的@问题@还是@没@解决@啊
2    （@258352391623@）
3                   对
4                   嗯
Name: lastText, dtype: object

In [311]:
df_db = df_db.reset_index(drop=True)

In [242]:
# from decimal import *
for index, row in df_db.iterrows():
    doc = {
        "preMessage" :row["preMessage"],
        "lastText" :row["lastText"],
        "preMessageString" :row["preMessageString"],
        "lastTextString":row["lastTextString"],
        "nextTextString" :row["nextTextString"],
        "waiter" :row["waiter"]
    }
    res = es.index(index=index_name, id=f"{index}", body=doc)

In [313]:
df_db.shape

(121411, 6)

## 批量写入数据

In [321]:
import json
from decimal import *
i = 0
k = 0
actions = []
for item in (df_db.iterrows()):
    data = {}
    i += 1
    for col in df_db.columns:
        data[col] = item[1][col]
    action = {
                "_index": index_name,
                "_type": "_doc",
                "_id": item[0],
                "_source": data
              }
    
    actions.append(action)
    if i == 3000:
        try:
            blk=helpers.bulk(es, actions,request_timeout=20)
            i = 0
            actions = []
            k += 1
        except:
            traceback.print_exc()
            print(action)
            pass
try:
    blk=helpers.bulk(es, actions,request_timeout=20)
    k += 1
except:
    traceback.print_exc()
print((k-1)*3000 + i)

41


## 读取数据

In [None]:
# term:明确的值
# match:知道分词器的存在,模糊查询，只能指定一个字段查询
# multi_match：可以指定多个查询字段，同match一样，query会对field进行分词操作，然后再查询
# query:
# "_source":["name","interest"],  //查询结果只展示name和interest字段
#   "_source":{
#     "includes":["name","interest"],  // 包含哪些字段
#     "excludes":["company"]} //排除哪些字段，优先级高于includes

# must:必须满足
# must_not:必须不满足 不计算相关度分数
# should:可能满足

In [385]:
query = {
    "query":
    {
        "bool":
        {
            "must":[],
            "must_not":[],
            "should":
            [
                {
                    "match":
                    {
                        "lastText":
                        {
                            "analyzer": "comma_analyzer",
                            "query": "转@人工",
                            "operator": "OR",
                            "zero_terms_query": "NONE",
                            "minimum_should_match":1,
                            "boost": 5
                        }
                    }
                },
                {
                    "match":
                    {
                        "preMessage":
                        {
                            "analyzer": "comma_analyzer",
                            "query": "我要@转@人工",
                            "operator": "OR",
                            "zero_terms_query": "NONE",
                            "minimum_should_match":1,
                            "boost": 2
                        }
                    }
                }
            ]
        }
    },
    "from":0,
    "size":1,
    "sort":[],
    "aggs":{},
    "_source":['nextTextString','waiter']
    #"explain": True
}

In [386]:
#es查询
res = es.search(index=index_name,body=query,search_type='dfs_query_then_fetch')

In [387]:
print(len(res['hits']['hits']))
res['hits']['hits']

1


[{'_index': 'speechcraft_recommend',
  '_type': '_doc',
  '_id': '116000',
  '_score': 43.383366,
  '_source': {'nextTextString': '您好，在的', 'waiter': 'sq-220706-yuzhen41'}}]

In [None]:
[{'_index': 'speechcraft_recommend',
  '_type': '_doc',
  '_id': '116000',
  '_score': 43.383366,
  '_source': {'nextTextString': '您好，在的', 'waiter': 'sq-220706-yuzhen41'}}]

## 读取超过10000的数据量

In [356]:
from elasticsearch import Elasticsearch, helpers
import time
query = {"query":{"bool":{"must":[{"match_all":{}}],"must_not":[],"should":[]}},"from":0,"size":100000,"sort":[],"aggs":{}}
start_time = time.time()
 
# helpers.scan()生成的是一个迭代器
res = helpers.scan(es, index=index_name, scroll='2m', query=query)
save_data = []
count = 0
for data in res:
    save_data.append(data)
    count += 1
print(len(save_data))
print(count)
end_time = time.time()

print(f"耗时：{end_time - start_time}")

121411
121411
耗时：3.5662176609039307
