Skip to content

Commit

Permalink
Merge 82aec87 into d5a2c69
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Dec 27, 2018
2 parents d5a2c69 + 82aec87 commit 1013480
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 65 deletions.
6 changes: 6 additions & 0 deletions datamap/examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
Expand Down
19 changes: 14 additions & 5 deletions examples/spark2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
Expand Down Expand Up @@ -70,11 +76,8 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand All @@ -99,6 +102,12 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
Expand Down
6 changes: 6 additions & 0 deletions integration/hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
Expand Down
3 changes: 2 additions & 1 deletion integration/spark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
47 changes: 42 additions & 5 deletions integration/spark-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -315,5 +312,45 @@
</plugins>
</build>
</profile>
<profile>
<!-- Profile for building against the Cloudera CDH distribution of Spark 2.2.
     CDH ships the twitter parquet classes instead of apache parquet (see
     CarbonDictionaryReflectionUtil in this commit), so the spark2.3plus sources
     are excluded and the spark2.1/2.2 compatibility sources are compiled in. -->
<id>spark-2.2-cdh</id>
<properties>
<!-- Cloudera-specific Spark artifact version; requires the Cloudera repository. -->
<spark.version>2.2.0.cloudera2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<!-- Sources that depend on Spark 2.3+ APIs must not be compiled for 2.2. -->
<exclude>src/main/spark2.3plus</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<!-- Adds the version-specific source directory to the compile source roots. -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/spark2.1andspark2.2</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql;

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;

import org.apache.spark.sql.execution.vectorized.ColumnVector;

/**
 * Creates parquet dictionary instances via java reflection. Reflection is required
 * because the CDH distribution ships the twitter parquet classes (package
 * {@code parquet.*}) while upstream Spark depends on apache parquet (package
 * {@code org.apache.parquet.*}); the same binary must work against both.
 */
@InterfaceAudience.Internal
public class CarbonDictionaryReflectionUtil {

  /**
   * True when apache parquet is on the classpath, false when only the
   * twitter (CDH) parquet classes are available.
   */
  private static final boolean isApacheParquet;

  static {
    boolean isApache = true;
    try {
      createClass("org.apache.parquet.column.Encoding");
    } catch (Exception e) {
      // apache parquet is absent: fall back to the twitter "parquet.*" package names
      isApache = false;
    }
    isApacheParquet = isApache;
  }

  private CarbonDictionaryReflectionUtil() {
    // utility class: no instances
  }

  /**
   * Builds a parquet {@code PlainValuesDictionary.PlainBinaryDictionary} holding the
   * values of the given carbon dictionary, using reflection so the correct parquet
   * package (apache or twitter) is resolved at runtime.
   *
   * @param dictionary carbon dictionary whose byte[] values are placed into the
   *                   parquet dictionary, in index order
   * @return a parquet {@code PlainBinaryDictionary} instance
   * @throws RuntimeException if any reflective lookup or invocation fails
   */
  public static Object generateDictionary(CarbonDictionary dictionary) {
    Class<?> binary = createClass(getQualifiedName("parquet.io.api.Binary"));
    Object binaries = Array.newInstance(binary, dictionary.getDictionarySize());
    try {
      for (int i = 0; i < dictionary.getDictionarySize(); i++) {
        Object binaryValue = invokeStaticMethod(binary, "fromReusedByteArray",
            new Object[] { dictionary.getDictionaryValue(i) },
            new Class<?>[] { byte[].class });
        Array.set(binaries, i, binaryValue);
      }
      // A dictionary page with empty content and PLAIN encoding is enough to
      // construct the PlainBinaryDictionary; the real content is injected below.
      Class<?> bytesInputClass = createClass(getQualifiedName("parquet.bytes.BytesInput"));
      Object bytesInput = invokeStaticMethod(bytesInputClass, "from",
          new Object[] { new byte[0] }, new Class<?>[] { byte[].class });

      Class<?> dictPageClass = createClass(getQualifiedName("parquet.column.page.DictionaryPage"));
      Class<?> encodingClass = createClass(getQualifiedName("parquet.column.Encoding"));
      Object plainEncoding = invokeStaticMethod(encodingClass, "valueOf",
          new Object[] { "PLAIN" }, new Class<?>[] { String.class });

      Object dictPageObj =
          dictPageClass.getDeclaredConstructor(bytesInputClass, int.class, encodingClass)
              .newInstance(bytesInput, 0, plainEncoding);
      Class<?> plainDict = createClass(getQualifiedName(
          "parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary"));
      Object plainDictionary =
          plainDict.getDeclaredConstructor(dictPageClass).newInstance(dictPageObj);
      // Overwrite the content built from the (empty) page with the carbon values.
      Field field = plainDict.getDeclaredField("binaryDictionaryContent");
      field.setAccessible(true);
      field.set(plainDictionary, binaries);
      return plainDictionary;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** Invokes the named public static method on {@code clazz} with the given arguments. */
  private static Object invokeStaticMethod(Class<?> clazz, String methodName, Object[] values,
      Class<?>[] parameterTypes) throws Exception {
    Method method = clazz.getMethod(methodName, parameterTypes);
    return method.invoke(null, values);
  }

  /** Loads the class for {@code className}, wrapping ClassNotFoundException as unchecked. */
  private static Class<?> createClass(String className) {
    try {
      return Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Resolves the runtime-specific fully qualified name: prefixes {@code "org.apache."}
   * when apache parquet is on the classpath, otherwise returns the bare
   * twitter-parquet name unchanged.
   */
  private static String getQualifiedName(String className) {
    if (isApacheParquet) {
      return "org.apache." + className;
    } else {
      return className;
    }
  }

  /**
   * Sets the given parquet dictionary object on the private {@code dictionary} field
   * declared by Spark's ColumnVector, walking the class hierarchy from the concrete
   * vector class up to the class literally named "ColumnVector".
   *
   * @param vector     spark column vector to receive the dictionary
   * @param dictionary parquet dictionary created by {@link #generateDictionary}
   * @throws RuntimeException if the field cannot be found or set
   */
  public static void setDictionary(ColumnVector vector, Object dictionary) {
    try {
      Class<?> aClass = vector.getClass();
      // Climb to the class that declares the private "dictionary" field; matching
      // by simple name keeps this independent of the ColumnVector package, which
      // differs across Spark versions.
      while (!aClass.getSimpleName().equals("ColumnVector")) {
        aClass = aClass.getSuperclass();
      }
      Field field = aClass.getDeclaredField("dictionary");
      field.setAccessible(true);
      field.set(vector, dictionary);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;

import org.apache.parquet.column.Encoding;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
Expand Down Expand Up @@ -306,10 +305,9 @@ public void putDictionaryInt(int rowId, int value) {

public void setDictionary(CarbonDictionary dictionary) {
if (null != dictionary) {
CarbonDictionaryWrapper dictionaryWrapper =
new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
vector.setDictionary(dictionaryWrapper);
this.dictionary = dictionaryWrapper;
Object dictionaryObj = CarbonDictionaryReflectionUtil.generateDictionary(dictionary);
CarbonDictionaryReflectionUtil.setDictionary(vector, dictionaryObj);
CarbonDictionaryReflectionUtil.setDictionary(this, dictionaryObj);
} else {
vector.setDictionary(null);
}
Expand Down
Loading

0 comments on commit 1013480

Please sign in to comment.