Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some classpath fixes #4

Merged
merged 1 commit into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop3.Hadoop3MRFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
Expand Down Expand Up @@ -122,7 +122,7 @@ public <T> void verifyScroogeRead(List<TBase> recordsToWrite, Class<T> readClass

Pipe assembly = new Pipe("namecp");
assembly = new Each(assembly, new ObjectToStringFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
String result = FileUtils.readFileToString(new File(TXT_OUTPUT_PATH + "/part-00000"));
Expand Down Expand Up @@ -184,7 +184,7 @@ private void doWrite() throws Exception {

Pipe assembly = new Pipe( "namecp" );
assembly = new Each(assembly, new PackThriftFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
}
Expand All @@ -202,7 +202,7 @@ private void doRead() throws Exception {

Pipe assembly = new Pipe( "namecp" );
assembly = new Each(assembly, new UnpackThriftFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop3.Hadoop3MRFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
Expand Down Expand Up @@ -61,7 +61,7 @@ public void testWrite() throws Exception {

Pipe assembly = new Pipe( "namecp" );
assembly = new Each(assembly, new PackThriftFunction());
HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
Hadoop3MRFlowConnector hadoopFlowConnector = new Hadoop3MRFlowConnector();
Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly);

flow.complete();
Expand Down Expand Up @@ -95,7 +95,7 @@ private void doRead(Scheme sourceScheme) throws Exception {

Pipe assembly = new Pipe( "namecp" );
assembly = new Each(assembly, new UnpackThriftFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
Expand Down Expand Up @@ -159,7 +159,7 @@ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
TupleEntry arguments = functionCall.getArguments();
Tuple result = new Tuple();

Name name = (Name) arguments.get(0);
Name name = (Name) arguments.getObject(0);
result.add(name.getFirst_name());
result.add(name.getLast_name());
functionCall.getOutputCollector().add(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop3.Hadoop3MRFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void testFieldProjection() throws Exception {

Pipe assembly = new Pipe("namecp");
assembly = new Each(assembly, new ProjectedTupleFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
Expand All @@ -91,7 +91,7 @@ public void testReadWrite(String inputPath) throws Exception {

Pipe assembly = new Pipe("namecp");
assembly = new Each(assembly, new UnpackTupleFunction());
Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
Flow flow = new Hadoop3MRFlowConnector().connect("namecp", source, sink, assembly);

flow.complete();
String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
Expand Down