In [1]:
# Import Venmo data
from pyspark.sql import SparkSession

# Databricks predefines `spark`; getOrCreate() returns that same active session there and
# creates one when the notebook is run on a plain PySpark kernel.
spark = SparkSession.builder.getOrCreate()

VENMO_PATH = '/FileStore/tables/venmo/VenmoSample_snappy-e020d.parquet'
venmo = spark.read.parquet(VENMO_PATH)
venmo.show(5)

### Social Network Analytics

In [3]:
# Number of rows (transactions) in the Venmo sample.
venmo.count()

In [4]:
# Edge list (user1, user2, datetime) queried by the %sql cells below.
# To smoke-test on a subset, chain .limit(100000) onto the select.
users = (
    venmo
    .select('user1', 'user2', 'datetime')
)
users.createOrReplaceTempView("users")

In [5]:
%sql
-- Persist the raw edge list (user1, user2, datetime) from the `users` temp view.
-- DROP first so the cell can be re-run: CREATE TABLE fails when the table already exists.
DROP TABLE IF EXISTS default.venmo_nw;

CREATE TABLE default.venmo_nw
AS (SELECT * FROM users)

#### Q5 [5 pts]: Write a script to find a user’s friends and friends of friends. (Friend definition: a user’s friend is someone who has transacted with the user, either by sending money to the user or by receiving money from the user.) Describe your algorithm and calculate its computational complexity. Can you do it better?

In [7]:
%sql
-- Q5: friends (degree 1) and friends of friends (degree 2) for every user.
--
-- Algorithm:
--   1. A friend is anyone the user transacted with in EITHER direction, so each transaction
--      contributes two directed edges, (user1, user2) and (user2, user1); UNION de-duplicates.
--   2. Degree 1 = that symmetric edge list.
--   3. Degree 2 = self-join of the edge list: u - f - v with v != u.
--
-- Complexity: the self-join emits one row per 2-hop path, i.e. sum over users f of deg(f)^2
-- rows before de-duplication (O(V * d_max^2) in the worst case), plus the shuffles for the
-- join and the UNION / DISTINCT.
-- NOTE: a pair can appear with both degree 1 and degree 2 (a direct friend who is also reachable
-- via a common friend); excluding direct friends from degree 2 would shrink the output.
CREATE OR REPLACE TEMP VIEW friend_edges AS
SELECT user1, user2 FROM users
UNION
SELECT user2 AS user1, user1 AS user2 FROM users;

CREATE OR REPLACE TEMP VIEW friends AS
SELECT user1, user2, 1 AS degree
FROM friend_edges
UNION
SELECT a.user1, b.user2, 2 AS degree
FROM friend_edges a
INNER JOIN friend_edges b
  ON a.user2 = b.user1 AND a.user1 != b.user2;

-- DROP first so the cell can be re-run.
DROP TABLE IF EXISTS default.friends;

CREATE TABLE default.friends AS
SELECT DISTINCT *
FROM friends

#### Q6 [10 pts]: Now that you have the list of each user’s friends and friends of friends, you are in a position to calculate many social network variables. Use the dynamic analysis from before and calculate the following social network metrics across a user’s lifetime in Venmo (from 0 up to 12 months).

In [9]:
# Lifetime month per transaction: 1 + whole months since that user1's first transaction
# (MONTHS_BETWEEN truncated by the INT cast), restricted to months 1-12.
# NOTE(review): .limit(100000) keeps an arbitrary subset of rows, and this replaces the earlier
# `users` temp view (which had a datetime column instead of month).
all_users = (
  venmo
  .selectExpr(
    "user1",
    "user2",
    "CAST( MONTHS_BETWEEN(datetime, FIRST_VALUE(datetime) OVER (PARTITION BY user1 ORDER BY datetime))+ 1 as INT) as month",
  )
  .limit(100000)
)
users = all_users.filter('month <= 12')
users.createOrReplaceTempView('users')

In [10]:
%sql
-- Tag each transaction with a lifetime month: 1 + whole months since that user1's first
-- transaction (MONTHS_BETWEEN truncated by the INT cast).
DROP TABLE IF EXISTS default.network;

CREATE TABLE default.network
AS (
SELECT user1, user2, CAST( MONTHS_BETWEEN(datetime, FIRST_VALUE(datetime) OVER (PARTITION BY user1 ORDER BY datetime))+ 1 as INT) AS month
FROM default.venmo_nw);

-- Replace default.venmo_nw with the first 12 lifetime months; later cells read it and expect
-- the month column. The table already exists (it was read just above), so it must be dropped
-- before CREATE TABLE can succeed.
-- NOTE: after this runs, default.venmo_nw no longer has a datetime column; re-run the cell that
-- builds it from `users` before re-running this one.
DROP TABLE IF EXISTS default.venmo_nw;

CREATE TABLE default.venmo_nw
AS (
SELECT * FROM default.network
WHERE month <= 12)

In [11]:
# Friends (degree 1) and friends of friends (degree 2) for lifetime months 1..MAX_MONTH,
# computed in a single query. This is equivalent to running one query per month and unioning
# the results: `a.month = b.month` in the join plays the role of the per-month filter, and
# UNION (UNION DISTINCT) already de-duplicates, so no separate drop_duplicates pass is needed.
#
# NOTE(review): `month` is counted from each row's own user1 first transaction (PARTITION BY
# user1 in the network table), so a.month = b.month pairs months of two different users'
# lifetimes; confirm this is the intended "dynamic" definition.
# NOTE(review): degree-1 rows are (user1, user2) exactly as stored, so a user2 does not list
# the user1 as a friend, although the Q5 definition counts both directions.
MAX_MONTH = 12

connections = spark.sql(f"""
SELECT a.user1, b.user2, a.month, 2 AS degree
FROM default.venmo_nw a
INNER JOIN default.venmo_nw b
  ON a.user2 = b.user1 AND a.user1 != b.user2 AND a.month = b.month
WHERE a.month BETWEEN 1 AND {MAX_MONTH}

UNION

SELECT a.user2, b.user1, b.month, 2 AS degree
FROM default.venmo_nw a
INNER JOIN default.venmo_nw b
  ON a.user1 = b.user2 AND a.user2 != b.user1 AND a.month = b.month
WHERE a.month BETWEEN 1 AND {MAX_MONTH}

UNION

SELECT user1, user2, month, 1 AS degree
FROM default.venmo_nw
WHERE month BETWEEN 1 AND {MAX_MONTH}
""")
connections.createOrReplaceTempView("connections")

In [12]:
%sql
-- Persist the month-by-month friend / friend-of-friend list built in Python.
-- DROP first so the cell can be re-run.
DROP TABLE IF EXISTS default.vm_network;

CREATE TABLE default.vm_network
AS (
SELECT * FROM connections)

- 1. Number of friends and number of friends of friends [very easy, 2 pts]

In [14]:
%sql
-- Remove the Q5 friends table so the per-month version can take its name.
-- IF EXISTS keeps the cell re-runnable when the table is already gone.
DROP TABLE IF EXISTS default.friends

In [15]:
%sql
-- Number of friends (degree 1) and friends of friends (degree 2) per user and lifetime month.
-- Qualified as default.friends, the same table the drop cell targets; the Q5 cell also created a
-- temp view named `friends`, so an unqualified name is easy to confuse with it.
DROP TABLE IF EXISTS default.friends;

CREATE TABLE default.friends AS (
SELECT month, degree, user1, count(user2) AS number_of_friends
FROM default.vm_network
GROUP BY month, degree, user1
ORDER BY month, degree, user1)

In [16]:
%sql
-- Direct friends (degree 1) per user in lifetime month 1.
SELECT
  user1,
  COUNT(*) AS number_of_friends
FROM default.vm_network
WHERE degree = 1
  AND month = 1
GROUP BY user1
ORDER BY user1

user1,number_of_friends
2,1
3,5
4,2
10,2
11,1
12,1
13,1
16,1
19,1
28,1


In [17]:
%sql
-- Friends of friends (degree 2) per user in lifetime month 1.
SELECT
  user1,
  COUNT(*) AS friendsOffriends
FROM default.vm_network
WHERE degree = 2
  AND month = 1
GROUP BY user1
ORDER BY user1

user1,friendsOffriends
3,3
9,4
10,2
11,7
12,7
13,2
16,1
19,3
24,3
28,2


In [18]:
# %sql
# select month, degree, avg(number_of_friends) as avg_friends
# from (select user1, month, degree, count(*) as number_of_friends from default.vm_network group by user1, month, degree)
# group by month, degree
# order by month, degree

- 2. Clustering coefficient of a user's network [easy, 3 pts]. (Hint: the easiest way to calculate this is to program it yourselves. Alternatively, you can use the “networkx” Python package, but that approach will slow down your script significantly.)

In [20]:
%sql
-- Peek at the combined (user1, user2, month, degree) connection table.
SELECT *
FROM default.vm_network
LIMIT 10

user1,user2,month,degree
130,65392,1,2
173,152420,1,2
588,5063782,1,2
879,551709,1,2
1241,4906931,1,2
2033,136,1,1
2190,888755,1,1
2210,4911130,1,2
3243,122314,1,2
5272,1586294,1,1


In [21]:
%sql
-- IF EXISTS keeps the cell re-runnable on a fresh workspace where the table does not exist yet.
DROP TABLE IF EXISTS default.all_conn

In [22]:
%sql
-- For each user's direct friend (a.degree = 1) in a lifetime month, list that friend's own
-- direct friends in the same month; coeff() then counts how many of those are also friends of
-- the user. Without the b.degree / b.month conditions the list also held the friend's
-- friends-of-friends and links from other months, inflating the clustering coefficient.
-- The conditions sit in ON (not WHERE) so friends with no connections still get a row.
-- NOTE(review): degree-1 rows in vm_network are one-directional (user1 -> user2); confirm
-- whether reverse links should count here too.
DROP TABLE IF EXISTS default.all_conn;

CREATE TABLE default.all_conn AS
(
SELECT a.user1, a.user2 AS friend, b.user2 AS mutual_friends, a.month
FROM default.vm_network AS a
LEFT JOIN default.vm_network AS b
  ON a.user2 = b.user1
 AND b.degree = 1
 AND b.month = a.month
WHERE a.degree = 1
ORDER BY user1, friend
)

In [23]:
from pyspark.sql.functions import array, collect_list, flatten, udf, desc, asc
from pyspark.sql.types import StringType, ArrayType, IntegerType, FloatType, MapType
month = 1  # lifetime month to analyse

# Reusable aggregations: a friend's connections, and a user's friends.
mutual_friends = collect_list("mutual_friends").alias("mutual_friends")
friends = collect_list("friend").alias("friends")

users_df = spark.sql(
  'select user1, friend, mutual_friends from default.all_conn where month = %s' % (month,)
)

# Pass 1: one row per (user, friend) holding that friend's connections as an array.
user_network = (
  users_df
  .groupBy("user1", "friend")
  .agg(mutual_friends)
  .orderBy(asc("user1"), asc('friend'))
)

# Pass 2: one row per user. `friends` and `mutual_friends` are collected in the same
# aggregation, presumably so element i of one lines up with element i of the other
# (coeff zips them together).
user_network_coalesced = (
  user_network
  .groupBy('user1')
  .agg(mutual_friends, friends)
  .orderBy(asc('user1'))
)

# friend id -> that friend's connections, as a Spark map column.
# NOTE(review): IntegerType keys/values assume user ids fit in 32 bits; confirm against the data.
append_udf = udf(
  lambda keys, values: dict(zip(keys, values)),
  MapType(IntegerType(), ArrayType(IntegerType())),
)
all_conn = (
  user_network_coalesced
  .withColumn('friends_dict', append_udf('friends', 'mutual_friends'))
  .select('user1', 'friends_dict')
)

In [25]:
# Preview each user's friends list and the aligned list of each friend's connections.
user_network_coalesced.show(5)

In [26]:
# Collect to pandas for quick local checks of coeff(); toPandas() pulls every row of
# user_network_coalesced into driver memory.
df = user_network_coalesced.toPandas()

In [27]:
# Spot-check coeff on the first five users locally.
# NOTE: coeff is defined in the next cell; on a fresh kernel run that cell first.
(
  df[['friends', 'mutual_friends']]
  .head()
  .apply(lambda row: coeff(row['friends'], row['mutual_friends']), axis=1)
)

In [28]:
def coeff(x, y):
  '''
  Local clustering coefficient of one user's ego network.

  formula for clustering coeff:  number of connections among friends/(k*(k-1)/2)

  Parameters
  ----------
  x : sequence of the user's friend ids (k = len(x)).
  y : sequence aligned with x; y[i] holds the ids connected to x[i].
      A None entry is treated as "no connections".

  Returns
  -------
  float
      Number of distinct undirected links between two different friends in x,
      divided by k*(k-1)/2. Users with fewer than two friends get 0.0.
  '''
  k = len(x)
  den = k*(k-1)/2
  if den == 0:
    den = 1  # k < 2: no link between two distinct friends is possible; avoid dividing by zero
  friend_ids = set(x)
  links = set()
  for friend, mutual in zip(x, y):
    if mutual is None:
      continue
    for other in friend_ids.intersection(mutual):
      if other != friend:
        # frozenset makes (a, b) and (b, a) the same undirected link. A numeric key such
        # as friend + other would merge unrelated pairs with equal id sums, e.g. (1, 4) and (2, 3).
        links.add(frozenset((friend, other)))
  return len(links)/den
# Spark UDF wrapper: apply coeff to each row's friends / mutual_friends arrays (FloatType result).
clustering_coeff_udf = udf(
  lambda friend_ids, friend_connections: coeff(friend_ids, friend_connections),
  FloatType(),
)
clustering_coeff = user_network_coalesced.withColumn(
  'ClusteringCoef',
  clustering_coeff_udf('friends', 'mutual_friends'),
)

In [29]:
# Expose the per-user clustering coefficients to %sql cells as `clustering_coeff`.
clustering_coeff.createOrReplaceTempView('clustering_coeff')

In [30]:
%sql
-- NOTE(review): returns the whole table (every user / friend / friend-connection row); add a
-- month filter or LIMIT for a quick inspection.
select * from default.all_conn

- 3. Calculate the PageRank of each user [hard, 5 pts]. (Hint: First of all, you need to use GraphX to do this. Moreover, notice that PageRank is a global social network metric: if you calculate the PageRank for each user at each of her lifetime points, you will soon realize it is a dead end. Can you think of a smart way to do this?)