This workbook shows how different types of input data can be manipulated manually and loaded into `pandas` dataframes, which are subsequently used by the `CommonDataModel`.

Importing packages:

In [1]:
import coconnect
import glob
import pandas as pd
import os
from sqlalchemy import create_engine

## CSV Files

Create a map from each CSV filename to a `pandas` reader for that file.

__note__: `iterator=True` tells pandas not to read the data into memory straight away, but to set up a `parsers.TextFileReader` instead. Specifying `chunksize=<value>` also returns an iterator, which allows easy looping over chunks of the data.
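
A minimal sketch of both reading modes on a small in-memory CSV (the values are illustrative, standing in for one of the files in `../data/part1/`):

```python
import io

import pandas as pd

# Small illustrative CSV standing in for one of the files in ../data/part1/
csv_text = "ID,Age\npk1,73\npk2,65\npk3,58\npk4,55\npk5,42\n"

# chunksize=2: iterating yields DataFrames of at most 2 rows each
chunk_reader = pd.read_csv(io.StringIO(csv_text), chunksize=2)
chunk_sizes = [len(chunk) for chunk in chunk_reader]
print(chunk_sizes)  # [2, 2, 1]

# iterator=True (no chunksize): pull rows on demand with get_chunk()
lazy_reader = pd.read_csv(io.StringIO(csv_text), iterator=True)
first_three = lazy_reader.get_chunk(3)
print(first_three["ID"].tolist())  # ['pk1', 'pk2', 'pk3']
```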

In [2]:
df_map = {
            os.path.basename(x):pd.read_csv(x,iterator=True) 
            for x in glob.glob('../data/part1/*.csv')
         }
df_map

{'Symptoms.csv': <pandas.io.parsers.TextFileReader at 0x10e4a9880>,
 'Blood_Test.csv': <pandas.io.parsers.TextFileReader at 0x10d766550>,
 'Serology.csv': <pandas.io.parsers.TextFileReader at 0x10a9b2370>,
 'GP_Records.csv': <pandas.io.parsers.TextFileReader at 0x10a9e3520>,
 'Vaccinations.csv': <pandas.io.parsers.TextFileReader at 0x10e4fac40>,
 'Hospital_Visit.csv': <pandas.io.parsers.TextFileReader at 0x10e4faeb0>,
 'Demographics.csv': <pandas.io.parsers.TextFileReader at 0x10e4fac70>}

Create a co-connect `LocalDataCollection` object to store the inputs:

In [3]:
csv_inputs = coconnect.io.LocalDataCollection()
csv_inputs.load_input_dataframe(df_map)
csv_inputs

2022-03-16 16:33:17 - LocalDataCollection - INFO - DataCollection Object Created
2022-03-16 16:33:17 - LocalDataCollection - INFO - Registering  Symptoms.csv [<coconnect.io.common.DataBrick object at 0x10a9e3cd0>]
2022-03-16 16:33:17 - LocalDataCollection - INFO - Registering  Blood_Test.csv [<coconnect.io.common.DataBrick object at 0x10a9e3a00>]
2022-03-16 16:33:17 - LocalDataCollection - INFO - Registering  Serology.csv [<coconnect.io.common.DataBrick object at 0x10a9e3c40>]
2022-03-16 16:33:17 - LocalDataCollection - INFO - Registering  GP_Records.csv [<coconnect.io.common.DataBrick object at 0x10a9e3a90>]
2022-03-16 16:33:17 - LocalDataCollection - INFO - Registering  Vaccinations.csv [<coconnect.io.common.DataBrick object at 0x10e5194c0>]
2022-03-16 16:33:17 - LocalDataCollection - INF

<coconnect.io.plugins.local.LocalDataCollection at 0x10e5190d0>

Check to see what data has been loaded:

In [4]:
csv_inputs.keys()

dict_keys(['Symptoms.csv', 'Blood_Test.csv', 'Serology.csv', 'GP_Records.csv', 'Vaccinations.csv', 'Hospital_Visit.csv', 'Demographics.csv'])

## SQL 

The following shows how the input collection can be used to write each CSV file to a table in a SQL database.

In [5]:
sql_store = coconnect.io.SqlDataCollection(connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet",
                                          drop_existing=True)
sql_store

2022-03-16 16:33:17 - SqlDataCollection - INFO - DataCollection Object Created
2022-03-16 16:33:18 - SqlDataCollection - INFO - Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)


<coconnect.io.plugins.sql.SqlDataCollection at 0x10e519790>

Loop over all the inputs: retrieve each loaded dataframe from the input collection, strip the `.csv` extension to get the table name, and use the SQL store to write the dataframe to the database:

In [6]:
for name in csv_inputs.keys():
    df = csv_inputs[name]
    name = name.split(".")[0]
    sql_store.write(name,df)

2022-03-16 16:33:18 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Symptoms.csv' for the first time
2022-03-16 16:33:18 - SqlDataCollection - INFO - updating Symptoms in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-03-16 16:33:27 - SqlDataCollection - INFO - finished save to psql
2022-03-16 16:33:27 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Blood_Test.csv' for the first time
2022-03-16 16:33:27 - SqlDataCollection - INFO - updating Blood_Test in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-03-16 16:33:44 - SqlDataCollection - INFO - finished save to psql
2022-03-16 16:33:44 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Serology.csv' for the first time
2022-03-16 16:33:44 - SqlDa
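
For comparison, a minimal sketch of the same loop using plain `pandas` instead of `SqlDataCollection`: `DataFrame.to_sql` writes each dataframe to a table named after its file. It assumes an in-memory SQLite database (stdlib `sqlite3`) as a stand-in for PostgreSQL and uses illustrative dataframes; treating `if_exists="replace"` as roughly what `drop_existing=True` does is an assumption, not coconnect's documented behaviour. It also uses `os.path.splitext`, which strips only the final extension, whereas `name.split(".")[0]` would cut a hypothetical name like `Blood.Test.csv` at its first dot.

```python
import os
import sqlite3

import pandas as pd

# Illustrative stand-ins for the loaded CSV inputs
inputs = {
    "Demographics.csv": pd.DataFrame({"ID": ["pk1", "pk2"], "Age": [73.0, 65.0]}),
    "Serology.csv": pd.DataFrame({"ID": ["pk1"], "IgG": [40.48]}),
}

# In-memory SQLite database standing in for the PostgreSQL server
conn = sqlite3.connect(":memory:")

for filename, df in inputs.items():
    # splitext strips only the final extension: "a.b.csv" -> "a.b"
    table_name, _ = os.path.splitext(filename)
    # if_exists="replace" drops and recreates the table if it already exists
    df.to_sql(table_name, conn, if_exists="replace", index=False)

tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)  # ['Demographics', 'Serology']
```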

Now we can use pandas to test the SQL database we created and load in some filtered data:

In [7]:
connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet"
engine = create_engine(connection_string)

Retrieve a filtered pandas dataframe from the SQL connection (people with `IMD` = 5, up to 1000 rows):

In [8]:
df_demo = pd.read_sql('SELECT * FROM "Demographics" WHERE "IMD" = 5 LIMIT 1000;',con=engine)
df_demo

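| | ID | Age | Sex | Ethnicity | IMD |
|---|---|---|---|---|---|
| 0 | pk1 | 73.0 | Male | Black | 5 |
| 1 | pk2 | 65.0 | Female | Black | 5 |
| 2 | pk11 | 58.0 | Male | White | 5 |
| 3 | pk15 | 55.0 | Female | White | 5 |
| 4 | pk16 | 42.0 | Female | White | 5 |
| ... | ... | ... | ... | ... | ... |
| 995 | pk4155 | 84.0 | Male | White | 5 |
| 996 | pk4156 | 82.0 | Female | White | 5 |
| 997 | pk4161 | 63.0 | Male | White | 5 |
| 998 | pk4164 | 61.0 | Female | Black | 5 |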
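
The filter value above is written directly into the SQL string. When such values come from variables, it is safer to pass them as query parameters through `pd.read_sql(..., params=...)`. A minimal sketch, assuming an in-memory SQLite database (stdlib `sqlite3`) as a stand-in for the PostgreSQL server and a few illustrative rows; the placeholder syntax depends on the database driver (`?` for `sqlite3`, `%s` or `%(name)s` for psycopg2). The `ORDER BY` makes the `LIMIT` pick a well-defined set of rows.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stand-in for the PostgreSQL database (illustrative rows)
conn = sqlite3.connect(":memory:")
pd.DataFrame(
    {"ID": ["pk1", "pk2", "pk3"], "Age": [73.0, 65.0, 30.0], "IMD": [5, 5, 2]}
).to_sql("Demographics", conn, index=False)

# Values are passed as parameters rather than formatted into the SQL string;
# "?" is sqlite3's placeholder style (psycopg2 uses %s or %(name)s instead)
query = 'SELECT * FROM "Demographics" WHERE "IMD" = ? ORDER BY "ID" LIMIT ?'
df_imd5 = pd.read_sql(query, con=conn, params=(5, 1000))
print(df_imd5["ID"].tolist())  # ['pk1', 'pk2']
```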


Use a more complex SQL query to filter the Serology table using information in the Demographics table (only the IDs of people with `IMD` = 5), creating a pandas dataframe:

In [9]:
sql_command = r'''
SELECT 
    * 
FROM "Serology" 
WHERE "ID" in (
    SELECT 
        "ID" 
    FROM "Demographics" 
    WHERE "IMD" = 5 LIMIT 1000
    )
'''
df_serology = pd.read_sql(sql_command,con=engine)
df_serology

| | ID | Date | IgG |
|---|---|---|---|
| 0 | pk3262 | 2020-03-29 | 40.481863 |
| 1 | pk1096 | 2022-07-26 | 9.057812 |
| 2 | pk1249 | 2019-10-15 | 6.761957 |
| 3 | pk2090 | 2022-01-08 | 1.981973 |
| 4 | pk195 | 2022-04-12 | 1.441420 |
| ... | ... | ... | ... |
| 358 | pk1066 | 2023-03-17 | 66.714910 |
| 359 | pk259 | 2020-03-09 | 31.556371 |
| 360 | pk1873 | 2020-11-09 | 15.658885 |
| 361 | pk1491 | 2019-08-26 | 6.093198 |
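
Because `df_demo` is already in memory, the same filter could instead be applied on the pandas side. A side benefit is consistency: the subquery re-runs `LIMIT 1000` without an `ORDER BY`, and PostgreSQL does not guarantee which rows such a query returns, so the IDs it selects need not match the rows in `df_demo`. The trade-off is that the whole Serology table has to be read first. A minimal sketch with illustrative stand-in frames:

```python
import pandas as pd

# Illustrative stand-ins for df_demo and the full Serology table
demo = pd.DataFrame({"ID": ["pk1", "pk2"], "IMD": [5, 5]})
serology = pd.DataFrame(
    {"ID": ["pk1", "pk3", "pk2", "pk1"], "IgG": [40.5, 9.1, 6.8, 2.0]}
)

# Same effect as WHERE "ID" IN (SELECT "ID" ...), but using the IDs already in memory
serology_subset = serology[serology["ID"].isin(demo["ID"])].reset_index(drop=True)
print(serology_subset["ID"].tolist())  # ['pk1', 'pk2', 'pk1']
```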


Build a new LocalDataCollection from the dataframes pulled from SQL and loaded in memory:

In [10]:
sql_inputs = coconnect.io.LocalDataCollection()
sql_inputs.load_input_dataframe({'Serology.csv':df_serology,'Demographics.csv':df_demo})
sql_inputs

2022-03-16 16:37:43 - LocalDataCollection - INFO - DataCollection Object Created
2022-03-16 16:37:43 - LocalDataCollection - INFO - Registering  Serology.csv [<coconnect.io.common.DataBrick object at 0x10e5609a0>]
2022-03-16 16:37:43 - LocalDataCollection - INFO - Registering  Demographics.csv [<coconnect.io.common.DataBrick object at 0x10e519ca0>]


<coconnect.io.plugins.local.LocalDataCollection at 0x125e916d0>

Load the rules and remove any whose source tables are missing from the inputs (we are only dealing with two tables, so we only want to apply the rules associated with them):

In [12]:
rules = coconnect.tools.load_json("../data/rules.json")
rules = coconnect.tools.remove_missing_sources_from_rules(rules,sql_inputs.keys())
rules



{'metadata': {'date_created': '2022-02-11T12:22:48.465257',
  'dataset': 'FAILED: ExampleV4'},
 'cdm': {'person': {'MALE 3025': {'birth_datetime': {'source_table': 'Demographics.csv',
     'source_field': 'Age',
     'operations': ['get_datetime_from_age']},
    'gender_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Male': 8507}},
    'gender_source_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Male': 8507}},
    'gender_source_value': {'source_table': 'Demographics.csv',
     'source_field': 'Sex'},
    'person_id': {'source_table': 'Demographics.csv', 'source_field': 'ID'}},
   'FEMALE 3026': {'birth_datetime': {'source_table': 'Demographics.csv',
     'source_field': 'Age',
     'operations': ['get_datetime_from_age']},
    'gender_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Female': 8532}},
    'gender_source_concept_i
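
To make the filtering step concrete, here is a hypothetical illustration of what removing missing sources from the rules might involve. It is not coconnect's implementation of `remove_missing_sources_from_rules`; it assumes only the nesting `rules["cdm"][table][object][field]["source_table"]` visible in the output above, and it drops a whole CDM object if any of its fields points at a table that is not among the inputs. The function name `filter_rules_by_sources` and the example rules are made up.

```python
import copy


def filter_rules_by_sources(rules, available_tables):
    """Hypothetical sketch, NOT coconnect's remove_missing_sources_from_rules.

    Assumes the layout rules["cdm"][table][object_name][field]["source_table"]
    and drops any CDM object with a field whose source table is not available.
    """
    available = set(available_tables)
    filtered = copy.deepcopy(rules)  # leave the caller's rules untouched
    for table_name, objects in list(filtered["cdm"].items()):
        for object_name, fields in list(objects.items()):
            sources = {spec["source_table"] for spec in fields.values()}
            if not sources <= available:
                del objects[object_name]
        if not objects:  # remove CDM tables left with no objects
            del filtered["cdm"][table_name]
    return filtered


# Made-up rules in the same shape as the output above
example_rules = {
    "metadata": {"dataset": "example"},
    "cdm": {
        "person": {
            "MALE 3025": {
                "person_id": {"source_table": "Demographics.csv", "source_field": "ID"},
                "gender_source_value": {
                    "source_table": "Demographics.csv",
                    "source_field": "Sex",
                },
            }
        },
        "condition_occurrence": {
            "Cough 1": {
                "person_id": {"source_table": "Symptoms.csv", "source_field": "ID"},
            }
        },
    },
}

kept = filter_rules_by_sources(example_rules, ["Demographics.csv", "Serology.csv"])
print(list(kept["cdm"]))  # ['person']
```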

Create a `CommonDataModel` object from the rules and the SQL-derived inputs, then process it to create the CDM tables:

In [13]:
cdm = coconnect.cdm.CommonDataModel.from_rules(rules,inputs=sql_inputs)
cdm.process()

2022-03-16 16:38:03 - CommonDataModel - INFO - CommonDataModel (5.3.1) created with co-connect-tools version 0.0.0
2022-03-16 16:38:03 - CommonDataModel - INFO - Running with an DataCollection object
2022-03-16 16:38:03 - CommonDataModel - INFO - Turning on automatic cdm column filling
2022-03-16 16:38:03 - CommonDataModel - INFO - Added MALE 3025 of type person
2022-03-16 16:38:03 - CommonDataModel - INFO - Added FEMALE 3026 of type person
2022-03-16 16:38:03 - CommonDataModel - INFO - Added Antibody 3027 of type observation
2022-03-16 16:38:03 - CommonDataModel - INFO - Starting processing in order: ['person', 'observation']
2022-03-16 16:38:03 - CommonDataModel - INFO - Number of objects to process for each table...
{
      "person": 2,
      "observation

In [14]:
cdm['person'].dropna(axis=1)

| person_id | gender_concept_id | year_of_birth | month_of_birth | day_of_birth | birth_datetime | gender_source_value | gender_source_concept_id |
|---|---|---|---|---|---|---|---|
| 1 | 8507 | 1947 | 7 | 20 | 1947-07-20 00:00:00.000000 | Male | 8507 |
| 2 | 8507 | 1962 | 7 | 16 | 1962-07-16 00:00:00.000000 | Male | 8507 |
| 3 | 8507 | 1957 | 7 | 17 | 1957-07-17 00:00:00.000000 | Male | 8507 |
| 4 | 8507 | 1946 | 7 | 20 | 1946-07-20 00:00:00.000000 | Male | 8507 |
| 5 | 8507 | 1944 | 7 | 20 | 1944-07-20 00:00:00.000000 | Male | 8507 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 991 | 8532 | 1944 | 7 | 20 | 1944-07-20 00:00:00.000000 | Female | 8532 |
| 992 | 8532 | 1951 | 7 | 19 | 1951-07-19 00:00:00.000000 | Female | 8532 |
| 993 | 8532 | 1938 | 7 | 22 | 1938-07-22 00:00:00.000000 | Female | 8532 |
| 994 | 8532 | 1959 | 7 | 17 | 1959-07-17 00:00:00.000000 | Female | 8532 |
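
`dropna(axis=1)` is used here to make the display compact: it drops every column that contains *any* missing value, so a partly populated CDM column would be hidden as well as an empty one. Passing `how="all"` drops only the columns that are entirely empty. A short sketch with an illustrative frame:

```python
import numpy as np
import pandas as pd

# Illustrative CDM-like frame: one full column, one partly filled, one empty
example = pd.DataFrame(
    {
        "gender_concept_id": [8507, 8532],
        "race_concept_id": [np.nan, 8527],
        "location_id": [np.nan, np.nan],
    }
)

# axis=1 drops columns; with the default how="any", one missing value is enough
kept_any = example.dropna(axis=1).columns.tolist()
print(kept_any)  # ['gender_concept_id']

# how="all" drops only the columns that are entirely missing
kept_all = example.dropna(axis=1, how="all").columns.tolist()
print(kept_all)  # ['gender_concept_id', 'race_concept_id']
```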


In [15]:
cdm['observation'].dropna(axis=1)

| observation_id | person_id | observation_concept_id | observation_date | observation_datetime | observation_source_value | observation_source_concept_id |
|---|---|---|---|---|---|---|
| 1 | 880 | 4288455 | 2020-03-29 | 2020-03-29 00:00:00.000000 | 40.481862620281696 | 4288455 |
| 2 | 638 | 4288455 | 2022-07-26 | 2022-07-26 00:00:00.000000 | 9.057812397131036 | 4288455 |
| 3 | 654 | 4288455 | 2019-10-15 | 2019-10-15 00:00:00.000000 | 6.761957236947811 | 4288455 |
| 4 | 286 | 4288455 | 2022-01-08 | 2022-01-08 00:00:00.000000 | 1.9819726698915583 | 4288455 |
| 5 | 19 | 4288455 | 2022-04-12 | 2022-04-12 00:00:00.000000 | 1.441420109316674 | 4288455 |
| ... | ... | ... | ... | ... | ... | ... |
| 359 | 161 | 4288455 | 2023-03-17 | 2023-03-17 00:00:00.000000 | 66.71490996683399 | 4288455 |
| 360 | 570 | 4288455 | 2020-03-09 | 2020-03-09 00:00:00.000000 | 31.55637115308043 | 4288455 |
| 361 | 726 | 4288455 | 2020-11-09 | 2020-11-09 00:00:00.000000 | 15.658884987181032 | 4288455 |
| 362 | 679 | 4288455 | 2019-08-26 | 2019-08-26 00:00:00.000000 | 6.093197917384714 | 4288455 |


## PySpark 

Using `PySpark`, we can create a session and a reader that connect to the same SQL database we created above.

In [16]:
from pyspark.sql import SparkSession

Define the session:

In [17]:
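# spark.jars must point at a local copy of the PostgreSQL JDBC driver;
# adjust this path for your own machine
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/Users/calummacdonald/Downloads/postgresql-42.3.1.jar") \
    .getOrCreate()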

Create a reader:

In [18]:
reader = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/ExampleCOVID19DataSet") \
    .option("driver", "org.postgresql.Driver") 
reader

<pyspark.sql.readwriter.DataFrameReader at 0x1246e8d30>

Load the Demographics table into a Spark dataframe, filter it to people under the age of 20, and keep only the first 2000 matching rows:

In [20]:
sdf_demo = reader.option("dbtable", '"Demographics"')\
                 .load()

sdf_demo = sdf_demo.filter(sdf_demo.Age < 20).limit(2000)

Select the first 1000 rows:

In [21]:
sdf_demo_first = sdf_demo.limit(1000)

Drop the first 1000 rows by subtracting them from the filtered dataframe, then keep up to 1000 of the remaining rows:

In [22]:
sdf_demo = sdf_demo.subtract(sdf_demo_first).limit(1000)
sdf_demo

DataFrame[ID: string, Age: double, Sex: string, Ethnicity: string, IMD: bigint]
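
PySpark documents `DataFrame.subtract` as equivalent to SQL `EXCEPT DISTINCT`, so besides removing the first 1000 rows it also collapses any duplicate rows. The sketch below reproduces that behaviour with SQLite's `EXCEPT` (stdlib `sqlite3`, used as a stand-in for Spark; the rows are illustrative). Note also that Spark does not guarantee which rows `limit` returns unless the dataframe is ordered, so an explicit `orderBy` (for example on `ID`) before the `limit` calls helps make this split reproducible.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ID TEXT, Age REAL)")
# Illustrative rows; note the duplicated pk3 row
conn.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [("pk1", 10.0), ("pk2", 12.0), ("pk3", 15.0), ("pk3", 15.0), ("pk4", 18.0)],
)

# Rows in demo but not in the "first" subset; like Spark's subtract,
# SQLite's EXCEPT also removes duplicate rows from the result
rows = conn.execute(
    "SELECT ID, Age FROM demo "
    "EXCEPT "
    "SELECT ID, Age FROM demo WHERE ID IN ('pk1', 'pk2') "
    "ORDER BY ID"
).fetchall()
print(rows)  # [('pk3', 15.0), ('pk4', 18.0)]
```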

Load the Serology table, keeping only the rows whose ID appears in the already loaded Spark dataframe for the demographics:

In [31]:
sdf_serology = reader.option("dbtable", '"Serology"')\
                     .load()

sdf_serology = sdf_serology.join(sdf_demo,
                                 ['ID'])\
                            .select(*sdf_serology.columns)
                         
sdf_serology.count()

390
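
The `join(...).select(*sdf_serology.columns)` pattern is an inner join, so a serology row is repeated once for every matching `ID` in `sdf_demo`; that is harmless when IDs are unique, but worth knowing. PySpark's `join` also accepts `how="left_semi"`, which keeps only the left-hand rows and columns without duplicating them. The sketch below illustrates the duplication with pandas `merge` rather than Spark (illustrative data), and de-duplicates the join keys to get semi-join behaviour:

```python
import pandas as pd

# Illustrative stand-ins for the Serology and Demographics tables
serology = pd.DataFrame({"ID": ["pk1", "pk2", "pk3"], "IgG": [2.4, 2.5, 65.1]})
# pk1 appears twice on the demographics side
demo = pd.DataFrame({"ID": ["pk1", "pk1", "pk2"], "Age": [10.0, 10.0, 12.0]})

# Inner join on ID, then keep only the serology columns: pk1's row is duplicated
inner = serology.merge(demo[["ID"]], on="ID", how="inner")[serology.columns]
print(len(inner))  # 3

# De-duplicating the join keys first gives semi-join behaviour: each row at most once
semi = serology.merge(demo[["ID"]].drop_duplicates(), on="ID", how="inner")
print(len(semi))  # 2
```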

Retrieve pandas dataframes from these Spark dataframes and put them in a new map.
_note_: we keep the `.csv` suffix in the names because that is how the source tables are named in the rules file!

In [27]:
df_map = {
            'Demographics.csv': sdf_demo.select('*').toPandas(),
            'Serology.csv': sdf_serology.select('*').toPandas()
         }
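
A quick way to catch a naming mismatch before building the `CommonDataModel` is to compare the `source_table` values referenced by the rules with the keys of the input map. The helper `missing_source_tables` and the trimmed rules dict below are hypothetical illustrations that assume the rules layout shown earlier in this workbook:

```python
def missing_source_tables(rules, input_names):
    """Hypothetical helper: source tables the rules reference but the inputs lack."""
    referenced = {
        spec["source_table"]
        for objects in rules["cdm"].values()
        for fields in objects.values()
        for spec in fields.values()
    }
    return sorted(referenced - set(input_names))


# Trimmed, made-up rules following the layout shown earlier
example_rules = {
    "cdm": {
        "person": {
            "MALE 3025": {
                "person_id": {"source_table": "Demographics.csv", "source_field": "ID"}
            }
        },
        "observation": {
            "Antibody 3027": {
                "person_id": {"source_table": "Serology.csv", "source_field": "ID"}
            }
        },
    }
}

# Dropping the .csv suffix from a key breaks the match with the rules
print(missing_source_tables(example_rules, ["Demographics", "Serology.csv"]))
# ['Demographics.csv']
print(missing_source_tables(example_rules, ["Demographics.csv", "Serology.csv"]))
# []
```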

In [28]:
spark_inputs = coconnect.io.LocalDataCollection()
spark_inputs.load_input_dataframe(df_map)
spark_inputs

2022-03-16 16:43:17 - LocalDataCollection - INFO - DataCollection Object Created
2022-03-16 16:43:17 - LocalDataCollection - INFO - Registering  Demographics.csv [<coconnect.io.common.DataBrick object at 0x1245dcb50>]
2022-03-16 16:43:17 - LocalDataCollection - INFO - Registering  Serology.csv [<coconnect.io.common.DataBrick object at 0x1245dc940>]


<coconnect.io.plugins.local.LocalDataCollection at 0x124802bb0>

In [29]:
cdm = coconnect.cdm.CommonDataModel.from_rules(rules,inputs=spark_inputs)
cdm.process()

2022-03-16 16:43:32 - CommonDataModel - INFO - CommonDataModel (5.3.1) created with co-connect-tools version 0.0.0
2022-03-16 16:43:32 - CommonDataModel - INFO - Running with an DataCollection object
2022-03-16 16:43:32 - CommonDataModel - INFO - Turning on automatic cdm column filling
2022-03-16 16:43:32 - CommonDataModel - INFO - Added MALE 3025 of type person
2022-03-16 16:43:32 - CommonDataModel - INFO - Added FEMALE 3026 of type person
2022-03-16 16:43:32 - CommonDataModel - INFO - Added Antibody 3027 of type observation
2022-03-16 16:43:32 - CommonDataModel - INFO - Starting processing in order: ['person', 'observation']
2022-03-16 16:43:32 - CommonDataModel - INFO - Number of objects to process for each table...
{
      "person": 2,
      "observation

In [30]:
cdm['observation'].dropna(axis=1)

| observation_id | person_id | observation_concept_id | observation_date | observation_datetime | observation_source_value | observation_source_concept_id |
|---|---|---|---|---|---|---|
| 1 | 709 | 4288455 | 2021-03-19 | 2021-03-19 00:00:00.000000 | 2.4434023669112914 | 4288455 |
| 2 | 12 | 4288455 | 2019-10-10 | 2019-10-10 00:00:00.000000 | 2.524992108387176 | 4288455 |
| 3 | 756 | 4288455 | 2020-09-30 | 2020-09-30 00:00:00.000000 | 2.498767194399477 | 4288455 |
| 4 | 389 | 4288455 | 2021-10-17 | 2021-10-17 00:00:00.000000 | 65.13603812569892 | 4288455 |
| 5 | 328 | 4288455 | 2022-01-03 | 2022-01-03 00:00:00.000000 | 38.47070924884126 | 4288455 |
| ... | ... | ... | ... | ... | ... | ... |
| 386 | 931 | 4288455 | 2020-08-12 | 2020-08-12 00:00:00.000000 | 36.53392857849371 | 4288455 |
| 387 | 84 | 4288455 | 2020-10-26 | 2020-10-26 00:00:00.000000 | 26.51166231662655 | 4288455 |
| 388 | 216 | 4288455 | 2020-05-23 | 2020-05-23 00:00:00.000000 | 28.424229915504444 | 4288455 |
| 389 | 12 | 4288455 | 2020-12-19 | 2020-12-19 00:00:00.000000 | 24.663227817629537 | 4288455 |


In [32]:
cdm['person'].dropna(axis=1)

| person_id | gender_concept_id | year_of_birth | month_of_birth | day_of_birth | birth_datetime | gender_source_value | gender_source_concept_id |
|---|---|---|---|---|---|---|---|
| 1 | 8507 | 2003 | 7 | 6 | 2003-07-06 00:00:00.000000 | Male | 8507 |
| 2 | 8507 | 2001 | 7 | 6 | 2001-07-06 00:00:00.000000 | Male | 8507 |
| 3 | 8507 | 2006 | 7 | 5 | 2006-07-05 00:00:00.000000 | Male | 8507 |
| 4 | 8507 | 2001 | 7 | 6 | 2001-07-06 00:00:00.000000 | Male | 8507 |
| 5 | 8507 | 2001 | 7 | 6 | 2001-07-06 00:00:00.000000 | Male | 8507 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 994 | 8532 | 2008 | 7 | 4 | 2008-07-04 00:00:00.000000 | Female | 8532 |
| 995 | 8532 | 2009 | 7 | 4 | 2009-07-04 00:00:00.000000 | Female | 8532 |
| 996 | 8532 | 2006 | 7 | 5 | 2006-07-05 00:00:00.000000 | Female | 8532 |
| 997 | 8532 | 2011 | 7 | 4 | 2011-07-04 00:00:00.000000 | Female | 8532 |
