In [1]:
import os
import shlex

import pandas as pd

from genoslurm.genoslurm import chunks, GenCallJob

In [2]:
# Cluster configuration
max_nodes = 10  # max barcodes per generated sbatch script (chunk size for `chunks`)

# Paths on the cluster. These strings are baked into the generated sbatch
# scripts, so they must match the login/compute nodes' filesystem, not the
# machine running this notebook.
userhome = '/home/dan_datatecnica_com'
datapath = f'{userhome}/data'
log_dir = f'{userhome}/logs'
ilmn_files_path = f'{userhome}/ilmn_files'
iaap = f'{userhome}/GenoTools/executables/iaap-cli-linux-x64-1.1.0-sha.80d7e5b3d9c1fdfc2e99b472a90652fd3848bbc7/iaap-cli/iaap-cli'
bpm = f'{ilmn_files_path}/NeuroBooster_20042459_A2.bpm'
egt = f'{ilmn_files_path}/recluster_09272022.egt'
plink_file_path = f'{datapath}/gp2_plink'

# GCS locations as "bucket/prefix" (no gs:// scheme); one bucket constant so
# the name is not repeated in every path.
gcs_bucket = 'gp2_uk'
gcs_plink_path = f'{gcs_bucket}/gp2_plink'
gcs_idat_path = f'{gcs_bucket}/gp2_idats'

In [3]:
# List the barcode directories under the IDAT bucket prefix. The listing file
# is produced once with:
#   !gsutil ls gs://gp2_uk/gp2_idats/ > tmp/idats.txt
idats_in = pd.read_csv('tmp/idats.txt', header=None)
# Row 0 is skipped; presumably it is the prefix entry itself
# (gs://gp2_uk/gp2_idats/) — TODO confirm against the listing.
gcs_idat_urls = idats_in.loc[1:, 0]
idat_prefix = f'gs://{gcs_idat_path}'
# Map each bucket URL onto the matching local directory under `datapath`.
# rstrip('/') removes the trailing slash gsutil prints for prefixes without
# chopping a real character from entries that have none (unlike [:-1]).
idat_list = [url.replace(idat_prefix, datapath).rstrip('/') for url in gcs_idat_urls]
gcs_idat_paths = [url.replace('gs://', '').rstrip('/') for url in gcs_idat_urls]
# Chunk the list so each generated sbatch script handles at most `max_nodes` barcodes.
idat_list_chunks = chunks(idat_list, max_nodes)

In [4]:
# Write one sbatch script per chunk of barcodes into cluster_scripts/; the
# next cell copies them to the login node.
# Per-task resources shared by every script.
# NOTE(review): iaap is given 4 threads while each task reserves only 3 CPUs —
# confirm which is intended.
cpus_per_task = 3
threads = 4
mem_per_cpu = '2G'
time_limit = '01:00:00'

os.makedirs('cluster_scripts', exist_ok=True)

for i, idat_list_chunk in enumerate(idat_list_chunks):
    script_path = f'cluster_scripts/call_gts_{i}.sh'
    job_name = f'callgts_{i}'
    ntasks = len(idat_list_chunk)  # one Slurm task per barcode directory

    job = GenCallJob(
        sbatch_path=script_path,
        idat_dir_ins=idat_list_chunk,
        gcs_idat_path=gcs_idat_path,
        iaap=iaap,
        bpm=bpm,
        egt=egt,
        gcs_plink_path=gcs_plink_path,
        log_path=log_dir,
        job_name=job_name,
        threads=threads,
        ntasks=ntasks,
        cpus_per_task=cpus_per_task,
        mem_per_cpu=mem_per_cpu,
        time=time_limit,
    )
    job.write_sbatch_script()

In [5]:
# Copy every generated sbatch script (cluster_scripts/call_gts_*.sh) to
# ~/scripts/ on the Slurm login node via `gcloud compute scp`.
!gcloud compute scp cluster_scripts/call_gts_*.sh genoslurm-uk-v1-login0:/home/dan_datatecnica_com/scripts/ --project genotools --zone europe-west2-a

call_gts_0.sh                                 100% 9902    73.6KB/s   00:00    
call_gts_1.sh                                 100% 2085    15.5KB/s   00:00    


Updates are available for some Google Cloud CLI components.  To install them,
please run:
  $ gcloud components update



In [6]:
# Launch: submit one copied script with sbatch on the login node over ssh.
# NOTE(review): only call_gts_1.sh is submitted here; the other generated
# scripts (e.g. call_gts_0.sh) must be submitted separately — confirm intent.
!gcloud compute ssh --zone europe-west2-a --project genotools genoslurm-uk-v1-login0 --command 'sbatch /home/dan_datatecnica_com/scripts/call_gts_1.sh'

Submitted batch job 10


In [5]:
# Show a generated sbatch script for inspection.
# NOTE(review): `{i}` is the loop variable left over from a script-writing
# loop, so this only works after that cell has run (hidden state).
!cat cluster_scripts/call_gts_{i}.sh

#!/bin/bash
#SBATCH --job-name=callgts_1
#SBATCH --ntasks=2
#SBATCH --cpus-per-task=3
#SBATCH --mem-per-cpu=2G
#SBATCH --time=01:00:00


# Processing /home/dan_datatecnica_com/data/206451070115
mkdir -p /home/dan_datatecnica_com/data/206451070115
gsutil cp gs://gp2_uk/gp2_idats/206451070115/* /home/dan_datatecnica_com/data/206451070115/
/home/dan_datatecnica_com/GenoTools/executables/iaap-cli-linux-x64-1.1.0-sha.80d7e5b3d9c1fdfc2e99b472a90652fd3848bbc7/iaap-cli/iaap-cli gencall /home/dan_datatecnica_com/ilmn_files/NeuroBooster_20042459_A2.bpm /home/dan_datatecnica_com/ilmn_files/recluster_09272022.egt /home/dan_datatecnica_com/data/206451070115 -f /home/dan_datatecnica_com/data/206451070115 -p -t 4

# Convert PED files to BED/BIM/FAM files using plink2
for ped_file in /home/dan_datatecnica_com/data/206451070115/*.ped; do

    cp /home/dan_datatecnica_com/data/206451070115/NeuroBooster_20042459_A2.map "${ped_file%.*}.map"
    plink2 --file "${ped_file%.*}" --make-bed --out "${ped_file%.

In [2]:
def chunks(lst, n):
    """Split ``lst`` into consecutive slices of at most ``n`` items.

    Returns a list (not a generator). Every slice has length ``n`` except
    possibly the last one. An empty input gives ``[]``.

    Args:
        lst: any sliceable sequence (list, tuple, str, ...).
        n: maximum slice length; must be >= 1.

    Raises:
        ValueError: if ``n`` < 1.
    """
    if n < 1:
        raise ValueError(f'chunk size must be >= 1, got {n}')
    return [lst[start:start + n] for start in range(0, len(lst), n)]

class GenCallJob:
    """Write a Slurm sbatch script that runs iaap-cli gencall + plink2 per barcode.

    For every local directory in ``idat_dir_ins`` the generated script will:
      1. download ``gs://{gcs_idat_path}/{barcode}/*`` into that directory,
      2. run ``iaap gencall`` with ``-p``, writing outputs into the same directory,
      3. convert each ``*.ped`` to BED/BIM/FAM with ``plink2 --make-bed``,
      4. upload ``*.{bed,bim,fam,log}`` to ``gs://{gcs_plink_path}/``.
    ``barcode`` is the last path component of the local directory.
    """

    def __init__(self, sbatch_path, idat_dir_ins, gcs_idat_path, iaap, bpm, egt, gcs_plink_path, log_path, job_name='my_job', threads=2, ntasks=1, cpus_per_task=1, mem_per_cpu='2G', time='01:00:00'):
        self.sbatch_path = sbatch_path        # where the sbatch script is written
        self.idat_dir_ins = idat_dir_ins      # local per-barcode working directories
        self.gcs_idat_path = gcs_idat_path    # "bucket/prefix" holding barcode subdirs
        self.iaap = iaap                      # path to the iaap-cli executable
        self.bpm = bpm
        self.egt = egt
        self.gcs_plink_path = gcs_plink_path  # "bucket/prefix" for plink outputs
        self.log_path = log_path              # directory for per-step srun logs
        self.job_name = job_name
        self.threads = threads                # passed to iaap gencall -t
        self.ntasks = ntasks
        self.cpus_per_task = cpus_per_task
        self.mem_per_cpu = mem_per_cpu
        self.time = time

    def write_header(self):
        """Return the shebang line plus the job-level ``#SBATCH`` directives."""
        header = f"""#!/bin/bash
#SBATCH --job-name={self.job_name}
#SBATCH --ntasks={self.ntasks}
#SBATCH --cpus-per-task={self.cpus_per_task}
#SBATCH --mem-per-cpu={self.mem_per_cpu}
#SBATCH --time={self.time}

"""
        return header

    def write_script(self, idat_dir_in, gcs_idat_path):
        """Return the bash commands that process a single barcode directory.

        Args:
            idat_dir_in: local directory to download IDATs into; gencall and
                plink2 outputs are written there too.
            gcs_idat_path: "bucket/prefix" (no gs://) of this barcode's IDATs.
        """
        # mkdir -p: do not fail when the directory already exists (e.g. re-runs).
        # NOTE(review): the .map filename is hard-coded to the NeuroBooster
        # manifest; it presumably must follow `self.bpm` — confirm.
        script = f"""
# Processing {idat_dir_in}
mkdir -p {idat_dir_in}
gsutil cp gs://{gcs_idat_path}/* {idat_dir_in}/
{self.iaap} gencall {self.bpm} {self.egt} {idat_dir_in} -f {idat_dir_in} -p -t {self.threads}

# Convert PED files to BED/BIM/FAM files using plink2
for ped_file in {idat_dir_in}/*.ped; do

    cp {idat_dir_in}/NeuroBooster_20042459_A2.map "${{ped_file%.*}}.map"
    plink2 --file "${{ped_file%.*}}" --make-bed --out "${{ped_file%.*}}"
done

gsutil cp {idat_dir_in}/*.{{bed,bim,fam,log}} gs://{self.gcs_plink_path}/

"""
        return script

    def write_sbatch_script(self):
        """Write the complete sbatch script to ``self.sbatch_path``.

        Each barcode directory becomes one ``srun`` job step (1 task, 1 node).
        The steps are chained with ``&&``, so they run one after another and a
        failing step stops the remaining ones. The parent directory of
        ``sbatch_path`` is created if it does not exist.
        """
        header = self.write_header()
        commands = []
        for idat_dir_in in self.idat_dir_ins:
            # basename of the path with any trailing "/" removed -> barcode
            code = os.path.basename(idat_dir_in.rstrip('/'))
            gcs_idat_path = f'{self.gcs_idat_path}/{code}'
            command = self.write_script(idat_dir_in, gcs_idat_path)
            # Put the barcode in the log name: each step has a single task, so
            # %t is always 0 and "%j-%t" alone made every step of the job share
            # (and overwrite) the same log file.
            log_stem = f'{self.log_path}/{self.job_name}-%j-{code}-%t'
            # shlex.quote passes the multi-line script to bash -c verbatim. The
            # previous "{command}" wrapping broke: the script's own double quotes
            # ("${ped_file%.*}") closed the outer string, and $ped_file was
            # expanded (to nothing) by the outer shell instead of the inner one.
            commands.append(
                'srun '
                '--ntasks=1 '
                '--nodes=1 '
                f'--output={log_stem}.out '
                f'--error={log_stem}.err '
                f'/bin/bash -c {shlex.quote(command)}'
            )

        script = " && \\\n".join(commands)
        full_script = header + script

        sbatch_dir = os.path.dirname(self.sbatch_path)
        if sbatch_dir:
            os.makedirs(sbatch_dir, exist_ok=True)
        with open(self.sbatch_path, 'w') as f:
            f.write(full_script)
       




# removed stuff
# gcsfuse --dir-mode 776 --file-mode 776 --implicit-dirs {self.gcs_idat_path} {idat_dir_in}
# gsutil cp {idat_dir_in}/*.bed {idat_dir_in}/*.bim {idat_dir_in}/*.fam gs://{self.gcs_plink_path}/
    # base_name=$(basename ${{ped_file}} .ped)
     # os.system('sbatch my_sbatch_script.sh')
                 # sbatch_script += self.write_script(idat_dir_in, gcs_idat_path)

In [97]:
# LEGACY: earlier gcsfuse-based script generator, superseded by GenCallJob.
# NOTE(review): it writes the same cluster_scripts/call_gts_{i}.sh paths as the
# GenCallJob cell, so running it overwrites those scripts.
# One sbatch script per chunk of at most {max_nodes} barcodes; one srun step
# per barcode, launched in the background so the steps run in parallel.
# (n_idats<=(80*max_nodes))

log_path = f'{userhome}/logs'
gcs_path = 'gp2_uk'                         # bucket name
plink_dest = f'gs://{gcs_path}/gp2_plink/'  # upload destination for outputs
time_limit = '01:00:00'
cpus_per_task = '1'

os.makedirs('cluster_scripts', exist_ok=True)

for i, chunk in enumerate(idat_list_chunks):

    script_path = f'cluster_scripts/call_gts_{i}.sh'
    job_name = f'callgts_{i}'
    ntasks = len(chunk)

    with open(script_path, 'w') as f:
        f.write('#!/bin/bash\n')
        f.write(f'#SBATCH --job-name={job_name}\n')
        f.write(f'#SBATCH --ntasks={ntasks}\n')
        f.write(f'#SBATCH --cpus-per-task={cpus_per_task}\n')
        f.write(f'#SBATCH --time={time_limit}\n')
        f.write('\n')
        f.write('echo "Starting job"\n')
        f.write('date\n')
        f.write('\n')

        for idat_path in chunk:
            # idat_list holds full local paths (see the idat-list cell), so keep
            # only the last component; basename of a bare barcode is unchanged.
            code = os.path.basename(str(idat_path).rstrip('/'))

            # path in vm that is mounted to the bucket via gcsfuse
            idat_dir_in = f'{datapath}/gp2_idats/{code}'

            # files not writing properly to gcs via gcsfuse, so write to a path
            # in the vm and copy to gcs afterwards
            tmp_out_dir = f'{userhome}/tmp/{code}'

            command = (
                f'mkdir -p {idat_dir_in}; '
                f'mkdir -p {tmp_out_dir}; '
                # gcsfuse's positional argument is a bucket name; mount only the
                # barcode's sub-directory with --only-dir ("bucket/prefix" is not
                # a valid bucket name).
                f'gcsfuse --dir-mode 776 --file-mode 776 --implicit-dirs '
                f'--only-dir gp2_idats/{code} {gcs_path} {idat_dir_in}; '
                f'{iaap} gencall {bpm} {egt} {tmp_out_dir} -f {idat_dir_in} -p -t 2; '
                f'gsutil cp -r {tmp_out_dir}/ {plink_dest}'
            )

            f.write(f'# Run command for file {idat_dir_in}\n')
            # srun needs an executable, so run the ;-separated list through
            # bash -c (the old form passed the whole string as a program name).
            # --ntasks=1 stops the step inheriting the job's ntasks and running
            # the command ntasks times.
            f.write(
                f'srun --ntasks=1 --nodes=1 '
                f'--output={log_path}/{job_name}-%j-{code}-%t.out '
                f'--error={log_path}/{job_name}-%j-{code}-%t.err '
                f'/bin/bash -c {shlex.quote(command)} &\n'
            )
            f.write('\n')

        # Without `wait` the batch script exits right after launching the
        # background steps, which ends the job before they finish.
        f.write('wait\n')
        f.write('\n')
        f.write('echo "Job finished"\n')
        f.write('date\n')