import os

import dask
import dask.dataframe as dd
from dask_cloudprovider.aws import ECSCluster
from distributed import Client

cluster = ECSCluster(
    # cluster_name_template="categorization-pipeline",
    image=docker_image,
    fargate_scheduler=True,
    fargate_workers=True,
    scheduler_cpu=8192,
    scheduler_mem=16384,
    worker_cpu=4096,
    worker_mem=8192,  # 50 accounts per batch; 4096 also seems to work fine
    n_workers=100,
    worker_nthreads=1,
    execution_role_arn=execution_role_arn,
    task_role_arn=task_role_arn,
)
client = Client(cluster)

transactions_lst = []
for batch in zip_discard_compr(n_batch, account_ids):
    # process_accounts is basically a function running a lot of pandas transformations
    (transactions, transactions_empty) = dask.delayed(process_accounts, nout=2)(
        batch, conn_str, output_dir_path
    )
    transactions_lst.append(transactions)

dd.from_delayed(transactions_lst, verify_meta=False).to_parquet(
    os.path.join(output_dir_path, "transactions.parquet"),
    schema=schema,
    partition_on=["is_labelled"],
    engine="pyarrow",
    compression=COMPRESSION,
)
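The helper `zip_discard_compr` is not shown in the snippet. Assuming it is the standard "grouper" recipe that splits `account_ids` into batches of `n_batch` items and discards any short trailing batch, a minimal sketch could be:

```python
def zip_discard_compr(n, iterable):
    # Group items into tuples of length n, discarding any incomplete
    # trailing group (the classic grouper recipe: zip over n references
    # to the same iterator).
    it = iter(iterable)
    return zip(*[it] * n)


# Example: batches of 3 from 7 items; the leftover item (6) is dropped.
print(list(zip_discard_compr(3, range(7))))  # [(0, 1, 2), (3, 4, 5)]
```

If the real helper keeps the partial final batch instead of dropping it, the reproduction is unaffected; only the last batch size changes.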
I am using an AWS ECS cluster together with dask.delayed to perform these operations.
The same code works fine on a LocalCluster, and on the AWS ECS cluster with fewer workers. As soon as I increase the amount of data, I get the error below:
concurrent.futures._base.CancelledError: ('store-to-parquet-0b4eae35cbfa765ea0e930d53ca6ffaa', 0)
2022-12-02 11:53:26,378 - distributed.client - ERROR -
ConnectionRefusedError: [Errno 111] Connection refused
Anything else we need to know?:
Environment:
- Dask version: 2022.11.1
- Python version: 3.9.7
- Operating System: Linux
- Install method (conda, pip, source): pip