[CARBONDATA-2872] Added Spark FileFormat interface implementation in Carbon

Added a new package, carbondata-spark-datasource, under /integration/spark-datasource.
It contains an implementation of Spark's FileFormat interface, so users can use carbon as a format in Spark.
For example:

create table test_table(c1 string, c2 int) using carbon
or
dataframe.write.format("carbon").saveAsTable("test_table")
A few classes were moved to this datasource package as part of the refactoring, and the spark2 and spark-common packages now depend on the spark-datasource package.

This closes #2647
ravipesala authored and kumarvishal09 committed Aug 24, 2018
1 parent 1372450 commit 347b8e1
Showing 51 changed files with 1,999 additions and 660 deletions.
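
For context, a minimal end-to-end sketch of the new datasource from Spark's Java API. Only format("carbon"), the create-table statement, and saveAsTable come from the description above; the session setup and sample data are illustrative:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CarbonFormatExample {
  public static void main(String[] args) {
    // Hypothetical local session; assumes the carbondata-spark-datasource
    // jar is on the classpath of a Spark 2.x application.
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("CarbonFormatExample")
        .getOrCreate();

    // The SQL path from the commit description.
    spark.sql("create table test_table(c1 string, c2 int) using carbon");

    // Or build the table from a DataFrame; overwrite replaces the
    // empty table created above.
    Dataset<Row> df = spark.range(0, 10)
        .selectExpr("cast(id as string) as c1", "cast(id as int) as c2");
    df.write().format("carbon").mode("overwrite").saveAsTable("test_table");

    // Reads go through the same FileFormat implementation.
    spark.table("test_table").show();
    spark.stop();
  }
}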
@@ -53,6 +53,8 @@
import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;

import org.apache.hadoop.fs.Path;

/**
* It maintains all the DataMaps in it.
*/
@@ -70,6 +72,11 @@ public Map<String, List<TableDataMap>> getAllDataMaps() {
*/
private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();

/**
* Contains the table name to the tablepath mapping.
*/
private Map<String, String> tablePathMap = new ConcurrentHashMap<>();

/**
* Contains the datamap catalog for each datamap provider.
*/
@@ -388,6 +395,7 @@ public TableDataMap registerDataMap(CarbonTable table,

tableIndices.add(dataMap);
allDataMaps.put(tableUniqueName, tableIndices);
tablePathMap.put(tableUniqueName, table.getTablePath());
return dataMap;
}

@@ -426,6 +434,16 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
CarbonTable carbonTable = getCarbonTable(identifier);
String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null && identifier.getTablePath() != null) {
// Try get using table path
for (Map.Entry<String, String> entry : tablePathMap.entrySet()) {
if (new Path(entry.getValue()).equals(new Path(identifier.getTablePath()))) {
tableIndices = allDataMaps.get(entry.getKey());
tableUniqueName = entry.getKey();
break;
}
}
}
if (null != carbonTable && tableIndices != null) {
try {
DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
@@ -437,6 +455,7 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
segmentRefreshMap.remove(identifier.uniqueName());
clearDataMaps(tableUniqueName);
allDataMaps.remove(tableUniqueName);
tablePathMap.remove(tableUniqueName);
}

/**
@@ -477,6 +496,7 @@ public void clearDataMaps(String tableUniqName) {
}
}
allDataMaps.remove(tableUniqName);
tablePathMap.remove(tableUniqName);
}

/**
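The hunks above introduce the pattern: alongside the cache keyed by tableUniqueName, a second map records each table's path so clearDataMaps(AbsoluteTableIdentifier) can resolve the cache entry when the caller only knows the path, as with non-transactional tables read through the file format. A minimal standalone sketch of that reverse lookup, with plain string comparison standing in for Hadoop Path equality:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DataMapCacheSketch {
  // Primary cache, keyed by the table's unique name.
  private final Map<String, String> cacheByName = new ConcurrentHashMap<>();
  // Secondary index mirroring tablePathMap: unique name -> table path.
  private final Map<String, String> nameToPath = new ConcurrentHashMap<>();

  void register(String uniqueName, String tablePath, String cached) {
    cacheByName.put(uniqueName, cached);
    nameToPath.put(uniqueName, tablePath);
  }

  // Clears by path when the unique name is unknown to the caller.
  void clearByPath(String tablePath) {
    for (Map.Entry<String, String> e : nameToPath.entrySet()) {
      // The diff compares new Path(a).equals(new Path(b)) so scheme and
      // slash differences do not defeat the match; plain string equality
      // is a simplification in this sketch.
      if (e.getValue().equals(tablePath)) {
        cacheByName.remove(e.getKey());
        nameToPath.remove(e.getKey());
        return;
      }
    }
  }
}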
@@ -94,6 +94,9 @@ public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
throws IOException {
String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
if (dataMapJob == null) {
return;
}
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
getValidAndInvalidSegments(carbonTable);
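The new early return guards deployments where the Spark integration module is absent from the classpath. A hedged sketch of what createDataMapJob presumably does; the wrapper class is hypothetical, and only the method name and the null contract come from the diff:

final class DataMapJobLoader {
  static Object createDataMapJob(String className) {
    try {
      // Instantiate the Spark-side job implementation reflectively,
      // since it lives in a module that may not be present.
      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
    } catch (Exception e) {
      // Spark integration absent (e.g. pure SDK usage): callers now
      // treat null as "skip the distributed datamap-clearing job".
      return null;
    }
  }
}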
@@ -71,6 +71,10 @@ public static AbsoluteTableIdentifier from(String tablePath, String dbName, Stri
return from(tablePath, dbName, tableName, "");
}

public static AbsoluteTableIdentifier from(String tablePath) {
return from(tablePath, "", "", "");
}

public static AbsoluteTableIdentifier from(
String tablePath,
CarbonTableIdentifier carbonTableIdentifier) {
@@ -17,7 +17,12 @@
package org.apache.carbondata.core.readcommitter;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -49,26 +54,31 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
* @param path carbon file path
* @param segmentId segment id
*/
public LatestFilesReadCommittedScope(String path, String segmentId) {
public LatestFilesReadCommittedScope(String path, String segmentId) throws IOException {
Objects.requireNonNull(path);
this.carbonFilePath = path;
this.segmentId = segmentId;
try {
takeCarbonIndexFileSnapShot();
} catch (IOException ex) {
throw new RuntimeException("Error while taking index snapshot", ex);
}
takeCarbonIndexFileSnapShot();
}

/**
* a new constructor with path
*
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
public LatestFilesReadCommittedScope(String path) throws IOException {
this(path, null);
}

/**
* a new constructor with carbon index files
*
* @param indexFiles carbon index files
*/
public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) {
takeCarbonIndexFileSnapShot(indexFiles);
}

private void prepareLoadMetadata() {
int loadCount = 0;
Map<String, List<String>> snapshotMap =
@@ -128,8 +138,9 @@ private void prepareLoadMetadata() {
return indexFileStore;
}

@Override public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
Segment segment, UpdateVO updateVo) throws IOException {
@Override
public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
throws IOException {
Map<String, SegmentRefreshInfo> snapShot =
readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
String segName;
@@ -147,8 +158,8 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
// This is CarbonFile case where the Index files are present inside the Segment Folder
// So the Segment has to be extracted from the path not from the CarbonIndex file.
String segString = indexFilePath.substring(0, indexFilePath.lastIndexOf("/") + 1);
String segName = segString
.substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
String segName =
segString.substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
return segName;
} else {
String fileName = carbonIndexFileName;
@@ -160,8 +171,7 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
@Override public void takeCarbonIndexFileSnapShot() throws IOException {
// Read the current file Path get the list of indexes from the path.
CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();

CarbonFile[] carbonIndexFiles = null;
if (file.isDirectory()) {
if (segmentId == null) {
@@ -174,45 +184,50 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
throw new IOException(
"No Index files are present in the table location :" + carbonFilePath);
}
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. Nested File Paths.
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
|| carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
// Get Segment Name from the IndexFile.
String indexFilePath =
FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
String segId =
getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
// TODO. During Partition table handling, place Segment File Name.
List<String> indexList;
SegmentRefreshInfo segmentRefreshInfo;
if (indexFileStore.get(segId) == null) {
indexList = new ArrayList<>(1);
segmentRefreshInfo =
new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
} else {
// Entry is already present.
indexList = indexFileStore.get(segId);
segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
}
indexList.add(indexFilePath);
if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
.getLastModifiedTime()) {
segmentRefreshInfo
.setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
}
indexFileStore.put(segId, indexList);
segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
}
}
ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
prepareLoadMetadata();
takeCarbonIndexFileSnapShot(carbonIndexFiles);
} else {
throw new IOException("Path is not pointing to directory");
}
}

private void takeCarbonIndexFileSnapShot(CarbonFile[] carbonIndexFiles) {
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. Nested File Paths.
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
|| carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
// Get Segment Name from the IndexFile.
String indexFilePath =
FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
String segId = getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
// TODO. During Partition table handling, place Segment File Name.
List<String> indexList;
SegmentRefreshInfo segmentRefreshInfo;
if (indexFileStore.get(segId) == null) {
indexList = new ArrayList<>(1);
segmentRefreshInfo =
new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
} else {
// Entry is already present.
indexList = indexFileStore.get(segId);
segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
}
indexList.add(indexFilePath);
if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
.getLastModifiedTime()) {
segmentRefreshInfo
.setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
}
indexFileStore.put(segId, indexList);
segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
}
}
ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap);
this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
prepareLoadMetadata();
}

}
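
Across this file the constructors now declare throws IOException rather than wrapping the failure in a RuntimeException, and a new CarbonFile[]-based constructor builds the snapshot from already-listed index files. A small caller-side sketch; the helper class and error comment are illustrative:

import java.io.IOException;

final class ScopeFactory {
  // Hypothetical helper: the checked IOException now surfaces to the
  // caller (e.g. no index files under the path) instead of being
  // wrapped in a RuntimeException by the constructor.
  static LatestFilesReadCommittedScope snapshotFor(String tablePath)
      throws IOException {
    return new LatestFilesReadCommittedScope(tablePath);
  }
}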
@@ -216,7 +216,7 @@ private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOExcept
}
if (null == segmentProperties) {
segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
blockInfo.getDetailInfo().getDimLens());
fileFooter.getSegmentInfo().getColumnCardinality());
filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
}
readAndFillBlockletInfo(tableBlockInfos, blockInfo,
32 changes: 12 additions & 20 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -39,7 +39,6 @@
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -2237,24 +2236,20 @@ static DataType thriftDataTyopeToWrapperDataType(
}
}

public static List<String> getFilePathExternalFilePath(String path) {
public static String getFilePathExternalFilePath(String path) {

// return the list of carbondata files in the given path.
CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {

if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
return true;
}
return false;
CarbonFile[] dataFiles = segment.listFiles();
for (CarbonFile dataFile : dataFiles) {
if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
return dataFile.getAbsolutePath();
} else if (dataFile.isDirectory()) {
return getFilePathExternalFilePath(dataFile.getAbsolutePath());
}
});
List<String> filePaths = new ArrayList<>(dataFiles.length);
for (CarbonFile dfiles : dataFiles) {
filePaths.add(dfiles.getAbsolutePath());
}
return filePaths;
return null;
}
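
The rewritten method returns the first .carbondata file it finds, descending into subdirectories. A standalone sketch of the same lookup using java.io.File; note that, unlike the committed version, this variant keeps scanning sibling entries when a subdirectory contains no data file:

import java.io.File;

final class FirstFileLookup {
  static String firstCarbonDataFile(String path) {
    File[] entries = new File(path).listFiles();
    if (entries == null) {
      return null; // not a directory, or not listable
    }
    for (File entry : entries) {
      if (entry.getName().endsWith(".carbondata")) {
        return entry.getAbsolutePath();
      } else if (entry.isDirectory()) {
        String found = firstCarbonDataFile(entry.getAbsolutePath());
        if (found != null) {
          return found; // stop only once a data file is actually found
        }
      }
    }
    return null;
  }
}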

/**
@@ -2264,16 +2259,13 @@ public static List<String> getFilePathExternalFilePath(String path) {
*/
public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
String tableName, boolean isCarbonFileProvider) throws IOException {
List<String> filePaths;
String fistFilePath = null;
if (isCarbonFileProvider) {
filePaths = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
} else {
filePaths = getFilePathExternalFilePath(carbonDataFilePath);
fistFilePath = getFilePathExternalFilePath(carbonDataFilePath);
}
String fistFilePath = null;
try {
fistFilePath = filePaths.get(0);
} catch (Exception e) {
if (fistFilePath == null) {
// Check if we can infer the schema from the hive metastore.
LOGGER.error("CarbonData file is not present in the table location");
throw new IOException("CarbonData file is not present in the table location");
@@ -116,7 +116,10 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
readCommittedScope = new LatestFilesReadCommittedScope(
identifier.getTablePath() + "/Fact/Part0/Segment_null/");
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
}
}
Expression filter = getFilterPredicates(job.getConfiguration());
// this will be null in case of corrupt schema file.
@@ -47,6 +47,7 @@
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -115,6 +116,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
private static final String READ_COMMITTED_SCOPE =
"mapreduce.input.carboninputformat.read.committed.scope";

// record segment number and hit blocks
protected int numSegments = 0;
@@ -337,6 +340,29 @@ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configur
}
}

public static void setReadCommittedScope(Configuration configuration,
ReadCommittedScope committedScope) {
if (committedScope == null) {
return;
}
try {
String subFoldersString = ObjectSerializationUtil.convertObjectToString(committedScope);
configuration.set(READ_COMMITTED_SCOPE, subFoldersString);
} catch (Exception e) {
throw new RuntimeException(
"Error while setting committedScope information to Job" + committedScope, e);
}
}

public static ReadCommittedScope getReadCommittedScope(Configuration configuration)
throws IOException {
String subFoldersString = configuration.get(READ_COMMITTED_SCOPE);
if (subFoldersString != null) {
return (ReadCommittedScope) ObjectSerializationUtil.convertStringToObject(subFoldersString);
}
return null;
}
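
Shipping the scope through the Hadoop Configuration means it must survive an object-to-string round trip. ObjectSerializationUtil is assumed here to amount to Java serialization plus Base64 encoding (the real utility may also compress); a minimal sketch under that assumption, with a hypothetical class name:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class ConfigSerde {
  static String encode(Serializable obj) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj); // requires the scope to be Serializable
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  static Object decode(String encoded) throws IOException {
    byte[] raw = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return in.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }
}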

/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR
@@ -660,7 +660,10 @@ public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifi
if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
readCommittedScope = new TableStatusReadCommittedScope(identifier);
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
}
}
this.readCommittedScope = readCommittedScope;
}
