##Install and Import the necessary classes from the RDFlib library:

In [1]:
! pip install rdflib
! pip install owlrl
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from rdflib.namespace import FOAF, XSD
from rdflib import Graph, Namespace, RDF, RDFS, OWL
from rdflib.plugins.sparql import prepareQuery
import pandas as pd
import rdflib
import pyspark 
import os
import pandas as pd
from pyspark.sql.functions import col,lit
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan




### Step1: Create a SparkSession and read the Excel file into a PySpark DataFrame:

In [2]:
spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

In [3]:
# Define the file paths
files = ['DGZ/DECIDE_MTA_UGENT_14nov2022.xlsx', 
         'DGZ/DECIDE_MTA_UGENT_BAC_AERO_14nov2022.xlsx', 
         'DGZ/DECIDE_MTA_UGENTBAC_MYCO_14nov2022.xlsx']

# Load the data into Spark dataframes
dfs = []
for file in files:
    df = spark.read.format('com.crealytics.spark.excel') \
                .option('header', 'true') \
                .option('inferSchema', 'true') \
                .load(file)
    dfs.append(df)

barometer_dt_raw = dfs[0]
barometer_aero_cult_raw = dfs[1]
barometer_myco_cult_raw = dfs[2]
#barometer_aero_cult_raw .show()


#### Step 2: Create an RDF graph and namespaces:

In [4]:
g = rdflib.Graph()
onto = Namespace("http://example.com/animal_health#")
g.bind('onto', onto)
xsd = Namespace('http://www.w3.org/2001/XMLSchema#')
g.bind('xsd', xsd)

### Step 03: Iterate over the PySpark DataFrame and map to ontology properties:

In [5]:
# Data manipulation AEROBIC CULTURE results
barometer_aero_cult = barometer_aero_cult_raw \
    .withColumnRenamed("Dossiernummer", "Filenumber") \
    .withColumnRenamed("KIEMSTAAL IDENTIFICATIE", "Pathogen_identification") \
    .withColumnRenamed("KIEMSTAAL RESULTAAT", "Pathogen_result") \
    .withColumnRenamed("Staalnummer", "Samplenumber") \
    .withColumn("Parameter_code", lit("BAC_AERO")) \
    .withColumn("Result", lit("OK")) \
    .select("Filenumber", "Pathogen_identification", "Pathogen_result", "Parameter_code", "Samplenumber", "Result") \
    .filter(col("Pathogen_identification").isin("Pasteurella multocida", "Mannheimia haemolytica", "Histophilus somni", "Mycoplasma bovis")) \
    .distinct()


df_samples = spark.createDataFrame([
  ("OK", "BAC_AERO", "Culture", "Pasteurella multocida"),
  ("OK", "BAC_AERO", "Culture", "Mannheimia haemolytica"),
  ("OK", "BAC_AERO", "Culture", "Histophilus somni"),
  ("OK", "BAC_MYCOPLASMA", "Culture", "Mycoplasma bovis")
], ["Result", "Parameter_code", "Diagnostic_test", "Pathogen_identification"])
 
#barometer_aero_cult.show()


### Data manipulation MYCOPLASMA CULTURE results

In [6]:
# Data manipulation MYCOPLASMA CULTURE results
barometer_myco_cult = barometer_myco_cult_raw \
    .withColumnRenamed("Dossiernummer", "Filenumber") \
    .withColumnRenamed("KIEMSTAAL IDENTIFICATIE", "Pathogen_identification") \
    .withColumnRenamed("KIEMSTAAL RESULTAAT", "Mycoplasma_result") \
    .withColumnRenamed("Staalnummer", "Samplenumber") \
    .withColumn("Parameter_code", lit("BAC_MYCOPLASMA")) \
    .withColumn("Result", lit("OK")) \
    .select("Filenumber", "Pathogen_identification", "Mycoplasma_result", "Parameter_code", "Samplenumber", "Result") \
    .filter(col("Pathogen_identification").isin("Mycoplasma bovis")) \
    .distinct()

#barometer_myco_cult.show()


### Data manipulation PCR results

In [7]:
# Data manipulation PCR results
barometer_dtt = barometer_dt_raw \
    .withColumnRenamed("Dossiernummer", "Filenumber")\
    .withColumnRenamed("Staalnummer", "Samplenumber")\
    .withColumnRenamed("Staaltype", "Sample_type") \
    .withColumnRenamed("PARAMETER_CODE", "Parameter_code")\
    .withColumnRenamed("Onderzoek", "Pathogen")\
    .withColumnRenamed("Resultaat", "Result")\
    .withColumnRenamed("Creatiedatum", "Date")\
    .withColumnRenamed("Postcode", "Postal_code")\
    .withColumnRenamed("ANON_ID", "Farm_ID")\
    .withColumn("Country", when(col("Parameter_code").isin("BAC_AERO", "BAC_MYCOPLASMA"), "Belgium")) \
    .withColumn("Diagnostic_test", when(col("Parameter_code").isin("BAC_AERO", "BAC_MYCOPLASMA"), "Culture").otherwise("PCR")) \
    .withColumn("Lab_reference", lit("1"))\
    .withColumn("Sample_type", when(col("Sample_type") == "RU Broncho-alveolar lavage (BAL)", "BAL")
        .when(col("Sample_type") == "RU Anderen", "Unknown")
        .when(col("Sample_type").isin("RU Swabs", "RU Swab", "RU Neusswab", "RU Neusswabs"), "Swab")
        .when(col("Sample_type").isin("RU Kadaver", "RU Organen"), "Autopsy")
        .otherwise("Missing")) \
    .withColumn("Breed", when(col("Bedrijfstype") == "VCALF", "Veal")
        .when(col("MEAT").isNull(), "Unknown")
        .when((col("MEAT") / col("TOTAL")) > 0.9, "Beef")
        .when((col("MILK") / col("TOTAL")) > 0.9, "Dairy")
        .otherwise("Mixed")) \
    .withColumn("Pathogen",
        when(col("Pathogen").isin(
            "AD Pasteurella multocida Ag (PCR)",
            "AD Pasteurella multocida Ag pool (PCR)",
            "AD P. multocida Ag (PCR)"
            "AD P. multocida Ag pool (PCR)"),"Pasteurella multocida")
               .when(col("Pathogen").isin(
                        "AD Mannheimia haemolytica Ag (PCR)",
                        "AD Mannheimia haemolytica Ag pool (PCR)"), "Mannheimia haemolytica")
               .when(col("Pathogen").isin(
                        "RU PI3 Ag (PCR)",
                        "RU PI3 Ag pool (PCR)"), "PI3")
               .when(col("Pathogen").isin(
                        "RU BRSV Ag (PCR)",
                        "RU BRSV Ag pool (PCR)"), "BRSV")
               .when(col("Pathogen").isin(
                        "AD Histophilus somnus (PCR)",
                        "AD Histophilus somnus Ag (PCR)",
                        "AD Histophilus somnus Ag pool (PCR)",
                        "AD Histophilus somni Ag (PCR)",
                    "AD Histophilus somni Ag pool (PCR)"), "Histophilus somni")
           .when(col("Pathogen").isin(
                    "RU Mycoplasma bovis (PCR)",
                    "RU Mycoplasma bovis Ag pool (PCR)",
                    "RU Mycoplasma bovis Ag (PCR)"), "Mycoplasma bovis")
           .when(col("Pathogen").isin(
                    "AD Corona Ag (PCR)", "AD Corona Ag pool (PCR)"), "BCV")) \
.withColumn("Province", 
                   when(col("Postal_code").between(1000, 1299), "Brussels") \
                   .when(col("Postal_code").between(1300, 1499), "Walloon Brabant") \
                   .when(col("Postal_code").between(1500, 1999), "Flemish Brabant") \
                   .when(col("Postal_code").between(3000, 3499), "Antwerp") \
                   .when(col("Postal_code").between(2000, 2999), "Limburg") \
                   .when(col("Postal_code").between(5000, 5999), "Namur") \
                   .when(col("Postal_code").between(6000, 6599), "Hainaut") \
                   .when(col("Postal_code").between(7000, 7999), "Hainaut") \
                   .when(col("Postal_code").between(6600, 6999), "Luxembourg") \
                   .when(col("Postal_code").between( 8000, 8999), "West Flanders") \
                   .otherwise("East Flanders"))
            
barometer_dtt= barometer_dtt.select("Filenumber", "Diagnostic_test", "Samplenumber", "Country", "Lab_reference", "Sample_type", "Breed", "Parameter_code", "Result", "Pathogen", "Date", "Postal_code", "Province", "Farm_ID") \
    .distinct() 

#barometer_dtt.show()

### All three joins and clean file

In [8]:


barometer = barometer_dtt.join(df_samples, ['Diagnostic_test', 'Result', 'Parameter_code'], 'left') \
                       .join(barometer_aero_cult, ['Filenumber', 'Samplenumber', 'Result', 'Parameter_code', 'Pathogen_identification'], 'left') \
                       .join(barometer_myco_cult, ['Filenumber', 'Samplenumber', 'Result', 'Parameter_code', 'Pathogen_identification'], 'left') \
                       .withColumn('Pathogen', when(col('Pathogen') == 'Pasteurella multocida', 'PM') \
                                             .when(col('Pathogen') == 'Histophilus somni', 'HS') \
                                             .when(col('Pathogen') == 'Mannheimia haemolytica', 'MH') \
                                             .when(col('Pathogen') == 'Mycoplasma bovis', 'MB') \
                                             .otherwise(col('Pathogen'))) \
                       .withColumn('Pathogen', when(col('Pathogen_identification') == 'Pasteurella multocida', 'PM') \
                                             .when(col('Pathogen_identification') == 'Histophilus somni', 'HS') \
                                             .when(col('Pathogen_identification') == 'Mannheimia haemolytica', 'MH') \
                                             .when(col('Pathogen_identification') == 'Mycoplasma bovis', 'MB') \
                                             .otherwise(col('Pathogen'))) \
                       .withColumn('Result', when(col('Result').isin(["Twijfelachtig (PCR)", "POSITIEF", "GEDETECTEERD", "GEDETECTEERD (sterk)", "GEDETECTEERD (zwak)", "GEDETECTEERD (matig)", "GEDETECTEERD (zeer sterk)", "GEDETECTEERD (zeer zwak)"]), 1) \
                                             .when(col('Result').isin(["negatief", "Niet gedetecteerd"]), 0) \
                                             .when(col('Result').isin(["NI", "niet interpreteerbaar", "Inhibitie"]), None) \
                                             .when((col('Parameter_code') == 'BAC_AERO') & (col('Pathogen_result').isNull()), 0) \
                                             .when((col('Parameter_code') == 'BAC_AERO') & (col('Pathogen_result').isNotNull()), 1) \
                                             .when((col('Parameter_code') == 'BAC_MYCOPLASMA') & (col('Mycoplasma_result').isNull()), None) \
                                             .when((col('Parameter_code') == 'BAC_MYCOPLASMA') & (col('Mycoplasma_result') == 'neg'), 0) \
                                             .when((col('Parameter_code') == 'BAC_MYCOPLASMA') & (col('Mycoplasma_result').rlike('POS')), 1) \
                                             .otherwise(None))


In [9]:
barometer.show(50)

+------------+----------------+------+--------------------+-----------------------+---------------+-------+-------------+-----------+-----+--------+-------------------+-----------+---------------+-------------+---------------+-----------------+
|  Filenumber|    Samplenumber|Result|      Parameter_code|Pathogen_identification|Diagnostic_test|Country|Lab_reference|Sample_type|Breed|Pathogen|               Date|Postal_code|       Province|      Farm_ID|Pathogen_result|Mycoplasma_result|
+------------+----------------+------+--------------------+-----------------------+---------------+-------+-------------+-----------+-----+--------+-------------------+-----------+---------------+-------------+---------------+-----------------+
|TO-17-116224|TO-17-116224-001|     0|      BO_VIR_RSB_PCR|                   null|            PCR|   null|            1|        BAL|Mixed|    BRSV|2017-05-05 14:40:55|       9860|  East Flanders| BOVBE40_1019|           null|             null|
|TO-16-291154|TO-16-

###Step 03: Iterate over the PySpark DataFrame and map to ontology properties:

In [10]:



barometer = barometer.withColumn("row_id", lit(0))  # add a row ID column
for row in barometer.collect():
    sample_id = f"{row['Diagnostic_test']}-{row['Samplenumber']}"
    sample = onto[f"sample_{sample_id}"]
    g.add((sample, RDF.type, onto.Sample))
    g.add((sample, onto.has_country, Literal(row['Country'], datatype=XSD.string)))
    g.add((sample, onto.has_sample_type, onto[row['Sample_type']]))
    g.add((sample, onto.has_breed, onto[row['Breed']]))
    g.add((sample, onto.has_parameter_code, onto[row['Parameter_code']]))
    g.add((sample, onto.has_result, Literal(row['Result'], datatype=XSD.string)))
    g.add((sample, onto.has_pathogen, onto[row['Pathogen']]))
    g.add((sample, onto.has_date, Literal(row['Date'], datatype=XSD.dateTime)))
    g.add((sample, onto.has_postal_code, Literal(row['Postal_code'], datatype=XSD.string)))
    g.add((sample, onto.has_province, onto[row['Province']]))
    g.add((sample, onto.has_farm_ID, onto[row['Farm_ID']]))
    if row['Pathogen'] == "PM":
        pathogen_result = onto[f"pathogen_result_{sample_id}"]
        g.add((pathogen_result, RDF.type, onto.PathogenResult))
        g.add((pathogen_result, onto.has_pathogen_identification, onto[row['Pathogen_identification']]))
        g.add((pathogen_result, onto.has_pathogen_result, Literal(row['Pathogen_result'], datatype=XSD.string)))
        g.add((sample, onto.has_pathogen_result, pathogen_result))
    elif row['Pathogen'] == "Mycoplasma":
        mycoplasma_result = onto[f"mycoplasma_result_{sample_id}"]
        g.add((mycoplasma_result, RDF.type, onto.MycoplasmaResult))
        g.add((mycoplasma_result, onto.has_mycoplasma_result, Literal(row['Mycoplasma_result'], datatype=XSD.string)))
        g.add((sample, onto.has_mycoplasma_result, mycoplasma_result))

# print RDF graph (for testing)
print(g.serialize(format='turtle'))

# output RDF graph to file (replace with your desired filename)
with open('output/outputTest01.ttl', 'wb') as f:
    f.write(g.serialize(format='turtle'))

Py4JJavaError: An error occurred while calling o402.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 143) (Saba.home executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:348)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:376)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:348)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
