In [4]:
# Install PySpark
!pip install pyspark

# Import necessary libraries
from pyspark.sql import SparkSession
import pandas as pd
import random

# Step 1: Create the CSV file with random edges
edges = []
for _ in range(5000):
    u = random.randint(1, 100)
    v = random.randint(1, 100)
    if u < v:  # Ensure u < v to prevent duplicate edges
        edges.append((u, v))

# Create a DataFrame and save to CSV
edges_df = pd.DataFrame(edges, columns=['u', 'v'])
edges_df.to_csv('random_edges.csv', index=False, header=False)

# Step 2: Initialize Spark Session
spark = SparkSession.builder.appName("OTP Triangle Finder").getOrCreate()

# Step 3: Load the CSV file into an RDD
edges_rdd = spark.sparkContext.textFile("random_edges.csv").map(lambda line: tuple(map(int, line.split(','))))

# Step 4: Cache the edges RDD
edges_rdd.cache()  # Caching to optimize performance

# Step 5: Define Map function
def map_function(edge):
    u, v = edge
    # Get the partition number based on some logic (Here we use a simple partitioning based on node value)
    partition_u = (u % 4) + 1  # Example partitioning logic
    partition_v = (v % 4) + 1
    yield (partition_u, (u, v))

# Step 6: Map phase
mapped_edges = edges_rdd.flatMap(map_function)

# Step 7: Group edges by partition
grouped_edges = mapped_edges.groupByKey().mapValues(list)

# Step 8: Define Reduce function to collect triangles
def reduce_function(edges):
    triangles = []  # List to collect triangle sets
    edges_list = list(edges)
    for (u, v) in edges_list:
        # Find neighbors of u and v
        neighbors_u = set(x for x, y in edges_list if x != u and (u, x) in edges_list)
        neighbors_v = set(x for x, y in edges_list if x != v and (v, x) in edges_list)
        common_neighbors = neighbors_u.intersection(neighbors_v)

        # For each common neighbor, create a triangle
        for w in common_neighbors:
            triangles.append((u, v, w))  # Add the triangle (u, v, w)
    return triangles

# Step 9: Reduce phase to collect triangles
triangle_lists = grouped_edges.mapValues(reduce_function)

# Step 10: Collect results and flatten the triangle lists
all_triangles = triangle_lists.flatMap(lambda x: x[1]).collect()

# Step 11: Print the triangles found
print("Triangles found:")
for triangle in all_triangles:
    print(triangle)

# Step 12: Stop the Spark session
spark.stop()

Triangles found:
(47, 71, 91)
(47, 71, 87)
(47, 71, 95)
(31, 35, 83)
(31, 35, 75)
(31, 35, 71)
(31, 35, 83)
(31, 35, 75)
(31, 35, 71)
(35, 71, 79)
(11, 19, 55)
(27, 31, 83)
(27, 31, 35)
(27, 31, 71)
(71, 79, 91)
(71, 79, 95)
(15, 39, 59)
(79, 91, 95)
(3, 15, 59)
(3, 15, 47)
(3, 15, 71)
(43, 75, 91)
(43, 75, 95)
(43, 55, 75)
(43, 55, 67)
(7, 55, 67)
(23, 47, 59)
(23, 47, 75)
(11, 23, 55)
(11, 23, 47)
(11, 23, 79)
(15, 47, 59)
(15, 47, 71)
(35, 63, 83)
(35, 63, 67)
(35, 63, 79)
(35, 63, 71)
(15, 31, 71)
(15, 31, 39)
(23, 47, 59)
(23, 47, 75)
(71, 91, 95)
(55, 67, 75)
(43, 67, 91)
(43, 67, 75)
(23, 79, 83)
(31, 83, 87)
(3, 83, 91)
(35, 79, 83)
(7, 23, 55)
(7, 23, 47)
(3, 35, 83)
(3, 35, 63)
(3, 35, 79)
(3, 35, 71)
(67, 71, 91)
(43, 55, 75)
(43, 55, 67)
(79, 83, 91)
(47, 75, 91)
(47, 75, 95)
(47, 51, 75)
(75, 91, 95)
(3, 47, 59)
(3, 47, 91)
(3, 47, 71)
(35, 67, 75)
(35, 67, 71)
(47, 91, 95)
(63, 79, 91)
(63, 79, 83)
(3, 79, 91)
(3, 79, 83)
(43, 59, 67)
(11, 55, 67)
(11, 55, 79)
(27, 35, 83