In [51]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.6

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m23.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [7]:
from typing import List, Tuple, Dict, Any
import json
import requests
import psycopg2
from contextlib import closing

In [8]:
endopoint = "http://localhost:8000/get_1_clients"

In [9]:
response: requests.Response = requests.get(endopoint)
response_json: Dict[str, Any] = json.loads(response.text)[0]
_ = [print(key) for key in response_json.keys()]

client_data
user_data
product_data
category_data
order_data
invoice_data


In [10]:
client_data: Dict[str, Any] = response_json.get("client_data")
user_data: Dict[str, Any] = response_json.get("user_data")
product_data: Dict[str, Any] = response_json.get("product_data")
category_data: Dict[str, Any] = response_json.get("category_data")
order_data: Dict[str, Any] = response_json.get("order_data")
invoice_data: Dict[str, Any] = response_json.get("invoice_data")

In [48]:
data_list_tuples: List[Tuple[str, str]] = [(col, str(sample).replace("\n", "")) for col, sample in invoice_data.items()]
DDL_Columns: str = "\n".join([f"  {col} varchar, --{sample}" for col, sample in data_list_tuples])
TABLE_NAME: str = "gcp_poc.onpremise.invoices"
DDl: str = f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (
{DDL_Columns}
);"""

print(DDl)


DROP TABLE IF EXISTS gcp_poc.onpremise.invoices;
CREATE TABLE gcp_poc.onpremise.invoices (
  invoice_id varchar, --257E65DDFECE4C139CC0B4B0EC0B8A12
  invoice_customer_id varchar, --93C1B476399A4811831DDBAAA6663260
  invoice_order_id varchar, --E3CA997E7BD64583B8897C73ADCF9EE9
  invoice_amount varchar, --14409.0
  invoice_created_at varchar, --2023-03-31 02:50:15 UTC
  invoice_updated_at varchar, --
);


In [36]:
from typing import List, Tuple, Dict, Any
import json
import requests
import psycopg2
from contextlib import closing
import time
import random
import logging

# configure logging
logging.basicConfig(
    level=logging.NOTSET,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("log_insert_data.log"),
        logging.StreamHandler()
    ]
)

def run_insert_query(query: str, connection: psycopg2.extensions.connection) -> None:
    try:
        cursor: psycopg2.extensions.cursor = connection.cursor()
        cursor.execute(query)
        connection.commit()
        cursor.close()
    except Exception as e:
        logging.error(f"Error: {e}")
        logging.error(f"Query: {query}")
        connection.rollback()


def get_sql_statements(
    client_data: Dict[str, Any],
    user_data: Dict[str, Any],
    product_data: Dict[str, Any],
    category_data: Dict[str, Any],
    order_data: Dict[str, Any],
    invoice_data: Dict[str, Any]
    ) -> Tuple[str, str, str, str, str, str]:
    client_columns: str = ", ".join([col for col, _ in client_data.items()])
    client_values: str = ", ".join([f"'{str(value)}'" for _, value in client_data.items()])
    SQL_CLIENTS: str = f"INSERT INTO gcp_poc.onpremise.clients ({client_columns}) VALUES ({client_values});".replace("''", "NULL")

    user_columns: str = ", ".join([col for col, _ in user_data.items()])
    user_values: str = ", ".join([f"'{str(value)}'" for _, value in user_data.items()])
    SQL_USERS: str = f"INSERT INTO gcp_poc.onpremise.users ({user_columns}) VALUES ({user_values});".replace("''", "NULL")

    product_columns: str = ", ".join([col for col, _ in product_data.items()])
    product_values: str = ", ".join([f"""'{str(value).replace("'", "")}'""" for _, value in product_data.items()])
    SQL_PRODUCT: str = f"INSERT INTO gcp_poc.onpremise.products ({product_columns}) VALUES ({product_values});".replace("''", "NULL")

    categories_columns: str = ", ".join([col for col, _ in category_data.items()])
    categories_values: str = ", ".join([f"""'{str(value).replace("'", "")}'""" for _, value in category_data.items()])
    SQL_CATEGORIES: str = f"INSERT INTO gcp_poc.onpremise.categories ({categories_columns}) VALUES ({categories_values});".replace("''", "NULL")

    orders_columns: str = ", ".join([col for col, _ in order_data.items()])
    orders_values: str = ", ".join([f"""'{str(value).replace("'", "")}'""" for _, value in order_data.items()])
    SQL_ORDERS: str = f"INSERT INTO gcp_poc.onpremise.orders ({orders_columns}) VALUES ({orders_values});".replace("''", "NULL")

    invoice_columns: str = ", ".join([col for col, _ in invoice_data.items()])
    invoice_values: str = ", ".join([f"""'{str(value).replace("'", "")}'""" for _, value in invoice_data.items()])
    SQL_INVOICE: str = f"INSERT INTO gcp_poc.onpremise.invoices ({invoice_columns}) VALUES ({invoice_values});".replace("''", "NULL")
    
    return SQL_CLIENTS, SQL_USERS, SQL_PRODUCT, SQL_CATEGORIES, SQL_ORDERS, SQL_INVOICE

def run_insert_row():
    # db connection
    connection: psycopg2.extensions.connection = psycopg2.connect(
        host="localhost",
        port=5432,
        database="gcp_poc",
        user="fabio",
        password="fabio"
    )

    with closing(connection) as cn:
        
        endopoint = "http://localhost:8000/get_1_clients"
        response: requests.Response = requests.get(endopoint)
        response_json: Dict[str, Any] = json.loads(response.text)[0]
        
        client_data: Dict[str, Any] = response_json.get("client_data")
        user_data: Dict[str, Any] = response_json.get("user_data")
        product_data: Dict[str, Any] = response_json.get("product_data")
        category_data: Dict[str, Any] = response_json.get("category_data")
        order_data: Dict[str, Any] = response_json.get("order_data")
        invoice_data: Dict[str, Any] = response_json.get("invoice_data")
        
        SQL_CLIENTS, SQL_USERS, SQL_PRODUCT, SQL_CATEGORIES, SQL_ORDERS, SQL_INVOICE = get_sql_statements(client_data, user_data, product_data, category_data, order_data, invoice_data)
        run_insert_query(SQL_CLIENTS, connection)
        run_insert_query(SQL_USERS, cn)
        run_insert_query(SQL_PRODUCT, cn)
        run_insert_query(SQL_CATEGORIES, cn)
        run_insert_query(SQL_ORDERS, cn)
        run_insert_query(SQL_INVOICE, cn)

if __name__ == "__main__":
    while True:
        seconds: int = random.randrange(1, 120)
        try:
            run_insert_row()
        except Exception as e:
            logging.error(f"Error: {e}")
        logging.info(f"Sleeping for {seconds} seconds")
        time.sleep(seconds)

2023-04-07 20:09:59,575 - urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost:8000
2023-04-07 20:09:59,659 - urllib3.connectionpool - DEBUG - http://localhost:8000 "GET /get_1_clients HTTP/1.1" 200 2816
2023-04-07 20:09:59,672 - root - INFO - Sleeping for 78 seconds
2023-04-07 20:11:17,757 - urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost:8000
2023-04-07 20:11:17,825 - urllib3.connectionpool - DEBUG - http://localhost:8000 "GET /get_1_clients HTTP/1.1" 200 2783
2023-04-07 20:11:17,840 - root - INFO - Sleeping for 41 seconds
2023-04-07 20:11:58,904 - urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost:8000
2023-04-07 20:11:58,996 - urllib3.connectionpool - DEBUG - http://localhost:8000 "GET /get_1_clients HTTP/1.1" 200 2720
2023-04-07 20:11:59,008 - root - INFO - Sleeping for 23 seconds
2023-04-07 20:12:22,038 - urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost:8000
2023-04-07 20:12

KeyboardInterrupt: 