Skip to content

Commit

Permalink
Layering and Error refactor (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaisarma committed Jan 4, 2021
1 parent db85413 commit dd82788
Show file tree
Hide file tree
Showing 20 changed files with 532 additions and 227 deletions.
19 changes: 8 additions & 11 deletions src/main/java/com/yahoo/bullet/bql/BulletQueryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.yahoo.bullet.bql.parser.ParsingException;
import com.yahoo.bullet.bql.query.ProcessedQuery;
import com.yahoo.bullet.bql.query.QueryError;
import com.yahoo.bullet.bql.query.QueryProcessor;
import com.yahoo.bullet.bql.parser.BQLParser;
import com.yahoo.bullet.bql.query.QueryBuilder;
Expand Down Expand Up @@ -48,11 +49,11 @@ public BulletQueryBuilder(BulletConfig bulletConfig) {
*/
public BQLResult buildQuery(String bql) {
if (Utilities.isEmpty(bql)) {
return makeBQLResultError("The given BQL query is empty.", "Please specify a non-empty query.");
return makeError(QueryError.EMPTY_QUERY.format());
}
if (bql.length() > maxQueryLength) {
return makeBQLResultError("The given BQL string is too long. (" + bql.length() + " characters)",
"Please reduce the length of the query to at most " + maxQueryLength + " characters.");
String resolution = "Please reduce the length of the query to at most " + maxQueryLength + " characters.";
return makeError(QueryError.QUERY_TOO_LONG.formatWithResolution(resolution, bql.length()));
}
try {
// Parse BQL into node tree
Expand All @@ -73,19 +74,15 @@ public BQLResult buildQuery(String bql) {

return new BQLResult(query, ExpressionFormatter.format(queryNode, true));
} catch (BulletException e) {
return makeBQLResultError(e.getError());
return makeError(e.getError());
} catch (ParsingException e) {
return makeBQLResultError(e.getMessage(), "This is a parsing error.");
return makeError(QueryError.GENERIC_PARSING_ERROR.format(e.getMessage()));
} catch (Exception e) {
return makeBQLResultError(e.getMessage(), "This is an application error and not a user error.");
return makeError(QueryError.GENERIC_ERROR.format(e.getMessage()));
}
}

private BQLResult makeBQLResultError(BulletError error) {
private BQLResult makeError(BulletError error) {
return new BQLResult(Collections.singletonList(error));
}

/**
 * Builds a failed {@link BQLResult} from an error message and a resolution by wrapping them in a
 * {@link BulletError} and delegating to the overload that accepts a {@link BulletError}.
 *
 * @param error The error message.
 * @param resolution The suggested resolution for the error.
 * @return A {@link BQLResult} carrying the single error.
 */
private BQLResult makeBQLResultError(String error, String resolution) {
    BulletError bulletError = new BulletError(error, resolution);
    return makeBQLResultError(bulletError);
}
}
192 changes: 157 additions & 35 deletions src/main/java/com/yahoo/bullet/bql/query/LayeredSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,204 @@
import java.util.Map;
import java.util.Set;

@Getter
@AllArgsConstructor
public class LayeredSchema {
private Schema schema;
private Map<String, String> aliases;
private LayeredSchema subSchema;
private int depth;
@Getter
private boolean locked;
private static final int TOP_LAYER = 0;

/**
 * The result of looking up a field: the field itself, its type and a depth value.
 */
@Getter
public static class FieldLocation {
    private Schema.Field field;
    private Type type;
    private int depth;

    /**
     * Creates a {@link FieldLocation} with the given values.
     *
     * @param field The {@link Schema.Field} to store. It may be null.
     * @param type The {@link Type} to store.
     * @param depth The depth to store.
     * @return The created {@link FieldLocation}.
     */
    private static FieldLocation from(Schema.Field field, Type type, int depth) {
        FieldLocation result = new FieldLocation();
        result.depth = depth;
        result.type = type;
        result.field = field;
        return result;
    }
}

/**
 * Creates a layered schema consisting of the given {@link Schema} with no aliases. Its depth is set to
 * the top layer depth.
 *
 * @param schema The {@link Schema} to use.
 */
public LayeredSchema(Schema schema) {
    this.depth = TOP_LAYER;
    this.aliases = Collections.emptyMap();
    this.schema = schema;
}

/**
* Adds a new layer to the top of this, pushing every layer one deeper. Note that if this layered schema had layers
* on top, their depths will not be adjusted. It is recommended to add layers from the top.
*
* @param newSchema The new {@link Schema} to add to the top layer.
* @param newAliases The new {@link Map} of aliases to add to the top layer.
*/
public void addLayer(Schema newSchema, Map<String, String> newAliases) {
subSchema = new LayeredSchema(schema, aliases, subSchema, locked);
subSchema = new LayeredSchema(schema, aliases, subSchema, depth, locked);
schema = newSchema;
aliases = newAliases;
locked = false;
subSchema.increaseDepth();
}

/**
 * Marks this {@link LayeredSchema} as locked. While locked, the layers below this one are not accessible.
 */
public void lock() {
    this.locked = true;
}

public Schema.Field getField(String field) {
if (schema == null) {
return null;
}
Type type = schema.getType(field);
if (type != Type.NULL) {
return new Schema.PlainField(field, type);
}
String alias = aliases.get(field);
if (alias != null) {
return new Schema.PlainField(alias, schema.getType(alias));
}
return subSchema != null && !subSchema.locked ? subSchema.getField(field) : null;
/**
 * Marks this {@link LayeredSchema} as unlocked so that the layers below this one are accessible.
 */
public void unlock() {
    this.locked = false;
}

public Type getType(String field) {
/**
 * Returns the depth of this {@link LayeredSchema}. The top layer has a depth of 0 and the depth grows by
 * one for every layer further down.
 *
 * @return The depth of this layer.
 */
public int depth() {
    return this.depth;
}

/**
* Searches for the given field from this layer. The minimum depth parameter can be provided to ensure that
* the field, if found, is at that depth or greater. The depth is the depth of this layered schema as defined by
* {@link #depth()}. This can be used to skip layers for the search.
*
* @param field The field to search for.
* @param minimumDepth The minimum depth (a whole number) at which the field may be found.
* @return A {@link FieldLocation} for the field. It is non-null. If the schema does not exist, the type will be
* {@link Type#UNKNOWN}. If field is not found, the type will be {@link Type#NULL}.
*/
public FieldLocation findField(String field, int minimumDepth) {
if (schema == null) {
// If the schema is null, ignore the subschema and just return Type.UNKNOWN
return Type.UNKNOWN;
return FieldLocation.from(null, Type.UNKNOWN, depth);
}
Type type = schema.getType(field);
if (type != Type.NULL) {
return type;
if (depth >= minimumDepth) {
Type type = schema.getType(field);
if (type != Type.NULL) {
return FieldLocation.from(new Schema.PlainField(field, type), type, depth);
}
String alias = aliases.get(field);
if (alias != null) {
type = schema.getType(alias);
return FieldLocation.from(new Schema.PlainField(alias, type), type, depth);
}
}
String alias = aliases.get(field);
if (alias != null) {
return schema.getType(alias);
}
return subSchema != null && !subSchema.locked ? subSchema.getType(field) : Type.NULL;
return canGoDeeper() ? subSchema.findField(field, minimumDepth) : FieldLocation.from(null, Type.NULL, depth);
}

/**
 * Searches for the given field starting from this layer and continuing below it.
 *
 * @param field The field to search for.
 * @return A {@link FieldLocation} for the field. It is non-null. If the schema does not exist, the type will be
 *         {@link Type#UNKNOWN}. If the field is not found, the type will be {@link Type#NULL}.
 */
public FieldLocation findField(String field) {
    // Using this layer's own depth as the minimum means this layer itself is eligible for the search
    int startingDepth = depth;
    return findField(field, startingDepth);
}

/**
 * Looks up the given field in this layer and below and returns just the field.
 *
 * @param field The field to search for.
 * @return The {@link Schema.Field} or null if not found.
 */
public Schema.Field getField(String field) {
    FieldLocation location = findField(field);
    return location.getField();
}

/**
 * Looks up the given field in this layer and below and returns just its type.
 *
 * @param field The field to search for.
 * @return The {@link Type} or {@link Type#NULL} if not found, or if the schema is absent, {@link Type#UNKNOWN}.
 */
public Type getType(String field) {
    FieldLocation location = findField(field);
    return location.getType();
}

/**
* Checks to see if the given field exists in this layer or below.
*
* @param field The field to search for.
* @return A boolean denoting if the field exists or not.
*/
public boolean hasField(String field) {
if (schema == null) {
return false;
}
if (schema.hasField(field)) {
return true;
}
if (aliases.containsKey(field)) {
return true;
return findField(field).getField() != null;
}

/**
* Adds a new field to the {@link Schema} at this layer.
*
* @param field The name of the field to add.
* @param type The {@link Type} of the field to add.
*/
public void addField(String field, Type type) {
if (schema != null) {
schema.addField(field, type);
}
return subSchema != null && !subSchema.locked && subSchema.hasField(field);
}

/**
* Retrieves the names of all the fields in this and accessible layers below.
*
* @return The {@link Set} of field names after flattening.
*/
public Set<String> getFieldNames() {
Set<String> fields = new HashSet<>();
if (subSchema != null && !subSchema.locked) {
if (canGoDeeper()) {
fields.addAll(subSchema.getFieldNames());
}
if (schema != null) {
schema.getFields().stream().map(Schema.Field::getName).forEach(fields::add);
}
return fields;
}

/**
 * Collects, for this layer and every accessible layer below it, the names that have an entry in the aliases
 * of a layer but are not fields in the schema of that layer.
 *
 * @return The {@link Set} of extraneous aliases.
 */
public Set<String> getExtraneousAliases() {
    // Start from whatever the accessible layers below report
    Set<String> extraneous = canGoDeeper() ? new HashSet<>(subSchema.getExtraneousAliases()) : new HashSet<>();
    if (schema == null) {
        // Nothing to compare the aliases against at this layer
        return extraneous;
    }
    for (String name : aliases.keySet()) {
        if (!schema.hasField(name)) {
            extraneous.add(name);
        }
    }
    return extraneous;
}

/**
 * Increments the depth of this layer and of every layer below it by one.
 */
private void increaseDepth() {
    // Walk down the chain of sub-schemas instead of recursing
    for (LayeredSchema layer = this; layer != null; layer = layer.subSchema) {
        layer.depth++;
    }
}

/**
 * Checks whether the layer below this one can be accessed.
 *
 * @return True only if this layer is not locked and a layer below exists.
 */
private boolean canGoDeeper() {
    return !(subSchema == null || locked);
}
}
16 changes: 14 additions & 2 deletions src/main/java/com/yahoo/bullet/bql/query/OrderByProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.yahoo.bullet.bql.query;

import com.yahoo.bullet.bql.query.LayeredSchema.FieldLocation;
import com.yahoo.bullet.bql.tree.DefaultTraversalVisitor;
import com.yahoo.bullet.bql.tree.ExpressionNode;
import com.yahoo.bullet.bql.tree.FieldExpressionNode;
Expand Down Expand Up @@ -47,11 +48,22 @@ protected Void visitExpression(ExpressionNode node, LayeredSchema layeredSchema)
@Override
protected Void visitFieldExpression(FieldExpressionNode node, LayeredSchema layeredSchema) {
String name = node.getField().getValue();
Type type = layeredSchema.getSubSchema().getType(name);
/*
Since order by is visited after and the top layer in the schema is seen as the schema of the record past
other aggregations, we need to see if we should add additional projections to do the order by (this only
happens in case of RAW queries). So resolve these additional fields by looking past the top layer after unlock
*/
boolean wasLocked = layeredSchema.isLocked();
layeredSchema.unlock();
FieldLocation field = layeredSchema.findField(name, layeredSchema.depth() + 1);
Type type = field.getType();
if (type != Type.NULL) {
layeredSchema.getSchema().addField(name, type);
layeredSchema.addField(name, type);
additionalFields.add(name);
}
if (wasLocked) {
layeredSchema.lock();
}
return null;
}

Expand Down
31 changes: 1 addition & 30 deletions src/main/java/com/yahoo/bullet/bql/query/ProcessedQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
import com.yahoo.bullet.bql.tree.WindowNode;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.query.expressions.Operation;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -44,33 +42,6 @@ public enum QueryType {
TOP_K
}

/**
 * The errors that can be recorded against a query. Each constant wraps the {@link BulletError} holding the
 * error message and its resolution(s).
 */
@Getter
@AllArgsConstructor
public enum QueryError {
    MULTIPLE_QUERY_TYPES(new BulletError("Query consists of multiple aggregation types.", "Please specify a valid query with only one aggregation type.")),
    EMPTY_ALIAS(new BulletError("Cannot have an empty string as an alias.", "Please specify a non-empty string instead.")),
    NESTED_AGGREGATE(new BulletError("Aggregates cannot be nested.", "Please remove any nested aggregates.")),
    WHERE_WITH_AGGREGATE(new BulletError("WHERE clause cannot contain aggregates.", "If you wish to filter on an aggregate, please specify it in the HAVING clause.")),
    GROUP_BY_WITH_AGGREGATE(new BulletError("GROUP BY clause cannot contain aggregates.", "Please remove any aggregates from the GROUP BY clause.")),
    MULTIPLE_COUNT_DISTINCT(new BulletError("Cannot have multiple COUNT DISTINCT.", "Please specify only one COUNT DISTINCT.")),
    COUNT_DISTINCT_WITH_ORDER_BY(new BulletError("ORDER BY clause is not supported for queries with COUNT DISTINCT.", "Please remove the ORDER BY clause.")),
    COUNT_DISTINCT_WITH_LIMIT(new BulletError("LIMIT clause is not supported for queries with COUNT DISTINCT.", "Please remove the LIMIT clause.")),
    MULTIPLE_DISTRIBUTION(new BulletError("Cannot have multiple distribution functions.", "Please specify only one distribution function.")),
    DISTRIBUTION_AS_VALUE(new BulletError("Distribution functions cannot be treated as values.", Arrays.asList("Please consider using the distribution's output fields instead.",
                                                                                                              "For QUANTILE distributions, the output fields are: [\"Value\", \"Quantile\"].",
                                                                                                              "For FREQ and CUMFREQ distributions, the output fields are: [\"Probability\", \"Count\", \"Quantile\"]."))),
    MULTIPLE_TOP_K(new BulletError("Cannot have multiple TOP functions.", "Please specify only one TOP function.")),
    // The second resolution previously read "with an field"; corrected to "with an alias"
    TOP_K_AS_VALUE(new BulletError("TOP function cannot be treated as a value.", Arrays.asList("Please consider using the TOP function's output field instead. The default name is \"Count\".",
                                                                                               "The output field can also be renamed by selecting TOP with an alias."))),
    TOP_K_WITH_ORDER_BY(new BulletError("ORDER BY clause is not supported for queries with a TOP function.", "Please remove the ORDER BY clause.")),
    TOP_K_WITH_LIMIT(new BulletError("LIMIT clause is not supported for queries with a TOP function.", "Please remove the LIMIT clause.")),
    HAVING_WITHOUT_GROUP_BY(new BulletError("HAVING clause is only supported with GROUP BY clause.", "Please remove the HAVING clause, and consider using a WHERE clause instead.")),
    NON_POSITIVE_DURATION(new BulletError("Query duration must be positive.", "Please specify a positive duration.")),
    NON_POSITIVE_LIMIT(new BulletError("LIMIT clause must be positive.", "Please specify a positive LIMIT clause."));

    // The error reported to the user for this constant
    private BulletError error;
}

private Set<QueryType> queryTypes = EnumSet.noneOf(QueryType.class);
private Set<QueryError> queryErrors = EnumSet.noneOf(QueryError.class);

Expand Down Expand Up @@ -156,7 +127,7 @@ public boolean validate() {
}

public List<BulletError> getErrors() {
return queryErrors.stream().map(QueryError::getError).collect(Collectors.toList());
return queryErrors.stream().map(QueryError::format).collect(Collectors.toList());
}

public void addQueryType(QueryType queryType) {
Expand Down

0 comments on commit dd82788

Please sign in to comment.