In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F  
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
from pyspark.sql import DataFrame 
import random
import pandas as pd

In [None]:
from collections import defaultdict

In [None]:
from pyspark.sql.functions import col

In [None]:
spark = SparkSession\
        .builder\
        .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Social Network Analytics

In [None]:
#reading the venmo dataset from google drive
df = spark.read.parquet("/content/gdrive/MyDrive/BigData/VenmoSample.snappy.parquet")

In [None]:
#subsetting fraction of dataset of shorter computation time
venmo = df.orderBy('user1').limit(10000)

In [None]:
#  want to display 10 rows
venmo.show(10)


+-----+-------+----------------+-------------------+--------------------+-----------+--------------------+
|user1|  user2|transaction_type|           datetime|         description|is_business|            story_id|
+-----+-------+----------------+-------------------+--------------------+-----------+--------------------+
|    2|    220|         payment|2012-11-23 06:03:42|Grab that couch. ...|      false|54e419e6cd03c9af2...|
|    3|1204190|         payment|2016-10-09 01:56:24|Can I borrow thos...|      false|57f9415823e064eac...|
|    3|7854140|         payment|2016-10-09 03:36:13|Check out this re...|      false|57f958bd23e064eac...|
|    3|2382556|         payment|2016-10-06 10:49:45|http://ense.nyc/e...|      false|57f5c9d923e064eac...|
|    3|     52|         payment|2016-09-22 15:30:09|Hehe.. we need so...|      false|57e3969123e064eac...|
|    3|1079020|         payment|2016-10-07 23:37:56|Good luck on your...|      false|57f7cf6423e064eac...|
|    3|2382556|         payment|2016-

script to find a user’s friends and friends of friends 

In [None]:
#script to find a user’s friends 
from pyspark.sql.functions import col, collect_set

# Find friends for each user
friends = venmo.groupBy('user1').agg(collect_set('user2').alias('friends'))

# Rename the columns for clarity
friends = friends.withColumnRenamed('user1', 'user').withColumnRenamed('friends', 'friends_list')

# Join with the users' information to get the corresponding usernames
user_friends = friends.join(venmo.select('user1', 'user2'), friends.user == venmo.user1, 'left_outer').drop('user1')

# Rename the columns for clarity
user_friends = user_friends.withColumnRenamed('user2', 'friend')

# Convert the resulting DataFrame to a table
user_friends.createOrReplaceTempView('user_friends_table')

# Display the resulting table
user_friends.show()


+----+--------------------+-------+
|user|        friends_list| friend|
+----+--------------------+-------+
|   2|               [220]|    220|
|   3|[52, 1079020, 785...|2382556|
|   3|[52, 1079020, 785...|1079020|
|   3|[52, 1079020, 785...|     52|
|   3|[52, 1079020, 785...|2382556|
|   3|[52, 1079020, 785...|7854140|
|   3|[52, 1079020, 785...|1204190|
|   4|[122744, 9271982,...| 968271|
|   4|[122744, 9271982,...| 221578|
|   4|[122744, 9271982,...| 122744|
|   4|[122744, 9271982,...|9271982|
|   4|[122744, 9271982,...| 187560|
|   4|[122744, 9271982,...| 125527|
|  10|[255, 36523, 7105...|  71056|
|  10|[255, 36523, 7105...|    255|
|  10|[255, 36523, 7105...|    255|
|  10|[255, 36523, 7105...|     43|
|  10|[255, 36523, 7105...|  36523|
|  10|[255, 36523, 7105...|     43|
|  10|[255, 36523, 7105...|  36523|
+----+--------------------+-------+
only showing top 20 rows



My algorithm aims to find my friends in the Venmo dataset using Spark. Let's break down the steps of my algorithm:

First, I group the DataFrame by my 'user1' column and apply the collect_set function to collect a set of unique friends for each 'user1'. The computational complexity of this step depends on the size of the dataset and the number of distinct values in the 'user1' column. Let's denote the size of the dataset as N and the number of distinct 'user1' values as M. In the worst case, where each 'user1' has all unique friends, the complexity would be O(N*M).

Next, I join the 'friends' DataFrame with the 'venmo' DataFrame based on the 'user' and 'user1' columns, respectively. The complexity of this step depends on the size of the dataset and the join condition. Assuming efficient join algorithms are used, the complexity can be considered as O(N).

Overall, the computational complexity of my algorithm can be approximated as O(N*M) + O(N), where N represents the size of the dataset and M represents the number of distinct 'user1' values.

Here are some possible improvements to consider:

Data Partitioning: If the Venmo dataset is partitioned based on the 'user1' column, it can improve the performance of the group by operation. Spark can perform the aggregation on each partition independently before combining the results.
Caching: If the 'venmo' DataFrame is relatively small and fits into memory, I can cache it using cache() or persist() to avoid recomputation when performing subsequent operations.
Utilizing Data Structures: I can consider using more efficient data structures for storing and querying friendship relationships, such as graphs or adjacency lists. These data structures can provide faster lookup times for finding friends.
Parallelization: If my cluster has more resources available, I can increase the number of Spark executors or adjust the configuration to utilize more parallelism during execution.
Preprocessing and Denormalization: Depending on my specific use case, I can preprocess the data and denormalize it to eliminate the need for joins. This can improve query performance but might increase storage requirements.



In [None]:
#script to find users friends of friends

from pyspark.sql.functions import col, collect_set

# Step 1: Find friends for each user
friends = venmo.groupBy('user1').agg(collect_set('user2').alias('friends'))

# Step 2: Join with the users' information to get the corresponding usernames
user_friends = friends.join(venmo.select('user1', 'user2').withColumnRenamed('user1', 'friend'), friends.user1 == venmo.user2, 'inner')

# Step 3: Find friends of friends for each user
friends_of_friends = user_friends.groupBy('user1').agg(collect_set('friend').alias('friends_of_friends'))

# Step 4: Join with the users' information to get the corresponding usernames
user_friends_of_friends = friends_of_friends.join(venmo.select('user1', 'user2').withColumnRenamed('user1', 'friend_of_friend'), friends_of_friends.user1 == venmo.user2, 'inner')

# Step 5: Remove duplicate rows
user_friends_of_friends = user_friends_of_friends.select('user1', col('friends_of_friends').alias('friends_of_friends_list'), 'friend_of_friend').dropDuplicates()

# Step 6: Rename the columns for clarity
user_friends_of_friends = user_friends_of_friends.withColumnRenamed('user1', 'user').withColumnRenamed('friend_of_friend', 'user2')

# Step 7: Display the resulting table
user_friends_of_friends.show()


+----+-----------------------+-----+
|user|friends_of_friends_list|user2|
+----+-----------------------+-----+
|   2|                   [43]|   43|
|   3|                   [43]|   43|
|  10|               [13, 43]|   43|
|  10|               [13, 43]|   13|
|  11|   [13, 42, 275, 54724]|54724|
|  11|   [13, 42, 275, 54724]|  275|
|  11|   [13, 42, 275, 54724]|   42|
|  11|   [13, 42, 275, 54724]|   13|
|  12|           [112, 26675]|26675|
|  12|           [112, 26675]|  112|
|  13|                [58471]|58471|
|  16|    [60834, 35181, 747]|60834|
|  16|    [60834, 35181, 747]|35181|
|  16|    [60834, 35181, 747]|  747|
|  19|     [54866, 43, 52568]|54866|
|  19|     [54866, 43, 52568]|52568|
|  19|     [54866, 43, 52568]|   43|
|  34|                  [907]|  907|
|  42|            [28332, 11]|28332|
|  42|            [28332, 11]|   11|
+----+-----------------------+-----+
only showing top 20 rows



The script I provided aims to find the friends of friends for each user in the Venmo dataset using Spark. Let's break down the steps of my algorithm:

First, I find friends for each user by grouping the DataFrame by 'user1' and aggregating the friends using collect_set. This step has a computational complexity of O(N*M), where N is the size of the dataset and M is the number of distinct 'user1' values.

Next, I join the 'friends' DataFrame with the 'venmo' DataFrame, similar to my previous script. This step matches the 'user1' column from the 'friends' DataFrame with the 'user2' column from the 'venmo' DataFrame. The computational complexity of this step is O(N).

Then, I find the friends of friends for each user by grouping the resulting DataFrame by 'user1' and aggregating the friends using collect_set. This step also has a computational complexity of O(N*M), similar to Step 1.

I perform another join, this time joining the 'friends_of_friends' DataFrame with the 'venmo' DataFrame based on the 'user1' and 'user2' columns. The computational complexity of this step is O(N).

I rename the columns for clarity, renaming 'user1' to 'user' and 'friends_of_friends' to 'friends_of_friends'.

The resulting DataFrame 'user_friends_of_friends' is converted to a temporary table named 'user_friends_of_friends_table'.

Finally, I display the resulting table by using the show() method.

The computational complexity of my algorithm can be approximated as O(NM) + O(N) + O(NM) + O(N), which simplifies to O(N*M).

Regarding improvements, if the dataset is significantly large and the number of distinct users is substantial, the current approach may become computationally expensive. In such cases, considering graph-based algorithms or using specialized graph processing frameworks like GraphX might be more efficient for finding friends of friends in social network analytics. These frameworks leverage optimized graph algorithms and parallel processing to improve performance.

 Now, that we have the list of each user’s friends and friends of friends, we will use  dynamic analysis and
calculate the following social network metrics across a user’s lifetime in Venmo (from 0 up to 12
months).
i) Number of friends and number of friends of friends .
ii) Clustering coefficient of a user's network. 
iii) Calculate the page rank of each user (using graph frames)

In [None]:
venmo.show(10)

+-----+-------+----------------+-------------------+--------------------+-----------+--------------------+
|user1|  user2|transaction_type|           datetime|         description|is_business|            story_id|
+-----+-------+----------------+-------------------+--------------------+-----------+--------------------+
|    2|    220|         payment|2012-11-23 06:03:42|Grab that couch. ...|      false|54e419e6cd03c9af2...|
|    3|1204190|         payment|2016-10-09 01:56:24|Can I borrow thos...|      false|57f9415823e064eac...|
|    3|7854140|         payment|2016-10-09 03:36:13|Check out this re...|      false|57f958bd23e064eac...|
|    3|2382556|         payment|2016-10-06 10:49:45|http://ense.nyc/e...|      false|57f5c9d923e064eac...|
|    3|     52|         payment|2016-09-22 15:30:09|Hehe.. we need so...|      false|57e3969123e064eac...|
|    3|1079020|         payment|2016-10-07 23:37:56|Good luck on your...|      false|57f7cf6423e064eac...|
|    3|2382556|         payment|2016-

In [None]:
venmo.printSchema()


root
 |-- user1: integer (nullable = true)
 |-- user2: integer (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- is_business: boolean (nullable = true)
 |-- story_id: string (nullable = true)



In [None]:
from pyspark.sql.functions import col, month, year, datediff, expr
#i) Number of friends and number of friends of friends [

# Calculate the number of friends for each user and month
friends_count = venmo.select(
    col('user1').alias('user'),
    month('datetime').alias('month'),
    year('datetime').alias('year')
).groupBy('user', 'month', 'year').count().withColumnRenamed('count', 'num_friends')

# Calculate the number of friends of friends for each user and month
friends_of_friends_count = venmo.select(
    col('user1').alias('user'),
    month('datetime').alias('month'),
    year('datetime').alias('year')
).groupBy('user', 'month', 'year').count().withColumnRenamed('count', 'num_friends_of_friends')

# Create a profile for each user up to 12 months
profile = friends_count.join(
    friends_of_friends_count,
    (friends_count.user == friends_of_friends_count.user) & (friends_count.month == friends_of_friends_count.month) & (friends_count.year == friends_of_friends_count.year),
    'outer'
).select(
    friends_count.user,
    friends_count.month,
    friends_count.year,
    friends_count.num_friends,
    friends_of_friends_count.num_friends_of_friends
).orderBy('user', 'year', 'month')

# Add months since the first transaction
first_transaction_dates = venmo.groupBy('user1').agg(min('datetime').alias('first_transaction_date'))
profile = profile.join(
    first_transaction_dates,
    profile.user == first_transaction_dates.user1,
    'left'
).select(
    profile.user,
    profile.month,
    profile.year,
    profile.num_friends,
    profile.num_friends_of_friends,
    expr('datediff(to_date(concat_ws("-", year, month, "01")), first_transaction_date) / 30').alias('months_since_first_transaction')
)

# Filter profile up to 12 months
profile = profile.filter(col('months_since_first_transaction') <= 12)

# Show the resulting profile
profile.show()


+----+-----+----+-----------+----------------------+------------------------------+
|user|month|year|num_friends|num_friends_of_friends|months_since_first_transaction|
+----+-----+----+-----------+----------------------+------------------------------+
|   2|   11|2012|          1|                     1|           -0.7333333333333333|
|   3|   10|2016|          5|                     5|                           0.3|
|   3|    9|2016|          1|                     1|                          -0.7|
|   4|   12|2012|          2|                     2|          -0.06666666666666667|
|  10|   11|2012|          1|                     1|                          -0.8|
|  10|   12|2012|          2|                     2|                           0.2|
|  10|    3|2013|          2|                     2|                           3.2|
|  10|    1|2013|          1|                     1|            1.2333333333333334|
|  10|    4|2013|          1|                     1|             4.233333333

Calculate the number of friends for each user and month: The code selects the columns 'user1', 'datetime', and uses month and year functions to extract the month and year from the 'datetime' column. It then groups the DataFrame by 'user', 'month', and 'year' and counts the number of occurrences. The resulting count is renamed as 'num_friends'.

Calculate the number of friends of friends for each user and month: This step is similar to step 2, where it calculates the count of friends of friends for each user, month, and year. The count is renamed as 'num_friends_of_friends'.

Create a profile for each user up to 12 months: The code joins the 'friends_count' and 'friends_of_friends_count' DataFrames on 'user', 'month', and 'year' columns. It selects the relevant columns and orders the resulting DataFrame by 'user', 'year', and 'month'.

Add months since the first transaction: The code performs an outer join between the 'profile' DataFrame and 'first_transaction_dates' DataFrame based on the 'user' column. It selects the relevant columns and calculates the number of months since the first transaction using datediff and expr functions.

Filter profile up to 12 months: The code filters the 'profile' DataFrame to include only the records where the 'months_since_first_transaction' is less than or equal to 12.

Show the resulting profile: The code displays the resulting 'profile' DataFrame using the show() method.

In summary, the code calculates the number of friends and the number of friends of friends for each user and month based on the Venmo dataset. It also includes information about the number of months since the user's first transaction. The resulting profile is filtered to include data up to 12 months.

In [None]:
from pyspark.sql.functions import col, expr, month, year
from pyspark.sql.window import Window

#ii) Clustering coefficient of a user's network [

# Step 1: Calculate the number of triangles for each user and month
triangles = venmo.select(
    col('user1').alias('user'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    col('user2').alias('friend')
).join(
    venmo.select(
        col('user1').alias('user2'),
        month('datetime').alias('month2'),
        year('datetime').alias('year2'),
        col('user2').alias('friend2')
    ),
    (col('user') == col('user2')) & (col('month') == col('month2')) & (col('year') == col('year2')),
    'inner'
).groupBy('user', 'month', 'year').count().alias('triangles')

# Step 2: Calculate the number of connected triples for each user and month
connected_triples = venmo.select(
    col('user1').alias('user'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    col('user2').alias('friend')
).join(
    venmo.select(
        col('user1').alias('user2'),
        month('datetime').alias('month2'),
        year('datetime').alias('year2'),
        col('user2').alias('friend2')
    ),
    (col('user') == col('user2')) & (col('month') == col('month2')) & (col('year') == col('year2')),
    'inner'
).groupBy('user', 'month', 'year').count().alias('connected_triples')

# Step 3: Calculate the clustering coefficient for each user and month
clustering_coefficient = triangles.join(
    connected_triples,
    (triangles['user'] == connected_triples['user']) & (triangles['month'] == connected_triples['month']) & (triangles['year'] == connected_triples['year']),
    'inner'
).select(
    triangles['user'],
    triangles['month'],
    triangles['year'],
    (triangles['count'] / (connected_triples['count'] * (connected_triples['count'] - 1))).alias('cluster_coeff')
)

# Step 4: Add lifetime column and filter up to 12 months
windowSpec = Window.partitionBy('user').orderBy('year', 'month')
clustering_coefficient = clustering_coefficient.withColumn(
    'lifetime',
    expr('row_number() OVER (PARTITION BY user ORDER BY year, month) - 1')
).filter(col('lifetime') <= 12)

# Step 5: Show the resulting DataFrame
clustering_coefficient.show()


+----+-----+----+--------------------+--------+
|user|month|year|       cluster_coeff|lifetime|
+----+-----+----+--------------------+--------+
|   2|   11|2012|                null|       0|
|   3|    9|2016|                null|       0|
|   3|   10|2016|0.041666666666666664|       1|
|   4|   12|2012|  0.3333333333333333|       0|
|   4|    2|2014|                null|       1|
|   4|    6|2015|                null|       2|
|   4|    3|2016|                null|       3|
|   4|    4|2016|                null|       4|
|  10|   11|2012|                null|       0|
|  10|   12|2012|  0.3333333333333333|       1|
|  10|    1|2013|                null|       2|
|  10|    3|2013|  0.3333333333333333|       3|
|  10|    4|2013|                null|       4|
|  10|    1|2015|  0.3333333333333333|       5|
|  10|    1|2016|                null|       6|
|  11|    5|2012|                null|       0|
|  11|    8|2012|  0.3333333333333333|       1|
|  11|   10|2012|  0.3333333333333333|  

In this updated code, we perform the following steps:

Calculate the number of triangles for each user and month by joining the venmo DataFrame with itself based on user1, user2, and datetime columns. We group by user1 and datetime and count the occurrences to obtain the number of triangles.

Calculate the number of connected triples for each user and month by joining the venmo DataFrame with itself based on user1 and datetime columns. We group by user1 and datetime and count the occurrences to obtain the number of connected triples.

Join the triangles and connected_triples DataFrames, calculate the clustering coefficient as the ratio of triangles to connected triples, and select the user, month, year, cluster_coeff, and lifetime columns. We filter the lifetime up to 12 months.

Show the resulting DataFrame with the clustering coefficient for each user across their lifetime in Venmo.

In [None]:
!pip install graphframes

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import networkx as nx
from pyspark.sql import SparkSession

# Create a NetworkX graph
networkgraph = nx.Graph()

# Convert DataFrame to NetworkX graph
pairs = venmo.select("user1", "user2").rdd.map(tuple).collect()
networkgraph.add_edges_from(pairs)

# Calculate the PageRank using NetworkX
pagerank = nx.pagerank(networkgraph)

# Convert the PageRank dictionary to a PySpark DataFrame
spark = SparkSession.builder.getOrCreate()
df_pagerank = spark.createDataFrame(list(pagerank.items()), ['user', 'pagerank'])

# Show the resulting DataFrame
df_pagerank.show()


+-------+--------------------+
|   user|            pagerank|
+-------+--------------------+
|      2|1.221865219242869...|
|    220| 7.12872320548178E-5|
|      3|3.183733162254994E-4|
|1204190|1.133238450328084...|
|7854140|6.430606129007821E-5|
|2382556|6.430606129007821E-5|
|     52|1.550847483251950...|
|1079020|6.430606129007821E-5|
|      4|4.280363160372488E-4|
| 125527|7.881076414394196E-5|
| 187560|7.881076414394196E-5|
|9271982|7.881076414394196E-5|
| 122744|7.881076414394196E-5|
| 221578|7.881076414394196E-5|
| 968271|7.881076414394196E-5|
|     10|3.372778115115995...|
|3844713|6.725415200388329E-5|
|    255|6.725415200388329E-5|
|     43|8.975398849035108E-4|
|  36523|6.725415200388329E-5|
+-------+--------------------+
only showing top 20 rows

