Skip to content

Commit

Permalink
Propagate ValueReader improvements to Spark module
Browse files Browse the repository at this point in the history
Fix #271
  • Loading branch information
costin committed Sep 20, 2014
1 parent 8984fc3 commit f936eb4
Showing 1 changed file with 41 additions and 32 deletions.
Expand Up @@ -3,13 +3,17 @@ package org.elasticsearch.spark.serialization
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.Map

import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.serialization.FieldType
import org.elasticsearch.hadoop.serialization.FieldType._
import org.elasticsearch.hadoop.serialization.Parser
import org.elasticsearch.hadoop.serialization.Parser.Token._
import org.elasticsearch.hadoop.serialization.SettingsAware
import org.elasticsearch.hadoop.serialization.builder.ValueReader
import scala.collection.JavaConversions
import scala.Predef
import java.util.Date


class ScalaValueReader extends ValueReader with SettingsAware {

Expand All @@ -20,60 +24,65 @@ class ScalaValueReader extends ValueReader with SettingsAware {
null
}

if (parser.currentToken() == VALUE_NULL) {
nullValue()
}

esType match {
case NULL => nullValue()
case STRING => textValue(value)
case BYTE => byteValue(value)
case SHORT => shortValue(value)
case INTEGER => intValue(value)
case TOKEN_COUNT => longValue(value)
case LONG => longValue(value)
case FLOAT => floatValue(value)
case DOUBLE => doubleValue(value)
case BOOLEAN => booleanValue(value)
case STRING => textValue(value, parser)
case BYTE => byteValue(value, parser)
case SHORT => shortValue(value, parser)
case INTEGER => intValue(value, parser)
case TOKEN_COUNT => longValue(value, parser)
case LONG => longValue(value, parser)
case FLOAT => floatValue(value, parser)
case DOUBLE => doubleValue(value, parser)
case BOOLEAN => booleanValue(value, parser)
case BINARY => binaryValue(parser.binaryValue())
case DATE => date(value)
case DATE => date(value, parser)
// everything else (IP, GEO) gets translated to strings
case _ => textValue(value)
case _ => textValue(value, parser)
}
}

def checkNull(converter: String => Any, value: String) = {
/**
 * Null-safe wrapper around a field converter.
 *
 * Returns `nullValue()` when `value` is null, or when `value` is empty and
 * `emptyAsNull` is set. Otherwise it applies `converter` to `value` and
 * `parser` and returns the result as an `AnyRef`.
 *
 * @param converter function turning the raw string (and the parser positioned on it) into a value
 * @param value     raw string form of the field, possibly null
 * @param parser    parser handed through to `converter`
 */
def checkNull(converter: (String, Parser) => Any, value: String, parser: Parser) = {
  if (value != null) {
    if (value.isEmpty() && emptyAsNull) {
      // This branch must be the result of the method. Previously its value was
      // discarded and the converter still ran on the empty string.
      nullValue()
    }
    else {
      converter(value, parser).asInstanceOf[AnyRef]
    }
  }
  else {
    nullValue()
  }
}

/** Value used to represent a missing or null field: Scala's `None`. */
def nullValue() = None
def textValue(value: String) = { checkNull (parseText, value) }
protected def parseText(value:String) = { value }
/** Reads a text field by handing `value` to `checkNull` with `parseText` as the converter. */
def textValue(value: String, parser: Parser) = checkNull(parseText, value, parser)
/** Converter for text fields: returns `value` unchanged; `parser` is not consulted. */
protected def parseText(value: String, parser: Parser) = value

def byteValue(value: String) = { checkNull (parseByte, value) }
protected def parseByte(value: String) = { value.toByte }
/** Reads a byte field by handing `value` to `checkNull` with `parseByte` as the converter. */
def byteValue(value: String, parser: Parser) = checkNull(parseByte, value, parser)
/**
 * Converter for byte fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.intValue()` and narrowed to a byte; otherwise the
 * string form is parsed with `toByte`.
 */
protected def parseByte(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.intValue().toByte
  case _            => value.toByte
}

def shortValue(value: String) = { checkNull (parseShort, value) }
protected def parseShort(value: String) = { value.toShort }
/** Reads a short field by handing `value` to `checkNull` with `parseShort` as the converter. */
def shortValue(value: String, parser: Parser) = checkNull(parseShort, value, parser)
/**
 * Converter for short fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.shortValue()`; otherwise the string form is parsed
 * with `toShort`.
 */
protected def parseShort(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.shortValue().toShort
  case _            => value.toShort
}

def intValue(value: String) = { checkNull(parseInt, value) }
protected def parseInt(value: String) = { value.toInt }
/** Reads an integer field by handing `value` to `checkNull` with `parseInt` as the converter. */
def intValue(value: String, parser: Parser) = checkNull(parseInt, value, parser)
/**
 * Converter for integer fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.intValue()`; otherwise the string form is parsed
 * with `toInt`.
 */
protected def parseInt(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.intValue().toInt
  case _            => value.toInt
}

def longValue(value: String) = { checkNull(parseLong, value) }
protected def parseLong(value: String) = { value.toLong }
/** Reads a long field by handing `value` to `checkNull` with `parseLong` as the converter. */
def longValue(value: String, parser: Parser) = checkNull(parseLong, value, parser)
/**
 * Converter for long fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.longValue()`; otherwise the string form is parsed
 * with `toLong`.
 */
protected def parseLong(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.longValue().toLong
  case _            => value.toLong
}

def floatValue(value: String) = { checkNull(parseFloat, value) }
protected def parseFloat(value: String) = { value.toFloat }
/** Reads a float field by handing `value` to `checkNull` with `parseFloat` as the converter. */
def floatValue(value: String, parser: Parser) = checkNull(parseFloat, value, parser)
/**
 * Converter for float fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.floatValue()`; otherwise the string form is parsed
 * with `toFloat`.
 */
protected def parseFloat(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.floatValue().toFloat
  case _            => value.toFloat
}

def doubleValue(value: String) = { checkNull(parseDouble, value) }
protected def parseDouble(value: String) = { value.toDouble }
/** Reads a double field by handing `value` to `checkNull` with `parseDouble` as the converter. */
def doubleValue(value: String, parser: Parser) = checkNull(parseDouble, value, parser)
/**
 * Converter for double fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.doubleValue()`; otherwise the string form is parsed
 * with `toDouble`.
 */
protected def parseDouble(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => parser.doubleValue().toDouble
  case _            => value.toDouble
}

def booleanValue(value: String) = { checkNull(parseBoolean, value) }
protected def parseBoolean(value: String) = { value.toBoolean }
/** Reads a boolean field by handing `value` to `checkNull` with `parseBoolean` as the converter. */
def booleanValue(value: String, parser: Parser) = checkNull(parseBoolean, value, parser)
/**
 * Converter for boolean fields. When the parser sits on a `VALUE_BOOLEAN` token the
 * value is read with `parser.booleanValue()`; otherwise the string form is parsed
 * with `toBoolean`.
 */
protected def parseBoolean(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_BOOLEAN => parser.booleanValue()
  case _             => value.toBoolean
}

def binaryValue(value: Array[Byte]) = {
if (value != null) {
Expand All @@ -88,8 +97,8 @@ class ScalaValueReader extends ValueReader with SettingsAware {
}
/** Converter for binary fields: returns the given byte array itself, without copying. */
protected def parseBinary(value: Array[Byte]) = value

def date(value: String) = { checkNull(parseDate, value) }
protected def parseDate(value: String) = { value }
/** Reads a date field by handing `value` to `checkNull` with `parseDate` as the converter. */
def date(value: String, parser: Parser) = checkNull(parseDate, value, parser)
/**
 * Converter for date fields. When the parser sits on a `VALUE_NUMBER` token the
 * number is read with `parser.longValue()` and wrapped in a `java.util.Date`;
 * otherwise the string form is returned unchanged. The result is therefore
 * either a `Date` or a `String`.
 */
protected def parseDate(value: String, parser: Parser) = parser.currentToken() match {
  case VALUE_NUMBER => new Date(parser.longValue())
  case _            => value
}

/** Reads the empty-as-null flag from `settings` (via `getFieldReadEmptyAsNull`) and stores it in `emptyAsNull`. */
def setSettings(settings: Settings) = {
  val treatEmptyAsNull = settings.getFieldReadEmptyAsNull()
  emptyAsNull = treatEmptyAsNull
}

Expand Down

0 comments on commit f936eb4

Please sign in to comment.