# ChangeMyView - Big Data Project


## Project Pipeline: Notebook-by-Notebook Roadmap

| Stage | Notebook | Main Goal | Key Actions | Output written to GCS |
|-------|----------|-----------|-------------|-----------------------|
| **N1 · Data Acquisition & Pre-processing** | `Notebook_01_Data_preprocessing.ipynb` | Turn 4.3 GB of raw CMV JSON into a clean, tokenised dataset. | 1. Download Zenodo dump (65 k threads).<br>2. Keep only *original* posts (OPs).<br>3. Merge `title` + `selftext`; regex scrub; NLTK tokenise & lemmatise.<br>4. Persist cleaned DF to GCS as JSON. | `gs://cmv-bucket/n1_preprocessed_df/` |
| **N2 · Topic Discovery (LDA)** | `Notebook_02_LDA.ipynb` | Uncover discussion themes. | 1. Load N1 DF.<br>2. `CountVectorizer` → sparse TF matrix.<br>3. Fit 45-topic Spark LDA; manual coherence tuning.<br>4. Map topics → 14 macro-categories; attach dominant topic to each post. | `gs://cmv-bucket/n2_categorised_df/` |
| **N3 · Feature Engineering** | `Notebook_03_Feature_engineering.ipynb` | Build a rich feature set for persuasion prediction. | 1. Load N2 DF.<br>2. Add lexical, structural, interaction features.<br>3. Standardise & assemble into a `features` vector. | `gs://cmv-bucket/n3_features_df/` |
| **N4 · Persuasion Modelling** | `Notebook_04_Modelling.ipynb` | Predict whether a comment earns a Δ. | 1. Load N3 DF.<br>2. Handle class imbalance (SMOTE + undersampling).<br>3. Train & tune Logistic Reg., RF, GBT, Dist. Trees.<br>4. Log metrics (F<sub>1</sub>, precision, recall). | `gs://cmv-bucket/n4_models/` |
| **N5 · Exploratory Visualisation** | `Notebook_05_Visualisation.ipynb` | Tell the story visually. | 1. Load N3 (or N4) artefacts.<br>2. Plot topic prevalence, Δ-rates, t-SNE clusters, time trends. | Figures saved to `gs://cmv-bucket/figures/` |

> **Dependency chain:** N1 ➜ N2 ➜ N3 ➜ {N4, N5}.  
> Each notebook can be rerun independently as long as its upstream artefact folder exists in GCS.


# Notebook 01 – Data Pre-processing Pipeline
*A first look at how we turn 4.3 GB of raw ChangeMyView JSON into tidy, model-ready tables.*

> **Goal in one line:** build a fully distributed Spark pipeline that cleans, labels, and tokenises each discussion thread **without pulling any data back to the driver**.


### Import statements

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType, LongType
from pyspark.sql.functions import from_unixtime, date_format, split, udf, concat_ws, col
# Note: these shadow Python's built-in min/max for the rest of the notebook
from pyspark.sql.functions import min, max

import re
import nltk
from nltk.corpus import stopwords, words
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# On Dataproc notebooks `spark` is usually predefined; getOrCreate() reuses it if so
spark = SparkSession.builder.getOrCreate()

# Download NLTK resources. The UDFs run on the executors, so every worker node
# needs these resources as well (e.g. installed through a cluster initialisation action)
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('punkt_tab')  # needed by word_tokenize/sent_tokenize in recent NLTK releases
nltk.download('wordnet')
nltk.download('words')

stop_words = set(stopwords.words('english'))
english_words = set(words.words())
lemmatizer = WordNetLemmatizer()



[nltk_data] Downloading package stopwords to
[nltk_data]     /usr/local/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /usr/local/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /usr/local/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package words to /usr/local/share/nltk_data...
[nltk_data]   Package words is already up-to-date!



## 1 - Data source  

<https://zenodo.org/records/1043504>

**Explicit Schema**

To speed up loading and avoid schema-inference errors, we define an explicit `StructType` for the Reddit JSON.  
Spark then skips the inference pass over the data, and every run gets the same column types (no surprise `LongType` columns from inferred integers, no inconsistent null handling).

Key fields are described in the inline comments below.




In [None]:
schema_two = StructType([
    StructField("num_comments", IntegerType(), True),     # number of comments on the post
    StructField("selftext", StringType(), True),          # body text of the post (may be empty)
    StructField("score", IntegerType(), True),            # Reddit score of the post (net upvotes)
    StructField("title", StringType(), True),             # title of the post
    StructField("delta", BooleanType(), True),            # target label (delta awarded)
    StructField("urls", ArrayType(StringType()), True),   # list of URLs in the post
    StructField("name", StringType(), True),              # unique Reddit identifier (fullname)
    StructField("created_utc", IntegerType(), True)       # creation time, Unix epoch seconds (UTC)
])

df_change = spark.read.schema(schema_two).json("gs://st446-cmv/threads.jsonl.bz2")
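Spark's `IntegerType` is a signed 32-bit integer, so storing `created_utc` in it only works while epoch seconds stay below 2³¹ − 1. A quick plain-Python check (no Spark needed) shows where that limit falls; for posts from 2013 to 2017 it is far away, but `LongType` would be the safer default for timestamps in general. The helper name is hypothetical.

```python
from datetime import datetime, timezone

# Spark's IntegerType is a signed 32-bit integer
INT32_MAX = 2**31 - 1

def last_representable_utc() -> datetime:
    """Latest UTC instant whose epoch seconds still fit in a signed 32-bit integer."""
    return datetime.fromtimestamp(INT32_MAX, tz=timezone.utc)

print(INT32_MAX)                             # 2147483647
print(last_representable_utc().isoformat())  # 2038-01-19T03:14:07+00:00
```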


## 2 - Data preprocessing

1. **Timestamp fix-up**
2. **Moderator cleanup**
3. **Field merge**
4. **Regex scrub**
5. **Tokenisation & lemmatisation**
6. **Token split**

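Sample output (illustrative):
- `merged`: "I think this should be changed because..."
- `processed`: an array of the remaining content words; `think`, for instance, is removed because it is in the custom stop-list.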


In [None]:
#data check for the shape
shape_two = (df_change.count(), len(df_change.columns))
print(f"Shape: {shape_two}")



Shape: (65169, 8)


                                                                                

1. **Timestamp fix-up** – convert `created_utc` (Unix epoch seconds) into a Spark `timestamp` column `created_datetime`, and derive a `year_month` string (`yyyy-MM`) for grouping by month.  

In [None]:
# Timestamp fix-up
# Convert created_utc (epoch seconds) into a readable timestamp, then derive year_month
df_change = df_change.withColumn("created_datetime", from_unixtime("created_utc").cast("timestamp"))
df_change = df_change.withColumn("year_month", date_format("created_datetime", "yyyy-MM"))
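`from_unixtime` renders epoch seconds in the Spark session time zone (`spark.sql.session.timeZone`), and `date_format(..., "yyyy-MM")` then keeps only year and month. The plain-Python sketch below mimics those two steps under the assumption that the session time zone is UTC; the helper name is hypothetical, and the example epoch value is simply 2013-01-17 16:24:11 UTC.

```python
from datetime import datetime, timezone

def to_datetime_and_year_month(epoch_seconds: int) -> tuple[str, str]:
    """Mimic from_unixtime(...) followed by date_format(..., 'yyyy-MM'), assuming a UTC session."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S"), dt.strftime("%Y-%m")

print(to_datetime_and_year_month(1358439851))  # ('2013-01-17 16:24:11', '2013-01')
```

If the cluster runs in another time zone, the rendered wall-clock values (and occasionally the `year_month` of posts near midnight at a month boundary) shift accordingly.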

In [None]:
# Display time range
df_change.agg(
    min("created_datetime").alias("min_date"),
    max("created_datetime").alias("max_date")
).show()




+-------------------+-------------------+
|           min_date|           max_date|
+-------------------+-------------------+
|2013-01-17 16:24:11|2017-09-30 23:13:07|
+-------------------+-------------------+



                                                                                

2. **Remove moderator comments** – some posts contain moderator notes in `selftext`, wrapped in asterisks.

Not every post has a `selftext`; for those posts, all the content is in the title.
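The pattern used below removes any span opened and closed by one to three asterisks on the same line. A small plain-Python check with hypothetical strings shows two side effects worth knowing: ordinary markdown emphasis (`**bold**`) is removed too, and a starred span that crosses a line break is left alone because `.` does not match `\n`. Spark's built-in `regexp_replace` accepts the same pattern (Java regex syntax) and would avoid the Python UDF round-trip.

```python
import re

# Same pattern as remove_starred_text: 1-3 asterisks, shortest possible text, 1-3 asterisks
STARRED = re.compile(r"\*{1,3}.*?\*{1,3}")

def remove_starred(text):
    # None and "" pass through unchanged, as in the UDF
    return STARRED.sub("", text) if text else text

print(repr(remove_starred("Main text. *A hypothetical moderator note.*")))  # 'Main text. '
print(repr(remove_starred("I **strongly** disagree")))                      # 'I  disagree'
print(repr(remove_starred("a *b\nc* d")))  # unchanged: the starred span crosses a newline
```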


In [None]:
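# Strip moderator notes (wrapped in asterisks, usually at the bottom of a post)
# that would otherwise pollute the later text processing
def remove_starred_text(text):
    if not text:
        return text
    # Remove every span delimited by 1-3 asterisks on a single line ('.' does not cross newlines).
    # Note: this also removes ordinary *italic* / **bold** markdown text
    return re.sub(r"\*{1,3}.*?\*{1,3}", "", text)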

remove_starred_text_udf = udf(remove_starred_text, StringType())
df_removed = df_change.withColumn("selftext", remove_starred_text_udf(col("selftext")))
df_removed.select("selftext").show(5, truncate=False)


3. **Merge fields** – concatenate `selftext` and `title` (in that order) into a single `merged` column with `concat_ws`.  

In [None]:
# Merge selftext and title into one column; concat_ws skips null values
df_merged = df_removed.withColumn("merged", concat_ws(" ", col("selftext"), col("title")))
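`concat_ws` skips null inputs, so a post without `selftext` still gets a usable `merged` string (just the title), whereas `concat` would return null for that row. Empty strings are not skipped, which can leave a leading space; the whitespace handling in the next step makes that harmless. A plain-Python mimic of that null handling, using a hypothetical helper that is not a Spark API:

```python
from typing import Optional

def concat_ws_like(sep: str, *parts: Optional[str]) -> str:
    """Mimic Spark's concat_ws: join the non-null parts with `sep` (empty strings are kept)."""
    return sep.join(p for p in parts if p is not None)

print(repr(concat_ws_like(" ", None, "CMV: Hypothetical title")))  # 'CMV: Hypothetical title'
print(repr(concat_ws_like(" ", "", "CMV: Hypothetical title")))    # ' CMV: Hypothetical title'
```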

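4. **Regex scrub** – lower-case the text, drop every character that is not a letter or whitespace (`re.sub` inside the UDF), and collapse repeated whitespace. URLs are not removed explicitly: their letters survive as one run-together string (e.g. `httpswwwexamplecom`), which the dictionary filter in the next step discards.  
5. **Tokenise + lemmatise** – NLTK `word_tokenize` + WordNet lemmatiser in a UDF; an extended stop-list removes CMV-specific filler (“change”, “view”, “opinion”…), and only tokens found in the NLTK `words` English dictionary are kept.  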

In [None]:
# Extra stop-words that are very frequent in CMV titles and selftext but carry little topical signal.
# 'wa', 'ha' and 'u' are what the WordNet lemmatiser produces for "was", "has" and "us".
custom_stopwords = set(stopwords.words('english')).union({
    'also', 'could', 'one', 'would', 'use', 'say', 'even', 'thing', 'get', 'much',
    'change', 'view', 'opinion', 'really', 'make', 'still', 'see', 'think',
    'know', 'like', 'way', 'go', 'come', 'want', 'believe', 'wa', 'ha', 'dont', 'people',
    'feel', '', 'please', 'u', 'first', 'comment', 'question', 'read', 'look', 'effective', 'concern',
    'right', 'message', 'report', 'speaking', 'however', 'going', 'footnote', 'thinking', 'topic',
    'firstly', 'many', 'moderator', 'user', 'hello', 'something', 'cant', 'someone'})

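# Clean the merged text for LDA: lower-case, keep letters only, tokenise,
# lemmatise, then drop stop-words and any token not in the NLTK English word list
def preprocess_text(text):
    if not text:
        return ""
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)   # drop digits, punctuation and markdown symbols
    text = re.sub(r'\s+', ' ', text)       # collapse runs of whitespace
    # No punctuation is left, so sentence splitting is pointless; preserve_line=True skips
    # sent_tokenize and with it the punkt/punkt_tab lookup that failed on the executors
    tokens = word_tokenize(text, preserve_line=True)
    tokens = [lemmatizer.lemmatize(word) for word in tokens]
    tokens = [word for word in tokens if word not in custom_stopwords]
    tokens = [word for word in tokens if word in english_words]
    return " ".join(tokens)
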
preprocess_udf = udf(preprocess_text, StringType())
df_processed = df_merged.withColumn("processed", preprocess_udf(col("merged")))
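To make the order of the filters concrete, here is a toy, dependency-free sketch of the same sequence. The stop-list, lemma table and dictionary are hypothetical stand-ins for the NLTK resources, and `str.split()` stands in for `word_tokenize` (on letter-only text they mostly agree, although NLTK's Treebank rules also split a few words such as "cannot").

```python
import re

# Hypothetical toy resources; the notebook uses custom_stopwords, WordNet and NLTK `words`
TOY_STOPWORDS = {"i", "this", "should", "be", "because", "think", "see"}
TOY_LEMMAS = {"taxes": "tax", "cars": "car"}
TOY_DICTIONARY = {"tax", "car", "lower", "city"}

def toy_preprocess(text):
    if not text:
        return []
    text = text.lower()
    text = re.sub(r"[^a-z\s]", "", text)    # keep letters and whitespace only
    text = re.sub(r"\s+", " ", text)        # collapse whitespace
    tokens = text.split()                   # stand-in for word_tokenize(..., preserve_line=True)
    tokens = [TOY_LEMMAS.get(t, t) for t in tokens]            # stand-in for lemmatisation
    tokens = [t for t in tokens if t not in TOY_STOPWORDS]     # stop-word filter
    return [t for t in tokens if t in TOY_DICTIONARY]          # dictionary filter

print(toy_preprocess("I think taxes should be LOWER, see https://example.com!"))
# ['tax', 'lower']  (the URL became 'httpsexamplecom' and failed the dictionary filter)
```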

6. **Final Token Split**

At this point, the `processed` column still contains each post as a single space-separated string of lemmatised, cleaned words.  
We now convert this into an actual array of tokens using Spark's `split()` function, which will be required for downstream steps.

The output below shows:
- `merged`: the combined text (selftext + title, after moderator notes are removed)
- `processed`: a list of cleaned tokens, with stopwords and punctuation already removed

This transformation ensures our pipeline remains distributed and avoids flattening the token list into separate rows.
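One edge case matters here: a post whose tokens are all filtered out ends up with `processed == ""`, and splitting an empty string on a separator yields one empty element rather than an empty list. Spark's `split` follows Java's `String.split` in this respect, and Python's `str.split(" ")` behaves the same way, so the plain-Python sketch below (hypothetical helper name) shows the issue and the filter that removes it; on the Spark side, `array_remove(..., "")` does the same job.

```python
def split_tokens(processed: str) -> list[str]:
    """Split a space-joined token string, dropping the empty element an empty input produces."""
    return [tok for tok in processed.split(" ") if tok != ""]

print("".split(" "))              # ['']  one empty "token"
print(split_tokens(""))           # []
print(split_tokens("tax lower"))  # ['tax', 'lower']
```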


In [None]:
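# Split the processed string into an array of tokens.
# array_remove drops the lone "" that split() produces when a post has no surviving tokens
from pyspark.sql.functions import array_remove, split, col

final_df = df_processed.withColumn("processed", array_remove(split(col("processed"), " "), ""))
final_df.show(3, truncate=False)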

+------------+--------+-----+--------------------------------------------------------------------------------------------------------------------------------------+-----+----+---------+-----------+-------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------+
|num_comments|selftext|score|title                                                                                                                                 |delta|urls|name     |created_utc|created_datetime   |year_month|merged                                                                                                                                 |processed                                                     |
+------------+--------+-----+-------------------------------------------------------------------------------------------------------------------

                                                                                

## 3 - Export to Cloud Storage  

We write the final cleaned DataFrame to GCS as JSON Lines (one record per post).  
This output will be used as input for topic modelling in Notebook 02.


In [None]:
# "header" is a CSV-only option and is ignored by the JSON writer, so it is omitted
final_df.write \
         .mode("overwrite") \
         .json("gs://st446-cmv/n1_preprocessing_df/")



# Appendix

### Just politics & technology

In [None]:
politics_keywords = {
    "government", "policy", "election", "senate", "congress", "democracy", "republic", "president",
    "left-wing", "right-wing", "liberal", "conservative", "capitalism", "socialism", "nationalism",
    "politician", "parliament", "vote", "voting", "law", "legislation", "public sector", "minister",
    "trump", "tax", "democrat", "republican"
}

technology_keywords = {
    "ai", "artificial intelligence", "black box", "cpu", "machine learning", "technology", "software", "hardware",
    "robotics", "data", "algorithm", "python", "coding", "developer", "engineer", "app", "computer",
    "neural network", "gpu", "chip", "tech", "smartphone", "device", "programming", "tech industry"
}

def keyword_pattern(keywords):
    # Whole-word (\b) match on any keyword; re.escape keeps "-" and spaces literal
    return re.compile(r'\b(?:' + '|'.join(re.escape(word) for word in keywords) + r')\b', flags=re.IGNORECASE)

# Compile once on the driver; the compiled patterns are pickled with the UDF
# rather than rebuilt for every row
politics_pattern = keyword_pattern(politics_keywords)
technology_pattern = keyword_pattern(technology_keywords)

def classify_topic(text: str) -> int:
    # Expects a raw string; None or non-string input (e.g. a token array) counts as empty
    text = text if isinstance(text, str) else ""

    politics_match = bool(politics_pattern.search(text))
    technology_match = bool(technology_pattern.search(text))

    if politics_match and technology_match:
        return 2  # Other (both themes present)
    elif politics_match:
        return 0  # Politics
    elif technology_match:
        return 1  # Technology
    else:
        return 2  # Other

classify_udf = udf(classify_topic, IntegerType())
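Because the keyword patterns use `\b` word boundaries and `re.escape`, multi-word and hyphenated keywords only match text that still contains their spaces and hyphens. A small plain-Python check with a hypothetical keyword subset shows what the preprocessing character filter does to such keywords, and that `\b` keeps short keywords from matching inside longer words.

```python
import re

# Hypothetical mini keyword set: a single word, a hyphenated word and a phrase
keywords = {"tax", "left-wing", "machine learning"}
pattern = re.compile(
    # Longest first so a phrase wins over any shorter keyword it starts with
    r"\b(?:" + "|".join(re.escape(k) for k in sorted(keywords, key=len, reverse=True)) + r")\b",
    flags=re.IGNORECASE,
)

raw = "Machine learning will not replace left-wing pundits"
scrubbed = re.sub(r"[^a-z\s]", "", raw.lower())  # same character filter as preprocess_text

print(pattern.findall(raw))       # ['Machine learning', 'left-wing']
print(pattern.findall(scrubbed))  # ['machine learning'] ("leftwing" no longer matches)
print(pattern.search("taxi"))     # None: \b stops "tax" matching inside "taxi"
```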


In [None]:
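# Politics or technology: classify the `merged` text, because df_change has no `processed`
# column and `processed` is a token array whose hyphens (e.g. "left-wing") were stripped
final_df = final_df.withColumn("explicit_class", classify_udf(col("merged")))

In [None]:
final_df.groupBy("explicit_class").count().show()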

25/04/19 10:58:54 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 311) (st446-cluster-w08-w-0.europe-west2-c.c.delta-container-448310-n4.internal executor 12): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_12453/2792152010.py", line 6, in preprocess_text
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/nltk/tokenize/__init__.py", line 129, in word_tokenize
    sentences = [text] if preserve_line else sent_tokenize(text, language)
                                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/nltk/tokenize/__init__.py", line 106, in sent_tokenize
    tokenizer = PunktTokenizer(language)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/nltk/tokenize/punkt.py", line 1744, in __init__
    self.load_lang(lang)
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/nltk/tokenize/punkt.py", line 1749, in load_lang
    lang_dir = find(f"tokenizers/punkt_tab/{lang}/")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/site-packages/nltk/data.py", line 582, in find
    raise LookupError(resource_not_found)
LookupError: 
**********************************************************************
  Resource punkt_tab not found.
  Please use the NLTK Downloader to obtain the resource:

  >>> import nltk
  >>> nltk.download('punkt_tab')

  For more information see: https://www.nltk.org/data.html

  Attempted to load tokenizers/punkt_tab/english/

  Searched in:
    - '/home/nltk_data'
    - '/opt/conda/miniconda3/nltk_data'
    - '/opt/conda/miniconda3/share/nltk_data'
    - '/opt/conda/miniconda3/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
**********************************************************************



In [None]:
# The file is very large, so an explicit schema is essential: it skips Spark's schema-inference pass
# Field names and types follow the schema published on the dataset's page

schema = StructType([
    StructField("author", StringType(), True),
    StructField("body", StringType(), True),
    StructField("normalizedBody", StringType(), True),
    StructField("content", StringType(), True),
    StructField("content_len", LongType(), True),
    StructField("summary", StringType(), True),
    StructField("summary_len", LongType(), True),
    StructField("id", StringType(), True),
    StructField("subreddit", StringType(), True),
    StructField("subreddit_id", StringType(), True),
    StructField("title", StringType(), True)
])

original_df = spark.read.schema(schema).json("gs://dataproc-staging-europe-west2-869961533204-pghri6io/corpus-webis-tldr-17.json")
original_df.show(5, truncate=False)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: gs://dataproc-staging-europe-west2-869961533204-pghri6io/corpus-webis-tldr-17.json.

In [None]:
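# df_cmview is created in a later cell (filter on subreddit == "changemyview"); run that cell first
shape = (df_cmview.count(), len(df_cmview.columns))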
print(f"Shape: {shape}")



Shape: (9212, 11)


                                                                                

In [None]:
# Check the raw size of the file. A DataFrame count was too slow here; counting raw lines
# with an RDD skips JSON parsing (one line = one JSON record)
rdd_shape = spark.sparkContext.textFile("gs://dataproc-staging-europe-west2-869961533204-pghri6io/corpus-webis-tldr-17.json")
print(f"Total raw lines: {rdd_shape.count()}")

#There are 3,848,330 observations in our original data



Total raw lines: 3848330


                                                                                

In [None]:
df_cmview = original_df.filter(original_df["subreddit"] == "changemyview")
df_cmview.select("title", "normalizedBody", "subreddit").show(5, truncate=False)


                                                                                
