In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/My Drive/561 Lab and assign

/content/drive/My Drive/561 Lab and assign


Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to download Java.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Get Spark installer (check the path on spark.apache.org)

In [None]:
!wget -v https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

--2021-10-24 02:00:48--  https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228834641 (218M) [application/x-gzip]
Saving to: ‘spark-3.1.2-bin-hadoop3.2.tgz.6’


2021-10-24 02:01:02 (51.3 MB/s) - ‘spark-3.1.2-bin-hadoop3.2.tgz.6’ saved [228834641/228834641]



# Check if the file is copied

In [None]:
!ls

 Amazon_Responded_Oct05.csv	 spark-3.1.2-bin-hadoop3.2.tgz.1
 conversation_doc.gdoc		 spark-3.1.2-bin-hadoop3.2.tgz.2
 conversation_example.txt	 spark-3.1.2-bin-hadoop3.2.tgz.3
 Final_DF2.csv			 spark-3.1.2-bin-hadoop3.2.tgz.4
 Final_DF.csv			 spark-3.1.2-bin-hadoop3.2.tgz.5
 find_text.csv			 spark-3.1.2-bin-hadoop3.2.tgz.6
 HW2.ipynb			 TestOP.csv
'IDS561lab2(1).ipynb'		 Untitled
 spark-3.1.2-bin-hadoop3.2	 Word_count.csv
 spark-3.1.2-bin-hadoop3.2.tgz


In [None]:
!pwd

/content/drive/My Drive/561 Lab and assign


# Untar the Spark installer

In [None]:
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!ls 

 Amazon_Responded_Oct05.csv	 spark-3.1.2-bin-hadoop3.2.tgz.1
 conversation_doc.gdoc		 spark-3.1.2-bin-hadoop3.2.tgz.2
 conversation_example.txt	 spark-3.1.2-bin-hadoop3.2.tgz.3
 Final_DF2.csv			 spark-3.1.2-bin-hadoop3.2.tgz.4
 Final_DF.csv			 spark-3.1.2-bin-hadoop3.2.tgz.5
 find_text.csv			 spark-3.1.2-bin-hadoop3.2.tgz.6
 HW2.ipynb			 TestOP.csv
'IDS561lab2(1).ipynb'		 Untitled
 spark-3.1.2-bin-hadoop3.2	 Word_count.csv
 spark-3.1.2-bin-hadoop3.2.tgz


# Install findspark - a python library to find Spark

In [None]:
!pip install -q findspark

# Set environment variables
Set Java and Spark home based on the location where they are stored

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/My Drive/561 Lab and assign/spark-3.1.2-bin-hadoop3.2"

As we know, in previous versions, sparkContext is the entry point for Spark. As RDD was the main API, it was created and manipulated using context APIs. For every other API, we needed to use a different context.

For streamin, we needed streamingContext. For SQL, sqlContext, and for Hive, hiveContext. But as DataSet and DataFrame APIs are becoming new standalone APIs, we need an entry point build for them. So in Spark 2.0, we have a new entry point build for DataSet and DataFrame APIs called as SparkSession.

SparkSession is an entry point to underlying PySpark functionality in order to programmatically create PySpark RDD, DataFrame.

SparkContext is the entry gate of Apache Spark functionality and the most important step of any Spark driver application is to generate SparkContext which represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.


Another important study source: https://dzone.com/articles/introduction-to-spark-session

# Creat a local Spark session

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# https://sparkbyexamples.com/spark/sparksession-explained-with-examples/

df = spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}])
df.show()

+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



##

In [None]:

from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

In [None]:
# DataFrameReader is the foundation for reading data in Spark, it can be accessed via the attribute spark.read
df_amzn = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('Amazon_Responded_Oct05.csv')

In [None]:
df_amzn.show(5)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [None]:
type(df_amzn)

pyspark.sql.dataframe.DataFrame

## Task 1

#### Step 1

In [None]:
# Remove the rows where user_verified is False.
df_amzn1 = df_amzn.filter(df_amzn.user_verified == "True")
df_amzn1.show(5)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

#### Step 2

In [None]:
df_amzn1.count()

171885

In [None]:
# Converting the tweet_created_at to created_date in the format "Nov 01"
from pyspark.sql.functions import substring
df_amzn_date = df_amzn1.withColumn('Created_Date', substring('tweet_created_at',5,6))
df_amzn_date.show(5)

#df_amzn1.groupby("user_screen_name").count().show()

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|Created_Date|
+--------------------+--------------------+----------------+--------

In [None]:
# Following dateframe gives the count of number of tweets each day
df_amzn_date1 = df_amzn_date.groupby("Created_Date").count()
df_amzn_date1.show(5)

+------------+-----+
|Created_Date|count|
+------------+-----+
|      Nov 27|  208|
|      Mar 22|  506|
|      Apr 16|  632|
|      Apr 27|  394|
|      May 24|  843|
+------------+-----+
only showing top 5 rows



#### Step 3

In [None]:
amzn_sorted = df_amzn_date1.sort("count", ascending = False)
amzn_sorted.show(5)
# The result shows that Jan 03 had highest number of tweets

+------------+-----+
|Created_Date|count|
+------------+-----+
|      Jan 03| 1536|
|      Jan 10| 1508|
|      Jan 11| 1496|
|      Jan 12| 1410|
|      Jan 06| 1363|
+------------+-----+
only showing top 5 rows



In [None]:
df_amzn_date2 = df_amzn_date.filter(df_amzn_date.Created_Date=='Jan 03' )
df_sum_t = df_amzn_date2.withColumn('sum_fav_retwt', df_amzn_date2.favorite_count + df_amzn_date2.retweet_count)\
            .select('id_str','Created_Date','user_screen_name', 'sum_fav_retwt', 'text_' )\
            .sort('sum_fav_retwt', ascending = False)

df_sum_t.show()
#groupby("Created_Date").count()

+--------------------+------------+----------------+-------------+--------------------+
|              id_str|Created_Date|user_screen_name|sum_fav_retwt|               text_|
+--------------------+------------+----------------+-------------+--------------------+
|'816329761530093568'|      Jan 03|  bhagyashree123|          5.0|@amazon worst sho...|
|'816083406962434048'|      Jan 03|      AmazonHelp|          3.0|@ItsJosshA We alw...|
|'816086117938319360'|      Jan 03|      AmazonHelp|          2.0|@ItsJosshA Oh no!...|
|'816157517428523008'|      Jan 03|      AmazonHelp|          2.0|@ratbones666 You ...|
|'816217909819297792'|      Jan 03|      AmazonHelp|          2.0|@ThorpPerrow Awww...|
|'816095108013654017'|      Jan 03|      AmazonHelp|          2.0|@KStefl Sounds li...|
|'816314295680110593'|      Jan 03|      AmazonHelp|          2.0|@thedexterouz Hi!...|
|'816109446069911554'|      Jan 03|      AmazonHelp|          2.0|@Schoey1992 Happy...|
|'816323706431668226'|      Jan 

In [None]:
print(df_sum_t.count())

1536


In [None]:
#df_sum_t.take(100)
#https://sparkbyexamples.com/spark/show-top-n-rows-in-spark-pyspark/
df_top_100 = df_sum_t.limit(100)
df_top_100.show(4)

+--------------------+------------+----------------+-------------+--------------------+
|              id_str|Created_Date|user_screen_name|sum_fav_retwt|               text_|
+--------------------+------------+----------------+-------------+--------------------+
|'816329761530093568'|      Jan 03|  bhagyashree123|          5.0|@amazon worst sho...|
|'816083406962434048'|      Jan 03|      AmazonHelp|          3.0|@ItsJosshA We alw...|
|'816086117938319360'|      Jan 03|      AmazonHelp|          2.0|@ItsJosshA Oh no!...|
|'816095108013654017'|      Jan 03|      AmazonHelp|          2.0|@KStefl Sounds li...|
+--------------------+------------+----------------+-------------+--------------------+
only showing top 4 rows



In [None]:
top100 = df_top_100.select("id_str", "Created_Date", "user_screen_name", "sum_fav_retwt", "text_")
type(top100)

pyspark.sql.dataframe.DataFrame

In [None]:
# https://mashimo.wordpress.com/tag/spark/
# the most difficult part for me
# https://people.duke.edu/~ccc14/sta-663-2018/notebooks/S15C_Spark_DataFrames.html

df1_top100 = top100.select(trim(lower(regexp_replace('text_',r'@', r' '))).alias('text_clean'))                       # replace @ with white space
df1_top100 = df1_top100.select(trim(lower(regexp_replace('text_clean',r'_', r' '))).alias('text_clean1'))             # replace _ with white space
df1_top100 = df1_top100.select(regexp_replace('text_clean1','[\s]+', ' ').alias('text_clean2'))                       # replace [\s]+ with single white space character. [\s]+ refers to one and more than one white space characeters
df_100_clean = df1_top100.select(trim(lower(regexp_replace('text_clean2','[^a-zA-Z0-9]',' '))).alias('text_clean3'))  # ^ inverts the meaning. Replace any non alphanumeric character with space

df_100_clean.show(10)

+--------------------+
|         text_clean3|
+--------------------+
|amazon worst shop...|
|itsjossha we alwa...|
|itsjossha oh no  ...|
|kstefl sounds lik...|
|schoey1992 happy ...|
|ratbones666 you s...|
|thorpperrow awww ...|
|thedexterouz hi  ...|
|matt linsley plea...|
|vlslt sorry to he...|
+--------------------+
only showing top 10 rows



In [None]:
word_count = df_100_clean.select(explode(split(df_100_clean.text_clean3," ")).alias("Word")).groupBy("Word").count()

word_count.show(100)

+---------------+-----+
|           Word|count|
+---------------+-----+
|         amazon|    8|
|          worst|    1|
|       shopping|    4|
|     experience|    9|
|               |  470|
|             no|    5|
|        service|    1|
|    substantial|    1|
|          reply|    2|
|             to|   65|
|     complaints|    1|
|       delivery|    6|
|            for|   50|
|              1|    6|
|           week|    1|
|           post|    1|
|      guarantee|    3|
|           date|    5|
|      itsjossha|    2|
|             we|   46|
|         always|    3|
|            aim|    2|
|        deliver|    4|
|             by|    8|
|            the|   65|
|          given|    1|
|             in|   13|
|           your|   34|
|   confirmation|    1|
|          email|    3|
|           have|   18|
|         missed|    1|
|           that|   20|
|            any|    8|
|         update|    2|
|       tracking|    5|
|             nf|    1|
|             oh|    1|
|              i

In [None]:
type(word_count)
word_count.write.csv("/content/drive/My Drive/561 Lab and assign/Word_count.csv", header = True)

In [None]:
Word_cnt = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('Word_count.csv')

Word_cnt.show(5)

+----------+-----+
|      Word|count|
+----------+-----+
|    amazon|    8|
|     worst|    1|
|  shopping|    4|
|experience|    9|
|      null|  470|
+----------+-----+
only showing top 5 rows



##Task 2

In [None]:
# Load a find_text.csv file as Dataframe
find_file = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('find_text.csv')

find_file.show()

+--------------------+----+
|              id_str|text|
+--------------------+----+
|'793270689780203520'|null|
|'793281386912354304'|null|
|'793299404975247360'|null|
|'793301295255945216'|null|
|'793315815411978240'|null|
|'793322306848292864'|null|
|'793322433625415680'|null|
|'793365409047023616'|null|
|'793369654878232577'|null|
|'793375905280393216'|null|
|'793376242837823488'|null|
|'793378044052406272'|null|
|'793378188416131072'|null|
|'793379112685568000'|null|
|'793381418395136000'|null|
|'793382930085253121'|null|
|'793383832720474113'|null|
|'793386133434593280'|null|
|'793386974459682816'|null|
|'793390636619759616'|null|
+--------------------+----+
only showing top 20 rows



In [None]:
find_DF = find_file.withColumnRenamed('id_str', 'find_id')

In [None]:
# https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/
joined_DF = find_DF.join(df_amzn,find_DF.find_id == df_amzn.id_str,"left") 

#joined_DF.show(10)
joined_DF.select("id_str","text_").show()
Final_DF = joined_DF.select("id_str","text_")

Final_DF.write.csv("/content/drive/My Drive/561 Lab and assign/Final_DF.csv", header = True)

+--------------------+--------------------+
|              id_str|               text_|
+--------------------+--------------------+
|'793552243689676800'|@AmazonHelp amazo...|
|'793595479116439552'|@SuNoSuKo Please ...|
|'793751757247619072'|@aasifkhan Hey, I...|
|'793788930361556996'|@AmazonHelp I wou...|
|'793797330310983680'|Disgusting servic...|
|'793819060505108480'|@Joe_Twells1 Sorr...|
|'793828605746896897'|Made an order usi...|
|'793899985444241408'|@JohnnyHeatrock H...|
|'793903133462331392'|@mrniceguy1987 Pl...|
|'793904094595444736'| @AmazonHelp will do|
|'793920564628955137'|@AmazonHelp need ...|
|'793923389203316736'|@magicwaz Sorry f...|
|'794220779768451072'|@AmazonHelp I cou...|
|'794263348447678464'|@Nikki_Revak I'm ...|
|'794279214388285440'|@samthejewishguy ...|
|'794280604221276160'|@jesskatopps Sorr...|
|'794305189926858753'|@AmazonHelp my pa...|
|'794314585608167425'|@KrishnaNand162 H...|
|'794565369084186625'|@shiv_iipm Please...|
|'794648919720493061'|@_Balaji_ 

In [None]:
Final_DF.count()

53927

In [None]:
fx = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('Final_DF.csv')

fx.show(5)

+--------------------+--------------------+
|              id_str|               text_|
+--------------------+--------------------+
|'793562181732339712'|@AmazonHelp Yes.I...|
|'793583212970651648'|@amazonIN LMAO! U...|
|'793745541398167552'|@AmazonHelp my or...|
|'793767884505907201'|@AmazonHelp Alrea...|
|'793844938916106240'|@AmazonHelp Hi, j...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
joined_DF = find_DF.join(df_amzn,find_DF.find_id == df_amzn.id_str,"inner") 

#joined_DF.show(10)
joined_DF.select("id_str","text_").show()
Final_DF2 = joined_DF.select("id_str","text_")

Final_DF2.write.csv("/content/drive/My Drive/561 Lab and assign/Final_DF2.csv", header = True)

+--------------------+--------------------+
|              id_str|               text_|
+--------------------+--------------------+
|'793270689780203520'|@AmazonHelp Can y...|
|'793281386912354304'|@SeanEPanjab I'm ...|
|'793501578766319616'|@AmazonHelp It wa...|
|'793501657346682880'|@AmazonHelp I am ...|
|'793502854459879424'|@SeanEPanjab Plea...|
|'793504235400884224'|@SeanEPanjab With...|
|'793511847899070465'|@AmazonHelp It wa...|
|'793511899279208449'|@AmazonHelp if it...|
|'793513446633533440'|@SeanEPanjab I'm ...|
|'793299404975247360'|@JeffBezos @amazo...|
|'793301295255945216'|@aakashwangnoo Hi...|
|'793407430344310785'|@AmazonHelp How m...|
|'793423313674571776'|@aakashwangnoo Hi...|
|'793423314333134850'|@aakashwangnoo Pl...|
|'793467086869630977'|@AmazonHelp @amaz...|
|'793492430666498050'|@aakashwangnoo Hi...|
|'793535036213501952'|@AmazonHelp @amaz...|
|'793535221329113088'|@AmazonHelp @amaz...|
|'793537840533471232'|@AmazonHelp @amaz...|
|'793538125884645376'|@AmazonHel

In [None]:
Final_DF2.count()

53922