In [1]:
import pandas as pd
import os

In [2]:
# Base location of the training CSV extracts in Google Cloud Storage.
path = "gs://test_bucket_1st/group_project/training_small"

# These are object URIs, not local filesystem paths, so they are joined with an
# explicit "/" rather than os.path.join (which uses the host OS separator and
# would produce a backslash on Windows).
person = path + "/" + "person.csv"
death = path + "/" + "death.csv"
visit = path + "/" + "visit_occurrence.csv"
condition = path + "/" + "condition_occurrence.csv"
measure = path + "/" + "measurement.csv"
obs = path + "/" + "observation.csv"
procedure = path + "/" + "procedure_occurrence.csv"
drug = path + "/" + "drug_exposure.csv"

In [3]:
# Load the person CSV, using the first line as the header and letting Spark
# infer the column types.
df_person = (
    spark.read.format("csv")
    .option("header", 'true')
    .option("inferschema", 'true')
    .load(person)
)

In [4]:
# Print the column names and the types Spark inferred for the person table.
df_person.printSchema()

root
 |-- year_of_birth: integer (nullable = true)
 |-- gender_source_value: integer (nullable = true)
 |-- ethnicity_concept_id: integer (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- race_source_concept_id: string (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- person_source_value: string (nullable = true)
 |-- month_of_birth: integer (nullable = true)
 |-- gender_source_concept_id: string (nullable = true)
 |-- ethnicity_source_concept_id: string (nullable = true)
 |-- care_site_id: string (nullable = true)
 |-- day_of_birth: integer (nullable = true)
 |-- ethnicity_source_value: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- race_concept_id: double (nullable = true)
 |-- gender_concept_id: integer (nullable = true)
 |-- birth_datetime: timestamp (nullable = true)
 |-- race_source_value: integer (nullable = true)



In [5]:
# Number of rows in the person table as loaded.
df_person.count() # in total 97918 rows

97918

In [4]:
# Load the death, visit and condition CSVs with the same reader settings
# (header row, inferred column types).
df_death, df_visit, df_con = (
    spark.read.format("csv")
    .option("header", 'true')
    .option("inferschema", 'true')
    .load(source)
    for source in (death, visit, condition)
)

In [7]:
# Row counts, one per line: death (1096), visit (1108648), condition (1836041).
print(df_death.count(), df_visit.count(), df_con.count(), sep="\n")

1096
1108648
1836041


## Drop empty rows/columns and redundant columns

In [5]:
# Remove rows in which every column is null from each of the four tables.
df_person, df_death, df_visit, df_con = (
    frame.dropna(how='all')
    for frame in (df_person, df_death, df_visit, df_con)
)

In [9]:
# Peek at one distinct year_of_birth value: collects the distinct values and
# takes the first field of the first row returned.
df_person.select("year_of_birth").distinct().collect()[0][0]

1959

In [6]:
# Function to drop the empty columns of the dataframe
def dropNullColumns(df):
    """Return ``df`` without the columns whose every value is null-like.

    A value is treated as null-like when its string form, lower-cased, is one
    of "none", "null" or "nan" (this covers ``None``, float NaN and those
    literal strings).

    Args:
        df: DataFrame exposing ``columns``, ``select``, ``distinct``,
            ``collect`` and ``drop``.

    Returns:
        The DataFrame with all-null columns removed. A message is printed for
        each dropped column. Columns of a frame with no rows are kept, since
        there is no value to judge them by.
    """
    null_set = {"none", "null", "nan"}
    for col in df.columns:
        distinct_rows = df.select(col).distinct().collect()
        # distinct() returns rows in no guaranteed order, so looking only at
        # the first row would drop a column that mixes nulls with real values
        # whenever the null happens to come back first. Require every distinct
        # value to be null-like before dropping.
        is_all_null = bool(distinct_rows) and all(
            str(row[0]).lower() in null_set for row in distinct_rows
        )
        if is_all_null:
            print("Dropping " + col + " because of all null values.")
            df = df.drop(col)
    return df

In [7]:
# Run dropNullColumns over each loaded table. The label printed before each
# call shows which table the following "Dropping ..." messages belong to.
print("for person table:")
df_person = dropNullColumns(df_person)
print("for death table:")
df_death = dropNullColumns(df_death)
print("for visit table:")
df_visit = dropNullColumns(df_visit)
print("for condition table:")
df_con = dropNullColumns(df_con)

for person table:
Dropping provider_id because of all null values.
Dropping race_source_concept_id because of all null values.
Dropping gender_source_concept_id because of all null values.
Dropping ethnicity_source_concept_id because of all null values.
Dropping care_site_id because of all null values.
for death table:
Dropping cause_concept_id because of all null values.
Dropping cause_source_value because of all null values.
for visit table:
Dropping preceding_visit_occurrence_id because of all null values.
Dropping admitting_source_value because of all null values.
Dropping discharge_to_source_value because of all null values.
Dropping admitting_source_concept_id because of all null values.
Dropping visit_source_concept_id because of all null values.
Dropping discharge_to_concept_id because of all null values.
for condition table:
Dropping visit_detail_id because of all null values.
Dropping condition_status_source_value because of all null values.
Dropping stop_reason because of al

In [8]:
# Row counts after cleaning, one per line:
# person (97918), death (1096), visit (1108648), condition (1836041).
print(
    df_person.count(),
    df_death.count(),
    df_visit.count(),
    df_con.count(),
    sep="\n",
)

97918
1096
1108648
1836041


In [13]:
# Show one row of the person table to inspect the remaining columns.
df_person.show(1)

+-------------+-------------------+--------------------+---------+-------------------+--------------+------------+----------------------+-----------+---------------+-----------------+-------------------+-----------------+
|year_of_birth|gender_source_value|ethnicity_concept_id|person_id|person_source_value|month_of_birth|day_of_birth|ethnicity_source_value|location_id|race_concept_id|gender_concept_id|     birth_datetime|race_source_value|
+-------------+-------------------+--------------------+---------+-------------------+--------------+------------+----------------------+-----------+---------------+-----------------+-------------------+-----------------+
|         1937|                  2|            38003564|    19076|   3FE03A5F3E571720|             8|           1|                     1|        404|         8557.0|             8532|1937-08-01 00:00:00|                1|
+-------------+-------------------+--------------------+---------+-------------------+--------------+-----------

In [14]:
# Show one row of the death table to inspect the remaining columns.
df_death.show(1)

+---------+-------------------+-------------------+---------------------+-----------------------+
|person_id|         death_date|     death_datetime|death_type_concept_id|cause_source_concept_id|
+---------+-------------------+-------------------+---------------------+-----------------------+
|   144964|2010-11-18 00:00:00|2010-11-18 00:00:00|             38003565|                      0|
+---------+-------------------+-------------------+---------------------+-----------------------+
only showing top 1 row



In [15]:
# Show one row of the visit table; note that person_id displays as a float here.
df_visit.show(1)

+-----------+-------------------+---------+------------+-------------------+------------------+-------------------+--------------------+----------------+---------------------+-------------------+
|provider_id|   visit_start_date|person_id|care_site_id|visit_occurrence_id|visit_source_value|     visit_end_date|visit_start_datetime|visit_concept_id|visit_type_concept_id| visit_end_datetime|
+-----------+-------------------+---------+------------+-------------------+------------------+-------------------+--------------------+----------------+---------------------+-------------------+
|     5973.0|2009-10-02 00:00:00| 129617.0|      4057.0|            1863806|   887153388320050|2009-10-02 00:00:00| 2009-10-02 00:00:00|               0|             44818517|2009-10-02 00:00:00|
+-----------+-------------------+---------+------------+-------------------+------------------+-------------------+--------------------+----------------+---------------------+-------------------+
only showing top 1 r

In [9]:
from pyspark.sql.types import IntegerType

# Cast the visit table's person_id to an integer so it has the same type as
# person_id in the person table before joining on it.
df_visit = df_visit.withColumn(
    "person_id",
    df_visit["person_id"].cast(IntegerType()),
)

In [10]:
# Base table: every person, left-joined first to their death record and then
# to their visits, both on person_id.
df_3 = (
    df_person
    .join(df_death, on=["person_id"], how='left')
    .join(df_visit, on=["person_id"], how='left')
)

In [10]:
# Row count of the person/death/visit join.
df_3.count()

1108650

In [19]:
# Print the combined schema of the person/death/visit join.
df_3.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- year_of_birth: integer (nullable = true)
 |-- gender_source_value: integer (nullable = true)
 |-- ethnicity_concept_id: integer (nullable = true)
 |-- person_source_value: string (nullable = true)
 |-- month_of_birth: integer (nullable = true)
 |-- day_of_birth: integer (nullable = true)
 |-- ethnicity_source_value: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- race_concept_id: double (nullable = true)
 |-- gender_concept_id: integer (nullable = true)
 |-- birth_datetime: timestamp (nullable = true)
 |-- race_source_value: integer (nullable = true)
 |-- death_date: timestamp (nullable = true)
 |-- death_datetime: timestamp (nullable = true)
 |-- death_type_concept_id: integer (nullable = true)
 |-- cause_source_concept_id: integer (nullable = true)
 |-- provider_id: double (nullable = true)
 |-- visit_start_date: timestamp (nullable = true)
 |-- care_site_id: double (nullable = true)
 |-- visit_occurrence_

In [37]:
# Show one row of the condition table; note that person_id displays as a float here.
df_con.show(1)

+-----------+------------------------+----------------------+---------------------------+---------+-------------------+--------------------+---------------------------+-------------------------+--------------------+----------------------+-----------------------+-------------------+
|provider_id|condition_start_datetime|condition_end_datetime|condition_source_concept_id|person_id| condition_end_date|condition_start_date|condition_status_concept_id|condition_type_concept_id|condition_concept_id|condition_source_value|condition_occurrence_id|visit_occurrence_id|
+-----------+------------------------+----------------------+---------------------------+---------+-------------------+--------------------+---------------------------+-------------------------+--------------------+----------------------+-----------------------+-------------------+
|   461776.0|     2008-11-20 00:00:00|   2008-11-20 00:00:00|                4.4829639E7| 115208.0|2008-11-20 00:00:00| 2008-11-20 00:00:00|           

In [11]:
# Cast the condition table's person_id to an integer so it matches the
# integer person_id used as a join key in df_3.
df_con = df_con.withColumn(
    "person_id",
    df_con["person_id"].cast(IntegerType()),
)

In [12]:
# Attach condition records to the base table by person and visit, then drop
# the condition table's own provider_id column (df_3 already carries one).
df_4_con = (
    df_3
    .join(df_con, on=["person_id", "visit_occurrence_id"], how='left')
    .drop(df_con.provider_id)
)

In [40]:
# Row count of the base table joined with conditions.
df_4_con.count()

2670178

In [15]:
# Print the schema of the base table joined with conditions.
df_4_con.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- visit_occurrence_id: integer (nullable = true)
 |-- year_of_birth: integer (nullable = true)
 |-- gender_source_value: integer (nullable = true)
 |-- ethnicity_concept_id: integer (nullable = true)
 |-- person_source_value: string (nullable = true)
 |-- month_of_birth: integer (nullable = true)
 |-- day_of_birth: integer (nullable = true)
 |-- ethnicity_source_value: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- race_concept_id: double (nullable = true)
 |-- gender_concept_id: integer (nullable = true)
 |-- birth_datetime: timestamp (nullable = true)
 |-- race_source_value: integer (nullable = true)
 |-- death_date: timestamp (nullable = true)
 |-- death_datetime: timestamp (nullable = true)
 |-- death_type_concept_id: integer (nullable = true)
 |-- cause_source_concept_id: integer (nullable = true)
 |-- provider_id: double (nullable = true)
 |-- visit_start_date: timestamp (nullable = true)
 |-- care_site

In [16]:
# Write the condition join as CSV with a header row, from a single partition.
# NOTE(review): no save mode is set, so the writer's default applies if the
# target already exists -- confirm this is what a re-run should do.
(
    df_4_con
    .coalesce(1)
    .write
    .option("header", "true")
    .csv('gs://test_bucket_1st/group_project/comp_condition.csv')
)

In [None]:
# Load the measurement, observation, procedure and drug CSVs with the same
# reader settings (header row, inferred column types).
df_measure, df_obs, df_procedure, df_drug = (
    spark.read.format("csv")
    .option("header", 'true')
    .option("inferschema", 'true')
    .load(source)
    for source in (measure, obs, procedure, drug)
)

In [18]:
# Run dropNullColumns over the four event tables. The label printed before
# each call shows which table the following "Dropping ..." messages belong to.
print("for measurement table:")
df_measure = dropNullColumns(df_measure)
print("for observation table:")
df_obs = dropNullColumns(df_obs)
print("for procedure table:")
df_procedure = dropNullColumns(df_procedure)
print("for drug table:")
df_drug = dropNullColumns(df_drug)

for measurement table:
Dropping value_source_value because of all null values.
Dropping measurement_type_concept_id because of all null values.
Dropping operator_concept_id because of all null values.
Dropping range_low because of all null values.
Dropping unit_source_value because of all null values.
Dropping measurement_time because of all null values.
Dropping range_high because of all null values.
Dropping visit_detail_id because of all null values.
for observation table:
Dropping observation_type_concept_id because of all null values.
Dropping visit_detail_id because of all null values.
Dropping value_as_number because of all null values.
Dropping unit_source_value because of all null values.
Dropping qualifier_source_value because of all null values.
Dropping unit_concept_id because of all null values.
Dropping qualifier_concept_id because of all null values.
for procedure table:
Dropping visit_detail_id because of all null values.
Dropping quantity because of all null values.
Dr

In [19]:
# Show one row of the measurement table; note that person_id displays as a float here.
df_measure.show(1)

+--------------+--------------------+-----------+-------------------+---------+---------------+-------------------+----------------------+-----------------------------+---------------+------------------------+-------------------+
|measurement_id|measurement_datetime|provider_id|   measurement_date|person_id|value_as_number|value_as_concept_id|measurement_concept_id|measurement_source_concept_id|unit_concept_id|measurement_source_value|visit_occurrence_id|
+--------------+--------------------+-----------+-------------------+---------+---------------+-------------------+----------------------+-----------------------------+---------------+------------------------+-------------------+
|     3655800.0| 2013-01-26 00:00:00|   315461.0|2009-08-12 00:00:00|  25142.0|           12.0|                0.0|             2212093.0|                    2212093.0|         8555.0|                   80053|            3023687|
+--------------+--------------------+-----------+-------------------+---------+-

In [20]:
# Cast person_id to an integer so it matches the join key type in df_3.
df_measure = df_measure.withColumn(
    "person_id",
    df_measure["person_id"].cast(IntegerType()),
)
# Attach measurements to the base table by person and visit, then drop the
# measurement table's own provider_id column (df_3 already carries one).
df_4_measure = (
    df_3
    .join(df_measure, on=["person_id", "visit_occurrence_id"], how='left')
    .drop(df_measure.provider_id)
)

In [19]:
# Row count of the base table joined with measurements.
df_4_measure.count()

11512503

In [None]:
# Write the measurement join as CSV with a header row, from a single partition.
# NOTE(review): no save mode is set, so the writer's default applies if the
# target already exists -- confirm this is what a re-run should do.
(
    df_4_measure
    .coalesce(1)
    .write
    .option("header", "true")
    .csv('gs://test_bucket_1st/group_project/comp_measure.csv')
)

In [21]:
# Show one row of the observation table; note that person_id displays as a float here.
df_obs.show(1)

+-----------+-------------------+-------------------+---------+-----------------------------+---------------+----------------------+--------------+------------------------+----------------------------+--------------------+-------------------+
|provider_id|value_as_concept_id|   observation_date|person_id|observation_source_concept_id|value_as_string|observation_concept_id|observation_id|observation_source_value|observation_source_concept_i|observation_datetime|visit_occurrence_id|
+-----------+-------------------+-------------------+---------+-----------------------------+---------------+----------------------+--------------+------------------------+----------------------------+--------------------+-------------------+
|    77182.0|          4317150.0|2010-02-19 00:00:00|  64285.0|                  4.4836678E7|            1-5|              440927.0|     1540843.0|                   V5864|                        null| 2018-10-25 00:00:00|             448979|
+-----------+---------------

In [22]:
# Cast person_id to an integer so it matches the join key type in df_3.
df_obs = df_obs.withColumn(
    "person_id",
    df_obs["person_id"].cast(IntegerType()),
)
# Attach observations to the base table by person and visit, then drop the
# observation table's own provider_id column (df_3 already carries one).
df_4_obs = (
    df_3
    .join(df_obs, on=["person_id", "visit_occurrence_id"], how='left')
    .drop(df_obs.provider_id)
)

In [24]:
# Row count of the base table joined with observations.
df_4_obs.count()

2495565

In [23]:
# Write the observation join as CSV with a header row, from a single partition.
# NOTE(review): no save mode is set, so the writer's default applies if the
# target already exists -- confirm this is what a re-run should do.
(
    df_4_obs
    .coalesce(1)
    .write
    .option("header", "true")
    .csv('gs://test_bucket_1st/group_project/comp_observation.csv')
)

In [24]:
# Show one row of the procedure table; note that person_id displays as a float here.
df_procedure.show(1)

+-------------------+-----------+---------+-------------------+---------------------------+--------------------+----------------------+-----------------------+-------------------+
| procedure_datetime|provider_id|person_id|     procedure_date|procedure_source_concept_id|procedure_concept_id|procedure_source_value|procedure_occurrence_id|visit_occurrence_id|
+-------------------+-----------+---------+-------------------+---------------------------+--------------------+----------------------+-----------------------+-------------------+
|2017-03-09 00:00:00|     3919.0|  25002.0|2009-02-16 00:00:00|                  2005197.0|           2005197.0|                  7804|              3609743.0|               7275|
+-------------------+-----------+---------+-------------------+---------------------------+--------------------+----------------------+-----------------------+-------------------+
only showing top 1 row



In [25]:
# Cast person_id to an integer so it matches the join key type in df_3.
df_procedure = df_procedure.withColumn(
    "person_id",
    df_procedure["person_id"].cast(IntegerType()),
)
# Attach procedures to the base table by person and visit, then drop the
# procedure table's own provider_id column (df_3 already carries one).
df_4_procedure = (
    df_3
    .join(df_procedure, on=["person_id", "visit_occurrence_id"], how='left')
    .drop(df_procedure.provider_id)
)

In [28]:
# Row count of the base table joined with procedures.
df_4_procedure.count()

7659241

In [26]:
# Write the procedure join as CSV with a header row, from a single partition.
# NOTE(review): no save mode is set, so the writer's default applies if the
# target already exists -- confirm this is what a re-run should do.
(
    df_4_procedure
    .coalesce(1)
    .write
    .option("header", "true")
    .csv('gs://test_bucket_1st/group_project/comp_procedure.csv')
)

In [27]:
# Show one row of the drug table; note that person_id displays as a float here.
df_drug.show(1)

+---------+------------------------+-----------------+--------+-----------+----------------------+-----------+----------------+----------------------------+-------------------+---------------+-------------------+
|person_id|drug_exposure_start_date|drug_source_value|quantity|days_supply|drug_source_concept_id|provider_id|drug_exposure_id|drug_exposure_start_datetime|  verbatim_end_date|drug_concept_id|visit_occurrence_id|
+---------+------------------------+-----------------+--------+-----------+----------------------+-----------+----------------+----------------------------+-------------------+---------------+-------------------+
| 106959.0|     2009-08-26 00:00:00|      61392018839|    30.0|       90.0|           4.4988429E7|       null|       3134766.0|         2009-08-26 00:00:00|2009-11-24 00:00:00|    1.9133578E7|            4007216|
+---------+------------------------+-----------------+--------+-----------+----------------------+-----------+----------------+---------------------

In [28]:
# Cast person_id to an integer so it matches the join key type in df_3.
df_drug = df_drug.withColumn(
    "person_id",
    df_drug["person_id"].cast(IntegerType()),
)
# Attach drug exposures to the base table by person and visit, then drop the
# drug table's own provider_id column (df_3 already carries one).
df_4_drug = (
    df_3
    .join(df_drug, on=["person_id", "visit_occurrence_id"], how='left')
    .drop(df_drug.provider_id)
)

In [32]:
# Row count of the base table joined with drug exposures.
df_4_drug.count()

1282238

In [29]:
# Write the drug join as CSV with a header row, from a single partition.
# NOTE(review): no save mode is set, so the writer's default applies if the
# target already exists -- confirm this is what a re-run should do.
(
    df_4_drug
    .coalesce(1)
    .write
    .option("header", "true")
    .csv('gs://test_bucket_1st/group_project/comp_drug.csv')
)