preview of diffs between develop-cascading4 and develop #1

Closed · wants to merge 14 commits from develop-cascading4 into develop
30 changes: 17 additions & 13 deletions build.sbt
@@ -17,21 +17,21 @@ val catsEffectVersion = "1.1.0"
val catsVersion = "1.5.0"
val chillVersion = "0.8.4"
val dagonVersion = "0.3.1"
val elephantbirdVersion = "4.15"
val elephantbirdVersion = "4.17"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hadoopVersion = "3.2.1"
val hbaseVersion = "1.2.4"
val hravenVersion = "1.0.1"
val jacksonVersion = "2.8.7"
val json4SVersion = "3.5.0"
val paradiseVersion = "2.1.1"
val parquetVersion = "1.10.0"
val parquetVersion = "1.12.0"
val protobufVersion = "2.4.1"
val scalameterVersion = "0.8.2"
val scalaCheckVersion = "1.13.4"
val scalaTestVersion = "3.0.1"
val scroogeVersion = "19.8.0"
val sparkVersion = "2.4.0"
val sparkVersion = "3.1.2"
val beamVersion = "2.29.0"
val slf4jVersion = "1.6.6"
val thriftVersion = "0.9.3"
@@ -42,7 +42,7 @@ val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencie

val sharedSettings = Seq(
organization := "com.twitter",
scalaVersion := "2.11.12",
scalaVersion := "2.12.14",
crossScalaVersions := Seq(scalaVersion.value, "2.12.14"),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
doc / javacOptions := Seq("-source", "1.8"),
@@ -61,6 +61,7 @@ val sharedSettings = Seq(
"com.novocode" % "junit-interface" % "0.10" % "test"
),
resolvers ++= Seq(
+Resolver.mavenLocal,
Opts.resolver.sonatypeSnapshots,
Opts.resolver.sonatypeReleases,
"Concurrent Maven Repo".at("https://conjars.org/repo"),
@@ -260,7 +261,7 @@ lazy val scaldingArgs = module("args")
lazy val scaldingDate = module("date")

lazy val cascadingVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1")
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "4.5.0-wip-dev")

lazy val cascadingJDBCVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0")
@@ -286,9 +287,10 @@ lazy val scaldingQuotation = module("quotation").settings(
lazy val scaldingCore = module("core")
.settings(
libraryDependencies ++= Seq(
"cascading" % "cascading-core" % cascadingVersion,
"cascading" % "cascading-hadoop" % cascadingVersion,
"cascading" % "cascading-local" % cascadingVersion,
"net.wensel" % "cascading-core" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion,
"net.wensel" % "cascading-local" % cascadingVersion,
"com.stripe" %% "dagon-core" % dagonVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
@@ -353,7 +355,7 @@ lazy val scaldingCommons = module("commons")
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion,
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion,
"com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion,
// TODO: split this out into scalding-thrift
@@ -531,7 +533,7 @@ lazy val scaldingJson = module("json")
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"org.json4s" %% "json4s-native" % json4SVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided"
"com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion % "provided"
)
)
.dependsOn(scaldingCore)
@@ -601,7 +603,8 @@ lazy val maple = Project(
"org.apache.hbase" % "hbase-client" % hbaseVersion % "provided",
"org.apache.hbase" % "hbase-common" % hbaseVersion % "provided",
"org.apache.hbase" % "hbase-server" % hbaseVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingVersion
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
)
)
)
@@ -618,7 +621,8 @@ lazy val executionTutorial = Project(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"cascading" % "cascading-hadoop" % cascadingVersion
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
)
)
).dependsOn(scaldingCore)
8 changes: 8 additions & 0 deletions experimental-cascading/README.md
@@ -0,0 +1,8 @@
# Experimental Cascading
This folder provides an experimental scalding/cascading build.
We want to e2e-test a hadoop3/cascading4 fork of cascading here.

# Cascading WIP
The WIP branches of cascading are a pain to consume because they are published only to GitHub Packages, which requires GitHub auth to read.
It's easier to just compile the WIP branch here until a final release of cascading 4.5 is available.
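As a sketch, a downstream build could consume the locally published artifacts with sbt settings like the following (the `net.wensel` coordinates and the `4.5.0-wip-dev` default come from this PR's build.sbt; the rest is illustrative):

```scala
// Hypothetical minimal sbt wiring for the locally published Cascading 4 WIP jars.
lazy val cascadingVersion =
  sys.env.getOrElse("SCALDING_CASCADING_VERSION", "4.5.0-wip-dev")

// `gradlew publishToMavenLocal` drops the jars into ~/.m2
resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  "net.wensel" % "cascading-core" % cascadingVersion,
  "net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
  "net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion,
  "net.wensel" % "cascading-local" % cascadingVersion
)
```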

10 changes: 10 additions & 0 deletions experimental-cascading/build-scalding.sh
@@ -0,0 +1,10 @@
#!/bin/bash -ex

# build cascading
docker build --tag experimental-cascading:latest -f docker/Dockerfile ./docker

# build scalding
SCALDING_HOST_ROOT=$PWD/../..
MAVEN_HOST_CACHE=~/.m2
RESOURCE_ARGS="--cpus=0.000 --memory=8g --memory-swap=24g" # container will crash without enough memory
docker run $RESOURCE_ARGS -v "$SCALDING_HOST_ROOT":/scalding -v "$MAVEN_HOST_CACHE":/root/m2-host experimental-cascading:latest
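Note the mount layout: the image bakes the compiled Cascading jars into its own Maven cache at build time, while `docker run` mounts the scalding checkout at `/scalding` and the host `~/.m2` at `/root/m2-host`, letting `compile-scalding.sh` copy the `net.wensel` artifacts back onto the host so IDE resolution works. Run the script from `experimental-cascading/`.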
18 changes: 18 additions & 0 deletions experimental-cascading/docker/Dockerfile
@@ -0,0 +1,18 @@
FROM ubuntu:22.04

# CASCADING_RELEASE is the release tag for cascading to compile (ex. https://github.com/cwensel/cascading/releases/tag/4.5-wip-10)
# SCALDING_CASCADING_VERSION is the arg scalding uses to resolve the cascading version
ENV CASCADING_RELEASE 4.5-wip-10
ENV SCALDING_CASCADING_VERSION 4.5.0-wip-dev

COPY "./download-java.sh" "/scalding/experimental-cascading/docker/download-java.sh"
RUN "/scalding/experimental-cascading/docker/download-java.sh"

COPY "./compile-cascading.sh" "/scalding/experimental-cascading/docker/compile-cascading.sh"
RUN "/scalding/experimental-cascading/docker/compile-cascading.sh"

COPY "./cleanup-image.sh" "/scalding/experimental-cascading/docker/cleanup-image.sh"
RUN "/scalding/experimental-cascading/docker/cleanup-image.sh"

COPY "./compile-scalding.sh" "/scalding/experimental-cascading/docker/compile-scalding.sh"
ENTRYPOINT ["/bin/bash", "/scalding/experimental-cascading/docker/compile-scalding.sh"]
3 changes: 3 additions & 0 deletions experimental-cascading/docker/cleanup-image.sh
@@ -0,0 +1,3 @@
#!/bin/bash -ex

rm -rf /scalding
15 changes: 15 additions & 0 deletions experimental-cascading/docker/compile-cascading.sh
@@ -0,0 +1,15 @@
#!/bin/bash -ex

RELEASE_LINK=https://github.com/cwensel/cascading/archive/refs/tags/$CASCADING_RELEASE.zip

# get the source for the target release tag
mkdir -p /scalding/tmp
apt-get install curl unzip git -y
curl $RELEASE_LINK -L -o /scalding/tmp/$CASCADING_RELEASE.zip
cd /scalding/tmp/
unzip $CASCADING_RELEASE
cd cascading-$CASCADING_RELEASE

# build cascading into maven cache so that scalding can pick it up
./gradlew publishToMavenLocal -x signMavenPublication
ls -R /root/.m2/repository/net/wensel/
10 changes: 10 additions & 0 deletions experimental-cascading/docker/compile-scalding.sh
@@ -0,0 +1,10 @@
#!/bin/bash -ex

# drop cascading build back onto host machine
# to make IDE resolution work for local development
rm -rf /root/m2-host/repository/net/wensel/
cp -r /root/.m2/repository/net/wensel/ /root/m2-host/repository/net/wensel/

# build scalding
cd /scalding
./sbt compile
5 changes: 5 additions & 0 deletions experimental-cascading/docker/download-java.sh
@@ -0,0 +1,5 @@
#!/bin/bash -ex

apt-get update -y
apt-get install openjdk-8-jdk -y
java -version
33 changes: 17 additions & 16 deletions maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java
@@ -5,6 +5,7 @@
import java.util.Properties;
import java.util.logging.Logger;

+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
@@ -45,18 +46,18 @@ public class LocalTap<SourceCtx, SinkCtx> extends Tap<Properties, RecordReader,
private JobConf defaults;
private Lfs lfs;

-public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
+public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
SinkMode sinkMode) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme), sinkMode);
setup(path, scheme);
}

-public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
+public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme));
setup(path, scheme);
}

-private void setup(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
+private void setup(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
this.path = path;

/*
@@ -90,13 +91,13 @@ public String getIdentifier() {
}

@Override
-public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, RecordReader input) throws IOException {
+public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, RecordReader input) throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults);
return lfs.openForRead(new HadoopFlowProcess(jobConf));
}

@Override
-public TupleEntryCollector openForWrite(FlowProcess<Properties> flowProcess, OutputCollector output)
+public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, OutputCollector output)
throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults);
return lfs.openForWrite(new HadoopFlowProcess(jobConf));
@@ -141,11 +142,11 @@ private static class LocalScheme<SourceContext, SinkContext> extends
Scheme<Properties, RecordReader, OutputCollector, SourceContext, SinkContext> {
private static final long serialVersionUID = 5710119342340369543L;

-private Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
+private Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
private JobConf defaults;
private Lfs lfs;

-public LocalScheme(Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
+public LocalScheme(Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
super(scheme.getSourceFields(), scheme.getSinkFields());
this.scheme = scheme;
}
@@ -159,53 +160,53 @@ private void setLfs(Lfs lfs) {
}

@Override
-public Fields retrieveSourceFields(FlowProcess<Properties> flowProcess,
+public Fields retrieveSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
-public void presentSourceFields(FlowProcess<Properties> flowProcess,
+public void presentSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
-public void sourceConfInit(FlowProcess<Properties> flowProcess,
+public void sourceConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, defaults);
scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
-public Fields retrieveSinkFields(FlowProcess<Properties> flowProcess,
+public Fields retrieveSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
-public void presentSinkFields(FlowProcess<Properties> flowProcess,
+public void presentSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
-public void sinkConfInit(FlowProcess<Properties> flowProcess,
+public void sinkConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults);
scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
-public boolean source(FlowProcess<Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
+public boolean source(FlowProcess<? extends Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
throws IOException {
throw new RuntimeException("LocalTap#source is never called");
}

@Override
-public void sink(FlowProcess<Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
+public void sink(FlowProcess<? extends Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
throws IOException {
throw new RuntimeException("LocalTap#sink is never called");
}
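The recurring change in this file (and in the HBase taps below) widens `FlowProcess<Properties>` parameters to `FlowProcess<? extends Properties>`, matching the bounded wildcards that the newer Cascading `Scheme`/`Tap` callbacks declare, so the overrides must repeat the wildcard exactly. A minimal Scala sketch of why the wildcard matters, using hypothetical stand-in types rather than the real Cascading classes:

```scala
// Stand-in types for illustration only, not the real Cascading API.
class Config
class JobConfLike extends Config
class FlowProcess[C] // invariant in C, like the Java original

trait OldTap {
  def openForRead(fp: FlowProcess[Config]): Unit // exact type required
}

trait NewTap {
  def openForRead(fp: FlowProcess[_ <: Config]): Unit // any Config subtype accepted
}

object Demo {
  val specialized = new FlowProcess[JobConfLike]
  def use(tap: NewTap): Unit = tap.openForRead(specialized) // compiles
  // The same call against OldTap is rejected: FlowProcess is invariant,
  // so a FlowProcess[JobConfLike] is not a FlowProcess[Config].
}
```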
12 changes: 6 additions & 6 deletions maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java
@@ -154,7 +154,7 @@ public String[] getFamilyNames() {
}

@Override
-public void sourcePrepare(FlowProcess<JobConf> flowProcess,
+public void sourcePrepare(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) {
Object[] pair =
new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};
@@ -163,13 +163,13 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess,
}

@Override
-public void sourceCleanup(FlowProcess<JobConf> flowProcess,
+public void sourceCleanup(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) {
sourceCall.setContext(null);
}

@Override
-public boolean source(FlowProcess<JobConf> flowProcess,
+public boolean source(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) throws IOException {
Tuple result = new Tuple();

@@ -206,7 +206,7 @@ public boolean source(FlowProcess<JobConf> flowProcess,
}

@Override
-public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
+public void sink(FlowProcess<? extends JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
throws IOException {
TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
OutputCollector outputCollector = sinkCall.getOutput();
@@ -231,7 +231,7 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputColl
}

@Override
-public void sinkConfInit(FlowProcess<JobConf> process,
+public void sinkConfInit(FlowProcess<? extends JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputFormat(TableOutputFormat.class);

@@ -240,7 +240,7 @@ public void sinkConfInit(FlowProcess<JobConf> process,
}

@Override
-public void sourceConfInit(FlowProcess<JobConf> process,
+public void sourceConfInit(FlowProcess<? extends JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setInputFormat(TableInputFormat.class);

10 changes: 5 additions & 5 deletions maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java
@@ -144,7 +144,7 @@ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningExceptio
}

@Override
-public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+public void sinkConfInit(FlowProcess<? extends JobConf> process, JobConf conf) {
if (quorumNames != null) {
conf.set("hbase.zookeeper.quorum", quorumNames);
} else {
@@ -183,13 +183,13 @@ public String getIdentifier() {
}

@Override
-public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
+public TupleEntryIterator openForRead(FlowProcess<? extends JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
}

@Override
-public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
-HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this );
+public TupleEntryCollector openForWrite(FlowProcess<? extends JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
+HBaseTapCollector hBaseCollector = new HBaseTapCollector( (FlowProcess<JobConf>) jobConfFlowProcess, this );
hBaseCollector.prepare();
return hBaseCollector;
}
@@ -235,7 +235,7 @@ public long getModifiedTime(JobConf jobConf) throws IOException {
}

@Override
-public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
+public void sourceConfInit(FlowProcess<? extends JobConf> process, JobConf conf) {
// a hack for MultiInputFormat to see that there is a child format
FileInputFormat.setInputPaths( conf, getPath() );

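One wrinkle in `HBaseTap#openForWrite` above: `HBaseTapCollector` evidently still expects a `FlowProcess<JobConf>`, so the widened `? extends JobConf` parameter is narrowed back with an unchecked cast. That is safe only as long as the flow really hands this tap a `JobConf`-backed process (i.e. it runs on the Hadoop MR planner); the compiler cannot verify it.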