Notebook used for data preprocessing. It reads Twitter, Reddit, and viewership data from an S3 bucket, aggregates them into a single DataFrame, cleans the data, and performs feature engineering.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import datetime

In [0]:
import time
# Time efficiency: wall-clock start time. The last cells of the notebook print
# the elapsed time since this point.
start = time.time()

In [0]:
# Create (or reuse) the Spark session with the MongoDB Spark connector on the
# classpath, so the "mongo" data source used near the end of the notebook works.
spark = (
    SparkSession.builder
    .appName("rogan_app")
    .config(
        "spark.jars.packages",
        "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    )
    .getOrCreate()
)

In [0]:
# Low-level SparkContext handle; used below for sc.textFile on the raw Reddit dump.
sc = spark.sparkContext

In [0]:
def IntegerSafe(value):
    """Convert ``value`` to ``int``, returning ``None`` when it cannot be converted.

    Used to coerce raw text/CSV fields into integers without failing the whole
    Spark job on one malformed row.

    Args:
        value: Anything ``int()`` accepts, e.g. ``"42"``, ``42`` or ``4.7``
            (floats are truncated toward zero, as ``int()`` does).

    Returns:
        The integer, or ``None`` if ``int(value)`` raises ``TypeError``
        (e.g. ``None``), ``ValueError`` (e.g. ``"abc"``, ``"3.5"``) or
        ``OverflowError`` (e.g. ``float("inf")``).
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # Narrow on purpose: a bare ``except`` would also swallow
        # KeyboardInterrupt / SystemExit.
        return None

In [0]:
def UTCSafe(value):
    """Convert an epoch-seconds value to a naive UTC ``datetime``.

    Args:
        value: Seconds since the Unix epoch, as anything ``int()`` accepts
            (e.g. ``1609459200`` or ``"1609459200"``).

    Returns:
        A timezone-naive ``datetime`` expressed in UTC, or ``None`` when the
        value is not an integer or is outside the platform's timestamp range.
    """
    # Local import keeps the dependency self-contained. datetime.utcfromtimestamp
    # is deprecated since Python 3.12; converting to an aware UTC datetime and
    # dropping tzinfo yields the same naive value.
    from datetime import timezone

    try:
        seconds = int(value)
        return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)
    except (TypeError, ValueError, OverflowError, OSError):
        return None

In [0]:
def DateSafe_v2(value):
    """Parse a strict ``YYYY-MM-DD`` string into a ``datetime`` at midnight.

    Unlike ``DateSafe``, no time portion is stripped first: a value such as
    ``"2021-03-04 10:00:00"`` does not match the format and yields ``None``.

    Args:
        value: The date string to parse.

    Returns:
        The parsed ``datetime``, or ``None`` for non-strings or non-matching text.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d")
    except (TypeError, ValueError):
        return None

In [0]:
def DateSafe(value):
    """Parse the leading ``YYYY-MM-DD`` of a date/time string into a ``datetime``.

    Everything after the first space (e.g. a time-of-day and offset) is
    discarded, so the result is always midnight of that date.

    Args:
        value: A string such as ``"2022-01-05 12:34:56+00:00"`` or ``"2022-01-05"``.

    Returns:
        The parsed ``datetime``, or ``None`` for non-strings (e.g. ``None``)
        or text whose first space-separated token is not a valid date.
    """
    try:
        # Only grab year-month-day.
        date_part = value.split(" ")[0]
        return datetime.strptime(date_part, "%Y-%m-%d")
    except (AttributeError, TypeError, ValueError):
        # AttributeError: value has no .split (None, numbers, ...).
        return None

# Add configuration for accessing S3

In [0]:
# spark._jsc.hadoopConfiguration().set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')

In [0]:
aws_access_key = ACCESS_KEY
aws_secret_key = SECRET_KEY

# Keys on the Hadoop Configuration object are set WITHOUT the "spark.hadoop."
# prefix; that prefix only applies to the Spark conf, which strips it before
# copying the value into the Hadoop configuration.
# NOTE: "spark.jars.packages" is not a Hadoop setting and packages are resolved
# when the session starts, so setting it here has no effect. If the s3a://
# scheme is not already provided by the cluster, add
# org.apache.hadoop:hadoop-aws to "spark.jars.packages" on the SparkSession builder.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)


# Data Pre-processing:
### For simplicity, read files from S3 and join them to create an aggregate for this example.

In [0]:
# Source locations. TWITTER_URI, TWITTER_JSON_URI and REDDIT_URI must be defined
# before this cell runs (they are not defined in this notebook).
twitter_uri = TWITTER_URI
twitter_json_uri = TWITTER_JSON_URI  # NOTE(review): appears unused below
reddit_uri = REDDIT_URI

## Create twitter dataframe

In [0]:
# Target schema for cleaned tweets. Every field is non-nullable because rows
# with any missing/unparseable value are filtered out before createDataFrame.
twitter_schema = StructType(
    [
        StructField("date", DateType(), False),
        StructField("text", StringType(), False),
        StructField("likes", IntegerType(), False)
    ]
)

In [0]:
# Raw CSV with a header row; without inferSchema every column is read as a string.
tweet_df = spark.read.csv(twitter_uri, header = True)

In [0]:
# Drop to the RDD API so the per-row Python converters can be applied.
tweet_rdd = tweet_df.rdd

In [0]:
# Positional conversion: 0 -> date (time part discarded by DateSafe), 1 -> text,
# 2 -> likes. NOTE(review): assumes the CSV columns are ordered date, text, likes.
tweet_rdd = tweet_rdd.map(lambda x: [DateSafe(x[0]), x[1], IntegerSafe(x[2])])

In [0]:
# Keep only fully-populated rows so the non-nullable twitter_schema holds.
tweet_rdd = tweet_rdd.filter(lambda x: x[0] is not None and x[1] is not None and x[2] is not None)

In [0]:
# Number of tweets surviving cleaning (action: triggers the S3 read).
tweet_rdd.count()

In [0]:
tweet_df = spark.createDataFrame(tweet_rdd, twitter_schema)

In [0]:
tweet_df.printSchema()

In [0]:
# cache() is lazy; the count() below fully materializes it.
tweet_df.cache()

In [0]:
tweet_df.show()

In [0]:
tweet_df.count()

## Create reddit dataframe

In [0]:
# Schema for parsed Reddit comments. Fields are nullable because UTCSafe returns
# None for timestamps it cannot convert.
reddit_schema = StructType(
    [StructField("date", DateType(), True), StructField("text", StringType(), True)]
)

In [0]:
# Takes about half a second to read in 72k reddit comments.
# The raw dump separates comments with "|||||", but not consistently one per
# line, so split each line on the delimiter and flatten the pieces.
reddit_comments = sc.textFile(reddit_uri).flatMap(lambda line: line.split("|||||"))

# The first fragment is the CSV header; drop every fragment equal to it.
header = reddit_comments.first()
reddit_comments = reddit_comments.filter(lambda fragment: fragment != header)


In [0]:
# Keep non-empty fragments that start with a digit (a leading epoch timestamp),
# split off that timestamp at the first comma, and convert it to int
# (None when it is not a clean integer).
reddit_comments_filter = (
    reddit_comments
    .filter(lambda row: len(row) > 0)
    .filter(lambda row: row[0] in '1234567890')
    .map(lambda row: row.split(",", 1))
    .filter(lambda parts: len(parts) == 2)
    .map(lambda parts: (IntegerSafe(parts[0]), parts[1]))
)

In [0]:
# Peek at one parsed (timestamp, text) pair.
reddit_comments_filter.first()

In [0]:
# Persist the cleaned fragments. reddit_comments_filter is derived from this
# RDD, so its count below can reuse the cached data once it is materialized.
reddit_comments.cache()

In [0]:
reddit_comments.count()
# 72629 reddit comments

In [0]:
reddit_comments_filter.count()
# 47925

In [0]:
# Convert the integer epoch-seconds timestamp to a naive UTC datetime
# (None when the timestamp was unparseable or out of range).
reddit_comments_final = reddit_comments_filter.map(lambda x: (UTCSafe(x[0]), x[1]))

In [0]:
# The datetime values land in a DateType column, so the time of day is dropped.
reddit_df = spark.createDataFrame(reddit_comments_final, reddit_schema)

In [0]:
# Reddit comments have no like count; -1 is a placeholder so the columns line
# up with tweet_df (date, text, likes) for the union below.
reddit_df = reddit_df.withColumn("likes", F.lit(-1))

In [0]:
reddit_df.cache()

In [0]:
reddit_df.show()

# Create an aggregate

In [0]:
# Takes about 2 seconds to create reddit dataframe
reddit_df.count()
# 47925

In [0]:
# Rows left if any row with a null field were dropped; nulls come from
# timestamps that UTCSafe could not convert.
reddit_df.na.drop().count()
# 47578

In [0]:
# Aggregate by stacking tweets and Reddit comments into one frame. union()
# matches columns by POSITION, not name; both frames are (date, text, likes).
joined_df = tweet_df.union(reddit_df)


In [0]:
joined_df.show(5)

In [0]:
# Takes about 4 seconds to create aggregate
joined_df.count()
# 2258990

# Get views df

In [0]:
# VIEWS_URI must be defined before this cell runs.
viewer_uri = VIEWS_URI
# NOTE(review): presumably "view" is a running view total and "views_gained"
# the gain for that "week"; confirm against the source data.
viewer_schema = StructType(
    [
        StructField("view",LongType(), True),
        StructField("views_gained", IntegerType(), True),
        StructField("week", DateType(), True)
    ]
)

In [0]:
# Read without header=True: if the file has a header row, it becomes a data
# row whose fields fail IntegerSafe/DateSafe_v2, and dropna() below removes it.
views_df = spark.read.csv(viewer_uri)
views_rdd = views_df.rdd

In [0]:
# Positional conversion: 0 -> view, 1 -> views_gained, 2 -> week (strict YYYY-MM-DD).
# NOTE(review): a views_gained value above the 32-bit IntegerType range would be
# rejected by createDataFrame; confirm values fit.
views_rdd = views_rdd.map(lambda row: (IntegerSafe(row[0]), IntegerSafe(row[1]), DateSafe_v2(row[2])))

In [0]:
views_rdd.cache()

In [0]:
views_df = spark.createDataFrame(views_rdd, schema = viewer_schema)

In [0]:
views_df.cache()

In [0]:
views_df.count()

In [0]:
views_df.na.drop().count()

In [0]:
# Only views_gained and week are used downstream: drop "view", then any row with a null.
views_df = views_df.drop("view").dropna()

In [0]:
views_df.count()

In [0]:
views_df.show(10)

In [0]:
# Bucket each post into a "week" key to join against views_df.week.
# next_day(date, "sunday") is the first Sunday strictly AFTER date; minus 6
# gives a Monday. Monday..Saturday map to the Monday on/before them, but a
# Sunday maps to the FOLLOWING Monday (weeks run Sunday..Saturday, labelled by
# their Monday). NOTE(review): confirm this matches how views_df.week is labelled.
joined_df = joined_df.withColumn("week",F.date_sub(F.next_day(F.col("date"),"sunday"),6))

In [0]:
# Inner join: posts whose week has no views row (and views weeks with no posts) are dropped.
views_joined_df = views_df.join(joined_df, 'week', 'inner').orderBy('date')

In [0]:
views_joined_df.count()

In [0]:
views_joined_df.show(10)

# Feature Engineering

In [0]:
from transformers import pipeline
# Hugging Face text-classification pipeline using the
# 'bhadresh-savani/roberta-base-emotion' model; the hf_emot_* UDFs call it per row.
classifier = pipeline("text-classification",model='bhadresh-savani/roberta-base-emotion')

In [0]:
from urllib.parse import quote_plus

# DATABASE, COLLECTION, USER_NAME, PASSWORD and ADDRESS must be defined before
# this cell runs.
database = DATABASE
collection = COLLECTION
user_name = USER_NAME
password = PASSWORD
address = ADDRESS
# MongoDB requires the username and password in a connection URI to be
# percent-encoded; otherwise characters such as '@', ':', '/' or '%' in a
# password are misparsed as URI delimiters.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)


In [0]:
# Character length of each post's text. The built-in F.length runs in the JVM
# (no Python UDF round-trip), needs no separate `udf` import, returns an
# IntegerType column, and yields null for null text instead of raising as
# len(None) would.
get_text_length = F.length

final_df_1 = views_joined_df.withColumn('text_length', get_text_length('text'))

In [0]:
# Sanity-check the text_length feature (both are actions on final_df_1).
final_df_1.count()

In [0]:
final_df_1.show()

In [0]:
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Binary sentiment pipeline used by get_hf_sent_label / get_hf_sent_score.
# The model is pinned so results do not drift if the library default changes.
sentiment_analysis = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)

# VADER loads its lexicon on construction: build one analyzer and reuse it for
# every row instead of re-creating it per call.
vader_analyzer = SentimentIntensityAnalyzer()


# Every extractor below is best-effort: any failure for one row (e.g. null text
# or a model error) returns None, which the UDF stores as null instead of
# failing the whole Spark job. `classifier` is the emotion pipeline created
# earlier in the notebook.

def get_hf_emot_label(x):
    """Return the top emotion label predicted by ``classifier`` for text ``x``."""
    try:
        return classifier(x)[0]['label']
    except Exception:
        return None


def get_hf_emot_score(x):
    """Return the confidence score of the top emotion predicted for ``x``."""
    try:
        return classifier(x)[0]['score']
    except Exception:
        return None


def get_vader_sent_score(x):
    """Return VADER's compound polarity score for ``x``."""
    try:
        return vader_analyzer.polarity_scores(x)['compound']
    except Exception:
        return None


def get_hf_sent_label(x):
    """Return the sentiment label predicted by ``sentiment_analysis`` for ``x``."""
    try:
        return sentiment_analysis(x)[0]['label']
    except Exception:
        return None


def get_hf_sent_score(x):
    """Return the confidence score of the predicted sentiment label for ``x``."""
    try:
        return sentiment_analysis(x)[0]['score']
    except Exception:
        return None


# Spark UDF wrappers (the bare name `udf` is not imported; use F.udf). Declared
# return types match what each extractor returns.
check_hf_emot_label = F.udf(get_hf_emot_label, StringType())
check_hf_emot_score = F.udf(get_hf_emot_score, FloatType())
check_vader_sent_score = F.udf(get_vader_sent_score, FloatType())
check_hf_sent_label = F.udf(get_hf_sent_label, StringType())
check_hf_sent_score = F.udf(get_hf_sent_score, FloatType())




# Test UDFs one at a time
1. **get_text_length works**
2. check_hf_emot_label
3. check_hf_emot_score
4. check_vader_sent_score
5. check_hf_sent_label
6. check_hf_sent_score

## check_hf_emot_label

In [0]:
# Add the emotion label; the Python UDF runs the emotion classifier once per row.
final_df_2 = final_df_1.withColumn('hf_emot_label', check_hf_emot_label('text'))

In [0]:
# cache() + count() forces the UDF to run once here; later steps reuse the cached rows.
final_df_2.cache()
print(final_df_2.count())

In [0]:
final_df_2.show()

## check_hf_emot_score

In [0]:
# Emotion score: a separate UDF, so the classifier is invoked again for each row.
final_df_3 = final_df_2.withColumn('hf_emot_score', check_hf_emot_score('text'))
final_df_3.cache()
print(final_df_3.count())

## check_vader_sent_score

In [0]:
# VADER compound polarity score per row.
final_df_4 = final_df_3.withColumn('vader_sentiment_score', check_vader_sent_score('text'))
final_df_4.cache()
print(final_df_4.count())

## check_hf_sent_label

In [0]:
# Hugging Face sentiment label per row (null wherever the UDF's call fails).
final_df_5 = final_df_4.withColumn('hf_sentiment_label', check_hf_sent_label('text'))
final_df_5.cache()
print(final_df_5.count())

## check_hf_sent_score

In [0]:
# Hugging Face sentiment score per row; final_df_6 holds all engineered features.
final_df_6 = final_df_5.withColumn('hf_sentiment_score', check_hf_sent_score('text'))
final_df_6.cache()
print(final_df_6.count())

In [0]:
final_df_6.show(truncate=False)

## Write features to mongodb

In [0]:
# Persist the engineered feature table (final_df_6, which includes every
# feature column added above). "ErrorIfExists" aborts rather than overwriting
# an existing collection. The previous name, view_joined_df, was never defined.
final_df_6.write.format("mongo").option("uri", connection_string).mode("ErrorIfExists").save()

In [0]:
# Read the collection back through the same connection string to verify the write.
df = spark.read.format("mongo").option("uri", connection_string).load()
df.cache()
df.show()

# Connect to MongoDB
## Store aggregates in the database and re-read for machine learning later

In [0]:
# database = DATABASE
# collection = COLLECTION
# user_name = USER_NAME
# password = PASSWORD
# address = ADDRESS
# connection_string = (
#     f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
# )


In [0]:
# connection_string

In [0]:
# views_joined_df.printSchema()

In [0]:
# views_joined_df.write.format("mongo").option("uri", connection_string).mode("ErrorIfExists").save()

In [0]:
# df = spark.read.format("mongo").option("uri", connection_string).load()


In [0]:
# df.show()


In [0]:
# Total wall-clock time since `start` was recorded at the top of the notebook.
end = time.time()
print(f"{end - start:.3f} seconds to run pre-processing algorithms")

In [0]:
# Release the Spark session and its resources.
spark.stop()