In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.sql import SparkSession

spark=(
    SparkSession.builder.config("neo4j.url", "neo4j://localhost:7687")
    .appName("BDA Assignment 3")
    .config("spark.driver.memory", "15g")
    .config("spark.jars.packages","org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "12345678")
    .config("neo4j.database", "assignmentbda3")
    .getOrCreate()
)

node_df=(
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels","Paper")
    .load()
)
query = """
MATCH (p1:Paper)-[:cite]->(p2:Paper)
RETURN p1.id AS citing_paper, p2.id AS cited_paper
"""

# Run the query on Neo4j and load the result into a DataFrame
edge_df = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("query", query)
    .load()
)

24/11/15 02:25:02 WARN Utils: Your hostname, Alice-In-Chains.local resolves to a loopback address: 127.0.0.1; using 192.168.48.70 instead (on interface en0)
24/11/15 02:25:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/arnavsingh/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/arnavsingh/.ivy2/cache
The jars for the packages stored in: /Users/arnavsingh/.ivy2/jars
org.neo4j#neo4j-connector-apache-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f7f789fe-1a11-454c-87f7-42634cb5b0ee;1.0
	confs: [default]
	found org.neo4j#neo4j-connector-apache-spark_2.12;5.3.2_for_spark_3 in central
	found org.neo4j#neo4j-connector-apache-spark_2.12_common;5.3.2_for_spark_3 in central
	found org.neo4j#neo4j-cypher-dsl;2022.11.0 in central
	found org.apiguardian#apiguardian-api;1.1.2 in local-m2-cache
	found org.neo4j.driver#neo4j-java-driver;4.4.18 in central
	found org.reactivestreams#reactive-streams;1.0.4 in central
:: resolution report :: resolve 181ms :: artifacts dl 10ms
	:: modules in use:
	org.apiguardian#apiguardian-api;1.1.2 from local-m2-cache in [default]
	org.neo4j#neo4j-connector-apache-spark_2.12;5.3.2_for_spark_3 from central in [default]
	org.neo4j#neo4j-connector-apache-spark_2

In [3]:
edges = edge_df.rdd.map(lambda row: (row['citing_paper'], row['cited_paper'])).collect()

                                                                                

In [4]:
import networkx as nx

In [5]:
G = nx.DiGraph()
G.add_edges_from(edges)

In [6]:
query_nodes = ["2982615777", "1556418098"]
c_values = [0.7, 0.8, 0.9]

# Initialize a list to store the results
results = []

In [10]:
from collections import defaultdict, Counter
from pyspark.sql.types import StructType, StructField, StringType, FloatType, Row
import gc

def get_local_neighborhood(G, node, max_depth=10):
    """Get local neighborhood of a node up to max_depth"""
    neighborhood = {node}
    current_level = {node}
    
    for _ in range(max_depth):
        next_level = set()
        for n in current_level:
            # Get both predecessors and successors
            neighbors = set(G.predecessors(n)) | set(G.successors(n))
            next_level.update(neighbors)
        current_level = next_level - neighborhood
        neighborhood.update(next_level)
        if not current_level:
            break
    
    return neighborhood

def local_simrank_score(G, node1, node2, c=0.8, max_depth=10):
    """Compute SimRank score based only on local structure"""
    if node1 == node2:
        return 1.0
        
    # Get immediate predecessors
    pred1 = set(G.predecessors(node1))
    pred2 = set(G.predecessors(node2))
    
    # Get immediate successors
    succ1 = set(G.successors(node1))
    succ2 = set(G.successors(node2))
    
    # Calculate similarity based on common neighbors
    pred_similarity = len(pred1 & pred2) / (len(pred1) * len(pred2))**0.5 if pred1 and pred2 else 0
    succ_similarity = len(succ1 & succ2) / (len(succ1) * len(succ2))**0.5 if succ1 and succ2 else 0
    
    return c * (pred_similarity + succ_similarity) / 2

def find_similar_nodes_local(G, query_node, c=0.8, top_k=5, max_depth=10):
    """Find similar nodes using only local neighborhood analysis"""
    try:
        # Get local neighborhood
        local_nodes = get_local_neighborhood(G, query_node, max_depth)
        
        # Remove the query node itself
        local_nodes.remove(query_node)
        
        # Calculate similarities only for local neighborhood
        similarities = []
        for other_node in local_nodes:
            sim = local_simrank_score(G, query_node, other_node, c, max_depth)
            if sim > 0:  # Only keep non-zero similarities
                similarities.append((other_node, sim))
        
        # Get top-k results
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    except Exception as e:
        print(f"Error processing node {query_node}: {str(e)}")
        return []

def process_nodes_in_chunks(G, query_nodes, c_values, top_k=5, chunk_size=10):
    """Process nodes in small chunks to manage memory"""
    schema = StructType([
        StructField("query_node", StringType(), False),
        StructField("similar_node", StringType(), False),
        StructField("similarity", FloatType(), False),
        StructField("c_value", FloatType(), False)
    ])
    
    all_results = []
    
    try:
        for c in c_values:
            print(f"\nProcessing with c = {c}")
            
            for i in range(0, len(query_nodes), chunk_size):
                chunk = query_nodes[i:i + chunk_size]
                chunk_results = []
                
                for node in chunk:
                    print(f"Processing node {node}")
                    similar_nodes = find_similar_nodes_local(G, node, c, top_k)
                    
                    for similar_node, similarity in similar_nodes:
                        chunk_results.append(Row(
                            query_node=node,
                            similar_node=similar_node,
                            similarity=float(similarity),
                            c_value=c
                        ))
                
                # Convert chunk results to DataFrame
                if chunk_results:
                    chunk_df = spark.createDataFrame(chunk_results, schema)
                    if not all_results:
                        all_results = chunk_df
                    else:
                        all_results = all_results.unionAll(chunk_df)
                
                # Force computation and cache
                if all_results:
                    all_results = all_results.cache()
                    all_results.count()
                
                # Clear memory
                gc.collect()
    
    except Exception as e:
        print(f"Error in chunk processing: {str(e)}")
        if all_results:
            return all_results
        return spark.createDataFrame([], schema)
    
    return all_results

# Usage
try:
    # Process with very conservative parameters
    final_df = process_nodes_in_chunks(
        G=G,
        query_nodes=query_nodes,
        c_values=c_values,
        top_k=5,
        chunk_size=1  # Process one node at a time
    )
    
    # Show results sorted by similarity
    final_df = final_df.orderBy(
        ["query_node", "c_value", "similarity"], 
        ascending=[True, True, False]
    )
    final_df.show(truncate=False)

except Exception as e:
    print(f"Global error: {str(e)}")


Processing with c = 0.7
Processing node 2982615777
Processing node 1556418098

Processing with c = 0.8
Processing node 2982615777
Processing node 1556418098

Processing with c = 0.9
Processing node 2982615777
Processing node 1556418098
+----------+------------+-----------+-------+
|query_node|similar_node|similarity |c_value|
+----------+------------+-----------+-------+
|1556418098|1606626619  |0.2738696  |0.7    |
|1556418098|2150185515  |0.24748737 |0.7    |
|1556418098|1979504468  |0.24748737 |0.7    |
|1556418098|2051613311  |0.24748737 |0.7    |
|1556418098|2145343602  |0.24748737 |0.7    |
|1556418098|1606626619  |0.31299385 |0.8    |
|1556418098|1979504468  |0.28284273 |0.8    |
|1556418098|2150185515  |0.28284273 |0.8    |
|1556418098|2051613311  |0.28284273 |0.8    |
|1556418098|2145343602  |0.28284273 |0.8    |
|1556418098|1606626619  |0.35211807 |0.9    |
|1556418098|2145343602  |0.31819806 |0.9    |
|1556418098|1979504468  |0.31819806 |0.9    |
|1556418098|2051613311  |0.