DRILL-1512: Refactor AvroFormatPlugin
Extend EasyFormatPlugin and remove AvroGroupScan and AvroSubScan
StevenMPhillips committed Apr 15, 2015
1 parent 55a9a59 commit bf3db31
Showing 5 changed files with 24 additions and 474 deletions.
AvroFormatPlugin.java
@@ -18,111 +18,61 @@
 package org.apache.drill.exec.store.avro;

 import com.google.common.collect.Lists;
-import com.google.common.collect.ImmutableSet;

+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.AbstractWriter;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;

 import java.io.IOException;
 import java.util.List;
-import java.util.Set;

 /**
  * Format plugin for Avro data files.
  */
-public class AvroFormatPlugin implements FormatPlugin {
-
-  private final String name;
-  private final DrillbitContext context;
-  private final DrillFileSystem fs;
-  private final StoragePluginConfig storagePluginConfig;
-  private final AvroFormatConfig formatConfig;
-  private final BasicFormatMatcher matcher;
+public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {

   public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
       StoragePluginConfig storagePluginConfig) {
     this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
   }

-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
-      StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
-    this.name = name;
-    this.context = context;
-    this.fs = fs;
-    this.storagePluginConfig = storagePluginConfig;
-    this.formatConfig = formatConfig;
-
-    // XXX - What does 'compressible' mean in this context?
-    this.matcher = new BasicFormatMatcher(this, fs, Lists.newArrayList("avro"), false);
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
   }

   @Override
-  public boolean supportsRead() {
+  public boolean supportsPushDown() {
     return true;
   }

   @Override
-  public boolean supportsWrite() {
-    return false;
-  }
-
-  @Override
-  public FormatMatcher getMatcher() {
-    return matcher;
-  }
-
-  @Override
-  public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
-    throw new UnsupportedOperationException("Unimplemented");
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
-    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
-    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
+    return new AvroRecordReader(context, fileWork.getPath(), dfs, columns);
   }

   @Override
-  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
-    return ImmutableSet.of();
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
   }

   @Override
-  public AvroFormatConfig getConfig() {
-    return formatConfig;
+  public int getReaderOperatorType() {
+    return CoreOperatorType.AVRO_SUB_SCAN_VALUE;
   }

   @Override
-  public StoragePluginConfig getStorageConfig() {
-    return storagePluginConfig;
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException("unimplemented");
   }
-
-  @Override
-  public DrillFileSystem getFileSystem() {
-    return fs;
-  }
-
-  @Override
-  public DrillbitContext getContext() {
-    return context;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
 }
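
A note on the new constructor: the run of boolean arguments passed to super(...) is opaque at the call site. The sketch below annotates the same call, assuming the EasyFormatPlugin base constructor takes its flags in the order readable, writable, block-splittable, compressible; the flag names are assumptions for illustration and are not taken from this diff.

  // Annotated copy of the super(...) call above. Flag names in the comments
  // are assumed from the EasyFormatPlugin constructor and may not match the
  // actual parameter names in the Drill source.
  super(name, context, fs, config, formatPluginConfig,
      true,                        // readable: the format supports reads
      false,                       // writable: no RecordWriter yet (getRecordWriter throws)
      false,                       // blockSplittable: files are not split on block boundaries
      false,                       // compressible: compressed-extension matching is not enabled
      Lists.newArrayList("avro"),  // file extensions matched by this format
      "avro");                     // default format name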

This file was deleted. (The deletions correspond to the AvroGroupScan and AvroSubScan classes removed by this commit; their contents are not shown here.)

AvroRecordReader.java
@@ -73,6 +73,7 @@ public class AvroRecordReader extends AbstractRecordReader {

   private DataFileReader<GenericContainer> reader = null;
   private OperatorContext operatorContext;
+  private FileSystem fs;

   private static final int DEFAULT_BATCH_SIZE = 1000;

@@ -91,6 +92,7 @@ public AvroRecordReader(final FragmentContext fragmentContext,

     hadoop = new Path(inputPath);
     buffer = fragmentContext.getManagedBuffer();
+    this.fs = fileSystem;

     setColumns(projectedColumns);
   }

@@ -101,7 +103,7 @@ public void setup(final OutputMutator output) throws ExecutionSetupException {
     writer = new VectorContainerWriter(output);

     try {
-      reader = new DataFileReader<>(new FsInput(hadoop, new Configuration()), new GenericDatumReader<GenericContainer>());
+      reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
     } catch (IOException e) {
       throw new ExecutionSetupException(e);
     }
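
The functional change in AvroRecordReader is that setup() now builds the Avro FsInput from the configuration of the DrillFileSystem handed to the reader, rather than from a freshly constructed default Configuration, so the reader honors the Hadoop settings (such as fs.defaultFS) of the file system the scan was planned against. A minimal sketch of the pattern, assuming a configured Hadoop FileSystem named fs and an input Path named hadoop:

  // FsInput (org.apache.avro.mapred) exposes a Hadoop path as a SeekableInput.
  // Passing fs.getConf() instead of new Configuration() means the file is
  // opened with the file system's own settings rather than the defaults.
  SeekableInput in = new FsInput(hadoop, fs.getConf());
  DataFileReader<GenericContainer> reader =
      new DataFileReader<>(in, new GenericDatumReader<GenericContainer>());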
