## Parse into beliefs

Additional detail on the development of the belief parser may be found in [ICWSM_revisions2a.ipynb]

In [3]:
import re
import allTokens
from allTokens import abbr_dict, emoji_pattern

import spacy
from spacy_langdetect import LanguageDetector
from spacy.language import Language

import pandas as pd
import numpy as np
from belief_extraction_spacy import add_to_pipe
from concurrent.futures import ProcessPoolExecutor

In [4]:
!pip install spacy==3.5.4
!python -m spacy download en_core_web_lg

[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')


In [5]:
print(spacy.__version__)

3.5.4


In [6]:
# Load SpaCy model and add plugin to pipeline
%run "belief_extraction_spacy.py"
nlp = spacy.load('en_core_web_lg')
# nlp = spacy.load('en_core_web_lg', disable=["ner", "lemmatizer"])  # Disable unnecessary components
add_to_pipe(nlp)

<spacy.lang.en.English at 0x7f741daf34f0>

In [7]:
def preprocess_texts(texts):
    """
    Process texts with spaCy and extract relevant information for beliefs.
    Returns a list of dictionaries with 'text' and 'beliefs'.
    """
    processed_data = []
    for doc in nlp.pipe(texts, batch_size=50):
        for sent in doc.sents:
            if hasattr(sent._, 'beliefs') and sent._.beliefs:
                beliefs_info = [{'subject': b.subject, 'sentence': str(s)} for b in sent._.beliefs for s in doc.sents]
                processed_data.append({'text': doc.text, 'beliefs': beliefs_info})
    return processed_data



# def process_text_and_extract_beliefs(text):
#     # Process the text with spaCy to create a Doc object
#     doc = nlp(text)
#     beliefs = []
#     for sent in doc.sents:
#         if hasattr(sent._, 'beliefs') and sent._.beliefs:
#             beliefs.extend([(b.subject, str(s)) for b in sent._.beliefs for s in doc.sents])
#     return beliefs


# def extract_beliefs(texts):
#     beliefs = []
#     for doc in nlp.pipe(texts, batch_size=50):  # Use batch processing
#         for sent in doc.sents:
#             if sent._.beliefs:
#                 beliefs.extend([(b.subject, str(s)) for b in sent._.beliefs for s in doc.sents])
#     return belief

In [8]:
def further_processing(data):
    # Placeholder for further processing, returns data as-is
    return data

# def worker_func(df_slice, column):
#     # Apply the processing and extraction function to each text in the DataFrame slice
#     df_slice[column] = df_slice[column].apply(process_text_and_extract_beliefs)
#     return df_slice

In [9]:
def parallel_process(data, num_cores):
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        result = list(executor.map(further_processing, data))
    return result

# def parallel_apply(df, column, num_partitions, num_cores):
#     df_split = np.array_split(df, num_partitions)
#     with ProcessPoolExecutor(max_workers=num_cores) as executor:
#         # Map the worker function across DataFrame slices
#         results = list(executor.map(worker_func, df_split, [column]*len(df_split)))
#     return pd.concat(results)

# def parallel_apply(df, func, column, num_partitions, num_cores):
#     df_split = np.array_split(df, num_partitions)
#     pool = ProcessPoolExecutor(num_cores)
#     # Use a wrapper function instead of a lambda for multiprocessing
#     results = pool.map(apply_extract_beliefs_to_df, df_split, [func]*len(df_split), [column]*len(df_split))
#     df = pd.concat(list(results))
#     pool.shutdown()
#     return df

In [10]:
# num_partitions = 10  # Number of partitions to split dataframe
# num_cores = 4       # Number of cores on your machine

In [11]:
L_DATA = "../../../data/politics_cleaned"
file_path = f"{L_DATA}/total_cleaned_tweets.feather"
deduped = pd.read_feather(file_path)

In [12]:
subset_texts = deduped['cleanedText'].head(1000).tolist()
processed_data = preprocess_texts(subset_texts)

In [None]:
num_cores = 4  # Adjust based on your machine
parallel_results = parallel_process(processed_data, num_cores)

In [31]:
# deduped['beliefs'] = parallel_apply(deduped, 'cleanedText', num_partitions, num_cores)
# deduped['beliefs'] = parallel_apply(deduped, extract_beliefs, 'cleanedText', num_partitions, num_cores)

NotImplementedError: [E112] Pickling a span is not supported, because spans are only views of the parent Doc and can't exist on their own. A pickled span would always have to include its Doc and Vocab, which has practically no advantage over pickling the parent Doc directly. So instead of pickling the span, pickle the Doc it belongs to or use Span.as_doc to convert the span to a standalone Doc object.

In [None]:
# Data wrangling
labeled_beliefs = deduped.explode('beliefs').dropna(subset=['beliefs'])
labeled_beliefs[['subject', 'sentence']] = pd.DataFrame(labeled_beliefs['beliefs'].tolist(), index=labeled_beliefs.index)
labeled_beliefs = labeled_beliefs.reset_index(drop=True)
labeled_beliefs['sentence_idx'] = labeled_beliefs.groupby('tweet_hash').cumcount()

# Save and load data
labeled_beliefs.to_feather(f'{L_DATA}/english_cleaned_deduped_labeled_beliefs.feather')
d = pd.read_feather(f'{G_DATA}/english_cleaned_deduped_labeled_beliefs.feather')
print(d.subject.value_counts())