
using field names instead of tuple positions for TF-IDF calculation

1 parent 94937f1 commit 102f76e5c8cdafbc34a37046e0c9ce868c1f9f5c @ceteri committed Jun 23, 2012
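
In short: Cascading operations that previously pulled values out of a TupleEntry by position now resolve them by Fields name, so the code keeps working if the tuple layout changes. The pattern, in miniature (docIdField stands for the Fields instance passed into the operation's constructor, as in the ScrubFunction diff below):

TupleEntry argument = functionCall.getArguments();
// before: positional access, breaks silently if the tuple layout changes
String byPosition = argument.getString( 0 );
// after: access by field name, independent of position
String byName = argument.getString( docIdField );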
@@ -1,9 +1,11 @@
**/build
**/dot
**/output
+**/.idea
**/.gradle
*~
+*.dot
*.class
# Package Files #
@@ -24,5 +24,8 @@ simple5
physical plan: 10 Mappers, 10 Reducers
simple6
- shows how to use checkpoints, assertions, traps, unit tests
+ shows how to use checkpoints, assertions, traps, unit tests, debug
automatically switch between Hfs and Lfs
+
+simple7
+ has a switch to run the example in local mode instead
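
A hypothetical sketch (not from this commit) of what such a switch might look like in Cascading 2.x, assuming a boolean runLocal flag parsed from the command line:

// hypothetical: choose the connector at runtime; LocalFlowConnector runs
// in-memory without a Hadoop cluster
FlowConnector flowConnector = runLocal
  ? new LocalFlowConnector( properties )
  : new HadoopFlowConnector( properties );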
@@ -55,6 +55,10 @@
String trapPath = args[ 4 ];
String checkPath = args[ 5 ];
+ Properties properties = new Properties();
+ AppProps.setApplicationJarClass( properties, Main.class );
+ HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
+
// create source taps, and read from local file system if inputs are not URLs
Tap docTap = makeTap( docPath, new TextDelimited( true, "\t" ) );
@@ -75,14 +79,15 @@
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );
// specify an operation within a pipe, to split text lines into a token stream
- Fields text = new Fields( "text" );
Fields token = new Fields( "token" );
+ Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
- Fields outputSelector = new Fields( "doc_id", "token" );
- docPipe = new Each( docPipe, text, splitter, outputSelector );
+ Fields fieldDeclaration = new Fields( "doc_id", "token" );
+ docPipe = new Each( docPipe, text, splitter, fieldDeclaration );
// define "ScrubFunction" to clean up the token stream
- docPipe = new Each( docPipe, new ScrubFunction() );
+ Fields doc_id = new Fields( "doc_id" );
+ docPipe = new Each( docPipe, new ScrubFunction( doc_id, token, fieldDeclaration ) );
// perform a left join to remove the stop words
Pipe stopPipe = new Pipe( "stop" );
@@ -101,7 +106,6 @@
tfPipe = new Rename( tfPipe, token, tf_token );
// one branch to count the number of documents (D)
- Fields doc_id = new Fields( "doc_id" );
Fields tally = new Fields( "tally" );
Fields rhs_join = new Fields( "rhs_join" );
Fields n_docs = new Fields( "n_docs" );
@@ -132,7 +136,7 @@
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );
// calculate TF-IDF metric
- Fields fieldDeclaration = new Fields( "token", "doc_id", "tfidf" );
+ fieldDeclaration = new Fields( "token", "doc_id", "tfidf" );
tfidfPipe = new Each( tfidfPipe, new TfIdfFunction( doc_id, tf_token, tf_count, df_count, n_docs, fieldDeclaration ) );
// keep track of the word counts, useful for QA
@@ -144,7 +148,7 @@
Fields count = new Fields( "count" );
wcPipe = new GroupBy( wcPipe, count, count );
- // connect the taps and pipes into a flow
+ // connect the taps, pipes, traps, checkpoints, etc., into a flow
FlowDef flowDef = FlowDef.flowDef().setName( "simple" );
flowDef.addSource( docPipe, docTap );
flowDef.addSource( stopPipe, stopTap );
@@ -159,17 +163,10 @@
// set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );
- // run the Flow
- Properties properties = new Properties();
- AppProps.setApplicationJarClass( properties, Main.class );
-
- HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
+ // write a DOT file and run the flow
Flow simpleFlow = flowConnector.connect( flowDef );
simpleFlow.writeDOT( "dot/simple.dot" );
-
- CascadeConnector cascadeConnector = new CascadeConnector( properties );
- Cascade cascade = cascadeConnector.connect( simpleFlow );
- cascade.complete();
+ simpleFlow.complete();
}
public static Tap
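
The makeTap body is cut off above; given the comment at the docTap call ("read from local file system if inputs are not URLs") and the README note about switching between Hfs and Lfs, a plausible sketch might be (an assumption, not the commit's actual code):

public static Tap
makeTap( String path, Scheme scheme )
  {
  // assumption: a path with a URL scheme (e.g. hdfs://...) gets an Hfs tap,
  // anything else is treated as a local file via Lfs
  return path.matches( "^[^:]+://.*" ) ? new Hfs( scheme, path ) : new Lfs( scheme, path );
  }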
@@ -17,16 +17,21 @@
public class ScrubFunction extends BaseOperation implements Function
{
- public ScrubFunction()
+ private final Fields docIdField;
+ private final Fields tokenField;
+
+ public ScrubFunction( Fields docIdField, Fields tokenField, Fields fieldDeclaration )
{
- super( 2, new Fields( "doc_id", "token" ) );
+ super( 2, fieldDeclaration );
+ this.docIdField = docIdField;
+ this.tokenField = tokenField;
}
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
{
TupleEntry argument = functionCall.getArguments();
- String doc_id = argument.getString( 0 );
- String token = scrubText( argument.getString( 1 ) );
+ String doc_id = argument.getString( docIdField );
+ String token = scrubText( argument.getString( tokenField ) );
if( token.length() > 0 )
{
@@ -7,19 +7,21 @@
package simple6;
import org.junit.Test;
-
import static org.junit.Assert.assertEquals;
+import cascading.tuple.Fields;
+
public class ScrubTest
{
-
@Test
- public void
- testMain()
- throws Exception
+ public void testMain() throws Exception
{
ScrubTest tester = new ScrubTest();
- ScrubFunction scrub = new ScrubFunction();
+
+ Fields doc_id = new Fields( "doc_id" );
+ Fields token = new Fields( "token" );
+ Fields fieldDeclaration = new Fields( "doc_id", "token" );
+ ScrubFunction scrub = new ScrubFunction( doc_id, token, fieldDeclaration );
assertEquals( "Scrub", "foo bar", scrub.scrubText( "FoO BAR " ) );
}
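
scrubText itself does not appear in this diff; from the assertion above ("FoO BAR " scrubs to "foo bar"), a minimal sketch consistent with the test would be:

public String scrubText( String text )
  {
  // trims surrounding whitespace and lower-cases, per the test expectation
  return text.trim().toLowerCase();
  }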
@@ -7,19 +7,24 @@
package simple6;
import org.junit.Test;
-
import static org.junit.Assert.assertEquals;
+import cascading.tuple.Fields;
+
public class TfIdfTest
{
-
@Test
- public void
- testMain()
- throws Exception
+ public void testMain() throws Exception
{
TfIdfTest tester = new TfIdfTest();
- TfIdfFunction tfidf = new TfIdfFunction();
+
+ Fields doc_id = new Fields( "doc_id" );
+ Fields tf_token = new Fields( "tf_token" );
+ Fields tf_count = new Fields( "tf_count" );
+ Fields df_count = new Fields( "df_count" );
+ Fields n_docs = new Fields( "n_docs" );
+ Fields fieldDeclaration = new Fields( "token", "doc_id", "tfidf" );
+ TfIdfFunction tfidf = new TfIdfFunction( doc_id, tf_token, tf_count, df_count, n_docs, fieldDeclaration );
assertEquals( "TF-IDF", 0.446, tfidf.getMetric( 2, 5, 3 ), 0.001 );
}
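
The expected value can be checked by hand. Assuming the metric is the standard tfCount * ln( nDocs / (1 + dfCount) ) with arguments in that order (an assumption; the function body is not part of this diff), the numbers line up:

// hypothetical body: getMetric( 2, 5, 3 ) = 2 * ln( 5 / (1 + 3) )
//                                         = 2 * ln( 1.25 ) ≈ 0.446
public double getMetric( long tfCount, long nDocs, long dfCount )
  {
  return (double) tfCount * Math.log( (double) nDocs / ( 1.0 + dfCount ) );
  }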
