From aabf082ce4593907e921754234a800185965ba1d Mon Sep 17 00:00:00 2001 From: Vlad Storona Date: Thu, 21 Jun 2018 16:41:17 +0300 Subject: [PATCH] DRILL-6179: Added pcapng-format support --- .../client/src/protobuf/UserBitShared.pb.cc | 13 +- .../client/src/protobuf/UserBitShared.pb.h | 5 +- exec/java-exec/pom.xml | 5 + .../drill/exec/store/pcap/decoder/Packet.java | 16 +- .../exec/store/pcapng/PcapngFormatConfig.java | 52 +++ .../exec/store/pcapng/PcapngFormatPlugin.java | 76 +++ .../exec/store/pcapng/PcapngRecordReader.java | 214 +++++++++ .../store/pcapng/decoder/PacketDecoder.java | 61 +++ .../drill/exec/store/pcapng/package-info.java | 23 + .../exec/store/pcapng/schema/Column.java | 28 ++ .../store/pcapng/schema/DummyArrayImpl.java | 34 ++ .../exec/store/pcapng/schema/DummyImpl.java | 34 ++ .../exec/store/pcapng/schema/Schema.java | 441 ++++++++++++++++++ .../drill/exec/store/pcapng/schema/Util.java | 59 +++ .../resources/bootstrap-storage-plugins.json | 3 + .../dfs/TestFormatPluginOptionExtractor.java | 1 + .../exec/store/pcapng/TestPcapngHeaders.java | 212 +++++++++ .../store/pcapng/TestPcapngRecordReader.java | 100 ++++ .../resources/store/pcapng/example.pcapng | Bin 0 -> 512 bytes .../test/resources/store/pcapng/sniff.pcapng | Bin 0 -> 33464 bytes .../drill/exec/proto/UserBitShared.java | 21 +- .../exec/proto/beans/CoreOperatorType.java | 4 +- .../src/main/protobuf/UserBitShared.proto | 1 + 23 files changed, 1380 insertions(+), 23 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java create mode 100644 exec/java-exec/src/test/resources/store/pcapng/example.pcapng create mode 100644 exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index 739804844bf..8f98e06a0a2 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" - "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" + "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe" "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" @@ -778,11 +778,11 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S" "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" - "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" - "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" - "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." - "\n\033org.apache.drill.exec.protoB\rUserBitSh" - "aredH\001", 5406); + "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa" + "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START" + "\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS" + "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill." + "exec.protoB\rUserBitSharedH\001", 5427); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); UserCredentials::default_instance_ = new UserCredentials(); @@ -958,6 +958,7 @@ bool CoreOperatorType_IsValid(int value) { case 52: case 53: case 54: + case 55: return true; default: return false; diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index 4599abb23aa..a07cbfa67e8 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -258,11 +258,12 @@ enum CoreOperatorType { HTPPD_LOG_SUB_SCAN = 51, IMAGE_SUB_SCAN = 52, SEQUENCE_SUB_SCAN = 53, - PARTITION_LIMIT = 54 + PARTITION_LIMIT = 54, + PCAPNG_SUB_SCAN = 55 }; bool CoreOperatorType_IsValid(int value); const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER; -const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT; +const CoreOperatorType CoreOperatorType_MAX = PCAPNG_SUB_SCAN; const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor(); diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index f175c654c01..f4068952ee5 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -534,6 +534,11 @@ metadata-extractor 2.11.0 + + fr.bmartel + pcapngdecoder + 1.2 + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 9cc98de9c44..a0a07a99d11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -42,18 +42,18 @@ public class Packet { private long timestamp; private int originalLength; - private byte[] raw; + protected byte[] raw; // index into the raw data where the current ethernet packet starts private int etherOffset; // index into the raw data where the current IP packet starts. Should be just after etherOffset - private int ipOffset; + protected int ipOffset; private int packetLength; - private int etherProtocol; - private int protocol; + protected int etherProtocol; + protected int protocol; - private boolean isRoutingV6; + protected boolean isRoutingV6; @SuppressWarnings("WeakerAccess") public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException { @@ -379,7 +379,7 @@ private int ipV4HeaderLength() { return (getByte(raw, ipOffset) & 0xf) * 4; } - private int ipVersion() { + protected int ipVersion() { return getByte(raw, ipOffset) >>> 4; } @@ -409,12 +409,12 @@ private void decodeEtherPacket() { // everything is decoded lazily } - private int processIpV4Packet() { + protected int processIpV4Packet() { validateIpV4Packet(); return getByte(raw, ipOffset + 9); } - private int processIpV6Packet() { + protected int processIpV6Packet() { Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6, got %d", ipVersion()); int headerLength = 40; int nextHeader = raw[ipOffset + 6] & 0xff; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java new file mode 100644 index 00000000000..7ff875acf82 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonTypeName("pcapng") +public class PcapngFormatConfig implements FormatPluginConfig { + + public List extensions = Collections.singletonList("pcapng"); + + public List getExtensions() { + return extensions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PcapngFormatConfig that = (PcapngFormatConfig) o; + return Objects.equals(extensions, that.extensions); + } + + @Override + public int hashCode() { + return Objects.hash(extensions); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java new file mode 100644 index 00000000000..832c0ec3bd3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.RecordWriter; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasyWriter; +import org.apache.drill.exec.store.dfs.easy.FileWork; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class PcapngFormatPlugin extends EasyFormatPlugin { + + public static final String DEFAULT_NAME = "pcapng"; + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storagePluginConfig) { + this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig()); + } + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) { + super(name, context, fsConf, config, formatPluginConfig, true, + false, true, false, + formatPluginConfig.getExtensions(), DEFAULT_NAME); + } + + @Override + public boolean supportsPushDown() { + return true; + } + + @Override + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, + FileWork fileWork, List columns, + String userName) { + return new PcapngRecordReader(fileWork.getPath(), dfs, columns); + } + + @Override + public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public int getReaderOperatorType() { + return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE; + } + + @Override + public int getWriterOperatorType() { + throw new UnsupportedOperationException("unimplemented"); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java new file mode 100644 index 00000000000..b1c5f242729 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import fr.bmartel.pcapdecoder.PcapDecoder; +import fr.bmartel.pcapdecoder.structure.types.IPcapngType; +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.pcapng.schema.Column; +import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl; +import org.apache.drill.exec.store.pcapng.schema.DummyImpl; +import org.apache.drill.exec.store.pcapng.schema.Schema; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; + +public class PcapngRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class); + + // batch size should not exceed max allowed record count + private static final int BATCH_SIZE = 40_000; + + private final Path pathToFile; + private OutputMutator output; + private List projectedCols; + private FileSystem fs; + private FSDataInputStream in; + private List columns; + + private Iterator it; + + public PcapngRecordReader(final String pathToFile, + final FileSystem fileSystem, + final List columns) { + this.fs = fileSystem; + this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.columns = columns; + setColumns(columns); + } + + @Override + public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + try { + + this.output = output; + this.in = fs.open(pathToFile); + PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in)); + decoder.decode(); + this.it = decoder.getSectionList().iterator(); + setupProjection(); + } catch (IOException io) { + throw UserException.dataReadError(io) + .addContext("File name:", pathToFile.toUri().getPath()) + .build(logger); + } + } + + @Override + public int next() { + if (isSkipQuery()) { + return iterateOverBlocks((block, counter) -> { + }); + } else { + return iterateOverBlocks((block, counter) -> putToTable((IEnhancedPacketBLock) block, counter)); + } + } + + private void putToTable(IEnhancedPacketBLock bLock, Integer counter) { + for (ProjectedColumnInfo pci : projectedCols) { + pci.getColumn().process(bLock, pci.getVv(), counter); + } + } + + @Override + public void close() throws Exception { + if (in != null) { + in.close(); + in = null; + } + } + + private void setupProjection() { + if (isSkipQuery()) { + projectedCols = projectNone(); + } else if (isStarQuery()) { + projectedCols = projectAllCols(Schema.getColumnsNames()); + } else { + projectedCols = projectCols(columns); + } + } + + private List projectNone() { + List pciBuilder = new ArrayList<>(); + pciBuilder.add(makeColumn("dummy", new DummyImpl())); + return Collections.unmodifiableList(pciBuilder); + } + + private List projectAllCols(final Set columns) { + List pciBuilder = new ArrayList<>(); + for (String colName : columns) { + pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName))); + } + return Collections.unmodifiableList(pciBuilder); + } + + private List projectCols(final List columns) { + List pciBuilder = new ArrayList<>(); + for (SchemaPath schemaPath : columns) { + String projectedName = schemaPath.rootName(); + if (schemaPath.isArray()) { + pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl())); + } else if (Schema.getColumns().containsKey(projectedName.toLowerCase())) { + pciBuilder.add(makeColumn(projectedName, + Schema.getColumns().get(projectedName.toLowerCase()))); + } else { + pciBuilder.add(makeColumn(projectedName, new DummyImpl())); + } + } + return Collections.unmodifiableList(pciBuilder); + } + + private ProjectedColumnInfo makeColumn(final String colName, final Column column) { + MaterializedField field = MaterializedField.create(colName, column.getMinorType()); + ValueVector vector = getValueVector(field, output); + return new ProjectedColumnInfo(vector, column, colName); + } + + private ValueVector getValueVector(final MaterializedField field, final OutputMutator output) { + try { + TypeProtos.MajorType majorType = field.getType(); + final Class clazz = TypeHelper.getValueVectorClass( + majorType.getMinorType(), majorType.getMode()); + + return output.addField(field, clazz); + } catch (SchemaChangeException sce) { + throw UserException.internalError(sce) + .addContext("The addition of this field is incompatible with this OutputMutator's capabilities") + .build(logger); + } + } + + private Integer iterateOverBlocks(BiConsumer consumer) { + int counter = 0; + while (it.hasNext() && counter < BATCH_SIZE) { + IPcapngType block = it.next(); + if (block instanceof IEnhancedPacketBLock) { + consumer.accept(block, counter); + counter++; + } + } + return counter; + } + + private static class ProjectedColumnInfo { + + private ValueVector vv; + private Column colDef; + private String columnName; + + ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) { + this.vv = vv; + this.colDef = colDef; + this.columnName = columnName; + } + + public ValueVector getVv() { + return vv; + } + + Column getColumn() { + return colDef; + } + + public String getColumnName() { + return columnName; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java new file mode 100644 index 00000000000..ea5d83104e4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.decoder; + +import org.apache.drill.exec.store.pcap.decoder.Packet; +import org.apache.drill.exec.store.pcap.decoder.PacketConstants; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte; +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort; + +public class PacketDecoder extends Packet { + + @SuppressWarnings("WeakerAccess") + public boolean readPcapng(final byte[] raw) { + this.raw = raw; + return decodeEtherPacket(); + } + + private boolean decodeEtherPacket() { + etherProtocol = getShort(raw, PacketConstants.PACKET_PROTOCOL_OFFSET); + ipOffset = PacketConstants.IP_OFFSET; + if (isIpV4Packet()) { + protocol = processIpV4Packet(); + return true; + } else if (isIpV6Packet()) { + int tmp = processIpV6Packet(); + if (tmp != -1) { + protocol = tmp; + } + return true; + } else if (isPPPoV6Packet()) { + protocol = getByte(raw, 48); + return true; + } + return false; + } + + @Override + protected int processIpV6Packet() { + try { + return super.processIpV6Packet(); + } catch (IllegalStateException ise) { + return -1; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java new file mode 100644 index 00000000000..dafeaa399e9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * For comments on realization of this format plugin look at : + * + * @see Jira + */ +package org.apache.drill.exec.store.pcapng; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java new file mode 100644 index 00000000000..109b7ddc96f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.vector.ValueVector; + +public interface Column { + TypeProtos.MajorType getMinorType(); + + void process(IEnhancedPacketBLock block, ValueVector vv, int count); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java new file mode 100644 index 00000000000..2023d195ea6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyArrayImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.repeated(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java new file mode 100644 index 00000000000..a8c26a06a20 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java new file mode 100644 index 00000000000..a9738bdbe9f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; +import static org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue; + +public class Schema { + + private final static Map columns = new HashMap<>(); + + static { + columns.put("timestamp", new TimestampImpl()); + columns.put("packet_length", new PacketLenImpl()); + columns.put("type", new TypeImpl()); + columns.put("src_ip", new SrcIpImpl()); + columns.put("dst_ip", new DstIpImpl()); + columns.put("src_port", new SrcPortImpl()); + columns.put("dst_port", new DstPortImpl()); + columns.put("src_mac_address", new SrcMacImpl()); + columns.put("dst_mac_address", new DstMacImpl()); + columns.put("tcp_session", new TcpSessionImpl()); + columns.put("tcp_ack", new TcpAckImpl()); + columns.put("tcp_flags", new TcpFlags()); + columns.put("tcp_flags_ns", new TcpFlagsNsImpl()); + columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl()); + columns.put("tcp_flags_ece", new TcpFlagsEceImpl()); + columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl()); + columns.put("tcp_flags_ece_congestion_experienced", new TcpFlagsEceCongestionExperiencedImpl()); + columns.put("tcp_flags_urg", new TcpFlagsUrgIml()); + columns.put("tcp_flags_ack", new TcpFlagsAckImpl()); + columns.put("tcp_flags_psh", new TcpFlagsPshImpl()); + columns.put("tcp_flags_rst", new TcpFlagsRstImpl()); + columns.put("tcp_flags_syn", new TcpFlagsSynImpl()); + columns.put("tcp_flags_fin", new TcpFlagsFinImpl()); + columns.put("tcp_parsed_flags", new TcpParsedFlags()); + columns.put("packet_data", new PacketDataImpl()); + } + + public static Map getColumns() { + return columns; + } + + public static Set getColumnsNames() { + return columns.keySet(); + } + + static class TimestampImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.TIMESTAMP); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setTimestampColumnValue(block.getTimeStamp(), vv, count); + } + } + + static class PacketLenImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setIntegerColumnValue(block.getPacketLength(), vv, count); + } + } + + static class TypeImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getPacketType(), vv, count); + } + } + } + + static class SrcIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(), vv, count); + } + } + } + + static class DstIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(), vv, count); + } + } + } + + static class SrcPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count); + } + } + } + + static class DstPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count); + } + } + } + + static class SrcMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetSource(), vv, count); + } + } + } + + static class DstMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv, count); + } + } + } + + static class TcpSessionImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.BIGINT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + setNullableLongColumnValue(packet.getSessionHash(), vv, count); + } + } + } + + static class TcpAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count); + } + } + } + + static class TcpFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count); + } + } + } + + static class TcpFlagsNsImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0, vv, count); + } + } + } + + static class TcpFlagsCwrImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0, vv, count); + } + } + } + + static class TcpFlagsEceImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0, vv, count); + } + } + } + + static class TcpFlagsEceEcnCapableImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, vv, count); + } + } + } + + static class TcpFlagsEceCongestionExperiencedImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, vv, count); + } + } + } + + static class TcpFlagsUrgIml implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0, vv, count); + } + } + } + + static class TcpFlagsAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0, vv, count); + } + } + } + + static class TcpFlagsPshImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv, count); + } + } + } + + static class TcpFlagsRstImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv, count); + } + } + } + + static class TcpFlagsSynImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv, count); + } + } + } + + static class TcpFlagsFinImpl implements Column { + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv, count); + } + } + } + + static class TcpParsedFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count); + } + } + } + + static class PacketDataImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv, count); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java new file mode 100644 index 00000000000..06e8e6ac652 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.TimeStampVector; +import org.apache.drill.exec.vector.ValueVector; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class Util { + static void setNullableIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((IntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setTimestampColumnValue(final long data, final ValueVector vv, final int count) { + ((TimeStampVector.Mutator) vv.getMutator()) + .setSafe(count, data / 1000); + } + + static void setNullableLongColumnValue(final long data, final ValueVector vv, final int count) { + ((NullableBigIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setNullableStringColumnValue(final String data, final ValueVector vv, final int count) { + ((NullableVarCharVector.Mutator) vv.getMutator()) + .setSafe(count, data.getBytes(UTF_8), 0, data.length()); + } + + static void setNullableBooleanColumnValue(final boolean data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data ? 1 : 0); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 42cddd8655a..46f162052a6 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -46,6 +46,9 @@ "pcap" : { type: "pcap" }, + "pcapng" : { + type: "pcapng" + }, "avro" : { type: "avro" }, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java index f51fe4c89fb..e53c394ece1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java @@ -59,6 +59,7 @@ public void test() { case "json": case "sequencefile": case "pcap": + case "pcapng": case "avro": assertEquals(d.typeName, "(type: String)", d.presentParams()); break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java new file mode 100644 index 00000000000..5dcffa95c81 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Paths; + +public class TestPcapngHeaders extends ClusterTest { + @BeforeClass + public static void setupTestFiles() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1)); + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testValidHeadersForStarQuery() throws IOException { + String query = "select * from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_psh", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("type", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_cwr", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_fin", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_syn", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_rst", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("packet_data", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_parsed_flags", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_ns", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("packet_length", Types.required(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_urg", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForProjection() throws IOException { + String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port, tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForMissColumns() throws IOException { + String query = "select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("name", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("color", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testMixColumns() throws IOException { + String query = "select src_ip, dst_ip, dst_mac_address, src_port, tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port, tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`"; + actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet(); + + expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForArrayColumns() throws IOException { + // query with non-existent field + String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support arrays + query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForNestedColumns() throws IOException { + // query with non-existent field + String query = "select top['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support nesting + query = "select type['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java new file mode 100644 index 00000000000..98d7b67384a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Paths; + +public class TestPcapngRecordReader extends PlanTestBase { + @BeforeClass + public static void setupTestFiles() { + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testStarQuery() throws Exception { + Assert.assertEquals(123, testSql("select * from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select * from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingByName() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testDiffCaseQuery() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingMissColls() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, `time` from dfs.`store/pcapng/example.pcapng`")); + } + + + @Test + public void testCountQuery() throws Exception { + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(123L) + .build() + .run(); + + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/example.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(1L) + .build() + .run(); + } + + @Test + public void testGroupBy() throws Exception { + Assert.assertEquals(47, testSql("select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip")); + } + + @Test + public void testDistinctQuery() throws Exception { + Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select distinct packet_data from dfs.`store/pcapng/example.pcapng`")); + } + + @Test(expected = UserRemoteException.class) + public void testBasicQueryWithIncorrectFileName() throws Exception { + testSql("select * from dfs.`store/pcapng/snaff.pcapng`"); + } + + @Test + public void testPhysicalPlanExecutionBasedOnQuery() throws Exception { + String query = "EXPLAIN PLAN for select * from dfs.`store/pcapng/sniff.pcapng`"; + String plan = getPlanInString(query, JSON_FORMAT); + Assert.assertEquals(123, testPhysical(plan)); + } +} diff --git a/exec/java-exec/src/test/resources/store/pcapng/example.pcapng b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng new file mode 100644 index 0000000000000000000000000000000000000000..002cb8d9b4f508fc8e56215736cbc179d7d9816f GIT binary patch literal 512 zcmaJ;OG^S#6#mAUDU}f-Nkm%Qh4yxQ6(kfyM!2Ym0$YUPYm7l1Wm2n^k?q>I4(v~~ z?KiZn7D0Q-tz_rw1g<)8?m6e4$9KMaoi3NN24H_<*(Z=4SR%rMJ>6(rv6#B9hJ%rK z@LV%AQ_lkuLfC0kYWZx9Mbt=Gjk1N~)B!uvnrtl+kFBzzS*@^8-E0`8Ag@@Y$pFXU z7)jC=>0kiZAmg%}HNq4-C_6z``3IZQ!+u`(#siYRTSmIy?md&w6_|!fAXB!E;}I{D zzzyAvh{69_QqtD>l@S6TQ4z>MkYL|6HSI(%>Vp0D?*3s#nf8i;B1^O*mINs(`@_jh zIp`}xq*+5I9E&ruWYeR-{TGbcmfZ@r-n}^dwh7cDJ^}Nj+le#eq9JtV+_dPBlT5a3 zb#QLo&(x2AStP0rqpq13tY%i5dO<6&+%+qVKEek3@EuOO?Vf1)De5Cymfg< G$M^+y7He4m literal 0 HcmV?d00001 diff --git a/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng new file mode 100644 index 0000000000000000000000000000000000000000..cd542bd4d638ac9d28657cd6459fdd7479470dd1 GIT binary patch literal 33464 zcmeHw2V7Ih^Y{iLBA|c`3#SoKK?$Z3z!nu0JyZlyPelwVhysZrfL-(yu@|slFIezw zXFriMpkKRU?*;4Wo&CJC%YWv*1fHRY{=EBpepo&m-oE#Cc6MfVw#?2xtICzF_7Orl zw`f|21^@F6!Hh^q#~7V5$~DN1(@w2bx_0mE#)*`0JtaJze>YCMzz~j0jWi^J!6@P!^icTRX*WR!x#m-vb#99LhNHX$+w zQXV1?o*R_QhCIG;(Hgl-!*T5ysa3`fkZA|GK?x!c5yv&2`p@NYBfNPsNw|`ac`zXE zh<9JZe?lyY8DOA{;gL%4!~*WydIiJ}5=ut-pifoqKGo?dU=GUBqIpgm1cxK(<8Zf%WtQaI-S?c;i@|J6%pM=FB)9q)2`UH<(Bd(s!}%bjHtXIAgNX8G!! zWXB(;w$^Ua!gIysk=OgyHt)W-kz)_*YYZF}aE$@!xDs$|3)dKn&{1Td;~Nt?J_bCW zCfeD)f1Xf4$Bl?5AqX4kkPHz6mLcmnOcepw7+ItrA#c`6+I4!KPtoCr=c}b8-+rnb z%9xx7&#K?AexLa5y9HuHK!m7JDa=>Uu8o*4@nVO?#)VslMX1#gQA)WiR>uz0$u$+C z)N)xAN*GID7k{bpP4W1(OR0FQq3{U*XYhDQSA?R+wWyJ*bCAXe^cbE0dUx3KizzE9 zdK`Q_NlIAuqc>Wa**<=ppZ|K9zG4*Fx}#}lZm?gQpmzS8CLI6n-2d~Y7FvQ_6cU3@rBv9St`k0LTwu8vn~o#1XS9}Z;1Y;oEccp{79 zM#n|z;0>)(8N*d;BYa|&+W1Jhl6vR^FNXyC1##PS@aqDZI#877v>TvRM=P6o3O#rp zLM)%p^Wp?V#>5S&Y*pEc5UelKe*i=bCF^tMEXtNK+118psgbsc2R>gaCvUi$)>8*#G6Z#7)Z;!R|Z(nsvQ4-Qhw$b!~L z9REOWfDD>RC#88!3yCaeI4ub>{jC{PkP{IZfXbz?*SUzL+oOiFsU&oPNzAP#Xz@ z1#R(98~2EkDL7nHSXe5;;sG#$umBdn49H?Oz?&}=73R}FL4*mhPi5_{q#L3GwG-Qt zZ#h_}*2$t;^6P0eEj2WjvCT}$$Q6inLt7T1N_(|BTBeJXH&dzAf?)$=xx@GzSFKX6 zj*eEwD3l5}KbayLbdNTw6$23#Rbs!nBI;~O`?aq9lQK&vlt7CPb zc_TGC_sD2jgtFN{jWWV;sfj7LN{HZUp|ewJ4Ugiqs)`{&?OXbG7P^Imbo6a07OVK- zDxQ$*rI7QuLXkqo6^SHDu0kjfE4;)KxguOrv3o#pOTLG|)<6TmPX-kh*D$7aLp92l zydhpZ=s!I5IR$*&Qw$%Sn0)lrMn*+(O}Y7by!0=Ymj%LE94m`2-j-sgZ&1g80B)Ng zzc&9BR(i=*GO?FX%oQqCBCb%v6L2LmxffUD?HR5RD8)R1H~-UE3F;7aBIub}bE<#JD%SSjJk#Y%6k&|9M7isT?RBAG-k^5!XpJQc45 zi4hjx_)YtWqWz6afp%C?4EG$F+%wS&#p(Dmu(9o@v(dER{|Y~4;R>0jmzRJm4(EX~ zmnr33iD$T&tMc|%io9Xg&G!_48b7IakL`a1qy3L;F+ueMq%jt4|7_>7wSTs=V+~rT z&|{;|8RV3r8?F>1UG5)9*N0=OFVqW#6p=R5fmv~S!(_-*1eY3Amt+d5EMm#~3e=29 zAoCW=y;NLpxd=KYPZ(Z^c@hOz$oCfU9@s0SawdcP9cnd^mg!v$#2^S--8{a#@aXlF;fz^X_H!b5s0FaZJKChVM8 zV7n}0Ulk=hmJ*FcT2+LJuvwW;mm^Goot9{4X7;`XR1s{zwrOX3)n9Jaz0h{aw0rp8 zn^<5|VI}53$couLBC1p{+C7-^ zFzD$-721w57TN*lSW@zE!0)*f#TKB;$z`qhA<87UCA+kb^9}5|M($XOce)v`@PSAT_ACVWf);I^#phLoT7SS=e;eo z9e<7k>WNFxU)oxteq6j3?f6?gzE1u4F*`n+(Jm&vo2$ABq`^qf3D(uuX=^Q_>U~Bp z2kX8)YY+4bukwuhW%N7mQ?z8kmMh3)8wR#fjVuOCk=&@rm_Z*HqH&Xq4Yf&z*e4T* z>Sz_dnV1&KfxTjF)#^cuRUuC|Q1zi}1xwS0_E3vK1AEJPs=uKs80ocX`fbn+?9gnYFI=0p zTvvHZmOd`pL^sg;i1d+X005HlKS&p(hv^t{>(sPT#tiLYzR0p~0(*zPg)xJMVjVr! z+f!?S3WvZLDS0sLWij*>wF>tRmC#qPlE{M-QUgT%$1*@51j|7>!+?zn*f+gsxImQ$ z^v7uEQz@B+^4z?qu+~~gjeV0*{*3ABNjJWiPB{E#@9U>0E7pyVMnO>4!GGww=}X+f~P`SJHCTo}<+lowrv`TGm0O8=8Cl*h%xIA;TkQN*nt(nSX!S z@>cWKNrLYexS}bio_f#FXdz%jq>_6sL z*SsDb$bdfn5z`(I>9>NfOYivh_k-3Ol3(|J?I1lj``-QkRjlz;*}Z4mD;;j;9J*1d z>7Vju{x_A1S681m&T4sS*9yz6;jfQY+R1v6nC9@VP0|8>*6zQ9ep&F#um!DNyH;{d zvh}a+eWJnd->w>wQf+wv2OPhUd}v|`7GaC*M9ZbxySNvo_Kw9jDF6}8x`~`m*h$^eh5D9voi5r z)TqjwjL8q;otHF<=y__S)4PC;x}K6v(yelv4t{A1L-+UbJsnZyRQT&U`}@A0mUeGh zqlqp{It<&<H*C;M&%A>WbzLumF>bBpjs3sgNIjgi(ov=jjU&2o|nr;z_fejE93 zl!vD$_r6BnSFEdvg1wS|PK6$o-g6qW6=i@f56fN3(0i8}FHj{y8e`FNS)D$yT-fVs z_u|8HDY=_nY02*o&h)wK&|&nVhjl$_tr#>@^31pPwdFHbO3jV|31;`i7;Sry%J{gm88XHOsAa_fNS%PSN0?6^e9`y*5Q zkL2{#ysB#L?>F}F?bLl$ySH9`B==_P;1xdXUO{2*c7F?=-18e3uy*m1-QVf&C5>8> zvhqOi{CTx)>+U`8F?)pS=taABuU=Mr!auvzVN$dAbE22!u&>tteq~l_w@1NM+$%V@ zf7s&~%O*N%Vd#e1j`M3Q71wT~&gypleZ;YSF1LEWPTtgZRMmL@8$Ff=*Q=P$-&y-b z<-iAh&MNQ4Sj8mHjEk@g)nBby?bpfa{_HE(T`%@ObMd)_HaSt_#d{`AX_+vOoA^>jKPqO5BL_PoD!N0`W|OaW27xoi*k& zV2?Hd3`rwwCch#puPWL$3+E{_kY5*Pd>iu=CQV1oUtPejFTFO#;%1n%0o)^RTpR0< zj5KXx@okKA`y0dz$DMWf87e8HG1BWT_g=Up#VTj8NZwnx2TG7gr7&Re_4N&bK!&TR zkU)iGDx?`ge|Q~r3;xG|@MC^m`%(CTp9p^(;ADjF*kPS29MTv~*Lqpqv0hS(Gv>Gs ziVJy^FFncoo?{6sjd^p{krPZ|JF`?BZP$M5e3BQPU_D#bb@it6 zk7i`dxL5!F@F^S8$He`hwmLO1C;gNghn3;OJvt+GZ>2YK>zqXU#ln#OSMOvTxh)^s z_`3c?i_{*0m+R%ve9*N{=c9`}s>L*p>$zN(nF|I1pfmEA$%8v(;6c+3$ODmi_Lv7x zSeZI&R=cch`fZ{=uF;~JykY>L8ijwKcNm=VVjBxO~FmkWxcZWxr zokFr7ez)=R@mi93Bd=W_a8^ClB|dj?n~?f-cn>>Vl(fpuz8d!H)wvyNG~XQhrgC7* zADwsl?E0zhk;M*+l>MwbF0cIe0qZA3?a-+2L#uJWCfyJ$%I`T}I=kOs{m~^YeEi&+ zr~ZB;pk4T!#+`fL*^yfc9S%uPR(9#To9DA{&noixRG&+glpb;Z-aa=) zb%b*xdnumxxuJR?X)|sAdcO1@?)%6?#V_uwbZ{P0HJugwe zIloyav*afRI}EB_$OGp5A)@QLg82j8j_vmNC&mh>KHPtGF0=!C8LsVnL%)!JqsEOI zM0B`yutu#5QD}Gv#<^_UEZQ{{T07f+c8>XolQS~=_nsN=nh{hXb?arD(6d!s3&)Af z@lw}b>q;9hy+*y+VfcT4yflJVIh0O2{^NR8Z%AV_)j1@f+D*_p_RU|RyytiZ*1Nix z3@2XcDb5B6AN!Yf`{K~!-#-k4y_4`Nwl!wi*5gVmdk=ad+S?w(va{=+JpI>wEPJ#? zS<9Aoj+6K~XM-nPab??WDJU!yM88OtZbq2^wK02gy}SSz$u5Mu4@ zJNPfNYYj)8J!}14RbxG`H5|R}K)nZ<)PGLOs^ZK3t>m(I(q*@>pOuVtrptz!dsV&W zVrKpnmRZ~hF%M`po797{-vswP6l|;~mbrhR?-uI+p1Ru$%N}506ZT;TFRVB5BRlcG zKZiOlgSoktuRqg|jJzVps?F5RHCY=3KQepZC+aymXQ3ZiUq(4ouBh_>4r?~7yaK@wLl}?wfSFNoeo4y~Zz$@QWYB}=S%D8w4{~Act56S`XLM>`KHnPf@2Hp58g4Y2g0tZ|3j1a^iL#GFtx!I3S9lqsiSF0twW|3Wcu=N=Z z54^YZyy7)?lx_IE_KlYAS&(Vh!iIle^o!7O#JIj8)3k?wX=Syr+M6zIGFMOj`FSj9 z>Ky9cexZA-=0DEsb|mb_JB_vn1f2KHxjA7UzuU7x!`wFq>>Sjp`y<|s-s>Cx)J8YH z!Tsr-;^vxvmo;SOh9}F@)Gr#Y6-z^dZeO^6BChwld-W!S|H8{xaGN&~_fOyd&N_iL z?7HVmbAtRqn*ozgLrUdSl|@qW*dqE5wCNi7^aO3XI%_jI@@Z5HnokY2L!||L>T$rl z)i}oiW5e{~#lpY(56i!hxAp3)=AAD(IxWqv@bW&t>7n$v_bX=wM7Mk!Y3`9CTE8aH zC1TH2kGJ(VUG8%Ec&|>!L$C8L{LtLxfZ4_TDcN@>?;4Pk$vQp%k2ALU7eh}ktPq$s zc+x7{nhSq)jC|1GY{JpoPe%2K8+77Tmu`a45cM&^q;2beI@F5nQoc@FGX3bBU5&d%IIf{Ce0bPZPx%^yI8Jd^_og{8|Uw0rQAQe(evci#%+_oo9AHTIrWd! zH7C~vwGMxlsbvj`?(T6wx~F6O2Ah|gerLV!eoNXrv*-U}q8u;TJbEj0t@`^fCw7im zzIt`Uu5s3L=00hh%e&F|R<%d5ZI88>XA?90ZULXboJIw-Pn`fhwS^SMBKAlT+U2Ac zXPuEpg?k{@=3BXKb%uSSf2C*Bd;%VA!Zuatm-eY;xF5$d3T^ym@~rHt=dg^YV6`^v zv)F|4P*}zQHdPJEu6iqZqskA`7->HQ*n?x73rPYWQgflh%rOAl`S5Raew+}&PHmF| zQB2ohi#zbHlx5iRvtYiO33>ThFlBOqHXaOFeijTKDF_+?^$YfE%>9#&X&X)U`hBq9 z^_&#;tV|jUy6hX+@7f>gjsg3L7K=Bkq;QRq-Xny*#TVZ)_xdHa1x5M^d;MkrEDQ*L z%j-=l^liaN>u-9u1GRr4eDHiI)xImPWgW;f`jK^+I2>Na*QsBdu-GW2Qp1IlFNX|e zhs7xhPg+idSKKNrWq5VjJfn7D&}ASV$mev37!aQdte;im;2IUu=7r+s5Lmc}<#DF+HnzmUbg=>r@@nps)@`Smk#3f~O!JZNtei{5c zM*%V>!MLY{3H#x^&wxGB1TZ9vgKfR&GhmM>8|=o*3SoaJr|k*h9J2;in}u2$#EX~)d< zIkaQuF9Ypt2+F=`|GIL^wo?2P=JuCD|9U|W?du!xqh^jPHtgP3+IcJHSnoj4E8oI- zE6jRlp5AFPo?`k=_>abVyTSSQ--3@YbMJGdhnc19eReyE^=_XIocZu(M9jqyftrb}Ur7leEL#kfcH;ThPYI#7K2lYPbDa}jZx7>$os zt=1`&_;CpO`x3={UP6dpfIu8k4;hC5*cZRI+EcJko9c^e7L5PkP2xgW?-TI?Lh<`{ zYx+PPD?m>^o!n^vZ^0Q!FiL|oBfyvd%QV%2an<&ivJ8gp)s;(CERv$f?FE3mI5mg^LUWNN3j~2Tp5e`_#&7mioC@StrABcwfjEeob>N5lCgaP^EvQ}Tp9BtML;F`S$tQCGRN2ABRAD^Q^ zYx&eU+VD;KFEU4a8$?y{|BPcgfI%5xo*NYJkyCzdkU@T?2y#AsK1t8G>J3htc_&zs13R?eiJ1hccnk*GAZS!z0A|)$YBjC6LBQ&nM}x1yMS0<`EgT zslU+gYHyx#-D6~r^%MZ?P#}|K*Z}yt(V)m8Uzf5BlSOJ#q3LVj1g}DvR;Jd_*G1P# z^mXV-;D^g3J)cn@Ju-a2NDsxC_EkOh?=5Y-z6|@S3!?%5Z(zKB2I&m}`IdETuWAci zW2EQm1>c9VywjJAES#&C@O>zHeQ0Db49oN25c))7Kc?)aE;NhW)_$> z&u$VWSN#@BrJ&<(!x>?=GxLl(uG5a)@G`jGQNN^Of?7L>8`#eH?L0^7STGi086Q*% zi&g0~+}PLw)bU~Z>ZAc-Ar2@LG$cheiZr6uAQ8tAL*P>Y7-|~d);sHch@A?AN&q%f3^XkIy71S?qlJcB$fLv=M^qyq zjgh7cZ+ynj!r2sEVlG`qx~v%p^YG?*M!NK_U(Y}nYUJdiQhM=x=s$(0#7E)f&GQLY z2qivzIZxhD8KV)#@{ClXC{YCt{4r9Za>RuhSrSJZ#0l%ooGT0V9w}w6+@uBeG3}v` z{IAcIvF;dXLR7znu@Wk7zp;9-XWM%|*C;9K8Feux|k0)l3fPlEw79LaOLK-8D8@*1AxS3^V zTtwW?>;&0cm}kJvZ2h!9sqE!z(q-6XV6q#zd`%kk3!KOJudGRu2J6FcD$7J znBQzs|4hWQaK8bYuZC)-bUSal`}jY8j$tyRyuI1SRVqkhG}v4N)8;<=73^O&v24OC z98*7dkuJ>YczNmG6i}*HBE-)06Gp~92!;>zw||k}Iw}iY@0|U|Rk$R8 zk=Ab~C_E_L>zlP3^;^z!&~LBvjJlUst0x7ZOl4wOSmp>S3teV2=?PU|NMoeSgtf1> zFBVf}cKK~AmN~yG$XITku}q)0^_lv<9jCq1(+h;<3ALbd$Zg-t-Acp!@?>IoMH9bq!WK3oxI}FSW}vRYESIkGSe?%G#-2%NoyYv$P*ux7rav*MH2%>U)@FAD{DfG_p2clr~| z1nX=nm&c!;DJqu-U;aqT<&q63mp4P8?x}eNa(Sa0076nx;mgS7#8Kx=**n@Cx{iGYVjUaq1IPpO3hJ0g zuZjJi>o~%I9LiaJ{gr>!-ocddTgR?2WgP0s*!y#2Hi$*Oebb&mg98OL*s&f?3WZRC z`u)fm(usv@WO_$plgKLVjA&w)$SUz{Xn0n_5!9Hi)CFun)F|V22F@YfS6;lLdI--j z8u~or`7pT8Xh&h6SAw01UPs!&=?Vb;WAs2eAOmq-00YwFdR(50MI1>xbsT;iI+g?5 vx#Aw|m=R0r-TB?)Rpy54EXc=(YvXcp1-NDgEaU$HK`f8M literal 0 HcmV?d00001 diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 77bf211b662..c9e6dc253b7 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -585,6 +585,10 @@ public enum CoreOperatorType * PARTITION_LIMIT = 54; */ PARTITION_LIMIT(54, 54), + /** + * PCAPNG_SUB_SCAN = 55; + */ + PCAPNG_SUB_SCAN(55, 55), ; /** @@ -807,6 +811,10 @@ public enum CoreOperatorType * PARTITION_LIMIT = 54; */ public static final int PARTITION_LIMIT_VALUE = 54; + /** + * PCAPNG_SUB_SCAN = 55; + */ + public static final int PCAPNG_SUB_SCAN_VALUE = 55; public final int getNumber() { return value; } @@ -868,6 +876,7 @@ public static CoreOperatorType valueOf(int value) { case 52: return IMAGE_SUB_SCAN; case 53: return SEQUENCE_SUB_SCAN; case 54: return PARTITION_LIMIT; + case 55: return PCAPNG_SUB_SCAN; default: return null; } } @@ -24404,7 +24413,7 @@ public Builder clearStatus() { "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" + "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" + "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" + - "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" + + "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe" + "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" + "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" + "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" + @@ -24432,11 +24441,11 @@ public Builder clearStatus() { "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" + "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S", "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" + - "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" + - "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" + - "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." + - "\n\033org.apache.drill.exec.protoB\rUserBitSh" + - "aredH\001" + "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa" + + "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START" + + "\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS" + + "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill." + + "exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 38ac50e2d29..2d7d49240e2 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -76,7 +76,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite