From bb47849157be640eb417a1021a3c55d220139bd4 Mon Sep 17 00:00:00 2001 From: xie-jia-jun Date: Fri, 13 Mar 2020 01:49:22 +0800 Subject: [PATCH 1/6] Support S3ConfigStorage of AWS --- .../zeppelin/storage/S3ConfigStorage.java | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java new file mode 100644 index 00000000000..5f79bba66c5 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.storage; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterInfoSaving; +import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +/** + * Storing config in aws s3 file system + */ +public class S3ConfigStorage extends ConfigStorage { + + + private static Logger LOGGER = LoggerFactory.getLogger(S3ConfigStorage.class); + + private AmazonS3 s3client; + private String bucketName; + private String user; + private String rootFolder; + private String interpreterSettingPath; + private String authorizationPath; + + + + public S3ConfigStorage(ZeppelinConfiguration zConf) { + super(zConf); + bucketName = zConf.getS3BucketName(); + user = zConf.getS3User(); + rootFolder = user + "/conf"; + this.interpreterSettingPath = rootFolder + "/interpreter.json"; + this.authorizationPath = rootFolder + "/notebook-authorization.json"; + + // always use the default provider chain + AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); + + ClientConfigurationFactory configFactory = new ClientConfigurationFactory(); + ClientConfiguration cliConf = configFactory.getConfig(); + + // regular S3 + this.s3client = new AmazonS3Client(credentialsProvider, cliConf); + + // set S3 endpoint to use + s3client.setEndpoint(zConf.getS3Endpoint()); + } + + @Override + public void save(InterpreterInfoSaving settingInfos) throws IOException { + LOGGER.info("Save Interpreter Setting to s3://{}/{}", this.bucketName, this.interpreterSettingPath); + saveToS3(settingInfos.toJson(), interpreterSettingPath,"zeppelin-interpreter"); + } + + @Override + public InterpreterInfoSaving loadInterpreterSettings() throws IOException { + LOGGER.info("Load Interpreter Setting from s3 Path: " + interpreterSettingPath); + String json = readFromS3(interpreterSettingPath); + return buildInterpreterInfoSaving(json); + } + + @Override + public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException { + LOGGER.info("Save notebook authorization to s3://{}/{} ",this.bucketName,this.authorizationPath); + saveToS3(authorizationInfoSaving.toJson(), authorizationPath,"notebook-authorization"); + } + + @Override + public NotebookAuthorizationInfoSaving loadNotebookAuthorization() throws IOException { + LOGGER.info("Load notebook authorization from s3 Path: " + interpreterSettingPath); + String json = readFromS3(interpreterSettingPath); + return NotebookAuthorizationInfoSaving.fromJson(json); + } + + @Override + public String loadCredentials() throws IOException { + return null; + } + + @Override + public void saveCredentials(String credentials) throws IOException { + + } + + @VisibleForTesting + void saveToS3(String content, String s3Path,String tempFileName) throws IOException { + File file = File.createTempFile(tempFileName, "zpln"); + try { + Writer writer = new OutputStreamWriter(new FileOutputStream(file)); + writer.write(content); + writer.close(); + PutObjectRequest putRequest = new PutObjectRequest(bucketName, s3Path, file); + s3client.putObject(putRequest); + } + catch (AmazonClientException ace) { + throw new IOException("Fail to store " + tempFileName + ": " + s3Path + " in S3", ace); + } + finally { + FileUtils.deleteQuietly(file); + } + } + + @VisibleForTesting + String readFromS3( String filePath) throws IOException { + S3Object s3object; + try { + s3object = s3client.getObject(new GetObjectRequest(bucketName, + filePath)); + } + catch (AmazonClientException ace) { + throw new IOException("Fail to get file: " + filePath + " from S3", ace); + } + try (InputStream ins = s3object.getObjectContent()) { + return IOUtils.toString(ins, zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)); + } + } + +} From dbb6639a90862437bc6432684ec8521c7061a8df Mon Sep 17 00:00:00 2001 From: xie-jia-jun Date: Fri, 13 Mar 2020 01:51:51 +0800 Subject: [PATCH 2/6] Add Aliyun OSS SDK --- zeppelin-zengine/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index da320a8d388..92a49c2b964 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -45,6 +45,7 @@ 0.9.8 1.4.01 1.3 + 3.8.0 1.11.736 2.2 4.5.4.201711221230-r @@ -329,6 +330,12 @@ aws-java-sdk-s3 ${aws.sdk.s3.version} + + + com.aliyun.oss + aliyun-sdk-oss + ${oss.version} + org.apache.hadoop From 5d4b64598623a479030dd63beb4262ae88b21fd8 Mon Sep 17 00:00:00 2001 From: xie-jia-jun Date: Fri, 13 Mar 2020 01:53:31 +0800 Subject: [PATCH 3/6] Support OSSConfigStorage of Aliyun --- .../zeppelin/storage/OSSConfigStorage.java | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java new file mode 100644 index 00000000000..aafe4af0fbc --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.storage; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.PutObjectRequest; +import com.amazonaws.AmazonClientException; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterInfoSaving; +import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +/** + * Storing config in Aliyun OSS file system + */ +public class OSSConfigStorage extends ConfigStorage { + + + private static Logger LOGGER = LoggerFactory.getLogger(OSSConfigStorage.class); + + + + private OSS ossClient; + private String bucketName; + private String interpreterSettingPath; + private String authorizationPath; + + + + public OSSConfigStorage(ZeppelinConfiguration zConf) { + super(zConf); + String endpoint = zConf.getOSSEndpoint(); + bucketName = zConf.getOSSBucketName(); + String rootFolder = zConf.getNotebookDir(); + if (rootFolder.startsWith("/")) { + rootFolder = rootFolder.substring(1); + } + this.interpreterSettingPath = rootFolder + "/interpreter.json"; + this.authorizationPath = rootFolder + "/notebook-authorization.json"; + String accessKeyId = zConf.getOSSAccessKeyId(); + String accessKeySecret = zConf.getOSSAccessKeySecret(); + this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret); + } + + @Override + public void save(InterpreterInfoSaving settingInfos) throws IOException { + LOGGER.info("Save Interpreter Setting to oss://{}/{}", this.bucketName, this.interpreterSettingPath); + saveToOSS(settingInfos.toJson(), interpreterSettingPath); + } + + @Override + public InterpreterInfoSaving loadInterpreterSettings() throws IOException { + LOGGER.info("Load Interpreter Setting from oss Path: " + interpreterSettingPath); + String json = readFromOSS(interpreterSettingPath); + return buildInterpreterInfoSaving(json); + } + + @Override + public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException { + LOGGER.info("Save notebook authorization to oss://{}/{} ",this.bucketName,this.authorizationPath); + saveToOSS(authorizationInfoSaving.toJson(), authorizationPath); + } + + @Override + public NotebookAuthorizationInfoSaving loadNotebookAuthorization() throws IOException { + LOGGER.info("Load notebook authorization from oss Path: " + interpreterSettingPath); + String json = readFromOSS(interpreterSettingPath); + return NotebookAuthorizationInfoSaving.fromJson(json); + } + + @Override + public String loadCredentials() throws IOException { + return null; + } + + @Override + public void saveCredentials(String credentials) throws IOException { + + } + + @VisibleForTesting + void saveToOSS(String content, String ossPath) throws IOException { + try { + PutObjectRequest putObjectRequest = new com.aliyun.oss.model.PutObjectRequest(bucketName, + ossPath, new ByteArrayInputStream(content.getBytes())); + ossClient.putObject(putObjectRequest); + } + catch (AmazonClientException ace) { + throw new IOException("Fail to store " + ossPath + " in OSS", ace); + } + + } + + @VisibleForTesting + String readFromOSS( String filePath) throws IOException { + + OSSObject ossObject; + try { + ossObject = ossClient.getObject(bucketName, filePath); + } + catch (Exception e){ + throw new IOException("Fail to get file: " + filePath + " from OSS", e); + } + + try (InputStream in = ossObject.getObjectContent()){ + return IOUtils.toString(in,zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)); + } + } + +} From 45af87a46621fe77758beb713b3d7962c26740ff Mon Sep 17 00:00:00 2001 From: xiejiajun Date: Fri, 20 Mar 2020 21:50:10 +0800 Subject: [PATCH 4/6] added timeout for getting Thrift client to avoid situations where the interpreter may not be restarted when the interpreter process exits unexpectedly --- .../zeppelin/interpreter/remote/RemoteInterpreterProcess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 441dc76c931..de6b1574c22 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -48,7 +48,7 @@ public synchronized Client getClient() throws Exception { clientFactory = new ClientFactory(getHost(), getPort()); clientPool = new GenericObjectPool<>(clientFactory); } - return clientPool.borrowObject(); + return clientPool.borrowObject(5_000); } public void shutdown() { From 3b3e8aafd1e7c403f6fbca23cbf2dc1abf7affdb Mon Sep 17 00:00:00 2001 From: xiejiajun Date: Wed, 19 Aug 2020 16:34:33 +0800 Subject: [PATCH 5/6] Fix the interpreter dependency conflict caused by the CLASSPATH environment variable --- bin/zeppelin-daemon.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh index 1065b142e72..ea386f27cbe 100755 --- a/bin/zeppelin-daemon.sh +++ b/bin/zeppelin-daemon.sh @@ -98,7 +98,7 @@ if [[ "${USE_HADOOP}" == "true" ]]; then fi fi -CLASSPATH+=":${ZEPPELIN_CLASSPATH}" +ZEPPELIN_SERVER_CLASSPATH="${CLASSPATH}:${ZEPPELIN_CLASSPATH}" if [[ "${ZEPPELIN_NICENESS}" = "" ]]; then export ZEPPELIN_NICENESS=0 @@ -182,9 +182,9 @@ function upstart() { # where the service manager starts and stops the process initialize_default_directories - echo "ZEPPELIN_CLASSPATH: ${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH}" >> "${ZEPPELIN_OUTFILE}" + echo "ZEPPELIN_CLASSPATH: ${ZEPPELIN_CLASSPATH_OVERRIDES}:${ZEPPELIN_SERVER_CLASSPATH}" >> "${ZEPPELIN_OUTFILE}" - $ZEPPELIN_RUNNER $JAVA_OPTS -cp $ZEPPELIN_CLASSPATH_OVERRIDES:$CLASSPATH $ZEPPELIN_MAIN >> "${ZEPPELIN_OUTFILE}" + $ZEPPELIN_RUNNER $JAVA_OPTS -cp $ZEPPELIN_CLASSPATH_OVERRIDES:$ZEPPELIN_SERVER_CLASSPATH $ZEPPELIN_MAIN >> "${ZEPPELIN_OUTFILE}" } function start() { @@ -200,9 +200,9 @@ function start() { initialize_default_directories - echo "ZEPPELIN_CLASSPATH: ${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH}" >> "${ZEPPELIN_OUTFILE}" + echo "ZEPPELIN_CLASSPATH: ${ZEPPELIN_CLASSPATH_OVERRIDES}:${ZEPPELIN_SERVER_CLASSPATH}" >> "${ZEPPELIN_OUTFILE}" - nohup nice -n $ZEPPELIN_NICENESS $ZEPPELIN_RUNNER $JAVA_OPTS -cp $ZEPPELIN_CLASSPATH_OVERRIDES:$CLASSPATH $ZEPPELIN_MAIN >> "${ZEPPELIN_OUTFILE}" 2>&1 < /dev/null & + nohup nice -n $ZEPPELIN_NICENESS $ZEPPELIN_RUNNER $JAVA_OPTS -cp $ZEPPELIN_CLASSPATH_OVERRIDES:$ZEPPELIN_SERVER_CLASSPATH $ZEPPELIN_MAIN >> "${ZEPPELIN_OUTFILE}" 2>&1 < /dev/null & pid=$! if [[ -z "${pid}" ]]; then action_msg "${ZEPPELIN_NAME} start" "${SET_ERROR}" From a160f769619b9ca2c2c9c401827cd8c8a1faf5e9 Mon Sep 17 00:00:00 2001 From: xiejiajun Date: Wed, 19 Aug 2020 17:49:19 +0800 Subject: [PATCH 6/6] Remove irrelevant file --- .../zeppelin/storage/OSSConfigStorage.java | 132 ---------------- .../zeppelin/storage/S3ConfigStorage.java | 148 ------------------ 2 files changed, 280 deletions(-) delete mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java delete mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java deleted file mode 100644 index aafe4af0fbc..00000000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/OSSConfigStorage.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.storage; - -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.model.OSSObject; -import com.aliyun.oss.model.PutObjectRequest; -import com.amazonaws.AmazonClientException; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.io.IOUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterInfoSaving; -import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; - -/** - * Storing config in Aliyun OSS file system - */ -public class OSSConfigStorage extends ConfigStorage { - - - private static Logger LOGGER = LoggerFactory.getLogger(OSSConfigStorage.class); - - - - private OSS ossClient; - private String bucketName; - private String interpreterSettingPath; - private String authorizationPath; - - - - public OSSConfigStorage(ZeppelinConfiguration zConf) { - super(zConf); - String endpoint = zConf.getOSSEndpoint(); - bucketName = zConf.getOSSBucketName(); - String rootFolder = zConf.getNotebookDir(); - if (rootFolder.startsWith("/")) { - rootFolder = rootFolder.substring(1); - } - this.interpreterSettingPath = rootFolder + "/interpreter.json"; - this.authorizationPath = rootFolder + "/notebook-authorization.json"; - String accessKeyId = zConf.getOSSAccessKeyId(); - String accessKeySecret = zConf.getOSSAccessKeySecret(); - this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret); - } - - @Override - public void save(InterpreterInfoSaving settingInfos) throws IOException { - LOGGER.info("Save Interpreter Setting to oss://{}/{}", this.bucketName, this.interpreterSettingPath); - saveToOSS(settingInfos.toJson(), interpreterSettingPath); - } - - @Override - public InterpreterInfoSaving loadInterpreterSettings() throws IOException { - LOGGER.info("Load Interpreter Setting from oss Path: " + interpreterSettingPath); - String json = readFromOSS(interpreterSettingPath); - return buildInterpreterInfoSaving(json); - } - - @Override - public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException { - LOGGER.info("Save notebook authorization to oss://{}/{} ",this.bucketName,this.authorizationPath); - saveToOSS(authorizationInfoSaving.toJson(), authorizationPath); - } - - @Override - public NotebookAuthorizationInfoSaving loadNotebookAuthorization() throws IOException { - LOGGER.info("Load notebook authorization from oss Path: " + interpreterSettingPath); - String json = readFromOSS(interpreterSettingPath); - return NotebookAuthorizationInfoSaving.fromJson(json); - } - - @Override - public String loadCredentials() throws IOException { - return null; - } - - @Override - public void saveCredentials(String credentials) throws IOException { - - } - - @VisibleForTesting - void saveToOSS(String content, String ossPath) throws IOException { - try { - PutObjectRequest putObjectRequest = new com.aliyun.oss.model.PutObjectRequest(bucketName, - ossPath, new ByteArrayInputStream(content.getBytes())); - ossClient.putObject(putObjectRequest); - } - catch (AmazonClientException ace) { - throw new IOException("Fail to store " + ossPath + " in OSS", ace); - } - - } - - @VisibleForTesting - String readFromOSS( String filePath) throws IOException { - - OSSObject ossObject; - try { - ossObject = ossClient.getObject(bucketName, filePath); - } - catch (Exception e){ - throw new IOException("Fail to get file: " + filePath + " from OSS", e); - } - - try (InputStream in = ossObject.getObjectContent()){ - return IOUtils.toString(in,zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)); - } - } - -} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java deleted file mode 100644 index 5f79bba66c5..00000000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/S3ConfigStorage.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.storage; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.ClientConfigurationFactory; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterInfoSaving; -import org.apache.zeppelin.notebook.NotebookAuthorizationInfoSaving; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; - -/** - * Storing config in aws s3 file system - */ -public class S3ConfigStorage extends ConfigStorage { - - - private static Logger LOGGER = LoggerFactory.getLogger(S3ConfigStorage.class); - - private AmazonS3 s3client; - private String bucketName; - private String user; - private String rootFolder; - private String interpreterSettingPath; - private String authorizationPath; - - - - public S3ConfigStorage(ZeppelinConfiguration zConf) { - super(zConf); - bucketName = zConf.getS3BucketName(); - user = zConf.getS3User(); - rootFolder = user + "/conf"; - this.interpreterSettingPath = rootFolder + "/interpreter.json"; - this.authorizationPath = rootFolder + "/notebook-authorization.json"; - - // always use the default provider chain - AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); - - ClientConfigurationFactory configFactory = new ClientConfigurationFactory(); - ClientConfiguration cliConf = configFactory.getConfig(); - - // regular S3 - this.s3client = new AmazonS3Client(credentialsProvider, cliConf); - - // set S3 endpoint to use - s3client.setEndpoint(zConf.getS3Endpoint()); - } - - @Override - public void save(InterpreterInfoSaving settingInfos) throws IOException { - LOGGER.info("Save Interpreter Setting to s3://{}/{}", this.bucketName, this.interpreterSettingPath); - saveToS3(settingInfos.toJson(), interpreterSettingPath,"zeppelin-interpreter"); - } - - @Override - public InterpreterInfoSaving loadInterpreterSettings() throws IOException { - LOGGER.info("Load Interpreter Setting from s3 Path: " + interpreterSettingPath); - String json = readFromS3(interpreterSettingPath); - return buildInterpreterInfoSaving(json); - } - - @Override - public void save(NotebookAuthorizationInfoSaving authorizationInfoSaving) throws IOException { - LOGGER.info("Save notebook authorization to s3://{}/{} ",this.bucketName,this.authorizationPath); - saveToS3(authorizationInfoSaving.toJson(), authorizationPath,"notebook-authorization"); - } - - @Override - public NotebookAuthorizationInfoSaving loadNotebookAuthorization() throws IOException { - LOGGER.info("Load notebook authorization from s3 Path: " + interpreterSettingPath); - String json = readFromS3(interpreterSettingPath); - return NotebookAuthorizationInfoSaving.fromJson(json); - } - - @Override - public String loadCredentials() throws IOException { - return null; - } - - @Override - public void saveCredentials(String credentials) throws IOException { - - } - - @VisibleForTesting - void saveToS3(String content, String s3Path,String tempFileName) throws IOException { - File file = File.createTempFile(tempFileName, "zpln"); - try { - Writer writer = new OutputStreamWriter(new FileOutputStream(file)); - writer.write(content); - writer.close(); - PutObjectRequest putRequest = new PutObjectRequest(bucketName, s3Path, file); - s3client.putObject(putRequest); - } - catch (AmazonClientException ace) { - throw new IOException("Fail to store " + tempFileName + ": " + s3Path + " in S3", ace); - } - finally { - FileUtils.deleteQuietly(file); - } - } - - @VisibleForTesting - String readFromS3( String filePath) throws IOException { - S3Object s3object; - try { - s3object = s3client.getObject(new GetObjectRequest(bucketName, - filePath)); - } - catch (AmazonClientException ace) { - throw new IOException("Fail to get file: " + filePath + " from S3", ace); - } - try (InputStream ins = s3object.getObjectContent()) { - return IOUtils.toString(ins, zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)); - } - } - -}