In [1]:
from pathlib import Path 
from bs4 import BeautifulSoup
import json
import pandas as pd 
import re 
from lingua import Language, LanguageDetectorBuilder
import os

In [2]:
# Merge every scraped .txt dump found in `directory` into a single file so the
# rest of the notebook can parse one document.
directory = "redata"
output_file = 'raw_merged_sep22_jun23.txt'
content_list = []

# os.listdir() returns entries in arbitrary order. Sorting keeps the merged
# file -- and any later step that depends on row order, such as keeping the
# "last" duplicate -- reproducible across runs and machines.
for filename in sorted(os.listdir(directory)):
    if filename.endswith('.txt'):
        file_path = os.path.join(directory, filename)
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
            content_list.append(content)

# A blank line separates the contents of consecutive source files.
merged_content = '\n\n'.join(content_list)

with open(output_file, 'w', encoding='utf-8') as output:
    output.write(merged_content)

In [3]:
def read_file_as_string(file_path):
    """Return the full contents of ``file_path`` decoded as UTF-8 text."""
    with Path(file_path).open('r', encoding='utf-8') as handle:
        return handle.read()

# Read the merged dump back through the same relative path the merge cell
# wrote to (`output_file`) rather than a machine-specific absolute path, so the
# notebook runs from any checkout.
filepath = output_file
text = read_file_as_string(filepath)

In [4]:
def _first_text(parent, name, class_name):
    """Return the stripped text of the first matching tag under ``parent``, or "" if none exists."""
    tag = parent.find(name, class_=class_name)
    return tag.text.strip() if tag is not None else ""


def return_json (input_string): 
    """Extract every ``div.post_wrapper`` in an HTML string into a JSON string.

    Parameters
    ----------
    input_string : str
        HTML markup to parse with BeautifulSoup's ``html.parser``.

    Returns
    -------
    str
        A JSON array (indented by 4 spaces) with one object per post wrapper.
        Each object has the keys ``post_data``, ``controls``,
        ``backlink_list`` and ``text_content``. Scalar fields that cannot be
        found are emitted as "" and list entries lacking the required
        attribute are skipped.
    """
    # Parse once; every lookup below is performed a single time per post.
    soup = BeautifulSoup(input_string, 'html.parser')
    posts_data = []
    for post_wrapper in soup.find_all("div", class_="post_wrapper"):
        time_tag = post_wrapper.find("time")
        quote_link = post_wrapper.find("a", {"data-function": "quote"})
        text_div = post_wrapper.find("div", class_="text")

        post_data = {
            "post_data": {
                "author": _first_text(post_wrapper, "span", "post_author"),
                "tripcode": _first_text(post_wrapper, "span", "post_tripcode"),
                "poster_hash": _first_text(post_wrapper, "span", "poster_hash"),
                # .get() keeps a tag without the attribute from aborting the whole parse.
                "datetime": time_tag.get("datetime", "") if time_tag is not None else "",
                "time_text": time_tag.text.strip() if time_tag is not None else "",
                "post_id": quote_link.get("data-post", "") if quote_link is not None else "",
                "post_link": quote_link.get("href", "") if quote_link is not None else ""
            },
            "controls": {
                "controls_links": [
                    control.get("href", "#") for control in post_wrapper.select(".post_controls a")
                ]
            },
            "backlink_list": {
                "quoted_by": [
                    backlink["data-post"]
                    for backlink in post_wrapper.select(".post_backlink")
                    if backlink.has_attr("data-post")
                ]
            },
            "text_content": {
                "text": text_div.get_text(separator="\n").strip() if text_div is not None else "",
                "greentext_links": [
                    link["href"]
                    for link in post_wrapper.find_all("a", class_="backlink")
                    if link.has_attr("href")
                ]
            }
        }
        posts_data.append(post_data)
    return json.dumps(posts_data, indent=4)

In [15]:
# Parse the merged dump and flatten the nested post records into a table whose
# columns use dotted names such as 'post_data.post_id'.
data = json.loads(return_json(text))
df = pd.json_normalize(data)

# Select the fields used below and give them short names.
# 'backlink_list.quoted_by' (-> 'quoted_by') is currently not selected.
da = (
    df.loc[:, [
        'post_data.poster_hash',
        'post_data.datetime',
        'post_data.post_id',
        'post_data.post_link',
        'text_content.text',
    ]]
    .rename(columns={
        'post_data.poster_hash': 'poster_ID',
        'post_data.datetime': 'date',
        'post_data.post_id': 'post_id',
        'post_data.post_link': 'post_link',
        'text_content.text': 'text',
    })
)

def threadno_get(url): 
    """Extract the digits that follow '/thread/' in ``url``.

    Returns the digits as a string, or a fixed placeholder message when the
    URL has no '/thread/<digits>' segment.
    """
    match = re.search(r'/thread/(\d+)', url)
    if match is None:
        return "No Thread Number Found... Uh Oh..."
    return match.group(1)

# Derive the per-post fields in one step. All three values are computed from
# the current (un-stripped) text, so Reply_To is collected before the '>>123'
# reply markers are removed from the text column.
da = da.assign(
    Thread_No=da['post_link'].map(threadno_get),
    Reply_To=da['text'].map(lambda body: re.findall(r'>>(\d+)', body)),
    text=da['text'].map(lambda body: re.sub(r'>>\d+\s*', '', body).strip()),
)

print(len(da['Thread_No']))

# Detector restricted to the two candidate languages, Latin and English.
languages = [Language.LATIN, Language.ENGLISH]
detector = LanguageDetectorBuilder.from_languages(*languages).build()

def latin_exterminator(s):
    """Return ``s`` unchanged, or None when the detector rates it as Latin.

    The detector's confidence that ``s`` is Latin is rounded to two decimal
    places; a rounded value of 0.5 or more makes the function return None.
    """
    latin_confidence = detector.compute_language_confidence(s, Language.LATIN)
    rounded = float(format(latin_confidence, ".2f"))
    return None if rounded >= 0.5 else s

# Run every post through latin_exterminator (which returns None for posts it
# rates as Latin) and keep only the rows whose text is still present.
da = (
    da.assign(text=da['text'].apply(latin_exterminator))
      .dropna(subset=['text'])
)

6443


In [None]:
# len(da) => 6443 as printed above, i.e. before the Latin filter, the phrase filters and duplicate removal

In [16]:
# Posts containing any of these phrases (case-insensitive) are dropped entirely.
stopwords = ['Imago Dei', """Luke 1:28, 1:42""", 'Opus Dei'] ## if these in the string, delete the whole thing

# Drive the filter from the list above so the two cannot drift apart.
# re.escape makes each phrase match literally inside the alternation.
stopword_pattern = '|'.join(re.escape(phrase) for phrase in stopwords)
da = da[~da['text'].str.contains(stopword_pattern, case=False, na=False)]

In [18]:
# Keep a single row per poster_ID: the last one in the current row order.
da = da.drop_duplicates(subset=['poster_ID'], keep='last')

In [None]:
# after duplicate remover, len => 2535

In [20]:
# Unique thread numbers, sorted (as strings) so the list -- and the file written
# from it -- is identical on every run; a set of strings has no stable order.
thread_no_list = sorted(set(da['Thread_No']))

In [21]:
# len(thread_no_list) => 2074

2074

In [24]:
# Write one thread number per line. The encoding is set explicitly, matching
# the other file reads/writes in this notebook, instead of relying on the
# platform default.
# NOTE(review): the merged input is named "..._sep22_jun23" while this output
# is named "..._jun22_sep23" -- confirm which date range is intended.
with open("thread_jun22_sep23.txt", "w", encoding="utf-8") as file:
    for thread in thread_no_list:
        file.write(thread + "\n")