
Per tap configuration for cascading #146

costin opened this Issue Feb 21, 2014 · 5 comments



costin commented Feb 21, 2014

Taken from #134

I'm using 1.3.0.M2 & Scalding 0.9.0rc4

I'm overriding the config object and setting ConfigurationOptions.ES_MAPPING_ID to "number":

import com.twitter.scalding.{Job, Args}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

class JobBase(args: Args) extends Job(args) {
  // Override the job config
  override def config: Map[AnyRef, AnyRef] = {
    super.config ++ Map(ConfigurationOptions.ES_MAPPING_ID -> "number") ++ Map(ConfigurationOptions.ES_WRITE_OPERATION -> "index")
  }
}


My Scalding job looks like this:

class ElasticSearchUpdateIndexes(args: Args) extends JobBase(args) {

  // Some data to push into Elasticsearch
  val someData = List(
    ("1", "product1", "description1"),
    ("2", "product2", "description2"),
    ("3", "product3", "description3"))

  val indexNewDataInElasticSearch =
    IterableSource(String,String, String))
      .write(ElasticSearchSource("localhost", 9200, "index_es/type_es"))
}

The ElasticSearchSource wrapper that I'm trying to implement and contribute to Scalding currently looks like this:

case class ElasticSearchSource(
  es_host: String = "localhost",
  es_port: Int = 9200,
  es_resource: String,
  es_fields: Fields = Fields.ALL)
  extends Source {

  def createLocalTap: Tap[_, _, _] =
    new EsTap(es_host, es_port, es_resource, "", es_fields)

  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    mode match {
      case Local() => {

My problem is that once I introduce:

++ Map (ConfigurationOptions.ES_MAPPING_ID -> "number")

I'm getting

cascading.tuple.TupleException: unable to sink into output identifier: 'unknown'

My second concern is about using ConfigurationOptions.ES_MAPPING_ID as part of the job configuration.
I understand the benefits of that approach for Hive/Pig, but for Cascading/Scalding I think having a single job-wide property is limiting.
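
To make this concern concrete, here is a minimal sketch in plain Scala, with no Scalding or elasticsearch-hadoop on the classpath. It assumes that ConfigurationOptions.ES_MAPPING_ID resolves to the key "es.mapping.id"; the object name and the two id fields are made up for illustration. Since the job configuration is a single map, a second value for the same key replaces the first, so two sinks cannot each get their own id field this way.

```scala
object JobWideConfigSketch {
  // Assumed string value of ConfigurationOptions.ES_MAPPING_ID.
  val EsMappingId = "es.mapping.id"

  // Hypothetical job-wide setting: the first sink wants productID as the document id.
  val forProducts: Map[String, String] = Map(EsMappingId -> "productID")

  // The second sink wants customerID, but `++` is right-biased:
  // the later value silently replaces the earlier one.
  val jobConfig: Map[String, String] = forProducts ++ Map(EsMappingId -> "customerID")
}
```

Only one mapping id survives in jobConfig, which is why a per-sink setting would be needed for a job that writes to several indices.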

Ideally, I would like to be able to do the following in Scalding:

val productSales = data
  .project('productID, 'customerID, 'quantity)
  .write(ElasticSearchSource("localhost", 9200, "offers/summer-offer-14"), 'productID) // productID is the
  .joinWithSmaller('customerID -> 'customerID, customerData)
  .write(ElasticSearchSource("localhost", 9200, "customers/got-offer"), 'customerID) // customerID is the
So I would like to have multiple Elasticsearch sources and sinks within a single job.
My understanding at the moment is that elasticsearch-hadoop will not allow me to configure each of them individually.

Anyhow, I'm just looking for some help implementing this capability in Scalding.
Any help is appreciated.


Member Author

costin commented Feb 21, 2014

@Antwnis I copied your email here into its own issue. I'll use this issue to track per-Tap configuration support for Cascading.
Regarding the 'unknown' problem, as I mentioned on the mailing list, please try to translate your code into proper Cascading and raise a separate issue for it.




Antwnis commented Feb 24, 2014

Regarding the 'unknown' problem, I just created #150.

@costin costin added v1.3.0.M3 and removed bug labels Mar 6, 2014

costin added a commit that referenced this issue Mar 6, 2014


Member Author

costin commented Mar 7, 2014

@Antwnis can you please try the latest snapshot (#333)? It brings per-Tap configuration, both in terms of index/query and complete settings (through the Properties field). One can now specify a Tap-specific configuration through the constructor.
The Configuration object can still be used; however, the Tap settings will always win.
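
The precedence described above can be sketched in plain Scala. This is only a model of the rule "Tap settings win over the Configuration object", not the elasticsearch-hadoop implementation: effectiveSettings is a hypothetical helper, the keys "es.mapping.id" and "es.write.operation" are assumed to be the string values of the ConfigurationOptions constants used earlier in this thread, and the exact EsTap constructor is not shown here, so the sketch does not call it.

```scala
object TapSettingsSketch {
  // Hypothetical helper, not part of elasticsearch-hadoop: models the rule
  // that tap-specific settings override job-wide ones.
  def effectiveSettings(jobConfig: Map[String, String],
                        tapSettings: Map[String, String]): Map[String, String] =
    jobConfig ++ tapSettings // the right-hand side wins on duplicate keys

  // Job-wide configuration, as in the JobBase override earlier in the thread.
  val jobConfig: Map[String, String] =
    Map("es.mapping.id" -> "number", "es.write.operation" -> "index")

  // Each tap supplies its own id field; everything else falls back to the job config.
  val productTap: Map[String, String] =
    effectiveSettings(jobConfig, Map("es.mapping.id" -> "productID"))
  val customerTap: Map[String, String] =
    effectiveSettings(jobConfig, Map("es.mapping.id" -> "customerID"))
}
```

With this model, each tap ends up with its own mapping id while still inheriting the shared write operation from the job configuration.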


@costin costin closed this Mar 7, 2014



Antwnis commented Mar 24, 2014

Excellent work @costin !! Everything works as expected - Many thanks !!


Member Author

costin commented Mar 24, 2014

Glad to hear it - thanks for the feedback!

costin added a commit that referenced this issue Apr 8, 2014
