Skip to content

Commit

Permalink
DRILL-4241: Add Single Tablet Writer
Browse files Browse the repository at this point in the history
- Also move to a test bootstrap
- Update to the latest kudu and Drill
- Add plugin to Drill distribution
- Checkstyle and directory cleanup

This closes #314.
  • Loading branch information
jacques-n committed Jan 11, 2016
1 parent 3694909 commit 392d1f7
Show file tree
Hide file tree
Showing 18 changed files with 706 additions and 155 deletions.
1 change: 1 addition & 0 deletions contrib/pom.xml
Expand Up @@ -36,6 +36,7 @@
<module>storage-hive</module> <module>storage-hive</module>
<module>storage-mongo</module> <module>storage-mongo</module>
<module>storage-jdbc</module> <module>storage-jdbc</module>
<module>storage-kudu</module>
<module>sqlline</module> <module>sqlline</module>
<module>data</module> <module>data</module>
<module>gis</module> <module>gis</module>
Expand Down
114 changes: 96 additions & 18 deletions contrib/storage-kudu/pom.xml
Expand Up @@ -16,58 +16,47 @@
<parent> <parent>
<artifactId>drill-contrib-parent</artifactId> <artifactId>drill-contrib-parent</artifactId>
<groupId>org.apache.drill.contrib</groupId> <groupId>org.apache.drill.contrib</groupId>
<version>1.3.0</version> <version>1.5.0-SNAPSHOT</version>
</parent> </parent>


<artifactId>drill-storage-kudu</artifactId> <artifactId>drill-kudu-storage</artifactId>
<version>1.3.0-SNAPSHOT</version>


<name>contrib/kudu-storage-plugin</name> <name>contrib/kudu-storage-plugin</name>


<properties>
<drill.version>1.3.0</drill.version>
</properties>


<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.drill.exec</groupId> <groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId> <artifactId>drill-java-exec</artifactId>
<version>${drill.version}</version> <version>${project.version}</version>
</dependency> </dependency>


<!-- Test dependencies --> <!-- Test dependencies -->
<dependency> <dependency>
<groupId>org.apache.drill.exec</groupId> <groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId> <artifactId>drill-java-exec</artifactId>
<classifier>tests</classifier> <classifier>tests</classifier>
<version>${drill.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>


<dependency> <dependency>
<groupId>org.apache.drill</groupId> <groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId> <artifactId>drill-common</artifactId>
<classifier>tests</classifier> <classifier>tests</classifier>
<version>${drill.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>


<dependency> <dependency>
<groupId>org.kududb</groupId> <groupId>org.kududb</groupId>
<artifactId>kudu-client</artifactId> <artifactId>kudu-client</artifactId>
<version>0.5.0</version> <version>0.6.0</version>
</dependency> </dependency>


</dependencies> </dependencies>


<repositories> <repositories>
<repository>
<id>drill-1016</id>
<url>https://repository.apache.org/content/repositories/orgapachedrill-1016/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository> <repository>
<id>cdh.repo</id> <id>cdh.repo</id>
<name>Cloudera Repositories</name> <name>Cloudera Repositories</name>
Expand All @@ -78,9 +67,98 @@
</repository> </repository>
</repositories> </repositories>


<pluginRepositories>
<pluginRepository>
<id>apache-snapshots</id>
<url>https://repository.apache.org/content/groups/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>

</pluginRepository>
</pluginRepositories>
<build> <build>
<plugins> <plugins>

<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution> <!-- copy all templates/data in the same location to compile them at once -->
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<!-- Extract ValueVectorTypes.tdd from drill-vector.jar and put
it under ${project.build.directory}/codegen/data where all freemarker data
files are. -->
<execution>
<id>unpack-vector-types</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.drill.exec</groupId>
<artifactId>vector</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>codegen/data/ValueVectorTypes.tdd</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin> <!-- generate sources from fmpp -->
<groupId>org.apache.drill.tools</groupId>
<artifactId>drill-fmpp-maven-plugin</artifactId>
<version>${project.version}</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.19</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<config>${project.build.directory}/codegen/config.fmpp</config>
<output>${project.build.directory}/generated-sources</output>
<templates>${project.build.directory}/codegen/templates</templates>
</configuration>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>


Expand Down
23 changes: 23 additions & 0 deletions contrib/storage-kudu/src/main/codegen/config.fmpp
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

data: {
vv: tdd(../data/ValueVectorTypes.tdd),

}
freemarkerLinks: {
includes: includes/
}
175 changes: 175 additions & 0 deletions contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java
@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/kudu/KuduRecordWriter.java" />
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.kudu;

import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.fn.JsonOutput;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.*;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.io.api.Binary;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.common.types.TypeProtos;
import org.joda.time.DateTimeUtils;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.*;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.io.api.Binary;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.common.types.TypeProtos;
import org.joda.time.DateTimeUtils;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.kududb.client.*;
import org.apache.drill.exec.store.*;

/**
 * Abstract record writer that copies Drill value-vector field values into a Kudu
 * {@link PartialRow}. The per-type {@code FieldConverter} implementations below are
 * generated by FMPP from ValueVectorTypes.tdd; types Kudu has no column mapping for
 * (and all Repeated modes) are skipped by the template condition.
 */
public abstract class KuduRecordWriter extends AbstractRecordWriter implements RecordWriter {

  // Target row for the record currently being written; converters populate its columns.
  private PartialRow row;

  /** Installs the Kudu row that subsequent writeField() calls will populate. */
  public void setUp(PartialRow row) {
    this.row = row;
  }

  <#list vv.types as type>
  <#list type.minor as minor>
  <#list vv.modes as mode>

  <#-- Skip repeated vectors and Drill minor types with no Kudu column equivalent. -->
  <#if mode.prefix == "Repeated" ||
    minor.class == "TinyInt" ||
    minor.class == "UInt1" ||
    minor.class == "UInt2" ||
    minor.class == "SmallInt" ||
    minor.class == "Time" ||
    minor.class == "Decimal9" ||
    minor.class == "Decimal18" ||
    minor.class == "Date" ||
    minor.class == "UInt4" ||
    minor.class == "Decimal28Sparse" ||
    minor.class == "Decimal38Sparse" ||
    minor.class?contains("Interval")
  >
  <#else>
  @Override
  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
    return new ${mode.prefix}${minor.class}KuduConverter(fieldId, fieldName, reader);
  }

  /** Copies a single ${mode.prefix}${minor.class} field value into the target Kudu row. */
  public class ${mode.prefix}${minor.class}KuduConverter extends FieldConverter {
    private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();

    public ${mode.prefix}${minor.class}KuduConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void writeField() throws IOException {
  <#if mode.prefix == "Nullable" >
      // Null values are simply not set on the row.
      if (!reader.isSet()) {
        return;
      }
  </#if>
      reader.read(holder);
  <#if minor.class == "Float4">
      row.addFloat(fieldId, holder.value);
  <#elseif minor.class == "TimeStamp">
      // Drill timestamps are millisecond-based; scale by 1000 for Kudu.
      row.addLong(fieldId, holder.value*1000);
  <#elseif minor.class == "Int">
      row.addInt(fieldId, holder.value);
  <#elseif minor.class == "BigInt">
      row.addLong(fieldId, holder.value);
  <#elseif minor.class == "Float8">
      row.addDouble(fieldId, holder.value);
  <#elseif minor.class == "Bit">
      row.addBoolean(fieldId, holder.value == 1);
  <#elseif minor.class == "VarChar" >
      byte[] bytes = new byte[holder.end - holder.start];
      holder.buffer.getBytes(holder.start, bytes);
      row.addStringUtf8(fieldId, bytes);
  <#elseif minor.class == "VarBinary">
      byte[] bytes = new byte[holder.end - holder.start];
      holder.buffer.getBytes(holder.start, bytes);
      row.addBinary(fieldId, bytes);
      <#-- Fix: removed stray duplicate reader.read(holder) that followed addBinary. -->
  <#else>
      throw new UnsupportedOperationException();
  </#if>
    }
  }
  </#if>
  </#list>
  </#list>
  </#list>
}

0 comments on commit 392d1f7

Please sign in to comment.