# **Social Network Analytics**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F  
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame 
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

In [2]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df_venmo = spark.read.parquet('VenmoSample.snappy.parquet')
df_venmo.printSchema()
df_venmo.show()

root
 |-- user1: integer (nullable = true)
 |-- user2: integer (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- is_business: boolean (nullable = true)
 |-- story_id: string (nullable = true)

+--------+-------+----------------+-------------------+--------------------+-----------+--------------------+
|   user1|  user2|transaction_type|           datetime|         description|is_business|            story_id|
+--------+-------+----------------+-------------------+--------------------+-----------+--------------------+
| 1218774|1528945|         payment|2015-11-27 10:48:19|                Uber|      false|5657c473cd03c9af2...|
| 5109483|4782303|         payment|2015-06-17 11:37:04|              Costco|      false|5580f9702b64f70ab...|
| 4322148|3392963|         payment|2015-06-19 07:05:31|        Sweaty balls|      false|55835ccb1a624b14a...|
|  469894|1333620|          charge|2016-

In [5]:
df_venmo.createOrReplaceTempView('df_venmo')

**Task_5: Find a user’s friends and friends of friends (Friend definition: A user’s friend is someone who has transacted with the user, either sending money to the user or receiving money from the user). Describe your algorithm and calculate its computational complexity. Can you do it better?**

In [9]:
# Users' friends
user_friends = spark.sql('SELECT user1 AS user, user2 AS friend, datetime\
                          FROM (SELECT user1, user2, datetime FROM df_venmo\
                                UNION\
                                SELECT user2, user1, datetime FROM df_venmo)\
                          ORDER BY user, datetime')

user_friends.show()

user_friends.createOrReplaceTempView('user_friends')

+----+-------+-------------------+
|user| friend|           datetime|
+----+-------+-------------------+
|   2|    220|2012-11-23 06:03:42|
|   2|     43|2016-04-09 09:29:31|
|   2|     43|2016-09-07 07:53:16|
|   2| 191142|2016-09-26 09:56:55|
|   3|     43|2016-06-27 01:14:37|
|   3| 263437|2016-07-22 21:16:37|
|   3|     52|2016-09-22 15:30:09|
|   3|2382556|2016-10-06 10:49:45|
|   3|2382556|2016-10-07 08:50:23|
|   3|1079020|2016-10-07 23:37:56|
|   3|1204190|2016-10-09 01:56:24|
|   3|7854140|2016-10-09 03:36:13|
|   3| 567957|2016-10-29 09:28:07|
|   4| 122744|2012-12-03 03:35:53|
|   4| 125527|2012-12-10 10:27:55|
|   4| 125527|2012-12-15 05:51:12|
|   4| 125755|2013-01-04 10:20:36|
|   4| 968271|2014-02-04 06:51:33|
|   4| 187560|2015-06-17 09:23:30|
|   4|9271982|2016-03-03 12:45:57|
+----+-------+-------------------+
only showing top 20 rows



In [10]:
# Total number of users
spark.sql('SELECT COUNT(DISTINCT user)\
           FROM user_friends').show()

+--------------------+
|count(DISTINCT user)|
+--------------------+
|             3018657|
+--------------------+



In [12]:
# Users' friends of friends
user_fof = spark.sql('SELECT t1.user, t1.friend, t2.friend AS friend_of_friend\
                      FROM user_friends AS t1 INNER JOIN user_friends AS t2 ON t1.friend = t2.user\
                      WHERE t1.user <> t2.friend\
                      ORDER BY t1.user')

user_fof.show()

user_fof.createOrReplaceTempView('user_fof')

+----+------+----------------+
|user|friend|friend_of_friend|
+----+------+----------------+
|   2|    43|               3|
|   2|    43|           20639|
|   2|    43|              19|
|   2|    43|           82697|
|   2|    43|          316862|
|   2|    43|          316862|
|   2|    43|           47104|
|   2|    43|           47104|
|   2|    43|              96|
|   2|    43|           47104|
|   2|    43|             629|
|   2|    43|              96|
|   2|    43|           47104|
|   2|    43|              10|
|   2|    43|           47104|
|   2|    43|           55895|
|   2|    43|          307603|
|   2|    43|           49778|
|   2|    43|          297967|
|   2|    43|            3565|
+----+------+----------------+
only showing top 20 rows



**Task_6: Now, that we have the list of each user’s friends and friends of friends, we are in position to calculate many social network variables. Use the dynamic analysis from before, and calculate the following social network metrics across a user’s lifetime in Venmo (from 0 up to 12 months).**

1. Number of friends and number of friends of friends

In [13]:
# Find users' first transaction dates
user_tran_0 = spark.sql('SELECT user, MIN(datetime) AS datetime_0\
                         FROM user_friends\
                         GROUP BY user\
                         ORDER BY user')

user_tran_0.show()

user_tran_0.createOrReplaceTempView('user_tran_0')

+----+-------------------+
|user|         datetime_0|
+----+-------------------+
|   2|2012-11-23 06:03:42|
|   3|2016-06-27 01:14:37|
|   4|2012-12-03 03:35:53|
|   6|2014-06-28 00:18:16|
|   8|2015-08-11 02:08:47|
|   9|2012-06-28 04:28:32|
|  10|2012-11-25 09:20:39|
|  11|2012-05-15 22:08:58|
|  12|2012-09-01 03:40:16|
|  13|2012-09-28 06:11:39|
|  16|2014-07-12 04:20:53|
|  18|2016-02-21 07:17:22|
|  19|2012-05-26 02:19:04|
|  24|2013-12-20 10:31:39|
|  28|2014-05-22 11:25:51|
|  29|2014-04-25 23:37:38|
|  31|2013-05-19 11:19:15|
|  32|2012-11-07 01:29:50|
|  34|2014-07-21 23:31:20|
|  42|2012-08-25 00:30:23|
+----+-------------------+
only showing top 20 rows



In [14]:
# Assign lifetime_indicator to each transaction
user_trans = spark.sql('SELECT user_friends.user, user_friends.friend, datetime, datetime_0, ceil(months_between(datetime, datetime_0)) AS lifetime_indicator\
                        FROM user_friends INNER JOIN user_tran_0 ON user_friends.user = user_tran_0.user\
                        ORDER BY user_friends.user, datetime')

user_trans.show()

user_trans.createOrReplaceTempView('user_trans')

+----+-------+-------------------+-------------------+------------------+
|user| friend|           datetime|         datetime_0|lifetime_indicator|
+----+-------+-------------------+-------------------+------------------+
|   2|    220|2012-11-23 06:03:42|2012-11-23 06:03:42|                 0|
|   2|     43|2016-04-09 09:29:31|2012-11-23 06:03:42|                41|
|   2|     43|2016-09-07 07:53:16|2012-11-23 06:03:42|                46|
|   2| 191142|2016-09-26 09:56:55|2012-11-23 06:03:42|                47|
|   3|     43|2016-06-27 01:14:37|2016-06-27 01:14:37|                 0|
|   3| 263437|2016-07-22 21:16:37|2016-06-27 01:14:37|                 1|
|   3|     52|2016-09-22 15:30:09|2016-06-27 01:14:37|                 3|
|   3|2382556|2016-10-06 10:49:45|2016-06-27 01:14:37|                 4|
|   3|2382556|2016-10-07 08:50:23|2016-06-27 01:14:37|                 4|
|   3|1079020|2016-10-07 23:37:56|2016-06-27 01:14:37|                 4|
|   3|1204190|2016-10-09 01:56:24|2016

In [15]:
# Keep the transactions with a lifetime_indicator <= 12
user_trans = spark.sql('SELECT *\
                        FROM user_trans\
                        WHERE lifetime_indicator <= 12\
                        ORDER BY user, lifetime_indicator')

user_trans.show()

user_trans.createOrReplaceTempView('user_trans')

+----+-------+-------------------+-------------------+------------------+
|user| friend|           datetime|         datetime_0|lifetime_indicator|
+----+-------+-------------------+-------------------+------------------+
|   2|    220|2012-11-23 06:03:42|2012-11-23 06:03:42|                 0|
|   3|     43|2016-06-27 01:14:37|2016-06-27 01:14:37|                 0|
|   3| 263437|2016-07-22 21:16:37|2016-06-27 01:14:37|                 1|
|   3|     52|2016-09-22 15:30:09|2016-06-27 01:14:37|                 3|
|   3|1204190|2016-10-09 01:56:24|2016-06-27 01:14:37|                 4|
|   3|7854140|2016-10-09 03:36:13|2016-06-27 01:14:37|                 4|
|   3|2382556|2016-10-06 10:49:45|2016-06-27 01:14:37|                 4|
|   3|1079020|2016-10-07 23:37:56|2016-06-27 01:14:37|                 4|
|   3|2382556|2016-10-07 08:50:23|2016-06-27 01:14:37|                 4|
|   3| 567957|2016-10-29 09:28:07|2016-06-27 01:14:37|                 5|
|   4| 122744|2012-12-03 03:35:53|2012

In [16]:
# Find number of friends using dynamic metrics
user_dynamic_friends = spark.sql('SELECT user, COUNT(DISTINCT friend) AS friend_count, lifetime_indicator\
                                  FROM user_trans\
                                  GROUP BY user, lifetime_indicator\
                                  ORDER BY user, lifetime_indicator')

user_dynamic_friends.show()

user_dynamic_friends.createOrReplaceTempView('user_dynamic_friends')

+----+------------+------------------+
|user|friend_count|lifetime_indicator|
+----+------------+------------------+
|   2|           1|                 0|
|   3|           1|                 0|
|   3|           1|                 1|
|   3|           1|                 3|
|   3|           4|                 4|
|   3|           1|                 5|
|   4|           1|                 0|
|   4|           1|                 1|
|   4|           1|                 2|
|   6|           1|                 0|
|   6|           1|                10|
|   8|           1|                 0|
|   8|           1|                 2|
|   8|           1|                 6|
|   8|           1|                 7|
|   8|           1|                 9|
|   8|           1|                10|
|   9|           1|                 0|
|   9|           1|                 2|
|   9|           1|                 5|
+----+------------+------------------+
only showing top 20 rows



In [17]:
# Add all months to the table
period = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
user_trans_period = user_tran_0.withColumn('period', F.array([F.lit(x) for x in period]))
user_trans_period = user_trans_period.withColumn('period', F.explode(col('period')))

user_trans_period.createOrReplaceTempView('user_trans_period')

user_dynamic_friends = spark.sql('SELECT user, lifetime_indicator, SUM(friend_count) AS friend_count\
                                  FROM (SELECT user, lifetime_indicator, friend_count FROM user_dynamic_friends\
                                        UNION\
                                        SELECT user, period, 0 FROM user_trans_period)\
                                  GROUP BY user, lifetime_indicator\
                                  ORDER BY user, lifetime_indicator')

user_dynamic_friends.show()

user_dynamic_friends.createOrReplaceTempView('user_dynamic_friends')

+----+------------------+------------+
|user|lifetime_indicator|friend_count|
+----+------------------+------------+
|   2|                 0|           1|
|   2|                 1|           0|
|   2|                 2|           0|
|   2|                 3|           0|
|   2|                 4|           0|
|   2|                 5|           0|
|   2|                 6|           0|
|   2|                 7|           0|
|   2|                 8|           0|
|   2|                 9|           0|
|   2|                10|           0|
|   2|                11|           0|
|   2|                12|           0|
|   3|                 0|           1|
|   3|                 1|           1|
|   3|                 2|           0|
|   3|                 3|           1|
|   3|                 4|           4|
|   3|                 5|           1|
|   3|                 6|           0|
+----+------------------+------------+
only showing top 20 rows



In [18]:
# Add cum friend count
user_dynamic_friends = spark.sql('SELECT user, lifetime_indicator, friend_count, SUM(friend_count) OVER (PARTITION BY user ORDER BY lifetime_indicator) AS cum_friend_count\
                                  FROM user_dynamic_friends\
                                  ORDER BY user, lifetime_indicator')

user_dynamic_friends.show()

user_dynamic_friends.createOrReplaceTempView('user_dynamic_friends')

+----+------------------+------------+----------------+
|user|lifetime_indicator|friend_count|cum_friend_count|
+----+------------------+------------+----------------+
|   2|                 0|           1|               1|
|   2|                 1|           0|               1|
|   2|                 2|           0|               1|
|   2|                 3|           0|               1|
|   2|                 4|           0|               1|
|   2|                 5|           0|               1|
|   2|                 6|           0|               1|
|   2|                 7|           0|               1|
|   2|                 8|           0|               1|
|   2|                 9|           0|               1|
|   2|                10|           0|               1|
|   2|                11|           0|               1|
|   2|                12|           0|               1|
|   3|                 0|           1|               1|
|   3|                 1|           1|          

In [46]:
# Find friends of friends
user_dynamic_fof_details = spark.sql('SELECT t1.user, t1.friend, t1.datetime AS datetime_friend, t1.lifetime_indicator, t2.friend AS friend_of_friend, t2.datetime as datetime_fof\
                                      FROM user_trans AS t1 INNER JOIN user_trans AS t2 ON t1.friend = t2.user AND t1.user <> t2.friend AND t1.datetime > t2.datetime\
                                      ORDER BY t1.user, t1.lifetime_indicator')

user_dynamic_fof_details.show()

user_dynamic_fof_details.createOrReplaceTempView('user_dynamic_fof_details')

+----+------+-------------------+------------------+----------------+-------------------+
|user|friend|    datetime_friend|lifetime_indicator|friend_of_friend|       datetime_fof|
+----+------+-------------------+------------------+----------------+-------------------+
|   3|    43|2016-06-27 01:14:37|                 0|           47104|2013-03-26 11:18:15|
|   3|    43|2016-06-27 01:14:37|                 0|           47104|2012-06-20 19:06:21|
|   3|    43|2016-06-27 01:14:37|                 0|           47104|2012-11-29 01:43:46|
|   3|    43|2016-06-27 01:14:37|                 0|              10|2012-12-31 12:44:12|
|   3|    43|2016-06-27 01:14:37|                 0|              10|2013-01-01 02:26:16|
|   3|    43|2016-06-27 01:14:37|                 0|           47104|2012-05-30 08:29:25|
|   3|    43|2016-06-27 01:14:37|                 0|           47104|2013-03-31 05:24:37|
|   3|    43|2016-06-27 01:14:37|                 0|           49778|2013-03-07 07:46:55|
|   3|    

In [62]:
# Find number of friends of friends using dynamic metrics
user_dynamic_fof = spark.sql('SELECT user, lifetime_indicator, SUM(fof_count) AS fof_count\
                              FROM (SELECT user, lifetime_indicator, COUNT(DISTINCT friend_of_friend) AS fof_count\
                                    FROM user_dynamic_fof_details\
                                    GROUP BY user, lifetime_indicator\
                                    UNION\
                                    SELECT user, period, 0 FROM user_trans_period)\
                              GROUP BY user, lifetime_indicator\
                              ORDER BY user, lifetime_indicator')

user_dynamic_fof.show()

user_dynamic_fof.createOrReplaceTempView('user_dynamic_fof')

+----+------------------+---------+
|user|lifetime_indicator|fof_count|
+----+------------------+---------+
|   2|                 0|        0|
|   2|                 1|        0|
|   2|                 2|        0|
|   2|                 3|        0|
|   2|                 4|        0|
|   2|                 5|        0|
|   2|                 6|        0|
|   2|                 7|        0|
|   2|                 8|        0|
|   2|                 9|        0|
|   2|                10|        0|
|   2|                11|        0|
|   2|                12|        0|
|   3|                 0|        6|
|   3|                 1|        8|
|   3|                 2|        0|
|   3|                 3|        1|
|   3|                 4|        6|
|   3|                 5|        8|
|   3|                 6|        0|
+----+------------------+---------+
only showing top 20 rows



In [63]:
# Add cum number of friends of friends
user_dynamic_fof_cum = spark.sql('SELECT *, SUM(fof_count) OVER (PARTITION BY user ORDER BY lifetime_indicator) AS cum_fof_count\
                              FROM user_dynamic_fof\
                              ORDER BY user, lifetime_indicator')
user_dynamic_fof_cum.show()
user_dynamic_fof_cum.createOrReplaceTempView('user_dynamic_fof_cum')

+----+------------------+---------+-------------+
|user|lifetime_indicator|fof_count|cum_fof_count|
+----+------------------+---------+-------------+
|   2|                 0|        0|            0|
|   2|                 1|        0|            0|
|   2|                 2|        0|            0|
|   2|                 3|        0|            0|
|   2|                 4|        0|            0|
|   2|                 5|        0|            0|
|   2|                 6|        0|            0|
|   2|                 7|        0|            0|
|   2|                 8|        0|            0|
|   2|                 9|        0|            0|
|   2|                10|        0|            0|
|   2|                11|        0|            0|
|   2|                12|        0|            0|
|   3|                 0|        6|            6|
|   3|                 1|        8|           14|
|   3|                 2|        0|           14|
|   3|                 3|        1|           15|


2. Clustering coefficient of a user's network

In [16]:
user_dynamic_edges = spark.sql('SELECT *, (cum_friend_count * (cum_friend_count - 1)) / 2 AS edges\
                                FROM user_dynamic_friends')

user_dynamic_edges.show(50)

user_dynamic_edges.createOrReplaceTempView('user_dynamic_edges')

+----+------------------+------------+----------------+-----+
|user|lifetime_indicator|friend_count|cum_friend_count|edges|
+----+------------------+------------+----------------+-----+
|2866|                 0|           1|               1|  0.0|
|2866|                 1|           0|               1|  0.0|
|2866|                 2|           0|               1|  0.0|
|2866|                 3|           0|               1|  0.0|
|2866|                 4|           0|               1|  0.0|
|2866|                 5|           0|               1|  0.0|
|2866|                 6|           0|               1|  0.0|
|2866|                 7|           0|               1|  0.0|
|2866|                 8|           0|               1|  0.0|
|2866|                 9|           0|               1|  0.0|
|2866|                10|           0|               1|  0.0|
|2866|                11|           0|               1|  0.0|
|2866|                12|           0|               1|  0.0|
|3918|  

In [47]:
temp_1 = spark.sql('SELECT t2.user, t2.lifetime_indicator, t2.friend, t2.friend_of_friend\
                    FROM user_dynamic_fof_details AS t1 INNER JOIN user_dynamic_fof_details AS t2 ON t1.user = t2.user AND t1.friend = t2.friend_of_friend')

temp_1.show()

temp_1.createOrReplaceTempView('temp_1')

+-----+------------------+------+----------------+
| user|lifetime_indicator|friend|friend_of_friend|
+-----+------------------+------+----------------+
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|52805|                10| 59891|          124198|
|54032|                12|854731|         2275553|
|54032|                12|854731|         2275553|
|54032|                12|854731|         2275553|
|54032|                12|854731|         2275553|
|63348|                 6|134481|           87240|
|63348|                 6|134481|           87240|
|67259|                10|145279|           57236|
|67259|                10|145279|           57236|
|67259|                12|14527

In [49]:
temp_2 = spark.sql("SELECT DISTINCT user, lifetime_indicator, CONCAT(user,friend,friend_of_friend) AS C1, CONCAT(user,friend_of_friend,friend) AS C2 FROM temp_1\
                    ORDER BY 1,2 ")
temp_2.show()
temp_2.createOrReplaceTempView("temp_2")

+----+------------------+-----------------+-----------------+
|user|lifetime_indicator|               C1|               C2|
+----+------------------+-----------------+-----------------+
|  16|                 1|     161079065747|     167471079065|
|  89|                 2| 8951973471340705| 8913407055197347|
|  89|                 4| 8951973471340705| 8913407055197347|
| 112|                 8|      11212135778|      11213577812|
| 112|                 8|      11213577812|      11212135778|
| 224|                 4|22431616822223424|22422234243161682|
| 224|                 6|22422234243161682|22431616822223424|
| 322|                 7| 3222222652411317| 3224113172222652|
| 322|                12| 3224113172222652| 3222222652411317|
| 361|                 0|36128923093971076|36139710762892309|
| 361|                 1|36128923093971076|36139710762892309|
| 361|                 5|36139710761672387|36116723873971076|
| 361|                 5|36139710762892309|36128923093971076|
| 361|  

In [50]:
temp_3 = spark.sql("SELECT *, ROW_NUMBER() OVER (ORDER BY C1,C2) AS r_id FROM temp_2")
temp_3.show()
temp_3.createOrReplaceTempView('temp_3')

+--------+------------------+--------------------+--------------------+----+
|    user|lifetime_indicator|                  C1|                  C2|r_id|
+--------+------------------+--------------------+--------------------+----+
|10000123|                 0|10000123149578779...|10000123791866149...|   1|
|10000128|                 3|10000128431263447...|10000128471913943...|   2|
|10000128|                 0|10000128471913943...|10000128431263447...|   3|
|10000128|                 3|10000128471913943...|10000128431263447...|   4|
|10000128|                 4|10000128471913943...|10000128431263447...|   5|
| 1000013|                12|10000131000058101...|10000131016562100...|   6|
| 1000013|                12|10000131000058882320|10000138823201000058|   7|
| 1000013|                12|10000131016562100...|10000131000058101...|   8|
| 1000013|                12|10000131016562882320|10000138823201016562|   9|
| 1000013|                 4|10000138823201000058|10000131000058882320|  10|

In [51]:
temp_4 = spark.sql("SELECT t1.r_id FROM temp_3 t1 INNER JOIN temp_3 t2 ON t1.user=t2.user\
                     AND t1.lifetime_indicator=t2.lifetime_indicator\
                     AND t1.C1=t2.C2\
                     AND t1.C2 = t2.C1\
                     AND t1.r_id < t2.r_id")
temp_4.show()
temp_4.createOrReplaceTempView('temp_4')

+-------+
|   r_id|
+-------+
|  54341|
|2072080|
|  96215|
| 798389|
|1071316|
|1794879|
| 413143|
| 459641|
| 461081|
| 636152|
| 636489|
| 660968|
| 660969|
| 660974|
| 660971|
| 660981|
| 660982|
| 661008|
| 673607|
| 706965|
+-------+
only showing top 20 rows



In [55]:
temp_5 = spark.sql("SELECT t1.r_id, t1.user, t1.lifetime_indicator, t1.C1, t1.C2 FROM temp_3 t1\
                    LEFT JOIN temp_4 t2 ON t1.r_id = t2.r_id\
                    WHERE t2.r_id IS NULL")
temp_5.show()
temp_5.createOrReplaceTempView("temp_5")

+----+--------+------------------+--------------------+--------------------+
|r_id|    user|lifetime_indicator|                  C1|                  C2|
+----+--------+------------------+--------------------+--------------------+
|   1|10000123|                 0|10000123149578779...|10000123791866149...|
|   3|10000128|                 0|10000128471913943...|10000128431263447...|
|   4|10000128|                 3|10000128471913943...|10000128431263447...|
|   5|10000128|                 4|10000128471913943...|10000128431263447...|
|   8| 1000013|                12|10000131016562100...|10000131000058101...|
|  10| 1000013|                 4|10000138823201000058|10000131000058882320|
|  11| 1000013|                12|10000138823201000058|10000131000058882320|
|  12| 1000013|                12|10000138823201016562|10000131016562882320|
|  13| 1000024|                11|10000241571549533515|10000245335151571549|
|  15| 1000024|                12|10000242868592157...|10000241571549286...|

In [56]:
temp_6 = spark.sql('SELECT user, lifetime_indicator, COUNT(*) AS cnt FROM temp_5\
                    GROUP BY 1,2\
                    ORDER BY 1,2')
temp_6.show()
temp_6.createOrReplaceTempView('temp_6')

+----+------------------+---+
|user|lifetime_indicator|cnt|
+----+------------------+---+
|  16|                 1|  1|
|  89|                 2|  1|
|  89|                 4|  1|
| 112|                 8|  1|
| 224|                 4|  1|
| 224|                 6|  1|
| 322|                 7|  1|
| 322|                12|  1|
| 361|                 0|  1|
| 361|                 1|  1|
| 361|                 5|  2|
| 361|                 9|  1|
| 759|                 5|  1|
| 777|                 0|  1|
| 777|                 2|  1|
| 946|                11|  1|
|1224|                 8|  1|
|1234|                 0|  1|
|1293|                12|  2|
|1296|                10|  1|
+----+------------------+---+
only showing top 20 rows



In [57]:
clustering_coef = spark.sql("SELECT t1.user,t1.lifetime_indicator, t1.friend_count, t1.cum_friend_count, t1.edges, IFNULL(t2.cnt/t1.edges,0) AS cluster_coef\
                             FROM user_dynamic_edges t1\
                             LEFT JOIN temp_6 t2\
                             ON t1.user = t2.user \
                             AND t1.lifetime_indicator = t2.lifetime_indicator\
                             ORDER BY 1,2")
clustering_coef.show(60)
clustering_coef.createOrReplaceTempView('clustering_coef')

+----+------------------+------------+----------------+-----+------------+
|user|lifetime_indicator|friend_count|cum_friend_count|edges|cluster_coef|
+----+------------------+------------+----------------+-----+------------+
|   2|                 0|           1|               1|  0.0|         0.0|
|   2|                 1|           0|               1|  0.0|         0.0|
|   2|                 2|           0|               1|  0.0|         0.0|
|   2|                 3|           0|               1|  0.0|         0.0|
|   2|                 4|           0|               1|  0.0|         0.0|
|   2|                 5|           0|               1|  0.0|         0.0|
|   2|                 6|           0|               1|  0.0|         0.0|
|   2|                 7|           0|               1|  0.0|         0.0|
|   2|                 8|           0|               1|  0.0|         0.0|
|   2|                 9|           0|               1|  0.0|         0.0|
|   2|                10|

3. Calculate the page rank of each user

In [75]:
import networkx as nx
G = nx.Graph()

pairs = user_friends.select("user", "friend")
nx_tuples = pairs.rdd.map(tuple).collect()
G.add_edges_from(nx_tuples)

In [76]:
pagerank = nx.pagerank_scipy(G)
dict(list(pagerank.items())[0:10])

  """Entry point for launching an IPython kernel.


{2: 4.157477977789461e-07,
 3: 8.653820777473779e-07,
 43: 3.2127973908043654e-06,
 52: 1.6571325641882515e-07,
 220: 1.435516964442576e-07,
 191142: 2.7629757837595304e-07,
 263437: 7.489934678145931e-07,
 1079020: 4.2278735212380876e-07,
 1204190: 8.981909064048241e-07,
 2382556: 8.488874357040234e-08}

In [77]:
# covert dictionary to pyspark dateframe
df_pagerank = pd.DataFrame(pagerank.items(), columns=['user', 'pagerank'])
df_pagerank = spark.createDataFrame(df_pagerank)
df_pagerank.createOrReplaceTempView("df_pagerank")
df_pagerank.show()

+-------+--------------------+
|   user|            pagerank|
+-------+--------------------+
|      2|4.157477977789461E-7|
|     43|3.212797390804365...|
| 191142|2.762975783759530...|
|    220|1.435516964442576E-7|
|      3|8.653820777473779E-7|
| 263437|7.489934678145931E-7|
|1204190|8.981909064048241E-7|
|     52|1.657132564188251...|
|2382556|8.488874357040234E-8|
|1079020|4.227873521238087...|
|7854140|8.488874357040234E-8|
| 567957|8.505700929517217E-7|
|      4| 1.32619682638118E-6|
| 125527|3.714991704683809...|
| 221578|4.278156052272820...|
| 187560|5.122902573656336E-7|
|9271982|8.991699667387565E-8|
| 968271|1.837777212720441E-7|
| 122744|8.991699667387565E-8|
| 125755|8.991699667387565E-8|
+-------+--------------------+
only showing top 20 rows



In [71]:
social_network_all = spark.sql("SELECT t1.user, t1.lifetime_indicator, t1.friend_count, t1.cum_friend_count,t1.edges, t1.cluster_coef, t2.fof_count, t2.cum_fof_count\
                                FROM clustering_coef t1 INNER JOIN user_dynamic_fof_cum t2\
                                ON t1.user = t2.user\
                                AND t1.lifetime_indicator = t2.lifetime_indicator\
                                ORDER BY 1,2 ")
social_network_all.show()

+----+------------------+------------+----------------+-----+------------+---------+-------------+
|user|lifetime_indicator|friend_count|cum_friend_count|edges|cluster_coef|fof_count|cum_fof_count|
+----+------------------+------------+----------------+-----+------------+---------+-------------+
|   2|                 0|           1|               1|  0.0|         0.0|        0|            0|
|   2|                 1|           0|               1|  0.0|         0.0|        0|            0|
|   2|                 2|           0|               1|  0.0|         0.0|        0|            0|
|   2|                 3|           0|               1|  0.0|         0.0|        0|            0|
|   2|                 4|           0|               1|  0.0|         0.0|        0|            0|
|   2|                 5|           0|               1|  0.0|         0.0|        0|            0|
|   2|                 6|           0|               1|  0.0|         0.0|        0|            0|
|   2|    

In [None]:
#social_network_all.createOrReplaceTempView('social_network_all')
social_network_all.write.parquet('/content/drive/social_network_all.parquet')