In [26]:

from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.engine.base import Engine

from dataclasses import dataclass

import pandas as pd

import requests
from requests.auth import HTTPBasicAuth



In [28]:
@dataclass
class Templates:
    """SQL statement templates used by the source/target comparison.

    Every template is a ``str.format`` pattern. The attributes carry no type
    annotations, so ``@dataclass`` generates no fields for them; they are
    plain class attributes read as ``Templates.<name>``.
    """

    # Key columns of the source table, ordered by those same columns.
    source_stmt_template = (
        "select {columns} "
        "from {table} "
        "order by {columns}"
    )

    # Same projection on the target, restricted to rows still flagged active.
    target_stmt_template = (
        "select {columns} "
        "from {table} "
        "where is_active = 1 "
        "order by {columns}"
    )

    # Soft delete: clears the is_active flag for rows matching {where}.
    template_update = (
        "UPDATE {table} "
        "SET is_active=0 "
        "WHERE {where}"
    )


@dataclass
class Source:
    """Schema, table name and key columns of the source table.

    The attributes are unannotated class attributes, so they are shared by
    the class and every instance.
    """

    schema, table = "public", "customers_upd"
    # Column names that identify a row; order matters for positional matching.
    pks = [
        "id",
        "email",
    ]

@dataclass
class Target:
    """Schema, table name and key columns of the target table.

    The attributes are unannotated class attributes, so they are shared by
    the class and every instance.
    """

    schema, table = "raw_public", "customers_upd"
    # Column names that identify a row; order matters for positional matching.
    pks = [
        "id",
        "email",
    ]


@dataclass
class Params:
    """Bundles the source and target table settings.

    Both attributes are created once, when the class body runs, and are read
    directly from the class (``Params.source``, ``Params.target``).
    """

    source, target = Source(), Target()


class Utils:
    """Stateless helpers for assembling SQL fragments."""

    @staticmethod
    def pks_list_to_string(pks):
        """Join column names into a comma-separated string (no spaces)."""
        return ",".join(pks)

    @staticmethod
    def is_str(column):
        """
        Render a value so it can be embedded in a SQL condition.

        Despite its name this is not a predicate: a ``str`` comes back wrapped
        in single quotes, and every other value is returned unchanged.

        Embedded single quotes are doubled (the standard SQL escape) so that a
        value such as ``O'Brien`` cannot terminate the literal early.
        """
        if isinstance(column, str):
            escaped = column.replace("'", "''")
            return f"'{escaped}'"
        return column



In [29]:
## Core logic
def template_update_flag_delete(rows, pks, template_update, target_table):
    """
    Builds one UPDATE statement per row from ``template_update``.

    Args:
        rows: Iterable of key tuples; each value lines up positionally with
            the column name at the same index in ``pks``.
        pks: Column names, in the same order as the values inside each row.
        template_update: Format string with ``{table}`` and ``{where}``
            placeholders.
        target_table: Table name substituted into ``{table}``.

    Returns:
        list[str]: One rendered statement per row (empty when ``rows`` is
        empty).

    Note:
        Values are interpolated into the statement text, not bound as
        parameters, so only feed it key values from a trusted source.
    """
    statements = []
    for row in rows:
        conditions = []
        for index, value in enumerate(row):
            column = pks[index]
            if value is None:
                # "col=None" is not valid SQL, and "= NULL" never matches.
                conditions.append(f"{column} IS NULL")
            else:
                conditions.append(f"{column}={Utils.is_str(value)}")

        where_clause = " AND ".join(conditions)
        statements.append(template_update.format(table=target_table, where=where_clause))
    return statements



In [31]:
import logging
from sqlalchemy import inspect, MetaData, Table
from sqlalchemy.schema import CreateTable

# Ask the root logger for INFO and above; the classes below log through the
# module-level logging.info / logging.error helpers.
logging.basicConfig(level=logging.INFO)


class DatabaseHelper:
    """Wraps an engine together with the inspector created for it."""

    def __init__(self, engine):
        self.engine = engine
        self.inspector = inspect(engine)

    def table_exists(self, schema, table_name):
        """Return True when ``table_name`` is listed among the tables of ``schema``."""
        known_tables = self.inspector.get_table_names(schema=schema)
        return table_name in known_tables


class SourceTargetTable:
    """
    Validates a source/target engine pair at construction time.

    Creating an instance opens a connection on each engine and then checks
    that the tables named by ``Params.source`` and ``Params.target`` exist.
    """

    def __init__(self, source_engine: Engine, target_engine: Engine):
        self.source_engine = source_engine
        self.target_engine = target_engine

        # Fail early: connectivity first, then table existence.
        self.__test_connections()
        self.__test_tables_exists()

    def __test_connections(self):
        """
        Tests the connection to the source and target databases.

        Raises:
            Exception: Whatever the engine raised while connecting. The error
                is logged first and then re-raised, so a dead connection is
                reported here rather than as a secondary failure later on.
        """
        try:
            with self.source_engine.connect():
                logging.info("Connection to source table successful.")
        except Exception as e:
            logging.error(f"Error connecting to source table: {e}")
            raise

        try:
            with self.target_engine.connect():
                logging.info("Connection to target table successful.")
        except Exception as e:
            logging.error(f"Error connecting to target table: {e}")
            raise

    def __test_tables_exists(self):
        """
        Checks that the configured source and target tables exist.

        Raises:
            Exception: If either table is not listed in its schema.
        """
        checks = (
            ("Source", self.source_engine, Params.source),
            ("Target", self.target_engine, Params.target),
        )
        for label, engine, settings in checks:
            db_helper = DatabaseHelper(engine)
            if not db_helper.table_exists(settings.schema, settings.table):
                raise Exception(f"{label} table {settings.table} does not exist.")
            logging.info(f"{label} table {settings.schema}.{settings.table} exists.")
    
    


In [64]:
from sqlalchemy import MetaData, Table
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects.oracle import dialect as oracle_dialect
from sqlalchemy.dialects.postgresql import dialect as postgres_dialect
from sqlalchemy.dialects.oracle import NUMBER, DATE, VARCHAR as ORVARCHAR
from sqlalchemy.dialects.postgresql import NUMERIC, INTEGER, TIMESTAMP, VARCHAR as PGVARCHAR

def create_target_table_ddl(engine_source, engine_target, table_name):
    """
    Generates DDL to create the target table based on the structure of the source table.

    The source table is reflected from ``engine_source``, its Oracle column
    types are swapped for PostgreSQL ones, and the resulting
    ``CREATE TABLE IF NOT EXISTS`` statement is compiled for the dialect of
    ``engine_target``.

    Args:
        engine_source: Engine used to reflect ``table_name``.
        engine_target: Engine whose dialect renders the DDL.
        table_name: Name of the table to reflect; the generated table uses
            the lower-cased name.

    Returns:
        str: DDL to create the target table.
    """
    # MetaData(bind=...) and autoload=True are legacy spellings removed in
    # SQLAlchemy 2.0; autoload_with on its own triggers reflection.
    metadata = MetaData()
    source_table = Table(table_name, metadata, autoload_with=engine_source)

    for column in source_table.columns:
        source_type = column.type

        if isinstance(source_type, ORVARCHAR):
            # Carry the declared length over instead of emitting an unbounded VARCHAR.
            column.type = PGVARCHAR(length=getattr(source_type, "length", None))
        elif isinstance(source_type, DATE):
            column.type = TIMESTAMP()
        elif isinstance(source_type, NUMBER):
            if source_type.asdecimal:
                column.type = NUMERIC(precision=source_type.precision, scale=source_type.scale)
            else:
                # NOTE(review): every non-decimal NUMBER becomes INTEGER; confirm
                # that no source column needs a wider type such as BIGINT.
                column.type = INTEGER()

    source_table.name = table_name.lower()

    # Compile for the database that will run the statement. The Oracle
    # dialect is the wrong renderer for columns that now use PostgreSQL types.
    target_table_ddl = CreateTable(source_table, if_not_exists=True).compile(
        dialect=engine_target.dialect
    )
    return str(target_table_ddl)


# NOTE(review): connection URLs, including credentials, are hardcoded here;
# load them from the environment or a secrets store before sharing the notebook.
oracle_engine = create_engine("oracle://<USER>:<PASSWORD>@localhost:1521/?service_name=ORCL")
target_engine = create_engine("postgresql://postgres:postgres@localhost:54333/postgres")
ddl = create_target_table_ddl(oracle_engine, target_engine, table_name="APT_CARGAS")


# engine.begin() commits on success and rolls back on error. The DDL string is
# wrapped in text() because SQLAlchemy 2.0 no longer executes raw strings.
with target_engine.begin() as conn:
    conn.execute(text(ddl))


In [90]:
import pandas as pd

def soft_delete(table_name):
    """Placeholder with no implementation yet (the body is ``...``).

    Presumably meant to soft-delete rows of ``table_name`` - TODO confirm and implement.
    """
    ...

def select(table_name):
    """Read every row of ``table_name`` into a DataFrame.

    The query runs against the module-level ``oracle_engine``; ``table_name``
    is interpolated into the statement text as-is.
    """
    query = f"SELECT * FROM {table_name}"
    return pd.read_sql(sql=query, con=oracle_engine)

# Pull the whole Oracle table into memory, then append it to public.apt_cargas
# on the target engine.
df = select(table_name="APT_CARGAS")
# if_exists="append" adds rows to an existing table, so re-running this cell
# inserts the same rows again.
df.to_sql("apt_cargas", schema="public", con=target_engine, if_exists="append", index=False)

54

In [33]:
from sqlalchemy import create_engine


## Engines
# NOTE(review): credentials are hardcoded in these URLs; prefer environment
# variables or a secrets store.
# target_engine and oracle_engine are also created in an earlier cell with the
# same URLs; this cell rebinds the names.
source_engine = create_engine("postgresql://postgres:postgres@localhost:54322/inventory_upd")
target_engine = create_engine("postgresql://postgres:postgres@localhost:54333/postgres")
oracle_engine = create_engine("oracle://<USER>:<PASSWORD>@localhost:1521/?service_name=ORCL")


In [34]:

# Constructing the object runs the connection checks and the table-existence
# checks defined on SourceTargetTable.
source_target_table = SourceTargetTable(source_engine, target_engine)


INFO:root:Connection to source table successful.
INFO:root:Connection to target table successful.
INFO:root:Source table public.customers_upd exists.
INFO:root:Target table raw_public.customers_upd exists.


TypeError: TypeEngine.dialect_impl() missing 1 required positional argument: 'dialect'

In [4]:
# Source df
# Build the query inputs from the shared configuration so the cell runs on a
# fresh kernel instead of depending on names no visible cell defines.
columns_stmt = Utils.pks_list_to_string(Params.source.pks)
source_table = f"{Params.source.schema}.{Params.source.table}"
source_df = pd.read_sql(
    Templates.source_stmt_template.format(columns=columns_stmt, table=source_table), con=source_engine
)

# One tuple per row, so source and target can be compared as sets.
source_set = set(map(tuple, source_df.values.tolist()))
source_set

{(1, 'sally.thomas@acme.com'),
 (2, 'gbailey@foobar.com'),
 (3, 'ed@walker.com'),
 (12, 'teste')}

In [5]:
# Target df
# Build the query inputs from the shared configuration so the cell runs on a
# fresh kernel instead of depending on names no visible cell defines.
schema = Params.target.schema
columns_stmt = Utils.pks_list_to_string(Params.target.pks)
target_table = f"{schema}.{Params.target.table}"
target_df = pd.read_sql(
    Templates.target_stmt_template.format(columns=columns_stmt, table=target_table), con=target_engine
)

# Test only: append two rows that do not exist in the source, so the
# difference computed from the two sets is non-empty.
target_df.loc[len(target_df), ["id", "email"]] = [int(100), "email@email"]
target_df.loc[len(target_df), ["id", "email"]] = [int(101), "email2@email"]

# Cast the ids back to int so they compare equal to the source tuples.
target_df["id"] = target_df["id"].astype(int)

# One tuple per row, so source and target can be compared as sets.
target_set = set(map(tuple, target_df.values.tolist()))
target_set

{(1, 'sally.thomas@acme.com'),
 (2, 'gbailey@foobar.com'),
 (3, 'ed@walker.com'),
 (100, 'email@email'),
 (101, 'email2@email')}

In [6]:
# Keys that are active in the target but no longer present in the source.
diff_set = target_set - source_set
diff_set

{(100, 'email@email'), (101, 'email2@email')}

In [7]:



# Rows deleted from the source are deactivated in the target.
# Start from an empty list so the final expression is always defined, even
# when there is nothing to deactivate.
where = []
if diff_set:
    # The builder takes four arguments; pass the template and the
    # schema-qualified target table explicitly.
    where = template_update_flag_delete(
        diff_set,
        Params.target.pks,
        Templates.template_update,
        f"{Params.target.schema}.{Params.target.table}",
    )
    # Execution is left commented out, so this cell only previews the statements.
    # with target_engine.connect() as conn:
    #     for w in where:
    #         conn.execute(text(w))
where

["UPDATE raw_public.customers_upd SET is_active=0 WHERE id=101 AND email='email2@email'",
 "UPDATE raw_public.customers_upd SET is_active=0 WHERE id=100 AND email='email@email'"]

In [9]:


# NOTE(review): the credentials and host are hardcoded; move them to
# environment variables or a secrets store before sharing the notebook.
auth = HTTPBasicAuth("airbyte", "password")
url = "http://192.168.0.113:8006/v1/connections/37bce8a8-d3ed-40a8-8e10-109a017241d4"

# Without a timeout the cell can hang indefinitely on an unreachable host.
response = requests.get(url, auth=auth, timeout=30)
# Fail loudly on an HTTP error instead of silently displaying a stale target_df.
response.raise_for_status()

data = response.json()
schema_target = data["namespaceFormat"]
streams = data["configurations"]["streams"]
if not streams:
    raise ValueError("The connection has no configured streams.")

# Only the first configured stream is inspected.
stream = streams[0]
table = f"{schema_target}.{stream['name']}"

# NOTE(review): takes the first entry of primaryKey as the list of key
# columns - confirm this matches the shape the API returns.
pks = stream["primaryKey"][0]
columns_stmt = ",".join(pks)

# Build the statement once and reuse it for both the echo and the query.
target_query = Templates.target_stmt_template.format(columns=columns_stmt, table=table)
print(target_query)
target_df = pd.read_sql(target_query, con=target_engine)
target_df
    

Unnamed: 0,id,email
0,1,sally.thomas@acme.com
1,2,gbailey@foobar.com
2,3,ed@walker.com
3,100,email@email
4,101,email2@email


In [None]:
# Exploratory leftover: prints the built-in help for requests.get. It has no
# effect on the analysis and can be dropped from the final notebook.
help(requests.get)