In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv('Amazon_Responded_Oct05.csv',header = True) # Importing the dataset as a csv

In [4]:
data.show(5)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [5]:
data_new = data[['tweet_created_at','user_screen_name','user_id_str']]

In [6]:
data_new # this is Amazon_responded_oct5

DataFrame[tweet_created_at: string, user_screen_name: string, user_id_str: string]

In [7]:
data_new.first()

Row(tweet_created_at='Tue Nov 01 01:57:25 +0000 2016', user_screen_name='SeanEPanjab', user_id_str='143515471')

In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)

In [9]:
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.functions import udf

In [10]:
def strip(s):
    return(str(s)[0:10])

In [11]:
strip_udf = udf(strip,StringType())

In [12]:
dat = data_new.withColumn('tweet_created', strip_udf(data_new['tweet_created_at']))

In [13]:
dat.show(10)

+--------------------+----------------+-----------+-------------+
|    tweet_created_at|user_screen_name|user_id_str|tweet_created|
+--------------------+----------------+-----------+-------------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|   Tue Nov 01|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|   Tue Nov 01|
|Tue Nov 01 03:51:...|   aakashwangnoo|   71457972|   Tue Nov 01|
+--------------------+----------------+-----------+-------------+
only showing top 10 rows



In [14]:
sqlContext.registerDataFrameAsTable(dat, "table1")

In [80]:
daily_active_users = sqlContext.sql("Select user_id_str,user_screen_name,count(DISTINCT tweet_created) as count from table1 group by user_id_str,user_screen_name HAVING count >=5 order by count DESC")              

In [81]:
daily_active_users.show(15) # this is daily user

+------------------+----------------+-----+
|       user_id_str|user_screen_name|count|
+------------------+----------------+-----+
|             False|               0|  561|
|              null|            null|  525|
|          85741735|      AmazonHelp|  279|
|             False|               1|   51|
|             False|            null|   47|
|          54704557| SthrnDixieCwgrl|   27|
|          15413409|   theonetruebix|   19|
|              null|           False|   18|
|                75|           False|   17|
|         133804026|  RichPierce0079|   16|
|         362763757| BillyTheBigBone|   16|
|         441572163|   Gentlemen_Sam|   16|
|                57|           False|   15|
|         587376186|    Localboy_Ash|   14|
|720329744726687744|          saj44a|   14|
+------------------+----------------+-----+
only showing top 15 rows



In [82]:
daily_active_users.collect()

[Row(user_id_str='False', user_screen_name='0', count=561),
 Row(user_id_str=None, user_screen_name=None, count=525),
 Row(user_id_str='85741735', user_screen_name='AmazonHelp', count=279),
 Row(user_id_str='False', user_screen_name='1', count=51),
 Row(user_id_str='False', user_screen_name=None, count=47),
 Row(user_id_str='54704557', user_screen_name='SthrnDixieCwgrl', count=27),
 Row(user_id_str='15413409', user_screen_name='theonetruebix', count=19),
 Row(user_id_str=None, user_screen_name='False', count=18),
 Row(user_id_str='75', user_screen_name='False', count=17),
 Row(user_id_str='133804026', user_screen_name='RichPierce0079', count=16),
 Row(user_id_str='362763757', user_screen_name='BillyTheBigBone', count=16),
 Row(user_id_str='441572163', user_screen_name='Gentlemen_Sam', count=16),
 Row(user_id_str='57', user_screen_name='False', count=15),
 Row(user_id_str='14', user_screen_name='False', count=14),
 Row(user_id_str='293447041', user_screen_name='Eva250178', count=14),
 R

In [17]:
AB_Test_id = spark.sparkContext.textFile('experiment.txt')

In [18]:
type(AB_Test_id)

pyspark.rdd.RDD

In [19]:
from pyspark.sql import Row
row = Row("user_id_str")

In [20]:
AB_Test_Dataframe = AB_Test_id.map(row).toDF()

In [21]:
AB_Test_Dataframe

DataFrame[user_id_str: string]

In [22]:
AB_Test_Dataframe.first()

Row(user_id_str='143515471')

In [23]:
AB_Test_Dataframe.show(5)

+-----------+
|user_id_str|
+-----------+
|  143515471|
|   85741735|
|   71457972|
| 2908108256|
|  106799492|
+-----------+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import lit

In [25]:
experiment_user_dummy = AB_Test_Dataframe.withColumn('Whether_active',lit(None).cast(StringType()))

In [36]:
experiment_user_dummy.count()

5000

In [26]:
experiment_user_dummy.first()

Row(user_id_str='143515471', Whether_active=None)

In [27]:
sqlContext.registerDataFrameAsTable(experiment_user_dummy, "table_1")
sqlContext.registerDataFrameAsTable(df1, "table_2")

In [29]:
ans = sqlContext.sql("Select t1.user_id_str,t1.Whether_active,t2.user_id_str as other,t2.user_screen_name from table_1 as t1  left join table_2 as t2 on t1.user_id_str = t2.user_id_str")              

In [37]:
len(ans.collect())

5000

In [31]:
sqlContext.registerDataFrameAsTable(ans, "table")

In [83]:
experiment_user = sqlContext.sql('Select user_id_str, CASE when other is NULL then \'NO \' else \'YES \' END as whether_active from table')

In [84]:
experiment_user.collect()

[Row(user_id_str='143515471', whether_active='NO '),
 Row(user_id_str='85741735', whether_active='YES '),
 Row(user_id_str='71457972', whether_active='NO '),
 Row(user_id_str='2908108256', whether_active='NO '),
 Row(user_id_str='106799492', whether_active='NO '),
 Row(user_id_str='59156981', whether_active='NO '),
 Row(user_id_str='902137872', whether_active='YES '),
 Row(user_id_str='110354554', whether_active='YES '),
 Row(user_id_str='97424433', whether_active='NO '),
 Row(user_id_str='62364020', whether_active='NO '),
 Row(user_id_str='2706101936', whether_active='NO '),
 Row(user_id_str='5654472', whether_active='NO '),
 Row(user_id_str='145579921', whether_active='NO '),
 Row(user_id_str='2502172122', whether_active='NO '),
 Row(user_id_str='243716471', whether_active='NO '),
 Row(user_id_str='2610379644', whether_active='NO '),
 Row(user_id_str='123138418', whether_active='NO '),
 Row(user_id_str='257376764', whether_active='NO '),
 Row(user_id_str='269145593', whether_active='

In [86]:
yes = experiment_user.filter(experiment_user.whether_active.contains('YES')).count()
no = experiment_user.filter(experiment_user.whether_active.contains('NO')).count()

In [87]:
percentage_of_active_users = (yes/(yes+no))*100

In [88]:
percentage_of_active_users

2.42

In [43]:
ans2 = sqlContext.sql("Select t1.user_id_str,t2.user_screen_name from table_1 as t1 join table_2 as t2 on t1.user_id_str = t2.user_id_str")

In [45]:
ans2.show(5)

+-----------+----------------+
|user_id_str|user_screen_name|
+-----------+----------------+
|   18435372|          medtek|
| 1387267123|      TheMimiZee|
|  267381861|      SparklyPen|
|  885336408|    J_cooper1990|
|  213060183|         ke4ole1|
+-----------+----------------+
only showing top 5 rows



In [90]:
sqlContext.registerDataFrameAsTable(data, "tab1")
sqlContext.registerDataFrameAsTable(daily_active_users, "tab2")
sqlContext.registerDataFrameAsTable(experiment_user,"tab3")

In [108]:
ans_1 = sqlContext.sql("Select t1.*  from tab1 as t1 inner join tab2 as t2 on t1.user_id_str = t2.user_id_str inner join tab3 as t3 on t1.user_id_str = t3.user_id_str")

In [109]:
x = ans_1.collect()

In [112]:
import pandas as pd

In [119]:
columns = [entry for entry in data.columns]

In [120]:
data_final = pd.DataFrame(x,columns = columns)

In [122]:
data_final.to_csv('Amazon_new.csv')