
Commit eec034d

Merge 792c15a into 53dbb45
SteNicholas committed Dec 20, 2018
2 parents 53dbb45 + 792c15a commit eec034d
Showing 11 changed files with 256 additions and 241 deletions.
2 changes: 1 addition & 1 deletion examples/flink/pom.xml
@@ -57,7 +57,7 @@
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-examples-spark2</artifactId>
<artifactId>carbondata-examples</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
16 changes: 13 additions & 3 deletions examples/spark2/pom.xml
@@ -26,14 +26,19 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>carbondata-examples-spark2</artifactId>
<name>Apache CarbonData :: Spark2 Examples</name>
<artifactId>carbondata-examples</artifactId>
<name>Apache CarbonData :: Examples</name>

<properties>
<dev.path>${basedir}/../../dev</dev.path>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hive</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
@@ -78,7 +83,12 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.3-alpha1</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
@@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hiveexample
package org.apache.carbondata.examples

import java.io.File
import java.sql.{DriverManager, ResultSet, Statement}

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.ExampleUtils
import org.apache.carbondata.hive.server.HiveEmbeddedServer2

// scalastyle:off println
@@ -30,44 +33,38 @@ object HiveExample {
private val driverName: String = "org.apache.hive.jdbc.HiveDriver"

def main(args: Array[String]) {
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val store = s"$rootPath/integration/hive/target/store"
val warehouse = s"$rootPath/integration/hive/target/warehouse"
val metaStore_Db = s"$rootPath/integration/hive/target/carbon_metaStore_db"
val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
var resultId = ""
var resultName = ""
var resultSalary = ""

val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
exampleBody(carbonSession, CarbonProperties.getStorePath
+ CarbonCommonConstants.FILE_SEPARATOR
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME)
carbonSession.close()

import org.apache.spark.sql.CarbonSession._
System.exit(0)
}

val carbonSession = SparkSession
.builder()
.master("local")
.appName("HiveExample")
.config("carbonSession.sql.warehouse.dir", warehouse).enableHiveSupport()
.getOrCreateCarbonSession(
store, metaStore_Db)
def exampleBody(sparkSession: SparkSession, store: String): Unit = {
val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath

carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
sparkSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)

carbonSession
.sql(
"""CREATE TABLE HIVE_CARBON_EXAMPLE (ID int,NAME string,SALARY double) STORED BY
|'CARBONDATA' """
.stripMargin)
sparkSession.sql(
s"""
| CREATE TABLE HIVE_CARBON_EXAMPLE
| (ID int,NAME string,SALARY double)
| STORED BY 'carbondata'
""".stripMargin)

carbonSession.sql(
sparkSession.sql(
s"""
LOAD DATA LOCAL INPATH '$rootPath/integration/hive/src/main/resources/data.csv' INTO
TABLE
HIVE_CARBON_EXAMPLE
""")
carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
| LOAD DATA LOCAL INPATH '$rootPath/examples/spark2/src/main/resources/sample.csv'
| INTO TABLE HIVE_CARBON_EXAMPLE
""".stripMargin)

sparkSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()

carbonSession.stop()
sparkSession.stop()

try {
Class.forName(driverName)
@@ -85,25 +82,35 @@

logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")

statement.execute("CREATE TABLE IF NOT EXISTS " + "HIVE_CARBON_EXAMPLE " +
" (ID int, NAME string,SALARY double)")
statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET FILEFORMAT INPUTFORMAT \"org.apache.carbondata." +
"hive.MapredCarbonInputFormat\"OUTPUTFORMAT \"org.apache.carbondata.hive." +
"MapredCarbonOutputFormat\"SERDE \"org.apache.carbondata.hive." +
"CarbonHiveSerDe\" ")
statement.execute(
s"""
| CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE
| (ID int, NAME string,SALARY double)
| ROW FORMAT SERDE 'org.apache.carbondata.hive.CarbonHiveSerDe'
| WITH SERDEPROPERTIES ('mapreduce.input.carboninputformat.databaseName'='default',
| 'mapreduce.input.carboninputformat.tableName'='HIVE_CARBON_EXAMPLE')
""".stripMargin)

statement.execute(
s"""
| ALTER TABLE HIVE_CARBON_EXAMPLE
| SET FILEFORMAT
| INPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonInputFormat\"
| OUTPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonOutputFormat\"
| SERDE \"org.apache.carbondata.hive.CarbonHiveSerDe\"
""".stripMargin)

statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
s"'file:///$store/hive_carbon_example' ")

val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
s"'file:///$store/hive_carbon_example' ")

val resultSet: ResultSet = statement.executeQuery(sql)
val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")

var rowsFetched = 0
var resultId = ""
var resultName = ""
var resultSalary = ""

while (resultSet.next) {
if (rowsFetched == 0) {
@@ -130,6 +137,7 @@
rowsFetched = rowsFetched + 1
}
println(s"******Total Number Of Rows Fetched ****** $rowsFetched")
assert(rowsFetched == 2)

logger.info("Fetching the Individual Columns ")

@@ -158,8 +166,9 @@
}
individualColRowsFetched = individualColRowsFetched + 1
}
println(" ********** Total Rows Fetched When Quering The Individual Column **********" +
s"$individualColRowsFetched")
println(" ********** Total Rows Fetched When Quering The Individual Columns **********" +
s"$individualColRowsFetched")
assert(individualColRowsFetched == 2)

logger.info("Fetching the Out Of Order Columns ")

@@ -191,7 +200,10 @@
}
outOfOrderColFetched = outOfOrderColFetched + 1
}
println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" +
s"$outOfOrderColFetched")
assert(outOfOrderColFetched == 2)

hiveEmbeddedServer2.stop()
}

}
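
The tail of the example above (partly elided by the hunk at -85,25) queries the same table through Hive's JDBC driver against an embedded HiveServer2. Below is a condensed, hedged sketch of that pattern: the no-arg start() and the getFreePort accessor are assumed from the elided portion of the source, and the jdbc:hive2 URL is a plausible default rather than something this diff confirms.

import java.sql.{DriverManager, ResultSet}

import org.apache.carbondata.hive.server.HiveEmbeddedServer2

object EmbeddedHiveQuerySketch {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver, matching driverName in the example.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
    hiveEmbeddedServer2.start()                  // assumed no-arg start
    val port = hiveEmbeddedServer2.getFreePort   // assumed accessor for the bound port

    val connection =
      DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
    val statement = connection.createStatement()

    val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
    while (resultSet.next()) {
      // Columns read positionally: ID, NAME, SALARY.
      println(s"${resultSet.getString(1)}\t${resultSet.getString(2)}\t${resultSet.getString(3)}")
    }

    statement.close()
    connection.close()
    hiveEmbeddedServer2.stop()
  }
}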
@@ -24,9 +24,6 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties


// scalastyle:off println

object ExampleUtils {

def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
@@ -113,5 +110,3 @@ object ExampleUtils {
spark.sql(s"DROP TABLE IF EXISTS $tableName")
}
}
// scalastyle:on println

@@ -19,12 +19,12 @@ package org.apache.carbondata.examplesCI

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.examples._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.sdk.CarbonReaderExample
import org.apache.carbondata.examples.sql.JavaCarbonSessionExample
import org.apache.spark.sql.test.TestQueryExecutor

/**
* Test suite for examples
@@ -123,4 +123,8 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
test("CarbonReaderExample") {
CarbonReaderExample.main(null)
}
}

test("HiveExample") {
HiveExample.exampleBody(spark, TestQueryExecutor.warehouse)
}
}
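
With exampleBody factored out, wiring a future example into this CI suite takes only a session and a store path. A minimal sketch follows; MyNewExample is hypothetical, while spark and TestQueryExecutor.warehouse come from the test harness shown above.

test("MyNewExample") {
  // QueryTest supplies `spark`; TestQueryExecutor.warehouse supplies the store path.
  MyNewExample.exampleBody(spark, TestQueryExecutor.warehouse)
}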
@@ -29,6 +29,7 @@
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -68,6 +69,8 @@ public class CarbonHiveInputSplit extends FileSplit

private List<UpdateVO> invalidTimestampsList;

private BlockletDetailInfo detailInfo;

public CarbonHiveInputSplit() {
segmentId = null;
taskId = "0";
@@ -124,9 +127,12 @@ public static List<TableBlockInfo> createBlocks(List<CarbonHiveInputSplit> split
BlockletInfos blockletInfos =
new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
try {
tableBlockInfoList.add(
new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
split.getLocations(), split.getLength(), blockletInfos, split.getVersion(), null));
TableBlockInfo blockInfo = new TableBlockInfo(split.getPath().toString(), split.getStart(),
split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
split.getVersion(), null);
blockInfo.setDetailInfo(split.getDetailInfo());
blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
tableBlockInfoList.add(blockInfo);
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
}
@@ -138,9 +144,13 @@ public static TableBlockInfo getTableBlockInfo(CarbonHiveInputSplit inputSplit)
BlockletInfos blockletInfos =
new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
try {
return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
TableBlockInfo blockInfo =
new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
blockletInfos, inputSplit.getVersion(), null);
blockInfo.setDetailInfo(inputSplit.getDetailInfo());
blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
return blockInfo;
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + inputSplit, e);
}
@@ -160,6 +170,11 @@ public String getSegmentId() {
for (int i = 0; i < numInvalidSegment; i++) {
invalidSegments.add(in.readUTF());
}
boolean detailInfoExists = in.readBoolean();
if (detailInfoExists) {
detailInfo = new BlockletDetailInfo();
detailInfo.readFields(in);
}
this.numberOfBlocklets = in.readInt();
}

@@ -172,6 +187,10 @@ public String getSegmentId() {
for (String invalidSegment : invalidSegments) {
out.writeUTF(invalidSegment);
}
out.writeBoolean(detailInfo != null);
if (detailInfo != null) {
detailInfo.write(out);
}
out.writeInt(numberOfBlocklets);
}

@@ -304,4 +323,12 @@ public String getBucketId() {
public Map<String, String> getBlockStorageIdMap() {
return blockStorageIdMap;
}

public BlockletDetailInfo getDetailInfo() {
return detailInfo;
}

public void setDetailInfo(BlockletDetailInfo detailInfo) {
this.detailInfo = detailInfo;
}
}
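
The write/readFields changes above use the standard presence-flag idiom for an optional field in Hadoop Writable serialization: emit a boolean first, write the payload only when it is non-null, and read back in exactly the same order. A self-contained sketch of the idiom, with illustrative class names rather than the real BlockletDetailInfo:

import java.io.{DataInput, DataOutput}

import org.apache.hadoop.io.Writable

// Stand-in for BlockletDetailInfo: any nested Writable payload.
class DetailPayload extends Writable {
  var footerOffset: Long = 0L
  override def write(out: DataOutput): Unit = { out.writeLong(footerOffset) }
  override def readFields(in: DataInput): Unit = { footerOffset = in.readLong() }
}

class SplitWithOptionalDetail extends Writable {
  var detailInfo: DetailPayload = _ // may legitimately be null

  override def write(out: DataOutput): Unit = {
    out.writeBoolean(detailInfo != null) // presence flag first ...
    if (detailInfo != null) {
      detailInfo.write(out)              // ... then the payload, only when present
    }
  }

  override def readFields(in: DataInput): Unit = {
    // Read in the same order the writer wrote: flag, then optional payload.
    detailInfo = if (in.readBoolean()) {
      val d = new DetailPayload
      d.readFields(in)
      d
    } else {
      null
    }
  }
}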
@@ -22,6 +22,7 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -106,9 +107,17 @@ private static CarbonTable getCarbonTable(Configuration configuration, String pa
CarbonInputSplit split;
for (int i = 0; i < splitList.size(); i++) {
split = (CarbonInputSplit) splitList.get(i);
splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(),
split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), split.getVersion(),
split.getBlockStorageIdMap());
CarbonHiveInputSplit inputSplit = new CarbonHiveInputSplit(split.getSegmentId(),
split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), split.getNumberOfBlocklets(),
split.getVersion(), split.getBlockStorageIdMap());
BlockletDetailInfo info = new BlockletDetailInfo();
info.setBlockSize(split.getLength());
info.setBlockFooterOffset(split.getDetailInfo().getBlockFooterOffset());
info.setVersionNumber(split.getVersion().number());
info.setUseMinMaxForPruning(false);
inputSplit.setDetailInfo(info);
splits[i] = inputSplit;
}
return splits;
}
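The split enrichment above runs whenever Hive, or any mapred client, asks this input format for splits. A hedged sketch of exercising it directly; the table path is illustrative, the no-arg constructor is assumed, and a real run would need a valid Carbon store at that location.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.carbondata.hive.MapredCarbonInputFormat

object SplitListingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new JobConf()
    // Point the mapred input machinery at the Carbon table directory.
    FileInputFormat.setInputPaths(conf, new Path("/tmp/store/default/hive_carbon_example"))
    val format = new MapredCarbonInputFormat()
    // Each returned split now carries a populated BlockletDetailInfo.
    format.getSplits(conf, 1).foreach(println)
  }
}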
