[WIP][CARBONDATA-3935] Support partition table transactional write in presto #3916

Open · wants to merge 1 commit into base: master
@@ -369,7 +369,8 @@ private CarbonCommonConstants() {
public static final String CARBON_MERGE_INDEX_IN_SEGMENT =
"carbon.merge.index.in.segment";

public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
// TODO: revert this after proper fix in this PR
Review comment (Member): please revert this.

public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "false";

/**
* It is the user defined property to specify whether to throw exception or not in case
@@ -18,10 +18,19 @@
package org.apache.carbondata.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
@@ -123,13 +132,46 @@ public void commitJob(JobContext jobContext) throws IOException {
try {
Configuration configuration = jobContext.getConfiguration();
CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
ThreadLocalSessionInfo.unsetAll();
SegmentFileStore.writeSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
SegmentFileStore
.mergeIndexAndWriteSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
CarbonTableOutputFormat.setLoadModel(configuration, carbonLoadModel);
if (!carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
ThreadLocalSessionInfo.unsetAll();
SegmentFileStore
.writeSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
SegmentFileStore.mergeIndexAndWriteSegmentFile(
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
CarbonTableOutputFormat.setLoadModel(configuration, carbonLoadModel);
} else {
String tableFactLocation =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getTablePath();
List<CarbonFile> carbonFiles =
FileFactory.getCarbonFile(tableFactLocation).listFiles(true, new CarbonFileFilter() {
Review comment (Member): listing the files of the whole table can be very slow when multiple segments are present, and previous load segments are also listed here. I think we need to keep the list of index/merge-index files created for the current load in memory and write them into the segment file here. (A sketch of this idea follows this hunk.)

@Override
public boolean accept(CarbonFile file) {
return (file.getName().endsWith(".carbonindex") || file.getName()
.endsWith(".carbonindexmerge")) && file.getName()
.contains("" + carbonLoadModel.getFactTimeStamp());
}
});
Map<String, Set<String>> partitionIndexMap = new HashMap<String, Set<String>>();
for (CarbonFile carbonFile: carbonFiles) {
String absTablePath = carbonFile.getAbsolutePath();
String partitionPath =
absTablePath.substring(0, absTablePath.indexOf(carbonFile.getName()));
Set<String> indexSet = partitionIndexMap.get(partitionPath);
if (indexSet == null) {
indexSet = new HashSet<>();
indexSet.add(carbonFile.getName());
partitionIndexMap.put(partitionPath, indexSet);
} else {
indexSet.add(carbonFile.getName());
}
}
jobContext.getConfiguration().set("carbon.index.files.name",
ObjectSerializationUtil.convertObjectToString(partitionIndexMap));
jobContext.getConfiguration().set("carbon.output.partitions.name", ObjectSerializationUtil
.convertObjectToString(new ArrayList<>(partitionIndexMap.keySet())));
}
carbonOutputCommitter.commitJob(jobContext);
} catch (Exception e) {
LOGGER.error(e);
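A minimal, hypothetical sketch of the reviewer's suggestion above: instead of listing the whole table path in commitJob, each writer registers the index/merge-index file it produced for the current load, keyed by partition directory, and commitJob only reads that in-memory map. The `IndexFileCollector` class and its registration points are assumptions for illustration and are not part of this PR; in a setup where writers and the committer run in separate JVMs, the collected names would still have to be passed through the job configuration, as the PR already does for partition paths.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper: writers register each index/merge-index file they flush
 * for the current load, so commitJob can build the segment file without
 * listing the entire table directory.
 */
public final class IndexFileCollector {

  // partition directory -> index file names written by the current load
  private static final Map<String, Set<String>> CURRENT_LOAD_INDEX_FILES = new HashMap<>();

  private IndexFileCollector() {
  }

  /** Called by the record writer when an index or merge-index file is written. */
  public static synchronized void register(String partitionPath, String indexFileName) {
    CURRENT_LOAD_INDEX_FILES
        .computeIfAbsent(partitionPath, k -> new HashSet<>())
        .add(indexFileName);
  }

  /** Called from commitJob; returns only the current load's files and resets the state. */
  public static synchronized Map<String, Set<String>> snapshotAndClear() {
    Map<String, Set<String>> snapshot = new HashMap<>(CURRENT_LOAD_INDEX_FILES);
    CURRENT_LOAD_INDEX_FILES.clear();
    return snapshot;
  }
}
```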
@@ -115,9 +115,20 @@ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalO
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getPartitionInfo();
final int partitionColumn =
partitionInfo != null ? partitionInfo.getColumnSchemaList().size() : 0;
final String finalOutputPath;
if (carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
carbonLoadModel.getMetrics().addToPartitionPath(finalOutPath.toString());
context.getConfiguration().set("carbon.outputformat.writepath", finalOutPath.toString());
String[] outputPathSplits = finalOutPath.toString().split("/");
StringBuilder partitionDirs = new StringBuilder();
for (int i = partitionColumn; i > 0; i--) {
Review comment (Member): so, how does the carbondata-hive partition write work? (See the path sketch after this hunk.)

partitionDirs.append(CarbonCommonConstants.FILE_SEPARATOR)
.append(outputPathSplits[outputPathSplits.length - i]);
}
finalOutputPath = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getTablePath() +
partitionDirs;
carbonLoadModel.getMetrics().addToPartitionPath(finalOutputPath);
context.getConfiguration().set("carbon.outputformat.writepath", finalOutputPath);
} else {
finalOutputPath = finalOutPath.toString();
}
CarbonTableOutputFormat.setLoadModel(jc, carbonLoadModel);
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, ObjectArrayWritable> re =
@@ -130,7 +141,7 @@ public void write(Writable writable) throws IOException {
if (isHivePartitionedTable) {
Object[] actualRow = ((CarbonHiveRow) writable).getData();
Object[] newData = Arrays.copyOf(actualRow, actualRow.length + partitionColumn);
String[] partitionValues = finalOutPath.toString().substring(tablePath.length())
String[] partitionValues = finalOutputPath.substring(tablePath.length())
.split("/");
for (int j = 0, i = actualRow.length; j < partitionValues.length; j++) {
if (partitionValues[j].contains("=")) {
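To answer the "how does the partition write work" question above in concrete terms: the record writer keeps the last N components of the staging path handed in by Hive (one per partition column) and re-roots them under the carbon table path, so data and partition values are derived from `<tablePath>/country=.../...`. The paths below are made up for illustration; only the string manipulation mirrors the hunk above.

```java
public class PartitionPathSketch {
  public static void main(String[] args) {
    // Hypothetical staging path provided by Hive for a two-column partition
    String finalOutPath = "/tmp/hive-staging/_task_0001/country=India/state=karnataka";
    // Hypothetical carbon table path
    String tablePath = "/warehouse/testdb.db/multipartitiontable";
    int partitionColumn = 2; // number of partition columns

    // Keep the last `partitionColumn` path components (the partition directories)
    String[] outputPathSplits = finalOutPath.split("/");
    StringBuilder partitionDirs = new StringBuilder();
    for (int i = partitionColumn; i > 0; i--) {
      partitionDirs.append('/').append(outputPathSplits[outputPathSplits.length - i]);
    }

    // Re-root the partition directories under the table path
    String finalOutputPath = tablePath + partitionDirs;
    System.out.println(finalOutputPath);
    // -> /warehouse/testdb.db/multipartitiontable/country=India/state=karnataka
  }
}
```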
@@ -81,7 +81,7 @@ public static CarbonLoadModel getCarbonLoadModel(Configuration tableProperties)
String partitionColumnTypes =
tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
if (partitionColumns != null) {
columns = columns + "," + partitionColumns;
columns = columns + "," + partitionColumns.replace("/", ",");
columnTypes = columnTypes + ":" + partitionColumnTypes;
}
String[] columnTypeArray = splitSchemaStringToArray(columnTypes);
@@ -105,11 +105,21 @@ public static CarbonLoadModel getCarbonLoadModel(Properties tableProperties,
String tablePath = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
String columns = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
String sortColumns = tableProperties.getProperty("sort_columns");
String[] columnTypes = splitSchemaStringToArray(tableProperties.getProperty("columns.types"));
String columnTypes = tableProperties.getProperty("columns.types");
Object partitionColumns =
tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
Object partitionColumnTypes =
tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
if (partitionColumns != null) {
columns = columns + "," + partitionColumns.toString().replace("/", ",");
columnTypes = columnTypes + ":" + partitionColumnTypes.toString();
}
String[] columnTypesArray = splitSchemaStringToArray(
columnTypes);
String complexDelim = tableProperties.getProperty("complex_delimiter", "");
CarbonLoadModel carbonLoadModel =
getCarbonLoadModel(tableName, databaseName, tablePath, sortColumns, columns.split(","),
columnTypes, configuration);
columnTypesArray, configuration);
for (String delim : complexDelim.split(",")) {
carbonLoadModel.setComplexDelimiter(delim);
}
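For context on the replace("/", ",") above: Hive stores the partition column names in the table properties as a '/'-separated string, while the column types string is ':'-separated, so the names have to be normalized before they are appended to the regular column list. A self-contained sketch with hypothetical property values:

```java
public class PartitionSchemaMergeSketch {
  public static void main(String[] args) {
    // Hypothetical values as they would appear in the Hive table properties
    String columns = "id,name";                    // META_TABLE_COLUMNS
    String columnTypes = "int:string";             // columns.types
    String partitionColumns = "country/state";     // META_TABLE_PARTITION_COLUMNS ('/'-separated)
    String partitionColumnTypes = "string:string"; // META_TABLE_PARTITION_COLUMN_TYPES

    // Normalize the '/'-separated partition column names and append them
    columns = columns + "," + partitionColumns.replace("/", ",");
    columnTypes = columnTypes + ":" + partitionColumnTypes;

    System.out.println(columns);     // id,name,country,state
    System.out.println(columnTypes); // int:string:string:string
  }
}
```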
@@ -42,6 +42,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -87,8 +88,11 @@ public class CarbonDataFileWriter implements HiveFileWriter {
public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
JobConf configuration, TypeManager typeManager) throws SerDeException {
requireNonNull(outPutPath, "path is null");
// take the outputPath same as location in compliance with the carbon store folder structure.
this.outPutPath = new Path(properties.getProperty("location"));
if (properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS) != null) {
this.outPutPath = outPutPath.getParent();
} else {
this.outPutPath = new Path(properties.getProperty("location"));
}
this.configuration = requireNonNull(configuration, "conf is null");
List<String> columnNames = Arrays
.asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
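A small illustration of the output-path selection above, with made-up paths: for a partitioned table the writer targets the partition directory containing the file path handed in by Presto (its parent), while a non-partitioned table keeps writing directly under the table location property.

```java
import java.util.Properties;

import org.apache.hadoop.fs.Path;

public class WriterOutputPathSketch {
  public static void main(String[] args) {
    // Hypothetical file path handed in by Presto for a partitioned table
    Path outPutPath = new Path("/warehouse/testdb.db/partitiontable/country=India/part-00001");
    Properties properties = new Properties();
    properties.setProperty("location", "/warehouse/testdb.db/partitiontable");

    // Partitioned table: write into the partition directory (parent of the file path)
    Path partitionedTarget = outPutPath.getParent();
    System.out.println(partitionedTarget);    // /warehouse/testdb.db/partitiontable/country=India

    // Non-partitioned table: write directly under the table location
    Path nonPartitionedTarget = new Path(properties.getProperty("location"));
    System.out.println(nonPartitionedTarget); // /warehouse/testdb.db/partitiontable
  }
}
```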
@@ -23,7 +23,6 @@
import java.util.Properties;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.presto.integrationtest

import java.io.File
import java.util
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
import org.apache.carbondata.core.metadata.schema.{PartitionInfo, SchemaReader}
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.presto.server.PrestoServer

/**
* Tests for partition table transactional write in Presto
*/
class PrestoInsertIntoPartitionTableTestCase
extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {

private val logger = LogServiceFactory
.getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)

private val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
private val storePath = s"$rootPath/integration/presto/target/store"
private val prestoServer = new PrestoServer

override def beforeAll: Unit = {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
Review comment (Member): remove the "written by" app name property; it should be taken care of internally.

"Presto")
val map = new util.HashMap[String, String]()
map.put("hive.metastore", "file")
map.put("hive.metastore.catalog.dir", s"file://$storePath")
map.put("hive.allow-drop-table", "true")
prestoServer.startServer("testdb", map)
prestoServer.execute("drop schema if exists testdb")
prestoServer.execute("create schema testdb")
}

test("test partition table insert") {
prestoServer.execute(s"drop table if exists partitiontable")
val query =
"create table testdb.partitiontable(ID int, name varchar,country varchar) with " +
"(partitioned_by = ARRAY['country'], format='CARBON') "
val tuple = getSchemaBuilder(false)
PrestoUtil.createTable(prestoServer, query, "testdb", "partitiontable", tuple._1, tuple._2)
prestoServer.execute("insert into testdb.partitiontable values(10,'joey','India')")
prestoServer.execute("insert into testdb.partitiontable values(20,'ross','US')")
prestoServer.execute("insert into testdb.partitiontable values(30,'chandler','china')")
val partitionDirectories = Seq("country=India/", "country=US/", "country=china/")
checkPartitionAssertions("partitiontable", "testdb", partitionDirectories, 3)
val actualResult1: List[Map[String, Any]] = prestoServer
.executeQuery("select count(*) AS RESULT from testdb.partitiontable WHERE country = 'India'")
val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 1))
assert(actualResult1.equals(expectedResult1))
val actualResult2: List[Map[String, Any]] = prestoServer
.executeQuery(
"select count(*) AS RESULT from testdb.partitiontable WHERE country = 'India' or " +
"country = 'US' or country = 'china'")
val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 3))
assert(actualResult2.equals(expectedResult2))
}

test("test insert to partition table with multiple partition columns") {
prestoServer.execute(s"drop table if exists multipartitiontable")
val query =
"create table testdb.multipartitiontable(ID int, name varchar,country varchar, state " +
"varchar, city varchar) with (partitioned_by = ARRAY['country','state','city'], " +
"format='CARBON') "
val tuple = getSchemaBuilder(true)
PrestoUtil.createTable(prestoServer, query, "testdb", "multipartitiontable", tuple._1, tuple._2)
prestoServer.execute(
"insert into testdb.multipartitiontable values(10,'joey','India', 'karnataka', 'gadag')")
prestoServer.execute(
"insert into testdb.multipartitiontable values(20,'ross','US', 'Texas','Dallas')")
prestoServer.execute(
"insert into testdb.multipartitiontable values(30,'chandler','china', 'zejiang','shenzen')")
prestoServer.execute(
"insert into testdb.multipartitiontable values(10,'rachel','India', 'karnataka', 'mysuru')")
val partitionDirectories = Seq("country=India/state=karnataka/city=mysuru/",
"country=India/state=karnataka/city=gadag/",
"country=china/state=zejiang/city=shenzen/",
"country=US/state=Texas/city=Dallas/")
checkPartitionAssertions("multipartitiontable", "testdb", partitionDirectories, 4)
val actualResult1: List[Map[String, Any]] = prestoServer.executeQuery(
"select count(*) AS RESULT from testdb.multipartitiontable WHERE country = 'India'")
val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 2))
assert(actualResult1.equals(expectedResult1))
val actualResult2: List[Map[String, Any]] = prestoServer
.executeQuery(
"select count(*) AS RESULT from testdb.multipartitiontable WHERE country = 'US' or " +
"country = 'china' or city = 'gadag'")
val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 3))
assert(actualResult2.equals(expectedResult2))
}

private def checkPartitionAssertions(tableName: String,
dbName: String,
partitionDirectories: Seq[String], segmentFileCount: Int): Unit = {
val absoluteTableIdentifier: AbsoluteTableIdentifier = PrestoUtil
.getAbsoluteIdentifier(dbName, tableName)
val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
val tablePath = carbonTable.getTablePath
val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
// check if all the segment files are present for each segment.
val segmentFiles = FileFactory.getCarbonFile(segmentsPath).listFiles()
assert(segmentFiles.length == segmentFileCount)
segmentFiles.foreach { segmentFile =>
val segmentFileName = segmentFile.getName
val segmentFileStore = new SegmentFileStore(carbonTable.getTablePath, segmentFileName)
val partitionSpecs = segmentFileStore.getPartitionSpecs
assert(partitionDirectories.contains(partitionSpecs.get(0).getPartitions.get(0)))
// check if the partition directories are not empty
val dataOrIndexFiles = FileFactory.getCarbonFile(partitionSpecs.get(0).getLocation.toString)
.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.FACT_FILE_EXT) ||
file.getName.endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) ||
file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
}
})
assert(dataOrIndexFiles.nonEmpty)
}
// check if the segment file name is present in each load metadata detail
val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment =>
val loadMetadataDetails = segment.getLoadMetadataDetails
assert(loadMetadataDetails.getSegmentFile != null)
}
}

def getSchemaBuilder(isMultiplePartitionColumns: Boolean): (TableSchemaBuilder, PartitionInfo) = {
val integer = new AtomicInteger(0)
val schemaBuilder = new TableSchemaBuilder
schemaBuilder.addColumn(new StructField("ID", DataTypes.INT), integer, false, false)
schemaBuilder.addColumn(new StructField("name", DataTypes.STRING), integer, false, false)
val partitionColumnSchemas: util.List[ColumnSchema] = new util.ArrayList[ColumnSchema]
partitionColumnSchemas.add(schemaBuilder.addColumn(new StructField("country", DataTypes.STRING),
integer, false, false))
if (isMultiplePartitionColumns) {
partitionColumnSchemas.add(schemaBuilder.addColumn(new StructField("state", DataTypes.STRING),
integer, false, false))
partitionColumnSchemas.add(schemaBuilder.addColumn(new StructField("city", DataTypes.STRING),
integer, false, false))
}
val partitionInfo = new PartitionInfo(partitionColumnSchemas, PartitionType.NATIVE_HIVE)
(schemaBuilder, partitionInfo)
}

override def afterAll(): Unit = {
prestoServer.stopServer()
CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
}

}