In [3]:
import rmm
import time
import torch
import cupy as cp
import numpy as np
import pandas as pd
from tqdm import tqdm
from cuml.manifold import UMAP as cuUMAP
from cuml.cluster import HDBSCAN as cuHDBSCAN
from transformers import AutoTokenizer, AutoModel

In [4]:
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Connection URL for the Postgres database that holds the Reddit data.
# Set the REDDIT_DB_URL environment variable to point at your own database;
# the literal below is only the fallback used when the variable is unset.
# NOTE(review): the fallback embeds a host and password in the notebook --
# rotate that credential and delete the literal once every user supplies
# REDDIT_DB_URL.
DB_URL = os.environ.get(
    "REDDIT_DB_URL",
    "postgresql+psycopg2://postgres:Csci5502@35.222.3.25:5432/reddit-data",
)

engine = create_engine(DB_URL)
Session = sessionmaker(bind=engine)
session = Session()

In [5]:
from sqlalchemy.orm.base import Mapped
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
from sqlalchemy import ARRAY, BigInteger, CHAR, Column, Date, Integer, Numeric, PrimaryKeyConstraint, Sequence, String, Table, Text, UniqueConstraint


# Shared declarative base: every ORM class below inherits from it, and the
# Core Table objects below are registered on the same MetaData instance.
Base = declarative_base()
metadata = Base.metadata


class Cluster(Base):
    """ORM mapping for the ``cluster`` table (primary key ``id``).

    ``save_to_db`` in this notebook appends rows to a table of this name by
    default; the non-key columns below match the frame that ``aggregation``
    produces once ``save_to_db`` has dropped its helper columns.
    """
    __tablename__ = 'cluster'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='cluster_pkey'),
    )

    id = mapped_column(BigInteger)
    # Cluster label; the driver loop fills it from the clustering output.
    cluster_id = mapped_column(Integer)
    # aggregation(): summed post scores plus summed comment scores.
    score_sum = mapped_column(Integer)
    # aggregation(): days between the earliest and latest post in the cluster.
    day_diff = mapped_column(Integer)
    # aggregation(): timestamp built from the processed year and month.
    post_date = mapped_column(Date)
    # aggregation(): number of non-None comment entries in the cluster.
    num_of_comment = mapped_column(Integer)
    # aggregation(): topic label ('politic', 'sport' or 'economic').
    subreddit = mapped_column(Text)
    # aggregation(): the post_id values grouped into this cluster.
    list_of_post = mapped_column(ARRAY(Text()))


class ClusterInfo(Base):
    """ORM mapping for the ``cluster_info`` table (primary key ``id``).

    No cell visible in this notebook reads or writes this table.
    NOTE(review): the column names suggest per-cluster summary statistics
    (post count, average comment count / content length / score) -- confirm
    against whatever job populates the table.
    """
    __tablename__ = 'cluster_info'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='cluster_info_pkey'),
    )

    id = mapped_column(BigInteger)
    subreddit = mapped_column(Text)
    post_date = mapped_column(Date)
    num_of_post = mapped_column(Integer)
    avg_num_comment = mapped_column(Numeric)
    avg_len_content = mapped_column(Numeric)
    avg_score = mapped_column(Numeric)
    cluster_id = mapped_column(BigInteger)
    number_of_comment = mapped_column(BigInteger)


class ClusterInfoNew(Base):
    """ORM mapping for the ``cluster_info_new`` table (primary key ``id``).

    Declares the same columns as ``ClusterInfo``; the difference visible here
    is that ``id`` is attached to the ``cluster_info_id_seq`` sequence.
    No cell visible in this notebook reads or writes this table.
    """
    __tablename__ = 'cluster_info_new'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='cluster_info_new_pkey'),
    )

    # Key values are drawn from the named database sequence.
    id = mapped_column(BigInteger, Sequence('cluster_info_id_seq'))
    subreddit = mapped_column(Text)
    post_date = mapped_column(Date)
    num_of_post = mapped_column(Integer)
    avg_num_comment = mapped_column(Numeric)
    avg_len_content = mapped_column(Numeric)
    avg_score = mapped_column(Numeric)
    cluster_id = mapped_column(BigInteger)
    number_of_comment = mapped_column(BigInteger)


class ClusterNew(Base):
    """ORM mapping for the ``cluster_new`` table (primary key ``id``).

    Declares the same columns as ``Cluster``; here ``id`` is attached to the
    ``cluster_id_seq`` sequence, the same sequence name ``ClusterTest`` uses.
    No cell visible in this notebook reads or writes this table.
    """
    __tablename__ = 'cluster_new'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='cluster_new_pkey'),
    )

    # Key values are drawn from the named database sequence.
    id = mapped_column(BigInteger, Sequence('cluster_id_seq'))
    cluster_id = mapped_column(Integer)
    score_sum = mapped_column(Integer)
    day_diff = mapped_column(Integer)
    post_date = mapped_column(Date)
    num_of_comment = mapped_column(Integer)
    subreddit = mapped_column(Text)
    list_of_post = mapped_column(ARRAY(Text()))


class ClusterTest(Base):
    """ORM mapping for the ``cluster_test`` table (primary key ``id``).

    Declares the same columns as ``Cluster``; here ``id`` is attached to the
    ``cluster_id_seq`` sequence, the same sequence name ``ClusterNew`` uses.
    No cell visible in this notebook reads or writes this table.
    """
    __tablename__ = 'cluster_test'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='cluster_test_pkey'),
    )

    # Key values are drawn from the named database sequence.
    id = mapped_column(BigInteger, Sequence('cluster_id_seq'))
    cluster_id = mapped_column(Integer)
    score_sum = mapped_column(Integer)
    day_diff = mapped_column(Integer)
    post_date = mapped_column(Date)
    num_of_comment = mapped_column(Integer)
    subreddit = mapped_column(Text)
    list_of_post = mapped_column(ARRAY(Text()))


# Core Table (no ORM class) for the `economics` table, registered on the
# shared metadata.  Only `private_id` (unique) and `post_id` are NOT NULL.
# NOTE(review): the `command_*` array columns presumably hold per-post comment
# text / score / date ("command" looks like a misspelling of "comment") --
# confirm against the loader that fills this table.
t_economics = Table(
    'economics', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='economics_private_id_key')
)


# Core Table for the `economics_new` table; it declares the same columns as
# `economics`, with its own unique constraint on `private_id`.
# NOTE(review): how `economics_new` differs in content from `economics` is
# not visible in this notebook -- confirm.
t_economics_new = Table(
    'economics_new', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='economics_new_private_id_key')
)


# Core Table for the `politics` table, registered on the shared metadata.
# Only `private_id` (unique) and `post_id` are NOT NULL.
# NOTE(review): the `command_*` array columns presumably hold per-post comment
# text / score / date -- confirm against the loader that fills this table.
t_politics = Table(
    'politics', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='politics_private_id_key')
)


# Core Table for the `politics_2` table.  Same columns as `politics`, except
# that `private_id` is attached to the `politics_private_id_seq` sequence.
# NOTE(review): the purpose of this second politics table is not visible in
# this notebook -- confirm.
t_politics_2 = Table(
    'politics_2', metadata,
    Column('private_id', Integer, Sequence('politics_private_id_seq'), nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='politics_2_private_id_key')
)


# Core Table for the `politics_new` table; it declares the same columns as
# `politics`, with its own unique constraint on `private_id`.
# NOTE(review): how `politics_new` differs in content from `politics` is not
# visible in this notebook -- confirm.
t_politics_new = Table(
    'politics_new', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='politics_new_private_id_key')
)


class PostPreprocessing(Base):
    """ORM mapping for the ``post_preprocessing`` table (primary key ``id``).

    No cell visible in this notebook reads or writes this table.
    NOTE(review): presumably one row per post with its cleaned text, the
    text length and the comment count -- confirm against the producer.
    """
    __tablename__ = 'post_preprocessing'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='post_preprocessing_pkey'),
    )

    id = mapped_column(BigInteger)
    post_id = mapped_column(String)
    subreddit = mapped_column(String)
    post_create = mapped_column(Date)
    content = mapped_column(String)
    content_length = mapped_column(BigInteger)
    num_of_comment = mapped_column(BigInteger)


class Sentiment(Base):
    """ORM mapping for the ``sentiment`` table (primary key ``id``).

    No cell visible in this notebook reads or writes this table.
    NOTE(review): ``result`` is a free-form string of up to 65535 characters;
    its format (e.g. serialized model output) is not shown here -- confirm.
    """
    __tablename__ = 'sentiment'
    __table_args__ = (
        PrimaryKeyConstraint('id', name='sentiment_pkey'),
    )

    id = mapped_column(BigInteger)
    # Fixed-width CHAR(64), unlike the String(8) post_id of the post tables.
    post_id = mapped_column(CHAR(64))
    result = mapped_column(String(65535))


# Core Table for the `sports` table, registered on the shared metadata.
# The monthly clustering loop in this notebook queries `public.sports`
# directly with raw SQL and treats `command_content` / `command_score` as the
# per-post lists of comment texts and comment scores.
# Only `private_id` (unique) and `post_id` are NOT NULL.
t_sports = Table(
    'sports', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='sports_private_id_key')
)


# Core Table for the `sports_new` table; it declares the same columns as
# `sports`, with its own unique constraint on `private_id`.
# NOTE(review): how `sports_new` differs in content from `sports` is not
# visible in this notebook -- confirm.
t_sports_new = Table(
    'sports_new', metadata,
    Column('private_id', Integer, nullable=False),
    Column('post_id', String(8), nullable=False),
    Column('subreddit', String(25)),
    Column('post_title', Text),
    Column('post_content', Text),
    Column('post_score', Integer),
    Column('post_create', Date),
    Column('command_content', ARRAY(Text())),
    Column('command_score', ARRAY(Integer())),
    Column('command_create', ARRAY(Date())),
    UniqueConstraint('private_id', name='sports_new_private_id_key')
)

In [None]:
def safe_normalize(v):
    """Scale ``v`` to unit L2 norm; a zero-norm input is handed back untouched."""
    length = np.linalg.norm(v)
    if length > 0:
        return v / length
    # Nothing to scale by: return the very same object rather than divide by 0.
    return v

In [None]:
import nltk
from nltk.corpus import stopwords
# Fetch the NLTK stopword corpus (the captured cell output shows it reports
# "already up-to-date" when the package is present).  GPUEmbeddingProcessor
# reads stopwords.words('english') in its constructor, so this must run first.
nltk.download('stopwords')

In [None]:
class GPUEmbeddingProcessor:
    """Turn raw texts into L2-normalised transformer embeddings.

    Each text has English stopwords removed, is tokenised (truncated to 1024
    tokens), run through the model, and mean-pooled over its real (non-padding)
    tokens.  The pooled vector is then scaled to unit length.

    Note:
        Constructing an instance re-initialises the RMM memory pool and loads
        the model weights, so build one instance and reuse it.
    """

    def __init__(self, 
                 model_name='allenai/longformer-base-4096', 
                 batch_size=32):
        """Load the tokenizer/model and configure batching.

        Args:
            model_name: Hugging Face model identifier passed to
                ``AutoTokenizer`` / ``AutoModel``.
            batch_size: Number of texts tokenised and embedded per forward pass.
        """
        # Use the RMM pool allocator with a 1 GB initial pool (adjust as needed).
        rmm.reinitialize(
            pool_allocator=True,
            initial_pool_size=2**30
        )

        # Run on the GPU when one is visible to torch, otherwise on the CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)

        self.batch_size = batch_size
        self.stop_words = set(stopwords.words('english'))  # Add language of your choice

    def remove_stopwords(self, text):
        """Return ``text`` with stopwords dropped and whitespace collapsed.

        Splitting is on whitespace only and matching is case-insensitive.
        ``None`` (e.g. a NULL post body from the database) yields ``''``
        instead of raising.
        """
        if text is None:
            return ''
        kept = [word for word in text.split() if word.lower() not in self.stop_words]
        return ' '.join(kept)

    def generate_embeddings(self, texts):
        """Embed ``texts`` and return one unit-length row per input text.

        Args:
            texts: Sequence of strings (``None`` entries are treated as empty).

        Returns:
            ``np.ndarray`` with exactly ``len(texts)`` rows.  If a batch fails,
            the error is printed and that batch's rows are zero vectors, so
            row ``i`` always corresponds to ``texts[i]``.  Empty input gives
            an array with zero rows.
        """
        # Width of the zero rows used for empty input and failed batches.
        # Assumes the model config exposes `hidden_size` (true for Longformer).
        hidden_size = self.model.config.hidden_size
        if len(texts) == 0:
            return np.empty((0, hidden_size), dtype=np.float32)

        all_embeddings = []

        for i in tqdm(range(0, len(texts), self.batch_size), desc="Processing", unit="batch"):
            batch_texts = texts[i:i+self.batch_size]
            cleaned_batch_texts = [self.remove_stopwords(text) for text in batch_texts]
            try:
                inputs = self.tokenizer(
                    cleaned_batch_texts,
                    return_tensors='pt',
                    padding=True,
                    truncation=True,
                    max_length=1024
                ).to(self.device)

                with torch.no_grad():
                    outputs = self.model(**inputs)

                # Masked mean pooling: with padding=True shorter texts carry
                # pad tokens, and a plain mean over dim=1 would let those pad
                # positions dilute the embedding.
                hidden = outputs.last_hidden_state
                mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
                token_counts = mask.sum(dim=1).clamp(min=1)  # avoid 0-division
                batch_embeddings = (hidden * mask).sum(dim=1) / token_counts

                batch_embeddings_np = batch_embeddings.cpu().numpy()
                batch_normalized = np.array([safe_normalize(emb) for emb in batch_embeddings_np])

            except Exception as e:
                print(f"Error processing batch: {e}")
                # Keep rows aligned with `texts`: a dropped batch would shift
                # every later embedding onto the wrong post.
                batch_normalized = np.zeros((len(batch_texts), hidden_size), dtype=np.float32)

            all_embeddings.append(batch_normalized)

        return np.vstack(all_embeddings)

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/zhihaolyu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [10]:
def gpu_dimensionality_reduction_clustering(embeddings, 
                                            n=2,
                                            n_neighbors=15, 
                                            min_cluster_size=10,
                                            min_samples=20,
                                            random_state=42):
    """Reduce embeddings with cuML UMAP, then cluster them with cuML HDBSCAN.

    Args:
        embeddings: Array-like of embedding rows; copied to the GPU via CuPy.
        n: Number of UMAP output dimensions (``n_components``).
        n_neighbors: UMAP ``n_neighbors``.
        min_cluster_size: HDBSCAN ``min_cluster_size``.
        min_samples: HDBSCAN ``min_samples`` (previously fixed at 20).
        random_state: Seed passed to UMAP (previously fixed at 42).

    Returns:
        ``(reduced_dims, cluster_labels)`` as NumPy arrays.  On any failure
        the error is printed and ``(None, None)`` is returned, so callers
        must check for ``None`` before using the result.
    """
    try:
        embeddings_gpu = cp.asarray(embeddings)

        umap_reducer = cuUMAP(
            n_neighbors=n_neighbors,
            n_components=n,
            random_state=random_state
        )
        reduced_dims = umap_reducer.fit_transform(embeddings_gpu)

        hdbscan_clusterer = cuHDBSCAN(
            min_cluster_size=min_cluster_size,
            gen_min_span_tree=True,
            min_samples=min_samples
        )
        cluster_labels = hdbscan_clusterer.fit_predict(reduced_dims)

        # Bring both results back to host memory for pandas/NumPy consumers.
        return cp.asnumpy(reduced_dims), cp.asnumpy(cluster_labels)
    except Exception as e:
        print(f"Dimensionality reduction error: {e}")
        return None, None

In [11]:
# Subreddit names grouped by topic.  aggregation() checks membership in
# p_reddit and s_reddit; e_reddit is not referenced by the code visible in
# this notebook (everything else falls through to the 'economic' label).
p_reddit = {'politics', 'PoliticalDiscussion', 'unpopularopinion', 'Conservative', 'PoliticalHumor'}
s_reddit = {'nba', 'sports', 'nfl', 'PremierLeague', 'formula1'}
e_reddit = {'Economics', 'AskEconomics', 'inflation', 'economicCollapse', 'badeconomics'}

In [12]:
def save_to_db(df, engine, table_name='cluster'):
    """Append the aggregated cluster rows in ``df`` to a database table.

    The intermediate columns that ``aggregation`` keeps for its own
    calculations are dropped first; ``df`` itself is not modified.

    Args:
        df: Frame returned by ``aggregation``; must contain the five helper
            columns listed below (``DataFrame.drop`` raises ``KeyError``
            otherwise).
        engine: SQLAlchemy engine/connection accepted by ``DataFrame.to_sql``.
        table_name: Destination table; defaults to ``'cluster'``.  Rows are
            appended, so re-running for the same month duplicates them.
    """
    helper_columns = ['command_score', 'post_create', 'post_score', 'command_score_sum', 'command_content']
    df_new = df.drop(columns=helper_columns)
    df_new.to_sql(table_name, engine, index=False, if_exists='append')

In [13]:
def aggregation(df, month, year=2023):
    """Collapse per-post rows into one summary row per ``cluster_id``.

    Args:
        df: Per-post frame with columns ``cluster_id``, ``post_id``,
            ``command_score``, ``post_create``, ``post_score``, ``subreddit``
            and ``command_content``.  The two ``command_*`` columns hold one
            list per post; a post whose array is NULL (``None``) is treated
            as having no comments.
        month: Month number (int) stamped into ``post_date``.
        year: Year stamped into ``post_date``.

    Returns:
        DataFrame with one row per cluster.  Besides the grouped helper
        columns it carries ``list_of_post``, ``score_sum``,
        ``num_of_comment``, ``day_diff``, ``subreddit`` (topic label) and
        ``post_date``.  Every distinct ``cluster_id`` value gets a row,
        whatever label the clusterer used for it.
    """
    def present_items(nested):
        # Flatten the per-post lists of one cluster, skipping posts whose
        # array is None and individual None entries (e.g. `[None]`).
        return [
            item
            for sublist in nested if sublist is not None
            for item in sublist if item is not None
        ]

    agg_func = {
        'post_id': list,
        'command_score': list,
        'post_create': list,
        'post_score': 'sum',
        'subreddit': 'first',
        "command_content": list
    }
    result_df = df.groupby('cluster_id').agg(agg_func).reset_index()

    result_df = result_df.rename(columns={'post_id': 'list_of_post'})

    # Total score = all comment scores in the cluster + all post scores.
    result_df['command_score_sum'] = result_df['command_score'].apply(
        lambda scores: sum(present_items(scores))
    )
    result_df['score_sum'] = result_df['command_score_sum'] + result_df['post_score']

    result_df['num_of_comment'] = result_df['command_content'].apply(
        lambda comments: len(present_items(comments))
    )

    # Span in days between the oldest and newest post of each cluster.
    result_df['post_create'] = result_df['post_create'].apply(lambda x: pd.to_datetime(x, format='%Y-%m-%d'))
    result_df['day_diff'] = result_df['post_create'].apply(lambda x: x.max() - x.min())
    result_df['day_diff'] = result_df['day_diff'].dt.days

    def classify_subreddit(sub):
        # Map the cluster's first subreddit onto a topic label; anything that
        # is neither political nor sports falls through to 'economic'.
        if sub in p_reddit:
            return 'politic'
        elif sub in s_reddit:
            return 'sport'
        else:
            return 'economic'
    result_df['subreddit'] = result_df['subreddit'].apply(classify_subreddit)

    result_df['post_date'] = pd.to_datetime(f'{year}-{month:02d}')

    return result_df

In [21]:
start_time = time.time()
# Moderator-bot boilerplate that is excluded from the text fed to the model.
auto_reply = "Please remember what subreddit you are in, this is unpopular opinion. We want civil and unpopular takes and discussion. Any uncivil and ToS violating comments will be removed and subject to a ban. Have a nice day!*I am a bot, and this action was performed automatically. Please [contact the moderators of this subreddit](/message/compose/?to= if you have any questions or concerns.*"

reduction_dimentions = 12  # UMAP output dimensions
n = 10                     # UMAP n_neighbors

year = 2023


def combine_content(row):
    """Build the text to embed for one post row.

    Returns the post body followed by every comment that is neither ``None``
    nor the moderator auto-reply, separated by single spaces.  A NULL post
    body is treated as an empty string; a row whose comment array is NULL or
    ``[None]`` yields just the post body.
    """
    post_text = row['post_content'] or ""
    comments = row['command_content']
    if comments is None or comments == [None]:
        return post_text
    kept = [text for text in comments if text is not None and text != auto_reply]
    return " ".join([post_text] + kept)


# Build the embedding processor once.  Its constructor re-initialises the RMM
# pool and reloads the model weights, so creating it inside the loop repeated
# that work for every month.
embedding_processor = GPUEmbeddingProcessor(batch_size=32)

for month in range(1, 13):
    # Load one month of posts from Postgres into a pandas DataFrame.
    query = f"""
    SELECT * FROM public.sports
    WHERE EXTRACT(YEAR FROM post_create) = {year}
    AND EXTRACT(MONTH FROM post_create) = {month}
    """
    df = pd.read_sql(query, engine)
    if df.empty:
        # Nothing to embed or cluster for this month.
        print(f"No posts found for {year}-{month:02d}; skipping")
        continue

    df['combined_content'] = df.apply(combine_content, axis=1)
    texts = df['combined_content'].tolist()

    embeddings = embedding_processor.generate_embeddings(texts)

    # UMAP reduction followed by HDBSCAN clustering on the GPU.
    reduced_dims, cluster_labels = gpu_dimensionality_reduction_clustering(
        embeddings, 
        n=reduction_dimentions,
        n_neighbors=n,
        min_cluster_size=5
    )
    if cluster_labels is None:
        # The helper reports failures by returning (None, None); do not write
        # a month of unlabeled rows to the database.
        print(f"Clustering failed for {year}-{month:02d}; skipping")
        continue

    df['cluster_id'] = cluster_labels
    # Pass the year explicitly so post_date follows the `year` set above.
    result_df = aggregation(df, month, year=year)

    save_to_db(result_df, engine)

    # Print cluster information
    unique_clusters = np.unique(cluster_labels)

    print(f"Number of clusters detected: {len(unique_clusters)}")
    for cluster in unique_clusters:
        cluster_count = np.sum(cluster_labels == cluster)
        print(f"Cluster {cluster}: {cluster_count} texts")

    end_time = time.time()

    # Elapsed time is cumulative: it is measured from before the first month.
    time_cost = end_time - start_time
    print(f"this task costs {time_cost} seconds")

Processing: 100%|██████████| 166/166 [06:35<00:00,  2.38s/batch]


Fitting 3 folds for each of 180 candidates, totalling 540 fits


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

Best Parameters: {'alpha': 1.0, 'cluster_selection_method': 'eom', 'min_cluster_size': 50, 'min_samples': 5}
Best Silhouette Score: 0.5123909910519918
Fitting 3 folds for each of 180 candidates, totalling 540 fits
Best Parameters: {'alpha': 1.0, 'cluster_selection_method': 'eom', 'min_cluster_size': 50, 'min_samples': 15}
Best Silhouette Score: 0.48202627897262573
Fitting 3 folds for each of 180 candidates, totalling 540 fits
Best Parameters: {'alpha': 1.0, 'cluster_selection_method': 'eom', 'min_cluster_size': 50, 'min_samples': 5}
Best Silhouette Score: 0.4705037971337636
Fitting 3 folds for each of 180 candidates, totalling 540 fits
Best Parameters: {'alpha': 1.0, 'cluster_selection_method': 'eom', 'min_cluster_size': 40, 'min_samples': 15}
Best Silhouette Score: 0.45420345664024353
Fitting 3 folds for each of 180 candidates, totalling 540 fits
Best Parameters: {'alpha': 1.0, 'cluster_selection_method': 'eom', 'min_cluster_size': 30, 'min_samples': 15}
Best Silhouette Score: 0.53755