Commit 62cbab8: Refactored ReadCommitted scope
ravipesala committed Aug 24, 2018
1 parent 23abf03 commit 62cbab8
Showing 11 changed files with 122 additions and 102 deletions.

@@ -94,6 +94,9 @@ public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
throws IOException {
String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
if (dataMapJob == null) {
return;
}
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
getValidAndInvalidSegments(carbonTable);
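
The new null check matters because createDataMapJob builds the job reflectively from a class name that lives in the spark module; when that class is not on the classpath (for example SDK or file-format readers), no job can be created and clearing distributed datamaps is simply skipped. A minimal sketch of that pattern, assuming the factory maps reflection failures to null (the helper name below is illustrative, not the project's API):

  // Illustrative factory only: assumes reflection failures become null,
  // which is what the null check above relies on.
  private static Object createJobOrNull(String className) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      return null;  // spark module not on the classpath, nothing to clear
    }
  }
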
@@ -71,6 +71,10 @@ public static AbsoluteTableIdentifier from(String tablePath, String dbName, Stri
return from(tablePath, dbName, tableName, "");
}

public static AbsoluteTableIdentifier from(String tablePath) {
return from(tablePath, "", "", "");
}
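
A hedged usage sketch of the new single-argument overload: it delegates with empty database and table names, so a table can be identified by its location alone (the path below is illustrative):

  // Illustrative path; equivalent to from("/tmp/carbon_output", "", "", "")
  AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.from("/tmp/carbon_output");
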

public static AbsoluteTableIdentifier from(
String tablePath,
CarbonTableIdentifier carbonTableIdentifier) {
@@ -17,7 +17,12 @@
package org.apache.carbondata.core.readcommitter;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -42,7 +47,6 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
private String segmentId;
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
private String[] dataFolders;

/**
* a new constructor of this class
@@ -63,20 +67,16 @@ public LatestFilesReadCommittedScope(String path, String segmentId) throws IOExc
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) throws IOException {
this(path, (String) null);
this(path, null);
}

/**
* a new constructor with path
* a new constructor with carbon index files
*
* @param path carbon file path
* @param dataFolders Folders where carbondata files exists
* @param indexFiles carbon index files
*/
public LatestFilesReadCommittedScope(String path, String[] dataFolders) throws IOException {
Objects.requireNonNull(path);
this.carbonFilePath = path;
this.dataFolders = dataFolders;
takeCarbonIndexFileSnapShot();
public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) {
takeCarbonIndexFileSnapShot(indexFiles);
}
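
With the String[] dataFolders field gone, a caller that has already listed the index files (for example a Spark FileIndex, see the CarbonFileIndex change below) can hand them in directly and avoid a second listing of the table path. A minimal usage sketch, assuming the files are obtained with the same SegmentIndexFileStore helper this class uses internally (the path is illustrative):

  CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles("/tmp/carbon_output");
  ReadCommittedScope scope = new LatestFilesReadCommittedScope(indexFiles);
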

private void prepareLoadMetadata() {
@@ -171,21 +171,11 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
@Override public void takeCarbonIndexFileSnapShot() throws IOException {
// Read the current file path and get the list of index files from it.
CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();

CarbonFile[] carbonIndexFiles = null;
if (file.isDirectory()) {
if (segmentId == null) {
if (dataFolders != null) {
List<CarbonFile> allIndexFiles = new ArrayList<>();
for (String subFolder : dataFolders) {
CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(subFolder);
allIndexFiles.addAll(Arrays.asList(files));
}
carbonIndexFiles = allIndexFiles.toArray(new CarbonFile[0]);
} else {
carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
}
carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
} else {
String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
@@ -194,44 +184,50 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
throw new IOException(
"No Index files are present in the table location :" + carbonFilePath);
}
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. Nested File Paths.
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
|| carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
// Get Segment Name from the IndexFile.
String indexFilePath =
FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
String segId = getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
// TODO. During Partition table handling, place Segment File Name.
List<String> indexList;
SegmentRefreshInfo segmentRefreshInfo;
if (indexFileStore.get(segId) == null) {
indexList = new ArrayList<>(1);
segmentRefreshInfo =
new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
} else {
// Entry is already present.
indexList = indexFileStore.get(segId);
segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
}
indexList.add(indexFilePath);
if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
.getLastModifiedTime()) {
segmentRefreshInfo
.setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
}
indexFileStore.put(segId, indexList);
segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
}
}
ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
prepareLoadMetadata();
takeCarbonIndexFileSnapShot(carbonIndexFiles);
} else {
throw new IOException("Path is not pointing to directory");
}
}

private void takeCarbonIndexFileSnapShot(CarbonFile[] carbonIndexFiles) {
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. Nested File Paths.
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
|| carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
// Get Segment Name from the IndexFile.
String indexFilePath =
FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
String segId = getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
// TODO. During Partition table handling, place Segment File Name.
List<String> indexList;
SegmentRefreshInfo segmentRefreshInfo;
if (indexFileStore.get(segId) == null) {
indexList = new ArrayList<>(1);
segmentRefreshInfo =
new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
} else {
// Entry is already present.
indexList = indexFileStore.get(segId);
segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
}
indexList.add(indexFilePath);
if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
.getLastModifiedTime()) {
segmentRefreshInfo
.setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
}
indexFileStore.put(segId, indexList);
segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
}
}
ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
prepareLoadMetadata();
}

}
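
Both the path-based and the file-based constructors now funnel into the private takeCarbonIndexFileSnapShot(CarbonFile[]) above, whose job is to group .carbonindex/.carbonindexmerge files by segment id and to remember, per segment, the newest modification timestamp and the file count. A compact sketch of that grouping idea, with illustrative map names and assuming the surrounding class's imports:

  Map<String, List<String>> indexFilesBySegment = new HashMap<>();
  Map<String, Long> latestTimestampBySegment = new HashMap<>();
  for (CarbonFile indexFile : carbonIndexFiles) {
    String name = indexFile.getName();
    if (!name.endsWith(CarbonTablePath.INDEX_FILE_EXT)
        && !name.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
      continue;  // skip data files and any stray files
    }
    String segmentId = getSegmentID(name, indexFile.getAbsolutePath());
    indexFilesBySegment.computeIfAbsent(segmentId, k -> new ArrayList<>())
        .add(indexFile.getAbsolutePath());
    latestTimestampBySegment.merge(segmentId, indexFile.getLastModifiedTime(), Math::max);
  }
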
@@ -116,8 +116,10 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
readCommittedScope = new LatestFilesReadCommittedScope(
identifier.getTablePath() + "/Fact/Part0/Segment_null/");
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
getDataFoldersToRead(job.getConfiguration()));
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
}
}
Expression filter = getFilterPredicates(job.getConfiguration());
// this will be null in case of corrupt schema file.
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Objects;
@@ -48,6 +47,7 @@
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -116,7 +116,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
private static final String DATA_FOLDERS = "mapreduce.input.carboninputformat.datafolders";
private static final String READ_COMMITTED_SCOPE =
"mapreduce.input.carboninputformat.read.committed.scope";

// record segment number and hit blocks
protected int numSegments = 0;
@@ -339,25 +340,25 @@ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configur
}
}

public static void setDataFoldersToRead(Configuration configuration, String[] dataFoldersToRead) {
if (dataFoldersToRead == null) {
public static void setReadCommittedScope(Configuration configuration,
ReadCommittedScope committedScope) {
if (committedScope == null) {
return;
}
try {
String subFoldersString =
ObjectSerializationUtil.convertObjectToString(dataFoldersToRead);
configuration.set(DATA_FOLDERS, subFoldersString);
String subFoldersString = ObjectSerializationUtil.convertObjectToString(committedScope);
configuration.set(READ_COMMITTED_SCOPE, subFoldersString);
} catch (Exception e) {
throw new RuntimeException(
"Error while setting subfolders information to Job" + Arrays.toString(dataFoldersToRead),
e);
"Error while setting committedScope information to Job" + committedScope, e);
}
}

public static String[] getDataFoldersToRead(Configuration configuration) throws IOException {
String subFoldersString = configuration.get(DATA_FOLDERS);
public static ReadCommittedScope getReadCommittedScope(Configuration configuration)
throws IOException {
String subFoldersString = configuration.get(READ_COMMITTED_SCOPE);
if (subFoldersString != null) {
return (String[]) ObjectSerializationUtil.convertStringToObject(subFoldersString);
return (ReadCommittedScope) ObjectSerializationUtil.convertStringToObject(subFoldersString);
}
return null;
}
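
Instead of a list of data folders, the driver can now serialize an entire ReadCommittedScope into the job configuration under the new key; ObjectSerializationUtil presumably Java-serializes the object to a string, which implies the scope implementations need to stay serializable. A hedged round-trip sketch (the variable names and the indexFiles value are illustrative):

  Configuration conf = new Configuration();
  ReadCommittedScope scope = new LatestFilesReadCommittedScope(indexFiles);  // indexFiles listed by the caller
  CarbonInputFormat.setReadCommittedScope(conf, scope);
  // ...later, on the input-format side (getReadCommittedScope declares IOException):
  ReadCommittedScope restored = CarbonInputFormat.getReadCommittedScope(conf);  // null if never set
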
@@ -660,8 +660,10 @@ public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifi
if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
readCommittedScope = new TableStatusReadCommittedScope(identifier);
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
getDataFoldersToRead(job.getConfiguration()));
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
}
}
this.readCommittedScope = readCommittedScope;
}
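
The same resolution order now appears here and in CarbonFileInputFormat.getSplits above: for a non-transactional table, prefer a scope the caller pre-built and stored in the configuration, and only fall back to listing the table path. Condensed into one illustrative helper (the name and structure are not from the commit):

  private ReadCommittedScope resolveNonTransactionalScope(Configuration conf, String tablePath)
      throws IOException {
    ReadCommittedScope scope = CarbonInputFormat.getReadCommittedScope(conf);
    if (scope == null) {
      // nothing was handed over, so list the table path ourselves
      scope = new LatestFilesReadCommittedScope(tablePath);
    }
    return scope;
  }
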
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.carbondata.execution.datasources

import java.io.IOException
import java.util

import scala.collection.JavaConverters._
@@ -30,8 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, _}
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}

@@ -43,18 +47,18 @@ class CarbonFileIndex(
sparkSession: SparkSession,
dataSchema: StructType,
parameters: Map[String, String],
inMemoryFileIndex: InMemoryFileIndex)
fileIndex: FileIndex)
extends FileIndex with AbstractCarbonFileIndex {

override def rootPaths: Seq[Path] = inMemoryFileIndex.rootPaths
override def rootPaths: Seq[Path] = fileIndex.rootPaths

override def inputFiles: Array[String] = inMemoryFileIndex.inputFiles
override def inputFiles: Array[String] = fileIndex.inputFiles

override def refresh(): Unit = inMemoryFileIndex.refresh()
override def refresh(): Unit = fileIndex.refresh()

override def sizeInBytes: Long = inMemoryFileIndex.sizeInBytes
override def sizeInBytes: Long = fileIndex.sizeInBytes

override def partitionSchema: StructType = inMemoryFileIndex.partitionSchema
override def partitionSchema: StructType = fileIndex.partitionSchema

/**
* It lists the pruned files after applying partition and data filters.
@@ -65,10 +69,10 @@
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val method = inMemoryFileIndex.getClass.getMethods.find(_.getName == "listFiles").get
val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
val directories =
method.invoke(
inMemoryFileIndex,
fileIndex,
partitionFilters,
dataFilters).asInstanceOf[Seq[PartitionDirectory]]
prune(dataFilters, directories)
@@ -78,7 +82,7 @@
directories: Seq[PartitionDirectory]) = {
val tablePath = parameters.get("path")
if (tablePath.nonEmpty) {
val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
// convert to Spark's source filters
val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)

@@ -91,14 +95,20 @@
hadoopConf,
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
CarbonInputFormat.setTransactionalTable(hadoopConf, false)
if (rootPaths.nonEmpty) {
// Check for any subfolders are present here.
if (!rootPaths.head.equals(new Path(tablePath.get)) &&
rootPaths.head.toString.contains(tablePath.get)) {
CarbonInputFormat.setDataFoldersToRead(hadoopConf,
rootPaths.map(_.toUri.toString).toArray)
}
var totalFiles = 0
val indexFiles = directories.flatMap { dir =>
totalFiles += dir.files.length
dir.files.filter{f =>
f.getPath.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
f.getPath.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)}.
map(new HDFSCarbonFile(_))
}.toArray.asInstanceOf[Array[CarbonFile]]
if (indexFiles.length == 0 && totalFiles > 0) {
throw new IOException("No Index files are present in the table location :" + tablePath.get)
}
CarbonInputFormat.setReadCommittedScope(
hadoopConf,
new LatestFilesReadCommittedScope(indexFiles))
filter match {
case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
case None => None
@@ -120,9 +130,9 @@
}
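
prune() is where the pre-built scope comes from on the Spark side: the files Spark has already listed are filtered down to index files, wrapped as CarbonFile instances, and handed to the input format, with an early failure when data files exist but no index file does. A hedged Java rendering of that filtering step (the commit's code is Scala; listedFiles, tablePath and hadoopConf stand in for values available inside prune):

  List<CarbonFile> indexFiles = new ArrayList<>();
  int totalFiles = 0;
  for (FileStatus status : listedFiles) {
    totalFiles++;
    String name = status.getPath().getName();
    if (name.endsWith(CarbonTablePath.INDEX_FILE_EXT)
        || name.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
      indexFiles.add(new HDFSCarbonFile(status));  // FileStatus constructor, as in the Scala code
    }
  }
  if (indexFiles.isEmpty() && totalFiles > 0) {
    throw new IOException("No Index files are present in the table location :" + tablePath);
  }
  CarbonInputFormat.setReadCommittedScope(
      hadoopConf, new LatestFilesReadCommittedScope(indexFiles.toArray(new CarbonFile[0])));
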

override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
val method = inMemoryFileIndex.getClass.getMethods.find(_.getName == "listFiles").get
val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
val directories =
method.invoke(inMemoryFileIndex, filters).asInstanceOf[Seq[PartitionDirectory]]
method.invoke(fileIndex, filters).asInstanceOf[Seq[PartitionDirectory]]
prune(filters, directories)
}
}