
[STORM-1015] Allow Kafka offsets to be saved using Kafka's consumer offset management api #705

Closed
wants to merge 19 commits into from

Conversation


@ooasis ooasis commented Aug 28, 2015

Not sure when it will be reviewed, so I chose to implement it based on the master branch. It could be ported to other branches if needed.

-thanks

@rmkellogg
Contributor

A few minor nits:

When providing javadoc on variables/methods, be sure to use the following syntax:

/**
 * Comment text
 */
private String variableName;

Instead of the following:

// offset state information storage. validate options are storm and kafka
private String variableName;

The first variation will provide fly-over help within the IDE and appear in generated Javadoc files.

Secondly:
Logger instance variables should be private.

Thirdly:
Suggest instance variables on PartitionManager, StaticCoordinator, and ZkDataStore be made private.

@ooasis
Author

ooasis commented Sep 9, 2015

Thanks for the advice. I'll get these fixed as soon as I have some time.

@sweetest
Contributor

sweetest commented Sep 9, 2015

How will this change affect the format of the spout's offset message in ZooKeeper?

@ooasis
Author

ooasis commented Sep 9, 2015

I assume you are referring to the JSON format of the spout "state" stored in ZK? The "state" will be saved using the exact same JSON format in Kafka's internal topic, as the "metadata" associated with the offsets.

@ooasis
Author

ooasis commented Sep 12, 2015

Fixed the coding-style issues suggested by rmkellogg.

import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
Contributor

Please expand this `*` import to explicitly include the classes you depend on.

Author

The fix is pushed

@erikdw
Contributor

erikdw commented Sep 15, 2015

This seems like it might not be backwards compatible with the existing kafka-spout; i.e., the offsets in ZK are presumably not going to be stored exactly the same as they were before. Is there any plan for supporting migration from the current kafka-spout to this one?

@erikdw
Contributor

erikdw commented Sep 16, 2015

Furthermore, can you please provide a reference to "Kafka's consumer offset management api" in your description?

@ooasis
Author

ooasis commented Sep 16, 2015

Erik,

I added 2 reference links in the description with more information on "Kafka's consumer offset management api".

As for migrating existing offsets from ZK over to Kafka, it could be done with a simple migration utility similar to the tools provided by Kafka (https://cwiki.apache.org/confluence/display/KAFKA/System+Tools). The question is where it belongs in the Storm codebase.

-thanks

@@ -52,17 +52,24 @@ The optional ClientId is used as a part of the zookeeper path where the spout's
 There are 2 extensions of KafkaConfig currently in use.

 Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
-behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely
-identify your spout.
+behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset if you chose Zookeeper as the storage.
Contributor

minor nit: Zookeeper -> ZooKeeper.

@erikdw
Contributor

erikdw commented Sep 21, 2015

@hsun-cnnxty : I don't see the reference links in the Description on the PR's Conversation view? Maybe I'm looking in the wrong place? Maybe they should be in comments in the code too?
Yup, was looking in the wrong place, I didn't realize that "the description" meant the JIRA issue's Description!

I'm guessing the following are the links you meant to put:

I just want it to be very clear that this PR's purpose is to change from the kafka spout's consumer offsets being stored in ZooKeeper, to instead being stored directly in Kafka. We should also be clear about the version of Kafka required for such support (0.8.2+). I know the current version of the storm-kafka pom.xml (as of this change) is already referencing 0.8.2.1, but I feel like it should be called out as an explicit requirement in the commit.

this._stormConf = stormConf;
this._spoutConfig = spoutConfig;

// default to orignal storm storage format
Contributor

typo: original

@ooasis
Author

ooasis commented Sep 22, 2015

Erik,

Pushed the change to rename the config from "storm" to "zookeeper" (also fixed the typo). As for the Kafka dependency, that is a good question. Actually I am not clear on how the "storm-kafka" module is built with Maven. Without adding the particular Kafka dependencies to pom.xml, the local build just won't work for me. Am I supposed to use some external config (such as settings.xml) to make it build locally?

-thanks

@choang

choang commented Sep 28, 2015

I recommend making your abstraction at the state store, so you would have:

public interface StateStore {
  public void write(Partition p, long offset);
  public long read(Partition p);
}

public class ZkStateStore implements StateStore {
...
}

public class KafkaStateStore implements StateStore {
...
}

/**
 * Some topologies read from LATEST after a restart, so only memory state is needed.
 */
public class MemoryStateStore implements StateStore {
...
}

You would not need different PartitionStateManager for different stores. You would just have:

public class PartitionStateManager {
    public PartitionStateManager (..., StateStore store) { ... }
    public void writeState(...) {
        store.write(...);
    }
}

@ooasis
Author

ooasis commented Oct 4, 2015

Hi Chi,

Storm stores more than just offset/partition data in the "state"; would it be necessary to declare:

public interface StateStore {
    public void write(Partition p, Map<Object, Object> state);
    public Map<Object, Object> read(Partition p);
}

-thanks
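The Map-based interface above can be sketched as a minimal, self-contained program, using an in-memory implementation along the lines of choang's MemoryStateStore suggestion. The `Partition` class here is a simplified stand-in for storm-kafka's real `Partition` type, introduced only for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for storm-kafka's Partition (illustration only).
class Partition {
    final String topic;
    final int id;
    Partition(String topic, int id) { this.topic = topic; this.id = id; }
    String key() { return topic + "/" + id; }
}

interface StateStore {
    void write(Partition p, Map<Object, Object> state);
    Map<Object, Object> read(Partition p);
}

// Keeps state only in memory; enough for topologies that restart from LATEST.
class MemoryStateStore implements StateStore {
    private final Map<String, Map<Object, Object>> states = new ConcurrentHashMap<>();

    public void write(Partition p, Map<Object, Object> state) {
        states.put(p.key(), new HashMap<>(state));   // defensive copy
    }

    public Map<Object, Object> read(Partition p) {
        return states.get(p.key());
    }
}

public class StateStoreDemo {
    public static void main(String[] args) {
        StateStore store = new MemoryStateStore();
        Partition p = new Partition("testTopic", 1);

        Map<Object, Object> state = new HashMap<>();
        state.put("offset", 4285L);
        state.put("partition", 1);
        store.write(p, state);

        System.out.println(store.read(p).get("offset"));
    }
}
```

Because the state is passed as an opaque map, a Kafka-backed or ZK-backed implementation only has to decide how to serialize and persist that map per partition.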

@choang

choang commented Oct 4, 2015

I would make state more concrete, but I suppose your approach is fine.

@erikdw
Contributor

erikdw commented Oct 5, 2015

@hsun-cnnxty : can you please make your statement a bit more concrete? i.e., what other info is stored in a given kafka topic's consumer state? (other than the offset/partition)

@ooasis
Author

ooasis commented Oct 5, 2015

@choang: the refactored code is pushed

@erikdw: I was referring to the internal JSON structure used to store offsets, as shown in the example below:

{
  "broker": {
    "host": "kafka.sample.net",
    "port": 9092
  },
  "offset": 4285,
  "partition": 1,
  "topic": "testTopic",
  "topology": {
    "id": "fce905ff-25e0-409e-bc3a-d855f787d13b",
    "name": "Test Topology"
  }
}
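For illustration, a flat subset of that per-partition state can be rendered as the metadata string committed alongside the offset. This is a hand-rolled sketch, not the spout's actual serializer (the real code would use a proper JSON library); the field names follow the JSON above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StateMetadataDemo {
    // Naive JSON rendering of a flat state map (illustration only;
    // strings are quoted, numbers emitted as-is, no escaping or nesting).
    static String toMetadata(Map<String, Object> state) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : state.entrySet()) {
            if (!first) sb.append(",");
            first = false;
            sb.append("\"").append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof String) sb.append("\"").append(v).append("\"");
            else sb.append(v);
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so the output is deterministic.
        Map<String, Object> state = new LinkedHashMap<>();
        state.put("offset", 4285L);
        state.put("partition", 1);
        state.put("topic", "testTopic");
        System.out.println(toMetadata(state));
    }
}
```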

@@ -86,7 +93,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
-public boolean ignoreZkOffsets = false;
+public boolean ignoreStoredOffsets = false;

Is there a mechanism/section in the release notes to call out this change? Changing this var name would break existing code.


SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;
PartitionStateManagerFactory _partitionStateManagerFactory;

Instead of a factory, you can make the developer declare the StateStore:

public void createTopology() {
    Spout spout = new KafkaSpout(..., new KafkaStateStore(...));
...
}

This should keep the KafkaSpout code much simpler and more explicit, and eliminate the need for a factory.

// https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
private volatile BlockingChannel _offsetManager;

public KafkaStateStore(Map stormConf, SpoutConfig spoutConfig) {

stormConf isn't used, so remove it. By using spoutConfig, you are bleeding the spout state into the store. I understand it is for convenience, but being explicit would be better, so perhaps you can create a KafkaStoreConfig.
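The double-checked-locking pattern referenced in the snippet above (lazily opening the offset-manager channel exactly once) looks roughly like this self-contained sketch; the `Channel` class here is a hypothetical stand-in for Kafka's `BlockingChannel`:

```java
// Hypothetical stand-in for kafka.network.BlockingChannel (illustration only).
class Channel {
    void connect() { /* open the connection */ }
}

public class LazyChannelHolder {
    // volatile is required for double-checked locking to be safe (Java 5+ memory model).
    private volatile Channel offsetManager;

    Channel getOffsetManager() {
        Channel result = offsetManager;          // single volatile read on the fast path
        if (result == null) {
            synchronized (this) {
                result = offsetManager;
                if (result == null) {            // second check, now under the lock
                    result = new Channel();
                    result.connect();
                    offsetManager = result;      // publish only after full construction
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LazyChannelHolder holder = new LazyChannelHolder();
        // Repeated calls return the same lazily created instance.
        System.out.println(holder.getOffsetManager() == holder.getOffsetManager());
    }
}
```

Without the `volatile`, a thread could observe a partially constructed `Channel`, which is why the linked Wikipedia article stresses that qualifier.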

@choang

choang commented Oct 6, 2015

Looks really good. Just had a few nits that shouldn't block. I also don't have a vote, so you'll need to find a proper reviewer :)

@ooasis
Author

ooasis commented Oct 6, 2015

@choang thanks for the code review. Your comment on sharing the kafka store for all partitions is really helpful, and I was able to clean up a lot of unnecessary logic. I still have some code to check in and need to run a few tests, so maybe you can hold off for a while; I don't want to waste your time on a changing code base. I will write a response as soon as I get some time. Thanks again.

@ooasis
Author

ooasis commented Oct 14, 2015

@choang In recent changes, I have made it possible to plug in custom store implementations. A custom implementation is given the opportunity to initialize itself via two configuration objects injected into its constructor, which takes the form:

public MyStoreImpl(Map conf, SpoutConfig spoutConfig)

In addition, I feel it is better to keep implementation-specific details, such as the ZooKeeper quorum, inside the store implementation instead of leaving it to an external class to build an implementation-specific configuration object such as KafkaStoreConfig. What do you think?

Thanks again for the code review.

@choang

choang commented Oct 14, 2015

StateStore implementations could be designed like:

public class KafkaStateStore implements StateStore {
    public KafkaStateStore(HostPort kafkaBroker, String consumerId) {  // consumerId could be the topic?  I don't know enough about the compaction feature
        ...
     }
...
}

public class ZkStateStore implements StateStore {
    public ZkStateStore(HostPort zkConnect, String path) {
        ...
    }
}

// future implementations
public class JdbcStateStore implements StateStore {
    public JdbcStateStore(String url, String stateTable, String user, String password) {
        ...
    }
}

public class MemoryStateStore implements StateStore {
    public MemoryStateStore() {
        ...
    }
}

What I'm trying to illustrate are two factors:

  1. go away from StormConf and SpoutConfig, because StateStore does not need all the properties of either object.
  2. the concrete implementations of StateStore do not need to have the same constructor, because the goal is not to have a factory and make it completely config-driven. You want the topology developer to be able to decide which StateStore to use.

With the above, it would be pretty easy for someone to implement MemoryStateStore and JdbcStateStore.

* apache/master: (21 commits)
  Added STORM-1204 to Changelog
  Added STORM-831 to Changelog
  Added STORM-1208 to Changelog
  Guard against both Infinity and NaN
  Added STORM-1016 to Changelog
  add apache license header
  Guard against NPE, and avoid using NaN values
  Fixing attempt to access to directory before checking user authorization and avoid dumps listing error
  Fix copyright year and holder.
  Adding MIT license for statistics image
  Added STORM-1190 to Changelog
  replace HashSet with LinkedHashSet in TridentTopology.java
  Use generic name bugtracker instead of jira and use public domain images
  Some more fixes to the test so they shutdown correctly.
  Fixes for unit tests getting stuck.
  Some fixes to improve performance
  Better performance with batching disabled, and better CPU utilization from flushers
  STORM-1190: System Load too high after recent changes
  STORM-1016: Generate trident bolt ids with sorted group names
  Remove default JIRA URL so that custom deployments won't default to apache jira link
  ...
@ooasis
Author

ooasis commented Jan 6, 2016

@choang changes are made and latest code from master merged in. Please review whenever you have time. -thanks

@choang

choang commented Jan 7, 2016

this looks good enough

* apache/master: (64 commits)
  add STORM-1496 to CHANGELOG.md
  fixing sporadic nimbus log failure and topology visualization
  backport STORM-1484/1478 to 1.0.0, too
  add STORM-1499 to CHANGELOG.md
  fix wrong package name for storm trident
  Added STORM-1463 to Changelog
  Added STORM-1485 to Changelog
  Added STORM-1486 to Changelog
  Added STORM-1214 to Changelog
  [STORM-1486] Fix storm-kafa documentation
  CHANGELOG: move STORM-1450 to 1.0.0 (backported)
  CHANGELOG.md: move 0.10.0-beta2 to 0.10.0
  Fix CHANGELOG.md to have 0.10.1 and move issues
  Fix misplaced CHANGELOGs
  add STORM-1452 to changelog
  add STORM-1406 to changelog
  adds comments about licensing the profiler feature
  Fixes profiling/debugging out of the box
  add storm-mqtt to binary distribution
  Fixing minor code comments
  ...
@AwesomeJohnR

Any updates on the status of this merge? This update would be very helpful.

@ooasis
Author

ooasis commented Feb 25, 2016

Just did another merge to keep it up to date with master branch. Not sure what the plan is now. I would be happy to make any changes that can help to make the final merge happen.

d2r pushed a commit to d2r/storm that referenced this pull request Oct 16, 2018
We are closing stale Pull Requests to make the list more manageable.

Please re-open any Pull Request that has been closed in error.

Closes apache#608
Closes apache#639
Closes apache#640
Closes apache#648
Closes apache#662
Closes apache#668
Closes apache#692
Closes apache#705
Closes apache#724
Closes apache#728
Closes apache#730
Closes apache#753
Closes apache#803
Closes apache#854
Closes apache#922
Closes apache#986
Closes apache#992
Closes apache#1019
Closes apache#1040
Closes apache#1041
Closes apache#1043
Closes apache#1046
Closes apache#1051
Closes apache#1078
Closes apache#1146
Closes apache#1164
Closes apache#1165
Closes apache#1178
Closes apache#1213
Closes apache#1225
Closes apache#1258
Closes apache#1259
Closes apache#1268
Closes apache#1272
Closes apache#1277
Closes apache#1278
Closes apache#1288
Closes apache#1296
Closes apache#1328
Closes apache#1342
Closes apache#1353
Closes apache#1370
Closes apache#1376
Closes apache#1391
Closes apache#1395
Closes apache#1399
Closes apache#1406
Closes apache#1410
Closes apache#1422
Closes apache#1427
Closes apache#1443
Closes apache#1462
Closes apache#1468
Closes apache#1483
Closes apache#1506
Closes apache#1509
Closes apache#1515
Closes apache#1520
Closes apache#1521
Closes apache#1525
Closes apache#1527
Closes apache#1544
Closes apache#1550
Closes apache#1566
Closes apache#1569
Closes apache#1570
Closes apache#1575
Closes apache#1580
Closes apache#1584
Closes apache#1591
Closes apache#1600
Closes apache#1611
Closes apache#1613
Closes apache#1639
Closes apache#1703
Closes apache#1711
Closes apache#1719
Closes apache#1737
Closes apache#1760
Closes apache#1767
Closes apache#1768
Closes apache#1785
Closes apache#1799
Closes apache#1822
Closes apache#1824
Closes apache#1844
Closes apache#1874
Closes apache#1918
Closes apache#1928
Closes apache#1937
Closes apache#1942
Closes apache#1951
Closes apache#1957
Closes apache#1963
Closes apache#1964
Closes apache#1965
Closes apache#1967
Closes apache#1968
Closes apache#1971
Closes apache#1985
Closes apache#1986
Closes apache#1998
Closes apache#2031
Closes apache#2032
Closes apache#2071
Closes apache#2076
Closes apache#2108
Closes apache#2119
Closes apache#2128
Closes apache#2142
Closes apache#2174
Closes apache#2206
Closes apache#2297
Closes apache#2322
Closes apache#2332
Closes apache#2341
Closes apache#2377
Closes apache#2414
Closes apache#2469
@asfgit asfgit closed this in #2880 Oct 22, 2018