In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/Spark

/content/drive/MyDrive/Spark


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [5]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [6]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/MyDrive/Spark/spark-3.0.0-bin-hadoop3.2"

In [8]:
import findspark
findspark.init()

In [9]:
from pyspark.sql import *

spark = SparkSession.builder.getOrCreate()
#from pyspark.sql.functions import trim, to_date, year, month

In [10]:
from pyspark import SparkContext

sc=SparkContext.getOrCreate()

In [11]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [12]:
# Reading the Amazon Responds file via spark
df=spark.read.format("csv").option("multiLine", True).option("header", True).option("escape", "\"").load("Amazon_Responded_Oct05.csv") 

In [14]:
df.show() #displaying top 20 rows

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|               text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [15]:
# Checking count of dataframe
print("Total number of records:")
df.count()

Total number of records:


462030

In [16]:
##############################################################################################################
# CLEANING & PREPARING DATAFRAME
##############################################################################################################

# Removing duplicates if present
df=df.dropDuplicates()

In [17]:
# Removing rows with all fields empty
df = df.dropna(how='all', thresh=2) # Since blank space in 'text' col is not being considered as NA, we'll use thresh argument
df.show()

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+--------------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|       user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|               text|
+--------------------+--------------------+----------------+-----------+--------

In [19]:
# Checking row count after removing empty row ('with all fiels missing')
print("Row count after removing empty rows: ")
df.count()

Row count after removing empty rows: 


378134

In [22]:
# Selecting the 3 columns we will be using : tweet_created_at, user_screen_name, user_id_str 

from pyspark.sql.functions import *

df=df.select(col("tweet_created_at"),col("user_screen_name"),col("user_id_str"))
df.show(5,False) #displaying top 5 rows

+------------------------------+----------------+-----------+
|tweet_created_at              |user_screen_name|user_id_str|
+------------------------------+----------------+-----------+
|Tue Nov 01 17:43:21 +0000 2016|xD4ni3ll38x     |315105182  |
|Tue Nov 01 18:31:09 +0000 2016|robertcaldecott |175322072  |
|Wed Nov 02 07:01:18 +0000 2016|comper1983      |4755678875 |
|Wed Nov 02 09:26:54 +0000 2016|AmazonHelp      |85741735   |
|Wed Nov 02 07:38:07 +0000 2016|Afitron         |2347212330 |
+------------------------------+----------------+-----------+
only showing top 5 rows



In [29]:
##########################################################################################################
# STEP 1 : Finding Users active on at least 5 days
##########################################################################################################

# Creating new column where we're storing extracted date from 'tweet_created_at'

from pyspark.sql.functions import substring
df=df.withColumn("Day_active", substring(col("tweet_created_at"),4,7))

In [30]:
df.show(5)

+--------------------+----------------+-----------+----------+
|    tweet_created_at|user_screen_name|user_id_str|Day_active|
+--------------------+----------------+-----------+----------+
|Tue Nov 01 17:43:...|     xD4ni3ll38x|  315105182|    Nov 01|
|Tue Nov 01 18:31:...| robertcaldecott|  175322072|    Nov 01|
|Wed Nov 02 07:01:...|      comper1983| 4755678875|    Nov 02|
|Wed Nov 02 09:26:...|      AmazonHelp|   85741735|    Nov 02|
|Wed Nov 02 07:38:...|         Afitron| 2347212330|    Nov 02|
+--------------------+----------------+-----------+----------+
only showing top 5 rows



In [35]:
# Creating a view with this dataframe to use sparksql
df.createOrReplaceTempView("df_view")

# Selecting the users_screen_name and user_id of those who have been active for more than 5 distinct days 
daily_active_users=spark.sql("select user_screen_name,user_id_str from df_view group by user_screen_name,user_id_str having count(distinct day_active) >=5")

print("Count of Users active on more than 5 days: ", daily_active_users.count())
print()
print("Displaying the top 5 rows of active user table: ")
daily_active_users.show(5)

Count of Users active on more than 5 days:  593

Displaying the top 5 rows of active user table: 
+----------------+-----------+
|user_screen_name|user_id_str|
+----------------+-----------+
|   Gentlemen_Sam|  441572163|
|          MtnrMS| 3309102108|
|  sky_regenrated|  483059773|
|  whisperandmoan|  113516042|
|       SkullyRox|   20391647|
+----------------+-----------+
only showing top 5 rows



In [43]:
# Writing obtained df (active users) into csv file

#daily_active_users.write.csv("daily_active_users")

daily_active_users.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("daily_active_users.csv") #to write as a single .csv file

In [39]:
#############################################################################################################
# STEP 2 :  WORKING WITH THE EXPERIMENT.TXT FILE
############################################################################################################

#Reading the experiment file
df2=spark.read.option("header","false").csv("experiment.txt")
df2.show(5)

+----------+
|       _c0|
+----------+
| 143515471|
|  85741735|
|  71457972|
|2908108256|
| 106799492|
+----------+
only showing top 5 rows



In [41]:
# Creating views for dataframes to use sparksql
df2.createOrReplaceTempView("exp_users")
daily_active_users.createOrReplaceTempView('active_users')

In [65]:
# Creating 'experiment_user' table by using left join, if and isnull statement
experiment_user = spark.sql('select _c0 as user_id_str, if(isnull(au.user_screen_name),"No","Yes") as whether_active from exp_users eu left join active_users au on eu._c0 = au.user_id_str')

In [66]:
experiment_user.show()

+-----------+--------------+
|user_id_str|whether_active|
+-----------+--------------+
|   11798342|            No|
| 1210875679|            No|
|  128257538|            No|
|  137088213|            No|
|  142202059|            No|
|   14291504|            No|
|   16279527|            No|
|  163148814|            No|
|   20110100|            No|
|  211972025|            No|
| 2176836186|            No|
|  234641258|            No|
| 2372082613|            No|
|   26915435|            No|
|   27840175|            No|
|  301131509|            No|
| 3028486809|            No|
|  305743837|            No|
| 3196213653|            No|
|  348844543|            No|
+-----------+--------------+
only showing top 20 rows



In [67]:
# Calculating percentage of active users in the experiment_user table
experiment_user.createOrReplaceTempView("exp_user")
spark.sql("select sum(if(whether_active=='Yes',1,0))/count(*)*100 as percentage_of_active_users from exp_user").show()

+--------------------------+
|percentage_of_active_users|
+--------------------------+
|                      2.42|
+--------------------------+



In [71]:
experiment_user.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("experiment_users.csv") #to write as a single .csv file

In [77]:
#########################################################################################################################
# STEP 3 : FINAL EXPERIMENT
#########################################################################################################################

# Reading the final_experiment.csv file
df3=spark.read.option("header","True").csv("final_experiment.csv")

In [79]:
print("Count of rows:",df3.count())
print("Disaplaying final_experiment dataframe:")
df3.show()

Count of rows: 4500
Disaplaying final_experiment dataframe:
+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
|   62364020|   F|          null|            null|
| 2706101936|   M|          null|            null|
|    5654472|   F|          null|            null|
|  145579921|   F|          null|            null|
| 2502172122|   M|          null|            null|
|  243716471|   F|          null|            null|
| 2610379644|   M|          null|            null|
|  123138418|   M|          null|            null|
|  257376764|   F|          null|            null|
|  269145593|   M|          null|            null|
|  370711133|   F|          null|            null|
| 1510968974|   F|          null|            null|
| 3526380922|   M|          null|            null|
|  163413904|   F|          null|            null|
|   16980347|   M|          null|            null|
| 1209614366|   M|    

In [81]:
# creating view of this dataframe to work on using sparksql
df3.createOrReplaceTempView('final_exp')

In [111]:
final_experiment = spark.sql('select distinct f.user_id_str, info, ifnull(e.whether_active,"NOT FOUND") as whether_active, ifnull(d.user_screen_name,"NOT FOUND") as user_screen_name from final_exp f left join exp_user e on f.user_id_str = e.user_id_str left join df_view d on e.user_id_str = d.user_id_str ')

# Joining Steps:
# 1) First we do a left join on final_experiment with experiment_user (for 'whether_active fiel)
# 2) Then we perform another left join on the resulting dataframe with df_view (the dataframe with 'tweet_created_at' column) for user_screen_name field
# 3) Now since there are multiple dates for a single user, the reulting df will have multiple rows for same user so we use 'DISTINCT' while selecting user_id

In [116]:
# Displaying final dataframe with User ID, info, active, name
final_experiment.show()

+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
|  103958741|   F|            No|     ThunderPhat|
| 2234018858|   F|            No|      mirafianti|
| 1873374613|   M|            No|         DRayG86|
|  361176156|   F|            No|  PhilBradshaw07|
|  843798637|   M|            No|          Jimzo_|
|7.03795E+17|   M|     NOT FOUND|       NOT FOUND|
| 7.9107E+17|   M|     NOT FOUND|       NOT FOUND|
| 7.7112E+17|   F|     NOT FOUND|       NOT FOUND|
|  255430770|   F|            No|         willytm|
|  195814998|   M|            No|       Aprilcox_|
| 2614781971|   M|            No|   joannednguyen|
|  873296534|   M|            No| fabulousflossie|
|   27732663|   M|            No|        Vstokes_|
| 3380629378|   M|            No|      ssoleima92|
|  813912673|   M|            No| philaeagles2000|
| 3018271286|   F|            No|  GlueNotGlitter|
|  457415010|   M|            N

In [115]:
# Displaying total no. of records in final_experiment
print("No. of records:", final_experiment.count())

No. of records: 4501


In [117]:
# The reason the count is 4501 instead of 4500 is because of user_id: 68279679 !
# This particular user has 2 unique screen_name ('ColdKERNAL' and 'tazryder'), hence the final_experiment dataframe has 2 records for this one user.

In [114]:
final_experiment.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("final_experiments.csv") # writting to .csv