Skip to content

Commit

Permalink
get count and list in async way (#797)
Browse files Browse the repository at this point in the history
* get count and list in async way

* Improve method name
  • Loading branch information
FrankChen021 authored Jun 1, 2024
1 parent 71f315e commit b571ac4
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
package org.bithon.server.storage.jdbc.clickhouse.metric;

import lombok.extern.slf4j.Slf4j;
import org.bithon.component.commons.expression.IExpression;
import org.bithon.component.commons.utils.StringUtils;
import org.bithon.server.commons.time.TimeSpan;
import org.bithon.server.storage.datasource.ISchema;
import org.bithon.server.storage.datasource.query.Query;
import org.bithon.server.storage.jdbc.common.dialect.Expression2Sql;
import org.bithon.server.storage.jdbc.common.dialect.ISqlDialect;
import org.bithon.server.storage.jdbc.metric.MetricJdbcReader;
import org.jooq.DSLContext;
import org.jooq.Record;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -54,20 +52,17 @@ public JdbcReader(DSLContext dslContext, ISqlDialect sqlDialect) {
* Rewrite the SQL to use group-by instead of distinct so that we can leverage PROJECTIONS defined at the underlying table to speed up queries
*/
@Override
public List<Map<String, String>> distinct(TimeSpan start,
TimeSpan end,
ISchema schema,
IExpression filter,
String dimension) {
start = start.floor(Duration.ofMinutes(1));
end = end.ceil(Duration.ofMinutes(1));
public List<String> distinct(Query query) {
TimeSpan start = query.getInterval().getStartTime().floor(Duration.ofMinutes(1));
TimeSpan end = query.getInterval().getEndTime().ceil(Duration.ofMinutes(1));

String condition = filter == null ? "" : Expression2Sql.from(schema, sqlDialect, filter) + " AND ";
String dimension = query.getResultColumns().get(0).getResultColumnName();
String condition = query.getFilter() == null ? "" : Expression2Sql.from(query.getSchema(), sqlDialect, query.getFilter()) + " AND ";

String sql = StringUtils.format(
"SELECT \"%s\" FROM \"%s\" WHERE %s toStartOfMinute(\"timestamp\") >= %s AND toStartOfMinute(\"timestamp\") < %s GROUP BY \"%s\" ORDER BY \"%s\"",
dimension,
schema.getDataStoreSpec().getStore(),
query.getSchema().getDataStoreSpec().getStore(),
condition,
sqlDialect.formatTimestamp(start),
sqlDialect.formatTimestamp(end),
Expand All @@ -77,10 +72,7 @@ public List<Map<String, String>> distinct(TimeSpan start,
log.info("Executing {}", sql);
List<Record> records = dslContext.fetch(sql);
return records.stream()
.map(record -> {
Map<String, String> mapObject = new HashMap<>();
mapObject.put("value", record.get(0).toString());
return mapObject;
}).collect(Collectors.toList());
.map(record -> record.get(0).toString())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

import com.alibaba.druid.pool.DruidDataSource;
import lombok.extern.slf4j.Slf4j;
import org.bithon.component.commons.expression.IExpression;
import org.bithon.component.commons.utils.Preconditions;
import org.bithon.component.commons.utils.StringUtils;
import org.bithon.server.commons.time.TimeSpan;
import org.bithon.server.storage.datasource.ISchema;
import org.bithon.server.storage.datasource.query.IDataSourceReader;
import org.bithon.server.storage.datasource.query.OrderBy;
import org.bithon.server.storage.datasource.query.Query;
Expand Down Expand Up @@ -154,7 +151,7 @@ private String getOrderBySQL(OrderBy orderBy, String timestampColumn) {
}

@Override
public List<Map<String, Object>> list(Query query) {
public List<Map<String, Object>> select(Query query) {
String sqlTableName = query.getSchema().getDataStoreSpec().getStore();
String timestampCol = query.getSchema().getTimestampSpec().getColumnName();
String filter = Expression2Sql.from(query.getSchema(), sqlDialect, query.getFilter());
Expand Down Expand Up @@ -187,7 +184,7 @@ public List<Map<String, Object>> list(Query query) {
}

@Override
public int listSize(Query query) {
public int count(Query query) {
String sqlTableName = query.getSchema().getDataStoreSpec().getStore();
String timestampCol = query.getSchema().getTimestampSpec().getColumnName();

Expand Down Expand Up @@ -243,36 +240,27 @@ private List<Map<String, Object>> executeSql(String sql) {
}

@Override
public List<Map<String, String>> distinct(TimeSpan start,
TimeSpan end,
ISchema schema,
IExpression filter,
String dimension) {
String filterText = filter == null ? "" : Expression2Sql.from(schema, sqlDialect, filter) + " AND ";
public List<String> distinct(Query query) {
String filterText = query.getFilter() == null ? "" : Expression2Sql.from(query.getSchema(), sqlDialect, query.getFilter()) + " AND ";
String dimension = query.getResultColumns().get(0).getResultColumnName();

String sql = StringUtils.format(
"SELECT DISTINCT(\"%s\") \"%s\" FROM \"%s\" WHERE %s \"timestamp\" >= %s AND \"timestamp\" < %s AND \"%s\" IS NOT NULL ORDER BY \"%s\"",
dimension,
dimension,
schema.getDataStoreSpec().getStore(),
query.getSchema().getDataStoreSpec().getStore(),
filterText,
sqlDialect.formatTimestamp(start),
sqlDialect.formatTimestamp(end),
sqlDialect.formatTimestamp(query.getInterval().getStartTime()),
sqlDialect.formatTimestamp(query.getInterval().getEndTime()),
dimension,
dimension
);

log.info("Executing {}", sql);
List<Record> records = dslContext.fetch(sql);
return records.stream()
.map(record -> {
Field<?>[] fields = record.fields();
Map<String, String> mapObject = new HashMap<>(fields.length);
for (Field<?> field : fields) {
mapObject.put("value", record.get(field).toString());
}
return mapObject;
}).collect(Collectors.toList());
.map(record -> record.get(0).toString())
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,22 +363,18 @@ public List<?> groupBy(Query query) {
}

@Override
public List<Map<String, Object>> list(Query query) {
return getDataSourceReader().list(query);
public List<Map<String, Object>> select(Query query) {
return getDataSourceReader().select(query);
}

@Override
public int listSize(Query query) {
return getDataSourceReader().listSize(query);
public int count(Query query) {
return getDataSourceReader().count(query);
}

@Override
public List<Map<String, String>> distinct(TimeSpan start,
TimeSpan end,
ISchema schema,
IExpression filter,
String dimension) {
return getDataSourceReader().distinct(start, end, schema, filter, dimension);
public List<String> distinct(Query query) {
return getDataSourceReader().distinct(query);
}

static class SpanKindIsRootDetector implements IExpressionVisitor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package org.bithon.server.storage.datasource.query;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.bithon.component.commons.expression.IExpression;
import org.bithon.server.commons.time.TimeSpan;
import org.bithon.server.storage.datasource.ISchema;

import java.util.List;
import java.util.Map;
Expand All @@ -38,14 +35,11 @@ public interface IDataSourceReader extends AutoCloseable {
*/
List<?> groupBy(Query query);

List<Map<String, Object>> list(Query query);
int listSize(Query query);
List<Map<String, Object>> select(Query query);

List<Map<String, String>> distinct(TimeSpan start,
TimeSpan end,
ISchema schema,
IExpression filter,
String dimension);
int count(Query query);

List<String> distinct(Query query);

default void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.bithon.server.web.service.datasource.api.impl;

import org.bithon.component.commons.concurrency.NamedThreadFactory;
import org.bithon.component.commons.exception.HttpMappableException;
import org.bithon.component.commons.utils.CollectionUtils;
import org.bithon.component.commons.utils.Preconditions;
import org.bithon.component.commons.utils.StringUtils;
Expand Down Expand Up @@ -46,15 +48,24 @@
import org.bithon.server.web.service.datasource.api.TimeSeriesQueryResult;
import org.bithon.server.web.service.datasource.api.UpdateTTLRequest;
import org.springframework.context.annotation.Conditional;
import org.springframework.http.HttpStatus;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand All @@ -69,6 +80,7 @@ public class DataSourceApi implements IDataSourceApi {
private final IMetricStorage metricStorage;
private final SchemaManager schemaManager;
private final DataSourceService dataSourceService;
private final Executor asyncExecutor;

public DataSourceApi(MetricStorageConfig storageConfig,
IMetricStorage metricStorage,
Expand All @@ -78,6 +90,12 @@ public DataSourceApi(MetricStorageConfig storageConfig,
this.metricStorage = metricStorage;
this.schemaManager = schemaManager;
this.dataSourceService = dataSourceService;
this.asyncExecutor = new ThreadPoolExecutor(0,
32,
180L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
NamedThreadFactory.of("datasource-async"));
}

@Override
Expand Down Expand Up @@ -114,8 +132,9 @@ public GeneralQueryResponse timeseriesV4(@Validated @RequestBody GeneralQueryReq

@Override
public GeneralQueryResponse list(GeneralQueryRequest request) throws IOException {
ISchema schema = schemaManager.getSchema(request.getDataSource());
Preconditions.checkNotNull(request.getLimit(), "limit parameter should not be NULL");

ISchema schema = schemaManager.getSchema(request.getDataSource());
validateQueryRequest(schema, request);

Query query = Query.builder()
Expand All @@ -135,16 +154,31 @@ public GeneralQueryResponse list(GeneralQueryRequest request) throws IOException
.build();

try (IDataSourceReader reader = schema.getDataStoreSpec().createReader()) {
return GeneralQueryResponse.builder()
// Only query the total number of records for the first page
// This also has a restriction
// that the page number is not a query parameter on web page URL
.total(query.getLimit().getOffset() == 0 ? reader.listSize(query) : 0)
.limit(query.getLimit())
.data(reader.list(query))
.startTimestamp(query.getInterval().getStartTime().getMilliseconds())
.startTimestamp(query.getInterval().getEndTime().getMilliseconds())
.build();

CompletableFuture<Integer> total = CompletableFuture.supplyAsync(() -> {
// Only query the total number of records for the first page
// This also has a restriction
// that the page number is not a query parameter on web page URL
return request.getLimit().getOffset() == 0 ? reader.count(query) : 0;
}, asyncExecutor);

CompletableFuture<List<Map<String, Object>>> list = CompletableFuture.supplyAsync(() -> reader.select(query), asyncExecutor);

try {
return GeneralQueryResponse.builder()
.total(total.get())
.limit(query.getLimit())
.data(list.get())
.startTimestamp(query.getInterval().getStartTime().getMilliseconds())
.startTimestamp(query.getInterval().getEndTime().getMilliseconds())
.build();
} catch (ExecutionException e) {
throw new HttpMappableException(e.getCause(),
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Unexpected exception occurred");
} catch (InterruptedException e) {
throw new HttpMappableException(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage());
}
}
}

Expand Down Expand Up @@ -241,11 +275,19 @@ public Collection<Map<String, String>> getDimensions(GetDimensionRequest request
Preconditions.checkNotNull(column, "Field [%s] does not exist in the schema.", request.getName());

try (IDataSourceReader reader = schema.getDataStoreSpec().createReader()) {
return reader.distinct(TimeSpan.fromISO8601(request.getStartTimeISO8601()),
TimeSpan.fromISO8601(request.getEndTimeISO8601()),
schema,
FilterExpressionToFilters.toExpression(schema, request.getFilterExpression(), CollectionUtils.emptyOrOriginal(request.getFilters())),
column.getName());
Query query = Query.builder()
.interval(Interval.of(TimeSpan.fromISO8601(request.getStartTimeISO8601()), TimeSpan.fromISO8601(request.getEndTimeISO8601())))
.schema(schema)
.resultColumns(Collections.singletonList(column.getResultColumn()))
.filter(FilterExpressionToFilters.toExpression(schema, request.getFilterExpression(), CollectionUtils.emptyOrOriginal(request.getFilters())))
.build();
return reader.distinct(query)
.stream()
.map((val) -> {
Map<String, String> map = new HashMap<>();
map.put("value", val);
return map;
}).collect(Collectors.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public GetEventListResponse getEventList(GetEventListRequest request) throws IOE
.interval(Interval.of(start, end))
.build();

return new GetEventListResponse(reader.listSize(query),
reader.list(query));
return new GetEventListResponse(reader.count(query),
reader.select(query));
}
}
}

0 comments on commit b571ac4

Please sign in to comment.