In [1]:
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask.utils import parse_bytes
import cudf
import dask_cudf

# sorted(glob.glob("./Input/KJS/EYS_2nd_lib/**/*Frags.fastq", recursive=True))
INPUT_DATA = ['./Input/KJS/EYS_2nd_lib/1/20230717_1.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/2/20230717_2.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/3/20230717_3.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/4/20230717_4.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/5/20230717_5.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/6/20230717_6.extendedFrags.fastq',
 './Input/KJS/EYS_2nd_lib/7/20230717_7.extendedFrags_error_truncated.fastq',
 './Input/KJS/EYS_2nd_lib/8/20230717_8.extendedFrags.fastq']

In [2]:
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0",
    rmm_pool_size=parse_bytes("20GB"), # This GPU has 48GB of memory
    device_memory_limit=parse_bytes("20GB"),
)
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44501 instead


0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:44501/status,

0,1
Dashboard: http://127.0.0.1:44501/status,Workers: 1
Total threads: 1,Total memory: 62.67 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:39951,Workers: 1
Dashboard: http://127.0.0.1:44501/status,Total threads: 1
Started: Just now,Total memory: 62.67 GiB

0,1
Comm: tcp://127.0.0.1:37159,Total threads: 1
Dashboard: http://127.0.0.1:33001/status,Memory: 62.67 GiB
Nanny: tcp://127.0.0.1:42233,
Local directory: /tmp/dask-scratch-space/worker-2z4dcq2m,Local directory: /tmp/dask-scratch-space/worker-2z4dcq2m
GPU: NVIDIA GeForce RTX 3090,GPU memory: 24.00 GiB


In [3]:
!du -sh ./Input/KJS/EYS_2nd_lib/*/*.fastq

25G	./Input/KJS/EYS_2nd_lib/1/20230717_1.extendedFrags.fastq
25G	./Input/KJS/EYS_2nd_lib/2/20230717_2.extendedFrags.fastq
22G	./Input/KJS/EYS_2nd_lib/3/20230717_3.extendedFrags.fastq
23G	./Input/KJS/EYS_2nd_lib/4/20230717_4.extendedFrags.fastq
26G	./Input/KJS/EYS_2nd_lib/5/20230717_5.extendedFrags.fastq
26G	./Input/KJS/EYS_2nd_lib/6/20230717_6.extendedFrags.fastq
22G	./Input/KJS/EYS_2nd_lib/7/20230717_7.extendedFrags_error_truncated.fastq
25G	./Input/KJS/EYS_2nd_lib/8/20230717_8.extendedFrags.fastq


In [6]:
%%time
ddf = dask_cudf.read_csv(INPUT_DATA[0], header=None, names=["DATA"],blocksize="2 GB")
print(ddf.npartitions)
len(ddf)

14
CPU times: user 83.3 ms, sys: 89.4 ms, total: 173 ms
Wall time: 6.54 s


179206932

In [7]:
ddf_reshaped = ddf["DATA"].to_dask_array(lengths=True).reshape(-1, 4)
ddf_reshaped

TypeError: String Arrays is not yet implemented in cudf

In [5]:
qs = [
    dask_cudf.read_csv(f, blocksize="1 GB",header=None, names=["DATA"], dtype=["str"])
    for i, f in enumerate(
        INPUT_DATA
    )
]
qs

[<dask_cudf.DataFrame | 27 tasks | 27 npartitions>,
 <dask_cudf.DataFrame | 26 tasks | 26 npartitions>,
 <dask_cudf.DataFrame | 23 tasks | 23 npartitions>,
 <dask_cudf.DataFrame | 25 tasks | 25 npartitions>,
 <dask_cudf.DataFrame | 28 tasks | 28 npartitions>,
 <dask_cudf.DataFrame | 28 tasks | 28 npartitions>,
 <dask_cudf.DataFrame | 24 tasks | 24 npartitions>,
 <dask_cudf.DataFrame | 26 tasks | 26 npartitions>]

In [6]:
# [q.select(pl.count()).collect() for q in qs]

In [7]:
qs[1].columns[0]

'DATA'

In [8]:
qs[1][qs[1].columns[0]].head()

0    @ST-E00127:1400:HJTMMCCX2:2:1101:15554:2944 1:...
1    GTTTCAACTTGAAAAAGTGGCACCGAGGCGGTGCGAACTATTCTTT...
2                                                    +
3    AAFFFJJJAAA<FJJJJFAFJJFJAAA-AFAAA<F<<AFJJFJFJJ...
4    @ST-E00127:1400:HJTMMCCX2:2:1101:17827:2944 1:...
Name: DATA, dtype: object

In [9]:
sample_query = (
    qs[1][qs[1].columns[0]]
    .str.contains(
        r"TTTTACTACACATCGCTACTACTG(.*)TCATCCTTGGAAGAATCCATTACCCTATCAAAGTAATTTG"
    )
    .sum()
).compute()
sample_query

558

In [10]:
# sample_query.to_numpy().squeeze().item()

In [11]:
[
    q[q.columns[0]]
    .str.contains(
        r"TTTTACTACACATCGCTACTACTG(.*)TCATCCTTGGAAGAATCCATTACCCTATCAAAGTAATTTG"
    )
    .sum()
    .compute()
    for q in qs
]

[540, 558, 583, 654, 757, 552, 663, 693]

In [12]:
[
    q[q.columns[0]]
    .str.contains(
        r"TTTTACTACACATCGCTACTACTG"
    )
    .sum()
    .compute()
    for q in qs
]

[636, 782, 689, 776, 889, 749, 760, 811]