In [100]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.sql.window import Window as W
from pyspark.ml import Pipeline
import os
import uuid
from pyspark.sql.types import (
    ArrayType,
    DoubleType
)
from pyspark.sql.functions import col, window, collect_list, from_unixtime, concat_ws, explode,udf, array, lit, struct, row_number
from pyspark.sql import Row

In [101]:
# Start (or reuse) a local Spark session that runs on all available cores.
spark = SparkSession.builder.appName("Reddit TF-IDF Processing").master("local[*]").getOrCreate()

In [102]:
# Directory that holds the raw input Parquet files.
input_dir="data/raw"

In [103]:
# Load every raw Parquet file from the configured input directory rather than
# repeating the path literal, so changing `input_dir` actually takes effect.
# NOTE(review): the preview of this frame shows the same comment `id` on several
# rows; consider `dropDuplicates(["id"])` if that is unintended, because repeated
# documents skew the IDF weights computed later.
df = spark.read.parquet(f"{input_dir}/*.parquet")

                                                                                

In [104]:
# Preview the first rows and the column layout of the raw data.
df.show(5)

+-------+------------------+-------------+-----+----------+---------+--------------------+--------------------+-------------------+
|     id|            author|  created_utc|score| parent_id|subreddit|           permalink|                text|          timestamp|
+-------+------------------+-------------+-----+----------+---------+--------------------+--------------------+-------------------+
|l70kegt|5chrodingers_pussy|1.717472683E9|    2|t1_l70ii9i| JoeRogan|/r/JoeRogan/comme...|You bring up a co...|2024-06-04 05:44:43|
|l70kegt|5chrodingers_pussy|1.717472683E9|    2|t1_l70ii9i| JoeRogan|/r/JoeRogan/comme...|You bring up a co...|2024-06-04 05:44:43|
|l70kegt|5chrodingers_pussy|1.717472683E9|    2|t1_l70ii9i| JoeRogan|/r/JoeRogan/comme...|You bring up a co...|2024-06-04 05:44:43|
|l6z1s7d|     Sardoodledome|1.717450311E9|    1|t1_l6z1cw9| JoeRogan|/r/JoeRogan/comme...|Well I forgot abo...|2024-06-03 23:31:51|
|l6z1s7d|     Sardoodledome|1.717450311E9|    1|t1_l6z1cw9| JoeRogan|/r/JoeR

In [105]:
# Split each comment's `text` into lower-cased, whitespace-separated tokens (`words`).
# Punctuation stays attached to tokens (see entries such as 'mean,' in the vocabulary).
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [106]:
# Term-frequency stage: count token occurrences over a vocabulary capped at 1000 terms.
cv = CountVectorizer(
    inputCol="words",
    outputCol="rawFeatures",
    vocabSize=1000,
    minDF=1.0,
)

In [107]:
# Re-weight the raw term counts by inverse document frequency to obtain TF-IDF scores.
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [108]:
# Chain the stages in order: tokenize -> term counts -> IDF weighting.
# The pipeline is unfitted here; it is fitted separately on each window's texts.
pipeline = Pipeline(stages=[tokenizer, cv, idf])

In [109]:
# Sliding time windows: 60 seconds wide, advancing every 5 seconds.
# Each window row carries the list of comment texts whose timestamp falls inside it.
windowed_df = (
    df.groupBy(window(col("timestamp"), "60 seconds", "5 seconds"))
    .agg(collect_list("text").alias("texts"))
)

In [110]:
# Preview a few windows together with their collected texts.
windowed_df.show(5)



+--------------------+--------------------+
|              window|               texts|
+--------------------+--------------------+
|{2024-06-03 21:11...|            [, , , ]|
|{2024-06-03 21:11...|            [, , , ]|
|{2024-06-03 21:12...|            [, , , ]|
|{2024-06-03 21:12...|            [, , , ]|
|{2024-06-03 21:17...|[As someone with ...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [111]:
# Pick one sample window to prototype the per-window logic on.
# A grouped result has no guaranteed row order, so sort by window start to make
# the choice reproducible, and fetch only the rows up to the sample instead of
# collecting every window (with all of its texts) to the driver.
SAMPLE_WINDOW_INDEX = 17
row = (
    windowed_df.orderBy(col("window.start"))
    .limit(SAMPLE_WINDOW_INDEX + 1)
    .collect()[SAMPLE_WINDOW_INDEX]
)

                                                                                

In [112]:
# Inspect the sampled window: its start/end bounds and the texts it contains.
row

Row(window=Row(start=datetime.datetime(2024, 6, 3, 21, 20, 45), end=datetime.datetime(2024, 6, 3, 21, 21, 45)), texts=['Maybe AI can finally explain exactly what “woke” is supposed to mean, since the people using it constantly have so far been unable to.', 'Maybe AI can finally explain exactly what “woke” is supposed to mean, since the people using it constantly have so far been unable to.', 'Maybe AI can finally explain exactly what “woke” is supposed to mean, since the people using it constantly have so far been unable to.', 'Maybe AI can finally explain exactly what “woke” is supposed to mean, since the people using it constantly have so far been unable to.', 'It is already my dude', 'It is already my dude', 'It is already my dude', 'It is already my dude'])

In [113]:
# Unpack the sampled window's bounds and its list of comment texts.
window_start = row["window"]["start"]
window_end = row["window"]["end"]
texts = row["texts"]

In [114]:
# One single-column row per comment text of the sampled window.
texts_df = spark.createDataFrame([(t,) for t in texts], schema=["text"])

In [115]:
# Fit the tokenizer/count/IDF pipeline on this window's texts only.
model = pipeline.fit(texts_df)

In [116]:
# Apply the fitted pipeline; adds the `words`, `rawFeatures` and `features` columns.
tfidf_df = model.transform(texts_df)

In [117]:
# Stage index 1 is the fitted CountVectorizer (stages are [tokenizer, cv, idf]);
# its vocabulary maps a feature index to the corresponding word.
vocab = model.stages[1].vocabulary

In [118]:
# Show the learned vocabulary for the sampled window.
vocab

['it',
 'is',
 'been',
 'supposed',
 'using',
 'since',
 'my',
 'unable',
 'exactly',
 'dude',
 'what',
 'can',
 'so',
 'to',
 'constantly',
 'finally',
 'have',
 'to.',
 'explain',
 'maybe',
 'the',
 'already',
 'people',
 'ai',
 'mean,',
 'far',
 '“woke”']

In [119]:
def extract_top_words(features, vocabulary=None, top_n=10):
    """Return the highest-weighted ``(word, score)`` pairs of one sparse feature vector.

    Args:
        features: Object exposing parallel ``indices`` and ``values`` sequences
            (callers pass each row's ``features`` column).
        vocabulary: Sequence mapping a term index to its word. When omitted, the
            notebook-level ``vocab`` is used, so one-argument calls keep working.
        top_n: Maximum number of pairs to return (10 by default, as before).

    Returns:
        A list of ``(word, score)`` tuples sorted by descending score. Ties keep
        their original index order because the sort is stable.
    """
    # Fall back to the notebook global only when no vocabulary is supplied, so the
    # function can also be used (and tested) without hidden kernel state.
    words = vocab if vocabulary is None else vocabulary
    scored = [(words[i], v) for i, v in zip(features.indices, features.values)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]

In [120]:
# For every document row, take its top-scoring words and flatten all of them
# into a single driver-side list (so words repeat across documents).
top_words = tfidf_df.select("features").rdd.flatMap(
    lambda record: extract_top_words(record["features"])
).collect()

                                                                                

In [121]:
# Inspect the flattened per-document (word, score) pairs.
top_words

[('been', 0.5877866649021191),
 ('supposed', 0.5877866649021191),
 ('using', 0.5877866649021191),
 ('since', 0.5877866649021191),
 ('unable', 0.5877866649021191),
 ('exactly', 0.5877866649021191),
 ('what', 0.5877866649021191),
 ('can', 0.5877866649021191),
 ('so', 0.5877866649021191),
 ('to', 0.5877866649021191),
 ('been', 0.5877866649021191),
 ('supposed', 0.5877866649021191),
 ('using', 0.5877866649021191),
 ('since', 0.5877866649021191),
 ('unable', 0.5877866649021191),
 ('exactly', 0.5877866649021191),
 ('what', 0.5877866649021191),
 ('can', 0.5877866649021191),
 ('so', 0.5877866649021191),
 ('to', 0.5877866649021191),
 ('been', 0.5877866649021191),
 ('supposed', 0.5877866649021191),
 ('using', 0.5877866649021191),
 ('since', 0.5877866649021191),
 ('unable', 0.5877866649021191),
 ('exactly', 0.5877866649021191),
 ('what', 0.5877866649021191),
 ('can', 0.5877866649021191),
 ('so', 0.5877866649021191),
 ('to', 0.5877866649021191),
 ('been', 0.5877866649021191),
 ('supposed', 0.58778

In [122]:
def to_array(v):
    """Convert a vector exposing ``toArray()`` into a plain list; ``None`` passes through."""
    return None if v is None else v.toArray().tolist()

# Expose the converter as a UDF returning array<double>, then materialise the
# dense per-vocabulary-index TF-IDF values next to the sparse `features` column.
to_array_udf = udf(to_array, ArrayType(DoubleType()))
tfidf_df = tfidf_df.withColumn(
    "tfidf_values",
    to_array_udf(col("features")),
)

In [123]:
# Preview the transformed rows, including the new dense `tfidf_values` column.
tfidf_df.show(10)

                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|               words|         rawFeatures|            features|        tfidf_values|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Maybe AI can fina...|[maybe, ai, can, ...|(27,[0,1,2,3,4,5,...|(27,[0,1,2,3,4,5,...|[0.0, 0.0, 0.5877...|
|Maybe AI can fina...|[maybe, ai, can, ...|(27,[0,1,2,3,4,5,...|(27,[0,1,2,3,4,5,...|[0.0, 0.0, 0.5877...|
|Maybe AI can fina...|[maybe, ai, can, ...|(27,[0,1,2,3,4,5,...|(27,[0,1,2,3,4,5,...|[0.0, 0.0, 0.5877...|
|Maybe AI can fina...|[maybe, ai, can, ...|(27,[0,1,2,3,4,5,...|(27,[0,1,2,3,4,5,...|[0.0, 0.0, 0.5877...|
|It is already my ...|[it, is, already,...|(27,[0,1,6,9,21],...|(27,[0,1,6,9,21],...|[0.0, 0.0, 0.0, 0...|
|It is already my ...|[it, is, already,...|(27,[0,1,6,9,21],...|(27,[0,1,6,9,21],...|[0.0, 0.0, 0.0, 0...|
|It is already my ...|[it, is, alread

In [124]:
# Pair every vocabulary word with the value at its index in `tfidf_values`,
# then explode so each (word, tfidf) pair of each document becomes its own row.
exploded_df = tfidf_df.select(
    explode(
        array(
            [
                struct(
                    lit(word).alias("word"),
                    col("tfidf_values")[idx].alias("tfidf"),
                )
                for idx, word in enumerate(vocab)
            ]
        )
    ).alias("word_tfidf")
)

In [125]:
# Preview the exploded {word, tfidf} structs.
exploded_df.show(10)

+--------------------+
|          word_tfidf|
+--------------------+
|           {it, 0.0}|
|           {is, 0.0}|
|{been, 0.58778666...|
|{supposed, 0.5877...|
|{using, 0.5877866...|
|{since, 0.5877866...|
|           {my, 0.0}|
|{unable, 0.587786...|
|{exactly, 0.58778...|
|         {dude, 0.0}|
+--------------------+
only showing top 10 rows



In [126]:
# Flatten the struct into two plain columns: `word` and `tfidf`.
top_words_df = exploded_df.select("word_tfidf.word", "word_tfidf.tfidf")

In [127]:
# Rank all (word, score) rows by descending TF-IDF and keep the first ten.
# The window spec has no partitioning, so Spark moves all rows to one partition.
window_spec = W.orderBy(col("tfidf").desc())
top_words_df = top_words_df.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") <= 10)

In [128]:
# Tag every ranked row with the bounds of the window it was computed from.
top_words_df = (
    top_words_df
    .withColumn("window_start", lit(window_start))
    .withColumn("window_end", lit(window_end))
)

In [129]:
# Preview the ranked words together with their window bounds.
top_words_df.show(5)

24/06/11 18:00:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:00:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:00:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-----+------------------+----+-------------------+-------------------+
| word|             tfidf|rank|       window_start|         window_end|
+-----+------------------+----+-------------------+-------------------+
| what|0.5877866649021191|   1|2024-06-03 21:20:45|2024-06-03 21:21:45|
|maybe|0.5877866649021191|   2|2024-06-03 21:20:45|2024-06-03 21:21:45|
|  can|0.5877866649021191|   3|2024-06-03 21:20:45|2024-06-03 21:21:45|
|   so|0.5877866649021191|   4|2024-06-03 21:20:45|2024-06-03 21:21:45|
|   to|0.5877866649021191|   5|2024-06-03 21:20:45|2024-06-03 21:21:45|
+-----+------------------+----+-------------------+-------------------+
only showing top 5 rows



                                                                                

    def process_window(row):
        """Compute the ten highest-ranked (word, tfidf) rows for one time window.

        Draft variant that returns the result; a later cell defines a function of
        the same name that writes the result to Parquet instead of returning it.

        Args:
            row: Record with a ``window`` struct (``start``/``end``) and a
                ``texts`` list, as produced by the windowed aggregation.

        Returns:
            A DataFrame with columns ``word``, ``tfidf``, ``rank``,
            ``window_start`` and ``window_end``.
        """
        window_start, window_end = row["window"]["start"], row["window"]["end"]
        texts = row["texts"]

        # Create a DataFrame for the texts in the window
        texts_df = spark.createDataFrame([(text,) for text in texts], ["text"])

        # Fit the pipeline to the data
        model = pipeline.fit(texts_df)

        # Transform the data
        tfidf_df = model.transform(texts_df)

        # Extract the vocabulary from the fitted CountVectorizer (stage index 1)
        vocab = model.stages[1].vocabulary

        # Convert sparse vector to a plain list of floats (None passes through)
        def to_array(v):
            if v is None:
                return None
            return v.toArray().tolist()

        to_array_udf = udf(to_array, ArrayType(DoubleType()))
        tfidf_df = tfidf_df.withColumn("tfidf_values", to_array_udf(col("features")))

        # Explode the features column to get individual words and their TF-IDF scores
        # (one row per vocabulary word per document)
        exploded_df = tfidf_df.select(
            explode(
                array(
                    [
                        struct(
                            lit(vocab[i]).alias("word"),
                            col("tfidf_values")[i].alias("tfidf"),
                        )
                        for i in range(len(vocab))
                    ]
                )
            ).alias("word_tfidf")
        )

        # Select word and tfidf score
        top_words_df = exploded_df.select("word_tfidf.word", "word_tfidf.tfidf")

        # Keep the ten best-ranked rows by TF-IDF score; the window spec has no
        # partitioning, so the ranking is global over all exploded rows
        window_spec = W.orderBy(col("tfidf").desc())
        top_words_df = top_words_df.withColumn(
            "rank", row_number().over(window_spec)
        ).filter(col("rank") <= 10)

        # Add window information
        top_words_df = top_words_df.withColumn(
            "window_start", lit(window_start)
        ).withColumn("window_end", lit(window_end))

        return top_words_df

In [135]:
def process_window(
    row,
    output_dir="data/tfidf",
    partition_columns=("window_start", "window_end"),
    top_n=10,
):
    """Compute the top TF-IDF rows of one time window and append them to Parquet.

    The pipeline is fitted on the window's own texts, every (document, word)
    score is ranked by descending TF-IDF, and the best ``top_n`` rows are written
    together with the window bounds.

    Args:
        row: Record with a ``window`` struct (``start``/``end``) and a ``texts``
            list, as produced by the windowed aggregation.
        output_dir: Destination directory for the Parquet output. This used to be
            read from a global that no cell defines, which raised ``NameError``
            on a fresh kernel.
        partition_columns: Columns to partition the output by; pass an empty
            sequence to write without partitioning. The default mirrors the
            previously commented-out setting.
        top_n: Number of ranked rows to keep per window.

    Returns:
        None. The result is appended under ``output_dir``; windows without any
        texts are skipped.
    """
    window_start, window_end = row["window"]["start"], row["window"]["end"]
    texts = row["texts"]

    # A window can arrive with no texts (e.g. when every text in it was null);
    # there is nothing to fit, and Spark cannot infer a schema from an empty list.
    if not texts:
        return None

    # Create a DataFrame for the texts in the window
    texts_df = spark.createDataFrame([(text,) for text in texts], ["text"])

    # Fit the pipeline on this window only, then score its documents
    model = pipeline.fit(texts_df)
    tfidf_df = model.transform(texts_df)

    # Stage index 1 is the fitted CountVectorizer; its vocabulary maps index -> word
    vocab = model.stages[1].vocabulary

    # Convert sparse vector to a plain list so individual positions can be indexed
    def to_array(v):
        if v is None:
            return None
        return v.toArray().tolist()

    to_array_udf = udf(to_array, ArrayType(DoubleType()))
    tfidf_df = tfidf_df.withColumn("tfidf_values", to_array_udf(col("features")))

    # One row per (document, vocabulary word) carrying that word's TF-IDF score
    exploded_df = tfidf_df.select(
        explode(
            array(
                [
                    struct(
                        lit(word).alias("word"),
                        col("tfidf_values")[idx].alias("tfidf"),
                    )
                    for idx, word in enumerate(vocab)
                ]
            )
        ).alias("word_tfidf")
    )
    top_words_df = exploded_df.select("word_tfidf.word", "word_tfidf.tfidf")

    # NOTE(review): ranking runs over every (document, word) row, so one word can
    # occupy several of the top slots when texts repeat; aggregate per word first
    # if distinct words are wanted.
    window_spec = W.orderBy(col("tfidf").desc())
    top_words_df = top_words_df.withColumn(
        "rank", row_number().over(window_spec)
    ).filter(col("rank") <= top_n)

    # Add window information
    top_words_df = top_words_df.withColumn(
        "window_start", lit(window_start)
    ).withColumn("window_end", lit(window_end))

    # Append the results; only partition when partition columns were requested
    writer = top_words_df.write.mode("append")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    writer.parquet(output_dir)
    return None

# Process the windows one at a time on the driver. `collect()` already returns
# Row objects, so the extra `.rdd` hop is unnecessary. The loop variable has its
# own name so it does not overwrite the sample `row` chosen earlier.
for window_row in windowed_df.collect():
    process_window(window_row)

24/06/11 18:05:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:05:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:05:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:05:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:05:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 18:05:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/11 1

KeyboardInterrupt: 

## Test: read back the written TF-IDF results

In [85]:
# Read the output directory itself instead of globbing `*.parquet` at its top
# level: files written with `partitionBy` live in `column=value` sub-directories,
# which a top-level glob does not match. Reading the root also works for an
# unpartitioned layout.
df_result = spark.read.parquet("data/tfidf")

In [88]:
# Sanity check: number of rows read back from the output.
df_result.count()

5005