Permalink
Browse files

Improve Pig script for Part 5.

Minor doc improvements.
  • Loading branch information...
1 parent b644af9 commit f4edf528e054ee4b1dc7bb2aadd8c1137e798ab9 @dvryaboy dvryaboy committed Sep 2, 2012
Showing with 53 additions and 23 deletions.
  1. +9 −1 part1/README.md
  2. +1 −1 part3/README.md
  3. +6 −0 part3/src/scripts/wc.pig
  4. +6 −5 part4/src/scripts/wc.pig
  5. +31 −16 part5/src/scripts/tfidf.pig
View
@@ -23,7 +23,15 @@ Before running this sample app, be sure to set your `HADOOP_HOME` environment va
To view the results:
- more output/rain/part-00000
+ cat output/rain/*
+
+To run the pig version of the script, make sure `PIG_HOME` is set and run:
+
+ rm -rf pigoutput
+ pig -p inPath=data/rain.txt -p outPath=pigoutput/rain ./src/scripts/copy.pig
+
+To view the results:
+ cat pigoutput/rain/*
An example of log captured from a successful build+run is at https://gist.github.com/2911686
View
@@ -4,7 +4,7 @@ The goal is to expand on our Word Count example in Cascading, and show how to wr
We'll keep building on this example until we have a MapReduce implementation of [TF-IDF](http://en.wikipedia.org/wiki/Tf%E2%80%93idf).
-More detailed background information and step-by-step documentation is provided at https://github.com/ConcurrentCore/impatient/wiki
+More detailed background information and step-by-step documentation is provided at https://github.com/cascading/impatient/wiki
Build Instructions
==================
View
@@ -3,6 +3,12 @@ docPipe = FILTER docPipe BY doc_id != 'doc_id';
-- specify a regex operation to split the "document" text lines into a token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(LOWER(text), ' [](),.')) AS token;
+
+-- part 3 of this tutorial teaches how to implement a custom function for cleaning the tokens.
+-- here, we simply use the built-in LOWER() UDF to do so.
+-- You can see the complete implementation of the UDF here:
+-- https://github.com/apache/pig/blob/trunk/src/org/apache/pig/builtin/LOWER.java
+
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- DUMP tokenPipe;
View
@@ -1,3 +1,5 @@
+set pig.exec.mapPartAgg true
+
docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';
@@ -8,10 +10,8 @@ stopPipe = FILTER stopPipe BY stop != 'stop';
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(LOWER(text), ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
--- perform a left join to remove stop words, discarding the rows
--- which joined with stop words, i.e., were non-null after left join
-tokenPipe = JOIN tokenPipe BY token LEFT, stopPipe BY stop;
-tokenPipe = FILTER tokenPipe BY stopPipe::stop is NULL;
+-- perform a left join to remove stop words
+tokenPipe = JOIN tokenPipe BY token LEFT, stopPipe BY stop using 'replicated';
-- DUMP tokenPipe;
-- determine the word counts
@@ -20,4 +20,5 @@ wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;
-- output
STORE wcPipe INTO '$wcPath' using PigStorage('\t', 'tagsource');
-EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
+--explain wcPipe
+-- EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
@@ -1,3 +1,5 @@
+set pig.exec.mapPartAgg true
+
docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';
@@ -10,7 +12,7 @@ tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- perform a left join to remove stop words, discarding the rows
-- which joined with stop words, i.e., were non-null after left join
-tokenPipe = JOIN tokenPipe BY token LEFT, stopPipe BY stop;
+tokenPipe = JOIN tokenPipe BY token LEFT, stopPipe BY stop using 'replicated';
tokenPipe = FILTER tokenPipe BY stopPipe::stop is NULL;
-- DUMP tokenPipe;
@@ -23,27 +25,40 @@ tfPipe = FOREACH tfGroups GENERATE FLATTEN(group) AS (doc_id, tf_token), COUNT(t
dPipe = FOREACH tokenPipe GENERATE doc_id;
dPipe = DISTINCT dPipe;
dGroups = GROUP dPipe ALL;
-dPipe = FOREACH dGroups GENERATE COUNT(dPipe) AS n_docs;
--- DUMP dPipe;
+dPipe = FOREACH dGroups {
+ GENERATE COUNT(dPipe) AS n_docs;
+}
-- one branch tallies the token counts for document frequency (DF)
-dfPipe = DISTINCT tokenPipe;
-dfGroups = GROUP dfPipe BY token;
-dfPipe = FOREACH dfGroups GENERATE group AS df_token, COUNT(dfPipe) AS df_count;
+-- note that here, we calculate distinct inside the foreach, whereas
+-- for global count, we used the top-level DISTINCT operator.
+-- the difference is that one is slower (requires an MR job), but extremely
+-- scalable; the other is done in memory, on a reducer, per-group.
+-- since here we expect much smaller groups, we favor the method that will
+-- be faster and not produce an extra MR job.
+tokenGroups = GROUP tokenPipe BY token;
+dfPipe = FOREACH tokenGroups {
+ dfPipe = distinct tokenPipe.doc_id;
+ GENERATE group AS df_token, COUNT(dfPipe) AS df_count;
+}
-- DUMP dfPipe;
-- join to bring together all the components for calculating TF-IDF
-idfPipe = CROSS dfPipe, dPipe;
-tfidfPipe = JOIN tfPipe BY tf_token, idfPipe BY df_token;
-tfidfPipe = FOREACH tfidfPipe GENERATE doc_id, (double) tf_count * LOG( (double) n_docs / ( 1.0 + (double) df_count ) ) AS tfidf, tf_token AS token;
+tfidfPipe = JOIN tfPipe BY tf_token, dfPipe BY df_token;
+-- Note how we refer to dPipe.n_docs , even though it's a relation we didn't join in!
+-- That's a special case for single-tuple relations that allows one to simply treat them as
+-- constants. See more here: http://squarecog.wordpress.com/2010/12/19/new-features-in-apache-pig-0-8/
+tfidfPipe = FOREACH tfidfPipe GENERATE
+ doc_id,
+ (double) tf_count * LOG( (double) dPipe.n_docs / ( 1.0 + (double) df_count ) ) AS tfidf,
+ tf_token AS token;
-- output
-STORE tfidfPipe INTO '$tfidfPath' using PigStorage('\t', 'tagsource');
-EXPLAIN -out dot/tfidf_pig.dot -dot tfidfPipe;
+STORE tfidfPipe INTO '$tfidfPath' using PigStorage('\t', '-tagsource -schema');
+-- EXPLAIN -out dot/tfidf_pig.dot -dot tfidfPipe;
-- determine the word counts
--- THIS PART DIES IN APACHE PIG W/O HELPFUL EXCEPTION MESSAGES
---tokenGroups = GROUP tokenPipe BY token;
---wcPipe = FOREACH tokenGroups GENERATE COUNT(tokenPipe) AS count, group AS token;
---wcPipe = ORDER wcPipe BY count DESC;
---STORE wcPipe INTO '$wcPath' using PigStorage('\t', 'tagsource');
+tokenGroups = GROUP tokenPipe BY token;
+wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;
+wcPipe = ORDER wcPipe BY count DESC;
+STORE wcPipe INTO '$wcPath' using PigStorage('\t', '-tagsource -schema');

0 comments on commit f4edf52

Please sign in to comment.