Skip to content

Commit

Permalink
Working implementation of RSS polling / aggregation / conversion to a…
Browse files Browse the repository at this point in the history
…ctivitystreams format

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/trunk@1565455 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
sblackmon committed Feb 6, 2014
1 parent 82917c4 commit d3896be
Show file tree
Hide file tree
Showing 21 changed files with 1,499 additions and 4 deletions.
14 changes: 14 additions & 0 deletions provision/provision.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes" />
<output-test url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes" />
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="POM File Configuration" pomFile="" />
</module>

1 change: 1 addition & 0 deletions streams-contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
<module>streams-provider-sysomos</module>
<module>streams-provider-rss</module>
<!--<module>streams-proxy-semantria</module>-->
</modules>

Expand Down
16 changes: 16 additions & 0 deletions streams-contrib/streams-provider-rss/README.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
streams-provider-rss

Purpose

Module connects to rss feeds, collects events, and passes each message downstream once.

Capabilities

Simplification

Optionally, module can output messages as basic text

Normalization

Optionally, module can output messages as other json objects such as Activity

173 changes: 173 additions & 0 deletions streams-contrib/streams-provider-rss/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.streams</groupId>
<artifactId>streams-contrib</artifactId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>streams-provider-rss</artifactId>

<dependencies>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path-assert</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-console</artifactId>
<version>0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>rome</groupId>
<artifactId>rome</artifactId>
<version>1.0</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/jsonschema2pojo</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<configuration>
<addCompileSourceRoot>true</addCompileSourceRoot>
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.rss.pojo</targetPackage>
<useLongIntegers>true</useLongIntegers>
<useJodaDates>false</useJodaDates>
</configuration>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<!--<plugin>-->
<!--<groupId>org.jvnet.jaxb2.maven2</groupId>-->
<!--<artifactId>maven-jaxb2-plugin</artifactId>-->
<!--<configuration>-->
<!--<schemaDirectory>src/main/xmlschema</schemaDirectory>-->
<!--<generateDirectory>target/generated-sources/jaxb2</generateDirectory>-->
<!--<verbose>true</verbose>-->
<!--<debug>true</debug>-->
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
<!--<forceRegenerate>true</forceRegenerate>-->
<!--<removeOldOutput>false</removeOldOutput>-->
<!--<generatePackage>org.apache.streams.rss</generatePackage>-->
<!--<schemas>-->
<!--<schema>-->
<!--<fileset>-->
<!--&lt;!&ndash; Defaults to schemaIncludes &ndash;&gt;-->
<!--<includes>-->
<!--<include>contents.xsd</include>-->
<!--<include>opml.xsd</include>-->
<!--</includes>-->
<!--</fileset>-->
<!--</schema>-->
<!--</schemas>-->
<!--&lt;!&ndash;<plugins>&ndash;&gt;-->
<!--&lt;!&ndash;<plugin>&ndash;&gt;-->
<!--&lt;!&ndash;<groupId>org.jvnet.jaxb2_commons</groupId>&ndash;&gt;-->
<!--&lt;!&ndash;<artifactId>jaxb2-basics</artifactId>&ndash;&gt;-->
<!--&lt;!&ndash;<version>0.6.5</version>&ndash;&gt;-->
<!--&lt;!&ndash;</plugin>&ndash;&gt;-->
<!--&lt;!&ndash;</plugins>&ndash;&gt;-->
<!--</configuration>-->
<!--<executions>-->
<!--<execution>-->
<!--<goals>-->
<!--<goal>generate</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.streams.rss.provider;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.sun.syndication.feed.synd.SyndEntry;

/**
* Created by sblackmon on 12/13/13.
*/
public class RssEventClassifier {

public static Class detectClass( ObjectNode bean ) {
return SyndEntry.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.apache.streams.rss.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.syndication.feed.synd.SyndEntry;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.Random;

/**
* Created by sblackmon on 12/10/13.
*/
public class RssEventProcessor implements Runnable {

private final static Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class);

private ObjectMapper mapper = new ObjectMapper();

private Queue<SyndEntry> inQueue;
private Queue<StreamsDatum> outQueue;

private Class inClass;
private Class outClass;

private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer();

public final static String TERMINATE = new String("TERMINATE");

public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
this.inQueue = inQueue;
this.outQueue = outQueue;
this.inClass = inClass;
this.outClass = outClass;
}

public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
this.inQueue = inQueue;
this.outQueue = outQueue;
this.outClass = outClass;
}

@Override
public void run() {

while(true) {
Object item;
try {
item = inQueue.poll();
if(item instanceof String && item.equals(TERMINATE)) {
LOGGER.info("Terminating!");
break;
}

Thread.sleep(new Random().nextInt(100));

// if the target is string, just pass-through
if( String.class.equals(outClass))
outQueue.offer(new StreamsDatum(item.toString()));
else if( SyndEntry.class.equals(outClass))
{
outQueue.offer(new StreamsDatum(item));
}
else if( Activity.class.equals(outClass))
{
// convert to desired format
SyndEntry entry = (SyndEntry)item;
if( entry != null ) {
Activity out = syndEntryActivitySerializer.deserialize((SyndEntry)item);

if( out != null )
outQueue.offer(new StreamsDatum(out));
}
}

} catch (Exception e) {
e.printStackTrace();
}
}
}

};
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.streams.rss.provider;

import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* Created by sblackmon on 12/10/13.
*/
public class RssStreamConfigurator {

private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamConfigurator.class);

public static RssStreamConfiguration detectConfiguration(Config rss) {

RssStreamConfiguration rssStreamConfiguration = new RssStreamConfiguration();

List<FeedDetails> feeds = Lists.newArrayList();
feeds.add(new FeedDetails().withUrl(rss.getString("url")));

rssStreamConfiguration.setFeeds(feeds);
return rssStreamConfiguration;
}

}
Loading

0 comments on commit d3896be

Please sign in to comment.