From f936eb476e8402d7afe0654e0385ddd94b158b1d Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sat, 20 Sep 2014 16:20:22 +0300 Subject: [PATCH] Propagate ValueReader improvements to Spark module Fix #271 --- .../serialization/ScalaValueReader.scala | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/spark/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueReader.scala b/spark/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueReader.scala index 55cb3b30b..70d0c92ea 100644 --- a/spark/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueReader.scala +++ b/spark/src/main/scala/org/elasticsearch/spark/serialization/ScalaValueReader.scala @@ -3,13 +3,17 @@ package org.elasticsearch.spark.serialization import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashMap import scala.collection.mutable.Map - import org.elasticsearch.hadoop.cfg.Settings import org.elasticsearch.hadoop.serialization.FieldType import org.elasticsearch.hadoop.serialization.FieldType._ import org.elasticsearch.hadoop.serialization.Parser +import org.elasticsearch.hadoop.serialization.Parser.Token._ import org.elasticsearch.hadoop.serialization.SettingsAware import org.elasticsearch.hadoop.serialization.builder.ValueReader +import scala.collection.JavaConversions +import scala.Predef +import java.util.Date + class ScalaValueReader extends ValueReader with SettingsAware { @@ -20,30 +24,35 @@ class ScalaValueReader extends ValueReader with SettingsAware { null } + if (parser.currentToken() == VALUE_NULL) { + nullValue() + } + esType match { case NULL => nullValue() - case STRING => textValue(value) - case BYTE => byteValue(value) - case SHORT => shortValue(value) - case INTEGER => intValue(value) - case TOKEN_COUNT => longValue(value) - case LONG => longValue(value) - case FLOAT => floatValue(value) - case DOUBLE => doubleValue(value) - case BOOLEAN => booleanValue(value) + case STRING => textValue(value, parser) + case BYTE => byteValue(value, parser) + case SHORT => shortValue(value, parser) + case INTEGER => intValue(value, parser) + case TOKEN_COUNT => longValue(value, parser) + case LONG => longValue(value, parser) + case FLOAT => floatValue(value, parser) + case DOUBLE => doubleValue(value, parser) + case BOOLEAN => booleanValue(value, parser) case BINARY => binaryValue(parser.binaryValue()) - case DATE => date(value) + case DATE => date(value, parser) // everything else (IP, GEO) gets translated to strings - case _ => textValue(value) + case _ => textValue(value, parser) } } - def checkNull(converter: String => Any, value: String) = { + def checkNull(converter: (String, Parser) => Any, value: String, parser: Parser) = { if (value != null) { if (value.isEmpty() && emptyAsNull) { nullValue() } - converter(value).asInstanceOf[AnyRef] + + converter(value, parser).asInstanceOf[AnyRef] } else { nullValue() @@ -51,29 +60,29 @@ class ScalaValueReader extends ValueReader with SettingsAware { } def nullValue() = { None } - def textValue(value: String) = { checkNull (parseText, value) } - protected def parseText(value:String) = { value } + def textValue(value: String, parser: Parser) = { checkNull (parseText, value, parser) } + protected def parseText(value:String, parser: Parser) = { value } - def byteValue(value: String) = { checkNull (parseByte, value) } - protected def parseByte(value: String) = { value.toByte } + def byteValue(value: String, parser: Parser) = { checkNull (parseByte, value, parser) } + protected def parseByte(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.intValue().toByte else value.toByte } - def shortValue(value: String) = { checkNull (parseShort, value) } - protected def parseShort(value: String) = { value.toShort } + def shortValue(value: String, parser:Parser) = { checkNull (parseShort, value, parser) } + protected def parseShort(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.shortValue().toShort else value.toShort } - def intValue(value: String) = { checkNull(parseInt, value) } - protected def parseInt(value: String) = { value.toInt } + def intValue(value: String, parser:Parser) = { checkNull(parseInt, value, parser) } + protected def parseInt(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.intValue().toInt else value.toInt } - def longValue(value: String) = { checkNull(parseLong, value) } - protected def parseLong(value: String) = { value.toLong } + def longValue(value: String, parser:Parser) = { checkNull(parseLong, value, parser) } + protected def parseLong(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.longValue().toLong else value.toLong } - def floatValue(value: String) = { checkNull(parseFloat, value) } - protected def parseFloat(value: String) = { value.toFloat } + def floatValue(value: String, parser:Parser) = { checkNull(parseFloat, value, parser) } + protected def parseFloat(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.floatValue().toFloat else value.toFloat } - def doubleValue(value: String) = { checkNull(parseDouble, value) } - protected def parseDouble(value: String) = { value.toDouble } + def doubleValue(value: String, parser:Parser) = { checkNull(parseDouble, value, parser) } + protected def parseDouble(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) parser.doubleValue().toDouble else value.toDouble } - def booleanValue(value: String) = { checkNull(parseBoolean, value) } - protected def parseBoolean(value: String) = { value.toBoolean } + def booleanValue(value: String, parser:Parser) = { checkNull(parseBoolean, value, parser) } + protected def parseBoolean(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_BOOLEAN) parser.booleanValue() else value.toBoolean } def binaryValue(value: Array[Byte]) = { if (value != null) { @@ -88,8 +97,8 @@ class ScalaValueReader extends ValueReader with SettingsAware { } protected def parseBinary(value: Array[Byte]) = { value } - def date(value: String) = { checkNull(parseDate, value) } - protected def parseDate(value: String) = { value } + def date(value: String, parser: Parser) = { checkNull(parseDate, value, parser) } + protected def parseDate(value: String, parser:Parser) = { if (parser.currentToken()== VALUE_NUMBER) new Date(parser.longValue()) else value } def setSettings(settings: Settings) = { emptyAsNull = settings.getFieldReadEmptyAsNull() }