In [4]:
!pip install transformers torch

Collecting transformers
  Using cached transformers-4.41.2-py3-none-any.whl.metadata (43 kB)
Collecting torch
  Downloading torch-2.3.0-cp311-cp311-win_amd64.whl.metadata (26 kB)
Collecting huggingface-hub<1.0,>=0.23.0 (from transformers)
  Using cached huggingface_hub-0.23.2-py3-none-any.whl.metadata (12 kB)
Collecting tokenizers<0.20,>=0.19 (from transformers)
  Downloading tokenizers-0.19.1-cp311-none-win_amd64.whl.metadata (6.9 kB)
Collecting safetensors>=0.4.1 (from transformers)
  Downloading safetensors-0.4.3-cp311-none-win_amd64.whl.metadata (3.9 kB)
Collecting mkl<=2021.4.0,>=2021.1.1 (from torch)
  Using cached mkl-2021.4.0-py2.py3-none-win_amd64.whl.metadata (1.4 kB)
Collecting intel-openmp==2021.* (from mkl<=2021.4.0,>=2021.1.1->torch)
  Using cached intel_openmp-2021.4.0-py2.py3-none-win_amd64.whl.metadata (1.2 kB)
Collecting tbb==2021.* (from mkl<=2021.4.0,>=2021.1.1->torch)
  Using cached tbb-2021.12.0-py3-none-win_amd64.whl.metadata (1.1 kB)
Using cached transformers-4.

In [2]:
import os
import json
import pandas as pd
import io
import datetime
import logging
import zstandard as zstd
from transformers import BertTokenizer, BertModel
import torch
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

In [3]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased').to(device)

In [5]:
def get_embedding(text):
    try:
        inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=512).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        embedding = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
        return embedding
    except Exception as e:
        logging.error(f"Error in get_embedding: {e}")
        return np.zeros((1, 768))  

In [6]:
def get_related_words_bert(keyword, top_k=10):
    keyword_embedding = get_embedding(keyword)
    related_words = []
    for word in tokenizer.vocab.keys():
        word_embedding = get_embedding(word)
        similarity = cosine_similarity(keyword_embedding, word_embedding)
        related_words.append((word, similarity))
    related_words = sorted(related_words, key=lambda x: x[1], reverse=True)[:top_k]
    return [word for word, _ in related_words]

In [8]:
def generate_keywords_list(keywords, top_k=10):

    all_keywords = set(keywords)
    for keyword in keywords:
        related_words = get_related_words_bert(keyword, top_k)
        all_keywords.update(related_words)
    extended_keywords_list = list(all_keywords)
   
    print("Extended Keywords List:")
    for keyword in extended_keywords_list:
        print(keyword)
    return extended_keywords_list

In [9]:
def stream_zst_file(file_path):
    try:
        with open(file_path, 'rb') as fh:
            dctx = zstd.ZstdDecompressor(max_window_size=2147483648)  # 2 GB
            with dctx.stream_reader(fh) as reader:
                text_stream = io.TextIOWrapper(reader, encoding='utf-8')
                for line in text_stream:
                    yield line
    except Exception as e:
        logging.error(f"Error reading file {file_path}: {e}")

In [11]:
def process_posts_chunk(file_path, subreddits, keywords, chunk_size=100):
    logging.info(f"Processing file: {file_path}")
    data = []
    chunk = []
    try:
        for line in stream_zst_file(file_path):
            try:
                post = json.loads(line)
                content = post.get('selftext', '').lower()  
                if post.get('subreddit') in subreddits and any(keyword in content for keyword in keywords):
                    chunk.append(post)
                    if len(chunk) >= chunk_size:
                        data.extend(process_chunk(chunk))
                        chunk = []
            except json.JSONDecodeError:
                logging.warning(f"JSONDecodeError for line: {line}")
            except Exception as e:
                logging.error(f"Error processing line: {e}")
        if chunk:
            data.extend(process_chunk(chunk))
    except Exception as e:
        logging.error(f"Error in process_posts_chunk: {e}")
    return data

In [12]:
def process_chunk(chunk):
    logging.info(f"Processing chunk of size: {len(chunk)}")
    data = []
    for post in chunk:
        try:
            data.append({
                'title': post.get('title', ''),
                'author': post.get('author', ''),
                'content': post.get('selftext', ''),
                'id': post.get('id', ''),
                'score': post.get('score', 0),
                'created_utc': datetime.datetime.fromtimestamp(post.get('created_utc', 0)).strftime('%Y-%m-%d %H:%M:%S'),
                'url': post.get('url', ''),
                'num_comments': post.get('num_comments', 0),
                'subreddit': post.get('subreddit', '')
            })
        except Exception as e:
            logging.error(f"Error processing post: {e}")
    return data

In [13]:
def process_directory(directory_path, subreddits, keywords, num_workers=1, chunk_size=100):
    logging.info(f"Starting directory processing: {directory_path}")
    all_data = []
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        for filename in os.listdir(directory_path):
            if filename.endswith('.zst'):
                file_path = os.path.join(directory_path, filename)
                futures.append(executor.submit(process_posts_chunk, file_path, subreddits, keywords, chunk_size))
        for future in as_completed(futures):
            try:
                result = future.result()
                all_data.extend(result)
            except Exception as e:
                logging.error(f"Error in future result: {e}")
    logging.info(f"Finished directory processing: {directory_path}")
    return all_data

In [15]:
def main():
    directory_path = "E:\\Torrents\\reddit\\submissions\\2023"
    output_csv = os.path.join("C:\\Users\\ntu-s\\OneDrive - Nanyang Technological University\\sherry\\reddit output", 'filtered_posts.csv')
    subreddits = ['askSingapore', 'NTU', 'nus', 'SGExams', 'singapore', 'SingaporeRaw', 'NationalServiceSG']
    keywords = [
        'mental health', 'suicide', 'depressed', 'depression', 'anxiety', 'stressed', 'stressful',
        'burnout', 'hopeless', 'hopelessness', 'meaningless', 'meaninglessness', 'sad',
        'failure', 'loser', 'toxic'
    ]

    extended_keywords = generate_keywords_list(keywords, top_k=10)
    logging.info(f"Extended Keywords: {extended_keywords}")

    all_data = process_directory(directory_path, subreddits, extended_keywords, num_workers=1, chunk_size=100)

    if all_data:
        for i in range(0, len(all_data), 10000): 
            chunk = all_data[i:i + 10000]
            chunk_df = pd.DataFrame(chunk)
            chunk_df.to_csv(output_csv, mode='a', index=False, header=(i == 0))
            logging.info(f"Saved chunk {i // 10000 + 1} to '{output_csv}'.")

if __name__ == "__main__":
    main()

2024-06-05 20:21:16,991 - INFO - Extended Keywords: ['coveted', 'hazardous', 'toxic', 'meaningless', 'grief', 'tragedy', 'concern', 'winning', 'forbidden', 'paralysis', 'peripheral', 'scandal', 'stress', 'anguish', 'losers', 'bankruptcy', 'burial', 'termination', 'impatience', 'destruction', 'pearls', 'worried', 'dominant', 'sanitation', 'opponent', 'hopelessness', 'elimination', 'alcoholic', 'finalists', 'tormented', 'resentment', 'education', 'diagnosed', 'failing', 'dangerously', 'alcoholism', 'trapped', 'losing', 'depressed', 'cruelty', 'helpless', 'burnout', 'schizophrenia', 'apprehension', 'illness', 'outward', 'painful', 'isolation', 'dangerous', 'anxious', 'richest', 'shame', 'unable', 'despair', 'stressful', 'health', 'betrayed', 'wealthy', 'awkward', 'grounded', 'eliminated', 'garbage', 'fear', 'sorrow', 'investigative', 'frustrated', 'arrogance', 'rotting', 'fail', 'airship', 'worry', 'useless', 'stresses', 'abandonment', 'pity', 'pollen', 'applied', 'murder', 'tumor', 'care

Extended Keywords List:
coveted
hazardous
toxic
meaningless
grief
tragedy
concern
winning
forbidden
paralysis
peripheral
scandal
stress
anguish
losers
bankruptcy
burial
termination
impatience
destruction
pearls
worried
dominant
sanitation
opponent
hopelessness
elimination
alcoholic
finalists
tormented
resentment
education
diagnosed
failing
dangerously
alcoholism
trapped
losing
depressed
cruelty
helpless
burnout
schizophrenia
apprehension
illness
outward
painful
isolation
dangerous
anxious
richest
shame
unable
despair
stressful
health
betrayed
wealthy
awkward
grounded
eliminated
garbage
fear
sorrow
investigative
frustrated
arrogance
rotting
fail
airship
worry
useless
stresses
abandonment
pity
pollen
applied
murder
tumor
careless
solitude
refining
pathetic
loser
organic
troubled
fails
hatred
disappointed
infrastructure
governance
greed
waste
anxiety
emptiness
uncomfortable
controversy
depression
welfare
powerless
exclusion
expansion
ignores
depths
nutrition
disorders
initials
somber
poin

OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: '"E:\\Torrents\reddit\\submissions\x823"'