
Commit 1f92ce6: Merge f773e5b into d4a1577

ravipesala committed Aug 23, 2018
2 parents d4a1577 + f773e5b
Showing 38 changed files with 1,853 additions and 517 deletions.
@@ -36,12 +36,14 @@
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Stable
-public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+public class LatestFilesReadCommittedScope
+    implements ReadCommittedScope {
 
   private String carbonFilePath;
   private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
+  private String[] subFolders;
 
   /**
    * a new constructor of this class
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
    * @param path carbon file path
    */
   public LatestFilesReadCommittedScope(String path) {
-    this(path, null);
+    this(path, (String) null);
+  }
+
+  /**
+   * a new constructor with path
+   *
+   * @param path carbon file path
+   */
+  public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+    Objects.requireNonNull(path);
+    this.carbonFilePath = path;
+    this.subFolders = subFolders;
+    try {
+      takeCarbonIndexFileSnapShot();
+    } catch (IOException ex) {
+      throw new RuntimeException("Error while taking index snapshot", ex);
+    }
   }
 
   private void prepareLoadMetadata() {
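To make the new entry point concrete, here is a minimal usage sketch; the store path and sub-folder names are invented for illustration and are not taken from this commit:

    // Hypothetical usage (assumes org.apache.carbondata.core.readcommitter.*):
    // point the scope at an unmanaged store whose files sit in two sub-folders.
    String storePath = "/tmp/carbon/store";
    String[] subFolders = { storePath + "/folder1", storePath + "/folder2" };
    ReadCommittedScope scope = new LatestFilesReadCommittedScope(storePath, subFolders);
    // The constructor snapshots the latest index files immediately; an I/O
    // failure during the snapshot surfaces as the RuntimeException above.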
@@ -128,8 +146,9 @@ private void prepareLoadMetadata() {
     return indexFileStore;
   }
 
-  @Override public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
-      Segment segment, UpdateVO updateVo) throws IOException {
+  @Override
+  public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
+      throws IOException {
     Map<String, SegmentRefreshInfo> snapShot =
         readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
     String segName;
@@ -147,8 +166,8 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
       // This is CarbonFile case where the Index files are present inside the Segment Folder
       // So the Segment has to be extracted from the path not from the CarbonIndex file.
       String segString = indexFilePath.substring(0, indexFilePath.lastIndexOf("/") + 1);
-      String segName = segString
-          .substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
+      String segName =
+          segString.substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
       return segName;
     } else {
       String fileName = carbonIndexFileName;
@@ -165,7 +184,16 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
     CarbonFile[] carbonIndexFiles = null;
     if (file.isDirectory()) {
       if (segmentId == null) {
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
+        if (subFolders != null) {
+          List<CarbonFile> allIndexFiles = new ArrayList<>();
+          for (String subFolder : subFolders) {
+            CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(subFolder);
+            allIndexFiles.addAll(Arrays.asList(files));
+          }
+          carbonIndexFiles = allIndexFiles.toArray(new CarbonFile[0]);
+        } else {
+          carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
+        }
       } else {
         String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
         carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
@@ -181,8 +209,7 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
       // Get Segment Name from the IndexFile.
       String indexFilePath =
           FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
-      String segId =
-          getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
+      String segId = getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
       // TODO. During Partition table handling, place Segment File Name.
       List<String> indexList;
       SegmentRefreshInfo segmentRefreshInfo;
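Net effect of these hunks: when subFolders is supplied, the snapshot gathers carbon index files from every listed folder and merges them into a single array; the old single scan of the root file remains the fallback when subFolders is null, and the segment-path branch is untouched.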
@@ -216,7 +216,7 @@ private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOException {
       }
       if (null == segmentProperties) {
         segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
-            blockInfo.getDetailInfo().getDimLens());
+            fileFooter.getSegmentInfo().getColumnCardinality());
         filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
       }
       readAndFillBlockletInfo(tableBlockInfos, blockInfo,
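This swaps the cardinality source for SegmentProperties from the blocklet detail info to the segment info carried in the file footer itself; presumably that lets footer-driven readers (such as the external file path, where detail info may not be populated) build SegmentProperties without per-block metadata.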
32 changes: 12 additions & 20 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -39,7 +39,6 @@
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -2237,24 +2236,20 @@ static DataType thriftDataTyopeToWrapperDataType(
     }
   }
 
-  public static List<String> getFilePathExternalFilePath(String path) {
+  public static String getFilePathExternalFilePath(String path) {
 
     // return the list of carbondata files in the given path.
     CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
+    CarbonFile[] dataFiles = segment.listFiles();
+    for (CarbonFile dataFile : dataFiles) {
+      if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+        return dataFile.getAbsolutePath();
+      } else if (dataFile.isDirectory()) {
+        return getFilePathExternalFilePath(dataFile.getAbsolutePath());
       }
-    });
-    List<String> filePaths = new ArrayList<>(dataFiles.length);
-    for (CarbonFile dfiles : dataFiles) {
-      filePaths.add(dfiles.getAbsolutePath());
     }
-    return filePaths;
+    return null;
   }
 
   /**
@@ -2264,16 +2259,13 @@ public static List<String> getFilePathExternalFilePath(String path) {
    */
   public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
       String tableName, boolean isCarbonFileProvider) throws IOException {
-    List<String> filePaths;
+    String fistFilePath = null;
     if (isCarbonFileProvider) {
-      filePaths = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
     } else {
-      filePaths = getFilePathExternalFilePath(carbonDataFilePath);
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath);
     }
-    String fistFilePath = null;
-    try {
-      fistFilePath = filePaths.get(0);
-    } catch (Exception e) {
+    if (fistFilePath == null) {
       // Check if we can infer the schema from the hive metastore.
       LOGGER.error("CarbonData file is not present in the table location");
       throw new IOException("CarbonData file is not present in the table location");
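A short sketch of how the two reshaped methods now cooperate; the table path is illustrative, and the failure branch mirrors the code above:

    // Hypothetical walk-through: find the first .carbondata file, then infer.
    String tablePath = "/tmp/carbon/external_table";   // assumed location
    String firstFile = CarbonUtil.getFilePathExternalFilePath(tablePath);
    if (firstFile == null) {
      // inferSchema() logs and throws in exactly this situation
      throw new IOException("CarbonData file is not present in the table location");
    }
    // inferSchema() goes on to rebuild the thrift TableInfo from that file.

Note the return-type change: callers that previously received a List<String> of all fact files now get only the first match, found depth-first through sub-directories.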
@@ -116,7 +116,8 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
       readCommittedScope = new LatestFilesReadCommittedScope(
           identifier.getTablePath() + "/Fact/Part0/Segment_null/");
     } else {
-      readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+      readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
+          getSubFoldersToRead(job.getConfiguration()));
     }
     Expression filter = getFilterPredicates(job.getConfiguration());
     // this will be null in case of corrupt schema file.
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
@@ -115,6 +116,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
   private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
+  private static final String SUB_FOLDERS = "mapreduce.input.carboninputformat.subfolders";
 
   // record segment number and hit blocks
   protected int numSegments = 0;
@@ -337,6 +339,29 @@ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
     }
   }
 
+  public static void setSubFoldersToRead(Configuration configuration, String[] subFoldersToRead) {
+    if (subFoldersToRead == null) {
+      return;
+    }
+    try {
+      String subFoldersString =
+          ObjectSerializationUtil.convertObjectToString(subFoldersToRead);
+      configuration.set(SUB_FOLDERS, subFoldersString);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Error while setting subfolders information to Job" + Arrays.toString(subFoldersToRead),
+          e);
+    }
+  }
+
+  public static String[] getSubFoldersToRead(Configuration configuration) throws IOException {
+    String subFoldersString = configuration.get(SUB_FOLDERS);
+    if (subFoldersString != null) {
+      return (String[]) ObjectSerializationUtil.convertStringToObject(subFoldersString);
+    }
+    return null;
+  }
+
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
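These two helpers are a plain serialize/deserialize pair around the job configuration. A minimal round trip, with invented folder names:

    // Hypothetical usage while wiring up a read job
    // (Configuration is org.apache.hadoop.conf.Configuration;
    //  getSubFoldersToRead declares throws IOException):
    Configuration conf = new Configuration();
    CarbonInputFormat.setSubFoldersToRead(conf, new String[] { "/store/f1", "/store/f2" });
    String[] restored = CarbonInputFormat.getSubFoldersToRead(conf);
    // restored now holds {"/store/f1", "/store/f2"}; when nothing was set,
    // getSubFoldersToRead() returns null and callers fall back to the root path.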
@@ -660,7 +660,8 @@ public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier)
     if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
       readCommittedScope = new TableStatusReadCommittedScope(identifier);
     } else {
-      readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+      readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
+          getSubFoldersToRead(job.getConfiguration()));
     }
     this.readCommittedScope = readCommittedScope;
   }
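Together with the getSplits change earlier in this commit, this closes the loop: sub-folders registered on the job configuration flow into LatestFilesReadCommittedScope whenever the table is non-transactional, so split planning sees index files from every registered folder instead of only the table root.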
5 changes: 5 additions & 0 deletions integration/spark-common/pom.xml
@@ -40,6 +40,11 @@
       <artifactId>carbondata-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-datasource</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
@@ -17,11 +17,22 @@
 
 package org.apache.carbondata.spark.util;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CarbonMetastoreTypes;
+import org.apache.spark.sql.util.SparkTypeConverter;
 import org.apache.spark.util.Utils;
 
 public class Util {
@@ -46,4 +57,66 @@ public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splits) {
     }
     return false;
   }
+
+  private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+      DataType carbonDataType) {
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+      return DataTypes.StringType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+      return DataTypes.ShortType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+      return DataTypes.IntegerType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+      return DataTypes.LongType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+      return DataTypes.DoubleType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+      return DataTypes.BooleanType;
+    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+      return DataTypes.createDecimalType();
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+      return DataTypes.TimestampType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+      return DataTypes.DateType;
+    } else {
+      return null;
+    }
+  }
+
+  public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
+    List<StructField> fields = new ArrayList<>(carbonColumns.length);
+    for (int i = 0; i < carbonColumns.length; i++) {
+      ColumnSchema carbonColumn = carbonColumns[i];
+      DataType dataType = carbonColumn.getDataType();
+      if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+        fields.add(new StructField(carbonColumn.getColumnName(),
+            new DecimalType(carbonColumn.getPrecision(), carbonColumn.getScale()),
+            true, Metadata.empty()));
+      } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isStructType(dataType)) {
+        fields.add(
+            new StructField(
+                carbonColumn.getColumnName(),
+                CarbonMetastoreTypes.toDataType(
+                    String.format("struct<%s>",
+                        SparkTypeConverter.getStructChildren(table, carbonColumn.getColumnName()))),
+                true,
+                Metadata.empty()));
+      } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isArrayType(dataType)) {
+        fields.add(
+            new StructField(
+                carbonColumn.getColumnName(),
+                CarbonMetastoreTypes.toDataType(
+                    String.format("array<%s>",
+                        SparkTypeConverter.getArrayChildren(
+                            table,
+                            carbonColumn.getColumnName()))),
+                true,
+                Metadata.empty()));
+      } else {
+        fields.add(new StructField(carbonColumn.getColumnName(),
+            convertCarbonToSparkDataType(carbonColumn.getDataType()), true, Metadata.empty()));
+      }
+    }
+    return new StructType(fields.toArray(new StructField[0]));
+  }
 }
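A rough sketch of the new schema bridge in use; obtaining the column array via getCreateOrderColumn is an assumption about the caller, not part of this diff:

    // Hypothetical caller: project a loaded CarbonTable into a Spark schema
    // (assumes import of ...schema.table.column.CarbonColumn).
    List<CarbonColumn> cols = table.getCreateOrderColumn(table.getTableName());
    ColumnSchema[] schemas = new ColumnSchema[cols.size()];
    for (int i = 0; i < cols.size(); i++) {
      schemas[i] = cols.get(i).getColumnSchema();
    }
    StructType sparkSchema = Util.convertToSparkSchema(table, schemas);
    // Decimals keep precision/scale, struct/array columns are rebuilt through
    // CarbonMetastoreTypes, and primitives map via convertCarbonToSparkDataType.

One design note: convertCarbonToSparkDataType returns null for unmapped types, so a column with an unexpected data type would yield a StructField with a null type rather than failing fast.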
@@ -37,6 +37,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
@@ -56,7 +57,7 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class CarbonMergerRDD[K, V](
     sc: SparkContext,
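This import reshuffle repeats across the Scala files below: SparkDataTypeConverterImpl now resolves from org.apache.carbondata.converter rather than org.apache.carbondata.spark.util, apparently following the class into the carbondata-spark-datasource module that the spark-common pom starts depending on in this commit.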
@@ -34,6 +34,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -50,7 +51,6 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonCompactionUtil
 import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 
 /**
@@ -41,6 +41,7 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -59,7 +60,7 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
-import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.Util
 import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 
 /**
@@ -29,6 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,7 +48,7 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
-import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 
 