Skip to content

Commit

Permalink
Merge pull request #4 from ghoseb/master
Browse files Browse the repository at this point in the history
Minor improvements.
  • Loading branch information
niclasmeier committed Jan 11, 2013
2 parents a3a7a1f + 0c7fab2 commit 54788e9
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 171 deletions.
18 changes: 7 additions & 11 deletions .gitignore
@@ -1,14 +1,10 @@
/target
/lib
/classes
/checkouts
pom.xml pom.xml
*jar *.jar
*class *.class
*iml
/lib/
/classes/
/newrelic/logs/
.lein-deps-sum .lein-deps-sum
.lein-failures .lein-failures
/.idea/ .lein-plugins
*.iml
.project
.classpath
.settings/
74 changes: 33 additions & 41 deletions src/herolabs/apns/feedback.clj
Expand Up @@ -14,25 +14,23 @@
[java.util.concurrent Executors ExecutorService ThreadFactory] [java.util.concurrent Executors ExecutorService ThreadFactory]
[java.util.concurrent LinkedBlockingQueue TimeUnit] [java.util.concurrent LinkedBlockingQueue TimeUnit]
[java.net InetSocketAddress] [java.net InetSocketAddress]
[javax.net.ssl SSLContext] [javax.net.ssl SSLContext]))
)
)




(def ^:private default-thread-pool* (atom nil)) (def ^:private default-thread-pool* (atom nil))


(defn default-thread-pool [] (defn default-thread-pool []
(or @default-thread-pool* (or @default-thread-pool*
(swap! default-thread-pool* (swap! default-thread-pool*
(fn [_] (Executors/newCachedThreadPool (fn [_] (Executors/newCachedThreadPool
(let [number (atom 1) (let [number (atom 1)
sm (System/getSecurityManager) sm (System/getSecurityManager)
group (if sm (.getThreadGroup sm) (.getThreadGroup (Thread/currentThread)))] group (if sm (.getThreadGroup sm) (.getThreadGroup (Thread/currentThread)))]
(reify ThreadFactory (reify ThreadFactory
(newThread [_ r] (let [t (Thread. group r (str "apns-feedback-" (swap! number inc)) 0) (newThread [_ r] (let [t (Thread. group r (str "apns-feedback-" (swap! number inc)) 0)
t (if (.isDaemon t) (.setDaemon t false) t) t (if (.isDaemon t) (.setDaemon t false) t)
t (if (not= Thread/NORM_PRIORITY (.getPriority t)) (.setPriority t Thread/NORM_PRIORITY) t)] t (if (not= Thread/NORM_PRIORITY (.getPriority t)) (.setPriority t Thread/NORM_PRIORITY) t)]
t))))))))) t)))))))))


(def ^:private timer* (ref nil)) (def ^:private timer* (ref nil))


Expand All @@ -45,19 +43,17 @@
(channelConnected [^ChannelHandlerContext ctx ^ChannelStateEvent event] (channelConnected [^ChannelHandlerContext ctx ^ChannelStateEvent event]
(debug "channelConnected") (debug "channelConnected")
(let [ssl-handler (-> ctx (let [ssl-handler (-> ctx
(.getPipeline) (.getPipeline)
(.get SslHandler))] (.get SslHandler))]
(.handshake ssl-handler) (.handshake ssl-handler)))
))
(messageReceived [^ChannelHandlerContext ctx ^MessageEvent event] (messageReceived [^ChannelHandlerContext ctx ^MessageEvent event]
(debug "messageReceived - " (.getMessage event)) (debug "messageReceived - " (.getMessage event))
(.put queue (.getMessage event))) (.put queue (.getMessage event)))
(exceptionCaught [^ChannelHandlerContext ctx ^ExceptionEvent event] (exceptionCaught [^ChannelHandlerContext ctx ^ExceptionEvent event]
(debug (.getCause event) "exceptionCaught") (debug (.getCause event) "exceptionCaught")
(-> event (-> event
(.getChannel) (.getChannel)
(.close)) (.close)))
)
(channelClosed [^ChannelHandlerContext ctx ^ChannelStateEvent event] (channelClosed [^ChannelHandlerContext ctx ^ChannelStateEvent event]
(debug "channelClosed")))) (debug "channelClosed"))))


Expand All @@ -68,46 +64,42 @@
(reify (reify
ChannelPipelineFactory ChannelPipelineFactory
(getPipeline [this] (getPipeline [this]
(doto (Channels/pipeline) (doto (Channels/pipeline)
(.addLast "ssl" (SslHandler. ssl-engine)) (.addLast "ssl" (SslHandler. ssl-engine))
(.addLast "decoder" (feedback-decoder)) (.addLast "decoder" (feedback-decoder))
(.addLast "timeout" (ReadTimeoutHandler. (timer) (int (if time-out time-out 300)))) (.addLast "timeout" (ReadTimeoutHandler. (timer) (int (if time-out time-out 300))))
(.addLast "handler" handler))))) (.addLast "handler" handler)))))


(defn- connect [^InetSocketAddress address ^SSLContext ssl-context time-out queue boss-executor worker-executor] (defn- connect [^InetSocketAddress address ^SSLContext ssl-context time-out queue boss-executor worker-executor]
"creates a netty Channel to connect to the server." "creates a netty Channel to connect to the server."
(try (try
(let [engine (ssl-engine ssl-context :use-client-mode true) (let [engine (ssl-engine ssl-context :use-client-mode true)
pipeline-factory (create-pipeline-factory engine (handler queue) time-out) pipeline-factory (create-pipeline-factory engine (handler queue) time-out)
bootstrap (doto (-> (NioClientSocketChannelFactory. boss-executor worker-executor) bootstrap (doto (-> (NioClientSocketChannelFactory. boss-executor worker-executor)
(ClientBootstrap.)) (ClientBootstrap.))
(.setOption "connectTimeoutMillis" 5000) (.setOption "connectTimeoutMillis" 5000)
(.setPipelineFactory pipeline-factory)) (.setPipelineFactory pipeline-factory))
future (.connect bootstrap address) future (.connect bootstrap address)
channel (-> future channel (-> future
(.awaitUninterruptibly) (.awaitUninterruptibly)
(.getChannel) (.getChannel))]
)]
(if (.isSuccess future) (if (.isSuccess future)
channel channel
(do (do
(.releaseExternalResources bootstrap) (.releaseExternalResources bootstrap)
nil nil)))
)
)
)
(catch java.lang.Exception e (catch java.lang.Exception e
(warn e "Error")))) (warn e "Error"))))




(defn- read-feedback [queue channel] (defn- read-feedback [queue channel]
"Internal function to create a lazy-seq returning the data from the feedback service" "Internal function to create a lazy-seq returning the data from the feedback service"
(lazy-seq (lazy-seq
(if-let [next (.poll queue 10 TimeUnit/SECONDS)] (if-let [next (.poll queue 10 TimeUnit/SECONDS)]
(cons next (read-feedback queue channel)) (cons next (read-feedback queue channel))
(when (.isConnected channel) (when (.isConnected channel)
(.close channel) (.close channel)
nil)))) nil))))


(defn feedback [^InetSocketAddress address ^SSLContext ssl-context & {:keys [time-out boss-executor worker-executor] (defn feedback [^InetSocketAddress address ^SSLContext ssl-context & {:keys [time-out boss-executor worker-executor]
:or {time-out 300 :or {time-out 300
Expand All @@ -120,4 +112,4 @@


(defn dev-address [] (InetSocketAddress. "feedback.sandbox.push.apple.com" 2196)) (defn dev-address [] (InetSocketAddress. "feedback.sandbox.push.apple.com" 2196))


(defn prod-address [] (InetSocketAddress. "feedback.push.apple.com" 2196)) (defn prod-address [] (InetSocketAddress. "feedback.push.apple.com" 2196))
22 changes: 7 additions & 15 deletions src/herolabs/apns/message.clj
Expand Up @@ -4,41 +4,33 @@
(defn with-badge [message number] (assoc-in message [:aps :badge ] number)) (defn with-badge [message number] (assoc-in message [:aps :badge ] number))


(defn with-sound [message sound] (defn with-sound [message sound]
(assoc-in message [:aps :sound ] (name sound)) (assoc-in message [:aps :sound ] (name sound)))
)


(defn with-standard-alert [message body] (defn with-standard-alert [message body]
(assoc-in message [:aps :alert ] body) (assoc-in message [:aps :alert ] body))
)


(defn with-action-loc-key [message key] (defn with-action-loc-key [message key]
(if key (if key
(assoc-in message [:aps :alert :action-loc-key ] key) (assoc-in message [:aps :alert :action-loc-key ] key)
message) message))
)


(defn with-loc-key [message key] (defn with-loc-key [message key]
(if key (if key
(assoc-in message [:aps :alert :loc-key ] key) (assoc-in message [:aps :alert :loc-key ] key)
message) message))
)




(defn with-loc-args [message args] (defn with-loc-args [message args]
(if-not (empty? args) (if-not (empty? args)
(assoc-in message [:aps :alert :loc-args ] (if (sequential? args) args (list args))) (assoc-in message [:aps :alert :loc-args ] (if (sequential? args) args (list args)))
message) message))
)


(defn with-payload [message payload] (defn with-payload [message payload]
(if-not (empty? payload) (if-not (empty? payload)
(merge (dissoc payload :aps) message) (merge (dissoc payload :aps) message)
message) message))
)


(defn with-alert-body [message body] (defn with-alert-body [message body]
(if key (if key
(assoc-in message [:aps :alert :body ] body) (assoc-in message [:aps :alert :body ] body)
message) message))
)

69 changes: 22 additions & 47 deletions src/herolabs/apns/protocol.clj
Expand Up @@ -4,8 +4,7 @@
[org.jboss.netty.buffer ChannelBuffer ChannelBuffers] [org.jboss.netty.buffer ChannelBuffer ChannelBuffers]
[java.nio ByteOrder] [java.nio ByteOrder]
[org.apache.commons.codec.binary Hex] [org.apache.commons.codec.binary Hex]
[java.util.concurrent.atomic AtomicInteger] [java.util.concurrent.atomic AtomicInteger]))
))


;; some constants ;; some constants
(def ^:private standard-head (byte-array 1 (byte 0))) (def ^:private standard-head (byte-array 1 (byte 0)))
Expand All @@ -19,39 +18,32 @@
(byte 5) :invalid-token-size (byte 5) :invalid-token-size
(byte 6) :invalid-topic-size (byte 6) :invalid-topic-size
(byte 7) :invalid-payload-size (byte 7) :invalid-payload-size
(byte 8) :invalid-token (byte 8) :invalid-token})
})




(def ^:dynamic *coercions* json/*coercions*) (def ^:dynamic *coercions* json/*coercions*)


(defn- serialize [msg] (defn- serialize [msg]
"Serializes the map into a JSON representation" "Serializes the map into a JSON representation"
(binding [json/*coercions* *coercions*] (binding [json/*coercions* *coercions*]
(json/generate-string msg) (json/generate-string msg)))
)
)


(defn- dynamic-buffer [len] (defn- dynamic-buffer [len]
"Creates a dynamic buffer." "Creates a dynamic buffer."
(ChannelBuffers/dynamicBuffer ByteOrder/BIG_ENDIAN len) (ChannelBuffers/dynamicBuffer ByteOrder/BIG_ENDIAN len))
)


(defn- encode-message [^String device-token msg] (defn- encode-message [^String device-token msg]
"Encodes a message into the standard APNS protocol format "Encodes a message into the standard APNS protocol format
http://developer.apple.com/library/mac/#documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/CommunicatingWIthAPS/CommunicatingWIthAPS.html" http://developer.apple.com/library/mac/#documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/CommunicatingWIthAPS/CommunicatingWIthAPS.html"
(let [token (.decode hex-codec device-token) (let [token (.decode hex-codec device-token)
serialized (serialize msg) serialized (serialize msg)
buffer (doto (dynamic-buffer (+ 1 2 (count token) 2 (count serialized))) buffer (doto (dynamic-buffer (+ 1 2 (count token) 2 (count serialized)))
(.writeBytes standard-head) (.writeBytes standard-head)
(.writeShort (int (count token))) (.writeShort (int (count token)))
(.writeBytes token) (.writeBytes token)
(.writeShort (int (count serialized))) (.writeShort (int (count serialized)))
(.writeBytes (.getBytes serialized))) (.writeBytes (.getBytes serialized)))]
] buffer))
buffer
)
)






Expand All @@ -64,17 +56,14 @@
expires (get m :expires Integer/MAX_VALUE) expires (get m :expires Integer/MAX_VALUE)
serialized (serialize msg) serialized (serialize msg)
buffer (doto (dynamic-buffer (+ 1 4 4 2 (count token) 2 (count serialized))) buffer (doto (dynamic-buffer (+ 1 4 4 2 (count token) 2 (count serialized)))
(.writeBytes enhanced-head) (.writeBytes enhanced-head)
(.writeInt id) (.writeInt id)
(.writeInt (int expires)) (.writeInt (int expires))
(.writeShort (int (count token))) (.writeShort (int (count token)))
(.writeBytes token) (.writeBytes token)
(.writeShort (int (count serialized))) (.writeShort (int (count serialized)))
(.writeBytes (.getBytes serialized))) (.writeBytes (.getBytes serialized)))]
] buffer))
buffer
)
)






Expand All @@ -87,14 +76,8 @@
(if-let [device-token (get (meta msg) :device-token )] (if-let [device-token (get (meta msg) :device-token )]
(if (= :enhanced (get (meta msg) :format )) (if (= :enhanced (get (meta msg) :format ))
(encode-enhanced-message id-gen device-token msg) (encode-enhanced-message id-gen device-token msg)
(encode-message device-token msg) (encode-message device-token msg))
) (throw (IllegalArgumentException. "Message must contain a :device-token as meta.")))))))
(throw (IllegalArgumentException. "Message must contain a :device-token as meta."))
)
)
)
)
)




(defn decoder [] (defn decoder []
Expand All @@ -104,11 +87,7 @@
(let [command (.readByte msg) (let [command (.readByte msg)
status (.readByte msg) status (.readByte msg)
id (.readInt msg)] id (.readInt msg)]
{:status (get status-dictionary status :unknown ) :id id} {:status (get status-dictionary status :unknown ) :id id}))))
)
)
)
)


(defn feedback-decoder [] (defn feedback-decoder []
"Creates an decoder for the APNS protocol." "Creates an decoder for the APNS protocol."
Expand All @@ -119,8 +98,4 @@
token-bytes (byte-array token-len)] token-bytes (byte-array token-len)]
(.readBytes msg token-bytes) (.readBytes msg token-bytes)
(let [token (Hex/encodeHexString token-bytes)] (let [token (Hex/encodeHexString token-bytes)]
[token time] [token time])))))
))
)
)
)

0 comments on commit 54788e9

Please sign in to comment.