In [31]:
import os

from pyspark.sql import Row
#TODO: Set the SPARK_DATA_DIR environment variable (or edit the fallback below) to point to your data directory
#The fallback is the original author-specific location, so behaviour is unchanged when the variable is not set.
#Later cells build file paths as dataDir + "<file name>", so os.path.join(..., "") is used to guarantee
#that the value ends with a path separator even when the override is given without one.
dataDir = os.path.join(
    os.environ.get(
        "SPARK_DATA_DIR",
        "/Users/RajT/Documents/Writing/SparkForBeginners/To-PACKTPUB/Contents/B05289-09-DesigningSparkApplications/Code/Data/",
    ),
    "",
)

In [32]:
#Read user.txt line by line, split every line on "|" and wrap the fields in a Row
#(only the Id field is converted to an integer, the rest stay strings)
userDataRDD = (
    sc.textFile(dataDir + "user.txt")
    .map(lambda record: record.split("|"))
    .map(
        lambda fields: Row(
            Id=int(fields[0]),
            UserName=fields[1],
            FirstName=fields[2],
            LastName=fields[3],
            EMail=fields[4],
            AlternateEmail=fields[5],
            Phone=fields[6],
        )
    )
)
#Turn the RDD of Rows into a data frame, register it as the "user" temp view and print a preview
userDataDF = userDataRDD.toDF()
userDataDF.createOrReplaceTempView("user")
userDataDF.show()

+----------------+--------------------+---------+---+--------+--------------+--------+
|  AlternateEmail|               EMail|FirstName| Id|LastName|         Phone|UserName|
+----------------+--------------------+---------+---+--------+--------------+--------+
|mt12@example.com| mthomas@example.com|     Mark|  1|  Thomas|+4411860297701| mthomas|
| mit@example.com|mithomas@example.com|  Michael|  2|  Thomas|+4411860297702|mithomas|
| mtw@example.com|  mtwain@example.com|     Mark|  3|   Twain|+4411860297703|  mtwain|
|  th@example.com|  thardy@example.com|   Thomas|  4|   Hardy|+4411860297704|  thardy|
|  bb@example.com| wbryson@example.com|  William|  5|  Bryson|+4411860297705| wbryson|
|  wb@example.com|   wbrad@example.com|  William|  6|Bradford|+4411860297706|   wbrad|
|  eh@example.com| eharris@example.com|       Ed|  7|  Harris|+4411860297707| eharris|
|  tk@example.com|   tcook@example.com|   Thomas|  8|    Cook|+4411860297708|   tcook|
|  ar@example.com| arobert@example.com|    

In [33]:
#Read follower.txt line by line, split every line on "|" and wrap the two fields in a Row
#(first field is the follower, second field is the user being followed)
followerDataRDD = (
    sc.textFile(dataDir + "follower.txt")
    .map(lambda record: record.split("|"))
    .map(lambda fields: Row(Follower=fields[0], Followed=fields[1]))
)
#Turn the RDD of Rows into a data frame, register it as the "follow" temp view and print a preview
followerDataDF = followerDataRDD.toDF()
followerDataDF.createOrReplaceTempView("follow")
followerDataDF.show()

+--------+--------+
|Followed|Follower|
+--------+--------+
|mithomas| mthomas|
|  mtwain| mthomas|
| wbryson|  thardy|
| wbryson|   wbrad|
| mthomas| eharris|
|   tcook| eharris|
|  jjames| arobert|
+--------+--------+



In [34]:
#Read message.txt line by line, split every line on "|" and wrap the fields in a Row
#(MessageId and Timestamp are converted to integers, the other fields stay strings)
messageDataRDD = (
    sc.textFile(dataDir + "message.txt")
    .map(lambda record: record.split("|"))
    .map(
        lambda fields: Row(
            UserName=fields[0],
            MessageId=int(fields[1]),
            ShortMessage=fields[2],
            Timestamp=int(fields[3]),
        )
    )
)
#Turn the RDD of Rows into a data frame, register it as the "message" temp view and print a preview
messageDataDF = messageDataRDD.toDF()
messageDataDF.createOrReplaceTempView("message")
messageDataDF.show()

+---------+--------------------+----------+--------+
|MessageId|        ShortMessage| Timestamp|UserName|
+---------+--------------------+----------+--------+
|        1|@mithomas Your po...|1459009608| mthomas|
|        2|Feeling awesome t...|1459010608| mthomas|
|        3|My namesake in th...|1459010776|  mtwain|
|        4|Started the day w...|1459011016|  mtwain|
|        5|It is just spring...|1459011199|  thardy|
|        6|Some days are rea...|1459011256| wbryson|
|        7|@wbryson Stuff ha...|1459011333|   wbrad|
|        8|Anybody knows goo...|1459011426| eharris|
|        9|Stock market is p...|1459011483|   tcook|
|       10|Dont do day tradi...|1459011539|   tcook|
|       11|I have never hear...|1459011622|   tcook|
|       12|#Barcelona has pl...|1459157132|   wbrad|
|       13|@wbryson It is go...|1459164906|  mtwain|
+---------+--------------------+----------+--------+



In [35]:
#Create the purpose-built view of the messages that are addressed to users
def _extractMentionedUsers(shortMessage):
    """Return the user names mentioned in a message, without the leading '@'.

    A token is a mention only when it starts with '@' and has at least one
    character after it. The text is split on any whitespace, which never
    produces empty tokens, so repeated or trailing spaces cannot raise an
    IndexError the way indexing token[0] on an empty string would.
    """
    return [
        token[1:]
        for token in shortMessage.split()
        if token.startswith("@") and len(token) > 1
    ]

#Emit one Row per (message, mentioned user) pair:
# - a message mentioning several users produces one row for each of them, so ToUserName always holds a single name
# - a message that merely contains "@" inside a word (no token starting with "@") produces no row at all
messagetoUsersRDD = messageDataRDD.flatMap(
    lambda message: [
        Row(
            FromUserName=message.UserName,
            ToUserName=toUser,
            MessageId=message.MessageId,
            ShortMessage=message.ShortMessage,
            Timestamp=message.Timestamp,
        )
        for toUser in _extractMentionedUsers(message.ShortMessage)
    ]
)
#Convert the RDD into data frame
messagetoUsersDF = messagetoUsersRDD.toDF()
messagetoUsersDF.createOrReplaceTempView("messageToUsers")
messagetoUsersDF.show()

+------------+---------+--------------------+----------+----------+
|FromUserName|MessageId|        ShortMessage| Timestamp|ToUserName|
+------------+---------+--------------------+----------+----------+
|     mthomas|        1|@mithomas Your po...|1459009608|  mithomas|
|       wbrad|        7|@wbryson Stuff ha...|1459011333|   wbryson|
|      mtwain|       13|@wbryson It is go...|1459164906|   wbryson|
+------------+---------+--------------------+----------+----------+



In [36]:
#Create the purpose-built view of the tagged messages
def _extractHashTags(shortMessage):
    """Return the hash tags found in a message, each including its leading '#'.

    A token is a hash tag only when it starts with '#' and has at least one
    character after it. The text is split on any whitespace, which never
    produces empty tokens, so repeated or trailing spaces cannot raise an
    IndexError the way indexing token[0] on an empty string would.
    """
    return [
        token
        for token in shortMessage.split()
        if token.startswith("#") and len(token) > 1
    ]

#Emit one Row per (message, hash tag) pair:
# - a message carrying several tags produces one row for each of them, so an equality filter on HashTag can match
# - a message that merely contains "#" inside a word (no token starting with "#") produces no row at all
taggedMessageRDD = messageDataRDD.flatMap(
    lambda message: [
        Row(
            HashTag=hashTag,
            UserName=message.UserName,
            MessageId=message.MessageId,
            ShortMessage=message.ShortMessage,
            Timestamp=message.Timestamp,
        )
        for hashTag in _extractHashTags(message.ShortMessage)
    ]
)
#Convert the RDD into data frame
taggedMessageDF = taggedMessageRDD.toDF()
taggedMessageDF.createOrReplaceTempView("taggedMessages")
taggedMessageDF.show()

+----------+---------+--------------------+----------+--------+
|   HashTag|MessageId|        ShortMessage| Timestamp|UserName|
+----------+---------+--------------------+----------+--------+
|#Barcelona|        8|Anybody knows goo...|1459011426| eharris|
|#Barcelona|       12|#Barcelona has pl...|1459157132|   wbrad|
+----------+---------+--------------------+----------+--------+



In [37]:
#Queries for the use cases start here
#Messages carrying the hash tag '#Barcelona', joined with the author's first and last name, ordered by timestamp
#(adjacent string literals are concatenated by Python into a single SQL statement)
byHashTag = spark.sql(
    "SELECT a.UserName, b.FirstName, b.LastName, a.MessageId, a.ShortMessage, a.Timestamp "
    "FROM taggedMessages a, user b "
    "WHERE a.UserName = b.UserName AND HashTag = '#Barcelona' "
    "ORDER BY a.Timestamp"
)
byHashTag.show()

+--------+---------+--------+---------+--------------------+----------+
|UserName|FirstName|LastName|MessageId|        ShortMessage| Timestamp|
+--------+---------+--------+---------+--------------------+----------+
| eharris|       Ed|  Harris|        8|Anybody knows goo...|1459011426|
|   wbrad|  William|Bradford|       12|#Barcelona has pl...|1459157132|
+--------+---------+--------+---------+--------------------+----------+



In [38]:
#Messages addressed to the user 'wbryson', ordered by timestamp
#(adjacent string literals are concatenated by Python into a single SQL statement)
byToUser = spark.sql(
    "SELECT FromUserName, ToUserName, MessageId, ShortMessage, Timestamp "
    "FROM messageToUsers "
    "WHERE ToUserName = 'wbryson' "
    "ORDER BY Timestamp"
)
byToUser.show()

+------------+----------+---------+--------------------+----------+
|FromUserName|ToUserName|MessageId|        ShortMessage| Timestamp|
+------------+----------+---------+--------------------+----------+
|       wbrad|   wbryson|        7|@wbryson Stuff ha...|1459011333|
|      mtwain|   wbryson|       13|@wbryson It is go...|1459164906|
+------------+----------+---------+--------------------+----------+



In [39]:
#First and last names of the users who follow 'wbryson'
#(adjacent string literals are concatenated by Python into a single SQL statement)
followers = spark.sql(
    "SELECT b.FirstName as FollowerFirstName, b.LastName as FollowerLastName, a.Followed "
    "FROM follow a, user b "
    "WHERE a.Follower = b.UserName AND a.Followed = 'wbryson'"
)
followers.show()

+-----------------+----------------+--------+
|FollowerFirstName|FollowerLastName|Followed|
+-----------------+----------------+--------+
|          William|        Bradford| wbryson|
|           Thomas|           Hardy| wbryson|
+-----------------+----------------+--------+



In [40]:
#First and last names of the users that 'eharris' follows
#(adjacent string literals are concatenated by Python into a single SQL statement)
followedUsers = spark.sql(
    "SELECT b.FirstName as FollowedFirstName, b.LastName as FollowedLastName, a.Follower "
    "FROM follow a, user b "
    "WHERE a.Followed = b.UserName AND a.Follower = 'eharris'"
)
followedUsers.show()

+-----------------+----------------+--------+
|FollowedFirstName|FollowedLastName|Follower|
+-----------------+----------------+--------+
|           Thomas|            Cook| eharris|
|             Mark|          Thomas| eharris|
+-----------------+----------------+--------+

