In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c1fc806b528f26c5508af98be16defc034eeaa801ce5f5b5641b70b4e0f01c2e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, countDistinct, when
from pyspark.sql import functions as F

In [3]:
# Build (or reuse) the Spark session for this notebook: local master with
# two threads, application name "Amazon Tweets Analysis", executor memory set to 2g.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Amazon Tweets Analysis")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

**Step 1:** Find the users who are active on at least five of the listed days (i.e., who created posts on at least 5 distinct days) in Amazon_Responded_Oct05.csv, and save their “user_screen_name” and “user_id_str” in the dataframe “daily_active_users” (see below). Report how many active users you find.

In [37]:
# Read the tweet export into a DataFrame: first row is the header, multi-line
# records are enabled, and the double quote is the escape character.
df = (
    spark.read
    .options(header=True, multiLine=True, escape='"')
    .csv("Amazon_Responded_Oct05.csv")
)
df.show(n=20, truncate=True)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [38]:
# Look at the first five raw "tweet_created_at" strings, untruncated, to see their format.
df.select(col("tweet_created_at")).show(n=5, truncate=False)

+------------------------------+
|tweet_created_at              |
+------------------------------+
|Tue Nov 01 01:57:25 +0000 2016|
|Tue Nov 01 02:39:55 +0000 2016|
|Tue Nov 01 17:14:53 +0000 2016|
|Tue Nov 01 17:15:12 +0000 2016|
|Tue Nov 01 17:19:57 +0000 2016|
+------------------------------+
only showing top 5 rows



In [39]:
# Switch Spark's datetime parsing to the legacy policy. This is set right before
# "tweet_created_at" is parsed; presumably the pattern used there is not accepted
# by the default parser — confirm before removing.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Pin the session time zone to UTC. The tweet timestamps carry a +0000 offset, and the
# derived "date" string is rendered in the session time zone; without this, the day a
# tweet falls on (and therefore each user's distinct-day count) would depend on the
# time zone of the machine running the notebook.
spark.conf.set("spark.sql.session.timeZone", "UTC")

In [40]:
# Derive a new "date" column, formatted as yyyy/MM/dd, from the raw "tweet_created_at" string.
from pyspark.sql import functions as F

# First parse the timestamp text into epoch seconds ...
tweet_epoch_seconds = F.unix_timestamp(F.col("tweet_created_at"), "EEE MMM dd HH:mm:ss ZZZZZ yyyy")
# ... then render those seconds back as a yyyy/MM/dd string.
df = df.withColumn("date", F.from_unixtime(tweet_epoch_seconds, "yyyy/MM/dd"))

In [41]:
# Preview the first five values of the derived "date" column, untruncated.
df.select(col("date")).show(n=5, truncate=False)

+----------+
|date      |
+----------+
|2016/11/01|
|2016/11/01|
|2016/11/01|
|2016/11/01|
|2016/11/01|
+----------+
only showing top 5 rows



In [42]:
# For each (user_screen_name, user_id_str) pair, count how many distinct
# "date" values the user posted on; the result is stored as "active_days".
active_users = (
    df.groupBy("user_screen_name", "user_id_str")
    .agg(F.countDistinct(F.col("date")).alias("active_days"))
)

In [43]:
# Keep only the users whose distinct-day count ("active_days") is 5 or more.
daily_active_users = active_users.where(F.col("active_days") >= 5)

In [44]:
# Display up to 50 rows of the active-user table.
daily_active_users.show(n=50)

# Count the active users once, then report the total.
active_user_total = daily_active_users.count()
print("Number of active users: ", active_user_total)

+----------------+------------------+-----------+
|user_screen_name|       user_id_str|active_days|
+----------------+------------------+-----------+
|   Gentlemen_Sam|         441572163|         16|
|       SkullyRox|          20391647|          5|
|  whisperandmoan|         113516042|          5|
|  sky_regenrated|         483059773|         11|
|          MtnrMS|        3309102108|          5|
|        remakoul|814372928695521280|          5|
| roxyunderwood93|         295334669|          5|
|        TCMuffin|          35591749|          5|
|        trallyus|          11702402|          6|
|     ssagardutta|        2991490598|          7|
|  ChaurasiaRohin|706032993794527232|          5|
|  tarunwadhwa198|         130872938|          5|
|       basusagar|          42205044|         13|
|      Whitjoseph|         148495262|          6|
| Savvyy_Investor|758744557315993600|          6|
|   LadySlaughter|          15195593|          5|
|         i_opner|        4189195819|          5|


**Step 2:** A company would like to conduct an A/B test on Twitter. The experiment.txt file contains the user_id_str values they selected as potential experiment targets. Please create a dataframe “experiment_user” that records each selected user ID and whether that user is an active user (join with the dataframe from Step 1). Then calculate the percentage of active users and print the result.

In [45]:
# Read experiment.txt as plain text and name its single column "user_id_str".
experiment_df = (
    spark.read
    .format("text")
    .load("experiment.txt")
    .toDF("user_id_str")
)

In [46]:
# Left-join the experiment targets with the Step 1 active users on "user_id_str",
# so every selected user id is kept whether or not it appears among the active users.
experiment_user = experiment_df.join(daily_active_users, "user_id_str", "left")

In [47]:
# Add "whether_active": "yes" when the join supplied an "active_days" value, "no" when it is NULL.
has_active_days = F.col("active_days").isNotNull()
experiment_user = experiment_user.withColumn(
    "whether_active",
    F.when(has_active_days, "yes").otherwise("no"),
)

In [48]:
# Remove the helper columns so that only "user_id_str" and "whether_active" remain.
helper_columns = ("active_days", "user_screen_name")
experiment_user = experiment_user.drop(*helper_columns)

In [49]:
# Display up to 50 experiment targets together with their activity flag.
experiment_user.show(n=50)

+-----------+--------------+
|user_id_str|whether_active|
+-----------+--------------+
|  163413904|            no|
|   60004557|            no|
|  106799492|            no|
|  300526090|            no|
| 2610379644|            no|
|   62364020|            no|
| 1112166661|            no|
|  193060294|            no|
|   24787056|            no|
|  123138418|            no|
|  281666367|            no|
| 2908108256|            no|
|  143515471|            no|
| 1970607968|            no|
|  253242925|            no|
|  152122529|            no|
|   71098947|            no|
| 3285473358|           yes|
|  315105182|            no|
|   97424433|            no|
| 2706101936|            no|
| 1510968974|            no|
|   16980347|            no|
|  110354554|           yes|
|   22830586|            no|
|  174156582|            no|
|  356667027|            no|
| 2199664468|            no|
| 2502172122|            no|
|  243716471|            no|
|   59156981|            no|
| 2789430008| 

In [50]:
# Calculate the percentage of active users among the experiment targets.
# Both counts come from a single aggregation (one Spark job instead of two).
target_counts = experiment_user.agg(
    F.count(F.lit(1)).alias("total"),
    F.sum(F.when(F.col("whether_active") == "yes", 1).otherwise(0)).alias("active"),
).first()

total_users_count = target_counts["total"]
# The sum over an empty frame is NULL (None in Python); treat that as zero active users.
active_users_count = target_counts["active"] or 0

# Guard against an empty target list instead of raising ZeroDivisionError.
percentage_active = (active_users_count / total_users_count) * 100 if total_users_count else 0.0

In [51]:
# Report the share of experiment targets that are active, in percent.
percentage_label = "Percentage of active users: "
print(percentage_label, percentage_active)

Percentage of active users:  2.42


**Step 3:** The company provided their revised experiment target list in the final_experiment.csv file. Compared with the former experiment.txt file, they removed several users and added a new column “info” to indicate whether the user is female (F) or male (M). Fill in the remaining columns by joining the dataframes you got from Steps 1 and 2, save the result in a dataframe “final_experiment”, and briefly describe your join steps.

In [52]:
# Read the revised target list from final_experiment.csv, using its first row as the header.
final_experiment_df = (
    spark.read
    .option("header", True)
    .csv("final_experiment.csv")
)

In [53]:
# Preview the loaded target list (first 20 rows); the row count is the cell's output value.
final_experiment_df.show(n=20, truncate=True)
final_experiment_df.count()

+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
|   62364020|   F|          NULL|            NULL|
| 2706101936|   M|          NULL|            NULL|
|    5654472|   F|          NULL|            NULL|
|  145579921|   F|          NULL|            NULL|
| 2502172122|   M|          NULL|            NULL|
|  243716471|   F|          NULL|            NULL|
| 2610379644|   M|          NULL|            NULL|
|  123138418|   M|          NULL|            NULL|
|  257376764|   F|          NULL|            NULL|
|  269145593|   M|          NULL|            NULL|
|  370711133|   F|          NULL|            NULL|
| 1510968974|   F|          NULL|            NULL|
| 3526380922|   M|          NULL|            NULL|
|  163413904|   F|          NULL|            NULL|
|   16980347|   M|          NULL|            NULL|
| 1209614366|   M|          NULL|            NULL|
|  447433286|   F|          NUL

4500

In [54]:
# Drop the columns "whether_active" and "user_screen_name"; they are added back later by joining other tables.
# The columns are named by string rather than through attribute access on the DataFrame so the cell can be
# re-run: drop() ignores names that are no longer in the schema, whereas final_experiment_df.whether_active
# raises as soon as the column has already been removed.
final_experiment_df = final_experiment_df.drop("whether_active", "user_screen_name")

In [55]:
# Check the trimmed target list (first 20 rows); the row count is the cell's output value.
final_experiment_df.show(n=20, truncate=True)
final_experiment_df.count()

+-----------+----+
|user_id_str|info|
+-----------+----+
|   62364020|   F|
| 2706101936|   M|
|    5654472|   F|
|  145579921|   F|
| 2502172122|   M|
|  243716471|   F|
| 2610379644|   M|
|  123138418|   M|
|  257376764|   F|
|  269145593|   M|
|  370711133|   F|
| 1510968974|   F|
| 3526380922|   M|
|  163413904|   F|
|   16980347|   M|
| 1209614366|   M|
|  447433286|   F|
| 1970607968|   M|
| 3285473358|   F|
| 1112166661|   F|
+-----------+----+
only showing top 20 rows



4500

In [59]:
# Attach "whether_active" from experiment_user to the target users in final_experiment_df
# with a left join on "user_id_str".
df_Temp = final_experiment_df.join(experiment_user, ["user_id_str"], "left")

df_Temp.show(n=20, truncate=True)
df_Temp.count()

+-----------+----+--------------+
|user_id_str|info|whether_active|
+-----------+----+--------------+
| 1112166661|   F|            no|
| 1209614366|   M|            no|
|  123138418|   M|            no|
|  145579921|   F|            no|
| 1510968974|   F|            no|
|  163413904|   F|            no|
|   16980347|   M|            no|
| 1970607968|   M|            no|
|  243716471|   F|            no|
| 2502172122|   M|            no|
|  257376764|   F|            no|
| 2610379644|   M|            no|
|  269145593|   M|            no|
| 2706101936|   M|            no|
| 3285473358|   F|           yes|
| 3526380922|   M|            no|
|  370711133|   F|            no|
|  422175328|   M|            no|
|  447433286|   F|            no|
|    5654472|   F|            no|
+-----------+----+--------------+
only showing top 20 rows



4500

In [60]:
# Bring in "user_screen_name" from daily_active_users with an outer join on "user_id_str",
# so that all of the new targets are included.
# Rows for removed targets (whose "info" is NULL) are filtered out afterwards.
screen_names = daily_active_users.select("user_id_str", "user_screen_name")
df_Temp = df_Temp.join(screen_names, "user_id_str", "outer")

df_Temp.show(n=20, truncate=True)
df_Temp.count()

+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
| 1000196282|   F|            no|            NULL|
|  100049952|   M|            no|            NULL|
|  100054985|   F|            no|            NULL|
| 1001811912|   F|            no|            NULL|
| 1002018746|   F|            no|            NULL|
|  100209462|   M|            no|            NULL|
|  100214244|NULL|          NULL| sourabhnandwana|
| 1002186955|   F|            no|            NULL|
|  100339412|   M|           yes|     bigcomedown|
|  100414514|   F|            no|            NULL|
|  100468594|   F|           yes|           opjha|
|  100492517|   M|            no|            NULL|
|  100523546|   F|            no|            NULL|
|  100564102|   F|            no|            NULL|
|  100613504|NULL|          NULL|          sng628|
|  100736255|   F|            no|            NULL|
| 1008060450|   M|           ye

4997

In [61]:
# Inactive users have no entry in "daily_active_users", so their "user_screen_name"
# is set to "Not found"; every other row keeps the screen name it already has.
is_inactive = F.col("whether_active") == "no"
screen_name_or_placeholder = F.when(is_inactive, "Not found").otherwise(F.col("user_screen_name"))

# Apply the fill, then discard the rows whose "info" column is NULL.
df_Temp = (
    df_Temp
    .withColumn("user_screen_name", screen_name_or_placeholder)
    .filter(F.col("info").isNotNull())
)

df_Temp.show(n=20, truncate=True)
df_Temp.count()

+-----------+----+--------------+----------------+
|user_id_str|info|whether_active|user_screen_name|
+-----------+----+--------------+----------------+
| 1112166661|   F|            no|       Not found|
| 1209614366|   M|            no|       Not found|
|  123138418|   M|            no|       Not found|
|  145579921|   F|            no|       Not found|
| 1510968974|   F|            no|       Not found|
|  163413904|   F|            no|       Not found|
|   16980347|   M|            no|       Not found|
| 1970607968|   M|            no|       Not found|
|  243716471|   F|            no|       Not found|
| 2502172122|   M|            no|       Not found|
|  257376764|   F|            no|       Not found|
| 2610379644|   M|            no|       Not found|
|  269145593|   M|            no|       Not found|
| 2706101936|   M|            no|       Not found|
| 3285473358|   F|           yes|    iwritegarima|
| 3526380922|   M|            no|       Not found|
|  370711133|   F|            n

4500

In [None]:
# Save the three result dataframes as CSV files (with a header row).
daily_active_users.write.csv('daily_active_users.csv', header=True)

experiment_user.write.csv('experiment_user.csv', header=True)

# The Step 3 result is the joined frame df_Temp (user_id_str, info, whether_active,
# user_screen_name). final_experiment_df only holds user_id_str and info after the
# earlier drop, so writing it would leave out the two columns this step fills in.
final_experiment = df_Temp
final_experiment.write.csv('Final_experiment.csv', header=True)
