Skip to content

Commit

Permalink
interim checkin - needs testing - made watches synchronous, send old+…
Browse files Browse the repository at this point in the history
…new state, added add-watch, remove-watch, redefined add-watcher in terms of add-watch
  • Loading branch information
richhickey committed Feb 26, 2009
1 parent 71334ef commit 2ea5600
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 183 deletions.
160 changes: 80 additions & 80 deletions src/jvm/clojure/lang/ARef.java
Expand Up @@ -14,94 +14,94 @@

import java.util.Map;

public abstract class ARef extends AReference implements IRef {
protected volatile IFn validator = null;
private volatile IPersistentMap watchers = PersistentHashMap.EMPTY;
public abstract class ARef extends AReference implements IRef{
protected volatile IFn validator = null;
private volatile IPersistentMap watches = PersistentHashMap.EMPTY;

public ARef() {
super();
}
public ARef(){
super();
}

public ARef(IPersistentMap meta) {
super(meta);
}
public ARef(IPersistentMap meta){
super(meta);
}

void validate(IFn vf, Object val){
try{
if(vf != null && !RT.booleanCast(vf.invoke(val)))
throw new IllegalStateException("Invalid reference state");
}
catch(RuntimeException re)
{
throw re;
}
catch(Exception e)
{
throw new IllegalStateException("Invalid reference state", e);
}
}
void validate(IFn vf, Object val){
try
{
if(vf != null && !RT.booleanCast(vf.invoke(val)))
throw new IllegalStateException("Invalid reference state");
}
catch(RuntimeException re)
{
throw re;
}
catch(Exception e)
{
throw new IllegalStateException("Invalid reference state", e);
}
}

void validate(Object val){
validate(validator,val);
}
void validate(Object val){
validate(validator, val);
}

public void setValidator(IFn vf){
try
{
validate(vf, deref());
}
catch (Exception e)
{
throw new RuntimeException(e);
}
validator = vf;
}
public void setValidator(IFn vf){
try
{
validate(vf, deref());
}
catch(Exception e)
{
throw new RuntimeException(e);
}
validator = vf;
}

public IFn getValidator(){
return validator;
}
public IFn getValidator(){
return validator;
}

public IPersistentMap getWatches(){
return watchers;
}

synchronized public IRef addWatch(Agent watcher, IFn action, boolean sendOff){
watchers = watchers.assoc(watcher, new Object[]{action, sendOff});
return this;
}
public IPersistentMap getWatches(){
return watches;
}

synchronized public IRef removeWatch(Agent watcher){
try
{
watchers = watchers.without(watcher);
}
catch(Exception e)
{
throw new RuntimeException(e);
}
synchronized public IRef addWatch(Object key, IFn callback){
watches = watches.assoc(key, callback);
return this;
}

synchronized public IRef removeWatch(Object key){
try
{
watches = watches.without(key);
}
catch(Exception e)
{
throw new RuntimeException(e);
}

return this;
}
return this;
}

public void notifyWatches() {
IPersistentMap ws = watchers;
if (ws.count() > 0)
{
ISeq args = new Cons(this, null);
for (ISeq s = RT.seq(ws); s != null; s = s.next())
{
Map.Entry e = (Map.Entry) s.first();
Object[] a = (Object[]) e.getValue();
Agent agent = (Agent) e.getKey();
try
{
agent.dispatch((IFn) a[0], args, (Boolean)a[1]);
}
catch (Exception e1)
{
//eat dispatching exceptions and continue
}
}
}
}
public void notifyWatches(Object oldval, Object newval){
IPersistentMap ws = watches;
if(ws.count() > 0)
{
for(ISeq s = ws.seq(); s != null; s = s.next())
{
Map.Entry e = (Map.Entry) s.first();
IFn fn = (IFn) e.getValue();
try
{
if(fn != null)
fn.invoke(e.getKey(), this, oldval, newval);
}
catch(Exception e1)
{
throw new RuntimeException(e1);
}
}
}
}
}
8 changes: 4 additions & 4 deletions src/jvm/clojure/lang/Agent.java
Expand Up @@ -63,12 +63,12 @@ static void doRun(Action action){
nested.set(PersistentVector.EMPTY);

boolean hadError = false;
boolean changed = false;
try
{
changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
if(changed)
action.agent.notifyWatches();
Object oldval = action.agent.state;
Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
action.agent.setState(newval);
action.agent.notifyWatches(oldval,newval);
}
catch(Throwable e)
{
Expand Down
159 changes: 78 additions & 81 deletions src/jvm/clojure/lang/Atom.java
Expand Up @@ -14,94 +14,91 @@

import java.util.concurrent.atomic.AtomicReference;

public class Atom extends ARef{
final AtomicReference state;
final public class Atom extends ARef{
final AtomicReference state;

public Atom(Object state) {
this.state = new AtomicReference(state);
}
public Atom(Object state){
this.state = new AtomicReference(state);
}

public Atom(Object state, IPersistentMap meta) {
super(meta);
this.state = new AtomicReference(state);
}
public Atom(Object state, IPersistentMap meta){
super(meta);
this.state = new AtomicReference(state);
}

public Object deref() {
return state.get();
}
public Object deref(){
return state.get();
}

public Object swap(IFn f) throws Exception {
for(;;)
{
Object v = deref();
Object newv = f.invoke(v);
validate(newv);
if(state.compareAndSet(v,newv))
{
if(v != newv)
notifyWatches();
return newv;
}
}
}
public Object swap(IFn f) throws Exception{
for(; ;)
{
Object v = deref();
Object newv = f.invoke(v);
validate(newv);
if(state.compareAndSet(v, newv))
{
notifyWatches(v, newv);
return newv;
}
}
}

public Object swap(IFn f, Object arg) throws Exception {
for(;;)
{
Object v = deref();
Object newv = f.invoke(v,arg);
validate(newv);
if(state.compareAndSet(v,newv))
{
if(v != newv)
notifyWatches();
return newv;
}
}
}
public Object swap(IFn f, Object arg) throws Exception{
for(; ;)
{
Object v = deref();
Object newv = f.invoke(v, arg);
validate(newv);
if(state.compareAndSet(v, newv))
{
notifyWatches(v, newv);
return newv;
}
}
}

public Object swap(IFn f, Object arg1, Object arg2) throws Exception {
for(;;)
{
Object v = deref();
Object newv = f.invoke(v, arg1, arg2);
validate(newv);
if(state.compareAndSet(v,newv))
{
if(v != newv)
notifyWatches();
return newv;
}
}
}
public Object swap(IFn f, Object arg1, Object arg2) throws Exception{
for(; ;)
{
Object v = deref();
Object newv = f.invoke(v, arg1, arg2);
validate(newv);
if(state.compareAndSet(v, newv))
{
notifyWatches(v, newv);
return newv;
}
}
}

public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception {
for(;;)
{
Object v = deref();
Object newv = f.applyTo(RT.listStar(v, x, y, args));
validate(newv);
if(state.compareAndSet(v,newv))
{
if(v != newv)
notifyWatches();
return newv;
}
}
}
public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception{
for(; ;)
{
Object v = deref();
Object newv = f.applyTo(RT.listStar(v, x, y, args));
validate(newv);
if(state.compareAndSet(v, newv))
{
notifyWatches(v, newv);
return newv;
}
}
}

public boolean compareAndSet(Object oldv, Object newv){
validate(newv);
boolean ret = state.compareAndSet(oldv, newv);
if (ret && oldv != newv)
notifyWatches();
return ret;
}
public boolean compareAndSet(Object oldv, Object newv){
validate(newv);
boolean ret = state.compareAndSet(oldv, newv);
if(ret)
notifyWatches(oldv, newv);
return ret;
}

public Object reset(Object newval){
validate(newval);
state.set(newval);
notifyWatches();
return newval;
}
public Object reset(Object newval){
Object oldval = state.get();
validate(newval);
state.set(newval);
notifyWatches(oldval, newval);
return newval;
}
}
7 changes: 3 additions & 4 deletions src/jvm/clojure/lang/IRef.java
Expand Up @@ -14,15 +14,14 @@

public interface IRef extends IDeref{

void setValidator(IFn vf);
void setValidator(IFn vf);

IFn getValidator();

IPersistentMap getWatches();

IRef addWatch(Agent watcher, IFn action, boolean sendOff);
IRef addWatch(Object key, IFn callback);

IRef removeWatch(Agent watcher);
IRef removeWatch(Object key);

void notifyWatches();
}

0 comments on commit 2ea5600

Please sign in to comment.