
Commit

Merge 20733b2 into 894216e
dhatchayani committed May 19, 2019
2 parents 894216e + 20733b2 commit f9480da
Showing 12 changed files with 120 additions and 84 deletions.
@@ -19,33 +19,36 @@ package org.apache.carbondata.examples
import java.io.File
import java.sql.{DriverManager, ResultSet, Statement}

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.examples.util.ExampleUtils
import org.apache.carbondata.hive.server.HiveEmbeddedServer2
import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2

// scalastyle:off println
object HiveExample {

private val driverName: String = "org.apache.hive.jdbc.HiveDriver"

def main(args: Array[String]) {
val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
exampleBody(carbonSession, CarbonProperties.getStorePath
+ CarbonCommonConstants.FILE_SEPARATOR
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME)
carbonSession.stop()
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
private val targetLoc = s"$rootPath/examples/spark2/target"
val metaStoreLoc = s"$targetLoc/metastore_db"
val storeLocation = s"$targetLoc/store"
val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)


def main(args: Array[String]) {
createCarbonTable(storeLocation)
readFromHive
System.exit(0)
}

def exampleBody(carbonSession: SparkSession, store: String): Unit = {
val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
def createCarbonTable(store: String): Unit = {

val carbonSession = ExampleUtils.createCarbonSession("HiveExample")

carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)

@@ -56,14 +59,44 @@ object HiveExample {
| STORED BY 'carbondata'
""".stripMargin)

val inputPath = FileFactory
.getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/sample.csv")

carbonSession.sql(
s"""
| LOAD DATA LOCAL INPATH '$rootPath/examples/spark2/src/main/resources/sample.csv'
| LOAD DATA LOCAL INPATH '$inputPath'
| INTO TABLE HIVE_CARBON_EXAMPLE
""".stripMargin)

carbonSession.sql(
s"""
| LOAD DATA LOCAL INPATH '$inputPath'
| INTO TABLE HIVE_CARBON_EXAMPLE
""".stripMargin)

carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()

carbonSession.close()

// delete the already existing lock on metastore so that new derby instance
// for HiveServer can run on the same metastore
checkAndDeleteDBLock

}

def checkAndDeleteDBLock: Unit = {
val dbLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/db.lck")
val dbexLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/dbex.lck")
if(FileFactory.isFileExist(dbLockPath)) {
FileFactory.deleteFile(dbLockPath, FileFactory.getFileType(dbLockPath))
}
if(FileFactory.isFileExist(dbexLockPath)) {
FileFactory.deleteFile(dbexLockPath, FileFactory.getFileType(dbexLockPath))
}
}


def readFromHive: Unit = {
try {
Class.forName(driverName)
}
@@ -72,37 +105,19 @@ object HiveExample {
classNotFoundException.printStackTrace()
}

// make HDFS writable
val path = new Path(targetLoc)
val fileSys = path.getFileSystem(FileFactory.getConfiguration)
fileSys.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))

val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
hiveEmbeddedServer2.start()
hiveEmbeddedServer2.start(targetLoc)
val port = hiveEmbeddedServer2.getFreePort
val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
val statement: Statement = connection.createStatement

logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")

statement.execute(
s"""
| CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE
| (ID int, NAME string,SALARY double)
| ROW FORMAT SERDE 'org.apache.carbondata.hive.CarbonHiveSerDe'
| WITH SERDEPROPERTIES ('mapreduce.input.carboninputformat.databaseName'='default',
| 'mapreduce.input.carboninputformat.tableName'='HIVE_CARBON_EXAMPLE')
""".stripMargin)

statement.execute(
s"""
| ALTER TABLE HIVE_CARBON_EXAMPLE
| SET FILEFORMAT
| INPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonInputFormat\"
| OUTPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonOutputFormat\"
| SERDE \"org.apache.carbondata.hive.CarbonHiveSerDe\"
""".stripMargin)

statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
s"'file:///$store/hive_carbon_example' ")

val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")

var rowsFetched = 0
@@ -135,7 +150,7 @@ object HiveExample {
rowsFetched = rowsFetched + 1
}
println(s"******Total Number Of Rows Fetched ****** $rowsFetched")
assert(rowsFetched == 2)
assert(rowsFetched == 4)

logger.info("Fetching the Individual Columns ")

@@ -166,7 +181,7 @@ object HiveExample {
}
println(" ********** Total Rows Fetched When Quering The Individual Columns **********" +
s"$individualColRowsFetched")
assert(individualColRowsFetched == 2)
assert(individualColRowsFetched == 4)

logger.info("Fetching the Out Of Order Columns ")

@@ -200,7 +215,7 @@ object HiveExample {
}
println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" +
s"$outOfOrderColFetched")
assert(outOfOrderColFetched == 2)
assert(outOfOrderColFetched == 4)

hiveEmbeddedServer2.stop()
}
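Note: after this change the example exposes two independent entry points, createCarbonTable(store) for writing through a CarbonSession and readFromHive for querying through the embedded HiveServer2 over JDBC; the RunExamples test in the next hunk drives them in exactly that order. A minimal standalone driver would look like the following sketch (the store path is illustrative, not taken from the commit):

// Hypothetical driver for the refactored example; the store path is an assumption.
object HiveExampleDriver {
  def main(args: Array[String]): Unit = {
    val store = "/tmp/carbondata/warehouse/default" // assumed warehouse location
    HiveExample.createCarbonTable(store) // write via CarbonSession, then release the Derby lock files
    HiveExample.readFromHive             // read the table back via the embedded HiveServer2 over JDBC
  }
}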
@@ -127,6 +127,7 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
}

test("HiveExample") {
HiveExample.exampleBody(spark, TestQueryExecutor.warehouse)
HiveExample.createCarbonTable(TestQueryExecutor.warehouse)
HiveExample.readFromHive
}
}
9 changes: 4 additions & 5 deletions integration/hive/pom.xml
@@ -64,11 +64,6 @@
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
@@ -108,6 +103,10 @@
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>

<build>
@@ -91,8 +91,10 @@ public CarbonHiveInputSplit(String segmentId, Path path, long start, long length
}

public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) {
String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
BlockletDetailInfo detailInfo) {
this(segmentId, path, start, length, locations, version);
this.detailInfo = detailInfo;
this.numberOfBlocklets = numberOfBlocklets;
}

@@ -110,8 +112,8 @@ public CarbonHiveInputSplit(String segmentId, Path path, long start, long length
*/
public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
Map<String, String> blockStorageIdMap) {
this(segmentId, path, start, length, locations, numberOfBlocklets, version);
Map<String, String> blockStorageIdMap, BlockletDetailInfo detailInfo) {
this(segmentId, path, start, length, locations, numberOfBlocklets, version, detailInfo);
this.blockStorageIdMap = blockStorageIdMap;
}

@@ -58,7 +58,7 @@
* It transparently passes the object to/from the Carbon file reader/writer.
*/
@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES })
class CarbonHiveSerDe extends AbstractSerDe {
public class CarbonHiveSerDe extends AbstractSerDe {
private final SerDeStats stats;
private ObjectInspector objInspector;

@@ -17,12 +17,14 @@
package org.apache.carbondata.hive;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -72,7 +74,8 @@ private static void populateCarbonTable(Configuration configuration, String path
} else {
if (paths != null) {
for (String inputPath : inputPaths) {
if (paths.startsWith(inputPath.replace("file:", ""))) {
inputPath = inputPath.replace("file:", "");
if (FileFactory.isFileExist(inputPath)) {
validInputPath = inputPath;
break;
}
@@ -101,22 +104,22 @@ private static CarbonTable getCarbonTable(Configuration configuration, String pa
}

@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
Random secureRandom = new SecureRandom();
int random = secureRandom.nextInt();
jobConf.set(DATABASE_NAME, "_dummyDb_" + random);
jobConf.set(TABLE_NAME, "_dummyTable_" + random);
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
CarbonTableInputFormat carbonTableInputFormat = new CarbonTableInputFormat();
List<org.apache.hadoop.mapreduce.InputSplit> splitList =
carbonTableInputFormat.getSplits(jobContext);
InputSplit[] splits = new InputSplit[splitList.size()];
CarbonInputSplit split;
for (int i = 0; i < splitList.size(); i++) {
split = (CarbonInputSplit) splitList.get(i);
CarbonHiveInputSplit inputSplit = new CarbonHiveInputSplit(split.getSegmentId(),
split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), split.getNumberOfBlocklets(),
split.getVersion(), split.getBlockStorageIdMap());
BlockletDetailInfo info = new BlockletDetailInfo();
info.setBlockSize(split.getLength());
info.setBlockFooterOffset(split.getDetailInfo().getBlockFooterOffset());
info.setVersionNumber(split.getVersion().number());
info.setUseMinMaxForPruning(false);
inputSplit.setDetailInfo(info);
split.getVersion(), split.getBlockStorageIdMap(), split.getDetailInfo());
splits[i] = inputSplit;
}
return splits;
@@ -19,20 +19,22 @@
import java.io.IOException;
import java.util.Properties;

import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Progressable;

/**
* TODO : To extend CarbonOutputFormat
*/
class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
implements HiveOutputFormat<Void, T> {

@Override
@@ -41,6 +43,12 @@ public RecordWriter<Void, T> getRecordWriter(FileSystem fileSystem, JobConf jobC
return null;
}

@Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf)
throws IOException {
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
super.checkOutputSpecs(jobContext);
}

@Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
Progressable progress) throws IOException {
@@ -15,15 +15,13 @@
* limitations under the License.
*/

package org.apache.carbondata.hive.server;
package org.apache.carbondata.hive.test.server;

import java.io.File;
import java.lang.reflect.Field;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@@ -48,20 +46,22 @@
* a child JVM (which Hive calls local) or external.
*/
public class HiveEmbeddedServer2 {
private static final String SCRATCH_DIR = "/tmp/hive";
private String SCRATCH_DIR = "";
private static final Logger log = LogServiceFactory.getLogService(Hive.class.getName());
private HiveServer2 hiveServer;
private HiveConf config;
private int port;
private static Random secureRandom = new SecureRandom();

public void start() throws Exception {
public void start(String storePath) throws Exception {
log.info("Starting Hive Local/Embedded Server...");
SCRATCH_DIR = storePath;
if (hiveServer == null) {
config = configure();
hiveServer = new HiveServer2();
port = MetaStoreUtils.findFreePort();
config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
config.setBoolVar(ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE, true);
config.setBoolVar(ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
hiveServer.init(config);
hiveServer.start();
waitForStartup();
@@ -126,14 +126,12 @@ private HiveConf configure() throws Exception {
}
}

int random = secureRandom.nextInt();

conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse");
conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db");
conf.set("hive.exec.scratchdir", scratchDir);
conf.set("fs.permissions.umask-mode", "022");
conf.set("javax.jdo.option.ConnectionURL",
"jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
"jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + ";create=true");
conf.set("hive.metastore.local", "true");
conf.set("hive.aux.jars.path", "");
conf.set("hive.added.jars.path", "");
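For context, the new start(String) overload is what lets HiveExample point the server's scratch directory and Derby metastore at the example's own target folder instead of the fixed /tmp/hive path. A minimal usage sketch, assuming an illustrative target directory and the HIVE_CARBON_EXAMPLE table created earlier in the diff:

import java.sql.DriverManager

import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2

// Sketch only: targetDir is an assumed path, not taken from the commit.
object EmbeddedHiveServerSketch {
  def main(args: Array[String]): Unit = {
    val targetDir = "/tmp/carbondata-hive-target"
    Class.forName("org.apache.hive.jdbc.HiveDriver")    // register the Hive JDBC driver
    val server = new HiveEmbeddedServer2()
    server.start(targetDir)                             // scratch dir and metastore_db live under targetDir
    val port = server.getFreePort
    val connection =
      DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
    val statement = connection.createStatement
    val resultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
    while (resultSet.next()) {
      println(s"${resultSet.getInt(1)}, ${resultSet.getString(2)}, ${resultSet.getDouble(3)}")
    }
    connection.close()
    server.stop()
  }
}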
