diff --git a/aixplain/modules/corpus.py b/aixplain/modules/corpus.py
index 0a95d714..f023dc4b 100644
--- a/aixplain/modules/corpus.py
+++ b/aixplain/modules/corpus.py
@@ -20,13 +20,18 @@
 Description:
     Corpus Class
 """
+import logging
+
 from aixplain.enums.function import Function
 from aixplain.enums.license import License
 from aixplain.enums.onboard_status import OnboardStatus
 from aixplain.enums.privacy import Privacy
 from aixplain.modules.asset import Asset
 from aixplain.modules.data import Data
+from aixplain.utils import config
+from aixplain.utils.file_utils import _request_with_retry
 from typing import Any, List, Optional, Text
+from urllib.parse import urljoin
 
 
 class Corpus(Asset):
@@ -76,3 +81,17 @@ def __init__(
         self.data = data
         self.length = length
         self.kwargs = kwargs
+
+    def delete(self) -> None:
+        """Delete this corpus from the platform."""
+        try:
+            url = urljoin(config.BACKEND_URL, f"sdk/corpora/{self.id}")
+            headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
+            logging.info(f"Start service for DELETE Corpus - {url} - {headers}")
+            r = _request_with_retry("delete", url, headers=headers)
+            if r.status_code != 200:
+                raise Exception()
+        except Exception:
+            message = "Corpus Deletion Error: Make sure the corpus exists and you are the owner."
+            logging.error(message)
+            raise Exception(message)
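A minimal sketch of how the new deletion flow is meant to be used (the asset ID is hypothetical; `Dataset.delete` below follows the same pattern):

```python
from aixplain.factories.corpus_factory import CorpusFactory

corpus = CorpusFactory.get("64acbad666608858f382a4d2")  # hypothetical asset ID
corpus.delete()  # raises Exception if the corpus does not exist or is not owned by the caller
```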
diff --git a/aixplain/modules/dataset.py b/aixplain/modules/dataset.py
index c78e5b3a..fd79e9f3 100644
--- a/aixplain/modules/dataset.py
+++ b/aixplain/modules/dataset.py
@@ -21,13 +21,17 @@
     Datasets Class
 """
+import logging
+
 from aixplain.enums.function import Function
 from aixplain.enums.license import License
 from aixplain.enums.onboard_status import OnboardStatus
 from aixplain.enums.privacy import Privacy
 from aixplain.modules.asset import Asset
 from aixplain.modules.data import Data
-from aixplain.utils.file_utils import _request_with_retry, save_file
+from aixplain.utils import config
+from aixplain.utils.file_utils import _request_with_retry
+from urllib.parse import urljoin
 from typing import Any, Dict, List, Optional, Text
@@ -115,3 +119,17 @@ def __init__(
 
     def __repr__(self):
         return f"<Dataset: {self.name}>"
+
+    def delete(self) -> None:
+        """Delete this dataset from the platform."""
+        try:
+            url = urljoin(config.BACKEND_URL, f"sdk/datasets/{self.id}")
+            headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
+            logging.info(f"Start service for DELETE Dataset - {url} - {headers}")
+            r = _request_with_retry("delete", url, headers=headers)
+            if r.status_code != 200:
+                raise Exception()
+        except Exception:
+            message = "Dataset Deletion Error: Make sure the dataset exists and you are the owner."
+            logging.error(message)
+            raise Exception(message)
diff --git a/aixplain/processes/data_onboarding/onboard_functions.py b/aixplain/processes/data_onboarding/onboard_functions.py
index 22126a79..0c038a1f 100644
--- a/aixplain/processes/data_onboarding/onboard_functions.py
+++ b/aixplain/processes/data_onboarding/onboard_functions.py
@@ -155,7 +155,7 @@ def build_payload_corpus(corpus: Corpus, ref_data: List[Text], error_handler: Er
         "suggestedFunctions": [f.value for f in corpus.functions],
         "onboardingErrorsPolicy": error_handler.value,
         "tags": corpus.tags,
-        "pricing": {"type": "FREE", "cost": 0},
+        "pricing": {"usage": ["benchmark", "finetune"], "type": "free", "cost": 0},
         "privacy": corpus.privacy.value,
         "license": {"typeId": corpus.license.value},
         "refData": ref_data,
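In plain Python, the pricing change means `build_payload_corpus` now emits this block: a free asset whose permitted usages are benchmarking and fine-tuning.

```python
# Pricing block emitted by build_payload_corpus after this change
pricing = {"usage": ["benchmark", "finetune"], "type": "free", "cost": 0}
```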
diff --git a/aixplain/processes/data_onboarding/process_interval_files.py b/aixplain/processes/data_onboarding/process_interval_files.py
new file mode 100644
index 00000000..576cd434
--- /dev/null
+++ b/aixplain/processes/data_onboarding/process_interval_files.py
@@ -0,0 +1,190 @@
+__author__ = "thiagocastroferreira"
+
+import logging
+import os
+import pandas as pd
+import shutil
+import tarfile
+
+from aixplain.enums.data_type import DataType
+from aixplain.enums.file_type import FileType
+from aixplain.enums.storage_type import StorageType
+from aixplain.modules.content_interval import (
+    ContentInterval,
+    AudioContentInterval,
+    ImageContentInterval,
+    TextContentInterval,
+    VideoContentInterval,
+)
+from aixplain.modules.file import File
+from aixplain.modules.metadata import MetaData
+from aixplain.utils.file_utils import upload_data
+from pathlib import Path
+from tqdm import tqdm
+from typing import Any, Dict, List, Text, Tuple
+
+
+def compress_folder(folder_path: str):
+    with tarfile.open(folder_path + ".tgz", "w:gz") as tar:
+        for name in os.listdir(folder_path):
+            tar.add(os.path.join(folder_path, name))
+    return folder_path + ".tgz"
+
+
+def process_interval(interval: Any, storage_type: StorageType, interval_folder: Text) -> List[Dict]:
+    """Process content intervals
+
+    Args:
+        interval (Any): content interval(s) to process
+        storage_type (StorageType): type of storage: URL, local path or textual content
+        interval_folder (Text): local folder where interval files are staged before upload
+
+    Returns:
+        List[Dict]: content interval(s) serialized as dictionaries
+    """
+    intervals = interval if isinstance(interval, list) else [interval]
+    if storage_type == StorageType.FILE:
+        for interval_ in intervals:
+            # check the size of the file and assert a limit of 25 MB
+            assert (
+                os.path.getsize(interval_.content) <= 25000000
+            ), f'Data Asset Onboarding Error: Local interval file "{interval_.content}" exceeds the size limit of 25 MB.'
+            # stage the local file in the interval folder so it is shipped with the batch
+            fname = os.path.basename(interval_.content)
+            new_path = os.path.join(interval_folder, fname)
+            if os.path.exists(new_path) is False:
+                shutil.copy2(interval_.content, new_path)
+    return [interval_.__dict__ for interval_ in intervals]
+
+
+def validate_format(index: int, interval: Dict, metadata: MetaData) -> List[ContentInterval]:
+    """Validate the interval format
+
+    Args:
+        index (int): row index
+        interval (Dict): interval to be validated
+        metadata (MetaData): metadata of the column being processed
+
+    Returns:
+        List[ContentInterval]: interval(s) parsed into the proper ContentInterval subclass
+    """
+    if metadata.dtype == DataType.AUDIO_INTERVAL:
+        try:
+            if isinstance(interval, list):
+                interval = [AudioContentInterval(**interval_) for interval_ in interval]
+            else:
+                interval = [AudioContentInterval(**interval)]
+        except Exception:
+            message = f'Data Asset Onboarding Error: Audio Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "AudioContentInterval" class for the correct format.'
+            logging.exception(message)
+            raise Exception(message)
+    elif metadata.dtype == DataType.IMAGE_INTERVAL:
+        try:
+            if isinstance(interval, list):
+                interval = [ImageContentInterval(**interval_) for interval_ in interval]
+            else:
+                interval = [ImageContentInterval(**interval)]
+        except Exception:
+            message = f'Data Asset Onboarding Error: Image Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "ImageContentInterval" class for the correct format.'
+            logging.exception(message)
+            raise Exception(message)
+    elif metadata.dtype == DataType.TEXT_INTERVAL:
+        try:
+            if isinstance(interval, list):
+                interval = [TextContentInterval(**interval_) for interval_ in interval]
+            else:
+                interval = [TextContentInterval(**interval)]
+        except Exception:
+            message = f'Data Asset Onboarding Error: Text Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "TextContentInterval" class for the correct format.'
+            logging.exception(message)
+            raise Exception(message)
+    elif metadata.dtype == DataType.VIDEO_INTERVAL:
+        try:
+            if isinstance(interval, list):
+                interval = [VideoContentInterval(**interval_) for interval_ in interval]
+            else:
+                interval = [VideoContentInterval(**interval)]
+        except Exception:
+            message = f'Data Asset Onboarding Error: Video Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "VideoContentInterval" class for the correct format.'
+            logging.exception(message)
+            raise Exception(message)
+    return interval
+
+
+def run(metadata: MetaData, paths: List, folder: Path, batch_size: int = 1000) -> Tuple[List[File], int, int]:
+    """Process a list of local interval files, compress and upload them to pre-signed URLs in S3
+
+    Explanation:
+        Each CSV file in "paths" is read and the intervals in the "metadata.name" column are processed.
+        Intervals are accumulated in batches: every "batch_size" intervals, the batch is written to a
+        gzip-compressed index CSV, uploaded to a pre-signed URL in S3 and reset.
+
+    Args:
+        metadata (MetaData): metadata of the asset
+        paths (List): list of paths to local files
+        folder (Path): local folder to save compressed files before uploading them to S3
+        batch_size (int): number of intervals per batch. Defaults to 1000.
+
+    Returns:
+        Tuple[List[File], int, int]: list of S3 links, data column index and number of rows
+    """
+    logging.debug(f'Data Asset Onboarding: Processing "{metadata.name}".')
+    interval_folder = Path(".")
+    if metadata.storage_type in [StorageType.FILE, StorageType.TEXT]:
+        interval_folder = Path(os.path.join(folder, "data"))
+        interval_folder.mkdir(exist_ok=True)
+
+    idx = 0
+    data_column_idx = -1
+    files, batch = [], []
+    for i in tqdm(range(len(paths)), desc=f' Data "{metadata.name}" onboarding progress', position=1, leave=False):
+        path = paths[i]
+        try:
+            dataframe = pd.read_csv(path)
+        except Exception:
+            message = f'Data Asset Onboarding Error: Local file "{path}" not found.'
+            logging.exception(message)
+            raise Exception(message)
+
+        # process intervals
+        for j in tqdm(range(len(dataframe)), desc=" File onboarding progress", position=2, leave=False):
+            row = dataframe.iloc[j]
+            try:
+                interval = row[metadata.name]
+            except Exception:
+                message = f'Data Asset Onboarding Error: Column "{metadata.name}" not found in the local file {path}.'
+                logging.exception(message)
+                raise Exception(message)
+
+            # interval = validate_format(index=j, interval=interval, metadata=metadata)
+
+            try:
+                interval = process_interval(interval, metadata.storage_type, interval_folder)
+                batch.append(interval)
+            except Exception as e:
+                logging.exception(e)
+                raise Exception(e)
+
+            idx += 1
+            if (idx % batch_size) == 0:
+                batch_index = str(len(files) + 1).zfill(8)
+                file_name = f"{folder}/{metadata.name}-{batch_index}.csv.gz"
+
+                df = pd.DataFrame({metadata.name: batch})
+                start, end = idx - len(batch), idx
+                df["@INDEX"] = range(start, end)
+                df.to_csv(file_name, compression="gzip", index=False)
+                s3_link = upload_data(file_name, content_type="text/csv", content_encoding="gzip")
+                files.append(File(path=s3_link, extension=FileType.CSV, compression="gzip"))
+                # get data column index
+                data_column_idx = df.columns.to_list().index(metadata.name)
+                batch = []
+
+    if len(batch) > 0:
+        batch_index = str(len(files) + 1).zfill(8)
+        file_name = f"{folder}/{metadata.name}-{batch_index}.csv.gz"
+
+        df = pd.DataFrame({metadata.name: batch})
+        start, end = idx - len(batch), idx
+        df["@INDEX"] = range(start, end)
+        df.to_csv(file_name, compression="gzip", index=False)
+        s3_link = upload_data(file_name, content_type="text/csv", content_encoding="gzip")
+        files.append(File(path=s3_link, extension=FileType.CSV, compression="gzip"))
+        # get data column index
+        data_column_idx = df.columns.to_list().index(metadata.name)
+    return files, data_column_idx, idx
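To make the flush logic in `run` easier to follow, here is a self-contained toy reproduction of its batching scheme (the column name, values and file names are illustrative):

```python
import pandas as pd

def flush(batch, idx, files, column="text"):
    """Write the current batch to a gzip CSV with a global @INDEX column."""
    batch_index = str(len(files) + 1).zfill(8)
    file_name = f"{column}-{batch_index}.csv.gz"
    df = pd.DataFrame({column: batch})
    df["@INDEX"] = range(idx - len(batch), idx)  # global row ids across files
    df.to_csv(file_name, compression="gzip", index=False)
    files.append(file_name)

batch, files, idx, batch_size = [], [], 0, 2
for value in ["a", "b", "c", "d", "e"]:
    batch.append(value)
    idx += 1
    if idx % batch_size == 0:  # full batch: flush and reset
        flush(batch, idx, files)
        batch = []
if batch:  # remainder smaller than batch_size
    flush(batch, idx, files)
print(files)  # ['text-00000001.csv.gz', 'text-00000002.csv.gz', 'text-00000003.csv.gz']
```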
diff --git a/tests/functional/data_asset/corpus_onboarding_test.py b/tests/functional/data_asset/corpus_onboarding_test.py
index 93a32c47..2d3179f1 100644
--- a/tests/functional/data_asset/corpus_onboarding_test.py
+++ b/tests/functional/data_asset/corpus_onboarding_test.py
@@ -17,13 +17,13 @@
 """
 
 import pytest
-from aixplain.enums.language import Language
-from aixplain.enums.license import License
+import time
+from aixplain.enums import Language, License, OnboardStatus
 from aixplain.factories.corpus_factory import CorpusFactory
 from uuid import uuid4
 
 
-def test_corpus_onboard():
+def test_corpus_onboard_get_delete():
     upload_file = "tests/functional/data_asset/input/audio-en_url.csv"
     schema = [
         {
@@ -37,14 +37,25 @@
         {"name": "text", "dtype": "text", "storage_type": "text", "languages": [Language.English_UNITED_STATES]},
     ]
 
-    payload = CorpusFactory.create(
+    response = CorpusFactory.create(
         name=str(uuid4()),
         description="This corpus contains 20 English audios with their corresponding transcriptions.",
         license=License.MIT,
         content_path=upload_file,
         schema=schema,
     )
-    assert payload["status"] == "onboarding"
+    asset_id = response["asset_id"]
+    onboard_status = OnboardStatus(response["status"])
+    while onboard_status == OnboardStatus.ONBOARDING:
+        corpus = CorpusFactory.get(asset_id)
+        onboard_status = corpus.onboard_status
+        time.sleep(1)
+    # assert the asset was onboarded
+    assert onboard_status == OnboardStatus.ONBOARDED
+    # delete the asset and assert it is gone
+    corpus.delete()
+    with pytest.raises(Exception):
+        corpus = CorpusFactory.get(asset_id)
 
 
 def test_corpus_listing():
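Both onboarding tests poll with the same loop; a shared helper along these lines could factor it out (a sketch built from the APIs used in the tests, not part of this diff):

```python
import time
from aixplain.enums import OnboardStatus

def wait_until_onboarded(factory, asset_id, interval: float = 1.0):
    """Poll `factory.get` until the asset leaves the ONBOARDING state."""
    asset = factory.get(asset_id)
    while asset.onboard_status == OnboardStatus.ONBOARDING:
        time.sleep(interval)
        asset = factory.get(asset_id)
    return asset
```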
diff --git a/tests/functional/data_asset/dataset_onboarding_test.py b/tests/functional/data_asset/dataset_onboarding_test.py
index 577bd42d..539bb0e9 100644
--- a/tests/functional/data_asset/dataset_onboarding_test.py
+++ b/tests/functional/data_asset/dataset_onboarding_test.py
@@ -17,8 +17,10 @@
 """
 
 import pytest
+import time
+
 from uuid import uuid4
-from aixplain.enums import Function, Language, License, Privacy, DataSubtype, DataType, StorageType
+from aixplain.enums import Function, Language, License, Privacy, DataSubtype, DataType, StorageType, OnboardStatus
 from aixplain.factories import DatasetFactory
 from aixplain.modules import MetaData
 
@@ -52,7 +54,7 @@ def split():
     )
 
 
-def test_dataset_onboard(meta1, meta2):
+def test_dataset_onboard_get_delete(meta1, meta2):
     upload_file = "tests/functional/data_asset/input/audio-en_url.csv"
 
     response = DatasetFactory.create(
@@ -66,7 +68,18 @@
         tags=[],
         privacy=Privacy.PRIVATE,
     )
-    assert response["status"] == "onboarding"
+    asset_id = response["asset_id"]
+    onboard_status = OnboardStatus(response["status"])
+    while onboard_status == OnboardStatus.ONBOARDING:
+        dataset = DatasetFactory.get(asset_id)
+        onboard_status = dataset.onboard_status
+        time.sleep(1)
+    # assert the asset was onboarded
+    assert onboard_status == OnboardStatus.ONBOARDED
+    # delete the asset and assert it is gone
+    dataset.delete()
+    with pytest.raises(Exception):
+        dataset = DatasetFactory.get(asset_id)
 
 
 def test_invalid_dataset_onboard(meta1, meta2):
@@ -138,3 +151,12 @@ def test_valid_dataset_splitting(meta1, meta2, split):
     )
 
     assert response["status"] == "onboarding"
+    asset_id = response["asset_id"]
+    onboard_status = OnboardStatus(response["status"])
+    while onboard_status == OnboardStatus.ONBOARDING:
+        dataset = DatasetFactory.get(asset_id)
+        onboard_status = dataset.onboard_status
+        time.sleep(1)
+    # assert the asset was onboarded
+    assert onboard_status == OnboardStatus.ONBOARDED
+    dataset.delete()