Skip to content

Commit

Permalink
added #getPropertyNames to c.f.FlowProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed Feb 16, 2012
1 parent 5eecc58 commit b5bd117
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/core/cascading/flow/FlowProcess.java
Expand Up @@ -21,6 +21,8 @@
package cascading.flow;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import cascading.tap.Tap;
Expand Down Expand Up @@ -65,6 +67,12 @@ public Object getProperty( String key )
return null;
}

@Override
public Collection<String> getPropertyKeys()
{
return Collections.EMPTY_SET;
}

public void keepAlive()
{
}
Expand Down Expand Up @@ -201,6 +209,13 @@ public void setCurrentSession( FlowSession currentSession )
*/
public abstract Object getProperty( String key );

/**
* Method getPropertyKeys returns an immutable collection of all available property key values.
*
* @return a Collection<String>
*/
public abstract Collection<String> getPropertyKeys();

/**
* Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
* {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
Expand Down
7 changes: 7 additions & 0 deletions src/core/cascading/flow/FlowProcessWrapper.java
Expand Up @@ -21,6 +21,7 @@
package cascading.flow;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import cascading.tap.Tap;
Expand Down Expand Up @@ -94,6 +95,12 @@ public Object getProperty( String key )
return delegate.getProperty( key );
}

@Override
public Collection<String> getPropertyKeys()
{
return delegate.getPropertyKeys();
}

@Override
public void keepAlive()
{
Expand Down
16 changes: 16 additions & 0 deletions src/core/cascading/flow/stream/PipeFlowProcess.java
Expand Up @@ -20,6 +20,11 @@

package cascading.flow.stream;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProcessWrapper;
import cascading.pipe.ConfigDef;
Expand Down Expand Up @@ -71,4 +76,15 @@ public Object getProperty( String key )
{
return configDef.apply( key, getter );
}

@Override
public Collection<String> getPropertyKeys()
{
Set<String> keys = new HashSet<String>();

keys.addAll( getDelegate().getPropertyKeys() );
keys.addAll( configDef.getAllKeys() );

return Collections.unmodifiableSet( keys );
}
}
13 changes: 13 additions & 0 deletions src/core/cascading/pipe/ConfigDef.java
Expand Up @@ -21,9 +21,12 @@
package cascading.pipe;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The ConfigDef class allows for the creation of a configuration properties template to be applied to an existing
Expand Down Expand Up @@ -169,4 +172,14 @@ public void apply( Mode mode, Setter setter )
}
}
}

public Collection<String> getAllKeys()
{
Set<String> keys = new HashSet<String>();

for( Map<String, String> map : config.values() )
keys.addAll( map.keySet() );

return Collections.unmodifiableSet( keys );
}
}
15 changes: 15 additions & 0 deletions src/hadoop/cascading/flow/hadoop/HadoopFlowProcess.java
Expand Up @@ -21,7 +21,11 @@
package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
Expand Down Expand Up @@ -211,6 +215,17 @@ public Object getProperty( String key )
return jobConf.get( key );
}

@Override
public Collection<String> getPropertyKeys()
{
Set<String> keys = new HashSet<String>();

for( Map.Entry<String, String> entry : jobConf )
keys.add( entry.getKey() );

return Collections.unmodifiableSet( keys );
}

@Override
public void keepAlive()
{
Expand Down
8 changes: 8 additions & 0 deletions src/local/cascading/flow/local/LocalFlowProcess.java
Expand Up @@ -21,6 +21,8 @@
package cascading.flow.local;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -77,6 +79,12 @@ public Object getProperty( String key )
return config.getProperty( key );
}

@Override
public Collection<String> getPropertyKeys()
{
return Collections.unmodifiableSet( config.stringPropertyNames() );
}

@Override
public void keepAlive()
{
Expand Down
7 changes: 7 additions & 0 deletions src/test/cascading/operation/filter/LimitFilterTest.java
Expand Up @@ -21,6 +21,7 @@
package cascading.operation.filter;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import cascading.CascadingTestCase;
Expand Down Expand Up @@ -95,6 +96,12 @@ public Object getProperty( String key )
return null;
}

@Override
public Collection<String> getPropertyKeys()
{
return null;
}

@Override
public void keepAlive()
{
Expand Down

0 comments on commit b5bd117

Please sign in to comment.