Skip to content

Commit

Permalink
[HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax for Spark S…
Browse files Browse the repository at this point in the history
…QL (#5761)

* Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
  • Loading branch information
huberylee authored Jun 17, 2022
1 parent 7689e62 commit fec49dc
Show file tree
Hide file tree
Showing 8 changed files with 709 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.index;

import java.util.Arrays;
import java.util.Map;

/**
 * Plain value holder for an index definition: a name, the column names it covers,
 * its {@link HoodieIndexType}, a map of per-entry option maps and a map of
 * index-level options.
 *
 * <p>No field is validated or copied; whatever references are handed in are the
 * references handed back by the getters. Instances can be created directly or
 * through {@link #builder()}.
 */
public class HoodieIndex {
  private String indexName;
  private String[] colNames;
  private HoodieIndexType indexType;
  // NOTE(review): outer key is presumably the column name -- confirm against callers.
  private Map<String, Map<String, String>> colOptions;
  private Map<String, String> options;

  /** Creates an instance with every field left at {@code null}. */
  public HoodieIndex() {
  }

  /**
   * Creates a fully populated instance.
   *
   * @param indexName  name of the index
   * @param colNames   names of the columns the index covers
   * @param indexType  type of the index
   * @param colOptions nested option maps (see the field note above)
   * @param options    index-level options
   */
  public HoodieIndex(
      String indexName,
      String[] colNames,
      HoodieIndexType indexType,
      Map<String, Map<String, String>> colOptions,
      Map<String, String> options) {
    this.indexName = indexName;
    this.colNames = colNames;
    this.indexType = indexType;
    this.colOptions = colOptions;
    this.options = options;
  }

  /** @return a fresh, empty {@link Builder} */
  public static Builder builder() {
    return new Builder();
  }

  /** @return the index name */
  public String getIndexName() {
    return this.indexName;
  }

  /** @return the stored column-name array itself (not a copy) */
  public String[] getColNames() {
    return this.colNames;
  }

  /** @return the index type */
  public HoodieIndexType getIndexType() {
    return this.indexType;
  }

  /** @return the stored nested option map */
  public Map<String, Map<String, String>> getColOptions() {
    return this.colOptions;
  }

  /** @return the stored index-level option map */
  public Map<String, String> getOptions() {
    return this.options;
  }

  /** Renders all five fields; the column-name array is printed via {@link Arrays#toString(Object[])}. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("HoodieIndex{");
    text.append("indexName='").append(indexName).append('\'');
    text.append(", colNames='").append(Arrays.toString(colNames)).append('\'');
    text.append(", indexType=").append(indexType);
    text.append(", colOptions=").append(colOptions);
    text.append(", options=").append(options);
    return text.append('}').toString();
  }

  /**
   * Fluent builder for {@link HoodieIndex}. Every setter returns the builder itself;
   * fields that are never set stay {@code null}.
   */
  public static class Builder {
    private String indexName;
    private String[] colNames;
    private HoodieIndexType indexType;
    private Map<String, Map<String, String>> colOptions;
    private Map<String, String> options;

    /** Sets the index name. */
    public Builder setIndexName(String indexName) {
      this.indexName = indexName;
      return this;
    }

    /** Sets the column names. */
    public Builder setColNames(String[] colNames) {
      this.colNames = colNames;
      return this;
    }

    /**
     * Sets the index type from its textual name, resolved through
     * {@link HoodieIndexType#of(String)}.
     */
    public Builder setIndexType(String indexType) {
      HoodieIndexType resolvedType = HoodieIndexType.of(indexType);
      this.indexType = resolvedType;
      return this;
    }

    /** Sets the nested option maps. */
    public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
      this.colOptions = colOptions;
      return this;
    }

    /** Sets the index-level options. */
    public Builder setOptions(Map<String, String> options) {
      this.options = options;
      return this;
    }

    /** @return a new {@link HoodieIndex} carrying the values collected so far */
    public HoodieIndex build() {
      return new HoodieIndex(indexName, colNames, indexType, colOptions, options);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.index;

import org.apache.hudi.exception.HoodieIndexException;

import java.util.Arrays;

/**
 * Index types, each paired with a byte code.
 *
 * <p>A constant can be looked up either by its byte code or by its
 * (case-insensitive) name.
 */
public enum HoodieIndexType {
  LUCENE((byte) 1);

  private final byte type;

  HoodieIndexType(byte type) {
    this.type = type;
  }

  /** @return the byte code of this index type */
  public byte getValue() {
    return type;
  }

  /**
   * Looks up an index type by its byte code.
   *
   * @param indexType byte code to resolve
   * @return the constant whose code equals {@code indexType}
   * @throws HoodieIndexException if no constant carries that code
   */
  public static HoodieIndexType of(byte indexType) {
    return Arrays.stream(HoodieIndexType.values())
        .filter(t -> t.type == indexType)
        .findAny()
        .orElseThrow(() ->
            new HoodieIndexException("Unknown hoodie index type:" + indexType));
  }

  /**
   * Looks up an index type by name, ignoring case.
   *
   * @param indexType name to resolve; {@code null} is rejected like any other unknown name
   * @return the constant whose name equals the upper-cased {@code indexType}
   * @throws HoodieIndexException if {@code indexType} is {@code null} or matches no constant
   */
  public static HoodieIndexType of(String indexType) {
    if (indexType == null) {
      // Report a missing name the same way as an unknown one instead of a bare NPE.
      throw new HoodieIndexException("Unknown hoodie index type:" + indexType);
    }
    // Locale.ROOT keeps the match independent of the JVM default locale
    // (e.g. Turkish upper-cases 'i' to a dotted capital I); computed once, not per constant.
    final String normalized = indexType.toUpperCase(java.util.Locale.ROOT);
    return Arrays.stream(HoodieIndexType.values())
        .filter(t -> t.name().equals(normalized))
        .findAny()
        .orElseThrow(() ->
            new HoodieIndexException("Unknown hoodie index type:" + indexType));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
// Statement alternatives. The last alternative, `.*?`, matches any token
// sequence and is labelled #passThrough.
statement
: compactionStatement #compactionCommand
| CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
// CREATE INDEX [IF NOT EXISTS] <name> ON [TABLE] <table> [USING <type>]
//     ( <column> [OPTIONS (...)] , ... ) [OPTIONS (...)]
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
tableIdentifier (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS indexOptions=propertyList)? #createIndex
// DROP INDEX [IF EXISTS] <name> ON [TABLE] <table>
| DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex
// SHOW INDEXES (FROM | IN) [TABLE] <table>
| SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes
// REFRESH INDEX <name> ON [TABLE] <table>
| REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex
| .*? #passThrough
;

Expand Down Expand Up @@ -99,6 +106,14 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;

// One or more multipartIdentifierProperty entries separated by COMMA.
multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

// A multipart identifier, optionally followed by OPTIONS and a parenthesised
// property list (labelled `options`).
multipartIdentifierProperty
: multipartIdentifier (OPTIONS options=propertyList)?
;

// One or more identifiers joined by '.'; each identifier is collected into `parts`.
multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
Expand All @@ -114,9 +129,53 @@
;

// Keyword tokens grouped as non-reserved, one per line in alphabetical order.
// NOTE(review): presumably these are accepted wherever a plain identifier is
// allowed -- confirm against the `identifier` rule, which is outside this hunk.
nonReserved
    : CALL
    | COMPACTION
    | CREATE
    | DROP
    | EXISTS
    | FROM
    | IN
    | INDEX
    | INDEXES
    | IF
    | LIMIT
    | NOT
    | ON
    | OPTIONS
    | REFRESH
    | RUN
    | SCHEDULE
    | SHOW
    | TABLE
    | USING
    ;

// A parenthesised, comma-separated list of one or more properties.
propertyList
: LEFT_PAREN property (COMMA property)* RIGHT_PAREN
;

// A property key with an optional value; the EQ between key and value may be
// omitted, and a key may appear with no value at all.
property
: key=propertyKey (EQ? value=propertyValue)?
;

// Either a DOT-separated chain of identifiers or a single STRING token.
propertyKey
: identifier (DOT identifier)*
| STRING
;

// Accepted value forms: integer, decimal, boolean, or string.
propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;

// Punctuation tokens.
LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';

// Keyword tokens.
ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
Expand All @@ -132,6 +191,21 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
// Keywords referenced by the CREATE / DROP / SHOW / REFRESH INDEX statement
// alternatives and by the OPTIONS clauses.
CREATE: 'CREATE';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
IF: 'IF';
NOT: 'NOT';
EXISTS: 'EXISTS';
TABLE: 'TABLE';
USING: 'USING';
OPTIONS: 'OPTIONS';
DROP: 'DROP';
FROM: 'FROM';
IN: 'IN';
REFRESH: 'REFRESH';

// Equality sign; both '=' and '==' produce the EQ token.
EQ: '=' | '==';

// Sign tokens.
PLUS: '+';
MINUS: '-';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

/**
 * Logical plan node for the CREATE INDEX command.
 *
 * @param table          plan of the table the command refers to; the node's only child
 * @param indexName      name of the index
 * @param indexType      index type, carried as a string
 * @param ignoreIfExists flag carried on the node; presumably set by `IF NOT EXISTS` --
 *                       confirm against the parser
 * @param columns        column attributes, each paired with its own option map
 * @param properties     index-level options
 * @param output         output attributes; defaults to `CreateIndex.getOutputAttrs`
 */
case class CreateIndex(
    table: LogicalPlan,
    indexName: String,
    indexType: String,
    ignoreIfExists: Boolean,
    columns: Seq[(Attribute, Map[String, String])],
    properties: Map[String, String],
    override val output: Seq[Attribute] = CreateIndex.getOutputAttrs)
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  // Resolved only once the table plan and every column attribute are resolved.
  override lazy val resolved: Boolean =
    table.resolved && columns.forall { case (attribute, _) => attribute.resolved }

  /** Returns a copy of this node whose table is the first element of `newChild`. */
  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex =
    copy(table = newChild.head)
}

object CreateIndex {
  /** Default output of [[CreateIndex]]: no attributes. */
  def getOutputAttrs: Seq[Attribute] = Nil
}

/**
 * Logical plan node for the DROP INDEX command.
 *
 * @param table             plan of the table the command refers to; the node's only child
 * @param indexName         name of the index
 * @param ignoreIfNotExists flag carried on the node; presumably set by `IF EXISTS` --
 *                          confirm against the parser
 * @param output            output attributes; defaults to `DropIndex.getOutputAttrs`
 */
case class DropIndex(
    table: LogicalPlan,
    indexName: String,
    ignoreIfNotExists: Boolean,
    override val output: Seq[Attribute] = DropIndex.getOutputAttrs)
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  /** Returns a copy of this node whose table is the first element of `newChild`. */
  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex =
    copy(table = newChild.head)
}

object DropIndex {
  /** Default output of [[DropIndex]]: no attributes. */
  def getOutputAttrs: Seq[Attribute] = Nil
}

/**
 * Logical plan node for the SHOW INDEXES command.
 *
 * @param table  plan of the table the command refers to; the node's only child
 * @param output output attributes; defaults to `ShowIndexes.getOutputAttrs`
 */
case class ShowIndexes(
    table: LogicalPlan,
    override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs)
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  /** Returns a copy of this node whose table is the first element of `newChild`. */
  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes =
    copy(table = newChild.head)
}

object ShowIndexes {
  /**
   * Default output of [[ShowIndexes]]: five string columns. The first three are
   * non-nullable, the two option columns are nullable. Fresh attribute references
   * are created on every call.
   */
  def getOutputAttrs: Seq[Attribute] = {
    // (column name, nullable) in output order.
    val columnSpecs = Seq(
      "index_name" -> false,
      "col_name" -> false,
      "index_type" -> false,
      "col_options" -> true,
      "options" -> true
    )
    columnSpecs.map { case (columnName, isNullable) =>
      AttributeReference(columnName, StringType, nullable = isNullable)()
    }
  }
}

/**
 * Logical plan node for the REFRESH INDEX command.
 *
 * @param table     plan of the table the command refers to; the node's only child
 * @param indexName name of the index
 * @param output    output attributes; defaults to `RefreshIndex.getOutputAttrs`
 */
case class RefreshIndex(
    table: LogicalPlan,
    indexName: String,
    override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs)
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  /** Returns a copy of this node whose table is the first element of `newChild`. */
  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex =
    copy(table = newChild.head)
}

object RefreshIndex {
  /** Default output of [[RefreshIndex]]: no attributes. */
  def getOutputAttrs: Seq[Attribute] = Nil
}
Loading

0 comments on commit fec49dc

Please sign in to comment.