Permalink
Browse files

Merge pull request #1 from tmalaska/master

CDH4 Updates to Ades
  • Loading branch information...
2 parents f888d93 + 30f74b2 commit 78674a2ea15088c573e90198293301f9290a6ff8 @jwills jwills committed Jul 26, 2012
Showing with 70 additions and 25 deletions.
  1. +8 −8 README.md
  2. +10 −2 pom.xml
  3. +35 −12 src/main/java/com/cloudera/science/pig/Combinatorial.java
  4. +17 −3 src/main/java/com/cloudera/science/pig/EBCI.java
View
@@ -11,7 +11,7 @@ This project contains code for running an analysis of adverse drug events using
### Code
This analysis is designed to be small enough that you can run it on a single machine if you
-do not have access to a Hadoop cluster. You will need to have a version of [CDH3](https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation)
+do not have access to a Hadoop cluster. You will need to have a version of [CDH4](https://ccp.cloudera.com/display/CDHDOC/CDH4+Installation)
on your local machine, along with the version of Pig that is compatible with that version.
You will need to have Maven for compiling the Pig user-defined functions, and may also want to have a
@@ -29,13 +29,13 @@ into aers/demos, and all of the REAC\*.TXT files should go into aers/reactions.
If you have not done so already, load the input data into the Hadoop cluster:
- hadoop fs -mkdir aers
- hadoop fs -mkdir aers/drugs
- hadoop fs -put DRUG*.TXT aers/drugs
- hadoop fs -mkdir aers/demos
- hadoop fs -put DEMO*.TXT aers/demos
- hadoop fs -mkdir aers/reactions
- hadoop fs -put REAC*.TXT aers/reactions
+ hdfs dfs -mkdir aers
+ hdfs dfs -mkdir aers/drugs
+ hdfs dfs -put DRUG*.TXT aers/drugs
+ hdfs dfs -mkdir aers/demos
+ hdfs dfs -put DEMO*.TXT aers/demos
+ hdfs dfs -mkdir aers/reactions
+ hdfs dfs -put REAC*.TXT aers/reactions
Each of these commands should be run from the project's top-level directory,
i.e., the directory that contains this README file.
View
@@ -17,14 +17,22 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>0.20.2-cdh3u1</version>
+ <version>0.23.1-mr1-cdh4.0.0b2</version>
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>0.23.1-mr1-cdh4.0.0b2</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
- <version>0.8.1</version>
+ <version>0.9.2</version>
<scope>provided</scope>
</dependency>
@@ -101,42 +101,65 @@ private boolean isComparable(byte pigType) {
}
@Override
+ /**
+ * This UDF is given a Bag of Tuples of Comparables.
+ * Describe output: ({name:chararray})
+ *
+ * We want to output a Bag of Tuples of X of Comparables. X being equal to arity
+   * Describe should output: ({name0:chararray, name1:chararray})
+ */
public Schema outputSchema(Schema input) {
if (input.size() != 1) {
throw new IllegalArgumentException("Expected a bag; input has > 1 field");
}
try {
+
+ //Run some error checking
if (input.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("Expected a bag; found: " +
DataType.findTypeName(input.getField(0).type));
}
+ if (input.getField(0).schema.getField(0).type != DataType.TUPLE) {
+ throw new IllegalArgumentException("Expected a tuple in a bag; found: " +
+ DataType.findTypeName(input.getField(0).type));
+ }
if (input.getField(0).schema.size() != 1) {
throw new IllegalArgumentException("The bag must contain a single field");
}
- byte bagType = input.getField(0).schema.getField(0).type;
- if (bagType == DataType.TUPLE) {
- bagType = input.getField(0).schema.getField(0).schema.getField(0).type;
- }
- if (!isComparable(bagType)) {
- throw new IllegalArgumentException("The bag's field must be a comparable type");
+
+ //just to bucket schemas because we will be going 3 levels deep
+ Schema bagSchema = input.getField(0).schema;
+ Schema tupleSchema = bagSchema.getField(0).schema;
+
+ byte fieldType = tupleSchema.getField(0).type;
+
+ if (!isComparable(fieldType)) {
+        throw new IllegalArgumentException("The bag's Tuple's field must be a comparable type");
}
- FieldSchema inputField = input.getField(0).schema.getField(0);
+ FieldSchema inputField = tupleSchema.getField(0);
String inputName = inputField.alias;
+
+ //Define how many fields will be in the tuple
List<FieldSchema> fields = Lists.newArrayList();
for (int i = 0; i < arity; i++) {
fields.add(new FieldSchema(inputName + i, inputField.type));
}
- Schema tupleSchema = new Schema(fields);
+ Schema newTupleSchema = new Schema(fields);
- FieldSchema tupleFieldSchema = new FieldSchema(inputName + "tuple", tupleSchema,
+ //Define the tuple
+ FieldSchema tupleFieldSchema = new FieldSchema(inputName + "tuple", newTupleSchema,
DataType.TUPLE);
- Schema bagSchema = new Schema(tupleFieldSchema);
- bagSchema.setTwoLevelAccessRequired(true);
+ //Define Bag
+ Schema newBagSchema = new Schema(tupleFieldSchema);
+ //bagSchema.setTwoLevelAccessRequired(true); // This was deprecated. TODO why was this there.
Schema.FieldSchema bagFieldSchema = new Schema.FieldSchema(inputName + "bag",
- bagSchema, DataType.BAG);
+ newBagSchema, DataType.BAG);
+
+
return new Schema(bagFieldSchema);
+
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -19,6 +19,7 @@
import org.apache.commons.math.ConvergenceException;
import org.apache.commons.math.FunctionEvaluationException;
+import org.apache.commons.math.MathRuntimeException;
import org.apache.commons.math.analysis.DifferentiableUnivariateRealFunction;
import org.apache.commons.math.analysis.UnivariateRealFunction;
import org.apache.commons.math.analysis.integration.SimpsonIntegrator;
@@ -45,7 +46,7 @@ public PiFunction(double p, GammaDistribution g1, GammaDistribution g2) {
this.g2 = g2;
}
- @Override
+
public double value(double lambda) throws FunctionEvaluationException {
return p * g1.density(lambda) + (1.0 - p) * g2.density(lambda);
}
@@ -62,7 +63,6 @@ public PiFunctionIntegral(PiFunction pi, double target) {
this.integrator = new SimpsonIntegrator();
}
- @Override
public double value(double lambda) throws FunctionEvaluationException {
try {
if (lambda == 0.0) {
@@ -73,11 +73,14 @@ public double value(double lambda) throws FunctionEvaluationException {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
+ } catch (Exception e)
+ {
+
+ throw new RuntimeException("lambda-" + lambda,e);
}
return Double.POSITIVE_INFINITY;
}
- @Override
public UnivariateRealFunction derivative() {
return pi;
}
@@ -97,6 +100,9 @@ public EBCI(String target, String alpha1, String beta1,
Double.valueOf(alpha2), Double.valueOf(beta2), Double.valueOf(p));
}
+
+
+
public EBCI(double target, double alpha1, double beta1,
double alpha2, double beta2, double p) {
this.target = target;
@@ -113,12 +119,20 @@ public double eval(int n, double e) {
GammaDistribution g2 = new GammaDistributionImpl(alpha2 + n, beta2 + e);
PiFunction pi = new PiFunction(q.eval(n, e), g1, g2);
PiFunctionIntegral ipi = new PiFunctionIntegral(pi, target);
+
+
+
try {
return (new BrentSolver()).solve(ipi, 0.0, 10.0, 0.01);
} catch (ConvergenceException e1) {
e1.printStackTrace();
} catch (FunctionEvaluationException e1) {
e1.printStackTrace();
+ } catch (RuntimeException e1)
+ {
+       // MathRuntimeException: function values at endpoints do not have different signs
+ e1.printStackTrace();
+
}
return -1.0;
}

0 comments on commit 78674a2

Please sign in to comment.