In [None]:
# Notebook setup cell. Lines starting with "!" are IPython/Jupyter shell escapes;
# this cell only runs inside a notebook, not as a plain .py script.
# Untar the Spark installer
# NOTE(review): the .tgz is expected to already be in the working directory --
# nothing in this cell downloads it (presumably an earlier cell did; confirm).
!tar -xvf spark-2.4.4-bin-hadoop2.7.tgz
# Checking the Spark folder after untar
!ls 
# Installing findspark - a python library to find Spark
!pip install -q findspark
# Setting environment variables: Setting Java and Spark home based on the location where they are stored
# Both paths are hard-coded. They assume OpenJDK 8 at the Debian/Ubuntu default
# location and Spark extracted under /content (a Colab-style layout) -- TODO
# confirm/adjust when running on another host.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
# Creating a local Spark session
# findspark.init() uses SPARK_HOME (set above) to make the pyspark package
# importable, which is why it runs before the pyspark import below.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# local[*]: run Spark in-process with as many worker threads as logical cores
spark = SparkSession.builder.master("local[*]").getOrCreate()

import pandas as pd

# Read the raw tweets. Malformed CSV lines are skipped instead of aborting the
# whole read. `on_bad_lines` needs pandas >= 1.3; on older pandas the
# equivalent is `pd.read_csv(path, error_bad_lines=False)` -- a plain
# read_csv would raise on the first malformed line.
amazon_df = pd.read_csv("/content/Amazon_Responded_Oct05.csv", on_bad_lines="skip")
amazon_df.head()
amazon_df.shape
# Drop rows in which every column is null
amazon_df.dropna(how="all", inplace=True)
amazon_df.shape
# Replace every line break -- Windows "\r\n", Unix "\n" or a lone "\r" -- with
# a single space so each tweet's text stays on one line. "\r\n" is tried first
# so a CRLF pair becomes one space, not two.
amazon_df = amazon_df.replace({r"\r\n|[\r\n]": " "}, regex=True)
amazon_df.head()
# Convert the pandas DataFrame into a Spark DataFrame.
# Missing values are filled with "" before every column is cast to str:
# astype(str) alone turns NaN into the literal text "nan", which would later be
# counted as the word "nan" in the tweet word frequencies.
# NOTE(review): the blanket str cast presumably avoids Spark schema-inference
# failures on mixed-type object columns -- confirm.
amazon_df = amazon_df.fillna("").astype(str)
amazon_sdf = spark.createDataFrame(amazon_df)
amazon_sdf.show(10, False)  # truncate=False prints the full content of every column
# Columns in df
amazon_sdf.columns
# Schema: every column is a string at this point (everything was cast to str)
amazon_sdf.printSchema()
# Total number of rows
amazon_sdf.count()
# Keep only the user id, the follower count (cast to int) and the tweet text.
# NOTE(review): follower counts may arrive as strings like "1234.0" (pandas
# float formatting) or "" (missing); this relies on Spark 2.x's default
# (non-ANSI) string->int cast truncating ".0" and mapping "" to null -- confirm
# if upgrading Spark.
amazon_sub_df = amazon_sdf.select(
    amazon_sdf.user_id_str,
    amazon_sdf.user_followers_count.cast("int").alias("user_followers_count"),
    amazon_sdf.text_,
)
amazon_sub_df.show(20, False)
# Checking datatype of columns
amazon_sub_df.printSchema()
# Number of distinct users
import pyspark.sql.functions as f

amazon_sub_df.select(f.countDistinct("user_id_str")).show()
# Number of rows (tweets) for one sample user. The ".0" suffix presumably comes
# from pandas having parsed the id column as float.
amazon_sub_df.filter(amazon_sub_df.user_id_str == "85741735.0").count()
# Deduplicate users: keep only the rows whose follower count equals that user's
# maximum follower count. Ties keep every matching row. Rows with a null
# follower count never match (null == x is not true), so they are dropped.
# Step 1: maximum follower count per user (one row per distinct user_id_str).
# `f` is pyspark.sql.functions, imported earlier in the notebook.
maxf = (
    amazon_sub_df.groupBy("user_id_str")
    .agg(f.max("user_followers_count").alias("max"))
    .alias("maxf")
)
maxf.show()
# Total number of rows
maxf.count()
# Step 2: join back to the original rows and keep those at the user's maximum.
# Both sides are aliased so every column reference in the join is qualified
# and cannot resolve ambiguously.
from pyspark.sql.functions import col

amazon_sub_df = amazon_sub_df.alias("amazon_sub_df")
amazon_sub_df2 = amazon_sub_df.join(
    maxf,
    (col("amazon_sub_df.user_followers_count") == col("maxf.max"))
    & (col("amazon_sub_df.user_id_str") == col("maxf.user_id_str")),
).select(
    col("amazon_sub_df.user_id_str"),
    col("amazon_sub_df.user_followers_count"),
    col("amazon_sub_df.text_"),
)
amazon_sub_df2.show(20, False)
# Total number of rows
amazon_sub_df2.count()
# Checking number of rows (tweets) for the same sample user again
amazon_sub_df2.filter(amazon_sub_df2.user_id_str == "85741735.0").count()
# Popular users: those with at least 5,000 followers.
# NOTE(review): the filter is inclusive (>=); confirm whether the requirement
# actually means "more than 5000" (>).
POPULAR_MIN_FOLLOWERS = 5000
popular_df = amazon_sub_df2.filter(
    amazon_sub_df2.user_followers_count >= POPULAR_MIN_FOLLOWERS
)
popular_df.count()  # number of rows (tweets)
popular_df.show(20, False)
# Check: Sorting follower count Ascending
popular_df.sort(popular_df.user_followers_count).show(20, False)
# Check: Sorting follower count Descending
popular_df.sort(popular_df.user_followers_count.desc()).show(20, False)
# Number of distinct popular users
popular_df.select("user_id_str").distinct().count()
# Number of tweets per popular user
groupedUsers = (
    popular_df.groupBy("user_id_str")
    .count()
    .withColumnRenamed("count", "tweet_count")
)
# Users with the most tweets first
groupedUsers.sort(groupedUsers.tweet_count.desc()).show(20)
# Word-frequency count over the tweets posted by the popular users found above.
# First bring the 'text_' column back to the driver as a plain Python list of
# strings, one entry per row of popular_df.
tweet = [row["text_"] for row in popular_df.select("text_").collect()]
tweet[0:5]
# Get the active SparkContext (getOrCreate reuses the one already running)
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# Distribute the list of tweet texts as an RDD
tweet_rdd = sc.parallelize(tweet)
tweet_rdd.take(5)
# Function for cleaning the text of tweets
import string
import re

def clean_tweet(x):
    """Return a normalised copy of the tweet text ``x``.

    The steps run in this order:

    1. Remove URL-like tokens: any run of non-whitespace starting with "www",
       then any starting with "http" (case-sensitive).
    2. Drop every character for which ``str.isdigit()`` is true.
    3. Drop ASCII punctuation (``string.punctuation``).
    4. Lowercase.

    Whitespace is left untouched, so removed tokens can leave runs of spaces.
    """
    without_urls = re.sub(r'http\S+', '', re.sub(r'www\S+', '', x))
    without_digits = ''.join(filter(lambda ch: not ch.isdigit(), without_urls))
    punctuation_table = str.maketrans('', '', string.punctuation)
    return without_digits.translate(punctuation_table).lower()

# Clean the tweets: 1. remove URLs, 2. remove digits and ASCII punctuation,
# 3. lowercase
clean_tweet_rdd = tweet_rdd.map(clean_tweet)
clean_tweet_rdd.take(10)
# Map step: split each tweet on any run of whitespace and emit (word, 1).
# str.split() with no argument never yields empty strings. (split(" ") did:
# the cleaning step leaves runs of spaces, and "" was then counted as a word.)
# Named word_pairs rather than `map` so the builtin map() is not shadowed.
word_pairs = clean_tweet_rdd.flatMap(lambda line: line.split()).map(
    lambda word: (word, 1)
)
word_pairs.take(5)
# Reduce step: sum the 1s per word to get each word's frequency
counts = word_pairs.reduceByKey(lambda a, b: a + b)
counts.take(5)
# Total number of distinct words (counted by Spark; nothing is collected)
print(counts.count())

# Top 10 most frequent words with their frequencies, highest count first.
# No empty-string filter is needed because the map step emits no empty words.
countsSortedTopTen = counts.takeOrdered(10, key=lambda pair: -pair[1])
countsSortedTopTen