# Neo4J v1.1 - Trellis : Job based Analysis

#### V1.0
##### - Temporarily skip to collect GATK subtasks informaion because the query takes more than 1 hour. 

#### V1.1
##### - Exclue QC job information in Bigquery 
================

## Set up the environment


### Install py2neo for querying Neo4J 

In [1]:
pip3 install -U py2neo

# #add python path of py2neo in system
pip3 install -U neotime
pip3 install -U neobolt
pip3 install -U pandas-gbq

### Import Packages

In [2]:
from py2neo import Graph
from google.cloud import storage
import yaml

import pandas as pd
import pandas_gbq

import numpy as np
import subprocess
import matplotlib 
import matplotlib.pyplot as plt
import seaborn as sns

pd.set_option('display.float_format', lambda x: '%.2f' % x)

### Load Neo4J DB

In [3]:
bucket_info=''
credential_info=''

In [4]:
## Option 1 : Read DB and Account Information in Google Storage (YAML)

# create storage client
storage_client = storage.Client()
# get bucket with name
bucket = storage_client.get_bucket(bucket_info)
# get bucket data as blob
blob = bucket.get_blob(credential_info)
# convert to string
yaml_data = blob.download_as_string()

account = yaml.load(yaml_data, Loader=yaml.FullLoader)

## Main Account
graph = Graph(account['NEO4J_SCHEME']+'://'+account['NEO4J_HOST']+":"+str(account['NEO4J_PORT']), auth=(account['NEO4J_USER'],account['NEO4J_PASSPHRASE']))

-------
## FQ2U Job

### FQ2U table

In [5]:
# Test
#query = "Match (fu:Job:FastqToUbam) RETURN fu.sample AS sample, fu.readGroup AS fq2urg_gatkid, fu.duplicate AS dup, fu.machineType AS vm_type, fu.durationMinutes as job_runtime"
#job_fq2u = graph.run(query).to_data_frame()
#job_fq2u.head()

In [6]:
## Query
query = "Match (fu:Job:FastqToUbam)-[:STATUS]->(s:Dstat) RETURN fu.sample AS sample, fu.readGroup AS fq2urg_gatkid, fu.duplicate AS dup, fu.machineType AS vm_type, fu.durationMinutes as job_runtime, fu.startTimeEpoch as start_time, fu.stopTimeEpoch as stop_time, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
job_fq2u = graph.run(query).to_data_frame()
job_fq2u.set_index('sample')

## Variable
num_fq2u_sample=len(job_fq2u['sample'].unique())
num_fq2u_job=len(job_fq2u)

## Print (Info)
print("The number of samples with FQ2U jobs : " + str(num_fq2u_sample))
print("The number of FQ2U jobs : " + str(num_fq2u_job))

## Bigquery Table Format
job_fq2u['job_group']='GATK'
job_fq2u['vm_exp_cnt']=1
job_fq2u['job']='FQ2U'
job_fq2u['vm_cnt']=1
job_fq2u['vm_disk']=None
job_fq2u['vm_avg_runtime']=job_fq2u['job_runtime']
columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
job_fq2u=job_fq2u[columnlist]

#display(job_fq2u.head())

The number of samples with FQ2U jobs : 41007
The number of FQ2U jobs : 167742


### FQ2U Duplication Check

In [7]:
## Query
fq2u_dup=job_fq2u.loc[job_fq2u['dup']==True,:]

## Variable
num_dup_fq2u_sample=len(fq2u_dup['sample'].unique())
num_dup_fq2u_job=len(fq2u_dup)

print("The number(percentage) of samples with duplicated FQ2U jobs : " + str(len(fq2u_dup['sample'].unique()))+" ("+'{:2f}'.format((num_dup_fq2u_sample/num_fq2u_sample)*100)+"%)")
print("The number(percentage) of FQ2U duplicated jobs : " + str(num_dup_fq2u_job)+" ("+'{:2f}'.format((num_dup_fq2u_job/num_fq2u_job)*100)+"%)")

#display(test)

The number(percentage) of samples with duplicated FQ2U jobs : 328 (0.799863%)
The number(percentage) of FQ2U duplicated jobs : 455 (0.271250%)


-------
## GATK Job

### GATK table

In [8]:
## Query
query = "MATCH (j:Job:CromwellWorkflow)-[:STATUS]->(s:Dstat) RETURN j.sample AS sample, j.cromwellWorkflowId AS fq2urg_gatkid, \
j.duplicate AS dup, j.durationMinutes as job_runtime, j.machineType as vm_type, j.startTimeEpoch as start_time, j.stopTimeEpoch as stop_time, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
job_gatk = graph.run(query).to_data_frame()
job_gatk.set_index('sample')

## Variable
num_gatk_sample=len(job_gatk['sample'].unique())
num_gatk_job=len(job_gatk)

## Print (Info)
print("The number of samples with GATK jobs : " + str(num_gatk_sample))
print("The number of GATK jobs : " + str(num_gatk_job))

## Bigquery Table Format
job_gatk['job_group']='GATK'
job_gatk['vm_exp_cnt']=1
job_gatk['job']='cromwell'
job_gatk['vm_cnt']=1
job_gatk['vm_avg_runtime']=job_gatk['job_runtime']
job_gatk['vm_disk']=None
columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
job_gatk=job_gatk[columnlist]

#display(job_gatk.head())

The number of samples with GATK jobs : 40196
The number of GATK jobs : 40477


### GATK Duplication Check

In [9]:
## Query
gatk_dup=job_gatk.loc[job_gatk['dup']==True,:]

## Variable
num_dup_gatk_sample=len(gatk_dup['sample'].unique())
num_dup_gatk_job=len(gatk_dup)

print("The number(percentage) of samples with duplicated GATK jobs : " + str(len(gatk_dup['sample'].unique()))+" ("+'{:2f}'.format((num_dup_gatk_sample/num_gatk_sample)*100)+"%)")
print("The number(percentage) of GATK duplicated jobs : " + str(num_dup_gatk_job)+" ("+'{:2f}'.format((num_dup_gatk_job/num_gatk_job)*100)+"%)")

#display(test)

The number(percentage) of samples with duplicated GATK jobs : 191 (0.475172%)
The number(percentage) of GATK duplicated jobs : 231 (0.570694%)


### GATK vm_exp_cnt and add_vm

In [10]:
## expected vm data frame
#vm_exp_cnt_df=pd.read_excel("./GATKstep_expected_vm.xlsx")

In [11]:
#merged_vm_cnt_df=pd.merge(vm_exp_cnt_df,job_gatk,left_on=['job'],right_on=['job'],how='right')
#merged_attemps_df['added_vm']=job_gatk['vm_cnt']-merged_attemps_df['vm_exp_cnt']

-------
## GATK substeps

### vm_cnt table

In [12]:
# ## Query
# #query = "MATCH (g:Job:CromwellWorkflow)-[:LED_TO*]->(s:CromwellStep)-[:HAS_ATTEMPT]-()-[*0..100]->(j:Job) \
# #WHERE g.cromwellWorkflowId=s.cromwellWorkflowId RETURN g.sample as sample, s.cromwellWorkflowId as fq2urg_gatkid, \
# #s.wdlCallAlias as job, count(distinct j) as vm_cnt, (max(j.stopTimeEpoch)-min(j.startTimeEpoch))/60 as job_runtime, avg(j.durationMinutes) as vm_avg_runtime, j.machineType as vm_type"
# query = "MATCH (g:Job:CromwellWorkflow), (s:CromwellStep)-[:HAS_ATTEMPT]-()-[*0..100]->(j:Job) \
# WHERE g.cromwellWorkflowId=s.cromwellWorkflowId RETURN g.sample as sample, s.cromwellWorkflowId as fq2urg_gatkid, \
# s.wdlCallAlias as job, count(distinct j) as vm_cnt, (max(j.stopTimeEpoch)-min(j.startTimeEpoch))/60 as job_runtime, avg(j.durationMinutes) as vm_avg_runtime, j.machineType as vm_type"
# #query = "MATCH (j:Job:CromwellWorkflow)-[:STATUS]->(s:Dstat) RETURN j.sample AS sample, j.duplicate AS dup, j.durationMinutes as job_runtime, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
# job_gatk_step = graph.run(query).to_data_frame()
# job_gatk_step.set_index('sample')

# ## Variable
# num_gatk_sample=len(job_gatk_step['sample'].unique())
# num_gatk_subjobs=len(job_gatk_step)

# ## Print (Info)
# print("The number of samples with GATK steps : " + str(num_gatk_sample))
# print("The number of GATK subjobs : " + str(num_gatk_subjobs))

### GATK Duplication Check

In [13]:
# ## Bigquery Table Format
# job_gatk_step['job_group']='GATK'
# job_gatk_step['vm_disk']=None

# job_gatk_info=job_gatk[['sample','fq2urg_gatkid','dup','dstat_status','dstat_msg','dstat_log']]
# job_gatk_stepm=pd.merge(job_gatk_info, job_gatk_step, left_on=['sample','fq2urg_gatkid'], right_on=['sample','fq2urg_gatkid'], how='right')

# #display(job_gatk_stepm.head())

### GATK vm_exp_cnt and add_vm

In [14]:
# ## expected vm data frame
# vm_exp_cnt_file = 'gs://'+account['TRELLIS_BUCKET']+'/analysis-notebooks/GATKstep_expected_vm.xlsx'
# vm_exp_cnt_df=pd.read_excel(vm_exp_cnt_file)

In [15]:
# job_gatk_stepm=pd.merge(vm_exp_cnt_df,job_gatk_stepm,left_on=['job'],right_on=['job'],how='right')
# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','dstat_status','dstat_msg','dstat_log']
# job_gatk_stepm=job_gatk_stepm[columnlist]
# #merged_attemps_df['added_vm']=job_gatk['vm_cnt']-merged_attemps_df['vm_exp_cnt']

-------
## QC Job

### Fastqc

In [16]:
# query="MATCH (j:Job:BamFastqc)-[:STATUS]->(d:Dstat) RETURN j.sample as sample, j.name as job, j.duplicate as dup, j.machineType as vm_type, j.durationMinutes as job_runtime, j.diskSize as vm_disk, j.startTimeEpoch as start_time, j.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_fastqc = graph.run(query).to_data_frame()

# job_fastqc['job_group']='QC'
# job_fastqc['fq2urg_gatkid']=None
# job_fastqc['vm_exp_cnt']=1
# job_fastqc['vm_cnt']=1
# job_fastqc['vm_avg_runtime']=job_fastqc['job_runtime']

# job_fastqc=job_fastqc[columnlist]
# job_fastqc.set_index('sample')

# ## duplication check

# print('The number of rows : ' + str(len(job_fastqc)))
# print('The number of duplicated jobs : ' + str(len(job_fastqc.loc[job_fastqc['dup']==True,:])))

# job_fastqc.drop_duplicates(columnlist,keep='first',inplace=True)

# print('The number of rows dropped duplications : ' + str(len(job_fastqc)))

# ## column order

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_fastqc=job_fastqc[columnlist]

### Text2table for Fastqc

In [17]:
# query="MATCH (j:Job:BamFastqc)-[:OUTPUT]->()-[:INPUT_TO]->(t:Job:TextToTable)-[:STATUS]->(d:Dstat) RETURN j.sample as sample, j.name as fq2urg_gatkid, t.name as job, t.duplicate as dup, t.machineType as vm_type, t.durationMinutes as job_runtime, t.diskSize as vm_disk, t.startTimeEpoch as start_time, t.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_fastqc_t2t=graph.run(query).to_data_frame()

# job_fastqc_t2t['job_group']='QC'
# job_fastqc_t2t['vm_exp_cnt']=1
# job_fastqc_t2t['vm_cnt']=1
# job_fastqc_t2t['vm_avg_runtime']=job_fastqc_t2t['job_runtime']

# job_fastqc_t2t=job_fastqc_t2t[columnlist]
# job_fastqc_t2t.set_index('sample')

# ## duplication check

# print('The number of rows : ' + str(len(job_fastqc_t2t)))
# print('The number of duplicated jobs : ' + str(len(job_fastqc_t2t.loc[job_fastqc_t2t['dup']==True,:])))

# ## drop duplication

# job_fastqc_t2t.drop_duplicates(columnlist,keep='first',inplace=True)
# print('The number of rows dropped duplications : ' + str(len(job_fastqc_t2t)))

# ## column order

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_fastqc_t2t=job_fastqc_t2t[columnlist]

In [18]:
#job_fastqc_t2t.loc[job_fastqc_t2t['sample']=='SHIP5119485','dstat_log'][196]
#job_fastqc_t2t['sample'].value_counts()

## Missing 
#set(job_fastqc['sample'])-set(job_fastqc_t2t['sample'])

### Flagstat

In [19]:
# query="MATCH (s:Sample), (j:Job:Flagstat)-[:STATUS]->(d:Dstat) WHERE s.sample=j.sample RETURN s.sample as sample, j.name as job, j.duplicate as dup, j.machineType as vm_type, j.durationMinutes as job_runtime, j.diskSize as vm_disk, j.startTimeEpoch as start_time, j.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_flagstat = graph.run(query).to_data_frame()

# job_flagstat['job_group']='QC'
# job_flagstat['fq2urg_gatkid']=None
# job_flagstat['vm_exp_cnt']=1
# job_flagstat['vm_cnt']=1
# job_flagstat['vm_avg_runtime']=job_flagstat['job_runtime']

# job_flagstat=job_flagstat[columnlist]
# job_flagstat.set_index('sample')

# print('The number of rows : ' + str(len(job_flagstat)))
# print('The number of duplicated jobs : ' + str(len(job_flagstat.loc[job_flagstat['dup']==True,:])))

# ## drop duplication

# job_flagstat.drop_duplicates(columnlist,keep='first',inplace=True)
# print('The number of rows dropped duplications : ' + str(len(job_flagstat)))

# ## column order

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_flagstat=job_flagstat[columnlist]

In [20]:
#job_flagstat.loc[job_flagstat['sample']=='SHIP5119453',:]
#job_flagstat['sample'].value_counts()

## Missing 
#set(job_fastqc['sample'])-set(job_fastqc_t2t['sample'])

### text2table for Flagstat

In [21]:
# query="MATCH (s:Sample), (j:Job:Flagstat)-[:OUTPUT]->()-[:INPUT_TO]->(t:Job:TextToTable)-[:STATUS]->(d:Dstat) WHERE s.sample=j.sample RETURN s.sample as sample, j.name as fq2urg_gatkid, t.name as job, t.duplicate as dup, t.machineType as vm_type, t.durationMinutes as job_runtime, t.diskSize as vm_disk, t.startTimeEpoch as start_time, t.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_flagstat_t2t=graph.run(query).to_data_frame()

# job_flagstat_t2t['job_group']='QC'
# job_flagstat_t2t['vm_exp_cnt']=1
# job_flagstat_t2t['vm_cnt']=1
# job_flagstat_t2t['vm_avg_runtime']=job_flagstat_t2t['job_runtime']

# job_flagstat_t2t=job_flagstat_t2t[columnlist]
# job_flagstat_t2t.set_index('sample')

# print('The number of rows : ' + str(len(job_flagstat_t2t)))
# print('The number of duplicated jobs : ' + str(len(job_flagstat_t2t.loc[job_flagstat_t2t['dup']==True,:])))

# ## drop duplication

# job_flagstat_t2t.drop_duplicates(columnlist,keep='first',inplace=True)
# print('The number of rows dropped duplications : ' + str(len(job_flagstat_t2t)))

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_flagstat_t2t=job_flagstat_t2t[columnlist]

In [22]:
#job_flagstat_t2t.loc[job_flagstat_t2t['sample']=='SHIP5119453',:]
#job_flagstat_t2t['sample'].value_counts()

## Missing 
#set(job_flagstat['sample'])-set(job_flagstat_t2t['sample'])

### Vcfstats

In [23]:
# query="MATCH (s:Sample), (j:Job:Vcfstats)-[:STATUS]->(d:Dstat) WHERE s.sample=j.sample RETURN s.sample as sample, j.name as job, j.duplicate as dup, j.machineType as vm_type, j.durationMinutes as job_runtime, j.diskSize as vm_disk, j.startTimeEpoch as start_time, j.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_vcfstats = graph.run(query).to_data_frame()

# job_vcfstats['job_group']='QC'
# job_vcfstats['fq2urg_gatkid']=None
# job_vcfstats['vm_exp_cnt']=1
# job_vcfstats['vm_cnt']=1
# job_vcfstats['vm_avg_runtime']=job_vcfstats['job_runtime']

# job_vcfstats=job_vcfstats[columnlist]
# job_vcfstats.set_index('sample')

# print('The number of rows : ' + str(len(job_vcfstats)))
# print('The number of duplicated jobs : ' + str(len(job_vcfstats.loc[job_vcfstats['dup']==True,:])))

# ## drop duplication

# job_vcfstats.drop_duplicates(columnlist,keep='first',inplace=True)
# print('The number of rows dropped duplications : ' + str(len(job_vcfstats)))

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_vcfstats=job_vcfstats[columnlist]

### text2table for Vcfstats

In [24]:
# query="MATCH (s:Sample), (j:Job:Vcfstats)-[:OUTPUT]->()-[:INPUT_TO]->(t:Job:TextToTable)-[:STATUS]->(d:Dstat) WHERE s.sample=j.sample RETURN s.sample as sample, j.name as fq2urg_gatkid, t.name as job, t.duplicate as dup, t.machineType as vm_type, t.durationMinutes as job_runtime, t.diskSize as vm_disk, t.startTimeEpoch as start_time, t.stopTimeEpoch as stop_time, d.status as dstat_status, d.statusMessage as dstat_msg, d.logging as dstat_log"
# job_vcfstats_t2t=graph.run(query).to_data_frame()

# job_vcfstats_t2t['job_group']='QC'
# job_vcfstats_t2t['vm_exp_cnt']=1
# job_vcfstats_t2t['vm_cnt']=1
# job_vcfstats_t2t['vm_avg_runtime']=job_vcfstats_t2t['job_runtime']

# job_vcfstats_t2t=job_vcfstats_t2t[columnlist]
# job_vcfstats_t2t.set_index('sample')

# print('The number of rows : ' + str(len(job_vcfstats_t2t)))
# print('The number of duplicated jobs : ' + str(len(job_vcfstats_t2t.loc[job_vcfstats_t2t['dup']==True,:])))

# ## drop duplication

# job_vcfstats_t2t.drop_duplicates(columnlist,keep='first',inplace=True)
# print('The number of rows dropped duplications : ' + str(len(job_vcfstats_t2t)))

# columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_disk','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
# job_vcfstats_t2t=job_vcfstats_t2t[columnlist]

In [25]:
#job_flagstat_t2t.loc[job_flagstat_t2t['sample']=='SHIP5119453',:]
#job_vcfstats_t2t['sample'].value_counts()

## Missing 
#set(job_vcfstats['sample'])-set(job_vcfstats_t2t['sample'])

-------
## Merge 

In [26]:
# ## v1.0

# #job_df=pd.concat([job_fq2u,job_gatk,job_gatk_stepm,job_fastqc,job_fastqc_t2t,job_flagstat,job_flagstat_t2t,job_vcfstats,job_vcfstats_t2t]).sort_values(['sample','job_group','job','fq2urg_gatkid'])
# job_df=pd.concat([job_fq2u,job_gatk,job_fastqc,job_fastqc_t2t,job_flagstat,job_flagstat_t2t,job_vcfstats,job_vcfstats_t2t]).sort_values(['sample','job_group','job','fq2urg_gatkid'])
# display(job_df)

In [27]:
## v1.1

#job_df=pd.concat([job_fq2u,job_gatk,job_gatk_stepm,job_fastqc,job_fastqc_t2t,job_flagstat,job_flagstat_t2t,job_vcfstats,job_vcfstats_t2t]).sort_values(['sample','job_group','job','fq2urg_gatkid'])
job_df=pd.concat([job_fq2u,job_gatk]).sort_values(['sample','job_group','job','fq2urg_gatkid'])
display(job_df)

Unnamed: 0,sample,job_group,job,fq2urg_gatkid,dup,vm_exp_cnt,vm_cnt,vm_avg_runtime,job_runtime,vm_type,vm_disk,start_time,stop_time,dstat_status,dstat_msg,dstat_log
2,SHIP4946367,GATK,cromwell,6ca3d9e3-4c63-4f2c-9538-71d121ddca2e,,1,1,1783,1783,custom-2-12288,,1587071921.16,1587178924.45,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
0,SHIP4946367,GATK,cromwell,,,1,1,3,3,custom-2-12288,,1587069215.88,1587069436.32,FAILURE,,
1,SHIP4946367,GATK,cromwell,,,1,1,2,2,custom-2-12288,,1587071912.69,1587072089.36,FAILURE,worker was terminated,gs://gbsc-gcp-project-mvp-from-personalis-phas...
93,SHIP4946368,GATK,FQ2U,0,,1,1,62,62,custom-2-7680,,1587182492.78,1587186240.22,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
119,SHIP4946368,GATK,FQ2U,1,,1,1,62,62,custom-2-7680,,1587182478.31,1587186205.87,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
92479,SHIP6083051,GATK,FQ2U,0,,1,1,48,48,custom-2-7680,,1591740050.90,1591742964.43,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
92482,SHIP6083051,GATK,FQ2U,1,,1,1,47,47,custom-2-7680,,1591740052.40,1591742909.99,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
92480,SHIP6083051,GATK,FQ2U,2,,1,1,46,46,custom-2-7680,,1591740062.01,1591742865.25,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
92485,SHIP6083051,GATK,FQ2U,3,,1,1,44,44,custom-2-7680,,1591740055.86,1591742736.54,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...


-------
## Cost

### vm_avg_cost

In [28]:
## Custom CPU cost : 0.033174/CPU/Hour, # Custom Mem cost : 0.004446/GB/Hour, # Custom Disk cost : ???
cpu_sd_cost = 0.033174
memg_sd_cost = 0.004446

cpu_pem_cost = 0.00698
memg_pem_cost = 0.00094

## Extract cpu and mem info.
#temp=job_df.loc[:,['job','vm_type']]
job_df.loc[:,'vm_std']=None
job_df.loc[:,'vm_cpu']=0
job_df.loc[:,'vm_mem']=0
job_df[['vm_std','vm_cpu','vm_mem']]=[i.split('-') for i in job_df['vm_type']]
job_df['vm_cpu']=[int(x) for x in job_df['vm_cpu']]
job_df['vm_mem']=[int(x) for x in job_df['vm_mem']]
#columnlist=['job','vm_type','vm_type','cpu','mem']
#temp=temp[columnlist]

In [29]:
# FQ2U and GATK job unit cost with Standard VM
job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])),'vm_avg_cost']= np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])),'vm_cpu'])*cpu_sd_cost/60 + np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])),'vm_mem'])*memg_sd_cost/60/1000

# QC unit cost with Standard VM
#job_df.loc[(job_df['job_group']=='QC'),'vm_avg_cost']= np.array(job_df.loc[(job_df['job_group']=='QC'),'vm_cpu'])*cpu_sd_cost/60 + np.array(job_df.loc[(job_df['job_group']=='QC'),'vm_mem'])*memg_sd_cost/60/1000

# GATK sub jobs' unit cost with Preemptible VM
#job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_avg_cost']= np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_cpu'])*cpu_pem_cost/60 + np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_mem'])*memg_pem_cost/60/1000 

job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_avg_cost']= \
    np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_cpu'])*cpu_pem_cost/60 + \
    np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job'].isin(['FQ2U','cromwell'])==False),'vm_mem'])*memg_pem_cost/60/1000

### job cost

In [30]:
pd.set_option('display.float_format', lambda x: '%.8f' % x)

## FQ2U
job_df.loc[job_df['job']=='FQ2U','job_cost']=np.array(job_df.loc[job_df['job']=='FQ2U','job_runtime'])*np.array(job_df.loc[job_df['job']=='FQ2U','vm_avg_cost'])
#merged_cost_df.head(2)

## GATK
job_df.loc[job_df['job']=='cromwell','job_cost']=np.array(job_df.loc[job_df['job']=='cromwell','job_runtime'])*np.array(job_df.loc[job_df['job']=='cromwell','vm_avg_cost'])
#merged_cost_df[merged_cost_df['job']=='GATK'].head(2)

## QC
#job_df.loc[job_df['job_group']=='QC','job_cost']=np.array(job_df.loc[job_df['job_group']=='QC','job_runtime'])*np.array(job_df.loc[job_df['job_group']=='QC','vm_avg_cost'])
#merged_cost_df[merged_cost_df['job']=='GATK'].head(2)

## GATK steps
job_df.loc[(job_df['job_group']=='GATK')&(job_df['job']!='FQ2U') & (job_df['job']!='cromwell'),'job_cost']=np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job']!='FQ2U') & (job_df['job']!='cromwell'),'vm_cnt']) \
*np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job']!='FQ2U') & (job_df['job']!='cromwell'),'vm_avg_runtime'])*np.array(job_df.loc[(job_df['job_group']=='GATK')&(job_df['job']!='FQ2U') & (job_df['job']!='cromwell'),'vm_avg_cost'])

In [31]:
columnlist=['sample','job_group','job','fq2urg_gatkid','dup','vm_exp_cnt','vm_cnt','vm_avg_runtime','job_runtime','vm_type','vm_cpu','vm_mem','vm_disk','vm_avg_cost','job_cost','start_time','stop_time','dstat_status','dstat_msg','dstat_log']
job_df=job_df[columnlist]
job_df.head()
#job_df.to_csv('job-based-analysis-v1.csv',index=False)

Unnamed: 0,sample,job_group,job,fq2urg_gatkid,dup,vm_exp_cnt,vm_cnt,vm_avg_runtime,job_runtime,vm_type,vm_cpu,vm_mem,vm_disk,vm_avg_cost,job_cost,start_time,stop_time,dstat_status,dstat_msg,dstat_log
2,SHIP4946367,GATK,cromwell,6ca3d9e3-4c63-4f2c-9538-71d121ddca2e,,1,1,1783,1783,custom-2-12288,2,12288,,0.00201634,3.59513565,1587071921.163,1587178924.453,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
0,SHIP4946367,GATK,cromwell,,,1,1,3,3,custom-2-12288,2,12288,,0.00201634,0.00604902,1587069215.883,1587069436.321,FAILURE,,
1,SHIP4946367,GATK,cromwell,,,1,1,2,2,custom-2-12288,2,12288,,0.00201634,0.00403268,1587071912.693,1587072089.355,FAILURE,worker was terminated,gs://gbsc-gcp-project-mvp-from-personalis-phas...
93,SHIP4946368,GATK,FQ2U,0,,1,1,62,62,custom-2-7680,2,7680,,0.00167489,0.10384306,1587182492.776,1587186240.22,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...
119,SHIP4946368,GATK,FQ2U,1,,1,1,62,62,custom-2-7680,2,7680,,0.00167489,0.10384306,1587182478.311,1587186205.869,SUCCESS,Success,gs://gbsc-gcp-project-mvp-from-personalis-phas...


### Upload CSV Files to BigQuery

In [32]:
table_id=account['BIGQUERY_DATASET']+'.job_based_analysis'
projectid=account['GOOGLE_CLOUD_PROJECT']

pandas_gbq.to_gbq(
    job_df, table_id, project_id=projectid, if_exists='replace',
)

1it [00:42, 42.11s/it]
