Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -37,6 +38,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.spi.utils.JsonUtils;

Expand Down Expand Up @@ -133,7 +135,7 @@ public QueryGenerator(List<File> avroFiles, String pinotTableName, String h2Tabl
_multiValueColumnMaxNumElements.put(fieldName, 0);
} else {
_singleValueColumnNames.add(fieldName);
if (type != Schema.Type.STRING && type != Schema.Type.BOOLEAN) {
if (type != Schema.Type.STRING && type != Schema.Type.BOOLEAN && type != Schema.Type.BYTES) {
_singleValueNumericalColumnNames.add(fieldName);
}
}
Expand Down Expand Up @@ -182,7 +184,12 @@ public QueryGenerator(List<File> avroFiles, String pinotTableName, String h2Tabl
* @param avroValue Avro value.
*/
private static void storeAvroValueIntoValueSet(Set<String> valueSet, Object avroValue) {
if (avroValue instanceof Number) {
if (avroValue instanceof ByteBuffer) {
// for raw bytes
String hexRaw = StringUtils.stripStart(Hex.encodeHexString(((ByteBuffer) avroValue).array()), "0");
String hexAligned = (hexRaw.length() & 0x1) == 0 ? hexRaw : "0" + hexRaw;
Comment on lines +188 to +190
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not directly using Hex.encodeHexString(((ByteBuffer) avroValue).array())?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does Avro always have ByteBuffer backed by an array?

valueSet.add("'" + hexAligned + "'");
} else if (avroValue instanceof Number) {
// For Number object, store raw value.
valueSet.add(avroValue.toString());
} else {
Expand Down Expand Up @@ -1012,8 +1019,8 @@ public QueryFragment generatePredicate(String columnName) {
}
}

String pql = String.format(" REGEXP_LIKE(%s, '%s')", columnName, pqlRegexBuilder.toString());
String sql = String.format(" REGEXP_LIKE(%s, '%s', 'i')", columnName, sqlRegexBuilder.toString());
String pql = String.format("REGEXP_LIKE(%s, '%s')", columnName, pqlRegexBuilder.toString());
String sql = String.format("REGEXP_LIKE(%s, '%s', 'i')", columnName, sqlRegexBuilder.toString());
return new StringQueryFragment(pql, sql);
} else {
String equalsPredicate = String.format("%s = %s", columnName, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public static ObjectNode toAvroSchemaJsonObject(FieldSpec fieldSpec) {
case STRING:
jsonSchema.set("type", convertStringsToJsonArray("null", "string"));
return jsonSchema;
case BYTES:
jsonSchema.set("type", convertStringsToJsonArray("null", "bytes"));
return jsonSchema;
default:
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public boolean execute()
List<String> columns = new LinkedList<>();
final HashMap<String, DataType> dataTypes = new HashMap<>();
final HashMap<String, FieldType> fieldTypes = new HashMap<>();
final HashMap<String, Boolean> singleValueFlags = new HashMap<>();
final HashMap<String, TimeUnit> timeUnits = new HashMap<>();

final HashMap<String, Integer> cardinality = new HashMap<>();
Expand All @@ -136,7 +137,7 @@ public boolean execute()
buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);

final DataGeneratorSpec spec =
buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern);
buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, singleValueFlags, timeUnits, cardinality, range, pattern);

final DataGenerator gen = new DataGenerator();
gen.init(spec);
Expand Down Expand Up @@ -175,14 +176,16 @@ private void buildCardinalityRangeMaps(String file, HashMap<String, Integer> car
}

private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, TimeUnit> timeUnits,
HashMap<String, Integer> cardinality, HashMap<String, IntRange> range, HashMap<String, Map<String, Object>> pattern) {
HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, Boolean> singleValueFlags,
HashMap<String, TimeUnit> timeUnits, HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
HashMap<String, Map<String, Object>> pattern) {
for (final FieldSpec fs : schema.getAllFieldSpecs()) {
String col = fs.getName();

columns.add(col);
dataTypes.put(col, fs.getDataType());
fieldTypes.put(col, fs.getFieldType());
singleValueFlags.put(col, fs.isSingleValueField());

switch (fs.getFieldType()) {
case DIMENSION:
Expand Down Expand Up @@ -215,8 +218,8 @@ private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> col
}
}

return new DataGeneratorSpec(columns, cardinality, range, pattern, dataTypes, fieldTypes, timeUnits, FileFormat.AVRO,
_outDir, _overwrite);
return new DataGeneratorSpec(columns, cardinality, range, pattern, dataTypes, fieldTypes, singleValueFlags,
timeUnits, FileFormat.AVRO, _outDir, _overwrite);
}

public static void main(String[] args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private FieldSpec buildSpec(DataGeneratorSpec genSpec, String column) {

spec.setName(column);
spec.setDataType(dataType);
spec.setSingleValueField(true);
spec.setSingleValueField(genSpec.getSingleValueFlagsMap().getOrDefault(column, true));

return spec;
}
Expand All @@ -178,8 +178,8 @@ public static void main(String[] args)
cardinality.put(col, 1000);
}
final DataGeneratorSpec spec =
new DataGeneratorSpec(Arrays.asList(columns), cardinality, range, template, dataTypes, fieldTypes, timeUnits,
FileFormat.AVRO, "/tmp/out", true);
new DataGeneratorSpec(Arrays.asList(columns), cardinality, range, template, dataTypes, fieldTypes,
new HashMap<>(), timeUnits, FileFormat.AVRO, "/tmp/out", true);

final DataGenerator gen = new DataGenerator();
gen.init(spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class DataGeneratorSpec {

private final Map<String, DataType> dataTypesMap;
private final Map<String, FieldType> fieldTypesMap;
private final Map<String, Boolean> singleValueFlagsMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we capable of generating MV values?

private final Map<String, TimeUnit> timeUnitMap;

private final FileFormat outputFileFormat;
Expand All @@ -49,13 +50,14 @@ public class DataGeneratorSpec {

/**
 * Default spec: empty column list and empty cardinality/range/pattern/dataType/fieldType/
 * singleValueFlag/timeUnit maps, writing AVRO output to /tmp/dataGen with overwrite enabled.
 *
 * Note: the delegation must pass exactly seven maps (cardinality, range, pattern, dataTypes,
 * fieldTypes, singleValueFlags, timeUnits) to match the full constructor's parameter list;
 * the previous text carried a stale extra line of HashMaps left over from a merge.
 */
public DataGeneratorSpec() {
this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
FileFormat.AVRO, "/tmp/dataGen", true);
}

public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
Map<String, Map<String, Object>> patternMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
FileFormat format, String outputDir, boolean override) {
Map<String, Map<String, Object>> patternMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap,
Map<String, Boolean> singleValueFlagsMap, Map<String, TimeUnit> timeUnitMap, FileFormat format, String outputDir,
boolean override) {
this.columns = columns;
this.cardinalityMap = cardinalityMap;
this.rangeMap = rangeMap;
Expand All @@ -67,6 +69,7 @@ public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityM

this.dataTypesMap = dataTypesMap;
this.fieldTypesMap = fieldTypesMap;
this.singleValueFlagsMap = singleValueFlagsMap;
this.timeUnitMap = timeUnitMap;
}

Expand All @@ -78,6 +81,10 @@ public Map<String, FieldType> getFieldTypesMap() {
return fieldTypesMap;
}

/**
 * Per-column single-value flags: maps a column name to {@code true} when the column holds a
 * single value and {@code false} when it is a multi-value column.
 */
public Map<String, Boolean> getSingleValueFlagsMap() {
return singleValueFlagsMap;
}

/** Per-column time unit map (column name -> {@link TimeUnit}); presumably populated only for time columns — confirm against callers. */
public Map<String, TimeUnit> getTimeUnitMap() {
return timeUnitMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.data.generator;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.pinot.spi.data.FieldSpec.DataType;

import java.util.Map;
Expand Down Expand Up @@ -46,6 +47,8 @@ public static Generator getGeneratorFor(DataType dataType, int start, int end) {
return new RangeFloatGenerator(start, end);
case DOUBLE:
return new RangeDoubleGenerator(start, end);
case BYTES:
return new RangeBytesGenerator(start, end);
default:
throw new RuntimeException(String.format("Invalid datatype '%s'", dataType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pinot.tools.data.generator;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.google.common.primitives.Longs;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,6 +67,8 @@ public void init() {
intValues.add(new Integer(i));
}
break;
case BYTES:
// use long case
case LONG:
longValues = new ArrayList<Long>();
final long longStart = rand.nextInt(cardinality);
Expand Down Expand Up @@ -113,6 +118,8 @@ public Object next() {
return floatValues.get(random.nextInt(cardinality));
case DOUBLE:
return doubleValues.get(random.nextInt(cardinality));
case BYTES:
return ByteBuffer.wrap(Longs.toByteArray(longValues.get(random.nextInt(cardinality))));
default:
throw new RuntimeException("number generator can only accept a column of type number and this : " + columnType
+ " is not a supported number type");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tools.data.generator;

import com.google.common.primitives.Ints;

import java.nio.ByteBuffer;
import java.util.Random;


/**
 * Generator that produces random BYTES values: each call to {@link #next()} returns a
 * {@link ByteBuffer} wrapping the 4-byte big-endian encoding of an int drawn uniformly from
 * [min(r1, r2), max(r1, r2)). The bounds may be supplied in either order.
 */
public class RangeBytesGenerator implements Generator {
  private final int _start;
  private final int _end;
  private final int _delta;

  // Time-seeded: values are random per run, not reproducible across runs.
  private final Random _randGen = new Random(System.currentTimeMillis());

  public RangeBytesGenerator(int r1, int r2) {
    _start = Math.min(r1, r2);
    _end = Math.max(r1, r2);
    _delta = _end - _start;
  }

  @Override
  public void init() {
    // Stateless generator: nothing to initialize.
  }

  @Override
  public Object next() {
    // Guard the degenerate range r1 == r2: Random.nextInt(0) throws IllegalArgumentException,
    // so return the single representable value instead of crashing.
    int value = (_delta == 0) ? _start : _start + _randGen.nextInt(_delta);
    // Big-endian 4-byte encoding, equivalent to Guava's Ints.toByteArray(value) but without the
    // third-party dependency. The wrapped buffer starts at position 0 with limit 4.
    byte[] bytes = {
        (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8), (byte) value
    };
    return ByteBuffer.wrap(bytes);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class PerfBenchmarkDriver {
private PinotHelixResourceManager _helixResourceManager;

public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf) {
this(conf, "/tmp/", "HEAP", null, false);
this(conf, "/tmp/", "HEAP", null, conf.isVerbose());
}

public PerfBenchmarkDriver(PerfBenchmarkDriverConf conf, String tempDir, String loadMode, String segmentFormatVersion,
Expand Down Expand Up @@ -386,47 +386,68 @@ private void postQueries()

public JsonNode postQuery(String query)
throws Exception {
return postQuery(query, null);
return postQuery(_conf.getDialect(), query, null);
}

/**
 * Posts the given query to the broker using the dialect configured in the driver conf,
 * forwarding the optional optimization flags.
 *
 * @param query query string to execute
 * @param optimizationFlags optional optimization flags passed as debug options; may be null or empty
 * @return the broker's JSON response
 * @throws Exception on connection or parsing failure
 */
public JsonNode postQuery(String query, String optimizationFlags)
throws Exception {
return postQuery(_conf.getDialect(), query, optimizationFlags);
}

public JsonNode postQuery(String dialect, String query, String optimizationFlags)
throws Exception {
ObjectNode requestJson = JsonUtils.newObjectNode();
requestJson.put("pql", query);
requestJson.put(dialect, query);

if (optimizationFlags != null && !optimizationFlags.isEmpty()) {
requestJson.put("debugOptions", "optimizationFlags=" + optimizationFlags);
}

long start = System.currentTimeMillis();
URLConnection conn = new URL(_brokerBaseApiUrl + "/query").openConnection();
String queryUrl = _brokerBaseApiUrl + "/query";
if (!"pql".equals(dialect)) {
queryUrl = _brokerBaseApiUrl + "/query/" + dialect;
}

URLConnection conn = new URL(queryUrl).openConnection();
conn.setDoOutput(true);

try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),
StandardCharsets.UTF_8))) {
StandardCharsets.UTF_8))) {
String requestString = requestJson.toString();
writer.write(requestString);
writer.flush();

StringBuilder stringBuilder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(),
StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
try {
StringBuilder stringBuilder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(),
StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
}
}
}

long totalTime = System.currentTimeMillis() - start;
String responseString = stringBuilder.toString();
ObjectNode responseJson = (ObjectNode) JsonUtils.stringToJsonNode(responseString);
responseJson.put("totalTime", totalTime);
long totalTime = System.currentTimeMillis() - start;
String responseString = stringBuilder.toString();
ObjectNode responseJson = (ObjectNode) JsonUtils.stringToJsonNode(responseString);
responseJson.put("totalTime", totalTime);

if (_verbose) {
if (!responseJson.has("exceptions") || responseJson.get("exceptions").size() <= 0) {
LOGGER.info("requestString: {}", requestString);
LOGGER.info("responseString: {}", responseString);
} else {
LOGGER.error("requestString: {}", requestString);
LOGGER.error("responseString: {}", responseString);
}
}

if (_verbose && (responseJson.get("numDocsScanned").asLong() > 0)) {
LOGGER.info("requestString: {}", requestString);
LOGGER.info("responseString: {}", responseString);
return responseJson;
} catch (Exception e) {
LOGGER.error("requestString: {}", requestString);
throw e;
}
return responseJson;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public class PerfBenchmarkDriverConf {

String resultsOutputDirectory;

boolean verbose = false;

String dialect = "pql";

/** Returns the configured cluster name for this benchmark run. */
public String getClusterName() {
return clusterName;
}
Expand Down Expand Up @@ -263,4 +267,20 @@ public String getSchemaFileNamePath() {
/** Sets the path to the schema file used by the benchmark. */
public void setSchemaFileNamePath(String schemaFileNamePath) {
this.schemaFileNamePath = schemaFileNamePath;
}

/** Whether verbose per-query request/response logging is enabled; defaults to {@code false}. */
public boolean isVerbose() {
return verbose;
}

/** Enables or disables verbose per-query request/response logging. */
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}

/** Query dialect used when posting queries to the broker; defaults to {@code "pql"}. */
public String getDialect() {
return dialect;
}

/** Sets the query dialect (e.g. "pql" or "sql") used when posting queries to the broker. */
public void setDialect(String dialect) {
this.dialect = dialect;
}
}
Loading