Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preview of diffs between develop-cascading4 and develop #1

Closed
wants to merge 14 commits into from
Closed
43 changes: 9 additions & 34 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,79 +20,46 @@ jobs:
# all combinations that we will run in parallel to increase throughput
matrix:
include:
- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-args scalding-date maple scalding-quotation"
script: './scripts/run_test.sh'

- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-args scalding-date maple scalding-quotation"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-scrooge"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
# fixed typo: was TEST_TAGET, which left TEST_TARGET unset for this shard
TEST_TARGET: "scalding-avro scalding-hraven scalding-commons scalding-parquet scalding-parquet-scrooge"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-core scalding-jdbc scalding-json scalding-db scalding-cats"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-core scalding-jdbc scalding-json scalding-db scalding-cats"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-hadoop-test"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-hadoop-test"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-estimators-test"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-estimators-test"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-serialization scalding-spark scalding-beam"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-serialization scalding-spark scalding-beam"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "base"
TEST_TARGET: "scalding-thrift-macros"
script: './scripts/run_test.sh'
- scala-version: "2.12.14"
BUILD: "base"
TEST_TARGET: "scalding-thrift-macros"
script: './scripts/run_test.sh'

- scala-version: "2.11.12"
BUILD: "test tutorials and matrix tutorials and repl"
script: "./scripts/run_test.sh && ./scripts/build_assembly_no_test.sh scalding-assembly && ./scripts/test_tutorials.sh && ./scripts/build_assembly_no_test.sh scalding-assembly && ./scripts/test_matrix_tutorials.sh"
- scala-version: "2.12.14"
BUILD: "test tutorials and matrix tutorials and repl"
script: "./scripts/run_test.sh && ./scripts/build_assembly_no_test.sh scalding-assembly && ./scripts/test_tutorials.sh && ./scripts/build_assembly_no_test.sh scalding-assembly && ./scripts/test_matrix_tutorials.sh"

- scala-version: "2.11.12"
BUILD: "test repl and typed tutorials and microsite"
script: "./sbt ++$TRAVIS_SCALA_VERSION clean docs/makeMicrosite && ./scripts/build_assembly_no_test.sh scalding-repl && ./scripts/test_repl_tutorial.sh && ./scripts/build_assembly_no_test.sh scalding-core && ./scripts/test_typed_tutorials.sh && ./scripts/build_assembly_no_test.sh execution-tutorial && ./scripts/test_execution_tutorial.sh"
- scala-version: "2.12.14"
BUILD: "test repl and typed tutorials and microsite"
script: "./sbt ++$TRAVIS_SCALA_VERSION clean docs/makeMicrosite && ./scripts/build_assembly_no_test.sh scalding-repl && ./scripts/test_repl_tutorial.sh && ./scripts/build_assembly_no_test.sh scalding-core && ./scripts/test_typed_tutorials.sh && ./scripts/build_assembly_no_test.sh execution-tutorial && ./scripts/test_execution_tutorial.sh"
Expand All @@ -118,6 +85,14 @@ jobs:
gem install sass -v 3.7.4
gem install jekyll -v 3.2.1

# this is temporary until cascading releases an official 4.5 artifact
# wip-4.5 is the first tag with a hadoop3 target
- name: "Compile experimental cascading WIP"
run: |
cd experimental-cascading
./build-cascading.sh
ls -R /home/runner/.m2/

- name: "Run Test Variant"
env:
TRAVIS_SCALA_VERSION: ${{ matrix.scala-version }}
Expand Down
30 changes: 17 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ val catsEffectVersion = "1.1.0"
val catsVersion = "1.5.0"
val chillVersion = "0.8.4"
val dagonVersion = "0.3.1"
val elephantbirdVersion = "4.15"
val elephantbirdVersion = "4.17"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hadoopVersion = "3.2.1"
val hbaseVersion = "1.2.4"
val hravenVersion = "1.0.1"
val jacksonVersion = "2.8.7"
val json4SVersion = "3.5.0"
val paradiseVersion = "2.1.1"
val parquetVersion = "1.10.0"
val parquetVersion = "1.12.0"
val protobufVersion = "2.4.1"
val scalameterVersion = "0.8.2"
val scalaCheckVersion = "1.13.4"
val scalaTestVersion = "3.0.1"
val scroogeVersion = "19.8.0"
val sparkVersion = "2.4.0"
val sparkVersion = "3.1.2"
val beamVersion = "2.29.0"
val slf4jVersion = "1.6.6"
val thriftVersion = "0.9.3"
Expand All @@ -42,7 +42,7 @@ val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencie

val sharedSettings = Seq(
organization := "com.twitter",
scalaVersion := "2.11.12",
scalaVersion := "2.12.14",
crossScalaVersions := Seq(scalaVersion.value, "2.12.14"),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
doc / javacOptions := Seq("-source", "1.8"),
Expand All @@ -61,6 +61,7 @@ val sharedSettings = Seq(
"com.novocode" % "junit-interface" % "0.10" % "test"
),
resolvers ++= Seq(
Resolver.mavenLocal,
Opts.resolver.sonatypeSnapshots,
Opts.resolver.sonatypeReleases,
"Concurrent Maven Repo".at("https://conjars.org/repo"),
Expand Down Expand Up @@ -260,7 +261,7 @@ lazy val scaldingArgs = module("args")
lazy val scaldingDate = module("date")

lazy val cascadingVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1")
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "4.5.0-wip-dev")

lazy val cascadingJDBCVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0")
Expand All @@ -286,9 +287,10 @@ lazy val scaldingQuotation = module("quotation").settings(
lazy val scaldingCore = module("core")
.settings(
libraryDependencies ++= Seq(
"cascading" % "cascading-core" % cascadingVersion,
"cascading" % "cascading-hadoop" % cascadingVersion,
"cascading" % "cascading-local" % cascadingVersion,
"net.wensel" % "cascading-core" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion,
"net.wensel" % "cascading-local" % cascadingVersion,
"com.stripe" %% "dagon-core" % dagonVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
Expand Down Expand Up @@ -353,7 +355,7 @@ lazy val scaldingCommons = module("commons")
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion,
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion,
"com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion,
// TODO: split this out into scalding-thrift
Expand Down Expand Up @@ -531,7 +533,7 @@ lazy val scaldingJson = module("json")
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"org.json4s" %% "json4s-native" % json4SVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion % "provided"
"com.twitter.elephantbird" % "elephant-bird-cascading3" % elephantbirdVersion % "provided"
)
)
.dependsOn(scaldingCore)
Expand Down Expand Up @@ -601,7 +603,8 @@ lazy val maple = Project(
"org.apache.hbase" % "hbase-client" % hbaseVersion % "provided",
"org.apache.hbase" % "hbase-common" % hbaseVersion % "provided",
"org.apache.hbase" % "hbase-server" % hbaseVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingVersion
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
)
)
)
Expand All @@ -618,7 +621,8 @@ lazy val executionTutorial = Project(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"cascading" % "cascading-hadoop" % cascadingVersion
"net.wensel" % "cascading-hadoop3-common" % cascadingVersion,
"net.wensel" % "cascading-hadoop3-mr1" % cascadingVersion
)
)
).dependsOn(scaldingCore)
Expand Down
8 changes: 8 additions & 0 deletions experimental-cascading/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Experimental Cascading
This is a folder for providing an experimental scalding/cascading build. <br>
We want to e2e test a hadoop3/cascading4 fork of cascading here.

# Cascading WIP
The WIP branches of cascading are a pain to consume because they only publish to GitHub Packages, which requires GitHub authentication to read.
It's easier to just compile the WIP branch here until a final release of cascading 4.5 is available.

10 changes: 10 additions & 0 deletions experimental-cascading/build-cascading.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash -ex

# Build the builder image; compiling cascading happens inside the image build.
docker build --tag experimental-cascading:latest -f docker/Dockerfile ./docker

# Build scalding against the freshly compiled cascading artifacts.
# This script is expected to run from experimental-cascading/, so the
# scalding repo root (mounted as /scalding) is one directory up.
# (was `$PWD../../`, which is a malformed path missing the separator)
SCALDING_HOST_ROOT=$PWD/..
MAVEN_HOST_CACHE=~/.m2
# container will crash without enough memory; --cpus=0 means "no limit"
RESOURCE_ARGS="--cpus=0.000 --memory=8g --memory-swap=24g"
# Resource flags must come BEFORE the image name: anything after the image
# is passed as arguments to the container entrypoint, not to docker run.
docker run $RESOURCE_ARGS -v $SCALDING_HOST_ROOT:/scalding -v $MAVEN_HOST_CACHE:/root/m2-host experimental-cascading:latest
18 changes: 18 additions & 0 deletions experimental-cascading/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM ubuntu:22.04

# CASCADING_RELEASE is the release tag for cascading to compile (ex. https://github.com/cwensel/cascading/releases/tag/4.5-wip-10)
# SCALDING_CASCADING_VERSION is the arg scalding uses to resolve the cascading version
# NOTE: key=value is the supported ENV syntax; the whitespace form is deprecated.
ENV CASCADING_RELEASE=4.5-wip-10
ENV SCALDING_CASCADING_VERSION=4.5.0-wip-dev

# Each step is a separate COPY+RUN pair so docker layer caching can skip
# the slow steps (JDK install, cascading compile) on unrelated changes.
COPY "./download-java.sh" "/scalding/experimental-cascading/docker/download-java.sh"
RUN "/scalding/experimental-cascading/docker/download-java.sh"

COPY "./compile-cascading.sh" "/scalding/experimental-cascading/docker/compile-cascading.sh"
RUN "/scalding/experimental-cascading/docker/compile-cascading.sh"

# Removes /scalding from the image to keep it small; at run time the host
# repo is bind-mounted back onto /scalding.
COPY "./cleanup-image.sh" "/scalding/experimental-cascading/docker/cleanup-image.sh"
RUN "/scalding/experimental-cascading/docker/cleanup-image.sh"

COPY "./link-cascading.sh" "/scalding/experimental-cascading/docker/link-cascading.sh"
ENTRYPOINT ["/bin/bash", "/scalding/experimental-cascading/docker/link-cascading.sh"]
3 changes: 3 additions & 0 deletions experimental-cascading/docker/cleanup-image.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash -ex

# Remove the sources/scripts copied into the image during the build so the
# final image stays small. Safe because at run time the host scalding repo
# is bind-mounted onto /scalding (see build-cascading.sh).
rm -rf /scalding
15 changes: 15 additions & 0 deletions experimental-cascading/docker/compile-cascading.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash -ex

# Compile the cascading WIP release tag from source and publish its artifacts
# into the local maven cache so the scalding build can resolve them.
# Expects CASCADING_RELEASE in the environment (set by the Dockerfile).

RELEASE_LINK=https://github.com/cwensel/cascading/archive/refs/tags/$CASCADING_RELEASE.zip

# get the source for the target release tag
mkdir -p /scalding/tmp
apt-get install curl unzip git -y   # apt lists already populated by download-java.sh
# --fail makes curl exit non-zero on HTTP errors (e.g. 404 for a bad tag),
# so `bash -e` aborts here instead of unzip failing on an error page later.
curl --fail $RELEASE_LINK -L -o /scalding/tmp/$CASCADING_RELEASE.zip
cd /scalding/tmp/
unzip $CASCADING_RELEASE
cd cascading-$CASCADING_RELEASE

# build cascading into maven cache so that scalding can pick it up
# signing is skipped: no GPG keys are available inside the build image
./gradlew publishToMavenLocal -x signMavenPublication
ls -R /root/.m2/repository/net/wensel/
5 changes: 5 additions & 0 deletions experimental-cascading/docker/download-java.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash -ex

# Install the JDK used to compile cascading.
# noninteractive keeps apt from blocking the docker build on console
# prompts (e.g. tzdata configuration pulled in by the JDK packages).
export DEBIAN_FRONTEND=noninteractive

apt-get update -y
apt-get install openjdk-8-jdk -y
java -version
8 changes: 8 additions & 0 deletions experimental-cascading/docker/link-cascading.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash -ex

# drop cascading build back onto host machine
# to make IDE resolution work for local development
# /root/m2-host is the host's ~/.m2 bind-mounted into the container
# (see build-cascading.sh); replace any stale net/wensel artifacts there.
rm -rf /root/m2-host/repository/net/wensel/
mkdir -p /root/m2-host/repository/net/wensel/
ls /root/.m2/repository/net/wensel/
# copies the wensel/ tree into .../repository/net/, landing at net/wensel
cp -r /root/.m2/repository/net/wensel/ /root/m2-host/repository/net/
33 changes: 17 additions & 16 deletions maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Properties;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
Expand Down Expand Up @@ -45,18 +46,18 @@ public class LocalTap<SourceCtx, SinkCtx> extends Tap<Properties, RecordReader,
private JobConf defaults;
private Lfs lfs;

public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
SinkMode sinkMode) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme), sinkMode);
setup(path, scheme);
}

public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme));
setup(path, scheme);
}

private void setup(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
private void setup(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
this.path = path;

/*
Expand Down Expand Up @@ -90,13 +91,13 @@ public String getIdentifier() {
}

@Override
public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, RecordReader input) throws IOException {
public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, RecordReader input) throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults);
return lfs.openForRead(new HadoopFlowProcess(jobConf));
}

@Override
public TupleEntryCollector openForWrite(FlowProcess<Properties> flowProcess, OutputCollector output)
public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, OutputCollector output)
throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults);
return lfs.openForWrite(new HadoopFlowProcess(jobConf));
Expand Down Expand Up @@ -141,11 +142,11 @@ private static class LocalScheme<SourceContext, SinkContext> extends
Scheme<Properties, RecordReader, OutputCollector, SourceContext, SinkContext> {
private static final long serialVersionUID = 5710119342340369543L;

private Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
private Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
private JobConf defaults;
private Lfs lfs;

public LocalScheme(Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
public LocalScheme(Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
super(scheme.getSourceFields(), scheme.getSinkFields());
this.scheme = scheme;
}
Expand All @@ -159,53 +160,53 @@ private void setLfs(Lfs lfs) {
}

@Override
public Fields retrieveSourceFields(FlowProcess<Properties> flowProcess,
public Fields retrieveSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
public void presentSourceFields(FlowProcess<Properties> flowProcess,
public void presentSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
public void sourceConfInit(FlowProcess<Properties> flowProcess,
public void sourceConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, defaults);
scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
public Fields retrieveSinkFields(FlowProcess<Properties> flowProcess,
public Fields retrieveSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
public void presentSinkFields(FlowProcess<Properties> flowProcess,
public void presentSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
public void sinkConfInit(FlowProcess<Properties> flowProcess,
public void sinkConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults);
scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
public boolean source(FlowProcess<Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
public boolean source(FlowProcess<? extends Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
throws IOException {
throw new RuntimeException("LocalTap#source is never called");
}

@Override
public void sink(FlowProcess<Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
public void sink(FlowProcess<? extends Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
throws IOException {
throw new RuntimeException("LocalTap#sink is never called");
}
Expand Down
Loading