AKHQ (Kafka UI): http://localhost:8082
Spark Master UI: http://localhost:8083
Spark Worker 1 UI: http://localhost:8084

https://github.com/aehrc/pathling/tree/issue/452/lib/python#python-api-for-pathling

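# Before each run, change the Kafka partition numbers in the output file names written by the `save_final_df` function so they match the partitions being read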

In [1]:
# Data source switch: True reads FHIR bundles from Kafka, False reads them from the local "testdata" folder.
kafka: bool = True

In [2]:
appName = "Kafka, Spark and FHIR Data"
# local[N] runs Spark inside this process with N worker threads; N is also the
# default parallelism. Keep N at or below the number of CPU cores of this
# machine. The "several partitions per CPU core" guideline applies to partition
# counts, not to this thread count. The local master is also used to test
# writing checkpoints to the local filesystem.
master = "local[8]"
# Alternative: run on the standalone cluster instead of local mode.
#master = "spark://spark-master:7077"

In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pathling.etc import find_jar


# Resource settings tried previously on larger hosts (kept for reference):
#   spark.worker.cores     "10"  / "8"
#   spark.worker.memory    "30g" / "24"
#   spark.executor.memory  "26g" / "20g"
#   spark.driver.memory    "28g" / "22g"
# Reduce spark.executor.memory and spark.driver.memory if the host runs short on memory.

# JVM options for executor processes.
# -Xlog:gc* (unified JVM logging, Java 9+) replaces -verbose:gc,
# -XX:+PrintGCDetails and -XX:+PrintGCDateStamps ("time" decorator = date stamps).
# The former -XX:+PrintGCDateStamps no longer exists on Java 9+ and aborts JVM
# startup with "Unrecognized VM option". -XX:+G1SummarizeConcMark (and the
# -XX:+UnlockDiagnosticVMOptions it was unlocked with) was dropped as well.
# NOTE(review): assumes executors run on Java 9+; confirm for the cluster.
# With a local[...] master there is no separate executor JVM, so these options
# only take effect with a cluster master such as spark://.
executor_java_options = " ".join([
    "-XX:+UseG1GC",
    "-XX:InitiatingHeapOccupancyPercent=35",
    "-Xlog:gc*:stdout:time,uptime,level,tags",
    "-XX:OnOutOfMemoryError='kill -9 %p'",
])

spark = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .config("spark.ui.port", "4040")
    .config("spark.rpc.message.maxSize", "1000")  # in MiB
    # NOTE(review): spark.worker.memory does not appear to be an application
    # property (worker memory is configured on the worker daemon, e.g. via
    # SPARK_WORKER_MEMORY), so this line probably has no effect; confirm.
    .config("spark.worker.memory", "20g")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", "4")
    # Delta Lake support for Spark SQL.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Very generous timeouts for long-running jobs.
    .config("spark.network.timeout", "60000s")
    .config("spark.driver.maxResultSize", "8g")
    .config("spark.sql.broadcastTimeout", "1200s")
    .config("spark.executor.heartbeatInterval", "1200s")
    .config("spark.executor.extraJavaOptions", executor_java_options)
    .getOrCreate()
)

# Distribute the file returned by pathling.etc.find_jar() to the cluster nodes.
# NOTE(review): SparkContext.addFile only ships the file; it does not add it to
# the classpath. This presumably relies on the Pathling/Delta jars already being
# on the Spark classpath; confirm.
spark.sparkContext.addFile(find_jar())
spark.sparkContext.setCheckpointDir("checkpoints")

'\n\n    '

In [4]:
from pyspark.sql.functions import udf, when
from pyspark.sql.types import StringType

### Set up Pathling

In [5]:
import pathling
from pathling import PathlingContext, Expression as exp
from pyspark.sql.functions import regexp_replace, col, explode, concat_ws, explode_outer, first, to_date, max

# Pathling context built on the Spark session created above, with extensions enabled.
ptl = PathlingContext.create(
    spark=spark,
    enable_extensions=True,
)

The parquet folder has to be deleted before every run!

In [6]:
# Remove the "parquet" folder left over from a previous run before it is written again.
! rm -rf parquet


### Read the data (specify the Kafka partitions to read)

In [None]:
import json

if kafka:
    print("Read data from kafka")

    kafka_server = "svm-ap-dizk8s1q.srv.uk-erlangen.de:32386"  # node forwarding
    kafka_topic = "fhir.pacs.imagingStudy"
    # Partitions of the topic to read. When changing them, also update the
    # partition suffix of the CSV file names (see save_final_df).
    kafka_partitions = [0, 1]

    # One-off batch read (spark.read, not readStream) of the assigned partitions.
    df = (
        spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_server)
        .option("assign", json.dumps({kafka_topic: kafka_partitions}))
        .load()
    )

    # The Kafka source delivers key and value as binary; decode both to strings.
    kafka_data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    kafka_data.show()

else:
    print("Read data from folder")

    bundles_dir = 'testdata'
    # One row per file; the whole file content is in the column "value".
    bundles = ptl.spark.read.text(bundles_dir, wholetext=True)

    # The downstream cells read kafka_data.select("value"). Expose the raw
    # bundles under that name so folder mode does not fail with a NameError.
    kafka_data = bundles

    # ImagingStudies
    imaging_studies = ptl.encode_bundle(bundles, "ImagingStudy")
    imaging_studies_dataset = ptl.read.datasets({"ImagingStudy": imaging_studies})
    imaging_studies_dataset.write.parquet("./parquet")

# IMAGING_STUDIES

In [8]:
import time

# Monotonic reference timestamp taken before the analysis cells. It is presumably
# meant for timing them, but it is not read anywhere in the visible notebook.
start: float = time.monotonic()

In [9]:
# Encode the raw FHIR bundle JSON (column "value") into ImagingStudy resources
# and wrap them in a Pathling data source for the extract queries that follow.
imaging_data_kafka = ptl.encode_bundle(
    kafka_data.select("value"),
    "ImagingStudy",
)
imaging_data_kafka_dataset = ptl.read.datasets({
    "ImagingStudy": imaging_data_kafka,
})

pathling.datasource.DataSource

In [11]:
# Extract study- and series-level columns from the ImagingStudy resources and
# build the aggregate tables that are saved as CSV afterwards.
#
# Both extracts are cached because every count/agg/save below is a separate
# Spark job. Without caching, each job would recompute the whole lineage (read
# the Kafka topic again and re-encode the bundles). While the cached data is
# not evicted, all figures also come from the same snapshot of the topic.

# Study-level columns.
imaging_studies = imaging_data_kafka_dataset.extract("ImagingStudy", columns=[
    exp("numberOfSeries", "no_series"),
    exp("numberOfInstances", "no_instances"),
]).cache()

studies_no = imaging_studies.count()

# Series-level columns.
imaging_series = imaging_data_kafka_dataset.extract("ImagingStudy", columns=[
    exp("series.bodySite.code", "bodysite"),
    exp("series.modality.code", "modality"),
    exp("series.laterality.code", "laterality"),
    exp("series.numberOfInstances", "s_no_instances"),
]).cache()

series_no = imaging_series.count()
instance_no = imaging_series.agg({"s_no_instances": "sum"}).collect()[0][0]

# DICOM modalities that are not image acquisitions (structured reports,
# documents, segmentations, presentation states, key objects, other,
# registrations, secondary captures, RT structure sets).
non_acquisition_modalities = ["SR", "DOC", "SEG", "PR", "KO", "OT", "REG", "SC", "RTSTRUCT"]
# Filter values used by the single-value analyses below.
bodysite_code_of_interest = "10200004"
modality_of_interest = "XA"

result_bodysite = imaging_series.groupBy("bodysite").count().orderBy("count", ascending=False)
# ~isin drops rows with a null modality, exactly like the chained != filters it replaces.
result_bodysite_with_modality = (
    imaging_series
    .filter(~col("modality").isin(non_acquisition_modalities))
    .groupBy("bodysite")
    .count()
    .orderBy("count", ascending=False)
)
result_laterality = imaging_series.groupBy("laterality").count()
# Column is named "bodysite" (lower case); use the exact name so the code does
# not depend on Spark's case-insensitive column resolution.
result_laterality_L_bodySite = imaging_series.filter(col("laterality") == "L").groupBy("bodysite").count().orderBy("count", ascending=False)
result_laterality_R_bodySite = imaging_series.filter(col("laterality") == "R").groupBy("bodysite").count().orderBy("count", ascending=False)
result_modality = imaging_series.groupBy("modality").count().orderBy("count", ascending=False)
result_instances_modality = imaging_series.groupBy("modality").agg({"s_no_instances": "avg"}).orderBy("avg(s_no_instances)", ascending=False)
result_instances_bodysite = imaging_series.filter(col("bodysite") == bodysite_code_of_interest).agg({"s_no_instances": "avg"})
result_modality_bodysite = imaging_series.filter(col("modality") == modality_of_interest).groupBy("bodysite").count().orderBy("count", ascending=False)
result_series = imaging_studies.agg({"no_series": "avg"})
result_instances = imaging_studies.agg({"no_instances": "avg"})

In [12]:
import os
def save_final_df(final_df, df_name, partition_label="0-1"):
    """Write a Spark DataFrame to ``csv-output/<df_name>_partition_<label>.csv``.

    The DataFrame is collected to the driver with ``toPandas()`` before it is
    written, so this is only suitable for small (aggregated) results.

    Args:
        final_df: DataFrame to save (anything providing ``toPandas()``).
        df_name: Base name of the output file.
        partition_label: Kafka partitions the data was read from, used as the
            file-name suffix. The default "0-1" matches the partitions [0, 1]
            assigned in the Kafka read.

    Returns:
        The path of the written CSV file.
    """
    output_folder = "csv-output"
    # pandas' to_csv does not create missing directories.
    os.makedirs(output_folder, exist_ok=True)

    output_file = f"{df_name}_partition_{partition_label}.csv"
    output_path_filename = os.path.join(output_folder, output_file)

    print("###### current dir: ", os.getcwd())
    print("###### output_path_filename : ", output_path_filename)

    final_df.toPandas().to_csv(output_path_filename)
    return output_path_filename

In [None]:
print(f"Total number of studies: {studies_no}")
print(f"Total number of series: {series_no}")
print(f"Total number of instances: {instance_no}")

# Every aggregate table is written to its own CSV file, named after its variable.
# (Call .show() on any of them instead to inspect it inline.)
tables_to_save = [
    (result_bodysite, "result_bodysite"),
    # body sites without non-acquisition modalities
    (result_bodysite_with_modality, "result_bodysite_with_modality"),
    (result_modality, "result_modality"),
    (result_laterality, "result_laterality"),
    # body sites of left / right laterality
    (result_laterality_L_bodySite, "result_laterality_L_bodySite"),
    (result_laterality_R_bodySite, "result_laterality_R_bodySite"),
    (result_series, "result_series"),
    (result_instances, "result_instances"),
    (result_instances_modality, "result_instances_modality"),
    (result_instances_bodysite, "result_instances_bodysite"),
    (result_modality_bodysite, "result_modality_bodysite"),
]
for table, table_name in tables_to_save:
    save_final_df(table, table_name)

In [None]:
# WARNING: easier to read, but can be very slow, because toPandas() collects the
# whole study table to the driver.
imaging_studies.toPandas()

In [None]:
# Delete the parquet folder
! rm -rf parquet