diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java index a24109532..fd9cf2c00 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionImpl.java @@ -26,6 +26,7 @@ import io.vertx.db2client.impl.command.PingCommand; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class DB2ConnectionImpl extends SqlConnectionImpl implements DB2Connection { @@ -37,21 +38,22 @@ public static Future connect(Vertx vertx, DB2ConnectOptions optio } catch (Exception e) { return ctx.failedFuture(e); } + QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options); Promise promise = ctx.promise(); - ctx.dispatch(null, v -> connect(client, ctx, promise)); + ctx.dispatch(null, v -> connect(client, ctx, tracer, promise)); return promise.future(); } - private static void connect(DB2ConnectionFactory client, ContextInternal ctx, Promise promise) { + private static void connect(DB2ConnectionFactory client, ContextInternal ctx, QueryTracer tracer, Promise promise) { client.connect().map(conn -> { - DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(client, ctx, conn); + DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(client, ctx, conn, tracer); conn.init(db2Connection); return (DB2Connection) db2Connection; }).onComplete(promise); } - public DB2ConnectionImpl(DB2ConnectionFactory factory, ContextInternal context, Connection conn) { - super(context, conn); + public DB2ConnectionImpl(DB2ConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer) { + super(context, conn, tracer); } @Override diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2PoolImpl.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2PoolImpl.java index ca56a19df..093a4997e 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2PoolImpl.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2PoolImpl.java @@ -25,12 +25,14 @@ import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.PoolBase; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class DB2PoolImpl extends PoolBase implements DB2Pool { public static DB2PoolImpl create(ContextInternal context, boolean closeVertx, DB2ConnectOptions connectOptions, PoolOptions poolOptions) { - DB2PoolImpl pool = new DB2PoolImpl(context, poolOptions, new DB2ConnectionFactory(context, connectOptions)); + QueryTracer tracer = context.tracer() == null ? null : new QueryTracer(context.tracer(), connectOptions); + DB2PoolImpl pool = new DB2PoolImpl(context, poolOptions, new DB2ConnectionFactory(context, connectOptions), tracer); CloseFuture closeFuture = pool.closeFuture(); if (closeVertx) { closeFuture.onComplete(ar -> context.owner().close()); @@ -42,8 +44,8 @@ public static DB2PoolImpl create(ContextInternal context, boolean closeVertx, DB private final DB2ConnectionFactory factory; - private DB2PoolImpl(ContextInternal context, PoolOptions poolOptions, DB2ConnectionFactory factory) { - super(context, factory, poolOptions); + private DB2PoolImpl(ContextInternal context, PoolOptions poolOptions, DB2ConnectionFactory factory, QueryTracer tracer) { + super(context, factory, tracer, poolOptions); this.factory = factory; } @@ -55,6 +57,6 @@ public void connect(Handler> completionHandler) { @SuppressWarnings("rawtypes") @Override protected SqlConnectionImpl wrap(ContextInternal context, Connection conn) { - return new DB2ConnectionImpl(factory, context, conn); + return new DB2ConnectionImpl(factory, context, conn, tracer); } } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java index 3a29179ce..3673a5089 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionImpl.java @@ -19,12 +19,13 @@ import io.vertx.core.Vertx; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class MSSQLConnectionImpl extends SqlConnectionImpl implements MSSQLConnection { private final MSSQLConnectionFactory factory; - public MSSQLConnectionImpl(MSSQLConnectionFactory factory, ContextInternal context, Connection conn) { - super(context, conn); + public MSSQLConnectionImpl(MSSQLConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer) { + super(context, conn, tracer); this.factory = factory; } @@ -36,12 +37,13 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur public static Future connect(Vertx vertx, MSSQLConnectOptions options) { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); + QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options); PromiseInternal promise = ctx.promise(); MSSQLConnectionFactory client = new MSSQLConnectionFactory(ctx, options); ctx.dispatch(null, v -> { client.connect() .map(conn -> { - MSSQLConnectionImpl msConn = new MSSQLConnectionImpl(client, ctx, conn); + MSSQLConnectionImpl msConn = new MSSQLConnectionImpl(client, ctx, conn, tracer); conn.init(msConn); return msConn; }).onComplete(promise); diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLPoolImpl.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLPoolImpl.java index bb441172c..7d795310a 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLPoolImpl.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLPoolImpl.java @@ -23,13 +23,15 @@ import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.PoolBase; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; import java.util.function.Function; public class MSSQLPoolImpl extends PoolBase implements MSSQLPool { public static MSSQLPoolImpl create(ContextInternal context, boolean closeVertx, MSSQLConnectOptions connectOptions, PoolOptions poolOptions) { - MSSQLPoolImpl pool = new MSSQLPoolImpl(context, new MSSQLConnectionFactory(context, connectOptions), poolOptions); + QueryTracer tracer = context.tracer() == null ? null : new QueryTracer(context.tracer(), connectOptions); + MSSQLPoolImpl pool = new MSSQLPoolImpl(context, new MSSQLConnectionFactory(context, connectOptions), tracer, poolOptions); CloseFuture closeFuture = pool.closeFuture(); if (closeVertx) { closeFuture.onComplete(ar -> context.owner().close()); @@ -41,8 +43,8 @@ public static MSSQLPoolImpl create(ContextInternal context, boolean closeVertx, private final MSSQLConnectionFactory connectionFactory; - private MSSQLPoolImpl(ContextInternal context, MSSQLConnectionFactory factory, PoolOptions poolOptions) { - super(context, factory, poolOptions); + private MSSQLPoolImpl(ContextInternal context, MSSQLConnectionFactory factory, QueryTracer tracer, PoolOptions poolOptions) { + super(context, factory, tracer, poolOptions); this.connectionFactory = factory; } @@ -59,7 +61,7 @@ public void connect(Handler> completionHandler) { @Override protected SqlConnectionImpl wrap(ContextInternal context, Connection connection) { - return new MSSQLConnectionImpl(connectionFactory, context, connection); + return new MSSQLConnectionImpl(connectionFactory, context, connection, tracer); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java index acccab6c8..c4a7c48f3 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionImpl.java @@ -31,6 +31,7 @@ import io.vertx.mysqlclient.impl.command.StatisticsCommand; import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class MySQLConnectionImpl extends SqlConnectionImpl implements MySQLConnection { @@ -42,15 +43,16 @@ public static Future connect(Vertx vertx, MySQLConnectOptions o } catch (Exception e) { return ctx.failedFuture(e); } + QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options); Promise promise = ctx.promise(); - ctx.dispatch(null, v -> connect(client, ctx, promise)); + ctx.dispatch(null, v -> connect(client, ctx, tracer, promise)); return promise.future(); } - private static void connect(MySQLConnectionFactory client, ContextInternal ctx, Promise promise) { + private static void connect(MySQLConnectionFactory client, ContextInternal ctx, QueryTracer tracer, Promise promise) { client.connect() .map(conn -> { - MySQLConnectionImpl mySQLConnection = new MySQLConnectionImpl(client, ctx, conn); + MySQLConnectionImpl mySQLConnection = new MySQLConnectionImpl(client, ctx, conn, tracer); conn.init(mySQLConnection); return (MySQLConnection) mySQLConnection; }).onComplete(promise); @@ -58,8 +60,8 @@ private static void connect(MySQLConnectionFactory client, ContextInternal ctx, private final MySQLConnectionFactory factory; - public MySQLConnectionImpl(MySQLConnectionFactory factory, ContextInternal context, Connection conn) { - super(context, conn); + public MySQLConnectionImpl(MySQLConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer) { + super(context, conn, tracer); this.factory = factory; } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLPoolImpl.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLPoolImpl.java index b86021a35..aa587f360 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLPoolImpl.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLPoolImpl.java @@ -21,11 +21,13 @@ import io.vertx.sqlclient.impl.Connection; import io.vertx.sqlclient.impl.PoolBase; import io.vertx.sqlclient.impl.SqlConnectionImpl; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class MySQLPoolImpl extends PoolBase implements MySQLPool { public static MySQLPoolImpl create(ContextInternal context, boolean closeVertx, MySQLConnectOptions connectOptions, PoolOptions poolOptions) { - MySQLPoolImpl pool = new MySQLPoolImpl(context, new MySQLConnectionFactory(context, connectOptions), poolOptions); + QueryTracer tracer = context.tracer() == null ? null : new QueryTracer(context.tracer(), connectOptions); + MySQLPoolImpl pool = new MySQLPoolImpl(context, new MySQLConnectionFactory(context, connectOptions), tracer, poolOptions); CloseFuture closeFuture = pool.closeFuture(); if (closeVertx) { closeFuture.onComplete(ar -> context.owner().close()); @@ -37,8 +39,8 @@ public static MySQLPoolImpl create(ContextInternal context, boolean closeVertx, private final MySQLConnectionFactory factory; - private MySQLPoolImpl(ContextInternal context, MySQLConnectionFactory factory, PoolOptions poolOptions) { - super(context, factory, poolOptions); + private MySQLPoolImpl(ContextInternal context, MySQLConnectionFactory factory, QueryTracer tracer, PoolOptions poolOptions) { + super(context, factory, tracer, poolOptions); this.factory = factory; } @@ -49,6 +51,6 @@ public void connect(Handler> completionHandler) { @Override protected SqlConnectionImpl wrap(ContextInternal context, Connection conn) { - return new MySQLConnectionImpl(factory, context, conn); + return new MySQLConnectionImpl(factory, context, conn, tracer); } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 890fe0e07..12b0a3fa9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -28,6 +28,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.sqlclient.impl.tracing.QueryTracer; public class PgConnectionImpl extends SqlConnectionImpl implements PgConnection { @@ -38,7 +39,8 @@ public static Future connect(ContextInternal context, PgConnectOpt PgConnectionFactory client = new PgConnectionFactory(context.owner(), context, options); return client.connect() .map(conn -> { - PgConnectionImpl pgConn = new PgConnectionImpl(client, context, conn); + QueryTracer tracer = context.tracer() == null ? null : new QueryTracer(context.tracer(), options); + PgConnectionImpl pgConn = new PgConnectionImpl(client, context, conn, tracer); conn.init(pgConn); return pgConn; }); @@ -48,8 +50,8 @@ public static Future connect(ContextInternal context, PgConnectOpt private final PgConnectionFactory factory; private volatile Handler notificationHandler; - PgConnectionImpl(PgConnectionFactory factory, ContextInternal context, Connection conn) { - super(context, conn); + PgConnectionImpl(PgConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer) { + super(context, conn, tracer); this.factory = factory; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java index 16084d849..adde75cea 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolImpl.java @@ -25,6 +25,7 @@ import io.vertx.sqlclient.impl.PoolBase; import io.vertx.sqlclient.impl.SqlConnectionImpl; import io.vertx.core.*; +import io.vertx.sqlclient.impl.tracing.QueryTracer; /** * Todo : @@ -38,7 +39,8 @@ public class PgPoolImpl extends PoolBase implements PgPool { public static PgPoolImpl create(ContextInternal context, boolean closeVertx, PgConnectOptions connectOptions, PoolOptions poolOptions) { - PgPoolImpl pool = new PgPoolImpl(context, new PgConnectionFactory(context.owner(), context, connectOptions), poolOptions); + QueryTracer tracer = context.tracer() == null ? null : new QueryTracer(context.tracer(), connectOptions); + PgPoolImpl pool = new PgPoolImpl(context, new PgConnectionFactory(context.owner(), context, connectOptions), tracer, poolOptions); CloseFuture closeFuture = pool.closeFuture(); if (closeVertx) { closeFuture.onComplete(ar -> context.owner().close()); @@ -50,8 +52,8 @@ public static PgPoolImpl create(ContextInternal context, boolean closeVertx, PgC private final PgConnectionFactory factory; - private PgPoolImpl(ContextInternal context, PgConnectionFactory factory, PoolOptions poolOptions) { - super(context, factory, poolOptions); + private PgPoolImpl(ContextInternal context, PgConnectionFactory factory, QueryTracer tracer, PoolOptions poolOptions) { + super(context, factory, tracer, poolOptions); this.factory = factory; } @@ -68,6 +70,6 @@ public void connect(Handler> completionHandler) { @Override protected SqlConnectionImpl wrap(ContextInternal context, Connection conn) { - return new PgConnectionImpl(factory, context, conn); + return new PgConnectionImpl(factory, context, conn, tracer); } } diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/TracingTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/TracingTest.java new file mode 100644 index 000000000..3321b48a2 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/TracingTest.java @@ -0,0 +1,185 @@ +package io.vertx.pgclient; + +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.spi.VertxTracerFactory; +import io.vertx.core.spi.tracing.TagExtractor; +import io.vertx.core.spi.tracing.VertxTracer; +import io.vertx.core.tracing.TracingOptions; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlClient; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.impl.tracing.QueryRequest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class TracingTest extends PgTestBase { + + Vertx vertx; + VertxTracer tracer; + PgPool pool; + + @Before + public void setup() throws Exception { + super.setup(); + vertx = Vertx.vertx(new VertxOptions().setTracingOptions( + new TracingOptions().setEnabled(true).setFactory(tracingOptions -> new VertxTracer() { + @Override + public Object sendRequest(Context context, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) { + return tracer.sendRequest(context, request, operation, headers, tagExtractor); + } + + @Override + public void receiveResponse(Context context, Object response, Object payload, Throwable failure, TagExtractor tagExtractor) { + tracer.receiveResponse(context, response, payload, failure, tagExtractor); + } + })) + ); + pool = PgPool.pool(vertx, options, new PoolOptions()); + } + + @After + public void teardown(TestContext ctx) { + vertx.close(ctx.asyncAssertSuccess()); + } + + @Test + public void testTraceSimpleQuery(TestContext ctx) { + String sql = "SELECT * FROM Fortune WHERE id=1"; + testTraceQuery(ctx, sql, Collections.emptyList(), conn -> conn.query(sql).execute()); + } + + @Test + public void testTracePreparedQuery(TestContext ctx) { + String sql = "SELECT * FROM Fortune WHERE id=$1"; + Tuple tuple = Tuple.of(1); + testTraceQuery(ctx, sql, Collections.singletonList(tuple), conn -> conn.preparedQuery(sql).execute(tuple)); + } + + @Test + public void testTraceBatchQuery(TestContext ctx) { + String sql = "SELECT * FROM Fortune WHERE id=$1"; + List tuples = Arrays.asList(Tuple.of(1), Tuple.of(2)); + testTraceQuery(ctx, sql, tuples, conn -> conn.preparedQuery(sql).executeBatch(tuples)); + } + + public void testTraceQuery(TestContext ctx, String expectedSql, List expectedTuples, Function> fn) { + AtomicBoolean called = new AtomicBoolean(); + AtomicReference requestContext = new AtomicReference<>(); + AtomicReference responseContext = new AtomicReference<>(); + Object expectedPayload = new Object(); + tracer = new VertxTracer() { + @Override + public Object sendRequest(Context context, R request, String operation, BiConsumer headers, TagExtractor tagExtractor) { + QueryRequest query = (QueryRequest) request; + ctx.assertEquals(expectedSql, query.sql()); + ctx.assertEquals(expectedTuples, query.tuples()); + Map tags = tagExtractor.extract(request); + ctx.assertEquals("client", tags.get("span.kind")); + ctx.assertEquals("sql", tags.get("db.type")); + ctx.assertEquals(expectedSql, tags.get("db.statement")); + requestContext.set(context); + return expectedPayload; + } + @Override + public void receiveResponse(Context context, R response, Object payload, Throwable failure, TagExtractor tagExtractor) { + RowSet rs = (RowSet) response; + ctx.assertTrue(rs.iterator().hasNext()); + ctx.assertEquals(expectedPayload, payload); + ctx.assertNull(failure); + called.set(true); + responseContext.set(context); + } + }; + Async async = ctx.async(); + vertx.runOnContext(v1 -> { + Context context = Vertx.currentContext(); + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + fn.apply(conn).onComplete(ctx.asyncAssertSuccess(v2 -> { + conn.close(ctx.asyncAssertSuccess(v3 -> { + vertx.runOnContext(v4 -> { + ctx.assertEquals(context, requestContext.get()); + ctx.assertEquals(context, responseContext.get()); + ctx.assertTrue(called.get()); + async.complete(); + }); + })); + })); + })); + }); + } + + @Test + public void testTracingFailure(TestContext ctx) { + AtomicBoolean called = new AtomicBoolean(); + tracer = new VertxTracer() { + @Override + public Object sendRequest(Context context, R request, String operation, BiConsumer headers, TagExtractor tagExtractor) { + return null; + } + @Override + public void receiveResponse(Context context, R response, Object payload, Throwable failure, TagExtractor tagExtractor) { + ctx.assertNull(response); + ctx.assertNotNull(failure); + called.set(true); + } + }; + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .preparedQuery("SELECT 1 / $1") + .execute(Tuple.of(0), ctx.asyncAssertFailure(err -> { + ctx.assertTrue(called.get()); + conn.close(); + })); + })); + } + + @Test + public void testMappingFailure(TestContext ctx) { + RuntimeException failure = new RuntimeException(); + AtomicInteger called = new AtomicInteger(); + String sql = "SELECT * FROM Fortune WHERE id=$1"; + tracer = new VertxTracer() { + @Override + public Object sendRequest(Context context, R request, String operation, BiConsumer headers, TagExtractor tagExtractor) { + return null; + } + @Override + public void receiveResponse(Context context, R response, Object payload, Throwable failure, TagExtractor tagExtractor) { + ctx.assertEquals(1, called.incrementAndGet()); + } + }; + Async async = ctx.async(); + pool.getConnection(ctx.asyncAssertSuccess(conn -> { + conn + .preparedQuery(sql) + .mapping(row -> { + throw failure; + }) + .execute(Tuple.of(1), ctx.asyncAssertFailure(err -> { + conn.close(ctx.asyncAssertSuccess(v1 -> { + vertx.runOnContext(v2 -> { + ctx.assertEquals(1, called.get()); + async.complete(); + }); + })); + })); + })); + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java index 9fe1365f8..a52c65c43 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java @@ -39,7 +39,7 @@ public class CursorImpl implements Cursor { private String id; private boolean closed; - private SqlResultHandler, RowSetImpl, RowSet> result; + private QueryResultBuilder, RowSetImpl, RowSet> result; CursorImpl(PreparedStatementImpl ps, ContextInternal context, TupleInternal params) { this.ps = ps; @@ -66,13 +66,12 @@ public void read(int count, Handler>> handler) { @Override public synchronized Future> read(int count) { Promise> promise = context.promise(); - SqlResultBuilder, RowSetImpl, RowSet> builder = new SqlResultBuilder<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); - SqlResultHandler, RowSetImpl, RowSet> handler = builder.createHandler(promise); + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(ps.tracer, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); if (id == null) { id = UUID.randomUUID().toString(); - result = builder.execute(ps.conn, ps.ps, ps.autoCommit, params, count, id, false, handler); + result = builder.executeExtendedQuery(ps.conn, ps.ps, ps.autoCommit, params, count, id, false, promise); } else if (result.isSuspended()) { - result = builder.execute(ps.conn, ps.ps, ps.autoCommit, params, count, id, true, handler); + result = builder.executeExtendedQuery(ps.conn, ps.ps, ps.autoCommit, params, count, id, true, promise); } else { throw new IllegalStateException(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java index e9dfff005..37a251e51 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolBase.java @@ -31,6 +31,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.sqlclient.impl.pool.ConnectionPool; +import io.vertx.sqlclient.impl.tracing.QueryTracer; import java.util.function.Function; @@ -45,7 +46,8 @@ public abstract class PoolBase

extends SqlClientBase

implemen private final ConnectionPool pool; private final CloseFuture closeFuture; - public PoolBase(ContextInternal context, ConnectionFactory factory, PoolOptions poolOptions) { + public PoolBase(ContextInternal context, ConnectionFactory factory, QueryTracer tracer, PoolOptions poolOptions) { + super(tracer); this.vertx = context.owner(); this.factory = factory; this.pool = new ConnectionPool(factory, context, poolOptions.getMaxSize(), poolOptions.getMaxWaitQueueSize()); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java index 2344f56c0..662eb1015 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java @@ -29,6 +29,7 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.Tuple; import io.vertx.core.*; +import io.vertx.sqlclient.impl.tracing.QueryTracer; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,18 +41,20 @@ */ class PreparedStatementImpl implements PreparedStatement { - static PreparedStatement create(Connection conn, ContextInternal context, io.vertx.sqlclient.impl.PreparedStatement ps, boolean autoCommit) { - return new PreparedStatementImpl(conn, context, ps, autoCommit); + static PreparedStatement create(Connection conn, QueryTracer tracer, ContextInternal context, io.vertx.sqlclient.impl.PreparedStatement ps, boolean autoCommit) { + return new PreparedStatementImpl(conn, tracer, context, ps, autoCommit); } final Connection conn; + final QueryTracer tracer; final ContextInternal context; final io.vertx.sqlclient.impl.PreparedStatement ps; final boolean autoCommit; private final AtomicBoolean closed = new AtomicBoolean(); - private PreparedStatementImpl(Connection conn, ContextInternal context, io.vertx.sqlclient.impl.PreparedStatement ps, boolean autoCommit) { + private PreparedStatementImpl(Connection conn, QueryTracer tracer, ContextInternal context, io.vertx.sqlclient.impl.PreparedStatement ps, boolean autoCommit) { this.conn = conn; + this.tracer = tracer; this.context = context; this.ps = ps; this.autoCommit = autoCommit; @@ -59,7 +62,7 @@ private PreparedStatementImpl(Connection conn, ContextInternal context, io.vertx @Override public PreparedQuery> query() { - SqlResultBuilder, RowSetImpl, RowSet> builder = new SqlResultBuilder<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(tracer, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); return new PreparedStatementQuery<>(builder); } @@ -67,11 +70,10 @@ > void execute(Tuple args, int fetch, String cursorId, boolean suspended, - SqlResultBuilder builder, + QueryExecutor builder, Promise p) { if (context == Vertx.currentContext()) { - SqlResultHandler handler = builder.createHandler(p); - builder.execute( + builder.executeExtendedQuery( conn, ps, autoCommit, @@ -79,7 +81,7 @@ > void execute(Tuple args, fetch, cursorId, suspended, - handler); + p); } else { context.runOnContext(v -> execute(args, fetch, cursorId, suspended, builder, p)); } @@ -111,11 +113,10 @@ public Future close() { } > void executeBatch(List argsList, - SqlResultBuilder builder, + QueryExecutor builder, Promise p) { if (context == Vertx.currentContext()) { - SqlResultHandler handler = builder.createHandler(p); - builder.executeBatch(conn, ps, autoCommit, argsList, handler); + builder.executeBatchQuery(conn, ps, autoCommit, argsList, p); } else { context.runOnContext(v -> executeBatch(argsList, builder, p)); } @@ -141,12 +142,12 @@ void closeCursor(String cursorId, Promise promise) { private class PreparedStatementQuery> extends QueryBase implements PreparedQuery { - public PreparedStatementQuery(SqlResultBuilder builder) { + public PreparedStatementQuery(QueryExecutor builder) { super(builder); } @Override - protected > QueryBase copy(SqlResultBuilder builder) { + protected > QueryBase copy(QueryExecutor builder) { return new PreparedStatementQuery<>(builder); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryBase.java index 7575e261b..3dd75d554 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryBase.java @@ -31,23 +31,23 @@ */ abstract class QueryBase> implements Query { - protected final SqlResultBuilder builder; + protected final QueryExecutor builder; - public QueryBase(SqlResultBuilder builder) { + public QueryBase(QueryExecutor builder) { this.builder = builder; } - protected abstract > QueryBase copy(SqlResultBuilder builder); + protected abstract > QueryBase copy(QueryExecutor builder); @Override public Query> collecting(Collector collector) { Objects.requireNonNull(collector, "Supplied collector must not be null"); - return copy(new SqlResultBuilder<>(SqlResultImpl::new, collector)); + return copy(new QueryExecutor<>(builder.tracer(), SqlResultImpl::new, collector)); } @Override public Query> mapping(Function mapper) { Objects.requireNonNull(mapper, "Supplied mapper must not be null"); - return copy(new SqlResultBuilder<>(RowSetImpl.factory(), RowSetImpl.collector(mapper))); + return copy(new QueryExecutor<>(builder.tracer(), RowSetImpl.factory(), RowSetImpl.collector(mapper))); } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java new file mode 100644 index 000000000..5214cbb32 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2017 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.vertx.sqlclient.impl; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.impl.ContextInternal; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.impl.command.BiCommand; +import io.vertx.sqlclient.impl.command.CommandScheduler; +import io.vertx.sqlclient.impl.command.ExtendedBatchQueryCommand; +import io.vertx.sqlclient.impl.command.ExtendedQueryCommand; +import io.vertx.sqlclient.impl.command.PrepareStatementCommand; +import io.vertx.sqlclient.impl.command.SimpleQueryCommand; +import io.vertx.sqlclient.impl.tracing.QueryTracer; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collector; + +/** + * Executes query. + */ +class QueryExecutor, L extends SqlResult> { + + private final QueryTracer tracer; + private final Function factory; + private final Collector collector; + + public QueryExecutor(QueryTracer tracer, + Function factory, + Collector collector) { + this.tracer = tracer; + this.factory = factory; + this.collector = collector; + } + + QueryTracer tracer() { + return tracer; + } + + private QueryResultBuilder createHandler(Promise promise, Object payload) { + return new QueryResultBuilder<>(factory, tracer, payload, promise); + } + + void executeSimpleQuery(CommandScheduler scheduler, + String sql, + boolean autoCommit, + boolean singleton, + Promise promise) { + ContextInternal context = (ContextInternal) promise.future().context(); + Object payload; + if (tracer != null) { + payload = tracer.sendRequest(context, sql); + } else { + payload = null; + } + QueryResultBuilder handler = createHandler(promise, payload); + scheduler.schedule(new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler), handler); + } + + QueryResultBuilder executeExtendedQuery(CommandScheduler scheduler, + PreparedStatement preparedStatement, + boolean autoCommit, + Tuple arguments, + int fetch, + String cursorId, + boolean suspended, + Promise promise) { + ContextInternal context = (ContextInternal) promise.future().context(); + Object payload; + if (tracer != null) { + payload = tracer.sendRequest(context, preparedStatement.sql(), arguments); + } else { + payload = null; + } + QueryResultBuilder handler = createHandler(promise, payload); + String msg = preparedStatement.prepare((TupleInternal) arguments); + if (msg != null) { + handler.fail(msg); + return null; + } + ExtendedQueryCommand cmd = new ExtendedQueryCommand<>( + preparedStatement, + arguments, + fetch, + cursorId, + suspended, + autoCommit, + collector, + handler); + scheduler.schedule(cmd, handler); + return handler; + } + + void executeExtendedQuery(CommandScheduler scheduler, String sql, boolean autoCommit, Tuple arguments, Promise promise) { + ContextInternal context = (ContextInternal) promise.future().context(); + Object payload; + if (tracer != null) { + payload = tracer.sendRequest(context, sql, arguments); + } else { + payload = null; + } + QueryResultBuilder handler = this.createHandler(promise, payload); + BiCommand cmd = new BiCommand<>(new PrepareStatementCommand(sql, true), ps -> { + String msg = ps.prepare((TupleInternal) arguments); + if (msg != null) { + return Future.failedFuture(msg); + } + return Future.succeededFuture(createExtendedQueryCommand(ps, autoCommit, arguments, handler)); + }); + scheduler.schedule(cmd, handler); + } + + private ExtendedQueryCommand createExtendedQueryCommand(PreparedStatement preparedStatement, + boolean autoCommit, + Tuple args, + QueryResultBuilder handler) { + return new ExtendedQueryCommand<>( + preparedStatement, + args, + autoCommit, + collector, + handler); + } + + void executeBatchQuery(CommandScheduler scheduler, + PreparedStatement preparedStatement, + boolean autoCommit, + List batch, + Promise promise) { + ContextInternal context = (ContextInternal) promise.future().context(); + Object payload; + if (tracer != null) { + payload = tracer.sendRequest(context, preparedStatement.sql(), batch); + } else { + payload = null; + } + QueryResultBuilder handler = createHandler(promise, payload); + for (Tuple args : batch) { + String msg = preparedStatement.prepare((TupleInternal)args); + if (msg != null) { + handler.fail(msg); + return; + } + } + ExtendedBatchQueryCommand cmd = new ExtendedBatchQueryCommand<>(preparedStatement, batch, autoCommit, collector, handler); + scheduler.schedule(cmd, handler); + } + + void executeBatchQuery(CommandScheduler scheduler, String sql, boolean autoCommit, List batch, Promise promise) { + ContextInternal context = (ContextInternal) promise.future().context(); + Object payload; + if (tracer != null) { + payload = tracer.sendRequest(context, sql, batch); + } else { + payload = null; + } + QueryResultBuilder handler = createHandler(promise, payload); + BiCommand cmd = new BiCommand<>(new PrepareStatementCommand(sql, true), ps -> { + for (Tuple args : batch) { + String msg = ps.prepare((TupleInternal) args); + if (msg != null) { + return Future.failedFuture(msg); + } + } + return Future.succeededFuture(createBatchQueryCommand(ps, autoCommit, batch, handler)); + }); + scheduler.schedule(cmd, handler); + } + + private ExtendedBatchQueryCommand createBatchQueryCommand(PreparedStatement preparedStatement, + boolean autoCommit, + List argsList, + QueryResultBuilder handler) { + return new ExtendedBatchQueryCommand<>(preparedStatement, argsList, autoCommit, collector, handler); + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultHandler.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java similarity index 67% rename from vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultHandler.java rename to vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java index 62aaedf9e..cf37b6dab 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultHandler.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java @@ -19,26 +19,34 @@ import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.impl.ContextInternal; import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.PropertyKind; +import io.vertx.sqlclient.impl.tracing.QueryTracer; import java.util.HashMap; import java.util.function.Function; /** - * A query result handler for building a {@link SqlResult}. + * A query result for building a {@link SqlResult}. */ -class SqlResultHandler, L extends SqlResult> implements QueryResultHandler, Promise { +class QueryResultBuilder, L extends SqlResult> implements QueryResultHandler, Promise { private final Promise handler; private final Function factory; + private final ContextInternal context; + private final QueryTracer tracer; + private final Object tracingPayload; private R first; private R current; private Throwable failure; private boolean suspended; - SqlResultHandler(Function factory, Promise handler) { + QueryResultBuilder(Function factory, QueryTracer tracer, Object tracingPayload, Promise handler) { this.factory = factory; + this.context = (ContextInternal) handler.future().context(); + this.tracer = tracer; + this.tracingPayload = tracingPayload; this.handler = handler; } @@ -82,15 +90,27 @@ public void addProperty(PropertyKind property, V value) { public boolean tryComplete(Boolean result) { suspended = result; if (failure != null) { - return handler.tryFail(failure); + return tryFail(failure); } else { - return handler.tryComplete((L) first); + boolean completed = handler.tryComplete((L) first); + if (completed) { + if (tracer != null) { + tracer.receiveResponse(context, tracingPayload, first, null); + } + } + return completed; } } @Override public boolean tryFail(Throwable cause) { - return handler.tryFail(cause); + boolean completed = handler.tryFail(cause); + if (completed) { + if (tracer != null) { + tracer.receiveResponse(context, tracingPayload, null, cause); + } + } + return completed; } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlClientBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlClientBase.java index 627d32235..8f6949011 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlClientBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlClientBase.java @@ -20,10 +20,7 @@ import io.vertx.core.Promise; import io.vertx.sqlclient.PreparedQuery; import io.vertx.sqlclient.Query; -import io.vertx.sqlclient.impl.command.BiCommand; import io.vertx.sqlclient.impl.command.CommandScheduler; -import io.vertx.sqlclient.impl.command.ExtendedBatchQueryCommand; -import io.vertx.sqlclient.impl.command.PrepareStatementCommand; import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.Row; @@ -32,6 +29,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.sqlclient.impl.tracing.QueryTracer; import java.util.List; import java.util.function.Function; @@ -39,6 +37,12 @@ public abstract class SqlClientBase implements SqlClientInternal, CommandScheduler { + protected final QueryTracer tracer; + + public SqlClientBase(QueryTracer tracer) { + this.tracer = tracer; + } + @Override public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int current) { queryBuilder.append("?"); @@ -51,13 +55,13 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur @Override public Query> query(String sql) { - SqlResultBuilder, RowSetImpl, RowSet> builder = new SqlResultBuilder<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(tracer, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); return new QueryImpl<>(autoCommit(), false, sql, builder); } @Override public PreparedQuery> preparedQuery(String sql) { - SqlResultBuilder, RowSetImpl, RowSet> builder = new SqlResultBuilder<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(tracer, RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); return new PreparedQueryImpl<>(autoCommit(), false, sql, builder); } @@ -71,7 +75,7 @@ private class QueryImpl> extends QueryBase { protected final boolean singleton; protected final String sql; - private QueryImpl(boolean autoCommit, boolean singleton, String sql, SqlResultBuilder builder) { + private QueryImpl(boolean autoCommit, boolean singleton, String sql, QueryExecutor builder) { super(builder); this.autoCommit = autoCommit; this.singleton = singleton; @@ -79,7 +83,7 @@ private QueryImpl(boolean autoCommit, boolean singleton, String sql, SqlResultBu } @Override - protected > QueryBase copy(SqlResultBuilder builder) { + protected > QueryBase copy(QueryExecutor builder) { return new QueryImpl<>(autoCommit, singleton, sql, builder); } @@ -96,14 +100,13 @@ public Future execute() { } protected void execute(Promise promise) { - SqlResultHandler handler = builder.createHandler(promise); - builder.execute(SqlClientBase.this, sql, autoCommit, singleton, handler); + builder.executeSimpleQuery(SqlClientBase.this, sql, autoCommit, singleton, promise); } } private class PreparedQueryImpl> extends QueryImpl implements PreparedQuery { - private PreparedQueryImpl(boolean autoCommit, boolean singleton, String sql, SqlResultBuilder builder) { + private PreparedQueryImpl(boolean autoCommit, boolean singleton, String sql, QueryExecutor builder) { super(autoCommit, singleton, sql, builder); } @@ -118,7 +121,7 @@ public PreparedQuery> mapping(Function mapper) { } @Override - protected > QueryBase copy(SqlResultBuilder builder) { + protected > QueryBase copy(QueryExecutor builder) { return new PreparedQueryImpl<>(autoCommit, singleton, sql, builder); } @@ -128,15 +131,7 @@ protected void execute(Promise promise) { } private void execute(Tuple arguments, Promise promise) { - SqlResultHandler handler = builder.createHandler(promise); - BiCommand abc = new BiCommand<>(new PrepareStatementCommand(sql, true), ps -> { - String msg = ps.prepare((TupleInternal) arguments); - if (msg != null) { - return Future.failedFuture(msg); - } - return Future.succeededFuture(builder.createCommand(ps, autoCommit, arguments, handler)); - }); - schedule(abc, handler); + builder.executeExtendedQuery(SqlClientBase.this, sql, autoCommit, arguments, promise); } @Override @@ -164,17 +159,7 @@ public Future executeBatch(List batch) { } private void executeBatch(List batch, Promise promise) { - SqlResultHandler handler = builder.createHandler(promise); - BiCommand abc = new BiCommand<>(new PrepareStatementCommand(sql, true), ps -> { - for (Tuple args : batch) { - String msg = ps.prepare((TupleInternal) args); - if (msg != null) { - return Future.failedFuture(msg); - } - } - return Future.succeededFuture(builder.createBatchCommand(ps, autoCommit, batch, handler)); - }); - schedule(abc, handler); + builder.executeBatchQuery(SqlClientBase.this, sql, autoCommit, batch, promise); } } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java index 4961d00ce..cbcbbb25e 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionBase.java @@ -22,6 +22,7 @@ import io.vertx.sqlclient.SqlClient; import io.vertx.sqlclient.impl.command.PrepareStatementCommand; import io.vertx.core.*; +import io.vertx.sqlclient.impl.tracing.QueryTracer; /** * @author Julien Viet @@ -31,7 +32,8 @@ public abstract class SqlConnectionBase extends SqlClientBa protected final ContextInternal context; protected final Connection conn; - protected SqlConnectionBase(ContextInternal context, Connection conn) { + protected SqlConnectionBase(ContextInternal context, Connection conn, QueryTracer tracer) { + super(tracer); this.context = context; this.conn = conn; } @@ -47,6 +49,6 @@ public C prepare(String sql, Handler> handler) { public Future prepare(String sql) { Promise promise = promise(); schedule(new PrepareStatementCommand(sql, false), promise); - return promise.future().map(cr -> PreparedStatementImpl.create(conn, context, cr, autoCommit())); + return promise.future().map(cr -> PreparedStatementImpl.create(conn, tracer, context, cr, autoCommit())); } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionImpl.java index d232c3ea4..c0ae506b7 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlConnectionImpl.java @@ -22,6 +22,7 @@ import io.vertx.sqlclient.impl.command.CommandBase; import io.vertx.sqlclient.Transaction; import io.vertx.core.*; +import io.vertx.sqlclient.impl.tracing.QueryTracer; /** * @author Julien Viet @@ -32,8 +33,8 @@ public class SqlConnectionImpl extends SqlConnectionBas private volatile Handler closeHandler; private TransactionImpl tx; - public SqlConnectionImpl(ContextInternal context, Connection conn) { - super(context, conn); + public SqlConnectionImpl(ContextInternal context, Connection conn, QueryTracer tracer) { + super(context, conn, tracer); } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultBuilder.java deleted file mode 100644 index b50707ca9..000000000 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SqlResultBuilder.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (C) 2017 Julien Viet - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.vertx.sqlclient.impl; - -import io.vertx.core.Promise; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.SqlResult; -import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.command.CommandScheduler; -import io.vertx.sqlclient.impl.command.ExtendedBatchQueryCommand; -import io.vertx.sqlclient.impl.command.ExtendedQueryCommand; -import io.vertx.sqlclient.impl.command.SimpleQueryCommand; - -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collector; - -/** - * A query result handler for building a {@link SqlResult}. - */ -class SqlResultBuilder, L extends SqlResult> { - - private final Function factory; - private final Collector collector; - - public SqlResultBuilder(Function factory, - Collector collector) { - this.factory = factory; - this.collector = collector; - } - - SqlResultHandler createHandler(Promise resultHandler) { - return new SqlResultHandler<>(factory, resultHandler); - } - - void execute(CommandScheduler scheduler, - String sql, - boolean autoCommit, - boolean singleton, - SqlResultHandler handler) { - SimpleQueryCommand cmd = new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler); - scheduler.schedule(cmd, handler); - } - - SqlResultHandler execute(CommandScheduler scheduler, - PreparedStatement preparedStatement, - boolean autoCommit, - Tuple args, - int fetch, - String cursorId, - boolean suspended, - SqlResultHandler handler) { - String msg = preparedStatement.prepare((TupleInternal) args); - if (msg != null) { - handler.fail(msg); - return null; - } - ExtendedQueryCommand cmd = new ExtendedQueryCommand<>( - preparedStatement, - args, - fetch, - cursorId, - suspended, - autoCommit, - collector, - handler); - scheduler.schedule(cmd, handler); - return handler; - } - - ExtendedQueryCommand createCommand(PreparedStatement preparedStatement, - boolean autoCommit, - Tuple args, - SqlResultHandler handler) { - return new ExtendedQueryCommand<>( - preparedStatement, - args, - autoCommit, - collector, - handler); - } - - - void executeBatch(CommandScheduler scheduler, - PreparedStatement preparedStatement, - boolean autoCommit, - List argsList, - SqlResultHandler handler) { - for (Tuple args : argsList) { - String msg = preparedStatement.prepare((TupleInternal)args); - if (msg != null) { - handler.fail(msg); - return; - } - } - ExtendedBatchQueryCommand cmd = new ExtendedBatchQueryCommand<>(preparedStatement, argsList, autoCommit, collector, handler); - scheduler.schedule(cmd, handler); - } - - ExtendedBatchQueryCommand createBatchCommand(PreparedStatement preparedStatement, - boolean autoCommit, - List argsList, - SqlResultHandler handler) { - return new ExtendedBatchQueryCommand<>(preparedStatement, argsList, autoCommit, collector, handler); - } -} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryRequest.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryRequest.java new file mode 100644 index 000000000..c363577ab --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryRequest.java @@ -0,0 +1,29 @@ +package io.vertx.sqlclient.impl.tracing; + +import io.vertx.sqlclient.Tuple; + +import java.util.List; + +/** + * A traceable query. + */ +public class QueryRequest { + + final QueryTracer tracer; + final String sql; + final List tuples; + + public QueryRequest(QueryTracer tracer, String sql, List tuples) { + this.tracer = tracer; + this.sql = sql; + this.tuples = tuples; + } + + public String sql() { + return sql; + } + + public List tuples() { + return tuples; + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryTracer.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryTracer.java new file mode 100644 index 000000000..ec4a3e594 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryTracer.java @@ -0,0 +1,93 @@ +package io.vertx.sqlclient.impl.tracing; + +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.spi.tracing.TagExtractor; +import io.vertx.core.spi.tracing.VertxTracer; +import io.vertx.sqlclient.SqlConnectOptions; +import io.vertx.sqlclient.Tuple; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * Tracer for queries, wrapping the generic tracer. + */ +public class QueryTracer { + + enum RequestTags { + + // Generic + PEER_ADDRESS("peer.address", q -> q.tracer.address), + SPAN_KIND("span.kind", q -> "client"), + + // DB + // See https://github.com/opentracing/specification/blob/master/semantic_conventions.md + + DB_USER("db.user", q -> q.tracer.user), + DB_INSTANCE("db.instance", q -> q.tracer.database), + DB_STATEMENT("db.statement", QueryRequest::sql), + DB_TYPE("db.type", q -> "sql"); + + final String name; + final Function fn; + + RequestTags(String name, Function fn) { + this.name = name; + this.fn = fn; + } + } + + private static final TagExtractor REQUEST_TAG_EXTRACTOR = new TagExtractor() { + + private final RequestTags[] TAGS = RequestTags.values(); + + @Override + public int len(QueryRequest obj) { + return TAGS.length; + } + @Override + public String name(QueryRequest obj, int index) { + return TAGS[index].name; + } + @Override + public String value(QueryRequest obj, int index) { + return TAGS[index].fn.apply(obj); + } + }; + + private final VertxTracer tracer; + private final String address; + private final String host; + private final int port; + private final String user; + private final String database; + + public QueryTracer(VertxTracer tracer, SqlConnectOptions options) { + this.tracer = tracer; + this.address = options.getHost() + ":" + options.getPort(); + this.host = options.getHost(); + this.port = options.getPort(); + this.user = options.getUser(); + this.database = options.getDatabase(); + } + + public Object sendRequest(ContextInternal context, String sql) { + QueryRequest request = new QueryRequest(this, sql, Collections.emptyList()); + return tracer.sendRequest(context, request, "Query", (k,v) -> {}, REQUEST_TAG_EXTRACTOR); + } + + public Object sendRequest(ContextInternal context, String sql, Tuple tuple) { + QueryRequest request = new QueryRequest(this, sql, Collections.singletonList(tuple)); + return tracer.sendRequest(context, request, "Query", (k,v) -> {}, REQUEST_TAG_EXTRACTOR); + } + + public Object sendRequest(ContextInternal context, String sql, List tuples) { + QueryRequest request = new QueryRequest(this, sql, tuples); + return tracer.sendRequest(context, request, "Query", (k,v) -> {}, REQUEST_TAG_EXTRACTOR); + } + + public void receiveResponse(ContextInternal context, Object payload, Object result, Throwable failure) { + tracer.receiveResponse(context, result, payload, failure, TagExtractor.empty()); + } +}