In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import os
import sys

# Make Spark's Python workers run the same interpreter as this kernel. The Python-UDF
# cell later in this notebook fails with "Python worker failed to connect back" (see the
# saved traceback), which typically means the worker executable Spark launches is not a
# working Python for this environment. setdefault keeps any value the user already set.
# NOTE(review): Spark reads PYSPARK_PYTHON when the SparkContext is created, so this only
# takes effect if no session exists yet — restart the kernel before re-running.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

# Create the Spark session
spark = SparkSession.builder.appName("FilterBreastCancer").getOrCreate()

# Read the raw PREDICATION CSV export, which has no header row (columns become _c0, _c1, ...).
# TODO: move this absolute local path to a configuration cell.
RAW_PREDICATION_CSV = 'C:/Users/PcPack/Downloads/semmedVER43_2024_R_PREDICATION.csv/semmedVER43_2024_R_PREDICATION.23327.csv'
df = spark.read.option("header", "false").csv(RAW_PREDICATION_CSV, encoding='latin1', inferSchema=True)
# Rename the positional CSV columns (_c0 ... _c14) to the PREDICATION field names, in file order.
PREDICATION_COLUMNS = [
    "PREDICATION_ID", "SENTENCE_ID", "PMID", "PREDICATE",
    "SUBJECT_CUI", "SUBJECT_NAME", "SUBJECT_SEMTYPE", "SUBJECT_NOVELTY",
    "OBJECT_CUI", "OBJECT_NAME", "OBJECT_SEMTYPE", "OBJECT_NOVELTY",
    "unk1", "unk2", "unk3",
]
for position, new_name in enumerate(PREDICATION_COLUMNS):
    df = df.withColumnRenamed(f"_c{position}", new_name)
# Keep distinct rows whose SUBJECT_NAME or OBJECT_NAME mentions breast cancer.
# More specific names such as "Male breast cancer" or "recurrent breast cancer" already
# contain this substring, so they need no separate condition.
# NOTE(review): contains() is case-sensitive — a name starting "Breast cancer ..." (capital B)
# is not matched; compare on lower() if such names should be kept.
BREAST_CANCER_TERM = "breast cancer"
filtered_df = df.filter(
    col("SUBJECT_NAME").contains(BREAST_CANCER_TERM)
    | col("OBJECT_NAME").contains(BREAST_CANCER_TERM)
).distinct()

filtered_df.show()



In [None]:
# Export the filtered predications to one local CSV through pandas (collects all rows on the driver).
# NOTE(review): pandas writes a header row here, yet later cells read "filtered_data.csv"
# with header=false — confirm which file they expect; if it is this one, the header line
# would be loaded as a data row.
output_file = "filtered_dataset4.csv"
pandas_df = filtered_df.toPandas()
pandas_df.to_csv(output_file, index=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) the Spark session, requesting the GraphFrames package.
# NOTE(review): getOrCreate() hands back the session already started earlier in this
# notebook, and spark.jars.packages cannot be added to a running session, so this package
# request is likely ignored unless the kernel is restarted first — confirm.
spark = (
    SparkSession.builder
    .appName("FilterBreastCancer")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
    .getOrCreate()
)

# Read the filtered CSV; it has no header row, so columns are named _c0, _c1, ...
df = spark.read.csv("filtered_data.csv", header=False, encoding='latin1', inferSchema=True)
df.printSchema()

# Count the rows of the DataFrame
row_count = df.count()
print(f"Nombre de lignes : {row_count}")


In [None]:
# Rename the positional CSV columns (_c0 ... _c14) to the PREDICATION field names, in file order.
PREDICATION_COLUMNS = [
    "PREDICATION_ID", "SENTENCE_ID", "PMID", "PREDICATE",
    "SUBJECT_CUI", "SUBJECT_NAME", "SUBJECT_SEMTYPE", "SUBJECT_NOVELTY",
    "OBJECT_CUI", "OBJECT_NAME", "OBJECT_SEMTYPE", "OBJECT_NOVELTY",
    "unk1", "unk2", "unk3",
]
for position, new_name in enumerate(PREDICATION_COLUMNS):
    df = df.withColumnRenamed(f"_c{position}", new_name)

In [None]:
import operator
from functools import reduce

from pyspark.sql.functions import col, when, count, isnan


def _is_missing(column_name, dtype):
    """Return a boolean Column that is true where `column_name` holds a missing value.

    Null counts as missing for every column. NaN is tested only on float/double columns
    and the empty string only on string columns, so neither test relies on an implicit
    cast of an incompatible column type.

    Args:
        column_name: Name of the column to test.
        dtype: Spark simple type string for that column, as found in ``df.dtypes``.
    """
    missing = col(column_name).isNull()
    if dtype in ("double", "float"):
        missing = missing | isnan(col(column_name))
    elif dtype == "string":
        missing = missing | (col(column_name) == '')
    return missing


# Count missing values (null, NaN or empty string) in each column
missing_count_df = df.select([
    count(when(_is_missing(name, dtype), name)).alias(name)
    for name, dtype in df.dtypes
])
missing_count_df.show()

# Count rows that are entirely empty (every column null or empty). The condition is built
# from the DataFrame's actual columns rather than hard-coded _c0 ... _c14: those names no
# longer exist once the columns have been renamed, and referencing them fails to resolve.
all_missing = reduce(operator.and_, (_is_missing(name, dtype) for name, dtype in df.dtypes))
empty_rows_count = df.filter(all_missing).count()
print(f"Nombre de lignes entièrement vides : {empty_rows_count}")

In [None]:
# Distinct OBJECT_NAME values mentioning "breast cancer".
# Uses its own variable instead of reassigning `filtered_df`: the export cell writes
# `filtered_df` to CSV, so overwriting it here would make a later re-run of that cell
# silently export this narrower selection.
object_breast_cancer_df = df.filter(df["OBJECT_NAME"].contains("breast cancer"))

distinct_elements = object_breast_cancer_df.select("OBJECT_NAME").distinct()
distinct_elements.show()

distinct_count = distinct_elements.count()
print(f"Number of distinct elements: {distinct_count}")

In [None]:
# Distinct SUBJECT_NAME values mentioning "breast cancer".
# Uses its own variable instead of reassigning `filtered_df`: the export cell writes
# `filtered_df` to CSV, so overwriting it here would make a later re-run of that cell
# silently export this narrower selection.
subject_breast_cancer_df = df.filter(df["SUBJECT_NAME"].contains("breast cancer"))

distinct_elements = subject_breast_cancer_df.select("SUBJECT_NAME").distinct()
distinct_elements.show()

distinct_count = distinct_elements.count()
print(f"Number of distinct elements: {distinct_count}")

In [None]:
from pyspark.sql import SparkSession



# Predicates retained for the relationship analysis.
# NOTE(review): this list was described as "indicating causality", but PROCESS_OF and
# PART_OF do not read as causal predicates — confirm they belong here.
relevant_predicates = ['CAUSES', 'PREVENTS', 'TREATS', 'ASSOCIATED_WITH', 'PROCESS_OF', 'PART_OF']

# Keep only rows whose PREDICATE is in the list, and preview them
predicate_mask = df.PREDICATE.isin(relevant_predicates)
causal_relationships_df = df.filter(predicate_mask)
causal_relationships_df.show()

# Print every relationship as "subject predicate object".
# NOTE(review): collect() pulls all matching rows onto the driver and prints each one.
causal_relationships = causal_relationships_df.collect()
for relation in causal_relationships:
    print(f"{relation['SUBJECT_NAME']} {relation['PREDICATE']} {relation['OBJECT_NAME']}")

In [None]:
# Number of rows for each retained predicate
predicate_counts = (
    causal_relationships_df
    .groupBy("PREDICATE")
    .count()
)
predicate_counts.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Bring the retained relationships to the driver as a pandas DataFrame for plotting
causal_relationships_pd = causal_relationships_df.toPandas()

# Bar chart of how many rows carry each predicate
fig, ax = plt.subplots(figsize=(10, 6))
sns.countplot(data=causal_relationships_pd, x='PREDICATE', ax=ax)
ax.set_title('Count of Each Predicate')
ax.set_xlabel('Predicate')
ax.set_ylabel('Count')
ax.tick_params(axis='x', labelrotation=45)
plt.show()

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reuse (or create) the Spark session
spark = SparkSession.builder.appName("FilterBreastCancer").getOrCreate()

# Reload the filtered CSV (no header row). This replaces `df`, so its columns are the
# positional names _c0, _c1, ... again, not the renamed ones.
df = spark.read.csv("filtered_data.csv", header=False, encoding='latin1', inferSchema=True)


# Ordered (category, keywords) rules for map_causal_relations. The first rule whose keyword
# occurs in the lower-cased subject name wins, so the order of the rules matters.
_CAUSAL_CATEGORY_RULES = (
    ('Genetic factors', ('brca1', 'brca2')),
    ('Lifestyle factors', ('diet', 'exercise', 'alcohol')),
    # 'radiation therapy' contains 'radiation', so it must be tested before the
    # environmental rule; otherwise it can never be classified as a treatment.
    ('Treatments', ('radiation therapy',)),
    ('Environmental factors', ('radiation', 'pollutants', 'light at night',
                               'chemicals in consumer products')),
    ('Medical history', ('previous cancer', 'family history')),
    ('Hormonal factors', ('hormone replacement therapy', 'early menstruation',
                          'age at first menstruation', 'age at menopause',
                          'pregnancy', 'breastfeeding')),
    ('Treatments', ('chemotherapy', 'surgery')),
    ('Outcomes', ('survival rate', 'recurrence rate')),
    ('Weight and activity factors', ('body mass index', 'physical activity')),
    ('Other genetic factors', ('other genetic mutations',)),
)


def map_causal_relations(subject_name):
    """Map a subject name to a broad category of breast-cancer-related factors.

    Matching is a case-insensitive substring test against the keywords of
    ``_CAUSAL_CATEGORY_RULES``, tried in order; the first matching rule wins.

    Args:
        subject_name: Subject concept name (column ``_c5`` when applied as a UDF), or None.

    Returns:
        The category label, or ``'Other'`` when nothing matches or the name is missing.
    """
    # A null subject would otherwise raise AttributeError on .lower() inside the Spark UDF.
    if subject_name is None:
        return 'Other'
    subject_name_lower = subject_name.lower()
    for category, keywords in _CAUSAL_CATEGORY_RULES:
        if any(keyword in subject_name_lower for keyword in keywords):
            return category
    return 'Other'

# Wrap the categorisation function as a string-returning Spark UDF.
# NOTE(review): Python UDFs run in separate Python worker processes; the traceback saved
# below the next cell ("Python worker failed to connect back") comes from evaluating this UDF.
map_causal_relations_udf = udf(map_causal_relations, returnType=StringType())

# In this freshly reloaded DataFrame _c5 is the SUBJECT_NAME column
subject_name_col = df['_c5']
df = df.withColumn('Category', map_causal_relations_udf(subject_name_col))



In [13]:
# Exploratory analysis: occurrences of each category, most frequent first
category_counts_df = (
    df.groupBy("Category")
    .count()
    .orderBy("count", ascending=False)
)
category_counts_df.show()

# The aggregate is small, so bring it into pandas for plotting
pd_category_counts = category_counts_df.toPandas()

# Bar chart of the category distribution
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(pd_category_counts['Category'], pd_category_counts['count'], color='skyblue')
ax.set_title('Distribution des catégories de facteurs associés au cancer du sein')
ax.set_xlabel('Catégorie de facteurs')
ax.set_ylabel("Nombre d'occurrences")
ax.tick_params(axis='x', labelrotation=45)
ax.grid(axis='y', linestyle='--', alpha=0.7)
fig.tight_layout()
plt.show()

Py4JJavaError: An error occurred while calling o276.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 20) (DESKTOP-JN4QT7E executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:695)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:660)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:636)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:582)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:541)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:695)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:660)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:636)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:582)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:541)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 29 more
