From 36366c801dc74be7111bcb2e7f5d78e6fe3570f4 Mon Sep 17 00:00:00 2001
From: Kostya Golikov
Date: Tue, 28 Jul 2015 21:44:22 +0300
Subject: [PATCH 1/2] No other storm/external project has a changelog, and it
 looks like this log became obsolete once storm-kafka was incorporated into
 Storm.

---
 external/storm-kafka/CHANGELOG.md | 13 -------------
 1 file changed, 13 deletions(-)
 delete mode 100644 external/storm-kafka/CHANGELOG.md

diff --git a/external/storm-kafka/CHANGELOG.md b/external/storm-kafka/CHANGELOG.md
deleted file mode 100644
index 33a49eeccdb..00000000000
--- a/external/storm-kafka/CHANGELOG.md
+++ /dev/null
@@ -1,13 +0,0 @@
-## 0.9.2-incubating (0.5.0)
-* incorporated as an Apache Storm external module
-* fixed partition assignment for KafkaSpout
-* upgraded to storm 0.9.1
-
-## 0.4.0
-* added support for reading kafka message keys
-* configurable metrics emit interval
-
-## 0.3.0
-* updated partition path in zookeeper
-* added error handling for fetch request
-

From 4686a47cabb283037d165438a1f4c886a9e1f47a Mon Sep 17 00:00:00 2001
From: Kostya Golikov
Date: Mon, 17 Aug 2015 16:01:37 +0300
Subject: [PATCH 2/2] Remove the TODO as well; to track issues and proposals,
 see https://issues.apache.org/jira/browse/STORM

---
 TODO | 178 -------------------------------------------------------
 1 file changed, 178 deletions(-)
 delete mode 100644 TODO

diff --git a/TODO b/TODO
deleted file mode 100644
index 23c88376666..00000000000
--- a/TODO
+++ /dev/null
@@ -1,178 +0,0 @@
-Use cases:
-
-1. number of steps between 2 people in a graph (topology with cycles?)
-
-
-#################
-
-* Repackage jzmq and zmq as a leiningen "native dep"
- - this might be good, since the native dep can package builds for all the different systems/OSes
-
-* Deploy design (a rough interface is sketched after this list):
-
-- storm swap {name} {jar} {class}
-- a swapped-in topology is allowed to use resources equal to the currently running topology plus the number of free resources
-- it starts in deactivated mode
-- add a TOPOLOGY_STARTUP_TIME config for the delay until nimbus activates a topology after launching it
-- for a swap: after the startup time, deactivate the old topology, wait TOPOLOGY_MESSAGE_TIMEOUT_SECS, and then activate the new topology
-- should be able to decrease the message timeout for killing or swapping (add an optional thrift parameter, or just make it part of the config?)
-- add killWithOptions, swap, swapWithOptions
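To make the deploy bullets concrete, here is a minimal sketch of what the proposed additions might look like as a thrift-style Java interface. All of the names below (NimbusDeployOps, SwapOptions, the option fields) are hypothetical; only killWithOptions/swap/swapWithOptions come from the notes above.

// Hypothetical interface following the deploy notes above; none of these
// names are from an actual Storm release.
public interface NimbusDeployOps {
    // existing behavior: kill after the topology's configured
    // TOPOLOGY_MESSAGE_TIMEOUT_SECS has elapsed
    void killTopology(String name) throws Exception;

    // proposed: optionally shorten the wait instead of sitting through
    // the full message timeout
    void killTopologyWithOpts(String name, KillOptions options) throws Exception;

    // proposed: launch the new topology deactivated, wait TOPOLOGY_STARTUP_TIME,
    // deactivate the old one, wait the message timeout, then activate the new one
    void swapTopology(String name, String jar, String klass) throws Exception;
    void swapTopologyWithOpts(String name, String jar, String klass,
                              SwapOptions options) throws Exception;
}

// optional overrides, mirroring the "optional thrift parameter" bullet
class KillOptions {
    Integer waitSecs;           // overrides the message timeout for this kill
}

class SwapOptions {
    Integer messageTimeoutSecs; // shorter timeout to speed up the cutover
    Integer startupTimeSecs;    // overrides TOPOLOGY_STARTUP_TIME for the new topology
}

Under this reading, storm swap {name} {jar} {class} would just be a thin CLI wrapper over swapTopologyWithOpts.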
-* Storm UI, stats, debugging, and diagnosis tools
-- need to be able to hide system streams/components from the calculations (another query param; hiding should be the default)
-- need to optimize (the slowness is probably on the nimbus end when querying zk; consider adding heartbeat caching to nimbus)
-- add margins
-- add titles so it's easier to distinguish the various pages
-- right-align all table columns except for the leftmost
-
-* Unit test the core pieces whose APIs have stabilized:
-
-- process simulator
-- virtual ports
-- supervisor
-- utils
-- test worker/tasks
-
-* Implement pseudo-distributed mode - this is for testing the distributed parts of the code
- - perhaps pallet/vmfest can be used for this
-
-* Need integration tests that run on an actual storm cluster (the scp code, process code, and zookeeper code are not covered by unit tests)
-
-* Bolts with a "none" grouping can be pushed into the upstream component, e.g.:
-
-  A -> B -> C
-  A -> D -> E
-
-If A -> B and A -> D use the none grouping, and B -> C and D -> E do not, then both B and D can run inside A: B's branch goes on to C and D's branch goes on to E.
-
-* Failure design (a bolt-side sketch follows this section)
-
-Add a fail method to OutputCollector.
-fail sends a fail message to the acker for the tuple's anchors, and the acker sends a fail message back to the spout.
-Whenever a spout fails a tuple, it emits it on its failure stream...
-
-Add a fail method to DRPC... it causes the blocked thread to throw an exception.
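A minimal sketch of the bolt side of this fail path, written against the backtype.storm Java API; FailAwareBolt and its process() method are stand-ins, not part of Storm.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class FailAwareBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            process(input);
            collector.ack(input);
        } catch (Exception e) {
            // fail() notifies the acker for this tuple's anchors, which in
            // turn notifies the originating spout that the tuple failed
            collector.fail(input);
        }
    }

    private void process(Tuple input) { /* application logic goes here */ }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

On the spout side the failure eventually arrives at ISpout's fail(Object msgId) callback, which is where a spout could re-emit the tuple on a failure stream as proposed above.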
-* Have each worker heartbeat with its task ids, and have nimbus verify them - if they're wrong, reassign the tasks?
-- detect and ignore stray tasks
-- each worker can choose a unique id for itself when heartbeating - nimbus deletes the ids that aren't in the topology
-
-* Subscriptions design
-
-- a new kind of spout: a "subscription spout"
- --> the goal is to sync its data across the tasks that subscribe to its streams
- --> after doing a grouping, it remembers which task it sent the tuple to (regardless of grouping). if a task dies, the task knows its subscriptions and asks to be resynced
- --> normal operation is to push to tasks, but a pull is done when a task starts up (because the previous task died or something)
- --> need to be able to add tuples to a subscription or take tuples away (this is the protocol with whatever you're subscribing to - e.g. rocket)
- --> subscriptions can only happen in a spout because they require persistent state
- --> when a subscription spout task dies, it polls the source (e.g. rocket) for all the subscription info
- --> ideally you'd set things up to have one subscription spout per rocket server
- --> TODO: need some way to delete subscriptions -> part of the tuple, or extra metadata on the tuple? (extra metadata seems cleaner)
- --> add an isSubscription() method to Tuple, as well as a getSubscriptionType() [which returns ADD or REMOVE]
- --> when a spout starts up, it also needs to push all of its subscription info
- --> acks are irrelevant for subscription tuples - how should acks be managed as an abstraction?
- -- maybe the synchronized state is handled for you - you just access the state directly and receive a callback whenever it changes? - so don't use tuples...
- --> subscriptions break all the abstractions; perhaps spouts should be generalized and acking factored out as a library on top of storm, with subscriptions as just another kind of library? -> no, that seems to break the abstractions anyway (like keeping task -> tuples in memory)
- --> maybe call it a "syncspout"
- --> what if it only does syncing (and doesn't expose tuples directly)?
- --> have a "SubscribedState" class that takes care of indexing etc. --> expose it through TopologyContext? (a rough sketch appears after these notes)
- -- need a way to distinguish between the states of different streams
- -- it has "add" and "remove" methods
- -- a bolt can supply a state-manager object that implements add and remove in its prepare method
- -- add(Tuple tuple)
- -- remove(Tuple tuple)
- --> synchronize protocol (used when the spout or the source of data dies):
- --> send how many tuples are going to be sent
- --> send the tuples
- --> OR: pack everything together into a single message (could be hard, because where tuples are supposed to go is abstracted away)
- --> tie everything together with a unique id
- --> once a task has received everything, it has the info needed to remove stale tuples
- --> the statespout should do long-polling with a timeout
- --> to do subscriptions, the state should contain something like [url, subscriber]: some bolt appends the subscriber to tuples, groups by subscriber, and sends the info back
- --> how do you do a fields grouping with an even distribution?
- --> ********* tasks need to block on startup until they're synchronized *********
- --> send sync messages in a loop until synchronized
- --> add a task.synchronize.poll.freq.secs config (default: 10 seconds)
- --> need to buffer other messages while the topology is waiting for synchronization messages (use disk?)
- --> could use the acking system to know when a piece of state is fully synchronized and communicate this to the user
- --> perhaps expose that through a special stream? (a state-status stream -> similar to the failure streams)
- --> should be able to update existing state
- --> use case: a knob that you can set externally
- --> this isn't really any better than just using zookeeper directly
-
-_myState = context.setSubscribedState(_myState)
-
-StateSpout {
-    // does a long poll with a timeout and emits new add/remove state tuples
-    // (via add and remove on the output collector)
-    nextTuple(StateSpoutOutputCollector)        // collector has add(id, tuple) and remove(id) methods
-
-    // called on startup: emits all the tuples into the output collector (in the
-    // background it also sends ids and counts to tasks so they know how to
-    // synchronize); the collector can have a synchronize method in case the
-    // source of data (e.g. rocket) craps out
-    synchronize(SynchronizationOutputCollector) // collector only has an add(id, tuple) method
-}
-
-// task startup (in the prepare method) [this is automatic]
-for(int taskId: stateSpoutIds) {
-    emitDirect(SYNC_STREAM, tuple())
-}
-
-// statespout synchronization:
-id = uuid()
-// getAllStateTuplesFromSource calls synchronize on the spout to get the tuples
-for(Tuple t: getAllStateTuplesFromSource()) {
-    List tasks = emit(cons(id, t));
-    // ...keep track of id -> tasks -> count
-    for(task: all output tasks) {
-        emitDirect(task, id, count)
-    }
-}
-
-For synchronization to work, a task needs to keep track of which tasks sent it tuples, and compare against only that set when synchronizing.
-
-Need a way to propagate information back up the topology - "subscriptions",
-e.g. browser -> rocket -> bolt -> bolt -> bolt.
-
-Example: #retweets for a subscribed set of tweet ids.
-
-storm topology:
-
-  -> tweet spout (A) -> group on original id -> count (B) -> rocket
-
-subscriptions: rocket -> count (B) by tweet id (needs to group) -> spout (needs to go to all)
-
-- how does it work when stuff dies downstream or upstream? do tasks ask what the subscriptions are, or do you push your subscriptions up? a combination?
-- maybe subscriptions are a "constant" spout? e.g., it continuously emits and refreshes to make sure every task has the tuple. this seems amorphous and hard to implement... nimbus would need to refire all constant spouts whenever there's a reassignment that affects the flow of data. subscriptions seem more natural
-- subscriptions are a special kind of stream that is driven by being asked to send. e.g., rocket is a spout that emits subscription/unsubscription tuples; it only sends them when it gets something new, or when it's asked what all the subscriptions are
-- maybe you just need a system stream for knowing when tasks are created. when you see that a downstream task was created, you know to fire subscriptions at it if it's subscribed to your subscriptions stream? how does this interplay with all the grouping types... you almost want to do a grouping and send only to the tasks that would have received the tuple. spouts would need to be able to subscribe to streams as well
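A rough sketch of what the "SubscribedState" container described above might look like. The class and its key-field indexing scheme are hypothetical; it only restates the add/remove/indexing behavior from these notes, not an actual Storm API.

import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;

// Hypothetical container a task could obtain via context.setSubscribedState:
// it indexes synced state tuples by a chosen key field (e.g. the subscription
// url) and supports the add/remove operations from the notes above.
public class SubscribedState {
    private final Map<Object, Tuple> index = new HashMap<>();
    private final String keyField;

    public SubscribedState(String keyField) {
        this.keyField = keyField;
    }

    // called when a state tuple arrives with getSubscriptionType() == ADD
    public void add(Tuple tuple) {
        index.put(tuple.getValueByField(keyField), tuple);
    }

    // called when a state tuple arrives with getSubscriptionType() == REMOVE
    public void remove(Tuple tuple) {
        index.remove(tuple.getValueByField(keyField));
    }

    public Tuple get(Object key) {
        return index.get(key);
    }
}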
-(use 'backtype.storm.testing)
-;;(start-simulating-time!)
-(def cluster (mk-local-storm-cluster))
-(use 'backtype.storm.bootstrap) (bootstrap)
-(import '[backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
-(def spout (feeder-spout ["word"]))
-(def topology (thrift/mk-topology
-                {1 (thrift/mk-spout-spec spout :parallelism-hint 3)}
-                {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
-                 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
-                 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))}))
-(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4 TOPOLOGY-DEBUG true} topology)
-
-* Clean up the project
- - remove the log4j dir and instead generate it in the deploy (it's only used in bin/storm -> create a console config and put it into bin/)
- - include the system component/stream information in the TopologyContext, and clean up the system-specific code all over the place
-
-* Very rare errors
-
-Weird NullPointerExceptions:
-- (tasks i) in send-fn
-- no virtual port socket for an outbound task (in the worker)
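For comparison, here is the Clojure REPL session above translated to the plain Java API: a sketch assuming the 0.9.x backtype.storm classes, with FeederSpout standing in for feeder-spout and string component ids in place of the numeric ones.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestAggregatesCounter;
import backtype.storm.testing.TestGlobalCount;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LocalClusterSession {
    public static void main(String[] args) throws Exception {
        // (feeder-spout ["word"])
        FeederSpout spout = new FeederSpout(new Fields("word"));

        // the same wiring as the thrift/mk-topology call above
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("1", spout, 3);
        builder.setBolt("2", new TestWordCounter(), 4).fieldsGrouping("1", new Fields("word"));
        builder.setBolt("3", new TestGlobalCount()).globalGrouping("1");
        builder.setBolt("4", new TestAggregatesCounter()).globalGrouping("2");

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(true);

        // (mk-local-storm-cluster) / (submit-local-topology ...)
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());

        spout.feed(new Values("hello")); // push a test tuple through
        Thread.sleep(10000);             // let the topology run briefly
        cluster.shutdown();
    }
}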