@@ -73,7 +73,9 @@ public void writeDictionaryData() throws Exception {
 
   public void writeTableDictionaryData(String tableUniqueName) throws Exception {
     TableDictionaryGenerator generator = tableMap.get(tableUniqueName);
-    generator.writeDictionaryData(tableUniqueName);
+    if (generator != null) {
+      generator.writeDictionaryData(tableUniqueName);
+    }
   }
 
 }
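
The null guard above turns writeTableDictionaryData into a no-op when no generator was ever registered for the table, which can now happen because the server is only fed tables that actually need dictionary generation. A minimal, self-contained Scala sketch of the same guard (the types below are stand-ins, not CarbonData's real classes):

// Sketch only (not part of the patch): stand-in types, not CarbonData's real classes.
import scala.collection.concurrent.TrieMap

object DictionaryWriteGuard {

  trait TableDictionaryGenerator {
    def writeDictionaryData(tableUniqueName: String): Unit
  }

  // One generator per table; a table with no dictionary columns never gets an entry.
  private val tableMap = TrieMap.empty[String, TableDictionaryGenerator]

  // Equivalent of the Java null check: look the generator up and write only if present.
  def writeTableDictionaryData(tableUniqueName: String): Unit =
    tableMap.get(tableUniqueName).foreach(_.writeDictionaryData(tableUniqueName))
}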
---- next file: the TestLoadDataWithSinglePass test suite (path not captured in this view) ----
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.spark.testsuite.dataload
+package org.apache.carbondata.integration.spark.testsuite.dataload
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -103,9 +104,26 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test data loading with dictionary exclude") {
+    sql("DROP TABLE IF EXISTS dict_exclude")
+    sql(
+      """
+        |CREATE TABLE dict_exclude (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_EXCLUDE'='country,name,serialname,phonetype')
+      """.stripMargin)
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE dict_exclude
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='FALSE')
+      """.stripMargin)
+    checkAnswer(sql("select name from dict_exclude limit 1"), Row("aaa1"))
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS table_two_pass")
     sql("DROP TABLE IF EXISTS table_one_pass")
     sql("DROP TABLE IF EXISTS table_one_pass_2")
+    sql("DROP TABLE IF EXISTS dict_exclude")
   }
 }
---- next file: the LoadTable command (first of two integration modules; path not captured in this view) ----
@@ -495,16 +495,27 @@ case class LoadTable(
             CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
         val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
         carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-        // start dictionary server when use one pass load.
-        val server: DictionaryServer = DictionaryServer
-          .getInstance(dictionaryServerPort.toInt)
-        carbonLoadModel.setDictionaryServerPort(server.getPort)
+        // start dictionary server when single-pass load is used and a dimension with
+        // DICTIONARY encoding is present.
+        val allDimensions = table.getAllDimensions.asScala.toList
+        val createDictionary = allDimensions.exists {
+          carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+        }
+        val server: Option[DictionaryServer] = if (createDictionary) {
+          val dictionaryServer = DictionaryServer
+            .getInstance(dictionaryServerPort.toInt)
+          carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+          Some(dictionaryServer)
+        } else {
+          None
+        }
         CarbonDataRDDFactory.loadCarbonData(sqlContext,
           carbonLoadModel,
           relation.tableMeta.storePath,
           columnar,
           partitionStatus,
-          Some(server),
+          server,
           dataFrame,
           updateModel)
       } else {
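
The core change in this hunk is the createDictionary check: the dictionary server is started only when at least one dimension carries the DICTIONARY encoding without also being DIRECT_DICTIONARY (direct-dictionary columns such as timestamps need no generated dictionary), so a load whose dictionary columns are all excluded never starts a server it would not contact. A minimal, self-contained sketch of that decision, using stand-in types rather than CarbonData's real CarbonDimension and Encoding classes:

// Sketch only (not part of the patch): stand-in types, not CarbonData's real classes.
object DictionaryServerDecision {

  sealed trait Encoding
  case object DICTIONARY extends Encoding          // global dictionary encoding
  case object DIRECT_DICTIONARY extends Encoding   // e.g. timestamp/date columns

  // Stand-in for CarbonDimension: a dimension simply carries its encodings.
  final case class Dimension(name: String, encodings: Set[Encoding])

  // Start the dictionary server only if at least one dimension needs a real
  // (non-direct) dictionary generated during a single-pass load.
  def needsDictionaryServer(dimensions: Seq[Dimension]): Boolean =
    dimensions.exists { d =>
      d.encodings.contains(DICTIONARY) && !d.encodings.contains(DIRECT_DICTIONARY)
    }

  def main(args: Array[String]): Unit = {
    val dims = Seq(
      Dimension("country", Set(DICTIONARY)),
      Dimension("date", Set(DICTIONARY, DIRECT_DICTIONARY))
    )
    println(needsDictionaryServer(dims)) // true: "country" still needs the server
  }
}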
---- next file: CarbonDataRDDFactory (path not captured in this view) ----
@@ -963,7 +963,7 @@ object CarbonDataRDDFactory {
 
   private def writeDictionary(carbonLoadModel: CarbonLoadModel,
       result: Option[DictionaryServer], writeAll: Boolean) = {
-    // write dictionary file and shutdown dictionary server
+    // write dictionary file
     val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
       carbonLoadModel.getTableName
     }"
@@ -976,7 +976,7 @@
         server.writeTableDictionary(uniqueTableName)
       }
     } catch {
-      case ex: Exception =>
+      case _: Exception =>
        LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
        throw new Exception("Dataload failed due to error while writing dictionary file!")
     }
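
Downstream, loadCarbonData now receives the server as Option[DictionaryServer], so the dictionary write step in CarbonDataRDDFactory runs only when a server was actually started. A rough sketch of that pattern (the DictionaryServer trait below is a stand-in, not Carbon's real class):

// Sketch only (not part of the patch): DictionaryServer here is a stand-in trait.
object DictionaryWriteStep {

  trait DictionaryServer {
    def writeTableDictionary(tableUniqueName: String): Unit
  }

  // With the server optional, the write step is a no-op when no server was started
  // (e.g. every column is DICTIONARY_EXCLUDE or direct-dictionary).
  def writeDictionary(server: Option[DictionaryServer], uniqueTableName: String): Unit =
    server.foreach { s =>
      try {
        s.writeTableDictionary(uniqueTableName)
      } catch {
        case _: Exception =>
          throw new Exception(
            s"Dataload failed due to error while writing dictionary file for $uniqueTableName")
      }
    }
}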
---- next file: the LoadTable command (second integration module, SparkSession-based; path not captured in this view) ----
@@ -504,16 +504,27 @@ case class LoadTable(
         val sparkDriverHost = sparkSession.sqlContext.sparkContext.
           getConf.get("spark.driver.host")
         carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-        // start dictionary server when use one pass load.
-        val server: DictionaryServer = DictionaryServer
-          .getInstance(dictionaryServerPort.toInt)
-        carbonLoadModel.setDictionaryServerPort(server.getPort)
+        // start dictionary server when single-pass load is used and a dimension with
+        // DICTIONARY encoding is present.
+        val allDimensions = table.getAllDimensions.asScala.toList
+        val createDictionary = allDimensions.exists {
+          carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+        }
+        val server: Option[DictionaryServer] = if (createDictionary) {
+          val dictionaryServer = DictionaryServer
+            .getInstance(dictionaryServerPort.toInt)
+          carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+          Some(dictionaryServer)
+        } else {
+          None
+        }
         CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
           carbonLoadModel,
           relation.tableMeta.storePath,
           columnar,
           partitionStatus,
-          Some(server),
+          server,
           dataFrame,
           updateModel)
       }