
<br>
Utility functions for the byte-genie API<br>


In [None]:
import os
import json
import time
import inspect

In [None]:
import pandas as pd
import requests
import numpy as np
import utils.common
from utils.logging import logger
from utils.async_utils import to_async
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_random_exponential, wait_fixed, wait_exponential

In [None]:
class ByteGenieResponse:
    """
    Wrapper around the raw payload returned by a ByteGenie API call.
    The accessors read the payload as
    ``{'response': {'task_1': {'task': {...}, 'status': ..., 'data': ...}}}``
    and return None whenever a level of that nesting is missing or is not a dictionary.
    """

    def __init__(
            self,
            response: dict = None,
            verbose: int = 1,
    ):
        """
        :param response: raw payload of the API call
        :param verbose: when truthy, errors caught inside this object are logged
        """
        # the payload is not validated here: accessors return None for anything that is not a dictionary
        self.response = response
        self.verbose = verbose

    def _get_task_1(self):
        """
        Walk down to ``self.response['response']['task_1']``.
        :return: the 'task_1' dictionary itself (not a copy), or None if any level is missing
            or is not a dictionary
        """
        node = self.response
        for key in ('response', 'task_1'):
            if not isinstance(node, dict) or key not in node:
                return None
            node = node[key]
        return node if isinstance(node, dict) else None

    def get_task_attr(self, attr: str):
        """
        Get an attribute stored under ``response -> task_1 -> task``.
        :param attr: key to look up in the 'task' dictionary
        :return: the stored value, or None if the key or any enclosing level is missing
        """
        task_1 = self._get_task_1()
        if task_1 is None or 'task' not in task_1:
            return None
        task = task_1['task']
        if isinstance(task, dict) and attr in task:
            return task[attr]
        return None

    def get_response_attr(self, attr: str):
        """
        Get an attribute stored directly under ``response -> task_1``.
        :param attr: key to look up in the 'task_1' dictionary
        :return: the stored value, or None if the key or any enclosing level is missing
        """
        task_1 = self._get_task_1()
        if task_1 is not None and attr in task_1:
            return task_1[attr]
        return None

    def set_response_attr(self, attr: str, attr_val):
        """
        Set an attribute directly under ``response -> task_1``, in place.
        Nothing is written when the payload does not contain that nesting.
        Errors are logged (if verbose) rather than raised.
        :param attr: key to set in the 'task_1' dictionary
        :param attr_val: value to store
        :return:
        """
        try:
            task_1 = self._get_task_1()
            if task_1 is not None:
                # task_1 is the dictionary held inside self.response, so this mutates the payload
                task_1[attr] = attr_val
        except Exception as e:
            if self.verbose:
                logger.error(f"Error in set_response_attr: {e}")

    def get_status(self):
        """
        Get the status of the task.
        Note that this is the status of the task at the time API call was made.
        In case a task was scheduled initially, even when the task is complete, the output of this method will not change.
        For such tasks, use check_output_file_exists() to check whether a task has finished generating its output.
        :return:
        """
        return self.get_response_attr(attr='status')

    def get_data(self):
        """
        Get data returned in ByteGenie response.
        Note that this method only gets data that was returned from the api call,
        and in case a task is scheduled, even when the task output is ready, output of this method will not change.
        Use read_output_data() to read the current output of such scheduled tasks.
        :return:
        """
        data = self.get_response_attr(attr='data')
        # unwrap at most two nested {'data': ...} layers around the actual data
        for _ in range(2):
            if isinstance(data, dict) and 'data' in data:
                data = data['data']
        return data

    def get_output_file(
            self,
    ):
        """
        Get the output file of a task
        :return:
        """
        return self.get_task_attr(attr='output_file')

    def get_start_time(self):
        """
        Get start time of a task
        :return:
        """
        return self.get_task_attr(attr='start_time')

    def check_output_file_exists(self):
        """
        Check if the output file exists.
        This is the recommended method to check if the output of a task is complete.
        :return: False when the response carries no output file; otherwise the data returned
            by the file-existence check
        """
        # ByteGenie is the API client, defined outside this class
        bg = ByteGenie(
            task_mode='sync',
        )
        output_file = self.get_output_file()
        if output_file is None:
            return False
        resp = bg.check_file_exists(output_file)
        return resp.get_data()

    def read_output_data(self):
        """
        Read output data from the task output file.
        This is the recommended method to read output for tasks that were previously scheduled.
        :return: data read from the output file, or None (after logging a warning) if the file
            does not exist yet
        """
        bg = ByteGenie(
            task_mode='sync',
        )
        if not self.check_output_file_exists():
            logger.warning("output does not yet exist: wait some more")
            return None
        resp = bg.read_file(self.get_output_file())
        return resp.get_data()

    # NOTE(review): to_async comes from utils.async_utils; presumably it makes this method
    # usable as an async task (see ByteGenieResponses.read_output_data) - confirm
    @to_async
    def async_read_output_data(self):
        """
        Same as read_output_data(), but errors are logged (if verbose) instead of raised.
        :return: output data, or None if reading failed or the output does not exist yet
        """
        try:
            return self.read_output_data()
        except Exception as e:
            if self.verbose:
                logger.warning(f"Error in read_output_data(): {e}")

    def get_output(self):
        """
        Returns the output data from the response if it is not None, otherwise reads it from the output file
        :return:
        """
        data = self.get_data()
        if data is not None:
            return data
        output_data = self.read_output_data()
        if output_data is not None:
            # store the data in the payload so that later get_data() calls return it directly
            self.set_response_attr(attr='data', attr_val=output_data)
        return output_data

    def get_output_attr(self, attr: str):
        """
        Get a specific attribute from output, e.g. doc_name
        :param attr: column name (for tabular output) or key (for dictionary output)
        :return: list of unique column values for tabular output, the stored value for
            dictionary output, or None (after logging an error) if the attribute is not found
        """
        output_data = self.get_output()
        if utils.common.is_convertible_to_df(output_data):
            output_data = pd.DataFrame(output_data)
            if attr in output_data.columns:
                return output_data[attr].unique().tolist()
            logger.error(f"Attribute, {attr}, not found in output data; "
                         f"available attributes are: {list(output_data.columns)}")
        elif isinstance(output_data, dict):
            if attr in output_data:
                return output_data[attr]
            logger.error(f"Attribute, {attr}, not found in output data; "
                         f"available attributes are: {list(output_data.keys())}")
        return None

In [None]:
class ByteGenieResponses:
    """
    List-like container of ByteGenieResponse objects, with helpers to read and combine their outputs.
    """

    def __init__(
            self,
            responses: list = None,
    ):
        """
        :param responses: ByteGenieResponse objects to hold; each one is re-wrapped in a new
            ByteGenieResponse that shares the original payload and keeps its verbosity setting
        """
        if responses is not None:
            self.responses = [
                ByteGenieResponse(
                    response=response.response,
                    # carry the verbosity over instead of silently resetting it to the default
                    verbose=getattr(response, 'verbose', 1),
                )
                for response in responses
            ]
        else:
            self.responses = []

    def __getitem__(self, index):
        """
        :param index: integer index or slice
        :return: a single response for an integer index, a plain list of responses for a slice
        """
        return self.responses[index]

    def __setitem__(self, index, value):
        """
        :param index: integer index or slice
        :param value: response (or, for a slice, iterable of responses) to store
        """
        self.responses[index] = value

    def __len__(self):
        return len(self.responses)

    @staticmethod
    def concatenate_dict_output(outputs: list):
        """
        Merge a list of dictionaries into one dictionary keyed by the union of their keys.
        For each key, the values from the dictionaries that contain it are collected in input order;
        if all of those values are lists, they are flattened into a single list.
        :param outputs: list of dictionaries
        :return: merged dictionary, with keys in first-seen order
        """
        # dict.fromkeys de-duplicates while keeping first-seen order (a set gives arbitrary order)
        keys = dict.fromkeys(key for out in outputs for key in out)
        concatenated_dict = {}
        for key in keys:
            # a key may be absent from some dictionaries: skip those rather than raise KeyError
            key_output = [out[key] for out in outputs if key in out]
            if all(isinstance(val, list) for val in key_output):
                key_output = [item for vals in key_output for item in vals]
            concatenated_dict[key] = key_output
        return concatenated_dict

    def append(self, item):
        """
        Add a single response at the end (stored as given, without re-wrapping).
        """
        self.responses.append(item)

    def extend(self, items):
        """
        Add several responses at the end (stored as given, without re-wrapping).
        """
        self.responses.extend(items)

    def __repr__(self):
        return repr(self.responses)

    def __add__(self, other):
        """
        Concatenate the responses of two containers.
        :param other: another ByteGenieResponses
        :return: a new plain list holding the responses of both operands
        :raises TypeError: if other is not a ByteGenieResponses
        """
        # NOTE(review): the result is a list, not a ByteGenieResponses; kept as is for compatibility
        if not isinstance(other, ByteGenieResponses):
            logger.error(f"Responses to add are not of type `ByteGenieResponses`")
            raise TypeError("Unsupported operand type for +: object to add must be of type `ByteGenieResponses`")
        return list(self.responses) + list(other.responses)

    def read_output_data(self):
        """
        Read the output data of every response via their async_read_output_data() tasks.
        :return: whatever utils.async_utils.run_async_tasks returns for those tasks
            (presumably one output per response - confirm)
        """
        tasks = [resp.async_read_output_data() for resp in self.responses]
        outputs = utils.async_utils.run_async_tasks(tasks)
        return outputs

    def get_output(self, concat: int = 0):
        """
        Get the output of every response.
        :param concat: when truthy, drop None outputs and combine the rest: lists are flattened
            into one list, dictionaries are merged with concatenate_dict_output(); any other mix
            is returned as a list of the non-None outputs
        :return: list with one output per response, or the combined output if concat is truthy
        """
        outputs = [resp.get_output() for resp in self.responses]
        if concat:
            outputs = [outs for outs in outputs if outs is not None]
            if all(isinstance(out, list) for out in outputs):
                outputs = [out for outs in outputs for out in outs]
            elif all(isinstance(out, dict) for out in outputs):
                outputs = self.concatenate_dict_output(outputs=outputs)
        return outputs

    def get_output_attr(self, attr: str, concat: int = 0):
        """
        Get a specific attribute from the output of every response.
        :param attr: attribute to read from each response's output
        :param concat: when truthy, combine the per-response values into one flat list:
            None values are dropped, lists/tuples are flattened, anything else is kept as one item
        :return: list with one entry per response, or the flat list if concat is truthy
        """
        attr_vals = [resp.get_output_attr(attr=attr) for resp in self.responses]
        if concat:
            flattened = []
            for vals in attr_vals:
                if vals is None:
                    # this response had no such attribute (or no output): nothing to add
                    continue
                if isinstance(vals, (list, tuple)):
                    flattened.extend(vals)
                else:
                    # scalar value (e.g. a string): keep it whole instead of iterating over it
                    flattened.append(vals)
            attr_vals = flattened
        return attr_vals