Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add support for stdout and stderr

  • Loading branch information...
commit c7a9c4f56ecf03828c546e7a360ffbb38338bfaf 1 parent b16c1a8
@ninjudd ninjudd authored
View
1  project.clj
@@ -1,4 +1,5 @@
(defproject portal "0.0.1-SNAPSHOT"
:description "The cake is a lie!"
:dependencies [[clojure "1.2.0"]
+ [clojure-useful "0.3.8"]
[aleph "0.1.5-SNAPSHOT"]])
View
29 src/clj/portal/io.clj
@@ -0,0 +1,29 @@
+(ns portal.io
+ (:use portal.core lamina.core
+ [clojure.java.io :only [writer]]
+ [clojure.string :only [join]])
+ (:import (java.io Writer PrintWriter PipedWriter PipedReader)
+ (clojure.lang LineNumberingPushbackReader)))
+
+(defn pipe []
+ (let [writer (PipedWriter.)
+ reader (LineNumberingPushbackReader. (PipedReader. writer))]
+ [reader writer]))
+
+(defn context-writer [channels id type]
+ (let [data (channel)]
+ (writer
+ (proxy [Writer] []
+ (write
+ ([string]
+ (enqueue data (cond (string? string) string
+ (integer? string) (char string)
+ :else (join string))))
+ ([string off len]
+ (enqueue data (if (string? string)
+ (.substring string off (+ off len))
+ (join (->> string (drop off) (take len)))))))
+ (flush []
+ (let [string (join (channel-seq data))]
+ (doseq [ch (channels)]
+ (enqueue-message ch id [type string]))))))))
View
56 src/clj/portal/server.clj
@@ -1,37 +1,45 @@
(ns portal.server
- (:use portal.core lamina.core aleph.tcp
- [clojure.stacktrace :only [root-cause]])
- (:import (java.io PipedWriter PipedReader)
- (clojure.lang LineNumberingPushbackReader LispReader$ReaderException)))
+ (:use portal.core portal.io lamina.core aleph.tcp
+ [clojure.stacktrace :only [root-cause]]
+ [useful :only [update conj-set]])
+ (:import (clojure.lang LispReader$ReaderException)))
(def contexts (atom {}))
(def ^{:dynamic true} *pipe* nil)
-(defn- gen-context [context]
+(defn- gen-context [context id]
(or context
- (clojure.main/with-bindings
- (binding [*pipe* (PipedWriter.)]
- (binding [*in* (LineNumberingPushbackReader. (PipedReader. *pipe*))]
+ (let [[reader writer] (pipe)
+ channels #(get (meta @contexts) id)]
+ (clojure.main/with-bindings
+ (binding [*pipe* writer
+ *in* reader
+ *out* (context-writer channels id "stdout")
+ *err* (context-writer channels id "stderr")]
(ns user)
(agent (get-thread-bindings)))))))
(defn get-context [id]
(or (get @contexts id)
- (get (swap! contexts update-in [id] gen-context) id)))
+ (get (swap! contexts update id gen-context id) id)))
(defmacro with-context
"Execute the given forms in the context associated with id."
- [id & forms]
- `(send (get-context ~id)
- (fn [bindings#]
- (try (push-thread-bindings bindings#)
- ~@forms
- (get-thread-bindings)
- (finally (pop-thread-bindings))))))
+ [channel id & forms]
+ `(do (swap! contexts vary-meta update ~id conj-set ~channel)
+ (send (get-context ~id)
+ (fn [bindings#]
+ (try (push-thread-bindings bindings#)
+ ~@forms
+ (dissoc (get-thread-bindings) #'*agent*)
+ (finally (pop-thread-bindings)))))))
-(defn clear-context! [id]
- (swap! contexts dissoc id))
+(defn clear-context [contexts channel id]
+ (let [contexts (vary-meta contexts update id disj channel)]
+ (if (empty? (get (meta contexts) id))
+ (dissoc contexts id)
+ contexts)))
(defn read-eval [data]
(try (let [forms (read-seq data)]
@@ -41,20 +49,20 @@
(catch LispReader$ReaderException e
["read-error" (root-cause e)])))
-(defn handler [ch client-info]
- (receive-all ch
+(defn handler [channel client-info]
+ (receive-all channel
(fn [frame]
(let [[id type data] (decode-message frame)]
(case type
"stdin" (binding [*out* (get @(get-context id) #'*pipe*)]
(print data)
(flush))
- "eval" (with-context id
- (enqueue-message ch id (read-eval data)))
+ "eval" (with-context channel id
+ (enqueue-message channel id (read-eval data)))
"fork" (swap! contexts
#(assoc % data (get % id)))
- "clear" (clear-context! id)
- (enqueue-message ch id ["invalid" type]))))))
+ "clear" (swap! contexts clear-context channel id)
+ (enqueue-message channel id ["invalid" type]))))))
(defn start [port]
(start-tcp-server handler {:port port, :frame netstring}))
View
21 src/rb/portal.rb
@@ -6,6 +6,7 @@ class Error < StandardError; end
class ReadError < Error; end
class ProtocolError < Error; end
RESULT_WAIT = 0.01
+ BLOCK_SIZE = 1024
def initialize(port, host = "localhost")
@socket = TCPSocket.new(host, port)
@@ -14,11 +15,13 @@ def initialize(port, host = "localhost")
while (message = receive_message)
id, type, content = message
if ["stdout", "stderr"].include?(type)
- out = context(id)[type][1]
+ out = context(id)[type.to_sym][1]
out.write(content)
out.flush
- else
+ elsif ["result", "error", "read-error"].include?(type)
context(id)[:results] << [type, content]
+ else
+ raise ProtocolError, "unknown message type: #{type}"
end
end
end
@@ -44,7 +47,7 @@ def receive_message
end
def context(id)
- @contexts[id] ||= {
+ @contexts[id.to_s] ||= {
:results => [],
:count => 0,
:stdout => IO.pipe,
@@ -61,7 +64,7 @@ def with_context(id)
def eval(form, id = @id || rand)
send_message(id, "eval", form)
- context = context(id.to_s)
+ context = context(id)
count = context[:count] += 1;
lambda do
while (count > context[:results].size)
@@ -71,7 +74,7 @@ def eval(form, id = @id || rand)
case type
when "error" then raise Error, form
when "read-error" then raise ReadError, form
- else form.split("\n")
+ when "result" then form.split("\n")
end
end
end
@@ -80,4 +83,12 @@ def write(string, id = @id)
raise ProtocolError, "context id required to write to stdin" unless id
send_message(id, "stdin", string)
end
+
+ def tail(type, id = @id)
+ raise ProtocolError, "context id required to tail" unless id
+ while true
+ print(context(id)[type.to_sym][0].readpartial(BLOCK_SIZE))
+ end
+ rescue EOFError
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.