DRILL-3810: Add Avro schema support
Also updates the FileFormatPlugin to support providing schema.
kameshb authored and jacques-n committed Nov 2, 2015
1 parent 454b499 commit ce593eb
Showing 11 changed files with 293 additions and 25 deletions.
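Context for the change: an Avro data file embeds its writer schema in the file header, which is what lets the new AvroDrillTable below expose a concrete row type at planning time instead of deferring schema discovery to read time. A minimal standalone sketch of that header read, assuming a hypothetical local file (AvroDrillTable itself goes through FsInput and the plugin's filesystem configuration):

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadAvroHeader {
  public static void main(String[] args) throws IOException {
    // Hypothetical local path; the commit reads the same header via FsInput on HDFS.
    File file = new File("/tmp/users.avro");
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
      Schema schema = reader.getSchema(); // the writer schema stored in the file header
      System.out.println(schema.toString(true));
    }
  }
}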
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.avro;

import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Lists;

public class AvroDrillTable extends DrillTable {

  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroDrillTable.class);

  private DataFileReader<GenericContainer> reader = null;

  public AvroDrillTable(String storageEngineName,
      FileSystemPlugin plugin,
      String userName,
      FormatSelection selection) {
    super(storageEngineName, plugin, userName, selection);
    List<String> asFiles = selection.getAsFiles();
    Path path = new Path(asFiles.get(0));
    try {
      reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
    } catch (IOException e) {
      throw UserException.dataReadError(e).build(logger);
    }
  }

  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    List<RelDataType> typeList = Lists.newArrayList();
    List<String> fieldNameList = Lists.newArrayList();

    Schema schema = reader.getSchema();
    for (Field field : schema.getFields()) {
      fieldNameList.add(field.name());
      typeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
    }

    return typeFactory.createStructType(typeList, fieldNameList);
  }

  private RelDataType getNullableRelDataTypeFromAvroType(
      RelDataTypeFactory typeFactory, Schema fieldSchema) {
    RelDataType relDataType = null;
    switch (fieldSchema.getType()) {
    case ARRAY:
      RelDataType eleType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getElementType());
      relDataType = typeFactory.createArrayType(eleType, -1);
      break;
    case BOOLEAN:
      relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
      break;
    case BYTES:
      relDataType = typeFactory.createSqlType(SqlTypeName.BINARY);
      break;
    case DOUBLE:
      relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
      break;
    case FIXED:
      logger.error("{} type not supported", fieldSchema.getType());
      throw UserException.unsupportedError().message("FIXED type not supported yet").build(logger);
    case FLOAT:
      relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
      break;
    case INT:
      relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
      break;
    case LONG:
      relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
      break;
    case MAP:
      RelDataType valueType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getValueType());
      RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
      relDataType = typeFactory.createMapType(keyType, valueType);
      break;
    case NULL:
      relDataType = typeFactory.createSqlType(SqlTypeName.NULL);
      break;
    case RECORD:
//      List<String> fieldNameList = Lists.newArrayList();
//      List<RelDataType> fieldRelDataTypeList = Lists.newArrayList();
//      for(Field field : fieldSchema.getFields()) {
//        fieldNameList.add(field.name());
//        fieldRelDataTypeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
//      }
//      relDataType = typeFactory.createStructType(fieldRelDataTypeList, fieldNameList);

      // TODO: This should map to a struct type, but due to a Calcite issue
      // it is mapped to a map type for now.
      keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
      valueType = typeFactory.createSqlType(SqlTypeName.ANY);
      relDataType = typeFactory.createMapType(keyType, valueType);
      break;
    case ENUM:
    case STRING:
      relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
      break;
    case UNION:
      // Assumes a ["null", type] union; the value type is taken from index 1.
      RelDataType optionalType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getTypes().get(1));
      relDataType = typeFactory.createTypeWithNullability(optionalType, true);
      break;
    }
    return relDataType;
  }
}
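The switch in getNullableRelDataTypeFromAvroType is easiest to read against a concrete schema. A small sketch with a made-up record showing how typical Avro field types land under this mapping (LONG maps to BIGINT, a ["null", T] UNION to nullable T, ARRAY to a Calcite array type); note that the UNION case assumes the null branch comes first and takes index 1 as the value type:

import org.apache.avro.Schema;

public class AvroTypeMappingDemo {
  public static void main(String[] args) {
    // Hypothetical record; field names are illustrative only.
    String json = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"                                     // LONG    -> BIGINT
        + "{\"name\":\"name\",\"type\":[\"null\",\"string\"]},"                      // UNION   -> nullable VARCHAR
        + "{\"name\":\"active\",\"type\":\"boolean\"},"                              // BOOLEAN -> BOOLEAN
        + "{\"name\":\"scores\",\"type\":{\"type\":\"array\",\"items\":\"double\"}}" // ARRAY   -> ARRAY(DOUBLE)
        + "]}";
    Schema schema = new Schema.Parser().parse(json);
    for (Schema.Field field : schema.getFields()) {
      System.out.println(field.name() + " : " + field.schema().getType());
    }
  }
}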
@@ -17,37 +17,50 @@
  */
 package org.apache.drill.exec.store.avro;

-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;

 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.hadoop.conf.Configuration;

-import java.io.IOException;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;

 /**
  * Format plugin for Avro data files.
  */
 public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {

+  private final AvroFormatMatcher matcher;
+
   public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storagePluginConfig) {
     this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
   }

   public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
     super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("avro"), "avro");
+    this.matcher = new AvroFormatMatcher(this);
   }

   @Override
@@ -75,5 +88,32 @@ public int getWriterOperatorType() {
     throw new UnsupportedOperationException("unimplemented");
   }

+  @Override
+  public FormatMatcher getMatcher() {
+    return this.matcher;
+  }
+
+  private static class AvroFormatMatcher extends BasicFormatMatcher {
+
+    public AvroFormatMatcher(AvroFormatPlugin plugin) {
+      super(plugin, ImmutableList.of(Pattern.compile(".*\\.avro$")), ImmutableList.<MagicString>of());
+    }
+
+    @Override
+    public DrillTable isReadable(DrillFileSystem fs,
+        FileSelection selection, FileSystemPlugin fsPlugin,
+        String storageEngineName, String userName) throws IOException {
+      if (isFileReadable(fs, selection.getFirstPath(fs))) {
+        if (plugin.getName() != null) {
+          NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+          namedConfig.name = plugin.getName();
+          return new AvroDrillTable(storageEngineName, fsPlugin, userName, new FormatSelection(namedConfig, selection));
+        } else {
+          return new AvroDrillTable(storageEngineName, fsPlugin, userName, new FormatSelection(plugin.getConfig(), selection));
+        }
+      }
+      return null;
+    }
+  }
+
 }
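A note on the matcher introduced above: AvroFormatMatcher passes an empty MagicString list to BasicFormatMatcher, so Avro files are recognized by the .avro suffix alone rather than by magic bytes. A trivial sketch of what that pattern accepts (the paths are made up):

import java.util.regex.Pattern;

public class AvroSuffixMatchDemo {
  public static void main(String[] args) {
    Pattern avro = Pattern.compile(".*\\.avro$");
    System.out.println(avro.matcher("/data/users.avro").matches());     // true
    System.out.println(avro.matcher("/data/users.avro.tmp").matches()); // false: suffix must be last
    System.out.println(avro.matcher("/data/users.AVRO").matches());     // false: the match is case-sensitive
  }
}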
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.regex.Pattern;

+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -71,14 +73,16 @@ public boolean supportDirectoryReads() {
   }

   @Override
-  public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+  public DrillTable isReadable(DrillFileSystem fs,
+      FileSelection selection, FileSystemPlugin fsPlugin,
+      String storageEngineName, String userName) throws IOException {
     if (isFileReadable(fs, selection.getFirstPath(fs))) {
       if (plugin.getName() != null) {
         NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
         namedConfig.name = plugin.getName();
-        return new FormatSelection(namedConfig, selection);
+        return new DynamicDrillTable(fsPlugin, storageEngineName, userName, new FormatSelection(namedConfig, selection));
       } else {
-        return new FormatSelection(plugin.getConfig(), selection);
+        return new DynamicDrillTable(fsPlugin, storageEngineName, userName, new FormatSelection(plugin.getConfig(), selection));
       }
     }
     return null;
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;

+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.hadoop.fs.FileStatus;

 import java.io.IOException;
@@ -25,7 +26,9 @@ public abstract class FormatMatcher {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);

   public abstract boolean supportDirectoryReads();
-  public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
+  public abstract DrillTable isReadable(DrillFileSystem fs,
+      FileSelection selection, FileSystemPlugin fsPlugin,
+      String storageEngineName, String userName) throws IOException;
   public abstract boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException;
   public abstract FormatPlugin getFormatPlugin();
 }
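This signature change is the pivot of the commit: a matcher no longer returns a FormatSelection for WorkspaceSchemaFactory to wrap, it returns a finished DrillTable (or null). A matcher written against the new contract might look like the sketch below; the class name, file suffix, and plugin wiring are illustrative only, not part of the commit:

package org.apache.drill.exec.store.dfs;

import java.io.IOException;

import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.hadoop.fs.FileStatus;

public class FooFormatMatcher extends FormatMatcher {

  private final FormatPlugin plugin; // hypothetical: supplied by the owning format plugin

  public FooFormatMatcher(FormatPlugin plugin) {
    this.plugin = plugin;
  }

  @Override
  public boolean supportDirectoryReads() {
    return false;
  }

  @Override
  public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
      FileSystemPlugin fsPlugin, String storageEngineName, String userName) throws IOException {
    if (!isFileReadable(fs, selection.getFirstPath(fs))) {
      return null; // not this format; the schema factory tries the next matcher
    }
    // A plain table whose schema is discovered at read time; a schema-aware
    // format would return its own DrillTable subclass here, as Avro now does.
    return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
        new FormatSelection(plugin.getConfig(), selection));
  }

  @Override
  public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
    return status.getPath().getName().endsWith(".foo"); // hypothetical suffix check
  }

  @Override
  public FormatPlugin getFormatPlugin() {
    return plugin;
  }
}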
@@ -27,6 +27,7 @@
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,6 @@
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillViewTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -326,9 +325,9 @@ public DrillTable create(String key) {
       if (fileSelection.containsDirectories(fs)) {
         for (FormatMatcher m : dirMatchers) {
           try {
-            Object selection = m.isReadable(fs, fileSelection);
-            if (selection != null) {
-              return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
+            DrillTable table = m.isReadable(fs, fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
+            if (table != null) {
+              return table;
             }
           } catch (IOException e) {
             logger.debug("File read failed.", e);
@@ -338,9 +337,9 @@ public DrillTable create(String key) {
       }

       for (FormatMatcher m : fileMatchers) {
-        Object selection = m.isReadable(fs, fileSelection);
-        if (selection != null) {
-          return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
+        DrillTable table = m.isReadable(fs, fileSelection, plugin, storageEngineName, schemaConfig.getUserName());
+        if (table != null) {
+          return table;
         }
       }
       return null;
@@ -23,7 +23,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

-import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -57,6 +56,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.fs.Path;

 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
@@ -21,9 +21,6 @@
 import java.util.List;
 import java.util.Map;

-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -35,16 +32,18 @@
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;

 public class
@@ -32,6 +32,8 @@
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -42,6 +44,7 @@
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -204,14 +207,17 @@ public boolean supportDirectoryReads() {
   }

   @Override
-  public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+  public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
+      FileSystemPlugin fsPlugin, String storageEngineName, String userName)
+      throws IOException {
     // TODO: we only check the first file for directory reading. This is because
     if(selection.containsDirectories(fs)){
       if(isDirReadable(fs, selection.getFirstPath(fs))){
-        return new FormatSelection(plugin.getConfig(), expandSelection(fs, selection));
+        return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
+            new FormatSelection(plugin.getConfig(), expandSelection(fs, selection)));
       }
     }
-    return super.isReadable(fs, selection);
+    return super.isReadable(fs, selection, fsPlugin, storageEngineName, userName);
   }

   private FileSelection expandSelection(DrillFileSystem fs, FileSelection selection) throws IOException {
