# Assignment 3.1: Impacting the Business with a Distributed Data Science Pipeline (Part 2)

### Sources

Beat Acute Myeloid Leukemia (AML) 1.0 was accessed on 13 March 2023 from https://registry.opendata.aws/beataml.

OHSU BeatAML datasets: https://ctd2-data.nci.nih.gov/Public/OHSU-1/BeatAML_Waves1_2/

OpenCell datasets: https://opencell.czbiohub.org/download

### Datasets S3 Location

The raw datasets are stored in AWS S3. List the contents of the bucket with the AWS Command Line Interface (CLI):

In [2]:
!aws s3 ls s3://team4rawdatasets/

                           PRE CSV/
2023-03-19 23:39:34    2075301 OHSU_BeatAMLWaves1_2_Tyner_DrugResponse.csv
2023-03-19 23:39:34     390632 OHSU_BeatAMLWaves1_2_Tyner_Sensitive_ResistantCalls.csv
2023-03-19 23:39:34      10867 OHSU_BeatAML_PubChemCID_InhibitorPanel_2019.csv
2023-03-19 23:39:34     647376 opencell-library-metadata.csv
2023-03-19 23:39:34      61612 opencell-localization-annotations.csv
2023-03-19 23:39:34    2046131 opencell-protein-abundance.csv
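
The listing above can also be inspected programmatically. As a minimal sketch, assuming only the two line shapes shown in that output (`PRE <prefix>` and `<date> <time> <size> <key>`), the hypothetical helper `parse_s3_ls` splits the CLI text into prefixes and objects so that sizes can be summed or file names checked:

```python
from datetime import datetime

# Output copied from the `aws s3 ls` cell above.
LISTING = """\
                           PRE CSV/
2023-03-19 23:39:34    2075301 OHSU_BeatAMLWaves1_2_Tyner_DrugResponse.csv
2023-03-19 23:39:34     390632 OHSU_BeatAMLWaves1_2_Tyner_Sensitive_ResistantCalls.csv
2023-03-19 23:39:34      10867 OHSU_BeatAML_PubChemCID_InhibitorPanel_2019.csv
2023-03-19 23:39:34     647376 opencell-library-metadata.csv
2023-03-19 23:39:34      61612 opencell-localization-annotations.csv
2023-03-19 23:39:34    2046131 opencell-protein-abundance.csv
"""


def parse_s3_ls(text):
    """Split `aws s3 ls` output into (prefixes, objects).

    Hypothetical helper: each object is a (modified, size_in_bytes, key) tuple.
    """
    prefixes, objects = [], []
    for line in text.splitlines():
        parts = line.split(None, 3)
        if not parts:
            continue  # skip blank lines
        if parts[0] == "PRE":
            # Prefix ("folder") rows carry no date or size.
            prefixes.append(line.split(None, 1)[1])
        elif len(parts) == 4:
            modified = datetime.strptime(
                parts[0] + " " + parts[1], "%Y-%m-%d %H:%M:%S"
            )
            objects.append((modified, int(parts[2]), parts[3]))
    return prefixes, objects


prefixes, objects = parse_s3_ls(LISTING)
total_bytes = sum(size for _, size, _ in objects)
print(prefixes, len(objects), total_bytes)
```

In the notebook, the text could come from capturing the shell command (for example `listing = !aws s3 ls s3://team4rawdatasets/` followed by `"\n".join(listing)`), which is an assumption about how the cell would be adapted.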


# Check Pre-Requisites from the 01_setup/ Folder

In [3]:
%store -r setup_instance_check_passed

In [4]:
try:
    setup_instance_check_passed
except NameError:
    print("+++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Instance Check.")
    print("+++++++++++++++++++++++++++++++")

In [5]:
print(setup_instance_check_passed)

True


In [6]:
%store -r setup_dependencies_passed

In [7]:
try:
    setup_dependencies_passed
except NameError:
    print("+++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup Dependencies.")
    print("+++++++++++++++++++++++++++++++")

In [8]:
print(setup_dependencies_passed)

True


In [9]:
%store -r setup_s3_bucket_passed

In [10]:
try:
    setup_s3_bucket_passed
except NameError:
    print("+++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup S3 Bucket.")
    print("+++++++++++++++++++++++++++++++")

In [11]:
print(setup_s3_bucket_passed)

True


In [12]:
%store -r setup_iam_roles_passed

In [13]:
try:
    setup_iam_roles_passed
except NameError:
    print("+++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup IAM Roles.")
    print("+++++++++++++++++++++++++++++++")

In [14]:
print(setup_iam_roles_passed)

True


In [15]:
if not setup_instance_check_passed:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Instance Check.")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
if not setup_dependencies_passed:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup Dependencies.")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
if not setup_s3_bucket_passed:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup S3 Bucket.")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
if not setup_iam_roles_passed:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. You are missing Setup IAM Roles.")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
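
The four checks above follow the same pattern, so they could be collapsed into one loop. The following is a sketch only: `missing_prerequisites` is a hypothetical helper, and the flag values are stand-ins for the variables restored with `%store -r`.

```python
def missing_prerequisites(flags):
    """Return the labels of setup steps whose flag is absent (None) or not True."""
    return [label for label, passed in flags.items() if passed is not True]


# Stand-in values for illustration; in the notebook these would be the
# restored setup_*_passed variables.
example_flags = {
    "Instance Check": True,
    "Setup Dependencies": True,
    "Setup S3 Bucket": False,
    "Setup IAM Roles": None,  # None models a flag that was never stored
}

for label in missing_prerequisites(example_flags):
    print(
        "[ERROR] YOU HAVE TO RUN ALL NOTEBOOKS IN THE SETUP FOLDER FIRST. "
        "You are missing {}.".format(label)
    )
```

A lookup such as `globals().get("setup_s3_bucket_passed")` returns `None` for a variable that was never restored, which lets one function cover both the `NameError` case and the `False` case.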

In [16]:
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

# Set S3 Source Location 

In [17]:
s3_path_csv = "s3://team4rawdatasets/CSV"

In [18]:
%store s3_path_csv

Stored 's3_path_csv' (str)


In [19]:
%store

Stored variables and their in-db values:
ingest_create_athena_db_passed             -> True
s3_path_csv                                -> 's3://team4rawdatasets/CSV'
s3_public_path_csv                         -> 's3://team4rawdatasets/CSV'
setup_dependencies_passed                  -> True
setup_iam_roles_passed                     -> True
setup_instance_check_passed                -> True
setup_s3_bucket_passed                     -> True


# Create Athena Database Schema

In [20]:
ingest_create_athena_db_passed = False

In [21]:
%store -r s3_path_csv

In [22]:
try:
    s3_path_csv
except NameError:
    print("*****************************************************************************")
    print("[ERROR] PLEASE RE-RUN THE PREVIOUS COPY TSV TO S3 NOTEBOOK ******************")
    print("[ERROR] THIS NOTEBOOK WILL NOT RUN PROPERLY. ********************************")
    print("*****************************************************************************")

In [23]:
print(s3_path_csv)

s3://team4rawdatasets/CSV


# Import PyAthena

[PyAthena](https://pypi.org/project/PyAthena/) is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

In [24]:
!pip install --disable-pip-version-check -q PyAthena==2.1.0
from pyathena import connect

# Create Athena Database

In [25]:
database_name = "clsmaws"

In [26]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [27]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [28]:
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)
print(statement)

CREATE DATABASE IF NOT EXISTS clsmaws
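
Because the statement is built with string formatting, it can help to validate the database name before it reaches Athena. The sketch below assumes a deliberately conservative rule (lowercase letters, digits, and underscores, not starting with a digit), which may be stricter than Athena requires; `create_database_statement` is a hypothetical helper.

```python
import re

# Conservative naming rule (an assumption, not Athena's exact grammar).
_IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def create_database_statement(name):
    """Build the CREATE DATABASE statement, rejecting unexpected names."""
    if not _IDENTIFIER.match(name):
        raise ValueError("Unexpected database name: {!r}".format(name))
    return "CREATE DATABASE IF NOT EXISTS {}".format(name)


print(create_database_statement("clsmaws"))
```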


In [29]:
pd.read_sql(statement, conn)

# Verify The Database Has Been Created Successfully

In [30]:
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,database_name
0,clsmaws
1,default
2,dsoaws


In [31]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

In [32]:
%store ingest_create_athena_db_passed

Stored 'ingest_create_athena_db_passed' (bool)


In [33]:
%store

Stored variables and their in-db values:
ingest_create_athena_db_passed             -> True
s3_path_csv                                -> 's3://team4rawdatasets/CSV'
s3_public_path_csv                         -> 's3://team4rawdatasets/CSV'
setup_dependencies_passed                  -> True
setup_iam_roles_passed                     -> True
setup_instance_check_passed                -> True
setup_s3_bucket_passed                     -> True


# Register CSV Data With Athena

In [34]:
ingest_create_athena_table_csv_passed = False

In [35]:
%store -r ingest_create_athena_db_passed

In [36]:
try:
    ingest_create_athena_db_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not create the Athena Database.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [37]:
print(ingest_create_athena_db_passed)

True


In [38]:
if not ingest_create_athena_db_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not create the Athena Database.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


In [39]:
%store -r s3_path_csv

In [40]:
try:
    s3_path_csv
except NameError:
    print("*****************************************************************************")
    print("[ERROR] PLEASE RE-RUN THE PREVIOUS COPY TSV TO S3 NOTEBOOK ******************")
    print("[ERROR] THIS NOTEBOOK WILL NOT RUN PROPERLY. ********************************")
    print("*****************************************************************************")

In [41]:
print(s3_path_csv)

s3://team4rawdatasets/CSV


# Import PyAthena

In [42]:
from pyathena import connect

# Create Athena Table from CSV Files in S3

#### BeatAML Clinical Summary Features: 

- 'lab_Id' : Unique identifier for each Specimen.
- 'PatientId' : Unique identifier for each Patient.
- 'consensus_sex' : Sex of each Patient, obtained from the EMR and, where missing, filled in with sex inferred from RNAseq data using a set of 28 differentially expressed genes.
- 'inferred_ethnicity' : Ethnicity of each Patient, inferred from genomic data using a methodology developed by Zheng and Weir (2015).
- 'ageAtDiagnosis' : Difference in years between the diagnosis date on the Disease Instance and the Patient's date of birth.
- 'isRelapse' : Whether the Specimen was obtained at the time of relapsed disease.
- 'isDenovo' : Whether the Specimen was obtained at the time of de novo disease.
- 'specificDxAtAcquisition_MDSMPN' : Whether the Patient's specific diagnosis at inclusion was an MDS/MPN-related diagnosis.
- 'priorMalignancyNonMyeloid' : Whether the Patient had a prior non-myeloid malignancy.
- 'priorMalignancyType' : Type of prior non-myeloid malignancy. All instances are listed in the order they were described in the EMR (separated by vertical bars); disease instances can be duplicated due to multiple treatment regimens.
- 'cumulativeChemo' : Whether the Patient has ever received chemotherapy during the course of their treatment. Yes if: Patient received radiation treatment for a prior malignancy (non-heme).
- 'priorMDS' : Whether the Patient was ever diagnosed with MDS.
- 'priorMDSMPN' : Whether the Patient was ever diagnosed with MDS/MPN.
- 'priorMPN' : Whether the Patient was ever diagnosed with MPN.
- 'ELN2017' : Risk classification assigned to the Specimen based on the European LeukemiaNet (ELN) 2017 guidelines. Unknown = not enough information to determine the classification.
- 'dxAtSpecimenAcquisition' : Diagnosis at the time of Specimen acquisition.
- 'specificDxAtAcquisition' : Specific diagnosis at the time of Specimen acquisition.
- 'ageAtSpecimenAcquisition' : Difference in years between the Specimen collection date and the Patient's date of birth.
- 'vitalStatus' : Patient vital status at the date of the most recent follow-up. Unknown: unknown or lost to follow-up.
- 'overallSurvival' : If vitalStatus is Dead or Alive, the calculated days between the diagnosis date and the follow-up date found when determining vitalStatus. Blank: vital status is unknown; -1: the diagnosis date does not exist and overall survival cannot be calculated; 0: there was no follow-up after the date of diagnosis.
- 'causeOfDeath' : If vitalStatus is Dead, the assessed cause of death. Alive: not applicable; Dead: Disease: death related to disease; Dead: Other: death not associated with disease; Dead: Treatment: death associated with treatment administered to combat disease; Dead: Unknown: death with unknown cause; Blank: vitalStatus is unknown.
- 'FLT3-ITD_consensus' : Consensus call for FLT3-ITD used in the analysis. Positive: consensus call was FLT3-ITD positive. Note: If the clinical and internal were discordant and WES results matched the clinical results, then we would manually review.
- 'NPM1_consensus' : Consensus call for NPM1 used in the analysis. Positive: consensus call was NPM1 mutation positive. Note: If the clinical and internal were discordant and WES results matched the clinical results, then we would manually review.
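
Two of the fields above use special encodings: 'overallSurvival' reserves blank, -1, and 0 for specific situations, and 'priorMalignancyType' packs several values into one bar-separated string. The sketch below shows one way these could be decoded after loading. The helper names and the sample malignancy values are hypothetical, and it assumes the raw values arrive as strings.

```python
def interpret_overall_survival(raw):
    """Map a raw overallSurvival value to (days_or_None, note)."""
    value = raw.strip()
    if value == "":
        return None, "vital status unknown"
    days = int(float(value))  # tolerate values such as "425.0"
    if days == -1:
        return None, "diagnosis date missing"
    if days == 0:
        return 0, "no follow-up after diagnosis"
    return days, "ok"


def split_prior_malignancies(raw):
    """Split the bar-separated field, dropping repeats but keeping EMR order."""
    unique = []
    for item in raw.split("|"):
        item = item.strip()
        if item and item not in unique:
            unique.append(item)
    return unique


print(interpret_overall_survival("425"))
print(interpret_overall_survival("-1"))
# Hypothetical sample value, not taken from the dataset.
print(split_prior_malignancies("Breast Cancer|Breast Cancer|Melanoma"))
```

Keeping the sentinel values out of numeric summaries matters here: treating -1 as a real duration would bias any survival statistic downward.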


In [43]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [45]:
# Set Athena parameters
database_name = "clsmaws"
table_name_csv = "OHSU_BeatAMLWaves1_2_Tyner_ClinicalSummary"

In [46]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [47]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         lab_Id ,
         PatientId ,
         consensus_sex ,
         inferred_ethnicity ,
         ageAtDiagnosis ,
         isRelapse ,
         isDenovo ,
         specificDxAtAcquisition_MDSMPN ,
         priorMalignancyNonMyeloid ,
         priorMalignancyType ,
         cumulativeChemo ,
         priorMDS ,
         priorMDSMPN ,
         priorMPN ,
         ELN2017 ,
         dxAtSpecimenAcquisition ,
         specificDxAtAcquisition ,
         ageAtSpecimenAcquisition ,
         vitalStatus ,
         overallSurvival ,
         causeOfDeath , 
         FLT3-ITD_consensus , 
         NPM1_consensus ,
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS clsmaws.OHSU_BeatAMLWaves1_2_Tyner_ClinicalSummary(
         lab_Id ,
         PatientId ,
         consensus_sex ,
         inferred_ethnicity ,
         ageAtDiagnosis ,
         isRelapse ,
         isDenovo ,
         specificDxAtAcquisition_MDSMPN ,
         priorMalignancyNonMyeloid ,
         priorMalignancyType ,
         cumulativeChemo ,
         priorMDS ,
         priorMDSMPN ,
         priorMPN ,
         ELN2017 ,
         dxAtSpecimenAcquisition ,
         specificDxAtAcquisition ,
         ageAtSpecimenAcquisition ,
         vitalStatus ,
         overallSurvival ,
         causeOfDeath , 
         FLT3-ITD_consensus , 
         NPM1_consensus ,
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://team4rawdatasets/CSV'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [48]:
pd.read_sql(statement, conn)

Failed to execute query.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/pyathena/common.py", line 305, in _execute
    **request
  File "/opt/conda/lib/python3.7/site-packages/pyathena/util.py", line 84, in retry_api_call
    return retry(func, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/opt/conda/lib/python3.7/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.7/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/botocore/client.py", line 530, in _api_call
    return

DatabaseError: Execution failed on sql: CREATE EXTERNAL TABLE IF NOT EXISTS clsmaws.OHSU_BeatAMLWaves1_2_Tyner_ClinicalSummary(
         lab_Id ,
         PatientId ,
         consensus_sex ,
         inferred_ethnicity ,
         ageAtDiagnosis ,
         isRelapse ,
         isDenovo ,
         specificDxAtAcquisition_MDSMPN ,
         priorMalignancyNonMyeloid ,
         priorMalignancyType ,
         cumulativeChemo ,
         priorMDS ,
         priorMDSMPN ,
         priorMPN ,
         ELN2017 ,
         dxAtSpecimenAcquisition ,
         specificDxAtAcquisition ,
         ageAtSpecimenAcquisition ,
         vitalStatus ,
         overallSurvival ,
         causeOfDeath , 
         FLT3-ITD_consensus , 
         NPM1_consensus ,
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://team4rawdatasets/CSV'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')
An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:8: mismatched input 'EXTERNAL'. Expecting: 'OR', 'SCHEMA', 'TABLE', 'VIEW'
unable to rollback
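
The `mismatched input 'EXTERNAL'` message is what Athena commonly reports when a `CREATE EXTERNAL TABLE` statement does not parse as valid DDL, so the likely causes are in the statement itself: the columns have no data types, the column list ends with a trailing comma, and `FLT3-ITD_consensus` contains a hyphen. Separately, the statement declares a tab delimiter and gzip compression, while the files listed earlier are uncompressed `.csv` files. The sketch below builds a statement that addresses these points. It has not been run against Athena and rests on assumptions: every column is typed `STRING`, hyphens are replaced with underscores, the delimiter is a comma, and `sanitize_column` and `build_create_table` are hypothetical helpers.

```python
import re

# Column names copied from the failing statement above.
COLUMNS = [
    "lab_Id", "PatientId", "consensus_sex", "inferred_ethnicity",
    "ageAtDiagnosis", "isRelapse", "isDenovo",
    "specificDxAtAcquisition_MDSMPN", "priorMalignancyNonMyeloid",
    "priorMalignancyType", "cumulativeChemo", "priorMDS", "priorMDSMPN",
    "priorMPN", "ELN2017", "dxAtSpecimenAcquisition",
    "specificDxAtAcquisition", "ageAtSpecimenAcquisition", "vitalStatus",
    "overallSurvival", "causeOfDeath", "FLT3-ITD_consensus", "NPM1_consensus",
]


def sanitize_column(name):
    """Replace anything other than letters, digits, and underscores with '_'."""
    return re.sub(r"[^0-9A-Za-z_]", "_", name)


def build_create_table(database, table, columns, location):
    """Build a typed CREATE EXTERNAL TABLE statement without a trailing comma."""
    # Assumption: every column is read as STRING and cast later as needed.
    column_lines = ",\n".join(
        "    {} STRING".format(sanitize_column(c)) for c in columns
    )
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{tbl} (\n"
        "{cols}\n"
        ")\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
        "LINES TERMINATED BY '\\n'\n"
        "LOCATION '{loc}'\n"
        "TBLPROPERTIES ('skip.header.line.count'='1')"
    ).format(
        db=database,
        tbl=table,
        cols=column_lines,
        loc=location.rstrip("/") + "/",  # point at the prefix as a folder
    )


statement = build_create_table(
    "clsmaws",
    "OHSU_BeatAMLWaves1_2_Tyner_ClinicalSummary",
    COLUMNS,
    "s3://team4rawdatasets/CSV",
)
print(statement)
```

Athena reads every file under `LOCATION`, so this sketch also assumes that the prefix holds only the clinical summary file. If fields can contain quoted commas, a CSV SerDe would be a better fit than a plain delimiter.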

# Release Resources

In [49]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [21]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>