In [1]:
import io
import csv
import pandas as pd
from pyspark import *
from pyspark import SparkContext as sc, SparkConf
from pyspark.python.pyspark.shell import spark
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from datetime import datetime
from pyspark.sql.functions import udf, to_date, to_utc_timestamp, lit, col
from pyspark.sql.types import StringType, DateType

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Python version 3.8.3 (default, Jul  2 2020 17:30:36)
SparkSession available as 'spark'.


Loading Sparkcontext and SQL context

In [4]:
from pyspark import SparkContext
import pyspark.sql.functions
from pyspark.sql import SQLContext
import pandas as pd
from pyspark.sql import SparkSession

In [5]:
sc = SparkContext.getOrCreate()

In [6]:
sql_con = SQLContext(sc)

In [7]:
df = sql_con.read.csv('Amazon_Responded_Oct05.csv', header=True)
df_amzn = df

In [8]:
df = df.select('tweet_created_at', 'user_screen_name', 'user_id_str')
df.show(10)

+--------------------+----------------+-----------+
|    tweet_created_at|user_screen_name|user_id_str|
+--------------------+----------------+-----------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|
|Tue Nov 01 03:51:...|   aakashwangnoo|   71457972|
+--------------------+----------------+-----------+
only showing top 10 rows



In [9]:
from pyspark.sql.functions import *

Task 1 -  Finding out users who are active in at least five listed days

In [10]:
date_split = pyspark.sql.functions.split(df['tweet_created_at'],' ') 
sql_func = pyspark.sql.functions 
df = df.withColumn('Month', date_split.getItem(1))
df = df.withColumn('Day',date_split.getItem(2))
df = df.withColumn('year',date_split.getItem(5))
df = df.withColumn('Date',sql_func.concat('Month','Day',lit(" "),'year'))

In [11]:
df = df.select('tweet_created_at','user_screen_name','user_id_str','Date')
df.show(5)

+--------------------+----------------+-----------+----------+
|    tweet_created_at|user_screen_name|user_id_str|      Date|
+--------------------+----------------+-----------+----------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|Nov01 2016|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|Nov01 2016|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|Nov01 2016|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|Nov01 2016|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|Nov01 2016|
+--------------------+----------------+-----------+----------+
only showing top 5 rows



In [12]:
filter_cols = df.select('user_id_str','user_screen_name','Date').groupBy("user_id_str","user_screen_name").agg(countDistinct("Date").alias('count')).filter(column('count') >= 5)

In [14]:
daily_active_users = filter_cols.select('user_screen_name','user_id_str')
daily_active_users.show(10)

+----------------+-----------+
|user_screen_name|user_id_str|
+----------------+-----------+
|      jeujeubeee| 2214869341|
|        nadu_bda| 2219985456|
|  jordanwaring87|   47584774|
|          sng628|  100613504|
|    LalwaniNiraj| 1666520694|
|   urvashi_mitra|  527489415|
| roadshowrigolet|  630450707|
|   ItsJustMe_Rae| 3009368979|
|     bijender393|  903272676|
|   DadaSiddharth|  359869433|
+----------------+-----------+
only showing top 10 rows



Task 2 - Creating a datafram 'experiment_user' to capture the selected user_id_str and see if they are active users

In [15]:
new_list = []
expt = open('experiment', "r")
for item in expt:
   new_list.append(item.rstrip("\n"))

In [16]:
df_exp = sql_con.createDataFrame(new_list,StringType())
df_exp.show(5)

+----------+
|     value|
+----------+
| 143515471|
|  85741735|
|  71457972|
|2908108256|
| 106799492|
+----------+
only showing top 5 rows



Creating temporary table/views
----------------------------------------------------

table 1 - dataframe consisting of daily active users
table 2 - dataframe consisting of list of users in the experiment.txt file
table 3 - dataframe consisting of active users among the ones chosen for the AB-test experiment (created later)
table 4 - original dataframe of Amazon_Responded_Oct05.csv file (created later)

In [17]:
daily_active_users.createOrReplaceTempView('table1')
df_exp.createOrReplaceTempView('table2')

In [18]:
exp_user = spark.sql("SELECT value from table2 where value in (SELECT user_id_str FROM table1)")
exp_user = exp_user.withColumn("Whether_active", lit("Yes"))
exp_user.show(10)

In [19]:
#Display the active users from the experiment file 
exp_user.show(10)

+----------+--------------+
|     value|Whether_active|
+----------+--------------+
|  85741735|           Yes|
| 902137872|           Yes|
| 110354554|           Yes|
|3285473358|           Yes|
|1399965709|           Yes|
|  23019151|           Yes|
|2698629504|           Yes|
|  71302070|           Yes|
| 180361172|           Yes|
| 493941380|           Yes|
+----------+--------------+
only showing top 10 rows



Calculating the percentage of active users in the new file

In [22]:
total = df_amzn.count() #total users
active_ones = exp_user.count() #active users in those chosen for the experiment

#compute percentage of active users
percentage = active_ones/total * 100

In [25]:
print("Total percentage of active users: " , percentage)

Total percentage of active users:  2.3004600920184037


Task 3 - Create a new dataframe by joining the three tables

In [26]:
#Create another temporary table with results
exp_user.createOrReplaceTempView('table3')

#create temporary table for the original Amazon_Responded_Oct05.csv file dataset file
df_amzn.createOrReplaceTempView('table4')

Joining the table1, table2 and table 4

In [27]:
final_join = spark.sql("SELECT * FROM table4 where user_id_str in (Select user_id_str FROM table1) AND user_id_str in (SELECT value FROM table2)")

In [28]:
#creating the resulting dataframe
output_file = final_join.toPandas()
output_file.to_csv('Amazon_new.csv')