# PRACTISE

In [13]:
from pathlib import Path
import sys

# Get current working directory
cwd = Path.cwd()
print(f"Current working directory: {cwd}")

# Get parent directory (main project folder)
main_folder = cwd.parent
print(f"Main project folder: {main_folder}")

# Add the main folder to sys.path
sys.path.append(str(main_folder))
print(f"Added to sys.path: {main_folder}")

# # Verify sys.path
# print("\nPaths in sys.path:")
# for path in sys.path:
#     print(f"  - {path}")

Current working directory: /Users/vamsi_mbmax/Library/CloudStorage/OneDrive-Personal/01_vam_PROJECTS/LEARNING/proj_PersonalProjects/dev/pract_pp_etl_based_chatbot/notebooks
Main project folder: /Users/vamsi_mbmax/Library/CloudStorage/OneDrive-Personal/01_vam_PROJECTS/LEARNING/proj_PersonalProjects/dev/pract_pp_etl_based_chatbot
Added to sys.path: /Users/vamsi_mbmax/Library/CloudStorage/OneDrive-Personal/01_vam_PROJECTS/LEARNING/proj_PersonalProjects/dev/pract_pp_etl_based_chatbot



## CHATBOT


### bot.py


In [14]:
from utils.logger import setup_logger, log_structured

# create a logger for this module
logger = setup_logger(__name__)

In [15]:
class Chatbot:
    def __init__(self, model_name="gpt-4o-mini"):
        logger.info(f"Initializing chatbot with model: {model_name}")
        self.model_name = model_name

    def process_query(self, query):
        log_structured(
            logger,
            "info",
            "process_query_start",
            query_length=len(query),
            query_preview=query[:50] if len(query) > 50 else query,
        )
        try:
            response = f"processed response for: {query}"
            logger.debug("Query processed successfully")
            log_structured(
                logger, "debug", "process_query_complete", response_length=len(response)
            )
            return response
        except Exception as e:
            log_structured(
                logger,
                "error",
                "process_query_error",
                error=str(e),
                query_length=len(query),
            )
            return "I'm sorry, i couldn't process your request"


bot = Chatbot()
bot.process_query("SELECT * FROM weather_forecast;")

2025-03-25 21:07:11,434 - __main__ - INFO - Initializing chatbot with model: gpt-4o-mini
2025-03-25 21:07:11,434 - __main__ - INFO - Initializing chatbot with model: gpt-4o-mini
2025-03-25 21:07:11,435 - __main__ - INFO - STRUCTURED_LOG: {'event': 'process_query_start', 'data': {'query_length': 31, 'query_preview': 'SELECT * FROM weather_forecast;'}}
2025-03-25 21:07:11,435 - __main__ - INFO - STRUCTURED_LOG: {'event': 'process_query_start', 'data': {'query_length': 31, 'query_preview': 'SELECT * FROM weather_forecast;'}}


'processed response for: SELECT * FROM weather_forecast;'


## CONFIG


### logging_config.py


In [16]:
import sys
import logging
import logging.config
import logging.handlers
import os
import functools
from pathlib import Path

from config.settings import LOG_LEVEL, LOG_DIR

In [17]:
# Constants for logging
DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
DETAILED_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
BACKUP_COUNT = 5  # Keep 5 backup files

In [18]:
# Map string log levels to logging constants
LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

DEFAULT_LOG_LEVEL = logging.INFO

In [19]:
os.makedirs(LOG_DIR, exist_ok=True)

In [20]:
from config.logging_constants import (
    LOG_DIR,
    ETL_LOG_FILE,
    WEB_LOG_FILE,
    DB_LOG_FILE,
    DEFAULT_LOG_LEVEL,
    LOG_LEVELS,
)

# Make constants available for import from this module
__all__ = [
    "DEFAULT_LOG_LEVEL",
    "LOG_LEVELS",
    "LOG_DIR",
    "ETL_LOG_FILE",
    "WEB_LOG_FILE",
    "DB_LOG_FILE",
    "configure_logging",
    "get_logger",
    "set_log_level",
    "log_function_call",
]

In [21]:
def configure_logging(log_dir="logs", level=logging.INFO):
    """
    Configure the logging system with detailed settings.

    Args:
        log_dir: Directory to store logs
        level: Default logging level

    Returns:
        None
    """
    # Ensure log directory exists
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    # Define logging configuration
    config = {
        "version": 1,
        "formatters": {
            "standard": {
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            },
            "detailed": {
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s"
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": level,
                "formatter": "standard",
                "stream": "ext://sys.stdout",
            },
            "file": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": logging.DEBUG,
                "formatter": "detailed",
                "filename": os.path.join(log_dir, "application.log"),
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
            },
            "error_file": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": logging.ERROR,
                "formatter": "detailed",
                "filename": os.path.join(log_dir, "errors.log"),
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
            },
        },
        "loggers": {
            "": {  # Root logger
                "handlers": ["console", "file", "error_file"],
                "level": logging.DEBUG,
                "propagate": True,
            }
        },
    }

    # Apply configuration
    logging.config.dictConfig(config)
    logging.info("Logging system configured")

    return logging.getLogger()

In [22]:
def get_logger(name, log_file=None, console=True, level=None):
    logger = logging.getLogger(name)

    # Only configure if handlers aren't already set up
    if not logger.handlers:
        # Set level (from param or env or default)
        level = level or os.environ.get("LOG_LEVEL", DEFAULT_LOG_LEVEL)
        if isinstance(level, str):
            level = LOG_LEVELS.get(level.lower(), DEFAULT_LOG_LEVEL)

        logger.setLevel(level)

        # Create formatter
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

        # Add console handler if requested
        if console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)

        # Add file handler if specified
        if log_file:
            # Ensure directory exists
            log_dir = os.path.dirname(log_file)
            Path(log_dir).mkdir(parents=True, exist_ok=True)

            file_handler = logging.handlers.RotatingFileHandler(
                log_file, maxBytes=10485760, backupCount=5  # 10MB
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

In [23]:
"""
Logging configuration module.

This module provides functions to configure the logging system for the application.
"""

import sys
import logging
import logging.config
import logging.handlers
import os
import functools
from pathlib import Path

from config.settings import LOG_LEVEL, LOG_DIR

# Constants for logging
DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
DETAILED_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
BACKUP_COUNT = 5  # Keep 5 backup files

# Map string log levels to logging constants
LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

DEFAULT_LOG_LEVEL = logging.INFO

# Make sure log directory exists
os.makedirs(LOG_DIR, exist_ok=True)

# Import and re-export constants
from .logging_constants import (
    DEFAULT_LOG_LEVEL,
    LOG_LEVELS,
    LOG_DIR,
    ETL_LOG_FILE,
    WEB_LOG_FILE,
    DB_LOG_FILE,
)

# Make constants available for import from this module
__all__ = [
    "DEFAULT_LOG_LEVEL",
    "LOG_LEVELS",
    "LOG_DIR",
    "ETL_LOG_FILE",
    "WEB_LOG_FILE",
    "DB_LOG_FILE",
    "configure_logging",
    "get_logger",
    "set_log_level",
    "log_function_call",
]


def configure_logging(log_dir="logs", level=logging.INFO):
    """
    Configure the logging system with detailed settings.

    Args:
        log_dir: Directory to store logs
        level: Default logging level

    Returns:
        None
    """
    # Ensure log directory exists
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    # Define logging configuration
    config = {
        "version": 1,
        "formatters": {
            "standard": {
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            },
            "detailed": {
                "format": "%(asctime)s - %(name)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s"
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": level,
                "formatter": "standard",
                "stream": "ext://sys.stdout",
            },
            "file": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": logging.DEBUG,
                "formatter": "detailed",
                "filename": os.path.join(log_dir, "application.log"),
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
            },
            "error_file": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": logging.ERROR,
                "formatter": "detailed",
                "filename": os.path.join(log_dir, "errors.log"),
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
            },
        },
        "loggers": {
            "": {  # Root logger
                "handlers": ["console", "file", "error_file"],
                "level": logging.DEBUG,
                "propagate": True,
            }
        },
    }

    # Apply configuration
    logging.config.dictConfig(config)
    logging.info("Logging system configured")

    return logging.getLogger()


def get_logger(name, log_file=None, console=True, level=None):
    """
    Get a configured logger instance.

    Args:
        name: Logger name
        log_file: Optional log file path
        console: Whether to log to console
        level: Log level (defaults to DEFAULT_LOG_LEVEL)

    Returns:
        Logger instance
    """
    logger = logging.getLogger(name)

    # Only configure if handlers aren't already set up
    if not logger.handlers:
        # Set level (from param or env or default)
        level = level or os.environ.get("LOG_LEVEL", DEFAULT_LOG_LEVEL)
        if isinstance(level, str):
            level = LOG_LEVELS.get(level.lower(), DEFAULT_LOG_LEVEL)

        logger.setLevel(level)

        # Create formatter
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

        # Add console handler if requested
        if console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)

        # Add file handler if specified
        if log_file:
            # Ensure directory exists
            log_dir = os.path.dirname(log_file)
            Path(log_dir).mkdir(parents=True, exist_ok=True)

            file_handler = logging.handlers.RotatingFileHandler(
                log_file, maxBytes=10485760, backupCount=5  # 10MB
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger


def set_log_level(logger, level):
    """
    Set log level for a logger and all its handlers.

    Args:
        logger: Logger to modify
        level: New log level
    """
    if isinstance(level, str):
        level = LOG_LEVELS.get(level.lower(), DEFAULT_LOG_LEVEL)

    logger.setLevel(level)
    for handler in logger.handlers:
        handler.setLevel(level)


def log_function_call(func_or_logger=None):
    """
    Decorator for logging function calls.

    Can be used in two ways:
    1. As a direct decorator: @log_function_call
    2. With a logger: @log_function_call(logger)

    Args:
        func_or_logger: Function to decorate or logger to use

    Returns:
        Wrapped function or decorator function
    """
    if func_or_logger is None or isinstance(func_or_logger, logging.Logger):
        # Case 2: Called with logger or no args
        logger = func_or_logger or logging.getLogger()

        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                logger.debug(f"Calling {func.__name__}")
                try:
                    result = func(*args, **kwargs)
                    logger.debug(f"Completed {func.__name__}")
                    return result
                except Exception as e:
                    logger.error(f"Error in {func.__name__}: {str(e)}")
                    raise

            return wrapper

        return decorator
    else:
        # Case 1: Called as direct decorator
        func = func_or_logger
        logger = logging.getLogger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.debug(f"Calling {func.__name__}")
            try:
                result = func(*args, **kwargs)
                logger.debug(f"Completed {func.__name__}")
                return result
            except Exception as e:
                logger.error(f"Error in {func.__name__}: {str(e)}")
                raise

        return wrapper


def get_log_level(level_name=None):
    """
    Get the log level from a string.

    Args:
        level_name (str, optional): Log level name (DEBUG, INFO, etc.).
            If None, will use LOG_LEVEL from settings.

    Returns:
        int: The log level constant (e.g., logging.INFO)
    """
    if level_name is None:
        level_name = LOG_LEVEL

    level_name = level_name.upper()

    # Map string log levels to logging constants
    log_levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    return log_levels.get(level_name, logging.INFO)


def create_log_formatter(detailed=False):
    """
    Create a log formatter.

    Args:
        detailed (bool): Whether to use detailed format.
            Detailed format includes filename, line number, and function name.

    Returns:
        logging.Formatter: A log formatter
    """
    log_format = DETAILED_LOG_FORMAT if detailed else DEFAULT_LOG_FORMAT
    return logging.Formatter(log_format)


def create_file_handler(log_file, level=None, formatter=None):
    """
    Create a rotating file handler for logging.

    Args:
        log_file (str): Path to the log file.
        level (int, optional): Log level. If None, will use level from settings.
        formatter (logging.Formatter, optional): Log formatter.
            If None, will create a default formatter.

    Returns:
        logging.Handler: A file handler for logging
    """
    if level is None:
        level = get_log_level()

    if formatter is None:
        formatter = create_log_formatter()

    # Create directory for log file if it doesn't exist
    os.makedirs(os.path.dirname(log_file), exist_ok=True)

    # Create a rotating file handler
    handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=MAX_LOG_SIZE, backupCount=BACKUP_COUNT
    )

    handler.setLevel(level)
    handler.setFormatter(formatter)

    return handler


def create_console_handler(level=None, formatter=None):
    """
    Create a console handler for logging.

    Args:
        level (int, optional): Log level. If None, will use level from settings.
        formatter (logging.Formatter, optional): Log formatter.
            If None, will create a default formatter.

    Returns:
        logging.Handler: A console handler for logging
    """
    if level is None:
        level = get_log_level()

    if formatter is None:
        formatter = create_log_formatter()

    # Create a console handler
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(formatter)

    return handler


def configure_logger(
    logger, level=None, add_console_handler=True, log_file=None, detailed=False
):
    """
    Configure a logger with handlers and formatters.

    Args:
        logger (logging.Logger): The logger to configure.
        level (int, optional): Log level. If None, will use level from settings.
        add_console_handler (bool): Whether to add a console handler.
        log_file (str, optional): Path to the log file.
            If None, no file handler will be added.
        detailed (bool): Whether to use detailed format.

    Returns:
        logging.Logger: The configured logger
    """
    if level is None:
        level = get_log_level()

    # Set the logger's level
    logger.setLevel(level)

    # Remove any existing handlers to avoid duplicates
    while logger.handlers:
        logger.removeHandler(logger.handlers[0])

    # Create formatter
    formatter = create_log_formatter(detailed=detailed)

    # Add console handler if requested
    if add_console_handler:
        console_handler = create_console_handler(level=level, formatter=formatter)
        logger.addHandler(console_handler)

    # Add file handler if log_file is specified
    if log_file:
        file_handler = create_file_handler(log_file, level=level, formatter=formatter)
        logger.addHandler(file_handler)

    # Don't propagate to root logger if this is not the root logger
    if logger.name != "root":
        logger.propagate = False

    return logger


def get_component_logger(
    component, subcomponent=None, level=None, add_console_handler=True, detailed=False
):
    """
    Get a logger for a specific component.

    Args:
        component (str): The component name (e.g., 'etl', 'web').
        subcomponent (str, optional): The subcomponent name (e.g., 'extractor').
        level (int, optional): Log level. If None, will use level from settings.
        add_console_handler (bool): Whether to add a console handler.
        detailed (bool): Whether to use detailed format.

    Returns:
        logging.Logger: The configured logger
    """
    # Construct logger name
    logger_name = component
    if subcomponent:
        logger_name = f"{component}.{subcomponent}"

    # Get or create the logger
    logger = logging.getLogger(logger_name)

    # Determine log file based on component
    log_file = os.path.join(LOG_DIR, f"{component}.log")

    # Configure the logger
    configure_logger(
        logger,
        level=level,
        add_console_handler=add_console_handler,
        log_file=log_file,
        detailed=detailed,
    )

    return logger


def configure_root_logger():
    """
    Configure the root logger for the application.

    Returns:
        logging.Logger: The configured root logger
    """
    # Get the root logger
    root_logger = logging.getLogger()

    # Configure with both console and file handlers
    log_file = os.path.join(LOG_DIR, "app.log")
    configure_logger(
        root_logger,
        level=get_log_level(),
        add_console_handler=True,
        log_file=log_file,
        detailed=True,
    )

    root_logger.info("Root logger configured")
    return root_logger

ImportError: attempted relative import with no known parent package

### logging_constants.py


In [None]:
import logging
import os
from pathlib import Path

In [None]:
LOG_DIR = Path("logs")
ETL_LOG_FILE = str(LOG_DIR / "etl" / "etl.log")
WEB_LOG_FILE = str(LOG_DIR / "web" / "web.log")
DB_LOG_FILE = str(LOG_DIR / "db" / "db.log")

In [None]:
# Log levels
DEFAULT_LOG_LEVEL = logging.INFO
LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "production": logging.WARNING,
    "development": logging.DEBUG,
    "testing": logging.DEBUG,
}

### settings.py


In [None]:
import os
import logging
from pathlib import Path
from dotenv import load_dotenv

In [None]:
# Use the already defined cwd variable from cell 1
PROJECT_ROOT = Path.cwd().parent
CONFIG_DIR = os.path.join(PROJECT_ROOT, "config")

In [None]:
load_dotenv(os.path.join(PROJECT_ROOT, ".env"))

True

In [None]:
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_LEVEL




## DATABASE


### db_connector.py


In [None]:
import psycopg2
from psycopg2.extras import RealDictCursor, DictCursor, Json
import logging
from contextlib import contextmanager
import json
from datetime import datetime
import time
import random

In [None]:
from config.settings import DB_CONFIG
from utils.logger import get_component_logger, log_db_function, log_structured

logger = get_component_logger("db", "connector")


class DatabaseConnectionError(Exception):
    pass


class DatabaseQueryError(Exception):
    pass


class DatabaseConnector:
    def __init__(self, config=None, max_retries=3, retry_delay=1):
        self.config = config or DB_CONFIG
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._test_connection()

    def _test_connection(self):
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                    if result and result[0] == 1:
                        logger.debug("database connection test successful")
        except Exception as e:
            logger.error(f"database connection test failed {e}")
            raise DatabaseConnectionError(f"failed to connect to db {e}")

    @contextmanager
    @log_db_function
    def get_connection(self):
        """
        Context manager that yields a database connection.
        Automatically closes the connection when exiting the context.
        Implements retry logic for transient connection failures.

        Yields:
            psycopg2.connection: A PostgreSQL database connection

        Raises:
            DatabaseConnectionError: If connection cannot be established after retries
        """
        conn = None
        attempt = 0
        last_error = None

        while attempt < self.max_retries:
            try:
                logger.debug(
                    f"Connecting to the PostgreSQL database (attempt {attempt+1})"
                )
                conn = psycopg2.connect(**self.config)
                yield conn
                return
            except psycopg2.Error as e:
                last_error = e
                logger.warning(f"Database connection error (attempt {attempt+1}): {e}")

                if conn is not None:
                    try:
                        conn.close()
                    except:
                        pass  # Ignore errors on close

                # Add jitter to retry delay to avoid thundering herd
                jitter = random.uniform(0, 0.5)
                retry_wait = self.retry_delay * (2**attempt) + jitter
                logger.debug(f"Retrying in {retry_wait:.2f} seconds")
                time.sleep(retry_wait)
                attempt += 1

        # If we get here, all attempts failed
        logger.error(f"All {self.max_retries} connection attempts failed")
        raise DatabaseConnectionError(
            f"Failed to connect after {self.max_retries} attempts: {last_error}"
        )

    @contextmanager
    def get_cursor(self, cursor_factory=None, named=False):
        pass

In [None]:
DB_CONFIG

{'host': 'localhost',
 'port': 5432,
 'database': 'weather_db',
 'user': 'postgres',
 'password': ''}

### create_weather_stats.py


In [43]:
import os
import sys
from pathlib import Path

project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

In [32]:
from database.db_connector import DatabaseConnector
from utils.logger import get_component_logger

logger = get_component_logger("db", "schema")


def create_weather_stats_table():
    db = DatabaseConnector()

    try:
        schema_file = os.path.join(Path.cwd(), "weather_stats.sql")
        with open(schema_file, "r") as f:
            sql = f.read()

        with db.transaction() as cursor:
            cursor.execute(sql)
        return True

    except Exception as e:
        return False

In [30]:
os.path.join(Path.cwd(), "weather_stats.sql")

'/Users/vamsi_mbmax/Library/CloudStorage/OneDrive-Personal/01_vam_PROJECTS/LEARNING/proj_PersonalProjects/dev/pract_pp_etl_based_chatbot/notebooks/weather_stats.sql'

### db_utils.py


In [37]:
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple, Union

from database.db_connector import DatabaseConnector
from utils.logger import get_component_logger, log_db_function

In [38]:
class DatabaseError(Exception):
    pass

In [39]:
def get_value_from_result(result, key_or_index, default=None):
    if result is None:
        return default

    try:
        if isinstance(result, dict):
            return result.get(key_or_index, default)
        elif isinstance(result, (list, tuple)):
            # If the key_or_index is a string but the result is a tuple,
            # we need to convert it to an integer index
            if isinstance(key_or_index, str) and key_or_index == "location_id":
                return result[0]  # Assume first column is location_id
            elif isinstance(key_or_index, str) and key_or_index == "weather_id":
                return result[0]  # Assume first column is weather_id
            elif isinstance(key_or_index, str) and key_or_index == "forecast_id":
                return result[0]  # Assume first column is forecast_id
            elif isinstance(key_or_index, str) and key_or_index == "report_id":
                return result[0]  # Assume first column is report_id
            elif isinstance(key_or_index, int):
                return (
                    result[key_or_index] if 0 <= key_or_index < len(result) else default
                )
            else:
                return default
    except Exception as e:
        logger.warning(f"Failed to extract {key_or_index} from result: {str(e)}")
        return default

In [40]:
@log_db_function
def get_or_create_location(
    city_name: str,
    country: str,
    latitude: float = None,
    longitude: float = None,
    timezone: str = None,
    population: int = None,
) -> int:
    db = DatabaseConnector()

    try:
        query = """
            SELECT location_id FROM locations
            WHERE city_name = %s AND country = %s
        """
        result = db.execute_query(query, (city_name, country))

        if result:
            # Extract location_id safely
            location_id = get_value_from_result(result[0], "location_id")
            if location_id is not None:
                logger.debug(
                    f"Found existing location ID {location_id} for {city_name}, {country}"
                )
                return location_id

        # If not found or location_id was None, create a new location
        logger.info(f"Creating new location for {city_name}, {country}")
        insert_query = """
            INSERT INTO locations (city_name, country, latitude, longitude, timezone, population)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING location_id
        """

        result = db.execute_query(
            insert_query,
            (city_name, country, latitude, longitude, timezone, population),
        )

        if not result:
            raise DatabaseError(f"Failed to create location for {city_name}, {country}")

        # Extract location_id safely
        location_id = get_value_from_result(result[0], "location_id")
        if location_id is None:
            # If still None, try to access as first element of tuple
            location_id = get_value_from_result(result[0], 0)

        if location_id is None:
            raise DatabaseError("Could not extract location_id from database result")

        logger.info(f"Created new location with ID {location_id}")
        return location_id

    except Exception as e:
        logger.error(f"Error getting or creating location: {str(e)}")
        raise

In [None]:
@log_db_function
def save_current_weather():
    pass

### init_db.py


### models.py


### sample_data.py


### schema.sql


### weather_stats.sql



## ETL


### data_loader.py


### data_processor.py


### etl_pipeline.py


### extract.py


### load.py


### transform.py


### weather_collector.py



## EXAMPLES


### logging_example.py




## TESTS


### data (directory)


### logger-tests.py


### test_chatbot.py


### test_data_processor.py


### test_database.py


### test_db_utils.py


### test_location_validator.py


### test_logging.py


### test_weather_collector.py



## UTILS


### check_api_availability.py


### check_api_key.py


### cleanup.py


### location_validator.py


### logger.py


### logger_migration.py


### test_city_format.py



## WEB


### api.py


### app.py


### chatbot.py


### chatbot_utils.py


### routes.py


### static (directory)


### templates (directory)



## ROOT DIRECTORY FILES



### main.py


### setup.py