
Commit 5c08056: Fixed comments

ravipesala committed Aug 23, 2018
1 parent f773e5b
Showing 7 changed files with 107 additions and 48 deletions.
@@ -36,55 +36,47 @@
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
-public class LatestFilesReadCommittedScope
-implements ReadCommittedScope {
+public class LatestFilesReadCommittedScope implements ReadCommittedScope {

private String carbonFilePath;
private String segmentId;
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
-private String[] subFolders;
+private String[] dataFolders;

/**
* a new constructor of this class
*
* @param path carbon file path
* @param segmentId segment id
*/
-public LatestFilesReadCommittedScope(String path, String segmentId) {
+public LatestFilesReadCommittedScope(String path, String segmentId) throws IOException {
Objects.requireNonNull(path);
this.carbonFilePath = path;
this.segmentId = segmentId;
-try {
-takeCarbonIndexFileSnapShot();
-} catch (IOException ex) {
-throw new RuntimeException("Error while taking index snapshot", ex);
-}
+takeCarbonIndexFileSnapShot();
}

/**
* a new constructor with path
*
* @param path carbon file path
*/
-public LatestFilesReadCommittedScope(String path) {
+public LatestFilesReadCommittedScope(String path) throws IOException {
this(path, (String) null);
}

/**
* a new constructor with path
*
* @param path carbon file path
+* @param dataFolders Folders where carbondata files exists
*/
-public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+public LatestFilesReadCommittedScope(String path, String[] dataFolders) throws IOException {
Objects.requireNonNull(path);
this.carbonFilePath = path;
-this.subFolders = subFolders;
-try {
-takeCarbonIndexFileSnapShot();
-} catch (IOException ex) {
-throw new RuntimeException("Error while taking index snapshot", ex);
-}
+this.dataFolders = dataFolders;
+takeCarbonIndexFileSnapShot();
}

private void prepareLoadMetadata() {
@@ -184,9 +176,9 @@ private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
CarbonFile[] carbonIndexFiles = null;
if (file.isDirectory()) {
if (segmentId == null) {
-if (subFolders != null) {
+if (dataFolders != null) {
List<CarbonFile> allIndexFiles = new ArrayList<>();
-for (String subFolder : subFolders) {
+for (String subFolder : dataFolders) {
CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(subFolder);
allIndexFiles.addAll(Arrays.asList(files));
}
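
Note: the hunks above make the LatestFilesReadCommittedScope constructors declare the checked IOException instead of wrapping it in a RuntimeException, and rename the subFolders field to dataFolders. A minimal caller sketch, under the assumption that the class lives in the org.apache.carbondata.core.readcommitter package; the helper name and paths are my own:

```scala
import java.io.IOException

import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope

object ReadCommittedScopeExample {
  // The IOException from takeCarbonIndexFileSnapShot() now propagates to the
  // caller instead of surfacing as a RuntimeException from the constructor.
  @throws[IOException]
  def openScope(tablePath: String, dataFolders: Array[String]): LatestFilesReadCommittedScope =
    new LatestFilesReadCommittedScope(tablePath, dataFolders)
}
```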
@@ -117,7 +117,7 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
identifier.getTablePath() + "/Fact/Part0/Segment_null/");
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
-getSubFoldersToRead(job.getConfiguration()));
+getDataFoldersToRead(job.getConfiguration()));
}
Expression filter = getFilterPredicates(job.getConfiguration());
// this will be null in case of corrupt schema file.
@@ -116,7 +116,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
-private static final String SUB_FOLDERS = "mapreduce.input.carboninputformat.subfolders";
+private static final String DATA_FOLDERS = "mapreduce.input.carboninputformat.datafolders";

// record segment number and hit blocks
protected int numSegments = 0;
@@ -339,23 +339,23 @@ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configur
}
}

-public static void setSubFoldersToRead(Configuration configuration, String[] subFoldersToRead) {
-if (subFoldersToRead == null) {
+public static void setDataFoldersToRead(Configuration configuration, String[] dataFoldersToRead) {
+if (dataFoldersToRead == null) {
return;
}
try {
String subFoldersString =
-ObjectSerializationUtil.convertObjectToString(subFoldersToRead);
-configuration.set(SUB_FOLDERS, subFoldersString);
+ObjectSerializationUtil.convertObjectToString(dataFoldersToRead);
+configuration.set(DATA_FOLDERS, subFoldersString);
} catch (Exception e) {
throw new RuntimeException(
"Error while setting subfolders information to Job" + Arrays.toString(subFoldersToRead),
"Error while setting subfolders information to Job" + Arrays.toString(dataFoldersToRead),
e);
}
}

-public static String[] getSubFoldersToRead(Configuration configuration) throws IOException {
-String subFoldersString = configuration.get(SUB_FOLDERS);
+public static String[] getDataFoldersToRead(Configuration configuration) throws IOException {
+String subFoldersString = configuration.get(DATA_FOLDERS);
if (subFoldersString != null) {
return (String[]) ObjectSerializationUtil.convertStringToObject(subFoldersString);
}
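
Note: a short usage sketch for the renamed configuration helpers above, round-tripping the data-folder list through a Hadoop Configuration the way getSplits and getReadCommitted do; the folder paths are made up for illustration:

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.carbondata.hadoop.api.CarbonInputFormat

object DataFoldersConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Serializes the array and stores it under the datafolders property.
    CarbonInputFormat.setDataFoldersToRead(conf,
      Array("/tmp/store/t1/folder1", "/tmp/store/t1/folder2"))
    // Reads the list back; defensively handle the case where nothing was set.
    val folders = CarbonInputFormat.getDataFoldersToRead(conf)
    Option(folders).getOrElse(Array.empty[String]).foreach(println)
  }
}
```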
@@ -661,7 +661,7 @@ public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifi
readCommittedScope = new TableStatusReadCommittedScope(identifier);
} else {
readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(),
-getSubFoldersToRead(job.getConfiguration()));
+getDataFoldersToRead(job.getConfiguration()));
}
this.readCommittedScope = readCommittedScope;
}
@@ -95,7 +95,8 @@ class CarbonFileIndex(
// Check for any subfolders are present here.
if (!rootPaths.head.equals(new Path(tablePath.get)) &&
rootPaths.head.toString.contains(tablePath.get)) {
-CarbonInputFormat.setSubFoldersToRead(hadoopConf, rootPaths.map(_.toUri.toString).toArray)
+CarbonInputFormat.setDataFoldersToRead(hadoopConf,
+rootPaths.map(_.toUri.toString).toArray)
}
}
filter match {
@@ -48,12 +48,10 @@ import org.apache.carbondata.core.indexstore.BlockletDetailInfo
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.reader.CarbonHeaderReader
import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
@@ -158,9 +156,11 @@ class SparkCarbonFileFormat extends FileFormat
context: TaskAttemptContext,
fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {

-val writable = new ObjectArrayWritable
+private val writable = new ObjectArrayWritable

-val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
+private val cutOffDate = Integer.MAX_VALUE >> 1
+
+private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
new CarbonTableOutputFormat().getRecordWriter(context)

/**
@@ -193,31 +193,51 @@ class SparkCarbonFileFormat extends FileFormat
data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
case s: ArrayType =>
data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+case d: DateType =>
+data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+case d: TimestampType =>
+data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
case other =>
data(i) = row.get(i, other)
}
} else {
+setNull(fieldTypes(i).dataType, data, i)
}
i += 1
}
data
}

+private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
+dataType match {
+case d: DateType =>
+// 1 is treated as null in carbon
+data(i) = 1.asInstanceOf[AnyRef]
+case _ =>
+}
+}
+
/**
* Convert the internal row to carbondata understandable object
*/
-private def extractData(row: ArrayData, fieldType: DataType): Array[AnyRef] = {
+private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
val data = new Array[AnyRef](row.numElements())
var i = 0
while (i < data.length) {

-fieldType match {
-case d: DecimalType =>
-data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-case s: StructType =>
-data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
-case s: ArrayType =>
-data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
-case other => data(i) = row.get(i, fieldType)
+if (!row.isNullAt(i)) {
+dataType match {
+case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+case s: StructType =>
+data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+case s: ArrayType =>
+data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+case d: DateType =>
+data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+case other => data(i) = row.get(i, dataType)
+}
+} else {
+setNull(dataType, data, i)
+}
i += 1
}
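
Note: the writer changes above translate Spark's internal column values into what the Carbon record writer expects. Spark encodes DateType as days since epoch in an Int and TimestampType as microseconds since epoch in a Long; the commit shifts the date by cutOffDate (Integer.MAX_VALUE >> 1, with 1 reserved as Carbon's null marker per the comment in setNull), and the division by 1000 converts microseconds to milliseconds. A standalone sketch of that arithmetic, with object and method names of my own choosing:

```scala
object CarbonValueConversionSketch {
  // Same offset the writer above applies to DateType values.
  private val cutOffDate = Integer.MAX_VALUE >> 1

  // Spark DateType: days since 1970-01-01 stored as an Int.
  def toCarbonDate(daysSinceEpoch: Int): AnyRef =
    (daysSinceEpoch + cutOffDate).asInstanceOf[AnyRef]

  // Spark TimestampType: microseconds since epoch; Carbon takes milliseconds.
  def toCarbonTimestamp(microsSinceEpoch: Long): AnyRef =
    (microsSinceEpoch / 1000).asInstanceOf[AnyRef]
}
```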
@@ -219,15 +219,61 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
frame.write.format("carbon").save(warehouse1 + "/test_carbon_folder")
val dfread = spark.read.format("carbon").load(warehouse1 + "/test_carbon_folder")
dfread.show(false)
-// TestUtil
-// .checkAnswer(spark.sql("select * from carbon_table"),
-// spark.sql("select * from parquet_table"))
FileFactory
.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_carbon_folder"))
spark.sql("drop table if exists parquet_table")
}


test("test read and write with date datatype") {
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
spark.sql("create table date_table(empno int, empname string, projdate Date) using carbon")
spark.sql("insert into date_table select 11, 'ravi', '2017-11-11'")
spark.sql("create table date_parquet_table(empno int, empname string, projdate Date) using parquet")
spark.sql("insert into date_parquet_table select 11, 'ravi', '2017-11-11'")
checkAnswer(spark.sql("select * from date_table"), spark.sql("select * from date_parquet_table"))
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
}

test("test read and write with date datatype with wrong format") {
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
spark.sql("create table date_table(empno int, empname string, projdate Date) using carbon")
spark.sql("insert into date_table select 11, 'ravi', '11-11-2017'")
spark.sql("create table date_parquet_table(empno int, empname string, projdate Date) using parquet")
spark.sql("insert into date_parquet_table select 11, 'ravi', '11-11-2017'")
checkAnswer(spark.sql("select * from date_table"), spark.sql("select * from date_parquet_table"))
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
}

test("test read and write with timestamp datatype") {
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
spark.sql("create table date_table(empno int, empname string, projdate timestamp) using carbon")
spark.sql("insert into date_table select 11, 'ravi', '2017-11-11 00:00:01'")
spark.sql("create table date_parquet_table(empno int, empname string, projdate timestamp) using parquet")
spark.sql("insert into date_parquet_table select 11, 'ravi', '2017-11-11 00:00:01'")
checkAnswer(spark.sql("select * from date_table"), spark.sql("select * from date_parquet_table"))
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
}

test("test read and write with timestamp datatype with wrong format") {
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
spark.sql("create table date_table(empno int, empname string, projdate timestamp) using carbon")
spark.sql("insert into date_table select 11, 'ravi', '11-11-2017 00:00:01'")
spark.sql("create table date_parquet_table(empno int, empname string, projdate timestamp) using parquet")
spark.sql("insert into date_parquet_table select 11, 'ravi', '11-11-2017 00:00:01'")
checkAnswer(spark.sql("select * from date_table"), spark.sql("select * from date_parquet_table"))
spark.sql("drop table if exists date_table")
spark.sql("drop table if exists date_parquet_table")
}


override protected def beforeAll(): Unit = {
drop
}
