In [1]:
import os
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.enableHiveSupport().master("local [2]").getOrCreate()

In [3]:
graphPath = "/data/graphDFSample"

In [39]:
from pyspark.sql.functions import explode, collect_list, size, col, row_number, sort_array
from pyspark.sql import Window

<h3>Read the parquet file into DF</h3>

In [29]:
reversedGraph = sparkSession.read.parquet(graphPath)

In [22]:
reversedGraph.show(3)

+--------+--------------------+
|    user|             friends|
+--------+--------------------+
|22991438|[20699, 175973, 5...|
|37586597|[83616, 139192, 1...|
|56325000|[504270, 645333, ...|
+--------+--------------------+
only showing top 3 rows



In [23]:
reversedGraph.rdd.take(1)

[Row(user=22991438, friends=[20699, 175973, 533235, 584091, 610338, 652317, 840678, 1141589, 1155542, 1239091, 1261113, 1813794, 1815252, 1836438, 1859405, 1860435, 2367768, 2376721, 2460191, 2472708, 2490099, 2492701, 2543851, 2544247, 2554811, 2574091, 2574600, 2615946, 2675180, 2979511, 3039028, 3099044, 3154250, 3183942, 3219940, 3226424, 3227846, 3319312, 3486958, 3717976, 3726009, 3752079, 3759339, 3795687, 3811911, 3823029, 3833714, 3858043, 3874376, 3931989, 3937061, 4064631, 4223807, 4463433, 4479681, 4484282, 4506540, 4558935, 4573610, 4669468, 4709040, 4967644, 4977794, 5047458, 5081445, 5131859, 5133162, 5137845, 5170075, 5183106, 5226216, 5243853, 5247908, 5319361, 5338755, 5510830, 5754472, 5757073, 5807689, 5808282, 5878476, 5899824, 6024877, 6317398, 6345778, 6455669, 6488432, 6535763, 6570921, 6653476, 7103746, 7116529, 7121632, 7155565, 7186124, 7283715, 7497456, 7635033, 7638835, 7641472, 7664866, 7700256, 7727810, 7727974, 7736770, 7747311, 7759953, 7789379, 7973578

<h3>Step 1. Explode fliends into frend column</h3>

In [30]:
reversedGraph = reversedGraph.withColumn("friend", explode('friends'))

In [31]:
reversedGraph.show(3)

+--------+--------------------+------+
|    user|             friends|friend|
+--------+--------------------+------+
|22991438|[20699, 175973, 5...| 20699|
|22991438|[20699, 175973, 5...|175973|
|22991438|[20699, 175973, 5...|533235|
+--------+--------------------+------+
only showing top 3 rows



<h3>Step 2 and 3. Create DF of friends with all the users, which have a that friend.</h3><br>
<p>group by friend collecting users in the list</p> 

In [32]:
reversedGraph = reversedGraph.groupBy("friend").agg(collect_list("user").alias("users"))

In [33]:
reversedGraph.show(3)

+------+--------------------+
|friend|               users|
+------+--------------------+
|   148|[65051219, 146311...|
|  5518|          [58573511]|
|  9900|          [36844066]|
+------+--------------------+
only showing top 3 rows



<h3>add user count for each friend</h3>

In [35]:
reversedGraph = reversedGraph.withColumn("users_size", size("users"))

In [36]:
reversedGraph.show(3)

+------+--------------------+----------+
|friend|               users|users_size|
+------+--------------------+----------+
|   148|[65051219, 146311...|         4|
|  5518|          [58573511]|         1|
|  9900|          [36844066]|         1|
+------+--------------------+----------+
only showing top 3 rows



In [37]:
reversedGraph.rdd.take(3)

[Row(friend=148, users=[65051219, 14631101, 3195315, 14957568], users_size=4),
 Row(friend=5518, users=[58573511], users_size=1),
 Row(friend=9900, users=[36844066], users_size=1)]

<h3>Step 4. Sort the elements in the column “users” by the function sort_array</h3>

In [44]:
reversedGraph = reversedGraph.select(reversedGraph.friend, sort_array(reversedGraph.users), "users_size")

In [45]:
reversedGraph.show(3)

+------+-----------------------+----------+
|friend|sort_array(users, true)|users_size|
+------+-----------------------+----------+
|   148|   [3195315, 1463110...|         4|
|  5518|             [58573511]|         1|
|  9900|             [36844066]|         1|
+------+-----------------------+----------+
only showing top 3 rows



In [4]:
from pyspark.sql.functions import explode, collect_list, size, col, row_number
from pyspark.sql import Window

reversedGraph = sparkSession.read.parquet(graphPath) \
    .withColumn("friend", explode('friends')) \
    .groupBy("friend") \
    .agg(collect_list("user").alias("users")) \
    .withColumn("users_size", size("users")) \

In [9]:
reversedGraph.printSchema()

root
 |-- user: integer (nullable = true)
 |-- friends: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [6]:
window = Window.orderBy(col("users_size").desc())
    
top50 = reversedGraph.withColumn("row_number", row_number().over(window)) \
            .filter(col("row_number") < 50) \
            .select(col("friend"), col("users_size")) \
            .orderBy(col("users_size").desc()) \
            .collect()

In [7]:
for val in top50:
    print '%s %s' % val

9606655 244
62922315 241
1288836 240
36402159 239
36079654 239
40342046 235
24319760 234
34854364 234
45353567 233
28229916 231
16364918 230
52511791 229
549319 227
5137947 227
65079230 227
17636074 226
49067109 225
53106903 225
6570168 223
44621704 223
34850500 223
27338193 222
32810368 222
25606717 222
34201873 220
6147442 219
62386165 219
45239367 219
32821462 218
30234171 218
63649194 217
53826156 217
13813472 217
26158314 217
17679500 217
14394422 216
7153815 216
13062446 216
36039499 216
64373911 216
12890141 215
20291955 215
36757249 214
64856469 214
40043869 213
34071175 212
11768267 211
38750752 211
3295906 211
