In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, size, array_intersect, array_union, lit
from graphframes import GraphFrame

**Initialize Spark Session**

In [3]:
spark = SparkSession.builder \
    .appName("ProteinGraph") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

**Load Data**

In [6]:
file_path = "data/test.tsv"  # Replace with your file path
df = spark.read.csv(file_path, sep="\t", header=True)

print("Preview of raw data:")
df.show(5)

print(f"Total rows: {df.count()}")

Preview of raw data:
+----------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+
|     Entry|      Entry Name|       Protein names|          Gene Names|            Organism|            Sequence|EC number|            InterPro|
+----------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+
|A0A075F5C6|A0A075F5C6_MOUSE|Heat shock factor...|                Hsf1|Mus musculus (Mouse)|MDLAVGPGAAGPSNVPA...|     NULL|IPR000232;IPR0277...|
|A0A087WPF7|     AUTS2_MOUSE|Autism susceptibi...|      Auts2 Kiaa0442|Mus musculus (Mouse)|MDGPTRGHGLRKKRRSR...|     NULL|          IPR023246;|
|A0A087WRK1|A0A087WRK1_MOUSE|Predicted gene, 2...|Gm20905 Gm20814 G...|Mus musculus (Mouse)|MRRMALKKLKVIPKEGY...|     NULL|IPR051443;IPR006888;|
|A0A087WRT4|A0A087WRT4_MOUSE|FAT atypical cadh...|                Fat1|Mus musculus (Mouse)|MGRHLTLLLLLLLFLQQ

In [7]:
df.printSchema()

root
 |-- Entry: string (nullable = true)
 |-- Entry Name: string (nullable = true)
 |-- Protein names: string (nullable = true)
 |-- Gene Names: string (nullable = true)
 |-- Organism: string (nullable = true)
 |-- Sequence: string (nullable = true)
 |-- EC number: string (nullable = true)
 |-- InterPro: string (nullable = true)



**Data Cleaning**

In [9]:
df_cleaned = df.dropna(subset=["Entry", "InterPro"])
df_cleaned = df_cleaned.withColumn("InterPro_domains", split(col("InterPro"), ";"))

print("Preview of cleaned data:")
df_cleaned.show(5)

Preview of cleaned data:
+----------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|     Entry|      Entry Name|       Protein names|          Gene Names|            Organism|            Sequence|EC number|            InterPro|    InterPro_domains|
+----------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|A0A075F5C6|A0A075F5C6_MOUSE|Heat shock factor...|                Hsf1|Mus musculus (Mouse)|MDLAVGPGAAGPSNVPA...|     NULL|IPR000232;IPR0277...|[IPR000232, IPR02...|
|A0A087WPF7|     AUTS2_MOUSE|Autism susceptibi...|      Auts2 Kiaa0442|Mus musculus (Mouse)|MDGPTRGHGLRKKRRSR...|     NULL|          IPR023246;|       [IPR023246, ]|
|A0A087WRK1|A0A087WRK1_MOUSE|Predicted gene, 2...|Gm20905 Gm20814 G...|Mus musculus (Mouse)|MRRMALKKLKVIPKEGY...|     NULL|IPR051443;IPR006888;|[

**Pairwise Similarity Computation**

In [10]:
protein_pairs = df_cleaned.alias("a").crossJoin(df_cleaned.alias("b")) \
    .filter(col("a.Entry") < col("b.Entry"))

protein_pairs = protein_pairs.withColumn(
    "IntersectionSize", size(array_intersect(col("a.InterPro_domains"), col("b.InterPro_domains")))
).withColumn(
    "UnionSize", size(array_union(col("a.InterPro_domains"), col("b.InterPro_domains")))
)

protein_pairs = protein_pairs.withColumn(
    "JaccardSimilarity", col("IntersectionSize") / col("UnionSize")
)

# Advanced Filtering
similarity_threshold = 0.2
min_intersection_size = 2
filtered_pairs = protein_pairs.filter((col("JaccardSimilarity") >= similarity_threshold) &
                                      (col("IntersectionSize") >= min_intersection_size))

edges_df = filtered_pairs.select(
    col("a.Entry").alias("Protein1"),
    col("b.Entry").alias("Protein2"),
    col("JaccardSimilarity").alias("Weight")
)

print("Preview of graph edges:")
edges_df.show(5)

Preview of graph edges:
+----------+--------+-------------------+
|  Protein1|Protein2|             Weight|
+----------+--------+-------------------+
|A0A075F5C6|  A2A432|0.23076923076923078|
|A0A075F5C6|  D0VYS2| 0.8333333333333334|
|A0A075F5C6|  D3KU66|                0.3|
|A0A075F5C6|  D3Z120|                0.3|
|A0A075F5C6|  D3Z6Q4| 0.2727272727272727|
+----------+--------+-------------------+
only showing top 5 rows



In [16]:
# Step 5: Graph Construction and Analysis
vertices_df = df_cleaned.select(col("Entry").alias("id")).distinct()

# Create GraphFrame
protein_graph = GraphFrame(vertices_df, edges_df)

# Graph Analysis - Node Degrees
degrees = protein_graph.degrees
print("Node degrees:")
degrees.show(5)

# Graph Analysis - Clustering (Connected Components)
clusters = protein_graph.connectedComponents()
print("Protein clusters (connected components):")
clusters.show(5)

# Centrality - PageRank
pagerank = protein_graph.pageRank(resetProbability=0.15, maxIter=10)
print("PageRank results:")
pagerank.vertices.show(5)

Py4JJavaError: An error occurred while calling o111.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [17]:
# Step 6: Optimization and Performance Tuning
# Partition data to optimize for larger datasets
filtered_pairs = filtered_pairs.repartition(100, col("Protein1"))
filtered_pairs.cache()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Protein1` cannot be resolved. Did you mean one of the following? [`UnionSize`, `a`.`Entry`, `b`.`Entry`, `a`.`InterPro`, `b`.`InterPro`].;
'RepartitionByExpression ['Protein1], 100
+- Filter ((JaccardSimilarity#220 >= 0.2) AND (IntersectionSize#179 >= 2))
   +- Project [Entry#17, Entry Name#18, Protein names#19, Gene Names#20, Organism#21, Sequence#22, EC number#23, InterPro#24, InterPro_domains#96, Entry#151, Entry Name#152, Protein names#153, Gene Names#154, Organism#155, Sequence#156, EC number#157, InterPro#158, InterPro_domains#159, IntersectionSize#179, UnionSize#199, (cast(IntersectionSize#179 as double) / cast(UnionSize#199 as double)) AS JaccardSimilarity#220]
      +- Project [Entry#17, Entry Name#18, Protein names#19, Gene Names#20, Organism#21, Sequence#22, EC number#23, InterPro#24, InterPro_domains#96, Entry#151, Entry Name#152, Protein names#153, Gene Names#154, Organism#155, Sequence#156, EC number#157, InterPro#158, InterPro_domains#159, IntersectionSize#179, size(array_union(InterPro_domains#96, InterPro_domains#159), true) AS UnionSize#199]
         +- Project [Entry#17, Entry Name#18, Protein names#19, Gene Names#20, Organism#21, Sequence#22, EC number#23, InterPro#24, InterPro_domains#96, Entry#151, Entry Name#152, Protein names#153, Gene Names#154, Organism#155, Sequence#156, EC number#157, InterPro#158, InterPro_domains#159, size(array_intersect(InterPro_domains#96, InterPro_domains#159), true) AS IntersectionSize#179]
            +- Filter (Entry#17 < Entry#151)
               +- Join Cross
                  :- SubqueryAlias a
                  :  +- Project [Entry#17, Entry Name#18, Protein names#19, Gene Names#20, Organism#21, Sequence#22, EC number#23, InterPro#24, split(InterPro#24, ;, -1) AS InterPro_domains#96]
                  :     +- Filter atleastnnonnulls(2, Entry#17, InterPro#24)
                  :        +- Relation [Entry#17,Entry Name#18,Protein names#19,Gene Names#20,Organism#21,Sequence#22,EC number#23,InterPro#24] csv
                  +- SubqueryAlias b
                     +- Project [Entry#151, Entry Name#152, Protein names#153, Gene Names#154, Organism#155, Sequence#156, EC number#157, InterPro#158, split(InterPro#158, ;, -1) AS InterPro_domains#159]
                        +- Filter atleastnnonnulls(2, Entry#151, InterPro#158)
                           +- Relation [Entry#151,Entry Name#152,Protein names#153,Gene Names#154,Organism#155,Sequence#156,EC number#157,InterPro#158] csv


In [18]:
# Step 7: Pre-Visualization Preparation
# Sample subgraph for visualization (e.g., proteins with degree > 10)
sampled_graph = protein_graph.filter("degree > 10")

NameError: name 'protein_graph' is not defined

In [None]:
# Step 8: Save Results
# Save graph edges for further use
output_edges_path = "output/graph_edges.csv"
edges_df.write.csv(output_edges_path, header=True)

# Save node degrees
output_degrees_path = "output/node_degrees.csv"
degrees.write.csv(output_degrees_path, header=True)

# Save clusters
output_clusters_path = "output/protein_clusters.csv"
clusters.write.csv(output_clusters_path, header=True)

print(f"Graph edges saved to {output_edges_path}")
print(f"Node degrees saved to {output_degrees_path}")
print(f"Protein clusters saved to {output_clusters_path}")

In [None]:
# Step 9: Validation Checks
# Check for duplicate edges
duplicate_edges = filtered_pairs.groupBy("Protein1", "Protein2").count().filter(col("count") > 1)
print("Duplicate edges:")
duplicate_edges.show()

# Check for isolated nodes
isolated_nodes = vertices_df.join(degrees, vertices_df.id == degrees.id, "left_anti")
print("Isolated nodes (no edges):")
isolated_nodes.show()

# Stop Spark Session
spark.stop()