19 changes: 19 additions & 0 deletions aixplain/modules/corpus.py
@@ -20,13 +20,18 @@
Description:
Corpus Class
"""
import logging

from aixplain.enums.function import Function
from aixplain.enums.license import License
from aixplain.enums.onboard_status import OnboardStatus
from aixplain.enums.privacy import Privacy
from aixplain.modules.asset import Asset
from aixplain.modules.data import Data
from aixplain.utils import config
from aixplain.utils.file_utils import _request_with_retry
from typing import Any, List, Optional, Text
from urllib.parse import urljoin


class Corpus(Asset):
@@ -76,3 +81,17 @@ def __init__(
self.data = data
self.length = length
self.kwargs = kwargs

def delete(self) -> None:
"""Delete Corpus service"""
try:
url = urljoin(config.BACKEND_URL, f"sdk/corpora/{self.id}")
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
logging.info(f"Start service for DELETE Corpus - {url} - {headers}")
r = _request_with_retry("delete", url, headers=headers)
if r.status_code != 200:
raise Exception()
except Exception:
message = "Corpus Deletion Error: Make sure the corpus exists and you are the owner."
logging.error(message)
            raise Exception(message)
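
A minimal usage sketch of the new method (the corpus ID is a placeholder; `CorpusFactory.get` is the retrieval path exercised in the tests below):

```python
from aixplain.factories.corpus_factory import CorpusFactory

# "my_corpus_id" is a placeholder; use the ID of a corpus you own
corpus = CorpusFactory.get("my_corpus_id")
try:
    corpus.delete()  # issues DELETE sdk/corpora/{id} against the backend
except Exception as e:
    # raised when the corpus does not exist or the caller is not the owner
    print(e)
```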
20 changes: 19 additions & 1 deletion aixplain/modules/dataset.py
@@ -21,13 +21,17 @@
Datasets Class
"""

import logging

from aixplain.enums.function import Function
from aixplain.enums.license import License
from aixplain.enums.onboard_status import OnboardStatus
from aixplain.enums.privacy import Privacy
from aixplain.modules.asset import Asset
from aixplain.modules.data import Data
from aixplain.utils.file_utils import _request_with_retry, save_file
from aixplain.utils import config
from aixplain.utils.file_utils import _request_with_retry
from urllib.parse import urljoin
from typing import Any, Dict, List, Optional, Text


@@ -115,3 +119,17 @@ def __init__(

def __repr__(self):
return f"<Dataset: {self.name}>"

def delete(self) -> None:
"""Delete Dataset service"""
try:
url = urljoin(config.BACKEND_URL, f"sdk/datasets/{self.id}")
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
logging.info(f"Start service for DELETE Dataset - {url} - {headers}")
r = _request_with_retry("delete", url, headers=headers)
if r.status_code != 200:
raise Exception()
except Exception:
message = "Dataset Deletion Error: Make sure the dataset exists and you are the owner."
logging.error(message)
            raise Exception(message)
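
Both new `delete` methods wrap the same backend route. For anyone verifying outside the SDK, a sketch of the equivalent raw request (assuming `requests` is installed; the asset ID is a placeholder):

```python
import requests
from urllib.parse import urljoin
from aixplain.utils import config

asset_id = "my_dataset_id"  # placeholder: the dataset to delete

url = urljoin(config.BACKEND_URL, f"sdk/datasets/{asset_id}")
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
r = requests.delete(url, headers=headers)
assert r.status_code == 200, "Deletion failed: make sure the dataset exists and you are the owner."
```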
2 changes: 1 addition & 1 deletion aixplain/processes/data_onboarding/onboard_functions.py
@@ -155,7 +155,7 @@ def build_payload_corpus(corpus: Corpus, ref_data: List[Text], error_handler: Er
"suggestedFunctions": [f.value for f in corpus.functions],
"onboardingErrorsPolicy": error_handler.value,
"tags": corpus.tags,
"pricing": {"type": "FREE", "cost": 0},
"pricing": {"usage": ["benchmark", "finetune"], "type": "free", "cost": 0},
"privacy": corpus.privacy.value,
"license": {"typeId": corpus.license.value},
"refData": ref_data,
190 changes: 190 additions & 0 deletions aixplain/processes/data_onboarding/process_interval_files.py
@@ -0,0 +1,190 @@
__author__ = "thiagocastroferreira"

import json
import logging
import os
import shutil
import pandas as pd
import tarfile

from aixplain.enums.data_type import DataType
from aixplain.enums.file_type import FileType
from aixplain.enums.storage_type import StorageType
from aixplain.modules.content_interval import (
ContentInterval,
AudioContentInterval,
ImageContentInterval,
TextContentInterval,
VideoContentInterval,
)
from aixplain.modules.file import File
from aixplain.modules.metadata import MetaData
from aixplain.utils.file_utils import upload_data
from pathlib import Path
from tqdm import tqdm
from typing import Any, Dict, List, Text, Tuple


def compress_folder(folder_path: str):
    """Compress every file in "folder_path" into a gzipped tarball and return the archive path."""
    with tarfile.open(folder_path + ".tgz", "w:gz") as tar:
        for name in os.listdir(folder_path):
            tar.add(os.path.join(folder_path, name))
    return folder_path + ".tgz"


def process_interval(interval: Any, storage_type: StorageType, interval_folder: Text) -> List[Dict]:
    """Process content intervals

    Args:
        interval (Any): content interval(s) to process
        storage_type (StorageType): type of storage: URL, local path or textual content
        interval_folder (Text): local folder where interval files are staged before upload

    Returns:
        List[Dict]: content interval(s) as dictionaries
    """
    intervals = interval if isinstance(interval, list) else [interval]
    if storage_type == StorageType.FILE:
        for interval_ in intervals:
            # check the size of the local file and assert a limit of 25 MB
            assert (
                os.path.getsize(interval_.content) <= 25000000
            ), f'Data Asset Onboarding Error: Local file "{interval_.content}" exceeds the size limit of 25 MB.'
            # stage the referenced file in the interval folder so it is uploaded with the batch
            fname = os.path.basename(interval_.content)
            new_path = os.path.join(interval_folder, fname)
            if os.path.exists(new_path) is False:
                shutil.copy2(interval_.content, new_path)
    return [interval_.__dict__ for interval_ in intervals]


def validate_format(index: int, interval: Dict, metadata: MetaData) -> List[ContentInterval]:
    """Validate the interval format

    Args:
        index (int): row index
        interval (Dict): interval to be validated
        metadata (MetaData): metadata of the column

    Returns:
        List[ContentInterval]: validated content interval(s)
    """
if metadata.dtype == DataType.AUDIO_INTERVAL:
try:
if isinstance(interval, list):
interval = [AudioContentInterval(**interval_) for interval_ in interval]
else:
interval = [AudioContentInterval(**interval)]
except Exception as e:
message = f'Data Asset Onboarding Error: Audio Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "AudioContentInterval" class for the correct format.'
logging.exception(message)
raise Exception(message)
elif metadata.dtype == DataType.IMAGE_INTERVAL:
try:
if isinstance(interval, list):
interval = [ImageContentInterval(**interval_) for interval_ in interval]
else:
interval = [ImageContentInterval(**interval)]
except Exception as e:
message = f'Data Asset Onboarding Error: Image Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "ImageContentInterval" class for the correct format.'
logging.exception(message)
raise Exception(message)
elif metadata.dtype == DataType.TEXT_INTERVAL:
try:
if isinstance(interval, list):
interval = [TextContentInterval(**interval_) for interval_ in interval]
else:
interval = [TextContentInterval(**interval)]
except Exception as e:
message = f'Data Asset Onboarding Error: Text Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "TextContentInterval" class for the correct format.'
logging.exception(message)
raise Exception(message)
elif metadata.dtype == DataType.VIDEO_INTERVAL:
try:
if isinstance(interval, list):
interval = [VideoContentInterval(**interval_) for interval_ in interval]
else:
interval = [VideoContentInterval(**interval)]
except Exception as e:
message = f'Data Asset Onboarding Error: Video Interval in row {index} of Column "{metadata.name}" is not following the format. Check the "VideoContentInterval" class for the correct format.'
logging.exception(message)
raise Exception(message)
return interval
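
The four branches above differ only in the interval class they instantiate. A sketch of a table-driven alternative with the same behavior (a suggestion, not part of this PR):

```python
_INTERVAL_CLASSES = {
    DataType.AUDIO_INTERVAL: AudioContentInterval,
    DataType.IMAGE_INTERVAL: ImageContentInterval,
    DataType.TEXT_INTERVAL: TextContentInterval,
    DataType.VIDEO_INTERVAL: VideoContentInterval,
}

def validate_format(index: int, interval: Dict, metadata: MetaData) -> List[ContentInterval]:
    # assumes metadata.dtype is one of the four interval types above
    interval_class = _INTERVAL_CLASSES[metadata.dtype]
    try:
        intervals = interval if isinstance(interval, list) else [interval]
        return [interval_class(**interval_) for interval_ in intervals]
    except Exception:
        message = (
            f'Data Asset Onboarding Error: {interval_class.__name__} in row {index} of Column '
            f'"{metadata.name}" is not following the format. Check the "{interval_class.__name__}" '
            f"class for the correct format."
        )
        logging.exception(message)
        raise Exception(message)
```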


def run(metadata: MetaData, paths: List, folder: Path, batch_size: int = 1000) -> Tuple[List[File], int, int]:
    """Process a list of local interval files, compress, and upload them to pre-signed URLs in S3

    Explanation:
        Each file in "paths" is read and its intervals are processed. If an interval references a public link or local file, the content is staged and added to an index CSV file.
        Intervals are processed in batches: every "batch_size" intervals, the index CSV file is compressed, uploaded to a pre-signed URL in S3, and reset.

    Args:
        metadata (MetaData): metadata of the asset
        paths (List): list of paths to local files
        folder (Path): local folder to save compressed files before uploading them to S3
        batch_size (int, optional): number of intervals per batch. Defaults to 1000.

    Returns:
        Tuple[List[File], int, int]: list of S3 links, data column index and number of rows
    """
logging.debug(f'Data Asset Onboarding: Processing "{metadata.name}".')
interval_folder = Path(".")
if metadata.storage_type in [StorageType.FILE, StorageType.TEXT]:
interval_folder = Path(os.path.join(folder, "data"))
interval_folder.mkdir(exist_ok=True)

idx = 0
data_column_idx = -1
files, batch = [], []
for i in tqdm(range(len(paths)), desc=f' Data "{metadata.name}" onboarding progress', position=1, leave=False):
path = paths[i]
try:
dataframe = pd.read_csv(path)
except Exception as e:
message = f'Data Asset Onboarding Error: Local file "{path}" not found.'
logging.exception(message)
raise Exception(message)

# process intervals
for j in tqdm(range(len(dataframe)), desc=" File onboarding progress", position=2, leave=False):
row = dataframe.iloc[j]
try:
interval = row[metadata.name]
except Exception as e:
message = f'Data Asset Onboarding Error: Column "{metadata.name}" not found in the local file {path}.'
logging.exception(message)
raise Exception(message)

# interval = validate_format(index=j, interval=interval, metadata=metadata)

try:
                interval = process_interval(interval, metadata.storage_type, interval_folder)
batch.append(interval)
except Exception as e:
logging.exception(e)
raise Exception(e)

idx += 1
            if (idx % batch_size) == 0:
batch_index = str(len(files) + 1).zfill(8)
file_name = f"{folder}/{metadata.name}-{batch_index}.csv.gz"

df = pd.DataFrame({metadata.name: batch})
start, end = idx - len(batch), idx
df["@INDEX"] = range(start, end)
df.to_csv(file_name, compression="gzip", index=False)
s3_link = upload_data(file_name, content_type="text/csv", content_encoding="gzip")
files.append(File(path=s3_link, extension=FileType.CSV, compression="gzip"))
# get data column index
data_column_idx = df.columns.to_list().index(metadata.name)
batch = []

if len(batch) > 0:
batch_index = str(len(files) + 1).zfill(8)
file_name = f"{folder}/{metadata.name}-{batch_index}.csv.gz"

df = pd.DataFrame({metadata.name: batch})
start, end = idx - len(batch), idx
df["@INDEX"] = range(start, end)
df.to_csv(file_name, compression="gzip", index=False)
s3_link = upload_data(file_name, content_type="text/csv", content_encoding="gzip")
files.append(File(path=s3_link, extension=FileType.CSV, compression="gzip"))
# get data column index
data_column_idx = df.columns.to_list().index(metadata.name)
batch = []
return files, data_column_idx, idx
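
The batch-flush logic in `run` is duplicated between the in-loop flush and the final partial batch. A sketch of a shared helper (hypothetical `_flush_batch`, a suggestion rather than part of this PR):

```python
def _flush_batch(batch: List, files: List[File], metadata: MetaData, folder: Path, idx: int) -> int:
    """Write the batch to a gzipped index CSV, upload it to S3, and record the File.

    Returns the data column index in the written CSV.
    """
    batch_index = str(len(files) + 1).zfill(8)
    file_name = f"{folder}/{metadata.name}-{batch_index}.csv.gz"

    df = pd.DataFrame({metadata.name: batch})
    start, end = idx - len(batch), idx
    df["@INDEX"] = range(start, end)
    df.to_csv(file_name, compression="gzip", index=False)
    s3_link = upload_data(file_name, content_type="text/csv", content_encoding="gzip")
    files.append(File(path=s3_link, extension=FileType.CSV, compression="gzip"))
    return df.columns.to_list().index(metadata.name)

# both flush sites in run() would then reduce to:
#     data_column_idx = _flush_batch(batch, files, metadata, folder, idx)
#     batch = []
```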
21 changes: 16 additions & 5 deletions tests/functional/data_asset/corpus_onboarding_test.py
@@ -17,13 +17,13 @@
"""

import pytest
from aixplain.enums.language import Language
from aixplain.enums.license import License
import time
from aixplain.enums import Language, License, OnboardStatus
from aixplain.factories.corpus_factory import CorpusFactory
from uuid import uuid4


def test_corpus_onboard():
def test_corpus_onboard_get_delete():
upload_file = "tests/functional/data_asset/input/audio-en_url.csv"
schema = [
{
@@ -37,14 +37,25 @@ def test_corpus_onboard():
{"name": "text", "dtype": "text", "storage_type": "text", "languages": [Language.English_UNITED_STATES]},
]

payload = CorpusFactory.create(
response = CorpusFactory.create(
name=str(uuid4()),
description="This corpus contain 20 English audios with their corresponding transcriptions.",
license=License.MIT,
content_path=upload_file,
schema=schema,
)
assert payload["status"] == "onboarding"
    asset_id = response["asset_id"]
    onboard_status = OnboardStatus(response["status"])
    # fetch once so "corpus" is bound even if onboarding already finished
    corpus = CorpusFactory.get(asset_id)
    while onboard_status == OnboardStatus.ONBOARDING:
        time.sleep(1)
        corpus = CorpusFactory.get(asset_id)
        onboard_status = corpus.onboard_status
# assert the asset was onboarded
assert onboard_status == OnboardStatus.ONBOARDED
    # delete the asset and assert it can no longer be retrieved
corpus.delete()
with pytest.raises(Exception):
corpus = CorpusFactory.get(asset_id)


def test_corpus_listing():
28 changes: 25 additions & 3 deletions tests/functional/data_asset/dataset_onboarding_test.py
@@ -17,8 +17,10 @@
"""

import pytest
import time

from uuid import uuid4
from aixplain.enums import Function, Language, License, Privacy, DataSubtype, DataType, StorageType
from aixplain.enums import Function, Language, License, Privacy, DataSubtype, DataType, StorageType, OnboardStatus
from aixplain.factories import DatasetFactory
from aixplain.modules import MetaData

@@ -52,7 +54,7 @@ def split():
)


def test_dataset_onboard(meta1, meta2):
def test_dataset_onboard_get_delete(meta1, meta2):
upload_file = "tests/functional/data_asset/input/audio-en_url.csv"

response = DatasetFactory.create(
@@ -66,7 +68,18 @@ def test_dataset_onboard(meta1, meta2):
tags=[],
privacy=Privacy.PRIVATE,
)
assert response["status"] == "onboarding"
    asset_id = response["asset_id"]
    onboard_status = OnboardStatus(response["status"])
    # fetch once so "dataset" is bound even if onboarding already finished
    dataset = DatasetFactory.get(asset_id)
    while onboard_status == OnboardStatus.ONBOARDING:
        time.sleep(1)
        dataset = DatasetFactory.get(asset_id)
        onboard_status = dataset.onboard_status
# assert the asset was onboarded
assert onboard_status == OnboardStatus.ONBOARDED
    # delete the asset and assert it can no longer be retrieved
dataset.delete()
with pytest.raises(Exception):
dataset = DatasetFactory.get(asset_id)
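
These polling loops spin indefinitely if an asset gets stuck in `onboarding`. A bounded variant the tests could share (a sketch; `wait_for_onboarding` and its `timeout`/`poll_interval` parameters are hypothetical):

```python
import time
from aixplain.enums import OnboardStatus

def wait_for_onboarding(factory, asset_id, timeout: float = 600.0, poll_interval: float = 1.0):
    """Poll the factory until the asset leaves ONBOARDING or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        asset = factory.get(asset_id)
        if asset.onboard_status != OnboardStatus.ONBOARDING:
            return asset
        time.sleep(poll_interval)
    raise TimeoutError(f"Asset {asset_id} was still onboarding after {timeout}s")

# usage in the tests:
#     dataset = wait_for_onboarding(DatasetFactory, asset_id)
#     assert dataset.onboard_status == OnboardStatus.ONBOARDED
```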


def test_invalid_dataset_onboard(meta1, meta2):
@@ -138,3 +151,12 @@ def test_valid_dataset_splitting(meta1, meta2, split):
)

assert response["status"] == "onboarding"
    asset_id = response["asset_id"]
    onboard_status = OnboardStatus(response["status"])
    # fetch once so "dataset" is bound even if onboarding already finished
    dataset = DatasetFactory.get(asset_id)
    while onboard_status == OnboardStatus.ONBOARDING:
        time.sleep(1)
        dataset = DatasetFactory.get(asset_id)
        onboard_status = dataset.onboard_status
# assert the asset was onboarded
assert onboard_status == OnboardStatus.ONBOARDED
dataset.delete()