In [1]:
import subprocess
import os
import pandas as pd

In [2]:
CHEXPERT_FOLDER = '~/chexpert/chexpert-labeler'

In [3]:
CHEXPERT_PYTHON = '~/miniconda3/envs/chexpert-label/bin/python'

In [4]:
TMP_FOLDER = '~/medvqa-workspace/tmp'

In [5]:
NEGBIO_PATH = '/home/pamessina/chexpert/NegBio'

In [6]:
def _get_custom_env():
    custom_env = os.environ.copy()
    prev = custom_env.get('PYTHONPATH', '')
    custom_env['PYTHONPATH'] = f'{NEGBIO_PATH}:{prev}'
    return custom_env

In [11]:
# input_path = os.path.join(TMP_FOLDER, 'reports-input.csv')
input_path = '/home/pamessina/medvqa-workspace/tmp/chexpert-labeler/labeler-input.csv'
output_path = os.path.join(TMP_FOLDER, 'reports-output-v2.csv')
cmd_cd = f'cd {CHEXPERT_FOLDER}'
cmd_call = f'{CHEXPERT_PYTHON} label.py --reports_path {input_path} --output_path {output_path}'
cmd = f'{cmd_cd} && {cmd_call}'

try:            
    subprocess.run(
        cmd, shell=True, check=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        env=_get_custom_env(),
    )
except subprocess.CalledProcessError as e:
    print('Labeler failed, stdout and stderr:')
    print(e.stdout)
    print(e.stderr)
    raise

In [14]:
!head /home/pamessina/medvqa-workspace/tmp/reports-input.csv

no free air below the right hemidiaphragm is seen
however , other radiopaque fluid , such as fluid , , , be filling the airspace in this region
there is no free air under the hemidiaphragms . low lung volumes but no acute process and no evidence of free peritoneal air .


In [33]:
len(pd.DataFrame(['asdfasdf', 'asdfasdfasdf', 'asdfasdf']))

3

In [2]:
import getpass, os

In [3]:
os.getegid()

998

In [4]:
os.geteuid()

1001

In [5]:
os.getlogin()

'pamessina'

In [6]:
getpass.getuser()

'pamessina'

In [5]:
import grp

In [6]:
groupinfo = grp.getgrnam('docker')

In [7]:
groupinfo

grp.struct_group(gr_name='docker', gr_passwd='x', gr_gid=998, gr_mem=['pamessina'])

In [17]:
os.setgid(groupinfo.gr_gid)

PermissionError: [Errno 1] Operation not permitted

In [None]:
os.setegid(998)

In [8]:
directory_path = '/home/pamessina/medvqa-workspace/tmp/chexpert-labeler'
input_filename = 'labeler-input.csv'
output_filename = 'labeler-output-v1234.csv'
cmd = (f'docker run -v {directory_path}:/data chexpert-labeler:latest '
       f'python label.py --reports_path /data/{input_filename} --output_path /data/{output_filename}')

print(cmd)

try:            
    subprocess.run(
        cmd, shell=True, check=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
except subprocess.CalledProcessError as e:
    print('Labeler failed, stdout and stderr:')
    print(e.stdout)
    print(e.stderr)
    raise

docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input.csv --output_path /data/labeler-output-v1234.csv --verbose


In [43]:
from medvqa.utils.common import TMP_DIR
from medvqa.utils.constants import CHEXPERT_LABELS
import numpy as np
import pandas as pd
import csv
import time

TMP_FOLDER = os.path.join(TMP_DIR, 'chexpert-labeler')

class ChexpertLabelerJob:
    def __init__(self, texts, input_filename, output_filename):
        
        self.texts = texts
        
        # Define input & output paths for i-th chunk
        self.input_path = os.path.join(TMP_FOLDER, input_filename)
        self.output_path = os.path.join(TMP_FOLDER, output_filename)

        # Create input file
        os.makedirs(TMP_FOLDER, exist_ok=True)
        in_df = pd.DataFrame(texts)        
        in_df.to_csv(self.input_path, header=False, index=False, quoting=csv.QUOTE_ALL)

        # Build command
        self.cmd = (f'docker run -v {TMP_FOLDER}:/data chexpert-labeler:latest '
        f'python label.py --reports_path /data/{input_filename} --output_path /data/{output_filename}')
        

def invoke_chexpert_labeler_process(texts, tmp_suffix='', n_chunks = 1, max_processes = 1):

    n = len(texts)
    if n < 50:
        n_chunks = 1

    chunk_size = n // n_chunks + (n % n_chunks > 0)
    processes = []
    output_paths = []
    
    print(f'Chexpert labeler: running a maximum of {max_processes} concurrent processes over {n_chunks} chunks')
    
    jobs = []
    for i in range(n_chunks):
        b = i * chunk_size
        e = b + chunk_size
        texts_chunk = texts[b:e]
        print(f'i={i}, b={b}, e={e}, n={n}, chunk_size={len(texts_chunk)}')
        input_filename = f'labeler-input{tmp_suffix}_{i}.csv'
        output_filename = f'labeler-output{tmp_suffix}_{i}.csv'
        jobs.append(ChexpertLabelerJob(texts_chunk, input_filename, output_filename))

    start = time.time()    
    idx = 1    
    job_idxs = list(range(len(jobs)))
    
    while len(job_idxs) > 0 or len(processes) > 0:
        
        if len(processes) == max_processes or len(job_idxs) == 0:
            
            next_processes = []
            
            for p in processes:
                p.wait()
                print(f'\tprocess {idx} finished, elapsed time = {time.time() - start}')
                idx += 1
                
                if len(job_idxs) > 0:
                    time.sleep(1)
                    i = job_idxs.pop(0)
                    print(f'\t{i+1}) Running chexpert labeler over {len(jobs[i].texts)} texts ...')
                    print(f'\tCommand = {jobs[i].cmd}')
                    next_processes.append(subprocess.Popen(jobs[i].cmd, shell=True))
                    
            
            processes.clear()
            processes = next_processes
        
        else:
            time.sleep(1)
            i = job_idxs.pop(0)
            print(f'\t{i+1}) Running chexpert labeler over {len(jobs[i].texts)} texts ...')
            print(f'\tCommand = {jobs[i].cmd}')
            processes.append(subprocess.Popen(jobs[i].cmd, shell=True))    
       
    time.sleep(3)
    
    out_labels = np.empty((n, len(CHEXPERT_LABELS)), np.int8)
    
    offset = 0
    
    for job in jobs:
        # Read chexpert-labeler output
        out_df = pd.read_csv(job.output_path)
        out_df = out_df.fillna(-2)
        assert len(out_df) == len(job.texts)
        out_labels[offset : offset + len(out_df)] = out_df[CHEXPERT_LABELS].to_numpy().astype(np.int8)
        offset += len(out_df)

    assert offset == n

    return out_labels

In [44]:
texts = list(pd.read_csv(os.path.join(TMP_FOLDER, 'labeler-input.csv'), header=None)[0])

In [45]:
len(texts)

477

In [46]:
labels1 = invoke_chexpert_labeler_process(texts, '__debug01', n_chunks=1, max_processes=1)

Chexpert labeler: running a maximum of 1 concurrent processes over 1 chunks
i=0, b=0, e=477, n=477, chunk_size=477
	1) Running chexpert labeler over 477 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug01_0.csv --output_path /data/labeler-output__debug01_0.csv
	process 1 finished, elapsed time = 111.12745642662048


In [49]:
labels2 = invoke_chexpert_labeler_process(texts, '__debug01', n_chunks=2, max_processes=1)

Chexpert labeler: running a maximum of 1 concurrent processes over 2 chunks
i=0, b=0, e=239, n=477, chunk_size=239
i=1, b=239, e=478, n=477, chunk_size=238
	1) Running chexpert labeler over 239 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug01_0.csv --output_path /data/labeler-output__debug01_0.csv
	process 1 finished, elapsed time = 58.21437931060791
	2) Running chexpert labeler over 238 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug01_1.csv --output_path /data/labeler-output__debug01_1.csv
	process 2 finished, elapsed time = 119.26192998886108


In [51]:
labels5 = invoke_chexpert_labeler_process(texts, '__debug_c10_p5', n_chunks=10, max_processes=5)

Chexpert labeler: running a maximum of 5 concurrent processes over 10 chunks
i=0, b=0, e=48, n=477, chunk_size=48
i=1, b=48, e=96, n=477, chunk_size=48
i=2, b=96, e=144, n=477, chunk_size=48
i=3, b=144, e=192, n=477, chunk_size=48
i=4, b=192, e=240, n=477, chunk_size=48
i=5, b=240, e=288, n=477, chunk_size=48
i=6, b=288, e=336, n=477, chunk_size=48
i=7, b=336, e=384, n=477, chunk_size=48
i=8, b=384, e=432, n=477, chunk_size=48
i=9, b=432, e=480, n=477, chunk_size=45
	1) Running chexpert labeler over 48 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug_c10_p5_0.csv --output_path /data/labeler-output__debug_c10_p5_0.csv
	2) Running chexpert labeler over 48 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug_c10_p5_1.csv --output_path /data/

In [53]:
labels5_ = invoke_chexpert_labeler_process(texts, '__debug_c5_p5', n_chunks=5, max_processes=5)

Chexpert labeler: running a maximum of 5 concurrent processes over 5 chunks
i=0, b=0, e=96, n=477, chunk_size=96
i=1, b=96, e=192, n=477, chunk_size=96
i=2, b=192, e=288, n=477, chunk_size=96
i=3, b=288, e=384, n=477, chunk_size=96
i=4, b=384, e=480, n=477, chunk_size=93
	1) Running chexpert labeler over 96 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug_c5_p5_0.csv --output_path /data/labeler-output__debug_c5_p5_0.csv
	2) Running chexpert labeler over 96 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug_c5_p5_1.csv --output_path /data/labeler-output__debug_c5_p5_1.csv
	3) Running chexpert labeler over 96 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label

In [18]:
labels1 = invoke_chexpert_labeler_process(texts, '__debug0', n_processes=1)

Chexpert labeler: running 1 processes in parallel
i=0, b=0, e=477, n=477, chunk_size=477
	1) Running chexpert labeler over 477 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug0_0.csv --output_path /data/labeler-output__debug0_0.csv
	process 0 finished, elapsed time = 109.28207302093506


In [21]:
labels2 = invoke_chexpert_labeler_process(texts, '__debug2', n_processes=2)

Chexpert labeler: running 2 processes in parallel
i=0, b=0, e=238, n=477, chunk_size=238
	1) Running chexpert labeler over 238 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug2_0.csv --output_path /data/labeler-output__debug2_0.csv
i=1, b=238, e=477, n=477, chunk_size=238
	2) Running chexpert labeler over 239 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug2_1.csv --output_path /data/labeler-output__debug2_1.csv
	process 0 finished, elapsed time = 59.74182081222534
	process 1 finished, elapsed time = 62.908774852752686


In [24]:
labels3 = invoke_chexpert_labeler_process(texts, '__debug3', n_processes=3)

Chexpert labeler: running 3 processes in parallel
i=0, b=0, e=159, n=477, chunk_size=159
	1) Running chexpert labeler over 159 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug3_0.csv --output_path /data/labeler-output__debug3_0.csv
i=1, b=159, e=318, n=477, chunk_size=159
	2) Running chexpert labeler over 159 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug3_1.csv --output_path /data/labeler-output__debug3_1.csv
i=2, b=318, e=477, n=477, chunk_size=159
	3) Running chexpert labeler over 159 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug3_2.csv --output_path /data/labeler-output__debug3_2.csv
	process 0 finis

In [26]:
labels8 = invoke_chexpert_labeler_process(texts, '__debug8', n_processes=8)

Chexpert labeler: running 8 processes in parallel
i=0, b=0, e=59, n=477, chunk_size=59
	1) Running chexpert labeler over 59 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug8_0.csv --output_path /data/labeler-output__debug8_0.csv
i=1, b=59, e=118, n=477, chunk_size=59
	2) Running chexpert labeler over 59 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug8_1.csv --output_path /data/labeler-output__debug8_1.csv
i=2, b=118, e=177, n=477, chunk_size=59
	3) Running chexpert labeler over 59 texts ...
	Command = docker run -v /home/pamessina/medvqa-workspace/tmp/chexpert-labeler:/data chexpert-labeler:latest python label.py --reports_path /data/labeler-input__debug8_2.csv --output_path /data/labeler-output__debug8_2.csv
i=3, b=177, e=236, n=477

In [59]:
n = 1000
n_chunks = 1
chunk_size = n // n_chunks + (n % n_chunks > 0)

if chunk_size < 80:
    chunk_size = 80
    n_chunks = n // chunk_size + (n % chunk_size > 0)
    chunk_size = n // n_chunks + (n % n_chunks > 0)
    
(chunk_size, n_chunks)

(1000, 1)