In [46]:
import sys
!{sys.executable} -m pip install PyAthena

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

In [47]:
import boto3

import numpy as np
import pandas as pd

from pyathena import connect
from pyathena.cursor import DictCursor

s3 = boto3.resource("s3")
cfn = boto3.client("cloudformation")

# Create a connection to Athena 

In [48]:
import os, jmespath

session = boto3.session.Session()
region = session.region_name
print(region)

project_name = os.environ.get('RESOURCE_PREFIX')
database_name = project_name.lower()
work_group_name = project_name.lower() + '-' + region

resources = cfn.describe_stacks(StackName='{0}-Pipeline'.format(project_name))
query = 'Stacks[].Outputs[?OutputKey==`DataLakeBucket`].OutputValue'
data_lake_bucket = path = jmespath.search(query, resources)[0][0]

conn = connect(s3_staging_dir=f's3://{data_lake_bucket}/query-output',
               region_name=region,
               schema_name=database_name)
cursor = conn.cursor(DictCursor, work_group=work_group_name)

us-west-1


# Run a sample query

To count the number of entries in a table that match our main table (`clinical_patient`), we can join the main table with the matching table. In the following example, we will consider the `tcia_patients` table and try to find the rows which have a `patientid` that matches against an entry in the `bcr_patient_barcode` column of `clinical_patient`.

```
 Main table                    Matching table
+---------------------+       +-------------------+
| clinical_patient    |       | tcia_patients     |
+---------------------+       +-------------------+
| bcr_patient_barcode |+----->| patientid         |
| ...                 |       | ...               |
|                     |       |                   |
+---------------------+       +-------------------+
```

Additionally, the matching table (`tcia_patients`) may have more than one row that matches against a row in the main table. To account for this, rather than joining directly to the matching table, we create a subquery that first groups the rows in the matching table by the join key (in this case, by `patientid`) and then counts the size of that group. The resulting `quantity` will be the column retained in the output table.

In [49]:
QUERY = """
SELECT clinical_patient.bcr_patient_barcode,
    QTY_IMGS.quantity as num_images
FROM clinical_patient

LEFT JOIN
    (   SELECT COUNT(tcia_patients.patientid) AS quantity, tcia_patients.patientid 
        FROM tcia_patients
        GROUP BY tcia_patients.patientid
    ) AS QTY_IMGS
ON clinical_patient.bcr_patient_barcode = QTY_IMGS.patientid
"""

query_res = cursor.execute(QUERY).fetchall()
print(f'Received {len(query_res)} rows')
query_df = pd.DataFrame(query_res)
query_df.head()

Received 1026 rows


Unnamed: 0,bcr_patient_barcode,num_images
0,TCGA-05-4245,
1,TCGA-05-4249,
2,TCGA-05-4382,
3,TCGA-05-4384,
4,TCGA-05-4389,


Most of the patients in the `clinical_patient` table don't have image records, so let's preview just the ones which have images:

In [50]:
query_df.dropna()

Unnamed: 0,bcr_patient_barcode,num_images
17,TCGA-38-4626,3.0
18,TCGA-38-4628,4.0
20,TCGA-38-A44F,1.0
65,TCGA-50-5049,1.0
66,TCGA-50-5055,2.0
...,...,...
897,TCGA-60-2726,2.0
992,TCGA-92-7340,2.0
993,TCGA-92-8064,2.0
994,TCGA-92-8065,3.0


# Run the full query

Now that we've demonstrated we can perform the basic join query and retrieve the data, we will run the full multi-table join query. This query has one join for each quantity we are collecting, meaning typically one join per matching table -- although, in some examples noted in comments below, we have to use additional tweaks to get the appropriate data.

In [51]:
QUERY = """
SELECT clinical_patient.bcr_patient_barcode as patient_id,
    QTY_IMGS.quantity as num_images,
    QTY_IMG_SER.quantity as num_image_series,
    QTY_MUT.quantity as num_mutation_records,
    QTY_EXP.quantity as num_expression_records,
    QTY_CNV.quantity as num_cnv_records,
    QTY_CLIN_DRUG.quantity as num_clin_drug_records,
    QTY_CLIN_RAD.quantity as num_clin_rad_records,
    QTY_CLIN_FOL.quantity as num_clin_fol_records,
    QTY_CLIN_OMF.quantity as num_clin_omf_records,
    QTY_CLIN_NTE.quantity as num_clin_nte_records
FROM clinical_patient

LEFT JOIN
    (   SELECT COUNT(tcia_patients.patientid) AS quantity, 
            tcia_patients.patientid 
        FROM tcia_patients
        GROUP BY tcia_patients.patientid
    ) AS QTY_IMGS
ON clinical_patient.bcr_patient_barcode = QTY_IMGS.patientid

LEFT JOIN
    (   SELECT COUNT(tcia_image_series.patientid) AS quantity, 
            tcia_image_series.patientid 
        FROM tcia_image_series
        GROUP BY tcia_image_series.patientid
    ) AS QTY_IMG_SER
ON clinical_patient.bcr_patient_barcode = QTY_IMG_SER.patientid

LEFT JOIN
    (   SELECT COUNT(tcga_mutation.submitter_id) AS quantity, 
            tcga_mutation.submitter_id
        FROM tcga_mutation
        GROUP BY tcga_mutation.submitter_id
    ) AS QTY_MUT
ON clinical_patient.bcr_patient_barcode = QTY_MUT.submitter_id

-- The expression data is stored in a unique format - each patient ID is a column in one of two tables.
-- In order to query this, we use the `information_schema` special table which contains the metadata
-- about all tables in the database. This special table is first filtered and transformed via a computed
-- table expression and then grouped and joined to match the results of the other tables.
LEFT JOIN     
    (   WITH expression_patients AS (
            SELECT upper(substring(column_name, 1, 12)) AS patientid
            FROM information_schema.columns
            WHERE table_schema = '{database_name}'
            AND table_name LIKE 'expression_tcga_%'
            AND upper(column_name) LIKE 'TCGA-%'
        )
        SELECT COUNT(expression_patients.patientid) AS quantity, 
            expression_patients.patientid
        FROM expression_patients
        GROUP BY expression_patients.patientid
    ) AS QTY_EXP
ON clinical_patient.bcr_patient_barcode = QTY_EXP.patientid

LEFT JOIN
    (   SELECT COUNT(tcga_cnv.submitter_id[1]) AS quantity, 
            tcga_cnv.submitter_id[1] as submitter_id
        FROM tcga_cnv
        WHERE copy_number IS NOT NULL
        GROUP BY tcga_cnv.submitter_id[1]
    ) AS QTY_CNV
ON clinical_patient.bcr_patient_barcode = QTY_CNV.submitter_id

LEFT JOIN
    (   SELECT COUNT(clinical_drug.bcr_patient_barcode) AS quantity, 
            clinical_drug.bcr_patient_barcode
        FROM clinical_drug
        GROUP BY clinical_drug.bcr_patient_barcode
    ) AS QTY_CLIN_DRUG
ON clinical_patient.bcr_patient_barcode = QTY_CLIN_DRUG.bcr_patient_barcode

LEFT JOIN
    (   SELECT COUNT(clinical_radiation.bcr_patient_barcode) AS quantity, 
            clinical_radiation.bcr_patient_barcode
        FROM clinical_radiation
        GROUP BY clinical_radiation.bcr_patient_barcode
    ) AS QTY_CLIN_RAD
ON clinical_patient.bcr_patient_barcode = QTY_CLIN_RAD.bcr_patient_barcode

LEFT JOIN
    (   SELECT COUNT(clinical_follow_up_v1_0.bcr_patient_barcode) AS quantity, 
            clinical_follow_up_v1_0.bcr_patient_barcode
        FROM clinical_follow_up_v1_0
        GROUP BY clinical_follow_up_v1_0.bcr_patient_barcode
    ) AS QTY_CLIN_FOL
ON clinical_patient.bcr_patient_barcode = QTY_CLIN_FOL.bcr_patient_barcode

LEFT JOIN
    (   SELECT COUNT(clinical_omf_v4_0.bcr_patient_barcode) AS quantity, 
            clinical_omf_v4_0.bcr_patient_barcode
        FROM clinical_omf_v4_0
        GROUP BY clinical_omf_v4_0.bcr_patient_barcode
    ) AS QTY_CLIN_OMF
ON clinical_patient.bcr_patient_barcode = QTY_CLIN_OMF.bcr_patient_barcode

-- The NTE data is split across two tables, so in order to have one quantity for both tables, we union
-- the results of the same query together.

LEFT JOIN
    (   SELECT COUNT(clinical_nte_tcga_luad.bcr_patient_barcode) AS quantity, 
            clinical_nte_tcga_luad.bcr_patient_barcode
        FROM clinical_nte_tcga_luad
        GROUP BY clinical_nte_tcga_luad.bcr_patient_barcode
        UNION ALL 
        SELECT COUNT(clinical_nte_tcga_lusc.bcr_patient_barcode) AS quantity,
            clinical_nte_tcga_lusc.bcr_patient_barcode
        FROM clinical_nte_tcga_lusc
        GROUP BY clinical_nte_tcga_lusc.bcr_patient_barcode
    ) AS QTY_CLIN_NTE
ON clinical_patient.bcr_patient_barcode = QTY_CLIN_NTE.bcr_patient_barcode
""".format(database_name=database_name)

query_res = cursor.execute(QUERY).fetchall()
print(f'Received {len(query_res)} rows')
query_df = pd.DataFrame(query_res).fillna(0)
query_df = query_df.astype({c: 'int' for c in query_df.columns if c.startswith('num_')})
query_df.head()

Received 1026 rows


Unnamed: 0,patient_id,num_images,num_image_series,num_mutation_records,num_expression_records,num_cnv_records,num_clin_drug_records,num_clin_rad_records,num_clin_fol_records,num_clin_omf_records,num_clin_nte_records
0,TCGA-05-4417,0,0,330,1,60264,0,0,1,0,0
1,TCGA-05-4424,0,0,729,1,60222,1,3,1,0,0
2,TCGA-05-4426,0,0,46,1,60273,0,0,1,0,0
3,TCGA-05-5420,0,0,90,1,60215,0,0,1,2,0
4,TCGA-05-5425,0,0,602,1,60263,4,1,1,0,0


In [52]:
query_df.describe()

Unnamed: 0,num_images,num_image_series,num_mutation_records,num_expression_records,num_cnv_records,num_clin_drug_records,num_clin_rad_records,num_clin_fol_records,num_clin_omf_records,num_clin_nte_records
count,1026.0,1026.0,1026.0,1026.0,1026.0,1026.0,1026.0,1026.0,1026.0,1026.0
mean,0.132554,0.52924,343.067251,1.110136,62118.999025,0.797271,0.193957,1.154971,0.184211,0.066277
std,0.560464,2.488926,386.902819,0.354146,20701.282508,1.460315,0.488305,0.65694,0.457122,0.248887
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,134.0,1.0,60221.0,0.0,0.0,1.0,0.0,0.0
50%,0.0,0.0,243.0,1.0,60234.0,0.0,0.0,1.0,0.0,0.0
75%,0.0,0.0,409.75,1.0,60263.0,2.0,0.0,1.0,0.0,0.0
max,5.0,27.0,5214.0,3.0,241077.0,12.0,4.0,4.0,3.0,1.0


# Save the result to Athena

In [53]:
s3.Bucket(data_lake_bucket).objects.filter(Prefix='tcga-summary/').delete()
s3_url = f's3://{data_lake_bucket}/tcga-summary/part-0000.parquet'
query_df.to_parquet(s3_url)

# Register the new table

In [54]:
cursor.execute('DROP TABLE IF EXISTS tcga_summary;')

QUERY = """
CREATE EXTERNAL TABLE tcga_summary ({columns})
STORED AS PARQUET
LOCATION 's3://{data_lake_bucket}/tcga-summary/'
""".format(
    data_lake_bucket=data_lake_bucket,
    columns=', '.join(
        ['patient_id STRING'] + [
            col + ' INTEGER'
            for col in query_df.columns
            if col.startswith('num_')
        ]
    )
)
print(QUERY)

query_res = cursor.execute(QUERY).fetchall()
print(query_res)


CREATE EXTERNAL TABLE tcga_summary (patient_id STRING, num_images INTEGER, num_image_series INTEGER, num_mutation_records INTEGER, num_expression_records INTEGER, num_cnv_records INTEGER, num_clin_drug_records INTEGER, num_clin_rad_records INTEGER, num_clin_fol_records INTEGER, num_clin_omf_records INTEGER, num_clin_nte_records INTEGER)
STORED AS PARQUET
LOCATION 's3://genomicsanalysis-pipeline-datalakebucket-1g5hynzwz9kmd/tcga-summary/'

[]
