1 change: 1 addition & 0 deletions contrib/firebaseio/AUTHORS.md
@@ -0,0 +1 @@
elibixby [@ github, google, gmail ]
15 changes: 15 additions & 0 deletions contrib/firebaseio/README.md
@@ -0,0 +1,15 @@
## FirebaseIO for Dataflow

This library includes two components for communicating with [Firebase](https://www.firebase.com/) from Dataflow:

* `FirebaseSource`, an `UnboundedSource` which watches a Firebase ref for events and attempts (but does not guarantee) to deliver those events, in order, to the Dataflow pipeline. The source unmarshals the `DataSnapshot`s generated by these events into the parameterized type specified by your `FirebaseSource`, which can be any object serializable by Jackson.

* Various `FirebaseDoFn`s which can be used to write data to a Firebase ref (a usage sketch follows this list):
  * `DoFirebasePush` attaches unique, time-ordered strings as keys to objects received from the pipeline.
  * `DoFirebaseSet` overwrites the data at the key's location with the specified `Object`.
  * `DoFirebaseUpdate` updates the tree at the ref with the given `Map<String, Object>`.
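
A minimal end-to-end sketch is below. It is hedged: the `Message` POJO, the ref URLs, and the `FirebaseSource`/`DoFirebasePush` constructor signatures are illustrative assumptions, not the library's confirmed API.

```java
// Sketch only: the constructor signatures here are assumed for illustration.
Pipeline p = Pipeline.create(options);
PCollection<FirebaseEvent<Message>> events = p.apply(
    Read.from(new FirebaseSource<Message>("https://myapp.firebaseio.com/inbox", Message.class)));
// Push each incoming event back out under a fresh, time-ordered key.
events.apply(ParDo.of(new DoFirebasePush("https://myapp.firebaseio.com/outbox")));
p.run();
```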


These utilities also use various `FirebaseEvent`s to wrap your unmarshalled data in a Jackson-serializable POJO that retains the event type information. For example, you could filter a pipeline on `ChildAdded<T>` events, where your original `DataSnapshot` was unmarshalled into a Jackson-serializable POJO of type `T`.
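
Continuing the sketch above, such a filter might look like the following (again hedged; `FirebaseEvent<T>` as the events' common supertype is an assumption):

```java
// Keep only ChildAdded events from the stream of wrapped FirebaseEvents.
PCollection<ChildAdded<Message>> added = events.apply(
    ParDo.of(new DoFn<FirebaseEvent<Message>, ChildAdded<Message>>() {
      @Override
      public void processElement(ProcessContext c) {
        FirebaseEvent<Message> e = c.element();
        if (e instanceof ChildAdded) {
          c.output((ChildAdded<Message>) e);
        }
      }
    }));
```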

The library uses the Firebase JVM client library and as such takes advantage of WebSockets for data transfer. However, this means that writing to Firebase uses one WebSocket per shard, so if you plan to fan out your writes, make sure to limit the parallelism with `PTransform#withMaxParallelism` to avoid exhausting your available connections.
115 changes: 115 additions & 0 deletions contrib/firebaseio/pom.xml
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-contrib-firebaseio</artifactId>
<name>FirebaseIO</name>
<description>An UnboundedSource and DoFns for reading from and writing to Firebase from Dataflow</description>
<version>0.0.2-SNAPSHOT</version>

<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<google-cloud-dataflow-version>[1.2.0, 2.0.0)</google-cloud-dataflow-version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.12</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.6</version>
</dependency>
</dependencies>
<configuration>
<configLocation>../../checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google-cloud-dataflow-version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>

<dependency>
<groupId>com.firebase</groupId>
<artifactId>firebase-client-jvm</artifactId>
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.15</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.15</version>
</dependency>

<!-- Dependency for tests -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,152 @@
/**
* Copyright (c) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.contrib.firebase.contrib;

import static com.google.cloud.dataflow.sdk.util.Structs.addString;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
* Encodes objects using {@link ObjectReader} and {@link ObjectWriter}.
*/
public class JacksonCoder<T> extends StandardCoder<T> {

private static final long serialVersionUID = -754345287170755870L;

private static final ObjectMapper MAPPER = new ObjectMapper()
.enableDefaultTyping(DefaultTyping.NON_FINAL)
.enable(DeserializationFeature.WRAP_EXCEPTIONS)
.enable(SerializationFeature.WRAP_EXCEPTIONS)
.disable(SerializationFeature.FAIL_ON_UNWRAPPED_TYPE_IDENTIFIERS)
.enable(SerializationFeature.WRITE_NULL_MAP_VALUES)
.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false)
.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);

// Threadsafe as per http://wiki.fasterxml.com/JacksonBestPracticesPerformance
private final ObjectWriter writer;
private final ObjectReader reader;

protected final Class<T> type;

public static <K> JacksonCoder<K> of(TypeDescriptor<K> type) {
@SuppressWarnings("unchecked")
Class<K> clazz = (Class<K>) type.getRawType();
return of(clazz);
}

/**
* JacksonCoder will encode any type that can be serialized/deserialized by Jackson.
* This can be achieved either by using Jackson annotations (as seen in this library) or by
* implementing <a href="http://wiki.fasterxml.com/JacksonHowToCustomSerializers">
* custom serializers</a>.
* @param clazz Type to encode
* @return A JacksonCoder parameterized with type {@code clazz}
*/
public static <K> JacksonCoder<K> of(Class<K> clazz){
return new JacksonCoder<>(clazz);
}

@SuppressWarnings({"unchecked", "rawtypes"})
@JsonCreator
public static JacksonCoder<?> of(
@JsonProperty("type") String classType) throws ClassNotFoundException {
return new JacksonCoder(Class.forName(classType));
}

protected JacksonCoder(Class<T> clazz){
this.type = clazz;
writer = MAPPER.writer();
reader = MAPPER.readerFor(type);
}

@Override
public void encode(T value, OutputStream outStream, Context context)
throws IOException {
writer.writeValue(outStream, value);
}

@Override
public T decode(InputStream inStream, Context context)
throws IOException {
return reader.readValue(inStream);
}

@Override
public List<? extends Coder<?>> getCoderArguments() {
return Collections.emptyList();
}

@Override
public void verifyDeterministic() throws NonDeterministicException{
throw new Coder.NonDeterministicException(this, "JSON is not a deterministic encoding");
}

public Class<T> getType(){
return this.type;
}

protected Object writeReplace() {
// When serialized by Java, instances of JacksonCoder should be replaced by
// a SerializedJacksonCoderProxy.
return new SerializedJacksonCoderProxy<>(type);
}

@Override
public CloudObject asCloudObject() {
CloudObject result = super.asCloudObject();
addString(result, "type", type.getName());
return result;
}

/**
* Proxy to use in place of serializing the JacksonCoder. This allows the fields
* to remain final.
*/
protected static class SerializedJacksonCoderProxy<T> implements Serializable {
protected final Class<T> type;

public SerializedJacksonCoderProxy(Class<T> type) {
this.type = type;
}

private Object readResolve() {
// When deserialized, instances of this object should be replaced by
// constructing a JacksonCoder.
return new JacksonCoder<>(type);
}
}
}
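
A hedged usage sketch for `JacksonCoder` (the `ChatMessage` POJO and the `Create`-based pipeline below are hypothetical, for illustration only):

// Any Jackson-serializable type works; annotations are one way to get there.
public class ChatMessage {
  @JsonProperty public String author;
  @JsonProperty public String text;
}

// Attach the coder so Dataflow can serialize ChatMessage elements between workers.
PCollection<ChatMessage> messages = p
    .apply(Create.of(new ChatMessage()))
    .setCoder(JacksonCoder.of(ChatMessage.class));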
@@ -0,0 +1,76 @@
/**
* Copyright (c) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.contrib.firebase.contrib;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;

import org.slf4j.event.Level;
import org.slf4j.event.LoggingEvent;
import org.slf4j.event.SubstituteLoggingEvent;
import org.slf4j.impl.SimpleLogger;
import org.slf4j.impl.SimpleLoggerFactory;

/**
* Logs a {@link PCollection} to a logger for the specified {@link Class}.
*/
public class LogElements<T> extends PTransform<PCollection<T>, PDone> {

private final Level level;
private final String className;

public LogElements(Class<?> clazz, Level level){
this.className = clazz.getName();
this.level = level;
}

@Override
public PDone apply(PCollection<T> input){
input.apply(ParDo.of(new LoggingDoFn<T>(this.className, this.level)));
return PDone.in(input.getPipeline());
}

/**
* Logs each element's {@code toString()} via a {@link SimpleLogger} (which writes to System.err).
*/
public static class LoggingDoFn<T> extends DoFn<T, Void>{

private final String className;
private final Level level;
private transient SimpleLogger logger;

public LoggingDoFn(String className, Level level){
this.className = className;
this.level = level;
}

@Override
public void startBundle(Context c) throws Exception {
logger = (SimpleLogger) new SimpleLoggerFactory().getLogger(this.className);
}

@Override
public void processElement(ProcessContext c) throws Exception {
SubstituteLoggingEvent event = new SubstituteLoggingEvent();
event.setMessage(c.element().toString());
event.setLevel(this.level);
logger.log(event);
}
}
}
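
A hedged usage sketch for `LogElements` (assumes a `PCollection<ChatMessage> messages` built elsewhere and a `MyPipeline` class to attribute the log lines to):

// Log each element's toString() at INFO via slf4j-simple (writes to System.err).
messages.apply(new LogElements<ChatMessage>(MyPipeline.class, Level.INFO));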