@@ -37,9 +37,11 @@ async def safe_execute(db: AsyncSession, stmt, params=None, max_retries: int = 3
3737 """
3838 dialect = db .bind .dialect .name
3939
40- # MySQL-specific IGNORE prefix
40+ # MySQL-specific IGNORE prefix - but skip if using ON DUPLICATE KEY UPDATE
4141 if dialect == "mysql" and isinstance (stmt , Insert ):
42- stmt = stmt .prefix_with ("IGNORE" )
42+ # Check if statement already has ON DUPLICATE KEY UPDATE
43+ if not hasattr (stmt , '_post_values_clause' ) or stmt ._post_values_clause is None :
44+ stmt = stmt .prefix_with ("IGNORE" )
4345
4446 for attempt in range (max_retries ):
4547 try :
@@ -50,14 +52,16 @@ async def safe_execute(db: AsyncSession, stmt, params=None, max_retries: int = 3
5052 # Rollback the session
5153 await db .rollback ()
5254
53- # Specific error code handling
55+ # Specific error code handling with exponential backoff
5456 if dialect == "mysql" :
5557 # MySQL deadlock (Error 1213)
5658 if err .orig .args [0 ] == 1213 and attempt < max_retries - 1 :
59+ await asyncio .sleep (0.05 * (2 ** attempt )) # 50ms, 100ms, 200ms
5760 continue
5861 elif dialect == "postgresql" :
5962 # PostgreSQL deadlock (Error 40P01)
6063 if err .orig .code == "40P01" and attempt < max_retries - 1 :
64+ await asyncio .sleep (0.05 * (2 ** attempt ))
6165 continue
6266 elif dialect == "sqlite" :
6367 # SQLite database locked error
0 commit comments