Skip to content

Commit

Permalink
# IGNITE-226: WIP (9)
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Feb 13, 2015
1 parent 21218e0 commit 31d4e02
Show file tree
Hide file tree
Showing 30 changed files with 181 additions and 181 deletions.
Expand Up @@ -27,7 +27,7 @@
import java.util.*;

/**
* Example that shows how to use {@link IgniteFsTask} to find lines matching particular pattern in the file in pretty
* Example that shows how to use {@link org.apache.ignite.ignitefs.mapreduce.IgfsTask} to find lines matching particular pattern in the file in pretty
* the same way as {@code grep} command does.
* <p>
* Remote nodes should always be started with configuration file which includes
Expand Down Expand Up @@ -71,7 +71,7 @@ else if (args.length == 1)

writeFile(fs, fsPath, file);

Collection<Line> lines = fs.execute(new GrepTask(), IgniteFsNewLineRecordResolver.NEW_LINE,
Collection<Line> lines = fs.execute(new GrepTask(), IgfsNewLineRecordResolver.NEW_LINE,
Collections.singleton(fsPath), regexStr);

if (lines.isEmpty()) {
Expand Down Expand Up @@ -126,10 +126,10 @@ private static void print(String str) {
/**
* Grep task.
*/
private static class GrepTask extends IgniteFsTask<String, Collection<Line>> {
private static class GrepTask extends IgfsTask<String, Collection<Line>> {
/** {@inheritDoc} */
@Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
IgniteFsTaskArgs<String> args) {
@Override public IgfsJob createJob(IgniteFsPath path, IgfsFileRange range,
IgfsTaskArgs<String> args) {
return new GrepJob(args.userArgument());
}

Expand Down Expand Up @@ -159,7 +159,7 @@ private static class GrepTask extends IgniteFsTask<String, Collection<Line>> {
/**
* Grep job.
*/
private static class GrepJob extends IgniteFsInputStreamJobAdapter {
private static class GrepJob extends IgfsInputStreamJobAdapter {
/** Regex string. */
private final String regex;

Expand All @@ -173,7 +173,7 @@ private GrepJob(String regex) {
}

/** {@inheritDoc} */
@Override public Object execute(IgniteFs igniteFs, IgniteFsRangeInputStream in) throws IgniteException, IOException {
@Override public Object execute(IgniteFs igniteFs, IgfsRangeInputStream in) throws IgniteException, IOException {
Collection<Line> res = null;

long start = in.startOffset();
Expand Down
12 changes: 6 additions & 6 deletions modules/core/src/main/java/org/apache/ignite/IgniteFs.java
Expand Up @@ -279,7 +279,7 @@ public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start,
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteException;

/**
Expand All @@ -300,7 +300,7 @@ public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolve
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws IgniteException;

Expand All @@ -317,8 +317,8 @@ public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolve
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
@Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteException;
public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws IgniteException;

/**
* Executes GGFS task with overridden maximum range length (see
Expand All @@ -337,8 +337,8 @@ public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
@Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) throws IgniteException;

/** {@inheritDoc} */
Expand Down
Expand Up @@ -762,13 +762,13 @@ public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) {

/**
* Get maximum default range size of a file being split during GGFS task execution. When GGFS task is about to
* be executed, it requests file block locations first. Each location is defined as {@link org.apache.ignite.ignitefs.mapreduce.IgniteFsFileRange} which
* be executed, it requests file block locations first. Each location is defined as {@link org.apache.ignite.ignitefs.mapreduce.IgfsFileRange} which
* has length. In case this parameter is set to positive value, then GGFS will split single file range into smaller
* ranges with length not greater than this parameter. The only exception to this case is when maximum task range
* length is smaller than file block size. In this case maximum task range size will be overridden and set to file
* block size.
* <p>
* Note that this parameter is applied when task is split into jobs before {@link org.apache.ignite.ignitefs.mapreduce.IgniteFsRecordResolver} is
* Note that this parameter is applied when task is split into jobs before {@link org.apache.ignite.ignitefs.mapreduce.IgfsRecordResolver} is
* applied. Therefore, final file ranges being assigned to particular jobs could be greater than value of this
* parameter depending on file data layout and selected resolver type.
* <p>
Expand Down
Expand Up @@ -23,7 +23,7 @@
/**
* Entity representing part of GGFS file identified by file path, start position, and length.
*/
public class IgniteFsFileRange {
public class IgfsFileRange {
/** File path. */
private IgniteFsPath path;

Expand All @@ -40,7 +40,7 @@ public class IgniteFsFileRange {
* @param start Start position.
* @param len Length.
*/
public IgniteFsFileRange(IgniteFsPath path, long start, long len) {
public IgfsFileRange(IgniteFsPath path, long start, long len) {
this.path = path;
this.start = start;
this.len = len;
Expand Down Expand Up @@ -75,6 +75,6 @@ public long length() {

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteFsFileRange.class, this);
return S.toString(IgfsFileRange.class, this);
}
}
Expand Up @@ -24,19 +24,19 @@
import java.io.*;

/**
* Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.ignitefs.IgniteFsInputStream} to bytes within
* the {@link IgniteFsFileRange} assigned to the job.
* Convenient {@link IgfsJob} adapter. It limits data returned from {@link org.apache.ignite.ignitefs.IgniteFsInputStream} to bytes within
* the {@link IgfsFileRange} assigned to the job.
* <p>
* Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps it into
* {@link GridFixedSizeInputStream} limited to range length.
*/
public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
public abstract class IgfsInputStreamJobAdapter extends IgfsJobAdapter {
/** {@inheritDoc} */
@Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in)
@Override public final Object execute(IgniteFs ggfs, IgfsFileRange range, IgniteFsInputStream in)
throws IgniteException, IOException {
in.seek(range.start());

return execute(ggfs, new IgniteFsRangeInputStream(in, range));
return execute(ggfs, new IgfsRangeInputStream(in, range));
}

/**
Expand All @@ -48,5 +48,5 @@ public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
* @throws IgniteException If execution failed.
* @throws IOException If IO exception encountered while working with stream.
*/
public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws IgniteException, IOException;
public abstract Object execute(IgniteFs ggfs, IgfsRangeInputStream in) throws IgniteException, IOException;
}
Expand Up @@ -23,24 +23,24 @@
import java.io.*;

/**
* Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the
* ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
* Defines executable unit for {@link IgfsTask}. Before this job is executed, it is assigned one of the
* ranges provided by the {@link IgfsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
* <p>
* {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.ignitefs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this
* {@link #execute(org.apache.ignite.IgniteFs, IgfsFileRange, org.apache.ignite.ignitefs.IgniteFsInputStream)} method is given {@link IgfsFileRange} this
* job is expected to operate on, and already opened {@link org.apache.ignite.ignitefs.IgniteFsInputStream} for the file this range belongs to.
* <p>
* Note that provided input stream has position already adjusted to range start. However, it will not
* automatically stop on range end. This is done to provide capability in some cases to look beyond
* the range end or seek position before the range start.
* <p>
* In majority of the cases, when you want to process only provided range, you should explicitly control amount
* of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates
* on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
* {@link IgniteFsRangeInputStream}.
* of returned data and stop at range end. You can also use {@link IgfsInputStreamJobAdapter}, which operates
* on {@link IgfsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
* {@link IgfsRangeInputStream}.
* <p>
* You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations.
*/
public interface IgniteFsJob {
public interface IgfsJob {
/**
* Executes this job.
*
Expand All @@ -52,7 +52,7 @@ public interface IgniteFsJob {
* @throws IgniteException If execution failed.
* @throws IOException If file system operation resulted in IO exception.
*/
public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws IgniteException,
public Object execute(IgniteFs ggfs, IgfsFileRange range, IgniteFsInputStream in) throws IgniteException,
IOException;

/**
Expand Down
Expand Up @@ -18,9 +18,9 @@
package org.apache.ignite.ignitefs.mapreduce;

/**
* Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method.
* Adapter for {@link IgfsJob} with no-op implementation of {@link #cancel()} method.
*/
public abstract class IgniteFsJobAdapter implements IgniteFsJob {
public abstract class IgfsJobAdapter implements IgfsJob {
/** {@inheritDoc} */
@Override public void cancel() {
// No-op.
Expand Down
Expand Up @@ -25,11 +25,11 @@

/**
* Decorator for regular {@link org.apache.ignite.ignitefs.IgniteFsInputStream} which streams only data within the given range.
* This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
* This stream is used for {@link IgfsInputStreamJobAdapter} convenience adapter to create
* jobs which will be working only with the assigned range. You can also use it explicitly when
* working with {@link IgniteFsJob} directly.
* working with {@link IgfsJob} directly.
*/
public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
public final class IgfsRangeInputStream extends IgniteFsInputStream {
/** Base input stream. */
private final IgniteFsInputStream is;

Expand All @@ -50,7 +50,7 @@ public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
* @param maxLen Maximum stream length.
* @throws IOException In case of exception.
*/
public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
public IgfsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
if (is == null)
throw new IllegalArgumentException("Input stream cannot be null.");

Expand Down Expand Up @@ -85,7 +85,7 @@ public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen)
* @param range File range.
* @throws IOException In case of exception.
*/
public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
public IgfsRangeInputStream(IgniteFsInputStream is, IgfsFileRange range) throws IOException {
this(is, range.start(), range.length());
}

Expand Down Expand Up @@ -192,6 +192,6 @@ public long startOffset() {

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteFsRangeInputStream.class, this);
return S.toString(IgfsRangeInputStream.class, this);
}
}
Expand Up @@ -24,8 +24,8 @@
import java.io.*;

/**
* GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain
* {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
* GGFS record resolver. When {@link IgfsTask} is split into {@link IgfsJob}s each produced job will obtain
* {@link IgfsFileRange} based on file data location. Record resolver is invoked in each job before actual
* execution in order to adjust record boundaries in a way consistent with user data.
* <p>
* E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file.
Expand All @@ -34,13 +34,13 @@
* <p>
* The following record resolvers are available out of the box:
* <ul>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsNewLineRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgfsFixedLengthRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgfsByteDelimiterRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgfsStringDelimiterRecordResolver}</li>
* <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgfsNewLineRecordResolver}</li>
* </ul>
*/
public interface IgniteFsRecordResolver extends Serializable {
public interface IgfsRecordResolver extends Serializable {
/**
* Adjusts record start offset and length.
*
Expand All @@ -51,6 +51,6 @@ public interface IgniteFsRecordResolver extends Serializable {
* @throws IgniteException If resolve failed.
* @throws IOException If resolve failed.
*/
@Nullable public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
IgniteFsFileRange suggestedRecord) throws IgniteException, IOException;
@Nullable public IgfsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
IgfsFileRange suggestedRecord) throws IgniteException, IOException;
}
Expand Up @@ -33,15 +33,15 @@
* GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
* is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
* {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
* {@link IgniteFsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
* {@link IgfsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgfsFileRange, IgfsTaskArgs)} method.
* <p>
* Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
* Each file participating in GGFS task is split into {@link IgfsFileRange}s first. Normally range is a number of
* consecutive bytes located on a single node (see {@code IgniteFsGroupDataBlocksKeyMapper}). In case maximum range size
* is provided (either through {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
* argument), then ranges could be further divided into smaller chunks.
* <p>
* Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a
* {@link IgniteFsJob}.
* {@link IgfsJob}.
* <p>
* Finally all generated jobs are sent to Grid nodes for execution.
* <p>
Expand Down Expand Up @@ -74,7 +74,7 @@
* }
* </pre>
*/
public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> {
public abstract class IgfsTask<T, R> extends ComputeTaskAdapter<IgfsTaskArgs<T>, R> {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -84,7 +84,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask

/** {@inheritDoc} */
@Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
@Nullable IgniteFsTaskArgs<T> args) {
@Nullable IgfsTaskArgs<T> args) {
assert ignite != null;
assert args != null;

Expand Down Expand Up @@ -123,7 +123,7 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
throw new IgniteException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
", subgrid=" + subgrid + ']');

IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
IgfsJob job = createJob(path, new IgfsFileRange(file.path(), loc.start(), loc.length()), args);

if (job != null) {
ComputeJob jobImpl = ggfsProc.createJob(job, fs.name(), file.path(), loc.start(),
Expand Down Expand Up @@ -152,8 +152,8 @@ public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTask
* @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
* @throws IgniteException If job creation failed.
*/
@Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
IgniteFsTaskArgs<T> args) throws IgniteException;
@Nullable public abstract IgfsJob createJob(IgniteFsPath path, IgfsFileRange range,
IgfsTaskArgs<T> args) throws IgniteException;

/**
* Maps list by node ID.
Expand Down

0 comments on commit 31d4e02

Please sign in to comment.