JSON schema inference corrupt with Elasticsearch Spark #441

Closed

ejsarge-gr opened this issue Apr 29, 2015 · 7 comments

@ejsarge-gr

The elasticsearch-hadoop library appears to corrupt JSON schema inference: the values in the resulting DataFrame no longer line up with the inferred columns. The same JSON source read with the SQLContext.jsonFile method succeeds.

Reproduction Steps

  1. Have an elasticsearch instance and configure it below.
  2. curl -XDELETE localhost:9200/events2-salsnap1-2013-12/
  3. curl -XPUT localhost:9200/events2-salsnap1-2013-12/event/76e3773d-8a19-485a-a75c-225070e2cbc6 -d '{"EventType":"MessageExportRequested","EventTime":1387710245000,"SessionId":"gsk*****","Trigger":"_null_","ScopedCompanyId":148,"ScopedArchiveId":"anArchive","EventId":"dbnbudzu4wge","ActorEntity":{"CompanyId":148,"IpAddress":"127.0.0.1","EntityId":"602","EntityName":"first-1 last-1","EntityType":"CompanyUser"},"AffectedEntity1":{"EntityId":"5678","EntityName":"5678","EntityType":"MessageExport","ExportPurpose":"FinraAudit","CaseName":"R v Sargisson","NumberMessages":534,"CaseId":"Sarg598","PriceCurrency":"CAD","DeliveryOptions":"DirectDownload","NewestMessageDate":1419112760000,"SpecialRequest":"_null_","ExportName":"Some Export","ExportFormat":"EML","SizeMessagesInBytes":1234789,"ExportDescription":"If the NSA can do it then so can I","ExportOption":"IncludeHiddenRecipientData","Price":500.12,"OldestMessageDate":1387576760000}}'
  4. Use Java to run SparkSQLElasticsearchTest
  5. Use Java to run SparkSQLJsonFileTest

Expected Output

ActorEntity          AffectedEntity1      EventId      EventTime     EventType            ScopedArchiveId ScopedCompanyId SessionId Trigger
[148,602,first-1 ... [Sarg598,R v Sarg... dbnbudzu4wge 1387710245000 MessageExportRequ... anArchive       148             gsk*****  _null_

Actual Output (from SparkSQLElasticsearchTest)

ActorEntity          AffectedEntity1 EventId  EventTime EventType ScopedArchiveId ScopedCompanyId SessionId            Trigger
MessageExportRequ... 1387710245000   gsk***** _null_    148       anArchive       dbnbudzu4wge    Map(CompanyId -> ... Map(EntityId -> 5...

Files

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <parent>
        <groupId>com.globalrelay</groupId>
        <artifactId>globalrelay-parent</artifactId>
        <version>3.2</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.globalrelay.herald</groupId>
    <artifactId>spark-es-test</artifactId>
    <version>0.0.0.1-SNAPSHOT</version>
    <name>Spark elasticsearch Test</name>

    <!-- Repository for snapshot version of elasticsearch-hadoop. Required until they release. -->
    <repositories>
      <repository>
        <id>sonatype-oss</id>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <snapshots><enabled>true</enabled></snapshots>
      </repository>
    </repositories>

    <properties>
      <spark.version>1.3.1</spark.version>
      <elasticsearch-hadoop.version>2.1.0.BUILD-SNAPSHOT</elasticsearch-hadoop.version>
      <es.version>1.3.2</es.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>${elasticsearch-hadoop.version}</version>
        <exclusions>
          <exclusion>
            <groupId>org.apache.pig</groupId>
            <artifactId>pig</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-streaming</artifactId>
          </exclusion>
          <exclusion>
            <groupId>cascading</groupId>
            <artifactId>cascading-local</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
          </exclusion>
          <!-- <exclusion>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
          </exclusion> -->
          <exclusion>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
          </exclusion>
          <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-tools</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
          </exclusion>
          <exclusion>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-service</artifactId>
          </exclusion>
          <exclusion>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
          </exclusion>
          <exclusion>
            <groupId>cascading</groupId>
            <artifactId>cascading-hadoop</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
      <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.8</version>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${es.version}</version>
      </dependency>
    </dependencies>

</project>

SparkSQLElasticsearchTest.java

import static org.elasticsearch.index.query.FilterBuilders.typeFilter;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SparkSQLElasticsearchTest {

    /**
     * Before running this test:
     * 1. Have an elasticsearch instance and configure it below.
     * 2. curl -XDELETE localhost:9200/events2-salsnap1-2013-12/
     * 3. curl -XPUT localhost:9200/events2-salsnap1-2013-12/event/76e3773d-8a19-485a-a75c-225070e2cbc6 -d '{"EventType":"MessageExportRequested","EventTime":1387710245000,"SessionId":"gsk*****","Trigger":"_null_","ScopedCompanyId":148,"ScopedArchiveId":"anArchive","EventId":"dbnbudzu4wge","ActorEntity":{"CompanyId":148,"IpAddress":"127.0.0.1","EntityId":"602","EntityName":"first-1 last-1","EntityType":"CompanyUser"},"AffectedEntity1":{"EntityId":"5678","EntityName":"5678","EntityType":"MessageExport","ExportPurpose":"FinraAudit","CaseName":"R v Sargisson","NumberMessages":534,"CaseId":"Sarg598","PriceCurrency":"CAD","DeliveryOptions":"DirectDownload","NewestMessageDate":1419112760000,"SpecialRequest":"_null_","ExportName":"Some Export","ExportFormat":"EML","SizeMessagesInBytes":1234789,"ExportDescription":"If the NSA can do it then so can I","ExportOption":"IncludeHiddenRecipientData","Price":500.12,"OldestMessageDate":1387576760000}}'
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ElasticsearchTest");
        // Note that the elasticsearch node needs to have its publish_host set to be reachable.
        conf.set("spark.es.nodes", "192.168.50.2:9200"); // elasticsearch port
        conf.set("spark.es.resource", "customer/external");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        QueryBuilder qb = filteredQuery(
                boolQuery()
                        .must(matchQuery("EventType", "MessageExportRequested").operator(Operator.OR))
                        /*.must(rangeQuery("EventTime").from(startDate.getTime()).to(endDate.getTime()))*/,
                typeFilter("event"));

        String queryString = "{\"query\":" + qb.toString() + "}";

        DataFrame baseDF = JavaEsSparkSQL.esDF(sqlContext, "events2-salsnap1-2013-12/event", queryString);

        System.out.println(baseDF.schema());
        baseDF.show();

        // Actual output
        // ActorEntity          AffectedEntity1 EventId  EventTime EventType ScopedArchiveId ScopedCompanyId SessionId            Trigger
        // MessageExportRequ... 1387710245000   gsk***** _null_    148       anArchive       dbnbudzu4wge    Map(CompanyId -> ... Map(EntityId -> 5...

        // Expected output (as for jsonfile)
        // ActorEntity          AffectedEntity1      EventId      EventTime     EventType            ScopedArchiveId ScopedCompanyId SessionId Trigger
        // [148,602,first-1 ... [Sarg598,R v Sarg... dbnbudzu4wge 1387710245000 MessageExportRequ... anArchive       148             gsk*****  _null_


    }
}

SparkSQLJsonFileTest.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SparkSQLJsonFileTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JsonFileTest");
        // Note that the elasticsearch node needs to have its publish_host set to be reachable.
        conf.set("spark.es.nodes", "192.168.50.2:9200"); // elasticsearch port
        conf.set("spark.es.resource", "customer/external");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        DataFrame baseDF = sqlContext.jsonFile("src/main/resources/message-export-events.json");

        System.out.println(baseDF.schema());
        baseDF.show();
        // Expected output:
        //        ActorEntity          AffectedEntity1      EventId      EventTime     EventType            ScopedArchiveId ScopedCompanyId SessionId Trigger
        //        [148,602,first-1 ... [Sarg598,R v Sarg... dbnbudzu4wge 1387710245000 MessageExportRequ... anArchive       148             gsk*****  _null_
    }
}

src/main/resources/message-export-events.json

{"EventType":"MessageExportRequested","EventTime":1387710245000,"SessionId":"gsk*****","Trigger":"_null_","ScopedCompanyId":148,"ScopedArchiveId":"anArchive","EventId":"dbnbudzu4wge","ActorEntity":{"CompanyId":148,"IpAddress":"127.0.0.1","EntityId":"602","EntityName":"first-1 last-1","EntityType":"CompanyUser"},"AffectedEntity1":{"EntityId":"5678","EntityName":"5678","EntityType":"MessageExport","ExportPurpose":"FinraAudit","CaseName":"R v Sargisson","NumberMessages":534,"CaseId":"Sarg598","PriceCurrency":"CAD","DeliveryOptions":"DirectDownload","NewestMessageDate":1419112760000,"SpecialRequest":"_null_","ExportName":"Some Export","ExportFormat":"EML","SizeMessagesInBytes":1234789,"ExportDescription":"If the NSA can do it then so can I","ExportOption":"IncludeHiddenRecipientData","Price":500.12,"OldestMessageDate":1387576760000}}
@costin (Member) commented Apr 30, 2015

@ejsarge-gr First off, thanks for the detailed report; if only all reports were done this way, things would be a LOT easier.
I've managed to reproduce the issue and will try to address it tomorrow. It looks like Spark maps a nested object to a struct and returns only its values, while the connector translates it to a Map and returns both keys and values.
I'll try to align the two, since the Map type does not appear to be widely used by Spark SQL itself.
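
For illustration, a minimal sketch that prints both inferred schemas side by side (the SchemaComparison class is hypothetical; it reuses the node address and file paths from the reproduction steps). With the bug present, esDF reports the nested ActorEntity and AffectedEntity1 objects as MapType, while jsonFile reports them as StructType:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SchemaComparison {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SchemaComparison");
        conf.set("spark.es.nodes", "192.168.50.2:9200"); // same node as in the reproduction steps
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Spark's own JSON inference: nested objects become StructType fields,
        // so a row carries only the struct's values.
        DataFrame fromJson = sqlContext.jsonFile("src/main/resources/message-export-events.json");
        fromJson.printSchema();

        // Connector inference (before the fix): nested objects become MapType fields,
        // so a row carries keys and values and the columns no longer line up.
        DataFrame fromEs = JavaEsSparkSQL.esDF(sqlContext, "events2-salsnap1-2013-12/event");
        fromEs.printSchema();
    }
}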

costin added a commit that referenced this issue May 4, 2015
Enhance value order when dealing with nested values

relates #441
@costin (Member) commented May 5, 2015

@ejsarge-gr Hi, the issue should be fixed in master. I've pushed a new dev build to Maven so please try it out and report back whether it addresses your problem.

Cheers,

@ejsarge-gr (Author)

It appears to be working correctly for me. Thanks!

@costin (Member) commented May 6, 2015

That's great to hear. Thanks for the feedback.

@analyticswarescott

I grabbed the latest build as I was seeing this issue as well. While the schema inference now seems to work, the DataFrame returns all of the objects wrapped in Buffers (in one case, more than one level deep). This causes a downstream error, in my case in df.insertIntoJDBC, when I invoke it on the DataFrame read from ES (code and logs below). Any ideas on how to get this convenient method working again?

------code
StoreWriterJDBC.logger.debug(" SCHEMA for DataFrame " + data.schema().toString());
StoreWriterJDBC.logger.debug(" Row 1 for DataFrame " + data.take(1)[0].toString());
data.insertIntoJDBC(_url, _tableName, true);

--debug log

2015-06-04 04:40:16,173 [ForkJoinPool.commonPool-worker-1] DEBUG com.dg.data.sync.writers.StoreWriterJDBC - SCHEMA for DataFrame StructType(StructField(medid,StringType,true), StructField(meid,StringType,true), StructField(ot,LongType,true), StructField(otn,StringType,true))
2015-06-04 04:40:16,205 [ForkJoinPool.commonPool-worker-1] DEBUG com.dg.data.sync.writers.StoreWriterJDBC - Row 1 for DataFrame [Buffer(42ce0341-d1da-11e4-b74a-8086f2c79781),Buffer(07b9bc73-aaf2-1034-4744-806e6f1de1e6),Buffer(10),Buffer(Buffer(File Delete))]
2015-06-04 04:40:16,267 [Executor task launch worker-3] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 71.0 (TID 167)
java.lang.ClassCastException: scala.collection.convert.Wrappers$JListWrapper cannot be cast to java.lang.String
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:117)
at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.savePartition(jdbc.scala:83)
at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:180)
at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(jdbc.scala:179)
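
As a possible stopgap, here is a minimal sketch of a workaround (assuming Spark 1.3.x; the BufferUnwrapper class is hypothetical and not part of either library) that rebuilds each Row, replacing Buffer-wrapped values with their single element before the DataFrame is handed to insertIntoJDBC. It peels off nested wrappers in a loop, so doubly wrapped values such as Buffer(Buffer(File Delete)) also come out as plain scalars:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;

public class BufferUnwrapper {

    // Rebuild each Row, replacing single-element Seq/Buffer values with the element itself,
    // and reuse the original schema (which already reports scalar types such as StringType).
    public static DataFrame unwrap(SQLContext sqlContext, DataFrame data) {
        JavaRDD<Row> rows = data.javaRDD().map(new Function<Row, Row>() {
            @Override
            public Row call(Row row) {
                Object[] values = new Object[row.length()];
                for (int i = 0; i < row.length(); i++) {
                    Object value = row.get(i);
                    // Peel off nested single-element sequences, e.g. Buffer(Buffer(File Delete)).
                    while (value instanceof scala.collection.Seq
                            && ((scala.collection.Seq<?>) value).size() == 1) {
                        value = ((scala.collection.Seq<?>) value).apply(0);
                    }
                    values[i] = value;
                }
                return RowFactory.create(values);
            }
        });
        return sqlContext.createDataFrame(rows, data.schema());
    }
}

The JDBC write would then be invoked on the unwrapped frame, e.g. BufferUnwrapper.unwrap(sqlContext, data).insertIntoJDBC(_url, _tableName, true); whether this is more than a band-aid depends on why the connector wraps the values in the first place.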

@costin (Member) commented Jun 5, 2015

@analyticswarescott Please open a new issue with information about the actual versions in your runtime (the git SHA1 of es-hadoop; you'll find it in the logs).
Also provide a quick snippet of your Spark project and some sample data.
For reference, I've tried to reproduce your issue, both outside the existing test suite and with the code snippet above (referenced in this issue), without success.

Note also that the test suite compares both the data output and the schema against a raw JSON input, as you can see here. I'm not sure where that Buffer comes from.

Anyway, this issue has been derailed long enough; let's continue the discussion in a new issue.

@analyticswarescott

With the new release build I was able to isolate the issue, and I've recorded it as #497.
