##ETL with PySpark SQL

###HIGGS Twitter Database

The Higgs dataset was built by monitoring the spreading processes on Twitter before, during and after the announcement, on 4th July 2012, of the discovery of a new particle with the features of the elusive Higgs boson. It covers the messages posted on Twitter about this discovery between 1st and 7th July 2012.

**The four directional networks made available here have been extracted from user activities on Twitter:**

- re-tweeting (retweet network)
- replying (reply network) to existing tweets
- mentioning (mention network) other users
- friends/followers social relationships among the users involved in the above activities

A fifth file records the activity on Twitter during the discovery of the Higgs boson.

It is worth remarking that the user IDs have been anonymized, and that the same user ID is used in all networks. This choice allows the Higgs dataset to be used in studies of large-scale interdependent/interconnected multiplex/multilayer networks, where one layer accounts for the social structure and three layers encode different types of user dynamics.

**Note that this dataset was updated on Mar 31 2015. If you downloaded a previous version, please update it, as results could differ.**

**Dataset Link:-** http://snap.stanford.edu/data/higgs-twitter.html

In [0]:
#loading the required modules

import sys
import os
from pyspark.sql import SparkSession
# note: this star import shadows Python built-ins such as sum, min and max
from pyspark.sql.functions import *
from pyspark.sql.types import *



####Directory & FilePath Setup

In [0]:
#setting the root directory where all files are stored

datasetDir = "/FileStore/tables/"

#loading the dataset in pyspark
socialDir = datasetDir + "higgs_social_network_edgelist.gz"
retweetDir = datasetDir + "higgs_retweet_network_edgelist.gz"
replyDir =   datasetDir + "higgs_reply_network_edgelist.gz"
mentionDir =  datasetDir + "higgs_mention_network_edgelist.gz"
activityDir =  datasetDir + "higgs_activity_time_txt.gz"
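The five paths above share one root, so they can also be built from a single mapping. The following is a minimal sketch that reuses the file names from the cell above; `build_paths` and the dictionary keys are hypothetical names introduced here, not part of the original notebook.

```python
import posixpath

# File names copied from the cell above; the keys are illustrative labels.
HIGGS_FILES = {
    "social": "higgs_social_network_edgelist.gz",
    "retweet": "higgs_retweet_network_edgelist.gz",
    "reply": "higgs_reply_network_edgelist.gz",
    "mention": "higgs_mention_network_edgelist.gz",
    "activity": "higgs_activity_time_txt.gz",
}


def build_paths(root, files=HIGGS_FILES):
    """Join the root directory with each file name (a trailing slash is optional)."""
    return {name: posixpath.join(root, fname) for name, fname in files.items()}


paths = build_paths("/FileStore/tables/")
print(paths["social"])
```

Using `posixpath.join` instead of string concatenation avoids a broken path when the root is given without a trailing slash.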


#Loading the Data in DataFrames

In [0]:
#Loading Social Data

# Specify the DataFrame's schema explicitly so that Spark does not have to infer it
socialSchema = StructType([StructField("follower", IntegerType()), StructField("followed", IntegerType())])
socialDF = spark.read \
           .option("sep"," ") \
           .schema(socialSchema) \
           .csv(socialDir)
      
display(socialDF)

#Loading retweet Data

# Specify the DataFrame's schema explicitly so that Spark does not have to infer it
retweetSchema = StructType([StructField("tweeter", IntegerType()), StructField("tweeted", IntegerType()), StructField("occur", IntegerType())])
retweetDF = spark.read \
           .option("sep"," ") \
           .schema(retweetSchema) \
           .csv(retweetDir)
      
display(retweetDF)


#Loading reply Data

# Specify the DataFrame's schema explicitly so that Spark does not have to infer it
replySchema = StructType([StructField("replier", IntegerType()), StructField("replied", IntegerType()), StructField("occur", IntegerType())])
replyDF = spark.read \
           .option("sep"," ") \
           .schema(replySchema) \
           .csv(replyDir)
      
display(replyDF)



#Loading mention Data

# Specify the DataFrame's schema explicitly so that Spark does not have to infer it
mentionSchema = StructType([StructField("mentioner", IntegerType()), StructField("mentioned", IntegerType()), StructField("occur", IntegerType())])
mentionDF = spark.read \
           .option("sep"," ") \
           .schema(mentionSchema) \
           .csv(mentionDir)
      
display(mentionDF)




#Loading activity Data

# Specify the DataFrame's schema explicitly so that Spark does not have to infer it
activitySchema = StructType([StructField("userA", IntegerType()), \
                     StructField("userB", IntegerType()), \
                     StructField("timestamp", IntegerType()), \
                    StructField("interaction", StringType())])
                    #Interaction can be: RT (retweet), MT (mention) or RE (reply)
activityDF = spark.read \
           .option("sep"," ") \
           .schema(activitySchema) \
           .csv(activityDir)
      
display(activityDF.select('interaction').distinct())
display(retweetDF)


follower,followed
1,2
1,3
1,4
1,5
1,6
1,7
1,8
1,9
1,10
1,11


tweeter,tweeted,occur
298960,105232,1
95688,3393,1
353237,62217,1
4974,3571,1
241892,8,1
234866,60961,1
397808,171,1
235841,5795,1
233022,267152,1
110598,88,1


replier,replied,occur
161345,8614,1
428368,11792,1
77904,10701,1
124554,286277,1
194873,194873,1
341375,16460,1
436133,220,1
274148,274149,1
12866,22252,1
425029,35248,1


mentioner,mentioned,occur
316609,5011,1
439696,12389,1
60059,6929,1
161345,8614,1
137487,759,1
57587,107757,1
397696,6940,1
436988,71,1
43994,90976,1
124554,286277,1


interaction
RE
MT
RT
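Each schema above tells Spark how to type the space-separated fields of one line. As a rough plain-Python illustration of what that typing amounts to for a single activity record, here is a small parser. The sample line is made up rather than taken from the dataset, `Activity` and `parse_activity` are hypothetical names, and the timestamp is assumed to be Unix time in seconds.

```python
from typing import NamedTuple


class Activity(NamedTuple):
    userA: int
    userB: int
    timestamp: int    # assumed to be Unix time in seconds
    interaction: str  # RT (retweet), MT (mention) or RE (reply)


VALID_INTERACTIONS = {"RT", "MT", "RE"}


def parse_activity(line: str) -> Activity:
    """Split one space-separated line and convert each field to its schema type."""
    user_a, user_b, ts, kind = line.strip().split(" ")
    if kind not in VALID_INTERACTIONS:
        raise ValueError(f"unknown interaction type: {kind!r}")
    return Activity(int(user_a), int(user_b), int(ts), kind)


# Made-up sample line in the same four-column layout as the activity file.
record = parse_activity("10 20 1341100972 MT")
print(record)
```

Unlike this sketch, Spark's CSV reader does not raise on a malformed field by default; in its default permissive mode it stores a null instead.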


##Convert the loaded CSV files to Parquet format and store them

In [0]:
#storage Path location

targetDir = "/user/harshrocking2911@gmail.com/"

socialDF.write.save(targetDir + "higgs-social_network.parquet")
retweetDF.write.save(targetDir + "higgs-retweet_network.parquet")
replyDF.write.save(targetDir + "higgs-reply_network.parquet")
mentionDF.write.save(targetDir + "higgs-mention_network.parquet")
activityDF.write.save(targetDir + "higgs-activity_time.parquet")
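`write.save` uses Parquet as its default format and, with the default save mode, raises an error if the target path already exists, so re-running the cell above fails. A hedged sketch of a re-runnable variant follows; `write_parquet` is a hypothetical helper, and it only relies on the `mode` and `parquet` methods of the DataFrame writer.

```python
def write_parquet(df, path):
    """Write df as Parquet, replacing any previous output at path."""
    # mode("overwrite") replaces existing data instead of raising an error
    df.write.mode("overwrite").parquet(path)
    return path


# Possible use in the notebook (assumes socialDF and targetDir from the cells above):
#   write_parquet(socialDF, targetDir + "higgs-social_network.parquet")
```

Overwriting is convenient while experimenting, but it silently discards the previous output, so the default mode may be the safer choice for data that is expensive to rebuild.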


In [0]:
#retweetDF.rdd.getNumPartitions()

##Loading Parquet data in new DataFrames

In [0]:
socialDFpq = spark.read.load(targetDir + "higgs-social_network.parquet")
retweetDFpq = spark.read.load(targetDir + "higgs-retweet_network.parquet")
replyDFpq = spark.read.load(targetDir + "higgs-reply_network.parquet")
mentionDFpq = spark.read.load(targetDir + "higgs-mention_network.parquet")
activityDFpq = spark.read.load(targetDir + "higgs-activity_time.parquet")

###Some DataFrame Operations

In [0]:
socialDFpq.printSchema()
socialDFpq.schema

In [0]:
socialDFpq.show(5)
socialDFpq.take(5)
socialDFpq.limit(5).collect()

##Spark SQL Using the DataFrame API

In [0]:
#users with the most followers
socialDFpq.groupby('followed').agg(count('followed').alias('numFollowers')).orderBy(desc('numFollowers')).show(5)

#users who are mentioned most
mentionDFpq.groupby('mentioned').agg(count('mentioned').alias('numMentioned')).orderBy(desc('numMentioned')).show(5)

#users who replied most
replyDFpq.groupby('replier').agg(count('replier').alias('numReplier')).orderBy(desc('numReplier')).show(5)
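Each query above groups an edge list by one column, counts the rows in each group, and keeps the largest counts. The same logic on a toy edge list in plain Python, with `collections.Counter` standing in for Spark's distributed aggregation; the data and the name `top_followed` are made up for illustration.

```python
from collections import Counter

# Toy (follower, followed) pairs, not taken from the dataset.
edges = [(1, 2), (3, 2), (4, 2), (1, 3), (2, 3), (1, 4)]


def top_followed(edges, n=5):
    """Count how often each user appears as 'followed' and return the n largest counts."""
    counts = Counter(followed for _, followed in edges)
    return counts.most_common(n)


print(top_followed(edges, 2))  # [(2, 3), (3, 2)]
```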

In [0]:
replyDFpq.where(col("replier") == 9021).count()

In [0]:
#find the number of mentions for the 5 most-followed users

top_5f = socialDFpq.groupby('followed').agg(count('followed').alias('numFollowers')).orderBy(desc('numFollowers')).limit(5)

#joining the data with mention DF

# alias() must be applied to the aggregated column, not to the resulting DataFrame
top_5f.join(mentionDFpq, top_5f.followed == mentionDFpq.mentioned) \
    .groupby('followed', 'numFollowers') \
    .agg(sum(mentionDFpq.occur).alias("mentions")) \
    .orderBy(desc("numFollowers")).show(10)


###Spark SQL Using the SQL Language

In [0]:
# create temporary views so we can use SQL statements
socialDFpq.createOrReplaceTempView("social")
retweetDFpq.createOrReplaceTempView("retweet")
replyDFpq.createOrReplaceTempView("reply")
mentionDFpq.createOrReplaceTempView("mention")
activityDFpq.createOrReplaceTempView("activity")

In [0]:
#users with the most followers
#socialDFpq.groupby('followed').agg(count('followed').alias('numFollowers')).orderBy(desc('numFollowers')).show(5)

spark.sql('select followed,  count(followed) as followers from social group by followed order by count(followed) desc' ).show(5)

#users who are mentioned most
#mentionDFpq.groupby('mentioned').agg(count('mentioned').alias('numMentioned')).orderBy(desc('numMentioned')).show(5)
spark.sql('select mentioned,  count(occur) as numMentioned from mention group by mentioned order by count(occur) desc' ).show(5)

#users who replied most
#replyDFpq.groupby('replier').agg(count('replier').alias('numReplier')).orderBy(desc('numReplier')).show(5)
spark.sql('select replier,  count(replier) as numReplier from reply group by replier order by count(replier) desc' ).show(5)

In [0]:
# Of the top 5 followed users, how many mentions has each one?
spark.sql("""
select 5_top_f.followed, 5_top_f.followers, sum(m.occur) as mentions
    from 
        -- subquery that contains top 5 of followed users
        (select followed, count(follower) as followers from social group by followed order by followers desc limit 5) 5_top_f, 
        mention as m
    where 5_top_f.followed = m.mentioned
    group by 5_top_f.followed, followers
    order by followers desc
        """).show()

##Performance Testing

####GZIP Compressed CSV file vs Parquet file

In [0]:
%%time
# GZIP Compressed CSV
socialDF.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

In [0]:
%%time
# Parquet file
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)
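`%%time` is an IPython cell magic, and a single run can be noisy. A small sketch of a timing helper based on `time.perf_counter`, which also works outside a notebook and repeats the measurement; `time_action` is a hypothetical helper, and the workload below is a stand-in for a Spark action.

```python
import time


def time_action(action, repeats=3):
    """Run a zero-argument callable several times; return wall-clock durations in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload. In the notebook the callable could be, for example:
#   lambda: socialDFpq.groupBy("followed").count().collect()
timings = time_action(lambda: sorted(range(100_000, 0, -1)))
print(sorted(timings)[0])  # fastest of the repeated runs
```

The sketch uses `sorted(...)[0]` rather than `min(...)` because `from pyspark.sql.functions import *` replaces the built-in `min` in the notebook's namespace.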

###Cached DF vs not cached DF

#####This time we will cache the 2 previous DataFrames (socialDF and socialDFpq) and see how much faster the queries run.

#####Note: The first run on a cached DataFrame can be slower, because the cache is filled during that run; subsequent runs should be faster.

In [0]:
# cache dataframes
socialDF.cache()
socialDFpq.cache()

# remove from cache
#socialDF.unpersist()
#socialDFpq.unpersist()
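`cache()` only marks a DataFrame for caching; the data is actually stored when the first action runs, which is why the first timed query afterwards can be slower. One way to pay that cost up front is to trigger an action immediately after caching. A minimal sketch, where `cache_and_materialize` is a hypothetical helper that relies only on the `cache` and `count` methods:

```python
def cache_and_materialize(df):
    """Mark df for caching, then run count() so the cache is filled right away."""
    cached = df.cache()    # lazy: nothing is stored yet
    rows = cached.count()  # an action that scans all partitions and fills the cache
    return cached, rows


# Possible use in the notebook (assumes socialDFpq from the cells above):
#   socialDFpq, n_rows = cache_and_materialize(socialDFpq)
```

With the cache filled this way, the timed queries that follow measure reads from the cache rather than the initial load.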

In [0]:
%%time
# GZIP Compressed CSV
socialDF.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

In [0]:
%%time
# Parquet file (dataframe cached)
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

###Project End