Pushdown option not working as expected with Spark data frames #734

Closed

vijaygrk opened this issue Apr 8, 2016 · 1 comment
vijaygrk commented Apr 8, 2016

Issue description

The pushdown option is not working as expected when using Spark DataFrames. I created a simple app in Scala to launch the Thrift server and serve DataFrames created from Elasticsearch documents.
A simple count query takes 15 seconds to return a count of 6000.

Steps to reproduce

This is how I launch the server. I use the Progress Spark JDBC driver to execute the count query:

spark-submit --jars /usr/local/bin/elasticsearch-hadoop-2.2.0.jar --class "SimpleApp" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object SimpleApp {
  def main(args: Array[String]) {

    val domain: String = "goodman_networks"

    // Elasticsearch connector and Thrift server configuration.
    val conf = new SparkConf().setAppName("JDBCServer")
    conf.set("es.nodes", "10.10.131.194")
    conf.set("es.nodes.wan.only", "true")
    conf.set("es.nodes.discovery", "false")
    conf.set("spark.sql.hive.thriftServer.singleSession", "true")

    val sparkContext = new SparkContext(conf)

    /*val sqlContext = new SQLContext(sparkContext)*/
    val sqlContext = new HiveContext(sparkContext)

    // Load each Elasticsearch index/type as a DataFrame.
    val orgDF = sqlContext.read.format("org.elasticsearch.spark.sql").load(domain + "/org_units")
    val usersDF = sqlContext.read.format("org.elasticsearch.spark.sql").load(domain + "/users")
    val projectsDF = sqlContext.read.format("org.elasticsearch.spark.sql").load(domain + "/projects")
    val activitiesDF = sqlContext.read.format("org.elasticsearch.spark.sql").load(domain + "/activities")

    // Expose the DataFrames as temp tables so JDBC clients can query them.
    projectsDF.registerTempTable("projects")
    usersDF.registerTempTable("users")
    orgDF.registerTempTable("org_units")
    activitiesDF.registerTempTable("activities")

    projectsDF.printSchema()
    projectsDF.show()

    // Start the HiveServer2 (Thrift) endpoint backed by this context.
    HiveThriftServer2.startWithContext(sqlContext)
  }
}
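
For completeness, an equivalent count query can be issued with the stock Hive JDBC driver. The sketch below is illustrative only: it assumes the Thrift server's default localhost:10000 endpoint and the Hive driver rather than the Progress driver I actually used.

import java.sql.DriverManager

object CountClient {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Default HiveServer2 endpoint; adjust host/port to match the Thrift server.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM projects")
      while (rs.next()) {
        // Takes ~15 seconds to return a count of ~6000 documents.
        println(s"count = ${rs.getLong(1)}")
      }
    } finally {
      conn.close()
    }
  }
}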

Version Info

OS: CentOS 7
JVM: openjdk version "1.8.0_65"
Hadoop/Spark: spark-1.6.1-bin-hadoop2.6
ES-Hadoop: 2.2.0
ES: 2.1.1

costin (Member) commented May 3, 2016

That's because the count operation cannot be pushed down, especially for DataFrames: Spark doesn't push the operation down to the source, it loads all the items instead.
Several versions ago we considered implementing a custom count method for RDDs, however it was reverted since it had different semantics: the count was returned in an instant, yet the RDD content was not actually initialized, which led to subtle behavioral changes.

Until Spark pushes this operation down, count ends up loading all the items.
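
If only the document count is needed, one can bypass Spark entirely and ask Elasticsearch through its _count endpoint, which returns the count without fetching any documents. A minimal sketch (host, index and type taken from the report above; the naive regex parse is for illustration only):

import scala.io.Source

object EsCount {
  def main(args: Array[String]): Unit = {
    // _count returns e.g. {"count":6000,"_shards":{...}} without loading documents.
    val url = "http://10.10.131.194:9200/goodman_networks/projects/_count"
    val body = Source.fromURL(url).mkString
    val count = """"count":(\d+)""".r.findFirstMatchIn(body).map(_.group(1))
    println(s"count = ${count.getOrElse("?")}")
  }
}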
