In [0]:
%pip install vaderSentiment textblob nltk
%pip install datasets
%pip install transformers
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
%pip install tensorflow
%pip install praw
%pip install tf-keras
%pip install gensim spacy nltk wordcloud
%pip install spacy
%pip install nltk 
!python -m spacy download en_core_web_sm
!python -m spacy download en_core_web_md

In [0]:
# dbutils.library.restartPython()

In [0]:
import nltk as nltk
import os, time, math
import pandas as pd
import praw
import zipfile
import os
import numpy as np
import re
from transformers import pipeline
from pyspark.sql.functions import col, when, row_number, desc, concat_ws, udf, substring, when, avg
from datetime import datetime
from pyspark.sql.types import StringType, FloatType, StructType, StructField, ArrayType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
from nltk.corpus import wordnet
from gensim.utils import simple_preprocess
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.corpus import brown
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from tqdm import tqdm 
from gensim.corpora import Dictionary
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

In [0]:

# Reddit API client. The credentials are read from the environment so that no secret is
# stored in the notebook source; REDDIT_CLIENT_ID and REDDIT_CLIENT_SECRET must be set
# before this cell runs (a missing variable raises KeyError here).
reddit = praw.Reddit(
    client_id=os.environ["REDDIT_CLIENT_ID"],
    client_secret=os.environ["REDDIT_CLIENT_SECRET"],
    user_agent="test_sent",
)

# Subreddits to harvest submissions from.
SUBREDDITS = ["BritishProblems", "AskUK", "UnitedKingdom", "UKPolitics"]

# Mix of listings to cover recent + historic + trending.
# "fn" is the listing method looked up on the subreddit object; "limit" and the
# optional "time_filter" are forwarded to that method as keyword arguments.
STREAMS = {
    "hot":           {"fn": "hot",           "limit": 500},
    "new":           {"fn": "new",           "limit": 800},
    "rising":        {"fn": "rising",        "limit": 200},
    "top_day":       {"fn": "top",           "limit": 400, "time_filter": "day"},
    "top_week":      {"fn": "top",           "limit": 600, "time_filter": "week"},
    "top_month":     {"fn": "top",           "limit": 800, "time_filter": "month"},
    "top_year":      {"fn": "top",           "limit": 1000,"time_filter": "year"},
    "top_all":       {"fn": "top",           "limit": 1000,"time_filter": "all"},
    "contro_week":   {"fn": "controversial", "limit": 300, "time_filter": "week"},
    "contro_year":   {"fn": "controversial", "limit": 600, "time_filter": "year"},
}

# Targeted keyword queries
SEARCH_QUERIES = [
    "emigrate OR emigration",
    "\"leave the UK\" OR \"leaving the UK\"",
    "move abroad OR moving abroad OR relocation",
    "visa OR work permit OR sponsorship",
    "cost of living OR housing OR rent OR salary",
    "NHS OR doctor OR nurse OR healthcare job",
]
SEARCH_LIMIT_PER_QUERY = 300  # per subreddit per query

# Oldest submission date to keep (YYYY-MM-DD); an empty value disables the cutoff.
MIN_DATE = "2016-01-01"
# The cutoff is compared against each submission's `created_utc`, so the date is parsed
# as midnight UTC. A naive datetime's .timestamp() would be interpreted in the machine's
# local timezone and shift the cutoff by the local UTC offset.
MIN_TS = (
    int(datetime.strptime(f"{MIN_DATE} +0000", "%Y-%m-%d %z").timestamp())
    if MIN_DATE
    else None
)

# Pause, in seconds, after each successful listing or search request.
SLEEP = 0.6

def to_row(sub, sr_name):
    """Flatten one submission object into a plain dict (one output row).

    Args:
        sub: object exposing the attributes read below (id, title, selftext, author,
            score, num_comments, created_utc, permalink, url, link_flair_text,
            over_18, locked, distinguished).
        sr_name: value stored unchanged under the "subreddit" key.

    Returns:
        dict with the raw fields plus "created_dt", the creation time formatted as
        "YYYY-MM-DD HH:MM:SS" in UTC. Falsy title/selftext become "", and a falsy
        author becomes "[deleted]".
    """
    # time.gmtime reads the epoch seconds as UTC. It replaces
    # datetime.utcfromtimestamp, which is deprecated from Python 3.12 onwards.
    created_dt = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(sub.created_utc))
    return {
        "id": sub.id,
        "subreddit": sr_name,
        "title": sub.title or "",
        "selftext": sub.selftext or "",
        "author": str(sub.author) if sub.author else "[deleted]",
        "score": sub.score,
        "num_comments": sub.num_comments,
        "created_utc": sub.created_utc,
        "created_dt": created_dt,
        "permalink": f"https://www.reddit.com{sub.permalink}",
        "url": sub.url,
        "flair": sub.link_flair_text,
        "over_18": sub.over_18,
        "locked": sub.locked,
        "distinguished": sub.distinguished
    }

def add_batch(iterable, sr_name, seen, out):
    """Append rows for the not-yet-seen submissions in `iterable` to `out`.

    A submission is skipped when MIN_TS is set and its `created_utc` is below it,
    or when its id is already in `seen`. Accepted ids are added to `seen` and the
    row built by to_row() is appended to `out`.

    Returns:
        The number of rows appended by this call.
    """
    added = 0
    for submission in iterable:
        # Older than the configured window.
        too_old = bool(MIN_TS) and submission.created_utc < MIN_TS
        if too_old or submission.id in seen:
            continue
        seen.add(submission.id)
        out.append(to_row(submission, sr_name))
        added += 1
    return added


# Accumulators shared across all subreddits: collected rows and the ids already stored.
all_rows = []
seen_ids = set()

for sr in SUBREDDITS:
    subreddit = reddit.subreddit(sr)
    print(f"\n📥 r/{sr}")

    # 1) Listings (hot/new/top/rising/controversial)
    for name, cfg in STREAMS.items():
        try:
            fetch_fn = getattr(subreddit, cfg["fn"])
            # The limit is always forwarded; the time filter only where configured.
            kwargs = dict(limit=cfg["limit"])
            if "time_filter" in cfg:
                kwargs.update(time_filter=cfg["time_filter"])
            batch = fetch_fn(**kwargs)
            added = add_batch(batch, sr, seen_ids, all_rows)
            print(f"  • {name:<12} +{added} (limit={cfg['limit']})")
            time.sleep(SLEEP)
        except Exception as e:
            # Best effort: report the failed listing and move on to the next one.
            print(f"  • {name:<12} skipped: {e}")

    # 2) Searches (broad coverage beyond listings)
    for q in SEARCH_QUERIES:
        try:
            batch = subreddit.search(
                q,
                sort="relevance",
                time_filter="all",
                limit=SEARCH_LIMIT_PER_QUERY,
            )
            added = add_batch(batch, sr, seen_ids, all_rows)
            print(f"  • search [{q[:28]}...] +{added} (limit={SEARCH_LIMIT_PER_QUERY})")
            time.sleep(SLEEP)
        except Exception as e:
            # Best effort: report the failed query and move on to the next one.
            print(f"  • search [{q[:28]}...] skipped: {e}")

df = pd.DataFrame(all_rows)

# Keep only posts that are neither NSFW nor locked.
nsfw_or_locked = df["over_18"] | df["locked"]
df = df[~nsfw_or_locked]

# Keep rows where the title or the body still has text after stripping whitespace.
has_title = df["title"].str.strip() != ""
has_body = df["selftext"].str.strip() != ""
df = df[has_title | has_body].reset_index(drop=True)



In [0]:

# Move the pandas frame into Spark and add one text column: title and body joined by a space.
spk_reddit_dir = spark.createDataFrame(df)
title_and_body = concat_ws(" ", "title", "selftext")
spark_reddit_df = spk_reddit_dir.withColumn("concatenated_text", title_and_body)
display(spark_reddit_df)

In [0]:

# Initialize the VADER sentiment analyzer once at module level;
# get_vader_sentiment() reads this shared instance.
analyzer = SentimentIntensityAnalyzer()

# VADER-based labelling (for short texts like comments)
def get_vader_sentiment(text):
    """Label `text` from VADER's compound score.

    The input is passed through str() first. Scores above 0.05 give "Positive",
    scores below -0.05 give "Negative", and everything in between gives "Neutral".
    """
    compound = analyzer.polarity_scores(str(text))["compound"]
    if compound > 0.05:
        return "Positive"
    if compound < -0.05:
        return "Negative"
    return "Neutral"

# TextBlob-based labelling (for longer texts like posts)
def get_textblob_sentiment(text):
    """Label `text` from TextBlob's polarity.

    The input is passed through str() first. Polarity above 0.05 gives "Positive",
    below -0.05 gives "Negative", and everything in between gives "Neutral".
    """
    polarity = TextBlob(str(text)).sentiment.polarity
    if polarity > 0.05:
        return "Positive"
    if polarity < -0.05:
        return "Negative"
    return "Neutral"


# Wrap the TextBlob labeller as a Spark UDF that returns a string.
get_textblob_sentiment_udf = udf(get_textblob_sentiment, returnType=StringType())

# Label every post from its combined title + body text.
textblob_label = get_textblob_sentiment_udf(col("concatenated_text"))
spark_reddit_df = spark_reddit_df.withColumn("sentiment", textblob_label)



In [0]:

# Build a sentiment-analysis pipeline on the Cardiff NLP Twitter RoBERTa checkpoint.
classifier = pipeline(task="sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment")

# Smoke-test the pipeline on one headline and show the raw output.
sample_headline = "It’s a disgrace - Labour MP slammed for charging taxpayers £900 ‘pet rent’ for dog to live with her in London home"
result = classifier(sample_headline)

print(result)

In [0]:
def get_sentiment(text):
    """Classify `text` with the module-level `sentiment_model` and map its label.

    Args:
        text: the string to classify; None or "" short-circuits to neutral.

    Returns:
        A (label, score) tuple. `label` is looked up in the module-level `label_map`,
        falling back to the model's own label; `score` is the model score as a float.
        Empty input, or any error raised while classifying, yields ("Neutral", 0.0).
    """
    if not text:
        return ("Neutral", 0.0)
    try:
        # Let the tokenizer cap the input at 512 tokens. Slicing the string
        # (text[:512]) only limits characters, which is not the model's limit.
        result = sentiment_model(text, truncation=True, max_length=512)[0]
        label = label_map.get(result['label'], result['label'])
        score = float(result['score'])
        return (label, score)
    except Exception:
        # Deliberate best effort: one bad row must not fail the whole Spark job.
        # NOTE(review): this also hides systematic failures (e.g. the model not
        # loading on workers), which would show up as an all-"Neutral" column.
        return ("Neutral", 0.0)


In [0]:
# Sentiment classifier used by get_sentiment().
sentiment_model = pipeline(task="sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment")

# Human-readable names for the model's raw labels (LABEL_0 → Negative, etc.).
label_map = dict(LABEL_0="Negative", LABEL_1="Neutral", LABEL_2="Positive")

# Struct returned by the UDF: a nullable label string and a nullable float score.
schema = (
    StructType()
    .add("sentiment_label", StringType(), True)
    .add("sentiment_score", FloatType(), True)
)

# Spark UDF wrapping get_sentiment with the struct return type above.
sentiment_udf = udf(get_sentiment, returnType=schema)


In [0]:
# Score every post, then lift the two struct fields into top-level columns.
# NOTE(review): "upvotes", "comments" and "text" are not created by the visible cells;
# Spark's drop() ignores column names that are absent.
df_with_sentiment = (
    spark_reddit_df
    .withColumn("sentiment_struct", sentiment_udf(col("concatenated_text")))
    .withColumn("sentiment_label", col("sentiment_struct.sentiment_label"))
    .withColumn("sentiment_score", col("sentiment_struct.sentiment_score"))
    .drop("sentiment_struct", "upvotes", "comments", "title", "text")
)


In [0]:

# created_dt is a "YYYY-MM-DD HH:MM:SS" string, so its first four characters are the year.
year_text = col("created_dt").substr(1, 4)
df_with_sentiment = df_with_sentiment.withColumn("year", year_text)



In [0]:
# Remove the timestamp string and the TextBlob label, then print a sample of rows.
columns_to_drop = ['created_dt', 'sentiment']
for column_name in columns_to_drop:
    df_with_sentiment = df_with_sentiment.drop(column_name)
df_with_sentiment.show()


In [0]:

# Replace the score column with a signed label value:
# Positive → 1, Negative → -1, anything else (neutral or unknown) → 0.
label = col("sentiment_label")
signed_score = when(label == "Positive", 1).when(label == "Negative", -1).otherwise(0)
df_with_sentiment = df_with_sentiment.withColumn("sentiment_score", signed_score)


In [0]:

# Mean signed sentiment per year.
yearly_sentiment = (
    df_with_sentiment
    .groupBy("year")
    .agg(avg(col("sentiment_score")).alias("avg_sentiment"))
)


In [0]:

# Cast the year to an integer, then collect the year-ordered result to pandas.
year_as_int = col("year").astype("int")
yearly_sentiment = yearly_sentiment.withColumn("year", year_as_int)

yearly_sentiment_pd = yearly_sentiment.sort("year").toPandas()

## Preprocess Texts

In [0]:

# Download every NLTK resource the later cells read from /dbfs/tmp/nltk_data:
# wordnet and omw-1.4 are unzipped from corpora/*.zip by the unzip cell, and the
# stopwords corpus is loaded by the preprocessing cell. Only wordnet was fetched
# here before, so a fresh run could not find the other two.
for nltk_package in ("wordnet", "omw-1.4", "stopwords"):
    nltk.download(nltk_package, download_dir="/dbfs/tmp/nltk_data/")

dbutils.fs.ls('dbfs:/tmp/nltk_data')

In [0]:

def unzip_if_needed(zip_path, extract_to):
    """Extract `zip_path` into `extract_to` unless that directory already has content.

    Args:
        zip_path: path of the zip archive to extract.
        extract_to: directory that receives the archive contents; created on demand.

    Raises:
        FileNotFoundError: the archive does not exist.
        zipfile.BadZipFile: the archive cannot be read as a zip file.
    """
    # Only a non-empty directory counts as extracted. An empty one can be left
    # behind by an earlier failed attempt and must not be mistaken for success.
    if os.path.isdir(extract_to) and os.listdir(extract_to):
        print(f"ℹ️ Already extracted: {extract_to}")
        return

    # Open the archive before creating the target, so that a missing or corrupt
    # zip raises without leaving an empty directory behind.
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        os.makedirs(extract_to, exist_ok=True)
        zip_ref.extractall(extract_to)
    print(f"✅ Unzipped {zip_path} to {extract_to}")

# Archive → target directory pairs for the corpora that need unpacking.
corpus_archives = (
    ("/dbfs/tmp/nltk_data/corpora/wordnet.zip", "/dbfs/tmp/nltk_data/corpora/wordnet"),
    ("/dbfs/tmp/nltk_data/corpora/omw-1.4.zip", "/dbfs/tmp/nltk_data/corpora/omw-1.4"),
)
for archive_path, target_dir in corpus_archives:
    unzip_if_needed(archive_path, target_dir)


In [0]:
# Write a small shell script that installs nltk (overwrite=True), then echo it back.
# The script text now starts directly with the shebang: it was previously preceded by a
# blank line, and a shebang is only honoured on the first line of a file.
# NOTE(review): another cell lists 'dbfs:/tmp/nltk_data' for the local /dbfs/tmp/nltk_data,
# so the '/dbfs/...' prefix in this dbutils path may be unintended. Confirm where the
# script is expected to live before changing it.
dbutils.fs.put('/dbfs/databricks/scripts/nltk-install.sh', """#!/bin/bash
pip install nltk
""", True)

dbutils.fs.head('/dbfs/databricks/scripts/nltk-install.sh')

In [0]:
# nltk.download('all', download_dir="/dbfs/databricks/nltk_data/")


In [0]:
# Add an extra directory to NLTK's data search path.
# NOTE(review): the download/unzip cells write under /dbfs/tmp/nltk_data/corpora/...,
# not under this ".../wordnet/wordnet" directory, so this entry is presumably a leftover.
# The next cell registers /dbfs/tmp/nltk_data itself. Confirm before removing.
nltk.data.path.append("/dbfs/tmp/nltk_data/wordnet/wordnet")

In [0]:
# Register the DBFS download directory with NLTK, then check that WordNet resolves.
nltk.data.path.append("/dbfs/tmp/nltk_data")
dog_synsets = wordnet.synsets("dog")
print(dog_synsets)

In [0]:

# Fix NumPy's global random seed.
np.random.seed(400)

# Let NLTK on the driver find the data stored under the DBFS download directory.
nltk.data.path.append("/dbfs/tmp/nltk_data")

# Build the English stop-word set on the driver and broadcast it to the executors
# (a plain Python set, so it can be shipped as-is).
sc = spark.sparkContext
STOPWORDS = set(stopwords.words('english'))
broadcast_stopwords = sc.broadcast(STOPWORDS)

# Preprocessing function; configures nltk.data.path itself so it also works on workers.
def preprocess(text):
    """Tokenise `text`, drop stop words and lemmatise the remaining tokens as verbs.

    Args:
        text: raw text; None or "" yields an empty list.

    Returns:
        List of lemmatised token strings, in their original order.
    """
    # Imported inside the function so the names resolve in the worker process too.
    import nltk
    from nltk.stem import WordNetLemmatizer
    from gensim.utils import simple_preprocess

    # Set nltk data path on worker. This runs once per row, so only append when the
    # entry is missing; otherwise the search path grows by one element per call.
    nltk_data_dir = "/dbfs/tmp/nltk_data"
    if nltk_data_dir not in nltk.data.path:
        nltk.data.path.append(nltk_data_dir)

    if not text:
        return []

    stopwords_set = broadcast_stopwords.value
    lemmatizer = WordNetLemmatizer()

    return [
        lemmatizer.lemmatize(token, 'v')
        for token in simple_preprocess(text)
        if token not in stopwords_set
    ]

# Register preprocess as a Spark UDF whose result type is an array of strings.
preprocess_udf = udf(preprocess, ArrayType(StringType()))

# Column-level helper around preprocess_udf
def clean_text_column(df, input_col, output_col="cleaned_text"):
    """Return `df` with `output_col` holding the preprocessed tokens of `input_col`."""
    tokens = preprocess_udf(df[input_col])
    return df.withColumn(output_col, tokens)

# Tokenise the combined title + body text into the default "cleaned_text" column.
cleaned_df = clean_text_column(df_with_sentiment, input_col="concatenated_text")
display(cleaned_df)


## Training

In [0]:
# Fetch the Brown corpus and the universal POS tag mapping.
for resource in ('brown', 'universal_tagset'):
    nltk.download(resource)
from nltk.corpus import brown
# Materialise the tagged sentences as a list of sentences (universal tagset).
nltk_data = [tagged_sentence for tagged_sentence in brown.tagged_sents(tagset='universal')]

In [0]:

# 80/20 split of the tagged sentences with a fixed seed.
train_set, test_set = train_test_split(
    nltk_data,
    train_size=0.80,
    test_size=0.20,
    random_state=101,
)
print(f'Number of training sentences: {len(train_set)}')
print(f'Number of test sentences: {len(test_set)}')

# Separate the labels from the text: for each split, one list of token lists and a
# parallel list of tag lists (sentence i in *_toks lines up with sentence i in *_tags).
train_toks = [[token for token, _ in tagged_sentence] for tagged_sentence in train_set]
train_tags = [[tag for _, tag in tagged_sentence] for tagged_sentence in train_set]

test_toks = [[token for token, _ in tagged_sentence] for tagged_sentence in test_set]
test_tags = [[tag for _, tag in tagged_sentence] for tagged_sentence in test_set]

print(f'Number of training sentences in train_toks: {len(train_toks)}')
print(f'Number of test sentences in test_toks: {len(test_toks)}')

In [0]:

# Build one vocabulary over both splits, then map every sentence to token IDs.
dictionary = Dictionary(train_toks + test_toks)

train_toks_encoded = list(map(dictionary.doc2idx, train_toks))
test_toks_encoded = list(map(dictionary.doc2idx, test_toks))
print(f'Example sentence: {train_toks_encoded[3]}')

# Vocabulary size.
V = len(dictionary.values())
print(f'Size of vocabulary is {V}')

In [0]:

# Fit the tag → integer mapping on every tag seen in training, then encode both splits.
tag_encoder = LabelEncoder()
all_train_tags = [tag for sentence in train_tags for tag in sentence]
tag_encoder.fit(all_train_tags)

train_tags_encoded = list(map(tag_encoder.transform, train_tags))
test_tags_encoded = list(map(tag_encoder.transform, test_tags))

num_tags = len(tag_encoder.classes_)

In [0]:
# Show the tag names learned by the encoder.
tag_encoder.classes_

In [0]:

# Count how often each tag starts a sentence and how often tag a is followed by tag b.
transitions = np.zeros((num_tags, num_tags))
start_states = np.zeros(num_tags)

for sentence_tags in tqdm(train_tags_encoded):
    if len(sentence_tags) == 0:
        continue
    # The first tag of a sentence only contributes to the start counts.
    start_states[sentence_tags[0]] += 1
    # Every adjacent (previous, current) pair contributes one transition.
    for previous_tag, current_tag in zip(sentence_tags[:-1], sentence_tags[1:]):
        transitions[previous_tag, current_tag] += 1


In [0]:
# Turn the counts into probabilities: each row of `transitions` is divided by its row
# total, and `start_states` by its overall total.
transitions /= transitions.sum(axis=1, keepdims=True)
start_states /= start_states.sum()

In [0]:
# Emission table: observations[tag, tok] ends up as P(tok | tag).
observations = np.zeros((num_tags, V))

# Count how often each token ID occurs with each tag in the training sentences.
for sentence_toks, sentence_tags in tqdm(
    zip(train_toks_encoded, train_tags_encoded), total=len(train_toks_encoded)
):
    for tok, tag in zip(sentence_toks, sentence_tags):
        observations[tag, tok] += 1

# Normalise each tag's row into a probability distribution. viterbi() takes this table
# as `observation_probs` alongside normalised start and transition tables, but the raw
# counts were previously passed in unnormalised. Rows with no counts stay all-zero
# (divided by 1) instead of producing NaN.
tag_totals = observations.sum(axis=1, keepdims=True)
observations /= np.where(tag_totals == 0, 1, tag_totals)

In [0]:
def viterbi(observed_seq, num_tags, start_probs, transition_probs, observation_probs):
    """Return the highest-scoring state sequence for `observed_seq`.

    Args:
        observed_seq: sequence of observation indices (columns of `observation_probs`).
        num_tags: number of hidden states.
        start_probs: array of length `num_tags` with the start score of each state.
        transition_probs: array indexed as [previous_state, state].
        observation_probs: array indexed as [state, observation].

    Returns:
        Integer numpy array of length len(observed_seq) with the chosen state at each
        position; an empty integer array for an empty sequence.
    """
    eps = 1e-7

    num_obs = len(observed_seq)

    # Initialise the sequence of predicted states
    state_seq = np.zeros(num_obs, dtype=int)
    if num_obs == 0:
        # Nothing to decode; indexing position 0 below would raise IndexError.
        return state_seq

    # scores[t, s]: score of the best path that ends in state s at position t.
    # backpointer[t, s]: the state at t-1 on that path. It is stored as integers
    # because the values are used as indices during backtracking.
    scores = np.zeros((num_obs, num_tags))
    backpointer = np.zeros((num_obs, num_tags), dtype=int)

    # For the first data point in the sequence:
    scores[0, :] = start_probs * observation_probs[:, observed_seq[0]]

    # Run Viterbi forward for t > 0, handling all states of one position at once.
    for t in range(1, num_obs):
        # seq_prob[prev, state]: score of reaching `state` at time t via `prev`.
        seq_prob = scores[t - 1, :, None] * transition_probs[:, :num_tags]

        # Choose the most likely predecessor of every state.
        backpointer[t, :] = np.argmax(seq_prob, axis=0)
        max_seq_prob = np.max(seq_prob, axis=0)

        # Score of the best path into each state including the current observation.
        # Add eps to help with numerical issues.
        scores[t, :] = (max_seq_prob + eps) * (observation_probs[:, observed_seq[t]] + eps)

    # Get the most likely final state:
    state_seq[-1] = np.argmax(scores[-1, :])

    # Backtrack until the first observation
    for t in range(num_obs - 1, 0, -1):
        state_seq[t - 1] = backpointer[t, state_seq[t]]

    return state_seq

In [0]:
# Decode every encoded test sentence with the trained tables.
predictions = [
    viterbi(sentence, num_tags, start_states, transitions, observations)
    for sentence in tqdm(test_toks_encoded)
]

In [0]:
# Map each predicted sequence of tag IDs back to tag names.
predicted_tags = [
    tag_encoder.inverse_transform(sequence)
    for sequence in tqdm(predictions)
]

In [0]:
# Print tokens, gold tags and predicted tags side by side for a few test sentences.
examples = [2, 334, 4983, 2389]
labelled_sources = (
    ('Tokens:      ', test_toks),
    ('Gold tag:    ', test_tags),
    ('Predictions: ', predicted_tags),
)
for eg in examples:
    for label, source in labelled_sources:
        print(f'{label}{source[eg]}')

In [0]:
# Token-level accuracy: flatten predictions and gold tags across sentences, then compare.
all_predictions = []
for sentence in predictions:
    all_predictions.extend(sentence)

all_targets = []
for sentence in test_tags_encoded:
    all_targets.extend(sentence)

acc = accuracy_score(all_targets, all_predictions)
print(f'Accuracy = {acc}')

In [0]:


# Pretrained BERT NER checkpoint; "simple" aggregation groups tokens into entity spans.
model_name = "dslim/bert-base-NER"

model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

nlp = pipeline(task="ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

# Sample input sentence
example = "Hugging Face Inc. is a company based in New York City."

# Run inference and print one detected entity per line.
ner_results = nlp(example)

for entity in ner_results:
    print(entity)


In [0]:
%python

# Build the aggregated NER pipeline from the pretrained checkpoint.
model_name = "dslim/bert-base-NER"
model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
nlp = pipeline(task="ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

# Function to replace named entities with joined entities
def replace_named_entities(text):
    """Rewrite each named entity in `text` as one lower-case, underscore-joined token.

    Entities come from the module-level `nlp` pipeline. Each entity's character span
    is replaced by that span lower-cased, with runs of whitespace turned into "_".

    Args:
        text: input string; None or "" is returned unchanged without calling `nlp`.

    Returns:
        The text with every located entity span replaced.
    """
    if not text:
        return text

    entities = nlp(text)

    # Replace from the last entity to the first, so the offsets of the entities
    # still to be processed are not shifted by earlier replacements.
    for entity in sorted(entities, key=lambda x: x['start'], reverse=True):
        start, end = entity['start'], entity['end']
        if start is None or end is None:
            # Without character offsets the entity cannot be located in `text`.
            continue

        # Build the replacement from the original span, not from entity['word'].
        # The pipeline's decoded word can differ from the source text (spaces
        # inserted around punctuation, "##" word-piece markers).
        joined_entity = re.sub(r'\s+', '_', text[start:end].strip().lower())
        text = text[:start] + joined_entity + text[end:]

    return text

# Wrap the entity-joining function as a string-returning Spark UDF.
replace_named_entities_udf = udf(replace_named_entities, returnType=StringType())

# Add the entity-joined text as a new column next to the original text.
processed_text = replace_named_entities_udf(cleaned_df['concatenated_text'])
total_df = cleaned_df.withColumn('processed_text', processed_text)

# Show the first five rows, original and processed text side by side.
preview = total_df.select('concatenated_text', 'processed_text').limit(5)
display(preview)

In [0]:
# Register total_df as the temporary view "temp_view_total_df", replacing any existing
# view of that name.
total_df.createOrReplaceTempView("temp_view_total_df")