Skip to content

Commit

Permalink
# IGNITE-226: WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Feb 13, 2015
1 parent 5e12908 commit 5443bb3
Show file tree
Hide file tree
Showing 38 changed files with 577 additions and 577 deletions.
Expand Up @@ -89,7 +89,7 @@ public abstract class IgfsTask<T, R> extends ComputeTaskAdapter<IgfsTaskArgs<T>,
assert args != null; assert args != null;


IgniteFs fs = ignite.fileSystem(args.igfsName()); IgniteFs fs = ignite.fileSystem(args.igfsName());
IgfsProcessorAdapter igfsProc = ((IgniteKernal) ignite).context().ggfs(); IgfsProcessorAdapter igfsProc = ((IgniteKernal) ignite).context().igfs();


Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();


Expand Down
Expand Up @@ -261,14 +261,14 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* *
* @return File system processor. * @return File system processor.
*/ */
public IgfsProcessorAdapter ggfs(); public IgfsProcessorAdapter igfs();


/** /**
* Gets GGFS utils processor. * Gets GGFS utils processor.
* *
* @return GGFS utils processor. * @return GGFS utils processor.
*/ */
public IgfsHelper ggfsHelper(); public IgfsHelper igfsHelper();


/** /**
* Gets stream processor. * Gets stream processor.
Expand Down Expand Up @@ -506,7 +506,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* *
* @return Thread pool implementation to be used for GGFS outgoing message sending. * @return Thread pool implementation to be used for GGFS outgoing message sending.
*/ */
public ExecutorService getGgfsExecutorService(); public ExecutorService getIgfsExecutorService();


/** /**
* Should return an instance of fully configured thread pool to be used for * Should return an instance of fully configured thread pool to be used for
Expand Down
Expand Up @@ -214,11 +214,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable


/** */ /** */
@GridToStringInclude @GridToStringInclude
private IgfsProcessorAdapter ggfsProc; private IgfsProcessorAdapter igfsProc;


/** */ /** */
@GridToStringInclude @GridToStringInclude
private IgfsHelper ggfsHelper; private IgfsHelper igfsHelper;


/** */ /** */
@GridToStringInclude @GridToStringInclude
Expand Down Expand Up @@ -278,7 +278,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable


/** */ /** */
@GridToStringExclude @GridToStringExclude
private ExecutorService ggfsExecSvc; private ExecutorService igfsExecSvc;


/** */ /** */
@GridToStringExclude @GridToStringExclude
Expand Down Expand Up @@ -325,7 +325,7 @@ public GridKernalContextImpl() {
* @param sysExecSvc System executor service. * @param sysExecSvc System executor service.
* @param p2pExecSvc P2P executor service. * @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service. * @param mgmtExecSvc Management executor service.
* @param ggfsExecSvc GGFS executor service. * @param igfsExecSvc GGFS executor service.
* @param restExecSvc REST executor service. * @param restExecSvc REST executor service.
*/ */
@SuppressWarnings("TypeMayBeWeakened") @SuppressWarnings("TypeMayBeWeakened")
Expand All @@ -339,7 +339,7 @@ protected GridKernalContextImpl(
ExecutorService sysExecSvc, ExecutorService sysExecSvc,
ExecutorService p2pExecSvc, ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc, ExecutorService mgmtExecSvc,
ExecutorService ggfsExecSvc, ExecutorService igfsExecSvc,
ExecutorService restExecSvc) { ExecutorService restExecSvc) {
assert grid != null; assert grid != null;
assert cfg != null; assert cfg != null;
Expand All @@ -353,7 +353,7 @@ protected GridKernalContextImpl(
this.sysExecSvc = sysExecSvc; this.sysExecSvc = sysExecSvc;
this.p2pExecSvc = p2pExecSvc; this.p2pExecSvc = p2pExecSvc;
this.mgmtExecSvc = mgmtExecSvc; this.mgmtExecSvc = mgmtExecSvc;
this.ggfsExecSvc = ggfsExecSvc; this.igfsExecSvc = igfsExecSvc;
this.restExecSvc = restExecSvc; this.restExecSvc = restExecSvc;


try { try {
Expand Down Expand Up @@ -452,7 +452,7 @@ else if (comp instanceof GridRestProcessor)
else if (comp instanceof GridDataLoaderProcessor) else if (comp instanceof GridDataLoaderProcessor)
dataLdrProc = (GridDataLoaderProcessor)comp; dataLdrProc = (GridDataLoaderProcessor)comp;
else if (comp instanceof IgfsProcessorAdapter) else if (comp instanceof IgfsProcessorAdapter)
ggfsProc = (IgfsProcessorAdapter)comp; igfsProc = (IgfsProcessorAdapter)comp;
else if (comp instanceof GridOffHeapProcessor) else if (comp instanceof GridOffHeapProcessor)
offheapProc = (GridOffHeapProcessor)comp; offheapProc = (GridOffHeapProcessor)comp;
else if (comp instanceof GridStreamProcessor) else if (comp instanceof GridStreamProcessor)
Expand Down Expand Up @@ -482,7 +482,7 @@ public void addHelper(Object helper) {
assert helper != null; assert helper != null;


if (helper instanceof IgfsHelper) if (helper instanceof IgfsHelper)
ggfsHelper = (IgfsHelper)helper; igfsHelper = (IgfsHelper)helper;
else else
assert false : "Unknown helper class: " + helper.getClass(); assert false : "Unknown helper class: " + helper.getClass();
} }
Expand Down Expand Up @@ -676,13 +676,13 @@ public void addHelper(Object helper) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsProcessorAdapter ggfs() { @Override public IgfsProcessorAdapter igfs() {
return ggfsProc; return igfsProc;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgfsHelper ggfsHelper() { @Override public IgfsHelper igfsHelper() {
return ggfsHelper; return igfsHelper;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -857,8 +857,8 @@ protected Object readResolve() throws ObjectStreamException {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public ExecutorService getGgfsExecutorService() { @Override public ExecutorService getIgfsExecutorService() {
return ggfsExecSvc; return igfsExecSvc;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -527,13 +527,13 @@ private void notifyLifecycleBeansEx(LifecycleEventType evt) {
*@param sysExecSvc *@param sysExecSvc
* @param p2pExecSvc * @param p2pExecSvc
* @param mgmtExecSvc * @param mgmtExecSvc
* @param ggfsExecSvc * @param igfsExecSvc
* @param errHnd Error handler to use for notification about startup problems. @throws IgniteCheckedException Thrown in case of any errors. * @param errHnd Error handler to use for notification about startup problems. @throws IgniteCheckedException Thrown in case of any errors.
*/ */
@SuppressWarnings({"CatchGenericClass", "unchecked"}) @SuppressWarnings({"CatchGenericClass", "unchecked"})
public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePool, final ExecutorService execSvc, public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePool, final ExecutorService execSvc,
final ExecutorService sysExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, final ExecutorService sysExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc,
ExecutorService ggfsExecSvc, ExecutorService restExecSvc, GridAbsClosure errHnd) ExecutorService igfsExecSvc, ExecutorService restExecSvc, GridAbsClosure errHnd)
throws IgniteCheckedException { throws IgniteCheckedException {
gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName())); gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName()));


Expand Down Expand Up @@ -644,7 +644,7 @@ public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePoo
try { try {
GridKernalContextImpl ctx = GridKernalContextImpl ctx =
new GridKernalContextImpl(log, this, cfg, gw, utilityCachePool, execSvc, sysExecSvc, p2pExecSvc, new GridKernalContextImpl(log, this, cfg, gw, utilityCachePool, execSvc, sysExecSvc, p2pExecSvc,
mgmtExecSvc, ggfsExecSvc, restExecSvc); mgmtExecSvc, igfsExecSvc, restExecSvc);


nodeLoc = new ClusterNodeLocalMapImpl(ctx); nodeLoc = new ClusterNodeLocalMapImpl(ctx);


Expand Down Expand Up @@ -2896,7 +2896,7 @@ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallab
guard(); guard();


try{ try{
IgniteFs fs = ctx.ggfs().ggfs(name); IgniteFs fs = ctx.igfs().igfs(name);


if (fs == null) if (fs == null)
throw new IllegalArgumentException("IgniteFs is not configured: " + name); throw new IllegalArgumentException("IgniteFs is not configured: " + name);
Expand All @@ -2913,7 +2913,7 @@ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallab
guard(); guard();


try { try {
return ctx.ggfs().ggfs(name); return ctx.igfs().igfs(name);
} }
finally { finally {
unguard(); unguard();
Expand All @@ -2925,7 +2925,7 @@ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallab
guard(); guard();


try { try {
return ctx.ggfs().ggfss(); return ctx.igfs().igfss();
} }
finally { finally {
unguard(); unguard();
Expand Down
Expand Up @@ -64,7 +64,7 @@ public final class IgniteNodeAttributes {
public static final String ATTR_TX_CONFIG = ATTR_PREFIX + ".tx"; public static final String ATTR_TX_CONFIG = ATTR_PREFIX + ".tx";


/** Internal attribute name constant. */ /** Internal attribute name constant. */
public static final String ATTR_GGFS = ATTR_PREFIX + ".igfs"; public static final String ATTR_IGFS = ATTR_PREFIX + ".igfs";


/** Internal attribute name constant. */ /** Internal attribute name constant. */
public static final String ATTR_MONGO = ATTR_PREFIX + ".mongo"; public static final String ATTR_MONGO = ATTR_PREFIX + ".mongo";
Expand Down
Expand Up @@ -1166,8 +1166,8 @@ private static final class IgniteNamedInstance {
/** P2P executor service. */ /** P2P executor service. */
private ExecutorService p2pExecSvc; private ExecutorService p2pExecSvc;


/** GGFS executor service. */ /** IGFS executor service. */
private ExecutorService ggfsExecSvc; private ExecutorService igfsExecSvc;


/** REST requests executor service. */ /** REST requests executor service. */
private ExecutorService restExecSvc; private ExecutorService restExecSvc;
Expand Down Expand Up @@ -1502,7 +1502,7 @@ private void start0(GridStartContext startCtx) throws IgniteCheckedException {
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());


// Note that we do not pre-start threads here as igfs pool may not be needed. // Note that we do not pre-start threads here as igfs pool may not be needed.
ggfsExecSvc = new IgniteThreadPoolExecutor( igfsExecSvc = new IgniteThreadPoolExecutor(
"igfs-" + cfg.getGridName(), "igfs-" + cfg.getGridName(),
cfg.getIgfsThreadPoolSize(), cfg.getIgfsThreadPoolSize(),
cfg.getIgfsThreadPoolSize(), cfg.getIgfsThreadPoolSize(),
Expand Down Expand Up @@ -1561,13 +1561,13 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {
myCfg.setMarshalLocalJobs(cfg.isMarshalLocalJobs()); myCfg.setMarshalLocalJobs(cfg.isMarshalLocalJobs());
myCfg.setNodeId(nodeId); myCfg.setNodeId(nodeId);


IgfsConfiguration[] ggfsCfgs = cfg.getIgfsConfiguration(); IgfsConfiguration[] igfsCfgs = cfg.getIgfsConfiguration();


if (ggfsCfgs != null) { if (igfsCfgs != null) {
IgfsConfiguration[] clone = ggfsCfgs.clone(); IgfsConfiguration[] clone = igfsCfgs.clone();


for (int i = 0; i < ggfsCfgs.length; i++) for (int i = 0; i < igfsCfgs.length; i++)
clone[i] = new IgfsConfiguration(ggfsCfgs[i]); clone[i] = new IgfsConfiguration(igfsCfgs[i]);


myCfg.setIgfsConfiguration(clone); myCfg.setIgfsConfiguration(clone);
} }
Expand Down Expand Up @@ -1845,7 +1845,7 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {
// Init here to make grid available to lifecycle listeners. // Init here to make grid available to lifecycle listeners.
grid = grid0; grid = grid0;


grid0.start(myCfg, utilityCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, ggfsExecSvc, grid0.start(myCfg, utilityCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, igfsExecSvc,
restExecSvc, restExecSvc,
new CA() { new CA() {
@Override public void apply() { @Override public void apply() {
Expand Down Expand Up @@ -2145,9 +2145,9 @@ private void stopExecutors0(IgniteLogger log) {


p2pExecSvc = null; p2pExecSvc = null;


U.shutdownNow(getClass(), ggfsExecSvc, log); U.shutdownNow(getClass(), igfsExecSvc, log);


ggfsExecSvc = null; igfsExecSvc = null;


if (restExecSvc != null) if (restExecSvc != null)
U.shutdownNow(getClass(), restExecSvc, log); U.shutdownNow(getClass(), restExecSvc, log);
Expand Down

0 comments on commit 5443bb3

Please sign in to comment.