In [13]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark
!pip -q install findspark pyspark graphframes
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"

In [14]:
import findspark

# findspark is initialised before pyspark is imported; keep this order.
findspark.init()
findspark.find()  # return value is not used
from pyspark.sql import SparkSession

# Single local session for the whole notebook, with the Spark UI on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [15]:
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType, ArrayType
from pyspark.sql.functions import monotonically_increasing_id

from graphframes import GraphFrame

In [16]:
from google.colab import drive
drive.mount('/content/gdrive')
!ls "/content/gdrive/My Drive/Bigdata/final-exercises/data"
USERS_PATH = "/content/gdrive/My Drive/Bigdata/final-exercises/data/users.txt"
FOLLOWERS_PATH = "/content/gdrive/My Drive/Bigdata/final-exercises/data/followers.txt"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
followers.txt  foodmart.csv  iris.csv  movies.json  mushrooms.csv  users.txt


In [17]:
# Read the comma-delimited users file with inferred column types, then give the
# positional columns (_c0, _c1, _c2) the names used by the rest of the notebook.
# GraphFrame vertices are keyed by the "id" column.
users_df = (
    spark.read.format("csv")
    .option("delimiter", ",")
    .option("inferSchema", True)
    .load(USERS_PATH)
    .withColumnRenamed('_c0', 'id')
    .withColumnRenamed('_c1', 'account_name')
    .withColumnRenamed('_c2', 'username')
)
users_df.show()

+---+-------------+---------------+
| id| account_name|       username|
+---+-------------+---------------+
|  1|  BarackObama|   Barack Obama|
|  2|     ladygaga|Goddess of Love|
|  3|      jeresig|     John Resig|
|  4| justinbieber|  Justin Bieber|
|  6|matei_zaharia|  Matei Zaharia|
|  7|      odersky| Martin Odersky|
|  8|      anonsys|           null|
+---+-------------+---------------+



In [18]:
# Read the space-delimited followers file with inferred column types and name the
# two positional columns "src" and "dst", the edge columns passed to GraphFrame below.
followers_df = (
    spark.read.format("csv")
    .option("delimiter", " ")
    .option("inferSchema", True)
    .load(FOLLOWERS_PATH)
    .withColumnRenamed('_c0', 'src')
    .withColumnRenamed('_c1', 'dst')
)
followers_df.show()

+---+---+
|src|dst|
+---+---+
|  2|  1|
|  4|  1|
|  1|  2|
|  6|  3|
|  7|  3|
|  7|  6|
|  6|  7|
|  3|  7|
+---+---+



In [19]:
# Build the graph: users are the vertices, follower pairs are the edges.
graph = GraphFrame(users_df, followers_df)

# Print the graph's vertex and edge counts, then the counts of the DataFrames
# they were built from, in that order, so the two pairs can be compared.
for frame in (graph.vertices, graph.edges, users_df, followers_df):
    print(frame.count())

7
8
7
8


In [20]:
# Display the graph's vertices and then its edges without truncating cell values.
for frame in (graph.vertices, graph.edges):
    frame.show(truncate=False)

+---+-------------+---------------+
|id |account_name |username       |
+---+-------------+---------------+
|1  |BarackObama  |Barack Obama   |
|2  |ladygaga     |Goddess of Love|
|3  |jeresig      |John Resig     |
|4  |justinbieber |Justin Bieber  |
|6  |matei_zaharia|Matei Zaharia  |
|7  |odersky      |Martin Odersky |
|8  |anonsys      |null           |
+---+-------------+---------------+

+---+---+
|src|dst|
+---+---+
|2  |1  |
|4  |1  |
|1  |2  |
|6  |3  |
|7  |3  |
|7  |6  |
|6  |7  |
|3  |7  |
+---+---+



In [21]:
# Run PageRank for a fixed 10 iterations with reset probability 0.15.
ranks = graph.pageRank(resetProbability=0.15, maxIter=10)

# Edges of the result carry a "weight" column.
ranks.edges.show()

# Vertices ranked from highest to lowest PageRank score, showing only id and score.
ranks.vertices.orderBy(f.desc("pagerank")).select("id", "pagerank").show()

+---+---+------+
|src|dst|weight|
+---+---+------+
|  1|  2|   1.0|
|  2|  1|   1.0|
|  3|  7|   1.0|
|  4|  1|   1.0|
|  6|  3|   0.5|
|  6|  7|   0.5|
|  7|  3|   0.5|
|  7|  6|   0.5|
+---+---+------+

+---+-------------------+
| id|           pagerank|
+---+-------------------+
|  2| 1.6856869000946866|
|  1| 1.5582155389297032|
|  7|  1.477612668780208|
|  3| 1.1382113821138213|
|  6| 0.7988100954474346|
|  4|0.17073170731707324|
|  8|0.17073170731707324|
+---+-------------------+



In [26]:
# A checkpoint directory is configured before connectedComponents() is called.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

minGraph = GraphFrame(users_df, followers_df)

# Label each vertex with its connected component.
# NOTE(review): the filter discards every row whose component label is 0; no such
# label appears for this dataset — confirm whether the filter is intentional.
connected_graph = minGraph.connectedComponents().filter("component != 0")
connected_graph.show()

# Number of vertices in each component.
connected_graph.groupBy("component").count().show()

+---+-------------+---------------+---------+
| id| account_name|       username|component|
+---+-------------+---------------+---------+
|  1|  BarackObama|   Barack Obama|        1|
|  2|     ladygaga|Goddess of Love|        1|
|  3|      jeresig|     John Resig|        3|
|  4| justinbieber|  Justin Bieber|        1|
|  6|matei_zaharia|  Matei Zaharia|        3|
|  7|      odersky| Martin Odersky|        3|
|  8|      anonsys|           null|        8|
+---+-------------+---------------+---------+

+---------+-----+
|component|count|
+---------+-----+
|        1|    3|
|        3|    3|
|        8|    1|
+---------+-----+



In [27]:
# Label each vertex with its strongly connected component, capped at 3 iterations.
scc = minGraph.stronglyConnectedComponents(maxIter=3)
scc.show()

# Number of vertices in each strongly connected component.
scc.groupBy(f.col("component")).count().show()

+---+-------------+---------------+---------+
| id| account_name|       username|component|
+---+-------------+---------------+---------+
|  4| justinbieber|  Justin Bieber|        4|
|  1|  BarackObama|   Barack Obama|        1|
|  6|matei_zaharia|  Matei Zaharia|        3|
|  3|      jeresig|     John Resig|        3|
|  7|      odersky| Martin Odersky|        3|
|  8|      anonsys|           null|        8|
|  2|     ladygaga|Goddess of Love|        1|
+---+-------------+---------------+---------+

+---------+-----+
|component|count|
+---------+-----+
|        1|    2|
|        3|    3|
|        8|    1|
|        4|    1|
+---------+-----+

