In [None]:

from dotenv import load_dotenv

load_dotenv()

import requests
import os

# Base URL for the Airbyte API. Endpoint names such as "sources" are appended
# to it by plain string concatenation, so the trailing slash is required.
# NOTE(review): the constant is named "OSS" but the host is api.airbyte.com;
# confirm which deployment this notebook is meant to target.
AIRBYTE_OSS_BASE_URL='https://api.airbyte.com/v1/'


In [None]:


# Endpoint path for source creation; appended to AIRBYTE_OSS_BASE_URL when the
# request is sent further down in this cell.
url = "sources"


def create_source(url: str, payload: dict, headers: dict, timeout: float = 30.0) -> "requests.Response":
    """POST a source definition to the given endpoint.

    Args:
        url: Full URL of the endpoint that creates sources.
        payload: Request body; it is sent JSON-encoded.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            notebook cell if the server stops responding.

    Returns:
        The ``requests.Response`` exactly as received. The HTTP status is not
        checked here, so callers must inspect it themselves.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    return requests.post(url, json=payload, headers=headers, timeout=timeout)

# Request body for the new Postgres source. Every connection detail is read
# with os.environ[...], so a missing variable raises KeyError here instead of
# sending an incomplete configuration.
payload = {
    "configuration": {
        "sourceType": "postgres",
        "host": os.environ["DB_HOST"],
        "port": int(os.environ["DB_PORT"]),
        "database": os.environ["DB_NAME"],
        "username": os.environ["DB_USER"],
        "password": os.environ["DB_PASSWORD"],
        "ssl_mode": {"mode": "require"},
    },
    "name": "pg-source",
    "workspaceId": os.environ["WORKSPACE_ID"],
}

access_token = os.environ["ACCESS_TOKEN"]
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": f"Bearer {access_token}",
}

res_source = create_source(url=AIRBYTE_OSS_BASE_URL + url, payload=payload, headers=headers)

# Print the body first so any error details returned by the API stay visible,
# then stop on a 4xx/5xx status: the connection cell needs the ID from this
# response and must not run against a failed creation.
# NOTE(review): the body may echo the submitted configuration; confirm secrets
# are masked before sharing this notebook's outputs.
print(res_source.text)
res_source.raise_for_status()

In [None]:


# Endpoint path for destination creation; appended to AIRBYTE_OSS_BASE_URL when
# the request is sent further down in this cell.
url = "destinations"


def create_destinations(url: str, payload: dict, headers: dict, timeout: float = 30.0) -> "requests.Response":
    """POST a destination definition to the given endpoint.

    Args:
        url: Full URL of the endpoint that creates destinations.
        payload: Request body; it is sent JSON-encoded.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            notebook cell if the server stops responding.

    Returns:
        The ``requests.Response`` exactly as received. The HTTP status is not
        checked here, so callers must inspect it themselves.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    return requests.post(url, json=payload, headers=headers, timeout=timeout)

# Request body for the new Redshift destination. Credentials and hosts are
# read with os.environ[...], so a missing variable raises KeyError here instead
# of sending an incomplete configuration.
payload = {
    "configuration": {
        "destinationType": "redshift",
        "uploading_method": {
            "method": "S3 Staging",
            "s3_bucket_name": os.environ["S3_NAME"],
            "s3_bucket_region": os.environ["REGION"],
            "access_key_id": os.environ["ACCESS_KEY_ID"],
            "secret_access_key": os.environ["SECRET_ACCESS_KEY"],
        },
        "host": os.environ["DEST_HOST"],
        # Hard-coded, unlike the source port, which is read from DB_PORT.
        "port": 5439,
        "database": os.environ["DEST_DB"],
        "schema": "public",
        "username": os.environ["DEST_USERNAME"],
        "password": os.environ["DEST_PASS"],
    },
    "name": "dest-db",
    "workspaceId": os.environ["WORKSPACE_ID"],
}

access_token = os.environ["ACCESS_TOKEN"]
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": f"Bearer {access_token}",
}

res_dest = create_destinations(url=AIRBYTE_OSS_BASE_URL + url, payload=payload, headers=headers)

# Print the body first so any error details returned by the API stay visible,
# then stop on a 4xx/5xx status: the connection cell needs the ID from this
# response and must not run against a failed creation.
# NOTE(review): the body may echo the submitted configuration; confirm secrets
# are masked before sharing this notebook's outputs.
print(res_dest.text)
res_dest.raise_for_status()

In [None]:
import json

def create_connections(url: str, payload: dict, headers: dict, timeout: float = 30.0) -> "requests.Response":
    """POST a connection definition to the given endpoint.

    Args:
        url: Full URL of the endpoint that creates connections.
        payload: Request body; it is sent JSON-encoded.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            notebook cell if the server stops responding.

    Returns:
        The ``requests.Response`` exactly as received. The HTTP status is not
        checked here, so callers must inspect it themselves.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    return requests.post(url, json=payload, headers=headers, timeout=timeout)

# The connection links the source and destination created in the cells above,
# so both IDs are taken from those responses (res_source and res_dest).
res_source_dict = res_source.json()
res_dest_dict = res_dest.json()
sourceId = res_source_dict.get("sourceId")
destId = res_dest_dict.get("destinationId")

# A failed creation yields a body without an ID. Stop here with a clear message
# instead of sending a connection request that carries null IDs.
if not sourceId or not destId:
    raise RuntimeError(
        "Cannot create the connection: missing ID in an earlier response "
        f"(sourceId={sourceId!r}, destinationId={destId!r})."
    )

payload = {
    "name": "support-agent-connection",
    "sourceId": sourceId,
    "destinationId": destId,
}

access_token = os.environ["ACCESS_TOKEN"]
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": f"Bearer {access_token}",
}

url = "connections"

res = create_connections(AIRBYTE_OSS_BASE_URL + url, payload=payload, headers=headers)

# Print the body first so any error details returned by the API stay visible,
# then raise on a 4xx/5xx status so a failed creation is not mistaken for success.
print(res.text)
res.raise_for_status()