
## Install With `pip`
```bash
# requirements
pip install pandas pyarrow 

# install the MySQL and PostgreSQL dialects
pip install sqlalchemy pymysql psycopg2-binary
# install the Vertica dialect
pip install git+https://github.com/attapon-th/sqlalchemy-vertica-python.git@latest
```

## Class DBUtil

- `has_tables`
- `list_tables`
- `get_columns`
- `get_column_names`
- `get_primarykeys`

In [2]:
import sqlalchemy as sa
from sqlalchemy.engine.interfaces import ReflectedColumn, ReflectedPrimaryKeyConstraint
from sqlalchemy import Engine
from typing import List, Any

class DBUtil:
    """Schema-scoped reflection and raw-SQL helper around a SQLAlchemy engine.

    Holds one SQLAlchemy ``Connection`` in ``self.conn``. The connection is
    reopened on demand by ``check_connect`` after ``close()``.
    """

    def __init__(self, engine: Engine, schema_or_dbname: str) -> None:
        """Bind to *engine* / *schema_or_dbname* and open a connection.

        Args:
            engine: engine used to open connections.
            schema_or_dbname: schema (or database name, e.g. for MySQL) passed
                to every reflection call made by this helper.
        """
        self.engine: Engine = engine
        self.schema: str = schema_or_dbname
        self.conn = engine.connect()
        # The connection opened above is live. Marking it closed would make
        # the first check_connect() open a second connection and leak this one.
        self.closed: bool = False

    def execute(self, sql: str, parameters: Any = None):
        """Execute raw *sql* (``:name``-style binds from *parameters*) and return the result."""
        self.check_connect()
        return self.conn.execute(sa.text(sql), parameters)

    def close(self):
        """Close the connection and dispose the engine's connection pool.

        NOTE(review): ``engine.dispose()`` also affects other helpers that
        share the same engine — confirm this is intended.
        """
        self.closed = True
        self.conn.close()
        self.engine.dispose()

    def check_connect(self):
        """(Re)open ``self.conn`` if it has been closed."""
        # Also reconnect when the Connection was closed directly via conn.close().
        if self.closed or self.conn.closed:
            self.conn = self.engine.connect()
            self.closed = False

    def has_tables(self, table: str) -> bool:
        """Return True if *table* exists in ``self.schema``."""
        self.check_connect()
        return self.conn.dialect.has_table(self.conn, table, schema=self.schema)

    def list_tables(self) -> List[str]:
        """Return the table names in ``self.schema``."""
        self.check_connect()
        return self.conn.dialect.get_table_names(self.conn, self.schema)

    def get_columns(self, table: str) -> list[ReflectedColumn]:
        """Return the dialect's reflected column dicts for *table*."""
        self.check_connect()
        return self.conn.dialect.get_columns(self.conn, table, self.schema)

    def get_column_names(self, table: str) -> list[str]:
        """Return the column names of *table*, in reflected order."""
        self.check_connect()
        return [c["name"] for c in self.get_columns(table)]

    def get_primarykeys(self, table: str) -> list[str]:
        """Return the primary-key column names of *table*.

        The reflected primary-key constraint is used first. If the dialect
        reports none, fall back to columns whose reflected ``primary_key``
        flag is exactly ``True``. Returns an empty list if neither yields
        anything.
        """
        self.check_connect()
        pk_cons: ReflectedPrimaryKeyConstraint = self.conn.dialect.get_pk_constraint(
            self.conn, table, schema=self.schema
        )
        pk_columns = pk_cons.get("constrained_columns") if isinstance(pk_cons, dict) else None
        if pk_columns:
            return list(pk_columns)

        return [c["name"] for c in self.get_columns(table) if c.get("primary_key") is True]

## Vertica Copy CSV File With `STDIN`

### เงื่อนไข (Requirements)

1. ต้องมี Table รองรับแล้ว (the target table must already exist; it is not created automatically)
2. Requires the helper function:
   1. `get_columns`

In [3]:
from sqlalchemy.engine import Engine
from pandas import DataFrame
from typing import Literal
from typing import List, Any


class VerticaCopy(DBUtil):
    """Bulk-load a pandas DataFrame into an existing Vertica table with ``COPY``.

    The target table is not created. Only DataFrame columns that also exist
    in the target table are loaded.
    """

    def __init__(self, engine: Engine, schema: str) -> None:
        super().__init__(engine, schema)

        # COPY source: "STDIN" or a single-quoted file path (see ``input``).
        self._from_input = "STDIN"
        self._compression: Literal["BZIP", "GZIP", ""] = ""
        # Explicit COPY column list; empty means no column list is emitted.
        self._columns: list[str] = []

    def input(self, filepath: str):
        """Set the COPY source to ``"STDIN"`` (any case) or to a quoted file path."""
        if filepath.upper() == "STDIN":
            self._from_input = "STDIN"
        else:
            # Double embedded quotes so the path stays a single SQL string literal.
            self._from_input = "'" + filepath.replace("'", "''") + "'"
        return self

    def commpression(self, compression: Literal["BZIP", "GZIP", ""]):
        """Set the compression keyword emitted after the COPY source ("" for none)."""
        self._compression = compression
        return self

    # Correctly spelled alias; ``commpression`` is kept for existing callers.
    compression = commpression

    def columns(self, columns: list[str]):
        """Set the explicit target column list used by ``get_sql``."""
        self._columns = columns
        return self

    def get_sql(self, table: str, reject_table: str | None = "") -> str:
        """Build the ``COPY`` statement for ``<schema>.<table>``.

        Args:
            table: target table inside ``self.schema``.
            reject_table: name of the rejected-data table in ``self.schema``.
                ``""`` (default) uses ``__REJECT_<table>``. ``None`` omits
                the ``REJECTED DATA`` clause.
        """
        self.check_connect()
        schema: str = self.schema

        col_list = f"({', '.join(self._columns)})" if self._columns else ""
        parts = [f"COPY {schema}.{table}{col_list} FROM {self._from_input}"]
        if self._compression:
            parts.append(self._compression)
        parts.append("PARSER fcsvparser()")
        if reject_table is not None:
            reject_name = reject_table or f"__REJECT_{table}"
            parts.append(f"REJECTED DATA AS TABLE {schema}.{reject_name}")
        return " ".join(parts)

    def copy(self, df: DataFrame, table: str) -> bool:
        """Stream *df* as CSV into ``<schema>.<table>`` through the driver's ``cursor.copy``.

        Returns:
            True when the COPY ran and was committed. False when the driver
            lacks the needed API, no DataFrame column matches the table, or
            an error occurred. On error, the message is printed and the
            transaction is rolled back.
        """
        self.check_connect()
        # Dedicated pooled DBAPI connection: ``cursor.copy`` is a driver-level
        # API that is not exposed through SQLAlchemy.
        raw = self.engine.raw_connection()
        try:
            conn = raw.driver_connection
            if conn is None:
                return False
            if not hasattr(conn, "cursor"):
                print("cursor not support")
                return False

            try:
                tgt_columns = set(self.get_column_names(table))
                # Keep DataFrame column order; load only columns the table has.
                columns: list[str] = [c for c in df.columns.to_list() if c in tgt_columns]
                if not columns:
                    print(f"no common columns between DataFrame and {self.schema}.{table}")
                    return False

                sql: str = self.columns(columns).get_sql(table)

                cur = conn.cursor()
                try:
                    if not hasattr(cur, "copy"):
                        print("copy not support")
                        return False
                    # Send only the selected columns so CSV fields line up with
                    # the COPY column list.
                    payload = df[columns].to_csv(index=False, escapechar='"', quotechar='"')
                    cur.copy(sql, payload)
                finally:
                    cur.close()
                conn.commit()
                return True
            except Exception as err:
                conn.rollback()
                print(err)
                return False
        finally:
            # Return the DBAPI connection to the pool instead of leaking it.
            raw.close()

## Merge Table

In [4]:
from sqlalchemy.engine import Engine
from pandas import DataFrame
from typing import Literal
from typing import List, Any, Dict, Optional


class VerticaMerge(DBUtil):
    """Build and run a Vertica ``MERGE`` from a source table into a target table.

    Column lists that are not given explicitly are derived by reflection:
      - ON: the target table's primary-key columns.
      - INSERT: source columns that also exist in the target.
      - UPDATE: the INSERT-style shared columns, minus the ON columns.
    """

    def __init__(self, engine: Engine, source_schema: str, source_table: str) -> None:
        super().__init__(engine, source_schema)
        # Reflection helper bound to the target schema; set by ``target()``.
        self._targetdb: DBUtil | None = None

        self._source_schema: str = source_schema
        self._source_table: str = source_table

        self._target_schema: str | None = None
        self._target_table: str | None = None

        self._on_columns: list[str] = []
        self._insert_columns: list[str] = []
        self._update_columns: list[str] = []

    def source(self, schema: str, table: str):
        """Set the source table (``s`` alias in the generated MERGE)."""
        self._source_schema = schema
        self._source_table = table
        # Keep source-column reflection (get_column_names) pointed at this schema.
        self.schema = schema
        return self

    def target(self, schema: str, table: str):
        """Set the target table (``t`` alias in the generated MERGE)."""
        if self._targetdb is not None:
            # Release the previous helper's connection. Do not call close(),
            # which would dispose the shared engine.
            self._targetdb.conn.close()
        self._target_schema = schema
        self._target_table = table
        self._targetdb = DBUtil(self.engine, schema)
        return self

    def _require_target(self) -> DBUtil:
        """Return the target helper, raising if ``target()`` has not been called."""
        if self._targetdb is None or self._target_table is None:
            raise Exception("target table not set")
        return self._targetdb

    def _shared_columns(self) -> list[str]:
        """Return source columns (in source order) that also exist in the target."""
        targetdb = self._require_target()
        tgt_cols = set(targetdb.get_column_names(self._target_table))
        return [c for c in self.get_column_names(self._source_table) if c in tgt_cols]

    # MERGE INTO ... ON ...
    # If on_columns is None, use the target table's primary keys.
    def on(self, on_columns: list[str] | None = None):
        """Set the ON columns, or reflect them from the target's primary key."""
        if on_columns is not None:
            self._on_columns = list(on_columns)
            return self
        targetdb = self._require_target()
        self._on_columns = targetdb.get_primarykeys(self._target_table)
        return self

    def insert(self, insert_columns: Optional[List[str]] = None):
        """Set the INSERT columns, or default to the source/target shared columns."""
        if insert_columns is not None:
            self._insert_columns = list(insert_columns)
        else:
            self._insert_columns = self._shared_columns()
        return self

    def update(self, update_columns: Optional[List[str]] = None):
        """Set the UPDATE columns, or default to shared columns minus the ON columns."""
        if update_columns is not None:
            self._update_columns = list(update_columns)
            return self
        if not self._on_columns:
            self.on()
        on_cols = set(self._on_columns)
        self._update_columns = [c for c in self._shared_columns() if c not in on_cols]
        return self

    def get_sql(self, more_insert: Optional[Dict[str, str]] = None, more_update: Optional[Dict[str, str]] = None) -> str:
        """Render the MERGE statement.

        Args:
            more_insert: extra ``{column: sql_expression}`` pairs for the INSERT.
            more_update: extra ``{column: sql_expression}`` pairs for the UPDATE SET.

        Raises:
            Exception: if the target is not set.
            ValueError: if no ON columns are given and the target has no
                primary key.
        """
        self._require_target()
        if not self._on_columns:
            self.on()
        if not self._on_columns:
            raise ValueError("no ON columns: target has no primary key; call on([...]) explicitly")
        if not self._insert_columns:
            self.insert()
        if not self._update_columns:
            self.update()

        lines: list[str] = [
            # target table alias t, source table alias s
            f"MERGE INTO {self._target_schema}.{self._target_table} t",
            f"USING {self._source_schema}.{self._source_table} s ON",
            "(" + " AND ".join(f"t.{c} = s.{c}" for c in self._on_columns) + ")",
        ]

        # UPDATE: skip the clause entirely when there is nothing to set.
        update_items = [f"{a} = s.{a}" for a in self._update_columns]
        if more_update:
            update_items += [f"{k} = {v}" for k, v in more_update.items()]
        if update_items:
            lines.append(f"WHEN MATCHED THEN UPDATE SET {', '.join(update_items)}")

        # INSERT
        insert_cols = list(self._insert_columns)
        insert_values = [f"s.{a}" for a in self._insert_columns]
        if more_insert:
            insert_cols += list(more_insert.keys())
            insert_values += list(more_insert.values())
        if insert_cols:
            lines.append(
                f"WHEN NOT MATCHED THEN INSERT ({','.join(insert_cols)}) VALUES ({','.join(insert_values)})"
            )

        return "".join(line + " \n" for line in lines) + ";"

    def merge(self, more_insert: Optional[Dict[str, str]] = None, more_update: Optional[Dict[str, str]] = None) -> Any:
        """Execute the MERGE on the target connection, commit, and return the first result row."""
        self.check_connect()
        sql: str = self.get_sql(more_insert, more_update)
        targetdb = self._require_target()
        cur = targetdb.execute(sql)
        row = cur.fetchone()
        # SQLAlchemy 2.x connections autobegin a transaction. Without an
        # explicit commit, the MERGE is rolled back when the connection closes.
        targetdb.conn.commit()
        return row