In [1]:
import findspark
findspark.init("/home/vbhamidipati1/spark")
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from textblob import TextBlob

In [2]:
# Create (or reuse) the SparkSession that every later cell refers to as `spark`.
# The application name is the label shown in the Spark UI and logs, so it
# describes this notebook's job (per-video sentiment of YouTube comments).
spark = (
    SparkSession.builder
    .appName("YouTube comment sentiment analysis")
    .getOrCreate()
)

In [3]:
# Load the tab-delimited, preprocessed comments file into a Spark DataFrame
# (header row supplies the column names, column types are inferred) and drop
# the `_c0` column -- presumably a leftover row index from preprocessing.
comments_path = '/home/vbhamidipati1/spark/workspace/Project_YouTube_Sentiment_Analysis/Data/preprocessed_comments.csv'
comments_reader = (
    spark.read
    .format('com.databricks.spark.csv')
    .option("delimiter", "\t")
    .option("header", 'true')
    .option("inferSchema", 'true')
)
commentsDF = comments_reader.load(comments_path).drop('_c0')

In [4]:
# Preview the loaded comments (show() prints only the first rows and truncates long strings).
commentsDF.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|          logan paul|    4|      0|
|XpVt6Z1Gjjo|been following fr...|    3|      0|
|XpVt6Z1Gjjo|       kong maverick|    3|      0|
|XpVt6Z1Gjjo|          attendance|    3|      0|
|XpVt6Z1Gjjo|            trending|    3|      0|
|XpVt6Z1Gjjo|   trending ayyeeeee|    3|      0|
|XpVt6Z1Gjjo|              though|    4|      0|
|XpVt6Z1Gjjo|            trending|    3|      0|
|XpVt6Z1Gjjo|happy year vlogav...|    3|      0|
|XpVt6Z1Gjjo|your shit brother...|    0|      0|
|XpVt6Z1Gjjo|there should mini...|    0|      0|
|XpVt6Z1Gjjo|dear logan really...|    0|      0|
|XpVt6Z1Gjjo|honestly evan ann...|    0|      0|
|XpVt6Z1Gjjo|casey still bette...|    0|      0|
|XpVt6Z1Gjjo|geez rick this fa...|    0|      0|
|XpVt6Z1Gjjo|   happy cause movie|    0|      0|
|XpVt6Z1Gjjo|ayyyyoooo logang ...|    1|      0|
|XpVt6Z1Gjjo|didnt g

In [5]:
# Append a constant integer column "pol" (all zeros) so every row carries a
# fifth field; the real sentiment label is computed per row in a later cell.
zero_polarity = lit(0)
comments_with_polarity = commentsDF.withColumn("pol", zero_polarity)

# Preview the widened frame.
comments_with_polarity.show()

+-----------+--------------------+-----+-------+---+
|   video_id|        comment_text|likes|replies|pol|
+-----------+--------------------+-----+-------+---+
|XpVt6Z1Gjjo|          logan paul|    4|      0|  0|
|XpVt6Z1Gjjo|been following fr...|    3|      0|  0|
|XpVt6Z1Gjjo|       kong maverick|    3|      0|  0|
|XpVt6Z1Gjjo|          attendance|    3|      0|  0|
|XpVt6Z1Gjjo|            trending|    3|      0|  0|
|XpVt6Z1Gjjo|   trending ayyeeeee|    3|      0|  0|
|XpVt6Z1Gjjo|              though|    4|      0|  0|
|XpVt6Z1Gjjo|            trending|    3|      0|  0|
|XpVt6Z1Gjjo|happy year vlogav...|    3|      0|  0|
|XpVt6Z1Gjjo|your shit brother...|    0|      0|  0|
|XpVt6Z1Gjjo|there should mini...|    0|      0|  0|
|XpVt6Z1Gjjo|dear logan really...|    0|      0|  0|
|XpVt6Z1Gjjo|honestly evan ann...|    0|      0|  0|
|XpVt6Z1Gjjo|casey still bette...|    0|      0|  0|
|XpVt6Z1Gjjo|geez rick this fa...|    0|      0|  0|
|XpVt6Z1Gjjo|   happy cause movie|    0|      

In [6]:
def polarityAnalysis(video_id, comments, likes, replies, polarity):
    """Label one comment as positive (1), neutral (0) or negative (-1).

    The comment text is scored with TextBlob and only the sign of the
    resulting polarity is kept.

    Args:
        video_id: Identifier of the video the comment belongs to (passed through).
        comments: Text of a single comment.
        likes: Like count of the comment (passed through).
        replies: Reply count of the comment (passed through).
        polarity: Ignored. It exists only so a five-field row can be unpacked
            positionally; the yielded label is always recomputed.

    Yields:
        Exactly one tuple ``(video_id, comments, likes, replies, label)`` with
        ``label`` in ``{-1, 0, 1}``. Being a generator, the function can be
        used directly with ``RDD.flatMap``.
    """
    try:
        score = TextBlob(comments).sentiment.polarity
        # Sign of the score as an int: -1, 0 or 1.
        label = (score > 0) - (score < 0)
    except Exception:
        # Best effort: a comment that cannot be scored (e.g. a missing or
        # non-text value) is treated as neutral. Only ``Exception`` is caught
        # so interrupts and interpreter shutdown still propagate.
        label = 0
    yield (video_id, comments, likes, replies, label)

In [7]:
# Score every row with polarityAnalysis. flatMap flattens the one-element
# generator it returns, so the result has one tuple per input row. The RDD is
# marked as persisted because several later cells reuse it.
Adjusted_polarity = (
    comments_with_polarity.rdd
    .flatMap(lambda row: polarityAnalysis(*row[:5]))
    .persist()
)
Adjusted_polarity

PythonRDD[24] at RDD at PythonRDD.scala:53

In [8]:
# Inspect a bounded sample of the scored rows. collect() would ship the whole
# RDD to the driver and dump every row into the cell output; take() fetches
# only the first few.
Adjusted_polarity.take(20)

[('XpVt6Z1Gjjo', 'logan paul', 4, 0, 0),
 ('XpVt6Z1Gjjo',
  'been following from start your vine channel have seen vlogs',
  3,
  0,
  0),
 ('XpVt6Z1Gjjo', 'kong maverick', 3, 0, 0),
 ('XpVt6Z1Gjjo', 'attendance', 3, 0, 0),
 ('XpVt6Z1Gjjo', 'trending', 3, 0, 0),
 ('XpVt6Z1Gjjo', 'trending ayyeeeee', 3, 0, 0),
 ('XpVt6Z1Gjjo', 'though', 4, 0, 0),
 ('XpVt6Z1Gjjo', 'trending', 3, 0, 0),
 ('XpVt6Z1Gjjo', 'happy year vlogaversary', 3, 0, 1),
 ('XpVt6Z1Gjjo',
  'your shit brother have single handedly ruined youtube thanks',
  0,
  0,
  -1),
 ('XpVt6Z1Gjjo', 'there should mini logan paul', 0, 0, 0),
 ('XpVt6Z1Gjjo',
  'dear logan really wanna your merch have money even have would really make have your merch',
  0,
  0,
  1),
 ('XpVt6Z1Gjjo',
  'honestly evan annoying like funny watching famous trying hard like',
  0,
  0,
  1),
 ('XpVt6Z1Gjjo', 'casey still better then logan', 0, 0, 1),
 ('XpVt6Z1Gjjo', 'geez rick this face youtube', 0, 0, 0),
 ('XpVt6Z1Gjjo', 'happy cause movie', 0, 0, 1),
 

In [9]:
# Turn the scored tuples back into a DataFrame with named columns.
polarity_scores_DF = Adjusted_polarity.toDF(
    schema=['video_id', 'comment_text', 'likes', 'replies', 'polarity']
)

# Register it for SQL as global_temp.polarity_scores_DF. The "OrReplace"
# variant keeps this cell re-runnable: createGlobalTempView raises if a view
# with this name already exists in the running application.
polarity_scores_DF.createOrReplaceGlobalTempView("polarity_scores_DF")

In [10]:
# Preview the per-comment sentiment labels (polarity is -1, 0 or 1).
polarity_scores_DF.show()

+-----------+--------------------+-----+-------+--------+
|   video_id|        comment_text|likes|replies|polarity|
+-----------+--------------------+-----+-------+--------+
|XpVt6Z1Gjjo|          logan paul|    4|      0|       0|
|XpVt6Z1Gjjo|been following fr...|    3|      0|       0|
|XpVt6Z1Gjjo|       kong maverick|    3|      0|       0|
|XpVt6Z1Gjjo|          attendance|    3|      0|       0|
|XpVt6Z1Gjjo|            trending|    3|      0|       0|
|XpVt6Z1Gjjo|   trending ayyeeeee|    3|      0|       0|
|XpVt6Z1Gjjo|              though|    4|      0|       0|
|XpVt6Z1Gjjo|            trending|    3|      0|       0|
|XpVt6Z1Gjjo|happy year vlogav...|    3|      0|       1|
|XpVt6Z1Gjjo|your shit brother...|    0|      0|      -1|
|XpVt6Z1Gjjo|there should mini...|    0|      0|       0|
|XpVt6Z1Gjjo|dear logan really...|    0|      0|       1|
|XpVt6Z1Gjjo|honestly evan ann...|    0|      0|       1|
|XpVt6Z1Gjjo|casey still bette...|    0|      0|       1|
|XpVt6Z1Gjjo|g

In [11]:
# Count positive / neutral / negative comments per video. `polarity` is an
# integer column, so it is compared against integer literals rather than
# strings (which would rely on an implicit cast). A CASE with no ELSE yields
# NULL, and count() skips NULLs, so each count only sees its own label.
sentimentScores = spark.sql("""
    SELECT video_id,
           count(CASE WHEN polarity = 1 THEN 1 END)  AS positive_comments,
           count(CASE WHEN polarity = 0 THEN 1 END)  AS neutral_comments,
           count(CASE WHEN polarity = -1 THEN 1 END) AS negative_comments
    FROM global_temp.polarity_scores_DF
    GROUP BY video_id
""")

# createOrReplace... keeps the cell re-runnable within one Spark application.
sentimentScores.createOrReplaceGlobalTempView("sentimentScores")

In [12]:
# Preview the per-video counts of positive, neutral and negative comments.
sentimentScores.show()

+-----------+-----------------+----------------+-----------------+
|   video_id|positive_comments|neutral_comments|negative_comments|
+-----------+-----------------+----------------+-----------------+
|xPS7bqBePSs|              177|             117|                6|
|dInwVhRtN4E|              171|             330|              198|
|rn5Xgak1zzA|               67|              69|               64|
|TzyraAp3jaY|               85|             139|               26|
|eHq6ZA6uKOg|               63|              71|               37|
|_r5eTelhpmQ|              160|             114|               15|
|JkqTeQHFoBY|               64|              50|               20|
|Bo-qp-Zu0OY|               11|              10|                1|
|K7pQsR8WFSo|              181|             161|               58|
|g_ekn1gjBq0|               25|               5|                1|
|4yCkkOvIkUI|                6|              11|                2|
|7TN09IP5JuI|              207|             253|              

In [13]:
# Load the per-video statistics (comma-separated, header row, inferred types)
# into a Spark DataFrame.
videosDF = spark.read.load(
    '/home/vbhamidipati1/spark/workspace/Project_YouTube_Sentiment_Analysis/Data/USvideos.csv',
    format='com.databricks.spark.csv',
    header='true',
    inferSchema='true',
)

# Register it for SQL as global_temp.videosDF. The "OrReplace" variant keeps
# this cell re-runnable: createGlobalTempView raises if the view already exists.
videosDF.createOrReplaceGlobalTempView("videosDF")

In [14]:
# Preview the raw video statistics.
videosDF.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [15]:
# Total views / likes / dislikes per video, summed over every row that shares
# a video_id (presumably one row per trending date -- confirm against the data).
videoScores = spark.sql("""
    SELECT video_id,
           sum(views)    AS total_views,
           sum(likes)    AS total_likes,
           sum(dislikes) AS total_dislikes
    FROM global_temp.videosDF
    GROUP BY video_id
""")

# createOrReplace... keeps the cell re-runnable within one Spark application.
videoScores.createOrReplaceGlobalTempView("videoScores")
videoScores.show()

+-----------+-----------+-----------+--------------+
|   video_id|total_views|total_likes|total_dislikes|
+-----------+-----------+-----------+--------------+
|xPS7bqBePSs|      76775|       5682|            37|
|dInwVhRtN4E|   15660374|     392718|         17901|
|rn5Xgak1zzA|     529514|      40548|           952|
|TzyraAp3jaY|     228909|       2644|             4|
|eHq6ZA6uKOg|     135189|       1156|           374|
|_r5eTelhpmQ|     182342|       7712|           191|
|JkqTeQHFoBY|     431082|       4694|            43|
|Bo-qp-Zu0OY|      21654|        179|             1|
|K7pQsR8WFSo|    2300663|     110553|          1060|
|g_ekn1gjBq0|      16427|        395|            17|
|4yCkkOvIkUI|      12905|         59|            43|
|7TN09IP5JuI|   20785005|     869092|          8855|
|RE-far-FvRs|    1355210|      63437|          1225|
|WQjO1mMCPg4|   10417128|     307533|         29960|
|aRgTLb5EbiQ|      72810|       9292|            48|
|xNddRhpx5tA|    4393208|     109168|         

In [16]:
# Inner-join the comment sentiment counts with the video totals on video_id,
# so only videos present in both frames survive. The explicit projection pins
# the column order: sentiment columns first, then the video totals.
joined_scores = sentimentScores.join(videoScores, on="video_id", how="inner")
sentimentsJoin = joined_scores.select(
    "video_id",
    "positive_comments",
    "neutral_comments",
    "negative_comments",
    "total_views",
    "total_likes",
    "total_dislikes",
)
sentimentsJoin.show()

+-----------+-----------------+----------------+-----------------+-----------+-----------+--------------+
|   video_id|positive_comments|neutral_comments|negative_comments|total_views|total_likes|total_dislikes|
+-----------+-----------------+----------------+-----------------+-----------+-----------+--------------+
|xPS7bqBePSs|              177|             117|                6|      76775|       5682|            37|
|dInwVhRtN4E|              171|             330|              198|   15660374|     392718|         17901|
|rn5Xgak1zzA|               67|              69|               64|     529514|      40548|           952|
|TzyraAp3jaY|               85|             139|               26|     228909|       2644|             4|
|eHq6ZA6uKOg|               63|              71|               37|     135189|       1156|           374|
|_r5eTelhpmQ|              160|             114|               15|     182342|       7712|           191|
|JkqTeQHFoBY|               64|              5

In [18]:
# Write the joined scores as CSV under ./Data/sentiment_scores.
# repartition(1) collapses the data into a single partition so Spark emits a
# single part file. mode="overwrite" keeps the notebook re-runnable: the
# default save mode fails when the output directory already exists.
sentimentsJoin.repartition(1).write.csv(path="./Data/sentiment_scores", mode="overwrite")

In [19]:
# Shut down the Spark session now that the results have been written.
spark.stop()