In [1]:
import config
import json
import random
import re
import os
import ast
import datetime
import jieba
import emoji
import numpy as np 
import pandas as pd
import jieba.posseg as pseg
from zhconv import convert
from time import time
from datetime import date
from collections import Counter
from category_encoders.count import CountEncoder
from datetime import timedelta
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.collocations import BigramAssocMeasures, BigramCollocationFinder
from nltk.collocations import TrigramAssocMeasures, TrigramCollocationFinder
from functools import reduce 
from shutil import copyfile

In [2]:
print('读取参数')
cn_stopword_path = config.cn_stopword_path
en_stopword_path = config.en_stopword_path
common_stopword_path = config.common_stopword_path
keepword_path = config.keepword_path
synonyms_path = config.synonyms_path
legal_pos = config.legal_pos
tokenized_filename = config.tokenized_filename

tfidf_max_df = config.tfidf_max_df
tfidf_max_features = config.tfidf_max_features
tfidf_top_k = config.tfidf_top_k

bigrams_count = config.bigrams_count
bigrams_occur = config.bigrams_occur
bigrams_window = config.bigrams_window
bigrams_show_samples = config.bigrams_show_samples
bigrams_time_limit = config.bigrams_time_limit
bigrams_result_df_path = config.bigrams_result_df_path

trigrams_count = config.trigrams_count
trigrams_occur = config.trigrams_occur
trigrams_window = config.trigrams_window
trigrams_show_samples = config.trigrams_show_samples
trigrams_time_limit = config.trigrams_time_limit
trigrams_result_df_path = config.trigrams_result_df_path

standard_Q_path = config.standard_Q_path
config_path = config.config_path

tokenized_tablename = config.tokenized_tablename
bigram_tablename = config.bigram_tablename
trigram_tablename = config.trigram_tablename

adm_tokenized_tablename = config.adm_tokenized_tablename
adm_bigram_tablename = config.adm_bigram_tablename
adm_trigram_tablename = config.adm_trigram_tablename

input_filename = config.input_filename

读取参数


In [4]:
print('从数据库拉取一周数据:')
t1 = time()
# hive syntax
os.system(
    f"""
        hive -e "
            select * from tmp_htl_ai_db.tmp_fy_im_weekly
        ">{input_filename}
    """
)
t2 = time()
print(t2-t1)

从数据库拉取一周数据:
27.54895257949829


In [8]:
t1=time()

df=pd.read_csv(input_filename, header=0,sep='\t', error_bad_lines=False, engine='python')
df = df[['session_id','datachange_createtime',  'msgtype', 'type', 'rank', 'body']]
print('原始数据集shape: ', df.shape)

print('1. 清洗和筛选')

print('对session_id进行计数编码')
df_session = df['session_id']
ce = CountEncoder(cols=['session_id'])
df_encoded = ce.fit_transform(df)
df_encoded.columns = ['round','datachange_createtime',  'msgtype', 'type', 'rank', 'body']
df = pd.concat([df_session, df_encoded], axis=1)

session_ids = df['session_id'].drop_duplicates()
session_ids_count = session_ids.shape[0]
print('总session数量', session_ids_count)

print('1.1 round完整性筛选')
rank1_session_ids = df[df['rank']==1]['session_id'].drop_duplicates()
print('有rank1的session数量：', rank1_session_ids.shape)
print('有rank1的session比例：', rank1_session_ids.shape[0] / session_ids.shape[0])   

rank_eq_round_session_ids = df[df['rank']==df['round']]['session_id'].drop_duplicates()
print('rank=round的session数量', rank_eq_round_session_ids.shape)
print('rank=round的session比例', rank_eq_round_session_ids.shape[0] / session_ids.shape[0] )       

session_ids = pd.merge(rank1_session_ids, session_ids, how='inner', on='session_id')
session_ids = pd.merge(rank_eq_round_session_ids, session_ids, how='inner', on='session_id')
print('完整的session数量：', session_ids.shape)
print('完整的session占比：', session_ids.shape[0] / session_ids_count)  # session 既完整也能进人工 比例

df = pd.merge(session_ids, df, how='left', on='session_id')

print('1.2 根据round内部发言筛选')
print('聚合type字段')
df_temp = df[['session_id', 'type']]
df_temp['type'] = df_temp['type'].apply(lambda x: str(x))
df_temp = df_temp.groupby(by='session_id')['type'].sum().to_frame()  # 聚合type
df_temp = df_temp.reset_index()
df_temp.columns = ['session_id', 'type_seq']

print('确保session中同时有顾客客服和机器人的发言')
def find_man_and_machine(x):  
    if ('1' in x and '2' in x and '3' in x) or ('1' in x and '2' in x and '4' in x):
        return True
    else:
        return False
df_temp['man_and_machine'] = df_temp['type_seq'].apply(lambda x: find_man_and_machine(x))
df_temp = df_temp[df_temp['man_and_machine']==True]
print('此时的session数量', df_temp.shape)


print('取最后的顾客发言之前的一个机器发言')
def find_last_machine(x):
    last1idx = len(x) - x[::-1].index('1') - 1
    x = x[:last1idx]
    if '2' in x:
        return len(x) - x[::-1].index('2') 
    else:
        return 0
    
df_temp['last_machine'] = df_temp['type_seq'].apply(lambda x: find_last_machine(x))
df = pd.merge(df_temp, df, how='left', on='session_id')
df = df[df['rank'] > df['last_machine']]
print('此时数据集shape: ', df.shape)

print('1.3 根据type和msgtype筛选')
type_dict = {1:'客人',2:'机器',3:"酒店客服人员",4:'携程客服人员', 5:'服务评价'}
msgtype_dict = {0:'手打文本',1:'图像',2:'酒店或房型',3:'视频',4:'语音',5:'文件或链接',6:'坐标',7:'json模板'} 
df = df[(df['type']==1) ]
df = df[(df['msgtype']==0) | (df['msgtype']==7)]
print('此时数据集shape: ', df.shape)

print('1.4 JSON 转文本')
def get_title(body):
    if 'title' in body and '{' in body and '}' in body:
        return json.loads(body, strict=False)['title']
    else:
        return body
df['body']=df['body'].apply(lambda x : get_title(x))

print('1.5 Session聚合')
df = df[['session_id','rank','body']]
df = df.groupby(by='session_id').body.sum().to_frame()  # 聚合body
df = df.reset_index()
print('此时数据集shape: ', df.shape)

print('删除转义符，防止保存csv的时候出错')
df['body'] = df['body'].apply(lambda x: x.replace('\r',''))  
df['body'] = df['body'].apply(lambda x: x.replace('\n',''))  
df['body'] = df['body'].apply(lambda x: x.replace('\t',''))  

print('2. 分词')
print('2.1 读取停止词')
f = open(cn_stopword_path,"r")   
stop_sents_cn = f.read()    
f.close()   
stop_sents_cn = stop_sents_cn.split('\n')
f = open(en_stopword_path,"r")   
stop_sents_en = f.read()    
f.close()   
stop_sents_en = stop_sents_en.split('\n')
f = open(common_stopword_path,"r")    
stop_sents_common = f.read()   
f.close()  
stop_sents_common = stop_sents_common.split('\n')
stop_words = list(set(stop_sents_cn+stop_sents_common+stop_sents_en))
print('停止词数量：',len(stop_words))

print('2.2 加载保留词')
jieba.load_userdict(keepword_path)   

print('2.2~ 加载同义词')
combine_dict = {}
for line in open(synonyms_path, "r"):
    seperate_word = line.strip().split(" ")
    num = len(seperate_word)
    for i in range(1, num):
        combine_dict[seperate_word[i]] = seperate_word[0]
print('同义词对数量：', len(combine_dict))

def clean_then_tokenize(x):
    x = str(x)
    x = emoji.replace_emoji(x, replace='')     # 去除emoji
    x = convert(x, 'zh-cn')                    # 繁简转换
    x = re.sub(u"\\(.*?\\)|\\{.*?\\}|\\[.*?\\]|\\<.*?\\>", "", x)  # 去除括号及其内部

    ## 分词，去停止词，同时pos筛选
    result = []
    words = pseg.cut(x)
    for word, flag in words:
        cn_word = re.findall('[\u4e00-\u9fa5]', word) # 只保留汉字
        cn_word = ''.join(cn_word) 
        if cn_word!='':
            word=cn_word
        word = word.lower()
        if (flag in legal_pos) and (word not in stop_words):
            result.append(word.rstrip().lstrip())
    return list(filter(lambda x : x != '', result))

def replace_synonyms(x):
    result = []
    for word in x:
        if word in combine_dict:
            result.append(combine_dict[word])
        else:
            result.append(word)
    return result

print('2.2 开始分词')
df['tokens_list'] = df['body'].apply(lambda x: clean_then_tokenize(x))
df['tokens_list'] = df['tokens_list'].apply(lambda x: replace_synonyms(x))
df['tokens'] = df['tokens_list'].apply(lambda x: ' '.join(x))
df = df.drop(df[df['tokens']==''].index)

df.reset_index(inplace=True)
df.drop('index',axis=1,inplace=True)
print('当前数据量', df.shape)

print('保存分词后的数据')
df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
df.to_csv(tokenized_filename, header=False, encoding='utf-8-sig', index=False, sep='\t')

print('3. TF-IDF建模：')
df['idx'] = df.index
tfidf_model = TfidfVectorizer(
    stop_words=stop_words,
    token_pattern=r"(?u)\b\w+\b",  # enable 1-char words
    lowercase=False,
    max_df=tfidf_max_df, 
    max_features=tfidf_max_features  # only depends on tf
)

weight = tfidf_model.fit_transform(df['tokens']).toarray()
word = tfidf_model.get_feature_names()
df_word_weight = pd.DataFrame(data=weight, columns=word)
topwords = df_word_weight.sum(axis=0)
print('TFIDF后的数据大小', df_word_weight.shape)
print('Topwords:')
print(topwords.index.tolist())

print('3.1 提取每个session的关键词：')
def get_keywords(x, topK=tfidf_top_k):
    key_words = []
    for i, w in enumerate(weight[x]):
        if w>0 :
            key_words.append( (word[i],float('{:.3f}'.format(w))) )

    key_words.sort(key=lambda y: y[1], reverse=True)     
    result = key_words[:topK]
    return [x for x,_ in result]

df['keywords'] = df['idx'].apply(lambda x: get_keywords(x))
df.drop('idx', inplace=True, axis=1)

t2=time()
print('预处理耗时：', t2-t1)

Skipping line 40492: Expected 6 fields in line 40492, saw 7
Skipping line 52123: Expected 6 fields in line 52123, saw 8
Skipping line 66938: Expected 6 fields in line 66938, saw 7
Skipping line 67063: Expected 6 fields in line 67063, saw 9
Skipping line 142778: Expected 6 fields in line 142778, saw 8
Skipping line 178960: Expected 6 fields in line 178960, saw 8
Skipping line 217510: Expected 6 fields in line 217510, saw 11
Skipping line 264890: Expected 6 fields in line 264890, saw 7
Skipping line 301657: Expected 6 fields in line 301657, saw 11
Skipping line 301661: Expected 6 fields in line 301661, saw 11
Skipping line 307824: Expected 6 fields in line 307824, saw 7
Skipping line 371546: Expected 6 fields in line 371546, saw 7
Skipping line 418162: Expected 6 fields in line 418162, saw 9
Skipping line 451972: Expected 6 fields in line 451972, saw 7


原始数据集shape:  (454287, 6)
1. 清洗和筛选
对session_id进行计数编码
总session数量 38492
1.1 round完整性筛选
有rank1的session数量： (38232,)
有rank1的session比例： 0.993245349683051
rank=round的session数量 (38242,)
rank=round的session比例 0.9935051439260106
完整的session数量： (38220, 1)
完整的session占比： 0.9929335965914995
1.2 根据round内部发言筛选
聚合type字段


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


确保session中同时有顾客客服和机器人的发言
此时的session数量 (28497, 3)
取最后的顾客发言之前的一个机器发言
此时数据集shape:  (262233, 10)
1.3 根据type和msgtype筛选
此时数据集shape:  (104126, 10)
1.4 JSON 转文本
1.5 Session聚合


Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache


此时数据集shape:  (28444, 2)
删除转义符，防止保存csv的时候出错
2. 分词
2.1 读取停止词
停止词数量： 903
2.2 加载保留词


Loading model cost 0.971 seconds.
Prefix dict has been built successfully.


2.2~ 加载同义词
同义词对数量： 23
2.2 开始分词
当前数据量 (25375, 4)
保存分词后的数据
3. TF-IDF建模：
TFIDF后的数据大小 (25375, 1000)
Topwords:
['a', 'app', 'fi', 'h', 'ktv', 'loft', 'qq', 't2', 'v', 'vip', 'wi', 'wifi', '一定', '一小', '一楼', '一次性', '一直', '三人间', '三房', '三间房', '上班', '上订', '下午茶', '下单', '下好', '下班', '下车', '下雨', '不上', '不住', '不可', '不同', '不够', '不好', '不对', '不敢', '不能取消', '不让', '不远', '不通', '不错', '专票', '丢', '两张床', '两间房', '严重', '个人', '中心', '中风险', '久', '乐园', '买', '买票', '了有', '事', '事情', '二楼', '交', '交通', '享受', '亲亲', '亲子', '亲子房', '人员', '人间', '介绍', '付', '付款', '付费', '付钱', '价', '价位', '价格', '价钱', '休息', '优惠', '会员', '会议室', '估计', '位置', '低', '低风险', '住', '住客', '住宿', '住店', '住房', '体验', '使用', '便宜', '保留', '保证', '信号', '信息', '修改', '做', '做好', '做核酸', '做饭', '停', '停电', '停车', '停车位', '停车场', '停车费', '健康', '健康码', '健身房', '儿童', '儿童乐园', '允许', '充电', '充电器', '先', '先定', '免费', '兑换', '入', '入境', '入驻', '全', '全款', '全额', '公众', '公司', '公寓', '公用', '关', '关系', '关闭', '其实', '具体', '具体位置', '内容', '再定', '再订', '写', '冰箱', '决定', '冷', '准备', '几楼', '出', '出不来', '出京', '出入', '出去', '

In [None]:
def find_stardard_Q(keys, Qs):
    keys = keys.split('+')
    for Q in Qs:
        if all(key in Q for key in keys):
            return Q
    return np.nan

def add_stardard_Q(df, standard_Q_path):
    standard_Q = pd.read_excel(standard_Q_path,engine='openpyxl')
    Qs = standard_Q['业务Q'].tolist()
    df['standard_Q'] = df['gram'].apply(lambda x: find_stardard_Q(x, Qs))
    print('新挖掘问题数量',len(df))
    print('不在标准Q里的数量',df['standard_Q'].isna().sum())
    print('占比',df['standard_Q'].isna().sum()/len(df))
    return df


def get_bigrams_report(df, topwords, result_df_path, standard_Q_path, time_limit=600,
                       show_samples=10, grams_count=100, occur=0.0001, window=2):
    
    t1=time()
    
    # input for calculating bigram pmi scores
    data_pmi_input = []
    for words in df['tokens_list'].tolist():
        data_pmi_input.extend(words)
        for i in range(window-1):
            data_pmi_input.extend('|')
    
    # calculate bigram pmi scores
    bigram_measures = BigramAssocMeasures()
    bi_finder = BigramCollocationFinder.from_words(data_pmi_input,window_size=window)  # default window size = 2 
    
    # save bigram pmi scores and generate report
    bigrams = []
    result_df = pd.DataFrame()
    if 0<occur<1:
        occur = len(df)*occur
 
    for w1w2, score in bi_finder.score_ngrams(bigram_measures.pmi): # iterate grams and its score
        
        t2=time()
        if t2-t1>time_limit or grams_count==0:
            break
            
        w1, w2 = w1w2
        list_w1w2 = sorted(set([w1, w2]))
        
        if  (w1!=w2) and (w1 in topwords) and (w2 in topwords) and len(list_w1w2)==2 and (list_w1w2 not in bigrams):
            bigrams.append(list_w1w2)
            temp_df = df['keywords'].apply(set(list_w1w2).issubset)
            cnt = temp_df.sum()
            row_result = dict()
            
            if cnt>occur:
                print(list_w1w2,grams_count)
                row_result['gram']=w1+'+'+w2
                row_result['pmi']=score
                row_result['occurance']=cnt
                sample_df = df.iloc[temp_df[temp_df==True].index][:show_samples]
                sample_df = sample_df[['session_id','body']]
                row_result['samples'] = dict(zip(sample_df.session_id, sample_df.body))
                result_df = result_df.append(row_result, ignore_index=True)
                grams_count-=1
        
    
    result_df = add_stardard_Q(result_df, standard_Q_path)
    result_df = result_df[['gram', 'occurance', 'pmi', 'standard_Q', 'samples']]
    result_df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
    result_df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
    result_df.to_csv(result_df_path, header=False,index=False,encoding='utf-8-sig', sep='\t')
    print(len(result_df), ' grams')
    return result_df

print('4.1 提取Bigrams：')

result_bigram = get_bigrams_report(
    df, 
    topwords, 
    grams_count=bigrams_count, 
    occur=bigrams_occur, 
    window=bigrams_window,
    show_samples=bigrams_show_samples, 
    time_limit=bigrams_time_limit,
    result_df_path=bigrams_result_df_path, 
    standard_Q_path=standard_Q_path
)


In [None]:
def get_trigrams_report(df, bigrams, result_df_path, standard_Q_path, time_limit=1000,
                        grams_count=200, show_samples=5, occur=0.0001, window=5):

    t1=time()
    
    # input for calculating trigram pmi scores
    data_pmi_input = []
    for words in df['tokens_list'].tolist():
        data_pmi_input.extend(words)
        for i in range(window-1):
            data_pmi_input.extend('|')
    
    # calculate trigram pmi scores
    trigram_measures = TrigramAssocMeasures()
    tri_finder = TrigramCollocationFinder.from_words(data_pmi_input,window_size=window) 
    
    # save trigram pmi scores # generate report
    trigrams = []
    result_df = pd.DataFrame()
    if 0<occur<1:
        occur = occur*len(df)
    for w1w2w3, score in tri_finder.score_ngrams(trigram_measures.pmi):
        t2=time()
        if grams_count==0 or t2-t1>time_limit:
            break
        is_trigram_recorded = False
        w1, w2, w3 = w1w2w3
        set_w1w2w3 = set([w1,w2,w3])
        list_w1w2w3 = sorted(set_w1w2w3)
        
        for set_w1w2 in bigrams:
            if set_w1w2.issubset(set_w1w2w3) and len(set_w1w2w3)==3 and (list_w1w2w3 not in trigrams):
                trigrams.append(list_w1w2w3)
                is_trigram_recorded=True
                break
            
        if is_trigram_recorded:
            temp_df = df['keywords'].apply(set_w1w2w3.issubset)
            cnt = temp_df.sum()
            row_result = dict()
            if cnt>occur:
                print(list_w1w2w3, grams_count)
                row_result['gram']=w1+'+'+w2+'+'+w3
                row_result['pmi']=score
                row_result['occurance']=cnt
                sample_df = df.iloc[temp_df[temp_df==True].index][:show_samples]
                sample_df = sample_df[['session_id','body']]
                row_result['samples'] = dict(zip(sample_df.session_id, sample_df.body))
                result_df = result_df.append(row_result, ignore_index=True)
                grams_count-=1

    result_df = add_stardard_Q(result_df, standard_Q_path)
    result_df = result_df[['gram', 'occurance', 'pmi', 'standard_Q', 'samples']]
    result_df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
    result_df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
    result_df.to_csv(result_df_path, header=False, index=False, encoding='utf-8-sig', sep='\t')
    print(len(result_df), ' grams')
    return result_df

bigrams = result_bigram.gram.tolist()
bigrams = [set(x.split('+')) for x in bigrams]

print('4.2 提取Trigrams：')
result_trigram=get_trigrams_report(
    df, bigrams, 
    result_df_path=trigrams_result_df_path,
    standard_Q_path=standard_Q_path, 
    time_limit=trigrams_time_limit,
    grams_count=trigrams_count, 
    show_samples=trigrams_show_samples, 
    occur=trigrams_occur, 
    window=trigrams_window
)
print('全部处理完毕！')
print('session数量',df.shape)
print('bigram数量', result_bigram.shape)
print('trigram数量',result_trigram.shape)

print('保存配置文件：')
copyfile('config.py', config_path) 


In [None]:
t1 = time()
print('数据存入临时表：')
os.system(
 """hive -e "
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(tokenized_filename, tokenized_tablename)
)
os.system(
 """hive -e " 
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(bigrams_result_df_path, bigram_tablename)
)
os.system(
 """hive -e "
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(trigrams_result_df_path, trigram_tablename)
)


today = date.today()
d = today.strftime("%Y-%m-%d")

print('临时表数据写入正式表：')
os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_bigram_tablename, d, bigram_tablename)
)

os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_trigram_tablename, d, trigram_tablename)
)
os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_tokenized_tablename, d, tokenized_tablename)
)


t2 = time()
print('结果写入数据库耗时', t2-t1)

print('脚本执行完毕！')



In [None]:
ssssssssssssss

In [5]:
t1=time()

df=pd.read_csv(input_filename, header=0,sep='\t',error_bad_lines=False,engine='python')
df = df[['session_id','datachange_createtime',  'msgtype', 'type', 'rank', 'body']]
print('原始数据集shape: ', df.shape)

print('1. 清洗和筛选')

print('对session_id进行计数编码')
df_session = df['session_id']
ce = CountEncoder(cols=['session_id'])
df_encoded = ce.fit_transform(df)
df_encoded.columns = ['round','datachange_createtime',  'msgtype', 'type', 'rank', 'body']
df = pd.concat([df_session, df_encoded], axis=1)

session_ids = df['session_id'].drop_duplicates()
session_ids_count = session_ids.shape[0]
print('总session数量', session_ids_count)

print('1.1 round完整性筛选')
rank1_session_ids = df[df['rank']==1]['session_id'].drop_duplicates()
print('有rank1的session数量：', rank1_session_ids.shape)
print('有rank1的session比例：', rank1_session_ids.shape[0] / session_ids.shape[0])   

rank_eq_round_session_ids = df[df['rank']==df['round']]['session_id'].drop_duplicates()
print('rank=round的session数量', rank_eq_round_session_ids.shape)
print('rank=round的session比例', rank_eq_round_session_ids.shape[0] / session_ids.shape[0] )       

session_ids = pd.merge(rank1_session_ids, session_ids, how='inner', on='session_id')
session_ids = pd.merge(rank_eq_round_session_ids, session_ids, how='inner', on='session_id')
print('完整的session数量：', session_ids.shape)
print('完整的session占比：', session_ids.shape[0] / session_ids_count)  # session 既完整也能进人工 比例

df = pd.merge(session_ids, df, how='left', on='session_id')

print('1.2 根据round内部发言筛选')
print('聚合type字段')
df_temp = df[['session_id', 'type']]
df_temp['type'] = df_temp['type'].apply(lambda x: str(x))
df_temp = df_temp.groupby(by='session_id')['type'].sum().to_frame()  # 聚合type
df_temp = df_temp.reset_index()
df_temp.columns = ['session_id', 'type_seq']

print('确保session中同时有顾客客服和机器人的发言')
def find_man_and_machine(x):  
    if ('1' in x and '2' in x and '3' in x) or ('1' in x and '2' in x and '4' in x):
        return True
    else:
        return False
df_temp['man_and_machine'] = df_temp['type_seq'].apply(lambda x: find_man_and_machine(x))
df_temp = df_temp[df_temp['man_and_machine']==True]
print('此时的session数量', df_temp.shape)


print('取最后的顾客发言之前的一个机器发言')
def find_last_machine(x):
    last1idx = len(x) - x[::-1].index('1') - 1
    x = x[:last1idx]
    if '2' in x:
        return len(x) - x[::-1].index('2') 
    else:
        return 0
    
df_temp['last_machine'] = df_temp['type_seq'].apply(lambda x: find_last_machine(x))
df = pd.merge(df_temp, df, how='left', on='session_id')
df = df[df['rank'] > df['last_machine']]
print('此时数据集shape: ', df.shape)

print('1.3 根据type和msgtype筛选')
type_dict = {1:'客人',2:'机器',3:"酒店客服人员",4:'携程客服人员', 5:'服务评价'}
msgtype_dict = {0:'手打文本',1:'图像',2:'酒店或房型',3:'视频',4:'语音',5:'文件或链接',6:'坐标',7:'json模板'} 
df = df[(df['type']==1) ]
df = df[(df['msgtype']==0) | (df['msgtype']==7)]
print('此时数据集shape: ', df.shape)

print('1.4 JSON 转文本')
def get_title(body):
    if 'title' in body and '{' in body and '}' in body:
        return json.loads(body, strict=False)['title']
    else:
        return body
df['body']=df['body'].apply(lambda x : get_title(x))

print('1.5 Session聚合')
df = df[['session_id','rank','body']]
df = df.groupby(by='session_id').body.sum().to_frame()  # 聚合body
df = df.reset_index()
print('此时数据集shape: ', df.shape)

print('删除转义符，防止保存csv的时候出错')
df['body'] = df['body'].apply(lambda x: x.replace('\r',''))  
df['body'] = df['body'].apply(lambda x: x.replace('\n',''))  
df['body'] = df['body'].apply(lambda x: x.replace('\t',''))  

print('2. 分词')
print('2.1 读取停止词')
f = open(cn_stopword_path,"r")   
stop_sents_cn = f.read()    
f.close()   
stop_sents_cn = stop_sents_cn.split('\n')
f = open(en_stopword_path,"r")   
stop_sents_en = f.read()    
f.close()   
stop_sents_en = stop_sents_en.split('\n')
f = open(common_stopword_path,"r")    
stop_sents_common = f.read()   
f.close()  
stop_sents_common = stop_sents_common.split('\n')
stop_words = list(set(stop_sents_cn+stop_sents_common+stop_sents_en))
print('停止词数量：',len(stop_words))

print('2.2 加载保留词')
jieba.load_userdict(keepword_path)   

print('2.2~ 加载同义词')
combine_dict = {}
for line in open(synonyms_path, "r"):
    seperate_word = line.strip().split(" ")
    num = len(seperate_word)
    for i in range(1, num):
        combine_dict[seperate_word[i]] = seperate_word[0]
print('同义词对数量：', len(combine_dict))

def clean_then_tokenize(x):
    x = str(x)
    x = emoji.replace_emoji(x, replace='')     # 去除emoji
    x = convert(x, 'zh-cn')                    # 繁简转换
    x = re.sub(u"\\(.*?\\)|\\{.*?\\}|\\[.*?\\]|\\<.*?\\>", "", x)  # 去除括号及其内部

    ## 分词，去停止词，同时pos筛选
    result = []
    words = pseg.cut(x)
    for word, flag in words:
        cn_word = re.findall('[\u4e00-\u9fa5]', word) # 只保留汉字
        cn_word = ''.join(cn_word) 
        if cn_word!='':
            word=cn_word
        word = word.lower()
        if (flag in legal_pos) and (word not in stop_words):
            result.append(word.rstrip().lstrip())
    return list(filter(lambda x : x != '', result))

def replace_synonyms(x):
    result = []
    for word in x:
        if word in combine_dict:
            result.append(combine_dict[word])
        else:
            result.append(word)
    return result

print('2.2 开始分词')
df['tokens_list'] = df['body'].apply(lambda x: clean_then_tokenize(x))
df['tokens_list'] = df['tokens_list'].apply(lambda x: replace_synonyms(x))
df['tokens'] = df['tokens_list'].apply(lambda x: ' '.join(x))
df = df.drop(df[df['tokens']==''].index)

df.reset_index(inplace=True)
df.drop('index',axis=1,inplace=True)
print('当前数据量', df.shape)

print('保存分词后的数据')
df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
df.to_csv(tokenized_filename, header=False, encoding='utf-8-sig', index=False, sep='\t')

Skipping line 16542: Expected 6 fields in line 16542, saw 7
Skipping line 34645: Expected 6 fields in line 34645, saw 15
Skipping line 54071: Expected 6 fields in line 54071, saw 8
Skipping line 70152: Expected 6 fields in line 70152, saw 9
Skipping line 91774: Expected 6 fields in line 91774, saw 7
Skipping line 92093: Expected 6 fields in line 92093, saw 7
Skipping line 100187: Expected 6 fields in line 100187, saw 9
Skipping line 100188: Expected 6 fields in line 100188, saw 9
Skipping line 104560: Expected 6 fields in line 104560, saw 8
Skipping line 108757: Expected 6 fields in line 108757, saw 9
Skipping line 135421: Expected 6 fields in line 135421, saw 8
Skipping line 159649: Expected 6 fields in line 159649, saw 9
Skipping line 213990: Expected 6 fields in line 213990, saw 7
Skipping line 214006: Expected 6 fields in line 214006, saw 7
Skipping line 214644: Expected 6 fields in line 214644, saw 12
Skipping line 258717: Expected 6 fields in line 258717, saw 7
Skipping line 2590

原始数据集shape:  (399868, 6)
1. 清洗和筛选
对session_id进行计数编码
总session数量 33678
1.1 round完整性筛选
有rank1的session数量： (33442,)
有rank1的session比例： 0.9929924579844409
rank=round的session数量 (33459,)
rank=round的session比例 0.9934972385533583
完整的session数量： (33431, 1)
完整的session占比： 0.9926658352633767
1.2 根据round内部发言筛选
聚合type字段


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


确保session中同时有顾客客服和机器人的发言
此时的session数量 (24917, 3)
取最后的顾客发言之前的一个机器发言
此时数据集shape:  (231610, 10)
1.3 根据type和msgtype筛选
此时数据集shape:  (90297, 10)
1.4 JSON 转文本
1.5 Session聚合


Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache


此时数据集shape:  (24849, 2)
删除转义符，防止保存csv的时候出错
2. 分词
2.1 读取停止词
停止词数量： 903
2.2 加载保留词


Loading model cost 0.980 seconds.
Prefix dict has been built successfully.


2.2~ 加载同义词
同义词对数量： 20
2.2 开始分词
当前数据量 (22184, 4)
保存分词后的数据


In [13]:
df.sample(5)

Unnamed: 0,org_date_end,org_date_start,session_id,body,tokens_list,tokens
2594,2022-04-17,2022-04-11,100000091497334,入住要24小时核酸检测吗要24小时核酸检测吗外地的24小时不行吗？本地的码没那么快出来,"[入住, 小时, 核酸检测, 小时, 核酸检测, 外地, 小时, 的码, 没, 快]",入住 小时 核酸检测 小时 核酸检测 外地 小时 的码 没 快
5947,2022-04-17,2022-04-11,300000090797638,你好我这会下单那个房间没有了吗人工,"[下单, 房间]",下单 房间
14882,2022-04-17,2022-04-11,600000090997161,位置不是疫情管控的位置吧只要出示绿码和行程码九可以了对吧昨天做的核酸还没有出结果我和我女朋友...,"[位置, 疫情, 管控, 位置, 出示, 绿码, 行程码, 做, 核酸, 出, 女朋友, 核...",位置 疫情 管控 位置 出示 绿码 行程码 做 核酸 出 女朋友 核酸 没出 有毒 只能 退...
10056,2022-04-17,2022-04-11,400000090650513,我要取消订单未入住，行程有变店家客服电话？,"[取消, 订单, 未, 入住, 行程, 有变, 店家, 电话]",取消 订单 未 入住 行程 有变 店家 电话
4194,2022-04-17,2022-04-11,200000091256568,费用,[费用],费用


In [5]:
print('3. TF-IDF建模：')
df['idx'] = df.index
tfidf_model = TfidfVectorizer(
    stop_words=stop_words,
    token_pattern=r"(?u)\b\w+\b",  # enable 1-char words
    lowercase=False,
    max_df=tfidf_max_df, 
    max_features=tfidf_max_features  # only depends on tf
)

weight = tfidf_model.fit_transform(df['tokens']).toarray()
word = tfidf_model.get_feature_names()
df_word_weight = pd.DataFrame(data=weight, columns=word)
topwords = df_word_weight.sum(axis=0)
print('TFIDF后的数据大小', df_word_weight.shape)
print('Topwords:')
print(topwords.index.tolist())

print('3.1 提取每个session的关键词：')
def get_keywords(x, topK=tfidf_top_k):
    key_words = []
    for i, w in enumerate(weight[x]):
        if w>0 :
            key_words.append( (word[i],float('{:.3f}'.format(w))) )

    key_words.sort(key=lambda y: y[1], reverse=True)     
    result = key_words[:topK]
    return [x for x,_ in result]

df['keywords'] = df['idx'].apply(lambda x: get_keywords(x))
df.drop('idx', inplace=True, axis=1)

t2=time()
print(t2-t1)

3. TF-IDF建模：
TFIDF后的数据大小 (30820, 1000)
3.1 提取每个session的关键词：
130.4496250152588


In [6]:
def find_stardard_Q(keys, Qs):
    keys = keys.split('+')
    for Q in Qs:
        if all(key in Q for key in keys):
            return Q
    return np.nan

def add_stardard_Q(df, standard_Q_path):
    standard_Q = pd.read_excel(standard_Q_path,engine='openpyxl')
    Qs = standard_Q['业务Q'].tolist()
    df['standard_Q'] = df['gram'].apply(lambda x: find_stardard_Q(x, Qs))
    print('新挖掘问题数量',len(df))
    print('不在标准Q里的数量',df['standard_Q'].isna().sum())
    print('占比',df['standard_Q'].isna().sum()/len(df))
    return df


def get_bigrams_report(df, topwords, result_df_path, standard_Q_path, time_limit=600,
                       show_samples=10, grams_count=100, occur=0.0001, window=2):
    
    t1=time()
    
    # input for calculating bigram pmi scores
    data_pmi_input = []
    for words in df['tokens_list'].tolist():
        data_pmi_input.extend(words)
        for i in range(window-1):
            data_pmi_input.extend('|')
    
    # calculate bigram pmi scores
    bigram_measures = BigramAssocMeasures()
    bi_finder = BigramCollocationFinder.from_words(data_pmi_input,window_size=window)  # default window size = 2 
    
    # save bigram pmi scores and generate report
    bigrams = []
    result_df = pd.DataFrame()
    if 0<occur<1:
        occur = len(df)*occur
 
    for w1w2, score in bi_finder.score_ngrams(bigram_measures.pmi): # iterate grams and its score
        
        t2=time()
        if t2-t1>time_limit or grams_count==0:
            break
            
        w1, w2 = w1w2
        list_w1w2 = sorted(set([w1, w2]))
        
        if  (w1!=w2) and (w1 in topwords) and (w2 in topwords) and len(list_w1w2)==2 and (list_w1w2 not in bigrams):
            bigrams.append(list_w1w2)
            temp_df = df['keywords'].apply(set(list_w1w2).issubset)
            cnt = temp_df.sum()
            row_result = dict()
            
            if cnt>occur:
                print(list_w1w2,grams_count)
                row_result['gram']=w1+'+'+w2
                row_result['pmi']=score
                row_result['occurance']=cnt
                sample_df = df.iloc[temp_df[temp_df==True].index][:show_samples]
                sample_df = sample_df[['session_id','body']]
                row_result['samples'] = dict(zip(sample_df.session_id, sample_df.body))
                result_df = result_df.append(row_result, ignore_index=True)
                grams_count-=1
        
    
    result_df = add_stardard_Q(result_df, standard_Q_path)
    result_df = result_df[['gram', 'occurance', 'pmi', 'standard_Q', 'samples']]
    result_df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
    result_df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
    result_df.to_csv(result_df_path, header=False,index=False,encoding='utf-8-sig', sep='\t')
    print(len(result_df), ' grams')
    return result_df

print('4.1 提取Bigrams：')



result_bigram = get_bigrams_report(
    df, 
    topwords, 
    grams_count=bigrams_count, 
    occur=bigrams_occur, 
    window=bigrams_window,
    show_samples=bigrams_show_samples, 
    time_limit=bigrams_time_limit,
    result_df_path=bigrams_result_df_path, 
    standard_Q_path=standard_Q_path
)

4.1 提取Bigrams：
['洗漱', '用品'] 200
['fi', 'wi'] 199
['电', '竞'] 198
['插', '网线'] 197
['厅', '室'] 196
['效果', '隔音'] 195
['学院', '科技'] 194
['装修', '风格'] 193
['电梯', '要刷'] 192
['主题', '浪漫'] 191
['行政', '酒廊'] 190
['感谢您', '非常'] 189
['有限公司', '税号'] 188
['一次性', '拖鞋'] 187
['差价', '补'] 186
['征用', '政府'] 185
['专票', '能开'] 184
['开具', '水单'] 183
['短信', '给我发'] 182
['房卡', '要刷'] 181
['娱乐', '项目'] 180
['稳定', '网络'] 179
['寄存', '行李'] 178
['支行', '账号'] 177
['qq', '邮箱'] 176
['手续费', '扣'] 175
['娱乐', '设施'] 174
['健康', '监测'] 173
['智能', '马桶'] 172
['厨具', '电磁炉'] 171
['有限公司', '科技'] 170
['午餐', '晚餐'] 169
['人间', '男生'] 168
['拖鞋', '用品'] 167
['病例', '确诊'] 166
['报', '身份证号'] 165
['收取', '违约金'] 164
['宠物', '携带'] 163
['a', 'b'] 162
['公司', '名称'] 161
['团购', '抖音'] 160
['变', '黄码'] 159
['新增', '病例'] 158
['厨具', '锅'] 157
['电脑', '配置'] 156
['厨房', '锅'] 155
['密码', '无线'] 154
['下雨', '天气'] 153
['拖鞋', '牙刷'] 152
['一次性', '马桶'] 151
['坐', '高铁'] 150
['听', '得到'] 149
['持有', '未成年'] 148
['有变', '行程'] 147
['fi', '密码'] 146
['卫生间', '独立'] 145
['一楼', '二楼'] 144
['洗', '洗衣服'] 143

In [7]:
print('4.2 提取Trigrams：')
def get_trigrams_report(df, bigrams, result_df_path, standard_Q_path, time_limit=1000,
                        grams_count=200, show_samples=5, occur=0.0001, window=5):

    t1=time()
    
    # input for calculating trigram pmi scores
    data_pmi_input = []
    for words in df['tokens_list'].tolist():
        data_pmi_input.extend(words)
        for i in range(window-1):
            data_pmi_input.extend('|')
    
    # calculate trigram pmi scores
    trigram_measures = TrigramAssocMeasures()
    tri_finder = TrigramCollocationFinder.from_words(data_pmi_input,window_size=window) 
    
    # save trigram pmi scores # generate report
    trigrams = []
    result_df = pd.DataFrame()
    if 0<occur<1:
        occur = occur*len(df)
    for w1w2w3, score in tri_finder.score_ngrams(trigram_measures.pmi):
        t2=time()
        if grams_count==0 or t2-t1>time_limit:
            break
        is_trigram_recorded = False
        w1, w2, w3 = w1w2w3
        set_w1w2w3 = set([w1,w2,w3])
        list_w1w2w3 = sorted(set_w1w2w3)
        
        for set_w1w2 in bigrams:
            if set_w1w2.issubset(set_w1w2w3) and len(set_w1w2w3)==3 and (list_w1w2w3 not in trigrams):
                trigrams.append(list_w1w2w3)
                is_trigram_recorded=True
                break
            
        if is_trigram_recorded:
            temp_df = df['keywords'].apply(set_w1w2w3.issubset)
            cnt = temp_df.sum()
            row_result = dict()
            if cnt>occur:
                print(list_w1w2w3, grams_count)
                row_result['gram']=w1+'+'+w2+'+'+w3
                row_result['pmi']=score
                row_result['occurance']=cnt
                sample_df = df.iloc[temp_df[temp_df==True].index][:show_samples]
                sample_df = sample_df[['session_id','body']]
                row_result['samples'] = dict(zip(sample_df.session_id, sample_df.body))
                result_df = result_df.append(row_result, ignore_index=True)
                grams_count-=1

    result_df = add_stardard_Q(result_df, standard_Q_path)
    result_df = result_df[['gram', 'occurance', 'pmi', 'standard_Q', 'samples']]
    result_df.insert(0, 'org_date_start', datetime.date.today()+timedelta(days=-8))
    result_df.insert(0, 'org_date_end', datetime.date.today()+timedelta(days=-2))
    result_df.to_csv(result_df_path, header=False, index=False, encoding='utf-8-sig', sep='\t')
    print(len(result_df), ' grams')
    return result_df

bigrams = result_bigram.gram.tolist()
bigrams = [set(x.split('+')) for x in bigrams]

result_trigram=get_trigrams_report(
    df, bigrams, 
    result_df_path=trigrams_result_df_path,
    standard_Q_path='../data/StandardQ.xlsx', 
    time_limit=trigrams_time_limit,
    grams_count=trigrams_count, 
    show_samples=trigrams_show_samples, 
    occur=trigrams_occur, 
    window=trigrams_window
)
print('全部处理完毕！')


4.2 提取Trigrams：
['房卡', '电梯', '要刷'] 100
['fi', 'wi', '密码'] 99
['qq', '有限公司', '税号'] 98
['健康', '监测', '解除'] 97
['天', '接收', '解除'] 96
['居家', '接受', '监测'] 95
['回', '居家', '监测'] 94
['健康', '天', '监测'] 93
['qq', '发票', '邮箱'] 92
['楼层', '预留', '高'] 91
['外省', '是否', '自驾'] 90
['天', '居家', '监测'] 89
['居家', '支持', '隔离'] 88
['手机', '投屏', '电视'] 87
['安排', '楼层', '高'] 86
['办理', '电子', '证件'] 85
['刷卡', '电梯', '需要'] 84
['交', '店', '押金'] 83
['发票', '开', '电子'] 82
['天', '隔离', '集中'] 81
['有变', '行程', '订单'] 80
['取消', '有变', '行程'] 79
['v', '加', '方便'] 78
['有变', '行程', '退'] 77
['刷卡', '电梯', '酒店'] 76
['外卖', '房间', '送到'] 75
['地区', '星号', '高风险'] 74
['办理', '电子', '身份证'] 73
['低风险', '地区', '行程卡'] 72
['外省', '正常', '自驾'] 71
['不上', '电话', '联系'] 70
['低风险', '地区', '星号'] 69
['低风险', '地区', '绿码'] 68
['低风险', '地区', '带星'] 67
['房间', '投屏', '电视'] 66
['带', '电子', '身份证'] 65
['wifi', '密码', '房间'] 64
['低风险', '地区', '隔离'] 63
['低风险', '地区', '小时'] 62
['不上', '联系', '酒店'] 61
['投屏', '电视', '酒店'] 60
['低风险', '地区', '核酸'] 59
['低风险', '地区', '需要'] 58
['wifi', '密码', '酒店'] 57
['低风险', '地区

In [8]:
print(df.shape)
print(result_bigram.shape)
print(result_trigram.shape)

(30820, 7)
(200, 7)
(45, 7)


In [9]:
result_trigram.sample(5)

Unnamed: 0,org_date_end,org_date_start,gram,occurance,pmi,standard_Q,samples
7,2022-04-06,2022-03-31,天+健康+监测,14.0,15.691835,,"{600000090763986: '不用7天健康监测那些吗', 7000000912367..."
12,2022-04-06,2022-03-31,支持+居家+隔离,10.0,14.660205,,{500000090717686: '你好，我想问下支持+7吗好的居家隔离好的没了谢谢你好的...
41,2022-04-06,2022-03-31,低风险+地区+核酸,15.0,9.129449,,{100000091180385: '我问下我从北京过去可以住吧 [愉快]有核酸 低风险地区...
24,2022-04-06,2022-03-31,酒店+电梯+刷卡,10.0,12.456062,,"{100000091108791: '好的这个酒店上电梯要刷卡吗好的，谢谢', 100000..."
44,2022-04-06,2022-03-31,低风险+地区+星,8.0,8.806358,,{200000090946361: '江苏省低风险地区持48h核酸检测报告可以入住吗，没有星...


In [10]:
result_bigram.sample(5)

Unnamed: 0,org_date_end,org_date_start,gram,occurance,pmi,standard_Q,samples
44,2022-04-06,2022-03-31,电脑+配置,13.0,9.347762,,"{100000091258551: '什么配置啊电脑什么配置', 2000000909876..."
135,2022-04-06,2022-03-31,收到+短信,4.0,7.92245,,{400000090364769: '好的，谢谢短信还没收到不知道是不是手机设置过拒收陌生短...
5,2022-04-06,2022-03-31,隔音+效果,19.0,11.150379,,"{100000091157429: '隔音效果怎么样', 200000090916056: ..."
17,2022-04-06,2022-03-31,开具+水单,4.0,10.211485,我要开具水单,"{100000091275277: '可以开具水单吗', 400000090389118: ..."
124,2022-04-06,2022-03-31,厨房+做饭,10.0,7.998852,,"{400000090362497: '有没有厨房可以做饭呀', 40000009038246..."


In [11]:
df.sample(5)

Unnamed: 0,org_date_end,org_date_start,session_id,body,tokens_list,tokens,keywords
3606,2022-04-06,2022-03-31,100000091274542,在吗有房吗多少钱一间能三个人住吗,"[房, 钱, 住]",房 钱 住,"[钱, 房, 住]"
22744,2022-04-06,2022-03-31,600000090856685,您好昨天看到3晚套餐怎么没了？,"[看到, 套餐, 没]",看到 套餐 没,"[套餐, 看到, 没]"
14721,2022-04-06,2022-03-31,400000090447962,电子身份证能不能办理入住？,"[电子, 身份证, 办理, 入住]",电子 身份证 办理 入住,"[电子, 身份证, 办理]"
23678,2022-04-06,2022-03-31,700000091247504,请问一下我从机场过去住 码会黄吗明早还有飞机,"[机场, 住, 码会, 黄, 飞机]",机场 住 码会 黄 飞机,"[黄, 飞机, 机场, 住]"
4290,2022-04-06,2022-03-31,200000090943042,费用酒店服务和设施餐饮服务你好餐厅一桌的标准费用是多少呢,"[费用, 酒店, 服务, 设施, 餐饮, 服务, 餐厅, 标准, 费用]",费用 酒店 服务 设施 餐饮 服务 餐厅 标准 费用,"[费用, 服务, 餐饮, 标准, 餐厅]"


In [3]:
import os

t1 = time()


tokenized_tablename = 'tmp_htl_ai_db.tmp_fy_im_weekly_tokenized'
bigram_tablename = 'tmp_htl_ai_db.tmp_fy_im_weekly_bigrams'
trigram_tablename = 'tmp_htl_ai_db.tmp_fy_im_weekly_trigrams'

tokenized_filename = '../temp_data/tokenized.csv'
bigrams_result_df_path = '../temp_data/bigram.csv'
trigrams_result_df_path = '../temp_data/trigram.csv'

# 添加新数据到 hive 临时表
os.system(
 """hive -e "use tmp_htl_ai_db; 
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(tokenized_filename, tokenized_tablename)
)
os.system(
 """hive -e "use tmp_htl_ai_db; 
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(bigrams_result_df_path, bigram_tablename)
)
os.system(
 """hive -e "use tmp_htl_ai_db; 
 LOAD DATA LOCAL INPATH '{}' OVERWRITE INTO TABLE {};" """.format(trigrams_result_df_path, trigram_tablename)
)

t2 = time()
print(t2-t1)

57.96534466743469


In [8]:

import os
from datetime import date


today = date.today()
d = today.strftime("%Y-%m-%d")

adm_tokenized_tablename = 'htl_ai_db.adm_fy_im_weekly_tokenized'
adm_bigram_tablename = 'htl_ai_db.adm_fy_im_weekly_bigrams'
adm_trigram_tablename = 'htl_ai_db.adm_fy_im_weekly_trigrams'

print('临时表数据写入正式表：')

# os.system(
#  """hive -e "
#     insert into table htl_ai_db.adm_fy_im_weekly_trigrams partition(d='2022-04-15')
#     select * from tmp_htl_ai_db.tmp_fy_im_weekly_trigrams;
#  " """
# )

os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_bigram_tablename, d, bigram_tablename)
)

os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_trigram_tablename, d, trigram_tablename)
)
os.system(
 """hive -e "
    insert into table {} partition(d='{}')
    select * from {};
 " """.format(adm_tokenized_tablename, d, tokenized_tablename)
)



临时表数据写入正式表：


0

In [3]:
!python --version

Python 3.6.5 :: Anaconda, Inc.
