### Execute the cells in the order in which they are written

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv('Amazon_Responded_Oct05.csv',header = True) # Importing the dataset as a csv

In [4]:
data.show(5)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [5]:
# Selecting only the three columns on which we will be working.
data_new = data[['tweet_created_at','user_screen_name','user_id_str']]

In [6]:
data_new

DataFrame[tweet_created_at: string, user_screen_name: string, user_id_str: string]

In [7]:
data_new.first()

Row(tweet_created_at='Tue Nov 01 01:57:25 +0000 2016', user_screen_name='SeanEPanjab', user_id_str='143515471')

In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)

In [9]:
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.functions import udf

In [10]:
# Creating a function that takes in a column and extracts Day, Month and Date
def strip(s):
    return(str(s)[0:10])

In [11]:
# registering a function as an udf
strip_udf = udf(strip,StringType())

In [12]:
# Calling the udf
dat = data_new.withColumn('tweet_created', strip_udf(data_new['tweet_created_at']))

In [13]:
dat.show(10)

+--------------------+----------------+-----------+-------------+
|    tweet_created_at|user_screen_name|user_id_str|tweet_created|
+--------------------+----------------+-----------+-------------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 03:51:...|   aakashwangnoo|   71457972|   Tue Nov 01|
+--------------------+----------------+-----------+-------------+
only showing top 10 rows



### Task

### Step-1

In [14]:
# registering pyspark DataFrame as table
sqlContext.registerDataFrameAsTable(dat, "table1")

In [15]:
# Exexcuting SQL query to return daily active users.
daily_active_users = sqlContext.sql("Select user_id_str,user_screen_name from (Select user_id_str,user_screen_name,count(DISTINCT tweet_created) as count from table1 group by user_id_str,user_screen_name HAVING count >=5 order by count DESC)")    


In [16]:
daily_active_users.show(15)

+------------------+----------------+
|       user_id_str|user_screen_name|
+------------------+----------------+
|             False|               0|
|              null|            null|
|          85741735|      AmazonHelp|
|             False|               1|
|             False|            null|
|          54704557| SthrnDixieCwgrl|
|          15413409|   theonetruebix|
|              null|           False|
|                75|           False|
|         133804026|  RichPierce0079|
|         362763757| BillyTheBigBone|
|         441572163|   Gentlemen_Sam|
|                57|           False|
|         587376186|    Localboy_Ash|
|720329744726687744|          saj44a|
+------------------+----------------+
only showing top 15 rows



### Step-2

In [17]:
# Importing the experiment.txt file
AB_Test_id = spark.sparkContext.textFile('experiment.txt')

In [18]:
type(AB_Test_id)

pyspark.rdd.RDD

In [19]:
# Assigning column name to the id's
from pyspark.sql import Row
row = Row("user_id_str")

In [20]:
# Converting the rdd into Pyspark DataFrame
AB_Test_Dataframe = AB_Test_id.map(row).toDF()

In [21]:
AB_Test_Dataframe

DataFrame[user_id_str: string]

In [22]:
AB_Test_Dataframe.first()

Row(user_id_str='143515471')

In [23]:
AB_Test_Dataframe.show(5)

+-----------+
|user_id_str|
+-----------+
|  143515471|
|   85741735|
|   71457972|
| 2908108256|
|  106799492|
+-----------+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import lit

In [25]:
# Inserting column(Whether_active) in the DataFrame and initializing it with none 
experiment_user_dummy = AB_Test_Dataframe.withColumn('Whether_active',lit(None).cast(StringType()))

In [26]:
experiment_user_dummy.count()

5000

In [27]:
# For the time being every entry in the DataFrame look like this.
experiment_user_dummy.first()

Row(user_id_str='143515471', Whether_active=None)

In [28]:
# registering the DataFrames as tables.
sqlContext.registerDataFrameAsTable(experiment_user_dummy, "table_1")
sqlContext.registerDataFrameAsTable(daily_active_users, "table_2")

In [29]:
# First performing the left join on both the tables and storing the result 
ans = sqlContext.sql("Select t1.user_id_str,t1.Whether_active,t2.user_id_str as other,t2.user_screen_name from table_1 as t1  left join table_2 as t2 on t1.user_id_str = t2.user_id_str")              

In [30]:
# Registering the DataFrame created above as table
sqlContext.registerDataFrameAsTable(ans, "table")

In [31]:
# Executing SQL query on the table created in the above step, to get whether the user is active or not
experiment_user = sqlContext.sql('Select user_id_str, CASE when other is NULL then \'NO \' else \'YES \' END as whether_active from table')

In [32]:
# Getting a count of Active and Non- Active users.
yes = experiment_user.filter(experiment_user.whether_active.contains('YES')).count()
no = experiment_user.filter(experiment_user.whether_active.contains('NO')).count()

In [33]:
# Calculating the percentage of active users
percentage_of_active_users = (yes/(yes+no))*100

In [34]:
percentage_of_active_users

2.42

### Step-3

In [35]:
# Registering DataFrames as tables
sqlContext.registerDataFrameAsTable(data, "tab1")
sqlContext.registerDataFrameAsTable(daily_active_users, "tab2")
sqlContext.registerDataFrameAsTable(experiment_user,"tab3")

In [36]:
# Executing the SQL join query on three tables
ans_1 = sqlContext.sql("Select t1.*  from tab1 as t1 inner join tab2 as t2 on t1.user_id_str = t2.user_id_str inner join tab3 as t3 on t1.user_id_str = t3.user_id_str")

In [37]:
x = ans_1.collect()

In [38]:
import pandas as pd

In [39]:
# gettting all the columns to append to the final DataFrame
columns = [entry for entry in data.columns]

In [40]:
# Creating the DataFrame
data_final = pd.DataFrame(x,columns = columns)

In [41]:
# Exporting the DataFrame as a csv
data_final.to_csv('Amazon_new.csv',index = False)