-
Notifications
You must be signed in to change notification settings - Fork 0
/
kaggle_import.py
385 lines (330 loc) · 12.4 KB
/
kaggle_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
"""Kaggle Dataset workflow plugin module"""
import tempfile
from typing import Sequence, Tuple, Any
import os
import time
from zipfile import ZipFile
from kaggle.rest import ApiException
from kaggle.api import KaggleApi
from cmem_plugin_base.dataintegration.context import (
ExecutionContext,
PluginContext,
ExecutionReport,
)
from cmem_plugin_base.dataintegration.description import Plugin, PluginParameter
from cmem_plugin_base.dataintegration.entity import Entities
from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType
from cmem_plugin_base.dataintegration.parameter.password import Password
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
from cmem_plugin_base.dataintegration.types import StringParameterType, Autocompletion
from cmem_plugin_base.dataintegration.utils import write_to_dataset
# Module-wide Kaggle API client; credentials are injected later via auth().
api = KaggleApi()
# Maps a downloaded file's extension to the DataIntegration dataset type
# used when writing the resource (see DatasetFileType.autocomplete).
DATASET_TYPES = {
    "csv": "csv",
    "json": "json",
    "xlsx": "excel",
    "xml": "xml",
    "zip": "multiCsv",
    "txt": "text",
}
class KaggleDataset:
    """Lightweight owner/name slug pair identifying a Kaggle dataset."""

    def __init__(self, owner, name):
        """Store the dataset owner slug and the dataset name slug."""
        self.owner = owner
        self.name = name
def get_slugs(dataset) -> KaggleDataset:
    """Split an ``owner/name`` dataset string into a KaggleDataset slug pair.

    Returns an empty KaggleDataset when the string has no ``/`` separator.
    """
    if "/" not in dataset:
        return KaggleDataset(owner="", name="")
    # let the Kaggle client reject malformed dataset strings early
    api.validate_dataset_string(dataset)
    owner_slug, name_slug = dataset.split("/")[0], dataset.split("/")[1]
    return KaggleDataset(owner_slug, name_slug)
def upload_file(
    dataset_id: str, remote_file_name: str, path: str, context: ExecutionContext
):
    """Upload a downloaded file into the target dataset, unzipping if needed.

    If the plain file is absent but a ``<file>.zip`` archive was downloaded
    instead, the archive is extracted (at most once) and the extracted file
    is uploaded. On failure an ExecutionReport listing the directory
    contents is written to the context.

    :param dataset_id: ``<project>:<dataset>`` identifier to write into.
    :param remote_file_name: file name expected inside *path*.
    :param path: local download directory.
    :param context: workflow execution context used for reporting.
    """
    file_path = os.path.join(path, remote_file_name)
    zip_path = get_zip_file_path(file_path)
    try:
        # BUGFIX: the previous version recursed after unzipping; when the
        # archive did not contain the expected file the zip was still present,
        # so the recursion never terminated. Extract at most once instead.
        if not os.path.isfile(file_path) and os.path.isfile(zip_path):
            unzip_file(zip_path)
        if not os.path.isfile(file_path):
            raise FileNotFoundError(file_path)
        create_resource_from_file(
            dataset_id=dataset_id, remote_file_name=file_path, context=context
        )
    except FileNotFoundError:
        # Report what was actually downloaded to help diagnose the mismatch.
        files = os.listdir(path)
        paths = [os.path.join(path, file) for file in files]
        summary = [("Files in the downloaded directory", list_to_string(paths))]
        context.report.update(
            ExecutionReport(
                entity_count=0,
                operation="write",
                operation_desc="failed",
                summary=summary,
            )
        )
def get_zip_file_path(file_name) -> str:
    """Return the path of the zip archive corresponding to *file_name*."""
    return "{0}.zip".format(file_name)
def unzip_file(file_path):
    """Extract all archive members into the directory containing the archive.

    :param file_path: path of the ``.zip`` archive to extract.
    """
    # The context manager closes the archive; the previous explicit
    # zip_file.close() inside the `with` block was redundant.
    with ZipFile(file_path, "r") as zip_file:
        zip_file.extractall(os.path.dirname(file_path))
def create_resource_from_file(
    dataset_id: str, remote_file_name: str, context: ExecutionContext
):
    """Stream a local file into the given DataIntegration dataset resource.

    :param dataset_id: ``<project>:<dataset>`` identifier to write into.
    :param remote_file_name: local path of the file to upload.
    :param context: execution context supplying the acting user.
    """
    with open(remote_file_name, "rb") as file_handle:
        write_to_dataset(
            dataset_id=dataset_id, file_resource=file_handle, context=context.user
        )
def list_to_string(query_list: list[str]) -> str:
    """Concatenate all query terms into a single search string."""
    return "".join(query_list)
def auth(username: str, api_key: str):
    """Authenticate the shared Kaggle client via environment credentials.

    The Kaggle client reads KAGGLE_USERNAME / KAGGLE_KEY from the
    environment, so they are exported before calling authenticate().
    """
    os.environ["KAGGLE_USERNAME"] = username
    os.environ["KAGGLE_KEY"] = api_key
    api.authenticate()
def search(query_terms: list[str]):
    """Search Kaggle for datasets matching the joined query terms.

    :param query_terms: search terms, concatenated into one query string.
    :raises ValueError: when the Kaggle API call fails (e.g. bad credentials).
    """
    try:
        return api.dataset_list(search=list_to_string(query_list=query_terms))
    except ApiException as error:
        # BUGFIX: chain from the caught exception *instance*; the previous
        # `from ApiException` chained from the exception class itself,
        # discarding the actual error details.
        raise ValueError("Failed to authenticate with Kaggle API") from error
def list_files(dataset):
    """Return the dataset's file list, or None when it has no files."""
    dataset_files = api.dataset_list_files(dataset).files
    return dataset_files if dataset_files else None
class DatasetFileType(DatasetParameterType):
    """Dataset parameter type whose dataset_type tracks the chosen file."""

    def __init__(self, dependent_params: list[str]):
        """Register the parameters this autocompletion depends on."""
        super().__init__()
        self.autocompletion_depends_on_parameters = dependent_params

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Derive dataset_type from the selected file's extension, then delegate."""
        extension = depend_on_parameter_values[0].split(".")[-1]
        # unknown extensions map to "" (no dataset type restriction)
        self.dataset_type = DATASET_TYPES.get(extension, "")
        return super().autocomplete(  # type: ignore
            query_terms, depend_on_parameter_values, context
        )
class DatasetFile(StringParameterType):
    """Autocompletion of file names within a selected Kaggle dataset."""

    autocompletion_depends_on_parameters: list[str] = ["kaggle_dataset"]
    # restrict values to the suggested ones
    allow_only_autocompleted_values: bool = True
    # show labels alongside values
    autocomplete_value_with_labels: bool = True

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Suggest the dataset's files, plus an all-csv zip bundle when possible."""
        if not depend_on_parameter_values:
            raise ValueError("Select dataset before choosing a file")
        dataset = depend_on_parameter_values[0]
        files = list_files(dataset=dataset)
        suggestions = []
        # offer a combined zip only when the dataset is purely csv (>1 file)
        csv_total = sum(str(entry).endswith(".csv") for entry in files)
        if csv_total == len(files) > 1:
            slug = get_slugs(dataset)
            suggestions.append(
                Autocompletion(
                    value=f"{slug.name}.zip",
                    label="Download all csv files as a Zip file",
                )
            )
        suggestions.extend(
            Autocompletion(value=f"{entry}", label=f"{entry}") for entry in files
        )
        if suggestions:
            suggestions.sort(key=lambda item: item.label)  # type: ignore
        else:
            suggestions.append(
                Autocompletion(value="", label="No files found for this dataset")
            )
        return suggestions
class KaggleSearch(StringParameterType):
    """Autocompletion of Kaggle dataset identifiers via the search API."""

    autocompletion_depends_on_parameters: list[str] = ["username", "api_key"]
    # restrict values to the suggested ones
    allow_only_autocompleted_values: bool = True
    # show labels alongside values
    autocomplete_value_with_labels: bool = True

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Authenticate, then suggest owner/name identifiers for the query."""
        auth(depend_on_parameter_values[0], depend_on_parameter_values[1].decrypt())
        if not query_terms:
            # no query yet: show a single hint entry
            label = "Search for kaggle datasets"
            return [Autocompletion(value="", label=f"{label}")]
        suggestions = []
        for dataset in search(query_terms=query_terms):
            slug = get_slugs(str(dataset))
            identifier = f"{slug.owner}/{slug.name}"
            suggestions.append(Autocompletion(value=identifier, label=identifier))
        suggestions.sort(key=lambda item: item.label)  # type: ignore
        return suggestions
@Plugin(
    label="Kaggle",
    plugin_id="cmem_plugin_kaggle",
    description="Import dataset resources from Kaggle.",
    documentation="""
This workflow operator downloads a dataset from the Kaggle library.
To download datasets, you will need your Kaggle username and API Key,
which you can obtain from the [Kaggle Public API](https://www.kaggle.com/docs/api).
""",
    parameters=[
        PluginParameter(
            name="username",
            label="Kaggle Username",
            description="Username of Kaggle Account",
        ),
        PluginParameter(
            name="api_key",
            label="Kaggle Key",
            description="API Token of Kaggle Account",
        ),
        PluginParameter(
            name="kaggle_dataset",
            label="Kaggle Dataset",
            description="Name of the dataset to be needed",
            param_type=KaggleSearch(),
        ),
        PluginParameter(
            name="file_name",
            label="File Name",
            description="Name of the file to be downloaded",
            param_type=DatasetFile(),
        ),
        PluginParameter(
            name="dataset",
            label="Dataset",
            description="To which Dataset to write the response",
            param_type=DatasetFileType(dependent_params=["file_name"]),
        ),
    ],
)
class KaggleImport(WorkflowPlugin):
    """Workflow plugin that downloads one Kaggle dataset file (or an all-csv
    zip bundle) and writes it into a DataIntegration dataset resource."""

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        username: str,
        api_key: Password,
        kaggle_dataset: str,
        file_name: str,
        dataset: str,
    ) -> None:
        """Validate the configuration and store it on the instance.

        :param username: Kaggle account user name.
        :param api_key: Kaggle API token (encrypted).
        :param kaggle_dataset: dataset identifier in ``owner/name`` form.
        :param file_name: file to download, or ``<name>.zip`` for the bundle.
        :param dataset: DataIntegration dataset ID to write into.
        :raises ValueError: when file_name is not a file of kaggle_dataset.
        """
        self.username = username
        self.api_key = api_key
        api.validate_dataset_string(dataset=kaggle_dataset)
        # ".zip" names are synthesized bundle names (see DatasetFile) and are
        # not listed among the dataset's files, so they skip validation.
        if not file_name.endswith(".zip"):
            if self.validate_file_name(dataset=kaggle_dataset, file_name=file_name):
                raise ValueError(
                    "The specified file doesn't exist in the specified "
                    f"dataset and it must be from "
                    f"{list_files(kaggle_dataset)}"
                )
        self.kaggle_dataset = kaggle_dataset
        self.file_name = file_name
        self.dataset = dataset

    def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> None:
        """Download the configured file and upload it to the target dataset."""
        summary: list[Tuple[str, str]] = []
        warnings: list[str] = []
        if context.user is None:
            warnings.append("User info not available")
        else:
            summary.append(("Executed by", context.user.user_uri()))
        self.log.info("Start loading kaggle dataset.")
        dataset_id = f"{context.task.project_id()}:{self.dataset}"
        dataset_file_name = self.get_downloadable_file_name()
        with tempfile.TemporaryDirectory() as temp_dir:
            context.report.update(
                ExecutionReport(
                    operation="wait",
                    operation_desc=f"{dataset_file_name} downloading",
                )
            )
            self.download_files(
                dataset=self.kaggle_dataset, file_name=dataset_file_name, path=temp_dir
            )
            # NOTE(review): fixed delay before upload looks like a workaround
            # for the download not being fully flushed — confirm whether the
            # Kaggle client returns before the file is complete on disk.
            time.sleep(1)
            upload_file(
                dataset_id=dataset_id,
                remote_file_name=dataset_file_name,
                path=temp_dir,
                context=context,
            )
            summary.append(("Kaggle Dataset", self.kaggle_dataset))
            summary.append(("File", dataset_file_name))
            summary.append(("Dataset ID", dataset_id))
            context.report.update(
                ExecutionReport(
                    entity_count=1,
                    operation="write",
                    operation_desc=f"{dataset_file_name} downloaded",
                    summary=summary,
                    warnings=warnings,
                )
            )

    def get_downloadable_file_name(self) -> str:
        """Return the remote file name to download.

        Spaces are percent-encoded for the Kaggle API. When the name has no
        known extension, the whole dataset is requested as ``<name>.zip``.
        """
        # BUGFIX: the previous guard was `if "" in self.file_name`, which is
        # always true ("" is a substring of every string; " " was intended).
        # Encoding unconditionally is behaviorally identical and explicit.
        dataset_filename = self.file_name.replace(" ", "%20")
        if "." in dataset_filename:
            file_type = dataset_filename.split(".")[-1]
            if file_type in DATASET_TYPES:
                return dataset_filename
        return f"{get_slugs(self.kaggle_dataset).name}.zip"

    def validate_file_name(self, dataset: str, file_name: str) -> bool:
        """Return True when *file_name* is NOT among the dataset's files.

        Note the inverted sense: True means validation FAILED (file missing).
        """
        auth(self.username, self.api_key.decrypt())
        files = list_files(dataset=dataset)
        # case-insensitive comparison against the dataset's file listing
        return all(str(file).lower() != file_name.lower() for file in files)

    def download_files(self, dataset, file_name, path):
        """Download one dataset file, or the full dataset for ``.zip`` names."""
        auth(self.username, self.api_key.decrypt())
        if file_name.endswith(".zip"):
            api.dataset_download_files(dataset=dataset, path=path)
        else:
            api.dataset_download_file(dataset=dataset, file_name=file_name, path=path)