Skip to content

Commit

Permalink
- better usage of netty facilities
Browse files Browse the repository at this point in the history
- added future-handler macro
- now able to supply a future handler to the send function.
  • Loading branch information
niclasmeier committed Jun 19, 2012
1 parent ff55436 commit 1d060c8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 100 deletions.
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject herolabs/apns "0.1.2"
(defproject herolabs/apns "0.1.3"
:description "A simple lightweight library to use with the Apple push notification service."
:url "https://github.com/HEROLABS/herolabs-apns"
:dependencies [[org.clojure/clojure "1.3.0"]
Expand Down
175 changes: 85 additions & 90 deletions src/herolabs/apns/push.clj
@@ -1,6 +1,6 @@
(ns herolabs.apns.push
(:use [clojure.tools.logging]
[herolabs.apns.ssl :only (ssl-context ssl-engine)]
[herolabs.apns.ssl :only (ssl-context ssl-engine ssl-engine-factory)]
[herolabs.apns.protocol :only (encoder decoder)]
[clojure.stacktrace])
(:import [org.jboss.netty.channel Channel ChannelFuture Channels ChannelPipeline ChannelPipelineFactory ChannelEvent
Expand All @@ -11,158 +11,153 @@
[org.jboss.netty.handler.ssl SslHandler]
[org.jboss.netty.handler.timeout WriteTimeoutHandler]
[org.jboss.netty.util HashedWheelTimer]
[java.util.concurrent Executors]
[org.jboss.netty.handler.execution ExecutionHandler OrderedMemoryAwareThreadPoolExecutor]
[java.util.concurrent Executors ExecutorService ThreadFactory]
[java.util.concurrent.atomic AtomicInteger]
[java.net InetSocketAddress]
[javax.net.ssl SSLContext]
)
)


(defn- handler [& {:keys [exception-handler close-handler]}]
[javax.net.ssl SSLContext]))


(def ^:private default-thread-pool* (atom nil))

(defn default-thread-pool []
(or @default-thread-pool*
(swap! default-thread-pool*
(fn [_] (Executors/newCachedThreadPool
(let [number (atom 1)
sm (System/getSecurityManager)
group (if sm (.getThreadGroup sm) (.getThreadGroup (Thread/currentThread)))]
(reify ThreadFactory
(newThread [_ r] (let [t (Thread. group r (str "apns-thread-pool-" (swap! number inc)) 0)
t (if (.isDaemon t) (.setDaemon t false) t)
t (if (not= Thread/NORM_PRIORITY (.getPriority t)) (.setPriority t Thread/NORM_PRIORITY) t)]
t)))))))))

(defmacro future-listener [params & body]
(cond
(not (vector? params)) (throw (IllegalArgumentException. "Parameter have to be a vector."))
(not= 1 (count params)) (throw (IllegalArgumentException. "Parameter may only contain one element."))
(empty? body) nil
:else (let [future (first params)]
`(reify org.jboss.netty.channel.ChannelFutureListener
(operationComplete [this# ^ChannelFuture ~future] ~@body)))))

(defn- handler [bootstrap ssl-handler-factory client-handle exception-handler]
"Function to create a ChannelUpstreamHandler"
(proxy [org.jboss.netty.channel.SimpleChannelUpstreamHandler] []
(channelConnected [^ChannelHandlerContext ctx ^ChannelStateEvent event]
(debug "channelConnected")
(trace "channelConnected")
(let [ssl-handler (-> ctx
(.getPipeline)
(.get SslHandler))]
(.handshake ssl-handler)
))
(channelDisconnected [^ChannelHandlerContext ctx ^ChannelStateEvent event]
(trace "channelDisconnected" this))
(messageReceived [^ChannelHandlerContext ctx ^MessageEvent event]
(debug "messageReceived -" (.getMessage event))
(trace "messageReceived -" (.getMessage event))
)
(exceptionCaught [^ChannelHandlerContext ctx ^ExceptionEvent event]
(debug (.getCause event) "exceptionCaught")
(trace (.getCause event) "exceptionCaught")
(when exception-handler (exception-handler (.getCause event)))
(-> event
(.getChannel)
(.close))
)
(channelClosed [^ChannelHandlerContext ctx ^ChannelStateEvent event]
(debug "channelClosed")
(when close-handler (close-handler))
)
))
(trace "channelClosed")
(let [new-handler (ssl-handler-factory)
pipeline (.getPipeline ctx)
ssl-handler (.replace pipeline SslHandler "ssl" new-handler)]
(-> (.connect bootstrap) (.addListener (future-listener [f]
(swap! client-handle (fn [_] (.getChannel f))))))))))


(defn- create-ssl-handler-factory [ssl-engine-factory] (fn [] (SslHandler. (ssl-engine-factory))))

(defn- create-pipeline-factory [ssl-engine handler time-out]
(defn- create-pipeline-factory [ssl-handler-factory protocoll-handler time-out]
"Creates a pipeline factory"
(reify
ChannelPipelineFactory
(getPipeline [this]
(let [id-gen (AtomicInteger.)
timer (HashedWheelTimer.)]
(doto (Channels/pipeline)
(.addLast "ssl" (SslHandler. ssl-engine))
(.addLast "ssl" (ssl-handler-factory))
(.addLast "encoder" (encoder id-gen))
(.addLast "decoder" (decoder))
(.addLast "timeout" (WriteTimeoutHandler. timer (int (if time-out time-out 300))))
(.addLast "handler" handler)
(.addLast "protocoll-handler" protocoll-handler)
))
)
)
)

(defn- connect [^InetSocketAddress address ^SSLContext ssl-context time-out & {:keys [exception-handler close-handler]}]

(defn- default-exception-handler [cause] (info cause "An exception occured while sending push notification to the server."))

(defn- connect [^InetSocketAddress address ^SSLContext ssl-context time-out boss-executor worker-executor exception-handler]
"creates a netty Channel to connect to the server."
(let [engine (ssl-engine ssl-context :use-client-mode true)
pipeline-factory (create-pipeline-factory engine (handler
:exception-handler exception-handler
:close-handler close-handler) time-out)
bootstrap (doto (-> (NioClientSocketChannelFactory.
(Executors/newCachedThreadPool)
(Executors/newCachedThreadPool))
(ClientBootstrap.))
(let [engine-factory (ssl-engine-factory ssl-context :use-client-mode true)
bootstrap (-> (NioClientSocketChannelFactory.
boss-executor worker-executor) (ClientBootstrap.))
ssl-handler-factory (create-ssl-handler-factory engine-factory)
client-handle (atom nil)
pipeline-factory (create-pipeline-factory ssl-handler-factory (handler bootstrap ssl-handler-factory client-handle
exception-handler) time-out)
bootstrap (doto bootstrap
(.setOption "connectTimeoutMillis" 5000)
(.setPipelineFactory pipeline-factory))
future (.connect bootstrap address)
channel (-> future
(.awaitUninterruptibly)
(.getChannel)
)]
(.setPipelineFactory pipeline-factory)
(.setOption "remoteAddress" address))
future (.connect bootstrap)
channel (-> future (.awaitUninterruptibly) (.getChannel))
]
(if (.isSuccess future)
channel
(do
(.releaseExternalResources bootstrap)
nil
)
)
)
)
(swap! client-handle (fn [_] channel))
client-handle)
nil)))


(defn- ensure-connected! [channel ^InetSocketAddress address ^SSLContext ssl-context time-out]
"Internal function to ensure that a channel ref is not nil and the underlying channel is connected.
Returns always a connected channel and updates the channel ref if neccessary."
(let [c @channel]
(if-not (and c (.isConnected c))
(swap! channel (fn [_] (connect address ssl-context time-out
:exception-handler (fn [cause]
(info cause "An exception occured while sending push notification to the server.")
)
:close-handler (fn []
(debug "Resetting internal channel, due to close.")
(reset! channel nil)))))
c
)
)
)

(defprotocol Connection
(is-connected? [this] "Determines is a connection is connected")
(write-message [this message] "Writes a message")
(disconnect [this] "Disconnects a connection from the server")
)

(deftype ApnsConnection [^InetSocketAddress address ^SSLContext ssl-context time-out channel]
Connection
(is-connected? [_]
(let [c @channel]
(and c (.isConnected c)))
)
(write-message [_ message]
(let [c (ensure-connected! channel address ssl-context time-out)]
(.write c message)))
(disconnect [_]
(swap! channel (fn [c]
(when c (.close c))
nil
)))
)

(defprotocol Result
(success? [this] "Determines if the send operation was a success.")
(done? [this] "Checks if the operation already competed.")
)

(defn success? [^ChannelFuture future] (when future (-> future (.awaitUninterruptibly) (.isSuccess))))

(deftype SendResult [^ChannelFuture future]
Result
(success? [_] (-> future
(.awaitUninterruptibly)
(.isSuccess)))
(done? [_] (-> future
(.isDone)))
)

(defn create-connection [^InetSocketAddress address ^SSLContext ssl-context & {:keys [time-out] :or [time-out 300]}]
(defn create-connection [^InetSocketAddress address ^SSLContext ssl-context & {:keys [time-out boss-executor worker-executor exception-handler]
:or {time-out 300
boss-executor (default-thread-pool)
worker-executor (default-thread-pool)
exception-handler default-exception-handler}}]
"Creates a connection"
(ApnsConnection. address ssl-context time-out (atom nil))
)

(defn send-message [^herolabs.apns.push.Connection connection ^String device-token message]
(let [client-handle (connect address ssl-context time-out boss-executor worker-executor exception-handler)]
(when client-handle
(reify Connection
(is-connected? [_] (when-let [channel @client-handle] (.isConnected channel)))
(disconnect [_] (when-let [channel @client-handle] (.close channel)))
(write-message [_ message] (when-let [channel @client-handle] (.write channel message)))))))

(defn send-message [^herolabs.apns.push.Connection connection ^String device-token message & {:keys [completed-listener]}]
"Sends a message in the standard message format to the Apple push service"
(when (and connection device-token message)
(let [msg (with-meta message {:device-token device-token})]
(SendResult. (.write-message connection msg))
)))
(loop [[listener & rest] (if (sequential? completed-listener) completed-listener [completed-listener])
future (.write-message connection (with-meta message {:device-token device-token}))]
(if listener (recur rest (doto future (.addListener listener))) future))))

(defn send-enhanced-message [^herolabs.apns.push.Connection connection ^String device-token message]
"Sends a message in the enhanced message format to the Apple push service"
(when (and connection device-token message)
(let [msg (with-meta message {:device-token device-token :format :enhanced})]
(SendResult. (.write-message connection msg))
(.write-message connection msg)
)))

(defn dev-address []
Expand Down
27 changes: 18 additions & 9 deletions src/herolabs/apns/ssl.clj
@@ -1,6 +1,6 @@
(ns herolabs.apns.ssl
(:use [clojure.tools.logging]
[clojure.java.io :only (input-stream)])
[clojure.java.io :only (input-stream)])
(:import [java.security Security KeyStore]
[javax.net.ssl KeyManager KeyManagerFactory SSLContext SSLEngine TrustManager TrustManagerFactory X509TrustManager]
[java.security.cert X509Certificate CertificateException])
Expand Down Expand Up @@ -37,16 +37,25 @@
)
)

(defn ssl-engine-factory [context & {:keys [use-client-mode] :or {use-client-mode true}}]
"Creates an SSL engine"
(fn [] (let [engine (.createSSLEngine context)]
(if use-client-mode
(doto engine (.setUseClientMode use-client-mode))
engine)
)))



(defn naive-trust-managers [& {:keys [trace] :or [trace false]}]
"Creates a very naive trust manager that will accept all certificates."
(into-array (list (proxy [javax.net.ssl.X509TrustManager] []
(getAcceptedIssuers [] (make-array X509Certificate 0))
(checkClientTrusted [chain auth-type]
(when trace (info "Unknown client certificate:" (.getSubjectDN (get chain 0))))
)
(checkServerTrusted [chain auth-type]
(when trace (info "Unknown server certificate:" (.getSubjectDN (get chain 0))))
)
)))
(getAcceptedIssuers [] (make-array X509Certificate 0))
(checkClientTrusted [chain auth-type]
(when trace (info "Unknown client certificate:" (.getSubjectDN (get chain 0))))
)
(checkServerTrusted [chain auth-type]
(when trace (info "Unknown server certificate:" (.getSubjectDN (get chain 0))))
)
)))
)

0 comments on commit 1d060c8

Please sign in to comment.