In [5]:
import os, sys
import pandas as pd
from datetime import date

todays_date = date.today().strftime("%Y-%m-%d")

In [31]:
import parsl
from parsl.app.app import python_app

In [7]:
import utility_functions

In [39]:
def localParslConfig_threadpool(params):
 
    import parsl
    from parsl.executors import ThreadPoolExecutor
    from parsl.config import Config

    import os
    workingdir = params['working_dir']
    rundir = os.path.join(workingdir, 'runinfo')
    parsl.clear()
    local_tpex = Config(
        executors=[
            ThreadPoolExecutor(
                label="tpex_Local",
                max_threads=8,
                thread_name_prefix='tpex_run',
                working_dir=workingdir,
            )
        ],
        strategy=None,
        run_dir=rundir
    )

    return(local_tpex)

In [9]:
enformer_predictions_path = "/lus/grand/projects/covid-ct/imlab/users/temi/projects/TFXcan/enformer_pipeline/enformer_predictions/kawakami/predictions_2022-12-20/kawakami" #sys.argv[1] 
log_path = "/lus/grand/projects/covid-ct/imlab/users/temi/projects/TFXcan/enformer_pipeline/predictions_log/kawakami/predictions_log_2022-12-20" #sys.argv[2]
each_id = "kawakami" #sys.argv[3] # e.g. "kawakami" or "cistrome"
agg_type = "aggByMean" #sys.argv[4]

print(f'[INFO] Currently on {each_id}')

upstream = list(range(0, 8))
center = [8]
downstream = list(range(9, 17))

base_path = '/grand/projects/covid-ct/imlab/users/temi/projects/TFXcan'
# enformer_predictions_path = f'{base_path}/enformer_pipeline/enformer_predictions/{each_id}_reference/predictions_2022-12-11/{each_id}_FOXA1'
# log_path = f'{base_path}/enformer_pipeline/predictions_log/predictions_log_2022-12-11'

save_dir = f'{base_path}/modeling_pipeline/data/train-test-val/{each_id}/data_{todays_date}'
if not os.path.isdir(save_dir):
    os.makedirs(save_dir)

TF = 'FOXA1'
#data_names = ['aggByCenter']
#data_names = ['aggByMean', 'aggByCenter', 'aggByUpstream', 'aggByDownstream', 'aggByUpstreamDownstream']

logpath = f'{log_path}/{each_id}_{TF}_predictions_log.csv'
log_data = pd.read_csv(logpath)
log_data = log_data.drop_duplicates(subset=['motif']).iloc[1:20, ]

[INFO] Currently on kawakami


In [26]:
def generate_batch(lst, batch_n, len_lst = None):
    """
    Given a list, this function yields batches of an unspecified size but the number of batches is equal to `batch_n`
    E.g. generate_batch([0, 1, 2, 3, 4, 5, 6], batch_n=2) -> (0, 1, 2, 3), (4, 5, 6)
    
    Parameters:
        lst: list
        batch_n: int
            Number of batches to return
        len_lst: None or num (length of the input list)
    Yields
        `batch_n` batches of the list
    """
    import math
    # how many per batch
    if len_lst is not None:
        n_elems = math.ceil(len_lst/batch_n)
    else:
        n_elems = math.ceil(len(lst)/batch_n)
        
    for i in range(0, len(lst), n_elems):
        yield lst[i:(i + n_elems)]

In [44]:
# load parsl
bpath = os.path.join(base_path, 'modeling_pipeline')
parsl.load(localParslConfig_threadpool({'working_dir': bpath}))

<parsl.dataflow.dflow.DataFlowKernel at 0x7fc34e49c190>

In [45]:
@python_app
def collect_modeling_data_for_kawakami(each_id, log_data, predictions_path, TF, base_path, save_dir, agg_types, batch_num=None):

    import h5py
    import numpy as np
    import os
    import pandas as pd
    #import tqdm
    # read in one of the files

    try:
        import utility_functions
    except ModuleNotFoundError:
        print(f'[ERROR] Utility_functions module not found.')

    #exec(open(f'{base_path}/modeling_pipeline/scripts/collect_model_data/utility-functions.py').read(), globals(), globals())
    # bpath = os.path.join(base_path, 'modeling_pipeline')
    # localParslConfig_threadpool({'working_dir': bpath})

    kawakami_predictions = {}

    for dt in log_data.loc[log_data['sequence_type'] == 'ref', ].motif.values.tolist():
        fle = f'{predictions_path}/{dt}_predictions.h5'
        if os.path.isfile(fle):
            with h5py.File(fle, 'r') as f:
                filekey = list(f.keys())[0]
                kawakami_predictions[dt] = np.vstack(list(f[filekey]))
        else:
            print(f'[ERROR] {dt} predictions file does not exist.')

    print(f'[INFO] Finished collecting {len(kawakami_predictions)} predictions for {each_id}')

    #dt_aggbycenter = agg_by_center(kawakami_predictions, center=8)
    #data_list = [dt_aggbycenter]

    data_dict = {}
    for agg_type in agg_types:
        if agg_type == 'aggByMean': data_dict[agg_type] = utility_functions.agg_by_mean(kawakami_predictions)
        if agg_type == 'aggByCenter': data_dict[agg_type] = utility_functions.agg_by_center(kawakami_predictions)
        if agg_type == 'aggByUpstream': data_dict[agg_type] = utility_functions.agg_by_mean(kawakami_predictions, use_bins=upstream)
        if agg_type == 'aggByDownstream': data_dict[agg_type] = utility_functions.agg_by_mean(kawakami_predictions, use_bins=downstream)
        if agg_type == 'aggByUpstreamDownstream': data_dict[agg_type] = utility_functions.agg_by_mean(kawakami_predictions, use_bins=upstream + downstream)

    #test_aggbymean, test_aggbycenter, test_aggbymean_upstream, test_aggbymean_downstream, test_aggbymean_upstream_downstream = agg_byall(kawakami_predictions)
    #data_list = [test_aggbymean, test_aggbycenter, test_aggbymean_upstream, test_aggbymean_downstream, test_aggbymean_upstream_downstream]

    for i, agg_type in enumerate(data_dict):

        #ty = pd.concat([pd.Series(list(kawakami_predictions.keys())), pd.DataFrame(dt)], axis=1)
        ty = pd.DataFrame(data_dict[agg_type])
        print(f'[INFO] Dimension of collected data is {ty.shape[0]} by {ty.shape[1]}')

        column_names = ['id']
        column_names.extend([f'f_{i}' for i in range(1, ty.shape[1])])

        #print(len(column_names))
        ty = ty.set_axis(column_names, axis=1, inplace=False)
        print(ty.iloc[0:5, 0:5])

        if batch_num is None:
            ty.to_csv(path_or_buf=f'{save_dir}/{each_id}_{agg_type}_{TF}.csv.gz', index=False, compression='gzip')
        else:
            ty.to_csv(path_or_buf=f'{save_dir}/{each_id}_{agg_type}_{TF}_batch_{batch_num}.csv.gz', index=False, compression='gzip')
            
    print(f'[INFO] Finished saving data for {each_id}')

    return(0)

In [62]:
range_batches = generate_batch(range(0, log_data.shape[0]), batch_n=3)

In [61]:
for r in range_batches:
    print(log_data.iloc[list(r), ])

                      motif individual     status sequence_type
1  chr3_192940730_192940738   kawakami  completed           ref
2   chr12_38940385_38940393   kawakami  completed           ref
3   chr10_31409432_31409440   kawakami  completed           ref
4  chr2_135230278_135230286   kawakami  completed           ref
5   chr11_33763385_33763393   kawakami  completed           ref
6     chr20_9265818_9265826   kawakami  completed           ref
7    chr4_42560188_42560196   kawakami  completed           ref
                       motif individual     status sequence_type
8    chr11_48899081_48899089   kawakami  completed           ref
9    chr10_47536459_47536467   kawakami  completed           ref
10  chr6_129623391_129623399   kawakami  completed           ref
11   chr20_58329599_58329607   kawakami  completed           ref
12   chr13_57434336_57434344   kawakami  completed           ref
13   chr19_37202645_37202653   kawakami  completed           ref
14    chr2_17741138_17741146   ka

In [59]:
log_data

Unnamed: 0,motif,individual,status,sequence_type
1,chr3_192940730_192940738,kawakami,completed,ref
2,chr12_38940385_38940393,kawakami,completed,ref
3,chr10_31409432_31409440,kawakami,completed,ref
4,chr2_135230278_135230286,kawakami,completed,ref
5,chr11_33763385_33763393,kawakami,completed,ref
6,chr20_9265818_9265826,kawakami,completed,ref
7,chr4_42560188_42560196,kawakami,completed,ref
8,chr11_48899081_48899089,kawakami,completed,ref
9,chr10_47536459_47536467,kawakami,completed,ref
10,chr6_129623391_129623399,kawakami,completed,ref


In [63]:
count = 0
app_futures = []
for range_batch in range_batches:

    collected = collect_modeling_data_for_kawakami(each_id=each_id, log_data=log_data.iloc[list(range_batch), ], predictions_path=enformer_predictions_path, TF=TF, agg_types=[agg_type], base_path=base_path, save_dir=save_dir, batch_num=count)
    app_futures.append(collected)
    count = count + 1

app_execs = [r.result() for r in app_futures]

[INFO] Finished collecting 5 predictions for kawakami
[INFO] Dimension of collected data is 5 by 5314
                         id          f_1          f_2          f_3  \
0   chr10_98869401_98869409   0.03649443   0.03414614   0.02135184   
1  chr9_125357974_125357982  0.066400915  0.056020923   0.03805268   
2  chrX_127426104_127426112  0.046020515  0.039052837  0.029655889   
3   chr19_20837597_20837605   0.03973038  0.036300007   0.02595007   
4  chr6_136092701_136092709  0.039942842  0.049277883   0.02926413   

           f_4  
0  0.102983005  
1   0.08018745  
2   0.07954953  
3  0.061543964  
4    0.0838695  
[INFO] Finished collecting 7 predictions for kawakami
[INFO] Finished collecting 7 predictions for kawakami
[INFO] Dimension of collected data is 7 by 5314
                         id          f_1          f_2          f_3  \
0   chr11_48899081_48899089   0.08941948   0.10191547   0.20742415   
1   chr10_47536459_47536467   0.07756555   0.09292847   0.07055065   
2  chr6_1

  ty = ty.set_axis(column_names, axis=1, inplace=False)
  ty = ty.set_axis(column_names, axis=1, inplace=False)
  ty = ty.set_axis(column_names, axis=1, inplace=False)


[INFO] Finished saving data for kawakami
[INFO] Finished saving data for kawakami


In [25]:
collected = collect_modeling_data_for_kawakami(each_id=each_id, log_data=log_data, predictions_path=enformer_predictions_path, TF=TF, agg_types=[agg_type], base_path=base_path, save_dir=save_dir)

[INFO] Finished collecting 19 predictions for kawakami
[INFO] Dimension of collected data is 19 by 5314
                         id          f_1           f_2          f_3  \
0  chr3_192940730_192940738  0.052521303   0.059343856   0.08436744   
1   chr12_38940385_38940393   0.05062969   0.042285476  0.031967606   
2   chr10_31409432_31409440  0.004776896  0.0055661146  0.004850283   
3  chr2_135230278_135230286  0.119184524    0.13753144  0.091157116   
4   chr11_33763385_33763393   0.06338547    0.05521163  0.030234452   

           f_4  
0  0.077099375  
1   0.08609707  
2  0.008029616  
3  0.104358084  
4   0.06341503  


  ty = ty.set_axis(column_names, axis=1, inplace=False)


[INFO] Finished saving data for kawakami


In [9]:
def collect_modeling_data_for_kawakami(each_id, log_data, predictions_path, TF, data_names, base_path, save_dir):

    import h5py
    import numpy as np
    import os
    import pandas as pd
    import tqdm
    # read in one of the files

    exec(open(f'{base_path}/modeling_pipeline/scripts/collect_model_data/utility-functions.py').read(), globals(), globals())

    kawakami_predictions = {}

    for dt in log_data.loc[log_data['sequence_type'] == 'ref', ].motif.values.tolist():
        fle = f'{predictions_path}/{dt}_predictions.h5'
        print(fle)
        if os.path.isfile(fle):
            with h5py.File(fle, 'r') as f:
                filekey = list(f.keys())[0]
                # should I select tracks? ; maybe not yet
                kawakami_predictions[dt] = np.vstack(list(f[filekey]))
        else:
            print('File does not exist')

    print(f'[INFO] Finished collecting {len(kawakami_predictions)} predictions for {each_id}')

    dt_aggbycenter = agg_by_center(kawakami_predictions, center=8)
    data_list = [dt_aggbycenter]

    # test_aggbymean, test_aggbycenter, test_aggbymean_upstream, test_aggbymean_downstream, test_aggbymean_upstream_downstream = agg_byall(freedman_predictions)
    # data_list = [test_aggbymean, test_aggbycenter, test_aggbymean_upstream, test_aggbymean_downstream, test_aggbymean_upstream_downstream]

    for i, dt in enumerate(data_list):

        ty = pd.concat([pd.Series(list(kawakami_predictions.keys())), pd.DataFrame(dt)], axis=1)

        column_names = ['id', 'class']
        column_names.extend([f'f_{i}' for i in range(1, ty.shape[1] - 1)])

        ty = ty.set_axis(column_names, axis=1, copy=False)

        ty.to_csv(path_or_buf=f'{save_dir}/{each_id}_{data_names[i]}_{TF}.csv.gz', index=False, compression='gzip')
    print(f'[INFO] Finished saving data for {each_id}')

    return(0)

In [10]:
collected = collect_modeling_data_for_kawakami(each_id=each_id, log_data=log_data, predictions_path=enformer_predictions_path, TF=TF, data_names=data_names, base_path=base_path, save_dir=save_dir)

#print(collected)

print(f'[INFO] Status: {collected} for {log_data.shape[0]} predictions for {each_id}.')

[INFO] Finished collecting 0 predictions for kawakami


ValueError: need at least one array to concatenate