[CARBONDATA-3889] Cleanup code typo in carbondata-spark module #3867

Closed · wants to merge 3 commits
@@ -117,7 +117,7 @@ object CarbonStore {

/*
* Collect all stage files and matched success files and loading files.
- * return unloaded stagefiles and loading stagefiles in the end.
+ * return unloaded stage files and loading stage files in the end.
*/
def listStageFiles(
loadDetailsDir: String): (Array[CarbonFile], Array[CarbonFile]) = {
@@ -343,7 +343,7 @@ object CarbonStore {
if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
metadataDetail.getSegmentFile == null) {
val loadStartTime: Long = metadataDetail.getLoadStartTime
- // delete all files of @loadStartTime from tablepath
+ // delete all files of @loadStartTime from table path
cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
partitionSpecList.foreach {
partitionSpec =>
@@ -352,7 +352,7 @@ object CarbonStore {
if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
val partitionCarbonFile = FileFactory
.getCarbonFile(partitionLocation.toString)
- // list all files from partitionLoacation
+ // list all files from partitionLocation
val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
// delete all files of @loadStartTime from externalPath
cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
@@ -382,29 +382,29 @@ object CarbonStore {
}

// validates load ids
- private def validateLoadIds(loadids: Seq[String]): Unit = {
- if (loadids.isEmpty) {
+ private def validateLoadIds(loadIds: Seq[String]): Unit = {
+ if (loadIds.isEmpty) {
val errorMessage = "Error: Segment id(s) should not be empty."
throw new MalformedCarbonCommandException(errorMessage)
}
}

// TODO: move dbName and tableName to caller, caller should handle the log and error
def deleteLoadById(
- loadids: Seq[String],
+ loadIds: Seq[String],
dbName: String,
tableName: String,
carbonTable: CarbonTable): Unit = {

- validateLoadIds(loadids)
+ validateLoadIds(loadIds)

val path = carbonTable.getMetadataPath

try {
val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
- carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
+ carbonTable.getAbsoluteTableIdentifier, loadIds.asJava, path).asScala
if (invalidLoadIds.isEmpty) {
LOGGER.info(s"Delete segment by Id is successfull for $dbName.$tableName.")
LOGGER.info(s"Delete segment by Id is successful for $dbName.$tableName.")
} else {
sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}")
}
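For reference, a minimal sketch of how the renamed deleteLoadById parameter is used at a call site. Only the signature comes from this change; the caller name, database, table, and segment ids below are illustrative, and the CarbonTable handle is assumed to be resolved elsewhere.

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Illustrative caller (not part of this PR): drop segments "0" and "1".
// `resolvedTable` is assumed to come from the Carbon metastore.
def dropSegments(resolvedTable: CarbonTable): Unit = {
  CarbonStore.deleteLoadById(
    loadIds = Seq("0", "1"),        // renamed from `loadids`
    dbName = "default",
    tableName = "sample",
    carbonTable = resolvedTable)
}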
@@ -155,7 +155,7 @@ case class AlterTableCompactionPreEvent(sparkSession: SparkSession,
mergedLoadName: String) extends Event with AlterTableCompactionEventInfo

/**
- * Compaction Event for handling pre update status file opeartions, lister has to implement this
+ * Compaction Event for handling pre update status file operations, lister has to implement this
* event before updating the table status file
* @param sparkSession
* @param carbonTable
@@ -166,7 +166,7 @@ case class AlterTableCompactionPostEvent(sparkSession: SparkSession,
carbonMergerMapping: CarbonMergerMapping,
compactedLoads: java.util.List[String]) extends Event with AlterTableCompactionEventInfo
/**
- * Compaction Event for handling pre update status file opeartions, lister has to implement this
+ * Compaction Event for handling pre update status file operations, lister has to implement this
* event before updating the table status file
* @param sparkSession
* @param carbonTable
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
* @param sparkSession
*/
case class DeleteSegmentByIdPreEvent(carbonTable: CarbonTable, loadIds: Seq[String],
- sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByIdEventInfo


/**
@@ -37,7 +37,7 @@ case class DeleteSegmentByIdPreEvent(carbonTable: CarbonTable, loadIds: Seq[Stri
* @param sparkSession
*/
case class DeleteSegmentByIdPostEvent(carbonTable: CarbonTable, loadIds: Seq[String],
- sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByIdEventInfo


/**
@@ -47,7 +47,7 @@ case class DeleteSegmentByIdPostEvent(carbonTable: CarbonTable, loadIds: Seq[Str
* @param sparkSession
*/
case class DeleteSegmentByIdAbortEvent(carbonTable: CarbonTable, loadIds: Seq[String],
- sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByIdEventInfo

/**
*
@@ -56,7 +56,7 @@ case class DeleteSegmentByIdAbortEvent(carbonTable: CarbonTable, loadIds: Seq[St
* @param sparkSession
*/
case class DeleteSegmentByDatePreEvent(carbonTable: CarbonTable, loadDates: String,
- sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByDateEventInfo

/**
*
@@ -65,7 +65,7 @@ case class DeleteSegmentByDatePreEvent(carbonTable: CarbonTable, loadDates: Stri
* @param sparkSession
*/
case class DeleteSegmentByDatePostEvent(carbonTable: CarbonTable, loadDates: String,
- sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByDateEventInfo

/**
*
@@ -74,4 +74,4 @@ case class DeleteSegmentByDatePostEvent(carbonTable: CarbonTable, loadDates: Str
* @param sparkSession
*/
case class DeleteSegmentByDateAbortEvent(carbonTable: CarbonTable, loadDates: String,
- sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
+ sparkSession: SparkSession) extends Event with DeleteSegmentByDateEventInfo
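For reference, a minimal sketch of code that consumes the renamed events. Only the case classes and the fields carried by DeleteSegmentByIdEventInfo / DeleteSegmentByDateEventInfo come from this change; the handler name and the way it would be registered as a listener are assumptions.

// Hypothetical handler, shown only to illustrate the renamed types;
// assumes the carbondata events package is in scope.
def logDeleteSegmentEvent(event: Event): Unit = event match {
  case e: DeleteSegmentByIdPreEvent =>
    // carbonTable and loadIds come from the DeleteSegmentByIdEventInfo trait
    println(s"Deleting segments ${e.loadIds.mkString(",")} of ${e.carbonTable.getTableName}")
  case e: DeleteSegmentByDatePreEvent =>
    // carbonTable and loadDates come from the DeleteSegmentByDateEventInfo trait
    println(s"Deleting segments of ${e.carbonTable.getTableName} loaded before ${e.loadDates}")
  case _ => () // other events ignored in this sketch
}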
@@ -141,15 +141,15 @@ trait AlterTableHivePartitionInfo {
/**
* event for DeleteSegmentById
*/
- trait DeleteSegmentbyIdEventInfo {
+ trait DeleteSegmentByIdEventInfo {
val carbonTable: CarbonTable
val loadIds: Seq[String]
}

/**
* event for DeleteSegmentByDate
*/
- trait DeleteSegmentbyDateEventInfo {
+ trait DeleteSegmentByDateEventInfo {
val carbonTable: CarbonTable
val loadDates: String
}
@@ -208,7 +208,7 @@ trait BuildIndexEventsInfo {
}

/**
- * EventInfo for prepriming on IndexServer. This event is used to
+ * EventInfo for pre-priming on IndexServer. This event is used to
* fire a call to the index server when the load is complete.
*/
trait IndexServerEventInfo {
@@ -21,11 +21,11 @@ import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

- // Event for Prepriming in cache
+ // Event for Pre-priming in cache
case class IndexServerLoadEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
segment: List[Segment],
- invalidsegment: List[String]) extends Event with IndexServerEventInfo
+ invalidSegment: List[String]) extends Event with IndexServerEventInfo

case class IndexServerEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
@@ -31,7 +31,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD


/**
- * RDD to merge all bloomindex files of specified segment for bloom index
+ * RDD to merge all bloom index files of specified segment for bloom index
*/
class CarbonMergeBloomIndexFilesRDD(
@transient private val ss: SparkSession,
@@ -196,7 +196,7 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
*/
override def readRow(data: Array[Object]): Array[Object] = {

- val surrogatKeys = if (segmentProperties.getNumberOfDictDimensions > 0) {
+ val surrogateKeys = if (segmentProperties.getNumberOfDictDimensions > 0) {
ByteUtil.convertBytesToLongArray(
data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey)
} else {
@@ -207,7 +207,7 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
val rtn = new Array[Object](indexColumns.length + 3)
indexColumns.zipWithIndex.foreach { case (col, i) =>
rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
- surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
+ surrogateKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
} else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
indexCol2IdxInNoDictArray(col.getColName))
@@ -316,7 +316,7 @@ class IndexRebuildRDD[K, V](
reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics,
attemptContext.getConfiguration)
reader.initialize(inputSplit, attemptContext)
- // skip clear indexSchema and we will do this adter rebuild
+ // skip clear indexSchema and we will do this after rebuild
reader.setSkipClearIndexAtClose(true)

// Note that indexSchema rebuilding is based on query, the blockletId in rowWithPosition
@@ -350,7 +350,7 @@ object DistributedRDDUtils {
}

/**
- * This function creates an event for prepriming of the index server
+ * This function creates an event for pre-priming of the index server
*/
def triggerPrepriming(sparkSession: SparkSession,
carbonTable: CarbonTable,
@@ -52,7 +52,7 @@ class DistributedIndexJob extends AbstractIndexJob {
.createTempFolderForIndexServer(indexFormat.getQueryId)
LOGGER
.info("Temp folder path for Query ID: " + indexFormat.getQueryId + " is " + splitFolderPath)
- val (resonse, time) = logTime {
+ val (response, time) = logTime {
try {
val spark = SparkSQLUtil.getSparkSession
indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
@@ -73,34 +73,33 @@ class DistributedIndexJob extends AbstractIndexJob {
}
}
LOGGER.info(s"Time taken to get response from server: $time ms")
- resonse
+ response
}

/**
- * Iterate over FiltersReslover,
- * a. Change only RowLevelFilterResolverImpl because SparkUnkown is part of it
- * and others FilterReslover like ConditionalFilterResolverImpl so directly return.
- * b. Change SparkUnkownExpression to TrueExpression so that isScanRequired
+ * Iterate over FilterResolver,
+ * a. Change only RowLevelFilterResolverImpl because SparkUnknown is part of it
+ * and other FilterResolver like ConditionalFilterResolverImpl so directly return.
+ * b. Change SparkUnknownExpression to TrueExpression so that isScanRequired
* selects block/blocklet.
*
- * @param filterInf FiltersReslover to be changed
- * @param tableIdentifer AbsoluteTableIdentifier object
- * @param filterProcessor changed FiltersReslover.
- * @return
+ * @param filterInf FilterResolver to be changed
+ * @param tableIdentifier AbsoluteTableIdentifier object
+ * @param filterProcessor FilterExpressionProcessor
+ * @return changed FilterResolver.
*/
- def removeSparkUnknown(filterInf: FilterResolverIntf,
- tableIdentifer: AbsoluteTableIdentifier,
- filterProcessor: FilterExpressionProcessor): FilterResolverIntf = {
+ def removeSparkUnknown(filterInf: FilterResolverIntf, tableIdentifier: AbsoluteTableIdentifier,
+ filterProcessor: FilterExpressionProcessor): FilterResolverIntf = {
if (filterInf.isInstanceOf[LogicalFilterResolverImpl]) {
return new LogicalFilterResolverImpl(
- removeSparkUnknown(filterInf.getLeft, tableIdentifer, filterProcessor),
- removeSparkUnknown(filterInf.getRight, tableIdentifer, filterProcessor),
+ removeSparkUnknown(filterInf.getLeft, tableIdentifier, filterProcessor),
+ removeSparkUnknown(filterInf.getRight, tableIdentifier, filterProcessor),
filterProcessor.removeUnknownExpression(filterInf.getFilterExpression).
asInstanceOf[BinaryExpression])
}
if (filterInf.isInstanceOf[RowLevelFilterResolverImpl] &&
filterInf.getFilterExpression.getFilterExpressionType == ExpressionType.UNKNOWN) {
- return filterProcessor.changeUnknownResolverToTrue(tableIdentifer)
+ return filterProcessor.changeUnknownResolverToTrue(tableIdentifier)
}
filterInf
}
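The doc comment above captures the key idea: an expression the index server cannot evaluate is replaced by TrueExpression, which can only widen the set of blocks/blocklets reported as needing a scan, so nothing that might match is pruned away, and the exact filter is still applied when the surviving blocks are actually read. A toy sketch of that over-approximation on a plain AND/OR tree, not the CarbonData resolver API, just the reasoning under the assumption that predicates are combined only with AND/OR as in LogicalFilterResolverImpl:

// Toy model: replacing an unevaluable predicate by `true` is a sound
// over-approximation for pruning, because AND and OR are monotone.
sealed trait Pred
case class And(left: Pred, right: Pred) extends Pred
case class Or(left: Pred, right: Pred) extends Pred
case class Leaf(scanRequired: Boolean) extends Pred   // evaluable on the index server
case object SparkOnly extends Pred                    // analogue of SparkUnknownExpression

def rewriteForIndexServer(p: Pred): Pred = p match {
  case And(l, r)  => And(rewriteForIndexServer(l), rewriteForIndexServer(r))
  case Or(l, r)   => Or(rewriteForIndexServer(l), rewriteForIndexServer(r))
  case SparkOnly  => Leaf(scanRequired = true)  // same spirit as changeUnknownResolverToTrue
  case leaf: Leaf => leaf
}
// A `true` leaf can only keep or enlarge the set of blocks marked "scan
// required", so the rewritten filter never drops a block the original needs.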
@@ -192,8 +192,8 @@ object IndexServer extends ServerInterface {
val sparkSession = SparkSQLUtil.getSparkSession
val databaseName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
val jobGroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobGroup)
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
if (!isFallBack) {
val indexServerEvent = IndexServerEvent(sparkSession,
@@ -213,7 +213,7 @@

override def showCache(tableId: String = "", executorCache: Boolean): Array[String] = {
doAs {
val jobgroup: String = "Show Cache " + (tableId match {
val jobGroup: String = "Show Cache " + (tableId match {
case "" =>
if (executorCache) {
"for all the Executors."
@@ -224,7 +224,7 @@
})
val sparkSession = SparkSQLUtil.getSparkSession
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", UUID.randomUUID().toString)
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
sparkSession.sparkContext.setLocalProperty("spark.job.description", jobGroup)
new DistributedShowCacheRDD(sparkSession, tableId, executorCache).collect()
}
}
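Aside on the pattern above: spark.jobGroup.id and spark.job.description are the same local properties that SparkContext.setJobGroup writes (along with an interrupt-on-cancel flag). A roughly equivalent formulation, sketched only for comparison and not part of the PR:

import java.util.UUID
import org.apache.spark.sql.SparkSession

// Sketch: the higher-level Spark API that sets the same job description
// and job group id local properties in one call.
def describeJob(spark: SparkSession, jobGroup: String): Unit = {
  spark.sparkContext.setJobGroup(UUID.randomUUID().toString, jobGroup)
}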
@@ -306,7 +306,7 @@ object IndexServer extends ServerInterface {
}

/**
- * This class to define the acl for indexserver ,similar to HDFSPolicyProvider.
+ * This class to define the acl for index server ,similar to HDFSPolicyProvider.
* key in Service can be configured in hadoop-policy.xml or in Configuration().This ACL
* will be used for Authorization in
* org.apache.hadoop.security.authorize.ServiceAuthorizationManager#authorize
@@ -75,12 +75,12 @@ class updateResultImpl
}
}

- trait DeleteDelataResult[K, V] extends Serializable {
+ trait DeleteDelateResult[K, V] extends Serializable {
def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors, Long)): (K, V)
}

- class DeleteDelataResultImpl
- extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
+ class DeleteDeltaResultImpl
+ extends DeleteDelateResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
override def getKey(key: SegmentStatus,
value: (SegmentUpdateDetails, ExecutionErrors, Long)): (SegmentStatus, (SegmentUpdateDetails,
ExecutionErrors, Long)) = {
@@ -50,7 +50,7 @@ object CsvRDDHelper {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

/**
- * createsw a RDD that does reading of multiple CSV files
+ * create a RDD that does reading of multiple CSV files
*/
def csvFileScanRDD(
spark: SparkSession,