-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathutils.py
More file actions
105 lines (92 loc) · 2.84 KB
/
utils.py
File metadata and controls
105 lines (92 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from typing import Any
import boto3
def get_dataset_path(local, scale):
    """Return the dataset directory for the given scale.

    Args:
        local: When truthy, look the scale up among the local
            ``./tpch-data`` directories; otherwise among the S3 prefixes.
        scale: Scale key to look up. The S3 table has 1, 10, 100, 1000 and
            10000; the local table only has 1, 10 and 100.

    Returns:
        The path string (with a trailing slash) registered for ``scale``.

    Raises:
        KeyError: If ``scale`` has no entry in the selected table.
    """
    s3_by_scale = {
        1: "s3://coiled-runtime-ci/tpc-h/snappy/scale-1/",
        10: "s3://coiled-runtime-ci/tpc-h/snappy/scale-10/",
        100: "s3://coiled-runtime-ci/tpc-h/snappy/scale-100/",
        1000: "s3://coiled-runtime-ci/tpc-h/snappy/scale-1000/",
        10000: "s3://coiled-runtime-ci/tpc-h/snappy/scale-10000/",
    }
    disk_by_scale = {
        1: "./tpch-data/scale-1/",
        10: "./tpch-data/scale-10/",
        100: "./tpch-data/scale-100/",
    }
    # Pick the table first, then do a single lookup so a missing scale
    # surfaces as a KeyError from whichever table was selected.
    table = disk_by_scale if local else s3_by_scale
    return table[scale]
def get_answers_path(local, scale):
    """Return the answers directory for the given scale.

    Args:
        local: When truthy, return the path under ``./tpch-data``;
            otherwise return the S3 path.
        scale: Value interpolated into the ``scale-<scale>`` path segment.
            It is not validated against any list of known scales.

    Returns:
        The path string, with a trailing slash.
    """
    return (
        f"./tpch-data/answers/scale-{scale}/"
        if local
        else f"s3://coiled-runtime-ci/tpc-h/answers/scale-{scale}/"
    )
def get_bucket_region(path: str) -> str:
    """Return the region of the S3 bucket that ``path`` points into.

    Args:
        path: A path of the form ``s3://<bucket>/<key...>``.

    Returns:
        The bucket's ``LocationConstraint`` as reported by S3, or
        ``"us-east-1"`` when S3 reports no constraint.

    Raises:
        ValueError: If ``path`` does not start with ``s3://`` or does not
            contain a bucket name after the scheme.
    """
    if not path.startswith("s3://"):
        raise ValueError(f"'{path}' is not an S3 path")
    # Strip only the leading scheme; the bucket name is everything up to the
    # first "/" that follows it.
    bucket = path.removeprefix("s3://").partition("/")[0]
    if not bucket:
        # Fail here with a clear message instead of passing an empty bucket
        # name to the S3 client.
        raise ValueError(f"'{path}' does not contain a bucket name")
    resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
    # Buckets in region 'us-east-1' results in None, b/c why not.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_bucket_location.html#S3.Client.get_bucket_location
    return resp["LocationConstraint"] or "us-east-1"
def get_cluster_spec(scale: int, shutdown_on_close: bool) -> dict[str, Any]:
    """Build the cluster settings used for the given scale.

    Args:
        scale: One of 1, 10, 100, 1000 or 10000.
        shutdown_on_close: Copied unchanged into the returned settings
            under the ``"shutdown_on_close"`` key.

    Returns:
        A new dict with the scale-specific worker settings
        (``worker_vm_types``, ``n_workers`` and, for scales 1000 and 10000,
        ``worker_disk_size``) followed by the settings shared by every scale
        (``idle_timeout``, ``wait_for_workers``, ``scheduler_vm_types``,
        ``shutdown_on_close``).

    Raises:
        ValueError: If ``scale`` is not one of the supported values. This
            replaces an implicit ``None`` return that contradicted the
            declared return type.
    """
    # Built per call so every returned dict and list is a fresh object that
    # callers may mutate freely.
    worker_specs: dict[int, dict[str, Any]] = {
        1: {"worker_vm_types": ["m6i.large"], "n_workers": 4},
        10: {"worker_vm_types": ["m6i.large"], "n_workers": 8},
        100: {"worker_vm_types": ["m6i.large"], "n_workers": 16},
        1000: {
            "worker_vm_types": ["m6i.xlarge"],
            "n_workers": 32,
            "worker_disk_size": 128,
        },
        10000: {
            "worker_vm_types": ["m6i.xlarge"],
            "n_workers": 32,
            "worker_disk_size": 200,
        },
    }
    if scale not in worker_specs:
        raise ValueError(
            f"Unsupported scale {scale!r}; expected one of {sorted(worker_specs)}"
        )

    # Settings applied to every cluster regardless of scale.
    everywhere = dict(
        idle_timeout="1h",
        wait_for_workers=True,
        scheduler_vm_types=["m6i.xlarge"],
        shutdown_on_close=shutdown_on_close,
    )
    # Worker settings first, shared settings last, matching the original
    # key order.
    return {**worker_specs[scale], **everywhere}
def get_single_vm_spec(scale):
    """Return the single-VM settings used for the given scale.

    Args:
        scale: One of 1, 10, 100, 1000 or 10000.

    Returns:
        A new dict with a ``"vm_type"`` entry and, for scale 10000 only,
        a ``"disk_size"`` entry.

    Raises:
        ValueError: If ``scale`` is not one of the supported values. This
            replaces an implicit ``None`` return for unknown scales.
    """
    # Built per call so the returned dict is always a fresh object.
    vm_specs = {
        1: {"vm_type": "m6i.2xlarge"},
        10: {"vm_type": "m6i.4xlarge"},
        100: {"vm_type": "m6i.8xlarge"},
        1000: {"vm_type": "m6i.32xlarge"},
        10000: {"vm_type": "m6i.32xlarge", "disk_size": 1000},
    }
    if scale not in vm_specs:
        raise ValueError(
            f"Unsupported scale {scale!r}; expected one of {sorted(vm_specs)}"
        )
    return vm_specs[scale]