Skip to content

Commit

Permalink
# IGNITE-226: WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Feb 13, 2015
1 parent 8d6347d commit a79055c
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 63 deletions.
4 changes: 2 additions & 2 deletions docs/core-site.ignite.xml
Expand Up @@ -48,15 +48,15 @@
--> -->
<property> <property>
<name>fs.igfs.impl</name> <name>fs.igfs.impl</name>
<value>org.apache.ignite.igfs.hadoop.v1.GridGgfsHadoopFileSystem</value> <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
</property> </property>


<!-- <!--
Set Hadoop 2.* file system implementation class for IgniteFS. Set Hadoop 2.* file system implementation class for IgniteFS.
--> -->
<property> <property>
<name>fs.AbstractFileSystem.igfs.impl</name> <name>fs.AbstractFileSystem.igfs.impl</name>
<value>org.apache.ignite.igfs.hadoop.v2.GridGgfsHadoopFileSystem</value> <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
</property> </property>


<!-- <!--
Expand Down
4 changes: 2 additions & 2 deletions examples/config/filesystem/core-site.xml
Expand Up @@ -31,12 +31,12 @@
<property> <property>
<!-- FS driver class for the 'igfs://' URIs. --> <!-- FS driver class for the 'igfs://' URIs. -->
<name>fs.igfs.impl</name> <name>fs.igfs.impl</name>
<value>org.apache.ignite.igfs.hadoop.v1.GridGgfsHadoopFileSystem</value> <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
</property> </property>


<property> <property>
<!-- FS driver class for the 'igfs://' URIs in Hadoop2.x --> <!-- FS driver class for the 'igfs://' URIs in Hadoop2.x -->
<name>fs.AbstractFileSystem.igfs.impl</name> <name>fs.AbstractFileSystem.igfs.impl</name>
<value>org.apache.ignite.igfs.hadoop.v2.GridGgfsHadoopFileSystem</value> <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
</property> </property>
</configuration> </configuration>
Expand Up @@ -108,7 +108,7 @@ public VisorGridConfiguration from(IgniteEx ignite) {
rest(VisorRestConfiguration.from(c)); rest(VisorRestConfiguration.from(c));
userAttributes(c.getUserAttributes()); userAttributes(c.getUserAttributes());
caches(VisorCacheConfiguration.list(ignite, c.getCacheConfiguration())); caches(VisorCacheConfiguration.list(ignite, c.getCacheConfiguration()));
ggfss(VisorIgfsConfiguration.list(c.getIgfsConfiguration())); igfss(VisorIgfsConfiguration.list(c.getIgfsConfiguration()));
streamers(VisorStreamerConfiguration.list(c.getStreamerConfiguration())); streamers(VisorStreamerConfiguration.list(c.getStreamerConfiguration()));
env(new HashMap<>(getenv())); env(new HashMap<>(getenv()));
systemProperties(getProperties()); systemProperties(getProperties());
Expand Down Expand Up @@ -308,7 +308,7 @@ public Iterable<VisorIgfsConfiguration> igfss() {
/** /**
* @param igfss New igfss. * @param igfss New igfss.
*/ */
public void ggfss(Iterable<VisorIgfsConfiguration> igfss) { public void igfss(Iterable<VisorIgfsConfiguration> igfss) {
this.igfss = igfss; this.igfss = igfss;
} }


Expand Down
Expand Up @@ -122,22 +122,22 @@ public class VisorIgfsConfiguration implements Serializable {
private long trashPurgeTimeout; private long trashPurgeTimeout;


/** /**
* @param ggfs IGFS configuration. * @param igfs IGFS configuration.
* @return Data transfer object for IGFS configuration properties. * @return Data transfer object for IGFS configuration properties.
*/ */
public static VisorIgfsConfiguration from(IgfsConfiguration ggfs) { public static VisorIgfsConfiguration from(IgfsConfiguration igfs) {
VisorIgfsConfiguration cfg = new VisorIgfsConfiguration(); VisorIgfsConfiguration cfg = new VisorIgfsConfiguration();


cfg.name(ggfs.getName()); cfg.name(igfs.getName());
cfg.metaCacheName(ggfs.getMetaCacheName()); cfg.metaCacheName(igfs.getMetaCacheName());
cfg.dataCacheName(ggfs.getDataCacheName()); cfg.dataCacheName(igfs.getDataCacheName());
cfg.blockSize(ggfs.getBlockSize()); cfg.blockSize(igfs.getBlockSize());
cfg.prefetchBlocks(ggfs.getPrefetchBlocks()); cfg.prefetchBlocks(igfs.getPrefetchBlocks());
cfg.streamBufferSize(ggfs.getStreamBufferSize()); cfg.streamBufferSize(igfs.getStreamBufferSize());
cfg.perNodeBatchSize(ggfs.getPerNodeBatchSize()); cfg.perNodeBatchSize(igfs.getPerNodeBatchSize());
cfg.perNodeParallelBatchCount(ggfs.getPerNodeParallelBatchCount()); cfg.perNodeParallelBatchCount(igfs.getPerNodeParallelBatchCount());


Igfs secFs = ggfs.getSecondaryFileSystem(); Igfs secFs = igfs.getSecondaryFileSystem();


if (secFs != null) { if (secFs != null) {
Map<String, String> props = secFs.properties(); Map<String, String> props = secFs.properties();
Expand All @@ -146,44 +146,44 @@ public static VisorIgfsConfiguration from(IgfsConfiguration ggfs) {
cfg.secondaryHadoopFileSystemConfigPath(props.get(SECONDARY_FS_CONFIG_PATH)); cfg.secondaryHadoopFileSystemConfigPath(props.get(SECONDARY_FS_CONFIG_PATH));
} }


cfg.defaultMode(ggfs.getDefaultMode()); cfg.defaultMode(igfs.getDefaultMode());
cfg.pathModes(ggfs.getPathModes()); cfg.pathModes(igfs.getPathModes());
cfg.dualModePutExecutorService(compactClass(ggfs.getDualModePutExecutorService())); cfg.dualModePutExecutorService(compactClass(igfs.getDualModePutExecutorService()));
cfg.dualModePutExecutorServiceShutdown(ggfs.getDualModePutExecutorServiceShutdown()); cfg.dualModePutExecutorServiceShutdown(igfs.getDualModePutExecutorServiceShutdown());
cfg.dualModeMaxPendingPutsSize(ggfs.getDualModeMaxPendingPutsSize()); cfg.dualModeMaxPendingPutsSize(igfs.getDualModeMaxPendingPutsSize());
cfg.maxTaskRangeLength(ggfs.getMaximumTaskRangeLength()); cfg.maxTaskRangeLength(igfs.getMaximumTaskRangeLength());
cfg.fragmentizerConcurrentFiles(ggfs.getFragmentizerConcurrentFiles()); cfg.fragmentizerConcurrentFiles(igfs.getFragmentizerConcurrentFiles());
cfg.fragmentizerLocalWritesRatio(ggfs.getFragmentizerLocalWritesRatio()); cfg.fragmentizerLocalWritesRatio(igfs.getFragmentizerLocalWritesRatio());
cfg.fragmentizerEnabled(ggfs.isFragmentizerEnabled()); cfg.fragmentizerEnabled(igfs.isFragmentizerEnabled());
cfg.fragmentizerThrottlingBlockLength(ggfs.getFragmentizerThrottlingBlockLength()); cfg.fragmentizerThrottlingBlockLength(igfs.getFragmentizerThrottlingBlockLength());
cfg.fragmentizerThrottlingDelay(ggfs.getFragmentizerThrottlingDelay()); cfg.fragmentizerThrottlingDelay(igfs.getFragmentizerThrottlingDelay());


Map<String, String> endpointCfg = ggfs.getIpcEndpointConfiguration(); Map<String, String> endpointCfg = igfs.getIpcEndpointConfiguration();
cfg.ipcEndpointConfiguration(endpointCfg != null ? endpointCfg.toString() : null); cfg.ipcEndpointConfiguration(endpointCfg != null ? endpointCfg.toString() : null);


cfg.ipcEndpointEnabled(ggfs.isIpcEndpointEnabled()); cfg.ipcEndpointEnabled(igfs.isIpcEndpointEnabled());
cfg.maxSpace(ggfs.getMaxSpaceSize()); cfg.maxSpace(igfs.getMaxSpaceSize());
cfg.managementPort(ggfs.getManagementPort()); cfg.managementPort(igfs.getManagementPort());
cfg.sequenceReadsBeforePrefetch(ggfs.getSequentialReadsBeforePrefetch()); cfg.sequenceReadsBeforePrefetch(igfs.getSequentialReadsBeforePrefetch());
cfg.trashPurgeTimeout(ggfs.getTrashPurgeTimeout()); cfg.trashPurgeTimeout(igfs.getTrashPurgeTimeout());


return cfg; return cfg;
} }


/** /**
* Construct data transfer object for igfs configurations properties. * Construct data transfer object for igfs configurations properties.
* *
* @param ggfss igfs configurations. * @param igfss Igfs configurations.
* @return igfs configurations properties. * @return igfs configurations properties.
*/ */
public static Iterable<VisorIgfsConfiguration> list(IgfsConfiguration[] ggfss) { public static Iterable<VisorIgfsConfiguration> list(IgfsConfiguration[] igfss) {
if (ggfss == null) if (igfss == null)
return Collections.emptyList(); return Collections.emptyList();


final Collection<VisorIgfsConfiguration> cfgs = new ArrayList<>(ggfss.length); final Collection<VisorIgfsConfiguration> cfgs = new ArrayList<>(igfss.length);


for (IgfsConfiguration ggfs : ggfss) for (IgfsConfiguration igfs : igfss)
cfgs.add(from(ggfs)); cfgs.add(from(igfs));


return cfgs; return cfgs;
} }
Expand Down
Expand Up @@ -123,13 +123,13 @@ private void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorT
/** Collect IGFS. */ /** Collect IGFS. */
private void igfs(VisorNodeDataCollectorJobResult res) { private void igfs(VisorNodeDataCollectorJobResult res) {
try { try {
IgfsProcessorAdapter ggfsProc = ((IgniteKernal)ignite).context().igfs(); IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs();


for (IgniteFs igfs : ggfsProc.igfss()) { for (IgniteFs igfs : igfsProc.igfss()) {
long start0 = U.currentTimeMillis(); long start0 = U.currentTimeMillis();


try { try {
Collection<IpcServerEndpoint> endPoints = ggfsProc.endpoints(igfs.name()); Collection<IpcServerEndpoint> endPoints = igfsProc.endpoints(igfs.name());


if (endPoints != null) { if (endPoints != null) {
for (IpcServerEndpoint ep : endPoints) for (IpcServerEndpoint ep : endPoints)
Expand Down
Expand Up @@ -54,13 +54,13 @@ public class VisorNodeDataCollectorJobResult implements Serializable {
private Throwable cachesEx; private Throwable cachesEx;


/** Node IGFSs. */ /** Node IGFSs. */
private final Collection<VisorIgfs> ggfss = new ArrayList<>(); private final Collection<VisorIgfs> igfss = new ArrayList<>();


/** All IGFS endpoints collected from nodes. */ /** All IGFS endpoints collected from nodes. */
private final Collection<VisorIgfsEndpoint> ggfsEndpoints = new ArrayList<>(); private final Collection<VisorIgfsEndpoint> igfsEndpoints = new ArrayList<>();


/** Exception while collecting node IGFSs. */ /** Exception while collecting node IGFSs. */
private Throwable ggfssEx; private Throwable igfssEx;


/** Node streamers. */ /** Node streamers. */
private final Collection<VisorStreamer> streamers = new ArrayList<>(); private final Collection<VisorStreamer> streamers = new ArrayList<>();
Expand Down Expand Up @@ -129,19 +129,19 @@ public void cachesEx(Throwable cachesEx) {
} }


public Collection<VisorIgfs> igfss() { public Collection<VisorIgfs> igfss() {
return ggfss; return igfss;
} }


public Collection<VisorIgfsEndpoint> igfsEndpoints() { public Collection<VisorIgfsEndpoint> igfsEndpoints() {
return ggfsEndpoints; return igfsEndpoints;
} }


public Throwable igfssEx() { public Throwable igfssEx() {
return ggfssEx; return igfssEx;
} }


public void igfssEx(Throwable ggfssEx) { public void igfssEx(Throwable igfssEx) {
this.ggfssEx = ggfssEx; this.igfssEx = igfssEx;
} }


public Collection<VisorStreamer> streamers() { public Collection<VisorStreamer> streamers() {
Expand Down
Expand Up @@ -57,13 +57,13 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
private final Map<UUID, Throwable> cachesEx = new HashMap<>(); private final Map<UUID, Throwable> cachesEx = new HashMap<>();


/** All IGFS collected from nodes. */ /** All IGFS collected from nodes. */
private final Map<UUID, Collection<VisorIgfs>> ggfss = new HashMap<>(); private final Map<UUID, Collection<VisorIgfs>> igfss = new HashMap<>();


/** All IGFS endpoints collected from nodes. */ /** All IGFS endpoints collected from nodes. */
private final Map<UUID, Collection<VisorIgfsEndpoint>> ggfsEndpoints = new HashMap<>(); private final Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints = new HashMap<>();


/** Exceptions caught during collecting IGFS from nodes. */ /** Exceptions caught during collecting IGFS from nodes. */
private final Map<UUID, Throwable> ggfssEx = new HashMap<>(); private final Map<UUID, Throwable> igfssEx = new HashMap<>();


/** All streamers collected from nodes. */ /** All streamers collected from nodes. */
private final Map<UUID, Collection<VisorStreamer>> streamers = new HashMap<>(); private final Map<UUID, Collection<VisorStreamer>> streamers = new HashMap<>();
Expand All @@ -84,9 +84,9 @@ public boolean isEmpty() {
eventsEx.isEmpty() && eventsEx.isEmpty() &&
caches.isEmpty() && caches.isEmpty() &&
cachesEx.isEmpty() && cachesEx.isEmpty() &&
ggfss.isEmpty() && igfss.isEmpty() &&
ggfsEndpoints.isEmpty() && igfsEndpoints.isEmpty() &&
ggfssEx.isEmpty() && igfssEx.isEmpty() &&
streamers.isEmpty() && streamers.isEmpty() &&
streamersEx.isEmpty(); streamersEx.isEmpty();
} }
Expand Down Expand Up @@ -151,21 +151,21 @@ public Map<UUID, Throwable> cachesEx() {
* @return All IGFS collected from nodes. * @return All IGFS collected from nodes.
*/ */
public Map<UUID, Collection<VisorIgfs>> igfss() { public Map<UUID, Collection<VisorIgfs>> igfss() {
return ggfss; return igfss;
} }


/** /**
* @return All IGFS endpoints collected from nodes. * @return All IGFS endpoints collected from nodes.
*/ */
public Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints() { public Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints() {
return ggfsEndpoints; return igfsEndpoints;
} }


/** /**
* @return Exceptions caught during collecting IGFS from nodes. * @return Exceptions caught during collecting IGFS from nodes.
*/ */
public Map<UUID, Throwable> igfssEx() { public Map<UUID, Throwable> igfssEx() {
return ggfssEx; return igfssEx;
} }


/** /**
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.ignite.loadtests.ggfs; package org.apache.ignite.loadtests.igfs;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.ignite.loadtests.ggfs; package org.apache.ignite.loadtests.igfs;


import org.apache.hadoop.conf.*; import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.*;
Expand Down

0 comments on commit a79055c

Please sign in to comment.