In [2]:
import datetime
import json
import uuid
import requests
import os
import typing

# Base URL of the API; _format_url() prefixes it to every route.
# NOTE(review): hard-coded to a local port — confirm it matches the service
# this notebook is expected to talk to.
API_URL = "http://localhost:58491"


# Serialize UUID properties in json_body
def custom_serializer(obj):
    """Fallback encoder passed to ``json.dumps`` through its ``default`` hook.

    ``uuid.UUID`` values become their string form and ``datetime.datetime``
    values become ISO 8601 text.

    Raises:
        TypeError: if ``obj`` is of any other type.
    """
    converters = (
        (uuid.UUID, str),
        (datetime.datetime, lambda moment: moment.isoformat()),
    )
    for supported_type, convert in converters:
        if isinstance(obj, supported_type):
            return convert(obj)

    type_name = obj.__class__.__name__
    raise TypeError(f"Object of type {type_name} is not JSON serializable")


def _format_url(route: str, path: typing.Optional[str] = None):
    """Method to format url and parameters to be used in requests."""
    url = API_URL.rstrip("/") + "/" + route.lstrip("/")
    if path:
        url += "/" + path.lstrip("/")
    return url


def post_request(route, json_body):
    """Send ``json_body`` to ``route`` with a POST request.

    Args:
        route: API route, resolved to a full URL by ``_format_url``.
        json_body: payload to serialize with ``json.dumps``; values the
            standard encoder rejects are handed to ``custom_serializer``.

    Returns:
        The response body decoded from JSON.

    Raises:
        AssertionError: if the response status code is not 201. The message
            is the response text.
    """
    url = _format_url(route)
    payload = json.dumps(json_body, default=custom_serializer)

    response = requests.post(url, data=payload)

    # Raise explicitly instead of using an `assert` statement: asserts are
    # removed when Python runs with -O, which would silently skip this check.
    if response.status_code != 201:
        raise AssertionError(response.text)
    return response.json()


def patch_request(route, path, json_body):
    """Send ``json_body`` to ``route``/``path`` with a PATCH request.

    Args:
        route: API route, resolved to a full URL by ``_format_url``.
        path: extra path segment appended after the route.
        json_body: payload to serialize with ``json.dumps``; values the
            standard encoder rejects are handed to ``custom_serializer``.

    Returns:
        The response body decoded from JSON (the status code is not returned).

    Raises:
        AssertionError: if the response status code is not 200. The message
            is the response text.
    """
    url = _format_url(route, path)
    payload = json.dumps(json_body, default=custom_serializer)

    response = requests.patch(url, data=payload)

    # Raise explicitly instead of using an `assert` statement: asserts are
    # removed when Python runs with -O, which would silently skip this check.
    if response.status_code != 200:
        raise AssertionError(response.text)
    return response.json()

def patch_request2(route: str, path: typing.Optional[str], json_body):
    """Send ``json_body`` to ``route``/``path`` with a PATCH request.

    Args:
        route: API route, resolved to a full URL by ``_format_url``.
        path: optional extra path segment appended after the route.
        json_body: payload to serialize with ``json.dumps``; values the
            standard encoder rejects are handed to ``custom_serializer``.

    Returns:
        The response body decoded from JSON.

    Raises:
        AssertionError: if the response status code is not 200. The message
            is the response text, so the server's reply is visible in the
            failure.
    """
    url = _format_url(route, path)
    payload = json.dumps(json_body, default=custom_serializer)

    response = requests.patch(url, data=payload)

    # Raise explicitly (with the response text as message) instead of a bare
    # `assert`: asserts are removed under -O, and a message-less failure
    # hides what the server actually answered.
    if response.status_code != 200:
        raise AssertionError(response.text)
    return response.json()


def get_request(route: str, path: typing.Optional[str] = None, params=None):
    """Issue a GET request and return ``(status_code, decoded_json_body)``.

    The URL is built by ``_format_url`` from ``route`` and the optional
    ``path``; ``params`` is forwarded to ``requests.get`` unchanged.
    The status code is returned as-is, without being checked.
    """
    response = requests.get(_format_url(route, path), params=params)
    status = response.status_code
    body = response.json()
    return status, body

def options_request(route: str, path: typing.Optional[str] = None, params=None):
    """Issue an OPTIONS request and return ``(status_code, decoded_json_body)``.

    The URL is built by ``_format_url`` from ``route`` and the optional
    ``path``; ``params`` is forwarded to ``requests.options`` unchanged.
    The status code is returned as-is, without being checked.
    """
    response = requests.options(_format_url(route, path), params=params)
    status = response.status_code
    body = response.json()
    return status, body


In [5]:
import json
import typing
import uuid
from app.collector.utils.constants_utils import (
    ASSET_ROUTE,
    CONNECTION_ROUTE,
    DATABASE_ROUTE,
    EXECUTION_ROUTE,
    INGESTION_ROUTE,
    PROVIDER_ROUTE,
    TABLE_ROUTE,
)
from app.schemas import (
    DatabaseItemSchema,
    DatabaseListSchema,
    DatabaseProviderConnectionItemSchema,
    DatabaseProviderIngestionExecutionItemSchema,
    DatabaseProviderIngestionItemSchema,
    DatabaseProviderItemSchema,
    DatabaseTableListSchema,
)


class AssetApiClient:
    """Client for operations exposed under ``ASSET_ROUTE``."""

    @staticmethod
    def disable_many(ids: typing.List[typing.Union[str, uuid.UUID]]):
        """PATCH the given ids to the ``disable-many`` endpoint.

        Args:
            ids: identifiers to send as the request body, as strings or
                ``uuid.UUID`` objects.

        Returns:
            The decoded JSON body of the response.
        """
        # patch_request returns only the decoded body (not a
        # ``(status, body)`` pair like get_request), so the result must not
        # be unpacked into two names.
        return patch_request(ASSET_ROUTE, path="disable-many", json_body=ids)


class DatabaseProviderApiClient:
    """Read-only client for providers, their ingestions, executions and connections.

    Every method performs one ``get_request`` call and wraps the decoded
    body in the matching schema class. The returned status code is ignored.
    """

    def get(self, provider_id: str):
        """Fetch one provider by id and wrap it in ``DatabaseProviderItemSchema``."""
        _, payload = get_request(PROVIDER_ROUTE, path=provider_id)
        return DatabaseProviderItemSchema(**payload)

    def get_ingestions(self, provider_id: str):
        """List the ingestions returned for ``provider_id``.

        Each item is built with ``provider_id`` (as a ``uuid.UUID``) as a
        default field; a ``provider_id`` key in the item itself takes
        precedence.
        """
        query = {"provider_id": provider_id}
        _, payload = get_request(INGESTION_ROUTE, params=query)

        ingestions = []
        for item in payload.get("items"):
            # The item is unpacked last so its own keys win over the default.
            fields = {"provider_id": uuid.UUID(provider_id), **item}
            ingestions.append(DatabaseProviderIngestionItemSchema(**fields))
        return ingestions

    def get_ingestion(self, ingestion_id: str):
        """Fetch one ingestion by id and wrap it in ``DatabaseProviderIngestionItemSchema``."""
        _, payload = get_request(INGESTION_ROUTE, path=ingestion_id)
        return DatabaseProviderIngestionItemSchema(**payload)

    def get_ingestion_execution(self, execution_id: str) -> DatabaseProviderIngestionExecutionItemSchema:
        """Fetch one ingestion execution by id."""
        _, payload = get_request(EXECUTION_ROUTE, path=execution_id)
        return DatabaseProviderIngestionExecutionItemSchema(**payload)

    def get_connections(self, provider_id: str):
        """List the connections returned for ``provider_id``."""
        query = {"provider_id": provider_id}
        _, payload = get_request(CONNECTION_ROUTE, params=query)

        connections = []
        for item in payload.get("items"):
            connections.append(DatabaseProviderConnectionItemSchema(**item))
        return connections


class DatabaseApiClient:
    """Read-only client for databases exposed under ``DATABASE_ROUTE``.

    The status code returned by ``get_request`` is ignored by both methods.
    """

    def get(self, database_id: str):
        """Fetch one database by id and wrap it in ``DatabaseItemSchema``."""
        _, payload = get_request(DATABASE_ROUTE, path=database_id)
        return DatabaseItemSchema(**payload)

    def find_by_provider(self, provider_id: str):
        """List databases for ``provider_id`` as ``DatabaseListSchema`` objects."""
        query = {"provider_id": provider_id}
        _, payload = get_request(DATABASE_ROUTE, params=query)

        databases = []
        for item in payload["items"]:
            databases.append(DatabaseListSchema(**item))
        return databases


class DatabaseTableApiClient:
    """Read-only client for tables exposed under ``TABLE_ROUTE``."""

    def find_by_database(self, database_id: str):
        """List tables for ``database_id`` as ``DatabaseTableListSchema`` objects.

        The status code returned by ``get_request`` is ignored.
        """
        query = {"database_id": database_id}
        _, payload = get_request(TABLE_ROUTE, params=query)

        tables = []
        for item in payload["items"]:
            tables.append(DatabaseTableListSchema(**item))
        return tables


ImportError: cannot import name 'Annotated'

In [4]:
import app.collector.utils.constants_utils as constants

from app.collector.utils.api_client import (
    DatabaseProviderApiClient,
)

import app.collector.utils.constants_utils as constants
from app.collector.utils.cron_utils import check_if_cron_is_today

from app.models import (
    DatabaseProviderIngestionExecution,
    DatabaseProviderIngestionLog,
)
from app.collector.data_collection_engine import DataCollectionEngine
from app.collector.utils.logging_config import setup_collector_logger
from app.models import SchedulingType

KeyError: 'API_URL'