In [1]:
%%init_spark
# Add the PostgreSQL JDBC driver jar to the Spark session; the JDBC export
# cells in this notebook reference org.postgresql.Driver.
launcher.jars = ["file:///home/jovyan/drivers/postgresql-42.2.10.jar"]
# Run Spark in local mode, using as many worker threads as there are cores.
launcher.master = "local[*]"
launcher.conf.spark.app.name = "Twitter social graph analysis"
# NOTE(review): with a local[*] master everything runs inside the driver JVM,
# so this executor setting probably has no effect — confirm before relying on it.
launcher.conf.spark.executor.cores = 8
# Memory given to the driver JVM.
launcher.driver_memory = '9g'

In [2]:
import org.apache.spark._
import org.apache.spark.rdd.RDD
// import classes required for using GraphX
import org.apache.spark.graphx._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions._

Intitializing Scala interpreter ...

Spark Web UI available at http://575ad6b14266:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1587548387477)
SparkSession available as 'spark'


import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions._


In [3]:
// Download followers from postgres into parquet
// spark.read
//   .format("jdbc")
//   .option("driver", "org.postgresql.Driver")
//   .option("url", "jdbc:postgresql://postgres:5432/twitter")
//   .option("dbtable", "followers")
//   .option("user", "crawler")
//   .option("password", "***")
//   .load().write.option("maxRecordsPerFile", 1000000).parquet("../data/followers_all.parquet")

In [4]:
// // LOAD USER DATA FROM PG TO PARQUET
// spark.read
//   .format("jdbc")
//   .option("driver", "org.postgresql.Driver")
//   .option("url", "jdbc:postgresql://postgres:5432/twitter")
//   .option("dbtable", """(SELECT u.id, u.screen_name, u.are_followers_downloaded,
//   COUNT(f.follower_id) as follower_count FROM users u
//   JOIN followers f ON f.user_id=u.id GROUP BY u.id) as t""")
//   .option("user", "crawler")
//   .option("password", "***")
//   .load().write.parquet("../data/user_data.parquet")

In [5]:
// // Convert csv followers subgraph to parquet

// val df_str = spark.read.csv("../data/subgraph_1197663692621590529_10000000.csv")
// .withColumnRenamed("_c0","user_id").withColumnRenamed("_c1", "follower_id")
// val df_to_write = df_str.select(
//     df_str("user_id").cast(LongType).as("user_id"), 
//     df_str("follower_id").cast(LongType).as("follower_id"))
// df_to_write.write.parquet("../data/followers_10M_connected.parquet")

In [3]:
// Per-user profile table previously exported to parquet.
val userData = spark.read
  .format("parquet")
  .load("../data/user_data.parquet")

userData: org.apache.spark.sql.DataFrame = [id: bigint, screen_name: string ... 2 more fields]


In [4]:
// Follower edge list (user_id, follower_id) with duplicate rows removed.
val followers = spark.read
  .format("parquet")
  .load("../data/followers_10M_connected.parquet")
  .distinct()

followers: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: bigint, follower_id: bigint]


In [5]:
// For every followed account, count how many follower rows it has in the
// de-duplicated edge list.
val uniqUsers = followers
  .groupBy("user_id")
  .agg(count(lit(1)).as("follower_appeared_count"))

// Attach the stored profile fields to each account that appears as a
// followed user in the edge list (inner join, so other profiles are dropped).
val userDataActual = userData
  .join(uniqUsers, userData("id") === uniqUsers("user_id"), "inner")
  .select(
    col("user_id"),
    col("screen_name"),
    col("are_followers_downloaded"),
    col("follower_count"),
    col("follower_appeared_count"))

uniqUsers: org.apache.spark.sql.DataFrame = [user_id: bigint, follower_appeared_count: bigint]
userDataActual: org.apache.spark.sql.DataFrame = [user_id: bigint, screen_name: string ... 3 more fields]


In [9]:
userDataActual.show(5)

+------------------+---------------+------------------------+--------------+-----------------------+
|           user_id|    screen_name|are_followers_downloaded|follower_count|follower_appeared_count|
+------------------+---------------+------------------------+--------------+-----------------------+
|728147518307991552| shuliang_zhang|                    true|           149|                    149|
|        1732140697|Carolinavivald1|                    true|           307|                    307|
|         259226865|      mbassam76|                   false|          5399|                   5399|
|856191093465124865|SegueEUsigoVCtb|                    true|           568|                    568|
|926190528575737856|      IveticAca|                    true|             1|                      1|
+------------------+---------------+------------------------+--------------+-----------------------+
only showing top 5 rows



In [10]:
userDataActual.count()

res4: Long = 5708


In [6]:
// Express each follow relation as a directed edge: follower (src) -> followed user (dst).
val edgesDf = followers.select(
  col("user_id").as("dst"),
  col("follower_id").as("src"))

edgesDf: org.apache.spark.sql.DataFrame = [dst: bigint, src: bigint]


In [7]:
// Convert the edge DataFrame into the (srcId, dstId) pair RDD that GraphX expects.
val edgesRDD = edgesDf
  .select("src", "dst")
  .rdd
  .map { row =>
    val srcId = row.getAs[Long]("src")
    val dstId = row.getAs[Long]("dst")
    (srcId, dstId)
  }

edgesRDD: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[12] at map at <console>:39


In [8]:
// Build the graph from the edge pairs; vertices are created from the edge
// endpoints and all share the placeholder attribute "nowhere".
val graph = Graph.fromEdgeTuples(
  rawEdges = edgesRDD,
  defaultValue = "nowhere")

graph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@b8eb426


In [9]:
// Graph vertices count
graph.vertices.count()

res0: Long = 5277556


In [10]:
// Graph edges count
graph.edges.count()

res1: Long = 7106158


# Pagerank

In [14]:
// Run PageRank until the ranks converge within the given tolerance.
// Takes about 30 minutes on this graph (~7 million edges).
val rankedGraph = graph.pageRank(tol = 0.0001)

rankedGraph: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@435d99eb


In [15]:
rankedGraph.vertices.take(10)

res5: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((2854236096,0.15000007733152665), (40968200,0.15000007733152665), (2761873296,0.15000007733152665), (1055688600,0.15000007733152665), (609840800,0.15000007733152665), (879358797868937218,0.15000007733152665), (2784717896,0.15000007733152665), (927295438474170369,0.15000007733152665), (894074822950400000,0.15000007733152665), (225862200,0.15000007733152665))


In [16]:
// Flatten the (vertexId, rank) pairs into a two-column DataFrame so the
// ranks can be joined with the user table.
val rankedGraphDf = rankedGraph.vertices.toDF("user_id", "pagerank")
rankedGraphDf.show(10)

+------------------+-------------------+
|           user_id|           pagerank|
+------------------+-------------------+
|        2854236096|0.15000007733152665|
|          40968200|0.15000007733152665|
|        2761873296|0.15000007733152665|
|        1055688600|0.15000007733152665|
|         609840800|0.15000007733152665|
|879358797868937218|0.15000007733152665|
|        2784717896|0.15000007733152665|
|927295438474170369|0.15000007733152665|
|894074822950400000|0.15000007733152665|
|         225862200|0.15000007733152665|
+------------------+-------------------+
only showing top 10 rows



rankedGraphDf: org.apache.spark.sql.DataFrame = [user_id: bigint, pagerank: double]


In [19]:
// Unique values of pagerank metrics
rankedGraphDf.select("pagerank").distinct().count()

res9: Long = 4473


In [17]:
// Attach the PageRank score to every known user profile (inner join on user_id).
val ranksByUsername = userDataActual.join(rankedGraphDf, Seq("user_id"), "inner")
ranksByUsername.show(10)

+---------+---------------+------------------------+--------------+-----------------------+-------------------+
|  user_id|    screen_name|are_followers_downloaded|follower_count|follower_appeared_count|           pagerank|
+---------+---------------+------------------------+--------------+-----------------------+-------------------+
|104366327|deepakdeepakdas|                    true|            36|                     36| 1.1360304345266532|
|158935649|        VJanett|                    true|            60|                     60|  7.322603206936806|
|220690950|     DoubleT852|                    true|            24|                     24|  3.114376605595821|
|224126627|      AleksAK16|                    true|             2|                      2|  0.405000208795122|
|243240673|   LaraBipolara|                    true|            61|                     61|  7.672503955507593|
|259226865|      mbassam76|                   false|          5399|                   5399|  619.6998035

ranksByUsername: org.apache.spark.sql.DataFrame = [user_id: bigint, screen_name: string ... 4 more fields]


In [18]:
// Show the 100 highest-ranked users among rows where
// are_followers_downloaded is true.
ranksByUsername
  .filter(col("are_followers_downloaded") === true)
  .orderBy(col("pagerank").desc)
  .select("screen_name", "follower_count", "follower_appeared_count", "pagerank")
  .show(100)

+---------------+--------------+-----------------------+------------------+
|    screen_name|follower_count|follower_appeared_count|          pagerank|
+---------------+--------------+-----------------------+------------------+
| ARTEM_KLYUSHIN|        813889|                 813889| 830395.9704494262|
| MusicIndustryU|        104814|                 104814| 34943.57321738785|
|        cassmlk|          5685|                   5685| 32222.82505016166|
|        bkbells|          2258|                   2258|31498.460945402785|
|   ShasiaDeonna|         12654|                  12654|28912.415676576675|
|     JBVoteArmy|         15619|                  15619|27847.881485641916|
|         i4e___|           518|                    518|19949.929392740338|
|       ATHer323|         15461|                  15461| 19919.06548982774|
|      geomisati|         14404|                  14404|19723.226918197597|
|  LelisSilveira|         28494|                  28494| 17025.55067342751|
|  cagetheco

# Connected components

In [37]:
val components = graph.connectedComponents()

components: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@49b04576


In [39]:
// Number of distinct component ids, i.e. the number of connected components.
components.vertices.values.distinct().count()

res12: Long = 1


In [38]:
// Describe, for a sample of five vertices, which connected component each
// one belongs to. leftJoin supplies the component id as an Option, so the
// missing case is handled explicitly instead of calling .get, which would
// throw NoSuchElementException for a vertex absent from components.vertices.
graph.vertices.leftJoin(components.vertices) {
    case (id, _, Some(comp)) => s"${id} is in component ${comp}"
    case (id, _, None)       => s"${id} is not assigned to any component"
}.take(5)

res11: Array[(org.apache.spark.graphx.VertexId, String)] = Array((4408653323,4408653323 is in component 13), (868201562656649216,868201562656649216 is in component 13), (4157346022,4157346022 is in component 13), (1540571352,1540571352 is in component 13), (2656663204,2656663204 is in component 13))


# Community detection

In [11]:
import org.apache.spark.graphx.lib.LabelPropagation

import org.apache.spark.graphx.lib.LabelPropagation


In [31]:
// 40 MINS

In [27]:
// Label-propagation community detection, limited to 5 supersteps.
val communities = LabelPropagation.run(graph, maxSteps = 5)

communities: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@3b498975


In [34]:
communities.vertices.count()

res18: Long = 5277556


In [36]:
// Number of distinct community labels assigned by label propagation.
communities.vertices.values.distinct().count()

res20: Long = 7417
