# 使用Perspective API分析文本毒性

注意：Perspective API 需要在国外服务器上运行，或通过代理访问。

In [1]:
import ast
import json
import os
import re
import threading
import warnings

import pandas as pd
from joblib import load, dump
# Suppress all Python warnings from here on.
warnings.filterwarnings('ignore')

# Show every column when a DataFrame is printed.
pd.set_option('display.max_columns', None)
# Show the full content of each cell instead of truncating it.
pd.set_option('display.max_colwidth', None)

## 分析

In [2]:
# Load the users' tweet data and print a summary of its columns.
user_texts = pd.read_csv("data/user_texts[restrict=3media+retweet][topic=POTUS2020].csv")
user_texts.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 385956 entries, 0 to 385955
Data columns (total 2 columns):
 #   Column           Non-Null Count   Dtype 
---  ------           --------------   ----- 
 0   author.username  385956 non-null  object
 1   text             385955 non-null  object
dtypes: object(2)
memory usage: 5.9+ MB


In [3]:
# 访问API的函数体
from googleapiclient import discovery

# Per-thread storage for the API client. The client is cached per thread rather
# than shared because google-api-python-client documents its httplib2-based
# clients as not thread-safe, and this function is called from a thread pool.
_perspective_clients = threading.local()


def perspective_analyze(text):
    """Request a TOXICITY score for ``text`` from the Perspective API.

    The API client is built the first time a thread calls this function and is
    reused by that thread afterwards, instead of being rebuilt on every call
    (with ``static_discovery=False`` each build presumably retrieves the
    discovery document from ``discoveryServiceUrl`` again).

    The API key is read from the ``PERSPECTIVE_API_KEY`` environment variable
    when it is set; otherwise the key that was previously hard-coded here is
    used, so existing runs keep working.

    Args:
        text: The comment text to analyse.

    Returns:
        Whatever ``client.comments().analyze(...).execute()`` returns for the
        request (the API response).

    Raises:
        Any exception raised while building the client or executing the
        request is propagated to the caller.
    """
    client = getattr(_perspective_clients, 'client', None)
    if client is None:
        # NOTE(review): a credential committed to source is a leak. Prefer the
        # environment variable, rotate this key, and drop the literal fallback.
        api_key = os.environ.get(
            'PERSPECTIVE_API_KEY',
            'AIzaSyAPlZbUYD2pYxmz_CsDPUe5vNIQFiGlxS0',
        )
        client = discovery.build(
            "commentanalyzer",
            "v1alpha1",
            developerKey=api_key,
            discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
            static_discovery=False,
        )
        _perspective_clients.client = client

    # Only the TOXICITY attribute is requested, with default options.
    analyze_request = {
        'comment': {'text': text},
        'requestedAttributes': {
            'TOXICITY': {}
        }
    }

    return client.comments().analyze(body=analyze_request).execute()

### 并行处理

In [4]:
# Bind df to the same DataFrame object as user_texts (no copy is made), so
# columns added to df later are added to user_texts as well.
df = user_texts

In [5]:
from concurrent.futures import ThreadPoolExecutor

def analyze_text(x):
    """Best-effort wrapper around ``perspective_analyze``.

    Args:
        x: The text to analyse.

    Returns:
        The result of ``perspective_analyze(x)``, or ``None`` when the call
        raises an ordinary exception, so that one failing text does not abort
        a whole batch.
    """
    try:
        return perspective_analyze(x)
    except Exception:
        # Deliberately best-effort, but unlike a bare ``except:`` this lets
        # KeyboardInterrupt and SystemExit through so a long run can still be
        # interrupted.
        return None

def parallel_analyze(data, func, n_workers=4):
    """Apply ``func`` to each item of ``data`` using a pool of threads.

    Args:
        data: Iterable of inputs; each item is passed to ``func`` on its own.
        func: Callable taking a single item.
        n_workers: Maximum number of worker threads in the pool.

    Returns:
        A list of the values returned by ``func``, in the same order as the
        items of ``data``.
    """
    pool = ThreadPoolExecutor(max_workers=n_workers)
    with pool:
        # Executor.map yields results in input order; draining it here means
        # every call has finished before the pool is shut down.
        collected = [outcome for outcome in pool.map(func, data)]
    return collected

# Usage:
# results = parallel_analyze(list(df['text'].iloc[i:i+chunk]), analyze_text, n_workers=4)

In [None]:
# Run the Perspective API analysis over df in batches, using a thread pool.
begin = 0   # row label to start from
chunk = 500   # number of rows handled per batch
chunk_10 = chunk * 10   # start a new progress line every 10 batches
# NOTE: this resets the result column, including any results already stored in df.
df['perspective_api_results'] = ''
result_file = "data/perspective_api_results[restrict=3media+retweet][topic=POTUS2020].csv"

for i in range(begin, df.shape[0], chunk):
    # .loc slices by label and includes BOTH endpoints, so the batch must end
    # at i + chunk - 1. Slicing to i + chunk selected chunk + 1 rows and sent
    # each boundary row to the API twice. (Relies on df having the default
    # 0..n-1 integer index, as read_csv produces.)
    last = i + chunk - 1
    tmp = parallel_analyze(list(df.loc[i:last, :]['text']), analyze_text, n_workers=4)   # thread count
    df.loc[i:last, ['perspective_api_results']] = tmp
    # Checkpoint after every batch so an interrupted run keeps its results.
    df.to_csv(result_file, index=False)
    print(i, end=' ')
    if (i % chunk_10) == 0:
        print('\n', end='')

# Final save.
df.to_csv(result_file, index=False)

### 串行处理（一般用不到）

In [None]:
# Half-processed state: rebind df to user_texts and display the username and
# result columns (the result column must already exist on user_texts).
df = user_texts
df.loc[:, ['author.username','perspective_api_results']]

In [None]:
# Analyse every row with the API, one request at a time.
chunk_size = 100    # save the file after every 100 rows
results_file = "data/1-user_texts_with_perspective_api_results[restrict=retweet+3media][topic=POTUS2020].csv"
# NOTE: this resets the result column, including any results already stored in df.
df['perspective_api_results'] = ''
error_count = 0
for i in df.index:
    try:
        result = perspective_analyze(df.at[i, 'text'])
    except Exception:
        # Best-effort: count the failure and leave this row's result empty.
        # (A bare ``except:`` would also swallow KeyboardInterrupt.)
        error_count += 1
    else:
        # Write through df.at rather than the chained df[col][i] = ..., which
        # may assign to a temporary copy and leave df unchanged.
        df.at[i, 'perspective_api_results'] = result
    if i != 0 and i % chunk_size == 0:
        print(f"number={i}, error_count={error_count}")
        df.to_csv(results_file, index=False)

# Final save.
df.to_csv(results_file, index=False)

## 处理分析结果（用不到）

In [7]:
# Reload the saved results from result_file and print a summary of the columns.
res = pd.read_csv(result_file)
res.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 87293 entries, 0 to 87292
Data columns (total 3 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   author.username          87293 non-null  object
 1   text                     87293 non-null  object
 2   perspective_api_results  87285 non-null  object
dtypes: object(3)
memory usage: 2.0+ MB


In [None]:
# Display the result column for 8 randomly sampled rows.
res.sample(8).loc[:, ['perspective_api_results']]

In [5]:
# 将JSON展开的函数
def json_to_df(x):
    """Flatten one stored API result string into a one-row DataFrame.

    Args:
        x: The result as text: either JSON, or the ``str()`` form of a Python
            dict (single-quoted), which is how a dict ends up in a CSV cell.

    Returns:
        A one-row DataFrame with the columns ``languages``,
        ``detectedLanguages``, ``TOXICITY.summaryScore.value``,
        ``TOXICITY.summaryScore.type``, ``TOXICITY.spanScore.begin``,
        ``TOXICITY.spanScore.end``, ``TOXICITY.spanScore.value`` and
        ``TOXICITY.spanScore.type``.

    Raises:
        ValueError: If ``x`` cannot be parsed, or the result does not contain
            exactly one TOXICITY span score.
        KeyError: If an expected field is missing from the result.
    """
    # Parse as JSON first; fall back to a Python literal for the single-quoted
    # dict form. Blindly replacing every ' with " would corrupt any value that
    # itself contains an apostrophe.
    try:
        record = json.loads(x)
    except ValueError:
        record = ast.literal_eval(x)

    # Unwrap single-element language lists into the bare element; a longer
    # list is kept as it is so that nothing is dropped.
    for key in ('languages', 'detectedLanguages'):
        values = record[key]
        if len(values) == 1:
            record[key] = values[0]

    # Take the span scores out so they can be flattened into their own columns.
    spans = record['attributeScores']['TOXICITY'].pop('spanScores')
    if len(spans) != 1:
        raise ValueError(
            f"expected exactly one TOXICITY span score, got {len(spans)}"
        )

    dff = pd.concat(
        [pd.json_normalize(record), pd.json_normalize(spans[0])],
        axis=1,
    )

    # Pick columns by their flattened source name (not by position), so the
    # output labels cannot be mismatched if the input's key order differs.
    column_names = {
        'languages': 'languages',
        'detectedLanguages': 'detectedLanguages',
        'attributeScores.TOXICITY.summaryScore.value': 'TOXICITY.summaryScore.value',
        'attributeScores.TOXICITY.summaryScore.type': 'TOXICITY.summaryScore.type',
        'begin': 'TOXICITY.spanScore.begin',
        'end': 'TOXICITY.spanScore.end',
        'score.value': 'TOXICITY.spanScore.value',
        'score.type': 'TOXICITY.spanScore.type',
    }
    dff = dff[list(column_names)]
    dff.columns = list(column_names.values())
    return dff