Skip to content

Commit

Permalink
chore: support GSQL in tpcc benchmarking (#2018)
Browse files Browse the repository at this point in the history
* chore: support GSQL in tpcc benchmarking data loader

* Fix the expected NUMERIC error.

* Remove the code to restart the job

* Fix

* Support GSQL benchmark runner.

* Update docs.

* Fix the issue to call getter on null in getObject.

* Remove the rollback call if the transaction is aborted.

* Update benchmarks/tpcc/README.md

Co-authored-by: Knut Olav Løite <koloite@gmail.com>

* Update benchmarks/tpcc/README.md

Co-authored-by: Knut Olav Løite <koloite@gmail.com>

* Fix comments.

---------

Co-authored-by: Knut Olav Løite <koloite@gmail.com>
  • Loading branch information
hengfengli and olavloite committed Jul 8, 2024
1 parent 7906ab0 commit 372494e
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 98 deletions.
44 changes: 43 additions & 1 deletion benchmarks/tpcc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ mvn spring-boot:run -Dspring-boot.run.arguments="

### Load data

Load data into a PostgreSQL-dialect DB:

```shell
mvn spring-boot:run -Dspring-boot.run.arguments="
--tpcc.benchmark-duration=PT600s
--tpcc.warehouses=10
--tpcc.benchmark-threads=1
--tpcc.load-data=true
--tpcc.truncate-before-load=false
--tpcc.run-benchmark=false
--tpcc.use-read-only-transactions=false
--tpcc.lock-scanned-ranges=false
--spanner.project=my-project
--spanner.instance=my-instance
--spanner.database=my-database
--pgadapter.credentials=/path/to/credentials.json
"
```

To load data into a GoogleSQL-dialect DB, you need to specify `client_lib_gsql` as the `benchmark-runner`. This instructs the benchmark application that it should use the Spanner Java client to connect to the database, and that it should use the GoogleSQL dialect:

```shell
mvn spring-boot:run -Dspring-boot.run.arguments="
--tpcc.benchmark-duration=PT600s
Expand All @@ -30,6 +51,7 @@ mvn spring-boot:run -Dspring-boot.run.arguments="
--tpcc.load-data=true
--tpcc.truncate-before-load=false
--tpcc.run-benchmark=false
--tpcc.benchmark-runner=client_lib_gsql
--tpcc.use-read-only-transactions=false
--tpcc.lock-scanned-ranges=false
--spanner.project=my-project
Expand All @@ -41,7 +63,7 @@ mvn spring-boot:run -Dspring-boot.run.arguments="

### Run benchmark

Currently, we support benchmark runners: `pgadapter`, `spanner_jdbc`, and `client_lib_pg`.
Currently, we support the following benchmark runners: `pgadapter`, `spanner_jdbc`, `client_lib_pg`, and `client_lib_gsql`.

Run with the default benchmark runner (PGAdapter with PG JDBC):

Expand Down Expand Up @@ -102,3 +124,23 @@ mvn spring-boot:run -Dspring-boot.run.arguments="
--pgadapter.credentials=/path/to/credentials.json
"
```

Run with the benchmark runner (Client library with GoogleSQL dialect):

```shell
mvn spring-boot:run -Dspring-boot.run.arguments="
--tpcc.benchmark-duration=PT600s
--tpcc.warehouses=10
--tpcc.benchmark-threads=1
--tpcc.load-data=false
--tpcc.truncate-before-load=false
--tpcc.run-benchmark=true
--tpcc.benchmark-runner=client_lib_gsql
--tpcc.use-read-only-transactions=true
--tpcc.lock-scanned-ranges=false
--spanner.project=my-project
--spanner.instance=my-instance
--spanner.database=my-database
--pgadapter.credentials=/path/to/credentials.json
"
```
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.cloud.pgadapter.tpcc.config.PGAdapterConfiguration;
import com.google.cloud.pgadapter.tpcc.config.SpannerConfiguration;
import com.google.cloud.pgadapter.tpcc.config.TpccConfiguration;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory.JdbcAbortedException;
import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -44,6 +45,7 @@ abstract class AbstractBenchmarkRunner implements Runnable {
protected final PGAdapterConfiguration pgAdapterConfiguration;
protected final SpannerConfiguration spannerConfiguration;
protected final Metrics metrics;
protected final Dialect dialect;

private boolean failed;

Expand All @@ -52,12 +54,14 @@ abstract class AbstractBenchmarkRunner implements Runnable {
TpccConfiguration tpccConfiguration,
PGAdapterConfiguration pgAdapterConfiguration,
SpannerConfiguration spannerConfiguration,
Metrics metrics) {
Metrics metrics,
Dialect dialect) {
this.statistics = statistics;
this.tpccConfiguration = tpccConfiguration;
this.pgAdapterConfiguration = pgAdapterConfiguration;
this.spannerConfiguration = spannerConfiguration;
this.metrics = metrics;
this.dialect = dialect;
}

@Override
Expand Down Expand Up @@ -201,7 +205,7 @@ private void newOrder() throws SQLException {
new Object[] {districtNextOrderId + 1L, districtId, warehouseId});
executeParamStatement(
"INSERT INTO orders (o_id, d_id, w_id, c_id, o_entry_d, o_ol_cnt, o_all_local) "
+ "VALUES (?,?,?,?,NOW(),?,?)",
+ "VALUES (?,?,?,?,CURRENT_TIMESTAMP,?,?)",
new Object[] {
districtNextOrderId, districtId, warehouseId, customerId, orderLineCount, allLocal
});
Expand Down Expand Up @@ -436,7 +440,7 @@ private void payment() throws SQLException {
}
executeParamStatement(
"INSERT INTO history (d_id, w_id, c_id, h_d_id, h_w_id, h_date, h_amount, h_data) "
+ "VALUES (?,?,?,?,?,NOW(),?,?)",
+ "VALUES (?,?,?,?,?,CURRENT_TIMESTAMP,?,?)",
new Object[] {
customerDistrictId,
customerWarehouseId,
Expand Down Expand Up @@ -580,7 +584,7 @@ private void delivery() throws SQLException {
new Object[] {carrierId, newOrderId, customerId, districtId, warehouseId});
executeParamStatement(
"UPDATE order_line "
+ "SET ol_delivery_d = NOW() "
+ "SET ol_delivery_d = CURRENT_TIMESTAMP "
+ "WHERE o_id = ? AND c_id = ? AND d_id = ? AND w_id = ?",
new Object[] {newOrderId, customerId, districtId, warehouseId});
row =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.pgadapter.tpcc.config.TpccConfiguration;
import com.google.cloud.pgadapter.tpcc.dataloader.DataLoadStatus;
import com.google.cloud.pgadapter.tpcc.dataloader.DataLoader;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.pgadapter.ProxyServer;
Expand Down Expand Up @@ -97,16 +98,21 @@ public void run(String... args) throws Exception {
pgAdapterConfiguration.getMaxSessions(), tpccConfiguration.getBenchmarkThreads()));
try {
if (tpccConfiguration.isLoadData()) {
boolean isClientLibGSQLRunner =
tpccConfiguration.getBenchmarkRunner().equals(TpccConfiguration.CLIENT_LIB_GSQL_RUNNER);
Dialect dialect = isClientLibGSQLRunner ? Dialect.GOOGLE_STANDARD_SQL : Dialect.POSTGRESQL;
String loadDataConnectionUrl =
isClientLibGSQLRunner ? spannerConnectionUrl : pgadapterConnectionUrl;
System.out.println("Checking schema");
SchemaService schemaService = new SchemaService(pgadapterConnectionUrl);
SchemaService schemaService = new SchemaService(loadDataConnectionUrl, dialect);
schemaService.createSchema();
System.out.println("Checked schema, starting benchmark");

LOG.info("Starting data load");
ExecutorService executor = Executors.newSingleThreadExecutor();
DataLoadStatus status = new DataLoadStatus(tpccConfiguration);
Future<Long> loadDataFuture =
executor.submit(() -> loadData(status, pgadapterConnectionUrl));
executor.submit(() -> loadData(status, loadDataConnectionUrl));
executor.shutdown();
Stopwatch watch = Stopwatch.createStarted();
while (!loadDataFuture.isDone()) {
Expand Down Expand Up @@ -166,7 +172,21 @@ public void run(String... args) throws Exception {
tpccConfiguration,
pgAdapterConfiguration,
spannerConfiguration,
metrics));
metrics,
Dialect.POSTGRESQL));
} else if (tpccConfiguration
.getBenchmarkRunner()
.equals(TpccConfiguration.CLIENT_LIB_GSQL_RUNNER)) {
// Run client library PG benchmark
statistics.setRunnerName("Client library GSQL benchmark");
executor.submit(
new JavaClientBenchmarkRunner(
statistics,
tpccConfiguration,
pgAdapterConfiguration,
spannerConfiguration,
metrics,
Dialect.GOOGLE_STANDARD_SQL));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.google.cloud.pgadapter.tpcc.config.PGAdapterConfiguration;
import com.google.cloud.pgadapter.tpcc.config.SpannerConfiguration;
import com.google.cloud.pgadapter.tpcc.config.TpccConfiguration;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
Expand All @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;

class JavaClientBenchmarkRunner extends AbstractBenchmarkRunner {

private static final Logger LOG = LoggerFactory.getLogger(JavaClientBenchmarkRunner.class);
private Spanner spanner;
private final ThreadLocal<TransactionManager> transactionManagerThreadLocal = new ThreadLocal<>();
Expand All @@ -52,8 +53,15 @@ class JavaClientBenchmarkRunner extends AbstractBenchmarkRunner {
TpccConfiguration tpccConfiguration,
PGAdapterConfiguration pgAdapterConfiguration,
SpannerConfiguration spannerConfiguration,
Metrics metrics) {
super(statistics, tpccConfiguration, pgAdapterConfiguration, spannerConfiguration, metrics);
Metrics metrics,
Dialect dialect) {
super(
statistics,
tpccConfiguration,
pgAdapterConfiguration,
spannerConfiguration,
metrics,
dialect);
}

void setup() throws SQLException, IOException {
Expand All @@ -80,9 +88,6 @@ void executeStatement(String sql) throws SQLException {
transactionManagerThreadLocal.get().commit();
Duration executionDuration = stopwatch.elapsed();
metrics.recordLatency(executionDuration.toMillis());
} catch (AbortedException e) {
// Ignore the aborted exception. Roll back the transaction.
transactionManagerThreadLocal.get().rollback();
} finally {
transactionManagerThreadLocal.get().close();
}
Expand All @@ -98,7 +103,11 @@ void bindParams(Statement.Builder builder, Object[] params) throws SQLException
for (int i = 0; i < params.length; i++) {
Object param = params[i];
if (param instanceof BigDecimal) {
builder.bind("p" + (i + 1)).to(Value.pgNumeric(((BigDecimal) param).toPlainString()));
if (dialect == Dialect.POSTGRESQL) {
builder.bind("p" + (i + 1)).to(Value.pgNumeric(((BigDecimal) param).toPlainString()));
} else {
builder.bind("p" + (i + 1)).to((BigDecimal) param);
}
} else if (param instanceof String) {
builder.bind("p" + (i + 1)).to((String) param);
} else if (param instanceof Integer) {
Expand All @@ -112,7 +121,8 @@ void bindParams(Statement.Builder builder, Object[] params) throws SQLException
}

Object[] paramQueryRow(String sql, Object[] params) throws SQLException {
ParametersInfo parametersInfo = convertPositionalParametersToNamedParameters(sql, '?', "$");
ParametersInfo parametersInfo =
convertPositionalParametersToNamedParameters(sql, '?', getNamedParameterPrefix());
Statement.Builder builder = Statement.newBuilder(parametersInfo.sqlWithNamedParameters);
bindParams(builder, params);
Statement stmt = builder.build();
Expand All @@ -137,7 +147,8 @@ Object[] paramQueryRow(String sql, Object[] params) throws SQLException {
}

void executeParamStatement(String sql, Object[] params) throws SQLException {
ParametersInfo parametersInfo = convertPositionalParametersToNamedParameters(sql, '?', "$");
ParametersInfo parametersInfo =
convertPositionalParametersToNamedParameters(sql, '?', getNamedParameterPrefix());
Statement.Builder builder = Statement.newBuilder(parametersInfo.sqlWithNamedParameters);
bindParams(builder, params);
Statement stmt = builder.build();
Expand Down Expand Up @@ -196,7 +207,9 @@ static int countOccurrencesOf(char c, String string) {
// If getAsObject() in Struct can be a public method, then we should use it
// instead of using the following approach.
Object getObject(Value value) {
if (value.getType().equals(Type.numeric())) {
if (value == null || value.isNull()) {
return null;
} else if (value.getType().equals(Type.numeric())) {
return value.getNumeric();
} else if (value.getType().equals(Type.string())) {
return value.getString();
Expand All @@ -212,8 +225,13 @@ Object getObject(Value value) {
return null;
}

String getNamedParameterPrefix() {
return dialect == Dialect.POSTGRESQL ? "$" : "@p";
}

List<Object[]> executeParamQuery(String sql, Object[] params) throws SQLException {
ParametersInfo parametersInfo = convertPositionalParametersToNamedParameters(sql, '?', "$");
ParametersInfo parametersInfo =
convertPositionalParametersToNamedParameters(sql, '?', getNamedParameterPrefix());
List<Object[]> results = new ArrayList<>();

Statement.Builder builder = Statement.newBuilder(parametersInfo.sqlWithNamedParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.cloud.pgadapter.tpcc.config.PGAdapterConfiguration;
import com.google.cloud.pgadapter.tpcc.config.SpannerConfiguration;
import com.google.cloud.pgadapter.tpcc.config.TpccConfiguration;
import com.google.cloud.spanner.Dialect;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -46,7 +47,13 @@ class JdbcBenchmarkRunner extends AbstractBenchmarkRunner {
PGAdapterConfiguration pgAdapterConfiguration,
SpannerConfiguration spannerConfiguration,
Metrics metrics) {
super(statistics, tpccConfiguration, pgAdapterConfiguration, spannerConfiguration, metrics);
super(
statistics,
tpccConfiguration,
pgAdapterConfiguration,
spannerConfiguration,
metrics,
Dialect.POSTGRESQL);
this.connectionUrl = connectionUrl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.cloud.pgadapter.tpcc;

import com.google.cloud.spanner.Dialect;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
Expand All @@ -31,11 +32,16 @@ class SchemaService {

private final String connectionUrl;

SchemaService(String connectionUrl) {
private final Dialect dialect;

SchemaService(String connectionUrl, Dialect dialect) {
this.connectionUrl = connectionUrl;
this.dialect = dialect;
}

void createSchema() throws IOException, SQLException {
LOG.info("Schema connection URL: " + connectionUrl);
boolean isGoogleSQL = dialect == Dialect.GOOGLE_STANDARD_SQL;
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
// Check if the tables already exist.
try (ResultSet resultSet =
Expand All @@ -44,21 +50,24 @@ void createSchema() throws IOException, SQLException {
.executeQuery(
"select count(1) "
+ "from information_schema.tables "
+ "where table_schema='public' "
+ "and table_name in ('warehouse', 'district', 'customer', 'history', 'orders', 'new_orders', 'order_line', 'stock', 'item')")) {
+ "where "
+ (isGoogleSQL ? "table_schema='' and " : "table_schema='public' and ")
+ "table_name in ('warehouse', 'district', 'customer', 'history', 'orders', 'new_orders', 'order_line', 'stock', 'item')")) {
if (resultSet.next() && resultSet.getInt(1) == 9) {
LOG.info("Skipping schema creation as tables already exist");
return;
}
}

URL url = AbstractBenchmarkRunner.class.getResource("/schema.sql");
URL url =
AbstractBenchmarkRunner.class.getResource(
isGoogleSQL ? "/schema_googlesql.sql" : "/schema.sql");
Path path = Paths.get(Objects.requireNonNull(url).getPath());
String ddl = Files.readString(path);
LOG.info("Executing schema statements");
String[] statements = ddl.split(";");
String[] statements = ddl.trim().split(";");
for (String statement : statements) {
LOG.info(statement);
LOG.info("Statement: " + statement);
connection.createStatement().execute(statement);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ public class TpccConfiguration {
public static final String PGADAPTER_JDBC_RUNNER = "pgadapter";
public static final String SPANNER_JDBC_RUNNER = "spanner_jdbc";
public static final String CLIENT_LIB_PG_RUNNER = "client_lib_pg";
public static final String CLIENT_LIB_GSQL_RUNNER = "client_lib_gsql";
public static final Set<String> RUNNERS =
Set.of(PGADAPTER_JDBC_RUNNER, SPANNER_JDBC_RUNNER, CLIENT_LIB_PG_RUNNER);
Set.of(
PGADAPTER_JDBC_RUNNER, SPANNER_JDBC_RUNNER, CLIENT_LIB_PG_RUNNER, CLIENT_LIB_GSQL_RUNNER);

private boolean loadData;

Expand Down
Loading

0 comments on commit 372494e

Please sign in to comment.