In [None]:
#| default_exp utils.upload_data

In [None]:
# | exporti
import httpx
import pandas as pd
import asyncio

import domolibrary.client.Logger as lc
import domolibrary.client.DomoAuth as dmda
import domolibrary.classes.DomoDataset as dmds

In [None]:
#| export
async def upload_data(instance_auth : dmda.DomoAuth, # instance where the data_fn function will execute against
                      consol_auth : dmda.DomoAuth, # instance where the data should be accumulated
                      consol_ds : dmds.DomoDataset, 
                      partition_key: str, 
                      data_fn : callable,
                      is_index: bool = False,
                      debug_prn: bool = False,
                      debug_api:bool = False,
                      logger: lc.Logger = None):
    """Retrieve data from one instance with `data_fn` and REPLACE one partition of `consol_ds`.

    Args:
        instance_auth: auth for the instance `data_fn` is executed against.
        consol_auth: auth for the consolidation instance.
            NOTE(review): not used by this function; kept for interface compatibility.
        consol_ds: dataset whose `upload_data` receives the retrieved frame.
        partition_key: partition written with upload_method='REPLACE'.
        data_fn: async callable invoked as
            `data_fn(instance_auth, session, debug_api=debug_api)`; the result is
            treated as "no data" when it is None or has no rows.
        is_index: forwarded to `consol_ds.upload_data`.
        debug_prn: print progress messages; also forwarded to the upload.
        debug_api: forwarded to `data_fn` and to the upload.
        logger: optional logger receiving start / error messages.

    Returns:
        Whatever `consol_ds.upload_data` returns, or None when `data_fn` returned
        no rows or any exception was raised (the error is printed and logged).
    """
    try:
        if logger:
            logger.log_info(f" Upload_data function - starting {instance_auth.domo_instance} - {data_fn.__name__}")
        if debug_prn:
            print(
                f"starting {instance_auth.domo_instance} - {data_fn.__name__}")

        # `async with` guarantees the session is closed even if data_fn raises.
        # It is closed before the upload because only data_fn uses it.
        async with httpx.AsyncClient() as instance_session:
            upload_df = await data_fn(instance_auth, instance_session, debug_api=debug_api)

        if upload_df is None or len(upload_df.index) == 0:
            return None

        return await consol_ds.upload_data(upload_df=upload_df,
                                           upload_method='REPLACE',
                                           partition_key=partition_key,
                                           is_index=is_index,
                                           debug_api=debug_api,
                                           debug_prn=debug_prn)

    except Exception as e:
        # Best-effort by design: report the failure and signal it with None.
        print(f"upload_data : unexpected error: {e}")
        if logger:
            logger.log_error(f"upload_data : unexpected error: {e}")
        return None

In [None]:
# | export
async def upload_data_with_date(instance_auth,
                                consol_auth,
                                data_fn,
                                consol_ds,
                                partition_date_col,
                                partition_delimiter,
                                start_date,
                                end_date,
                                debug: bool = False,
                                debug_prn: bool = False,
                                debug_api: bool = False):
    """Retrieve a date range of data and REPLACE one `consol_ds` partition per distinct date.

    Args:
        instance_auth: auth for the instance `data_fn` is executed against
            (its `domo_instance` prefixes every partition key).
        consol_auth: NOTE(review): not used by this function; kept for interface compatibility.
        data_fn: async callable invoked with keyword arguments
            `instance_auth, session, start_date, end_date, debug`.
        consol_ds: dataset whose `upload_data` is called once per partition.
        partition_date_col: column whose distinct values define the partitions.
        partition_delimiter: separator between instance name and date in the key.
        start_date, end_date: forwarded to `data_fn`.
        debug: forwarded to `data_fn`.
        debug_prn: print progress messages; also forwarded to each upload.
        debug_api: forwarded to each `consol_ds.upload_data` call.

    Returns:
        A list with one `consol_ds.upload_data` result per partition (in order of
        first appearance), or None when `data_fn` did not return a DataFrame.
    """
    print(
        f"🎬 upload_with_data: starting retrieval {start_date}, {end_date}, {instance_auth.domo_instance}")

    # `async with` closes the session even if data_fn raises.
    async with httpx.AsyncClient() as instance_session:
        upload_df = await data_fn(instance_auth=instance_auth,
                                  session=instance_session,
                                  start_date=start_date,
                                  end_date=end_date,
                                  debug=debug)

    if not isinstance(upload_df, pd.DataFrame):
        print(f"🛑 error no data returned {instance_auth.domo_instance}")
        print(upload_df)
        return None

    if debug_prn:
        print(
            f'🧻 upload_with_data: starting upload {len(upload_df)} rows for {instance_auth.domo_instance}')

    tasks = []

    # NOTE: iterrows upcasts each row to a common dtype (e.g. int -> float when the
    # frame also has float columns), which shows up in the partition key text.
    # Kept as-is so keys stay identical to previously uploaded partitions.
    for _, partition_row in upload_df.drop_duplicates(subset=[partition_date_col]).iterrows():
        partition_date = partition_row[partition_date_col]

        partition_key = f"{instance_auth.domo_instance}{partition_delimiter}{str(partition_date)}"
        partition_df = upload_df[upload_df[partition_date_col] == partition_date]

        tasks.append(consol_ds.upload_data(upload_df=partition_df,
                                           upload_method='REPLACE',
                                           partition_key=partition_key,
                                           is_index=False,
                                           debug_api=debug_api,
                                           debug_prn=debug_prn))

    res = await asyncio.gather(*tasks)

    if debug_prn:
        print(
            f'🎉 upload_with_data : finished uploading {len(upload_df)} rows for {instance_auth.domo_instance}')
    return res


In [None]:
import os

token_auth = dmda.DomoTokenAuth(
    domo_instance="domo-dojo", domo_access_token=os.environ["DOMO_DOJO_ACCESS_TOKEN"]
)

ds_id = 'cbae0e0c-a92d-4a4c-8d0c-c9ccd38fe928'

ds = await dmds.DomoDataset.get_from_id( dataset_id = ds_id , auth = token_auth )

df = pd.DataFrame([{"col_a" : "a", "col_b": "b", "col_c": "c"}])

await ds.upload_data( upload_df = df)

DatasetNotProvidedError: upload_data: dataset_id not provided at domo-dojo