In [1]:
import logging
import os
import operator

import matplotlib.pyplot as plt
import pandas as pd
# Plotly graphs have more features than seaborn, like interactive hover text & zoom, but they don't show up in pdfs
import plotly.express as px
import pyspark.sql.functions as fn
from pyspark.ml.linalg import Vectors, VectorUDT
import seaborn as sns
import numpy as np

# Configure root logging at INFO with timestamped messages; this is done before importing ihop
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

import ihop

import ihop.community2vec as ic2v
import ihop.import_data as iid
import ihop.text_processing as itp
import ihop.clustering as ic


In [2]:
C2V_MODEL_PATH = "../data/community2vec/RC_2021-05/best_model/keyedVectors"

# This data was produced by the bagOfWords_preprocessing_databricks.ipynb notebook. It removes deleted comments/submissions and comments from the top most-commenting users, and joins comments and submissions, but applies no text preprocessing.
# These are essentially the same steps as ihop.import_data bow
REDDIT_THREADS_PATH = "../data/bagOfWords/2021-05_to_2021-06_joined_submissions_comments_5percentTopUsersExcludedFromComments_02102022.parquet"

In [3]:
# Spark session for this notebook; the master is local[*] and the driver gets 36G of memory plus 8G overhead
spark = ihop.utils.get_spark_session(
    "Cluster Labels Notebook",
    config={
        "spark.driver.memory": "36G",
        "spark.driver.memoryOverhead": "8G",
        "spark.master": "local[*]",
    },
)

22/06/24 15:13:39 WARN Utils: Your hostname, virginia-beastbox resolves to a loopback address: 127.0.1.1; using 10.3.40.174 instead (on interface wlp147s0)
22/06/24 15:13:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/24 15:13:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-06-24 15:13:41,565 : INFO : Spark configuration: [('spark.app.name', 'Cluster Labels Notebook'), ('spark.driver.memory', '36G'), ('spark.driver.port', '37491'), ('spark.executor.id', 'driver'), ('spark.driver.memoryOverhead', '8G'), ('spark.driver.host', 'wpa174.wpanet.cs.local'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark

In [4]:
# Load the joined submissions/comments parquet. For a quick run on a subset, chain the .limit(10000) left in the trailing comment.
dataframe = spark.read.parquet(REDDIT_THREADS_PATH)  # .limit(10000)

In [5]:
# List the columns of the joined frame (submission fields alongside comment fields)
dataframe.columns

['subreddit',
 'author',
 'created_utc',
 'id',
 'score',
 'selftext',
 'title',
 'url',
 'fullname_id',
 'comments_subreddit',
 'comments_id',
 'parent_id',
 'comments_score',
 'link_id',
 'comments_author',
 'body',
 'comments_created_utc',
 'time_to_comment_in_seconds']

In [6]:
# Preview the first 10 joined rows
dataframe.show(10)

+-------------------+-----------+-----------+------+-----+--------------------+--------------------+--------------------+-----------+-------------------+-----------+----------+--------------+---------+--------------------+--------------------+--------------------+--------------------------+
|          subreddit|     author|created_utc|    id|score|            selftext|               title|                 url|fullname_id| comments_subreddit|comments_id| parent_id|comments_score|  link_id|     comments_author|                body|comments_created_utc|time_to_comment_in_seconds|
+-------------------+-----------+-----------+------+-----+--------------------+--------------------+--------------------+-----------+-------------------+-----------+----------+--------------+---------+--------------------+--------------------+--------------------+--------------------------+
|        superleague|  SL_Thread| 1619960426|n135uh|    7|||SUNDAY||\n|:--|...|Sunday Match Thre...|https://www.reddi...|  t

In [7]:
# Reproduce some of the work from ihop.text_processing.py: concatenate each submission with its comments
# posted between 3 seconds and 3 days (60*60*72 seconds) after it, but don't use TF-IDF
corpus = itp.SparkCorpus.init_from_joined_dataframe(dataframe, max_time_delta=60*60*72, min_time_delta=3)
# Show the columns of the resulting per-document frame
corpus.document_dataframe.columns

['id', 'subreddit', 'document_text']

In [8]:
# Build one row per subreddit whose "subreddit_text" is every document_text of
# that subreddit joined together with single spaces.
subreddit_text_dataframe = (
    corpus.document_dataframe
    .groupBy("subreddit")
    .agg(
        fn.concat_ws(" ", fn.collect_list("document_text")).alias("subreddit_text")
    )
)


In [9]:
# Redistribute the per-subreddit texts over 10000 partitions before vectorizing.
# NOTE(review): presumably this keeps each task's share of the large concatenated texts small — confirm the partition count.
# Uncomment the prints to compare the partition count before and after.
#print(subreddit_text_dataframe.rdd.getNumPartitions())
subreddit_text_dataframe = subreddit_text_dataframe.repartition(10000)
#print(subreddit_text_dataframe.rdd.getNumPartitions())

In [10]:
# Vectorize each subreddit's concatenated text with the ihop preprocessing pipeline.
# The logged pipeline parameters report useIDF=False and binary=False, so the output is presumably raw term counts rather than TF-IDF weights.
# NOTE(review): maxDF/minDF/minTF look like document-frequency and per-document term-frequency cutoffs — confirm against ihop.text_processing.
text_pipeline = itp.SparkTextPreprocessingPipeline(input_col = "subreddit_text", maxDF=10000, minDF=0.0, minTF=5)
subreddit_vectorized_df = text_pipeline.fit_transform(subreddit_text_dataframe)

2022-06-24 15:13:44,777 : INFO : Parameters for SparkTextPreprocessingPipeline: {'self': <ihop.text_processing.SparkTextPreprocessingPipeline object at 0x7f62417cdb20>, 'input_col': 'subreddit_text', 'output_col': 'vectorized', 'tokens_col': 'tokenized', 'filtered_tokens_col': 'tokensNoStopWords', 'tokenization_pattern': '([\\p{L}\\p{N}#@][\\p{L}\\p{N}\\p{Pd}\\p{Pc}\\p{S}\\p{P}]*[\\p{L}\\p{N}])|[\\p{L}\\p{N}]|[^\\p{P}\\s]', 'match_gaps': False, 'toLowercase': True, 'stopLanguage': 'english', 'stopCaseSensitive': False, 'maxDF': 10000, 'minDF': 0.0, 'minTF': 5, 'binary': False, 'useIDF': False}
2022-06-24 15:13:44,793 : INFO : Using RegexTokenizer with following parameters: {inputCol: subreddit_text, outputCol: tokenized, pattern: ([\p{L}\p{N}#@][\p{L}\p{N}\p{Pd}\p{Pc}\p{S}\p{P}]*[\p{L}\p{N}])|[\p{L}\p{N}]|[^\p{P}\s], toLowercase: True, gaps: False}
2022-06-24 15:13:44,835 : INFO : Using StopWordsRemover with the following parameters: {inputCol: tokenized, outputCol: tokensNoStopWords, 

In [None]:
# Preview the vectorized output (this triggers the Spark computation for the displayed rows)
subreddit_vectorized_df.show()

In [None]:
# Collect the subreddit names and their vectors into a pandas DataFrame;
# the vectors are converted to numpy arrays in a later cell.
pandas_df = subreddit_vectorized_df.select("subreddit", "vectorized").toPandas()
# Lookup from vocabulary index to term, used later to label the PMI results
id_to_term = text_pipeline.get_id_to_word()


In [None]:
# Shut down Spark: the remaining cells work only on pandas_df and id_to_term with pandas/numpy
spark.stop()

In [None]:
# Convert each collected vector to a numpy array (via its toArray() method) in a
# new "numpy_vectorized" column and drop the original "vectorized" column.
# The guard makes the cell safe to re-run: once "vectorized" has been dropped, a
# second run would otherwise fail with a KeyError.
if "vectorized" in pandas_df.columns:
    pandas_df = pandas_df.assign(
        numpy_vectorized=pandas_df["vectorized"].apply(lambda x: x.toArray())
    ).drop(columns="vectorized")

display(pandas_df.head(10))
pandas_df.dtypes


In [None]:
# Element-wise sum of every subreddit's count array gives the corpus-wide count
# per vocabulary entry; summing that array gives the total number of tokens.
total_corpus_counts = np.sum(pandas_df["numpy_vectorized"])
total_tokens = np.sum(total_corpus_counts)

# Report the summed array, its length (should match the vocabulary size) and the grand total
print("Array transform to numpy:", total_corpus_counts)
print("Vocab size check:", len(total_corpus_counts))
print("Total tokens in corpus:", total_tokens)

In [None]:
def compute_token_probabilities(token_count_pdf, vectorized_col):
    """Returns a numpy array of term probabilities for the given rows.

    The count arrays stored in vectorized_col are summed element-wise across all
    rows, then divided by the total of that summed array.

    :param token_count_pdf: pandas DataFrame holding one term-count numpy array per row
    :param vectorized_col: str, name of the column that holds the term-count arrays
    :return: numpy array where entry i is the summed count of term i divided by the summed count of all terms
    """
    token_count_array = np.sum(token_count_pdf[vectorized_col])
    total_tokens = np.sum(token_count_array)
    return token_count_array/total_tokens

def compute_pmi(token_count_pdf, vectorized_col, total_term_probabilities):
    """Returns numpy array storing pointwise mutual information between given dataframe values and the overall corpus.

    For each term this is log2(P(term | given rows) / P(term | corpus)).
    Terms that never occur in the given rows get -inf. That includes terms whose
    corpus probability is also zero: 0/0 would otherwise produce NaN, and numpy
    orders NaN after every real number, so such terms would be picked as the
    "highest" PMI by sort/argpartition.

    :param token_count_pdf: pandas DataFrame holding one term-count numpy array per row, typically the rows of one subreddit
    :param vectorized_col: str, name of the column that holds the term-count arrays
    :param total_term_probabilities: numpy array of corpus-wide term probabilities, e.g. the result of compute_token_probabilities on the full dataframe
    :return: numpy array of PMI values, one per term, aligned with total_term_probabilities
    """
    conditional_probs = compute_token_probabilities(token_count_pdf, vectorized_col)
    # log2(0) and 0/0 are expected for unseen terms and are handled explicitly
    # below, so silence numpy's divide/invalid RuntimeWarnings here.
    with np.errstate(divide="ignore", invalid="ignore"):
        pmis = np.log2(conditional_probs / total_term_probabilities)
    # A term absent from the given rows has PMI -inf, never NaN
    return np.where(conditional_probs == 0, -np.inf, pmis)


In [None]:
corpus_term_probabilities = compute_token_probabilities(pandas_df, "numpy_vectorized")

In [None]:
# For a handful of subreddits, print the k terms with the highest and the lowest
# PMI relative to the whole corpus.
selected_subreddits = ["4chan", "Utah", "MensRights", "conservatives", "libertarianmemes"]
k = 5
for s in selected_subreddits:

    selected_pdf = pandas_df[pandas_df["subreddit"]==s]
    display(selected_pdf)

    # A subreddit that is missing from the data would only yield NaN values
    if selected_pdf.empty:
        print("No rows found for subreddit:", s)
        print()
        continue

    pmi_values = compute_pmi(selected_pdf, "numpy_vectorized", corpus_term_probabilities)

    # Rank only terms with a finite PMI. NaN (term never counted anywhere) sorts
    # after every real number and would otherwise win the "top" slots, while
    # -inf (term absent from this subreddit) would fill the "bottom" slots with
    # arbitrary terms.
    finite_indices = np.flatnonzero(np.isfinite(pmi_values))
    ranked_indices = finite_indices[np.argsort(pmi_values[finite_indices])]

    # Slicing copes with fewer than k rankable terms and keeps the output ordered
    top_pmi_indices = ranked_indices[::-1][:k]
    print("Top PMI values for subreddit:", s)
    print("\tTerm\tPMI")
    for i in top_pmi_indices:
        print(f"\t{id_to_term[i]}\t{pmi_values[i]}")
    print()
    bottom_pmi_indices = ranked_indices[:k]
    print("Bottom PMI values for subreddit:", s)
    print("\tTerm\tPMI")
    for i in bottom_pmi_indices:
        print(f"\t{id_to_term[i]}\t{pmi_values[i]}")

