In [1]:
from yaml import safe_load as yaml_load
from yaml import dump as yaml_dump
import os
import shutil
import time
import tempfile
# Set the dask config directory (path is relative to the notebook's working directory).
# This must stay above `import dask`: dask collects its configuration, including the
# directory named by DASK_CONFIG, when it is first imported.
os.environ['DASK_CONFIG'] = '../.config/dask/'

import pandas as pd
import logging
import logging_tree

from codecarbon import EmissionsTracker
import dask
import dask_jobqueue
import distributed

import learn2therm.utils
import learn2therm.blast

# Logging configuration; LOGNAME and LOGLEVEL are also used by the worker function.
LOGLEVEL = logging.INFO
LOGNAME = 'loger'
LOGFILE = './logs/test_cluster.log'

# Passed to AlignmentClusterFutures as `protein_deposit`.
PROTEIN_SEQ_DIR = '../data/taxa/proteins/'
# NUM_SAMPLE = 32
# Seconds a worker sleeps before starting a task. With adaptive scaling, a worker that is
# about to be shut down (because its previous task completed) may still be handed a new
# task; the delay gives that shutdown time to happen before any real work starts.
WORKER_WAKE_UP_TIME = 25

def worker_function(alignment_handler):
    """Run one taxa pair on a worker.

    Submitted to the dask cluster through
    ``learn2therm.blast.AlignmentClusterFutures(worker_function=...)``.

    Parameters
    ----------
    alignment_handler
        Object exposing ``pair_indexes`` and a ``run()`` method; presumably one
        of the alignment handlers from ``learn2therm.blast`` -- TODO confirm.

    Returns
    -------
    Whatever ``alignment_handler.run()`` returns (presumably a dict, given the
    name ``out_dic`` -- verify against ``learn2therm.blast``).
    """
    # `learn2therm.io` is not imported at the top of the notebook, so import it
    # explicitly instead of relying on another module having imported it first.
    # This also guarantees it is loaded inside the worker process. It must come
    # first because it binds the local name `learn2therm` used below.
    import learn2therm.io

    # Wait before doing any work: a worker that adaptive scaling is about to retire
    # may still be handed this task, so give it time to be shut down first.
    time.sleep(WORKER_WAKE_UP_TIME)

    # begin execution
    t0 = time.time()
    worker_log_dir = './logs/t1.4_protein_alignment_resource_test_workers/'
    worker_logfile = f'{worker_log_dir}pair_{alignment_handler.pair_indexes}.log'

    # Route this module's logger and the library loggers to the per-pair log file.
    logger = learn2therm.utils.start_logger_if_necessary(LOGNAME, worker_logfile, LOGLEVEL, filemode='a', worker=True)
    learn2therm.blast.logger = learn2therm.utils.start_logger_if_necessary('learn2therm.blast', worker_logfile, LOGLEVEL, filemode='a', worker=True)
    learn2therm.io.logger = learn2therm.utils.start_logger_if_necessary('learn2therm.io', worker_logfile, LOGLEVEL, filemode='a', worker=True)
    logger.info(f"recieved pair {alignment_handler.pair_indexes}")

    # Track energy/emissions of the alignment itself; reports land next to the logs.
    with EmissionsTracker(project_name=f"t1.4_{alignment_handler.pair_indexes}", output_dir=worker_log_dir):
        out_dic = alignment_handler.run()
    t1 = time.time()
    logger.info(f"Completed pair {alignment_handler.pair_indexes}. Took {(t1-t0)/60}m")
    return out_dic

In [2]:
# Plain attribute access: getattr with a constant name is equivalent but hides the reference.
# NOTE(review): `Aligner` is not referenced by any of the cells shown here.
Aligner = learn2therm.blast.BlastAlignmentHandler

In [3]:
# Load this pipeline stage's parameters from the shared DVC params file.
with open("../params.yaml", "r") as stream:
    params = yaml_load(stream)['get_protein_blast_scores']
# `logging.basicConfig` returns None, so its result must not be bound to `logger`;
# configure the root logger, then keep an explicit handle to it.
logging.basicConfig(level=LOGLEVEL)
logger = logging.getLogger()
logger.info(f"Loaded parameters: {params}")

INFO:root:Loaded parameters: {'dask_cluster_class': 'SLURMCluster', 'max_protein_length': 250, 'method': 'blast', 'n_jobs': 8, 'restart': False, 'method_blast_params': {'num_threads': 6, 'word_size': 3, 'gapopen': 11, 'gapextend': 1, 'matrix': 'BLOSUM62', 'threshold': 11, 'ungapped': False}, 'method_diamond_params': {'num_threads': 5, 'sensitivity': 'sensitive', 'iterate': True, 'global_ranking': None, 'gapopen': 11, 'gapextend': 1, 'matrix': 'BLOSUM62'}, 'blast_metrics': ['local_E_value', 'scaled_local_query_percent_id', 'scaled_local_symmetric_percent_id', 'local_gap_compressed_percent_id']}


In [4]:
# Take a copy so later overrides (e.g. num_threads below) do not silently mutate
# `params` itself; this keeps re-running these cells idempotent.
aligner_params = dict(params["method_blast_params"])

In [5]:
# Override the configured BLAST thread count for this resource test; it should
# match the CPUs requested per cluster job (`job_cpu`).
aligner_params.update(num_threads=28)

In [6]:
# Taxa index pairs to align, as strings (logged as e.g. "85-7").
pairs = [
    ("5223", "10686"),
    ("85", "7"),
]

In [7]:
# Look up the dask-jobqueue cluster class named in params (e.g. SLURMCluster).
Cluster = getattr(dask_jobqueue, params['dask_cluster_class'])
# Request one CPU per BLAST thread for each job, instead of repeating the thread
# count as a separate magic number that could drift from aligner_params.
cluster = Cluster(silence_logs=None, job_cpu=aligner_params['num_threads'])

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:bokeh.server.tornado:Keep-alive ping configured every 500 milliseconds
INFO:bokeh.server.tornado:Check for unused sessions every 500 milliseconds
INFO:bokeh.server.tornado:User authentication hooks NOT provided (default user enabled)
INFO:distributed.scheduler:Clear task state
INFO:distributed.scheduler:  Scheduler at:   tcp://10.64.66.32:33995
INFO:distributed.scheduler:  dashboard at:                     :8787


In [8]:
# Let dask scale the number of workers between 2 and 3 according to load.
cluster.adapt(maximum=3, minimum=2)

INFO:distributed.deploy.adaptive:Adaptive scaling started: minimum=2 maximum=3


<distributed.deploy.adaptive.Adaptive at 0x14f9a2c6ceb0>

In [9]:
with distributed.Client(cluster) as client:
    # Run the job. Per the log output, with restart disabled, pairs that already
    # completed are skipped and partial outputs are cleaned up before submission.
    results = []
    with learn2therm.blast.AlignmentClusterFutures(
        pairs=pairs,
        client=client,
        worker_function=worker_function,
        max_seq_len=params['max_protein_length'],
        protein_deposit=PROTEIN_SEQ_DIR,
        alignment_score_deposit='./logs/t1.4_protein_alignment_resource_test_workers/output/',
        metrics=params['blast_metrics'],
        alignment_params=aligner_params,
        # Read the restart flag from params like every other setting in this call.
        restart=params['restart'],
    ) as futures:
        results.extend(futures)

INFO:distributed.scheduler:Receive client connection: Client-cfa76a59-6467-11ed-8dc8-0a94eff00317
INFO:distributed.core:Starting established connection
INFO:learn2therm.blast:Completed pairs: ['85-7']
INFO:learn2therm.blast:Pair 85-7 already completed
INFO:learn2therm.blast:Pair 5223-10686 erroneously ended, cleaning up file
INFO:learn2therm.blast:Found 1 pairs already complete. Cleaned up 1 erroneous files.
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://10.64.66.7:34815', name: SLURMCluster-1, status: undefined, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://10.64.66.7:34815
INFO:distributed.core:Starting established connection
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://10.64.66.32:38523', name: SLURMCluster-0, status: undefined, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://10.64.66.32:38523
INFO:distributed.core:Starting established connection
INFO:learn2therm

In [2]:
import pandas as pd

In [5]:
# Two identical rows, built column-wise instead of from a list of row dicts.
df = pd.DataFrame({'a': [1, 1], 'b': [2, 2]})

In [6]:
# One row's worth of values, keyed by the same labels as df's columns.
s = pd.Series([1, 2], index=['a', 'b'])

In [22]:
# Append `s` as one extra row: to_frame() makes it a single column, and transpose()
# turns that into a single row whose columns are the Series labels (a, b).
pd.concat([df, s.to_frame().transpose()], ignore_index=True)

Unnamed: 0,a,b
0,1,2
1,1,2
2,1,2


In [11]:
# pd.concat above returns a new frame and does not modify `df` in place,
# so `df` still holds only its two original rows.
df

Unnamed: 0,a,b
0,1,2
1,1,2
