Merge pull request #9 from Blue-Yonder-OSS/logging-support_master
Added logging support
adityajaroli committed Jan 16, 2024
2 parents 6934853 + 2293cec commit eb649cf
Showing 8 changed files with 28 additions and 15 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -213,8 +213,7 @@ if __name__ == '__main__':

<h2> Development: </h2>

- Run this command to install the dependencies `pip install -r requirements.txt`
- Run this command to install test dependencies `pip install -r test-requirements.txt`
- Run this command to install the required development dependencies `pip install -r dev-requirements.txt`
- Run below commands to run the unit test cases: `pytest` or `coverage run --source=src.pg_bulk_loader --module pytest --verbose && coverage report --show-missing`


5 changes: 5 additions & 0 deletions dev-requirements.txt
@@ -0,0 +1,5 @@
-r requirements.txt
-r test-requirements.txt

build
twine
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "pg_bulk_loader"
version = "1.1.1"
version = "1.1.2"
authors = [
{ name="Aditya Jaroli", email="adityajaroli@gmail.com" },
]
@@ -17,11 +17,12 @@ classifiers = [
dependencies = [
"pandas",
"psycopg",
"psycopg_binary",
"asyncio",
"psycopg_pool",
"retry"
]

[project.urls]
Homepage = "https://github.com/adityajaroli/pg-bulk-loader.git"
Issues = "https://github.com/adityajaroli/pg-bulk-loader/issues"
Homepage = "https://github.com/Blue-Yonder-OSS/pg-bulk-loader.git"
Issues = "https://github.com/Blue-Yonder-OSS/pg-bulk-loader/issues"
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
pandas
psycopg
psycopg_binary
asyncio
psycopg_pool
retry
8 changes: 5 additions & 3 deletions src/pg_bulk_loader/batch/batch_insert.py
@@ -3,9 +3,11 @@
import asyncio
from .pg_connection_detail import PgConnectionDetail
from ..utils.common_utils import get_ranges
from ..utils.time_it_decorator import time_it
import logging
from retry import retry

logger = logging.getLogger(__name__)


class BatchInsert:

@@ -47,10 +49,10 @@ async def execute(self, data_df: pd.DataFrame, col_names: list = None):
"""
try:
partition_ranges = get_ranges(data_df.shape[0], self.batch_size)
print(f"Created {len(partition_ranges)} partitions!")
logger.debug(f"Created {len(partition_ranges)} partitions!")

if not partition_ranges:
print("warning: No data found to be inserted!")
logger.warning("No data found to be inserted!")
return

if col_names:
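
With this change, batch_insert.py stops printing to stdout and emits its messages through a module-level logger obtained via `logging.getLogger(__name__)`. A minimal sketch of how a consuming application could surface the new debug output; the top-level logger name `pg_bulk_loader` is an assumption based on the package layout shown in this commit:

```python
import logging

# Give the root logger a handler and a readable format.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# Assumed logger namespace: the modules in this commit call
# logging.getLogger(__name__), so their loggers should live under "pg_bulk_loader".
logging.getLogger("pg_bulk_loader").setLevel(logging.DEBUG)
```

Because the package's module loggers carry no handlers of their own, their records propagate up to the root handler installed by `basicConfig`, so this is the only configuration an application should need to see the new debug and warning messages.
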
12 changes: 7 additions & 5 deletions src/pg_bulk_loader/batch/batch_insert_wrapper.py
@@ -2,11 +2,13 @@
from .fast_load_hack import FastLoadHack
from .batch_insert import BatchInsert
import pandas as pd
import logging
from ..utils.time_it_decorator import time_it
import asyncio
from concurrent.futures import ProcessPoolExecutor
import math
import types

logger = logging.getLogger(__name__)


def __optimize_connection_pool_size(min_conn, total_data_size, batch_size):
@@ -90,13 +92,13 @@ async def batch_insert_to_postgres(
indexes = {}
if drop_and_create_index:
indexes: dict = fast_load_hack.get_indexes()
print(f'Indexes to be dropped and re-created: {indexes.keys()}')
logger.debug(f'Indexes to be dropped and re-created: {indexes.keys()}')
fast_load_hack.drop_indexes(list(indexes.keys()))

try:
if isinstance(input_data, pd.DataFrame) and not input_data.empty:
if isinstance(input_data, pd.DataFrame):
await run(input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size)
elif isinstance(input_data, types.GeneratorType):
else:
await run_with_generator(
input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size
)
@@ -140,7 +142,7 @@ async def batch_insert_to_postgres_with_multi_process(
indexes = {}
if drop_and_create_index:
indexes = fast_load_hack.get_indexes()
print(f'Indexes to be dropped and re-created: {indexes.keys()}')
logger.debug(f'Indexes to be dropped and re-created: {indexes.keys()}')
fast_load_hack.drop_indexes(list(indexes.keys()))

try:
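
The dispatch in `batch_insert_to_postgres` also changes: the empty-DataFrame guard is dropped, and any input that is not a DataFrame is now assumed to be a generator of DataFrame chunks instead of being checked against `types.GeneratorType`. A hypothetical call sketch; the import path and keyword names below are taken from identifiers visible in this diff and are assumptions, not the documented API:

```python
import asyncio
import pandas as pd

# Assumed import path; the function name itself is visible in this diff.
from pg_bulk_loader import batch_insert_to_postgres


def chunked_frames(path: str, chunk_size: int):
    # A generator of DataFrame chunks, e.g. streamed from a large CSV file.
    yield from pd.read_csv(path, chunksize=chunk_size)


async def main():
    pg_conn_details = ...  # a PgConnectionDetail instance; construction omitted here
    # Keyword names mirror the variables in this diff and may not match the
    # actual signature -- consult the project README for the real API.
    await batch_insert_to_postgres(
        pg_conn_details=pg_conn_details,
        table_name="target_table",
        input_data=chunked_frames("rows.csv", 100_000),
        batch_size=25_000,
        drop_and_create_index=True,
    )


asyncio.run(main())
```
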
5 changes: 4 additions & 1 deletion src/pg_bulk_loader/utils/time_it_decorator.py
@@ -1,16 +1,19 @@
import asyncio
import functools
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger()


def time_it(func):

@contextmanager
def wrapping_logic():
start_time = time.time()
yield
print(f'Function {func.__name__} executed in {(time.time() - start_time):.4f}s')
logger.debug(f'Function {func.__name__} executed in {(time.time() - start_time):.4f}s')

@functools.wraps(func)
def wrapper(*args, **kwargs):
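
Note that `time_it_decorator.py` logs through the root logger (`logging.getLogger()` with no name), so the per-function timings only become visible once the root level allows DEBUG. An illustrative sketch, assuming the decorator is importable from the path shown above and that the hidden wrapper code routes plain (non-async) callables through the same wrapping logic:

```python
import logging

# Assumed import path, taken from the file location in this commit.
from pg_bulk_loader.utils.time_it_decorator import time_it

# The decorator logs via the root logger, so raise the root level to DEBUG
# for its timing lines to be emitted.
logging.basicConfig(level=logging.DEBUG)


@time_it
def build_batches():
    return [list(range(1_000)) for _ in range(100)]


build_batches()  # expected to log roughly: Function build_batches executed in 0.0xxxs
```
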
2 changes: 1 addition & 1 deletion tests/unit/pg_helper.py
@@ -68,7 +68,7 @@ def fetch_rows_count_and_assert(pg_conn_details: PgConnectionDetail, table_name:
result = curser.execute(f"select count(1) from {table_name}").fetchone()
curser.close()
pg_conn.commit()
return result[0]
assert result[0] == expected
finally:
pg_conn.close()
