In [65]:
import os
os.environ["SPARK_HOME"] = "/home/anyes/spark"
os.environ["JAVA_HOME"] = "/usr"

# Importation des bibliothéques

In [66]:
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# for dataframe and udf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *


from pyspark.sql.functions import col
from pyspark.sql import functions as F

import igraph as ig

# Lancer Spark

In [67]:


# initialise environment variables for spark
findspark.init()

# Start spark session
# --------------------------
def start_spark():
  local = "local[*]"
  appName = "PLDAC"

  gf = "graphframes:graphframes:0.8.3-spark3.5-s_2.12"

  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.jars.packages", gf)

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # Adjust the query execution environment to the size of the cluster (4 cores)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("session started, its id is ", sc.applicationId)
  return spark
spark = start_spark()

session started, its id is  local-1708707254104


# Lecture des fichiers du dossier BDLE_10K

In [68]:
# Initialisez une session Spark
spark = SparkSession.builder.appName("Proteines_graph").getOrCreate()

parquet_folder = "./local/data/BDLE_10K"

# Récuperer tous les fichiers Parquet compressé avec Snappy
parquet_files = parquet_folder + "/*.snappy.parquet"

#Création du data frame
df = spark.read.format("parquet").option("compression", "snappy").load(parquet_files)

# Affichez les 10 premiére ligne du DataFrame
df.show(10)


+-----------+-----------+-----+
|     seqID1|     seqID2|  sim|
+-----------+-----------+-----+
|117761605:5|152890023:5| 97.8|
|152904885:3|155591878:2| 88.5|
|152887848:4|153682181:0|100.0|
|152937692:5| 80009514:2| 82.3|
|152990923:2|154549183:4| 98.0|
|152867782:1|153171917:1| 83.1|
| 15111981:2|153137370:1|100.0|
|152794195:0| 15280704:2| 96.9|
| 62963742:1| 63783418:5| 98.4|
|152170568:3|153062631:2| 97.6|
+-----------+-----------+-----+
only showing top 10 rows



                                                                                

# Partionnement des arêtes en plusieurs sous ensembles

In [69]:

# Nombre de partitions
num_partitions = 20

# Ajout d'une nouvelle colonne 'partition' basée sur le modulo de seqID1
#F.hash(col('seqID1') Calcul le hash d'une colonne pour partitionner les données.
df_partitioned = df.withColumn('partition', (F.hash(col('seqID1')) % num_partitions + num_partitions) % num_partitions)

# Afficher le DataFrame partitionné
df_partitioned.show()


+-----------+-----------+-----+---------+
|     seqID1|     seqID2|  sim|partition|
+-----------+-----------+-----+---------+
|117761605:5|152890023:5| 97.8|        7|
|152904885:3|155591878:2| 88.5|        0|
|152887848:4|153682181:0|100.0|        0|
|152937692:5| 80009514:2| 82.3|        0|
|152990923:2|154549183:4| 98.0|        0|
|152867782:1|153171917:1| 83.1|       15|
| 15111981:2|153137370:1|100.0|        6|
|152794195:0| 15280704:2| 96.9|        0|
| 62963742:1| 63783418:5| 98.4|       17|
|152170568:3|153062631:2| 97.6|        5|
|152904832:4|154500443:5| 98.6|        0|
|152745429:3| 22988511:2|100.0|        0|
|153161980:3|154256473:0|100.0|        0|
|152903373:4|154426964:5| 80.4|        0|
|146010871:1|158434400:4| 98.7|        2|
|153040013:0|156563807:1| 96.5|        0|
|152933621:4|157634534:2| 96.8|        0|
|153141720:0|155282497:0| 93.2|        0|
|153082115:5|153887336:3| 90.0|        0|
|153025447:3|155660759:1| 97.4|        0|
+-----------+-----------+-----+---

In [70]:
#La fréquence de chaque partition
partition_freq = df_partitioned.groupBy('partition').count()
partition_freq.show()

+---------+-----+
|partition|count|
+---------+-----+
|       14|  883|
|       12|  821|
|       18|  752|
|       13|  835|
|       15|  859|
|        6|  862|
|       17|  819|
|        9|  907|
|       19|  810|
|       16|  888|
|        5|  770|
|        2| 1170|
|       10|  878|
|        4|  741|
|        7|  883|
|        0| 4906|
|        1|  771|
|        8|  763|
|        3|  772|
|       11|  752|
+---------+-----+



In [71]:
# Créer un DataFrame pour chaque partition
dfs_partitioned = []
for partition_id in range(num_partitions):
    df_partition = df_partitioned.filter(col('partition') == partition_id)
    dfs_partitioned.append(df_partition)

# Afficher les DataFrames de chaque partition
for i, df_partition in enumerate(dfs_partitioned):
    print(f"Partition {i}:")
    df_partition.show()

Partition 0:


+-----------+-----------+-----+---------+
|     seqID1|     seqID2|  sim|partition|
+-----------+-----------+-----+---------+
|152904885:3|155591878:2| 88.5|        0|
|152887848:4|153682181:0|100.0|        0|
|152937692:5| 80009514:2| 82.3|        0|
|152990923:2|154549183:4| 98.0|        0|
|152794195:0| 15280704:2| 96.9|        0|
|152904832:4|154500443:5| 98.6|        0|
|152745429:3| 22988511:2|100.0|        0|
|153161980:3|154256473:0|100.0|        0|
|152903373:4|154426964:5| 80.4|        0|
|153040013:0|156563807:1| 96.5|        0|
|152933621:4|157634534:2| 96.8|        0|
|153141720:0|155282497:0| 93.2|        0|
|153082115:5|153887336:3| 90.0|        0|
|153025447:3|155660759:1| 97.4|        0|
|152921945:1|157742939:2| 92.6|        0|
|152901882:2| 79639334:0| 81.7|        0|
|152772567:4|153848527:5| 90.3|        0|
|152897014:5|155771540:2| 84.1|        0|
|152778738:2|153768354:1| 98.2|        0|
|152985004:4|156710084:4| 85.7|        0|
+-----------+-----------+-----+---

# Calcul des composantes connexes de chaque partition

In [101]:
from igraph import Graph
# def calcul_composantes(df):
#     df = df.toPandas()
#     ig_graph = ig.Graph.TupleList(df[['seqID1', 'seqID2']].itertuples(index=False), directed=False)
#     connected_components = ig_graph.components().membership
#     return connected_components

def calcul_composantes(df):
    edges = df.select('seqID1', 'seqID2').collect()
    # Créer un graphe à partir des arêtes
    g = Graph.TupleList(edges, directed=False)
    # Trouver les composantes connexes
    connected_components = g.components().membership
    return connected_components



In [73]:
dfs_partitioned[0].show()

+-----------+-----------+-----+---------+
|     seqID1|     seqID2|  sim|partition|
+-----------+-----------+-----+---------+
|152904885:3|155591878:2| 88.5|        0|
|152887848:4|153682181:0|100.0|        0|
|152937692:5| 80009514:2| 82.3|        0|
|152990923:2|154549183:4| 98.0|        0|
|152794195:0| 15280704:2| 96.9|        0|
|152904832:4|154500443:5| 98.6|        0|
|152745429:3| 22988511:2|100.0|        0|
|153161980:3|154256473:0|100.0|        0|
|152903373:4|154426964:5| 80.4|        0|
|153040013:0|156563807:1| 96.5|        0|
|152933621:4|157634534:2| 96.8|        0|
|153141720:0|155282497:0| 93.2|        0|
|153082115:5|153887336:3| 90.0|        0|
|153025447:3|155660759:1| 97.4|        0|
|152921945:1|157742939:2| 92.6|        0|
|152901882:2| 79639334:0| 81.7|        0|
|152772567:4|153848527:5| 90.3|        0|
|152897014:5|155771540:2| 84.1|        0|
|152778738:2|153768354:1| 98.2|        0|
|152985004:4|156710084:4| 85.7|        0|
+-----------+-----------+-----+---

In [104]:
components = calcul_composantes(dfs_partitioned[0])
print(len(components))

7740


In [105]:
# Nombre de lignes
nb_rows = dfs_partitioned[0].count()

# Liste des noms des colonnes
columns_list = dfs_partitioned[0].columns

# Nombre de colonnes
nb_columns = len(columns_list)

# Affichage des informations
print(f"Nombre de lignes : {nb_rows}")
print(f"Nombre de colonnes : {nb_columns}")
print(f"Noms des colonnes : {columns_list}")

Nombre de lignes : 4906
Nombre de colonnes : 4
Noms des colonnes : ['seqID1', 'seqID2', 'sim', 'partition']


In [96]:
components = calcul_composantes(dfs_partitioned[0])
composantes_triees = sorted(components, key=len, reverse=True)
print(composantes_triees[1])

[2171, 2172, 2218, 2219, 2291, 4286, 4302, 6017, 6041, 6082, 7685, 7692]


In [99]:
# Créer une liste de lignes pour chaque composante
rows = []
for i, component in enumerate(composantes_triees):
    for seqid in component:
        rows.append(Row(nom_composante=i, SEQid=seqid))

# Créer un DataFrame Spark à partir de la liste de lignes
result_df = spark.createDataFrame(rows)

# Afficher le DataFrame résultant
result_df.show()

+--------------+-----+
|nom_composante|SEQid|
+--------------+-----+
|             0| 2098|
|             0| 2099|
|             0| 2113|
|             0| 2114|
|             0| 4094|
|             0| 4095|
|             0| 4145|
|             0| 4174|
|             0| 5922|
|             0| 5948|
|             0| 5950|
|             0| 5968|
|             0| 7601|
|             0| 7621|
|             1| 2171|
|             1| 2172|
|             1| 2218|
|             1| 2219|
|             1| 2291|
|             1| 4286|
+--------------+-----+
only showing top 20 rows



In [100]:
import pandas as pd
# Créer une liste de DataFrame pour chaque composante
dataframes = []
for i, component in enumerate(composantes_triees):
    # Créer un DataFrame à partir de la composante
    df_component = pd.DataFrame(component, columns=['SEQid'])
    # Ajouter une colonne pour le nom de la composante
    df_component['nom_composante'] = i
    dataframes.append(df_component)

# Concaténer les DataFrames individuels en un seul DataFrame
result_df = pd.concat(dataframes, ignore_index=True)

# Afficher le DataFrame résultant
print(result_df)

      SEQid  nom_composante
0      2098               0
1      2099               0
2      2113               0
3      2114               0
4      4094               0
...     ...             ...
7735   7673            2958
7736   7714            2959
7737   7715            2959
7738   7718            2960
7739   7719            2960

[7740 rows x 2 columns]


In [97]:
print(len(composantes_triees))

2961


In [91]:
print(calcul_composantes(dfs_partitioned[0]))

Clustering with 7740 elements and 2961 clusters
[   0] 152904885:3, 155591878:2
[   1] 152887848:4, 153682181:0
[   2] 152937692:5, 80009514:2
[   3] 152990923:2, 154549183:4
[   4] 152794195:0, 15280704:2
[   5] 152904832:4, 154500443:5
[   6] 152745429:3, 22988511:2
[   7] 153161980:3, 154256473:0
[   8] 152903373:4, 154426964:5
[   9] 153040013:0, 156563807:1
[  10] 152933621:4, 157634534:2
[  11] 153141720:0, 155282497:0
[  12] 153082115:5, 153887336:3
[  13] 153025447:3, 155660759:1
[  14] 152921945:1, 157742939:2
[  15] 152901882:2, 79639334:0
[  16] 152772567:4, 153848527:5
[  17] 152897014:5, 155771540:2
[  18] 152778738:2, 153768354:1
[  19] 152985004:4, 156710084:4
[  20] 152782228:1, 152810615:4
[  21] 153099598:1, 155793668:2
[  22] 152775953:1, 156128239:0
[  23] 153037119:3, 153710066:2
[  24] 152771539:3, 153588430:0
[  25] 153180239:2, 62246699:2
[  26] 152782911:2, 154263685:0
[  27] 152768681:0, 27955181:2
[  28] 152778422:3, 153519684:5
[  29] 152759894:5, 158177234:

In [84]:
#compute_connected_components_udf = udf(calcul_composantes, ArrayType(IntegerType()))
data = dfs_partitioned[0]
df_with_components = data.withColumn("connected_components", inx_component)

# Afficher le DataFrame résultant
df_with_components.show()


PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got list.

In [86]:
from pyspark.sql.functions import lit

# Définir la fonction UDF
def calcul_composantes(df):
    edges = df.select('seqID1', 'seqID2').collect()
    # Créer un graphe à partir des arêtes
    g = Graph.TupleList(edges, directed=False)
    # Trouver les composantes connexes
    connected_components = g.components().membership
    return [int(i) for i in connected_components]  # Convertir les valeurs en entiers

# Appliquer la fonction UDF
inx_component = calcul_composantes(dfs_partitioned[1])
data = dfs_partitioned[1]
df_with_components = data.withColumn("connected_components", lit(inx_component))

# Afficher le DataFrame résultant
df_with_components.show()

+-----------+-----------+-----+---------+--------------------+
|     seqID1|     seqID2|  sim|partition|connected_components|
+-----------+-----------+-----+---------+--------------------+
|152797616:4|153077244:3| 80.9|        1|[0, 0, 1, 1, 2, 2...|
|152195812:3|152910392:0| 97.2|        1|[0, 0, 1, 1, 2, 2...|
|152233364:3|152860059:4|100.0|        1|[0, 0, 1, 1, 2, 2...|
|152228143:5|152971380:4| 98.3|        1|[0, 0, 1, 1, 2, 2...|
|152716349:0|153040585:3| 97.4|        1|[0, 0, 1, 1, 2, 2...|
|126635248:5|131579132:2| 80.4|        1|[0, 0, 1, 1, 2, 2...|
|154165205:2|156325288:2|100.0|        1|[0, 0, 1, 1, 2, 2...|
| 58291451:3| 64776984:4| 89.1|        1|[0, 0, 1, 1, 2, 2...|
|126635248:5|126803785:3| 84.0|        1|[0, 0, 1, 1, 2, 2...|
|152641991:5|153416927:2| 98.6|        1|[0, 0, 1, 1, 2, 2...|
|146952620:5| 49941002:1| 90.1|        1|[0, 0, 1, 1, 2, 2...|
|116187856:1|153331872:1|100.0|        1|[0, 0, 1, 1, 2, 2...|
|153348971:3|153348973:4| 98.3|        1|[0, 0, 1, 1, 2