In [1]:
import os
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession 
from pyspark.conf import SparkConf
from pyspark.sql import HiveContext,SparkSession
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import udf, col, size
from pyspark.sql.types import ArrayType, StringType

sparkConf = SparkConf()
# 设置Driver进程的内存
sparkConf.set('spark.driver.memory', '8G')
# 设置Driver的CPU core数量
sparkConf.set('spark.driver.cores', '4')
# 设置Spark作业总共要用多少个Executor进程来执行
sparkConf.set("spark.executor.instances", "500")
# 设置每个Executor进程的CPU core数量
sparkConf.set("spark.executor.cores", "4")
# 设置每个Executor进程的内存
sparkConf.set("spark.executor.memory", "4g")
# 设置Spark应用的名称

sparkConf.set("spark.app.name", "bixiaoyu3")

spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
sc = spark.sparkContext

In [2]:
import re
import json

def get_text(text):
    match = []
    regex = '.*?([\u4E00-\u9FA5]+).*?'
    res = re.findall(regex, str(text))
    if res:
        match.append(res)
    
    return match

def get_cut_words(text):
    res = []
    try:
        for i in text:
            seg_list = jieba.cut(i)
            res.extend(seg_list)
    except:
        return res
    
    return res

## 原来的有点Bug，修改了
stop = open('hit_stopwords.txt', 'r+', encoding='utf-8')
stop_words = stop.read().split("\n") + [' '] + ['您好'] + ['你好'] + ['请求'] + ['文本']
stop.close()
def skip_stop_words(text):
    res =[]
    for word in text:
        if word not in stop_words:
            res.append(word)
            
    return res

In [7]:
def jieba_words_human(text, role):
    try:
        import jieba
        import re
        import json
        
        if role == 0:
            res_user = []
            for i in text:
                i = json.loads(i)
                if i['role'] == 'USER':
                    res_user.append(i['text'])
            
            res_user = get_text(res_user)
            res_user_cut = []
            try:
                for i in res_user[0]:
                    seg_list = jieba.cut(i)
                    res_user_cut.extend(seg_list)
            except:
                return []
            
            return (res_user_cut)
        
        if role == 1:
            res_agent = []

            for i in text:
                i = json.loads(i)
                if i['role'] == 'AGENT':
                    res_agent.append(i['text'])
        
        
            res_agent = get_text(res_agent)

            res_agent_cut = []

            try:
                for i in res_agent[0]:
                    seg_list = jieba.cut(i)
                    res_agent_cut.extend(seg_list)
            except:
                return []

            return (res_agent_cut)
           
    except Exception as e:
        return ['%s' % e]

spark.udf.register('jieba_words_human', jieba_words_human, ArrayType(StringType()))

<function __main__.jieba_words_human(text, role)>

In [8]:
df1 = spark.sql('''
select 
    jieba_words_human(text,0) as human_user_jieba,
    jieba_words_human(text,1) as human_agent_jieba
from 
    dmr_dev.bxy_overdue_table_with_call_text
Where
    call_type = 'human'
limit 100''')

In [9]:
df1

DataFrame[human_user_jieba: array<string>, human_agent_jieba: array<string>]

In [10]:
df1.show(10)

+--------------------+--------------------+
|    human_user_jieba|   human_agent_jieba|
+--------------------+--------------------+
|[Extra data: line...|[Extra data: line...|
|                  []|                  []|
|[Extra data: line...|[Extra data: line...|
|[Extra data: line...|[Extra data: line...|
|[Extra data: line...|[Extra data: line...|
|[Extra data: line...|[Extra data: line...|
|                  []|                  []|
|[Extra data: line...|[Extra data: line...|
|[Extra data: line...|[Extra data: line...|
|[Extra data: line...|[Extra data: line...|
+--------------------+--------------------+
only showing top 10 rows



In [7]:
def jieba_words_robot(text, role):
    try:
        import jieba
        import re
        import json
        
        if role == 0:
            res_user = []
            for i in text:
                i = json.loads(i)
                if i['role'] == 'VOICEIN_CONVERSATION':
                    res_user.append(i['text_info'])
            
            res_user = get_text(res_user)
            res_user_cut = []
            try:
                for i in res_user[0]:
                    seg_list = jieba.cut(i)
                    res_user_cut.extend(seg_list)
            except:
                return []
            
            return (res_user_cut)
        
        if role == 1:
            res_agent = []

            for i in text:
                i = json.loads(i)
                if i['role'] == 'TTS_CONVERSATION':
                    res_agent.append(i['text_info'])
        
        
            res_agent = get_text(res_agent)

            res_agent_cut = []

            try:
                for i in res_agent[0]:
                    seg_list = jieba.cut(i)
                    res_agent_cut.extend(seg_list)
            except:
                return []

            return (res_agent_cut)
           
    except Exception as e:
        return ['%s' % e]

spark.udf.register('jieba_words_robot', jieba_words_robot, ArrayType(StringType()))

<function __main__.jieba_words_robot(text, role)>

In [8]:
df2 = spark.sql('''
select 
    jieba_words_robot(text, 0) as robot_user_jieba,
    jieba_words_robot(text, 1) as robot_agent_jieba
from 
    dmr_dev.bxy_overdue_table_with_call_text
Where
    call_type = 'robot'
limit 100''')

In [9]:
df2

DataFrame[robot_user_jieba: array<string>, robot_agent_jieba: array<string>]

In [10]:
df2.show(10)

+--------------------+--------------------+
|    robot_user_jieba|   robot_agent_jieba|
+--------------------+--------------------+
|    [对, 嗯, 我, 知道, 了]|[请求, 文本, 您好, 我, 是...|
|                  []|[请求, 文本, 您好, 我, 是...|
|[对, 对, 对, 对, 对, 这...|[请求, 文本, 您好, 我, 是...|
|    [对, 啊, 哦, 好好, 好]|[请求, 文本, 您好, 我, 是...|
|[那样, 的, 问题, 嗯, 嗯,...|[请求, 文本, 您好, 我, 是...|
|                  []|[请求, 文本, 您好, 我, 是...|
|        [嗯, 是, 啊, 哦]|[请求, 文本, 您好, 我, 是...|
|         [客服, 你好, 喂]|[请求, 文本, 您好, 我, 是...|
|[嗯, 对, 哦, 好, 行, 我...|[请求, 文本, 您好, 我, 是...|
|[对, 呀, 没有, 嗯, 好, ...|[请求, 文本, 您好, 我, 是...|
+--------------------+--------------------+
only showing top 10 rows



In [11]:
##-----------

In [12]:
df = spark.sql('''
select 
    *,
    Case When call_type = 'human' then jieba_words_human(text, 0)
         When call_type = 'robot' then jieba_words_robot(text, 0) End as user_jieba_text,
    Case When call_type = 'human' then jieba_words_human(text, 1)
         When call_type = 'robot' then jieba_words_robot(text, 1) End as agent_jieba_text
from 
    dmr_dev.bxy_overdue_table_with_call_text
''')

In [13]:
df.show(50)

+-----------------+-------+---------+------------+------------------+------------------+-------------------+--------+----------+--------------------+----------+--------------------+--------------------+
|              pin|product|call_type|overdue_days|       overdue_amt|           cur_bal|          urge_time|duration|call_round|                text|        dt|     user_jieba_text|    agent_jieba_text|
+-----------------+-------+---------+------------+------------------+------------------+-------------------+--------+----------+--------------------+----------+--------------------+--------------------+
| jd_5c120056f89fb|   京东白条|    robot|           1|            184.59|            184.69|2020-03-25 12:15:44|    32.0|         4|[{"role": "TTS_CO...|2020-03-25|[哎, 你好, 哪位, 嗯, 有,...|[请求, 文本, 您好, 我, 是...|
|        laiweijin|   京东白条|    robot|           1|           1666.34|           4999.84|2020-04-07 13:11:57|    23.0|         3|[{"role": "TTS_CO...|2020-04-07|[哎, 你好, 嗯, 好, 好, ...|[请求, 文本

In [14]:
spark.sql('drop table if exists dmr_dev.bxy_overdue_table_with_call_text_jieba')
spark.sql('''
Create Table dmr_dev.bxy_overdue_table_with_call_text_jieba as
select 
    *,
    Case When call_type = 'human' then jieba_words_human(text, 0)
         When call_type = 'robot' then jieba_words_robot(text, 0) End as user_jieba_text,
    Case When call_type = 'human' then jieba_words_human(text, 1)
         When call_type = 'robot' then jieba_words_robot(text, 1) End as agent_jieba_text
from 
    dmr_dev.bxy_overdue_table_with_call_text''')

DataFrame[]

In [15]:
sc.applicationId

'application_1597575444192_2325529'