In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session (created on first use) that every later cell
# uses for CSV reads and SQL queries.
spark = (
    SparkSession.builder
    .appName('Wrangling Data')
    .getOrCreate()
)

In [5]:
# Load the raw authors CSV. No header/schema options are passed, so the
# columns are addressed positionally as _c0, _c1, ... in the SQL below.
author_df = spark.read.csv('authors.csv')
author_df.createOrReplaceTempView('authors')

# Project the first four positional columns under readable names.
author_table = spark.sql("""
select authors._c0 author_id,
        authors._c1 author,
        authors._c2 meibi,
        authors._c3 meibix
from authors
""")
# Use overwrite mode so the cell is idempotent: the default save mode raises
# an error when the target path already exists, which breaks any re-run.
author_table.write.mode('overwrite').parquet('temp_file/author.parquet')

In [6]:
# Per-author word counts, read from the 'authors' temp view registered by the
# authors cell (columns _c4 and _c5 of authors.csv).
word_count_table = spark.sql("""
select authors._c1 author,
       authors._c0 author_id,
       authors._c4 word_count_stopwords,
       authors._c5 word_count_nostopwords
from authors
""")
# Use overwrite mode so the cell is idempotent: the default save mode raises
# an error when the target path already exists, which breaks any re-run.
word_count_table.write.mode('overwrite').parquet('temp_file/word_count.parquet')

In [7]:
# Load the raw posts CSV. No header/schema options are passed, so the
# columns are addressed positionally as _c0, _c1, ... in the SQL below.
# NOTE(review): if the `content` field can contain embedded newlines, the
# reader may need multiLine/quote options - confirm against the data.
post_df = spark.read.csv('posts.csv')
post_df.createOrReplaceTempView('posts')

# Project the nine positional columns under readable names.
post_table = spark.sql("""
select posts._c0 post_id,
       posts._c1 title,
       posts._c2 blogger_name,
       posts._c3 blogger_id,
       posts._c4 number_of_comments,
       posts._c5 content,
       posts._c6 url,
       posts._c7 date,
       posts._c8 number_of_retrieved_comments
from posts
""")
# Use overwrite mode so the cell is idempotent: the default save mode raises
# an error when the target path already exists, which breaks any re-run.
post_table.write.mode('overwrite').parquet('temp_file/posts.parquet')

In [8]:
# Load the raw comments CSV. No header/schema options are passed, so the
# columns are addressed positionally as _c0, _c1, ... in the SQL below.
comment_df = spark.read.csv('comments.csv')
comment_df.createOrReplaceTempView('comments')

# Project the six positional columns under readable names.
# NOTE(review): the alias 'aurthor' looks like a typo for 'author'. It is kept
# as-is because it is part of the written parquet schema; renaming it needs a
# check of every consumer of temp_file/comments.parquet.
comment_table = spark.sql("""
select comments._c0 comment_id,
       comments._c1 post_id,
       comments._c2 content,
       comments._c3 aurthor,
       comments._c4 date,
       comments._c5 vote
from comments
""")
# Use overwrite mode so the cell is idempotent: the default save mode raises
# an error when the target path already exists, which breaks any re-run.
comment_table.write.mode('overwrite').parquet('temp_file/comments.parquet')

In [9]:
# (Re)load both source CSVs and register them as temp views so this cell can
# be run without depending on the earlier loading cells.
author_df = spark.read.csv('authors.csv')
author_df.createOrReplaceTempView('authors')
comment_df = spark.read.csv('comments.csv')
comment_df.createOrReplaceTempView('comments')


# Staging set for sentiment scoring: each comment joined to its author row
# (authors._c0 = comments._c3), keeping only rows with a non-null author key
# and non-null text. The text is lower-cased in the query.
# NOTE(review): VADER uses capitalisation as an emphasis cue, so lower-casing
# before scoring may dampen the scores - confirm this is intended.
review_staging_df = spark.sql("""
select authors._c1 author,
        comments._c3 author_id,
        comments._c1 post_id,
        comments._c0 comment_id,
        comments._c4 date,
        lower(comments._c2) content
from comments
join authors on authors._c0 = comments._c3
where comments._c3 is not null and comments._c2 is not null
""")

# Build the sentiment analyzer
import re

import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import pyspark.sql.functions as F
from pyspark.sql.types import *

# The VADER lexicon must be downloaded before the analyzer is constructed.
nltk.download('vader_lexicon')
analyser = SentimentIntensityAnalyzer()


def get_sentiment_analysis_score(sentence):
    """Return the VADER compound polarity score for ``sentence``.

    Args:
        sentence: Text to score, or ``None``.

    Returns:
        The ``'compound'`` entry of ``analyser.polarity_scores(sentence)``,
        or ``None`` when ``sentence`` is ``None``.
    """
    # Guard against null text so a single missing value yields a null score
    # instead of raising inside the UDF.
    if sentence is None:
        return None
    return analyser.polarity_scores(sentence)['compound']

def get_sentiment_analysis_result(score):
    """Map a compound sentiment score to a label.

    Args:
        score: Numeric score, or ``None``.

    Returns:
        ``"POSITIVE"`` when ``score >= 0.05``, ``"NEGATIVE"`` when
        ``score <= -0.05``, ``"NEUTRAL"`` otherwise; ``None`` when ``score``
        is ``None``.
    """
    # Comparing None with a float raises TypeError on Python 3, so pass
    # missing scores through as a null label.
    if score is None:
        return None
    if score >= 0.05:
        return "POSITIVE"
    if score <= -0.05:
        return "NEGATIVE"
    return "NEUTRAL"
    
# Spark UDF wrappers around the two helpers above: the score UDF yields a
# double column, the label UDF a string column.
get_sentiment_analysis_score_udf = F.udf(get_sentiment_analysis_score, returnType=DoubleType())
get_sentiment_analysis_result_udf = F.udf(get_sentiment_analysis_result, returnType=StringType())


# Score every staged comment, label it, and persist the result as parquet.
# "sa_score" is an intermediate column: it feeds the label UDF and is not
# part of the final selection.
comment_review_table_df = (
    review_staging_df
    .withColumn("sa_score", get_sentiment_analysis_score_udf("content"))
    .withColumn("sentiment", get_sentiment_analysis_result_udf("sa_score"))
    .select(
        "author_id",
        "author",
        "post_id",
        "comment_id",
        "date",
        "content",
        "sentiment",
    )
)

# Use overwrite mode so the cell is idempotent: the default save mode raises
# an error when the target path already exists, which breaks any re-run.
comment_review_table_df.write.mode('overwrite').parquet('temp_file/comment_review_table_df.parquet')

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/godwin/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
