# Group 5 - Youshi Zhang, Maneel Reddy, Gurusankar Gopalakrishnan, Sentiment Analysis of Reddit Posts 
ML Goals (Group 5)

- Given a Reddit post, what are its sentiment scores (positive, negative, neutral)? The posts are pulled from a list of subreddits associated with stock investing.

ML Outcome
- Used FinBERT to predict sentiment scores for 13,855 Reddit posts from Feb 3rd to Mar 3rd.
- FinBERT accepts at most 512 tokens per input, and some long Reddit posts far exceed that. We therefore split each post along newline characters into paragraph chunks, predict on each chunk, and aggregate the results back to the post level (aggregation is done in a later step, not here).
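
The notebook leaves post-level aggregation to a later step. As a rough illustration of what that step could look like, the sketch below averages the per-paragraph probability rows into one row per post. The helper name `aggregate_post_scores` and the optional `weights` argument are hypothetical, and a plain mean is only one possible choice, not necessarily the method used later.

```python
from typing import List, Optional


def aggregate_post_scores(paragraph_probs: List[List[float]],
                          weights: Optional[List[float]] = None) -> List[float]:
    """Combine per-paragraph probability rows into one post-level row.

    Hypothetical helper: uses an (optionally weighted) mean over paragraphs.
    """
    if not paragraph_probs:
        raise ValueError("need at least one paragraph row")
    if weights is None:
        # Default: every paragraph counts equally
        weights = [1.0] * len(paragraph_probs)
    total = sum(weights)
    n_classes = len(paragraph_probs[0])
    return [
        sum(w * row[k] for w, row in zip(weights, paragraph_probs)) / total
        for k in range(n_classes)
    ]


# Two paragraphs from one post, three class probabilities each (made-up numbers)
post_scores = aggregate_post_scores([[0.2, 0.7, 0.1], [0.6, 0.2, 0.2]])
```

Weighting by paragraph length (for example, word count) is a natural variant when short paragraphs should count less.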

In [0]:
dbutils.widgets.removeAll()
dbutils.widgets.dropdown('USE_GPU', 'no', ['no', 'yes'])
# USE_GPU = dbutils.widgets.get('USE_GPU') == 'yes'

In [0]:
import os
import numpy as np
import pandas as pd
import datetime
import torch
import json
import gc

from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding
from datasets import Dataset
from torch.utils.data import DataLoader

import pyspark
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

import warnings
warnings.filterwarnings('ignore')



### Initialize the Spark session and test the MongoDB connection

In [0]:
spark = SparkSession.builder.getOrCreate()

In [0]:
# REF_STRING = "mongodb+srv://admin:<password>@msds697-cluster.qzgwq.mongodb.net/"
def read_df_from_mongo(spark, collection_name):
    mongo_username = 'admin'
    mongo_password =  'msds697'
    mongo_ip_address = 'msds697-cluster.qzgwq.mongodb.net/'
    MONGO_DB_NAME = "msds697_project"
    connection_string = f"mongodb+srv://{mongo_username}:{mongo_password}@{mongo_ip_address}{MONGO_DB_NAME}.{collection_name}"
    spark_df = spark.read.format("mongo").option("uri", connection_string).load()
    return spark_df
    
def store_df_to_mongo(spark, spark_df, collection_name):
    mongo_username = 'admin'
    mongo_password =  'msds697'
    mongo_ip_address = 'msds697-cluster.qzgwq.mongodb.net/'
    MONGO_DB_NAME = "msds697_project"
    connection_string = f"mongodb+srv://{mongo_username}:{mongo_password}@{mongo_ip_address}{MONGO_DB_NAME}.{collection_name}"
    spark_df.write.format("com.mongodb.spark.sql.DefaultSource")\
                     .mode("overwrite")\
                     .option("uri", connection_string)\
                     .save()

In [0]:
df_raw = read_df_from_mongo(spark, "reddit2")
df_raw.show(10)


### Preprocessing the data
- Subset columns: The API used to pull Reddit data unfortunately does not return a uniform schema across queries. To prevent errors from columns we don't need, we subset the dataframe.
- Drop duplicates: Previous API queries cover overlapping dates, so there are duplicated posts. In practice, the "id" column uniquely identifies a post.
- Drop NaN values: Remove posts where both the title and the body text are empty.
  - Note: some posts' body texts are marked as "[removed]", which is effectively NaN.
- Concatenate texts: We are interested in both the title and the body text, so we concatenate them.
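
To make the four steps concrete without a Spark cluster, here is a minimal plain-Python sketch of the same logic on a list of dictionaries. The function name `preprocess` and the sample records are hypothetical; the Spark cell below remains the actual implementation.

```python
def preprocess(posts):
    """Mirror the preprocessing steps on an iterable of dicts.

    Each dict is expected to have the keys "id", "title" and "selftext";
    values may be None. Hypothetical helper for illustration only.
    """
    seen_ids = set()
    cleaned = []
    for post in posts:
        # Drop duplicates: keep the first record seen for each id
        if post["id"] in seen_ids:
            continue
        seen_ids.add(post["id"])
        # Treat missing values and "[removed]" bodies as empty strings
        title = post.get("title") or ""
        body = post.get("selftext") or ""
        if body == "[removed]":
            body = ""
        # Drop posts where both the title and the body are empty
        if not title and not body:
            continue
        # Concatenate title and body with a newline separator
        cleaned.append({
            "id": post["id"],
            "title": title,
            "selftext": body,
            "fulltext": "\n".join([title, body]),
        })
    return cleaned


sample_posts = [
    {"id": "a", "title": "T", "selftext": "[removed]"},
    {"id": "a", "title": "T", "selftext": "duplicate row"},
    {"id": "b", "title": None, "selftext": None},
    {"id": "c", "title": "", "selftext": "body"},
]
cleaned_posts = preprocess(sample_posts)
```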

In [0]:
# Subset relevant columns
df = df_raw.select("id","title","selftext")
# df = df_raw.alias("df_copy").select("*")
# filter based on conditions
## drop id duplicates. Practically, id uniquely identifies a reddit post
df = df.dropDuplicates(subset=["id"])
# replace "[removed]" and null value with empty string
df = df.fillna("",subset=["title","selftext"])
df = df.replace("[removed]","","selftext")
## the whole text is not empty string
cond = (col("selftext") != "") | (col("title") != "")
df = df.filter(cond)
# concatenate title and selftext to full text
df = df.withColumn("fulltext", concat_ws('\n', df.title, df.selftext))
df.count()

Out[6]: 13855

In [0]:
df.cache()

Out[7]: DataFrame[id: string, title: string, selftext: string, fulltext: string]

#### Load the pre-trained Hugging Face FinBERT model

In [0]:
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert").to("cuda").half()
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)


### Methods to create sentiment scores [positive, negative, neutral] from raw text using Hugging Face/PyTorch

Overall procedure: given a text chunk from a post
1. Split the chunk into a list of paragraphs.
2. For each paragraph, tokenize and predict.
  - To prevent RAM issues, we load inputs in batches.
3. The output is an n-by-3 tensor, where n is the number of paragraphs generated from the text chunk.
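
The batching and softmax steps can be sketched in plain Python, independent of the model. In this sketch `score_fn` stands in for the model's forward pass and `dummy_score_fn` is a made-up scorer; both names are assumptions for illustration, not part of the notebook.

```python
import math


def batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def softmax(row):
    """Numerically stable softmax over one row of logits."""
    shift = max(row)
    exps = [math.exp(x - shift) for x in row]
    total = sum(exps)
    return [e / total for e in exps]


def predict_in_batches(paragraphs, score_fn, batch_size=16):
    """Return one probability row (3 classes) per paragraph."""
    probs = []
    for batch in batches(paragraphs, batch_size):
        logits = score_fn(batch)  # one row of 3 logits per paragraph
        probs.extend(softmax(row) for row in logits)
    return probs


def dummy_score_fn(batch):
    # Stand-in for the model: fabricates 3 logits per paragraph
    return [[float(len(p)), 0.0, 1.0] for p in batch]


demo_probs = predict_in_batches(["a", "bb", "ccc"], dummy_score_fn, batch_size=2)
```

Only one batch of inputs is processed at a time, which is what keeps memory use bounded; the result still has one row per paragraph.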

In [0]:
# def split_lines(text):
#     """Split long text into paragraphs/lines. Remove all chunks
#     """
#     text_ls = text.split("\n")
#     text_ls = [x.strip() for x in text_ls]
#     text_ls = [x for x in text_ls if x != '']
#     return text_ls
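def split_paragraphs(long_text, limit = 500):
    """Split long text into chunks of whole paragraphs, using as few splits as possible.
    Each chunk holds fewer than `limit` whitespace-separated words, a rough proxy for
    the model's token count. A single paragraph longer than `limit` is kept whole and
    relies on tokenizer truncation.
    """
    ps = long_text.split('\n')
    ps = [p.strip() for p in ps]
    ps = [p for p in ps if p != ""]
    chunks = []
    current_chunk, current_length = '', 0
    for p in ps:
        p_len = len(p.split()) + 1
        if (current_length + p_len) < limit:
            current_chunk += p + "\n"
            current_length += p_len
        else:
            # Flush the finished chunk; skip it if empty (first paragraph already over the limit)
            if current_chunk:
                chunks.append(current_chunk.strip())
            # Start a new chunk; keep the trailing newline so the next paragraph stays separated
            current_chunk, current_length = p + "\n", p_len
    chunks.append(current_chunk.strip())
    return chunks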

def pred_batch(dataloader):
    """Make prediction in batched way to prevent RAM overload
    argument:
    - dataloader
    return:
    - 2D tensor, in cpu, detached
    """
    outputs = []
    for batch in dataloader:
        with torch.no_grad():
            batch_gpu = {k:v.to("cuda") for k,v in batch.items()}
            logits = model(**batch_gpu).logits
            probs = torch.softmax(logits, dim=1).cpu().detach()
            outputs.append(probs)
    return torch.cat(outputs, dim=0)

def pred_one_post(long_sentence, batch_size=16):
    """
    Make predictions for one Reddit post.
    Predictions are made in batches to prevent RAM overflow.
    arguments:
    - long_sentence: concatenated text from one Reddit post
    - batch_size: number of chunks scored per forward pass
    return:
    - 2D tensor with one row of 3 probabilities per chunk
    """
    text_ls = split_paragraphs(long_sentence)
    # text_ls = split_lines(long_sentence)
    tokenized = tokenizer(text_ls, padding=True, truncation=True, return_tensors="pt")
    ds = Dataset.from_dict(tokenized)
    dataloader = DataLoader(ds, batch_size=batch_size, collate_fn=data_collator)
    probs = pred_batch(dataloader)
    # Free Python and GPU memory before the next post
    gc.collect()
    torch.cuda.empty_cache()
    return probs
    
# def analysisofoutput(output):
#     argmax = torch.argmax(output,dim = 1)
#     argmax = argmax.detach().numpy()
#     dictionary = {0:'positive',1:'negative',2:'neutral'}
#     return dictionary[argmax[0]]

##### Test sentiment outputs for one record

In [0]:
pred_one_post(df.select("fulltext").limit(1).collect()[0]["fulltext"])

Out[25]: tensor([[0.0241, 0.9360, 0.0398]], dtype=torch.float16)
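
To read a row like this as a label, pick the index with the highest probability and map it to a class name. The mapping below follows the order in the commented-out `analysisofoutput` helper and is an assumption here; it is worth confirming against `model.config.id2label` before relying on it. The helper name `top_label` is hypothetical.

```python
# Assumed label order; verify with model.config.id2label on the loaded model.
ID2LABEL = {0: "positive", 1: "negative", 2: "neutral"}


def top_label(prob_row, id2label=ID2LABEL):
    """Return (label, probability) for the highest-scoring class in one row."""
    best = max(range(len(prob_row)), key=lambda k: prob_row[k])
    return id2label[best], prob_row[best]


# Values copied from the output row above
label, score = top_label([0.0241, 0.9360, 0.0398])
```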

#### Create sentiment score vectors for spark_df

In [0]:
# Create a UDF that adds the sentiment scores; batching inside pred_one_post keeps memory use bounded
## Because the function returns a tensor, convert it to a nested list so it matches the declared ArrayType
pred_one_post_vec = udf(lambda x: pred_one_post(x).tolist(), ArrayType(ArrayType(FloatType())))

def sentiment_vectorizer(spark_df, large = False): 
    sentiment_df = spark_df.withColumn("vectors", pred_one_post_vec(col("fulltext")))
    return sentiment_df

In [0]:
# Test on the first few records
sentiment_vectorizer(df.limit(3)).show()

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|               title|            selftext|            fulltext|             vectors|
+-------+--------------------+--------------------+--------------------+--------------------+
|10swf3l|Wow, Nordstrom's ...|Yo , looks like t...|Wow, Nordstrom's ...|[[0.02406311, 0.9...|
|10te4r7|The best opportunity|                    |The best opportun...|[[0.14160156, 0.0...|
|10ufqon|Why can/can’t the...|I was have a disc...|Why can/can’t the...|[[0.05596924, 0.0...|
+-------+--------------------+--------------------+--------------------+--------------------+



#### Create sentiment score vectors for all records and store them in MongoDB for later use

In [0]:
sentiment_df = sentiment_vectorizer(df)
store_df_to_mongo(spark, sentiment_df, "sentiment_raw_reddit_vectors2")

In [0]:
sentiment_df.show(2)

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|               title|            selftext|            fulltext|             vectors|
+-------+--------------------+--------------------+--------------------+--------------------+
|10swf3l|Wow, Nordstrom's ...|Yo , looks like t...|Wow, Nordstrom's ...|[[0.02406311, 0.9...|
|10te4r7|The best opportunity|                    |The best opportun...|[[0.14160156, 0.0...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

