Permalink
Browse files

added camel example for S3 and Google Storage

  • Loading branch information...
1 parent 96e6318 commit 8e1546ae40ef2bcdf1628a6beb901d00357294c3 @barryku committed Feb 8, 2011
View
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.barryku.camel</groupId>
+ <artifactId>CamelApp</artifactId>
+ <version>0.1</version>
+ <name>Camel App</name>
+ <description>Camel Payground</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.0.14</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.api.client</groupId>
+ <artifactId>google-api-client</artifactId>
+ <version>1.2.2-alpha</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>0.9.24</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.5</source>
+ <target>1.5</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
@@ -0,0 +1,29 @@
+package com.barryku.camel;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.builder.RouteBuilder;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class FileCopy {
+
+ public static void main(String... args) throws Exception {
+ ApplicationContext springContext = new ClassPathXmlApplicationContext("META-INF/spring/camel-context.xml");
+ CamelContext context = (CamelContext) springContext.getBean("camelContext");
+ context.addComponent("s3file", (Component) springContext.getBean("s3FileComponent"));
+
+ context.addRoutes(new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("s3file://data/inbox?noop=false").beanRef("gsFileManager", "process");
+
+ }
+ });
+
+ context.start();
+ Thread.sleep(3000);
+ context.stop();
+ }
+}
@@ -0,0 +1,107 @@
+package com.barryku.camel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.MessageHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.google.api.client.googleapis.GoogleHeaders;
+import com.google.api.client.googleapis.GoogleTransport;
+import com.google.api.client.googleapis.auth.storage.GoogleStorageAuthentication;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+
+@Component
+public class GsFileManager {
+ private static final SimpleDateFormat httpDateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
+ private static final Logger logger = LoggerFactory.getLogger(GsFileManager.class);
+ static{
+ httpDateFormat.setTimeZone(TimeZone.getTimeZone("pst"));
+ }
+
+ public GsFileManager() {}
+
+ public void process(Message message) throws IOException {
+
+ String fileName = message.getHeader(Exchange.FILE_NAME, String.class);
+ String type = MessageHelper.getContentType(message);
+ type = type == null ? "text/html" : type;
+ byte[] content = message.getBody(byte[].class);
+ logger.info("saving " + fileName + ", " + type);
+ putFile(fileName, type, content);
+ }
+
+
+ private @Value("${gs.apiKey}") String apiKey;
+ private @Value("${gs.apiKeySecret}") String apiKeySecret;
+ private @Value("${gs.bucket}") String bucket;
+ private @Value("${gs.url}") String url;
+
+ private void putFile(String fileName, String type, byte[] content) throws IOException {
+ HttpTransport transport = GoogleTransport.create();
+ GoogleStorageAuthentication.authorize(transport, apiKey, apiKeySecret);
+
+ HttpRequest request = transport.buildPutRequest();
+
+
+ InputStreamContent isc = new InputStreamContent();
+ isc.inputStream = new ByteArrayInputStream(content);
+ isc.type = type;
+ request.content = isc;
+ request.url = new GenericUrl(url + bucket + "/" + URLEncoder.encode(fileName, "utf8"));
+ GoogleHeaders headers = (GoogleHeaders) request.headers;
+ headers.date = httpDateFormat.format(new Date());
+ try {
+ request.execute();
+ } catch (HttpResponseException e) {
+ logger.warn(getStreamContent(e.response.getContent()), e);
+ }
+ }
+
+ public static void main(String... args) throws Exception {
+
+ HttpTransport transport = GoogleTransport.create();
+ //transport.addParser(new XmlHttpParser());
+ GoogleStorageAuthentication.authorize(transport, "mykey", "myKeySecret....");
+
+ HttpRequest request = transport.buildGetRequest();
+ request.url= new GenericUrl("http://agilecloud.commondatastorage.googleapis.com/");
+
+ GoogleHeaders headers = (GoogleHeaders) request.headers;
+ headers.date = httpDateFormat.format(new Date());
+
+ try {
+
+ HttpResponse response = request.execute();
+ logger.info(getStreamContent(response.getContent()));
+
+ } catch (HttpResponseException e) {
+ logger.warn(getStreamContent(e.response.getContent()), e);
+ }
+ }
+
+ private static String getStreamContent(InputStream in) throws IOException {
+ StringBuffer out = new StringBuffer();
+ byte[] b = new byte[4096];
+ for (int n; (n = in.read(b)) != -1;) {
+ out.append(new String(b, 0, n));
+ }
+ return out.toString();
+ }
+}
@@ -0,0 +1,96 @@
+package com.barryku.camel;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.camel.component.file.FileEndpoint;
+import org.apache.camel.component.file.GenericFileComponent;
+import org.apache.camel.component.file.GenericFileConfiguration;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.language.simple.SimpleLanguage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+@Component
+public class S3FileComponent extends GenericFileComponent<File> {
+ private static final Logger logger = LoggerFactory.getLogger(S3FileComponent.class);
+ /**
+ * GenericFile property on Camel Exchanges.
+ */
+ public static final String FILE_EXCHANGE_FILE = "CamelFileExchangeFile";
+
+ /**
+ * Default camel lock filename postfix
+ */
+ public static final String DEFAULT_LOCK_FILE_POSTFIX = ".camelLock";
+
+ protected GenericFileEndpoint<File> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ // the starting directory must be a static (not containing dynamic expressions)
+ if (SimpleLanguage.hasStartToken(remaining)) {
+ throw new IllegalArgumentException("Invalid directory: " + remaining
+ + ". Dynamic expressions with ${ } placeholders is not allowed."
+ + " Use the fileName option to set the dynamic expression.");
+ }
+
+
+ getFiles(remaining);
+ File file = new File(TEMP_FOLDER);
+
+ FileEndpoint result = new FileEndpoint(uri, this);
+ result.setFile(file);
+
+ GenericFileConfiguration config = new GenericFileConfiguration();
+ config.setDirectory(file.getPath());
+ result.setConfiguration(config);
+
+ return result;
+ }
+
+ private static final String FOLDER_SUFFIX = "/";
+ private static final String TEMP_FOLDER = "tmp";
+
+ private @Value("${s3.apiKey}") String apiKey;
+ private @Value("${s3.apiKeySecret}") String apiKeySecret;
+ private @Value("${s3.bucket}") String bucket;
+
+ private void getFiles(String path) throws IOException{
+ AmazonS3 s3 = new AmazonS3Client(
+ new BasicAWSCredentials(apiKey, apiKeySecret));
+ ObjectListing objList = s3.listObjects(bucket, path);
+
+ for (S3ObjectSummary summary:objList.getObjectSummaries()) {
+ //ignore folders
+ if(! summary.getKey().endsWith(FOLDER_SUFFIX)){
+ S3Object obj = s3.getObject(
+ new GetObjectRequest(bucket, summary.getKey()));
+ logger.info("retrieving " + summary.getKey());
+ FileOutputStream fout = new FileOutputStream(TEMP_FOLDER + summary.getKey().substring(path.length()));
+ InputStream in = obj.getObjectContent();
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) > 0){
+ fout.write(buf, 0, len);
+ }
+ in.close();
+ fout.close();
+ }
+ }
+
+ }
+ protected void afterPropertiesSet(GenericFileEndpoint<File> endpoint) throws Exception {
+ // noop
+ }
+}
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Configures the Camel Context-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"
+ xmlns:context="http://www.springframework.org/schema/context">
+
+ <context:property-placeholder ignore-resource-not-found="true"
+ location="/META-INF/spring/spring.properties,file:/.cloudapp/camel-app.properties"/>
+ <context:component-scan base-package="com.barryku.camel"/>
+ <context:annotation-config />
+
+
+ <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
+ </camelContext>
+
+</beans>
@@ -0,0 +1,8 @@
+gs.apiKey = dummy
+gs.apiKeySecret = dummy
+gs.bucket = agilecloud
+gs.url = http://commondatastorage.googleapis.com/
+
+s3.bucket = barryku
+s3.apiKey = dummy
+s3.apiKeySecret = dummy
@@ -0,0 +1,16 @@
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss} %-5level - %logger{30} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>

0 comments on commit 8e1546a

Please sign in to comment.