Skip to content

Commit

Permalink
# IGNITE-226: WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Feb 13, 2015
1 parent 5443bb3 commit 453d9cc
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 143 deletions.
Expand Up @@ -173,9 +173,9 @@ void awaitInit() {

grpBlockSize = igfsCtx.configuration().getBlockSize() * grpSize;

String ggfsName = igfsCtx.configuration().getName();
String igfsName = igfsCtx.configuration().getName();

topic = F.isEmpty(ggfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(ggfsName);
topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);

igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
Expand Down Expand Up @@ -221,7 +221,7 @@ else if (msg instanceof IgfsAckMessage)
maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();

delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
"igfs-" + ggfsName + "-delete-worker", log);
"igfs-" + igfsName + "-delete-worker", log);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -95,9 +95,9 @@ public class IgfsFragmentizerManager extends IgfsManager {

fragmentizerWorker = new FragmentizerWorker();

String ggfsName = igfsCtx.configuration().getName();
String igfsName = igfsCtx.configuration().getName();

topic = F.isEmpty(ggfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(ggfsName);
topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);

igfsCtx.kernalContext().io().addMessageListener(topic, fragmentizerWorker);

Expand Down
Expand Up @@ -1717,10 +1717,10 @@ private void deleteFile(IgfsPath path, FileDescriptor desc, boolean rmvLocked) t
*/
private boolean sameIgfs(IgfsAttributes[] attrs) {
if (attrs != null) {
String ggfsName = name();
String igfsName = name();

for (IgfsAttributes attr : attrs) {
if (F.eq(ggfsName, attr.igfsName()))
if (F.eq(igfsName, attr.igfsName()))
return true;
}
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
};

/** */
private final ConcurrentMap<String, IgfsContext> ggfsCache =
private final ConcurrentMap<String, IgfsContext> igfsCache =
new ConcurrentHashMap8<>();

/**
Expand Down Expand Up @@ -93,7 +93,7 @@ public IgfsProcessor(GridKernalContext ctx) {
for (IgfsManager mgr : ggfsCtx.managers())
mgr.start(ggfsCtx);

ggfsCache.put(maskName(cfg.getName()), ggfsCtx);
igfsCache.put(maskName(cfg.getName()), ggfsCtx);
}

if (log.isDebugEnabled())
Expand All @@ -110,15 +110,15 @@ public IgfsProcessor(GridKernalContext ctx) {
checkGgfsOnRemoteNode(n);
}

for (IgfsContext ggfsCtx : ggfsCache.values())
for (IgfsContext ggfsCtx : igfsCache.values())
for (IgfsManager mgr : ggfsCtx.managers())
mgr.onKernalStart();
}

/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
// Stop GGFS instances.
for (IgfsContext ggfsCtx : ggfsCache.values()) {
for (IgfsContext ggfsCtx : igfsCache.values()) {
if (log.isDebugEnabled())
log.debug("Stopping igfs: " + ggfsCtx.configuration().getName());

Expand All @@ -133,15 +133,15 @@ public IgfsProcessor(GridKernalContext ctx) {
ggfsCtx.igfs().stop();
}

ggfsCache.clear();
igfsCache.clear();

if (log.isDebugEnabled())
log.debug("GGFS processor stopped.");
}

/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
for (IgfsContext ggfsCtx : ggfsCache.values()) {
for (IgfsContext ggfsCtx : igfsCache.values()) {
if (log.isDebugEnabled())
log.debug("Stopping igfs: " + ggfsCtx.configuration().getName());

Expand All @@ -162,27 +162,27 @@ public IgfsProcessor(GridKernalContext ctx) {
@Override public void printMemoryStats() {
X.println(">>>");
X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']');
X.println(">>> ggfsCacheSize: " + ggfsCache.size());
X.println(">>> ggfsCacheSize: " + igfsCache.size());
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Collection<IgniteFs> igfss() {
return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS);
return F.viewReadOnly(igfsCache.values(), CTX_TO_GGFS);
}

/** {@inheritDoc} */
@Override @Nullable public IgniteFs igfs(@Nullable String name) {
IgfsContext ggfsCtx = ggfsCache.get(maskName(name));
IgfsContext ggfsCtx = igfsCache.get(maskName(name));

return ggfsCtx == null ? null : ggfsCtx.igfs();
}

/** {@inheritDoc} */
@Override @Nullable public Collection<IpcServerEndpoint> endpoints(@Nullable String name) {
IgfsContext ggfsCtx = ggfsCache.get(maskName(name));
IgfsContext igfsCtx = igfsCache.get(maskName(name));

return ggfsCtx == null ? Collections.<IpcServerEndpoint>emptyList() : ggfsCtx.server().endpoints();
return igfsCtx == null ? Collections.<IpcServerEndpoint>emptyList() : igfsCtx.server().endpoints();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -406,7 +406,7 @@ private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedExce
}
}

private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName)
private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String igfsName)
throws IgniteCheckedException {
if (!F.eq(rmtVal, locVal))
throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " +
Expand All @@ -415,6 +415,6 @@ private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtV
"property ) [rmtNodeId=" + rmtNodeId +
", rmt" + propName + "=" + rmtVal +
", loc" + propName + "=" + locVal +
", ggfName=" + ggfsName + ']');
", ggfName=" + igfsName + ']');
}
}
Expand Up @@ -40,7 +40,7 @@
*/
public class IgfsServer {
/** GGFS context. */
private final IgfsContext ggfsCtx;
private final IgfsContext igfsCtx;

/** Logger. */
private final IgniteLogger log;
Expand Down Expand Up @@ -68,19 +68,19 @@ public class IgfsServer {

/**
* Constructs igfs server manager.
* @param ggfsCtx GGFS context.
* @param igfsCtx GGFS context.
* @param endpointCfg Endpoint configuration to start.
* @param mgmt Management flag - if true, server is intended to be started for Visor.
*/
public IgfsServer(IgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) {
assert ggfsCtx != null;
public IgfsServer(IgfsContext igfsCtx, Map<String, String> endpointCfg, boolean mgmt) {
assert igfsCtx != null;
assert endpointCfg != null;

this.endpointCfg = endpointCfg;
this.ggfsCtx = ggfsCtx;
this.igfsCtx = igfsCtx;
this.mgmt = mgmt;

log = ggfsCtx.kernalContext().log(IgfsServer.class);
log = igfsCtx.kernalContext().log(IgfsServer.class);

marsh = new IgfsMarshaller();
}
Expand All @@ -105,7 +105,7 @@ public void start() throws IgniteCheckedException {

if (srvEndpoint0.getHost() == null) {
if (mgmt) {
String locHostName = ggfsCtx.kernalContext().config().getLocalHost();
String locHostName = igfsCtx.kernalContext().config().getLocalHost();

try {
srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress());
Expand All @@ -120,15 +120,15 @@ public void start() throws IgniteCheckedException {
}
}

ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);
igfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);

srvEndpoint.start();

// IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
if (srvEndpoint.getPort() >= 0)
ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
igfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());

hnd = new IgfsIpcHandler(ggfsCtx);
hnd = new IgfsIpcHandler(igfsCtx);

// Start client accept worker.
acceptWorker = new AcceptWorker();
Expand Down Expand Up @@ -174,10 +174,10 @@ public void stop(boolean cancel) {

// IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
if (srvEndpoint.getPort() >= 0)
ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
igfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());

try {
ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
igfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to cleanup server endpoint.", e);
Expand Down Expand Up @@ -217,7 +217,7 @@ private class ClientWorker extends GridWorker {
* @throws IgniteCheckedException If endpoint output stream cannot be obtained.
*/
protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException {
super(ggfsCtx.kernalContext().gridName(), "igfs-client-worker-" + idx, log);
super(igfsCtx.kernalContext().gridName(), "igfs-client-worker-" + idx, log);

this.endpoint = endpoint;

Expand Down Expand Up @@ -384,7 +384,7 @@ private class AcceptWorker extends GridWorker {
* Creates accept worker.
*/
protected AcceptWorker() {
super(ggfsCtx.kernalContext().gridName(), "igfs-accept-worker", log);
super(igfsCtx.kernalContext().gridName(), "igfs-accept-worker", log);
}

/** {@inheritDoc} */
Expand All @@ -394,7 +394,7 @@ protected AcceptWorker() {
IpcEndpoint client = srvEndpoint.accept();

if (log.isDebugEnabled())
log.debug("GGFS client connected [igfsName=" + ggfsCtx.kernalContext().gridName() +
log.debug("GGFS client connected [igfsName=" + igfsCtx.kernalContext().gridName() +
", client=" + client + ']');

ClientWorker worker = new ClientWorker(client, acceptCnt++);
Expand Down
Expand Up @@ -63,18 +63,18 @@ public VisorIgfs(
}

/**
* @param ggfs Source GGFS.
* @param igfs Source GGFS.
* @return Data transfer object for given GGFS.
* @throws IgniteCheckedException
*/
public static VisorIgfs from(IgniteFs ggfs) throws IgniteCheckedException {
assert ggfs != null;
public static VisorIgfs from(IgniteFs igfs) throws IgniteCheckedException {
assert igfs != null;

return new VisorIgfs(
ggfs.name(),
ggfs.configuration().getDefaultMode(),
VisorIgfsMetrics.from(ggfs.metrics()),
ggfs.configuration().getSecondaryFileSystem() != null
igfs.name(),
igfs.configuration().getDefaultMode(),
VisorIgfsMetrics.from(igfs.metrics()),
igfs.configuration().getSecondaryFileSystem() != null
);
}

Expand Down
Expand Up @@ -30,7 +30,7 @@ public class VisorIgfsEndpoint implements Serializable{
private static final long serialVersionUID = 0L;

/** GGFS name. */
private final String ggfsName;
private final String igfsName;

/** Grid name. */
private final String gridName;
Expand All @@ -43,13 +43,13 @@ public class VisorIgfsEndpoint implements Serializable{

/**
* Create GGFS endpoint descriptor with given parameters.
* @param ggfsName GGFS name.
* @param igfsName GGFS name.
* @param gridName Grid name.
* @param hostName Host address / name.
* @param port Port number.
*/
public VisorIgfsEndpoint(@Nullable String ggfsName, String gridName, @Nullable String hostName, int port) {
this.ggfsName = ggfsName;
public VisorIgfsEndpoint(@Nullable String igfsName, String gridName, @Nullable String hostName, int port) {
this.igfsName = igfsName;
this.gridName = gridName;
this.hostName = hostName;
this.port = port;
Expand All @@ -58,8 +58,8 @@ public VisorIgfsEndpoint(@Nullable String ggfsName, String gridName, @Nullable S
/**
* @return GGFS name.
*/
@Nullable public String ggfsName() {
return ggfsName;
@Nullable public String igfsName() {
return igfsName;
}

/**
Expand Down Expand Up @@ -89,14 +89,14 @@ public int port() {
public String authority() {
String addr = hostName + ":" + port;

if (ggfsName == null && gridName == null)
if (igfsName == null && gridName == null)
return addr;
else if (ggfsName == null)
else if (igfsName == null)
return gridName + "@" + addr;
else if (gridName == null)
return ggfsName + "@" + addr;
return igfsName + "@" + addr;
else
return ggfsName + ":" + gridName + "@" + addr;
return igfsName + ":" + gridName + "@" + addr;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -51,12 +51,12 @@ private VisorIgfsFormatJob(String arg, boolean debug) {
}

/** {@inheritDoc} */
@Override protected Void run(String ggfsName) {
@Override protected Void run(String igfsName) {
try {
ignite.fileSystem(ggfsName).format();
ignite.fileSystem(igfsName).format();
}
catch (IllegalArgumentException iae) {
throw new IgniteException("Failed to format IgniteFs: " + ggfsName, iae);
throw new IgniteException("Failed to format IgniteFs: " + igfsName, iae);
}

return null;
Expand Down
Expand Up @@ -38,7 +38,7 @@ public class VisorIgfsProfiler {
* @param entries Entries to sum.
* @return Single aggregated entry.
*/
public static VisorIgfsProfilerEntry aggregateGgfsProfilerEntries(List<VisorIgfsProfilerEntry> entries) {
public static VisorIgfsProfilerEntry aggregateIgfsProfilerEntries(List<VisorIgfsProfilerEntry> entries) {
assert !F.isEmpty(entries);

if (entries.size() == 1)
Expand Down
Expand Up @@ -59,9 +59,9 @@ private VisorIgfsProfilerClearJob(String arg, boolean debug) {
int notDeleted = 0;

try {
IgniteFs ggfs = ignite.fileSystem(arg);
IgniteFs igfs = ignite.fileSystem(arg);

Path logsDir = resolveIgfsProfilerLogsDir(ggfs);
Path logsDir = resolveIgfsProfilerLogsDir(igfs);

if (logsDir != null) {
PathMatcher matcher = FileSystems.getDefault().getPathMatcher(
Expand Down

0 comments on commit 453d9cc

Please sign in to comment.