ATTAPON THANAWANG edited this page Apr 20, 2022 · 1 revision

DB Helper API Documentation

Table of Contents

connection

create_connection

def create_connection(dsn: str, password: str = None)

Create a database connection from a DSN string. Supported DSN formats:

MySQL: mysql://{user}@{host}:{port}/{db_name}

Vertica: vertica://{user}@{host}:{port}/{db_name}

OtherDB: {dialect}+{driver}://{user}@{host}:{port}/{db_name}

Arguments:

  • dsn str - DSN connection string. Example: mysql://root@127.0.0.1:3306/mydb
  • password str, optional - Database password. Defaults to None.

Returns:

  • DBConnection - Database connection, if successful.
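
The DSN formats above follow the usual URL layout, so their parts can be inspected with the standard library. The sketch below is not the helper's implementation; `describe_dsn` is a hypothetical name used only for illustration. It shows how a DSN without a password splits into dialect, driver, user, host, port, and database name.

```python
from urllib.parse import urlsplit


def describe_dsn(dsn: str) -> dict:
    """Split a DSN of the form {dialect}+{driver}://{user}@{host}:{port}/{db_name}.

    Hypothetical helper for illustration; not part of the DB Helper API.
    """
    parts = urlsplit(dsn)
    # The scheme holds the dialect and, optionally, "+driver".
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "db_name": parts.path.lstrip("/"),
    }


print(describe_dsn("mysql://root@127.0.0.1:3306/mydb"))
```

Keeping the password out of the DSN, as the separate `password` argument suggests, means the DSN can be logged or stored in configuration without exposing credentials.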

dataframe

convert_dtypes

def convert_dtypes(df: pd.DataFrame) -> pd.DataFrame

Convert the column types of a pandas DataFrame to pandas dtypes.

Arguments:

  • df pd.DataFrame - Source pandas DataFrame whose column types are converted.

Returns:

  • pd.DataFrame - DataFrame after dtype conversion.
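
pandas itself provides `DataFrame.convert_dtypes`, which converts columns to the best dtypes that support `pd.NA`. Whether this helper wraps that method is an assumption; the sketch below only illustrates the kind of conversion the description suggests.

```python
import pandas as pd

# A numeric column with a missing value is stored as float64 by default.
df = pd.DataFrame({"id": [1, 2, None], "name": ["a", "b", None]})
print(df.dtypes)

# pandas' built-in conversion to nullable dtypes (Int64 for "id").
converted = df.convert_dtypes()
print(converted.dtypes)
```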

select_column

def select_column(df: pd.DataFrame,
                  columns: list,
                  raise_not_exists: bool = False) -> pd.DataFrame

Select columns from a pandas DataFrame.

Arguments:

  • df pd.DataFrame - Source pandas DataFrame to select columns from.
  • columns list - List of column names to select.
  • raise_not_exists bool, optional - Raise an error if a column name does not exist. Defaults to False.

Raises:

  • Exception - A requested column does not exist.

Returns:

  • pd.DataFrame - Resulting pandas DataFrame.
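
The following sketch shows one plausible reading of this behavior. `select_existing_columns` is a hypothetical stand-in, and the choice to skip missing columns silently when `raise_not_exists` is False is an assumption that the page does not confirm.

```python
import pandas as pd


def select_existing_columns(df: pd.DataFrame, columns: list,
                            raise_not_exists: bool = False) -> pd.DataFrame:
    """Illustrative stand-in; the real select_column may behave differently."""
    missing = [name for name in columns if name not in df.columns]
    if missing and raise_not_exists:
        raise KeyError(f"Columns not found: {missing}")
    # Assumption: columns that do not exist are skipped when not raising.
    present = [name for name in columns if name in df.columns]
    return df[present]


frame = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
print(select_existing_columns(frame, ["a", "x", "c"]).columns.tolist())
```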

parquet

vertica

create_table_with_query

def create_table_with_query(vertica_connection: VerticaConnection,
                            query: str,
                            to_table: str,
                            is_temp: bool = False) -> str

Create a table in Vertica from a SQL query.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection
  • query str - SQL query (SELECT only).
  • to_table str - Table name, either schema.tablename or tablename.
  • is_temp bool, optional - Create a temporary table that is dropped automatically when the Vertica session closes. Defaults to False.

Raises:

  • Exception - Table creation failed.
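
As a rough illustration of the kind of statement involved, the sketch below builds a Vertica CREATE TABLE ... AS statement from a SELECT. `build_ctas` is a hypothetical name, and the exact SQL that the helper emits is not shown on this page, so treat the output as an assumption.

```python
def build_ctas(query: str, to_table: str, is_temp: bool = False) -> str:
    """Build a CREATE TABLE ... AS statement (illustrative only)."""
    select_sql = query.strip().rstrip(";")
    if is_temp:
        # ON COMMIT PRESERVE ROWS keeps the rows until the session ends.
        return (f"CREATE LOCAL TEMPORARY TABLE {to_table} "
                f"ON COMMIT PRESERVE ROWS AS {select_sql}")
    return f"CREATE TABLE {to_table} AS {select_sql}"


print(build_ctas("SELECT id, amount FROM public.sales;", "public.sales_copy"))
print(build_ctas("SELECT id FROM public.sales", "tmp_ids", is_temp=True))
```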

create_table_from

def create_table_from(vertica_connection: VerticaConnection, from_table: str,
                      to_table: str)

Create a table from another table in the Vertica database.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection
  • from_table str - Source table whose DDL is copied.
  • to_table str - Target table name

Raises:

  • Exception - Executing the CREATE TABLE statement failed.
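
Vertica supports CREATE TABLE ... LIKE for copying a table definition without its data. Whether this helper uses that statement is an assumption; `build_create_like` and its `including_projections` flag are hypothetical and exist only in this sketch.

```python
def build_create_like(from_table: str, to_table: str,
                      including_projections: bool = False) -> str:
    """Build a CREATE TABLE ... LIKE statement (illustrative only)."""
    sql = f"CREATE TABLE {to_table} LIKE {from_table}"
    if including_projections:
        # Optionally copy the source table's projection definitions as well.
        sql += " INCLUDING PROJECTIONS"
    return sql


print(build_create_like("public.sales", "staging.sales"))
```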

create_table_local_temp

def create_table_local_temp(vertica_connection: VerticaConnection, query: str,
                            to_tablename: str)

Create a local temporary table in Vertica.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection
  • query str - SQL Statement (SELECT Only).
  • to_tablename str - Target table name only (No Schema name)

Raises:

  • Exception - Executing the CREATE TABLE statement failed.

get_ddl

def get_ddl(vertica_connection: VerticaConnection, query: str,
            to_table: str) -> str

Get the SQL CREATE TABLE statement for a query.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection
  • query str - SQL statement (SELECT only).
  • to_table str - Target table name, fully qualified as 'schema.table'.

Returns:

  • str - SQL CREATE TABLE statement.

copy_to_vertica

def copy_to_vertica(vertica_connection: VerticaConnection,
                    fs: Union[os.PathLike, io.BytesIO, io.StringIO, Any],
                    table: str,
                    columns: List[str],
                    comprassion: str = "",
                    reject_table: str = None)

Copy data from a file into a Vertica table.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection.
  • fs Union[os.PathLike, io.BytesIO, io.StringIO] - File path or open file object. Example: open("/tmp/file.csv", "rb")
  • table str - Target table name.
  • comprassion str - Compression format of the input: UNCOMPRESSED (default), BZIP, GZIP, LZO, or ZSTD.
  • reject_table str, optional - Table name that receives rejected data. Defaults to None.
  • check_column bool, optional - Check whether the columns exist. Defaults to True.

Raises:

  • Exception - The target table does not exist.
  • Exception - Copying the data failed.

Returns:

  • str - SQL COPY statement.
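
To make the returned statement more concrete, the sketch below assembles a Vertica COPY ... FROM STDIN statement from similar inputs. `build_copy_sql` is a hypothetical name, and the comma delimiter is an assumption; the statement the helper actually returns may differ.

```python
from typing import List, Optional


def build_copy_sql(table: str, columns: List[str],
                   compression: str = "",
                   reject_table: Optional[str] = None) -> str:
    """Build a COPY ... FROM STDIN statement (illustrative only)."""
    column_list = ", ".join(columns)
    sql = f"COPY {table} ({column_list}) FROM STDIN"
    if compression:
        # For example GZIP; leaving it empty means uncompressed input.
        sql += f" {compression.upper()}"
    # Assumption: the input is comma-separated.
    sql += " DELIMITER ','"
    if reject_table:
        sql += f" REJECTED DATA AS TABLE {reject_table}"
    return sql


print(build_copy_sql("public.sales", ["id", "amount"], "gzip",
                     "public.sales_rejects"))
```

Routing rejected rows to a table rather than failing the load makes it possible to inspect bad records after the copy finishes.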

merge_to_table

def merge_to_table(
        vertica_connection: VerticaConnection,
        from_table: str,
        to_table: str,
        merge_on_columns: List[str],
        *,
        no_execute: bool = False,
        add_field_insert: Dict[str, AnyStr] = None,
        add_field_update: Dict[str, AnyStr] = None) -> Union[AnyStr, int]

Merge data from one Vertica table into another.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection.
  • from_table str - Source table name.
  • to_table str - Target table name.
  • merge_on_columns List[str] - Columns to match on: matching rows are updated, non-matching rows are inserted.
  • no_execute bool, optional - If True, return the SQL statement only, without executing it. Defaults to False.
  • add_field_insert Dict[str, AnyStr], optional - Additional fields to insert. Defaults to None.
  • add_field_update Dict[str, AnyStr], optional - Additional fields to update. Defaults to None.

Raises:

  • Exception - The merge did not succeed.

Returns:

  • Union[AnyStr, int] - The SQL statement if no_execute is True; otherwise merge_total, the number of rows merged into the target table.
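
The update-or-insert behavior described for merge_on_columns corresponds to a SQL MERGE statement. The sketch below builds one by hand; `build_merge_sql` and its explicit `columns` parameter are hypothetical (the real function does not take a column list), and the `tgt`/`src` aliases are arbitrary.

```python
from typing import List


def build_merge_sql(from_table: str, to_table: str,
                    merge_on_columns: List[str],
                    columns: List[str]) -> str:
    """Build a MERGE statement (illustrative only)."""
    on_clause = " AND ".join(f"tgt.{c} = src.{c}" for c in merge_on_columns)
    # Only non-key columns are updated when a row matches.
    update_cols = [c for c in columns if c not in merge_on_columns]
    set_clause = ", ".join(f"{c} = src.{c}" for c in update_cols)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"src.{c}" for c in columns)
    return (
        f"MERGE INTO {to_table} tgt USING {from_table} src ON {on_clause} "
        f"WHEN MATCHED THEN UPDATE SET {set_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


print(build_merge_sql("staging.sales", "public.sales",
                      ["id"], ["id", "name", "amount"]))
```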

table_check

def table_check(vertica_connection: VerticaConnection,
                table: str) -> pd.DataFrame

Check whether a table exists in Vertica.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection.
  • table str - Full Table name.

Returns:

  • pd.DataFrame - A pandas DataFrame, returned if the table exists.

drop_table

def drop_table(vertica_connection: VerticaConnection, table: str) -> bool

Drop a table if it exists.

Arguments:

  • vertica_connection VerticaConnection - Vertica Connection.
  • table str - Full Table name.

Returns:

  • bool - Whether the operation succeeded.

csv

to_csv

def to_csv(engine,
           sql_query: str,
           file_name: os.PathLike,
           compression=CSV_COMPRESSION_GZIP,
           func_print: Callable = print) -> int

Run a SQL query and write the result to a CSV file, with optional compression.

Arguments:

  • engine Connection - Database connection or SQLAlchemy Engine.
  • sql_query str - SQL query statement (SELECT only).
  • file_name os.PathLike - Output file name, including the file extension (example: ./mycsv.csv.gz).
  • compression str, optional - Compression type of the file (plain|gzip|zip). Defaults to CSV_COMPRESSION_GZIP.
  • func_print Callable, optional - Callback used to print messages. Defaults to print.

Raises:

  • ex - Error handler.

Returns:

  • int - Total number of records written.
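
The sketch below shows the general query-to-compressed-CSV pattern using only the standard library. It uses an in-memory SQLite database instead of SQLAlchemy so that it runs without extra dependencies. `query_to_gzip_csv` is a hypothetical name, and the real to_csv may stream and report progress differently.

```python
import csv
import gzip
import os
import sqlite3
import tempfile


def query_to_gzip_csv(connection: sqlite3.Connection, sql_query: str,
                      file_name: str) -> int:
    """Write the rows of a SELECT to a gzip-compressed CSV; return the row count."""
    cursor = connection.execute(sql_query)
    header = [col[0] for col in cursor.description]
    total = 0
    # "wt" opens the gzip stream in text mode so csv.writer can write str.
    with gzip.open(file_name, "wt", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for row in cursor:
            writer.writerow(row)
            total += 1
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [(1, 9.5), (2, 3.0)])
path = os.path.join(tempfile.mkdtemp(), "mycsv.csv.gz")
print(query_to_gzip_csv(conn, "SELECT id, amount FROM sales", path))
```

Iterating over the cursor row by row keeps memory use flat for large result sets, which is the usual reason to export through a file rather than a DataFrame.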

read_csv

def read_csv(filename: os.PathLike, **pandas_option) -> pd.DataFrame

Read a CSV file.

pandas option: https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html

Arguments:

  • filename os.PathLike - Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is expected. A local file could be: file://localhost/path/to/table.csv.

    If you want to pass in a path object, pandas accepts any os.PathLike.

    By file-like object, we refer to objects with a read() method, such as a file handle (e.g. via builtin open function) or StringIO.

Returns:

  • pd.DataFrame - pandas DataFrame

head_csv

def head_csv(filename: os.PathLike,
             nrows: int = 10,
             **pandas_option) -> pd.DataFrame

Read the first rows of a CSV file.

Arguments:

  • filename os.PathLike - Path of the CSV file.
  • nrows int, optional - Number of rows to read. Defaults to 10.

Returns:

  • pd.DataFrame - pandas DataFrame

batch_csv

def batch_csv(filename: os.PathLike,
              batch_size: int = 10000,
              **pandas_option) -> Iterator[pd.DataFrame]

Read a CSV file in batches, as an iterator of DataFrames.

Arguments:

  • filename os.PathLike - Path of the CSV file.
  • batch_size int, optional - Number of rows per batch (the pandas chunksize). Defaults to 10000.

Yields:

  • Iterator[pd.DataFrame] - One DataFrame per batch.
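
Batch reading of this kind maps onto the `chunksize` option of `pandas.read_csv`. The sketch below calls pandas directly rather than batch_csv, on the assumption that the helper forwards batch_size as chunksize; the in-memory CSV text is made up for the example.

```python
import io

import pandas as pd

# 25 data rows, held in memory so the example needs no file on disk.
csv_text = "id,amount\n" + "\n".join(f"{i},{i * 1.5}" for i in range(25))

# chunksize makes read_csv return an iterator of DataFrames instead of one frame.
for batch in pd.read_csv(io.StringIO(csv_text), chunksize=10):
    print(len(batch))
```

Each batch is an ordinary DataFrame, so per-batch processing such as filtering or loading into a database can reuse the same code as a full read.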