Skip to content

Commit

Permalink
# sprint-2
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 19, 2015
1 parent 413143a commit d591ae8
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 62 deletions.
Expand Up @@ -673,9 +673,7 @@ public void start(final IgniteConfiguration cfg,
igfsExecSvc, igfsExecSvc,
restExecSvc); restExecSvc);


cluster = new IgniteClusterImpl(); cluster = new IgniteClusterImpl(ctx);

cluster.setKernalContext(ctx);


U.onGridStart(); U.onGridStart();


Expand Down
Expand Up @@ -44,9 +44,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** Kernal context. */ /** Kernal context. */
protected transient GridKernalContext ctx; protected transient GridKernalContext ctx;


/** Parent projection. */
private transient ClusterGroup parent;

/** Compute. */ /** Compute. */
private transient IgniteComputeImpl compute; private transient IgniteComputeImpl compute;


Expand All @@ -65,7 +62,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** Subject ID. */ /** Subject ID. */
private UUID subjId; private UUID subjId;


/** Projection predicate. */ /** Cluster group predicate. */
protected IgnitePredicate<ClusterNode> p; protected IgnitePredicate<ClusterNode> p;


/** Node IDs. */ /** Node IDs. */
Expand All @@ -80,17 +77,13 @@ public ClusterGroupAdapter() {


/** /**
* @param subjId Subject ID. * @param subjId Subject ID.
* @param parent Parent of this projection. * @param ctx Kernal context.
* @param ctx Grid kernal context.
* @param p Predicate. * @param p Predicate.
*/ */
protected ClusterGroupAdapter(@Nullable ClusterGroup parent, protected ClusterGroupAdapter(@Nullable GridKernalContext ctx,
@Nullable GridKernalContext ctx,
@Nullable UUID subjId, @Nullable UUID subjId,
@Nullable IgnitePredicate<ClusterNode> p) @Nullable IgnitePredicate<ClusterNode> p)
{ {
this.parent = parent;

if (ctx != null) if (ctx != null)
setKernalContext(ctx); setKernalContext(ctx);


Expand All @@ -101,18 +94,14 @@ protected ClusterGroupAdapter(@Nullable ClusterGroup parent,
} }


/** /**
* @param parent Parent of this projection. * @param ctx Kernal context.
* @param ctx Grid kernal context.
* @param subjId Subject ID. * @param subjId Subject ID.
* @param ids Node IDs. * @param ids Node IDs.
*/ */
protected ClusterGroupAdapter(@Nullable ClusterGroup parent, protected ClusterGroupAdapter(@Nullable GridKernalContext ctx,
@Nullable GridKernalContext ctx,
@Nullable UUID subjId, @Nullable UUID subjId,
Set<UUID> ids) Set<UUID> ids)
{ {
this.parent = parent;

if (ctx != null) if (ctx != null)
setKernalContext(ctx); setKernalContext(ctx);


Expand All @@ -126,19 +115,15 @@ protected ClusterGroupAdapter(@Nullable ClusterGroup parent,


/** /**
* @param subjId Subject ID. * @param subjId Subject ID.
* @param parent Parent of this projection.
* @param ctx Grid kernal context. * @param ctx Grid kernal context.
* @param p Predicate. * @param p Predicate.
* @param ids Node IDs. * @param ids Node IDs.
*/ */
private ClusterGroupAdapter(@Nullable ClusterGroup parent, private ClusterGroupAdapter(@Nullable GridKernalContext ctx,
@Nullable GridKernalContext ctx,
@Nullable UUID subjId, @Nullable UUID subjId,
@Nullable IgnitePredicate<ClusterNode> p, @Nullable IgnitePredicate<ClusterNode> p,
Set<UUID> ids) Set<UUID> ids)
{ {
this.parent = parent;

if (ctx != null) if (ctx != null)
setKernalContext(ctx); setKernalContext(ctx);


Expand Down Expand Up @@ -179,9 +164,6 @@ public void setKernalContext(GridKernalContext ctx) {


this.ctx = ctx; this.ctx = ctx;


if (parent == null)
parent = ctx.grid().cluster();

gridName = ctx.gridName(); gridName = ctx.gridName();
} }


Expand All @@ -200,7 +182,7 @@ public void setKernalContext(GridKernalContext ctx) {
} }


/** /**
* @return {@link org.apache.ignite.IgniteCompute} for this projection. * @return {@link org.apache.ignite.IgniteCompute} for this cluster group.
*/ */
public final IgniteCompute compute() { public final IgniteCompute compute() {
if (compute == null) { if (compute == null) {
Expand All @@ -213,7 +195,7 @@ public final IgniteCompute compute() {
} }


/** /**
* @return {@link org.apache.ignite.IgniteMessaging} for this projection. * @return {@link org.apache.ignite.IgniteMessaging} for this cluster group.
*/ */
public final IgniteMessaging message() { public final IgniteMessaging message() {
if (messaging == null) { if (messaging == null) {
Expand All @@ -226,7 +208,7 @@ public final IgniteMessaging message() {
} }


/** /**
* @return {@link org.apache.ignite.IgniteEvents} for this projection. * @return {@link org.apache.ignite.IgniteEvents} for this cluster group.
*/ */
public final IgniteEvents events() { public final IgniteEvents events() {
if (evts == null) { if (evts == null) {
Expand All @@ -239,7 +221,7 @@ public final IgniteEvents events() {
} }


/** /**
* @return {@link org.apache.ignite.IgniteServices} for this projection. * @return {@link org.apache.ignite.IgniteServices} for this cluster group.
*/ */
public IgniteServices services() { public IgniteServices services() {
if (svcs == null) { if (svcs == null) {
Expand All @@ -252,7 +234,7 @@ public IgniteServices services() {
} }


/** /**
* @return {@link ExecutorService} for this projection. * @return {@link ExecutorService} for this cluster group.
*/ */
public ExecutorService executorService() { public ExecutorService executorService() {
assert ctx != null; assert ctx != null;
Expand Down Expand Up @@ -349,7 +331,7 @@ else if (ids.size() == 1) {
guard(); guard();


try { try {
return new ClusterGroupAdapter(this, ctx, subjId, this.p != null ? F.and(p, this.p) : p); return new ClusterGroupAdapter(ctx, subjId, this.p != null ? F.and(p, this.p) : p);
} }
finally { finally {
unguard(); unguard();
Expand Down Expand Up @@ -385,7 +367,7 @@ else if (ids.size() == 1) {
nodeIds.add(node.id()); nodeIds.add(node.id());
} }


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
Expand All @@ -405,7 +387,7 @@ else if (ids.size() == 1) {
if (contains(n)) if (contains(n))
nodeIds.add(n.id()); nodeIds.add(n.id());


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
Expand Down Expand Up @@ -435,7 +417,7 @@ else if (ids.size() == 1) {
nodeIds.add(id); nodeIds.add(id);
} }


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
Expand All @@ -456,7 +438,7 @@ else if (ids.size() == 1) {
nodeIds.add(id); nodeIds.add(id);
} }


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
Expand All @@ -471,8 +453,8 @@ else if (ids.size() == 1) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public ClusterGroup forOthers(ClusterGroup prj) { @Override public ClusterGroup forOthers(ClusterGroup grp) {
A.notNull(prj, "prj"); A.notNull(grp, "grp");


if (ids != null) { if (ids != null) {
guard(); guard();
Expand All @@ -483,18 +465,18 @@ else if (ids.size() == 1) {
for (UUID id : ids) { for (UUID id : ids) {
ClusterNode n = node(id); ClusterNode n = node(id);


if (n != null && !prj.predicate().apply(n)) if (n != null && !grp.predicate().apply(n))
nodeIds.add(id); nodeIds.add(id);
} }


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
} }
} }
else else
return forPredicate(F.not(prj.predicate())); return forPredicate(F.not(grp.predicate()));
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -504,7 +486,7 @@ else if (ids.size() == 1) {


/** /**
* @param excludeIds Node IDs. * @param excludeIds Node IDs.
* @return New projection. * @return New cluster group.
*/ */
private ClusterGroup forOthers(Collection<UUID> excludeIds) { private ClusterGroup forOthers(Collection<UUID> excludeIds) {
assert excludeIds != null; assert excludeIds != null;
Expand All @@ -520,7 +502,7 @@ private ClusterGroup forOthers(Collection<UUID> excludeIds) {
nodeIds.add(id); nodeIds.add(id);
} }


return new ClusterGroupAdapter(this, ctx, subjId, nodeIds); return new ClusterGroupAdapter(ctx, subjId, nodeIds);
} }
finally { finally {
unguard(); unguard();
Expand Down Expand Up @@ -579,12 +561,12 @@ private ClusterGroup forOthers(Collection<UUID> excludeIds) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public ClusterGroup forOldest() { @Override public ClusterGroup forOldest() {
return new AgeProjection(this, true); return new AgeClusterGroup(this, true);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public ClusterGroup forYoungest() { @Override public ClusterGroup forYoungest() {
return new AgeProjection(this, false); return new AgeClusterGroup(this, false);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -595,8 +577,8 @@ private ClusterGroup forOthers(Collection<UUID> excludeIds) {
guard(); guard();


try { try {
return ids != null ? new ClusterGroupAdapter(this, ctx, subjId, ids) : return ids != null ? new ClusterGroupAdapter(ctx, subjId, ids) :
new ClusterGroupAdapter(this, ctx, subjId, p); new ClusterGroupAdapter(ctx, subjId, p);
} }
finally { finally {
unguard(); unguard();
Expand All @@ -605,7 +587,7 @@ private ClusterGroup forOthers(Collection<UUID> excludeIds) {


/** /**
* @param n Node. * @param n Node.
* @return Whether node belongs to this projection. * @return Whether node belongs to this cluster group.
*/ */
private boolean contains(ClusterNode n) { private boolean contains(ClusterNode n) {
assert n != null; assert n != null;
Expand All @@ -615,7 +597,7 @@ private boolean contains(ClusterNode n) {


/** /**
* @param id Node ID. * @param id Node ID.
* @return Whether node belongs to this projection. * @return Whether node belongs to this cluster group.
*/ */
private boolean contains(UUID id) { private boolean contains(UUID id) {
assert id != null; assert id != null;
Expand Down Expand Up @@ -663,10 +645,8 @@ protected Object readResolve() throws ObjectStreamException {
try { try {
IgniteKernal g = IgnitionEx.gridx(gridName); IgniteKernal g = IgnitionEx.gridx(gridName);


ClusterGroup grp = g.cluster(); return ids != null ? new ClusterGroupAdapter(g.context(), subjId, ids) :

p != null ? new ClusterGroupAdapter(g.context(), subjId, p) : g;
return ids != null ? new ClusterGroupAdapter(grp, g.context(), subjId, ids) :
p != null ? new ClusterGroupAdapter(grp, g.context(), subjId, p) : g;
} }
catch (IllegalStateException e) { catch (IllegalStateException e) {
throw U.withCause(new InvalidObjectException(e.getMessage()), e); throw U.withCause(new InvalidObjectException(e.getMessage()), e);
Expand Down Expand Up @@ -815,9 +795,9 @@ private OthersFilter(Collection<UUID> nodeIds) {
} }


/** /**
* Age-based projection. * Age-based cluster group.
*/ */
private static class AgeProjection extends ClusterGroupAdapter { private static class AgeClusterGroup extends ClusterGroupAdapter {
/** Serialization version. */ /** Serialization version. */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand All @@ -833,16 +813,16 @@ private static class AgeProjection extends ClusterGroupAdapter {
/** /**
* Required for {@link Externalizable}. * Required for {@link Externalizable}.
*/ */
public AgeProjection() { public AgeClusterGroup() {
// No-op. // No-op.
} }


/** /**
* @param prj Parent projection. * @param parent Parent cluster group.
* @param isOldest Oldest flag. * @param isOldest Oldest flag.
*/ */
private AgeProjection(ClusterGroupAdapter prj, boolean isOldest) { private AgeClusterGroup(ClusterGroupAdapter parent, boolean isOldest) {
super(prj.parent, prj.ctx, prj.subjId, prj.p, prj.ids); super(parent.ctx, parent.subjId, parent.p, parent.ids);


this.isOldest = isOldest; this.isOldest = isOldest;


Expand Down
Expand Up @@ -53,6 +53,24 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@GridToStringExclude @GridToStringExclude
private ClusterNodeLocalMap nodeLoc; private ClusterNodeLocalMap nodeLoc;


/**
* Required by {@link Externalizable}.
*/
public IgniteClusterImpl() {
// No-op.
}

/**
* @param ctx Kernal context.
*/
public IgniteClusterImpl(GridKernalContext ctx) {
super(ctx, null, (IgnitePredicate<ClusterNode>)null);

cfg = ctx.config();

nodeLoc = new ClusterNodeLocalMapImpl(ctx);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void setKernalContext(GridKernalContext ctx) { @Override public void setKernalContext(GridKernalContext ctx) {
super.setKernalContext(ctx); super.setKernalContext(ctx);
Expand All @@ -67,7 +85,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
guard(); guard();


try { try {
return new ClusterGroupAdapter(this, ctx, null, Collections.singleton(cfg.getNodeId())); return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId()));
} }
finally { finally {
unguard(); unguard();
Expand Down
Expand Up @@ -257,7 +257,7 @@ public void testExecution() throws Exception {


IgnitePredicate<Event> lsnr; IgnitePredicate<Event> lsnr;


if (!Ignite.class.isAssignableFrom(projection().getClass())) { if (!IgniteCluster.class.isAssignableFrom(projection().getClass())) {
g.events().localListen(lsnr = new IgnitePredicate<Event>() { g.events().localListen(lsnr = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) { @Override public boolean apply(Event evt) {
assert evt.type() == EVT_JOB_STARTED; assert evt.type() == EVT_JOB_STARTED;
Expand Down

0 comments on commit d591ae8

Please sign in to comment.