diff --git a/Dockerfile b/Dockerfile
index 16a3a73..1668dc7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,10 +8,17 @@ RUN apt-get update
# Auto-accept the Oracle JDK license
RUN echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | sudo /usr/bin/debconf-set-selections
+RUN mkdir /etc/service/onyx_peer
+RUN mkdir /etc/service/aeron
+
RUN apt-get install -y oracle-java8-installer
ADD target/desdemona-0.1.0-SNAPSHOT-standalone.jar /srv/desdemona.jar
-ADD script/run-peers.sh /srv/run-peers.sh
+ADD script/run_peers.sh /etc/service/onyx_peer/run
+ADD script/run_aeron.sh /etc/service/aeron/run
+
+EXPOSE 40200/tcp
+EXPOSE 40200/udp
-ENTRYPOINT ["/bin/sh", "/srv/run-peers.sh"]
+CMD ["/sbin/my_init"]
diff --git a/README.md b/README.md
index b94d8c8..2cd4c8a 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,45 @@ You can launch a sample job as follows:
(desdemona.jobs.sample-submit-job/submit-job user/system)
```
+### Using Docker Compose
+
+Start the cluster:
+
+```
+docker-compose up
+```
+
+Wait until it's all started. It should say this and then wait:
+
+```
+peer_1 | Started peers. Blocking forever.
+```
+
+Make sure you create the Kafka topic:
+
+```
+docker run --rm -it --link desdemona_kafka_1:kafka1 kafka bash -c "\$KAFKA_HOME/bin/kafka-console-producer.sh --topic test1 --broker-list=kafka1:9092"
+```
+
+And the MySQL database:
+
+```
+docker run -it --link desdemona_db_1:mysql --rm mysql sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
+```
+
+```
+use logs;
+CREATE TABLE logLines (id int primary key auto_increment, line text);
+```
+
+Now you can submit the job:
+
+```
+ZOOKEEPER=$(echo $DOCKER_HOST|cut -d ':' -f 2|sed "s/\/\///g") lein run -m desdemona.jobs.sample-submit-job
+```
+
+Anything you send to syslog on that Docker host (there's a syslog-ng relay running as a container) will appear in MySQL.
+
### Production Mode Peers
First start the Aeron media driver, which should be used in production
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..0b200b0
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,46 @@
+peer:
+ image: 'desdemona:0.1.0'
+ links:
+ - zookeeper:zk
+ - kafka:kafka
+ - db:db
+ - syslog-ng:syslog-ng
+ environment:
+ ONYX_ID: 1
+ NPEERS: 6
+ privileged: true
+ expose:
+ - "40200"
+ - "40200/udp"
+zookeeper:
+ image: 'wurstmeister/zookeeper'
+ ports:
+ - '2181:2181'
+kafka:
+ image: 'wurstmeister/kafka'
+ environment:
+ KAFKA_BROKER_ID: 1
+ HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+ links:
+ - zookeeper:zk
+ ports:
+ - "9092:9092"
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+db:
+ image: mysql
+ environment:
+ MYSQL_ALLOW_EMPTY_PASSWORD: 'true'
+ MYSQL_USER: onyx
+ MYSQL_PASSWORD: onyx
+ MYSQL_DATABASE: logs
+ ports:
+ - "3306:3306"
+syslog-ng:
+ build: script/syslog-ng
+ links:
+ - kafka:kafka
+ volumes:
+ - ./script/syslog-ng/syslog-ng.conf:/etc/syslog-ng/syslog-ng.conf
+ ports:
+ - "601:601"
diff --git a/env/dev/desdemona/dev_inputs/sample_input.clj b/env/dev/desdemona/dev_inputs/sample_input.clj
index 555ff69..127a022 100644
--- a/env/dev/desdemona/dev_inputs/sample_input.clj
+++ b/env/dev/desdemona/dev_inputs/sample_input.clj
@@ -1,72 +1,14 @@
(ns desdemona.dev-inputs.sample-input)
-;; Story from http://textfiles.com/stories/advtthum.txt
-;; Warning, I haven't read it. :)
-
(def lines
- [{:line " THE ADVENTURES OF TOM THUMB"}
+ [{:line "2016-01-29T21:44:01+00:00 192.168.99.1 102 <1> Jan 29 15:44:01 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:01.150100217 -0600 CST"}
+ {:line ""}
+ {:line "2016-01-29T21:44:02+00:00 192.168.99.1 102 <1> Jan 29 15:44:01 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:01.654799006 -0600 CST"}
+ {:line ""}
+ {:line "2016-01-29T21:44:02+00:00 192.168.99.1 102 <1> Jan 29 15:44:02 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:02.159437592 -0600 CST"}
+ {:line ""}
+ {:line "2016-01-29T21:44:03+00:00 192.168.99.1 102 <1> Jan 29 15:44:02 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:02.662554386 -0600 CST"}
+ {:line ""}
+ {:line "2016-01-29T21:44:03+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.164769137 -0600 CST"}
{:line ""}
- {:line " Once upon a time . . . there lived a giant who had quarrelled with a very "}
- {:line "greedy wizard over sharing a treasure. After the quarrel, the giant said "}
- {:line "menacingly to the wizard:"}
- {:line " \"I could crush you under my thumb if I wanted to! Now, get out of my "}
- {:line "sight!\" The wizard hurried away, but from a safe distance, he hurled his "}
- {:line "terrible revenge."}
- {:line " \"Abracadabra! Here I cast this spell! May the son, your wife will shortly "}
- {:line "give you, never grow any taller than my own thumb!\""}
- {:line " After Tom Thumb was born, his parents were at their wits' end. They could "}
- {:line "never find him, for they could barely see him. They had to speak in whispers "}
- {:line "for fear of deafening the little boy. Tom Thumb preferred playing with the "}
- {:line "little garden creatures, to the company of parents so different from himself. "}
- {:line "He rode piggyback on the snail and danced with the ladybirds. Tiny as he was, "}
- {:line "he had great fun in the world of little things."}
- {:line " But one unlucky day, he went to visit a froggy friend. No sooner had he "}
- {:line "scrambled onto a leaf than a large pike swallowed him up. But the pike too was"}
- {:line "fated to come to a very bad end. A little later, he took the bait cast by one "}
- {:line "of the King's fishermen, and before long, found himself under the cook's knife"}
- {:line "in the royal kitchens. And great was everyone's surprise when, out of the "}
- {:line "fish's stomach, stepped Tom Thumb, quite alive and little the worse for his "}
- {:line "adventure."}
- {:line " \"What am I to do with this tiny lad?\" said the cook to himself. Then he had"}
- {:line "a brainwave. \"He can be a royal pageboy! He's so tiny, I can pop him into the "}
- {:line "cake I'm making. When he marches across the bridge, sounding the trumpet "}
- {:line "everyone will gasp in wonder!\" Never had such a marvel been seen at Court. The"}
- {:line "guests clapped excitedly at the cook's skill and the King himself clapped "}
- {:line "loudest of all. The King rewarded the clever cook with a bag of gold. Tom "}
- {:line "Thumb was even luckier. The cook made him a pageboy, and a pageboy he remained,"}
- {:line "enjoying all the honours of his post."}
- {:line " He had a white mouse for a mount, a gold pin for a sword and he was allowed"}
- {:line "to eat the King's food. In exchange, he marched up and down the table at "}
- {:line "banquets. He picked his way amongst the plates and glasses amusing the guests"}
- {:line "with his trumpet."}
- {:line " What Tom Thumb didn't know was that he had made an enemy. The cat which, "}
- {:line "until Tom's arrival, had been the King's pet, was now forgotten. And, vowing "}
- {:line "to have its revenge on the newcomer, it ambushed Tom in the garden. When Tom "}
- {:line "saw the cat, he did not run away, as the creature had intended. He whipped out"}
- {:line "his gold pin and cried to his white mouse mount:"}
- {:line " \"Charge! Charge!\" Jabbed by the tiny sword, the cat turned tail and fled. "}
- {:line "Since brute force was not the way to revenge, the cat decided to use guile. "}
- {:line "Casually pretending to bump into the King as he walked down the staircase, the"}
- {:line "cat softly miaowed:"}
- {:line " \"Sire! Be on your guard! A plot is being hatched against your life!\" And"}
- {:line "then he told a dreadful lie. \"Tom Thumb is planning to lace your food with "}
- {:line "hemlock. I saw him picking the leaves in the garden the other day. heard him "}
- {:line "say these very words!\""}
- {:line " Now, the King had once been kept in bed with very bad tummy pains, after "}
- {:line "eating too many chernes and he feared the thought of being poisoned, so he "}
- {:line "sent for Tom Thumb. The cat provided proof of his words by pulling a hemlock "}
- {:line "leaf from under the white mouse's saddle cloth, where he had hidden it "}
- {:line "himself."}
- {:line " Tom Thumb was so amazed, he was at a loss for words to deny what the cat "}
- {:line "had said. The King, withiut further ado, had him thrown into prison. And since"}
- {:line "he was so tiny, they locked him up in a pendulum clock. The hours passed and"}
- {:line "the days too. Tom's only pastime was swinging back and forth, clinging to the "}
- {:line "pendulum, util the night when he attracted the attention of a big night moth,"}
- {:line "fluttering round the room."}
- {:line " \"Let me out!\" cried Tom Thumb, tapping on the glass. As it so happens, the "}
- {:line "moth had only just been set free after being a prisoner in a large box, in "}
- {:line "which she had taken a nap. So she took pity on Tom Thumb and released him."}
- {:line " 'll take you to the Butterfly Kingdom, where everyone's tiny like burself. "}
- {:line "They'll take care of you there!\" And that is what happened. To this day, if "}
- {:line "you visit the Butterfly Kingdom, you can ask to see the Butterfly monument "}
- {:line "that Tom Thumb built after this amazing adventrure."}])
+ {:line "2016-01-29T21:44:04+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.669196559 -0600 CST"}])
diff --git a/env/dev/user.clj b/env/dev/user.clj
deleted file mode 100644
index 3893756..0000000
--- a/env/dev/user.clj
+++ /dev/null
@@ -1,25 +0,0 @@
-(ns user
- (:require [clojure.tools.namespace.repl :refer [refresh set-refresh-dirs]]
- [com.stuartsierra.component :as component]
- [desdemona.launcher.dev-system :refer [onyx-dev-env]]))
-
-(set-refresh-dirs "src" "test")
-
-(def system nil)
-
-(defn init [n-peers]
- (alter-var-root #'system (constantly (onyx-dev-env n-peers))))
-
-(defn start []
- (alter-var-root #'system component/start))
-
-(defn stop []
- (alter-var-root #'system (fn [s] (when s (component/stop s)))))
-
-(defn go [n-peers]
- (init n-peers)
- (start))
-
-(defn reset []
- (stop)
- (refresh))
diff --git a/project.clj b/project.clj
index 75d211b..fbab1f4 100644
--- a/project.clj
+++ b/project.clj
@@ -4,7 +4,14 @@
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.8.0"]
- [org.onyxplatform/onyx "0.8.2"]
+ [org.onyxplatform/onyx "0.8.8"]
+ [org.onyxplatform/onyx-sql "0.8.8.0"]
+ [mysql/mysql-connector-java "5.1.18"]
+ [org.onyxplatform/onyx-kafka "0.8.8.0"]
+ [org.onyxplatform/onyx-seq "0.8.8.0"]
+ [cheshire "5.5.0"]
+ [org.clojure/tools.cli "0.3.3"]
+ [aero "0.1.3"]
[org.clojure/core.logic "0.8.10"]]
:plugins [[lein-cljfmt "0.3.0"]
[lein-cloverage "1.0.7-SNAPSHOT"]
diff --git a/resources/config.edn b/resources/config.edn
new file mode 100644
index 0000000..269a8b5
--- /dev/null
+++ b/resources/config.edn
@@ -0,0 +1,24 @@
+{:env-config
+ {:onyx/id #cond {:default #env ONYX_ID
+ :dev "1"}
+ :onyx.bookkeeper/server? true
+ :onyx.bookkeeper/delete-server-data? true
+ :zookeeper/address #cond {:default #env [ZOOKEEPER "zk:2181"]
+ :test #env [ZOOKEEPER "127.0.0.1:2181"]}
+ :zookeeper/server? #cond {:default false
+ :test true}
+ :zookeeper.server/port 2181}
+ :peer-config
+ {:onyx/id #cond {:default #env ONYX_ID
+ :dev "1"}
+ :zookeeper/address #cond {:default #env [ZOOKEEPER "zk:2181"]
+ :dev #env [ZOOKEEPER "192.168.99.100:2181"]}
+ :onyx.peer/job-scheduler :onyx.job-scheduler/greedy
+ :onyx.peer/zookeeper-timeout 60000
+ :onyx.messaging/allow-short-circuit? true
+ :onyx.messaging/impl :aeron
+ ;; Change "localhost" to a resolvable hostname
+ ;; by any node in your cluster.
+ :onyx.messaging/bind-addr #cond {:default #env [BIND_ADDR "localhost"]}
+ :onyx.messaging/peer-port 40200
+ :onyx.messaging.aeron/embedded-driver? false}}
diff --git a/resources/prod-peer-config.edn b/resources/prod-peer-config.edn
deleted file mode 100644
index d640ef5..0000000
--- a/resources/prod-peer-config.edn
+++ /dev/null
@@ -1,8 +0,0 @@
-{:zookeeper/address "127.0.0.1:2188"
- :onyx.peer/job-scheduler :onyx.job-scheduler/greedy
- :onyx.messaging/impl :aeron
- ;; Change "localhost" to a resolvable hostname
- ;; by any node in your cluster.
- :onyx.messaging/bind-addr "localhost"
- :onyx.messaging/peer-port 40200
- :onyx.messaging.aeron/embedded-driver? false}
diff --git a/resources/sample_input.edn b/resources/sample_input.edn
new file mode 100644
index 0000000..658dc89
--- /dev/null
+++ b/resources/sample_input.edn
@@ -0,0 +1,21 @@
+[{"venue_visibility" "public", "payment_required" "0", "yes_rsvp_count" 4, "group" {"group_lon" -123.14, "country" "ca", "city" "Vancouver", "group_lat" 49.26, "id" 1734488, "group_photo" {"highres_link" "http://photos4.meetupstatic.com/photos/event/1/d/3/highres_431820467.jpeg", "photo_link" "http://photos4.meetupstatic.com/photos/event/1/d/3/600_431820467.jpeg", "photo_id" 431820467, "thumb_link" "http://photos2.meetupstatic.com/photos/event/1/d/3/thumb_431820467.jpeg"}, "name" "Kitsilano Business Leaders", "state" "BC", "category" {"name" "career/business", "id" 2, "shortname" "career-business"}, "urlname" "KitsilanoBusinessLeaders", "join_mode" "open"}, "id" "227142972", "utc_offset" -28800000, "visibility" "public", "name" "12 Steps How to Build Mobile App to Rock Your Business", "maybe_rsvp_count" 0, "status" "upcoming", "venue" {"country" "ca", "city" "Vancouver", "address_1" "2941 West Broadway", "name" "G&F Financial Group", "lon" -123.170868, "state" "BC", "lat" 49.26437}, "time" 1452783600000, "rsvp_limit" 60, "description" "
Do you have a mobile app for your business? Do you have an effective mobile strategy that involves more than just having a mobile-friendly website?
\nFollowing Melanie Haselmayr's Forbes article, we are featuring two local entrepreneurs who will share the 12 steps they took to build a dynamite app for their businesses. http://www.forbes.com/sites/allbusiness/2014/11/17/heres-why-your-business-needs-its-own-mobile-app/
\n
\n Ria Mizera, founder of ClosetRelay,
\n![](\"http://photos3.meetupstatic.com/photos/event/7/b/b/b/600_445111675.jpeg\")
\n
\n
\n![](\"http://photos1.meetupstatic.com/photos/event/7/c/8/3/600_445111875.jpeg\")
\nKristoffer Vik Hansen, co-founder of Spare
\n
\nEarly risers, come and kickstart your day once a month with insightful discussions on current business challenges.
\nMEETING SCHEDULE
\n6:50 - 7:20 am: Sign in, get your coffee and muffin early. Be ready to develop business trust and camaraderie.
\n7:20 - 7:45 am: Muffin Sponsor (5 min.), Success Circle...15 second insight on who you are and how your business can help others to succeed.
\n7:45 - 8:30 am: Keynote presentation
\n8:30 - 9:00 am: Door prizes; share and learn camaraderie continues
\nDOOR PRIZE donors: Contact Judy if you have a product/service to promote.
\nMUFFIN SPONSOR: Contact Judy how you can be muffin sponsor of the month and promote your business for 5 minutes to a captive audience.
\n
\n
", "event_url" "http://www.meetup.com/KitsilanoBusinessLeaders/events/227142972/", "mtime" 1450129667481}
+ {"venue_visibility" "members", "payment_required" "0", "yes_rsvp_count" 1, "group" {"group_lon" -111.75, "country" "us", "city" "Gilbert", "group_lat" 33.36, "id" 5391722, "group_photo" {"highres_link" "http://photos4.meetupstatic.com/photos/event/e/2/4/highres_437583620.jpeg", "photo_link" "http://photos4.meetupstatic.com/photos/event/e/2/4/600_437583620.jpeg", "photo_id" 437583620, "thumb_link" "http://photos2.meetupstatic.com/photos/event/e/2/4/thumb_437583620.jpeg"}, "name" "Chandler Gilbert Ladies and Gents Singles Over 45", "state" "AZ", "category" {"name" "parents/family", "id" 25, "shortname" "parents-family"}, "urlname" "meetup-group-HwhOSVfy", "join_mode" "open"}, "id" "227421769", "utc_offset" -25200000, "visibility" "public", "name" "Let's meetup for a walk...", "maybe_rsvp_count" 0, "status" "suggested", "time" 1452524400000, "duration" 5400000, "rsvp_limit" 0, "description" "Let's meetup for some fun and fitness while walking at the Riparian Preserve.
", "event_url" "http://www.meetup.com/meetup-group-HwhOSVfy/events/227421769/", "mtime" 1450129667292}
+ {"venue_visibility" "members",
+ "payment_required" "0",
+ "yes_rsvp_count" 1,
+ "group" {"group_lon" -118.33, "country" "us", "city" "Los Angeles", "group_lat" 34.1, "id" 511174,
+ "group_photo" {"highres_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/highres_436221789.jpeg",
+ "photo_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/600_436221789.jpeg",
+ "photo_id" 436221789,
+ "thumb_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/thumb_436221789.jpeg"},
+ "name" "L.A. Foodies", "state" "CA",
+ "category" {"name" "food/drink",
+ "id" 10, "shortname" "food-drink"},
+ "urlname" "Foodies", "join_mode" "approval"},
+ "id" "227420766", "utc_offset" -28800000, "visibility" "public",
+ "name" "$85 five-course tasting menu @ Trois Mec!",
+ "maybe_rsvp_count" 0, "status" "upcoming",
+ "time" 1452304800000, "rsvp_limit" 6,
+ "event_url" "http://www.meetup.com/Foodies/events/227420766/",
+ "mtime" 1450129671307}
+ :done]
diff --git a/resources/table.sql b/resources/table.sql
new file mode 100644
index 0000000..d036c6e
--- /dev/null
+++ b/resources/table.sql
@@ -0,0 +1 @@
+CREATE TABLE logLines (id int primary key auto_increment, line text);
diff --git a/script/build.sh b/script/build.sh
old mode 100644
new mode 100755
diff --git a/script/run-container.sh b/script/run-container.sh
deleted file mode 100644
index e2ec7a6..0000000
--- a/script/run-container.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-set -o errexit
-set -o nounset
-set -o xtrace
-
-if [[ "$#" -ne 2 ]]; then
- echo "Usage: $0 onyx-id n-peers"
- echo "Example: $0 3424312384i3423 4"
- exit 1
-fi
-
-ONYX_ID=$1
-NPEERS=$2
-
-docker run --privileged -e ONYX_ID=$ONYX_ID -e NPEERS=$NPEERS -d desdemona:0.1.0
diff --git a/script/run-peers.sh b/script/run-peers.sh
deleted file mode 100755
index d739053..0000000
--- a/script/run-peers.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env bash
-set -o errexit
-set -o nounset
-set -o xtrace
-
-echo "Setting shared memory for Aeron"
-mount -t tmpfs -o remount,rw,nosuid,nodev,noexec,relatime,size=256M tmpfs /dev/shm
-APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g)
-java -cp /srv/desdemona.jar "$APP_NAME.launcher.aeron_media_driver" &
-java -cp /srv/desdemona.jar "$APP_NAME.launcher.launch_prod_peers" $ONYX_ID $NPEERS
diff --git a/script/run_aeron.sh b/script/run_aeron.sh
new file mode 100755
index 0000000..58bdf86
--- /dev/null
+++ b/script/run_aeron.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+echo "Setting shared memory for Aeron"
+mount -t tmpfs -o remount,rw,nosuid,nodev,noexec,relatime,size=256M tmpfs /dev/shm
+APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g)
+
+exec java -cp /srv/desdemona.jar "$APP_NAME.launcher.aeron_media_driver" >>/var/log/aeron.log 2>&1
diff --git a/script/run_peers.sh b/script/run_peers.sh
new file mode 100755
index 0000000..f297749
--- /dev/null
+++ b/script/run_peers.sh
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+set -o errexit
+set -o nounset
+set -o xtrace
+
+export BIND_ADDR=$(hostname --ip-address)
+export APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g)
+exec java -cp /srv/desdemona.jar "$APP_NAME.launcher.launch_prod_peers" $NPEERS
diff --git a/script/syslog-ng/Dockerfile b/script/syslog-ng/Dockerfile
new file mode 100644
index 0000000..371de53
--- /dev/null
+++ b/script/syslog-ng/Dockerfile
@@ -0,0 +1,24 @@
+FROM debian:latest
+MAINTAINER Andras Mitzki
+
+RUN apt-get update -qq && apt-get install -y \
+ wget
+
+RUN wget -qO - http://download.opensuse.org/repositories/home:/laszlo_budai:/syslog-ng/Debian_8.0/Release.key | apt-key add -
+RUN echo 'deb http://download.opensuse.org/repositories/home:/laszlo_budai:/syslog-ng/Debian_8.0 ./' | tee --append /etc/apt/sources.list.d/syslog-ng-obs.list
+
+RUN apt-get update -qq && apt-get install -y \
+ syslog-ng
+
+ADD openjdk-libjvm.conf /etc/ld.so.conf.d/openjdk-libjvm.conf
+RUN ldconfig
+
+RUN wget -q http://artfiles.org/apache.org/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz -O /tmp/kafka_2.11-0.9.0.0.tgz
+RUN tar xfz /tmp/kafka_2.11-0.9.0.0.tgz -C /opt
+RUN ln -s /opt/kafka_2.11-0.9.0.0 /opt/kafka
+
+EXPOSE 514
+EXPOSE 601
+EXPOSE 6514
+
+ENTRYPOINT ["/usr/sbin/syslog-ng", "-F"]
diff --git a/script/syslog-ng/openjdk-libjvm.conf b/script/syslog-ng/openjdk-libjvm.conf
new file mode 100644
index 0000000..433be80
--- /dev/null
+++ b/script/syslog-ng/openjdk-libjvm.conf
@@ -0,0 +1 @@
+/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/amd64/server/
diff --git a/script/syslog-ng/syslog-ng.conf b/script/syslog-ng/syslog-ng.conf
new file mode 100644
index 0000000..c3924c5
--- /dev/null
+++ b/script/syslog-ng/syslog-ng.conf
@@ -0,0 +1,30 @@
+@version: 3.7
+@module mod-java
+@include "scl.conf"
+
+source s_tcp {
+ network(port(601));
+};
+
+destination d_file {
+ file("/var/log/testlogs");
+};
+
+destination d_kafka {
+ kafka(
+ client_lib_dir("/opt/kafka/libs/*.jar:/usr/share/kafka/lib/*.jar")
+ kafka_bootstrap_servers("kafka:9092")
+ topic("test1")
+ );
+};
+
+
+log {
+ source(s_tcp);
+ destination(d_file);
+};
+
+log {
+ source(s_tcp);
+ destination(d_kafka);
+};
diff --git a/src/desdemona/catalogs/sample_catalog.clj b/src/desdemona/catalogs/sample_catalog.clj
index 5b488d5..8d7a916 100644
--- a/src/desdemona/catalogs/sample_catalog.clj
+++ b/src/desdemona/catalogs/sample_catalog.clj
@@ -1,4 +1,5 @@
-(ns desdemona.catalogs.sample-catalog)
+(ns desdemona.catalogs.sample-catalog
+ (:require [desdemona.functions.sample-functions :refer [transform-segment-shape prepare-rows]]))
;;; Catalogs describe each task in a workflow. We use
;;; them for describing input and output sources, injecting parameters,
@@ -7,35 +8,17 @@
(defn build-catalog
([] (build-catalog 5 50))
([batch-size batch-timeout]
- [{:onyx/name :read-lines
- :onyx/plugin :desdemona.plugins.http-reader/reader
- :onyx/type :input
- :onyx/medium :http
- :http/uri "http://textfiles.com/stories/abbey.txt"
- :onyx/batch-size batch-size
- :onyx/batch-timeout batch-timeout
- :onyx/max-peers 1
- :onyx/doc "Reads lines from an HTTP url text file"}
-
- {:onyx/name :format-line
- :onyx/fn :desdemona.functions.sample-functions/format-line
+ [{:onyx/name :extract-line-info
+ :onyx/fn :desdemona.functions.sample-functions/transform-segment-shape
:onyx/type :function
:onyx/batch-size batch-size
:onyx/batch-timeout batch-timeout
- :onyx/doc "Strips the line of any leading or trailing whitespace"}
+ :keypath {"line" [:line]}
+ :onyx/params [:keypath]
+ :onyx/doc "Extracts the line"}
- {:onyx/name :upper-case
- :onyx/fn :desdemona.functions.sample-functions/upper-case
+ {:onyx/name :prepare-rows
+ :onyx/fn :desdemona.functions.sample-functions/prepare-rows
:onyx/type :function
:onyx/batch-size batch-size
- :onyx/batch-timeout batch-timeout
- :onyx/doc "Capitalizes the first letter of the line"}
-
- {:onyx/name :write-lines
- :onyx/plugin :onyx.plugin.core-async/output
- :onyx/type :output
- :onyx/medium :core.async
- :onyx/batch-size batch-size
- :onyx/batch-timeout batch-timeout
- :onyx/max-peers 1
- :onyx/doc "Writes segments to a core.async channel"}]))
+ :onyx/batch-timeout batch-timeout}]))
diff --git a/src/desdemona/flow_conditions/sample_flow_conditions.clj b/src/desdemona/flow_conditions/sample_flow_conditions.clj
index 47adc7b..375f033 100644
--- a/src/desdemona/flow_conditions/sample_flow_conditions.clj
+++ b/src/desdemona/flow_conditions/sample_flow_conditions.clj
@@ -1,31 +1,4 @@
(ns desdemona.flow-conditions.sample-flow-conditions)
(def flow-conditions
- [{:flow/from :upper-case
- :flow/to [:write-lines]
- :flow/short-circuit? true
- :flow/thrown-exception? true
- :flow/post-transform :desdemona.flow-conditions.sample-flow-conditions/substitute-segment
- :flow/predicate :desdemona.flow-conditions.sample-flow-conditions/npe?
- :flow/doc "Send a canned value if this segment threw a NullPointerException."}
- {:flow/from :format-line
- :flow/to [:upper-case]
- :param/disallow-char \B
- :param/max-line-length 60
- :flow/predicate [:and
- [:not [:desdemona.flow-conditions.sample-flow-conditions/starts-with? :param/disallow-char]]
- [:desdemona.flow-conditions.sample-flow-conditions/within-length? :param/max-line-length]]
- :flow/doc "Output the line if it doesn't start with 'B' and is less than 60 characters"}])
-
-(defn starts-with? [event old {:keys [line]} all-new disallowed-char]
- (= (first line) disallowed-char))
-
-(defn within-length? [event old {:keys [line]} all-new max-length]
- (<= (count line) max-length))
-
-(defn npe? [event old ex all-new]
- (= (class ex) java.lang.NullPointerException))
-
-(defn substitute-segment [event ex]
- {:line "<<< Blank line was here >>>"})
-
+ [])
diff --git a/src/desdemona/functions/sample_functions.clj b/src/desdemona/functions/sample_functions.clj
index 6e60479..d74bb2b 100644
--- a/src/desdemona/functions/sample_functions.clj
+++ b/src/desdemona/functions/sample_functions.clj
@@ -1,15 +1,21 @@
(ns desdemona.functions.sample-functions
- (:require [clojure.string :refer [trim capitalize]]))
+ (:require [clojure
+ [walk :refer [postwalk]]]))
;;; Defines functions to be used by the peers. These are located
;;; with fully qualified namespaced keywords, such as
;;; desdemona.functions.sample-functions/format-line
-(defn format-line [segment]
- (update-in segment [:line] trim))
+(defn transform-segment-shape
+ "Recursively restructures a segment {:new-key [paths...]}"
+ [paths segment]
+ (try (let [f (fn [[k v]]
+ (if (vector? v)
+ [k (get-in segment v)]
+ [k v]))]
+ (postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) paths))
+ (catch Exception e
+ segment)))
-(defn upper-case [{:keys [line] :as segment}]
- (if (seq line)
- (let [upper-cased (apply str (capitalize (first line)) (rest line))]
- (assoc-in segment [:line] upper-cased))
- segment))
+(defn prepare-rows [segment]
+ {:rows [segment]})
diff --git a/src/desdemona/jobs/sample_submit_job.clj b/src/desdemona/jobs/sample_submit_job.clj
index 4083ac9..5960c80 100644
--- a/src/desdemona/jobs/sample_submit_job.clj
+++ b/src/desdemona/jobs/sample_submit_job.clj
@@ -1,33 +1,46 @@
(ns desdemona.jobs.sample-submit-job
- (:require [clojure.java.io :refer [resource]]
- [com.stuartsierra.component :as component]
- [desdemona.workflows.sample-workflow :refer [workflow]]
- [desdemona.catalogs.sample-catalog :refer [build-catalog] :as sc]
- [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles] :as sl]
- [desdemona.flow-conditions.sample-flow-conditions :as sf]
- [desdemona.functions.sample-functions]
- [desdemona.dev-inputs.sample-input :as dev-inputs]
- [desdemona.utils :as u]
+ (:require [desdemona.catalogs.sample-catalog :refer [build-catalog]]
+ [desdemona.tasks.kafka :refer [add-kafka-input add-kafka-output]]
+ [desdemona.tasks.sql :refer [add-sql-partition-input add-sql-insert-output]]
+ [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles]]
+ [desdemona.lifecycles.metrics :refer [add-metrics]]
+ [desdemona.lifecycles.logging :refer [add-logging]]
+ [desdemona.workflows.sample-workflow :refer [build-workflow]]
+ [aero.core :refer [read-config]]
[onyx.api]))
-(defn submit-job [dev-env]
- (let [dev-cfg (-> "dev-peer-config.edn" resource slurp read-string)
- peer-config (assoc dev-cfg :onyx/id (:onyx-id dev-env))
- ;; Turn :read-lines and :write-lines into core.async I/O channels
- stubs [:read-lines :write-lines]
- ;; Stubs the catalog entries for core.async I/O
- dev-catalog (u/in-memory-catalog (build-catalog 20 50) stubs)
- ;; Stubs the lifecycles for core.async I/O
- dev-lifecycles (u/in-memory-lifecycles (build-lifecycles) dev-catalog stubs)]
- ;; Automatically pipes the data structure into the channel, attaching :done at the end
- (u/bind-inputs! dev-lifecycles {:read-lines dev-inputs/lines})
- (let [job {:workflow workflow
- :catalog dev-catalog
- :lifecycles dev-lifecycles
- :flow-conditions sf/flow-conditions
- :task-scheduler :onyx.task-scheduler/balanced}]
- (onyx.api/submit-job peer-config job)
- ;; Automatically grab output from the stubbed core.async channels,
- ;; returning a vector of the results with data structures representing
- ;; the output.
- (u/collect-outputs! dev-lifecycles [:write-lines]))))
+;;;;
+;; Let's build a job
+;; Since we always run in Docker Compose, Kafka is added as an input, and onyx-sql is used as the output
+
+(defn build-job []
+ (let [batch-size 1
+ batch-timeout 1000
+ base-job {:catalog (build-catalog batch-size batch-timeout)
+ :lifecycles (build-lifecycles)
+ :workflow (build-workflow)
+ :task-scheduler :onyx.task-scheduler/balanced}]
+ (-> base-job
+ (add-kafka-input :read-lines {:onyx/batch-size batch-size
+ :onyx/max-peers 1
+ :kafka/topic "test1"
+ :kafka/group-id "onyx-consumer"
+ :kafka/zookeeper "zk:2181"
+ :kafka/deserializer-fn :desdemona.tasks.kafka/deserialize-message-raw
+ :kafka/offset-reset :smallest})
+ (add-sql-insert-output :write-lines {:onyx/batch-size batch-size
+ :sql/classname "com.mysql.jdbc.Driver"
+ :sql/subprotocol "mysql"
+ :sql/subname "//db:3306/logs"
+ :sql/user "onyx"
+ :sql/password "onyx"
+ :sql/table :logLines})
+ (add-logging :read-lines)
+ (add-logging :write-lines))))
+
+(defn -main [& args]
+ (let [config (read-config (clojure.java.io/resource "config.edn") {:profile :dev})
+ peer-config (get config :peer-config)
+ job (build-job)]
+ (let [{:keys [job-id]} (onyx.api/submit-job peer-config job)]
+ (println "Submitted job: " job-id))))
diff --git a/src/desdemona/launcher/aeron_media_driver.clj b/src/desdemona/launcher/aeron_media_driver.clj
index d28ea0a..b6d1eea 100644
--- a/src/desdemona/launcher/aeron_media_driver.clj
+++ b/src/desdemona/launcher/aeron_media_driver.clj
@@ -1,11 +1,38 @@
(ns desdemona.launcher.aeron-media-driver
(:gen-class)
- (:require [clojure.core.async :refer [chan (MediaDriver$Context.)
+ threading-mode (.threadingMode threading-mode-obj)
+ delete-dirs (.dirsDeleteOnStart delete-dirs))
+ media-driver (try (MediaDriver/launch ctx)
+ (catch IllegalStateException ise (throw (Exception. "Error starting media driver. This may be due to a media driver data incompatibility between versions. Check that no other media driver has been started and then use -d to delete the directory on startup" ise))))]
(println "Launched the Media Driver. Blocking forever...")
(OnyxDevEnv n-peers))
diff --git a/src/desdemona/launcher/launch_prod_peers.clj b/src/desdemona/launcher/launch_prod_peers.clj
index a8c1e3d..8b28030 100644
--- a/src/desdemona/launcher/launch_prod_peers.clj
+++ b/src/desdemona/launcher/launch_prod_peers.clj
@@ -1,19 +1,38 @@
(ns desdemona.launcher.launch-prod-peers
- (:gen-class)
- (:require [clojure.core.async :refer [chan "prod-peer-config.edn"
- resource slurp read-string) :onyx/id onyx-id)
+ config (read-config (clojure.java.io/resource "config.edn") {:profile :default})
+ peer-config (assoc
+ (:peer-config config)
+ :onyx.log/config
+ {:appenders
+ {:standard-out
+ {:enabled? true,
+ :async? false,
+ :output-fn t/default-output-fn,
+ :fn standard-out-logger}}})
peer-group (onyx.api/start-peer-group peer-config)
+ env (onyx.api/start-env (:env-config config))
peers (onyx.api/start-peers n-peers peer-group)]
+ (println "Attempting to connect to Zookeeper: " (:zookeeper/address peer-config))
(.addShutdownHook (Runtime/getRuntime)
(Thread.
(fn []
diff --git a/src/desdemona/launcher/submit_prod_sample_job.clj b/src/desdemona/launcher/submit_prod_sample_job.clj
deleted file mode 100644
index b328f0a..0000000
--- a/src/desdemona/launcher/submit_prod_sample_job.clj
+++ /dev/null
@@ -1,19 +0,0 @@
-(ns desdemona.launcher.submit-prod-sample-job
- (:require [clojure.java.io :refer [resource]]
- [desdemona.workflows.sample-workflow :refer [workflow]]
- [desdemona.catalogs.sample-catalog :refer [build-catalog]]
- [desdemona.lifecycles.sample-lifecycle :as sample-lifecycle]
- [desdemona.functions.sample-functions]
- [onyx.plugin.core-async :refer [take-segments!]]
- [onyx.api]))
-
-(defn -main [onyx-id & args]
- (let [cfg (-> "prod-peer-config.edn" resource slurp read-string)
- peer-config (assoc cfg :onyx/id onyx-id)
- lifecycles (sample-lifecycle/build-lifecycles)]
- (let [job {:workflow workflow
- :catalog (build-catalog 20 50)
- :lifecycles lifecycles
- :task-scheduler :onyx.task-scheduler/balanced}]
- (onyx.api/submit-job peer-config job)
- (shutdown-agents))))
diff --git a/src/desdemona/lifecycles/logging.clj b/src/desdemona/lifecycles/logging.clj
new file mode 100644
index 0000000..42a0725
--- /dev/null
+++ b/src/desdemona/lifecycles/logging.clj
@@ -0,0 +1,22 @@
+(ns desdemona.lifecycles.logging
+ (:require [taoensso.timbre :refer [info]]))
+
+(defn log-batch [event lifecycle]
+ (let [task-name (:onyx/name (:onyx.core/task-map event))]
+ (doseq [m (map :message (mapcat :leaves (:tree (:onyx.core/results event))))]
+ (info task-name " logging segment: " m)))
+ {})
+
+(def log-calls
+ {:lifecycle/after-batch log-batch})
+
+(defn add-logging
+  "Adds logging output to a task's output batch."
+ [job task]
+ (if-let [entry (first (filter #(= (:onyx/name %) task) (:catalog job)))]
+ (update-in
+ job
+ [:lifecycles]
+ conj
+ {:lifecycle/task task,
+ :lifecycle/calls :desdemona.lifecycles.logging/log-calls})))
diff --git a/src/desdemona/lifecycles/metrics.clj b/src/desdemona/lifecycles/metrics.clj
new file mode 100644
index 0000000..e43328f
--- /dev/null
+++ b/src/desdemona/lifecycles/metrics.clj
@@ -0,0 +1,13 @@
+(ns desdemona.lifecycles.metrics)
+
+(defn add-metrics
+  "Adds throughput and latency metrics to a task"
+ [job task opts]
+ (update
+ job
+ :lifecycles
+ conj
+ (merge
+ {:lifecycle/task task
+ :lifecycle/calls :onyx.lifecycle.metrics.metrics/calls}
+ opts)))
diff --git a/src/desdemona/lifecycles/sample_lifecycle.clj b/src/desdemona/lifecycles/sample_lifecycle.clj
index 54fa301..56dae1c 100644
--- a/src/desdemona/lifecycles/sample_lifecycle.clj
+++ b/src/desdemona/lifecycles/sample_lifecycle.clj
@@ -1,32 +1,5 @@
-(ns desdemona.lifecycles.sample-lifecycle
- (:require [clojure.core.async :refer [chan sliding-buffer >!!]]
- [onyx.plugin.core-async :refer [take-segments!]]
- [taoensso.timbre :refer [info]]
- [onyx.static.planning :refer [find-task]]
- [desdemona.utils :as u]))
+(ns desdemona.lifecycles.sample-lifecycle)
-(defn log-batch [event lifecycle]
- (let [task-name (:onyx/name (:onyx.core/task-map event))]
- (doseq [m (map :message (mapcat :leaves (:tree (:onyx.core/results event))))]
- (info task-name " logging segment: " m)))
- {})
-
-(def log-calls
- {:lifecycle/after-batch log-batch})
-
-(defn build-lifecycles []
- [{:lifecycle/task :read-lines
- :lifecycle/calls :desdemona.plugins.http-reader/reader-calls
- :lifecycle/replaceable? true
- :lifecycle/doc "Lifecycle for reading from a core.async chan"}
- {:lifecycle/task :write-lines
- :lifecycle/calls :desdemona.utils/out-calls
- :core.async/id (java.util.UUID/randomUUID)
- :lifecycle/replaceable? true
- :lifecycle/doc "Lifecycle for your output task. When using in-memory-lifecycles, this will be replaced"}
- {:lifecycle/task :write-lines
- :lifecycle/calls :onyx.plugin.core-async/writer-calls
- :lifecycle/doc "Lifecycle for injecting a core.async writer chan"}
- {:lifecycle/task :write-lines
- :lifecycle/calls ::log-calls
- :lifecycle/doc "Lifecycle for printing the output of a task's batch"}])
+(defn build-lifecycles
+ "Put your environment-independent lifecycles here"
+ [])
diff --git a/src/desdemona/plugins/http_reader.clj b/src/desdemona/plugins/http_reader.clj
deleted file mode 100644
index 2df36ad..0000000
--- a/src/desdemona/plugins/http_reader.clj
+++ /dev/null
@@ -1,87 +0,0 @@
-(ns desdemona.plugins.http-reader
- (:require [clojure.core.async :refer [chan >!! !! ch {:line x}))
- (>!! ch :done))
- (catch Exception e
- (.printStackTrace e))))]
- {:http/fut fut
- :http/chan ch
- :http/retry-ch (chan 1000)
- :http/drained? (atom false)
- :http/pending-messages (atom {})}))
-
-(def reader-calls
- {:lifecycle/before-task-start inject-reader})
-
-(defrecord HttpReader []
- p-ext/Pipeline
- (write-batch
- [this event]
- (function/write-batch event))
-
- (read-batch [_ {:keys [onyx.core/task-map http/chan http/retry-ch http/pending-messages http/drained?] :as event}]
- (let [pending (count @pending-messages)
- max-pending (or (:onyx/max-pending task-map) (:onyx/max-pending defaults))
- batch-size (:onyx/batch-size task-map)
- max-segments (min (- max-pending pending) batch-size)
- ms (or (:onyx/batch-timeout task-map) (:onyx/batch-timeout defaults))
- step-ms (/ ms (:onyx/batch-size task-map))
- timeout-ch (timeout ms)
- batch (if (zero? max-segments)
- (!! retry-ch msg)
- (swap! pending-messages dissoc message-id)))
-
- (pending?
- [_ {:keys [http/pending-messages]} message-id]
- (get @pending-messages message-id))
-
- (drained?
- [_ {:keys [http/drained?]}]
- @drained?))
-
-(defn reader [pipeline-data]
- (->HttpReader))
diff --git a/src/desdemona/tasks/core_async.clj b/src/desdemona/tasks/core_async.clj
new file mode 100644
index 0000000..f30ecff
--- /dev/null
+++ b/src/desdemona/tasks/core_async.clj
@@ -0,0 +1,80 @@
+(ns desdemona.tasks.core-async
+ (:require [clojure.core.async :refer [chan sliding-buffer >!!]]
+ [onyx.plugin.core-async :refer [take-segments!]]
+ [taoensso.timbre :refer [info]]
+ [clojure.set :refer [join]]
+ [cheshire.core :as json]))
+
+(def channels (atom {}))
+
+(def default-channel-size 1000)
+
+(defn get-channel
+ ([id] (get-channel id nil))
+ ([id size]
+ (if-let [id (get @channels id)]
+ id
+ (do (swap! channels assoc id (chan (or size default-channel-size)))
+ (get-channel id)))))
+
+(defn inject-in-ch
+ [_ lifecycle]
+ {:core.async/chan (get-channel (:core.async/id lifecycle)
+ (or (:core.async/size lifecycle) default-channel-size))})
+(defn inject-out-ch
+ [_ lifecycle]
+ {:core.async/chan (get-channel (:core.async/id lifecycle)
+ (or (:core.async/size lifecycle) default-channel-size))})
+
+(def in-calls
+ {:lifecycle/before-task-start inject-in-ch})
+
+(def out-calls
+ {:lifecycle/before-task-start inject-out-ch})
+
+(defn get-core-async-channels
+ [{:keys [catalog lifecycles]}]
+ (let [lifecycle-catalog-join (join catalog lifecycles {:onyx/name :lifecycle/task})]
+ (reduce (fn [acc item]
+ (assoc acc
+ (:onyx/name item)
+ (get-channel (:core.async/id item)))) {} (filter :core.async/id lifecycle-catalog-join))))
+
+(defn add-core-async-input
+ ([job task opts] (add-core-async-input job task opts default-channel-size))
+ ([job task opts chan-size]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.core-async/input
+ :onyx/type :input
+ :onyx/medium :core.async
+ ;:onyx/batch-size batch-size
+ :onyx/max-peers 1
+ :onyx/doc "Reads segments from a core.async channel"}
+ opts))
+ (update :lifecycles into [{:lifecycle/task task
+ :lifecycle/calls ::in-calls
+ :core.async/id (java.util.UUID/randomUUID)
+ :core.async/size chan-size}
+ {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.core-async/reader-calls}]))))
+
+(defn add-core-async-output
+ ([job task opts] (add-core-async-output job task opts default-channel-size))
+ ([job task opts chan-size]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.core-async/output
+ :onyx/type :output
+ :onyx/medium :core.async
+ :onyx/max-peers 1
+ ;:onyx/batch-size batch-size
+ :onyx/doc "Writes segments to a core.async channel"}
+ opts))
+
+ (update :lifecycles into [{:lifecycle/task task
+ :core.async/id (java.util.UUID/randomUUID)
+ :core.async/size (inc chan-size)
+ :lifecycle/calls ::out-calls}
+ {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.core-async/writer-calls}]))))
diff --git a/src/desdemona/tasks/file_input.clj b/src/desdemona/tasks/file_input.clj
new file mode 100644
index 0000000..28d6388
--- /dev/null
+++ b/src/desdemona/tasks/file_input.clj
@@ -0,0 +1,28 @@
+(ns desdemona.tasks.file-input
+ (:require [taoensso.timbre :refer [info]]))
+
+(defn inject-in-reader [event lifecycle]
+ (let [filename (:filename (:onyx.core/task-map event))]
+ {:seq/seq (read-string (slurp filename))}))
+
+(def in-seq-calls
+ {:lifecycle/before-task-start inject-in-reader})
+
+(defn add-seq-file-input
+ "Adds task catalog entry and task lifecycles to use an edn file as an input"
+ [job task opts]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.seq/input
+ :onyx/type :input
+ :onyx/medium :seq
+ ;:onyx/batch-size batch-size
+ ; :filename filename
+ :seq/checkpoint? true
+ :onyx/max-peers 1
+ :onyx/doc "Reads segments from seq"}
+ opts))
+ (update :lifecycles into [{:lifecycle/task task
+ :lifecycle/calls ::in-seq-calls}
+ {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.seq/reader-calls}])))
diff --git a/src/desdemona/tasks/kafka.clj b/src/desdemona/tasks/kafka.clj
new file mode 100644
index 0000000..e35b0f9
--- /dev/null
+++ b/src/desdemona/tasks/kafka.clj
@@ -0,0 +1,69 @@
+(ns desdemona.tasks.kafka
+ (:require [taoensso.timbre :refer [info]]
+ [cheshire.core :as json]))
+
+(defn deserialize-message-json [bytes]
+ (try
+ (json/parse-string (String. bytes "UTF-8"))
+ (catch Exception e
+ {:error e})))
+
+(defn deserialize-message-edn [bytes]
+ (try
+ (read-string (String. bytes "UTF-8"))
+ (catch Exception e
+ {:error e})))
+
+(defn deserialize-message-raw [bytes]
+ (try
+ {:line (String. bytes "UTF-8")}
+ (catch Exception e
+ {:error e})))
+
+(defn serialize-message-json [segment]
+ (.getBytes (json/generate-string segment)))
+
+(defn serialize-message-edn [segment]
+ (.getBytes segment))
+
+(defn add-kafka-input
+ "Instrument a job with Kafka lifecycles and catalog entries."
+ [job task opts]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.kafka/read-messages
+ :onyx/type :input
+ :onyx/medium :kafka
+ ;:kafka/topic "topic"
+ ;:kafka/group-id "group-id"
+ :kafka/fetch-size 307200
+ :kafka/chan-capacity 1000
+ ;:kafka/zookeeper "zookeeper-addr"
+ :kafka/offset-reset :smallest
+ ;:kafka/force-reset? true
+ :kafka/empty-read-back-off 500
+ :kafka/commit-interval 500
+ ;:kafka/deserializer-fn ::deserialize-message-json
+ ;:onyx/batch-size 100
+ :onyx/doc "Reads messages from a Kafka topic"}
+ opts))
+ (update :lifecycles conj {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.kafka/read-messages-calls})))
+
+(defn add-kafka-output
+ "Instrument a job with Kafka lifecycles and catalog entries."
+ [job task opts]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.kafka/write-messages
+ :onyx/type :output
+ :onyx/medium :kafka
+ ;:onyx/batch-size batch-size
+ ;:kafka/topic topic
+ ;:kafka/zookeeper zookeeper-addr
+ ;:kafka/serializer-fn (expand-serializer-fn serializer-fn)
+ :kafka/request-size 307200
+ :onyx/doc "Writes messages to a Kafka topic"}
+ opts))
+ (update :lifecycles conj {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.kafka/write-messages-calls})))
diff --git a/src/desdemona/tasks/sql.clj b/src/desdemona/tasks/sql.clj
new file mode 100644
index 0000000..8709fe1
--- /dev/null
+++ b/src/desdemona/tasks/sql.clj
@@ -0,0 +1,52 @@
+(ns desdemona.tasks.sql
+ (:require [schema.core :as s]
+ [taoensso.timbre :refer [info]]))
+
+;; TODO, add read-rows function task
+
+(s/defn add-sql-partition-input
+  "Adds an sql partition input task to a job"
+ [job task opts]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.sql/partition-keys
+ :onyx/type :input
+ :onyx/medium :sql
+ ;:onyx/batch-size batch-size
+ ;:sql/classname classname
+ ;:sql/subprotocol subprotocol
+ ;:sql/subname subname
+ ;:sql/user user
+ ;:sql/password password
+ ;:sql/table table-name ; e.g. :your-table
+ ;:sql/id id-column ; e.g. :an-id-column
+ :sql/columns [:*]
+ ;; 500 * 1000 = 50,000 rows
+ ;; to be processed within :onyx/pending-timeout, 60s by default
+ :sql/rows-per-segment 500
+ :onyx/max-pending 1000
+ :onyx/max-peers 1
+ :onyx/doc "Partitions a range of primary keys into subranges"}
+ opts))
+ (update :lifecycles conj {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.sql/read-rows-calls})))
+
+(defn add-sql-insert-output
+ "Adds an sql insert rows output task to a job"
+ [job task opts]
+ (-> job
+ (update :catalog conj (merge {:onyx/name task
+ :onyx/plugin :onyx.plugin.sql/write-rows
+ :onyx/type :output
+ :onyx/medium :sql
+ ;:onyx/batch-size batch-size
+ ;:sql/classname classname
+ ;:sql/subprotocol subprotocol
+ ;:sql/subname subname
+ ;:sql/user user
+ ;:sql/password password
+ ;:sql/table table-name
+ :onyx/doc "Writes segments from the :rows keys to the SQL database"}
+ opts))
+ (update :lifecycles conj {:lifecycle/task task
+ :lifecycle/calls :onyx.plugin.sql/write-rows-calls})))
diff --git a/src/desdemona/utils.clj b/src/desdemona/utils.clj
index a30f3bc..29dce09 100644
--- a/src/desdemona/utils.clj
+++ b/src/desdemona/utils.clj
@@ -1,8 +1,6 @@
(ns desdemona.utils
- (:require [clojure.test :refer [is]]
- [clojure.core.async :refer [chan sliding-buffer >!!]]
- [clojure.java.io :refer [resource]]
- [onyx.plugin.core-async :refer [take-segments!]]))
+ (:require [clojure.core.async :refer [chan sliding-buffer >!!]]
+ [clojure.java.io :refer [resource]]))
;;;; Test utils ;;;;
@@ -12,12 +10,6 @@
(def zk-str (str zk-address ":" zk-port))
-(defn only [coll]
- (assert (not (next coll)))
- (if-let [result (first coll)]
- result
- (assert false)))
-
(defn find-task
"Finds the catalog entry where the :onyx/name key equals task-name"
[catalog task-name]
@@ -27,27 +19,6 @@
{:catalog catalog :task-name task-name})))
(first matches)))
-(defn update-task
- "Finds the catalog entry with :onyx/name task-name
- and applies f to it, returning the full catalog with the
- transformed catalog entry"
- [catalog task-name f]
- (mapv (fn [entry]
- (if (= task-name (:onyx/name entry))
- (f entry)
- entry))
- catalog))
-
-(defn add-to-job
- "Adds to the catalog and lifecycles of a job in form
- {:workflow ...
- :catalog ...
- :lifecycles ...}"
- [job {:keys [catalog lifecycles]}]
- (-> job
- (update :catalog into catalog)
- (update :lifecycles into lifecycles)))
-
(defn n-peers
"Takes a workflow and catalog, returns the minimum number of peers
needed to execute this job."
@@ -58,52 +29,6 @@
(+ sum (or (:onyx/min-peers (find-task catalog t)) 1)))
0 task-set)))
-(defn segments-equal?
- "Onyx is a parallel, distributed system - so ordering isn't guaranteed.
- Does an unordered comparison of segments to check for equality."
- [expected actual]
- (is (= (set expected) (set (remove (partial = :done) actual))))
- (is (= :done (last actual)))
- (is (= (dec (count actual)) (count expected))))
-
-(defn load-peer-config [onyx-id]
- (assoc (-> "dev-peer-config.edn" resource slurp read-string)
- :onyx/id onyx-id
- :zookeeper/address zk-str))
-
-(defn load-env-config [onyx-id]
- (assoc (-> "env-config.edn" resource slurp read-string)
- :onyx/id onyx-id
- :zookeeper/address zk-str
- :zookeeper.server/port zk-port))
-
-(defn in-memory-catalog
- "Takes a catalog and a set of input/output task names,
- returning a new catalog with all I/O catalog entries
- that were specified turned into core.async plugins. The
- core.async entries preserve all non-Onyx parameters."
- [catalog tasks]
- (mapv
- (fn [entry]
- (cond (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :input))
- (merge
- entry
- {:onyx/plugin :onyx.plugin.core-async/input
- :onyx/type :input
- :onyx/medium :core.async
- :onyx/max-peers 1
- :onyx/doc "Reads segments from a core.async channel"})
- (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :output))
- (merge
- entry
- {:onyx/plugin :onyx.plugin.core-async/output
- :onyx/type :output
- :onyx/medium :core.async
- :onyx/max-peers 1
- :onyx/doc "Writes segments to a core.async channel"})
- :else entry))
- catalog))
-
;;;; Lifecycles utils ;;;;
(def input-channel-capacity 10000)
@@ -118,13 +43,6 @@
(memoize
(fn [id] (chan (sliding-buffer output-channel-capacity)))))
-(defn channel-id-for [lifecycles task-name]
- (->> lifecycles
- (filter #(= task-name (:lifecycle/task %)))
- (map :core.async/id)
- (remove nil?)
- (first)))
-
(defn inject-in-ch [event lifecycle]
{:core.async/chan (get-input-channel (:core.async/id lifecycle))})
@@ -138,46 +56,5 @@
{:lifecycle/before-task-start inject-out-ch})
;;; Stubs lifecycles to use core.async IO, instead of, say, Kafka or Datomic.
-(defn in-memory-lifecycles
- [lifecycles catalog tasks]
- (vec
- (mapcat
- (fn [{:keys [lifecycle/task lifecycle/replaceable?] :as lifecycle}]
- (let [catalog-entry (find-task catalog task)]
- (cond (and (some #{task} tasks) replaceable?
- (= (:onyx/type catalog-entry) :input))
- [{:lifecycle/task task
- :lifecycle/calls ::in-calls
- :core.async/id (java.util.UUID/randomUUID)}
- {:lifecycle/task task
- :lifecycle/calls :onyx.plugin.core-async/reader-calls}]
- (and (some #{task} tasks) replaceable?
- (= (:onyx/type catalog-entry) :output))
- [{:lifecycle/task task
- :lifecycle/calls ::out-calls
- :core.async/id (java.util.UUID/randomUUID)}
- {:lifecycle/task task
- :lifecycle/calls :onyx.plugin.core-async/writer-calls}]
- :else [lifecycle])))
- lifecycles)))
-
-(defn bind-inputs! [lifecycles mapping]
- (doseq [[task segments] mapping]
- (let [in-ch (get-input-channel (channel-id-for lifecycles task))
- n-segments (count segments)]
- (when (< input-channel-capacity n-segments)
- (throw (ex-info "Input channel capacity is smaller than bound inputs. Capacity can be adjusted in utils.clj"
- {:channel-size input-channel-capacity
- :n-segments n-segments})))
- (when-not ((set (map :lifecycle/task lifecycles)) task)
- (throw (ex-info (str "Cannot bind input for task " task " as lifecycles are missing. Check that inputs are being bound to the correct task name.")
- {:input task
- :lifecycles lifecycles})))
- (doseq [segment segments]
- (>!! in-ch segment))
- (>!! in-ch :done))))
-
-(defn collect-outputs! [lifecycles output-tasks]
- (->> output-tasks
- (map #(get-output-channel (channel-id-for lifecycles %)))
- (map take-segments!)))
+
+
diff --git a/src/desdemona/workflows/sample_workflow.clj b/src/desdemona/workflows/sample_workflow.clj
index cb54ccb..2f30771 100644
--- a/src/desdemona/workflows/sample_workflow.clj
+++ b/src/desdemona/workflows/sample_workflow.clj
@@ -3,7 +3,7 @@
;;; The workflow of an Onyx job describes the graph of all possible
;;; tasks that data can flow between.
-(def workflow
- [[:read-lines :format-line]
- [:format-line :upper-case]
- [:upper-case :write-lines]])
+(defn build-workflow []
+ [[:read-lines :extract-line-info]
+ [:extract-line-info :prepare-rows]
+ [:prepare-rows :write-lines]])
diff --git a/test/desdemona/functions/sample_functions_test.clj b/test/desdemona/functions/sample_functions_test.clj
new file mode 100644
index 0000000..6ff0f11
--- /dev/null
+++ b/test/desdemona/functions/sample_functions_test.clj
@@ -0,0 +1,13 @@
+(ns desdemona.functions.sample-functions-test
+ (:require [clojure.test :refer [deftest is]]
+ [desdemona.functions.sample-functions :refer [transform-segment-shape prepare-rows]]))
+
+(deftest transform-segment-shape-test
+ (let [got (transform-segment-shape {"line" [:line]} {:line "this is a log line"})
+ expected {"line" "this is a log line"}]
+ (is (= got expected))))
+
+(deftest prepare-rows-test
+ (let [got (prepare-rows {"line" "this is a log line"})
+ expected {:rows [{"line" "this is a log line"}]}]
+ (is (= got expected))))
diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj
index 2a7d4fa..f1794dd 100644
--- a/test/desdemona/jobs/sample_job_test.clj
+++ b/test/desdemona/jobs/sample_job_test.clj
@@ -1,53 +1,27 @@
(ns desdemona.jobs.sample-job-test
(:require [clojure.test :refer [deftest is]]
- [clojure.core.async :refer [>!!]]
- [clojure.java.io :refer [resource]]
- [com.stuartsierra.component :as component]
- [desdemona.launcher.dev-system :refer [onyx-dev-env]]
- [desdemona.workflows.sample-workflow :refer [workflow]]
- [desdemona.catalogs.sample-catalog :refer [build-catalog] :as sc]
- [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles] :as sl]
- [desdemona.plugins.http-reader]
- [desdemona.functions.sample-functions]
- [desdemona.dev-inputs.sample-input :as dev-inputs]
- [desdemona.utils :as u]
- [onyx.api]))
+ [desdemona.jobs.sample-submit-job :refer [build-job]]
+ [desdemona.utils :refer [find-task]]
+ ; Make the plugins load
+ [onyx.plugin.kafka]
+ [onyx.plugin.seq]
+ [onyx.plugin.sql]))
-(deftest test-sample-dev-job
- (try
- (let [stubs [:read-lines :write-lines]
- catalog (u/in-memory-catalog (build-catalog) stubs)
- lifecycles (u/in-memory-lifecycles (build-lifecycles) catalog stubs)]
- (user/go (u/n-peers catalog workflow))
- (u/bind-inputs! lifecycles {:read-lines dev-inputs/lines})
- (let [peer-config (u/load-peer-config (:onyx-id user/system))
- job {:workflow workflow
- :catalog catalog
- :lifecycles lifecycles
- :task-scheduler :onyx.task-scheduler/balanced}]
- (onyx.api/submit-job peer-config job)
- (let [[results] (u/collect-outputs! lifecycles [:write-lines])]
- (is (seq results)))))
- (catch InterruptedException e
- (Thread/interrupted))
- (finally
- (user/stop))))
-
-(deftest test-sample-prod-job
- (try
- (let [catalog (build-catalog 20 500)
- lifecycles (build-lifecycles)]
- (user/go (u/n-peers catalog workflow))
- (u/bind-inputs! lifecycles {:read-lines dev-inputs/lines})
- (let [peer-config (u/load-peer-config (:onyx-id user/system))
- job {:workflow workflow
- :catalog catalog
- :lifecycles lifecycles
- :task-scheduler :onyx.task-scheduler/balanced}]
- (onyx.api/submit-job peer-config job)
- (let [[results] (u/collect-outputs! lifecycles [:write-lines])]
- (is (seq results)))))
- (catch InterruptedException e
- (Thread/interrupted))
- (finally
- (user/stop))))
+(deftest build-job-test
+ (let [job (build-job)
+ expected-catalog-names [:extract-line-info :prepare-rows :read-lines :write-lines]
+ catalog (job :catalog)
+ workflow (job :workflow)
+ expected-workflow [[:read-lines :extract-line-info]
+ [:extract-line-info :prepare-rows]
+ [:prepare-rows :write-lines]]
+ lifecycles (job :lifecycles)
+ expected-lifecycles [{:lifecycle/task :write-lines :lifecycle/calls :desdemona.lifecycles.logging/log-calls}
+ {:lifecycle/task :read-lines :lifecycle/calls :desdemona.lifecycles.logging/log-calls}
+ {:lifecycle/task :write-lines :lifecycle/calls :onyx.plugin.sql/write-rows-calls}
+ {:lifecycle/task :read-lines, :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]]
+ (is (= (map :onyx/name catalog) expected-catalog-names))
+ (is (= ((find-task catalog :read-lines) :kafka/topic) "test1"))
+ (is (= ((find-task catalog :write-lines) :sql/table) :logLines))
+ (is (= workflow expected-workflow))
+ (is (= lifecycles expected-lifecycles))))
diff --git a/test/desdemona/tasks/kafka_test.clj b/test/desdemona/tasks/kafka_test.clj
new file mode 100644
index 0000000..18fa437
--- /dev/null
+++ b/test/desdemona/tasks/kafka_test.clj
@@ -0,0 +1,12 @@
+(ns desdemona.tasks.kafka-test
+ (:require [clojure.test :refer [deftest is]]
+ [desdemona.tasks.kafka :refer [deserialize-message-raw]]))
+
+(deftest deserialize-message-raw-test
+ (let [got (deserialize-message-raw (.getBytes "this is raw text"))
+ expected {:line "this is raw text"}]
+ (is (= got expected))))
+
+(deftest deserialize-message-raw-fails-test
+ (let [got (deserialize-message-raw "this should be bytes")]
+ (is (= (.getMessage (got :error)) "No matching ctor found for class java.lang.String"))))
diff --git a/test/desdemona/workflows/sample_workflow_test.clj b/test/desdemona/workflows/sample_workflow_test.clj
new file mode 100644
index 0000000..807b56a
--- /dev/null
+++ b/test/desdemona/workflows/sample_workflow_test.clj
@@ -0,0 +1,10 @@
+(ns desdemona.workflows.sample-workflow-test
+ (:require [clojure.test :refer [deftest is]]
+ [desdemona.workflows.sample-workflow :refer [build-workflow]]))
+
+(deftest build-workflow-test
+ (let [expected [[:read-lines :extract-line-info]
+ [:extract-line-info :prepare-rows]
+ [:prepare-rows :write-lines]]
+ got (build-workflow)]
+ (is (= got expected))))