fiware-cosmos-orion-flink-connector


The Cosmos Generic Enabler simplifies Big Data analysis of context data and integrates with some of the most popular Big Data platforms.

Cosmos is a FIWARE Generic Enabler. Therefore, it can be integrated as part of any platform “Powered by FIWARE”. FIWARE is a curated framework of open source platform components which can be assembled together with other third-party platform components to accelerate the development of Smart Solutions.

This project is part of FIWARE. For more information check the FIWARE Catalogue entry for Context Processing, Analysis and Visualization.

📚 Documentation 🎓 Academy 🎯 Roadmap

Table of Contents

  • What is Cosmos?
  • Why use Cosmos?
  • Orion Flink Connector
  • Installation
  • Usage: API Overview
  • Production
  • Training courses
  • Quality Assurance
  • Maintainers
  • Roadmap
  • Contributing
  • Testing
  • License

What is Cosmos?

The Cosmos Big Data Analysis GE is a set of tools that help achieve the tasks of streaming and batch processing over context data. These tools are:

  • Orion-Flink Connector (Source and Sink)
  • Apache Flink Processing Engine
  • Apache Spark Processing Engine (work in progress)
  • Streaming processing examples using Orion Context Broker

Why use Cosmos?

As the state of the real world changes, the entities representing your IoT devices change constantly. Big data analysis allows you to study datasets built from this context data that are too large for traditional data-processing software. You can apply predictive analysis or user behaviour analytics to extract meaningful conclusions about the state of your smart solution and bring value to it.

Orion Flink Connector

This is a Flink connector for the Fiware Orion Context Broker. It has two parts:

  • OrionSource: Source for receiving NGSIv2 events as HTTP messages sent by Context Broker subscriptions (see the example subscription after this list).
  • OrionSink: Sink for writing back to the Context Broker.
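
The OrionSource receives those HTTP messages because the Context Broker has a subscription pointing at it. As a hedged illustration (the entity type Room, the hosts and the ports 1026/9001 are placeholders that depend on your deployment), an NGSIv2 subscription could be created along these lines:

    curl -X POST 'http://<orion-host>:1026/v2/subscriptions' \
      -H 'Content-Type: application/json' \
      -d '{
        "description": "Forward Room changes to the Flink job",
        "subject": {
          "entities": [{ "idPattern": ".*", "type": "Room" }]
        },
        "notification": {
          "http": { "url": "http://<flink-host>:9001" }
        }
      }'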

Installation

Download the JAR from the latest release. In your project directory run:

mvn install:install-file -Dfile=$(PATH_DOWNLOAD)/orion.flink.connector-1.0.jar -DgroupId=org.fiware.cosmos -DartifactId=orion.flink.connector -Dversion=1.0 -Dpackaging=jar

Add it to your pom.xml file inside the dependencies section.

<dependency>
    <groupId>org.fiware.cosmos</groupId>
    <artifactId>orion.flink.connector</artifactId>
    <version>1.0</version>
</dependency>

Usage: API Overview

OrionSource

  • Import dependency.
    import org.fiware.cosmos.orion.flink.connector.{OrionSource}
  • Add source to Flink Environment. Indicate the port you want to listen on (e.g. 9001).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val eventStream = env.addSource(new OrionSource(9001))
  • Parse the received data.
    val processedDataStream = eventStream
        .flatMap(event => event.entities)
        // ...processing
The received data is a DataStream of objects of the class **`NgsiEvent`**.
This class has the following attributes:
-   **`creationTime`**: Timestamp of arrival.
-   **`service`**: Fiware service extracted from the HTTP headers.
-   **`servicePath`**: Fiware service path extracted from the HTTP headers.
-   **`entities`**: Sequence of entities included in the message. Each entity
    has the following attributes:
    -   **`id`**: Identifier of the entity.
    -   **`type`**: Node type.
    -   **`attrs`**: Map of attributes in which the key is the attribute
        name and the value is an object with the following properties:
        -   **`type`**: Type of value (Float, Int,...).
        -   **`value`**: Value of the attribute.
        -   **`metadata`**: Additional metadata.
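
Putting the pieces above together, here is a minimal sketch of a job that reads NGSIv2 notifications and extracts a single attribute per entity. The attribute name temperature and the object/job names are hypothetical and only serve as illustration:

    import org.apache.flink.streaming.api.scala._
    import org.fiware.cosmos.orion.flink.connector.OrionSource

    object NgsiEventParsingSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Listen for NGSIv2 notifications on port 9001 (the port used above)
        val eventStream = env.addSource(new OrionSource(9001))

        // Flatten each NgsiEvent into its entities and read one attribute value.
        // "temperature" is a hypothetical attribute name used for illustration.
        val temperatures = eventStream
          .flatMap(event => event.entities)
          .map(entity => (entity.id, entity.attrs("temperature").value.toString.toFloat))

        temperatures.print()

        env.execute("NGSI event parsing sketch")
      }
    }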

OrionSink

  • Import dependency.
    import org.fiware.cosmos.orion.flink.connector.{OrionSink,OrionSinkObject,ContentType,HTTPMethod}
  • Add sink to source.
    val processedDataStream = eventStream
        // ...processing
        .map(obj => new OrionSinkObject(
            "{\"temperature_avg\": { \"value\":" + obj.temperature + ", \"type\": \"Float\"}}", // Stringified JSON message
            "http://context-broker-url:8080/v2/entities/Room1", // URL
            ContentType.JSON, // Content type
            HTTPMethod.POST) // HTTP method
        )

    OrionSink.addSink( processedDataStream )
The sink accepts a `DataStream` of objects of the class
**`OrionSinkObject`**. This class has 4 attributes:
  • content: Message content in String format. If it is a JSON, you need to make sure to stringify it before sending it.
  • url: URL to which the message should be sent.
  • contentType: Type of HTTP content of the message. It can be ContentType.JSON or ContentType.Plain.
  • method: HTTP method of the message. It can be HTTPMethod.POST, HTTPMethod.PUT or HTTPMethod.PATCH.
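
As a complement to the example above, the following sketch shows how the four attributes map onto a request that updates an existing attribute with PATCH instead of creating it with POST. It assumes a DataStream[(String, Float)] named temperatures (for instance, the one built in the OrionSource sketch, with the Flink Scala imports from that sketch in scope); the Context Broker URL and the attribute name are placeholders:

    import org.fiware.cosmos.orion.flink.connector.{ContentType, HTTPMethod, OrionSink, OrionSinkObject}

    // Build one OrionSinkObject per record: content, url, contentType, method.
    val sinkStream = temperatures.map { case (id, temp) =>
      new OrionSinkObject(
        "{\"temperature_avg\": {\"value\":" + temp + ", \"type\": \"Float\"}}", // content: stringified JSON
        "http://context-broker-url:8080/v2/entities/" + id + "/attrs",          // url: update this entity's attributes
        ContentType.JSON,                                                        // contentType
        HTTPMethod.PATCH                                                         // method
      )
    }

    OrionSink.addSink(sinkStream)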

Production

Warning ⚠️

When packaging your code in a JAR, it is common to exclude dependencies like Flink and Scala since they are typically provided by the execution environment. Nevertheless, it is necessary to include this connector in your packaged code, since it is not part of the Flink distribution.
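
One possible way to achieve this, assuming you package with the maven-shade-plugin, is to exclude the provided Flink and Scala artifacts while leaving the connector in. The plugin version and the exclude list below are only a sketch and depend on your project:

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <artifactSet>
                        <excludes>
                            <!-- provided by the Flink runtime -->
                            <exclude>org.apache.flink:*</exclude>
                            <exclude>org.scala-lang:*</exclude>
                            <!-- org.fiware.cosmos:orion.flink.connector is NOT excluded,
                                 so it ends up inside the packaged JAR -->
                        </excludes>
                    </artifactSet>
                </configuration>
            </execution>
        </executions>
    </plugin>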

Training courses

Academy Courses

Some lessons on Big Data Fundamentals are offered in the FIWARE Academy.

Code Examples

Several examples are provided to facilitate getting started with the connector. They are hosted in a separate repository: fiware-cosmos-orion-flink-connector-examples.

Other Presentations

Quality Assurance

This project is part of FIWARE and has been rated as follows:

  • Version Tested:
  • Documentation:
  • Responsiveness:
  • FIWARE Testing:

Maintainers

@sonsoleslp.

Roadmap

The list of features planned for subsequent releases is available in the ROADMAP file.

Contributing

Contribution guidelines are detailed in the CONTRIBUTIONS file.

Testing

In order to test the code, run:

mvn clean test -Dtest=*Test cobertura:cobertura coveralls:report -Padd-dependencies-for-IDEA

License

Cosmos is licensed under the GNU Affero General Public License (AGPL) version 3.

Are there any legal issues with AGPL 3.0? Is it safe for me to use?

There is absolutely no problem in using a product licensed under AGPL 3.0. Issues with GPL (or AGPL) licenses are mostly related to the fact that different people assign different interpretations to the meaning of the term “derivative work” used in these licenses. Due to this, some people believe that there is a risk in just using software under GPL or AGPL licenses (even without modifying it).

For the avoidance of doubt, the owners of this software licensed under an AGPL-3.0 license
wish to make a clarifying public statement as follows:

Please note that software derived as a result of modifying the source code of this software in order to fix a bug or incorporate enhancements is considered a derivative work of the product. Software that merely uses or aggregates (i.e. links to) an otherwise unmodified version of existing software is not considered a derivative work, and therefore it does not need to be released under the same license, or even released as open source.