Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DSLSpout and bullet-core-0.6.5 #69

Merged
merged 16 commits into from
Feb 6, 2019
Merged

DSLSpout and bullet-core-0.6.5 #69

merged 16 commits into from
Feb 6, 2019

Conversation

0aix
Copy link
Member

@0aix 0aix commented Jan 11, 2019

No description provided.

@coveralls
Copy link

coveralls commented Jan 11, 2019

Coverage Status

Coverage increased (+0.2%) to 99.58% when pulling b976497 on 0aix:master into c34cd85 on bullet-db:master.

Copy link
Member

@akshaisarma akshaisarma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add a setting to enable its usage and wire it in the Topology main class. Can we bump the version to 0.9.0?

@@ -0,0 +1,136 @@
/*
* Copyright 2019, Verizon Media.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the same one as the rest of the project or at least Oath. This is not valid yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly didn't remember putting those in..

*
* @param args A list of arguments where the first argument is expected to be the path to a configuration file.
*/
public DSLSpout(List<String> args) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor isn't needed. Even for testing.

@Override
public void nextTuple() {
List<Object> objects = Collections.emptyList();
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Push to a different method.

} catch (BulletDSLException e) {
log.error("Could not read from BulletConnector.", e);
}
long time = System.currentTimeMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This time is a in-between time from reading to converting. Can we make it pre-emission after all conversion? Or pre-reading? Or get a new time per emission?

}
long time = System.currentTimeMillis();
for (Object object : objects) {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Push to a new method and can use streams with filters for non-null. Also the try catch just needs to be around the convert call.


@Override
public void ack(Object id) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove empty line here and below

* @param config Not used.
*/
public MockConnector(BulletConfig config) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty line

@@ -96,6 +97,11 @@
<artifactId>bullet-core</artifactId>
<version>${bullet.core.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-dsl</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will pull in kafka, pulsar and whatever else clients in the future. Can we use the DSL by having the jar at runtime and make it provided here? We need to try that out in storm.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I've tried this out. A couple problems but works for the most part.

  1. I get a JNI error from the bullet-storm fat jar. I fixed this by making bullet-dsl "provided" and giving the jar to storm separately.
  2. There's a pulsar dependency in BulletDSLConfig. Oops.
  3. BulletConnector isn't serializable. Oops.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix bullet-dsl before merging this then

@0aix
Copy link
Member Author

0aix commented Jan 16, 2019

What I'm thinking to enable DSLSpout in the main class:

  • Set the original "bullet-spout" arg to optional
  • Add an optional "bullet-spout-use-dsl" boolean arg (or some other name)
  • (Later on) add other args for the dsl spout/bolt options we talked about
  • DSLSpout will use the same parallelism/cpu/memory args as "bullet-spout"
  • Rewrite the help message to make more sense

Edit: why do we use command line args anyways?

@akshaisarma
Copy link
Member

Maybe we shouldn't hijack bullet-spout to do the dsl plugin. Let's just make a set of different settings that are only used if dsl is enabled. We should also have a setting to separate using the BulletConnector in the DSLSpout and the BulletConvertor in a new DSLBolt. If this setting is false, we do both in the DSLSpout.

.defaultTo(DEFAULT_DSL_SPOUT_ENABLE)
.checkIf(Validator::isBoolean);
VALIDATOR.define(DSL_SPOUT_PARALLELISM)
.checkIf(Validator::isPositiveInt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation on these and below

Copy link
Member

@akshaisarma akshaisarma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a reference to as a Spout (in which case, just use the {@link Topology#main(String[])} method with the class name of your Spout. in the javadocs in StormUtils.java that is no longer applicable.

collector.emit(new Values(objects, System.currentTimeMillis()), DUMMY_ID);
return;
}
objects.forEach(this::convertAndEmit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do collector.emit(new Values(dslBoltEnable ? objects : convert(objects), System.currentTimeMillis()), DUMMY_ID); and change convertAndEmit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. List emit vs object emit

}

/**
* This submits a topology after loading the given spout with the given configuration as the source of
* {@link com.yahoo.bullet.record.BulletRecord}.
* This submits a topology after loading the configured Spout (and optionally, bolt), which is either the {@link DSLSpout}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize bolt?

@0aix 0aix merged commit 4287094 into bullet-db:master Feb 6, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants