Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

fix GEARPUMP-122, refactor kafka connector API #25

Closed
wants to merge 1 commit into from

Conversation

manuzhang
Copy link
Contributor

@manuzhang manuzhang commented May 27, 2016

This PR mainly refactor the Kafka connector API with the following changes:

  1. removes OffsetStorage and KafkaStorage, and uses CheckpointStore and KafkaStore for both offset and state checkpoint stores.
  2. adds a checkpoint(CheckpointStoreFactory) method to TimeReplayableSource which makes offset store configurable by user. If checkpoint is not called, then no offset checkpoint will be performed and applications run in at-most-once mode.
  3. moves user facing APIs, KafkaSource, KafkaSink and KafkaStore to Java and keeps their implementations in Scala, which prevents Scala package private methods being exposed in Java programs.
  4. KafkaSource, KafkaSink and KafkaStore are all configurable through Java properties which is the standard Kafka way.
  5. update corresponding UTs.

override def open(context: TaskContext, startTime: TimeStamp): Unit = {
import context.{parallelism, taskId}

LOG.info("KafkaSource opened at start time {}", startTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding the topic list that are consumed to the log is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already log assigned partitions below. Will that be sufficient ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all right, I missed that part.

@huafengw
Copy link
Contributor

Documentation update?

@manuzhang
Copy link
Contributor Author

may I update doc as follow-up ?

props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
val source = new KafkaSource(sourceTopic, props)
val checkpointStoreFactory = new KafkaStoreFactory(props)
source.checkpoint(checkpointStoreFactory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if user doesn't call checkpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then kafka offsets will not be checkpointed

@codecov-io
Copy link

codecov-io commented May 27, 2016

Current coverage is 63.75%

Merging #25 into master will decrease coverage by 0.34%

  1. 5 files (not in diff) in ...rpump/streaming/task were modified. more
  2. 2 files (not in diff) in ...streaming/state/impl were modified. more
  3. 1 files (not in diff) in ...e/gearpump/streaming were modified. more
    • Misses -1
    • Hits +1
  4. 2 files (not in diff) in ...security/oauth2/impl were modified. more
  5. 1 files (not in diff) in ...ices/security/oauth2 were modified. more
  6. 4 files (not in diff) in ...he/gearpump/services were modified. more
    • Misses -2
  7. 2 files (not in diff) in ...streaming/hadoop/lib were modified. more
  8. 1 files (not in diff) in ...ump/streaming/hadoop were modified. more
  9. 2 files (not in diff) in ...he/gearpump/jarstore were modified. more
    • Misses +8
    • Hits -8
  10. 2 files (not in diff) in ...rpump/cluster/master were modified. more
    • Misses +75
    • Hits -75
@@             master        #25   diff @@
==========================================
  Files           180        177     -3   
  Lines          5956       5898    -58   
  Methods        5678       5626    -52   
  Messages          0          0          
  Branches        278        272     -6   
==========================================
- Hits           3818       3760    -58   
  Misses         2138       2138          
  Partials          0          0          

Sunburst

Powered by Codecov. Last updated by 32f1507...25b3631

/**
* for tests only
*/
void addPartitionAndStore(TopicAndPartition tp, CheckpointStore store) {
Copy link
Contributor

@huafengw huafengw May 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected ?

@manuzhang
Copy link
Contributor Author

please do not merge. I need to verify whether integration test passes

@manuzhang manuzhang force-pushed the kafka branch 6 times, most recently from aac082b to ae6671a Compare May 30, 2016 03:55
@manuzhang
Copy link
Contributor Author

I've verified related integration tests pass

@manuzhang manuzhang force-pushed the kafka branch 3 times, most recently from a48d95a to 9c5052a Compare May 31, 2016 04:32
@huafengw
Copy link
Contributor

UT failed

@huafengw
Copy link
Contributor

+1

@asfgit asfgit closed this in 04c3975 Jun 1, 2016
@manuzhang manuzhang deleted the kafka branch June 3, 2016 03:27
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants