In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

import os
os.environ['PYTHONHASHSEED']="0"
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"
# A few additional libraries we will need
from math import sqrt

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *

# Start the local Spark context, or reuse the one already running when this cell is
# re-run. SparkContext(...) raises ValueError if a context already exists; getOrCreate()
# returns the live context instead, so `sc` is always bound to it.
conf = (
  SparkConf()
  .setMaster("local[*]")
  .set("spark.executor.memory", "1g")
  # Executors must use the same string-hash seed as the driver's PYTHONHASHSEED.
  .set("spark.executorEnv.PYTHONHASHSEED", "0")
  .set("spark.ui.port", "4050")
)
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

def dbg(x):
  """Print a debugging view of ``x``.

  An RDD is shown as a list of (at most) its first 100 elements. Tuple elements are
  shown as (first, second) pairs, and a ``ResultIterable`` second item (as produced by
  e.g. groupByKey) is expanded into a list so its contents are visible. Any other value
  is printed unchanged.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def render(item):
    # Non-tuples pass through untouched; tuples are reduced to their first two fields.
    if not isinstance(item, tuple):
      return item
    first, second = item[0], item[1]
    if isinstance(second, pyspark.resultiterable.ResultIterable):
      second = list(second)
    return (first, second)

  print([render(item) for item in x.take(100)])
    

import unittest
# A bare TestCase instance, so its assertion helpers (e.g. Test.assertEqual) can be
# called directly from notebook cells.
# NOTE(review): not referenced in the cells visible here.
Test = unittest.TestCase()

In [5]:
## FROM START TO FINISH, WITH TIMER
import time

# Wall-clock start of the analysis; the elapsed time is printed at the end of this cell.
time_start = time.time()

import tweepy
from textblob import TextBlob
import pandas as pd
import re
import math

# KEYS
# Read the Twitter API credentials from the environment instead of pasting them into
# the notebook, so they never end up in a saved or shared copy. Unset variables fall
# back to '' (the placeholder values the notebook shipped with).
api_key = os.environ.get('TWITTER_API_KEY', '')
api_key_secret = os.environ.get('TWITTER_API_KEY_SECRET', '')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', '')
access_token_secret = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET', '')

# AUTHENTICATING API
# OAuth 1.0a user-context auth. wait_on_rate_limit makes tweepy sleep through
# rate-limit windows instead of raising.
authenticate = tweepy.OAuthHandler(api_key, api_key_secret)
authenticate.set_access_token(access_token, access_token_secret)
api = tweepy.API(authenticate, wait_on_rate_limit=True)

# HELPER CODES

# Character ranges stripped by remove_emoji, concatenated into one regex character class.
_EMOJI_CHAR_RANGES = (
    "\U0001F600-\U0001F64F",  # emoticons
    "\U0001F300-\U0001F5FF",  # misc symbols & pictographs
    "\U0001F680-\U0001F6FF",  # transport & map symbols
    "\U0001F1E0-\U0001F1FF",  # regional indicator letters (flag pairs)
    "\U00002500-\U00002BEF",  # box drawing, shapes, misc symbols, dingbats, arrows, math
    "\U00002702-\U000027B0",  # dingbats
    "\U00002702-\U000027B0",  # repeat of the previous range (redundant inside a class)
    "\U000024C2-\U0001F251",  # NOTE: very wide - also covers CJK, kana, Hangul, etc.
    "\U0001f926-\U0001f937",
    "\U00010000-\U0010ffff",  # every supplementary-plane code point
    "\u2640-\u2642",
    "\u2600-\u2B55",
    "\u200d",                 # zero-width joiner
    "\u23cf",
    "\u23e9",
    "\u231a",
    "\ufe0f",                 # variation selector-16 (emoji presentation)
    "\u3030",
)
# Compiled once at definition time rather than on every call.
_EMOJI_PATTERN = re.compile("[" + "".join(_EMOJI_CHAR_RANGES) + "]+", flags=re.UNICODE)

def remove_emoji(string):
    """Return ``string`` with every run of characters from _EMOJI_CHAR_RANGES deleted.

    The removed characters are not replaced, so surrounding spaces are kept as-is
    (e.g. "hot 🔥 take" -> "hot  take"). Because the ranges are broad, non-Latin
    scripts such as CJK text are removed as well.
    """
    return _EMOJI_PATTERN.sub("", string)

def clean_twts(tweet):
  """Strip retweet markers, links, apostrophes and redundant whitespace from a tweet.

  @mentions and #hashtags are deliberately kept.

  Args:
    tweet: raw tweet text.

  Returns:
    The cleaned text. Newlines and runs of whitespace become a single space, so words
    on either side of a line break stay separate tokens, and leading/trailing
    whitespace (e.g. left behind by removed links) is stripped.
  """
  # Retweet marker ("RT @user: ...") only as a standalone word, so words that merely
  # end in "RT" (e.g. "ART") are left intact.
  tweet = re.sub(r'\bRT\s+', '', tweet)
  tweet = re.sub(r'https?://\S+', '', tweet)
  # Drop ASCII apostrophes ("don't" -> "dont").
  tweet = re.sub(r"'", '', tweet)
  # Collapse newlines and repeated spaces into one space instead of deleting them, which
  # would glue neighbouring words together ("come.\n\nGet" -> "come.Get").
  tweet = re.sub(r'\s+', ' ', tweet).strip()
  return tweet

def polarity_twts(tweet):
  """Sentiment polarity of ``tweet`` as scored by TextBlob's default analyzer.

  TextBlob reports polarity as a float from -1.0 (negative) to 1.0 (positive).
  """
  analysis = TextBlob(tweet).sentiment
  return analysis.polarity

def pos_scores(sentiment):
  """Count the positively-scored entries in ``sentiment``.

  Args:
    sentiment: a list of ``(tweet, polarity)`` pairs.

  Returns:
    How many pairs have polarity > 0. A single-pair list gives 1 or 0. An empty list
    gives 0 instead of raising IndexError, and every pair is counted, not just the first.
  """
  return sum(1 for _tweet, polarity in sentiment if polarity > 0)

def neg_scores(sentiment):
  """Count the negatively-scored entries in ``sentiment``.

  Args:
    sentiment: a list of ``(tweet, polarity)`` pairs.

  Returns:
    How many pairs have polarity < 0. A single-pair list gives 1 or 0. An empty list
    gives 0 instead of raising IndexError, and every pair is counted, not just the first.
  """
  return sum(1 for _tweet, polarity in sentiment if polarity < 0)

#APPLEMUSIC TWEETS
# Up to `limit` @AppleMusic tweets with id <= the pinned max_id, so a re-run reads the
# same window of the timeline rather than whatever was posted most recently.
# NOTE(review): statuses/user_timeline does not document a `lang` filter - confirm it
# has any effect.
limit = 100
apple_statuses = sc.parallelize(
  tweepy.Cursor(api.user_timeline, screen_name="AppleMusic", max_id=1532237700144812032,
                count=100, lang="en", tweet_mode="extended").items(limit))

# Id of the first tweet returned (the newest in the window). max_id is inclusive, so this
# equals the pinned max_id whenever that tweet still exists.
max_id = apple_statuses.map(lambda tweet: tweet.id)
dbg(max_id.take(1))

# tweet_mode="extended" exposes the untruncated text as `full_text`.
apple100 = apple_statuses.map(lambda status: status.full_text)
dbg(apple100.take(5))

# CLEANING THE DATA
apple100_clean = apple100.map(clean_twts).map(remove_emoji)
dbg(apple100_clean.take(10))

#SENTIMENT ANALYSIS
apple100_sent = apple100_clean.map(polarity_twts)
dbg(apple100_sent.take(5))

# SOME VISUALISATION
# Pair every cleaned tweet with its own score. apple100_sent is a plain map() of
# apple100_clean, so both RDDs have the same partitions and element counts and zip()
# lines them up directly: no shuffle, and rows stay in timeline order (a join on
# zipWithIndex keys would return them in hash-partition order).
apple_pairs = apple100_clean.zip(apple100_sent)
# Per-tweet records (index, [(tweet, polarity)]), the input shape pos_scores/neg_scores take.
apple_final = apple_pairs.zipWithIndex().map(lambda x: (x[1], [x[0]]))
apple_final_df = apple_pairs.collect()

apple_df = pd.DataFrame(apple_final_df, columns=['Tweets', 'Sentiment'])
print(apple_df)

# HIGHEST AND LOWEST SENTIMENT
highest_apple = apple_pairs.sortBy(lambda x: x[1], False)
lowest_apple = apple_pairs.sortBy(lambda x: x[1])
dbg(highest_apple.take(1))
dbg(lowest_apple.take(1))

num_pos_twt_app = apple_final.map(lambda x: x[1]).map(pos_scores).sum()
print(num_pos_twt_app)

num_neg_twt_app = apple_final.map(lambda x: x[1]).map(neg_scores).sum()
print(num_neg_twt_app)

# SPOTIFY TWEETS
# Up to `limit` @Spotify tweets with id <= the pinned max_id, so a re-run reads the same
# window of the timeline rather than whatever was posted most recently.
# NOTE(review): statuses/user_timeline does not document a `lang` filter - confirm it
# has any effect.
limit = 100
spotify_statuses = sc.parallelize(
  tweepy.Cursor(api.user_timeline, screen_name="Spotify", max_id=1532098361523838976,
                count=100, lang="en", tweet_mode="extended").items(limit))

# Id of the first tweet returned (the newest in the window). max_id is inclusive, so this
# equals the pinned max_id whenever that tweet still exists.
max_id = spotify_statuses.map(lambda tweet: tweet.id)
dbg(max_id.take(1))

# tweet_mode="extended" exposes the untruncated text as `full_text`.
spotify100 = spotify_statuses.map(lambda status: status.full_text)
dbg(spotify100.take(5))

# CLEANING THE DATA
spotify100_clean = spotify100.map(clean_twts).map(remove_emoji)
dbg(spotify100_clean.take(10))

#SENTIMENT ANALYSIS
spotify100_sent = spotify100_clean.map(polarity_twts)
dbg(spotify100_sent.take(5))

# SOME VISUALISATION
# Pair every cleaned tweet with its own score. spotify100_sent is a plain map() of
# spotify100_clean, so both RDDs have the same partitions and element counts and zip()
# lines them up directly: no shuffle, and rows stay in timeline order (a join on
# zipWithIndex keys would return them in hash-partition order).
spotify_pairs = spotify100_clean.zip(spotify100_sent)
# Per-tweet records (index, [(tweet, polarity)]), the input shape pos_scores/neg_scores take.
spotify_final = spotify_pairs.zipWithIndex().map(lambda x: (x[1], [x[0]]))
spotify_final_df = spotify_pairs.collect()

spotify_df = pd.DataFrame(spotify_final_df, columns=['Tweets', 'Sentiment'])
print(spotify_df)

# HIGHEST AND LOWEST SENTIMENT (same report as for Apple Music)
highest_spotify = spotify_pairs.sortBy(lambda x: x[1], False)
lowest_spotify = spotify_pairs.sortBy(lambda x: x[1])
dbg(highest_spotify.take(1))
dbg(lowest_spotify.take(1))

num_pos_twt_spot = spotify_final.map(lambda x: x[1]).map(pos_scores).sum()
print(num_pos_twt_spot)

num_neg_twt_spot = spotify_final.map(lambda x: x[1]).map(neg_scores).sum()
print(num_neg_twt_spot)


# COSINE SIMILARITY 

def cosine_similarity(a,b):
    """Cosine similarity of two numeric sequences.

    Args:
        a, b: sequences of numbers. Elements are paired with zip(), so if the lengths
            differ, the trailing values of the longer one are ignored (for the norms too).

    Returns:
        dot(a, b) / (|a| * |b|), a float in [-1, 1] up to rounding. The similarity is
        undefined when either vector has zero magnitude (e.g. all scores are 0.0, or the
        inputs are empty); 0.0 is returned then instead of raising ZeroDivisionError.
    """
    dot = 0.0
    norm_a_sq = 0.0
    norm_b_sq = 0.0
    for x, y in zip(a, b):
        norm_a_sq += x * x
        norm_b_sq += y * y
        dot += x * y

    denominator = sqrt(norm_a_sq) * sqrt(norm_b_sq)
    if denominator == 0:
        return 0.0
    return dot / denominator

# Compare the two accounts' score sequences position by position: the i-th Spotify
# tweet's polarity is paired with the i-th Apple Music tweet's polarity.
apple100_senti = apple100_sent.collect()
spotify100_senti = spotify100_sent.collect()

print(cosine_similarity(spotify100_senti, apple100_senti))


# TF of Apple Music and Spotify

def _term_counts(lines):
  """Return an RDD of (term, count) for an RDD of tweet texts, most frequent first.

  Terms are lower-cased tokens from str.split(). With no argument it skips runs of
  whitespace, so the empty strings that split(" ") yields for doubled or trailing spaces
  (e.g. where an emoji was removed) are not counted as a term.
  """
  return (lines
          .flatMap(lambda line: [(word.lower(), 1) for word in line.split()])
          .reduceByKey(lambda a, b: a + b)
          .sortBy(lambda x: x[1], False))

def _max_normalised(counts):
  """Return (term, count / max count) for an RDD of (term, count) pairs.

  The maximum is computed eagerly and captured in this call's closure, so it stays tied
  to ``counts``. A lambda reading a module-level ``max_count`` would instead pick up
  whatever that global holds when Spark next serialises the lazy map. That could be the
  other account's maximum once the name has been reassigned, e.g. when the TF RDD is
  later joined for TF-IDF.
  """
  max_count = counts.map(lambda x: x[1]).max()
  return counts.map(lambda x: (x[0], x[1] / max_count))

Tf_apple = _term_counts(apple100_clean)
dbg(Tf_apple.take(50))
TFij_apple = _max_normalised(Tf_apple)
dbg(TFij_apple.collect())

Tf_spotify = _term_counts(spotify100_clean)
dbg(Tf_spotify.take(50))
TFij_spotify = _max_normalised(Tf_spotify)
dbg(TFij_spotify.collect())

#IDF for AppleMusic and Spotify

import math

def _distinct_terms(lines):
  """One (term, 1) record per distinct lower-cased term in an RDD of tweet texts.

  str.split() with no argument skips runs of whitespace, so the empty string that
  split(" ") produces for doubled or trailing spaces is never counted as a term.
  """
  return lines.flatMap(lambda line: [(word.lower(), 1) for word in line.split()]).distinct()

TF_spotify_distinct = _distinct_terms(spotify100_clean)
TF_apple_distinct = _distinct_terms(apple100_clean)

# Each account's tweets form one "document". A term's document frequency is the number
# of accounts (out of n_docs) that use it, and IDF = log2(n_docs / df). With two
# accounts, a shared term scores 0 and a term used by only one account scores 1.
document_term_sets = [TF_spotify_distinct, TF_apple_distinct]
n_docs = len(document_term_sets)
terms = sc.union(document_term_sets).reduceByKey(lambda a,b: a+b)

# n_docs is bound as a default argument so its current value travels with the lambda,
# rather than being looked up as a global whenever Spark serialises this lazy map.
IDFi = terms.map(lambda x, n=n_docs: (x[0], math.log2(n / x[1]))).sortBy(lambda x: x[1], False)

dbg(IDFi.collect())

#TF-IDF OF APPLEMUSIC
# join() keeps only terms present in both RDDs and yields (term, (tf, idf)); the score
# is tf * idf, listed highest first.
TF_IDF_apple100 = (TFij_apple
                   .join(IDFi)
                   .mapValues(lambda tf_idf: tf_idf[0] * tf_idf[1])
                   .sortBy(lambda kv: kv[1], ascending=False))
dbg(TF_IDF_apple100.take(10))

#TF-IDF OF SPOTIFY
TF_IDF_spotify100 = (TFij_spotify
                     .join(IDFi)
                     .mapValues(lambda tf_idf: tf_idf[0] * tf_idf[1])
                     .sortBy(lambda kv: kv[1], ascending=False))
dbg(TF_IDF_spotify100.take(10))


# Wall-clock seconds since time_start was recorded at the top of this cell.
time_end = time.time()
print("elapsed time is %s" % (time_end - time_start,))

[1532237700144812032]
['New heat from @antiupmusic, the supergroup formed by @chrislake x @lorenzosbeats. 🔥\n\nListen to #Chromatic on #danceXL. https://t.co/C4S70sRtfZ https://t.co/oH92l3w7M1', '#FacetheSun by #SEVENTEEN (@pledis_17) is out now. ☀️\n\nThey talk to @BrookeReese on The Chart Show about recording a song in English for the first time, reveal their favorite tracks from the album, and share their dream collaborators. https://t.co/oTu2Jx5lg9 https://t.co/K9Hq7Tls0q', 'Celebrating the future of Black artistry with the rising stars shaping music now and in the years to come.\n\nGet yourself acquainted with them throughout #BlackMusicMonth. https://t.co/QT7KkWGbP7 https://t.co/rk0RrdtXfP', '"Virginia (Wind In The Night)" by @headandtheheart leads #ALTCTRL. 💙 https://t.co/pA2fWdHRN9 https://t.co/sglqIiswAz', 'The next chapter for @PostMalone.\n\nHe sits down with @zanelowe for a conversation about his new album, #TwelveCaratToothache, plus his feelings on social media and father