In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
observations_df  = spark.read.option("header","true").csv(r"..\output\csv\observations.csv")

In [6]:
observations_df.drop("ENCOUNTER").show(5)

+--------------------+--------------------+-----------+-------+--------------------+-----+-------+-------+
|                DATE|             PATIENT|   CATEGORY|   CODE|         DESCRIPTION|VALUE|  UNITS|   TYPE|
+--------------------+--------------------+-----------+-------+--------------------+-----+-------+-------+
|2013-06-17T15:35:01Z|ee81448e-95eb-01c...|vital-signs| 8302-2|         Body Height| 54.8|     cm|numeric|
|2016-03-01T14:34:18Z|16567962-4bec-368...|vital-signs| 8302-2|         Body Height| 52.3|     cm|numeric|
|2013-06-17T15:35:01Z|ee81448e-95eb-01c...|vital-signs|72514-3|Pain severity - 0...|  1.0|{score}|numeric|
|2016-03-01T14:34:18Z|16567962-4bec-368...|vital-signs|72514-3|Pain severity - 0...|  1.0|{score}|numeric|
|2013-06-17T15:35:01Z|ee81448e-95eb-01c...|vital-signs|29463-7|         Body Weight|  4.8|     kg|numeric|
+--------------------+--------------------+-----------+-------+--------------------+-----+-------+-------+
only showing top 5 rows



In [7]:
from pyspark.sql.functions import *

In [8]:
systolic_observations_df = (observations_df.select("PATIENT", "DATE", "VALUE")
                   .withColumnRenamed("VALUE", "systolic")
                   .withColumnRenamed("PATIENT","patientid")
                   .withColumnRenamed("DATE","dateofobservation")
                   .filter((col("CODE") == "8480-6"))
  )


In [11]:
systolic_observations_df.show(5)

+--------------------+--------------------+--------+
|           patientid|   dateofobservation|systolic|
+--------------------+--------------------+--------+
|ee81448e-95eb-01c...|2013-06-17T15:35:01Z|   131.0|
|16567962-4bec-368...|2016-03-01T14:34:18Z|   116.0|
|16567962-4bec-368...|2016-04-05T14:34:18Z|   109.0|
|ee81448e-95eb-01c...|2013-07-22T15:35:01Z|   136.0|
|ee81448e-95eb-01c...|2013-09-23T15:35:01Z|   125.0|
+--------------------+--------------------+--------+
only showing top 5 rows



In [13]:
diastolic_observations_df = (
    observations_df.select("PATIENT", "DATE", "VALUE")
                   .withColumnRenamed("VALUE", "diastolic")
                   .withColumnRenamed("PATIENT","patientid")
                   .withColumnRenamed("DATE","dateofobservation")
                   .filter((col("code") == "8462-4"))
    )

hdl_observations_df = (
    observations_df.select("PATIENT", "DATE", "VALUE")
                   .withColumnRenamed("VALUE", "hdl")
                   .withColumnRenamed("PATIENT","patientid")
                   .withColumnRenamed("DATE","dateofobservation")
                   .filter((col("code") == "2085-9"))
    )

ldl_observations_df = (
    observations_df.select("PATIENT", "DATE", "VALUE")
                   .withColumnRenamed("VALUE", "ldl")
                   .withColumnRenamed("PATIENT","patientid")
                   .withColumnRenamed("DATE","dateofobservation")
                   .filter((col("code") == "18262-6"))
    )

bmi_observations_df = (
    observations_df.select("PATIENT", "DATE", "VALUE")
                   .withColumnRenamed("VALUE", "bmi")
                   .withColumnRenamed("PATIENT","patientid")
                   .withColumnRenamed("DATE","dateofobservation")
                   .filter((col("code") == "39156-5"))
    )

In [20]:
bmi_observations_df.show(5)

+--------------------+--------------------+----+
|           patientid|   dateofobservation| bmi|
+--------------------+--------------------+----+
|f2cdc9a2-6cef-c62...|2011-09-09T00:31:24Z|17.9|
|ee81448e-95eb-01c...|2015-11-23T15:35:01Z|18.3|
|ee81448e-95eb-01c...|2016-05-23T15:35:01Z|17.4|
|ee81448e-95eb-01c...|2016-11-21T15:35:01Z|16.8|
|ee81448e-95eb-01c...|2017-05-22T15:35:01Z|16.4|
+--------------------+--------------------+----+
only showing top 5 rows



In [19]:
merged_observations_df = (
    systolic_observations_df.join(diastolic_observations_df, ["patientid", "dateofobservation"])
                            .join(hdl_observations_df, ["patientid", "dateofobservation"])
                            .join(ldl_observations_df, ["patientid", "dateofobservation"])
                            .join(bmi_observations_df, ["patientid", "dateofobservation"])
)

merged_observations_df.show(5)

+--------------------+--------------------+--------+---------+----+-----+----+
|           patientid|   dateofobservation|systolic|diastolic| hdl|  ldl| bmi|
+--------------------+--------------------+--------+---------+----+-----+----+
|007dca45-b0c3-cbf...|2008-07-20T00:06:17Z|   124.0|     88.0|62.0| 83.7|27.7|
|007f51fe-4731-86b...|2016-11-10T11:03:34Z|   127.0|     80.0|65.8|109.3|30.1|
|0115568f-b00b-19f...|2018-02-27T02:19:08Z|   123.0|     88.0|55.0|151.3|27.8|
|012da0c4-8f2d-f59...|2015-10-30T07:56:09Z|   127.0|     74.0|43.7|155.6|28.4|
|01f5bb36-ba72-80b...|2018-07-10T23:38:57Z|   109.0|     84.0|67.0| 99.3|30.1|
+--------------------+--------------------+--------+---------+----+-----+----+
only showing top 5 rows



In [22]:
patients_df = (spark.read.option("header","true").csv(r"C:\work\argo\ml\synthea\output\csv\patients.csv")
              .select(col("Id").alias("patientid"),col("BIRTHDATE").alias("dateofbirth"))
              )

patients_df.show(5)

+--------------------+-----------+
|           patientid|dateofbirth|
+--------------------+-----------+
|ee81448e-95eb-01c...| 2013-06-17|
|16567962-4bec-368...| 2016-03-01|
|f2cdc9a2-6cef-c62...| 1998-05-07|
|cbb876f7-4e53-8f1...| 1972-04-15|
|73cd25ba-4d2c-576...| 1971-05-08|
+--------------------+-----------+
only showing top 5 rows



In [23]:
merged_observations_with_age_df = (
  merged_observations_df.join(patients_df, "patientid")
                        .withColumn("age", datediff(col("dateofobservation"), col("dateofbirth"))/365)
                        .drop("dateofbirth")
  )

merged_observations_with_age_df.show(5)

+--------------------+--------------------+--------+---------+----+-----+----+------------------+
|           patientid|   dateofobservation|systolic|diastolic| hdl|  ldl| bmi|               age|
+--------------------+--------------------+--------+---------+----+-----+----+------------------+
|007dca45-b0c3-cbf...|2008-07-20T00:06:17Z|   124.0|     88.0|62.0| 83.7|27.7|  94.3972602739726|
|007f51fe-4731-86b...|2016-11-10T11:03:34Z|   127.0|     80.0|65.8|109.3|30.1|42.057534246575344|
|0115568f-b00b-19f...|2018-02-27T02:19:08Z|   123.0|     88.0|55.0|151.3|27.8| 71.01917808219179|
|012da0c4-8f2d-f59...|2015-10-30T07:56:09Z|   127.0|     74.0|43.7|155.6|28.4|  86.3972602739726|
|01f5bb36-ba72-80b...|2018-07-10T23:38:57Z|   109.0|     84.0|67.0| 99.3|30.1| 74.43013698630136|
+--------------------+--------------------+--------+---------+----+-----+----+------------------+
only showing top 5 rows



In [26]:
diabetics_df = (spark.read.option("header","true").csv(r"C:\work\argo\ml\synthea\output\csv\conditions.csv")
               .filter(col("DESCRIPTION") == "Diabetes")
               .select(col('PATIENT').alias("patientid"),col("START").alias("start"))
               )

In [29]:
diabetics_df.show(5)

+--------------------+----------+
|           patientid|     start|
+--------------------+----------+
|511427c5-b2ad-1ba...|2005-03-06|
|167a4921-4e0f-a76...|2002-03-29|
|85d54c72-0b41-eff...|1972-04-30|
|c8acf77a-1831-86a...|2019-05-24|
|bd1c8c51-7876-bf4...|2006-01-13|
+--------------------+----------+
only showing top 5 rows



In [30]:
observations_and_condition_df = (
    merged_observations_with_age_df.join(diabetics_df, "patientid", "left_outer")
    .withColumn("diabetic", when(col("start").isNotNull(), 1).otherwise(0))
)

observations_and_condition_df.show(5)

+--------------------+--------------------+--------+---------+----+-----+----+------------------+----------+--------+
|           patientid|   dateofobservation|systolic|diastolic| hdl|  ldl| bmi|               age|     start|diabetic|
+--------------------+--------------------+--------+---------+----+-----+----+------------------+----------+--------+
|02314aa6-55ab-e60...|2021-03-24T14:33:45Z|   106.0|     76.0|48.7|137.2|29.7|37.320547945205476|2015-02-18|       1|
|02314aa6-55ab-e60...|2019-03-13T14:33:45Z|   128.0|     77.0|58.3|114.2|28.3| 35.28767123287671|2015-02-18|       1|
|02314aa6-55ab-e60...|2018-03-07T14:33:45Z|   116.0|     84.0|51.1|151.0|27.6|34.271232876712325|2015-02-18|       1|
|02314aa6-55ab-e60...|2017-03-01T14:33:45Z|   112.0|     77.0|46.7|154.3|29.7| 33.25479452054795|2015-02-18|       1|
|02314aa6-55ab-e60...|2016-02-24T14:33:45Z|   121.0|     74.0|53.9|134.4|29.7| 32.23835616438356|2015-02-18|       1|
+--------------------+--------------------+--------+----

In [31]:
observations_and_condition_df = (
    observations_and_condition_df.filter((col("diabetic") == 0) | ((col("dateofobservation") >= col("start"))))
)


In [32]:
from pyspark.sql.window import Window

w = Window.partitionBy(observations_and_condition_df["patientid"]).orderBy(merged_observations_df["dateofobservation"].asc())

first_observation_df = observations_and_condition_df.withColumn("rn", row_number().over(w)).where(col("rn") == 1).drop("rn")

In [33]:
observations_and_condition_df.count()

132567

In [34]:
first_observation_df.count()

22736

In [38]:
first_pdf = first_observation_df.toPandas()

In [41]:
first_pdf.to_csv(r"./data/processedData_first.csv",index=False)

In [42]:
first_pdf.to_parquet(r"./data/processedData_first.parquet",index=False)

In [45]:
observations_and_condition_pdf = observations_and_condition_df.toPandas()

In [46]:
observations_and_condition_pdf.to_parquet(r"./data/processedData_full.parquet",index=False)

In [47]:
observations_and_condition_pdf.to_csv(r"./data/processedData_full.csv",index=False)

In [48]:
spark.stop()