## Importing cases from the TCGA-BRCA project with their population characteristics

In [0]:
import requests
import json
import pandas as pd
import io
import os

In [0]:
# Creating a list of projects with somatic mutations across different cancer types
ssms_projects = ["TCGA-BRCA", "TCGA-COAD", "TCGA-READ", "TCGA-LUSC", "TCGA-LUAD", "CPTAC-3", "TARGET-AML", "TARGET-ALL-P2", "MMRF-COMMPASS", "BEATAML1.0-COHORT", "TCGA-LAML", "TCGA-GBM", "TCGA-LGG", "TCGA-UCEC", "TCGA-KIRC", "TCGA-KIRP", "TCGA-KICH", "TCGA-PAAD", "TCGA-SKCM", "HCMI-CMDC", "CMI-ASC", "TCGA-THCA", "TCGA-DLBC"]

In [0]:
cases_fields = [
    "project.project_id",
    "primary_site",
    "submitter_id",
    "demographic.race",
    "demographic.year_of_death",
    "disease_type",
    "diagnoses.age_at_diagnosis",
    "diagnoses.primary_diagnosis",
    "diagnoses.ajcc_pathologic_stage",
    "diagnoses.prior_malignancy",
    "diagnoses.synchronous_malignancy"
    ]

cases_fields = ",".join(cases_fields)

cases_endpt = "https://api.gdc.cancer.gov/cases"

cases_filters = {
    "op": "and",
    "content":[
        {
        "op": "in",
        "content": {
            "field": "project.project_id",
            "value": ssms_projects
            }
        },
    ]
}

# With a GET request, the filters parameter needs to be converted
# from a dictionary to JSON-formatted string

cases_params = {
    "filters": json.dumps(cases_filters),
    "fields": cases_fields,
    "format": "CSV",
    "size": "20000"
    }

cases_response = requests.get(cases_endpt, params = cases_params)

byte_cases_data = cases_response.content

string_cases_data = byte_cases_data.decode('ASCII')

cases_data = io.StringIO(string_cases_data)

cases_df = pd.read_csv(cases_data, sep=",")

print(cases_df.shape) 
cases_df.head()

(14304, 22)


Unnamed: 0,demographic.race,demographic.year_of_death,diagnoses.0.age_at_diagnosis,diagnoses.0.ajcc_pathologic_stage,diagnoses.0.primary_diagnosis,diagnoses.0.prior_malignancy,diagnoses.0.synchronous_malignancy,diagnoses.1.age_at_diagnosis,diagnoses.1.ajcc_pathologic_stage,diagnoses.1.primary_diagnosis,diagnoses.1.prior_malignancy,diagnoses.1.synchronous_malignancy,diagnoses.2.age_at_diagnosis,diagnoses.2.ajcc_pathologic_stage,diagnoses.2.primary_diagnosis,diagnoses.2.prior_malignancy,diagnoses.2.synchronous_malignancy,disease_type,id,primary_site,project.project_id,submitter_id
0,black or african american,,3505.0,,"Acute myeloid leukemia, NOS",,,,,,,,,,,,,Myeloid Leukemias,29312892-078e-4a35-809c-729f55370967,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAUPIY
1,white,,2327.0,,"Acute myeloid leukemia, NOS",,,,,,,,,,,,,Myeloid Leukemias,2939c0a9-3c47-4019-b9e3-958e84a12bb5,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAVDDU
2,white,,6115.0,,"Acute myeloid leukemia, NOS",,,,,,,,,,,,,Myeloid Leukemias,cecefca5-6308-49f6-b9c2-226235d60613,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAWYTW
3,white,,5854.0,,"Acute myeloid leukemia, NOS",,,,,,,,,,,,,Myeloid Leukemias,56404ff6-a971-4d84-9891-0053b1075ee3,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAWLHJ
4,white,,5129.0,,"Acute myeloid leukemia, NOS",,,,,,,,,,,,,Myeloid Leukemias,792187f7-d5c3-497d-9573-f7411f027aa3,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAXLJV


In [0]:
selected_cols = ['demographic.race', 
                 'demographic.year_of_death', 
                 'diagnoses.0.age_at_diagnosis', 
                 'diagnoses.0.ajcc_pathologic_stage', 
                 'diagnoses.0.primary_diagnosis', 
                 'diagnoses.0.prior_malignancy', 
                 'diagnoses.0.synchronous_malignancy', 
                 'disease_type', 
                 'id', 
                 'primary_site', 
                 'project.project_id', 
                 'submitter_id']

cases_df = cases_df[selected_cols]
print(cases_df.shape) 
cases_df.head()

(14304, 12)


Unnamed: 0,demographic.race,demographic.year_of_death,diagnoses.0.age_at_diagnosis,diagnoses.0.ajcc_pathologic_stage,diagnoses.0.primary_diagnosis,diagnoses.0.prior_malignancy,diagnoses.0.synchronous_malignancy,disease_type,id,primary_site,project.project_id,submitter_id
0,black or african american,,3505.0,,"Acute myeloid leukemia, NOS",,,Myeloid Leukemias,29312892-078e-4a35-809c-729f55370967,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAUPIY
1,white,,2327.0,,"Acute myeloid leukemia, NOS",,,Myeloid Leukemias,2939c0a9-3c47-4019-b9e3-958e84a12bb5,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAVDDU
2,white,,6115.0,,"Acute myeloid leukemia, NOS",,,Myeloid Leukemias,cecefca5-6308-49f6-b9c2-226235d60613,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAWYTW
3,white,,5854.0,,"Acute myeloid leukemia, NOS",,,Myeloid Leukemias,56404ff6-a971-4d84-9891-0053b1075ee3,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAWLHJ
4,white,,5129.0,,"Acute myeloid leukemia, NOS",,,Myeloid Leukemias,792187f7-d5c3-497d-9573-f7411f027aa3,Hematopoietic and reticuloendothelial systems,TARGET-AML,TARGET-20-PAXLJV


## Top most commonly mutated genes across the cancer types

In [0]:
def getSSMS(project):
    top_ssms_endpt = "https://api.gdc.cancer.gov/analysis/top_mutated_genes_by_project"

    top_ssms_fields = [
        "name",
        "symbol"
        ]

    top_ssms_fields = ",".join(top_ssms_fields)

    top_ssms_filters = {  
    "op":"AND",
    "content":[  
        {  
            "op":"=",
            "content":{  
                "field":"case.project.project_id",
                "value": project
            }
        },
        {  
            "op":"in",
            "content":{  
                "field":"case.ssm.consequence.transcript.annotation.vep_impact",
                "value":[  
                "HIGH",
                "MODERATE"
                ]
            }
        }
    ]
    }

    top_ssms_params = {
        "filters": json.dumps(top_ssms_filters),
        "fields": top_ssms_fields,
        "format": "CSV",
        "size": "3"
        }

    top_ssms_response = requests.get(top_ssms_endpt, params = top_ssms_params)

    if top_ssms_response == None or top_ssms_response.content == bytes('\r\n', 'utf-8'):
        return []
    else:
        byte_top_ssms_data = top_ssms_response.content
        string_top_ssms_data = byte_top_ssms_data.decode('ASCII')
        top_ssms_data = io.StringIO(string_top_ssms_data)
        top_ssms_df = pd.read_csv(top_ssms_data, sep=",")
        return top_ssms_df['symbol'].tolist()
    return None

In [0]:
ssms_list = []

for project in ssms_projects:
    ssms_list += getSSMS(project)

ssms_list = list(dict.fromkeys(ssms_list))
print(ssms_list)
print(len(ssms_list))

['TP53', 'PIK3CA', 'TTN', 'APC', 'CSMD3', 'MUC16', 'KRAS', 'NRAS', 'MUC5B', 'NOTCH1', 'IGHV2-70', 'IGLV3-1', 'DNMT3A', 'NPM1', 'FLT3', 'PTEN', 'IDH1', 'ATRX', 'ARID1A', 'VHL', 'PBRM1', 'MET', 'SMAD4', 'BRAF', 'LRP2', 'FREM2', 'KMT2D', 'BTG2', 'B2M']
29


# Querying ssm occurrences in each case using Apache Spark UDF

In [0]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# import SparkSession
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TCGA").getOrCreate()

from pyspark import SparkContext
sc = spark.sparkContext

In [0]:
from pyspark.sql.functions import udf, col, array_contains, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row

In [0]:
# Defining the api get request
def executeRestApi(project_id, case_id, top_ssms):
    
    ssm_filters = {  
       "op":"AND",
       "content":[  
          {  
             "op":"=",
             "content":{  
                "field":"case.project.project_id",
                "value": "{project_id}".format(project_id=project_id)
             }
          },
          {  
                "op":"=",
                "content":{  
                  "field":"case.submitter_id",
                  "value": "{case_id}".format(case_id=case_id)
             }
          },
           {  
             "op":"in",
             "content":{  
                "field":"ssm.consequence.transcript.gene.symbol",
                "value": top_ssms
             }
          }
       ]
    }
    
    ssm_params = {
        "filters": json.dumps(ssm_filters),
        "fields": "ssm.consequence.transcript.gene.symbol",
        "format": "CSV",
        "size": "29"
    }
    
    res = None
    
    try:
        res = requests.get("https://api.gdc.cancer.gov/ssm_occurrences", params = ssm_params)
    except Exception as e:
        return e
    if res!= None and res.content != bytes('\r\n', 'utf-8'):
        bytes_data = res.content
        
        string_ssm_data = bytes_data.decode('ASCII')
        
        ssm_data = io.StringIO(string_ssm_data)

        temp_ssm_df = pd.read_csv(ssm_data, sep=",")

        ssms_result = temp_ssm_df["ssm.consequence.0.transcript.gene.symbol"].tolist()
        
        return ssms_result
    else:
        return ""
    return None

In [0]:
# Creating the UDF
udf_executeRestApi = udf(executeRestApi, ArrayType(StringType()))

In [0]:
# Converting the cases dataframe from Pandas to Spark df
spark_cases_df = spark.createDataFrame(cases_df)
spark_cases_df = spark_cases_df.withColumn("Top_SSMS", lit(ssms_list))
spark_cases_df.printSchema()
spark_cases_df.show(5)

root
 |-- demographic.race: string (nullable = true)
 |-- demographic.year_of_death: double (nullable = true)
 |-- diagnoses.0.age_at_diagnosis: double (nullable = true)
 |-- diagnoses.0.ajcc_pathologic_stage: string (nullable = true)
 |-- diagnoses.0.primary_diagnosis: string (nullable = true)
 |-- diagnoses.0.prior_malignancy: string (nullable = true)
 |-- diagnoses.0.synchronous_malignancy: string (nullable = true)
 |-- disease_type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- primary_site: string (nullable = true)
 |-- project.project_id: string (nullable = true)
 |-- submitter_id: string (nullable = true)
 |-- Top_SSMS: array (nullable = false)
 |    |-- element: string (containsNull = false)

+--------------------+-------------------------+----------------------------+---------------------------------+-----------------------------+----------------------------+----------------------------------+-----------------+--------------------+--------------------+-------

In [0]:
# Using UDF to execute the API get request once per submitter ID in the cases df
result_df = spark_cases_df.withColumn("SSMS", udf_executeRestApi(col("`project.project_id`"), col("submitter_id"), col("Top_SSMS")))

result_df.show()

+--------------------+-------------------------+----------------------------+---------------------------------+-----------------------------+----------------------------+----------------------------------+-----------------+--------------------+--------------------+------------------+--------------------+--------------------+----+
|    demographic.race|demographic.year_of_death|diagnoses.0.age_at_diagnosis|diagnoses.0.ajcc_pathologic_stage|diagnoses.0.primary_diagnosis|diagnoses.0.prior_malignancy|diagnoses.0.synchronous_malignancy|     disease_type|                  id|        primary_site|project.project_id|        submitter_id|            Top_SSMS|SSMS|
+--------------------+-------------------------+----------------------------+---------------------------------+-----------------------------+----------------------------+----------------------------------+-----------------+--------------------+--------------------+------------------+--------------------+--------------------+----+
|bla

In [0]:
# Hotcoding SSMS columns and renaming columns
final_df = result_df.select(col("`demographic.race`").alias("Race"),
                            col("`demographic.year_of_death`").alias("Deceased_Status"),
                            col("`diagnoses.0.age_at_diagnosis`").alias("Age_At_Diagnosis"),
                            col("`diagnoses.0.ajcc_pathologic_stage`").alias("Pathologic_Stage"),
                            col("`diagnoses.0.primary_diagnosis`").alias("Primary_Diagnosis"),
                            col("`diagnoses.0.prior_malignancy`").alias("Prior_Malignancy"),
                            col("`diagnoses.0.synchronous_malignancy`").alias("Synchronous_Malignancy"),
                            col("`disease_type`").alias("Disease_Type"),
                            col("`id`").alias("ID"),
                            col("`primary_site`").alias("Primary_Site"),
                            col("`submitter_id`").alias("Submitter_ID"),
                            array_contains('SSMS', ssms_list[0]).alias(ssms_list[0]),
                            array_contains('SSMS', ssms_list[1]).alias(ssms_list[1]),
                            array_contains('SSMS', ssms_list[2]).alias(ssms_list[2]),
                            array_contains('SSMS', ssms_list[3]).alias(ssms_list[3]),
                            array_contains('SSMS', ssms_list[4]).alias(ssms_list[4]),
                            array_contains('SSMS', ssms_list[5]).alias(ssms_list[5]),
                            array_contains('SSMS', ssms_list[6]).alias(ssms_list[6]),
                            array_contains('SSMS', ssms_list[7]).alias(ssms_list[7]),
                            array_contains('SSMS', ssms_list[8]).alias(ssms_list[8]),
                            array_contains('SSMS', ssms_list[9]).alias(ssms_list[9]),
                            array_contains('SSMS', ssms_list[10]).alias(ssms_list[10]),
                            array_contains('SSMS', ssms_list[11]).alias(ssms_list[11]),
                            array_contains('SSMS', ssms_list[12]).alias(ssms_list[12]),
                            array_contains('SSMS', ssms_list[13]).alias(ssms_list[13]),
                            array_contains('SSMS', ssms_list[14]).alias(ssms_list[14]),
                            array_contains('SSMS', ssms_list[15]).alias(ssms_list[15]),
                            array_contains('SSMS', ssms_list[16]).alias(ssms_list[16]),
                            array_contains('SSMS', ssms_list[17]).alias(ssms_list[17]),
                            array_contains('SSMS', ssms_list[18]).alias(ssms_list[18]),
                            array_contains('SSMS', ssms_list[19]).alias(ssms_list[19]),
                            array_contains('SSMS', ssms_list[20]).alias(ssms_list[20]),
                            array_contains('SSMS', ssms_list[21]).alias(ssms_list[21]),
                            array_contains('SSMS', ssms_list[22]).alias(ssms_list[22]),
                            array_contains('SSMS', ssms_list[23]).alias(ssms_list[23]),
                            array_contains('SSMS', ssms_list[24]).alias(ssms_list[24]),
                            array_contains('SSMS', ssms_list[25]).alias(ssms_list[25]),
                            array_contains('SSMS', ssms_list[26]).alias(ssms_list[26]),
                            array_contains('SSMS', ssms_list[27]).alias(ssms_list[27]),
                            array_contains('SSMS', ssms_list[28]).alias(ssms_list[28]))
                            # array_contains('SSMS', ssms_list[29]).alias(ssms_list[29]),
                            # array_contains('SSMS', ssms_list[30]).alias(ssms_list[30]),
                            # array_contains('SSMS', ssms_list[31]).alias(ssms_list[31]),
                            # array_contains('SSMS', ssms_list[32]).alias(ssms_list[32]),
                            # array_contains('SSMS', ssms_list[33]).alias(ssms_list[33]),
                            # array_contains('SSMS', ssms_list[34]).alias(ssms_list[34]),
                            # array_contains('SSMS', ssms_list[35]).alias(ssms_list[35]),
                            # array_contains('SSMS', ssms_list[36]).alias(ssms_list[36]),
                            # array_contains('SSMS', ssms_list[37]).alias(ssms_list[37]),
                            # array_contains('SSMS', ssms_list[38]).alias(ssms_list[38]),
                            # array_contains('SSMS', ssms_list[39]).alias(ssms_list[39]),
                            # array_contains('SSMS', ssms_list[40]).alias(ssms_list[40]),
                            # array_contains('SSMS', ssms_list[41]).alias(ssms_list[41]),
                            # array_contains('SSMS', ssms_list[41]).alias(ssms_list[42]),
                            # array_contains('SSMS', ssms_list[41]).alias(ssms_list[43]),
                            # array_contains('SSMS', ssms_list[41]).alias(ssms_list[44]),
                            # array_contains('SSMS', ssms_list[41]).alias(ssms_list[45]))
                            
final_df.show()

+--------------------+---------------+----------------+----------------+--------------------+----------------+----------------------+-----------------+--------------------+--------------------+--------------------+----+------+----+----+-----+-----+----+----+-----+------+--------+-------+------+----+----+----+----+----+------+----+-----+----+-----+----+----+-----+-----+----+----+
|                Race|Deceased_Status|Age_At_Diagnosis|Pathologic_Stage|   Primary_Diagnosis|Prior_Malignancy|Synchronous_Malignancy|     Disease_Type|                  ID|        Primary_Site|        Submitter_ID|TP53|PIK3CA| TTN| APC|CSMD3|MUC16|KRAS|NRAS|MUC5B|NOTCH1|IGHV2-70|IGLV3-1|DNMT3A|NPM1|FLT3|PTEN|IDH1|ATRX|ARID1A| VHL|PBRM1| MET|SMAD4|BRAF|LRP2|FREM2|KMT2D|BTG2| B2M|
+--------------------+---------------+----------------+----------------+--------------------+----------------+----------------------+-----------------+--------------------+--------------------+--------------------+----+------+----+----+

In [0]:
final_df.groupBy('Primary_Site').count().show(50)

+--------------------+-----+
|        Primary_Site|count|
+--------------------+-----+
|     Small intestine|    7|
|                Skin|  505|
|   Bronchus and lung| 1431|
|         Lymph nodes|   30|
|Hematopoietic and...| 6040|
|Heart, mediastinu...|    2|
|               Colon|  505|
|         Uterus, NOS|  254|
|             Unknown|   70|
|              Kidney| 1204|
|Other and ill-def...|  112|
|             Bladder|    1|
|Other and ill-def...|    2|
|             Stomach|   17|
|Bones, joints and...|    5|
|Retroperitoneum a...|    1|
|            Pancreas|  401|
|       Thyroid gland|  509|
|               Brain| 1294|
|Other and unspeci...|    1|
|              Testis|    1|
|Connective, subcu...|    8|
|Other and ill-def...|    1|
|              Breast| 1120|
|Rectosigmoid junc...|   80|
|              Rectum|  110|
|        Corpus uteri|  548|
|Liver and intrahe...|    1|
|         Gallbladder|    1|
|           Esophagus|   34|
|Other and unspeci...|    1|
|Other and uns

## Writing the final ingested Spark dataframe as a CSV and upload to the S3 bucket

In [0]:
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

In [0]:
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')

In [0]:
final_df.repartition(1).write.format('csv').option('header','true').save('s3a://csis-4495-tcga/rawdata/', mode='overwrite')

In [0]:
# Rename the file on S3 for SageMaker pipeline
source_path = 's3a://csis-4495-tcga/rawdata/'
destination_path = 's3a://csis-4495-tcga/rawdata/'

def rename_file_with_location(source_path,destination_path,file_name):
    files = dbutils.fs.ls(source_path)
    csv_file = [x.path for x in files if x.path.endswith(".csv")][0]
    dbutils.fs.mv(csv_file, destination_path + file_name)
    print("File has been renamed from"+source_path+"to this"+destination_path+file_name)

rename_file_with_location(source_path, destination_path, 'tcga_raw.csv')

File has been renamed froms3a://csis-4495-tcga/rawdata/to thiss3a://csis-4495-tcga/rawdata/tcga_raw.csv


In [0]:
# Stop the Spark session
spark.stop()