In [1]:
from sqlalchemy import create_engine, text
import pandas as pd
import re
LIKE_PATTERN = r"LIKE[\s\S]*'"
from func_timeout import func_timeout
from utils.creds import db_creds_all

# for escaping percent signs in regex matches
def escape_percent(match):
    """Return the matched text with every '%' doubled to '%%'.

    Intended as the replacement callable passed to ``re.sub``.
    """
    matched_text = match.group(0)
    return matched_text.replace("%", "%%")

def _postgres_dialect_prefix() -> str:
    """Return the SQLAlchemy dialect prefix for the installed Postgres driver.

    psycopg2 is preferred when available ("postgresql"); otherwise psycopg
    v3 is used ("postgresql+psycopg").

    Raises:
        ImportError: if neither psycopg2 nor psycopg can be imported.
    """
    try:
        import psycopg2  # noqa: F401

        return "postgresql"
    except ImportError:
        pass
    try:
        import psycopg  # noqa: F401

        return "postgresql+psycopg"
    except ImportError:
        pass
    # Raise instead of exit(1): exiting from a helper would terminate the
    # interpreter / notebook kernel rather than letting the caller handle it.
    raise ImportError(
        "You do not have psycopg2 or psycopg installed. Please install either."
    )


def _postgres_url(db_creds: dict, db_name: str, dialect_prefix: str) -> str:
    """Build a SQLAlchemy connection URL with the user and password percent-encoded.

    Without encoding, credentials containing characters such as '@', ':' or
    '/' would be mis-parsed as URL delimiters.
    """
    from urllib.parse import quote

    # safe="" so that '/' and every other reserved character is encoded too
    user = quote(str(db_creds["user"]), safe="")
    password = quote(str(db_creds["password"]), safe="")
    return f"{dialect_prefix}://{user}:{password}@{db_creds['host']}:{db_creds['port']}/{db_name}"


def query_postgres_db(
    query: str,
    db_name: str,
    db_creds: dict = None,
    timeout: float = 10.0,
    decimal_points: int = None,
) -> pd.DataFrame:
    """
    Runs query on postgres db and returns results as a dataframe.
    This assumes that you have the evaluation database running locally.
    If you don't, you can follow the instructions in the README (Start Postgres Instance) to set it up.

    query: SQL to execute; '%' characters inside the span matched by
        LIKE_PATTERN are doubled to '%%' before execution
    db_name: name of the database to connect to
    db_creds: dict with 'user', 'password', 'host' and 'port' keys;
        defaults to db_creds_all["postgres"]
    timeout: time in seconds to wait for query to finish before timing out
    decimal_points: number of decimal points to round floats to; None (the
        default) disables rounding, 0 rounds to whole numbers

    Raises ImportError if neither psycopg2 nor psycopg is installed. Query
    errors and func_timeout's timeout exception propagate to the caller; the
    engine is disposed in every case.
    """
    if db_creds is None:
        db_creds = db_creds_all["postgres"]
    dialect_prefix = _postgres_dialect_prefix()
    engine = create_engine(_postgres_url(db_creds, db_name, dialect_prefix))
    try:
        escaped_query = re.sub(
            LIKE_PATTERN, escape_percent, query, flags=re.IGNORECASE
        )  # ignore case of LIKE
        results_df = func_timeout(
            timeout, pd.read_sql_query, args=(escaped_query, engine)
        )

        # round floats to decimal_points; `is not None` so that 0 is honoured
        if decimal_points is not None:
            results_df = results_df.round(decimal_points)
        return results_df
    finally:
        # `finally` rather than `except Exception`: func_timeout's
        # FunctionTimedOut is a BaseException subclass, so an `except
        # Exception` clause would leave the engine undisposed on timeout.
        engine.dispose()

In [2]:
import traceback

try:
    # NOTE(review): 'T√ºrkiye' looks like mojibake of 'Türkiye' (UTF-8 bytes
    # decoded as Mac Roman); the empty result displayed below may be caused by
    # this — confirm how the country name is actually stored in `devices`.
    df = query_postgres_db('''
select year_2022 from devices where title like '%Number of POS terminals%' and reporting_country='T√ºrkiye' and collection_indicator='End of period';
''', 'cpmi')
except Exception as e:
    # `e._sql_message()` is private and only defined on SQLAlchemy
    # StatementError subclasses; calling it on any other exception raised an
    # AttributeError that hid the real error. format_exception_only works for
    # every exception type and shows the exception class plus its message.
    print("".join(traceback.format_exception_only(type(e), e)))


In [3]:
# Display the DataFrame returned by query_postgres_db in the previous cell
# (`df` is only bound if that call succeeded; otherwise this raises NameError).
df

Unnamed: 0,year_2022
