diff --git a/data_utils/__init__.py b/data_utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/data_utils/datahub_services/__init__.py b/data_utils/datahub_services/__init__.py deleted file mode 100644 index 9457897..0000000 --- a/data_utils/datahub_services/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -DataHub 유틸리티 패키지 - -DataHub와의 상호작용을 위한 모듈들을 제공합니다. - -주요 구성요소: -- DataHubBaseClient: 기본 연결 및 통신 -- MetadataService: 메타데이터, 리니지, URN 관련 기능 -- QueryService: 쿼리 관련 기능 -- GlossaryService: 용어집 관련 기능 -""" - -from .base_client import DataHubBaseClient -from .metadata_service import MetadataService -from .query_service import QueryService -from .glossary_service import GlossaryService - -__all__ = ["DataHubBaseClient", "MetadataService", "QueryService", "GlossaryService"] diff --git a/db_utils/__init__.py b/db_utils/__init__.py deleted file mode 100644 index f5a3901..0000000 --- a/db_utils/__init__.py +++ /dev/null @@ -1,125 +0,0 @@ -from typing import Optional -import os -from .config import DBConfig -from .logger import logger - -from .base_connector import BaseConnector - -from .clickhouse_connector import ClickHouseConnector -from .postgres_connector import PostgresConnector -from .mysql_connector import MySQLConnector -from .mariadb_connector import MariaDBConnector -from .oracle_connector import OracleConnector -from .duckdb_connector import DuckDBConnector -from .databricks_connector import DatabricksConnector -from .snowflake_connector import SnowflakeConnector -from .trino_connector import TrinoConnector - -env_path = os.path.join(os.getcwd(), ".env") - - -def get_db_connector(db_type: Optional[str] = None, config: Optional[DBConfig] = None): - """ - Return the appropriate DB connector instance. - - If db_type is not provided, loads from environment variable DB_TYPE - - If config is not provided, loads from environment using db_type - - Parameters: - db_type (Optional[str]): Database type (e.g., 'postgresql', 'mysql') - config (Optional[DBConfig]): Connection config - - Returns: - BaseConnector: Initialized DB connector instance - - Raises: - ValueError: If type/config is missing or invalid - """ - if db_type is None: - db_type = os.getenv("DB_TYPE") - if not db_type: - raise ValueError( - "DB type must be provided or set in environment as DB_TYPE." - ) - - db_type = db_type.lower() - - if config is None: - config = load_config_from_env(db_type.upper()) - - connector_map = { - "clickhouse": ClickHouseConnector, - "postgresql": PostgresConnector, - "mysql": MySQLConnector, - "mariadb": MariaDBConnector, - "oracle": OracleConnector, - "duckdb": DuckDBConnector, - "databricks": DatabricksConnector, - "snowflake": SnowflakeConnector, - "trino": TrinoConnector, - } - - if db_type not in connector_map: - logger.error(f"Unsupported DB type: {db_type}") - raise ValueError(f"Unsupported DB type: {db_type}") - - required_fields = { - "oracle": ["extra.service_name"], - "databricks": ["extra.http_path", "extra.access_token"], - "snowflake": ["extra.account"], - } - - missing = [] - for path in required_fields.get(db_type, []): - cur = config - for key in path.split("."): - cur = cur.get(key) if isinstance(cur, dict) else None - if cur is None: - missing.append(path) - break - - if missing: - logger.error(f"Missing required fields for {db_type}: {', '.join(missing)}") - raise ValueError(f"Missing required fields for {db_type}: {', '.join(missing)}") - - return connector_map[db_type](config) - - -def load_config_from_env(prefix: str) -> DBConfig: - """ - Load DBConfig from environment variables with a given prefix. - Standard keys are extracted, all other prefixed keys go to 'extra'. - - Example: - If prefix = 'SNOWFLAKE', loads: - - SNOWFLAKE_HOST - - SNOWFLAKE_USER - - SNOWFLAKE_PASSWORD - - SNOWFLAKE_PORT - - SNOWFLAKE_DATABASE - Other keys like SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE -> extra - """ - base_keys = {"HOST", "PORT", "USER", "PASSWORD", "DATABASE"} - - # Extract standard values - config = { - "host": os.getenv(f"{prefix}_HOST"), - "port": ( - int(os.getenv(f"{prefix}_PORT")) if os.getenv(f"{prefix}_PORT") else None - ), - "user": os.getenv(f"{prefix}_USER"), - "password": os.getenv(f"{prefix}_PASSWORD"), - "database": os.getenv(f"{prefix}_DATABASE"), - } - - # Auto-detect extra keys - extra = {} - for key, value in os.environ.items(): - if key.startswith(f"{prefix}_"): - suffix = key[len(f"{prefix}_") :] - if suffix.upper() not in base_keys: - extra[suffix.lower()] = value - - if extra: - config["extra"] = extra - - return DBConfig(**config) diff --git a/db_utils/base_connector.py b/db_utils/base_connector.py deleted file mode 100644 index 47c17d1..0000000 --- a/db_utils/base_connector.py +++ /dev/null @@ -1,28 +0,0 @@ -from abc import ABC, abstractmethod -import pandas as pd - - -class BaseConnector(ABC): - """ - Abstract base class for database connectors. - """ - - @abstractmethod - def connect(self): - """ - Initialize the database connection. - """ - pass - - @abstractmethod - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Returns the result of the SQL query as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - pass diff --git a/db_utils/clickhouse_connector.py b/db_utils/clickhouse_connector.py deleted file mode 100644 index 50b11cc..0000000 --- a/db_utils/clickhouse_connector.py +++ /dev/null @@ -1,72 +0,0 @@ -from .base_connector import BaseConnector -from clickhouse_driver import Client -import pandas as pd -from db_utils import DBConfig, logger - - -class ClickHouseConnector(BaseConnector): - """ - Connect to ClickHouse and execute SQL queries. - """ - - client = None - - def __init__(self, config: DBConfig): - """ - Initialize the ClickHouseConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - """ - self.host = config["host"] - self.port = config["port"] - self.user = config["user"] - self.password = config["password"] - self.database = config["database"] - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the ClickHouse server. - """ - try: - self.client = Client( - host=self.host, - port=self.port, - user=self.user, - password=self.password, - database=self.database, - ) - logger.info("Successfully connected to ClickHouse.") - except Exception as e: - logger.error(f"Failed to connect to ClickHouse: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - if self.client is None: - self.connect() - - try: - result = self.client.query_dataframe(sql) - return result - except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") - raise - - def close(self) -> None: - """ - Close the connection to the ClickHouse server. - """ - if self.client: - self.client.disconnect() - logger.error("Connection to ClickHouse closed.") - self.client = None diff --git a/db_utils/config.py b/db_utils/config.py deleted file mode 100644 index 4deb2c4..0000000 --- a/db_utils/config.py +++ /dev/null @@ -1,11 +0,0 @@ -from typing import Optional, Dict, TypedDict - - -class DBConfig(TypedDict): - - host: str - port: Optional[int] - user: Optional[str] - password: Optional[str] - database: Optional[str] - extra: Optional[Dict[str, str]] diff --git a/db_utils/databricks_connector.py b/db_utils/databricks_connector.py deleted file mode 100644 index 6cd60b6..0000000 --- a/db_utils/databricks_connector.py +++ /dev/null @@ -1,80 +0,0 @@ -from databricks import sql -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - - -class DatabricksConnector(BaseConnector): - """ - Connect to Databricks SQL Warehouse and execute queries. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the DatabricksConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - Required keys: host, extra.http_path, extra.access_token - Optional keys: extra.catalog, extra.schema - """ - self.server_hostname = config["host"] - self.http_path = config["extra"]["http_path"] - self.access_token = config["extra"]["access_token"] - self.catalog = config.get("extra", {}).get("catalog") - self.schema = config.get("extra", {}).get("schema") - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the Databricks SQL endpoint. - """ - try: - self.connection = sql.connect( - server_hostname=self.server_hostname, - http_path=self.http_path, - access_token=self.access_token, - catalog=self.catalog, - schema=self.schema, - ) - logger.info("Successfully connected to Databricks.") - except Exception as e: - logger.error(f"Failed to connect to Databricks: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return result as pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - if self.connection is None: - self.connect() - - try: - cursor = self.connection.cursor() - cursor.execute(sql) - columns = [desc[0] for desc in cursor.description] - rows = cursor.fetchall() - return pd.DataFrame(rows, columns=columns) - except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") - raise - finally: - cursor.close() - - def close(self) -> None: - """ - Close the Databricks connection. - """ - if self.connection: - self.connection.close() - logger.error("Connection to Databricks closed.") - self.connection = None diff --git a/db_utils/duckdb_connector.py b/db_utils/duckdb_connector.py deleted file mode 100644 index 41f7665..0000000 --- a/db_utils/duckdb_connector.py +++ /dev/null @@ -1,60 +0,0 @@ -import duckdb -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - - -class DuckDBConnector(BaseConnector): - """ - Connect to DuckDB and execute SQL queries. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the DuckDBConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - Uses config['path'] as the file path or ':memory:'. - """ - self.database = config.get("path", ":memory:") - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the DuckDB database. - """ - try: - self.connection = duckdb.connect(database=self.database) - logger.info("Successfully connected to DuckDB.") - except Exception as e: - logger.error(f"Failed to connect to DuckDB: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - try: - return self.connection.execute(sql).fetchdf() - except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") - raise - - def close(self) -> None: - """ - Close the connection to the DuckDB database. - """ - if self.connection: - self.connection.close() - logger.error("Connection to DuckDB closed.") - self.connection = None diff --git a/db_utils/mariadb_connector.py b/db_utils/mariadb_connector.py deleted file mode 100644 index 5770198..0000000 --- a/db_utils/mariadb_connector.py +++ /dev/null @@ -1,76 +0,0 @@ -import mysql.connector -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - - -class MariaDBConnector(BaseConnector): - """ - Connect to MariaDB and execute SQL queries. - This class uses **mysql-connector-python** to connect to the MariaDB server. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the MariaDBConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - """ - self.host = config["host"] - self.port = config.get("port", 3306) - self.user = config["user"] - self.password = config["password"] - self.database = config["database"] - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the MariaDB server using mysql-connector-python. - """ - try: - self.connection = mysql.connector.connect( - host=self.host, - port=self.port, - user=self.user, - password=self.password, - database=self.database, - ) - logger.info("Successfully connected to MariaDB.") - except Exception as e: - logger.error(f"Failed to connect to MariaDB: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - try: - cursor = self.connection.cursor() - cursor.execute(sql) - columns = [column[0] for column in cursor.description] - rows = cursor.fetchall() - return pd.DataFrame(rows, columns=columns) - except Exception as e: - print(f"Failed to execute SQL query: {e}") - raise - finally: - cursor.close() - - def close(self) -> None: - """ - Close the connection to the MariaDB server. - """ - if self.connection: - self.connection.close() - print("Connection to MariaDB closed.") - self.connection = None diff --git a/db_utils/oracle_connector.py b/db_utils/oracle_connector.py deleted file mode 100644 index 6568daa..0000000 --- a/db_utils/oracle_connector.py +++ /dev/null @@ -1,67 +0,0 @@ -import oracledb -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - - -class OracleConnector(BaseConnector): - """ - Connect to Oracle database and execute SQL queries. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the OracleConnector with connection parameters. - Parameters: - config (DBConfig): Configuration object containing connection parameters. - """ - self.host = config["host"] - self.port = config["port"] - self.user = config["user"] - self.password = config["password"] - self.service_name = config.get("extra").get("service_name", "orcl") - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the Oracle server. - """ - try: - self.connection = oracledb.connect( - user=self.user, - password=self.password, - dsn=f"{self.host}:{self.port}/{self.service_name}", - ) - logger.info("Successfully connected to Oracle.") - except Exception as e: - logger.error(f"Failed to connect to Oracle: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - Parameters: - sql (str): SQL query string to be executed. - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - try: - cursor = self.connection.cursor() - cursor.execute(sql) - columns = [desc[0] for desc in cursor.description] - rows = cursor.fetchall() - return pd.DataFrame(rows, columns=columns) - except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") - raise - finally: - cursor.close() - - def close(self) -> None: - if self.connection: - self.connection.close() - logger.error("Connection to Oracle closed.") - self.connection = None diff --git a/db_utils/sqlite_connector.py b/db_utils/sqlite_connector.py deleted file mode 100644 index aeb871c..0000000 --- a/db_utils/sqlite_connector.py +++ /dev/null @@ -1,70 +0,0 @@ -import sqlite3 -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - - -class SQLiteConnector(BaseConnector): - """ - Connect to SQLite and execute SQL queries. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the SQLiteConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - Uses config["database"] as the SQLite file path. - If None or ":memory:", creates an in-memory database. - """ - self.database = config.get("path", ":memory:") - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the SQLite database. - """ - try: - self.connection = sqlite3.connect(self.database) - logger.info(f"Successfully connected to SQLite ({self.database}).") - except Exception as e: - logger.error(f"Failed to connect to SQLite: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - if self.connection is None: - self.connect() - - try: - cursor = self.connection.cursor() - cursor.execute(sql) - columns = [col[0] for col in cursor.description] - rows = cursor.fetchall() - return pd.DataFrame(rows, columns=columns) - except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") - raise - finally: - cursor.close() - - def close(self) -> None: - """ - Close the connection to the SQLite database. - """ - if self.connection: - self.connection.close() - logger.info("Connection to SQLite closed.") - self.connection = None diff --git a/db_utils/trino_connector.py b/db_utils/trino_connector.py deleted file mode 100644 index b471674..0000000 --- a/db_utils/trino_connector.py +++ /dev/null @@ -1,120 +0,0 @@ -import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger - -try: - import trino -except Exception as e: # pragma: no cover - trino = None - _import_error = e - - -class TrinoConnector(BaseConnector): - """ - Connect to Trino and execute SQL queries. - """ - - connection = None - - def __init__(self, config: DBConfig): - """ - Initialize the TrinoConnector with connection parameters. - - Parameters: - config (DBConfig): Configuration object containing connection parameters. - """ - self.host = config["host"] - self.port = config["port"] or 8080 - self.user = config.get("user") or "anonymous" - self.password = config.get("password") - self.database = config.get("database") # e.g., catalog.schema - self.extra = config.get("extra") or {} - self.http_scheme = self.extra.get("http_scheme", "http") - self.catalog = self.extra.get("catalog") - self.schema = self.extra.get("schema") - - # If database given as "catalog.schema", split into fields - if self.database and (not self.catalog or not self.schema): - if "." in self.database: - db_catalog, db_schema = self.database.split(".", 1) - self.catalog = self.catalog or db_catalog - self.schema = self.schema or db_schema - - self.connect() - - def connect(self) -> None: - """ - Establish a connection to the Trino cluster. - """ - if trino is None: - logger.error(f"Failed to import trino driver: {_import_error}") - raise _import_error - - try: - auth = None - if self.password: - # If HTTP, ignore password to avoid insecure auth error - if self.http_scheme == "http": - logger.warning( - "Password provided for HTTP; ignoring password. " - "Set TRINO_HTTP_SCHEME=https to enable password authentication." - ) - else: - # Basic auth over HTTPS - auth = trino.auth.BasicAuthentication(self.user, self.password) - - self.connection = trino.dbapi.connect( - host=self.host, - port=self.port, - user=self.user, - http_scheme=self.http_scheme, - catalog=self.catalog, - schema=self.schema, - auth=auth, - # Optional: session properties - # session_properties={} - ) - logger.info("Successfully connected to Trino.") - except Exception as e: - logger.error(f"Failed to connect to Trino: {e}") - raise - - def run_sql(self, sql: str) -> pd.DataFrame: - """ - Execute a SQL query and return the result as a pandas DataFrame. - - Parameters: - sql (str): SQL query string to be executed. - - Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. - """ - try: - cursor = self.connection.cursor() - cursor.execute(sql) - columns = ( - [desc[0] for desc in cursor.description] if cursor.description else [] - ) - rows = cursor.fetchall() if cursor.description else [] - return pd.DataFrame(rows, columns=columns) - except Exception as e: - logger.error(f"Failed to execute SQL query on Trino: {e}") - raise - finally: - try: - cursor.close() - except Exception: - pass - - def close(self) -> None: - """ - Close the connection to the Trino cluster. - """ - if self.connection: - try: - self.connection.close() - except Exception: - pass - logger.info("Connection to Trino closed.") - self.connection = None diff --git a/docker/docker-compose-pgvector.yml b/docker/docker-compose-pgvector.yml new file mode 100644 index 0000000..8ad5e16 --- /dev/null +++ b/docker/docker-compose-pgvector.yml @@ -0,0 +1,23 @@ +# docker compose -f docker-compose-pgvector.yml up +# docker compose -f docker-compose-pgvector.yml down + +services: + pgvector: + image: pgvector/pgvector:pg17 + hostname: pgvector + container_name: pgvector + restart: always + ports: + - "5432:5432" + environment: + POSTGRES_USER: pgvector + POSTGRES_PASSWORD: pgvector + POSTGRES_DB: pgvector + TZ: Asia/Seoul + LANG: en_US.utf8 + volumes: + - pgvector_data:/var/lib/postgresql/data + - ./pgvector/init:/docker-entrypoint-initdb.d + +volumes: + pgvector_data: diff --git a/docker/pgvector/init/001_create_database.sql b/docker/pgvector/init/001_create_database.sql new file mode 100644 index 0000000..2173146 --- /dev/null +++ b/docker/pgvector/init/001_create_database.sql @@ -0,0 +1,2 @@ +CREATE DATABASE lang2sql; +CREATE DATABASE test; diff --git a/docker/pgvector/init/002_create_user_and_grant.sql b/docker/pgvector/init/002_create_user_and_grant.sql new file mode 100644 index 0000000..8da26fb --- /dev/null +++ b/docker/pgvector/init/002_create_user_and_grant.sql @@ -0,0 +1,5 @@ +CREATE USER lang2sql WITH PASSWORD 'lang2sqlpassword'; +GRANT ALL PRIVILEGES ON DATABASE lang2sql TO lang2sql; + +CREATE USER test WITH PASSWORD 'testpassword'; +GRANT ALL PRIVILEGES ON DATABASE test TO test; diff --git a/interface/app_pages/settings_sections/db_section.py b/interface/app_pages/settings_sections/db_section.py index db9da6b..2ebeec5 100644 --- a/interface/app_pages/settings_sections/db_section.py +++ b/interface/app_pages/settings_sections/db_section.py @@ -1,15 +1,16 @@ import os + import streamlit as st from interface.core.config import ( - get_db_connections_registry, add_db_connection, - update_db_connection, delete_db_connection, + get_db_connections_registry, + update_db_connection, update_db_settings, ) -from db_utils import get_db_connector, load_config_from_env - +from utils.databases import DatabaseFactory +from utils.databases.factory import load_config_from_env DB_TYPES = [ "postgresql", @@ -216,7 +217,8 @@ def render_db_section() -> None: update_db_settings( db_type=new_type, values=values, secrets=secrets ) - connector = get_db_connector(db_type=new_type) + + connector = DatabaseFactory.get_connector(db_type=new_type) # 간단한 SELECT 1 테스트 (DB마다 상이할 수 있음) test_sql = ( "SELECT 1" diff --git a/interface/core/result_renderer.py b/interface/core/result_renderer.py index 14101e8..e96b062 100644 --- a/interface/core/result_renderer.py +++ b/interface/core/result_renderer.py @@ -10,9 +10,9 @@ import streamlit as st from langchain_core.messages import AIMessage -from db_utils import get_db_connector from infra.observability.token_usage import TokenUtils from llm_utils.llm_response_parser import LLMResponseParser +from utils.databases import DatabaseFactory from viz.display_chart import DisplayChart @@ -154,7 +154,7 @@ def should_show(_key: str) -> bool: st.info("QUERY_MAKER 없이 실행되었습니다. 검색된 테이블 정보만 표시합니다.") if show_table_section or show_chart_section: - database = get_db_connector() + database = DatabaseFactory.get_connector() df = pd.DataFrame() try: sql_raw = ( diff --git a/llm_utils/tools/datahub.py b/llm_utils/tools/datahub.py index f6b0b32..42e564d 100644 --- a/llm_utils/tools/datahub.py +++ b/llm_utils/tools/datahub.py @@ -1,12 +1,12 @@ import os import re -from typing import List, Dict, Optional, TypeVar, Callable, Iterable, Any +from concurrent.futures import ThreadPoolExecutor +from typing import Callable, Dict, Iterable, List, Optional, TypeVar from langchain.schema import Document - -from data_utils.datahub_source import DatahubMetadataFetcher from tqdm import tqdm -from concurrent.futures import ThreadPoolExecutor + +from utils.data.datahub_source import DatahubMetadataFetcher T = TypeVar("T") R = TypeVar("R") diff --git a/pyproject.toml b/pyproject.toml index 3f310f7..9213be5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,14 +67,13 @@ include = [ [tool.hatch.build.targets.wheel] packages = [ "cli", - "data_utils", - "db_utils", "interface", "llm_utils", "engine", "infra", "viz", "prompt", + "utils", ] [tool.uv] @@ -82,4 +81,3 @@ dev-dependencies = [ "pre_commit==4.1.0", "pytest>=8.3.5", ] - diff --git a/utils/data/datahub_services/__init__.py b/utils/data/datahub_services/__init__.py new file mode 100644 index 0000000..2743ef9 --- /dev/null +++ b/utils/data/datahub_services/__init__.py @@ -0,0 +1,23 @@ +""" +DataHub 유틸리티 패키지 + +DataHub와의 상호작용을 위한 모듈들을 제공합니다. + +주요 구성요소: +- DataHubBaseClient: 기본 연결 및 통신 +- MetadataService: 메타데이터, 리니지, URN 관련 기능 +- QueryService: 쿼리 관련 기능 +- GlossaryService: 용어집 관련 기능 +""" + +from utils.data.datahub_services.base_client import DataHubBaseClient +from utils.data.datahub_services.glossary_service import GlossaryService +from utils.data.datahub_services.metadata_service import MetadataService +from utils.data.datahub_services.query_service import QueryService + +__all__ = [ + "DataHubBaseClient", + "MetadataService", + "QueryService", + "GlossaryService", +] diff --git a/data_utils/datahub_services/base_client.py b/utils/data/datahub_services/base_client.py similarity index 97% rename from data_utils/datahub_services/base_client.py rename to utils/data/datahub_services/base_client.py index 59f03b4..6637b4b 100644 --- a/data_utils/datahub_services/base_client.py +++ b/utils/data/datahub_services/base_client.py @@ -6,7 +6,6 @@ import requests from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph class DataHubBaseClient: diff --git a/data_utils/datahub_services/glossary_service.py b/utils/data/datahub_services/glossary_service.py similarity index 98% rename from data_utils/datahub_services/glossary_service.py rename to utils/data/datahub_services/glossary_service.py index 05e5374..36eeec7 100644 --- a/data_utils/datahub_services/glossary_service.py +++ b/utils/data/datahub_services/glossary_service.py @@ -4,12 +4,12 @@ DataHub의 glossary 관련 기능을 제공합니다. """ -from data_utils.queries import ( - ROOT_GLOSSARY_NODES_QUERY, +from utils.data.datahub_services.base_client import DataHubBaseClient +from utils.data.queries import ( GLOSSARY_NODE_QUERY, GLOSSARY_TERMS_BY_URN_QUERY, + ROOT_GLOSSARY_NODES_QUERY, ) -from data_utils.datahub_services.base_client import DataHubBaseClient class GlossaryService: diff --git a/data_utils/datahub_services/metadata_service.py b/utils/data/datahub_services/metadata_service.py similarity index 99% rename from data_utils/datahub_services/metadata_service.py rename to utils/data/datahub_services/metadata_service.py index ba85f0d..5c5a4f9 100644 --- a/data_utils/datahub_services/metadata_service.py +++ b/utils/data/datahub_services/metadata_service.py @@ -4,15 +4,16 @@ 테이블 메타데이터, 리니지, URN 관련 기능을 제공합니다. """ +from collections import defaultdict + +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph from datahub.metadata.schema_classes import ( DatasetPropertiesClass, SchemaMetadataClass, UpstreamLineageClass, ) -from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph -from collections import defaultdict -from data_utils.datahub_services.base_client import DataHubBaseClient +from utils.data.datahub_services.base_client import DataHubBaseClient class MetadataService: diff --git a/data_utils/datahub_services/query_service.py b/utils/data/datahub_services/query_service.py similarity index 97% rename from data_utils/datahub_services/query_service.py rename to utils/data/datahub_services/query_service.py index ee232fb..6622629 100644 --- a/data_utils/datahub_services/query_service.py +++ b/utils/data/datahub_services/query_service.py @@ -4,11 +4,12 @@ DataHub의 쿼리 관련 기능을 제공합니다. """ -from data_utils.queries import ( +from utils.data.datahub_services.base_client import DataHubBaseClient +from utils.data.queries import ( + GLOSSARY_TERMS_BY_URN_QUERY, LIST_QUERIES_QUERY, QUERIES_BY_URN_QUERY, ) -from data_utils.datahub_services.base_client import DataHubBaseClient class QueryService: @@ -154,7 +155,5 @@ def get_glossary_terms_by_urn(self, dataset_urn): Returns: dict: glossary terms 정보 """ - from data_utils.queries import GLOSSARY_TERMS_BY_URN_QUERY - variables = {"urn": dataset_urn} return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) diff --git a/data_utils/datahub_source.py b/utils/data/datahub_source.py similarity index 96% rename from data_utils/datahub_source.py rename to utils/data/datahub_source.py index 4d1fa3c..aeb261a 100644 --- a/data_utils/datahub_source.py +++ b/utils/data/datahub_source.py @@ -7,10 +7,10 @@ 기존 코드와의 완벽한 호환성을 보장합니다. """ -from data_utils.datahub_services.base_client import DataHubBaseClient -from data_utils.datahub_services.metadata_service import MetadataService -from data_utils.datahub_services.query_service import QueryService -from data_utils.datahub_services.glossary_service import GlossaryService +from utils.data.datahub_services.base_client import DataHubBaseClient +from utils.data.datahub_services.glossary_service import GlossaryService +from utils.data.datahub_services.metadata_service import MetadataService +from utils.data.datahub_services.query_service import QueryService class DatahubMetadataFetcher: diff --git a/data_utils/queries.py b/utils/data/queries.py similarity index 100% rename from data_utils/queries.py rename to utils/data/queries.py diff --git a/utils/databases/__init__.py b/utils/databases/__init__.py new file mode 100644 index 0000000..2a6cf19 --- /dev/null +++ b/utils/databases/__init__.py @@ -0,0 +1,14 @@ +""" +데이터베이스 유틸리티 패키지 초기화 모듈. + +이 모듈은 주요 구성 요소인 DatabaseFactory와 DBConfig를 외부로 노출하여 +데이터베이스 관련 기능을 손쉽게 사용할 수 있도록 합니다. +""" + +from utils.databases.config import DBConfig +from utils.databases.factory import DatabaseFactory + +__all__ = [ + "DatabaseFactory", + "DBConfig", +] diff --git a/utils/databases/config.py b/utils/databases/config.py new file mode 100644 index 0000000..0b48328 --- /dev/null +++ b/utils/databases/config.py @@ -0,0 +1,32 @@ +""" +데이터베이스 설정 정보를 정의하는 모듈. + +이 모듈은 데이터베이스 연결에 필요한 기본 설정값과 +추가 옵션(extra)을 포함한 타입 힌트를 제공합니다. +""" + +from typing import Dict, Optional, TypedDict + + +class DBConfig(TypedDict): + """ + 데이터베이스 연결 설정 정보를 표현하는 타입 딕셔너리. + + 데이터베이스 커넥터가 공통적으로 사용하는 설정 필드를 정의합니다. + 일부 필드는 선택적으로 제공될 수 있습니다. + + Attributes: + host (str): 데이터베이스 호스트명 또는 IP 주소. + port (Optional[int]): 데이터베이스 포트 번호. + user (Optional[str]): 접속 사용자명. + password (Optional[str]): 접속 비밀번호. + database (Optional[str]): 대상 데이터베이스 이름. + extra (Optional[Dict[str, str]]): 드라이버별 추가 설정값. + """ + + host: str + port: Optional[int] + user: Optional[str] + password: Optional[str] + database: Optional[str] + extra: Optional[Dict[str, str]] diff --git a/utils/databases/connector/base_connector.py b/utils/databases/connector/base_connector.py new file mode 100644 index 0000000..c1fbe90 --- /dev/null +++ b/utils/databases/connector/base_connector.py @@ -0,0 +1,58 @@ +""" +데이터베이스 커넥터의 기본 인터페이스 정의 모듈. + +이 모듈은 모든 DB 커넥터 클래스가 상속해야 하는 +공통 추상 클래스(BaseConnector)를 제공합니다. +""" + +from abc import ABC, abstractmethod + +import pandas as pd + + +class BaseConnector(ABC): + """ + 데이터베이스 커넥터의 기본 추상 클래스. + + 모든 구체적인 DB 커넥터(Postgres, MySQL 등)는 + 이 클래스를 상속받아 공통 메서드(`connect`, `run_sql`, `close`)를 구현해야 합니다. + + Attributes: + connection (Any): DB 연결 객체. 구체 클래스에서 초기화 및 관리됩니다. + """ + + connection = None + + @abstractmethod + def connect(self): + """ + 데이터베이스 연결을 수행합니다. + + 이 메서드는 각 DB별 커넥터에서 구체적으로 구현되어야 합니다. + """ + pass + + @abstractmethod + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 포함하는 데이터프레임. + """ + pass + + @abstractmethod + def close(self) -> None: + """ + 데이터베이스 연결을 종료합니다. + + 모든 리소스(커서, 연결 등)를 안전하게 해제해야 합니다. + + Raises: + RuntimeError: 연결 종료 중 예외가 발생한 경우. + """ + pass diff --git a/utils/databases/connector/clickhouse_connector.py b/utils/databases/connector/clickhouse_connector.py new file mode 100644 index 0000000..4e755b4 --- /dev/null +++ b/utils/databases/connector/clickhouse_connector.py @@ -0,0 +1,91 @@ +""" +ClickHouse 데이터베이스 커넥터 모듈. + +이 모듈은 ClickHouse 서버에 연결하여 SQL 쿼리를 실행하고, +그 결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import pandas as pd +from clickhouse_driver import Client + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class ClickHouseConnector(BaseConnector): + """ + ClickHouse 데이터베이스 커넥터 클래스. + + ClickHouse 서버에 연결하고 SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. + """ + + client = None + + def __init__(self, config: DBConfig): + """ + ClickHouseConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): ClickHouse 연결 설정 정보를 담은 객체. + """ + self.host = config["host"] + self.port = config["port"] + self.user = config["user"] + self.password = config["password"] + self.database = config["database"] + self.connect() + + def connect(self) -> None: + """ + ClickHouse 서버에 연결을 설정합니다. + + Raises: + ConnectionError: 서버 연결에 실패한 경우 발생합니다. + """ + try: + self.client = Client( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + ) + logger.info("Successfully connected to ClickHouse.") + except Exception as e: + logger.error("Failed to connect to ClickHouse: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + if self.client is None: + self.connect() + + try: + result = self.client.query_dataframe(sql) + return result + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + + def close(self) -> None: + """ + ClickHouse 서버와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.client: + self.client.disconnect() + logger.info("Connection to ClickHouse closed.") + self.client = None diff --git a/utils/databases/connector/databricks_connector.py b/utils/databases/connector/databricks_connector.py new file mode 100644 index 0000000..34ddcfb --- /dev/null +++ b/utils/databases/connector/databricks_connector.py @@ -0,0 +1,99 @@ +""" +Databricks SQL Warehouse 커넥터 모듈. + +이 모듈은 Databricks SQL Warehouse에 연결하여 SQL 쿼리를 실행하고, +결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import pandas as pd +from databricks import sql + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class DatabricksConnector(BaseConnector): + """ + Databricks SQL Warehouse 커넥터 클래스. + + Databricks SQL 엔드포인트에 연결하여 쿼리를 실행하고, + 결과를 DataFrame으로 반환하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + DatabricksConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): Databricks 연결 정보를 담은 설정 객체. + - 필수 키: host, extra.http_path, extra.access_token + - 선택 키: extra.catalog, extra.schema + """ + self.server_hostname = config["host"] + self.http_path = config["extra"]["http_path"] + self.access_token = config["extra"]["access_token"] + self.catalog = config.get("extra", {}).get("catalog") + self.schema = config.get("extra", {}).get("schema") + self.connect() + + def connect(self) -> None: + """ + Databricks SQL Warehouse에 연결을 설정합니다. + + Raises: + ConnectionError: 연결 설정 중 오류가 발생한 경우. + """ + try: + self.connection = sql.connect( + server_hostname=self.server_hostname, + http_path=self.http_path, + access_token=self.access_token, + catalog=self.catalog, + schema=self.schema, + ) + logger.info("Successfully connected to Databricks.") + except Exception as e: + logger.error("Failed to connect to Databricks: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + if self.connection is None: + self.connect() + + try: + cursor = self.connection.cursor() + cursor.execute(sql) + columns = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return pd.DataFrame(rows, columns=columns) + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + finally: + cursor.close() + + def close(self) -> None: + """ + Databricks SQL Warehouse와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to Databricks closed.") + self.connection = None diff --git a/utils/databases/connector/duckdb_connector.py b/utils/databases/connector/duckdb_connector.py new file mode 100644 index 0000000..7ad7f2a --- /dev/null +++ b/utils/databases/connector/duckdb_connector.py @@ -0,0 +1,79 @@ +""" +DuckDB 데이터베이스 커넥터 모듈. + +이 모듈은 DuckDB 데이터베이스에 연결하여 SQL 쿼리를 실행하고, +결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import duckdb +import pandas as pd + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class DuckDBConnector(BaseConnector): + """ + DuckDB 데이터베이스 커넥터 클래스. + + DuckDB 데이터베이스에 연결하여 쿼리를 실행하고, + 결과를 DataFrame 형태로 반환하거나 연결을 종료하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + DuckDBConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): DuckDB 연결 정보를 담은 설정 객체. + `path` 키를 사용하여 파일 경로를 지정하거나, `:memory:`를 사용하여 인메모리 DB로 설정합니다. + """ + self.database = config.get("path", ":memory:") + self.connect() + + def connect(self) -> None: + """ + DuckDB 데이터베이스에 연결을 설정합니다. + + Raises: + ConnectionError: DuckDB 연결에 실패한 경우 발생합니다. + """ + try: + self.connection = duckdb.connect(database=self.database) + logger.info("Successfully connected to DuckDB.") + except Exception as e: + logger.error("Failed to connect to DuckDB: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + try: + return self.connection.execute(sql).fetchdf() + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + + def close(self) -> None: + """ + DuckDB 데이터베이스 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to DuckDB closed.") + self.connection = None diff --git a/utils/databases/connector/mariadb_connector.py b/utils/databases/connector/mariadb_connector.py new file mode 100644 index 0000000..d6120ef --- /dev/null +++ b/utils/databases/connector/mariadb_connector.py @@ -0,0 +1,94 @@ +""" +MariaDB 데이터베이스 커넥터 모듈. + +이 모듈은 mysql-connector-python을 사용하여 MariaDB 서버에 연결하고, +SQL 쿼리를 실행하여 pandas DataFrame 형태로 결과를 반환하는 기능을 제공합니다. +""" + +import mysql.connector +import pandas as pd + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class MariaDBConnector(BaseConnector): + """ + MariaDB 데이터베이스 커넥터 클래스. + + mysql-connector-python을 이용해 MariaDB 서버에 연결하고, + SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + MariaDBConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): MariaDB 연결 정보를 담은 설정 객체. + """ + self.host = config["host"] + self.port = config.get("port", 3306) + self.user = config["user"] + self.password = config["password"] + self.database = config["database"] + self.connect() + + def connect(self) -> None: + """ + mysql-connector-python을 사용하여 MariaDB 서버에 연결을 설정합니다. + + Raises: + ConnectionError: MariaDB 서버 연결에 실패한 경우 발생합니다. + """ + try: + self.connection = mysql.connector.connect( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + ) + logger.info("Successfully connected to MariaDB.") + except Exception as e: + logger.error("Failed to connect to MariaDB: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + try: + cursor = self.connection.cursor() + cursor.execute(sql) + columns = [column[0] for column in cursor.description] + rows = cursor.fetchall() + return pd.DataFrame(rows, columns=columns) + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + finally: + cursor.close() + + def close(self) -> None: + """ + MariaDB 서버와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to MariaDB closed.") + self.connection = None diff --git a/db_utils/mysql_connector.py b/utils/databases/connector/mysql_connector.py similarity index 51% rename from db_utils/mysql_connector.py rename to utils/databases/connector/mysql_connector.py index 2bcd2df..1c02a11 100644 --- a/db_utils/mysql_connector.py +++ b/utils/databases/connector/mysql_connector.py @@ -1,23 +1,33 @@ +""" +MySQL 데이터베이스 커넥터 모듈. + +이 모듈은 MySQL 서버에 연결하여 SQL 쿼리를 실행하고, +그 결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + import mysql.connector import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger class MySQLConnector(BaseConnector): """ - Connect to MySQL and execute SQL queries. + MySQL 데이터베이스 커넥터 클래스. + + MySQL 서버에 연결하여 SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. """ connection = None def __init__(self, config: DBConfig): """ - Initialize the MySQLConnector with connection parameters. + MySQLConnector 인스턴스를 초기화합니다. - Parameters: - config (DBConfig): Configuration object containing connection parameters. + Args: + config (DBConfig): MySQL 연결 정보를 담은 설정 객체. """ self.host = config["host"] self.port = config.get("port", 3306) @@ -28,7 +38,10 @@ def __init__(self, config: DBConfig): def connect(self) -> None: """ - Establish a connection to the MySQL server. + MySQL 서버에 연결을 설정합니다. + + Raises: + ConnectionError: MySQL 서버 연결에 실패한 경우 발생합니다. """ try: self.connection = mysql.connector.connect( @@ -40,18 +53,21 @@ def connect(self) -> None: ) logger.info("Successfully connected to MySQL.") except Exception as e: - logger.error(f"Failed to connect to MySQL: {e}") + logger.error("Failed to connect to MySQL: %s", e) raise def run_sql(self, sql: str) -> pd.DataFrame: """ - Execute a SQL query and return the result as a pandas DataFrame. + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. - Parameters: - sql (str): SQL query string to be executed. + Args: + sql (str): 실행할 SQL 쿼리 문자열. Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. """ try: cursor = self.connection.cursor() @@ -60,14 +76,16 @@ def run_sql(self, sql: str) -> pd.DataFrame: rows = cursor.fetchall() return pd.DataFrame(rows, columns=columns) except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") + logger.error("Failed to execute SQL query: %s", e) raise finally: cursor.close() def close(self) -> None: """ - Close the connection to the MySQL server. + MySQL 서버와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. """ if self.connection: self.connection.close() diff --git a/utils/databases/connector/oracle_connector.py b/utils/databases/connector/oracle_connector.py new file mode 100644 index 0000000..915614f --- /dev/null +++ b/utils/databases/connector/oracle_connector.py @@ -0,0 +1,93 @@ +""" +Oracle 데이터베이스 커넥터 모듈. + +이 모듈은 Oracle 데이터베이스에 연결하여 SQL 쿼리를 실행하고, +결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import oracledb +import pandas as pd + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class OracleConnector(BaseConnector): + """ + Oracle 데이터베이스 커넥터 클래스. + + Oracle 서버에 연결하여 SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + OracleConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): Oracle 연결 정보를 담은 설정 객체. + - 필수 키: host, port, user, password + - 선택 키: extra.service_name (기본값: "orcl") + """ + self.host = config["host"] + self.port = config["port"] + self.user = config["user"] + self.password = config["password"] + self.service_name = config.get("extra").get("service_name", "orcl") + self.connect() + + def connect(self) -> None: + """ + Oracle 데이터베이스에 연결을 설정합니다. + + Raises: + ConnectionError: Oracle 서버 연결에 실패한 경우 발생합니다. + """ + try: + self.connection = oracledb.connect( + user=self.user, + password=self.password, + dsn=f"{self.host}:{self.port}/{self.service_name}", + ) + logger.info("Successfully connected to Oracle.") + except Exception as e: + logger.error("Failed to connect to Oracle: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + try: + cursor = self.connection.cursor() + cursor.execute(sql) + columns = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return pd.DataFrame(rows, columns=columns) + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + finally: + cursor.close() + + def close(self) -> None: + """ + Oracle 데이터베이스 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to Oracle closed.") + self.connection = None diff --git a/db_utils/postgres_connector.py b/utils/databases/connector/postgres_connector.py similarity index 50% rename from db_utils/postgres_connector.py rename to utils/databases/connector/postgres_connector.py index fb31013..a94ae4f 100644 --- a/db_utils/postgres_connector.py +++ b/utils/databases/connector/postgres_connector.py @@ -1,23 +1,33 @@ -import psycopg2 +""" +PostgreSQL 데이터베이스 커넥터 모듈. + +이 모듈은 PostgreSQL 서버에 연결하여 SQL 쿼리를 실행하고, +결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + import pandas as pd -from .base_connector import BaseConnector -from .config import DBConfig -from .logger import logger +import psycopg2 + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger class PostgresConnector(BaseConnector): """ - Connect to PostgreSQL and execute SQL queries. + PostgreSQL 데이터베이스 커넥터 클래스. + + PostgreSQL 서버에 연결하고 SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. """ connection = None def __init__(self, config: DBConfig): """ - Initialize the PostgresConnector with connection parameters. + PostgresConnector 인스턴스를 초기화합니다. - Parameters: - config (DBConfig): Configuration object containing connection parameters. + Args: + config (DBConfig): PostgreSQL 연결 정보를 담은 설정 객체. """ self.host = config["host"] self.port = config["port"] @@ -28,7 +38,10 @@ def __init__(self, config: DBConfig): def connect(self) -> None: """ - Establish a connection to the PostgreSQL server. + PostgreSQL 서버에 연결합니다. + + Raises: + ConnectionError: 서버 연결에 실패한 경우 발생합니다. """ try: self.connection = psycopg2.connect( @@ -40,18 +53,21 @@ def connect(self) -> None: ) logger.info("Successfully connected to PostgreSQL.") except Exception as e: - logger.error(f"Failed to connect to PostgreSQL: {e}") + logger.error("Failed to connect to PostgreSQL: %s", e) raise def run_sql(self, sql: str) -> pd.DataFrame: """ - Execute a SQL query and return the result as a pandas DataFrame. + SQL 쿼리를 실행하고 결과를 DataFrame으로 반환합니다. - Parameters: - sql (str): SQL query string to be executed. + Args: + sql (str): 실행할 SQL 쿼리 문자열. Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. """ try: cursor = self.connection.cursor() @@ -60,14 +76,16 @@ def run_sql(self, sql: str) -> pd.DataFrame: rows = cursor.fetchall() return pd.DataFrame(rows, columns=columns) except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") + logger.error("Failed to execute SQL query: %s", e) raise finally: cursor.close() def close(self) -> None: """ - Close the connection to the PostgreSQL server. + PostgreSQL 서버와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. """ if self.connection: self.connection.close() diff --git a/db_utils/snowflake_connector.py b/utils/databases/connector/snowflake_connector.py similarity index 52% rename from db_utils/snowflake_connector.py rename to utils/databases/connector/snowflake_connector.py index 401233e..69ef907 100644 --- a/db_utils/snowflake_connector.py +++ b/utils/databases/connector/snowflake_connector.py @@ -1,25 +1,35 @@ -from .base_connector import BaseConnector -from snowflake import connector +""" +Snowflake 데이터베이스 커넥터 모듈. + +이 모듈은 Snowflake 데이터베이스에 연결하여 SQL 쿼리를 실행하고, +결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + import pandas as pd -from .config import DBConfig -from .logger import logger +from snowflake import connector + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger class SnowflakeConnector(BaseConnector): """ - Connect to Snowflake database and execute SQL queries. + Snowflake 데이터베이스 커넥터 클래스. + + Snowflake 서버에 연결하여 SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. """ connection = None def __init__(self, config: DBConfig): """ - Initialize the SnowflakeConnector with connection parameters. + SnowflakeConnector 인스턴스를 초기화합니다. - Parameters: - config (DBConfig): Configuration object containing connection parameters. - Required: user, password, extra.account - Optional: extra.warehouse, database, extra.schema + Args: + config (DBConfig): Snowflake 연결 정보를 담은 설정 객체. + - 필수 키: user, password, extra.account + - 선택 키: extra.warehouse, database, extra.schema """ self.user = config["user"] self.password = config["password"] @@ -31,7 +41,10 @@ def __init__(self, config: DBConfig): def connect(self) -> None: """ - Establish a connection to the Snowflake server. + Snowflake 데이터베이스에 연결을 설정합니다. + + Raises: + ConnectionError: Snowflake 서버 연결에 실패한 경우 발생합니다. """ try: self.connection = connector.connect( @@ -45,18 +58,21 @@ def connect(self) -> None: logger.info("Successfully connected to Snowflake.") self.cursor = self.connection.cursor() except Exception as e: - logger.error(f"Failed to connect to Snowflake: {e}") + logger.error("Failed to connect to Snowflake: %s", e) raise def run_sql(self, sql: str) -> pd.DataFrame: """ - Execute a SQL query and return the result as a pandas DataFrame. + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. - Parameters: - sql (str): SQL query string to be executed. + Args: + sql (str): 실행할 SQL 쿼리 문자열. Returns: - pd.DataFrame: Result of the SQL query as a pandas DataFrame. + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. """ if self.connection is None: self.connect() @@ -69,14 +85,16 @@ def run_sql(self, sql: str) -> pd.DataFrame: data = self.cursor.fetchall() return pd.DataFrame(data, columns=columns) except Exception as e: - logger.error(f"Failed to execute SQL query: {e}") + logger.error("Failed to execute SQL query: %s", e) raise finally: cursor.close() def close(self) -> None: """ - Close the connection to the Snowflake server. + Snowflake 데이터베이스 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. """ if self.connection: self.connection.close() diff --git a/utils/databases/connector/sqlite_connector.py b/utils/databases/connector/sqlite_connector.py new file mode 100644 index 0000000..ca5b138 --- /dev/null +++ b/utils/databases/connector/sqlite_connector.py @@ -0,0 +1,90 @@ +""" +SQLite 데이터베이스 커넥터 모듈. + +이 모듈은 SQLite 데이터베이스에 연결하여 SQL 쿼리를 실행하고, +그 결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import sqlite3 + +import pandas as pd + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class SQLiteConnector(BaseConnector): + """ + SQLite 데이터베이스 커넥터 클래스. + + SQLite 파일 또는 인메모리 데이터베이스에 연결하여 + SQL 쿼리를 실행하거나 연결을 종료하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + SQLiteConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): SQLite 연결 정보를 담은 설정 객체. + - `path` 키를 사용하여 SQLite 파일 경로를 지정합니다. + - 값이 None 또는 ":memory:"인 경우 인메모리 데이터베이스를 생성합니다. + """ + self.database = config.get("path", ":memory:") + self.connect() + + def connect(self) -> None: + """ + SQLite 데이터베이스에 연결을 설정합니다. + + Raises: + ConnectionError: 데이터베이스 연결에 실패한 경우 발생합니다. + """ + try: + self.connection = sqlite3.connect(self.database) + logger.info("Successfully connected to SQLite (%s).", self.database) + except Exception as e: + logger.error("Failed to connect to SQLite: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + if self.connection is None: + self.connect() + + try: + cursor = self.connection.cursor() + cursor.execute(sql) + columns = [col[0] for col in cursor.description] + rows = cursor.fetchall() + return pd.DataFrame(rows, columns=columns) + except Exception as e: + logger.error("Failed to execute SQL query: %s", e) + raise + finally: + cursor.close() + + def close(self) -> None: + """ + SQLite 데이터베이스 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to SQLite closed.") + self.connection = None diff --git a/utils/databases/connector/trino_connector.py b/utils/databases/connector/trino_connector.py new file mode 100644 index 0000000..521c74e --- /dev/null +++ b/utils/databases/connector/trino_connector.py @@ -0,0 +1,132 @@ +""" +Trino 데이터베이스 커넥터 모듈. + +이 모듈은 Trino 클러스터에 연결하여 SQL 쿼리를 실행하고, +그 결과를 pandas DataFrame 형태로 반환하는 기능을 제공합니다. +""" + +import pandas as pd + +from utils.databases.config import DBConfig +from utils.databases.connector.base_connector import BaseConnector +from utils.databases.logger import logger + + +class TrinoConnector(BaseConnector): + """ + Trino 데이터베이스 커넥터 클래스. + + Trino 클러스터에 연결하여 SQL 쿼리를 실행하거나 + 연결을 종료하는 기능을 제공합니다. + """ + + connection = None + + def __init__(self, config: DBConfig): + """ + TrinoConnector 인스턴스를 초기화합니다. + + Args: + config (DBConfig): Trino 연결 정보를 담은 설정 객체. + - 필수 키: host, port + - 선택 키: user, password, database, extra.catalog, extra.schema, extra.http_scheme + - database가 "catalog.schema" 형태일 경우 자동으로 분리되어 설정됩니다. + """ + # pylint: disable=import-outside-toplevel + try: + import trino + + self.trino = trino + except ImportError as e: + logger.error( + "Trino 드라이버가 설치되어 있지 않습니다. pip install trino 명령을 실행하세요." + ) + raise ImportError("Trino 라이브러리가 설치되어 있지 않습니다.") from e + + self.host = config["host"] + self.port = config["port"] or 8080 + self.user = config.get("user") or "anonymous" + self.password = config.get("password") + self.database = config.get("database") # e.g., catalog.schema + self.extra = config.get("extra") or {} + self.http_scheme = self.extra.get("http_scheme", "http") + self.catalog = self.extra.get("catalog") + self.schema = self.extra.get("schema") + + # If database given as "catalog.schema", split into fields + if self.database and (not self.catalog or not self.schema): + if "." in self.database: + db_catalog, db_schema = self.database.split(".", 1) + self.catalog = self.catalog or db_catalog + self.schema = self.schema or db_schema + + self.connect() + + def connect(self) -> None: + """ + Trino 클러스터에 연결을 설정합니다. + + Raises: + ImportError: trino 드라이버를 불러오지 못한 경우 발생합니다. + ConnectionError: Trino 서버 연결에 실패한 경우 발생합니다. + """ + try: + auth = None + if self.password and self.http_scheme == "https": + auth = self.trino.auth.BasicAuthentication(self.user, self.password) + + self.connection = self.trino.dbapi.connect( + host=self.host, + port=self.port, + user=self.user, + http_scheme=self.http_scheme, + catalog=self.catalog, + schema=self.schema, + auth=auth, + ) + logger.info("Successfully connected to Trino.") + except Exception as e: + logger.error("Failed to connect to Trino: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + + Args: + sql (str): 실행할 SQL 쿼리 문자열. + + Returns: + pd.DataFrame: 쿼리 결과를 담은 DataFrame 객체. + + Raises: + RuntimeError: SQL 실행 중 오류가 발생한 경우. + """ + try: + cursor = self.connection.cursor() + cursor.execute(sql) + columns = ( + [desc[0] for desc in cursor.description] if cursor.description else [] + ) + rows = cursor.fetchall() if cursor.description else [] + return pd.DataFrame(rows, columns=columns) + except Exception as e: + logger.error("Failed to execute SQL query on Trino: %s", e) + raise + finally: + try: + cursor.close() + logger.info("Cursor closed successfully.") + except Exception as e: # pylint: disable=broad-exception-caught + logger.error("Failed to close cursor: %s", e) + + def close(self) -> None: + """ + Trino 클러스터와의 연결을 종료합니다. + + 연결이 존재할 경우 안전하게 닫고 리소스를 해제합니다. + """ + if self.connection: + self.connection.close() + logger.info("Connection to Trino closed.") + self.connection = None diff --git a/utils/databases/factory.py b/utils/databases/factory.py new file mode 100644 index 0000000..4bfcb5b --- /dev/null +++ b/utils/databases/factory.py @@ -0,0 +1,115 @@ +""" +데이터베이스 커넥터 팩토리 모듈. + +이 모듈은 DB 타입에 따라 알맞은 커넥터 클래스를 동적으로 로드하여 +해당 DB에 연결할 수 있는 인스턴스를 생성하는 기능을 제공합니다. +환경변수로부터 접속 설정을 자동으로 로드하는 유틸리티 함수도 포함합니다. +""" + +import importlib +import inspect +import os +from typing import Optional + +from utils.databases.config import DBConfig +from utils.databases.logger import logger + + +class DatabaseFactory: + """ + 데이터베이스 커넥터 팩토리 클래스. + + DB 타입에 따라 알맞은 Connector 클래스를 동적으로 로드하고, + 해당 인스턴스를 반환하는 기능을 제공합니다. + """ + + @staticmethod + def get_connector(db_type: Optional[str] = None, config: Optional[DBConfig] = None): + """ + 주어진 DB 타입에 해당하는 Connector 인스턴스를 반환합니다. + + 지정된 DB 타입에 맞는 커넥터 모듈을 동적으로 로드하고, + 해당 모듈 내의 Connector 클래스를 탐색하여 인스턴스를 생성합니다. + DB 타입이 지정되지 않은 경우 환경 변수(DB_TYPE)에서 자동으로 가져옵니다. + + Args: + db_type (Optional[str]): 데이터베이스 타입 문자열 (예: 'postgres', 'mysql', 'trino'). + config (Optional[DBConfig]): 데이터베이스 연결 설정 객체. + 지정되지 않은 경우 환경 변수에서 자동으로 로드됩니다. + + Returns: + BaseConnector: 주어진 DB 타입에 해당하는 Connector 인스턴스. + + Raises: + ValueError: DB_TYPE이 지정되지 않았거나, + 지원되지 않는 DB 타입이거나, + 모듈 또는 Connector 클래스를 찾을 수 없는 경우. + """ + if not db_type: + db_type = os.getenv("DB_TYPE") + if not db_type: + raise ValueError("DB_TYPE이 환경변수 또는 인자로 제공되어야 합니다.") + db_type = db_type.lower() + + if not config: + config = load_config_from_env(db_type.upper()) + + try: + module_name = f"utils.databases.connector.{db_type}_connector" + module = importlib.import_module(module_name) + + connector_class = None + for name, cls in inspect.getmembers(module, inspect.isclass): + if name.lower() == f"{db_type}connector": + connector_class = cls + break + if connector_class is None: + raise ValueError(f"No matching Connector class found for {db_type}") + except (ImportError, AttributeError) as e: + logger.error( + "지원되지 않는 DB 타입이거나 모듈을 로드할 수 없습니다: %s", + db_type, + ) + raise ValueError(f"Unsupported DB type: {db_type}") from e + + return connector_class(config) + + +def load_config_from_env(prefix: str) -> DBConfig: + """ + 환경변수에서 데이터베이스 접속 설정을 로드합니다. + + Args: + prefix (str): 환경변수 접두어 (예: 'POSTGRES', 'MYSQL'). + + Returns: + DBConfig: 환경변수에서 로드된 설정 정보를 담은 DBConfig 객체. + """ + base_keys = { + "HOST", + "PORT", + "USER", + "PASSWORD", + "DATABASE", + } + config = { + "host": os.getenv(f"{prefix}_HOST"), + "port": ( + int(os.getenv(f"{prefix}_PORT")) if os.getenv(f"{prefix}_PORT") else None + ), + "user": os.getenv(f"{prefix}_USER"), + "password": os.getenv(f"{prefix}_PASSWORD"), + "database": os.getenv(f"{prefix}_DATABASE"), + } + + extra = {} + for key, value in os.environ.items(): + if ( + key.startswith(f"{prefix}_") + and key.split("_", 1)[1].upper() not in base_keys + ): + extra[key[len(prefix) + 1 :].lower()] = value + if extra: + config["extra"] = extra + + return DBConfig(**config) diff --git a/db_utils/logger.py b/utils/databases/logger.py similarity index 50% rename from db_utils/logger.py rename to utils/databases/logger.py index f2c3104..f9d0404 100644 --- a/db_utils/logger.py +++ b/utils/databases/logger.py @@ -1,3 +1,10 @@ +""" +로깅 설정 모듈. + +이 모듈은 애플리케이션 전역에서 사용할 기본 로깅 설정을 정의하고, +표준 로거 인스턴스(logger)를 제공합니다. +""" + import logging logging.basicConfig(