## Assignment 3 - Q2: Analyzing Social Networks

In [0]:
# Dataset Information
# Dataset Title: Social circles: Twitter
# Dataset Source: Stanford University
# Dataset Link: https://snap.stanford.edu/data/ego-Twitter.html

In [0]:
# Supporting Libraries in Cluster
# Maven coordinates of the libraries attached to the cluster. The text is bound to a
# variable (instead of being left as a bare string expression) so the cell does not
# echo it as output every time it runs.
clusterLibraries = """
com.databricks:spark-xml_2.12:0.15.0 --- Maven
com.microsoft.azure.kusto:spark-kusto-connector:2.0.0 --- Maven
graphframes:graphframes:0.8.3-spark3.5-s_2.12 --- Maven
"""

Out[45]: '\ncom.databricks:spark-xml_2.12:0.15.0 --- Maven\ncom.microsoft.azure.kusto:spark-kusto-connector:2.0.0 --- Maven\ngraphframes:graphframes:0.8.3-spark3.5-s_2.12 --- Maven\n'

In [0]:
%sh  /databricks/python3/bin/pip install graphframes==0.6
# Pinned so re-running the notebook installs the same package version; keep it
# compatible with the graphframes Maven library attached to the cluster.

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.


In [0]:
#Importing all the libraries

from graphframes import *
from pyspark.sql.functions import desc
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

In [0]:
# Location of the input edge list (read below as a space-delimited file with a
# header row) and the base folder under which each query writes its CSV result
inputPath, outputPath = (
    "dbfs:/FileStore/twitter/twitter_c.csv",
    "/FileStore/output/",
)


In [0]:
# Initialize (or reuse) a Spark session
notebookTitle = "Social Network Analysis - Twitter"
spark = SparkSession.builder.appName(notebookTitle).getOrCreate()

# Load the edge list: the file has a header row and space-separated columns.
# An explicit schema replaces inferSchema, which would cost an extra full pass over
# the input just to guess the column types; node ids are read as LONG.
twitterEdgesRaw = (
    spark.read
    .option("header", "true")
    .option("delimiter", " ")
    .schema("SourceNode LONG, DestinationNode LONG")
    .csv(inputPath)
)

# Display the loaded DataFrame
display(twitterEdgesRaw)

# Rename columns to the src/dst names GraphFrame requires for its edge DataFrame.
# A separate name keeps the raw frame displayed above intact.
twitterEdges = twitterEdgesRaw.select(col("SourceNode").alias("src"), col("DestinationNode").alias("dst"))


SourceNode,DestinationNode
214328887,34428380
17116707,28465635
380580781,18996905
221036078,153460275
107830991,17868918
151338729,222261763
19705747,34428380
222261763,88323281
19933035,149538028
158419434,17434613


In [0]:
# A vertex is any node that appears at either end of an edge: stack the source and
# destination ids into a single "id" column, then drop duplicates
twitterVertices = (
    twitterEdges.selectExpr("src AS id")
    .union(twitterEdges.selectExpr("dst AS id"))
    .distinct()
)

display(twitterVertices)


id
451250774
7453822
59877557
8207762
1048661
10733192
525208028
219990849
110756260
197134784


In [0]:
# Create a GraphFrame object using the vertices and edges
# Assemble the graph: vertices carry an "id" column, edges carry "src"/"dst"
twitterGraph = GraphFrame(v=twitterVertices, e=twitterEdges)

A) Find the top 5 nodes with the highest out-degree and report the number of outgoing edges for each.

In [0]:
# Out-degree of every vertex, highest first; ties are broken by id so the
# top 5 is deterministic across runs
outDegreesSorted = twitterGraph.outDegrees.orderBy(desc("outDegree"), "id")

# Collect only the top 5 rows to the driver
topOutDegree = outDegreesSorted.take(5)

# Rebuild a DataFrame with the source schema: avoids re-inferring types from the
# collected Rows and still works if fewer than 5 (or zero) rows come back
topOutDegreeDF = spark.createDataFrame(topOutDegree, schema=outDegreesSorted.schema)

# Save the out-degree results as a single CSV file (one partition -> one part file)
topOutDegreeDF.coalesce(1).write.mode("overwrite").option("header", "true").csv(outputPath + "QueryResults/QueryA")

# Display the top 5 vertices by out-degree
display(topOutDegreeDF)


id,outDegree
3359851,3373
59804598,2467
7860742,2458
18776017,2272
5442012,2204


B) Find the top 5 nodes with the highest in-degree and report the number of incoming edges for each.

In [0]:
# In-degree of every vertex, highest first; ties are broken by id so the
# top 5 is deterministic across runs
inDegreesSorted = twitterGraph.inDegrees.orderBy(desc("inDegree"), "id")

# Collect only the top 5 rows to the driver
topInDegree = inDegreesSorted.take(5)

# Rebuild a DataFrame with the source schema: avoids re-inferring types from the
# collected Rows and still works if fewer than 5 (or zero) rows come back
topInDegreeDF = spark.createDataFrame(topInDegree, schema=inDegreesSorted.schema)

# Save the in-degree results as a single CSV file (one partition -> one part file)
topInDegreeDF.coalesce(1).write.mode("overwrite").option("header", "true").csv(outputPath + "QueryResults/QueryB")

# Display the top 5 vertices by in-degree
display(topInDegreeDF)


id,inDegree
40981798,8660
43003845,7700
22462180,7623
34428380,7558
115485051,4798


C) Calculate PageRank for each of the nodes and output the top 5 nodes with the highest PageRank values.

In [0]:
# Perform PageRank (reset probability 0.15, 10 iterations)
pageRankResults = twitterGraph.pageRank(resetProbability=0.15, maxIter=10)

# Sort vertices by PageRank score, highest first (ties broken by id). Vertex ids are
# already unique, so no distinct() is needed; a distinct() would also discard any
# ordering applied before it.
sortedPageRank = pageRankResults.vertices.select("id", "pagerank").orderBy(desc("pagerank"), "id")

# Collect the top 5 vertices with the highest PageRank scores
topPageRank = sortedPageRank.take(5)

# Rebuild a DataFrame with the source schema: avoids re-inferring types from the
# collected Rows and still works if fewer than 5 (or zero) rows come back
topPageRankDF = spark.createDataFrame(topPageRank, schema=sortedPageRank.schema)

# Save the PageRank results as a single CSV file (one partition -> one part file)
topPageRankDF.coalesce(1).write.mode("overwrite").option("header", "true").csv(outputPath + "QueryResults/QueryC")

# Display the top 5 vertices by PageRank score
display(topPageRankDF)



id,pagerank
115485051,885.5551611027488
116485573,764.8054731886077
813286,215.89732089206936
40981798,210.165125651054
11348282,189.8209454115426


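D) Run the connected components algorithm on the graph and find the top 5 components with the largest number of nodes.

Note: to keep the computation tractable, the code below runs the algorithm on a random 10% sample of the edges (all vertices are retained), so the component sizes are approximate.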

In [0]:
# The default connectedComponents() algorithm needs a Spark checkpoint directory
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

# NOTE: components are computed on a random sample of the edges (all vertices are
# kept), so component sizes are approximate. A fixed seed makes the sample, and
# therefore the results, repeatable across runs.
edgeSampleFraction = 0.1
edgeSampleSeed = 42
sampledEdges = twitterEdges.sample(withReplacement=False, fraction=edgeSampleFraction, seed=edgeSampleSeed)
sampledSubGraph = GraphFrame(twitterVertices, sampledEdges)

# Compute the connected components in the subgraph
connectedComponentsResults = sampledSubGraph.connectedComponents()

# Count the vertices in each component, largest first (ties broken by component id)
componentSizes = connectedComponentsResults.groupBy("component").count().orderBy(desc("count"), "component")
largestComponents = componentSizes.take(5)

# Rebuild a DataFrame with the source schema: avoids re-inferring types from the
# collected Rows and still works if fewer than 5 (or zero) rows come back
largestComponentsDF = spark.createDataFrame(largestComponents, schema=componentSizes.schema)

# Save the connected components results as a single CSV file
largestComponentsDF.coalesce(1).write.mode("overwrite").option("header", "true").csv(outputPath + "QueryResults/QueryD")

# Display the top 5 largest connected components
display(largestComponentsDF)

component,count
12,56945
7583152,38
15648827,38
660633,33
15122345,20


E) Run the triangle count algorithm on each vertex and output the top 5 vertices with the largest triangle counts.

In [0]:
# Count the triangles passing through each vertex (GraphFrames treats edges as
# undirected for this algorithm)
triangleCountResults = twitterGraph.triangleCount()

# Sort by triangle count, highest first (ties broken by id). Each vertex appears once
# in the result, so no distinct() is needed.
sortedTriangleCounts = triangleCountResults.select("id", "count").orderBy(desc("count"), "id")
topTriangleCounts = sortedTriangleCounts.take(5)

# Rebuild a DataFrame with the source schema: avoids re-inferring types from the
# collected Rows and still works if fewer than 5 (or zero) rows come back
topTriangleCountsDF = spark.createDataFrame(topTriangleCounts, schema=sortedTriangleCounts.schema)

# Save the triangle count results as a single CSV file
topTriangleCountsDF.coalesce(1).write.mode("overwrite").option("header", "true").csv(outputPath + "QueryResults/QueryE")

# Display the top 5 vertices with the highest triangle counts
display(topTriangleCountsDF)



id,count
40981798,96815
43003845,87551
22462180,86930
34428380,86510
3359851,69416
