Transaction retries are not always handled properly #207

Open
dikshant opened this issue Apr 14, 2023 · 19 comments

@dikshant

We have a customer who is experiencing issues with transaction retries not being handled in SQLAlchemy v1.4.x. More details to follow, but we would like to add transaction retries where applicable.

cc: @gordthompson @rafiss

@dikshant dikshant added the bug label Apr 14, 2023
@rafiss
Contributor

rafiss commented Apr 14, 2023

Are they already using the helper?

```python
def run_transaction(transactor, callback, max_retries=None, max_backoff=0):
```
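As a rough illustration of what a helper with this shape does (the callback is re-run when a retryable error occurs, max_retries=None places no cap on retries, and max_backoff=0 disables sleeping between attempts), here is a minimal, database-free sketch. It is not the sqlalchemy-cockroachdb implementation: retry_callback, RetryableError and the backoff policy are hypothetical, and a real callback would receive a connection or session rather than an attempt counter.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for a serialization failure (SQLSTATE 40001)."""


def retry_callback(
    callback: Callable[[int], T],
    max_retries: Optional[int] = None,
    max_backoff: float = 0,
) -> T:
    """Call callback(attempt) until it succeeds or the retry budget is spent.

    max_retries=None retries without limit; max_backoff=0 never sleeps.
    """
    retry_count = 0
    while True:
        try:
            return callback(retry_count)
        except RetryableError:
            if max_retries is not None and retry_count >= max_retries:
                raise  # budget exhausted: surface the last error
            retry_count += 1
            if max_backoff > 0:
                # Assumed policy: exponential growth, capped at max_backoff, full jitter
                time.sleep(random.uniform(0, min(max_backoff, 0.1 * 2**retry_count)))


def flaky(attempt: int) -> str:
    """Fails on the first two attempts, then succeeds."""
    if attempt < 2:
        raise RetryableError("restart transaction")
    return f"ok after {attempt} retries"


print(retry_callback(flaky))  # ok after 2 retries
```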

@dikshant
Author

They mentioned they are using pandas, which uses SQLAlchemy underneath. I do wonder whether pandas is making the right calls.

@gordthompson
Collaborator

If they are using vanilla pandas .to_sql(), then it is most likely using plain old DBAPI .executemany(). If something special is required for pandas, they might need to pass .to_sql() a method= callable with the signature (pd_table, conn, keys, data_iter). One example of such a method for PostgreSQL is here:

https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method

@ddennerline3

I took a look at the pandas GitHub repo. The code checks for a SQLAlchemy connection and then starts using the normal statement functionality. It keeps going down the normal SQL text() statement path. Hopefully this information is helpful.

sql.py (excerpt):

```python
def execute(sql, con, params=None):
    # ...
    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Engine)):
        raise TypeError("pandas.io.sql.execute requires a connection")  # GH50185
    with pandasSQL_builder(con, need_transaction=True) as pandas_sql:
        return pandas_sql.execute(sql, params)


def pandasSQL_builder(con, schema: str | None = None, need_transaction: bool = False):
    # ...
    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
        return SQLDatabase(con, schema, need_transaction)
    # ...


class SQLDatabase(PandasSQL):
    def __init__(
        self, con, schema: str | None = None, need_transaction: bool = False
    ) -> None:
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.schema import MetaData

        # self.exit_stack cleans up the Engine and Connection and commits the
        # transaction if any of those objects was created below.
        # Cleanup happens either in self.__exit__ or at the end of the iterator
        # returned by read_sql when chunksize is not None.
        self.exit_stack = ExitStack()
        if isinstance(con, str):
            con = create_engine(con)
            self.exit_stack.callback(con.dispose)
        if isinstance(con, Engine):
            con = self.exit_stack.enter_context(con.connect())
        if need_transaction and not con.in_transaction():
            self.exit_stack.enter_context(con.begin())
        self.con = con
        self.meta = MetaData(schema=schema)
        self.returns_generator = False

    def __exit__(self, *args) -> None:
        if not self.returns_generator:
            self.exit_stack.close()
```

@gordthompson
Collaborator

Further to my earlier comment, this is what I had in mind:

```python
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cockroachdb.transaction import run_transaction

engine = sa.create_engine(
    "cockroachdb+psycopg2://root@localhost:26257/defaultdb"
)


def crdb_insert_trans(table, eng, keys, data_iter):
    # to_sql() "method=" adapted from the example at
    # https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method

    # Build the parameter list once, outside the callback: data_iter is a
    # one-shot iterator and run_transaction() may call callback() more than once.
    data = [dict(zip(keys, row)) for row in data_iter]

    def callback(conn):
        tbl = sa.Table(
            table.name,
            sa.MetaData(),
            schema=table.schema,
            autoload_with=eng,
        )
        conn.execute(sa.insert(tbl), data)

    run_transaction(eng, callback)


df = pd.DataFrame([(1, "foo"), (2, "bar")], columns=["id", "txt"])
df.to_sql(
    "table1", engine, if_exists="append", index=False, method=crdb_insert_trans
)
```

@gordthompson
Collaborator

@dikshant - Does my code sample above resolve the issue?

@ddennerline3

ddennerline3 commented Apr 20, 2023

@gordthompson Our code calls pandas read_sql_query():

```python
engine = SqlDatabase.create_engine(connection_string, timeout_seconds, app_name)
try:
    assert engine, "SQL Alchemy Engine is NULL"
    with engine.connect() as conn:
        if chunk_size:
            conn = conn.execution_options(stream_results=True)
            df = None
            for chunk_df in pd.read_sql_query(
                get_qualified_gigs_prod_query,
                params=params,
                con=conn,
                parse_dates=parse_dates,
                chunksize=chunk_size,
            ):
                if df is None:
                    df = chunk_df
                else:
                    df = pd.concat([df, chunk_df])
    # ... (remainder not shown)
```

@gordthompson
Collaborator

@ddennerline3 - So is read_sql_query() ultimately throwing a transaction-related exception? If so, can you provide a stack trace?

@ddennerline3

pd.read_sql_query() only generates the following error, which is probably being ignored somewhere when the exception is raised:

```
(psycopg2.errors.SerializationFailure) restart transaction: TransactionRetryWithProtoRefreshError: ReadWithinUncertaintyIntervalError: read at time 1681307575.550021729,0 encountered previous write with future timestamp 1681307575.677729181,0 within uncertainty interval `t <= (local=0,0, global=1681307575.800021729,0)`; observed timestamps: [{4 1681307575.771496910,0} {5 1681307578.720783940,0} {6 1681307575.771576082,0} {9 1681307575.550021729,0}]: \"sql txn\" meta={id=cdc6eb3f key=/Min
```

@gordthompson
Collaborator

Try wrapping the .read_sql_query() call in a function that invokes it via the run_transaction() helper. A simplified example would be:

```python
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cockroachdb.transaction import run_transaction

engine = sa.create_engine(
    "cockroachdb+psycopg2://root@localhost:26257/defaultdb"
)


def read_sql_qry_trans(qry, engine_):

    def callback(conn):
        return pd.read_sql_query(qry, conn)

    return run_transaction(engine_, callback)


sql = """\
SELECT 1 AS id, 'foo' AS txt
UNION ALL
SELECT 2 AS id, 'bar' AS txt
"""
df = read_sql_qry_trans(sql, engine)
print(df)
"""
   id  txt
0   1  foo
1   2  bar
"""
```

@ddennerline3

Is run_transaction() compatible with Postgres?
We would like to maintain one codebase.
If not compatible, I saw it's possible for SQLA to check the engine instance type.

@gordthompson
Collaborator

> Is run_transaction() compatible with Postgres?

I just tried my sample code above with a postgresql+psycopg2://… URL against a real PostgreSQL server and nobody complained.

> If not compatible, I saw it's possible to for SQLA to check the engine instance type.

True. Your wrapper could always call run_transaction() for the cockroachdb dialect and just invoke the callback (with read_sql_query()) directly for postgresql.
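A minimal sketch of that dispatch, assuming engine.dialect.name is "cockroachdb" for the CockroachDB dialect and something else (such as "postgresql") otherwise. The helper name run_maybe_with_retries and its retry_runner parameter are hypothetical; in real code you would pass run_transaction from sqlalchemy_cockroachdb.transaction as retry_runner. Fake engines stand in below so the sketch runs without a database.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Callable


def run_maybe_with_retries(
    engine: Any, callback: Callable[[Any], Any], retry_runner: Callable[..., Any]
) -> Any:
    """Route through the retry helper on CockroachDB; run once elsewhere."""
    if engine.dialect.name == "cockroachdb":
        return retry_runner(engine, callback)
    # Other dialects (e.g. plain PostgreSQL): a single attempt in one transaction
    with engine.begin() as conn:
        return callback(conn)


# --- Fakes so the sketch runs without a database ---
def make_fake_engine(dialect_name: str) -> SimpleNamespace:
    @contextmanager
    def begin():
        yield f"conn<{dialect_name}>"

    return SimpleNamespace(dialect=SimpleNamespace(name=dialect_name), begin=begin)


def fake_retry_runner(engine, callback):
    # Stand-in for run_transaction(engine, callback)
    with engine.begin() as conn:
        return ("via retry helper", callback(conn))


def read(conn):
    return f"read using {conn}"


print(run_maybe_with_retries(make_fake_engine("postgresql"), read, fake_retry_runner))
# read using conn<postgresql>
print(run_maybe_with_retries(make_fake_engine("cockroachdb"), read, fake_retry_runner))
# ('via retry helper', 'read using conn<cockroachdb>')
```

Against real engines, the call would look like run_maybe_with_retries(engine, lambda conn: pd.read_sql_query(qry, conn), run_transaction). Given that run_transaction() ran fine against PostgreSQL in the test above, calling it unconditionally may be the simpler option.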

@ddennerline3

I recently built a custom DB query tool using SQLAlchemy, and it started experiencing transaction errors. To work around the transaction retry errors, I created the run_statement() function below. I saw that the CRDB adapter code follows a similar pattern, but I don't believe it's auto-retrying. FWIW, it took some effort to find the right combination of exceptions and error codes to make this function work correctly. It works for both Postgres and CRDB.

```python
import random
import time

import psycopg2
import psycopg2.errorcodes
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import TextClause


def run_statement(db_conn: Session, stmt: TextClause, bind_params: dict, max_retries=3):
    """Run a SQL statement, retrying it on serialization failures.

    Args:
        db_conn: Active MainAPI DB connection (a SQLAlchemy Session)
        stmt: the SQLAlchemy text() statement to execute
        bind_params: a dictionary containing all input values for SQL
        max_retries: maximum number of retries; None disables retrying

    Returns:
        The SQLAlchemy Result returned by execute()
    """
    retry_count = 0
    while True:
        try:
            return db_conn.execute(stmt, bind_params)
        except DBAPIError as e:
            db_conn.rollback()
            if max_retries is None or retry_count >= max_retries:
                raise
            retry_count += 1
            if isinstance(e.orig, psycopg2.OperationalError):
                if e.orig.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # Exponential backoff with jitter; time.sleep() takes seconds
                    sleep_s = (2**retry_count) * 0.1 * (random.random() + 0.5)
                    time.sleep(sleep_s)
                    continue
            raise
```
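The sleep in run_statement() is exponential backoff with jitter: before retry n it waits 2^n × 0.1 × U seconds, with U uniform in [0.5, 1.5) because random.random() returns values in [0, 1). The hypothetical helper below just tabulates those bounds. With the default max_retries=3 there are three sleeps (n = 1, 2, 3), so the total added delay stays under about 2.1 s before the final error is re-raised.

```python
def backoff_bounds(retry_count: int, base: float = 0.1) -> tuple[float, float]:
    """Min/max sleep (seconds) for run_statement()'s formula at a given retry."""
    scale = (2**retry_count) * base
    # random.random() is in [0, 1), so the jitter factor is in [0.5, 1.5)
    return scale * 0.5, scale * 1.5


for n in range(1, 4):
    low, high = backoff_bounds(n)
    print(f"retry {n}: {low:.1f}s to {high:.1f}s")
# retry 1: 0.1s to 0.3s
# retry 2: 0.2s to 0.6s
# retry 3: 0.4s to 1.2s
```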

@gordthompson
Collaborator

@ddennerline3 - Thanks for sharing your code. Just to confirm: Did it solve the read_sql_query() problem for you?

> I saw the CRDB adapter code is following a similar pattern, but I don't believe it’s auto-retrying.

Did you try it? It's certainly set up to do that.

One subtle difference between your code and the code in transaction.py is that your code interprets max_retries=None as "never retry", whereas the existing adapter code interprets max_retries=None as "unlimited retries".
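To make the difference concrete, here are the two stopping rules side by side as hypothetical predicates (neither function exists in either codebase). retries_exhausted_adapter mirrors the condition in transaction.py, where None means unlimited retries; retries_exhausted_run_statement mirrors run_statement(), where None means no retries at all.

```python
from typing import Optional


def retries_exhausted_adapter(retry_count: int, max_retries: Optional[int]) -> bool:
    # transaction.py style: None means "retry without limit"
    return max_retries is not None and retry_count >= max_retries


def retries_exhausted_run_statement(retry_count: int, max_retries: Optional[int]) -> bool:
    # run_statement() style: None means "never retry"
    return max_retries is None or retry_count >= max_retries


for rule in (retries_exhausted_adapter, retries_exhausted_run_statement):
    print(rule.__name__, [rule(n, None) for n in range(3)])
# retries_exhausted_adapter [False, False, False]
# retries_exhausted_run_statement [True, True, True]
```

For any integer max_retries the two rules agree; they differ only in how None is read.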

@ddennerline3

ddennerline3 commented May 3, 2023

There are two issues I was trying to address:

  • SQLAlchemy serialization retries may not be working. I say this because one of our non-pandas CRDB queries was raising an exception and stopping the process. After I wrote the run_statement() function above, the process no longer crashed.
  • pandas' integration with SQLAlchemy is not retrying when a serialization error is encountered.

On both items, I suspect that if the transaction.py code were changed slightly, raw SQLAlchemy statements and pandas might operate correctly.

The run_statement() code above is required because the transaction.py code doesn't seem to retry.

```python
except sqlalchemy.exc.DatabaseError as e:  # <<< I am not sure this exception is caught
    if max_retries is not None and retry_count >= max_retries:
        raise
    retry_count += 1
    if isinstance(e.orig, psycopg2.OperationalError):
        if e.orig.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
            if max_backoff > 0:
                retry_exponential_backoff(retry_count, max_backoff)
            continue
```

I didn't try to emulate the transaction.py interface 100%, hence the difference in how max_retries=None is handled.

@gordthompson
Collaborator

gordthompson commented May 3, 2023

Oh, so you suspect that

```python
except sqlalchemy.exc.DatabaseError as e:
```

in transaction.py might be too specific?

@gordthompson
Collaborator

Also, are you running SQLAlchemy 2.0, or 1.4?

@ddennerline3

ddennerline3 commented May 5, 2023

SQLAlchemy 2.0

I originally tried catching DatabaseError, but that exception wasn't caught. When I switched to DBAPIError, the exception was caught and I could check the original error.

I did have the benefit of running the code on a stressed cluster in a debugger.
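For context, sqlalchemy.exc.DatabaseError is a subclass of sqlalchemy.exc.DBAPIError, so catching DBAPIError is strictly broader, and the retry decision can then rest on the SQLSTATE of the wrapped driver error in e.orig. Serialization failures use SQLSTATE 40001, the value of psycopg2.errorcodes.SERIALIZATION_FAILURE. Below is a minimal duck-typed sketch of such a check. is_serialization_failure is a hypothetical helper, the Fake* classes stand in for the real SQLAlchemy and psycopg2 exceptions, and the sqlstate fallback assumes psycopg 3, whose errors expose the code under that name rather than pgcode.

```python
SERIALIZATION_FAILURE = "40001"  # same value as psycopg2.errorcodes.SERIALIZATION_FAILURE


def is_serialization_failure(exc: BaseException) -> bool:
    """True if exc (or the driver error it wraps in .orig) carries SQLSTATE 40001."""
    orig = getattr(exc, "orig", None) or exc
    # psycopg2 exposes the SQLSTATE as .pgcode; psycopg 3 uses .sqlstate
    code = getattr(orig, "pgcode", None) or getattr(orig, "sqlstate", None)
    return code == SERIALIZATION_FAILURE


# Fakes standing in for psycopg2.errors.SerializationFailure and sqlalchemy.exc.DBAPIError
class FakeDriverError(Exception):
    def __init__(self, pgcode):
        super().__init__(pgcode)
        self.pgcode = pgcode


class FakeWrappedError(Exception):
    def __init__(self, orig):
        super().__init__(str(orig))
        self.orig = orig


print(is_serialization_failure(FakeWrappedError(FakeDriverError("40001"))))  # True
print(is_serialization_failure(FakeWrappedError(FakeDriverError("23505"))))  # False
```

Checking the SQLSTATE rather than a driver exception class keeps the test independent of which SQLAlchemy wrapper class the error arrives in.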

gordthompson added a commit to gordthompson/sqlalchemy-cockroachdb that referenced this issue May 5, 2023
Fixes: cockroachdb#207

Catch DBAPIError to handle errors not covered by DatabaseError.
@gordthompson
Collaborator

Okay, thanks. I have pushed an update to my branch. Before I submit a PR I would be grateful if you could

```shell
pip install git+https://github.com/gordthompson/sqlalchemy-cockroachdb@issue_207
```

and verify that it avoids the issue in your particular environment.

@gordthompson gordthompson changed the title Transaction retries are not properly handled in v1.4.x Transaction retries are not always handled properly May 5, 2023