Skip to content
Browse files

asdff

  • Loading branch information...
1 parent d03936b commit d2ccb32f4c504881c9bd42e430cde0830e05a09e Marco Munizaga committed Nov 30, 2012
Showing with 162 additions and 5 deletions.
  1. +40 −0 pconn
  2. +2 −0 project.clj
  3. +40 −5 src/realtime_stats/core.clj
  4. +40 −0 src/realtime_stats/pconn.clj
  5. +40 −0 src/realtime_stats/tcp_conn.clj
View
40 pconn
@@ -0,0 +1,40 @@
+(ns socketspam.pconn
+ (:use lamina.core aleph.tcp aleph.formats))
+
+(def socketspam-host "localhost")
+(def socketspam-port 9000)
+
+;Create an endpoint and ground it so messages don't get queued up
+;if nothing is listening
+(def endpoint (channel))
+(ground endpoint)
+
+(defn send-data
+ "Sends the given data to the endpoint channel, which should hopefully
+ be siphoned into the tcp connection"
+ [data]
+ (enqueue endpoint data))
+
+(declare make-connection)
+(defn connection-established
+ "When a connection is established, forward everything going to endpoint into
+ it. Also, set up recovery handler"
+ [socket]
+ (println "Socket connection established")
+ (on-closed socket make-connection)
+ (siphon endpoint socket))
+
+(defn connection-error
+ "On connection closed, wait a minute and try to recover. We use future
+ to keep the stack from growing (I think that will work)"
+ [error]
+ (println "Error connecting to socketspam: " error)
+ (Thread/sleep 1000)
+ (future (make-connection)))
+
+(defn make-connection
+ "Initializes the connection to socketspam"
+ []
+ (on-realized (tcp-client {:host socketspam-host :port socketspam-port})
+ connection-established
+ connection-error))
View
2 project.clj
@@ -5,4 +5,6 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.4.0"]
[storm "0.8.1"]
+ [aleph "0.3.0-beta5"]
+ [lamina "0.5.0-beta8"]
[org.clojars.jasonjckn/storm-kafka "0.8.2-SNAPSHOT"]])
View
45 src/realtime_stats/core.clj
@@ -1,5 +1,6 @@
(ns realtime-stats.core
(:use [backtype.storm clojure config])
+ (:require [realtime-stats.pconn])
(:import [backtype.storm StormSubmitter LocalCluster]
(storm.kafka KafkaSpout
KafkaConfig
@@ -9,25 +10,36 @@
(java.util ArrayList))
(:gen-class))
+(realtime-stats.pconn/make-connection)
+
+(comment
+
+(realtime-stats.pconn/send-data "hello!")
+ )
(defn foo
"I don't do a whole lot."
[x]
(println x "Hello, World!"))
+(comment
+ (def testString "US 1 2 3")
+ (clojure.string/split testString #" ")
+ )
+
(defn createKafkaSpoutConfig
- []
+ [topic-name zkFile id]
(let [
stringScheme (StringScheme.) ;so the kafka spout reads things as strings we need to create the schema
hostPort (HostPort. "127.0.0.1" 9092)
hostList (ArrayList.)
append (.add hostList hostPort)
staticHost (storm.kafka.KafkaConfig$StaticHosts. hostList 1)
- spoutConfig (SpoutConfig. staticHost "test-topic" "/zkRootFoo" "fooID")]
+ spoutConfig (SpoutConfig. staticHost topic-name zkFile id)]
(set! (. spoutConfig scheme) stringScheme) ;tell the spout to read strings
spoutConfig))
-(def kafkaSpout (KafkaSpout. (createKafkaSpoutConfig)))
+(def kafkaSpout (KafkaSpout. (createKafkaSpoutConfig "test-topic" "/zkRootFoo" "fooID")))
(defspout sentence-spout ["sentence"]
@@ -64,9 +76,26 @@
(defbolt echo-sentence ["word"] [tuple collector]
(let [word (.getString tuple 0)]
- (emit-bolt! collector [(str "hello there!" word)] :anchor tuple)
+ (emit-bolt! collector [word] :anchor tuple)
(ack! collector tuple)))
+(defbolt net-echo-sentence ["word"] [tuple collector]
+ (let [word (.getString tuple 0)]
+ ;(realtime-stats.pconn/send-data word)
+ (emit-bolt! collector [word] :anchor tuple)
+ (ack! collector tuple)))
+
+(defbolt country-count ["word" "count"] {:prepare true}
+ [conf context collector]
+ (let [counts (atom {})]
+ (bolt
+ (execute [tuple]
+ (let [word (.getString tuple 0)]
+ (swap! counts (partial merge-with +) {word 1})
+ (emit-bolt! collector [word (@counts word)] :anchor tuple)
+ (ack! collector tuple)
+ )))))
+
(defbolt word-count ["word" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
@@ -84,7 +113,13 @@
{"1" (spout-spec kafkaSpout)}
{"5" (bolt-spec {"1" :shuffle}
echo-sentence
- :p 2)}))
+ :p 2)
+ "6" (bolt-spec {"5" ["word"]}
+ country-count
+ :p 4)
+ "7" (bolt-spec {"6" :shuffle}
+ net-echo-sentence
+ :p 1)}))
(defn run-local! []
(let [cluster (LocalCluster.)]
View
40 src/realtime_stats/pconn.clj
@@ -0,0 +1,40 @@
+(ns realtime-stats.pconn
+ (:use lamina.core aleph.tcp aleph.formats))
+
+(def socketspam-host "localhost")
+(def socketspam-port 9000)
+
+;Create an endpoint and ground it so messages don't get queued up
+;if nothing is listening
+(def endpoint (channel))
+(ground endpoint)
+
+(defn send-data
+ "Sends the given data to the endpoint channel, which should hopefully
+ be siphoned into the tcp connection"
+ [data]
+ (enqueue endpoint data))
+
+(declare make-connection)
+(defn connection-established
+ "When a connection is established, forward everything going to endpoint into
+ it. Also, set up recovery handler"
+ [socket]
+ (println "Socket connection established")
+ (on-closed socket make-connection)
+ (siphon endpoint socket))
+
+(defn connection-error
+ "On connection closed, wait a minute and try to recover. We use future
+ to keep the stack from growing (I think that will work)"
+ [error]
+ (println "Error connecting to socketspam: " error)
+ (Thread/sleep 1000)
+ (future (make-connection)))
+
+(defn make-connection
+ "Initializes the connection to socketspam"
+ []
+ (on-realized (tcp-client {:host socketspam-host :port socketspam-port})
+ connection-established
+ connection-error))
View
40 src/realtime_stats/tcp_conn.clj
@@ -0,0 +1,40 @@
+(ns realtime-stats.pconn
+ (:use lamina.core aleph.tcp aleph.formats))
+
+(def socketspam-host "localhost")
+(def socketspam-port 9000)
+
+;Create an endpoint and ground it so messages don't get queued up
+;if nothing is listening
+(def endpoint (channel))
+(ground endpoint)
+
+(defn send-data
+ "Sends the given data to the endpoint channel, which should hopefully
+ be siphoned into the tcp connection"
+ [data]
+ (enqueue endpoint data))
+
+(declare make-connection)
+(defn connection-established
+ "When a connection is established, forward everything going to endpoint into
+ it. Also, set up recovery handler"
+ [socket]
+ (println "Socket connection established")
+ (on-closed socket make-connection)
+ (siphon endpoint socket))
+
+(defn connection-error
+ "On connection closed, wait a minute and try to recover. We use future
+ to keep the stack from growing (I think that will work)"
+ [error]
+ (println "Error connecting to socketspam: " error)
+ (Thread/sleep 1000)
+ (future (make-connection)))
+
+(defn make-connection
+ "Initializes the connection to socketspam"
+ []
+ (on-realized (tcp-client {:host socketspam-host :port socketspam-port})
+ connection-established
+ connection-error))

0 comments on commit d2ccb32

Please sign in to comment.
Something went wrong with that request. Please try again.