Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ with AwsWrapperConnection.connect(
autocommit=True
) as awsconn:
awscursor = awsconn.cursor()
awscursor.execute("SELECT aurora_db_instance_identifier()")
awscursor.execute("SELECT pg_catalog.aurora_db_instance_identifier()")
row = awscursor.fetchone()
print(row)
```
Expand Down
35 changes: 18 additions & 17 deletions aws_advanced_python_wrapper/database_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def default_port(self) -> int:

@property
def host_alias_query(self) -> str:
return "SELECT CONCAT(@@hostname, ':', @@port)"
return "SELECT pg_catalog.CONCAT(@@hostname, ':', @@port)"

@property
def server_version_query(self) -> str:
Expand Down Expand Up @@ -228,11 +228,11 @@ def default_port(self) -> int:

@property
def host_alias_query(self) -> str:
return "SELECT CONCAT(inet_server_addr(), ':', inet_server_port())"
return "SELECT pg_catalog.CONCAT(pg_catalog.inet_server_addr(), ':', pg_catalog.inet_server_port())"

@property
def server_version_query(self) -> str:
return "SELECT 'version', VERSION()"
return "SELECT 'version', pg_catalog.VERSION()"

@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
Expand All @@ -249,7 +249,7 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
initial_transaction_status: bool = driver_dialect.is_in_transaction(conn)
try:
with closing(conn.cursor()) as cursor:
cursor.execute('SELECT 1 FROM pg_proc LIMIT 1')
cursor.execute('SELECT 1 FROM pg_catalog.pg_proc LIMIT 1')
if cursor.fetchone() is not None:
return True
except Exception:
Expand Down Expand Up @@ -329,8 +329,8 @@ def is_blue_green_status_available(self, conn: Connection) -> bool:
class RdsPgDialect(PgDatabaseDialect, BlueGreenDialect):
_EXTENSIONS_QUERY = ("SELECT (setting LIKE '%rds_tools%') AS rds_tools, "
"(setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils "
"FROM pg_settings "
"WHERE name='rds.extensions'")
"FROM pg_catalog.pg_settings "
"WHERE name OPERATOR(pg_catalog.=) 'rds.extensions'")
_DIALECT_UPDATE_CANDIDATES = (DialectCode.AURORA_PG, DialectCode.MULTI_AZ_CLUSTER_PG)

_BG_STATUS_QUERY = (f"SELECT version, endpoint, port, role, status "
Expand Down Expand Up @@ -427,24 +427,25 @@ class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect, AuroraLim
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (DialectCode.MULTI_AZ_CLUSTER_PG,)

_EXTENSIONS_QUERY = "SELECT (setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils " \
"FROM pg_settings WHERE name='rds.extensions'"
"FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'rds.extensions'"

_HAS_TOPOLOGY_QUERY = "SELECT 1 FROM aurora_replica_status() LIMIT 1"
_HAS_TOPOLOGY_QUERY = "SELECT 1 FROM pg_catalog.aurora_replica_status() LIMIT 1"

_TOPOLOGY_QUERY = \
("SELECT SERVER_ID, CASE WHEN SESSION_ID = 'MASTER_SESSION_ID' THEN TRUE ELSE FALSE END, "
("SELECT SERVER_ID, CASE WHEN SESSION_ID OPERATOR(pg_catalog.=) 'MASTER_SESSION_ID' THEN TRUE ELSE FALSE END, "
"CPU, COALESCE(REPLICA_LAG_IN_MSEC, 0), LAST_UPDATE_TIMESTAMP "
"FROM aurora_replica_status() "
"WHERE EXTRACT(EPOCH FROM(NOW() - LAST_UPDATE_TIMESTAMP)) <= 300 OR SESSION_ID = 'MASTER_SESSION_ID' "
"FROM pg_catalog.aurora_replica_status() "
"WHERE EXTRACT(EPOCH FROM(pg_catalog.NOW() OPERATOR(pg_catalog.-) LAST_UPDATE_TIMESTAMP)) OPERATOR(pg_catalog.<=) 300 "
"OR SESSION_ID OPERATOR(pg_catalog.=) 'MASTER_SESSION_ID' "
"OR LAST_UPDATE_TIMESTAMP IS NULL")

_HOST_ID_QUERY = "SELECT aurora_db_instance_identifier()"
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
_LIMITLESS_ROUTER_ENDPOINT_QUERY = "SELECT router_endpoint, load FROM aurora_limitless_router_endpoints()"
_HOST_ID_QUERY = "SELECT pg_catalog.aurora_db_instance_identifier()"
_IS_READER_QUERY = "SELECT pg_catalog.pg_is_in_recovery()"
_LIMITLESS_ROUTER_ENDPOINT_QUERY = "SELECT router_endpoint, load FROM pg_catalog.aurora_limitless_router_endpoints()"

_BG_STATUS_QUERY = (f"SELECT version, endpoint, port, role, status "
f"FROM get_blue_green_fast_switchover_metadata('aws_advanced_python_wrapper-{DriverInfo.DRIVER_VERSION}')")
_BG_STATUS_EXISTS_QUERY = "SELECT 'get_blue_green_fast_switchover_metadata'::regproc"
f"FROM pg_catalog.get_blue_green_fast_switchover_metadata('aws_advanced_python_wrapper-{DriverInfo.DRIVER_VERSION}')")
_BG_STATUS_EXISTS_QUERY = "SELECT 'pg_catalog.get_blue_green_fast_switchover_metadata'::regproc"

@property
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
Expand Down Expand Up @@ -560,7 +561,7 @@ class MultiAzClusterPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
_WRITER_HOST_QUERY = \
"SELECT multi_az_db_cluster_source_dbi_resource_id FROM rds_tools.multi_az_db_cluster_source_dbi_resource_id()"
_HOST_ID_QUERY = "SELECT dbi_resource_id FROM rds_tools.dbi_resource_id()"
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
_IS_READER_QUERY = "SELECT pg_catalog.pg_is_in_recovery()"
_exception_handler: Optional[ExceptionHandler] = None

@property
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/PGLimitless.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
plugins="limitless",
autocommit=True
) as awsconn, awsconn.cursor() as awscursor:
awscursor.execute("SELECT * FROM aurora_db_instance_identifier()")
awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()")

res = awscursor.fetchone()
print(res)
2 changes: 1 addition & 1 deletion docs/examples/PGOktaAuthentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
db_user="john",
autocommit=True
) as awsconn, awsconn.cursor() as awscursor:
awscursor.execute("SELECT * FROM aurora_db_instance_identifier()")
awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()")

res = awscursor.fetchone()
print(res)
2 changes: 1 addition & 1 deletion docs/examples/PGSecretsManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
secrets_manager_region="us-east-2",
plugins="aws_secrets_manager"
) as awsconn, awsconn.cursor() as cursor:
cursor.execute("SELECT aurora_db_instance_identifier()")
cursor.execute("SELECT pg_catalog.aurora_db_instance_identifier()")
for record in cursor.fetchone():
print(record)
6 changes: 3 additions & 3 deletions tests/integration/container/test_blue_green_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ class TestBlueGreenDeployment:
"FROM mysql.rds_topology")
PG_AURORA_BG_STATUS_QUERY = \
("SELECT id, SPLIT_PART(endpoint, '.', 1) as hostId, endpoint, port, role, status, version "
"FROM get_blue_green_fast_switchover_metadata('aws_jdbc_driver')")
"FROM pg_catalog.get_blue_green_fast_switchover_metadata('aws_advanced_python_wrapper')")
PG_RDS_BG_STATUS_QUERY = \
(f"SELECT id, SPLIT_PART(endpoint, '.', 1) as hostId, endpoint, port, role, status, version "
f"FROM rds_tools.show_topology('aws_jdbc_driver-{DriverInfo.DRIVER_VERSION}')")
f"FROM rds_tools.show_topology('aws_advanced_python_wrapper-{DriverInfo.DRIVER_VERSION}')")
results: ConcurrentDict[str, BlueGreenResults] = ConcurrentDict()
unhandled_exceptions: Deque[Exception] = deque()
mysql_dialect = MySQLDriverDialect(Properties())
Expand Down Expand Up @@ -686,7 +686,7 @@ def wrapper_blue_executing_connectivity_monitor(
if engine == DatabaseEngine.MYSQL:
query = "SELECT sleep(5)"
elif engine == DatabaseEngine.PG:
query = "SELECT pg_sleep(5)"
query = "SELECT pg_catalog.pg_sleep(5)"
else:
pytest.fail(f"Unsupported database engine: {engine}")

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/container/test_failover_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def _measure_performance(
sleep_delay_sec: int,
props: Properties,
data: PerfStatMonitoring):
query: str = "SELECT pg_sleep(600)"
query: str = "SELECT pg_catalog.pg_sleep(600)"
downtime: AtomicInt = AtomicInt()
elapsed_times: List[int] = []

Expand Down
12 changes: 6 additions & 6 deletions tests/integration/container/utils/rds_test_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def _query_aurora_instance_id(self, conn: Connection, engine: DatabaseEngine) ->
if engine == DatabaseEngine.MYSQL:
sql = "SELECT @@aurora_server_id"
elif engine == DatabaseEngine.PG:
sql = "SELECT aurora_db_instance_identifier()"
sql = "SELECT pg_catalog.aurora_db_instance_identifier()"
else:
raise UnsupportedOperationError(engine.value)

Expand Down Expand Up @@ -392,18 +392,18 @@ def _get_aurora_topology_sql(self, engine: DatabaseEngine) -> str:
return ("SELECT SERVER_ID, SESSION_ID FROM information_schema.replica_host_status "
"ORDER BY IF(SESSION_ID = 'MASTER_SESSION_ID', 0, 1)")
elif engine == DatabaseEngine.PG:
return ("SELECT SERVER_ID, SESSION_ID FROM aurora_replica_status() "
"ORDER BY CASE WHEN SESSION_ID = 'MASTER_SESSION_ID' THEN 0 ELSE 1 END")
return ("SELECT SERVER_ID, SESSION_ID FROM pg_catalog.aurora_replica_status() "
"ORDER BY CASE WHEN SESSION_ID OPERATOR(pg_catalog.=) 'MASTER_SESSION_ID' THEN 0 ELSE 1 END")
else:
raise UnsupportedOperationError(engine.value)

def _get_multi_az_topology_sql(self, engine: DatabaseEngine, writer_id) -> str:
if engine == DatabaseEngine.MYSQL:
return f"SELECT id, endpoint, port FROM mysql.rds_topology ORDER BY id='{writer_id}' DESC"
return f"SELECT id, endpoint, port FROM mysql.rds_topology ORDER BY id = '{writer_id}' DESC"
elif engine == DatabaseEngine.PG:
return (f"SELECT id, endpoint, port "
f"FROM rds_tools.show_topology('aws_python_driver-{DriverInfo.DRIVER_VERSION}')"
f"ORDER BY id='{writer_id}' DESC")
f"ORDER BY id OPERATOR(pg_catalog.=) '{writer_id}' DESC")
else:
raise UnsupportedOperationError(engine.value)

Expand Down Expand Up @@ -536,7 +536,7 @@ def temporary_failure():
def get_sleep_sql(self, seconds: float) -> str:
engine = TestEnvironment.get_current().get_engine()
if engine == DatabaseEngine.PG:
return f"SELECT pg_sleep({seconds})"
return f"SELECT pg_catalog.pg_sleep({seconds})"
elif engine == DatabaseEngine.MYSQL:
return f"SELECT SLEEP({seconds})"
else:
Expand Down