From 9e3bd5a8a7f7d4c3b5bdd5d83edde13cb278c7a2 Mon Sep 17 00:00:00 2001
From: Daniel Sudzilouski
Date: Wed, 16 Feb 2022 12:27:31 -0800
Subject: [PATCH 01/11] compile & compile:test passes

all implementation code minus tests compiles
scalding-commons compiles
scalding-avro compiles
scalding-hraven compiles
scalding-json compiles
scalding-hadoop-test compiles
main core compiles
fix CascadingStats
c.t.h.TemplateTap deprecated in cascading
fix getElementGraph
fix AGGREGATE_BY_CAPACITY
maple compiles
add docker cascading WIP build
---
 build.sbt                                     |  30 ++--
 experimental-cascading/README.md              |   8 ++
 experimental-cascading/build-scalding.sh      |  10 ++
 experimental-cascading/docker/Dockerfile      |  18 +++
 .../docker/cleanup-image.sh                   |   3 +
 .../docker/compile-cascading.sh               |  15 ++
 .../docker/compile-scalding.sh                |  10 ++
 .../docker/download-java.sh                   |   5 +
 .../etsy/cascading/tap/local/LocalTap.java    |  33 ++---
 .../com/twitter/maple/hbase/HBaseScheme.java  |  12 +-
 .../com/twitter/maple/hbase/HBaseTap.java     |  10 +-
 .../com/twitter/maple/tap/MemorySinkTap.java  |   4 +-
 .../twitter/maple/tap/MemorySourceTap.java    |  14 +-
 .../java/com/twitter/maple/tap/StdoutTap.java |   4 +-
 .../twitter/scalding/avro/AvroSource.scala    |   5 +-
 .../commons/scheme/KeyValueByteScheme.java    |  20 ++-
 .../scalding/commons/tap/VersionedTap.java    |  34 ++---
 .../scheme/CombinedSequenceFileScheme.scala   |  20 ---
 .../commons/source/LzoGenericScheme.scala     |  16 +--
 .../scalding/commons/source/LzoTraits.scala   |   2 +-
 .../commons/source/LzoTypedText.scala         |   2 +-
 .../source/VersionedKeyValSource.scala        |   7 +-
 .../com/twitter/scalding/tap/GlobHfs.java     |   9 +-
 .../com/twitter/scalding/tap/ScaldingHfs.java |  32 +----
 .../tuple/HadoopTupleEntrySchemeIterator.java |   9 +-
 .../scala/com/twitter/scalding/Config.scala   |   6 +-
 .../com/twitter/scalding/Execution.scala      |   2 +-
 .../twitter/scalding/ExecutionContext.scala   |   2 +-
 .../com/twitter/scalding/FileSource.scala     |  24 ++--
 .../scalding/HfsConfPropertySetter.scala      |   9 +-
 .../main/scala/com/twitter/scalding/Job.scala |   4 +-
 .../scala/com/twitter/scalding/JobStats.scala |   6 +-
 .../scala/com/twitter/scalding/JobTest.scala  |   6 +-
 .../com/twitter/scalding/MemoryTap.scala      |   4 +-
 .../com/twitter/scalding/Operations.scala     |   4 +-
 .../scala/com/twitter/scalding/Source.scala   |  24 ++--
 .../scala/com/twitter/scalding/Stats.scala    |   4 +-
 .../com/twitter/scalding/TemplateSource.scala | 134 ------------------
 .../com/twitter/scalding/TestTapFactory.scala |   9 +-
 .../twitter/scalding/estimation/Common.scala  |   2 +-
 .../CascadingBinaryComparator.scala           |   2 +-
 .../memory/MemoryEstimatorTest.scala          |   4 +-
 .../scalding/platform/LocalCluster.scala      |   3 -
 .../estimation/HRavenHistoryService.scala     |   2 +-
 .../estimation/HRavenHistoryServiceTest.scala |   2 +-
 .../com/twitter/scalding/TypedJson.scala      |   2 +-
 .../parquet/scrooge/ParquetScroogeScheme.java |  21 +--
 .../scrooge/Parquet346ScroogeScheme.scala     |   7 +-
 .../scalding/parquet/ParquetValueScheme.java  |  19 +--
 .../parquet/thrift/ParquetTBaseScheme.java    |  21 +--
 .../parquet/tuple/ParquetTupleScheme.java     |  39 ++---
 .../parquet/tuple/TupleReadSupport.java       |   2 +-
 .../thrift/Parquet346TBaseScheme.scala        |   7 +-
 .../scheme/TypedParquetTupleScheme.scala      |   8 +-
 54 files changed, 318 insertions(+), 393 deletions(-)
 create mode 100644 experimental-cascading/README.md
 create mode 100755 experimental-cascading/build-scalding.sh
 create mode 100644 experimental-cascading/docker/Dockerfile
 create mode 100755 experimental-cascading/docker/cleanup-image.sh
 create mode 100755 experimental-cascading/docker/compile-cascading.sh
 create mode 100644 experimental-cascading/docker/compile-scalding.sh
 create mode 100755 experimental-cascading/docker/download-java.sh
 delete mode 100644 scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala
 delete mode 100644 scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala
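For orientation before the build.sbt hunks: Cascading 4.x is published under the `net.wensel` groupId instead of `cascading`, and the single `cascading-hadoop` artifact is replaced by a Hadoop 3 pair, `cascading-hadoop3-common` plus the `cascading-hadoop3-mr1` MR1 shim. A minimal sbt sketch of the resulting dependency block, assuming the WIP jars were published to the local Maven cache by the docker build added below (the standalone-snippet framing is illustrative, not part of the patch):

```scala
// Before (Cascading 2.x):  "cascading" % "cascading-hadoop" % "2.6.1"
// After (this patch): the version comes from an env var, defaulting to the
// locally published WIP build, and the hadoop artifact is split in two.
val cascadingVersion = sys.env.getOrElse("SCALDING_CASCADING_VERSION", "4.5.0-wip-dev")

libraryDependencies ++= Seq(
  "net.wensel" % "cascading-core" % cascadingVersion,
  "net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
  "net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion,
  "net.wensel" % "cascading-local" % cascadingVersion
)

// The WIP jars exist only in ~/.m2, hence the new resolver in sharedSettings.
resolvers += Resolver.mavenLocal
```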
diff --git a/build.sbt b/build.sbt
index 8e2fd3e841..02e1d72a16 100644
--- a/build.sbt
+++ b/build.sbt
@@ -17,21 +17,21 @@ val catsEffectVersion = "1.1.0"
 val catsVersion = "1.5.0"
 val chillVersion = "0.8.4"
 val dagonVersion = "0.3.1"
-val elephantbirdVersion = "4.15"
+val elephantbirdVersion = "4.17"
 val hadoopLzoVersion = "0.4.19"
-val hadoopVersion = "2.5.0"
+val hadoopVersion = "3.2.1"
 val hbaseVersion = "1.2.4"
 val hravenVersion = "1.0.1"
 val jacksonVersion = "2.8.7"
 val json4SVersion = "3.5.0"
 val paradiseVersion = "2.1.1"
-val parquetVersion = "1.10.0"
+val parquetVersion = "1.12.0"
 val protobufVersion = "2.4.1"
 val scalameterVersion = "0.8.2"
 val scalaCheckVersion = "1.13.4"
 val scalaTestVersion = "3.0.1"
 val scroogeVersion = "19.8.0"
-val sparkVersion = "2.4.0"
+val sparkVersion = "3.1.2"
 val beamVersion = "2.29.0"
 val slf4jVersion = "1.6.6"
 val thriftVersion = "0.9.3"
@@ -42,7 +42,7 @@ val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencie
 
 val sharedSettings = Seq(
   organization := "com.twitter",
-  scalaVersion := "2.11.12",
+  scalaVersion := "2.12.14",
   crossScalaVersions := Seq(scalaVersion.value, "2.12.14"),
   javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
   doc / javacOptions := Seq("-source", "1.8"),
@@ -61,6 +61,7 @@
    "com.novocode" % "junit-interface" % "0.10" % "test"
  ),
  resolvers ++= Seq(
+    Resolver.mavenLocal,
    Opts.resolver.sonatypeSnapshots,
    Opts.resolver.sonatypeReleases,
    "Concurrent Maven Repo".at("https://conjars.org/repo"),
@@ -260,7 +261,7 @@ lazy val scaldingArgs = module("args")
 lazy val scaldingDate = module("date")
 
 lazy val cascadingVersion =
-  System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1")
+  System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "4.5.0-wip-dev")
 
 lazy val cascadingJDBCVersion =
   System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0")
@@ -286,9 +287,10 @@ lazy val scaldingQuotation = module("quotation").settings(
 lazy val scaldingCore = module("core")
   .settings(
     libraryDependencies ++= Seq(
-      "cascading" % "cascading-core" % cascadingVersion,
-      "cascading" % "cascading-hadoop" % cascadingVersion,
-      "cascading" % "cascading-local" % cascadingVersion,
+      "net.wensel" % "cascading-core" % cascadingVersion,
+      "net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
+      "net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion,
+      "net.wensel" % "cascading-local" % cascadingVersion,
       "com.stripe" %% "dagon-core" % dagonVersion,
       "com.twitter" % "chill-hadoop" % chillVersion,
       "com.twitter" % "chill-java" % chillVersion,
@@ -353,7 +355,7 @@ lazy val scaldingCommons = module("commons")
       "com.twitter" %% "bijection-core" % bijectionVersion,
       "com.twitter" %% "algebird-core" % algebirdVersion,
       "com.twitter" %% "chill" % chillVersion,
-      "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion,
+      "com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion,
       "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion,
       "com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion,
       // TODO: split this out into scalding-thrift
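The elephant-bird bump from 4.15 to 4.17 above pairs with this artifact swap: the Cascading bindings move from `elephant-bird-cascading2` to `elephant-bird-cascading3`, and the scheme classes keep their names under the new package. A minimal sketch of the import-level change, using a class this patch itself touches (the wrapper object is illustrative):

```scala
object ElephantBirdSchemes {
  // elephant-bird-cascading2 (old):
  //   import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited
  // elephant-bird-cascading3 (new), as adopted by LzoTypedText.scala below:
  import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited

  type LzoTextScheme = LzoTextDelimited
}
```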
@@ -531,7 +533,7 @@ lazy val scaldingJson = module("json")
       "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
       "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
       "org.json4s" %% "json4s-native" % json4SVersion,
-      "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided"
+      "com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion % "provided"
     )
   )
   .dependsOn(scaldingCore)
@@ -601,7 +603,8 @@ lazy val maple = Project(
       "org.apache.hbase" % "hbase-client" % hbaseVersion % "provided",
       "org.apache.hbase" % "hbase-common" % hbaseVersion % "provided",
       "org.apache.hbase" % "hbase-server" % hbaseVersion % "provided",
-      "cascading" % "cascading-hadoop" % cascadingVersion
+      "net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
+      "net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
     )
   )
 )
@@ -618,7 +621,8 @@ lazy val executionTutorial = Project(
       "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
-      "cascading" % "cascading-hadoop" % cascadingVersion
+      "net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
+      "net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
     )
   )
 ).dependsOn(scaldingCore)
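Most of the per-file edits that follow are one mechanical shift: Cascading 4's Hadoop taps and schemes are keyed on `Configuration` rather than `JobConf`, and `FlowProcess` parameters widen to `FlowProcess[_ <: Configuration]`. A sketch of the two type shapes the Scala sources converge on (the wrapper object is illustrative; the aliases mirror what `Source.scala` and `FileSource.scala` use below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{OutputCollector, RecordReader}

object CascadingHadoopTypes {
  // Cascading 2.x: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]
  // Cascading 4.x: keyed on Configuration, so the same tap serves both the
  // hadoop3-common and MR1 planners.
  type HadoopScheme =
    cascading.scheme.Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]
  type HadoopTap =
    cascading.tap.Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]
}
```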
diff --git a/experimental-cascading/README.md b/experimental-cascading/README.md
new file mode 100644
index 0000000000..eb5b7cc628
--- /dev/null
+++ b/experimental-cascading/README.md
@@ -0,0 +1,8 @@
+# Experimental Cascading
+This folder provides an experimental scalding/cascading build.
+We want to e2e test a hadoop3/cascading4 fork of cascading here.
+
+# Cascading WIP
+The WIP branches of cascading are a pain to consume because they only publish to GitHub Packages, which requires GitHub auth to read.
+It's easier to just compile the WIP branch here until a final release of cascading 4.5 is available.
+
diff --git a/experimental-cascading/build-scalding.sh b/experimental-cascading/build-scalding.sh
new file mode 100755
index 0000000000..d36744f79f
--- /dev/null
+++ b/experimental-cascading/build-scalding.sh
@@ -0,0 +1,10 @@
+#!/bin/bash -ex
+
+# build cascading
+docker build --tag experimental-cascading:latest -f docker/Dockerfile ./docker
+
+# build scalding
+SCALDING_HOST_ROOT=$PWD/../../
+MAVEN_HOST_CACHE=~/.m2
+RESOURCE_ARGS="--memory=8g --memory-swap=24g" # container will crash without enough memory
+docker run $RESOURCE_ARGS -v $SCALDING_HOST_ROOT:/scalding -v $MAVEN_HOST_CACHE:/root/m2-host experimental-cascading:latest
\ No newline at end of file
diff --git a/experimental-cascading/docker/Dockerfile b/experimental-cascading/docker/Dockerfile
new file mode 100644
index 0000000000..81064a66de
--- /dev/null
+++ b/experimental-cascading/docker/Dockerfile
@@ -0,0 +1,18 @@
+FROM ubuntu:22.04
+
+# CASCADING_RELEASE is the release tag for cascading to compile (ex. https://github.com/cwensel/cascading/releases/tag/4.5-wip-10)
+# SCALDING_CASCADING_VERSION is the arg scalding uses to resolve the cascading version
+ENV CASCADING_RELEASE 4.5-wip-10
+ENV SCALDING_CASCADING_VERSION 4.5.0-wip-dev
+
+COPY "./download-java.sh" "/scalding/experimental-cascading/docker/download-java.sh"
+RUN "/scalding/experimental-cascading/docker/download-java.sh"
+
+COPY "./compile-cascading.sh" "/scalding/experimental-cascading/docker/compile-cascading.sh"
+RUN "/scalding/experimental-cascading/docker/compile-cascading.sh"
+
+COPY "./cleanup-image.sh" "/scalding/experimental-cascading/docker/cleanup-image.sh"
+RUN "/scalding/experimental-cascading/docker/cleanup-image.sh"
+
+COPY "./compile-scalding.sh" "/scalding/experimental-cascading/docker/compile-scalding.sh"
+ENTRYPOINT ["/bin/bash", "/scalding/experimental-cascading/docker/compile-scalding.sh"]
diff --git a/experimental-cascading/docker/cleanup-image.sh b/experimental-cascading/docker/cleanup-image.sh
new file mode 100755
index 0000000000..dcf81a83b6
--- /dev/null
+++ b/experimental-cascading/docker/cleanup-image.sh
@@ -0,0 +1,3 @@
+#!/bin/bash -ex
+
+rm -rf /scalding
\ No newline at end of file
diff --git a/experimental-cascading/docker/compile-cascading.sh b/experimental-cascading/docker/compile-cascading.sh
new file mode 100755
index 0000000000..69a972a599
--- /dev/null
+++ b/experimental-cascading/docker/compile-cascading.sh
@@ -0,0 +1,15 @@
+#!/bin/bash -ex
+
+RELEASE_LINK=https://github.com/cwensel/cascading/archive/refs/tags/$CASCADING_RELEASE.zip
+
+# get the source for the target release tag
+mkdir -p /scalding/tmp
+apt-get install curl unzip git -y
+curl $RELEASE_LINK -L -o /scalding/tmp/$CASCADING_RELEASE.zip
+cd /scalding/tmp/
+unzip $CASCADING_RELEASE
+cd cascading-$CASCADING_RELEASE
+
+# build cascading into maven cache so that scalding can pick it up
+./gradlew publishToMavenLocal -x signMavenPublication
+ls -R /root/.m2/repository/net/wensel/
diff --git a/experimental-cascading/docker/compile-scalding.sh b/experimental-cascading/docker/compile-scalding.sh
new file mode 100644
index 0000000000..d7be7a711a
--- /dev/null
+++ b/experimental-cascading/docker/compile-scalding.sh
@@ -0,0 +1,10 @@
+#!/bin/bash -ex
+
+#
drop cascading build back onto host machine +# to make IDE resolution work for local development +rm -rf /root/m2-host/repository/net/wensel/ +cp -r /root/.m2/repository/net/wensel/ /root/m2-host/repository/net/wensel/ + +# build scalding +cd /scalding +./sbt compile diff --git a/experimental-cascading/docker/download-java.sh b/experimental-cascading/docker/download-java.sh new file mode 100755 index 0000000000..0d72b134a6 --- /dev/null +++ b/experimental-cascading/docker/download-java.sh @@ -0,0 +1,5 @@ +#!/bin/bash -ex + +apt-get update -y +apt-get install openjdk-8-jdk -y +java -version diff --git a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java index 1aefbbb49f..1451f97694 100644 --- a/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java +++ b/maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java @@ -5,6 +5,7 @@ import java.util.Properties; import java.util.logging.Logger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; @@ -45,18 +46,18 @@ public class LocalTap extends Tap scheme, + public LocalTap(String path, Scheme scheme, SinkMode sinkMode) { super(new LocalScheme(scheme), sinkMode); setup(path, scheme); } - public LocalTap(String path, Scheme scheme) { + public LocalTap(String path, Scheme scheme) { super(new LocalScheme(scheme)); setup(path, scheme); } - private void setup(String path, Scheme scheme) { + private void setup(String path, Scheme scheme) { this.path = path; /* @@ -90,13 +91,13 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { + public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults); return lfs.openForRead(new HadoopFlowProcess(jobConf)); } @Override - public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) + public TupleEntryCollector openForWrite(FlowProcess flowProcess, OutputCollector output) throws IOException { JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults); return lfs.openForWrite(new HadoopFlowProcess(jobConf)); @@ -141,11 +142,11 @@ private static class LocalScheme extends Scheme { private static final long serialVersionUID = 5710119342340369543L; - private Scheme scheme; + private Scheme scheme; private JobConf defaults; private Lfs lfs; - public LocalScheme(Scheme scheme) { + public LocalScheme(Scheme scheme) { super(scheme.getSourceFields(), scheme.getSinkFields()); this.scheme = scheme; } @@ -159,19 +160,19 @@ private void setLfs(Lfs lfs) { } @Override - public Fields retrieveSourceFields(FlowProcess flowProcess, + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSourceFields(FlowProcess flowProcess, + public void presentSourceFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields); } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, 
defaults); scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -179,19 +180,19 @@ public void sourceConfInit(FlowProcess flowProcess, } @Override - public Fields retrieveSinkFields(FlowProcess flowProcess, + public Fields retrieveSinkFields(FlowProcess flowProcess, Tap tap) { return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs); } @Override - public void presentSinkFields(FlowProcess flowProcess, + public void presentSinkFields(FlowProcess flowProcess, Tap tap, Fields fields) { scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields); } - + @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults); scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf); @@ -199,13 +200,13 @@ public void sinkConfInit(FlowProcess flowProcess, } @Override - public boolean source(FlowProcess flowProcess, SourceCall sourceCall) + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { throw new RuntimeException("LocalTap#source is never called"); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { throw new RuntimeException("LocalTap#sink is never called"); } diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java index 0f830ede86..6dfa4ff7ff 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java @@ -154,7 +154,7 @@ public String[] getFamilyNames() { } @Override - public void sourcePrepare(FlowProcess flowProcess, + public void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) { Object[] pair = new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; @@ -163,13 +163,13 @@ public void sourcePrepare(FlowProcess flowProcess, } @Override - public void sourceCleanup(FlowProcess flowProcess, + public void sourceCleanup(FlowProcess flowProcess, SourceCall sourceCall) { sourceCall.setContext(null); } @Override - public boolean source(FlowProcess flowProcess, + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { Tuple result = new Tuple(); @@ -206,7 +206,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); OutputCollector outputCollector = sinkCall.getOutput(); @@ -231,7 +231,7 @@ public void sink(FlowProcess flowProcess, SinkCall process, + public void sinkConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setOutputFormat(TableOutputFormat.class); @@ -240,7 +240,7 @@ public void sinkConfInit(FlowProcess process, } @Override - public void sourceConfInit(FlowProcess process, + public void sourceConfInit(FlowProcess process, Tap tap, JobConf conf) { conf.setInputFormat(TableInputFormat.class); diff --git a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java index bd5ffa8095..80fb06f665 100644 --- a/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java +++ b/maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java @@ -144,7 +144,7 @@ 
protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningExceptio } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { + public void sinkConfInit(FlowProcess process, JobConf conf) { if (quorumNames != null) { conf.set("hbase.zookeeper.quorum", quorumNames); } else { @@ -183,13 +183,13 @@ public String getIdentifier() { } @Override - public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); } @Override - public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { - HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector( (FlowProcess) jobConfFlowProcess, this ); hBaseCollector.prepare(); return hBaseCollector; } @@ -235,7 +235,7 @@ public long getModifiedTime(JobConf jobConf) throws IOException { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, JobConf conf) { // a hack for MultiInputFormat to see that there is a child format FileInputFormat.setInputPaths( conf, getPath() ); diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java index 6b71b08b5b..f7fc9e21d1 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java @@ -7,7 +7,7 @@ import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.TupleEntryIterator; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -43,7 +43,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); boolean first_time = true; diff --git a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java index d3ef58e624..df63bdeaa5 100644 --- a/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java @@ -46,20 +46,20 @@ public List getTuples() { } @Override - public void sourceConfInit(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { conf.setInputFormat(TupleMemoryInputFormat.class); TupleMemoryInputFormat.storeTuples(conf, TupleMemoryInputFormat.TUPLES_PROPERTY, this.tuples); } @Override - public void sinkConfInit(FlowProcess flowProcess, + public void sinkConfInit(FlowProcess flowProcess, Tap, Void> tap, JobConf conf) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public void sourcePrepare( FlowProcess flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( new Object[ 2 ] ); @@ -68,7 +68,7 @@ public void sourcePrepare( FlowProcess flowProcess, SourceCall 
flowProcess, SourceCall flowProcess, SourceCall> sourceCall) throws IOException { TupleWrapper key = (TupleWrapper) sourceCall.getContext()[ 0 ]; NullWritable value = (NullWritable) sourceCall.getContext()[ 1 ]; @@ -83,13 +83,13 @@ public boolean source(FlowProcess flowProcess, SourceCall flowProcess, SourceCall flowProcess, SourceCall> sourceCall ) { sourceCall.setContext( null ); } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { + public void sink(FlowProcess flowProcess, SinkCall sinkCall ) throws IOException { throw new UnsupportedOperationException("Not supported."); } @@ -126,7 +126,7 @@ public boolean equals(Object object) { } @Override - public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader flowProcess, RecordReader input ) throws IOException { // input may be null when this method is called on the client side or cluster side when accumulating // for a HashJoin diff --git a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java index 5c3f5f0b29..4d55b2c854 100644 --- a/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java +++ b/maple/src/main/java/com/twitter/maple/tap/StdoutTap.java @@ -5,7 +5,7 @@ import cascading.tap.hadoop.Lfs; import cascading.tuple.Fields; import cascading.tuple.TupleEntryIterator; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.IOException; @@ -32,7 +32,7 @@ public static String getTempDir() { } @Override - public boolean commitResource(JobConf conf) throws java.io.IOException { + public boolean commitResource(Configuration conf) throws java.io.IOException { TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this); System.out.println(""); System.out.println(""); diff --git a/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala b/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala index 708436e54f..66dddabf44 100644 --- a/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala +++ b/scalding-avro/src/main/scala/com/twitter/scalding/avro/AvroSource.scala @@ -27,13 +27,14 @@ import java.util.Properties import cascading.tuple.Fields import collection.JavaConverters._ import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader} +import org.apache.hadoop.conf.Configuration trait UnpackedAvroFileScheme extends FileSource { def schema: Option[Schema] // HadoopSchemeInstance gives compile errors in 2.10 for some reason override def hdfsScheme = (new AvroScheme(schema.getOrElse(null))) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + .asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] override def localScheme = (new LAvroScheme(schema.getOrElse(null))) .asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]] @@ -45,7 +46,7 @@ trait PackedAvroFileScheme[T] extends FileSource { // HadoopSchemeInstance gives compile errors for this in 2.10 for some reason override def hdfsScheme = (new PackedAvroScheme[T](schema)) - .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + .asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] override def localScheme = (new LPackedAvroScheme[T](schema)).asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]] diff --git 
a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java index a436c75e6d..d81c1ef51a 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java @@ -7,20 +7,23 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.conf.Configuration; import cascading.flow.FlowProcess; import cascading.scheme.SinkCall; import cascading.scheme.SourceCall; +import cascading.scheme.hadoop.WritableSequenceFile; +import cascading.tap.Tap; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; -import com.twitter.elephantbird.cascading2.scheme.CombinedWritableSequenceFile; /** * */ -public class KeyValueByteScheme extends CombinedWritableSequenceFile { +public class KeyValueByteScheme extends WritableSequenceFile { public KeyValueByteScheme(Fields fields) { super(fields, BytesWritable.class, BytesWritable.class); } @@ -29,8 +32,17 @@ public static byte[] getBytes(BytesWritable key) { return Arrays.copyOfRange(key.getBytes(), 0, key.getLength()); } + + @Override + public void sourceConfInit(FlowProcess flowProcess, + Tap tap, Configuration conf) { + super.sourceConfInit(flowProcess, tap, conf); + conf.setClass("mapred.input.format.class", SequenceFileInputFormat.class, + org.apache.hadoop.mapred.InputFormat.class); + } + @Override - public boolean source(FlowProcess flowProcess, + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { BytesWritable key = (BytesWritable) sourceCall.getContext()[0]; BytesWritable value = (BytesWritable) sourceCall.getContext()[1]; @@ -48,7 +60,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java index 4c0ea264b8..fb6a16590c 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java @@ -12,10 +12,12 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.conf.Configuration; import cascading.flow.FlowProcess; import cascading.scheme.Scheme; + public class VersionedTap extends GlobHfs { public static enum TapMode {SOURCE, SINK} @@ -31,7 +33,7 @@ public static enum TapMode {SOURCE, SINK} private String newVersionPath; private String writtenPath; - public VersionedTap(String dir, Scheme scheme, TapMode mode) + public VersionedTap(String dir, Scheme scheme, TapMode mode) throws IOException { super(scheme, dir); this.mode = mode; @@ -60,11 +62,11 @@ public String getOutputDirectory() { return getPath().toString(); } - public VersionedStore getStore(JobConf conf) throws IOException { + public VersionedStore getStore(Configuration conf) throws IOException { return new 
VersionedStore(getPath().getFileSystem(conf), getOutputDirectory()); } - public String getSourcePath(JobConf conf) { + public String getSourcePath(Configuration conf) { VersionedStore store; try { store = getStore(conf); @@ -78,7 +80,7 @@ public String getSourcePath(JobConf conf) { } } - public String getSinkPath(JobConf conf) { + public String getSinkPath(Configuration conf) { try { VersionedStore store = getStore(conf); String sinkPath = (version == null) ? store.createVersion() : store.createVersion(version); @@ -92,38 +94,38 @@ public String getSinkPath(JobConf conf) { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, Configuration conf) { super.sourceConfInit(process, conf); - FileInputFormat.setInputPaths(conf, getSourcePath(conf)); + FileInputFormat.setInputPaths((JobConf) conf, getSourcePath(conf)); } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { + public void sinkConfInit(FlowProcess process, Configuration conf) { super.sinkConfInit(process, conf); if (newVersionPath == null) newVersionPath = getSinkPath(conf); - FileOutputFormat.setOutputPath(conf, new Path(newVersionPath)); + FileOutputFormat.setOutputPath((JobConf) conf, new Path(newVersionPath)); } @Override - public long getSize(JobConf conf) throws IOException { + public long getSize(Configuration conf) throws IOException { return getSize(new Path(getSourcePath(conf)), conf); } @Override - public boolean resourceExists(JobConf jc) throws IOException { + public boolean resourceExists(Configuration jc) throws IOException { return getStore(jc).mostRecentVersion() != null; } @Override - public boolean createResource(JobConf jc) throws IOException { + public boolean createResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @Override - public boolean deleteResource(JobConf jc) throws IOException { + public boolean deleteResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @@ -137,13 +139,13 @@ public String getIdentifier() { } @Override - public long getModifiedTime(JobConf conf) throws IOException { + public long getModifiedTime(Configuration conf) throws IOException { VersionedStore store = getStore(conf); return (mode == TapMode.SINK) ? 
0 : store.mostRecentVersion(); } @Override - public boolean commitResource(JobConf conf) throws IOException { + public boolean commitResource(Configuration conf) throws IOException { VersionedStore store = getStore(conf); if (newVersionPath != null) { @@ -161,7 +163,7 @@ public String getWrittenPath() { return writtenPath; } - private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException { + private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); // create a file in the folder to mark it if (fs.exists(path)) { @@ -171,7 +173,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx } @Override - public boolean rollbackResource(JobConf conf) throws IOException { + public boolean rollbackResource(Configuration conf) throws IOException { if (newVersionPath != null) { getStore(conf).failVersion(newVersionPath); newVersionPath = null; diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala deleted file mode 100644 index 6036b561d6..0000000000 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/scheme/CombinedSequenceFileScheme.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.twitter.scalding.commons.scheme - -import cascading.scheme.Scheme -import com.twitter.elephantbird.cascading2.scheme.{CombinedSequenceFile, CombinedWritableSequenceFile} -import com.twitter.scalding.{HadoopSchemeInstance, SequenceFileScheme, WritableSequenceFileScheme} - -trait CombinedSequenceFileScheme extends SequenceFileScheme { - // TODO Cascading doesn't support local mode yet - override def hdfsScheme = HadoopSchemeInstance( - new CombinedSequenceFile(fields).asInstanceOf[Scheme[_, _, _, _, _]] - ) -} - -trait CombinedWritableSequenceFileScheme extends WritableSequenceFileScheme { - // TODO Cascading doesn't support local mode yet - override def hdfsScheme = - HadoopSchemeInstance( - new CombinedWritableSequenceFile(fields, keyType, valueType).asInstanceOf[Scheme[_, _, _, _, _]] - ) -} diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala index 4e9764ece6..d1a7f68326 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala @@ -20,7 +20,7 @@ import scala.reflect.ClassTag import com.twitter.bijection._ import com.twitter.chill.Externalizer -import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme +import com.twitter.elephantbird.cascading3.scheme.LzoBinaryScheme import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat import com.twitter.elephantbird.mapreduce.io.{BinaryConverter, GenericWritable} import com.twitter.elephantbird.mapreduce.input.{BinaryConverterProvider, MultiInputFormat} @@ -101,7 +101,7 @@ object LzoGenericScheme { */ def setConverter[M]( conv: BinaryConverter[M], - conf: JobConf, + conf: Configuration, confKey: String, overrideConf: Boolean = false ): Unit = @@ -129,9 +129,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) new GenericWritable(conv) override def sourceConfInit( - fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, 
_], OutputCollector[_, _]], - conf: JobConf + fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration ): Unit = { LzoGenericScheme.setConverter(conv, conf, SourceConfigBinaryConverterProvider.ProviderConfKey) @@ -142,9 +142,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) } override def sinkConfInit( - fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf + fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration ): Unit = { LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey) LzoGenericBlockOutputFormat.setClassConf(clazz, conf) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala index d0e0e68eb2..7dec9db729 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala @@ -22,7 +22,7 @@ import cascading.scheme.Scheme import org.apache.thrift.TBase import com.google.protobuf.Message import com.twitter.bijection.Injection -import com.twitter.elephantbird.cascading2.scheme._ +import com.twitter.elephantbird.cascading3.scheme._ import com.twitter.scalding._ import com.twitter.scalding.Dsl._ import com.twitter.scalding.source.{CheckedInversion, MaxFailuresCheck} diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala index 7f32be0259..6b4bae9f47 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala @@ -1,7 +1,7 @@ package com.twitter.scalding.commons.source import cascading.scheme.Scheme -import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited +import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited import com.twitter.scalding._ import com.twitter.scalding.source.TypedTextDelimited import com.twitter.scalding.source.TypedSep diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala index 85e12eb82f..a72a4e3343 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala @@ -33,6 +33,7 @@ import com.twitter.scalding.source.{CheckedInversion, MaxFailuresCheck} import com.twitter.scalding.typed.KeyedListLike import com.twitter.scalding.typed.TypedSink import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.conf.Configuration import scala.collection.JavaConverters._ /** @@ -113,7 +114,7 @@ class VersionedKeyValSource[K, V]( sourceVersion.foreach { version => mode match { case hadoopMode: HadoopMode => { - val store = source.getStore(new JobConf(hadoopMode.jobConf)) + val store = source.getStore(new Configuration(hadoopMode.jobConf)) if (!store.hasVersion(version)) { throw new InvalidSourceException( @@ -140,7 +141,7 @@ class VersionedKeyValSource[K, V]( 
buffers(this).map(!_.isEmpty).getOrElse(false) } case _ => { - val conf = new JobConf(mode.asInstanceOf[HadoopMode].jobConf) + val conf = new Configuration(mode.asInstanceOf[HadoopMode].jobConf) source.resourceExists(conf) } } @@ -155,7 +156,7 @@ class VersionedKeyValSource[K, V]( buffers(this).map(!_.isEmpty).getOrElse(false) case m: HadoopMode => - val conf = new JobConf(m.jobConf) + val conf = new Configuration(m.jobConf) val store = sink.getStore(conf) store.hasVersion(version) case _ => sys.error(s"Unknown mode $mode") diff --git a/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java b/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java index 8a0dd41e82..a4d0dfd53b 100644 --- a/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java +++ b/scalding-core/src/main/java/com/twitter/scalding/tap/GlobHfs.java @@ -9,6 +9,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.conf.Configuration; import cascading.scheme.Scheme; @@ -17,16 +18,16 @@ * that will throw IOException where we actually can calculate size of source. */ public class GlobHfs extends ScaldingHfs { - public GlobHfs(Scheme scheme) { + public GlobHfs(Scheme scheme) { super(scheme); } - public GlobHfs(Scheme scheme, String stringPath) { + public GlobHfs(Scheme scheme, String stringPath) { super(scheme, stringPath); } @Override - public long getSize(JobConf conf) throws IOException { + public long getSize(Configuration conf) throws IOException { return getSize(getPath(), conf); } @@ -34,7 +35,7 @@ public long getSize(JobConf conf) throws IOException { * Get the total size of the file(s) specified by the Hfs, which may contain a glob * pattern in its path, so we must be ready to handle that case. 
*/ - public static long getSize(Path path, JobConf conf) throws IOException { + public static long getSize(Path path, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus[] statuses = fs.globStatus(path); diff --git a/scalding-core/src/main/java/com/twitter/scalding/tap/ScaldingHfs.java b/scalding-core/src/main/java/com/twitter/scalding/tap/ScaldingHfs.java index 8af0eb6edf..8af6aea9cd 100644 --- a/scalding-core/src/main/java/com/twitter/scalding/tap/ScaldingHfs.java +++ b/scalding-core/src/main/java/com/twitter/scalding/tap/ScaldingHfs.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; @@ -18,40 +18,20 @@ public class ScaldingHfs extends cascading.tap.hadoop.Hfs { protected ScaldingHfs() { } - protected ScaldingHfs(Scheme scheme) { + protected ScaldingHfs(Scheme scheme) { super(scheme); } - @Deprecated - public ScaldingHfs(Fields fields, String stringPath) { - super(fields, stringPath); - } - - @Deprecated - public ScaldingHfs(Fields fields, String stringPath, boolean replace) { - super(fields, stringPath, replace); - } - - @Deprecated - public ScaldingHfs(Fields fields, String stringPath, SinkMode sinkMode) { - super(fields, stringPath, sinkMode); - } - - public ScaldingHfs(Scheme scheme, String stringPath) { + public ScaldingHfs(Scheme scheme, String stringPath) { super(scheme, stringPath); } - @Deprecated - public ScaldingHfs(Scheme scheme, String stringPath, boolean replace) { - super(scheme, stringPath, replace); - } - - public ScaldingHfs(Scheme scheme, String stringPath, SinkMode sinkMode) { + public ScaldingHfs(Scheme scheme, String stringPath, SinkMode sinkMode) { super(scheme, stringPath, sinkMode); } @Override - public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { - return new HadoopTupleEntrySchemeIterator(flowProcess, this, input); + public TupleEntryIterator openForRead(FlowProcess flowProcess, RecordReader input) throws IOException { + return new HadoopTupleEntrySchemeIterator((FlowProcess) flowProcess, this, input); } } diff --git a/scalding-core/src/main/java/com/twitter/scalding/tuple/HadoopTupleEntrySchemeIterator.java b/scalding-core/src/main/java/com/twitter/scalding/tuple/HadoopTupleEntrySchemeIterator.java index 236b8f2598..c9a3559af4 100644 --- a/scalding-core/src/main/java/com/twitter/scalding/tuple/HadoopTupleEntrySchemeIterator.java +++ b/scalding-core/src/main/java/com/twitter/scalding/tuple/HadoopTupleEntrySchemeIterator.java @@ -33,25 +33,26 @@ import cascading.util.CloseableIterator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.conf.Configuration; /** * */ -public class HadoopTupleEntrySchemeIterator extends TupleEntrySchemeIterator +public class HadoopTupleEntrySchemeIterator extends TupleEntrySchemeIterator { private MeasuredRecordReader measuredRecordReader; - public HadoopTupleEntrySchemeIterator( FlowProcess flowProcess, Tap parentTap, RecordReader recordReader ) throws IOException + public HadoopTupleEntrySchemeIterator( FlowProcess flowProcess, Tap parentTap, RecordReader recordReader ) throws IOException { this( flowProcess, parentTap.getScheme(), makeIterator( flowProcess, parentTap, recordReader ) ); } - public HadoopTupleEntrySchemeIterator( FlowProcess flowProcess, Scheme scheme, CloseableIterator 
closeableIterator ) + public HadoopTupleEntrySchemeIterator( FlowProcess flowProcess, Scheme scheme, CloseableIterator closeableIterator ) { super( flowProcess, scheme, closeableIterator, flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ) ); } - private static CloseableIterator makeIterator( FlowProcess flowProcess, Tap parentTap, RecordReader recordReader ) throws IOException + private static CloseableIterator makeIterator( FlowProcess flowProcess, Tap parentTap, RecordReader recordReader ) throws IOException { if( recordReader != null ) return new RecordReaderIterator( recordReader ); diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 540a02c9d2..1d94e14218 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -24,7 +24,7 @@ import com.twitter.chill.config.{ConfiguredInstantiator, ScalaMapConfig} import com.twitter.bijection.{Base64String, Injection} import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile, HadoopCachedFile} -import cascading.pipe.assembly.AggregateBy +import cascading.pipe.assembly.AggregateByProps import cascading.flow.{FlowListener, FlowProps, FlowStepListener, FlowStepStrategy} import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -136,10 +136,10 @@ abstract class Config extends Serializable { * the best results */ def setMapSideAggregationThreshold(count: Int): Config = - this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString) + this + (AggregateByProps.AGGREGATE_BY_CAPACITY -> count.toString) def getMapSideAggregationThreshold: Option[Int] = - get(AggregateBy.AGGREGATE_BY_THRESHOLD).map(_.toInt) + get(AggregateByProps.AGGREGATE_BY_CAPACITY).map(_.toInt) @deprecated("Use setRequireOrderedSerializationMode", "12/14/17") def setRequireOrderedSerialization(b: Boolean): Config = diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 7a5b7468ea..429b3b7ae4 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -1150,7 +1150,7 @@ object ExecutionCounters { /** * Just gets the counters from the CascadingStats and ignores all the other fields present */ - def fromCascading(cs: cascading.stats.CascadingStats): ExecutionCounters = new ExecutionCounters { + def fromCascading(cs: cascading.stats.CascadingStats[_]): ExecutionCounters = new ExecutionCounters { import scala.collection.JavaConverters._ val keys = (for { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index 5e379f59a4..f96aefca25 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -199,7 +199,7 @@ object ExecutionContext { private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass) private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = - baseFlowStep.getGraph.vertexSet.asScala.flatMap { + baseFlowStep.getElementGraph.vertexSet.asScala.flatMap { case pipe: Pipe => RichPipe.getPipeDescriptions(pipe) case _ => List() // no descriptions }(collection.breakOut) diff --git 
a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala index 305e5cae86..5ebe493a36 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -47,7 +47,7 @@ abstract class SchemedSource extends Source { throw ModeException("Cascading local mode not supported for: " + toString) /** The scheme to use if the source is on hdfs. */ - def hdfsScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _] = + def hdfsScheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _] = throw ModeException("Cascading Hadoop mode not supported for: " + toString) // The mode to use for output taps determining how conflicts with existing output are handled. @@ -56,7 +56,7 @@ abstract class SchemedSource extends Source { trait HfsTapProvider { def createHfsTap( - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode ): Hfs = @@ -65,8 +65,8 @@ trait HfsTapProvider { private[scalding] object CastFileTap { // The scala compiler has problems with the generics in Cascading - def apply(tap: FileTap): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] = - tap.asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + def apply(tap: FileTap): Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]] = + tap.asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] } /** @@ -88,7 +88,7 @@ trait LocalSourceOverride extends SchemedSource { * @return * A tap. */ - def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { + def createLocalTap(sinkMode: SinkMode): Tap[Configuration, _, _] = { val taps = localPaths.map { p: String => CastFileTap(new FileTap(localScheme, p, sinkMode)) }.toList @@ -344,8 +344,8 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf case Hdfs(false, conf) => hdfsPaths.filter(pathIsGood(_, conf)) } - protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = { - val taps: List[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] = + protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[Configuration, _, _] = { + val taps: List[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] = goodHdfsPaths(hdfsMode).toList.map(path => CastHfsTap(createHfsTap(hdfsScheme, path, sinkMode))) taps.size match { case 0 => { @@ -361,10 +361,10 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf } } -class ScaldingMultiSourceTap(taps: Seq[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]]) +class ScaldingMultiSourceTap(taps: Seq[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]]) extends MultiSourceTap[ - Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - JobConf, + Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + Configuration, RecordReader[_, _] ](taps: _*) { private final val randomId = UUID.randomUUID.toString @@ -455,10 +455,10 @@ trait SuccessFileSource extends FileSource { * Hadoop tap outside of Hadoop in the Cascading local mode */ trait LocalTapSource extends LocalSourceOverride { - override def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { + override def createLocalTap(sinkMode: SinkMode): Tap[Configuration, _, _] = { val taps = localPaths.map { p => new LocalTap(p, hdfsScheme, 
sinkMode) - .asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + .asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]] }.toSeq taps match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala index 98882187b6..7262f66b89 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala @@ -17,6 +17,7 @@ package com.twitter.scalding import cascading.tap.SinkMode import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.conf.Configuration import cascading.flow.FlowProcess import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector @@ -27,18 +28,18 @@ import com.twitter.scalding.tap.ScaldingHfs private[scalding] class ConfPropertiesHfsTap( sourceConfig: Config, sinkConfig: Config, - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], stringPath: String, sinkMode: SinkMode ) extends ScaldingHfs(scheme, stringPath, sinkMode) { - override def sourceConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sourceConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { sourceConfig.toMap.foreach { case (k, v) => conf.set(k, v) } super.sourceConfInit(process, conf) } - override def sinkConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sinkConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { sinkConfig.toMap.foreach { case (k, v) => conf.set(k, v) } @@ -64,7 +65,7 @@ trait HfsConfPropertySetter extends HfsTapProvider { def sinkConfig: Config = Config.empty override def createHfsTap( - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode ): Hfs = { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index 4d013144c6..772c19b35b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -322,7 +322,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { def clear(): Unit = FlowStateMap.clear(flowDef) - protected def handleStats(statsData: CascadingStats): Unit = { + protected def handleStats(statsData: CascadingStats[_]): Unit = { scaldingCascadingStats = Some(statsData) // TODO: Why the two ways to do stats? Answer: jank-den. if (args.boolean("scalding.flowstats")) { @@ -347,7 +347,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { // This awful name is designed to avoid collision // with subclasses @transient - private[scalding] var scaldingCascadingStats: Option[CascadingStats] = None + private[scalding] var scaldingCascadingStats: Option[CascadingStats[_]] = None /** * Save the Flow object after a run to allow clients to inspect the job. 
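Before the stats-heavy files below: `CascadingStats` gained a type parameter in newer Cascading, so every signature here widens to `CascadingStats[_]`. The counter API itself is unchanged; a minimal sketch of code that compiles against the widened type (the `dumpCounters` helper is illustrative, mirroring the `ExecutionCounters.fromCascading` change earlier in this patch):

```scala
import cascading.stats.CascadingStats
import scala.collection.JavaConverters._

// Accepts FlowStats, CascadeStats, etc. without caring about the type parameter.
def dumpCounters(stats: CascadingStats[_]): Map[(String, String), Long] =
  (for {
    group <- stats.getCounterGroups.asScala
    counter <- stats.getCountersFor(group).asScala
  } yield (group, counter) -> stats.getCounterValue(group, counter)).toMap
```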
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala index 78b6474965..8b69acfcde 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala @@ -22,7 +22,7 @@ import scala.util.{Failure, Try} object JobStats { def empty: JobStats = new JobStats(Map("counters" -> Map.empty)) - def apply(stats: CascadingStats): JobStats = { + def apply(stats: CascadingStats[_]): JobStats = { val m: Map[String, Any] = statsMap(stats) new JobStats(stats match { case cs: CascadeStats => m @@ -30,7 +30,7 @@ object JobStats { }) } - private def counterMap(stats: CascadingStats): Map[String, Map[String, Long]] = + private def counterMap(stats: CascadingStats[_]): Map[String, Map[String, Long]] = stats.getCounterGroups.asScala.map { group => ( group, @@ -44,7 +44,7 @@ object JobStats { ) }.toMap - private def statsMap(stats: CascadingStats): Map[String, Any] = + private def statsMap(stats: CascadingStats[_]): Map[String, Any] = Map( "counters" -> counterMap(stats), "duration" -> stats.getDuration, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala index ef2535e368..688a77ff2f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala @@ -70,7 +70,7 @@ object CascadeTest { class JobTest(cons: (Args) => Job) { private var argsMap = Map[String, List[String]]() private val callbacks = mutable.Buffer[() => Unit]() - private val statsCallbacks = mutable.Buffer[(CascadingStats) => Unit]() + private val statsCallbacks = mutable.Buffer[(CascadingStats[_]) => Unit]() // TODO: Switch the following maps and sets from Source to String keys // to guard for scala equality bugs private var sourceMap: (Source) => Option[mutable.Buffer[Tuple]] = { _ => None } @@ -139,13 +139,13 @@ class JobTest(cons: (Args) => Job) { // If this test is checking for multiple jobs chained by next, this only checks // for the counters in the final job's FlowStat. def counter(counter: String, group: String = Stats.ScaldingGroup)(op: Long => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getCounterValue(counter, group)(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getCounterValue(counter, group)(stats))) this } // Used to check an assertion on all custom counters of a given scalding job. 
   def counters(op: Map[String, Long] => Unit) = {
-    statsCallbacks += ((stats: CascadingStats) => op(Stats.getAllCustomCounters()(stats)))
+    statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getAllCustomCounters()(stats)))
     this
   }
 
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala
index c722a6c0ad..269e6396cc 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala
@@ -43,10 +43,10 @@ class MemoryTap[In, Out](val scheme: Scheme[Properties, In, Out, _, _], val tupl
   override def getModifiedTime(conf: Properties) = if (resourceExists(conf)) modifiedTime else 0L
   override lazy val getIdentifier: String = scala.math.random.toString
 
-  override def openForRead(flowProcess: FlowProcess[Properties], input: In) =
+  override def openForRead(flowProcess: FlowProcess[_ <: Properties], input: In) =
     new TupleEntryChainIterator(scheme.getSourceFields, tupleBuffer.toIterator.asJava)
 
-  override def openForWrite(flowProcess: FlowProcess[Properties], output: Out): TupleEntryCollector = {
+  override def openForWrite(flowProcess: FlowProcess[_ <: Properties], output: Out): TupleEntryCollector = {
     tupleBuffer.clear()
     new MemoryTupleEntryCollector(tupleBuffer, this)
   }
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
index 5c3f94701c..4a8ad1e77b 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
@@ -18,7 +18,7 @@ package com.twitter.scalding {
   import cascading.operation._
   import cascading.tuple._
   import cascading.flow._
-  import cascading.pipe.assembly.AggregateBy
+  import cascading.pipe.assembly.{AggregateBy, AggregateByProps}
   import com.twitter.chill.MeatLocker
   import scala.collection.JavaConverters._
 
@@ -327,7 +327,7 @@ package com.twitter.scalding {
 
   object MapsideCache {
     val DEFAULT_CACHE_SIZE = 100000
-    val SIZE_CONFIG_KEY = AggregateBy.AGGREGATE_BY_THRESHOLD
+    val SIZE_CONFIG_KEY = AggregateByProps.AGGREGATE_BY_CAPACITY
     val ADAPTIVE_CACHE_KEY = "scalding.mapsidecache.adaptive"
 
     private def getCacheSize(fp: FlowProcess[_]): Int =
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala
index 1c2aee5a33..da4532d773 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala
@@ -28,12 +28,14 @@ import cascading.tuple.{Fields, Tuple => CTuple, TupleEntry, TupleEntryCollector
 
 import cascading.pipe.Pipe
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.RecordReader
+
 import scala.collection.JavaConverters._
 
 /**
@@ -52,7 +54,7 @@ class InvalidSourceException(message: String, cause: Throwable) extends RuntimeE
  *
  * hdfsPaths represents user-supplied list that was detected as not containing any valid paths.
  */
-class InvalidSourceTap(val e: Throwable) extends SourceTap[JobConf, RecordReader[_, _]] {
+class InvalidSourceTap(val e: Throwable) extends SourceTap[Configuration, RecordReader[_, _]] {
 
   def this(hdfsPaths: Iterable[String]) = this(new InvalidSourceException(s"No good paths in $hdfsPaths"))
 
@@ -63,12 +65,12 @@ class InvalidSourceTap(val e: Throwable) extends SourceTap[JobConf, RecordReader
 
   override def hashCode: Int = randomId.hashCode
 
-  override def getModifiedTime(conf: JobConf): Long = 0L
+  override def getModifiedTime(conf: Configuration): Long = 0L
 
-  override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator =
+  override def openForRead(flow: FlowProcess[_ <: Configuration], input: RecordReader[_, _]): TupleEntryIterator =
     throw new InvalidSourceException("Encountered InvalidSourceTap!", e)
 
-  override def resourceExists(conf: JobConf): Boolean = false
+  override def resourceExists(conf: Configuration): Boolean = false
 
   override def getScheme = new NullScheme()
 
@@ -81,8 +83,8 @@ class InvalidSourceTap(val e: Throwable) extends SourceTap[JobConf, RecordReader
   // 4. source.validateTaps (throws InvalidSourceException)
   // In the worst case if the flow plan is misconfigured,
   // openForRead on mappers should fail when using this tap.
-  override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = {
-    conf.setInputFormat(classOf[InvalidInputFormat])
+  override def sourceConfInit(flow: FlowProcess[_ <: Configuration], conf: Configuration): Unit = {
+    (conf.asInstanceOf[JobConf]).setInputFormat(classOf[InvalidInputFormat])
     super.sourceConfInit(flow, conf)
   }
 }
@@ -114,13 +116,13 @@ case object Write extends AccessMode
 
 object HadoopSchemeInstance {
   def apply(scheme: Scheme[_, _, _, _, _]) =
-    scheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
+    scheme.asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]]
 }
 
 object CastHfsTap {
   // The scala compiler has problems with the generics in Cascading
-  def apply(tap: Hfs): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] =
-    tap.asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]]
+  def apply(tap: Hfs): Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]] =
+    tap.asInstanceOf[Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]]]
 }
 
 /**
@@ -294,7 +296,7 @@ class NullTap[Config, Input, Output, SourceContext, SinkContext]
     ) {
 
   def getIdentifier = "nullTap"
-  def openForWrite(flowProcess: FlowProcess[Config], output: Output) =
+  def openForWrite(flowProcess: FlowProcess[_ <: Config], output: Output) =
     new TupleEntryCollector {
       override def add(te: TupleEntry): Unit = ()
       override def add(t: CTuple): Unit = ()
@@ -313,7 +315,7 @@ trait BaseNullSource extends Source {
       case Read => throw new Exception("not supported, reading from null")
       case Write =>
         mode match {
-          case Hdfs(_, _) => new NullTap[JobConf, RecordReader[_, _], OutputCollector[_, _], Any, Any]
+          case Hdfs(_, _) => new NullTap[Configuration, RecordReader[_, _], OutputCollector[_, _], Any, Any]
           case Local(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
           case Test(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
         }
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
index 9ac3087d09..74153946ab 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
@@ -108,11 +108,11 @@ object Stats {
 
   // When getting a counter value, cascadeStats takes precedence (if set) and
   // flowStats is used after that. Returns None if neither is defined.
-  def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats): Long =
+  def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats[_]): Long =
     cascadingStats.getCounterValue(key.group, key.counter)
 
   // Returns a map of all custom counter names and their counts.
-  def getAllCustomCounters()(implicit cascadingStats: CascadingStats): Map[String, Long] =
+  def getAllCustomCounters()(implicit cascadingStats: CascadingStats[_]): Map[String, Long] =
     cascadingStats
       .getCountersFor(ScaldingGroup)
       .asScala
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala
deleted file mode 100644
index 75075ada1f..0000000000
--- a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
-Copyright 2013 Inkling, Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
- */
-package com.twitter.scalding
-
-import cascading.tap.hadoop.{TemplateTap => HTemplateTap}
-import cascading.tap.local.FileTap
-import cascading.tap.local.{TemplateTap => LTemplateTap}
-import cascading.tap.SinkMode
-import cascading.tap.Tap
-import cascading.tuple.Fields
-
-/**
- * This is a base class for template based output sources
- */
-abstract class TemplateSource extends SchemedSource with HfsTapProvider {
-
-  // The root path of the templated output.
-  def basePath: String
-  // The template as a java Formatter string. e.g. %s/%s for a two part template.
-  def template: String
-  // The fields to apply to the template.
-  def pathFields: Fields = Fields.ALL
-
-  /**
-   * Creates the template tap.
-   *
-   * @param readOrWrite
-   *   Describes if this source is being read from or written to.
-   * @param mode
-   *   The mode of the job. (implicit)
-   *
-   * @return
-   *   A cascading TemplateTap.
-   */
-  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
-    readOrWrite match {
-      case Read => throw new InvalidSourceException("Cannot use TemplateSource for input")
-      case Write => {
-        mode match {
-          case Local(_) => {
-            val localTap = new FileTap(localScheme, basePath, sinkMode)
-            new LTemplateTap(localTap, template, pathFields)
-          }
-          case hdfsMode @ Hdfs(_, _) => {
-            val hfsTap = createHfsTap(hdfsScheme, basePath, sinkMode)
-            new HTemplateTap(hfsTap, template, pathFields)
-          }
-          case hdfsTest @ HadoopTest(_, _) => {
-            val hfsTap = createHfsTap(hdfsScheme, hdfsTest.getWritePathFor(this), sinkMode)
-            new HTemplateTap(hfsTap, template, pathFields)
-          }
-          case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite)
-        }
-      }
-    }
-
-  /**
-   * Validates the taps, makes sure there are no nulls as the path or template.
-   *
-   * @param mode
-   *   The mode of the job.
-   */
-  override def validateTaps(mode: Mode): Unit =
-    if (basePath == null) {
-      throw new InvalidSourceException("basePath cannot be null for TemplateTap")
-    } else if (template == null) {
-      throw new InvalidSourceException("template cannot be null for TemplateTap")
-    }
-}
-
-/**
- * An implementation of TSV output, split over a template tap.
- *
- * @param basePath
- *   The root path for the output.
- * @param template
- *   The java formatter style string to use as the template. e.g. %s/%s.
- * @param pathFields
- *   The set of fields to apply to the path.
- * @param writeHeader
- *   Flag to indicate that the header should be written to the file.
- * @param sinkMode
- *   How to handle conflicts with existing output.
- * @param fields
- *   The set of fields to apply to the output.
- */
-case class TemplatedTsv(
-    override val basePath: String,
-    override val template: String,
-    override val pathFields: Fields = Fields.ALL,
-    override val writeHeader: Boolean = false,
-    override val sinkMode: SinkMode = SinkMode.REPLACE,
-    override val fields: Fields = Fields.ALL
-) extends TemplateSource
-    with DelimitedScheme
-
-/**
- * An implementation of SequenceFile output, split over a template tap.
- *
- * @param basePath
- *   The root path for the output.
- * @param template
- *   The java formatter style string to use as the template. e.g. %s/%s.
- * @param sequenceFields
- *   The set of fields to use for the sequence file.
- * @param pathFields
- *   The set of fields to apply to the path.
- * @param sinkMode
- *   How to handle conflicts with existing output.
- */
-case class TemplatedSequenceFile(
-    override val basePath: String,
-    override val template: String,
-    val sequenceFields: Fields = Fields.ALL,
-    override val pathFields: Fields = Fields.ALL,
-    override val sinkMode: SinkMode = SinkMode.REPLACE
-) extends TemplateSource
-    with SequenceFileScheme {
-
-  override val fields = sequenceFields
-}
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala
index 5266a6d7ef..b130fa5e43 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala
@@ -26,6 +26,7 @@ import java.io.{InputStream, OutputStream, Serializable}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputCollector
 import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.conf.Configuration
 import scala.collection.JavaConverters._
 
 /**
@@ -47,11 +48,11 @@ object TestTapFactory extends Serializable {
   }
   def apply[A, B](
       src: Source,
-      scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], A, B]
+      scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], A, B]
  ): TestTapFactory = apply(src, scheme, SinkMode.REPLACE)
   def apply[A, B](
       src: Source,
-      scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], A, B],
+      scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], A, B],
       sinkMode: SinkMode
   ): TestTapFactory =
     new TestTapFactory(src, sinkMode) { override def hdfsScheme = Some(scheme) }
@@ -64,7 +65,7 @@ class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable {
 
   def sinkFields: Fields = hdfsScheme.map(_.getSinkFields).getOrElse(sys.error("No sinkFields defined"))
 
-  def hdfsScheme: Option[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] = None
+  def hdfsScheme: Option[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] = None
 
   @SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
   def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
@@ -100,7 +101,7 @@ class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable {
           if (bufOpt.isDefined) {
             val buffer = bufOpt.get
             val fields = sourceFields
-            (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf, _, _]]
+            (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[Configuration, _, _]]
           } else {
             CastHfsTap(new ScaldingHfs(hdfsScheme.get, hdfsTest.getWritePathFor(src), sinkMode))
           }
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala
index 6b2a616a33..3dc5b9499a 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/estimation/Common.scala
@@ -19,7 +19,7 @@ object Common {
   }
 
   def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] =
-    unrollTaps(step.getSources.asScala.toSeq)
+    unrollTaps(step.getFlow.getSourcesCollection.asScala.toSeq)
 
   def inputSizes(step: FlowStep[JobConf]): Seq[(String, Long)] = {
     val conf = step.getConfig
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
index 791bdbac73..837e5092ec 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala
@@ -92,7 +92,7 @@ object CascadingBinaryComparator {
   def getDescriptionsForMissingOrdSer[U](bfs: BaseFlowStep[U]): Option[String] =
     // does this job have any Splices without OrderedSerialization:
     if (
-      bfs.getGraph.vertexSet.asScala.exists {
+      bfs.getElementGraph.vertexSet.asScala.exists {
        case gb: GroupBy => check(gb).isFailure
        case cg: CoGroup => check(cg).isFailure
        case _ => false // only do sorting in groupBy/cogroupBy
diff --git a/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala b/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala
index f805956d2c..bb68a9d6d7 100644
--- a/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala
+++ b/scalding-estimators-test/src/test/scala/com/twitter/scalding/estimation/memory/MemoryEstimatorTest.scala
@@ -143,9 +143,9 @@ class CustomHistoryService(val history: JobConf => Seq[(String, Long)]) extends
   import Utils._
 
   override def fetchHistory(info: FlowStrategyInfo, maxHistory: Int): Try[Seq[FlowStepHistory]] =
-    if (info.step.getStepNum == 1) {
+    if (info.step.getOrdinal == 1) {
       makeHistory(info.step.getConfig, history)
-    } else if (info.step.getStepNum == 2) {
+    } else if (info.step.getOrdinal == 2) {
       Success(Nil)
     } else {
       makeHistory(info.step.getConfig, _ => Seq("MAP" -> 512.megabyte, "REDUCE" -> 512.megabyte))
diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala
index 32c820455d..924822803b 100644
--- a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala
+++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala
@@ -136,8 +136,6 @@ class LocalCluster(mutex: Boolean = true) {
       classOf[com.twitter.chill.algebird.AveragedValueSerializer],
       classOf[com.twitter.algebird.Semigroup[_]],
       classOf[com.twitter.chill.KryoInstantiator],
-      classOf[org.jgrapht.ext.EdgeNameProvider[_]],
-      classOf[org.apache.commons.lang.StringUtils],
       classOf[cascading.scheme.local.TextDelimited],
       classOf[org.apache.commons.logging.LogFactory],
       classOf[org.apache.commons.codec.binary.Base64],
@@ -146,7 +144,6 @@ class LocalCluster(mutex: Boolean = true) {
       classOf[com.esotericsoftware.kryo.KryoSerializable],
       classOf[com.twitter.chill.hadoop.KryoSerialization],
       classOf[com.twitter.maple.tap.TupleMemoryInputFormat],
-      classOf[org.apache.commons.configuration.Configuration]
     ).foreach(addClassSourceToClassPath(_))
     this
   }
diff --git a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala
index 534df99b02..2f80ddb491 100644
--- a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala
+++ b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryService.scala
@@ -158,7 +158,7 @@ trait HRavenHistoryService extends HistoryService {
    */
   def fetchPastJobDetails(step: FlowStep[JobConf], max: Int): Try[Seq[JobDetails]] = {
     val conf = step.getConfig
-    val stepNum = step.getStepNum
+    val stepNum = step.getOrdinal
 
     def findMatchingJobStep(pastFlow: Flow) =
       pastFlow.getJobs.asScala
diff --git a/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala b/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala
index 23bdf9ca07..b52f441ac6 100644
--- a/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala
+++ b/scalding-hraven/src/test/scala/com/twitter/scalding/hraven/estimation/HRavenHistoryServiceTest.scala
@@ -79,7 +79,7 @@ object TestFlowStrategyInfo {
     val mockedInfo = mock(classOf[FlowStrategyInfo])
 
     when(mockedStep.getConfig).thenReturn(mockedConf)
-    when(mockedStep.getStepNum).thenReturn(stepNum)
+    when(mockedStep.getOrdinal).thenReturn(stepNum)
     when(mockedInfo.step).thenReturn(mockedStep)
 
     mockedInfo
diff --git a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala
index 546cc0e527..5365ad753c 100644
--- a/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala
+++ b/scalding-json/src/main/scala/com/twitter/scalding/TypedJson.scala
@@ -2,7 +2,7 @@ package com.twitter.scalding
 
 import com.twitter.bijection.{AbstractInjection, Injection}
 import com.twitter.bijection.Inversion._
-import com.twitter.elephantbird.cascading2.scheme.LzoTextLine
+import com.twitter.elephantbird.cascading3.scheme.LzoTextLine
 import org.json4s._
 import org.json4s.native.Serialization._
 
diff --git a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java
index 3b41696ae6..ae2a2533c6 100644
--- a/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java
+++ b/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ParquetScroogeScheme.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.conf.Configuration;
 
 import com.twitter.scalding.parquet.ParquetValueScheme;
 import com.twitter.scalding.parquet.ScaldingDeprecatedParquetInputFormat;
@@ -51,19 +52,19 @@ public ParquetScroogeScheme(ParquetValueScheme.Config<T> config) {
   }
 
   @Override
-  public void sinkConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
-    ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class);
-    ScroogeWriteSupport.setScroogeClass(jobConf, this.config.getKlass());
+  public void sinkConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration jobConf) {
+    DeprecatedParquetOutputFormat.setAsOutputFormat((JobConf) jobConf);
+    ParquetOutputFormat.setWriteSupportClass((JobConf) jobConf, ScroogeWriteSupport.class);
+    ScroogeWriteSupport.setScroogeClass((JobConf) jobConf, this.config.getKlass());
   }
 
   @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+  public void sourceConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration jobConf) {
     super.sourceConfInit(fp, tap, jobConf);
-    jobConf.setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
-    ParquetInputFormat.setReadSupportClass(jobConf, ScroogeReadSupport.class);
-    ThriftReadSupport.setRecordConverterClass(jobConf, ScroogeRecordConverter.class);
+    ((JobConf) jobConf).setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass((JobConf) jobConf, ScroogeReadSupport.class);
+    ThriftReadSupport.setRecordConverterClass((JobConf) jobConf, ScroogeRecordConverter.class);
   }
 }
diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala
index 925482a47b..2f1ba9d856 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/Parquet346ScroogeScheme.scala
@@ -6,6 +6,7 @@ import com.twitter.scalding.parquet.ParquetValueScheme
 import com.twitter.scalding.parquet.thrift.Parquet346StructTypeRepairer
 import com.twitter.scrooge.{ThriftStruct, ThriftStructCodec}
 import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.thrift.ThriftReadSupport
 import org.apache.parquet.schema.MessageType
 import org.apache.parquet.thrift.struct.ThriftType.StructType
@@ -31,9 +32,9 @@ class Parquet346ScroogeScheme[T <: ThriftStruct](config: ParquetValueScheme.Conf
     extends ParquetScroogeScheme[T](config) {
 
   override def sourceConfInit(
-      fp: FlowProcess[JobConf],
-      tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
-      jobConf: JobConf
+      fp: FlowProcess[_ <: Configuration],
+      tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]],
+      jobConf: Configuration
   ): Unit = {
     super.sourceConfInit(fp, tap, jobConf)
diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java
index 2d71c44896..c786aae41c 100644
--- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java
+++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/ParquetValueScheme.java
@@ -14,6 +14,7 @@
 import cascading.tap.Tap;
 import cascading.tuple.Tuple;
 import cascading.tuple.TupleEntry;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.mapred.Container;
@@ -29,7 +30,7 @@
  * This is an abstract class; implementations are expected to set up their Input/Output Formats
  * correctly in the respective Init methods.
  */
-public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+public abstract class ParquetValueScheme<T> extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]>{
 
   public static final class Config<T> implements Serializable {
     private final FilterPredicate filterPredicate;
@@ -102,32 +103,32 @@ public ParquetValueScheme(Config<T> config) {
   }
 
   @Deprecated
-  private void setProjectionPushdown(JobConf jobConf) {
+  private void setProjectionPushdown(Configuration jobConf) {
     if (this.config.deprecatedProjectionString != null) {
-      ThriftReadSupport.setProjectionPushdown(jobConf, this.config.deprecatedProjectionString);
+      ThriftReadSupport.setProjectionPushdown((JobConf) jobConf, this.config.deprecatedProjectionString);
     }
   }
 
-  private void setStrictProjectionPushdown(JobConf jobConf) {
+  private void setStrictProjectionPushdown(Configuration jobConf) {
     if (this.config.strictProjectionString != null) {
       ThriftReadSupport.setStrictFieldProjectionFilter(jobConf, this.config.strictProjectionString);
     }
   }
 
-  private void setPredicatePushdown(JobConf jobConf) {
+  private void setPredicatePushdown(Configuration jobConf) {
     if (this.config.filterPredicate != null) {
       ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
     }
   }
+
   @Override
-  public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
+  public void sourceConfInit(FlowProcess<? extends Configuration> jobConfFlowProcess, Tap<Configuration, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final Configuration jobConf) {
     setPredicatePushdown(jobConf);
     setProjectionPushdown(jobConf);
     setStrictProjectionPushdown(jobConf);
     setRecordClass(jobConf);
   }
 
-  private void setRecordClass(JobConf jobConf) {
+  private void setRecordClass(Configuration jobConf) {
     if (config.klass != null) {
       ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
     }
@@ -135,7 +136,7 @@ private void setRecordClass(JobConf jobConf) {
 
   @SuppressWarnings("unchecked")
   @Override
-  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
+  public boolean source(FlowProcess<? extends Configuration> fp, SourceCall<Object[], RecordReader> sc)
       throws IOException {
     Container<T> value = (Container<T>) sc.getInput().createValue();
     boolean hasNext = sc.getInput().next(null, value);
@@ -150,7 +151,7 @@ public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
 
   @Override
-  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sc)
+  public void sink(FlowProcess<? extends Configuration> fp, SinkCall<Object[], OutputCollector> sc)
       throws IOException {
     TupleEntry tuple = sc.getOutgoingEntry();
diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java
index 6a22546704..cac78de00b 100644
--- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java
+++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/thrift/ParquetTBaseScheme.java
@@ -10,6 +10,7 @@
 
 import cascading.flow.FlowProcess;
 import cascading.tap.Tap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
@@ -41,24 +42,24 @@ public ParquetTBaseScheme(ParquetValueScheme.Config<T> config) {
   }
 
   @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+  public void sourceConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration jobConf) {
     super.sourceConfInit(fp, tap, jobConf);
-    jobConf.setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
-    ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
-    ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
+    ((JobConf) jobConf).setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass((JobConf) jobConf, ThriftReadSupport.class);
+    ThriftReadSupport.setRecordConverterClass((JobConf) jobConf, TBaseRecordConverter.class);
   }
 
   @Override
-  public void sinkConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+  public void sinkConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration jobConf) {
 
     if (this.config.getKlass() == null) {
       throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
     }
 
-    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
-    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
-    TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass());
+    DeprecatedParquetOutputFormat.setAsOutputFormat((JobConf) jobConf);
+    DeprecatedParquetOutputFormat.setWriteSupportClass((JobConf) jobConf, TBaseWriteSupport.class);
+    TBaseWriteSupport.setThriftClass((JobConf) jobConf, this.config.getKlass());
   }
 }
diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java
index 74bae3b061..a9a2e0ace0 100644
--- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java
+++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleScheme.java
@@ -20,6 +20,7 @@
 import cascading.tuple.Fields;
 import cascading.tuple.Tuple;
 import cascading.tuple.TupleEntry;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.ParquetInputFormat;
@@ -42,7 +43,7 @@
  * @author Avi Bryant
  */
 
-public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+public class ParquetTupleScheme extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]>{
 
   private static final long serialVersionUID = 0L;
   private String parquetSchema;
@@ -84,21 +85,21 @@ public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String s
 
   @SuppressWarnings("rawtypes")
   @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+  public void sourceConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration jobConf) {
     if (filterPredicate != null) {
       ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
     }
 
-    jobConf.setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
-    ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class);
-    TupleReadSupport.setRequestedFields(jobConf, getSourceFields());
+    ((JobConf) jobConf).setInputFormat(ScaldingDeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass((JobConf) jobConf, TupleReadSupport.class);
+    TupleReadSupport.setRequestedFields((JobConf) jobConf, getSourceFields());
   }
 
   @Override
-  public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
-    MessageType schema = readSchema(flowProcess, tap);
+  public Fields retrieveSourceFields(FlowProcess<? extends Configuration> flowProcess, Tap tap) {
+    MessageType schema = readSchema((FlowProcess) flowProcess, tap);
     SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());
     setSourceFields(intersection.getSourceFields());
@@ -106,7 +107,7 @@ public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
 
     return getSourceFields();
   }
 
-  private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
+  private MessageType readSchema(FlowProcess flowProcess, Tap tap) {
     try {
       Hfs hfs;
 
@@ -127,16 +128,16 @@ private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
     }
   }
 
-  private List