Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-1675 - Allow submitting multiple jars from the client #1296

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
[org.apache.commons.io FileUtils])
(:use [org.apache.storm config util log converter local-state-converter])
(:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo Zipper])
(:import [java.nio.file Files StandardCopyOption])
(:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
(:import [org.apache.storm Config ProcessSimulator])
Expand Down Expand Up @@ -393,12 +393,12 @@
(defn required-topo-files-exist?
[conf storm-id]
(let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
stormjarpath (ConfigUtils/supervisorStormJarPath stormroot)
storm-jar-zip-path (ConfigUtils/supervisorStormJarZipPath stormroot)
stormcodepath (ConfigUtils/supervisorStormCodePath stormroot)
stormconfpath (ConfigUtils/supervisorStormConfPath stormroot)]
(and (every? #(Utils/checkFileExists %) [stormroot stormconfpath stormcodepath])
(or (ConfigUtils/isLocalMode conf)
(Utils/checkFileExists stormjarpath)))))
(Utils/checkFileExists storm-jar-zip-path)))))

(defn get-worker-assignment-helper-msg
[assignment supervisor port id]
Expand Down Expand Up @@ -1053,13 +1053,14 @@
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(throw (RuntimeException. (str "ERROR: Windows doesn't implement setting the correct permissions")))))
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id)
(ConfigUtils/supervisorStormJarPath tmproot) blobstore)
(ConfigUtils/supervisorStormJarZipPath tmproot) blobstore)
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id)
(ConfigUtils/supervisorStormCodePath tmproot) blobstore)
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id)
(ConfigUtils/supervisorStormConfPath tmproot) blobstore)
(.shutdown blobstore)
(Utils/extractDirFromJar (ConfigUtils/supervisorStormJarPath tmproot) ConfigUtils/RESOURCES_SUBDIR tmproot)
(Zipper/unzip (ConfigUtils/supervisorStormJarZipPath tmproot) tmproot)
; (Utils/extractDirFromJar (ConfigUtils/supervisorStormJarPath tmproot) ConfigUtils/RESOURCES_SUBDIR tmproot)
(download-blobs-for-topology! conf (ConfigUtils/supervisorStormConfPath tmproot) localizer
tmproot)
(if (download-blobs-for-topology-succeed? (ConfigUtils/supervisorStormConfPath tmproot) tmproot)
Expand Down Expand Up @@ -1176,13 +1177,13 @@
(str storm-home Utils/FILE_PATH_SEPARATOR "log4j2"))
stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
jlp (jlp stormroot conf)
stormjar (ConfigUtils/supervisorStormJarPath stormroot)
stormjarlist (Utils/getFullJars (ConfigUtils/concatIfNotNull stormroot))
storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
[cp]
[])
classpath (-> (Utils/workerClasspath)
(Utils/addToClasspath [stormjar])
(Utils/addToClasspath stormjarlist)
(Utils/addToClasspath topo-classpath))
top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)

Expand Down
73 changes: 52 additions & 21 deletions storm-core/src/jvm/org/apache/storm/StormSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,40 @@
*/
package org.apache.storm;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.utils.BufferFileInputStream;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.Zipper;
import org.apache.storm.validation.ConfigValidation;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.generated.*;
import org.apache.storm.utils.BufferFileInputStream;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Use this class to submit topologies to run on the Storm cluster. You should run your program
Expand Down Expand Up @@ -382,23 +395,41 @@ public static String submitJarAs(Map conf, String localJar, ProgressListener lis
throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}

String[] jars = localJar.split(":");
File tmpDir = FileUtils.getTempDirectory();
String zipFilePath = new File(tmpDir, "stormjars.zip").getAbsolutePath();
LOG.info("Zipping " + localJar + " to " + zipFilePath);
try {
Zipper.zip(Arrays.asList(jars), zipFilePath);
} catch (IOException e) {
throw new RuntimeException("Failed to zip jars " + localJar + " into zip " + zipFilePath, e);
}

return submitFileAs(conf, zipFilePath, listener, asUser);
}

private static String submitFileAs(Map conf, String localFile, ProgressListener listener, String asUser) {
if (localFile == null) {
throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}

NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
LOG.info("Uploading local file " + localFile + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localFile, THRIFT_CHUNK_SIZE_BYTES);

long totalSize = new File(localJar).length();
long totalSize = new File(localFile).length();
if (listener != null) {
listener.onStart(localJar, uploadLocation, totalSize);
listener.onStart(localFile, uploadLocation, totalSize);
}

long bytesUploaded = 0;
while(true) {
byte[] toSubmit = is.read();
bytesUploaded += toSubmit.length;
if (listener != null) {
listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
listener.onProgress(localFile, uploadLocation, bytesUploaded, totalSize);
}

if(toSubmit.length==0) break;
Expand All @@ -407,10 +438,10 @@ public static String submitJarAs(Map conf, String localJar, ProgressListener lis
client.getClient().finishFileUpload(uploadLocation);

if (listener != null) {
listener.onCompleted(localJar, uploadLocation, totalSize);
listener.onCompleted(localFile, uploadLocation, totalSize);
}

LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
LOG.info("Successfully uploaded " + localFile + " to assigned location: " + uploadLocation);
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
Expand Down
10 changes: 5 additions & 5 deletions storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public static String masterLocalDir(Map conf) throws IOException {
}

public static String masterStormJarKey(String topologyId) {
return (topologyId + "-stormjar.jar");
return (topologyId + "-stormjar.zip");
}

public static String masterStormCodeKey(String topologyId) {
Expand Down Expand Up @@ -253,7 +253,7 @@ public static Map readSupervisorStormConfGivenPath(Map conf, String stormConfPat
}

public static String masterStormJarPath(String stormRoot) {
return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
return (stormRoot + FILE_SEPARATOR + "stormjar.zip");
}

public static String masterInbox(Map conf) throws IOException {
Expand Down Expand Up @@ -308,8 +308,8 @@ public static String concatIfNotNull(String dir) {
return ret;
}

public static String supervisorStormJarPath(String stormRoot) {
return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.jar");
public static String supervisorStormJarZipPath(String stormRoot) {
return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.zip");
}

public static String supervisorStormCodePath(String stormRoot) {
Expand Down Expand Up @@ -396,7 +396,7 @@ public static String getWorkerUser(Map conf, String workerId) {

public static String getIdFromBlobKey(String key) {
if (key == null) return null;
final String STORM_JAR_SUFFIX = "-stormjar.jar";
final String STORM_JAR_SUFFIX = "-stormjar.zip";
final String STORM_CODE_SUFFIX = "-stormcode.ser";
final String STORM_CONF_SUFFIX = "-stormconf.ser";

Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2032,7 +2032,7 @@ public String currentClasspathImpl() {
* @param dir the directory to search
* @return the jar file names
*/
private static List<String> getFullJars(String dir) {
public static List<String> getFullJars(String dir) {
File[] files = new File(dir).listFiles(jarFilter);

if(files == null) {
Expand Down
82 changes: 82 additions & 0 deletions storm-core/src/jvm/org/apache/storm/utils/Zipper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.apache.storm.utils;

import org.apache.commons.io.FileUtils;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class Zipper {

public static void zip(List<String> paths, String zipFilePath) throws IOException {
File zipFile = new File(zipFilePath);
if (!zipFile.exists()) {
zipFile.delete();
}
zipFile.createNewFile();

ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile));
Map<String, File> zipEntryFiles = new HashMap<>();
for (String path : paths) {
File file = new File(path);
String parentPath = file.getParent();
if (file.isDirectory()) {
Collection<File> nestedFiles = FileUtils.listFiles(file, null, true);
for (File nestedFile : nestedFiles) {
String relativePath = Paths.get(parentPath).relativize(Paths.get(nestedFile.getPath())).toString();
zipEntryFiles.put(relativePath, nestedFile);
}
} else {
zipEntryFiles.put(file.getName(), file);
}
}
for (Map.Entry<String, File> zipEntryFile : zipEntryFiles.entrySet()) {
File fileToBeZipped = zipEntryFile.getValue();
zipOutputStream.putNextEntry(new ZipEntry(zipEntryFile.getKey()));
copyContents(zipOutputStream, new FileInputStream(fileToBeZipped));
zipOutputStream.closeEntry();
}
zipOutputStream.close();
}

public static void unzip(String zipFile, String localDir) throws IOException {
File file = new File(localDir);
if (!file.exists()) {
file.mkdir();
}

ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile));
ZipEntry entry = zipInputStream.getNextEntry();
while (null != entry) {
File outputFile = new File(localDir, entry.getName());
outputFile.getParentFile().mkdirs();
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
copyContents(fileOutputStream, zipInputStream);
zipInputStream.closeEntry();
entry = zipInputStream.getNextEntry();
}
zipInputStream.close();
}

private static void copyContents(OutputStream out, InputStream inputStream) throws IOException {
BufferedInputStream in = new BufferedInputStream(inputStream);
byte[] buffer = new byte[1024];
int len;
while ((len = in.read(buffer)) >= 0) {
out.write(buffer, 0, len);
}
}

}