In [1]:
import argparse
import numpy as np
import pandas as pd
from tqdm import tqdm
from utils import replace_degenerate_nucleotides, remove_degenerate_nucleotides, load_sequences, save_features_to_parquet
from numba import jit
import os

import dask.dataframe as dd
import dask
from dask.distributed import Client, LocalCluster, get_worker
import pandas as pd
import numpy as np
import os
import gc
import time

dask.config.set({
    'distributed.worker.memory.target': 0.6,  # avoid using more than 60% of memory
    'distributed.worker.memory.spill': 0.7,  # spill to disk at 70% of memory usage
    'distributed.worker.memory.pause': 0.8,  # pause execution at 80% of memory usage
    'distributed.worker.memory.terminate': 0.95  # restart worker at 95% of memory usage
})

# Initialize Dask Client
cluster = LocalCluster(
    n_workers=6,
    threads_per_worker=1,
    memory_limit='2GB'
)

client = Client(cluster)
print(f"Number of workers: {len(client.cluster.workers)}")
print(f"Threads per worker: {client.cluster.workers[0].nthreads}")

@jit(nopython=True)
def generate_chaos_game_representation(sequence, resolution, nucleotide_mapping):
    image = np.zeros((resolution, resolution), dtype=np.uint8)

    x, y = 0.5, 0.5
    scale = resolution - 1

    for char in sequence:
        if char == 'A':
            index = 0
        elif char == 'C':
            index = 1
        elif char == 'G':
            index = 2
        elif char == 'T':
            index = 3
        else:
            continue  # Skip unknown characters

        corner_x, corner_y = nucleotide_mapping[index]
        x = (x + corner_x) / 2
        y = (y + corner_y) / 2

        ix, iy = int(x * scale), int(y * scale)
        image[iy, ix] += 1

    return image.flatten()

Number of workers: 6
Threads per worker: 1


In [2]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 6,Total memory: 11.18 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35489,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 6
Started: Just now,Total memory: 11.18 GiB

0,1
Comm: tcp://127.0.0.1:39563,Total threads: 1
Dashboard: http://127.0.0.1:40341/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:38891,
Local directory: /tmp/dask-scratch-space/worker-n4fqyk8y,Local directory: /tmp/dask-scratch-space/worker-n4fqyk8y

0,1
Comm: tcp://127.0.0.1:46545,Total threads: 1
Dashboard: http://127.0.0.1:34823/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:38501,
Local directory: /tmp/dask-scratch-space/worker-ksukjyz8,Local directory: /tmp/dask-scratch-space/worker-ksukjyz8

0,1
Comm: tcp://127.0.0.1:32925,Total threads: 1
Dashboard: http://127.0.0.1:36457/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35067,
Local directory: /tmp/dask-scratch-space/worker-6jnk0bs_,Local directory: /tmp/dask-scratch-space/worker-6jnk0bs_

0,1
Comm: tcp://127.0.0.1:33965,Total threads: 1
Dashboard: http://127.0.0.1:41141/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:37117,
Local directory: /tmp/dask-scratch-space/worker-uv6l8uz_,Local directory: /tmp/dask-scratch-space/worker-uv6l8uz_

0,1
Comm: tcp://127.0.0.1:33957,Total threads: 1
Dashboard: http://127.0.0.1:40655/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34915,
Local directory: /tmp/dask-scratch-space/worker-_akpwpzf,Local directory: /tmp/dask-scratch-space/worker-_akpwpzf

0,1
Comm: tcp://127.0.0.1:42423,Total threads: 1
Dashboard: http://127.0.0.1:46657/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:40839,
Local directory: /tmp/dask-scratch-space/worker-47lcv5vx,Local directory: /tmp/dask-scratch-space/worker-47lcv5vx


In [2]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, get_worker
import pandas as pd
import numpy as np
import os
import time

def generate_fcgr_features(partition, resolution, nucleotide_mapping):
    local_features = np.zeros((len(partition), resolution * resolution), dtype=np.uint8)
    for i, sequence in enumerate(partition['Sequence']):
        local_features[i, :] = generate_chaos_game_representation(sequence, resolution, nucleotide_mapping)
    return local_features

# Function to process each partition and directly write the results
def process_and_write_partition(partition, resolution, nucleotide_mapping, output_path):
    start_time = time.time()  # Start timing the operation

    # Generate features
    index = partition['Accession ID'][0]
    print(index)
    features_array = generate_fcgr_features(partition, resolution, nucleotide_mapping)
    df = pd.DataFrame(features_array, columns=[f'feature {j+1}' for j in range(features_array.shape[1])])
    df['Target'] = partition['Lineage'].tolist()
    df['Train'] = partition['Train'].tolist()
    # Construct a unique path to save each partition's results
    partition_output_path = os.path.join(output_path, f'features_part_{index}.parquet')
    
    # Write the DataFrame to Parquet using PyArrow
    df.to_parquet(partition_output_path, engine='pyarrow', index=False)

    end_time = time.time()  # End timing the operation
    processing_time = end_time - start_time

    # Get worker information
    worker = get_worker()
    worker_id = worker.id
    
    del partition
    del df
    gc.collect()

    # Construct output information
    result = pd.Series({'index': index, 'worker_no': worker_id, 'time': processing_time})
    return result

# Load dataset and set parameters
data_path = '../../../data/sequences/p_24'
base_output_path = 'features_output_directory'
resolution = 256
nucleotide_mapping = np.array([[0, 0], [0, 1], [1, 1], [1, 0]], dtype=np.float32)

os.makedirs(base_output_path, exist_ok=True)
sequence_df = dd.read_parquet(data_path, engine='pyarrow')

meta = pd.Series({'index': int(), 'worker_no': str(), 'time': float()})
result = sequence_df.map_partitions(
    process_and_write_partition,
    resolution=resolution,
    nucleotide_mapping=nucleotide_mapping,
    output_path=base_output_path,
    meta=meta
)

# Compute the result
result.compute()

# Close the client
client.close()

EPI_ISL_5603121
EPI_ISL_17002109
EPI_ISL_17008444
EPI_ISL_14223507
EPI_ISL_12401079
EPI_ISL_17233264
EPI_ISL_4572379
EPI_ISL_7310277
EPI_ISL_3101511
EPI_ISL_18341857
EPI_ISL_15728433
EPI_ISL_13959263
EPI_ISL_745164
EPI_ISL_17269454
EPI_ISL_14399462
EPI_ISL_12043223
EPI_ISL_10878668
EPI_ISL_2273070
EPI_ISL_17528828
EPI_ISL_12006181
EPI_ISL_14223528
EPI_ISL_16856066
EPI_ISL_13963363
EPI_ISL_15104785


