**Installing Spark**

In [None]:
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz

# Installing the library to install the spark in python
!pip install -q findspark
!pip install pyspark

tar: spark-3.1.2-bin-hadoop3.2.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd
import pyspark
from pyspark import SparkContext, SQLContext
import csv
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mp
import seaborn as sns

# Importing functions to perform data cleaning
import pyspark.sql.functions as sql
from pyspark.sql.functions import split  
from pyspark.sql.functions import col, expr, when

In [None]:
# Opening local Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
sprk = SparkSession.builder.master("local[*]").getOrCreate()

**Loading the Dataset**

In [None]:
# Loading the Amazon CSV
amazon = sprk.read.csv('Amazon_Responded_Oct05.csv', header = True, multiLine=True, sep=',',quote="\"",escape="\"")
amazon.show(10)

# Loading the Experiment CSV
experiment = sprk.read.csv('experiment.txt', header = False, multiLine=True, sep=',',quote="\"",escape="\"")
experiment.show(10)

# Loading the Final Experiment CSV
final = sprk.read.csv('final_experiment.csv', header = True, multiLine=True, sep=',',quote="\"",escape="\"")
final.show(10)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

**PreProcessing the Datasets**

In [None]:
# Filtering out the Amazon File

## Since we only require 3 columns, we use select rather than drop function

amazon = amazon.select('tweet_created_at','user_screen_name','user_id_str')
amazon.show(10)
print(amazon.count())

+--------------------+----------------+-----------+
|    tweet_created_at|user_screen_name|user_id_str|
+--------------------+----------------+-----------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|
|Tue Nov 01 03:51:...|   aakashwangnoo|   71457972|
+--------------------+----------------+-----------+
only showing top 10 rows

378134


In [None]:
# We need to clean data as it involves date format column and lookout for missing or duplicate values

# Converting the date format into readible format
date_new = split(amazon["tweet_created_at"], ' ')

# Separating the Month and Day from the attribute
amazon = amazon.withColumn('Month', date_new.getItem(1))
amazon = amazon.withColumn('Day', date_new.getItem(2))

# Joining (Concatenating) the date
amazon = amazon.withColumn("tweet_created_at",sql.concat(sql.col("Month"), sql.lit(" "), sql.col("Day")))
amazon= amazon.select('tweet_created_at','user_screen_name','user_id_str')

## After Cleaning 
amazon.show(10)
print(amazon.count())

+----------------+----------------+-----------+
|tweet_created_at|user_screen_name|user_id_str|
+----------------+----------------+-----------+
|          Nov 01|     SeanEPanjab|  143515471|
|          Nov 01|      AmazonHelp|   85741735|
|          Nov 01|     SeanEPanjab|  143515471|
|          Nov 01|     SeanEPanjab|  143515471|
|          Nov 01|      AmazonHelp|   85741735|
|          Nov 01|      AmazonHelp|   85741735|
|          Nov 01|     SeanEPanjab|  143515471|
|          Nov 01|     SeanEPanjab|  143515471|
|          Nov 01|      AmazonHelp|   85741735|
|          Nov 01|   aakashwangnoo|   71457972|
+----------------+----------------+-----------+
only showing top 10 rows

378134


In [None]:
# Removing the duplicates

amazon = amazon.dropDuplicates()
amazon.show(10)
print(amazon.count())

+----------------+----------------+-----------+
|tweet_created_at|user_screen_name|user_id_str|
+----------------+----------------+-----------+
|          Nov 01|     mybharatraj|  902137872|
|          Nov 01|     lisacatek75| 3526380922|
|          Nov 02|    GujarGaurang| 1188929052|
|          Nov 02|        dxman830|  186287415|
|          Nov 02|       JSpangs44|   23695124|
|          Nov 02|        BMorley5| 3127117599|
|          Nov 10| rashmirawat2912| 3939183493|
|          Nov 07|        SKRinten|  129697885|
|          Nov 07|    DuranDurrant| 4275197799|
|          Nov 07|    ThePixelGirl|   44668641|
+----------------+----------------+-----------+
only showing top 10 rows

93929


**STEP 1** : FIND USERS WHO ARE ACTIVE IN AT LEAST 5 DAYS

In [None]:
# Take only the unique user-created tweets for one day. Every day, one tweet.

amazon = amazon.select("user_screen_name","user_id_str", "tweet_created_at").distinct() 
amazon.show(10)

+----------------+-----------+----------------+
|user_screen_name|user_id_str|tweet_created_at|
+----------------+-----------+----------------+
|         Folferz|   94245683|          Nov 01|
|  JordanPennings|   21267682|          Nov 01|
|    ohthatandrew|  236301546|          Nov 01|
|    sandeepabhat|   92895763|          Nov 02|
| samthejewishguy|   49750455|          Nov 03|
|     louislagoon|  422753927|          Nov 04|
|   grouchypotato|   19451813|          Nov 04|
|  Olliecological|  339912317|          Nov 05|
|      kagesaikin| 4133902939|          Nov 05|
|     absolutwade|     689443|          Nov 05|
+----------------+-----------+----------------+
only showing top 10 rows



In [None]:
# Filtering our users with atleast 5 tweets

amazon = amazon.groupBy("user_screen_name","user_id_str").count() 
amazon = amazon.filter(amazon['count']>=5)

daily_active_users = amazon.select('user_screen_name', 'user_id_str')
daily_active_users.show(10)

print('Active users: ', daily_active_users.count())

+----------------+------------------+
|user_screen_name|       user_id_str|
+----------------+------------------+
|        remakoul|814372928695521280|
|       SkullyRox|          20391647|
|  whisperandmoan|         113516042|
|  sky_regenrated|         483059773|
|   Gentlemen_Sam|         441572163|
|          MtnrMS|        3309102108|
| roxyunderwood93|         295334669|
|        TCMuffin|          35591749|
|        trallyus|          11702402|
|  ChaurasiaRohin|706032993794527232|
+----------------+------------------+
only showing top 10 rows

Active users:  593


Therefore, we find that **593 active members** are present in the dataset who are active in at least 5 listed days

In [None]:
# Storing the result in the target file
daily_active_users.toPandas().to_csv('daily_active_users.csv')

**STEP 2:** FINDING IF THE USER IS ACTIVE OR NOT AND CALCULATING THE PERCENTAGE OF ACTIVE USERS

In [None]:
# Renaming the column
experiment = experiment.withColumnRenamed("_c0", "user_id")
experiment.show(10)

+----------+
|   user_id|
+----------+
| 143515471|
|  85741735|
|  71457972|
|2908108256|
| 106799492|
|  59156981|
| 902137872|
| 110354554|
|  97424433|
|  62364020|
+----------+
only showing top 10 rows



In [None]:
# Combine the two csv files on 'user_id'

new = experiment.join(daily_active_users,daily_active_users.user_id_str == experiment.user_id, how="left")
new.show(10)

+----------+----------------+-----------+
|   user_id|user_screen_name|user_id_str|
+----------+----------------+-----------+
| 106799492|            null|       null|
|  62364020|            null|       null|
|2908108256|            null|       null|
| 143515471|            null|       null|
|  97424433|            null|       null|
|2706101936|            null|       null|
| 110354554| praveen_pandey_|  110354554|
|  59156981|            null|       null|
|  71457972|            null|       null|
|  85741735|      AmazonHelp|   85741735|
+----------+----------------+-----------+
only showing top 10 rows



In [None]:
# Filtering the columns 
experiment_ = new.select('user_id', 'user_id_str')

# Assigning the null values to yes or no
experiment_ = experiment_.withColumn("user_id_str", when(experiment_.user_id_str.isNull(), 'no').otherwise(experiment_["user_id_str"]))
experiment_ = experiment_.withColumn("user_id_str", when(experiment_['user_id_str'] != 'no', 'yes').otherwise(experiment_["user_id_str"]))

# Renaming the columns
experiment_ = experiment_.withColumnRenamed('user_id_str', 'Whether_Active')

experiment_.show(10)
print("The number of Experiment is", experiment_.count(),".")

+----------+--------------+
|   user_id|Whether_Active|
+----------+--------------+
| 143515471|            no|
|  85741735|           yes|
|  71457972|            no|
|2908108256|            no|
| 106799492|            no|
|  59156981|            no|
| 902137872|           yes|
| 110354554|           yes|
|  97424433|            no|
|  62364020|            no|
+----------+--------------+
only showing top 10 rows

The number of Experiment is 5000 .


In [None]:
# Group by the active column
experiment_.groupBy("Whether_Active").count().show()

+--------------+-----+
|Whether_Active|count|
+--------------+-----+
|            no| 4879|
|           yes|  121|
+--------------+-----+



In [None]:
# Calculating the percentage of active users

no = experiment_.groupBy("Whether_Active").count().collect()[0]["count"]
yes = experiment_.groupBy("Whether_Active").count().collect()[1]["count"]
result = yes/(no+yes)*100
print("Percentage of active users", result, "%")

Percentage of active users 2.42 %


Thus the percentage of **active users is 2.42%**

In [None]:
# Store the result in the csv file
experiment_.toPandas().to_csv('Experiment_user.csv')

**STEP 3**: COMBINE THREE TABLES

In [None]:
# Explore the final experiment table to find the shape 

final.show(5)
print(final.count())

+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
|   62364020|   F|          null|            null|
| 2706101936|   M|          null|            null|
|    5654472|   F|          null|            null|
|  145579921|   F|          null|            null|
| 2502172122|   M|          null|            null|
+-----------+----+--------------+----------------+
only showing top 5 rows

4500


In [None]:
# Filtering the final experiment table 

final = final.select('user_id_str', 'info') 
final.show(10)

+-----------+----+
|user_id_str|info|
+-----------+----+
|   62364020|   F|
| 2706101936|   M|
|    5654472|   F|
|  145579921|   F|
| 2502172122|   M|
|  243716471|   F|
| 2610379644|   M|
|  123138418|   M|
|  257376764|   F|
|  269145593|   M|
+-----------+----+
only showing top 10 rows



In [None]:
# In order to join three tables, I first apply left join on final experiment and experiment  

left_join = final.join(experiment_, experiment_.user_id == final.user_id_str, how= 'left')

# Left outer join on daily active users and final experiment

final_ = left_join.join(daily_active_users, daily_active_users.user_id_str == final.user_id_str, how='left_outer') 

In [None]:
final_ = final_.select('user_id','info','Whether_Active','user_screen_name')
final_.show()

+----------+----+--------------+----------------+
|   user_id|info|Whether_Active|user_screen_name|
+----------+----+--------------+----------------+
|1112166661|   F|            no|            null|
|1209614366|   M|            no|            null|
| 123138418|   M|            no|            null|
| 145579921|   F|            no|            null|
|1510968974|   F|            no|            null|
| 163413904|   F|            no|            null|
|  16980347|   M|            no|            null|
|1970607968|   M|            no|            null|
| 243716471|   F|            no|            null|
|2502172122|   M|            no|            null|
| 257376764|   F|            no|            null|
|2610379644|   M|            no|            null|
| 269145593|   M|            no|            null|
|2706101936|   M|            no|            null|
|3285473358|   F|           yes|    iwritegarima|
|3526380922|   M|            no|            null|
| 370711133|   F|            no|            null|


In [None]:
# Renaming the null values as not found

final_ = final_.na.fill("Not Found")
final_.show(10)
print("Total Records: ",final_.count())

+----------+----+--------------+----------------+
|   user_id|info|Whether_Active|user_screen_name|
+----------+----+--------------+----------------+
| 123138418|   M|            no|       Not Found|
| 145579921|   F|            no|       Not Found|
| 243716471|   F|            no|       Not Found|
|2502172122|   M|            no|       Not Found|
| 257376764|   F|            no|       Not Found|
|2610379644|   M|            no|       Not Found|
| 269145593|   M|            no|       Not Found|
|2706101936|   M|            no|       Not Found|
| 370711133|   F|            no|       Not Found|
|   5654472|   F|            no|       Not Found|
+----------+----+--------------+----------------+
only showing top 10 rows

Total Records:  4500


In [None]:
# Counting the number of yes and no
final_.groupBy("Whether_Active").count().show()

+--------------+-----+
|Whether_Active|count|
+--------------+-----+
|     Not Found|  262|
|            no| 4142|
|           yes|   96|
+--------------+-----+



Thus, we find that **96 members are active**

In [None]:
# Storing the results into the CSV

final_.toPandas().to_csv('Final_experiment_output.csv')


**Explanation as to how I joined the three tables:**
1. Applied the left join final_experiment.csv with experiment.csv (active users) since final-experiment is subset of the experiment_users.
Output: users who are active and inactive.

2. Applied the left outer join the above output with daily_active_users.csv so I get the needed active users and I keep the other id's as blank.