Skip to content

Commit

Permalink
IGNITE-2206: Hadoop file system creation is now abstracted out using …
Browse files Browse the repository at this point in the history
…factory interface.
  • Loading branch information
vozerov-gridgain committed Jan 5, 2016
1 parent 7d58d14 commit 8ed73b4
Show file tree
Hide file tree
Showing 26 changed files with 1,191 additions and 552 deletions.
Expand Up @@ -34,24 +34,24 @@ public abstract class IgfsUserContext {
* The main contract of this method is that {@link #currentUser()} method invoked * The main contract of this method is that {@link #currentUser()} method invoked
* inside closure always returns 'user' this callable executed with. * inside closure always returns 'user' this callable executed with.
* @param user the user name to invoke closure on behalf of. * @param user the user name to invoke closure on behalf of.
* @param clo the closure to execute * @param c the closure to execute
* @param <T> The type of closure result. * @param <T> The type of closure result.
* @return the result of closure execution. * @return the result of closure execution.
* @throws IllegalArgumentException if user name is null or empty String or if the closure is null. * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/ */
public static <T> T doAs(String user, final IgniteOutClosure<T> clo) { public static <T> T doAs(String user, final IgniteOutClosure<T> c) {
if (F.isEmpty(user)) if (F.isEmpty(user))
throw new IllegalArgumentException("Failed to use null or empty user name."); throw new IllegalArgumentException("Failed to use null or empty user name.");


final String ctxUser = userStackThreadLocal.get(); final String ctxUser = userStackThreadLocal.get();


if (F.eq(ctxUser, user)) if (F.eq(ctxUser, user))
return clo.apply(); // correct context is already there return c.apply(); // correct context is already there


userStackThreadLocal.set(user); userStackThreadLocal.set(user);


try { try {
return clo.apply(); return c.apply();
} }
finally { finally {
userStackThreadLocal.set(ctxUser); userStackThreadLocal.set(ctxUser);
Expand Down Expand Up @@ -81,24 +81,24 @@ public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
* } * }
* </pre> * </pre>
* @param user the user name to invoke closure on behalf of. * @param user the user name to invoke closure on behalf of.
* @param clbl the Callable to execute * @param c the Callable to execute
* @param <T> The type of callable result. * @param <T> The type of callable result.
* @return the result of closure execution. * @return the result of closure execution.
* @throws IllegalArgumentException if user name is null or empty String or if the closure is null. * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
*/ */
public static <T> T doAs(String user, final Callable<T> clbl) throws Exception { public static <T> T doAs(String user, final Callable<T> c) throws Exception {
if (F.isEmpty(user)) if (F.isEmpty(user))
throw new IllegalArgumentException("Failed to use null or empty user name."); throw new IllegalArgumentException("Failed to use null or empty user name.");


final String ctxUser = userStackThreadLocal.get(); final String ctxUser = userStackThreadLocal.get();


if (F.eq(ctxUser, user)) if (F.eq(ctxUser, user))
return clbl.call(); // correct context is already there return c.call(); // correct context is already there


userStackThreadLocal.set(user); userStackThreadLocal.set(user);


try { try {
return clbl.call(); return c.call();
} }
finally { finally {
userStackThreadLocal.set(ctxUser); userStackThreadLocal.set(ctxUser);
Expand Down
Expand Up @@ -192,18 +192,4 @@ public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable
* @throws IgniteException In case of error. * @throws IgniteException In case of error.
*/ */
public long usedSpaceSize() throws IgniteException; public long usedSpaceSize() throws IgniteException;

/**
* Gets the implementation specific properties of file system.
*
* @return Map of properties.
*/
public Map<String,String> properties();


/**
* Closes the secondary file system.
* @throws IgniteException in case of an error.
*/
public void close() throws IgniteException;
} }
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.hadoop;

/**
* Gets payload for Hadoop secondary file system.
*/
public interface HadoopPayloadAware {
/**
* @return Payload.
*/
public Object getPayload();
}
Expand Up @@ -43,19 +43,6 @@ public interface IgfsEx extends IgniteFileSystem {
/** File property: prefer writes to local node. */ /** File property: prefer writes to local node. */
public static final String PROP_PREFER_LOCAL_WRITES = "locWrite"; public static final String PROP_PREFER_LOCAL_WRITES = "locWrite";


/** Property name for path to Hadoop configuration. */
public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";

/** Property name for URI of file system. */
public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";

/** Property name for default user name of file system.
* NOTE: for secondary file system this is just a default user name, which is used
* when the 2ndary filesystem is used outside of any user context.
* If another user name is set in the context, 2ndary file system will work on behalf
* of that user, which is different from the default. */
public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";

/** /**
* Stops IGFS cleaning all used resources. * Stops IGFS cleaning all used resources.
* *
Expand Down
Expand Up @@ -72,6 +72,7 @@
import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture;
Expand All @@ -87,6 +88,7 @@
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -200,6 +202,9 @@ public final class IgfsImpl implements IgfsEx {
data = igfsCtx.data(); data = igfsCtx.data();
secondaryFs = cfg.getSecondaryFileSystem(); secondaryFs = cfg.getSecondaryFileSystem();


if (secondaryFs instanceof LifecycleAware)
((LifecycleAware) secondaryFs).start();

/* Default IGFS mode. */ /* Default IGFS mode. */
IgfsMode dfltMode; IgfsMode dfltMode;


Expand Down Expand Up @@ -256,8 +261,12 @@ public final class IgfsImpl implements IgfsEx {


modeRslvr = new IgfsModeResolver(dfltMode, modes); modeRslvr = new IgfsModeResolver(dfltMode, modes);


secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, Object secondaryFsPayload = null;
modeRslvr.modesOrdered());
if (secondaryFs instanceof HadoopPayloadAware)
secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload();

secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered());


// Check whether IGFS LRU eviction policy is set on data cache. // Check whether IGFS LRU eviction policy is set on data cache.
String dataCacheName = igfsCtx.configuration().getDataCacheName(); String dataCacheName = igfsCtx.configuration().getDataCacheName();
Expand Down Expand Up @@ -305,7 +314,8 @@ private ClusterNode localNode() {
batch.cancel(); batch.cancel();


try { try {
secondaryFs.close(); if (secondaryFs instanceof LifecycleAware)
((LifecycleAware)secondaryFs).stop();
} }
catch (Exception e) { catch (Exception e) {
log.error("Failed to close secondary file system.", e); log.error("Failed to close secondary file system.", e);
Expand Down
Expand Up @@ -17,17 +17,21 @@


package org.apache.ignite.internal.processors.igfs; package org.apache.ignite.internal.processors.igfs;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable; import java.io.Externalizable;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInput; import java.io.ObjectInput;
import java.io.ObjectOutput; import java.io.ObjectOutput;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


/** /**
Expand All @@ -37,8 +41,8 @@ public class IgfsPaths implements Externalizable {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** Additional secondary file system properties. */ /** */
private Map<String, String> props; private byte[] payloadBytes;


/** Default IGFS mode. */ /** Default IGFS mode. */
private IgfsMode dfltMode; private IgfsMode dfltMode;
Expand All @@ -56,22 +60,25 @@ public IgfsPaths() {
/** /**
* Constructor. * Constructor.
* *
* @param props Additional secondary file system properties. * @param payload Payload.
* @param dfltMode Default IGFS mode. * @param dfltMode Default IGFS mode.
* @param pathModes Path modes. * @param pathModes Path modes.
* @throws IgniteCheckedException If failed.
*/ */
public IgfsPaths(Map<String, String> props, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes)
IgfsMode>> pathModes) { throws IgniteCheckedException {
this.props = props;
this.dfltMode = dfltMode; this.dfltMode = dfltMode;
this.pathModes = pathModes; this.pathModes = pathModes;
}


/** if (payload == null)
* @return Secondary file system properties. payloadBytes = null;
*/ else {
public Map<String, String> properties() { ByteArrayOutputStream out = new ByteArrayOutputStream();
return props;
new JdkMarshaller().marshal(payload, out);

payloadBytes = out.toByteArray();
}
} }


/** /**
Expand All @@ -88,17 +95,36 @@ public IgfsMode defaultMode() {
return pathModes; return pathModes;
} }


/**
* @return Payload.
*
* @throws IgniteCheckedException If failed to deserialize the payload.
*/
@Nullable public Object getPayload(ClassLoader clsLdr) throws IgniteCheckedException {
if (payloadBytes == null)
return null;
else {
ByteArrayInputStream in = new ByteArrayInputStream(payloadBytes);

return new JdkMarshaller().unmarshal(in, clsLdr);
}
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException { @Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeStringMap(out, props); U.writeByteArray(out, payloadBytes);

U.writeEnum(out, dfltMode); U.writeEnum(out, dfltMode);


if (pathModes != null) { if (pathModes != null) {
out.writeBoolean(true); out.writeBoolean(true);
out.writeInt(pathModes.size()); out.writeInt(pathModes.size());


for (T2<IgfsPath, IgfsMode> pathMode : pathModes) { for (T2<IgfsPath, IgfsMode> pathMode : pathModes) {
assert pathMode.getKey() != null;

pathMode.getKey().writeExternal(out); pathMode.getKey().writeExternal(out);

U.writeEnum(out, pathMode.getValue()); U.writeEnum(out, pathMode.getValue());
} }
} }
Expand All @@ -108,7 +134,8 @@ public IgfsMode defaultMode() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
props = U.readStringMap(in); payloadBytes = U.readByteArray(in);

dfltMode = IgfsMode.fromOrdinal(in.readByte()); dfltMode = IgfsMode.fromOrdinal(in.readByte());


if (in.readBoolean()) { if (in.readBoolean()) {
Expand All @@ -118,11 +145,10 @@ public IgfsMode defaultMode() {


for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
IgfsPath path = new IgfsPath(); IgfsPath path = new IgfsPath();
path.readExternal(in);


T2<IgfsPath, IgfsMode> entry = new T2<>(path, IgfsMode.fromOrdinal(in.readByte())); path.readExternal(in);


pathModes.add(entry); pathModes.add(new T2<>(path, IgfsMode.fromOrdinal(in.readByte())));
} }
} }
} }
Expand Down
Expand Up @@ -19,7 +19,6 @@


import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsFile;
Expand Down Expand Up @@ -116,14 +115,4 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public long usedSpaceSize() throws IgniteException { @Override public long usedSpaceSize() throws IgniteException {
return igfs.usedSpaceSize(); return igfs.usedSpaceSize();
} }

/** {@inheritDoc} */
@Override public Map<String, String> properties() {
return Collections.emptyMap();
}

/** {@inheritDoc} */
@Override public void close() throws IgniteException {
// No-op.
}
} }

0 comments on commit 8ed73b4

Please sign in to comment.