Initial commit

commit 53d9f9a6e22b0ce5d1e18cc8375a92d71987b1aa (0 parents)
@jwills authored
Showing with 1,485 additions and 0 deletions.
  1. +64 −0 README.md
  2. +4 −0 bin/suhdp
  3. +74 −0 pom.xml
  4. +23 −0 src/main/assembly/hadoop-job.xml
  5. +51 −0 src/main/java/com/cloudera/seismic/crunch/EmitterCallback.java
  6. +66 −0 src/main/java/com/cloudera/seismic/crunch/SUDoFn.java
  7. +173 −0 src/main/java/com/cloudera/seismic/crunch/SUPipeline.java
  8. +71 −0 src/main/java/com/cloudera/seismic/crunch/SUPostGroupFn.java
  9. +84 −0 src/main/java/com/cloudera/seismic/crunch/SUSort.java
  10. +52 −0 src/main/java/com/cloudera/seismic/segy/Field.java
  11. +73 −0 src/main/java/com/cloudera/seismic/segy/FieldType.java
  12. +91 −0 src/main/java/com/cloudera/seismic/segy/Fields.java
  13. +43 −0 src/main/java/com/cloudera/seismic/segy/Main.java
  14. +29 −0 src/main/java/com/cloudera/seismic/segy/SegyException.java
  15. +123 −0 src/main/java/com/cloudera/seismic/segy/SegyLoader.java
  16. +90 −0 src/main/java/com/cloudera/seismic/segy/SegyUnloader.java
  17. +15 −0 src/main/java/com/cloudera/seismic/su/SUCallback.java
  18. +156 −0 src/main/java/com/cloudera/seismic/su/SUProcess.java
  19. +90 −0 src/main/java/com/cloudera/seismic/su/SUReader.java
  20. +47 −0 src/main/java/com/cloudera/seismic/su/SequenceFileCallback.java
  21. +35 −0 src/test/java/com/cloudera/seismic/segy/FieldTypeTest.java
  22. +31 −0 src/test/java/com/cloudera/seismic/su/SUProcessTest.java
64 README.md
@@ -0,0 +1,64 @@
+# Seismic Hadoop
+
+## Introduction
+
+Seismic Hadoop combines [Seismic Unix](http://www.cwp.mines.edu/cwpcodes/) with [Cloudera's Distribution including Apache Hadoop](http://www.cloudera.com/hadoop/)
+to make it easy to execute common seismic data processing tasks on a Hadoop cluster.
+
+## Build and Installation
+
+You will need to install Seismic Unix on both your client machine and the servers in your Hadoop cluster.
+
+To create the jar file that coordinates job execution, run `mvn package`.
+
+This will create a `seismic-0.1.0-job.jar` file in the `target/` directory, which includes all of the necessary
+dependencies for running a Seismic Unix job on a Hadoop cluster.
+
+## Running Seismic Hadoop
+
+The `suhdp` script in the `bin/` directory may be used as a shortcut for running the commands described below. It requires
+the `HADOOP_HOME` environment variable to be set on the client machine.
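+
+For example (the Hadoop installation path here is illustrative):
+
+ export HADOOP_HOME=/usr/lib/hadoop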
+
+### Writing SEG-Y or SU data files to the Hadoop Cluster
+
+The `load` command of `suhdp` takes SEG-Y or SU formatted files on the local machine, formats them for use with Hadoop,
+and copies them to the Hadoop cluster.
+
+ suhdp load -input <local SEG-Y/SU files> -output <HDFS target> [-cwproot <path>]
+
+The `cwproot` argument only needs to be specified if the `CWPROOT` environment variable is not set on the client machine.
+Seismic Hadoop uses the `segyread` command to parse a local file unless the file name ends with ".su".
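+
+For example, to load a local SEG-Y file into HDFS (the file names here are illustrative):
+
+ suhdp load -input shots.segy -output shots.su -cwproot /usr/local/su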
+
+### Reading SU data files from the Hadoop Cluster
+
+The `unload` command will read Hadoop-formatted data files from the Hadoop cluster and write them to the local machine.
+
+ suhdp unload -input <SU file/directory of files on HDFS> -output <local file to write>
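+
+For example (the file names here are illustrative):
+
+ suhdp unload -input shots.su -output /tmp/shots.su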
+
+### Running SU Commands on data in the Hadoop cluster
+
+The `run` command will execute a series of Seismic Unix commands on data stored in HDFS by converting the commands
+to a series of MapReduce jobs.
+
+ suhdp run -command "seismic | unix | commands" -input <HDFS input path> -output <HDFS output path> \
+ -cwproot <path to SU on the cluster machines>
+
+For example, we might run:
+
+ suhdp run -command "sufilter f=10,20,30,40 | suchw key1=gx,cdp key2=offset,gx key3=sx,sx b=1,1 c=1,1 d=1,2 | susort cdp gx" \
+ -input aniso.su -output sorted.su -cwproot /usr/local/su
+
+In this case, Seismic Hadoop will run a MapReduce job that applies the `sufilter` and `suchw` commands to each trace during the Map
+phase, sorts the data by the CDP field in the trace header during the Shuffle phase, and performs a secondary sort
+on the receiver locations for each CDP gather during the Reduce phase. There are a few things to note about running SU commands on the
+cluster:
+
+1. Most SU commands are run as-is by the system. The most notable exception is `susort`, which is performed by the
+framework itself but is designed to be compatible with the standard `susort` command.
+2. If the last SU command specified in the `command` argument is an X Windows command (e.g., `suximage`, `suxwigb`), then the system
+will stream the results of running the pipeline to the client machine, where the X Windows command will be executed locally; an example
+follows this list. Make sure that the `CWPROOT` environment variable is set on the client machine in order to support this option.
+3. Certain commands that are not trace parallel (e.g., `suop2`) will not work correctly on Seismic Hadoop. Also, commands that take
+additional input files will not work properly, because the system does not copy those input files to the jobs running on the cluster.
+We plan to fix this limitation soon.
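+
+For example, the following command filters the traces on the cluster and then streams the result to a local
+`suximage` window (the input file name here is illustrative):
+
+ suhdp run -command "sufilter f=10,20,30,40 | suximage" -input aniso.su -cwproot /usr/local/su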
+
4 bin/suhdp
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+BASEDIR=$(dirname "$0")
+"$HADOOP_HOME/bin/hadoop" jar "$BASEDIR/../target/seismic-0.1.0-job.jar" "$@"
74 pom.xml
@@ -0,0 +1,74 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.cloudera.seismic</groupId>
+ <artifactId>seismic</artifactId>
+ <version>0.1.0</version>
+ <packaging>jar</packaging>
+
+ <name>seismic</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2-cdh3u1</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.cloudera.crunch</groupId>
+ <artifactId>crunch</artifactId>
+ <version>0.1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>maven-hadoop</id>
+ <name>Hadoop Releases</name>
+ <url>https://repository.cloudera.com/content/repositories/releases/</url>
+ </repository>
+ <repository>
+ <id>cloudera-local</id>
+ <name>Cloudera Snapshots</name>
+ <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2.1</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/hadoop-job.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass>com.cloudera.seismic.segy.Main</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
23 src/main/assembly/hadoop-job.xml
@@ -0,0 +1,23 @@
+<assembly>
+ <id>job</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ <outputDirectory>lib</outputDirectory>
+ <excludes>
+ <exclude>${groupId}:${artifactId}</exclude>
+ </excludes>
+ </dependencySet>
+ <dependencySet>
+ <unpack>true</unpack>
+ <includes>
+ <include>${groupId}:${artifactId}</include>
+ </includes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
51 src/main/java/com/cloudera/seismic/crunch/EmitterCallback.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.crunch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.cloudera.crunch.Emitter;
+import com.cloudera.seismic.su.SUCallback;
+
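+/**
+ * An {@link SUCallback} that forwards each block of trace data written by an
+ * SU process to a Crunch {@link Emitter} as a {@link ByteBuffer}.
+ */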
+public class EmitterCallback implements SUCallback {
+
+ private transient Emitter<ByteBuffer> emitter;
+
+ public EmitterCallback(Emitter<ByteBuffer> emitter) {
+ this.emitter = emitter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ emitter.flush();
+ }
+
+ @Override
+ public void initialize(TaskInputOutputContext context) throws IOException {
+ }
+
+ @Override
+ public void write(byte[] data, int start, int length) throws IOException {
+ emitter.emit(ByteBuffer.wrap(data, start, length));
+ }
+
+ @Override
+ public void cleanup(TaskInputOutputContext context) throws IOException {
+ }
+
+}
66 src/main/java/com/cloudera/seismic/crunch/SUDoFn.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.crunch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.seismic.su.SUCallback;
+import com.cloudera.seismic.su.SUProcess;
+
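+/**
+ * A Crunch {@code DoFn} that pipes each input trace through an external
+ * Seismic Unix process and emits the trace data the process writes back.
+ */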
+public class SUDoFn extends DoFn<ByteBuffer, ByteBuffer> {
+
+ private final SUProcess proc;
+
+ private SUCallback emitterCallback;
+
+ public SUDoFn(SUProcess proc) {
+ this.proc = proc;
+ }
+
+ @Override
+ public void initialize() {
+ try {
+ proc.start();
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void process(ByteBuffer input, Emitter<ByteBuffer> emitter) {
+ if (emitterCallback == null) {
+ this.emitterCallback = new EmitterCallback(emitter);
+ proc.addCallback(emitterCallback);
+ }
+ try {
+ proc.write(input.array(), input.arrayOffset(), input.limit());
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup(Emitter<ByteBuffer> emitter) {
+ try {
+ proc.closeAndWait();
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+}
173 src/main/java/com/cloudera/seismic/crunch/SUPipeline.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.crunch;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PGroupedTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.io.From;
+import com.cloudera.crunch.io.To;
+import com.cloudera.crunch.lib.PTables;
+import com.cloudera.crunch.type.PTypeFamily;
+import com.cloudera.crunch.type.writable.Writables;
+import com.cloudera.seismic.su.SUProcess;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+public class SUPipeline extends Configured implements Tool {
+
+ private static final Set<String> X_COMMANDS = ImmutableSet.of(
+ "suxcontour", "suxgraph", "suximage", "suxmax", "suxmovie", "suxpicker", "suxwigb",
+ "xcontour", "ximage", "xpicker", "xwigb");
+
+ public PCollection<ByteBuffer> constructPipeline(PCollection<ByteBuffer> input, String cwproot, List<String> steps) {
+ PTypeFamily ptf = input.getTypeFamily();
+ PGroupedTable<TupleN, ByteBuffer> sorted = null;
+ for (String step : steps) {
+ String[] pieces = step.split("\\s+");
+ if ("susort".equals(pieces[0])) {
+ if (sorted != null) {
+ throw new IllegalArgumentException("Cannot have susort followed by susort");
+ } else {
+ List<String> keys = Lists.newArrayList();
+ for (int i = 1; i < pieces.length; i++) {
+ if (!pieces[i].isEmpty()) {
+ keys.add(pieces[i]);
+ }
+ }
+ if (keys.isEmpty()) {
+ throw new IllegalArgumentException("susort must have at least one key");
+ }
+ sorted = SUSort.apply(input, keys);
+ }
+ } else {
+ SUProcess proc = new SUProcess(cwproot, pieces[0]);
+ for (int i = 1; i < pieces.length; i++) {
+ proc.addArg(pieces[i]);
+ }
+ if (sorted == null) {
+ input = input.parallelDo(pieces[0], new SUDoFn(proc), ptf.bytes());
+ } else {
+ input = sorted.parallelDo(pieces[0], new SUPostGroupFn(proc), ptf.bytes());
+ sorted = null;
+ }
+ }
+ }
+ if (sorted != null) {
+ input = PTables.values(sorted.ungroup());
+ }
+ return input;
+ }
+
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("cwproot", true, "The path to CWPROOT on the cluster machines");
+ options.addOption("input", true, "SU files in Hadoop");
+ options.addOption("output", true, "The path of the SU files to write out to Hadoop");
+ options.addOption("command", true, "A pipeline of SU commands to run on the data");
+
+ // Parse the commandline and check for required arguments.
+ CommandLine cmdLine = new PosixParser().parse(options, args, false);
+ if (!cmdLine.hasOption("input") || !cmdLine.hasOption("command")) {
+ System.out.println("Mising required input/command arguments");
+ new HelpFormatter().printHelp("SUPipeline", options);
+ System.exit(1);
+ }
+
+ String clusterCwproot = null;
+ if (cmdLine.hasOption("cwproot")) {
+ clusterCwproot = cmdLine.getOptionValue("cwproot");
+ }
+ if (clusterCwproot == null || clusterCwproot.isEmpty()) {
+ System.out.println("Could not determine cluster's CWPROOT value");
+ new HelpFormatter().printHelp("SUPipeline", options);
+ System.exit(1);
+ }
+
+ Pipeline pipeline = new MRPipeline(SUPipeline.class);
+ PCollection<ByteBuffer> traces = pipeline.read(From.sequenceFile(
+ cmdLine.getOptionValue("input"), Writables.bytes()));
+ Pair<List<String>, String> cmd = parse(cmdLine.getOptionValue("command"));
+ PCollection<ByteBuffer> result = constructPipeline(traces, clusterCwproot, cmd.first());
+
+ if (cmdLine.hasOption("output")) {
+ result.write(To.sequenceFile(cmdLine.getOptionValue("output")));
+ }
+
+ if (cmd.second() != null) {
+ String localCwproot = System.getenv("CWPROOT");
+ if (localCwproot == null) {
+ System.out.println("To use local SU commands, the CWPROOT environment variable must be set");
+ System.exit(1);
+ }
+ String[] pieces = cmd.second().split("\\s+");
+ SUProcess x = new SUProcess(localCwproot, pieces[0]);
+ for (int i = 1; i < pieces.length; i++) {
+ x.addArg(pieces[i]);
+ }
+ x.addEnvironment(ImmutableMap.of("DISPLAY", System.getenv("DISPLAY")));
+ Iterator<ByteBuffer> iter = result.materialize().iterator();
+ x.start();
+ while (iter.hasNext()) {
+ ByteBuffer bb = iter.next();
+ x.write(bb.array(), bb.arrayOffset(), bb.limit());
+ }
+ x.closeAndWait();
+ }
+
+ if (!cmdLine.hasOption("output") && cmd.second() == null) {
+ System.out.println("No output destination specified");
+ System.exit(1);
+ }
+
+ pipeline.done();
+ return 0;
+ }
+
+ private Pair<List<String>, String> parse(String command) {
+ List<String> hCmds = Lists.newArrayList();
+ String xCmd = null;
+ for (String arg : command.toLowerCase().split("\\|\\s+")) {
+ if (X_COMMANDS.contains(arg.split("\\s+")[0])) {
+ xCmd = arg;
+ break;
+ } else {
+ hCmds.add(arg);
+ }
+ }
+ return Pair.of(hCmds, xCmd);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SUPipeline(), args);
+ }
+}
71 src/main/java/com/cloudera/seismic/crunch/SUPostGroupFn.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.crunch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.seismic.su.SUCallback;
+import com.cloudera.seismic.su.SUProcess;
+
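+/**
+ * A Crunch {@code DoFn} that runs after a {@code susort} grouping, writing the
+ * traces of each gather to an external Seismic Unix process in sort order and
+ * emitting the trace data the process writes back.
+ */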
+public class SUPostGroupFn extends DoFn<Pair<TupleN, Iterable<ByteBuffer>>, ByteBuffer> {
+
+ private final SUProcess proc;
+
+ private SUCallback emitterCallback;
+
+ public SUPostGroupFn(SUProcess proc) {
+ this.proc = proc;
+ }
+
+ @Override
+ public void initialize() {
+ try {
+ proc.start();
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void process(Pair<TupleN, Iterable<ByteBuffer>> input,
+ Emitter<ByteBuffer> emitter) {
+ if (emitterCallback == null) {
+ emitterCallback = new EmitterCallback(emitter);
+ proc.addCallback(emitterCallback);
+ }
+ try {
+ for (ByteBuffer bb : input.second()) {
+ proc.write(bb.array(), bb.arrayOffset(), bb.limit());
+ }
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup(Emitter<ByteBuffer> emitter) {
+ try {
+ proc.closeAndWait();
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+}
84 src/main/java/com/cloudera/seismic/crunch/SUSort.java
@@ -0,0 +1,84 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.crunch;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+import com.cloudera.crunch.GroupingOptions;
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PGroupedTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.lib.JoinUtils;
+import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.PTypeFamily;
+import com.cloudera.seismic.segy.Field;
+import com.cloudera.seismic.segy.Fields;
+
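+/**
+ * Implements a {@code susort}-compatible sort as a MapReduce shuffle: the
+ * requested trace header fields are extracted into a composite key (negated
+ * for descending sort keys) and the traces are grouped by that key.
+ */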
+public class SUSort {
+
+ private static class HeaderExtractor extends MapFn<ByteBuffer, Pair<TupleN, ByteBuffer>> {
+ private final Field[] fields;
+ private final boolean[] negate;
+ private final Object[] headerValues;
+
+ public HeaderExtractor(Field[] fields, boolean[] negate) {
+ this.fields = fields;
+ this.negate = negate;
+ this.headerValues = new Integer[fields.length];
+ }
+
+ @Override
+ public Pair<TupleN, ByteBuffer> map(ByteBuffer input) {
+ input.order(ByteOrder.BIG_ENDIAN);
+ for (int i = 0; i < headerValues.length; i++) {
+ int v = fields[i].read(input);
+ headerValues[i] = negate[i] ? -v : v;
+ }
+ return Pair.of(new TupleN(headerValues), input);
+ }
+ }
+
+ public static PGroupedTable<TupleN, ByteBuffer> apply(PCollection<ByteBuffer> traces, List<String> keys) {
+ Field[] fields = new Field[keys.size()];
+ boolean[] negate = new boolean[keys.size()];
+ for (int i = 0; i < keys.size(); i++) {
+ String key = keys.get(i);
+ if (key.charAt(0) == '-') {
+ negate[i] = true;
+ key = key.substring(1);
+ }
+ fields[i] = Fields.getSortField(key);
+ if (fields[i] == null) {
+ throw new IllegalArgumentException("Unrecognized susort key: " + keys.get(i));
+ }
+ }
+
+ PTypeFamily tf = traces.getTypeFamily();
+ PType[] headerTypes = new PType[keys.size()];
+ for (int i = 0; i < keys.size(); i++) {
+ headerTypes[i] = tf.ints();
+ }
+
+ GroupingOptions options = GroupingOptions.builder()
+ .partitionerClass(JoinUtils.getPartitionerClass(tf)).build();
+ return traces.parallelDo("gethw", new HeaderExtractor(fields, negate),
+ tf.tableOf(tf.tuples(headerTypes), tf.bytes())).groupByKey(options);
+ }
+}
52 src/main/java/com/cloudera/seismic/segy/Field.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@code Field} describes a value in a trace header in terms of a byte
+ * offset and a {@link FieldType} that specifies how to interpret the data at
+ * that offset.
+ *
+ * <p>SEG-Y trace headers are only loosely standardized, so clients can create
+ * {@code Field} instances as necessary for the SEG-Y files they are working
+ * with.
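+ *
+ * <p>For example, {@code new Field(72, FieldType.INT32)} describes the source
+ * X coordinate field that is predefined here as {@link Fields#SOURCE_X}.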
+ */
+public class Field implements Serializable {
+ private final int offset;
+ private final FieldType type;
+
+ public Field(int offset, FieldType type) {
+ this.offset = offset;
+ this.type = type;
+ }
+
+ public int read(ByteBuffer buffer) {
+ return type.decode(buffer, offset);
+ }
+
+ public void write(ByteBuffer buffer, int value) {
+ type.encode(buffer, offset, value);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Offset: ");
+ sb.append(offset).append(" Type: ").append(type);
+ return sb.toString();
+ }
+}
73 src/main/java/com/cloudera/seismic/segy/FieldType.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The {@code FieldType} specifies how to encode/decode the bytes at an offset
+ * in a SEG-Y trace header.
+ *
+ * <p>The SEG-Y standard specifies three integer types for trace header fields:
+ * a signed 16-bit integer, an unsigned 16-bit integer, and a signed 32-bit
+ * integer. All three types are supported here, but clients only need to
+ * work with Java ints.
+ *
+ */
+public enum FieldType {
+ INT16 {
+ public void encode(ByteBuffer buffer, int offset, int value) {
+ buffer.putShort(offset, (short) value);
+ }
+ public int decode(ByteBuffer buffer, int offset) {
+ return buffer.getShort(offset);
+ }
+ },
+ UINT16 {
+ public void encode(ByteBuffer buffer, int offset, int value) {
+ buffer.putChar(offset, (char) value);
+ }
+ public int decode(ByteBuffer buffer, int offset) {
+ return buffer.getChar(offset);
+ }
+ },
+ INT32 {
+ public void encode(ByteBuffer buffer, int offset, int value) {
+ buffer.putInt(offset, value);
+ }
+ public int decode(ByteBuffer buffer, int offset) {
+ return buffer.getInt(offset);
+ }
+ };
+
+ /**
+ * Write the given value to the offset of the given buffer.
+ *
+ * @param buffer The buffer to write.
+ * @param offset The offset into the buffer to begin writing at.
+ * @param value The value to write.
+ */
+ public abstract void encode(ByteBuffer buffer, int offset, int value);
+
+
+ /**
+ * Read the value of this type from the given offset into the buffer.
+ *
+ * @param buffer The buffer to read from.
+ * @param offset The offset into the buffer to read.
+ * @return The value that was read.
+ */
+ public abstract int decode(ByteBuffer buffer, int offset);
+}
91 src/main/java/com/cloudera/seismic/segy/Fields.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Common {@link Field} values that occur in most SEG-Y files.
+ */
+public class Fields {
+ public static final Field TRACE_SEQUENCE_LINE = new Field(0, FieldType.INT32);
+ public static final Field TRACE_SEQUENCE_FILE = new Field(4, FieldType.INT32);
+ public static final Field FIELD_RECORD = new Field(8, FieldType.INT32);
+ public static final Field TRACE_NUMBER = new Field(12, FieldType.INT32);
+ public static final Field ENERGY_SOURCE_POINT = new Field(16, FieldType.INT32);
+ public static final Field CDP = new Field(20, FieldType.INT32);
+ public static final Field CDP_TRACE = new Field(24, FieldType.INT32);
+ public static final Field TRACE_IDENTIFICATION_CODE = new Field(28, FieldType.UINT16);
+ public static final Field NUM_SUMMED_TRACE = new Field(30, FieldType.INT16);
+ public static final Field NUM_STACKED_TRACES = new Field(32, FieldType.INT16);
+ public static final Field DATA_USE = new Field(34, FieldType.INT16);
+ public static final Field OFFSET = new Field(36, FieldType.INT32);
+ public static final Field RECEIVER_GROUP_ELEVATION = new Field(40, FieldType.INT32);
+ public static final Field SOURCE_SURFACE_ELEVATION = new Field(44, FieldType.INT32);
+ public static final Field SOURCE_DEPTH = new Field(48, FieldType.INT32);
+ public static final Field RECEIVER_DATUM_ELEVATION = new Field(52, FieldType.INT32);
+ public static final Field SOURCE_DATUM_ELEVATION = new Field(56, FieldType.INT32);
+ public static final Field SOURCE_WATER_DEPTH = new Field(60, FieldType.INT32);
+ public static final Field GROUP_WATER_DEPTH = new Field(64, FieldType.INT32);
+ public static final Field ELEVATION_SCALAR = new Field(68, FieldType.INT16);
+ public static final Field SOURCE_GROUP_SCALAR = new Field(70, FieldType.INT16);
+ public static final Field SOURCE_X = new Field(72, FieldType.INT32);
+ public static final Field SOURCE_Y = new Field(76, FieldType.INT32);
+ public static final Field GROUP_X = new Field(80, FieldType.INT32);
+ public static final Field GROUP_Y = new Field(84, FieldType.INT32);
+ public static final Field COORDINATE_UNITS = new Field(88, FieldType.INT16);
+ public static final Field WEATHERING_VELOCITY = new Field(90, FieldType.INT16);
+ public static final Field SUB_WEATHERING_VELOCITY = new Field(92, FieldType.INT16);
+ public static final Field SOURCE_UPHOLE_TIME = new Field(94, FieldType.INT16);
+ public static final Field GROUP_UPHOLE_TIME = new Field(96, FieldType.INT16);
+ public static final Field SOURCE_STATIC_CORRECTION = new Field(98, FieldType.INT16);
+ public static final Field GROUP_STATIC_CORRECTION = new Field(100, FieldType.INT16);
+ public static final Field TOTAL_STATIC_APPLIED = new Field(102, FieldType.INT16);
+ public static final Field LAG_TIME_A = new Field(104, FieldType.INT16);
+ public static final Field LAG_TIME_B = new Field(106, FieldType.INT16);
+ public static final Field DELAY_RECORDING_TIME = new Field(108, FieldType.INT16);
+ public static final Field MUTE_TIME_START = new Field(110, FieldType.INT16);
+ public static final Field MUTE_TIME_END = new Field(112, FieldType.INT16);
+ public static final Field NUM_SAMPLES = new Field(114, FieldType.UINT16);
+ public static final Field SAMPLE_INTERVAL = new Field(116, FieldType.UINT16);
+ public static final Field COURSE = new Field(124, FieldType.INT16);
+ public static final Field SPEED = new Field(126, FieldType.INT16);
+ public static final Field YEAR = new Field(156, FieldType.INT16);
+ public static final Field DAY_OF_YEAR = new Field(158, FieldType.INT16);
+ public static final Field HOUR = new Field(160, FieldType.INT16);
+ public static final Field MINUTE = new Field(162, FieldType.INT16);
+ public static final Field SECOND = new Field(164, FieldType.INT16);
+ public static final Field TIME_BASE_CODE = new Field(166, FieldType.INT16);
+
+ private static Map<String, Field> SORT_FIELDS = ImmutableMap.<String, Field>builder()
+ .put("ep", ENERGY_SOURCE_POINT)
+ .put("cdp", CDP)
+ .put("cdpt", CDP_TRACE)
+ .put("sx", SOURCE_X)
+ .put("sy", SOURCE_Y)
+ .put("gx", GROUP_X)
+ .put("gy", GROUP_Y)
+ .put("offset", OFFSET)
+ .build();
+
+ public static Field getSortField(String shortName) {
+ return SORT_FIELDS.get(shortName);
+ }
+
+ // Cannot be instantiated
+ private Fields() {}
+}
43 src/main/java/com/cloudera/seismic/segy/Main.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.cloudera.seismic.crunch.SUPipeline;
+
+public class Main {
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ System.err.println("Please speicfy a command: load|run|unload");
+ System.exit(1);
+ }
+ Configuration conf = new Configuration();
+ String cmd = args[0].toLowerCase();
+ String[] remaining = new String[args.length - 1];
+ System.arraycopy(args, 1, remaining, 0, args.length - 1);
+ if ("load".equals(cmd)) {
+ ToolRunner.run(conf, new SegyLoader(), remaining);
+ } else if ("run".equals(cmd)) {
+ ToolRunner.run(conf, new SUPipeline(), remaining);
+ } else if ("unload".equals(cmd)) {
+ ToolRunner.run(conf, new SegyUnloader(), remaining);
+ } else {
+ System.err.println("Unrecognized command: " + cmd);
+ System.exit(1);
+ }
+ }
+}
29 src/main/java/com/cloudera/seismic/segy/SegyException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.io.IOException;
+
+/**
+ * For exceptions that occur reading SEG-Y files.
+ *
+ */
+public class SegyException extends IOException {
+ private static final long serialVersionUID = 2018563673978353367L;
+
+ public SegyException(String msg) {
+ super(msg);
+ }
+}
123 src/main/java/com/cloudera/seismic/segy/SegyLoader.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.cloudera.seismic.su.SUCallback;
+import com.cloudera.seismic.su.SUProcess;
+import com.cloudera.seismic.su.SUReader;
+import com.cloudera.seismic.su.SequenceFileCallback;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Command-line utility for reading in SEG-Y formatted trace data and writing
+ * the data into a block-compressed {@link SequenceFile} for processing by
+ * Hadoop.
+ *
+ */
+public class SegyLoader extends Configured implements Tool {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("cwproot", true, "The path to CWPROOT on this machine");
+ options.addOption("input", true, "SEG-Y files to import into Hadoop");
+ options.addOption("output", true, "The path of the sequence file to write in Hadoop");
+
+ // Parse the commandline and check for required arguments.
+ CommandLine cmdLine = new PosixParser().parse(options, args, false);
+ if (!cmdLine.hasOption("input") || !cmdLine.hasOption("output")) {
+ System.out.println("Mising required input/output arguments");
+ new HelpFormatter().printHelp("SegyLoader", options);
+ System.exit(1);
+ }
+
+ String cwproot = System.getenv("CWPROOT");
+ if (cmdLine.hasOption("cwproot")) {
+ cwproot = cmdLine.getOptionValue("cwproot");
+ }
+ if (cwproot == null || cwproot.isEmpty()) {
+ System.out.println("Could not determine CWPROOT value, using /usr/local/su...");
+ cwproot = "/usr/local/su";
+ }
+
+ // Assume any remaining args are for segyread
+ List<String> segyReadArgs = Lists.newArrayList();
+ for (String arg : cmdLine.getArgs()) {
+ if (arg.contains("=")) {
+ segyReadArgs.add(arg);
+ }
+ }
+
+ // Open the output sequence file.
+ Configuration conf = getConf();
+ Path outputPath = new Path(cmdLine.getOptionValue("output"));
+ SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
+ outputPath, NullWritable.class, BytesWritable.class, CompressionType.BLOCK);
+ int rc = 0;
+ SequenceFileCallback sfc = new SequenceFileCallback(writer);
+ try {
+ for (String filename : cmdLine.getOptionValues("input")) {
+ System.out.println("Reading input file: " + filename);
+ if (filename.endsWith(".su")) {
+ SUReader reader = new SUReader(new BufferedInputStream(new FileInputStream(filename)),
+ ImmutableList.<SUCallback>of(sfc));
+ reader.run();
+ System.out.println("Bytes read: " + reader.getBytesRead());
+ } else {
+ SUProcess proc = new SUProcess(cwproot, "segyread");
+ for (String arg : segyReadArgs) {
+ proc.addArg(arg);
+ }
+ proc.addArg(String.format("tape=%s", filename));
+ proc.addCallback(sfc);
+ proc.start();
+ rc += proc.closeAndWait();
+ System.out.println("Bytes read: " + proc.getTotalBytesRead());
+ }
+ }
+ System.out.println("Bytes written: " + sfc.getBytesWritten());
+ } catch (Throwable t) {
+ t.printStackTrace();
+ rc = 1;
+ } finally {
+ writer.close();
+ }
+ return rc;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new SegyLoader(), args);
+ }
+}
90 src/main/java/com/cloudera/seismic/segy/SegyUnloader.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.segy;
+
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class SegyUnloader extends Configured implements Tool {
+
+ private void write(Path path, DataOutputStream out, Configuration conf) throws Exception {
+ System.out.println("Reading: " + path);
+ SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
+ BytesWritable value = new BytesWritable();
+ while (reader.next(NullWritable.get(), value)) {
+ out.write(value.getBytes(), 0, value.getLength());
+ }
+ reader.close();
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("input", true, "SU sequence files to export from Hadoop");
+ options.addOption("output", true, "The local SU file to write");
+
+ // Parse the commandline and check for required arguments.
+ CommandLine cmdLine = new PosixParser().parse(options, args, false);
+ if (!cmdLine.hasOption("input") || !cmdLine.hasOption("output")) {
+ System.out.println("Mising required input/output arguments");
+ new HelpFormatter().printHelp("SegyUnloader", options);
+ System.exit(1);
+ }
+
+ Configuration conf = getConf();
+ FileSystem hdfs = FileSystem.get(conf);
+ Path inputPath = new Path(cmdLine.getOptionValue("input"));
+ if (!hdfs.exists(inputPath)) {
+ System.out.println("Input path does not exist");
+ System.exit(1);
+ }
+
+ PathFilter pf = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().startsWith("_");
+ }
+ };
+
+ DataOutputStream os = new DataOutputStream(
+ new FileOutputStream(cmdLine.getOptionValue("output")));
+ for (FileStatus fs : hdfs.listStatus(inputPath, pf)) {
+ write(fs.getPath(), os, conf);
+ }
+ os.close();
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SegyUnloader(), args);
+ }
+}
15 src/main/java/com/cloudera/seismic/su/SUCallback.java
@@ -0,0 +1,15 @@
+package com.cloudera.seismic.su;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
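+/**
+ * A sink for Seismic Unix trace data. Implementations receive raw trace bytes
+ * via {@code write} and are given lifecycle hooks for setup and teardown when
+ * they run inside a MapReduce task.
+ */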
+public interface SUCallback extends Serializable, Closeable {
+ public void initialize(TaskInputOutputContext context) throws IOException;
+
+ public void write(byte[] data, int start, int length) throws IOException;
+
+ public void cleanup(TaskInputOutputContext context) throws IOException;
+}
156 src/main/java/com/cloudera/seismic/su/SUProcess.java
@@ -0,0 +1,156 @@
+package com.cloudera.seismic.su;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.LineReader;
+
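+/**
+ * Wraps a Seismic Unix executable as a child process: bytes passed to
+ * {@code write} are fed to the process's stdin, its stderr is echoed to the
+ * task's stderr, and its stdout is parsed by an {@link SUReader} that forwards
+ * each trace to the registered callbacks.
+ */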
+public class SUProcess implements SUCallback {
+
+ private final String cwproot;
+ private final List<String> command;
+ private final Map<String, String> environment;
+ private final List<SUCallback> callbacks;
+
+ private transient Process process;
+ private transient OutputStream outputStream;
+ private transient OutputThread outputThread;
+ private transient Thread errorThread;
+
+ public SUProcess(String cwproot, String command) {
+ this.cwproot = cwproot;
+ this.command = new ArrayList<String>();
+ this.command.add(cwproot + "/bin/" + command);
+ this.environment = new HashMap<String, String>();
+ this.callbacks = new ArrayList<SUCallback>();
+ }
+
+ public SUProcess addArg(String arg) {
+ this.command.add(arg);
+ return this;
+ }
+
+ public SUProcess addEnvironment(Map<String, String> environment) {
+ this.environment.putAll(environment);
+ return this;
+ }
+
+ public SUProcess addCallback(SUCallback callback) {
+ this.callbacks.add(callback);
+ return this;
+ }
+
+ @Override
+ public void initialize(TaskInputOutputContext context) throws IOException {
+ for (SUCallback callback : callbacks) {
+ callback.initialize(context);
+ }
+ start();
+ }
+
+ @Override
+ public void cleanup(TaskInputOutputContext context) throws IOException {
+ closeAndWait();
+ for (SUCallback callback : callbacks) {
+ callback.cleanup(context);
+ }
+ }
+
+ public void start() throws IOException {
+ ProcessBuilder pb = new ProcessBuilder(command);
+ pb.environment().putAll(environment);
+ pb.environment().put("CWPROOT", cwproot);
+ this.process = pb.start();
+
+ this.outputStream = new BufferedOutputStream(process.getOutputStream());
+ this.errorThread = new ErrorThread(process.getErrorStream());
+ this.errorThread.start();
+
+ this.outputThread = new OutputThread(new BufferedInputStream(process.getInputStream()),
+ callbacks);
+ this.outputThread.start();
+ }
+
+ public int closeAndWait() throws IOException {
+ int rc = 1;
+ close();
+ try { outputThread.join(); } catch (InterruptedException ignore) {}
+ try { errorThread.join(); } catch (InterruptedException ignore) {}
+ try { rc = process.waitFor(); } catch (InterruptedException ignore) {}
+ process.destroy();
+ return rc;
+ }
+
+ public int getTotalBytesRead() {
+ return outputThread.getBytesRead();
+ }
+
+ @Override
+ public void write(byte[] data, int start, int len) throws IOException {
+ if (outputStream == null) {
+ return;
+ }
+ try {
+ outputStream.write(data, start, len);
+ } catch (IOException e) {
+ // A hack for the case when an SU process exits early.
+ if ("Broken pipe".equals(e.getMessage())) {
+ outputStream = null;
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (outputStream != null) {
+ outputStream.flush();
+ outputStream.close();
+ }
+ }
+
+ private static class OutputThread extends Thread {
+ private final SUReader reader;
+
+ public OutputThread(InputStream stream, List<SUCallback> callbacks) {
+ this.reader = new SUReader(stream, callbacks);
+ }
+
+ public int getBytesRead() {
+ return reader.getBytesRead();
+ }
+
+ public void run() {
+ reader.run();
+ }
+ }
+
+ private static class ErrorThread extends Thread {
+ private final InputStream stream;
+
+ public ErrorThread(InputStream stream) {
+ this.stream = stream;
+ }
+
+ public void run() {
+ Text line = new Text();
+ LineReader lineReader = new LineReader(stream);
+ try {
+ while (lineReader.readLine(line) > 0) {
+ System.err.println(line);
+ }
+ lineReader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
90 src/main/java/com/cloudera/seismic/su/SUReader.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.su;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+import com.cloudera.seismic.segy.Fields;
+
+/**
+ * Handler for reading SU data from a stream and passing it along
+ * to one or more callbacks.
+ */
+public class SUReader implements Runnable {
+
+ private final InputStream stream;
+ private final List<SUCallback> callbacks;
+ private int bytesRead = 0;
+
+ public SUReader(InputStream stream, List<SUCallback> callbacks) {
+ this.stream = stream;
+ this.callbacks = callbacks;
+ }
+
+ public int getBytesRead() {
+ return bytesRead;
+ }
+
+ @Override
+ public void run() {
+ int bytesInSample = 0;
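+ // Each SU trace is a 240-byte binary header followed by a variable number
+ // of 4-byte samples; the sample count is read from the header itself.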
+ byte[] data = new byte[240];
+ ByteBuffer headerBuffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);
+ int read = 0;
+ try {
+ while (read != -1) {
+ read = stream.read(data, 0, 240);
+ if (read == -1) {
+ break;
+ }
+ while (read < 240) {
+ read += stream.read(data, read, 240 - read);
+ }
+ bytesRead += read;
+ int samples = Fields.NUM_SAMPLES.read(headerBuffer);
+ if (samples * 4 != bytesInSample) {
+ bytesInSample = samples * 4;
+ byte[] tmp = new byte[240 + bytesInSample];
+ System.arraycopy(data, 0, tmp, 0, 240);
+ data = tmp;
+ headerBuffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN);
+ }
+ read = 0;
+ while (read < bytesInSample) {
+ int thisRead = stream.read(data, 240 + read, bytesInSample - read);
+ if (thisRead == -1) {
+ throw new IOException("EOF reached unexpectedly in trace");
+ }
+ read += thisRead;
+ bytesRead += thisRead;
+ }
+ for (SUCallback callback : callbacks) {
+ callback.write(data, 0, 240 + bytesInSample);
+ }
+ }
+ if (read != -1) {
+ throw new IOException("Non-negative read exit");
+ }
+ stream.close();
+ } catch (IOException e) {
+ // Runnable.run cannot throw a checked exception, so rethrow unchecked.
+ throw new RuntimeException(e);
+ }
+ }
+}
47 src/main/java/com/cloudera/seismic/su/SequenceFileCallback.java
@@ -0,0 +1,47 @@
+package com.cloudera.seismic.su;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
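+/**
+ * An {@link SUCallback} that appends each trace it receives to a Hadoop
+ * {@link SequenceFile} as a {@link BytesWritable} value.
+ */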
+public class SequenceFileCallback implements SUCallback {
+
+ private final SequenceFile.Writer writer;
+ private final BytesWritable value;
+ private int bytesWritten = 0;
+
+ public SequenceFileCallback(SequenceFile.Writer writer) {
+ this.writer = writer;
+ this.value = new BytesWritable();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public int getBytesWritten() {
+ return bytesWritten;
+ }
+
+ @Override
+ public void write(byte[] data, int start, int length) throws IOException {
+ value.set(data, start, length);
+ writer.append(NullWritable.get(), value);
+ bytesWritten += length;
+ }
+
+ @Override
+ public void initialize(TaskInputOutputContext context) throws IOException {
+ // No-op in this implementation
+ }
+
+ @Override
+ public void cleanup(TaskInputOutputContext context) throws IOException {
+ // No-op
+ }
+
+}
35 src/test/java/com/cloudera/seismic/segy/FieldTypeTest.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2011 Cloudera, Inc. All Rights Reserved.
+ */
+
+package com.cloudera.seismic.segy;
+
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+public class FieldTypeTest extends TestCase {
+
+ private byte[] bytes = new byte[4];
+ private ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+ public void testUint16() throws Exception {
+ FieldType.UINT16.encode(buffer, 0, 52007);
+ assertEquals(52007, FieldType.UINT16.decode(buffer, 0));
+ }
+
+ public void testUint16Negative() throws Exception {
+ FieldType.UINT16.encode(buffer, 0, -16);
+ assertEquals(65520, FieldType.UINT16.decode(buffer, 0));
+ }
+
+ public void testInt16() throws Exception {
+ FieldType.INT16.encode(buffer, 0, 52007);
+ assertEquals(-13529, FieldType.INT16.decode(buffer, 0));
+ }
+
+ public void testInt16Negative() throws Exception {
+ FieldType.INT16.encode(buffer, 0, -16);
+ assertEquals(-16, FieldType.INT16.decode(buffer, 0));
+ }
+}
31 src/test/java/com/cloudera/seismic/su/SUProcessTest.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.seismic.su;
+
+import junit.framework.TestCase;
+
+/**
+ * @author josh
+ *
+ */
+public class SUProcessTest extends TestCase {
+ public void testHelp() throws Exception {
+ SUProcess proc = new SUProcess("/usr/local/su", "susort");
+ proc.start();
+ System.out.println(proc.closeAndWait());
+ }
+}