In [None]:
from google.colab import drive
# Mount Google Drive so the Venmo data folder is reachable under /content/drive.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
cd drive/MyDrive/Venmo/

/content/drive/MyDrive/Venmo


In [None]:
ls

 [0m[01;34mresult_new.parquet[0m/
 [01;34mresult.parquet[0m/
 Venmo_Emoji_Classification_Dictionary.csv
 VenmoSample.snappy.parquet
'Venmo Word Classification Dictonary BAX-423.xlsx'


In [None]:
# Start (or reuse) a local SparkSession on all cores, pulling in the GraphFrames package
# so the PageRank cells further down can import `graphframes`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("VenmoHomework")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)

In [None]:
# Use Arrow for Spark <-> pandas conversion (speeds up the .toPandas() preview below).
spark.conf.set(key="spark.sql.execution.arrow.pyspark.enabled", value="true")

In [None]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, BooleanType
from functools import reduce
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# NOTE(review): `IPython.html` is deprecated since IPython 4.0 (see the warning printed below)
# and neither `display` nor `html` is referenced in the cells shown; consider dropping this import.
from IPython import display, html

  warn("The `IPython.html` package has been deprecated since IPython 4.0. "


In [None]:
# Load the raw Venmo transaction sample (one row per transaction).
df = spark.read.format("parquet").load("./VenmoSample.snappy.parquet")

In [None]:
# Peek at the first five transactions as a pandas frame (limit(5) already caps the rows).
df.limit(5).toPandas()

Unnamed: 0,user1,user2,transaction_type,datetime,description,is_business,story_id
0,1218774,1528945,payment,2015-11-27 10:48:19,Uber,False,5657c473cd03c9af22cff874
1,5109483,4782303,payment,2015-06-17 11:37:04,Costco,False,5580f9702b64f70ab0114e94
2,4322148,3392963,payment,2015-06-19 07:05:31,Sweaty balls,False,55835ccb1a624b14ac62cef4
3,469894,1333620,charge,2016-06-03 23:34:13,🎥,False,5751b185cd03c9af224c0d17
4,2960727,3442373,payment,2016-05-29 23:23:42,⚡,False,574b178ecd03c9af22cf67f4


In [None]:
# Which values does is_business take?
df.select("is_business").dropDuplicates().show()

+-----------+
|is_business|
+-----------+
|      false|
|       NULL|
+-----------+



In [None]:
# Which transaction types occur?
df.select("transaction_type").dropDuplicates().show()

+----------------+
|transaction_type|
+----------------+
|          charge|
|         payment|
+----------------+



In [None]:
# Row count per is_business value.
df.groupBy("is_business").agg(F.count("*").alias("count")).show()

+-----------+-------+
|is_business|  count|
+-----------+-------+
|       NULL|     34|
|      false|7113103|
+-----------+-------+



In [None]:
# is_business is almost always false, i.e. payments go to individuals rather than merchants.
# The 34 NULL rows are ambiguous (could be a merchant or a friend), so drop them and keep
# only the two user ids, discarding any row with a missing id.
df_clean = (
    df.where(F.col("is_business").isNotNull())
    .select("user1", "user2")
    .na.drop()
)

In [None]:
# Friends = anyone a user paid or received money from. The relation is made symmetric by
# adding each (user1, user2) pair in both directions, then de-duplicated.
# Self-transactions (user1 == user2) are dropped so nobody becomes their own friend, which
# would otherwise inflate degree and the triangle counts further down.
payments_connections = (
    df_clean
    .filter(F.col("user1") != F.col("user2"))
    .select("user1", "user2")
    .union(
        df_clean
        .filter(F.col("user1") != F.col("user2"))
        .select(F.col("user2").alias("user1"), F.col("user1").alias("user2"))
    )
    .dropDuplicates()
    .withColumnRenamed("user1", "user")
    .withColumnRenamed("user2", "friend")
)

In [None]:
# Cache payments_connections: it is reused by several cells below, so keeping the computed
# result avoids re-running the union/dedup each time it is referenced.
payments_connections.cache()

DataFrame[user: int, friend: int]

In [None]:
# Adjacency list: one row per user holding the set of that user's direct friends.
direct_friends_df = (
    payments_connections
    .groupBy("user")
    .agg(F.collect_set("friend").alias("direct_friends"))
)

In [None]:
# One row per (user, friend) pair. payments_connections already contains each pair exactly
# once (it was de-duplicated), so collapsing it into sets and exploding them back only adds
# a shuffle and yields the same rows; reuse the cached frame instead.
user_friend_pairs = payments_connections

user_friend_pairs.cache()

DataFrame[user: int, friend: int]

In [None]:
# A threshold of -1 turns off automatic broadcast joins, so the large self-joins below
# are planned as shuffle joins.
spark.conf.set(key="spark.sql.autoBroadcastJoinThreshold", value=-1)

In [None]:
# Two-hop walks user -> friend -> fof, obtained by joining the pair table to itself on the
# middle node; walks that come back to the starting user are discarded.
friend_of_friend = (
    user_friend_pairs
    .join(
        user_friend_pairs.select(
            F.col("user").alias("friend"),
            F.col("friend").alias("fof"),
        ),
        on="friend",
        how="inner",
    )
    .select("user", "fof")
    .where(F.col("user") != F.col("fof"))
)

In [None]:
# Remove two-hop candidates who are already direct friends (anti-join on the pair).
direct_edges = payments_connections.select(F.col("user"), F.col("friend").alias("fof"))
friend_of_friend_final = friend_of_friend.join(
    direct_edges, on=["user", "fof"], how="left_anti"
)

In [None]:
# Collapse the remaining candidates into one de-duplicated set per user.
friend_of_friend_agg = (
    friend_of_friend_final
    .groupBy("user")
    .agg(F.collect_set(F.col("fof")).alias("friends_of_friends"))
)

In [None]:
# Combine each user's direct friends with their friends-of-friends. The left join keeps
# users with no friends-of-friends (their friends_of_friends is NULL).
result = (
    direct_friends_df
    .join(friend_of_friend_agg, "user", "left")
    .select("user", "direct_friends", "friends_of_friends")
)

result.show(truncate=False)

In [None]:
# Display the first 20 users with their full, untruncated friend lists.
result.show(20, False)

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Persist the friend lists. mode("overwrite") lets the notebook be re-run end to end; the
# default error-if-exists mode fails once result_new.parquet is already present.
result.write.mode("overwrite").parquet("result_new.parquet")

In [None]:
# Reload the persisted friend lists so later cells start from the saved table.
result = spark.read.load("result_new.parquet", format="parquet")

In [None]:
# Per-user counts of direct friends and friends-of-friends.
# friends_of_friends is NULL for users without any (left join above), and F.size(NULL)
# returns -1 under Spark's legacy sizeOfNull behaviour (see user 293 in the output), so map
# a missing list to 0 explicitly.
friends_count = (
    result
    .withColumn("num_friends", F.size("direct_friends"))
    .withColumn(
        "num_fofs",
        F.when(F.col("friends_of_friends").isNull(), 0)
        .otherwise(F.size("friends_of_friends")),
    )
)

In [None]:
# Preview the first 20 users with their counts.
friends_count.show(n=20)

+----+--------------------+--------------------+-----------+--------+
|user|      direct_friends|  friends_of_friends|num_friends|num_fofs|
+----+--------------------+--------------------+-----------+--------+
|  10|[255, 13, 36523, ...|[189467, 126820, ...|          7|      45|
| 129|[2176924, 243198,...|[3367901, 235107,...|          3|      29|
| 239|            [693143]|[21837, 180400, 1...|          1|       5|
| 267|[152025, 1882292,...|[170964, 2914671,...|          4|      24|
| 293|           [7210780]|                NULL|          1|      -1|
| 312|  [4040424, 1991151]|           [6582876]|          2|       1|
| 361|[1672387, 2892309...|[2606588, 4627919...|          3|      15|
| 771|           [1647493]|[150025, 3558782,...|          1|       6|
| 794|[961485, 600806, ...|[1351855, 6346, 1...|          4|       9|
| 907|                [34]|[2048982, 934623,...|          1|       4|
|1006|           [1012983]|[904681, 509692, ...|          1|       4|
|1020|    [313880, 3

In [None]:
# Edge list (u, v): one distinct row per user/direct-friend pair from the adjacency lists.
edges = (
    result
    .select(F.col("user").alias("u"), F.explode("direct_friends").alias("v"))
    .dropDuplicates()
)

In [None]:
# Friendship is symmetric: add the reversed copy of every edge, then de-duplicate.
edges_symmetric = (
    edges
    .union(edges.select(F.col("v").alias("u"), F.col("u").alias("v")))
    .dropDuplicates()
)

In [None]:
# Step 2: For each user, list every unordered pair of their friends (f1 < f2, so each pair
# appears once).
friend_pairs = (
    result
    .select(F.col("user"), F.explode("direct_friends").alias("f1"))
    .join(
        result.select(F.col("user").alias("user_"), F.explode("direct_friends").alias("f2")),
        F.col("user") == F.col("user_"),
    )
    .where(F.col("f1") < F.col("f2"))
    .select("user", "f1", "f2")
)

In [None]:
# Step 3: A friend pair closes a triangle when the two friends are themselves connected;
# count such pairs per user.
triangles = (
    friend_pairs
    .join(
        edges_symmetric,
        (friend_pairs["f1"] == edges_symmetric["u"]) & (friend_pairs["f2"] == edges_symmetric["v"]),
        "inner",
    )
    .groupBy("user")
    .agg(F.count(F.lit(1)).alias("triangles"))
)

In [None]:
# Preview a handful of per-user triangle counts.
triangles.show(n=5)

+-------+---------+
|   user|triangles|
+-------+---------+
|1028166|        2|
| 804638|        3|
|1677468|        8|
|2664828|        7|
| 199855|       12|
+-------+---------+
only showing top 5 rows



In [None]:
# Step 4: A user's degree is the number of distinct direct friends.
result_with_degree = result.withColumn("degree", F.size(F.col("direct_friends")))

In [None]:
# Step 5: Attach triangle counts; users absent from `triangles` have none, so fill with 0.
result_joined = (
    result_with_degree
    .join(triangles, on="user", how="left")
    .na.fill(0, subset=["triangles"])
)

In [None]:
# Local clustering coefficient C(v) = T(v) / (d(v) * (d(v) - 1) / 2).
# `triangles` already counts each connected pair of v's friends exactly once (f1 < f2 in
# friend_pairs), and `possible` is the number of unordered friend pairs, so the ratio is
# T / possible. The previous 2*T / possible double-counted and produced values > 1
# (e.g. 1.333 for a degree-3 user with 2 triangles). Users with fewer than two friends have
# no possible pairs and get 0.0.
coeff_result = (
    result_joined
    .withColumn("possible", (F.col("degree") * (F.col("degree") - 1)) / 2)
    .withColumn(
        "clustering_coefficient",
        F.when(F.col("possible") == 0, 0.0)
        .otherwise(F.col("triangles") / F.col("possible")),
    )
)

In [None]:
# Preview degree, triangle counts and clustering coefficients for the first 20 users.
coeff_result.show(n=20)

+----+--------------------+--------------------+------+---------+--------+----------------------+
|user|      direct_friends|  friends_of_friends|degree|triangles|possible|clustering_coefficient|
+----+--------------------+--------------------+------+---------+--------+----------------------+
|  12|[135778, 221813, ...|[459250, 292151, ...|     9|        4|    36.0|    0.2222222222222222|
|  28|    [582796, 769787]|[4287347, 386429,...|     2|        0|     1.0|                   0.0|
|  31|               [192]|[4100542, 1397541...|     1|        0|     0.0|                   0.0|
|  34|[2048982, 934623,...|[178013, 7428923,...|     5|        0|    10.0|                   0.0|
| 108|            [717814]|[2582444, 7816576...|     1|        0|     0.0|                   0.0|
| 126|[1009401, 1405397...|[631426, 79940, 9...|     5|        0|    10.0|                   0.0|
| 133|[3228, 318572, 71...|[317884, 80020, 3...|     3|        2|     3.0|    1.3333333333333333|
| 159|    [1051823, 

In [None]:
# Persist the clustering results; overwrite so a re-run does not fail on the existing folder.
coeff_result.write.mode("overwrite").parquet("coeff_result.parquet")

In [None]:
!unzip "/content/coeff_result.parquet-20250531T053302Z-1-001.zip" -d "/content/coeff_result/"


Archive:  /content/coeff_result.parquet-20250531T053302Z-1-001.zip
  inflating: /content/coeff_result/coeff_result.parquet/_SUCCESS  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00005-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflating: /content/coeff_result/coeff_result.parquet/._SUCCESS.crc  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00001-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00003-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00000-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00002-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflating: /content/coeff_result/coeff_result.parquet/.part-00004-5157d41a-4566-41a8-8100-9006ca92e029-c000.snappy.parquet.crc  
  inflat

In [None]:
# Step 1: Reload the coefficient table from the same path the write cell above uses, so the
# notebook does not depend on a manually downloaded and unzipped copy under /content.
coeff_result = spark.read.parquet("coeff_result.parquet")

# Step 2: Select top 500 users by degree and by clustering coefficient.
# Many users tie on both metrics (e.g. lots of 0.0 coefficients), so the user id is used as a
# secondary sort key to make limit(500) pick the same users on every run.
top_by_degree = (
    coeff_result
    .orderBy(F.col("degree").desc(), F.col("user").asc())
    .limit(500)
    .select(F.col("user").alias("id"))
)
top_by_cluster = (
    coeff_result
    .orderBy(F.col("clustering_coefficient").desc(), F.col("user").asc())
    .limit(500)
    .select(F.col("user").alias("id"))
)

# Step 3: Union and deduplicate
sampled_users = top_by_degree.union(top_by_cluster).distinct()

# Step 4: Read full edge list
df = spark.read.parquet("./VenmoSample.snappy.parquet")

# Optional: Filter for individual transactions (drops the NULL is_business rows too)
df_clean = df.filter(F.col("is_business") == False).select("user1", "user2").dropna()

# Step 5: Keep only transactions whose payer and payee are both sampled users
edges = df_clean.join(sampled_users.withColumnRenamed("id", "user1"), on="user1") \
                .join(sampled_users.withColumnRenamed("id", "user2"), on="user2") \
                .select(F.col("user1").alias("src"), F.col("user2").alias("dst"))

# Step 6: Create vertices from edge list
vertices = edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed("src", "id")

# Step 7: Build GraphFrame
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)

# Step 8: Compute PageRank
pagerank_result = g.pageRank(resetProbability=0.15, maxIter=5)

# Step 9: View results
pagerank = pagerank_result.vertices.select("id", "pagerank")
pagerank.orderBy("pagerank", ascending=False).show(20)



+-------+------------------+
|     id|          pagerank|
+-------+------------------+
| 277650| 4.501162579016981|
| 986090| 4.483048029976847|
| 871852|  4.19654205898615|
| 696697|3.8393950975604847|
| 206476| 3.825739419561554|
| 294580|3.6672665956659447|
| 267842| 3.319669082681493|
| 256162| 3.134575158813574|
|1622649|3.1345751588135737|
| 989733| 3.101263554534139|
|4249204|2.9635776622329435|
|2820628|2.9305638235176703|
| 440727|2.7293011271132834|
| 367855|    2.672819728976|
| 872748|2.6168655613021796|
|2110559|2.6042975851786747|
| 126153| 2.580928629829397|
|9318612| 2.501457226574429|
|5579336| 2.380934460203776|
|2069464| 2.365534349234172|
+-------+------------------+
only showing top 20 rows



In [None]:
# Save the sampled PageRank scores next to the other results, replacing any earlier run.
pagerank.write.parquet("pagerank_scores.parquet", mode="overwrite")


In [None]:
# Copy the scores to a separate Drive folder. Relative Spark paths resolve against the Venmo
# project folder (the working directory after the %cd above), so the previous relative
# "MyDrive/pagerank/..." landed in .../Venmo/MyDrive/pagerank; use the absolute Drive path.
pagerank.write.mode("overwrite").parquet("/content/drive/MyDrive/pagerank/pagerank_scores.parquet")
# Save as CSV (to open in Excel etc.). The sampled PageRank table is small, so collapse it to
# one partition to get a single part-*.csv file instead of one file per partition.
pagerank.coalesce(1).write.mode("overwrite").option("header", True).csv("pagerank_scores.csv")


In [None]:
# The cells below repeat the PageRank analysis on the full transaction data (`df`),
# not just the sampled users.

In [None]:
# PageRank on the full transaction graph

In [None]:
from graphframes import GraphFrame

In [None]:
# Vertices: every distinct user id appearing on either side of a transaction.
vertices = (
    df.select(F.col("user1").alias("id"))
    .union(df.select(F.col("user2").alias("id")))
    .distinct()
)

# Edges: one directed edge user1 -> user2 per transaction (repeated transactions are kept).
edges = df.select(F.col("user1").alias("src"), F.col("user2").alias("dst"))

In [None]:
# Disabled: building the GraphFrame on every transaction and running PageRank hit a memory
# error, so a reduced edge set is prepared in the next cell instead.
# NOTE(review): if re-enabled, rename `result` here; it would shadow the friend-list DataFrame.
# g = GraphFrame(vertices, edges)
# result = g.pageRank(resetProbability=0.15, maxIter=5)

In [None]:
# Keep only edges whose source user is connected to at least 3 *distinct* counterparties.
# A plain count() counts transactions, so repeated payments to one friend would qualify.
# Null ids are dropped first because they cannot be graph vertices.
non_null_edges = edges.dropna(subset=["src", "dst"])
active_users = (
    non_null_edges
    .groupBy("src")
    .agg(F.countDistinct("dst").alias("count"))
    .filter(F.col("count") >= 3)
    .select("src")
)

# A left_semi join returns only the left side's columns (src, dst) for matching rows, so no
# aliasing is needed to disambiguate `src`.
filtered_edges = non_null_edges.join(active_users, on="src", how="left_semi")

In [None]:
# Build the graph from the *filtered* edges. The previous version used the unfiltered
# `edges`, which silently ignored the filtering above and ran into the same memory error.
vertices = (
    filtered_edges.select(F.col("src").alias("id"))
    .union(filtered_edges.select(F.col("dst").alias("id")))
    .distinct()
)

# Create GraphFrame
g = GraphFrame(vertices, filtered_edges)



In [None]:
# Run 3 PageRank iterations (reset probability 0.15) and keep only id and score per vertex.
pagerank_result = g.pageRank(maxIter=3, resetProbability=0.15)
pagerank = pagerank_result.vertices.select(F.col("id"), F.col("pagerank"))

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent cal

Py4JError: org.apache.spark does not exist in the JVM