In [1]:
import re
from googleapiclient import discovery
import time
import json
from tqdm.notebook import tqdm
import multiprocessing as mp
import pickle
import os
import nltk
import gzip
import hashlib
import pandas as pd
from datetime import date
import unicodedata
from collections import defaultdict
import numpy as np
from collections import Counter
import os
from datetime import date

# Download C4-en dataset (305GB)

Only run the following command to download if you have sufficient storage for the C4-en dataset, which is 305GB.\
If not, we provide a dataset of the extracted descriptor sentences in C4-en which is 11GB in '/supporting_datasets/c4_sentences_w_descriptors'.

In [None]:
!GIT_LFS_SKIP_SMUDGE=1 git clone https://huggingface.co/datasets/allenai/c4
!cd c4
!git lfs pull --include "en/*"

# Download BeanCounter dataset (train + val = 170GB)

Only run the following command to download if you have sufficient storage\
-- train split: 142 GB \
-- val split: 25MB \
-- sample split: 1.5GB \
-- deduped split: 60GB \
-- fraud split: 411MB \
You can choose to download a specific split by using --include "split_name/*"

In [None]:
!GIT_LFS_SKIP_SMUDGE=1 git clone https://huggingface.co/datasets/bradfordlevy/BeanCounter
!cd BeanCounter
!git lfs pull --include "sample/*"

# Download extracted descriptor sentences dataset

If you want to explore the extracted sentences from BeanCounter and C4 with demographic descriptors, you can download the dataset with the commands below \
-- default split \
-- bc-clean split \
-- c4-en split \
-- sample split \
You can choose to download a specific split by using --include "split_name/*"

In [None]:
!GIT_LFS_SKIP_SMUDGE=1 git clone https://huggingface.co/datasets/bradfordlevy/BeanCounter-Descriptor-Sents
!cd BeanCounter-Descriptor_sents
!git lfs pull --include "sample/*"

# Set up necessary directories & Perspective API key

A Perspective API key is required only if you want to compute toxicity scores for the texts.

In [None]:
# --- Roots -------------------------------------------------------------------
SUPPORTING_DATA_DIR = '/supporting_datasets'
C4_DIR = 'c4/en'

# --- BeanCounter inputs ------------------------------------------------------
BC_DATASET_DIR = 'beancounter'
TRAIN_SPLIT = 'train'
VAL_SPLIT = 'validation'

# --- Extracted descriptor sentences ------------------------------------------
BC_EXTRACTED_SENTENCES_DIR = 'beancounter_extracted_descriptor_sentences' # change directory name to where sentences are stored
C4_EXTRACTED_SENTENCES_DIR = 'c4_sentences_w_descriptors' # change directory name to where sentences are stored

# --- Batched sentences -------------------------------------------------------
BC_SENTENCE_BATCHES_DIR = 'beancounter_sentences_batches'
C4_SENTENCE_BATCHES_DIR = 'c4_sentence_batches'
C4_PERSPECTIVE_SENTENCE_BATCHES_DIR = 'c4_perspective_sentence_batches'

# --- Derived artefacts -------------------------------------------------------
DESCRIPTOR_TO_COUNT_CSV = 'beancounter_descriptor_count.csv'
BC_PERSPECTIVE_SCORES = 'beancounter_perspective_scores'
C4_PERSPECTIVE_SCORES = 'c4_perspective_scores'

# ISO date (YYYY-MM-DD) appended to output directory names.
current_date = date.today().isoformat()

In [13]:
# Perspective API key; set up Perspective API here: https://developers.perspectiveapi.com/s/docs-get-started?language=en_US
# Read from the PERSPECTIVE_API_KEY environment variable so the secret is never saved inside the notebook.
# Falls back to an empty string (the previous placeholder) when the variable is not set.
PERSPECTIVE_API_KEY = os.environ.get('PERSPECTIVE_API_KEY', '')
PERSPECTIVE_API_QPS = 20 # default is 1 query per second (QPS), you can increase the QPS by submitting a request via: https://developers.perspectiveapi.com/s/request-quota-increase?language=en_US

# Load all demographic descriptors

In [34]:
# Parse the descriptor list stored next to the notebook.
with open('demographic_descriptors.json', 'r') as descriptor_file:
    tox_analysis_descriptors = json.loads(descriptor_file.read())

# Load broad categories associated with each descriptor

In [35]:
# Parse the descriptor -> category mapping stored next to the notebook.
with open('descriptors_to_categories.json', 'r') as category_file:
    desc2category = json.loads(category_file.read())

# Create dictionary mapping a variant descriptor to the descriptor's original form

In [36]:
# Map every descriptor, plus its hyphen/space variant when it has one, back to the descriptor itself.
variation2original = {}
for original in tox_analysis_descriptors:
    variant = None
    if '-' in original:
        # hyphenated descriptor: also recognise the spaced spelling
        variant = original.replace('-', ' ')
    elif ' ' in original:
        # spaced descriptor: also recognise the hyphenated spelling
        variant = original.replace(' ', '-')
    if variant is not None:
        variation2original[variant] = original
    # written last, so the identity mapping wins if a variant equals the descriptor
    variation2original[original] = original

# Utils for extracting sentences with descriptors

In [40]:
# Make sure the Punkt sentence-tokenizer model is available before loading it.
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    # nltk.data.find raises LookupError when the resource is missing; fetch it instead of failing the cell.
    nltk.download("punkt")
sentence_tokenizer = nltk.data.load("nltk:tokenizers/punkt/english.pickle")

In [41]:
def get_sentences_w_descriptors(cleaned_text, sentence_tokenizer=sentence_tokenizer, tox_analysis_descriptors = tox_analysis_descriptors):
    """
    Find the sentences of `cleaned_text` that contain a descriptor.

    parameters:
        cleaned_text (str): text to scan; it is split into lines, and each line into sentences.
        sentence_tokenizer: object whose `tokenize(str)` returns a list of sentences.
        tox_analysis_descriptors (iterable of str): descriptors, joined with `|` into one regex alternation.
            NOTE(review): entries are inserted into the pattern unescaped, so regex metacharacters in a
            descriptor (e.g. `+`) act as regex syntax -- confirm this is intended.
    returns: defaultdict(set) mapping the matched descriptor text to the set of sentences containing it.

    A descriptor only matches when the characters on both sides are whitespace or non-word characters.
    Matching is case-sensitive.
    """
    tox_desc = '|'.join(tox_analysis_descriptors)
    # Raw literals: `\w` and `\s` are regex classes, not Python string escapes.
    # The lookahead makes matches zero-width, so overlapping occurrences are all reported.
    re_desc = re.compile(r"(?=(?:([^\w\s]|\s)(" + tox_desc + r")([^\w\s]|\s)))")

    sentences_tbp = defaultdict(set)
    for line in cleaned_text.splitlines():
        for sent in sentence_tokenizer.tokenize(line):
            # A leading space lets a descriptor at the very start of the sentence satisfy the left delimiter.
            # NOTE(review): no trailing delimiter is added, so a descriptor that ends the sentence with
            # no punctuation after it is not matched -- confirm this is intended.
            for m in re_desc.findall(' ' + sent):
                sentences_tbp[m[1]].add(sent)

    return sentences_tbp

# Extracting sentences containing descriptors from BeanCounter

In [126]:
# get all files containing the BeanCounter dataset
# (train shards first, then validation shards; only *.gz files are kept)
files_to_process = []
for split in (TRAIN_SPLIT, VAL_SPLIT):
    split_root = os.path.join(SUPPORTING_DATA_DIR, BC_DATASET_DIR, split)
    for path, dirs, fns in os.walk(split_root):
        gz_names = [fn for fn in fns if os.path.splitext(fn)[-1] == '.gz']
        files_to_process.extend(os.path.join(path, fn) for fn in gz_names)

In [129]:
# define output directory for extracted sentences (date-stamped, under the supporting-data root)
bc_extracted_dir_name = f"{BC_EXTRACTED_SENTENCES_DIR}_{current_date}"
out_dir = os.path.join(SUPPORTING_DATA_DIR, bc_extracted_dir_name)
os.makedirs(out_dir, exist_ok=True)

In [136]:
def work(path, out_dir = out_dir):
    """
    Extract descriptor sentences from one gzipped JSON-lines BeanCounter shard.

    parameters:
        path (str): '/'-separated path to the input shard; its parent directory name is reused as the
            output sub-directory and its file name as the output file name.
        out_dir (str): root directory for the extracted sentences.
    returns: False if the output file already exists (shard skipped), True once the shard was written.

    One JSON line is written per (descriptor, sentence) pair found in a filing's 'text'.
    Records are streamed to a temporary file that is renamed onto the final name only after the whole
    shard is processed, so an interrupted run never leaves a partial file that a later run would skip.
    """
    force = False
    path_out = os.path.join(out_dir, path.split('/')[-2])
    fn = path.rsplit('/', 1)[-1]
    final_path = os.path.join(path_out, fn)

    if os.path.exists(final_path) and not force:
        return False

    os.makedirs(path_out, exist_ok = True)

    # write .lock file; files that did not finish processing will have .lock extension.
    lock_file = os.path.join(path_out, fn.rsplit('.', 1)[0])+'.lock'
    open(lock_file, 'w').close()

    # The '.tmp' suffix keeps unfinished output away from the '.gz' filter used by later cells.
    tmp_path = final_path + '.tmp'
    with gzip.open(path, 'r') as filings, gzip.open(tmp_path, 'w') as out_file:
        # Iterate the file lazily instead of loading every line (and every record) into memory.
        for line in filings:
            info = json.loads(line)
            desc2sentences = get_sentences_w_descriptors(info['text'])

            for descriptor, sentences in desc2sentences.items():
                for sent in sentences:
                    desc_dict = {
                        'accession': info['accession'],
                        'filename': info['filename'],
                        'type_filing': info['type_filing'],
                        'type_attachment': info['type_attachment'],
                        'form_type': info['form_type'],
                        # keep only the date part of the acceptance timestamp
                        'date': info['ts_accept'].split('T')[0],
                        'descriptor': descriptor,
                        'sentence': sent,
                    }
                    out_file.write((json.dumps(desc_dict) + '\n').encode())

    # Publish the finished file atomically, then clear the in-progress marker.
    os.replace(tmp_path, final_path)
    os.remove(lock_file)
    return True

In [138]:
# Fan the shards out over worker processes; results come back in completion order.
with mp.Pool(16) as p: # set workers as 8 or 16
    completed = p.imap_unordered(work, files_to_process)
    results = list(tqdm(completed, total=len(files_to_process), miniters=1))

  0%|          | 0/220 [00:00<?, ?it/s]

# Extracting sentences containing descriptors from C4-en

In [134]:
# Collect every *.gz shard below the C4-en directory.
files_to_process = []
for path, dirs, fns in os.walk(os.path.join(C4_DIR)):
    gz_names = [fn for fn in fns if os.path.splitext(fn)[-1] == '.gz']
    files_to_process.extend(os.path.join(path, fn) for fn in gz_names)

In [108]:
# Date-stamped output directory for the sentences extracted from C4.
c4_extracted_dir_name = f"{C4_EXTRACTED_SENTENCES_DIR}_{current_date}"
out_dir = os.path.join(SUPPORTING_DATA_DIR, c4_extracted_dir_name)
os.makedirs(out_dir, exist_ok=True)

In [146]:
def work(path, out_dir = out_dir):
    """
    Extract descriptor sentences from one gzipped JSON-lines C4 shard.

    parameters:
        path (str): '/'-separated path to the input shard; its file name is reused for the output.
        out_dir (str): directory receiving the extracted sentences.
    returns: False if the output file already exists (shard skipped), True once the shard was written.

    One JSON line ('url', 'timestamp', 'descriptor', 'sentence') is written per (descriptor, sentence)
    pair found in an entry's 'text'. Records are streamed to a temporary file that is renamed onto the
    final name only after the whole shard is processed, so an interrupted run never leaves a partial
    file that a later run would skip.
    """
    force = False
    fn = path.rsplit('/', 1)[1]
    final_path = os.path.join(out_dir, fn)

    if os.path.exists(final_path) and not force:
        return False

    # write .lock file; files that did not finish processing will have .lock extension.
    lock_file = os.path.join(out_dir, fn.rsplit('.', 1)[0])+'.lock'
    open(lock_file, 'w').close()

    # The '.tmp' suffix keeps unfinished output away from the '.gz' filter used by later cells.
    tmp_path = final_path + '.tmp'
    with gzip.open(path, 'r') as entries, gzip.open(tmp_path, 'w') as out_file:
        for entry in entries:
            info = json.loads(entry)
            desc2sentences = get_sentences_w_descriptors(info['text'])

            for descriptor, sentences in desc2sentences.items():
                for sent in sentences:
                    desc_dict = {
                        'url': info['url'],
                        'timestamp': info['timestamp'],
                        'descriptor': descriptor,
                        'sentence': sent,
                    }
                    # Write as we go instead of collecting the whole shard's records in memory.
                    out_file.write((json.dumps(desc_dict) + '\n').encode())

    # Publish the finished file atomically, then clear the in-progress marker.
    os.replace(tmp_path, final_path)
    os.remove(lock_file)

    return True

In [None]:
# Fan the C4 shards out over 16 worker processes; results come back in completion order.
with mp.Pool(16) as p:
    completed = p.imap_unordered(work, files_to_process)
    results = list(tqdm(completed, total=len(files_to_process), miniters=1))

  0%|          | 0/1032 [00:00<?, ?it/s]

# Separate BeanCounter sentences into batches

In [140]:
# Root directory that is scanned for extracted BeanCounter descriptor sentences.
# NOTE(review): unlike the extraction output directory above, this path has no `_{current_date}` suffix;
# confirm it points at the data you intend to batch.
observations_path = os.path.join(SUPPORTING_DATA_DIR, BC_EXTRACTED_SENTENCES_DIR)
def get_entries_generator(root_directory):
    """
    Lazily yield every JSON record stored in the *.gz files below `root_directory`.

    parameters: root_directory (str) directory walked recursively with os.walk
    yields: one parsed JSON object per line of each gzip file; files with other extensions are ignored
    """
    for root, dirs, files in os.walk(root_directory):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            if os.path.splitext(file_path)[-1] != '.gz':
                continue
            with gzip.open(file_path, 'r') as filings:
                # Stream line by line; readlines() would load the whole decompressed shard into memory.
                for filing in filings:
                    yield json.loads(filing)

In [141]:
# Single-use generator: re-run this cell before iterating over the records again.
all_filings_generator = get_entries_generator(root_directory=observations_path)

In [150]:
# directory for batched sentences (date-stamped, under the supporting-data root)
bc_batches_dir_name = f"{BC_SENTENCE_BATCHES_DIR}_{current_date}"
out_dir = os.path.join(SUPPORTING_DATA_DIR, bc_batches_dir_name)
os.makedirs(out_dir, exist_ok=True)

In [149]:
# create batches of 10_000 sentences. Remove duplicate sentences.
# NOTE(review): duplicates are detected on the sentence hash alone, so only the first
# (descriptor, sentence) row seen for a given sentence is kept -- confirm this is intended.
seen = set()
batch_size = 10_000
batch = []
batch_num = 0

for filing in tqdm(all_filings_generator):
    text_hash = hashlib.md5(filing['sentence'].encode()).hexdigest()
    record = {
        'accession': filing['accession'],
        'filename': filing['filename'],
        'descriptor': filing['descriptor'],
        'sentence': filing['sentence'],
        'text_hash': text_hash,
        'date': filing['date'],
        'form_type': filing['form_type'],
        'type_attachment': filing['type_attachment'],
        'type_filing': filing['type_filing'],
    }
    if text_hash in seen:
        continue
    seen.add(text_hash)

    if len(batch) >= batch_size:
        # the current batch is full: flush it and start a new one with this record
        with gzip.open(os.path.join(out_dir, 'batch_%s.gz' % batch_num), 'w') as out_file:
            for row in batch:
                out_file.write((json.dumps(row) + '\n').encode())
        batch_num += 1
        batch = []
    batch.append(record)

# flush whatever is left after the last full batch
with gzip.open(os.path.join(out_dir, 'batch_%s.gz' % batch_num), 'w') as out_file:
    for row in batch:
        out_file.write((json.dumps(row) + '\n').encode())

# Perspective analysis of BeanCounter sentences

Running this analysis on a QPS of 20 with 19 workers in multiprocessing took ~5 days. 

In [5]:
def get_batches(root_directory):
    """
    List the *.gz files found below `root_directory`.

    parameters: root_directory (str) directory walked recursively with os.walk
    returns: list of file paths, in os.walk order
    """
    return [
        os.path.join(root, file_name)
        for root, _dirs, files in os.walk(root_directory)
        for file_name in files
        if os.path.splitext(os.path.join(root, file_name))[-1] == '.gz'
    ]

In [6]:
# Paths of the date-stamped BeanCounter sentence batches to score.
bc_batches_root = os.path.join(SUPPORTING_DATA_DIR, f'{BC_SENTENCE_BATCHES_DIR}_{current_date}')
batch_paths = get_batches(bc_batches_root)

In [7]:
def chunking(cleaned_text, max_char_length):
    """
    Split `cleaned_text` into chunks of at most `max_char_length` characters.

    parameters: cleaned_text (str), max_char_length (int) maximum chunk length in characters
    returns: list of chunks. Text that already fits is returned as a single chunk. Otherwise whole lines
        are packed together (joined with a newline) while they fit, and a single line longer than the
        limit is cut into fixed-size slices.
    """
    if len(cleaned_text) <= max_char_length:
        return [cleaned_text]

    def _split_oversized(piece):
        # Cut a piece that is longer than the limit into fixed-size slices.
        if len(piece) <= max_char_length:
            return [piece]
        return [piece[i:i + max_char_length] for i in range(0, len(piece), max_char_length)]

    lines = iter(cleaned_text.splitlines())
    chunks, current = [], next(lines)
    for line in lines:
        # The +1 counts the newline used to join the lines. Without it a chunk could grow to
        # max_char_length + 1 and then be cut into a full slice plus a stray one-character slice.
        if len(current) + 1 + len(line) > max_char_length:
            chunks.extend(_split_oversized(current))
            current = line
        else:
            current += '\n' + line

    # flush the last pending chunk
    chunks.extend(_split_oversized(current))
    return chunks

In [8]:
max_char_length = 18_000 # perspective api takes at most 20KB
def get_perspective_toxicity(text, client, max_char_length = max_char_length):
    """
    Request a TOXICITY score for every non-blank chunk of `text`.

    parameters: text (str), client (perspective api client), max_char_length (int) chunk size given to chunking
    returns: dictionary mapping each stripped chunk to its TOXICITY summary score
    """
    chunks2score = {}

    for piece in chunking(text, max_char_length):
        stripped = piece.strip()
        if not stripped:
            # whitespace-only chunks are never sent to the API
            continue

        start = time.time()
        analyze_request = {
            'comment': {'text': stripped},
            'languages': ['en'],
            'requestedAttributes': {'TOXICITY': {}}
        }
        response = client.comments().analyze(body=analyze_request).execute()
        elapsed = time.time() - start
        # pad each request to at least one second so this caller issues at most one query per second
        time.sleep(max(1 - elapsed, 0))
        chunks2score[stripped] = response['attributeScores']['TOXICITY']['summaryScore']['value']

    return chunks2score

In [9]:
# Perspective client of the current process; created lazily by work().
_perspective_client = None

def work(sentence):
    """
    Score one sentence with the Perspective API.

    parameters: sentence (str)
    returns: {md5 hex digest of the sentence: toxicity score}, or an empty dict when the sentence has no
        non-blank chunk to score. If the sentence is split into several chunks, only the first chunk's
        score is kept.
    """
    global _perspective_client
    # Build the client inside the worker process (not in the parent), but only once per process:
    # each build with static_discovery=False fetches the discovery document again.
    if _perspective_client is None:
        _perspective_client = discovery.build(
            "commentanalyzer",
            "v1alpha1",
            developerKey=PERSPECTIVE_API_KEY,
            discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
            static_discovery=False,
            )

    sentence_hash = hashlib.md5(sentence.encode()).hexdigest()
    scores = list(get_perspective_toxicity(sentence, _perspective_client).values())
    if not scores:
        # Nothing was scored (blank sentence); avoid an IndexError that would abort the whole pool.
        return {}
    return {sentence_hash: scores[0]}

In [10]:
# Date-stamped directory receiving one pickle of Perspective scores per BeanCounter batch.
bc_scores_dir_name = f'{BC_PERSPECTIVE_SCORES}_{current_date}'
out_dir = os.path.join(SUPPORTING_DATA_DIR, bc_scores_dir_name)
os.makedirs(out_dir, exist_ok=True)

In [11]:
def process_toxicity_scores(batch_paths, out_dir = out_dir):
    """
    Score every sentence of every batch file and store the results as one pickle per batch.

    parameters:
        batch_paths (list of str): '/'-separated paths to gzipped JSON-lines batches whose records carry
            'sentence' and 'text_hash'.
        out_dir (str): directory for the '<batch name>.pkl' result files.
    returns: None

    An existing result pickle is loaded first, and only sentences whose hash is not in it are sent to the
    API. Sentences larger than 20,000 UTF-8 bytes are skipped. If the first pool run fails, the batch is
    retried once with half the workers.
    """
    for batch in tqdm(batch_paths):
        res_path = os.path.join(out_dir, batch.rsplit('/', 1)[1].split('.')[0] + '.pkl')

        if os.path.exists(res_path):
            # NOTE: pickle.load can execute code stored in the file; only load pickles this notebook wrote.
            with open(res_path, 'rb') as res_file:
                res_dict = pickle.load(res_file)
        else:
            res_dict = dict()

        # text hash -> sentence (the dict also removes duplicate hashes inside the batch)
        all_sent2hash= dict()

        with gzip.open(batch, 'r') as batch_n:
            for sent_info in batch_n:
                info = json.loads(sent_info)
                sent = info['sentence']
                if len(sent.encode("utf8")) > 20_000:
                    continue
                all_sent2hash[info['text_hash']] = sent

        to_process = [sent for sent_hash, sent in all_sent2hash.items() if sent_hash not in res_dict]
        if len(to_process) == 0:
            continue

        # Stay one worker below the quota, but never ask for an empty pool (QPS of 1 would give 0 workers).
        n_workers = max(1, PERSPECTIVE_API_QPS - 1)
        try:
            with mp.Pool(n_workers) as p:
                results = [r for r in tqdm(p.imap_unordered(work, to_process), total=len(to_process), miniters=1)]
        except Exception:
            # Retry once with fewer workers; Exception (not a bare except) lets Ctrl-C stop the run.
            n_workers = max(1, n_workers // 2)
            with mp.Pool(n_workers) as p:
                results = [r for r in tqdm(p.imap_unordered(work, to_process), total=len(to_process), miniters=1)]

        for r in results:
            if isinstance(r, dict):
                res_dict.update(r)

        with open(res_path, 'wb') as out_file:
            pickle.dump(res_dict, out_file)

In [16]:
# Score all BeanCounter batches; batches whose sentences are all already scored are skipped.
process_toxicity_scores(batch_paths=batch_paths)

  0%|          | 0/817 [00:00<?, ?it/s]

# Get descriptors and their counts from BeanCounter in order to sample C4-en

In [18]:
# Get scores from BeanCounter's perspective analysis
bc_scores_root = os.path.join(SUPPORTING_DATA_DIR, f'{BC_PERSPECTIVE_SCORES}_{current_date}') # change to correct path if necessary
scores_tbp = [
    os.path.join(path, fn)
    for path, dirs, fns in os.walk(bc_scores_root)
    for fn in fns
    if os.path.splitext(fn)[-1] == '.pkl'
]

# One two-column frame (text_hash, toxicity_score) per result pickle.
list_score_dfs = []
for path in tqdm(scores_tbp):
    with open(path, 'rb') as out_file:
        hash2score = pickle.load(out_file)
    score_df = pd.DataFrame(hash2score.items()).rename(columns={0: 'text_hash', 1: 'toxicity_score'})
    list_score_dfs.append(score_df)

all_scores_updated = pd.concat(list_score_dfs)

  0%|          | 0/817 [00:00<?, ?it/s]

In [21]:
# Get all extracted sentences from BeanCounter
bc_batches_root = os.path.join(SUPPORTING_DATA_DIR, f'{BC_SENTENCE_BATCHES_DIR}_{current_date}') # change to correct path if necessary
files_to_process = [
    os.path.join(path, fn)
    for path, dirs, fns in os.walk(bc_batches_root)
    for fn in fns
    if os.path.splitext(fn)[-1] == '.gz'
]

# One frame per batch file, stacked afterwards.
list_dfs = []
for path in tqdm(files_to_process):
    with gzip.open(path, 'r') as out_file:
        list_dfs.append(pd.read_json(out_file, lines=True, orient='records'))

all_sents_df = pd.concat(list_dfs)

  0%|          | 0/817 [00:00<?, ?it/s]

In [23]:
# if a sentence contains multiple descriptors, only keep the row with the longest descriptor
all_sents_df['desc_len'] = all_sents_df['descriptor'].str.len()
longest_first = all_sents_df.sort_values(by='desc_len', ascending=False)
all_sents_df = longest_first.groupby('text_hash').head(1)

In [24]:
# Attach sentence metadata to every score; scores without a matching sentence keep NaN metadata.
sent2score = pd.merge(all_scores_updated, all_sents_df, on='text_hash', how='left')

## Descriptor mapped to number of sentences

In [28]:
# limit analysis to years 1996-2023
# Drop incomplete rows first: after the left merge, rows without a matching sentence have a missing
# 'date', and parsing the year from a missing value raises ValueError.
sent2score = sent2score.dropna()
sent2score = sent2score.assign(year=sent2score['date'].apply(lambda x: int(str(x).split('-')[0])))
sent2score = sent2score.loc[(sent2score['year'] >= 1996) & (sent2score['year'] <= 2023)]

In [37]:
# Number of scored sentences per descriptor, largest first.
desc2count_sample = (
    pd.DataFrame(sent2score.groupby('descriptor')['text_hash'].count())
    .reset_index()
    .rename(columns={'text_hash': 'num_unique_sentences'})
    .sort_values(by='num_unique_sentences', ascending=False)
)
# Map spelling variants back to the descriptor they came from; unknown values map to themselves.
desc2count_sample['original_descriptor'] = desc2count_sample['descriptor'].apply(
    lambda x: variation2original.get(x, x)
)

In [42]:
# Date-stamped file name for the descriptor-count table written in the next cell.
# NOTE(review): the sampling section reads DESCRIPTOR_TO_COUNT_CSV, which has no date suffix -- confirm
# which file is meant to be consumed there.
BC_DESCRIPTOR_COUNT_CSV = f'beancounter_descriptor_count_{current_date}.csv'

In [43]:
# Save the descriptor counts (the index is written as an unnamed first column).
bc_descriptor_count_path = os.path.join(SUPPORTING_DATA_DIR, BC_DESCRIPTOR_COUNT_CSV)
desc2count_sample.to_csv(bc_descriptor_count_path)

# Batch C4 sentences (in preparation for sampling)

In [2]:
# Root directory scanned for extracted C4 descriptor sentences (date-stamped directory name).
obs_path = os.path.join(SUPPORTING_DATA_DIR, f'{C4_EXTRACTED_SENTENCES_DIR}_{current_date}')
def get_entries_generator(root_directory):
    """
    Lazily yield every JSON record stored in the *.gz files below `root_directory`.

    parameters: root_directory (str) directory walked recursively with os.walk
    yields: one parsed JSON object per line of each gzip file; files with other extensions are ignored
    """
    for root, dirs, files in os.walk(root_directory):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            if os.path.splitext(file_path)[-1] != '.gz':
                continue
            with gzip.open(file_path, 'r') as filings:
                # Stream line by line; readlines() would load the whole decompressed shard into memory.
                for filing in filings:
                    yield json.loads(filing)

In [20]:
# Single-use generator: re-run this cell before iterating over the records again.
all_entries = get_entries_generator(root_directory=obs_path)

In [21]:
# Date-stamped directory receiving the batched C4 sentences.
c4_batches_dir_name = f'{C4_SENTENCE_BATCHES_DIR}_{current_date}'
out_dir = os.path.join(SUPPORTING_DATA_DIR, c4_batches_dir_name)
os.makedirs(out_dir, exist_ok=True)

In [22]:
# Create batches of 10_000 unique C4 sentences (duplicates are detected by the sentence's MD5 hash).
seen = set()
batch_size = 10_000
batch = []
batch_num = 0
count = 0

def _iter_descriptor_sentences(entry):
    """Yield (descriptor, sentence) pairs from either supported record layout."""
    if 'sentences_w_descriptors' in entry:
        # nested layout: {'sentences_w_descriptors': {descriptor: [sentence, ...]}}
        for desc, sent_list in entry['sentences_w_descriptors'].items():
            for sent in sent_list:
                yield desc, sent
    elif 'descriptor' in entry and 'sentence' in entry:
        # flat layout, as written by the C4 extraction cell of this notebook
        yield entry['descriptor'], entry['sentence']

for filing in tqdm(all_entries):
    entry_url = filing['url']
    entry_time = filing['timestamp']
    for desc, sent in _iter_descriptor_sentences(filing):
        text_hash = hashlib.md5(sent.encode()).hexdigest()
        if text_hash in seen:
            continue
        seen.add(text_hash)
        if len(batch) >= batch_size:
            # the current batch is full: flush it and start a new one with this record
            with gzip.open(os.path.join(out_dir, 'batch_%s.gz' % batch_num), 'w') as out_file:
                for row in batch:
                    out_file.write((json.dumps(row) + '\n').encode())
            batch_num += 1
            batch = []
        batch.append({'url': entry_url, 'timestamp': entry_time, 'descriptor': desc, 'sentence': sent, 'text_hash': text_hash})


# flush whatever is left after the last full batch
with gzip.open(os.path.join(out_dir, 'batch_%s.gz' % batch_num), 'w') as out_file:
    for row in batch:
        out_file.write((json.dumps(row) + '\n').encode())

0it [00:00, ?it/s]

# Sample C4 according to desc2count

In [2]:
# Load the BeanCounter descriptor counts and discard the unnamed index column saved with the CSV.
descriptor_count_path = os.path.join(SUPPORTING_DATA_DIR, DESCRIPTOR_TO_COUNT_CSV)
desc2count_sample = pd.read_csv(descriptor_count_path).drop(columns=['Unnamed: 0'])

In [None]:
# Point C4_SENTENCE_BATCHES_DIR at the date-stamped batch directory.
# The guard keeps the cell idempotent: re-running it must not append the date a second time.
if not C4_SENTENCE_BATCHES_DIR.endswith(f"_{current_date}"):
    C4_SENTENCE_BATCHES_DIR = f"{C4_SENTENCE_BATCHES_DIR}_{current_date}"

In [45]:
# Paths of all *.gz C4 sentence batches.
c4_batches_root = os.path.join(SUPPORTING_DATA_DIR, C4_SENTENCE_BATCHES_DIR)
fns_tbp = [
    os.path.join(path, fn)
    for path, dirs, fns in os.walk(c4_batches_root)
    for fn in fns
    if os.path.splitext(fn)[-1] == '.gz'
]

In [46]:
# Read every C4 batch file into a frame, then stack them.
list_dfs = []
for path in tqdm(fns_tbp):
    with gzip.open(path, 'r') as out_file:
        list_dfs.append(pd.read_json(out_file, lines=True, orient='records'))

c4_sents_df = pd.concat(list_dfs)

  0%|          | 0/13688 [00:00<?, ?it/s]

In [49]:
# c4_sents_df['text_hash'] = c4_sents_df['sentence'].apply(lambda x : hashlib.md5(x.encode()).hexdigest())
# assign sentence to longest descriptor: one row per text_hash, the one with the longest descriptor
c4_sents_df['desc_len'] = c4_sents_df['descriptor'].str.len()
c4_longest_first = c4_sents_df.sort_values(by='desc_len', ascending=False)
c4_sents_df = c4_longest_first.groupby('text_hash').head(1)

In [53]:
# Descriptors to sample, shortest first.
all_desc = list(desc2count_sample['descriptor'])
all_desc.sort(key=len)

In [55]:
# For every BeanCounter descriptor, draw as many C4 sentences as BeanCounter has for that descriptor.
all_samples = []
missing_descriptors = []
# increment random state
rand_state =1
# Group once; rebuilding the groupby inside the loop repeats the same work for every descriptor.
c4_by_descriptor = c4_sents_df.groupby('descriptor')
for d in tqdm(all_desc):
    n_samples = desc2count_sample[desc2count_sample['descriptor'] == d]['num_unique_sentences'].item()

    if d not in c4_by_descriptor.groups:
        # get_group raises KeyError for a descriptor that never occurs in C4; record it and move on.
        # The seed still advances, so the other descriptors keep the same random_state.
        missing_descriptors.append(d)
        rand_state += 1
        continue

    grouped = c4_by_descriptor.get_group(d)
    # sample with replacement only when C4 has fewer sentences than requested
    need_replacement = grouped.shape[0] < n_samples
    all_samples.append(grouped.sample(n=n_samples, replace=need_replacement, random_state=rand_state))
    rand_state += 1

if missing_descriptors:
    print('No C4 sentences found for descriptors: %s' % missing_descriptors)

  0%|          | 0/142 [00:00<?, ?it/s]

In [57]:
# Stack the per-descriptor samples and replace the inherited index with a fresh 0..N-1 range.
sampled_c4_df = (
    pd.concat(all_samples)
    .reset_index()
    .drop(columns=['index'])
)

In [60]:
# Date-stamped directory for the sampled C4 batches that will be sent to Perspective.
c4_perspective_batches_root = os.path.join(SUPPORTING_DATA_DIR, f'{C4_PERSPECTIVE_SENTENCE_BATCHES_DIR}_{current_date}')
os.makedirs(c4_perspective_batches_root, exist_ok=True)

In [61]:
# Write the sampled sentences as JSON-lines files of `nrows` rows each.
nrows = 10_000
c4_perspective_batches_dir = os.path.join(SUPPORTING_DATA_DIR, f'{C4_PERSPECTIVE_SENTENCE_BATCHES_DIR}_{current_date}')
batch_groups = sampled_c4_df.groupby(sampled_c4_df.index // nrows)
batch_n = 0
for name, group in tqdm(batch_groups):
    batch_file = os.path.join(c4_perspective_batches_dir, 'batch_%s.gz' % batch_n)
    group.to_json(path_or_buf=batch_file, orient='records', lines=True)
    batch_n += 1

  0%|          | 0/817 [00:00<?, ?it/s]

# C4 Perspective analysis

In [4]:
# Paths of the sampled C4 batches to score.
c4_perspective_batches_root = os.path.join(SUPPORTING_DATA_DIR, f'{C4_PERSPECTIVE_SENTENCE_BATCHES_DIR}_{current_date}')
fns_tbp = [
    os.path.join(path, fn)
    for path, dirs, fns in os.walk(c4_perspective_batches_root)
    for fn in fns
    if os.path.splitext(fn)[-1] == '.gz'
]

In [6]:
def chunking(cleaned_text, max_char_length):
    """
    Split `cleaned_text` into chunks of at most `max_char_length` characters.

    parameters: cleaned_text (str), max_char_length (int) maximum chunk length in characters
    returns: list of chunks. Text that already fits is returned as a single chunk. Otherwise whole lines
        are packed together (joined with a newline) while they fit, and a single line longer than the
        limit is cut into fixed-size slices.
    """
    if len(cleaned_text) <= max_char_length:
        return [cleaned_text]

    def _split_oversized(piece):
        # Cut a piece that is longer than the limit into fixed-size slices.
        if len(piece) <= max_char_length:
            return [piece]
        return [piece[i:i + max_char_length] for i in range(0, len(piece), max_char_length)]

    lines = iter(cleaned_text.splitlines())
    chunks, current = [], next(lines)
    for line in lines:
        # The +1 counts the newline used to join the lines. Without it a chunk could grow to
        # max_char_length + 1 and then be cut into a full slice plus a stray one-character slice.
        if len(current) + 1 + len(line) > max_char_length:
            chunks.extend(_split_oversized(current))
            current = line
        else:
            current += '\n' + line

    # flush the last pending chunk
    chunks.extend(_split_oversized(current))
    return chunks

In [8]:
max_char_length = 18_000 # perspective api takes at most 20KB
def get_perspective_toxicity(text, client, max_char_length = max_char_length):
    """
    Request a TOXICITY score for every non-blank chunk of `text`.

    parameters: text (str), client (perspective api client), max_char_length (int) chunk size given to chunking
    returns: dictionary mapping each stripped chunk to its TOXICITY summary score
    """
    chunks2score = {}

    for piece in chunking(text, max_char_length):
        stripped = piece.strip()
        if not stripped:
            # whitespace-only chunks are never sent to the API
            continue

        start = time.time()
        analyze_request = {
            'comment': {'text': stripped},
            'languages': ['en'],
            'requestedAttributes': {'TOXICITY': {}}
        }
        response = client.comments().analyze(body=analyze_request).execute()
        elapsed = time.time() - start
        # pad each request to at least one second so this caller issues at most one query per second
        time.sleep(max(1 - elapsed, 0))
        chunks2score[stripped] = response['attributeScores']['TOXICITY']['summaryScore']['value']

    return chunks2score

In [9]:
# Perspective client of the current process; created lazily by work().
_perspective_client = None

def work(sentence):
    """
    Score one sentence with the Perspective API.

    parameters: sentence (str)
    returns: {md5 hex digest of the sentence: toxicity score}, or an empty dict when the sentence has no
        non-blank chunk to score. If the sentence is split into several chunks, only the first chunk's
        score is kept.
    """
    global _perspective_client
    # Build the client inside the worker process (not in the parent), but only once per process:
    # each build with static_discovery=False fetches the discovery document again.
    if _perspective_client is None:
        _perspective_client = discovery.build(
            "commentanalyzer",
            "v1alpha1",
            developerKey=PERSPECTIVE_API_KEY,
            discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
            static_discovery=False,
            )

    sentence_hash = hashlib.md5(sentence.encode()).hexdigest()
    scores = list(get_perspective_toxicity(sentence, _perspective_client).values())
    if not scores:
        # Nothing was scored (blank sentence); avoid an IndexError that would abort the whole pool.
        return {}
    return {sentence_hash: scores[0]}

In [10]:
# Date-stamped directory receiving one pickle of Perspective scores per C4 batch.
# The configuration cell defines C4_PERSPECTIVE_SCORES; C4_PERSPECTIVE_SCORES_DIR is never defined,
# so referencing it raised NameError.
out_dir = os.path.join(SUPPORTING_DATA_DIR, f'{C4_PERSPECTIVE_SCORES}_{current_date}')
os.makedirs(out_dir, exist_ok = True)

In [11]:
def process_toxicity_scores(batch_paths, out_dir = out_dir):
    """
    Score every sentence of every batch file and store the results as one pickle per batch.

    parameters:
        batch_paths (list of str): '/'-separated paths to gzipped JSON-lines batches whose records carry
            'sentence' and 'text_hash'.
        out_dir (str): directory for the '<batch name>.pkl' result files.
    returns: None

    An existing result pickle is loaded first, and only sentences whose hash is not in it are sent to the
    API. Sentences larger than 20,000 UTF-8 bytes are skipped. If the first pool run fails, the batch is
    retried once with half the workers.
    """
    for batch in tqdm(batch_paths):
        res_path = os.path.join(out_dir, batch.rsplit('/', 1)[1].split('.')[0] + '.pkl')

        if os.path.exists(res_path):
            # NOTE: pickle.load can execute code stored in the file; only load pickles this notebook wrote.
            with open(res_path, 'rb') as res_file:
                res_dict = pickle.load(res_file)
        else:
            res_dict = dict()

        # text hash -> sentence (the dict also removes duplicate hashes inside the batch)
        all_sent2hash= dict()

        with gzip.open(batch, 'r') as batch_n:
            for sent_info in batch_n:
                info = json.loads(sent_info)
                sent = info['sentence']
                if len(sent.encode("utf8")) > 20_000:
                    continue
                all_sent2hash[info['text_hash']] = sent

        to_process = [sent for sent_hash, sent in all_sent2hash.items() if sent_hash not in res_dict]
        if len(to_process) == 0:
            continue

        # Stay one worker below the quota, but never ask for an empty pool (QPS of 1 would give 0 workers).
        n_workers = max(1, PERSPECTIVE_API_QPS - 1)
        try:
            with mp.Pool(n_workers) as p:
                results = [r for r in tqdm(p.imap_unordered(work, to_process), total=len(to_process), miniters=1)]
        except Exception:
            # Retry once with fewer workers; Exception (not a bare except) lets Ctrl-C stop the run.
            n_workers = max(1, n_workers // 2)
            with mp.Pool(n_workers) as p:
                results = [r for r in tqdm(p.imap_unordered(work, to_process), total=len(to_process), miniters=1)]

        for r in results:
            if isinstance(r, dict):
                res_dict.update(r)

        with open(res_path, 'wb') as out_file:
            pickle.dump(res_dict, out_file)

In [24]:
# Score all sampled C4 batches; batches whose sentences are all already scored are skipped.
process_toxicity_scores(batch_paths=fns_tbp)

  0%|          | 0/817 [00:00<?, ?it/s]