
saveToEs saves fields with NULL values #792

Closed · ssemichev opened this issue Jun 22, 2016 · 3 comments
ssemichev commented Jun 22, 2016

Issue description

I don't want to index fields with NULL values. It looks like a bug.

The original JSON document that I read from a file doesn't contain NULL values, and the mapping doesn't use null_value either. After the data is loaded into a DataFrame, some fields such as name.prefix and addresses.address2 become NULL, and these fields end up being created in the ES index.

Partial result from ES

                     "address1": "156 JUSTIN AVE",
                     "address2": null,
                     "asofdate": null,
                     "captured_at": "2016-06-10t05:34:01.616z",
                     "cd": "01",

Steps to reproduce

Code:

val personsDF = sqlContext.read.schema(sampleDataDF.schema).json("data-file")
val parentsDF = personsDF.select("id", "sources", "captured_at", "identities", "names", "addresses")
parentsDF.saveToEs("persons/person", config)
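
(The config map isn't shown in this snippet; presumably it is the same one defined later in this thread, with "ip-xxx" as a placeholder for the real Elasticsearch node address:)

// Assumed configuration, mirroring the map shown further down.
var config: Map[String, String] = Map()
config += ("es.nodes" -> "ip-xxx")
config += ("es.mapping.id" -> "id")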

parentsDF schema:

parentsDF: org.apache.spark.sql.DataFrame = [id: string, sources: array<string>, captured_at: string, identities: struct<hhseq:bigint,m_matchkey:string,nb_id:string,r_matchkey:string,rnc_regid:string,strat_id:bigint,voter_id:string>, names: array<struct<captured_at:string,first_name:string,hash_value:string,instances:bigint,last_name:string,middle_name:string,owners:array<string>,prefix:string,sources:array<string>,suffix:string,value:string>>, addresses: array<struct<address1:string,address2:string,asofdate:string,captured_at:string,cd:string,cen_block:string,cen_tract:string,city:string,congressional_districts:array<string>,crc:string,dpc:string,fips:string,hash_value:string,instances:bigint,lat:double,ld:string,location:array<double>,lon:double,mail_score:bigint,owners:array<string>,raw_value:string,sd:string,sources:array<string>,state:string,street1:string,street2:string,type:string,value:string,zip:string,zip4:string>>]

parentsDF sample - parentsDF.take(1)

res5: Array[org.apache.spark.sql.Row] = Array([546db9f0de8e558dacced669cc8d15b7,WrappedArray(value),2016-06-10t05:34:01.209z,[1111,null,111-08d1-11e2-b0f9-111,0626~~83~111~~~,{111-111-4fd6-983d-111},111,001377677],WrappedArray([2016-06-10t05:34:01.209z,value,value,1,value,e,WrappedArray(tv),null,WrappedArray(value),null,value]),WrappedArray([PERRY ST,UNIT 171,2014-04-23T00:00:00.000Z,2016-06-10t05:34:01.209z,02,8005,903100,value,WrappedArray(CT-02),c001,469,09015,83STCT06260,1,41.912786,051,WrappedArray(-77.897427, 41.912786),-77.897427,7,WrappedArray(tv),PERRY ST UNIT, PUTNAM, CT 06260,029,WrappedArray(value),CT,PERRY,null,registered,PERRY ST, UNIT, PUTNAM, CT 06260,06260,2249], [CROOKED TRAIL EXT,null,null,2016-06-10t05:34:01.209z,02,null,null,WOODSTOCK,WrappedArray(CT-02),null,null,09015,EXTCT06281,1,null,051,null,null,7,WrappedArray(tv),CROOKED TRAIL EXT, WOOD, CT 06281,029,WrappedArray(voterrecord201602),CT,null,null,mailing,TRAIL EXT, WOOD, CT 06281,06281,null])])

Version Info

Hadoop/Spark: Spark 1.6
ES-Hadoop: 2.3.2
ES: 2.3.1

costin (Member) commented Jun 27, 2016

Do you have a data sample? It can be as simple as one or two entries/lines. It would help a lot in reproducing the problem and finding the bug.

Thanks,

ssemichev (Author) commented Jun 28, 2016

It works as expected if I write existing JSON to ES:

> cat persons.json
{"id":"1","sources":["source1"],"captured_at":"2016-06-10t05:34:01.578z","name":{"fname":"fname1","mname":"mname1","lname":"lname1"},"child_details":{"value":"details"}}
{"id":"2","sources":["source1"],"captured_at":"2016-06-10t05:34:01.578z","name":{"fname":"fname2","lname":"lname2"},"child_details":{"value":"details"}}
import org.elasticsearch.spark.sql._

var config: Map[String,String] = Map()
config += ("es.nodes" -> "ip-xxx")
config += ("es.mapping.id" -> "id")  

val rdd = sc.textFile("/mnt/spark/tests/persons.json")
org.elasticsearch.spark.rdd.EsSpark.saveJsonToEs(rdd, "persons_dev/person", config)

Result (the mname field for person2 has not been created):

"hits": [
    {
       "_source": {
          "id": "1",
          "name": {
             "fname": "fname1",
             "lname": "lname1",
             "mname": "mname1"
          }
       }
    },
    {
       "_source": {
          "id": "2",
          "name": {
             "fname": "fname2",
             "lname": "lname2"
          }
       }
    }
 ]

But it creates the mname field with a NULL value if I save a DataFrame.
I understand that once the JSON objects have been transformed into a typed DataFrame, the NULL values are already there. The question is: can you add a feature to reject NULLs and not index such fields?

 val person1 = """{"id":"1","sources":["source1"],"captured_at":"2016-06-10t05:34:01.578z","name":{"fname":"fname1","mname":"mname1","lname":"lname1"},"child_details":{"value":"details"}}"""      
 val person2 = """{"id":"2","sources":["source1"],"captured_at":"2016-06-10t05:34:01.579z","name":{"fname":"fname2","lname":"lname2"},"child_details":{"value":"details"}}"""

 val jsonRdd = sc.makeRDD(Seq(person1, person2))

 val jsonDf = sqlContext.read.json(jsonRdd).toDF.select($"id", $"name")
jsonDf.printSchema


> jsonDf.printSchema
root
 |-- id: string (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |    |-- mname: string (nullable = true)

> jsonDf.collect
Array[org.apache.spark.sql.Row] = Array([1,[fname1,lname1,mname1]], [2,[fname2,lname2,null]])
import org.elasticsearch.spark.sql._

var config: Map[String,String] = Map()
config += ("es.nodes" -> "ip-xxx")
config += ("es.mapping.id" -> "id")  

jsonDf.saveToEs("persons_dev/person", config)

Result (the mname field for person2 has been created):

"hits": [
    {
       "_source": {
          "id": "1",
          "name": {
             "fname": "fname1",
             "lname": "lname1",
             "mname": "mname1"
          }
       }
    },
    {
       "_source": {
          "id": "2",
          "name": {
             "fname": "fname2",
             "lname": "lname2",
             "mname": null
          }
       }
    }
 ]
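
A possible workaround until this is addressed (a sketch, assuming Spark 1.6, where DataFrame.toJSON returns an RDD[String] and omits null-valued fields during serialization):

// Workaround sketch: round-trip the DataFrame through JSON so that
// null-valued fields are dropped before ES-Hadoop ever sees them.
import org.elasticsearch.spark.rdd.EsSpark

val jsonLines = jsonDf.toJSON  // RDD[String]; null fields are omitted
EsSpark.saveJsonToEs(jsonLines, "persons_dev/person", config)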

Thank you in advance!

jbaiera (Member) commented Jul 1, 2016

@ssemichev Thank you for the very clear breakdown of the problem! I was able to reproduce this locally very quickly and I am currently testing a fix for this issue.
