From 0d91ffa4c62deb0bb0b38f58c5e9962f5e5ff9ff Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 2 Feb 2016 17:56:12 -0600 Subject: [PATCH 01/17] Kafka ingestion of syslog-ng. Backported Kafka support from Onyx. Ingesting syslog via Kafka and saving it to MySQL. Running a syslog-ng relay that receives syslog and passes it to Kafka. --- Dockerfile | 11 +- docker-compose.yml | 52 +++++ env/dev/desdemona/dev_inputs/sample_input.clj | 76 ++------ env/dev/user.clj | 25 --- project.clj | 9 +- resources/config.edn | 24 +++ resources/prod-peer-config.edn | 8 - resources/sample_input.edn | 21 ++ resources/table.sql | 1 + script/build.sh | 0 script/run-peers.sh | 10 - script/run_aeron.sh | 7 + script/run_peers.sh | 8 + script/syslog-ng/Dockerfile | 24 +++ script/syslog-ng/openjdk-libjvm.conf | 1 + script/syslog-ng/syslog-ng.conf | 30 +++ src/desdemona/catalogs/sample_catalog.clj | 58 +++--- src/desdemona/functions/sample_functions.clj | 21 +- src/desdemona/jobs/sample_submit_job.clj | 87 ++++++--- src/desdemona/launcher/aeron_media_driver.clj | 33 +++- src/desdemona/launcher/dev_system.clj | 62 ------ src/desdemona/launcher/launch_prod_peers.clj | 63 +++--- .../launcher/submit_prod_sample_job.clj | 19 -- src/desdemona/lifecycles/logging.clj | 19 ++ src/desdemona/lifecycles/metrics.clj | 9 + src/desdemona/lifecycles/sample_lifecycle.clj | 36 +--- src/desdemona/plugins/http_reader.clj | 87 --------- src/desdemona/tasks/core_async.clj | 81 ++++++++ src/desdemona/tasks/file_input.clj | 28 +++ src/desdemona/tasks/kafka.clj | 69 +++++++ src/desdemona/tasks/sql.clj | 52 +++++ src/desdemona/utils.clj | 183 ------------------ src/desdemona/workflows/sample_workflow.clj | 17 +- test/desdemona/jobs/sample_job_test.clj | 75 +++---- 34 files changed, 671 insertions(+), 635 deletions(-) create mode 100644 docker-compose.yml delete mode 100644 env/dev/user.clj create mode 100644 resources/config.edn delete mode 100644 resources/prod-peer-config.edn create mode 100644 resources/sample_input.edn create mode 100644 resources/table.sql mode change 100644 => 100755 script/build.sh delete mode 100755 script/run-peers.sh create mode 100755 script/run_aeron.sh create mode 100755 script/run_peers.sh create mode 100644 script/syslog-ng/Dockerfile create mode 100644 script/syslog-ng/openjdk-libjvm.conf create mode 100644 script/syslog-ng/syslog-ng.conf delete mode 100644 src/desdemona/launcher/dev_system.clj delete mode 100644 src/desdemona/launcher/submit_prod_sample_job.clj create mode 100644 src/desdemona/lifecycles/logging.clj create mode 100644 src/desdemona/lifecycles/metrics.clj delete mode 100644 src/desdemona/plugins/http_reader.clj create mode 100644 src/desdemona/tasks/core_async.clj create mode 100644 src/desdemona/tasks/file_input.clj create mode 100644 src/desdemona/tasks/kafka.clj create mode 100644 src/desdemona/tasks/sql.clj delete mode 100644 src/desdemona/utils.clj diff --git a/Dockerfile b/Dockerfile index 16a3a73..1668dc7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,10 +8,17 @@ RUN apt-get update # Auto-accept the Oracle JDK license RUN echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | sudo /usr/bin/debconf-set-selections +RUN mkdir /etc/service/onyx_peer +RUN mkdir /etc/service/aeron + RUN apt-get install -y oracle-java8-installer ADD target/desdemona-0.1.0-SNAPSHOT-standalone.jar /srv/desdemona.jar -ADD script/run-peers.sh /srv/run-peers.sh +ADD script/run_peers.sh /etc/service/onyx_peer/run +ADD script/run_aeron.sh /etc/service/aeron/run + +EXPOSE 40200/tcp +EXPOSE 40200/udp -ENTRYPOINT ["/bin/sh", "/srv/run-peers.sh"] +CMD ["/sbin/my_init"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..09d8b58 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,52 @@ +peer: + image: 'desdemona:0.1.0' + links: + - zookeeper:zk + - kafka:kafka + - db:db + - syslog-ng:syslog-ng + environment: + ONYX_ID: 1 + NPEERS: 6 + privileged: true + expose: + - "40200" + - "40200/udp" +zookeeper: + image: 'wurstmeister/zookeeper' + ports: + - '2181:2181' +kafka: + image: 'wurstmeister/kafka' + environment: + KAFKA_BROKER_ID: 1 + HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'" + links: + - zookeeper:zk + ports: + - "9092:9092" + volumes: + - /var/run/docker.sock:/var/run/docker.sock +db: + image: mysql + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: 'true' + MYSQL_USER: onyx + MYSQL_PASSWORD: onyx + MYSQL_DATABASE: logs + ports: + - "3306:3306" +syslog-ng: + build: script/syslog-ng + links: + - kafka:kafka + volumes: + - ./script/syslog-ng/syslog-ng.conf:/etc/syslog-ng/syslog-ng.conf + ports: + - "601:601" +#kafkacat: +# build: script/kafka-meetup-streamer +# links: +# - kafka:kafka +# environment: +# BROKER_LIST: kafka:9092 diff --git a/env/dev/desdemona/dev_inputs/sample_input.clj b/env/dev/desdemona/dev_inputs/sample_input.clj index 555ff69..b8215db 100644 --- a/env/dev/desdemona/dev_inputs/sample_input.clj +++ b/env/dev/desdemona/dev_inputs/sample_input.clj @@ -4,69 +4,15 @@ ;; Warning, I haven't read it. :) (def lines - [{:line " THE ADVENTURES OF TOM THUMB"} + [{:line "2016-01-29T21:44:01+00:00 192.168.99.1 102 <1> Jan 29 15:44:01 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:01.150100217 -0600 CST"} {:line ""} - {:line " Once upon a time . . . there lived a giant who had quarrelled with a very "} - {:line "greedy wizard over sharing a treasure. After the quarrel, the giant said "} - {:line "menacingly to the wizard:"} - {:line " \"I could crush you under my thumb if I wanted to! Now, get out of my "} - {:line "sight!\" The wizard hurried away, but from a safe distance, he hurled his "} - {:line "terrible revenge."} - {:line " \"Abracadabra! Here I cast this spell! May the son, your wife will shortly "} - {:line "give you, never grow any taller than my own thumb!\""} - {:line " After Tom Thumb was born, his parents were at their wits' end. They could "} - {:line "never find him, for they could barely see him. They had to speak in whispers "} - {:line "for fear of deafening the little boy. Tom Thumb preferred playing with the "} - {:line "little garden creatures, to the company of parents so different from himself. "} - {:line "He rode piggyback on the snail and danced with the ladybirds. Tiny as he was, "} - {:line "he had great fun in the world of little things."} - {:line " But one unlucky day, he went to visit a froggy friend. No sooner had he "} - {:line "scrambled onto a leaf than a large pike swallowed him up. But the pike too was"} - {:line "fated to come to a very bad end. A little later, he took the bait cast by one "} - {:line "of the King's fishermen, and before long, found himself under the cook's knife"} - {:line "in the royal kitchens. And great was everyone's surprise when, out of the "} - {:line "fish's stomach, stepped Tom Thumb, quite alive and little the worse for his "} - {:line "adventure."} - {:line " \"What am I to do with this tiny lad?\" said the cook to himself. Then he had"} - {:line "a brainwave. \"He can be a royal pageboy! He's so tiny, I can pop him into the "} - {:line "cake I'm making. When he marches across the bridge, sounding the trumpet "} - {:line "everyone will gasp in wonder!\" Never had such a marvel been seen at Court. The"} - {:line "guests clapped excitedly at the cook's skill and the King himself clapped "} - {:line "loudest of all. The King rewarded the clever cook with a bag of gold. Tom "} - {:line "Thumb was even luckier. The cook made him a pageboy, and a pageboy he remained,"} - {:line "enjoying all the honours of his post."} - {:line " He had a white mouse for a mount, a gold pin for a sword and he was allowed"} - {:line "to eat the King's food. In exchange, he marched up and down the table at "} - {:line "banquets. He picked his way amongst the plates and glasses amusing the guests"} - {:line "with his trumpet."} - {:line " What Tom Thumb didn't know was that he had made an enemy. The cat which, "} - {:line "until Tom's arrival, had been the King's pet, was now forgotten. And, vowing "} - {:line "to have its revenge on the newcomer, it ambushed Tom in the garden. When Tom "} - {:line "saw the cat, he did not run away, as the creature had intended. He whipped out"} - {:line "his gold pin and cried to his white mouse mount:"} - {:line " \"Charge! Charge!\" Jabbed by the tiny sword, the cat turned tail and fled. "} - {:line "Since brute force was not the way to revenge, the cat decided to use guile. "} - {:line "Casually pretending to bump into the King as he walked down the staircase, the"} - {:line "cat softly miaowed:"} - {:line " \"Sire! Be on your guard! A plot is being hatched against your life!\" And"} - {:line "then he told a dreadful lie. \"Tom Thumb is planning to lace your food with "} - {:line "hemlock. I saw him picking the leaves in the garden the other day. heard him "} - {:line "say these very words!\""} - {:line " Now, the King had once been kept in bed with very bad tummy pains, after "} - {:line "eating too many chernes and he feared the thought of being poisoned, so he "} - {:line "sent for Tom Thumb. The cat provided proof of his words by pulling a hemlock "} - {:line "leaf from under the white mouse's saddle cloth, where he had hidden it "} - {:line "himself."} - {:line " Tom Thumb was so amazed, he was at a loss for words to deny what the cat "} - {:line "had said. The King, withiut further ado, had him thrown into prison. And since"} - {:line "he was so tiny, they locked him up in a pendulum clock. The hours passed and"} - {:line "the days too. Tom's only pastime was swinging back and forth, clinging to the "} - {:line "pendulum, util the night when he attracted the attention of a big night moth,"} - {:line "fluttering round the room."} - {:line " \"Let me out!\" cried Tom Thumb, tapping on the glass. As it so happens, the "} - {:line "moth had only just been set free after being a prisoner in a large box, in "} - {:line "which she had taken a nap. So she took pity on Tom Thumb and released him."} - {:line " 'll take you to the Butterfly Kingdom, where everyone's tiny like burself. "} - {:line "They'll take care of you there!\" And that is what happened. To this day, if "} - {:line "you visit the Butterfly Kingdom, you can ask to see the Butterfly monument "} - {:line "that Tom Thumb built after this amazing adventrure."}]) + {:line "2016-01-29T21:44:02+00:00 192.168.99.1 102 <1> Jan 29 15:44:01 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:01.654799006 -0600 CST"} + {:line ""} + {:line "2016-01-29T21:44:02+00:00 192.168.99.1 102 <1> Jan 29 15:44:02 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:02.159437592 -0600 CST"} + {:line ""} + {:line "2016-01-29T21:44:03+00:00 192.168.99.1 102 <1> Jan 29 15:44:02 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:02.662554386 -0600 CST"} + {:line ""} + {:line "2016-01-29T21:44:03+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.164769137 -0600 CST"} + {:line ""} + {:line "2016-01-29T21:44:04+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.669196559 -0600 CST"}] + ) diff --git a/env/dev/user.clj b/env/dev/user.clj deleted file mode 100644 index 3893756..0000000 --- a/env/dev/user.clj +++ /dev/null @@ -1,25 +0,0 @@ -(ns user - (:require [clojure.tools.namespace.repl :refer [refresh set-refresh-dirs]] - [com.stuartsierra.component :as component] - [desdemona.launcher.dev-system :refer [onyx-dev-env]])) - -(set-refresh-dirs "src" "test") - -(def system nil) - -(defn init [n-peers] - (alter-var-root #'system (constantly (onyx-dev-env n-peers)))) - -(defn start [] - (alter-var-root #'system component/start)) - -(defn stop [] - (alter-var-root #'system (fn [s] (when s (component/stop s))))) - -(defn go [n-peers] - (init n-peers) - (start)) - -(defn reset [] - (stop) - (refresh)) diff --git a/project.clj b/project.clj index 523848b..549f841 100644 --- a/project.clj +++ b/project.clj @@ -4,7 +4,14 @@ :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.8.0"] - [org.onyxplatform/onyx "0.8.2"]] + [org.onyxplatform/onyx "0.8.8"] + [org.onyxplatform/onyx-sql "0.8.8.0"] + [mysql/mysql-connector-java "5.1.18"] + [org.onyxplatform/onyx-kafka "0.8.8.0"] + [org.onyxplatform/onyx-seq "0.8.8.0"] + [cheshire "5.5.0"] + [org.clojure/tools.cli "0.3.3"] + [aero "0.1.3"]] :plugins [[lein-cljfmt "0.3.0"] [lein-cloverage "1.0.7-SNAPSHOT"]] :profiles {:uberjar {:aot [desdemona.launcher.aeron-media-driver diff --git a/resources/config.edn b/resources/config.edn new file mode 100644 index 0000000..269a8b5 --- /dev/null +++ b/resources/config.edn @@ -0,0 +1,24 @@ +{:env-config + {:onyx/id #cond {:default #env ONYX_ID + :dev "1"} + :onyx.bookkeeper/server? true + :onyx.bookkeeper/delete-server-data? true + :zookeeper/address #cond {:default #env [ZOOKEEPER "zk:2181"] + :test #env [ZOOKEEPER "127.0.0.1:2181"]} + :zookeeper/server? #cond {:default false + :test true} + :zookeeper.server/port 2181} + :peer-config + {:onyx/id #cond {:default #env ONYX_ID + :dev "1"} + :zookeeper/address #cond {:default #env [ZOOKEEPER "zk:2181"] + :dev #env [ZOOKEEPER "192.168.99.100:2181"]} + :onyx.peer/job-scheduler :onyx.job-scheduler/greedy + :onyx.peer/zookeeper-timeout 60000 + :onyx.messaging/allow-short-circuit? true + :onyx.messaging/impl :aeron + ;; Change "localhost" to a resolvable hostname + ;; by any node in your cluster. + :onyx.messaging/bind-addr #cond {:default #env [BIND_ADDR "localhost"]} + :onyx.messaging/peer-port 40200 + :onyx.messaging.aeron/embedded-driver? false}} diff --git a/resources/prod-peer-config.edn b/resources/prod-peer-config.edn deleted file mode 100644 index d640ef5..0000000 --- a/resources/prod-peer-config.edn +++ /dev/null @@ -1,8 +0,0 @@ -{:zookeeper/address "127.0.0.1:2188" - :onyx.peer/job-scheduler :onyx.job-scheduler/greedy - :onyx.messaging/impl :aeron - ;; Change "localhost" to a resolvable hostname - ;; by any node in your cluster. - :onyx.messaging/bind-addr "localhost" - :onyx.messaging/peer-port 40200 - :onyx.messaging.aeron/embedded-driver? false} diff --git a/resources/sample_input.edn b/resources/sample_input.edn new file mode 100644 index 0000000..658dc89 --- /dev/null +++ b/resources/sample_input.edn @@ -0,0 +1,21 @@ +[{"venue_visibility" "public", "payment_required" "0", "yes_rsvp_count" 4, "group" {"group_lon" -123.14, "country" "ca", "city" "Vancouver", "group_lat" 49.26, "id" 1734488, "group_photo" {"highres_link" "http://photos4.meetupstatic.com/photos/event/1/d/3/highres_431820467.jpeg", "photo_link" "http://photos4.meetupstatic.com/photos/event/1/d/3/600_431820467.jpeg", "photo_id" 431820467, "thumb_link" "http://photos2.meetupstatic.com/photos/event/1/d/3/thumb_431820467.jpeg"}, "name" "Kitsilano Business Leaders", "state" "BC", "category" {"name" "career/business", "id" 2, "shortname" "career-business"}, "urlname" "KitsilanoBusinessLeaders", "join_mode" "open"}, "id" "227142972", "utc_offset" -28800000, "visibility" "public", "name" "12 Steps How to Build Mobile App to Rock Your Business", "maybe_rsvp_count" 0, "status" "upcoming", "venue" {"country" "ca", "city" "Vancouver", "address_1" "2941 West Broadway", "name" "G&F Financial Group", "lon" -123.170868, "state" "BC", "lat" 49.26437}, "time" 1452783600000, "rsvp_limit" 60, "description" "

Do you have a mobile app for your business? Do you have an effective mobile strategy that involves more than just having a mobile-friendly website?

\n

Following Melanie Haselmayr's Forbes article, we are featuring two local entrepreneurs who will share the 12 steps they took to build a dynamite app for their businesses. http://www.forbes.com/sites/allbusiness/2014/11/17/heres-why-your-business-needs-its-own-mobile-app/

\n


\n

 Ria Mizera, founder of ClosetRelay,

\n

\n


\n


\n

\n

Kristoffer Vik Hansen, co-founder of Spare

\n


\n

Early risers, come and kickstart your day once a month with insightful discussions on current business challenges.

\n

MEETING SCHEDULE

\n

6:50 - 7:20 am: Sign in, get your coffee and muffin early. Be ready to develop business trust and camaraderie. 

\n

7:20 - 7:45 am: Muffin Sponsor (5 min.), Success Circle...15 second insight on who you are and how your business can help others to succeed. 

\n

7:45 - 8:30 am: Keynote presentation 

\n

8:30 - 9:00 am: Door prizes; share and learn camaraderie continues 

\n

DOOR PRIZE donors: Contact Judy if you have a product/service to promote.

\n

MUFFIN SPONSOR: Contact Judy how you can be muffin sponsor of the month and promote your business for 5 minutes to a captive audience.

\n
\n
", "event_url" "http://www.meetup.com/KitsilanoBusinessLeaders/events/227142972/", "mtime" 1450129667481} + {"venue_visibility" "members", "payment_required" "0", "yes_rsvp_count" 1, "group" {"group_lon" -111.75, "country" "us", "city" "Gilbert", "group_lat" 33.36, "id" 5391722, "group_photo" {"highres_link" "http://photos4.meetupstatic.com/photos/event/e/2/4/highres_437583620.jpeg", "photo_link" "http://photos4.meetupstatic.com/photos/event/e/2/4/600_437583620.jpeg", "photo_id" 437583620, "thumb_link" "http://photos2.meetupstatic.com/photos/event/e/2/4/thumb_437583620.jpeg"}, "name" "Chandler Gilbert Ladies and Gents Singles Over 45", "state" "AZ", "category" {"name" "parents/family", "id" 25, "shortname" "parents-family"}, "urlname" "meetup-group-HwhOSVfy", "join_mode" "open"}, "id" "227421769", "utc_offset" -25200000, "visibility" "public", "name" "Let's meetup for a walk...", "maybe_rsvp_count" 0, "status" "suggested", "time" 1452524400000, "duration" 5400000, "rsvp_limit" 0, "description" "

Let's meetup for some fun and fitness while walking at the Riparian Preserve.

", "event_url" "http://www.meetup.com/meetup-group-HwhOSVfy/events/227421769/", "mtime" 1450129667292} + {"venue_visibility" "members", + "payment_required" "0", + "yes_rsvp_count" 1, + "group" {"group_lon" -118.33, "country" "us", "city" "Los Angeles", "group_lat" 34.1, "id" 511174, + "group_photo" {"highres_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/highres_436221789.jpeg", + "photo_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/600_436221789.jpeg", + "photo_id" 436221789, + "thumb_link" "http://photos3.meetupstatic.com/photos/event/5/5/1/d/thumb_436221789.jpeg"}, + "name" "L.A. Foodies", "state" "CA", + "category" {"name" "food/drink", + "id" 10, "shortname" "food-drink"}, + "urlname" "Foodies", "join_mode" "approval"}, + "id" "227420766", "utc_offset" -28800000, "visibility" "public", + "name" "$85 five-course tasting menu @ Trois Mec!", + "maybe_rsvp_count" 0, "status" "upcoming", + "time" 1452304800000, "rsvp_limit" 6, + "event_url" "http://www.meetup.com/Foodies/events/227420766/", + "mtime" 1450129671307} + :done] diff --git a/resources/table.sql b/resources/table.sql new file mode 100644 index 0000000..d036c6e --- /dev/null +++ b/resources/table.sql @@ -0,0 +1 @@ +CREATE TABLE logLines (id int primary key auto_increment, line text); diff --git a/script/build.sh b/script/build.sh old mode 100644 new mode 100755 diff --git a/script/run-peers.sh b/script/run-peers.sh deleted file mode 100755 index d739053..0000000 --- a/script/run-peers.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash -set -o errexit -set -o nounset -set -o xtrace - -echo "Setting shared memory for Aeron" -mount -t tmpfs -o remount,rw,nosuid,nodev,noexec,relatime,size=256M tmpfs /dev/shm -APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g) -java -cp /srv/desdemona.jar "$APP_NAME.launcher.aeron_media_driver" & -java -cp /srv/desdemona.jar "$APP_NAME.launcher.launch_prod_peers" $ONYX_ID $NPEERS diff --git a/script/run_aeron.sh b/script/run_aeron.sh new file mode 100755 index 0000000..58bdf86 --- /dev/null +++ b/script/run_aeron.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +echo "Setting shared memory for Aeron" +mount -t tmpfs -o remount,rw,nosuid,nodev,noexec,relatime,size=256M tmpfs /dev/shm +APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g) + +exec java -cp /srv/desdemona.jar "$APP_NAME.launcher.aeron_media_driver" >>/var/log/aeron.log 2>&1 diff --git a/script/run_peers.sh b/script/run_peers.sh new file mode 100755 index 0000000..f297749 --- /dev/null +++ b/script/run_peers.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -o errexit +set -o nounset +set -o xtrace + +export BIND_ADDR=$(hostname --ip-address) +export APP_NAME=$(echo "desdemona" | sed s/"-"/"_"/g) +exec java -cp /srv/desdemona.jar "$APP_NAME.launcher.launch_prod_peers" $NPEERS diff --git a/script/syslog-ng/Dockerfile b/script/syslog-ng/Dockerfile new file mode 100644 index 0000000..371de53 --- /dev/null +++ b/script/syslog-ng/Dockerfile @@ -0,0 +1,24 @@ +FROM debian:latest +MAINTAINER Andras Mitzki + +RUN apt-get update -qq && apt-get install -y \ + wget + +RUN wget -qO - http://download.opensuse.org/repositories/home:/laszlo_budai:/syslog-ng/Debian_8.0/Release.key | apt-key add - +RUN echo 'deb http://download.opensuse.org/repositories/home:/laszlo_budai:/syslog-ng/Debian_8.0 ./' | tee --append /etc/apt/sources.list.d/syslog-ng-obs.list + +RUN apt-get update -qq && apt-get install -y \ + syslog-ng + +ADD openjdk-libjvm.conf /etc/ld.so.conf.d/openjdk-libjvm.conf +RUN ldconfig + +RUN wget -q http://artfiles.org/apache.org/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz -O /tmp/kafka_2.11-0.9.0.0.tgz +RUN tar xfz /tmp/kafka_2.11-0.9.0.0.tgz -C /opt +RUN ln -s /opt/kafka_2.11-0.9.0.0 /opt/kafka + +EXPOSE 514 +EXPOSE 601 +EXPOSE 6514 + +ENTRYPOINT ["/usr/sbin/syslog-ng", "-F"] diff --git a/script/syslog-ng/openjdk-libjvm.conf b/script/syslog-ng/openjdk-libjvm.conf new file mode 100644 index 0000000..433be80 --- /dev/null +++ b/script/syslog-ng/openjdk-libjvm.conf @@ -0,0 +1 @@ +/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/amd64/server/ diff --git a/script/syslog-ng/syslog-ng.conf b/script/syslog-ng/syslog-ng.conf new file mode 100644 index 0000000..c3924c5 --- /dev/null +++ b/script/syslog-ng/syslog-ng.conf @@ -0,0 +1,30 @@ +@version: 3.7 +@module mod-java +@include "scl.conf" + +source s_tcp { + network(port(601)); +}; + +destination d_file { + file("/var/log/testlogs"); +}; + +destination d_kafka { + kafka( + client_lib_dir("/opt/kafka/libs/*.jar:/usr/share/kafka/lib/*.jar") + kafka_bootstrap_servers("kafka:9092") + topic("test1") + ); +}; + + +log { + source(s_tcp); + destination(d_file); +}; + +log { + source(s_tcp); + destination(d_kafka); +}; diff --git a/src/desdemona/catalogs/sample_catalog.clj b/src/desdemona/catalogs/sample_catalog.clj index 5b488d5..8debc6d 100644 --- a/src/desdemona/catalogs/sample_catalog.clj +++ b/src/desdemona/catalogs/sample_catalog.clj @@ -1,4 +1,5 @@ -(ns desdemona.catalogs.sample-catalog) +(ns desdemona.catalogs.sample-catalog + (:require [desdemona.functions.sample-functions :refer [format-line upper-case transform-segment-shape prepare-rows]])) ;;; Catalogs describe each task in a workflow. We use ;;; them for describing input and output sources, injecting parameters, @@ -7,35 +8,32 @@ (defn build-catalog ([] (build-catalog 5 50)) ([batch-size batch-timeout] - [{:onyx/name :read-lines - :onyx/plugin :desdemona.plugins.http-reader/reader - :onyx/type :input - :onyx/medium :http - :http/uri "http://textfiles.com/stories/abbey.txt" - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/max-peers 1 - :onyx/doc "Reads lines from an HTTP url text file"} + [ + {:onyx/name :format-line + :onyx/fn :desdemona.functions.sample-functions/format-line + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :onyx/doc "Strips the line of any leading or trailing whitespace"} - {:onyx/name :format-line - :onyx/fn :desdemona.functions.sample-functions/format-line - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Strips the line of any leading or trailing whitespace"} + {:onyx/name :upper-case + :onyx/fn :desdemona.functions.sample-functions/upper-case + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :onyx/doc "Capitalizes the first letter of the line"} - {:onyx/name :upper-case - :onyx/fn :desdemona.functions.sample-functions/upper-case - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Capitalizes the first letter of the line"} + {:onyx/name :extract-line-info + :onyx/fn :desdemona.functions.sample-functions/transform-segment-shape + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :keypath {"line" [:line]} + :onyx/params [:keypath] + :onyx/doc "Extracts the line"} - {:onyx/name :write-lines - :onyx/plugin :onyx.plugin.core-async/output - :onyx/type :output - :onyx/medium :core.async - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/max-peers 1 - :onyx/doc "Writes segments to a core.async channel"}])) + {:onyx/name :prepare-rows + :onyx/fn :desdemona.functions.sample-functions/prepare-rows + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout}])) diff --git a/src/desdemona/functions/sample_functions.clj b/src/desdemona/functions/sample_functions.clj index 6e60479..525501a 100644 --- a/src/desdemona/functions/sample_functions.clj +++ b/src/desdemona/functions/sample_functions.clj @@ -1,5 +1,7 @@ (ns desdemona.functions.sample-functions - (:require [clojure.string :refer [trim capitalize]])) + (:require [clojure + [string :refer [capitalize trim]] + [walk :refer [postwalk]]])) ;;; Defines functions to be used by the peers. These are located ;;; with fully qualified namespaced keywords, such as @@ -13,3 +15,20 @@ (let [upper-cased (apply str (capitalize (first line)) (rest line))] (assoc-in segment [:line] upper-cased)) segment)) + +(defn transform-segment-shape + "Recursively restructures a segment {:new-key [paths...]}" + [paths segment] + (try (let [f (fn [[k v]] + (if (vector? v) + [k (get-in segment v)] + [k v]))] + (postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) paths)) + (catch Exception e + segment))) + +(defn get-in-segment [keypath segment] + (get-in segment keypath)) + +(defn prepare-rows [segment] + {:rows [segment]}) diff --git a/src/desdemona/jobs/sample_submit_job.clj b/src/desdemona/jobs/sample_submit_job.clj index 4083ac9..c40bf19 100644 --- a/src/desdemona/jobs/sample_submit_job.clj +++ b/src/desdemona/jobs/sample_submit_job.clj @@ -1,33 +1,58 @@ (ns desdemona.jobs.sample-submit-job - (:require [clojure.java.io :refer [resource]] - [com.stuartsierra.component :as component] - [desdemona.workflows.sample-workflow :refer [workflow]] - [desdemona.catalogs.sample-catalog :refer [build-catalog] :as sc] - [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles] :as sl] - [desdemona.flow-conditions.sample-flow-conditions :as sf] - [desdemona.functions.sample-functions] - [desdemona.dev-inputs.sample-input :as dev-inputs] - [desdemona.utils :as u] - [onyx.api])) + (:require [desdemona.catalogs.sample-catalog :refer [build-catalog]] + [desdemona.tasks.kafka :refer [add-kafka-input add-kafka-output]] + [desdemona.tasks.core-async :refer [add-core-async-input add-core-async-output]] + [desdemona.tasks.sql :refer [add-sql-partition-input add-sql-insert-output]] + [desdemona.tasks.file-input :refer [add-seq-file-input]] + [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles]] + [desdemona.lifecycles.metrics :refer [add-metrics]] + [desdemona.lifecycles.logging :refer [add-logging]] + [desdemona.workflows.sample-workflow :refer [build-workflow]] + [aero.core :refer [read-config]] + [onyx.api])) -(defn submit-job [dev-env] - (let [dev-cfg (-> "dev-peer-config.edn" resource slurp read-string) - peer-config (assoc dev-cfg :onyx/id (:onyx-id dev-env)) - ;; Turn :read-lines and :write-lines into core.async I/O channels - stubs [:read-lines :write-lines] - ;; Stubs the catalog entries for core.async I/O - dev-catalog (u/in-memory-catalog (build-catalog 20 50) stubs) - ;; Stubs the lifecycles for core.async I/O - dev-lifecycles (u/in-memory-lifecycles (build-lifecycles) dev-catalog stubs)] - ;; Automatically pipes the data structure into the channel, attaching :done at the end - (u/bind-inputs! dev-lifecycles {:read-lines dev-inputs/lines}) - (let [job {:workflow workflow - :catalog dev-catalog - :lifecycles dev-lifecycles - :flow-conditions sf/flow-conditions - :task-scheduler :onyx.task-scheduler/balanced}] - (onyx.api/submit-job peer-config job) - ;; Automatically grab output from the stubbed core.async channels, - ;; returning a vector of the results with data structures representing - ;; the output. - (u/collect-outputs! dev-lifecycles [:write-lines])))) +;;;; +;; Lets build a job +;; Depending on the mode, the job is built up in a different way +;; When :dev mode, onyx-seq will be used as an input, with the meetup data being +;; included in the onyx-seq lifecycle for easy access +;; core.async is then added as an output task +;; +;; When using :prod mode, kafka is added as an input, and onyx-sql is used as the output + +(defn build-job [mode] + (let [batch-size 1 + batch-timeout 1000 + base-job {:catalog (build-catalog batch-size batch-timeout) + :lifecycles (build-lifecycles {:mode mode}) + :workflow (build-workflow {:mode mode}) + :task-scheduler :onyx.task-scheduler/balanced}] + (cond-> base-job + (= :dev mode) (add-core-async-output :write-lines {:onyx/batch-size batch-size}) + (= :dev mode) (add-seq-file-input :read-lines {:onyx/batch-size batch-size + :filename "resources/sample_input.edn"}) + (= :prod mode) (add-kafka-input :read-lines {:onyx/batch-size batch-size + :onyx/max-peers 1 + :kafka/topic "test1" + :kafka/group-id "onyx-consumer" + :kafka/zookeeper "zk:2181" + :kafka/deserializer-fn :desdemona.tasks.kafka/deserialize-message-raw + :kafka/offset-reset :smallest}) + (= :prod mode) (add-sql-insert-output :write-lines {:onyx/batch-size batch-size + :sql/classname "com.mysql.jdbc.Driver" + :sql/subprotocol "mysql" + :sql/subname "//db:3306/logs" + :sql/user "onyx" + :sql/password "onyx" + :sql/table :logLines}) + true (add-logging :read-lines) + true (add-logging :write-lines)))) + +(defn -main [& args] + (let [config (read-config (clojure.java.io/resource "config.edn") {:profile :dev}) + peer-config (get config :peer-config) + job (build-job :prod)] + (println peer-config) + (println job) + (let [{:keys [job-id]} (onyx.api/submit-job peer-config job)] + (println "Submitted job: " job-id)))) diff --git a/src/desdemona/launcher/aeron_media_driver.clj b/src/desdemona/launcher/aeron_media_driver.clj index d28ea0a..617a1b4 100644 --- a/src/desdemona/launcher/aeron_media_driver.clj +++ b/src/desdemona/launcher/aeron_media_driver.clj @@ -1,11 +1,38 @@ (ns desdemona.launcher.aeron-media-driver (:gen-class) - (:require [clojure.core.async :refer [chan (MediaDriver$Context.) + threading-mode (.threadingMode threading-mode-obj) + delete-dirs (.dirsDeleteOnStart delete-dirs)) + media-driver (try (MediaDriver/launch ctx) + (catch IllegalStateException ise (throw (Exception. "Error starting media driver. This may be due to a media driver data incompatibility between versions. Check that no other media driver has been started and then use -d to delete the directory on startup" ise))))] (println "Launched the Media Driver. Blocking forever...") (OnyxDevEnv n-peers)) diff --git a/src/desdemona/launcher/launch_prod_peers.clj b/src/desdemona/launcher/launch_prod_peers.clj index a8c1e3d..d34b69b 100644 --- a/src/desdemona/launcher/launch_prod_peers.clj +++ b/src/desdemona/launcher/launch_prod_peers.clj @@ -1,26 +1,43 @@ (ns desdemona.launcher.launch-prod-peers - (:gen-class) - (:require [clojure.core.async :refer [chan "prod-peer-config.edn" - resource slurp read-string) :onyx/id onyx-id) - peer-group (onyx.api/start-peer-group peer-config) - peers (onyx.api/start-peers n-peers peer-group)] - (.addShutdownHook (Runtime/getRuntime) - (Thread. - (fn [] - (doseq [v-peer peers] - (onyx.api/shutdown-peer v-peer)) - (onyx.api/shutdown-peer-group peer-group) - (shutdown-agents)))) - (println "Started peers. Blocking forever.") - ;; Block forever. - ( (:peer-config config) + (assoc :onyx.log/config {:appenders + {:standard-out + {:enabled? true + :async? false + :output-fn t/default-output-fn + :fn standard-out-logger}}})) + peer-group (onyx.api/start-peer-group peer-config) + env (onyx.api/start-env (:env-config config)) + peers (onyx.api/start-peers n-peers peer-group)] + (println "Attempting to connect to to Zookeeper: " (:zookeeper/address peer-config)) + (.addShutdownHook (Runtime/getRuntime) + (Thread. + (fn [] + (doseq [v-peer peers] + (onyx.api/shutdown-peer v-peer)) + (onyx.api/shutdown-peer-group peer-group) + (shutdown-agents)))) + (println "Started peers. Blocking forever.") + ;; Block forever. + ( "prod-peer-config.edn" resource slurp read-string) - peer-config (assoc cfg :onyx/id onyx-id) - lifecycles (sample-lifecycle/build-lifecycles)] - (let [job {:workflow workflow - :catalog (build-catalog 20 50) - :lifecycles lifecycles - :task-scheduler :onyx.task-scheduler/balanced}] - (onyx.api/submit-job peer-config job) - (shutdown-agents)))) diff --git a/src/desdemona/lifecycles/logging.clj b/src/desdemona/lifecycles/logging.clj new file mode 100644 index 0000000..e015b46 --- /dev/null +++ b/src/desdemona/lifecycles/logging.clj @@ -0,0 +1,19 @@ +(ns desdemona.lifecycles.logging + (:require [taoensso.timbre :refer [info]])) + +(defn log-batch [event lifecycle] + (let [task-name (:onyx/name (:onyx.core/task-map event))] + (doseq [m (map :message (mapcat :leaves (:tree (:onyx.core/results event))))] + (info task-name " logging segment: " m))) + {}) + +(def log-calls + {:lifecycle/after-batch log-batch}) + +(defn add-logging + "Add's logging output to a tasks output-batch. " + [job task] + (if-let [entry (first (filter #(= (:onyx/name %) task) (:catalog job)))] + (-> job + (update-in [:lifecycles] conj {:lifecycle/task task + :lifecycle/calls ::log-calls})))) diff --git a/src/desdemona/lifecycles/metrics.clj b/src/desdemona/lifecycles/metrics.clj new file mode 100644 index 0000000..59281b4 --- /dev/null +++ b/src/desdemona/lifecycles/metrics.clj @@ -0,0 +1,9 @@ +(ns desdemona.lifecycles.metrics) + +(defn add-metrics + "Add's throughput and latency metrics to a task" + [job task opts] + (-> job + (update-in [:lifecycles] conj (merge {:lifecycle/task task ; Or :all for all tasks in the workflow + :lifecycle/calls :onyx.lifecycle.metrics.metrics/calls} + opts)))) diff --git a/src/desdemona/lifecycles/sample_lifecycle.clj b/src/desdemona/lifecycles/sample_lifecycle.clj index 54fa301..f029b09 100644 --- a/src/desdemona/lifecycles/sample_lifecycle.clj +++ b/src/desdemona/lifecycles/sample_lifecycle.clj @@ -1,32 +1,6 @@ -(ns desdemona.lifecycles.sample-lifecycle - (:require [clojure.core.async :refer [chan sliding-buffer >!!]] - [onyx.plugin.core-async :refer [take-segments!]] - [taoensso.timbre :refer [info]] - [onyx.static.planning :refer [find-task]] - [desdemona.utils :as u])) +(ns desdemona.lifecycles.sample-lifecycle) -(defn log-batch [event lifecycle] - (let [task-name (:onyx/name (:onyx.core/task-map event))] - (doseq [m (map :message (mapcat :leaves (:tree (:onyx.core/results event))))] - (info task-name " logging segment: " m))) - {}) - -(def log-calls - {:lifecycle/after-batch log-batch}) - -(defn build-lifecycles [] - [{:lifecycle/task :read-lines - :lifecycle/calls :desdemona.plugins.http-reader/reader-calls - :lifecycle/replaceable? true - :lifecycle/doc "Lifecycle for reading from a core.async chan"} - {:lifecycle/task :write-lines - :lifecycle/calls :desdemona.utils/out-calls - :core.async/id (java.util.UUID/randomUUID) - :lifecycle/replaceable? true - :lifecycle/doc "Lifecycle for your output task. When using in-memory-lifecycles, this will be replaced"} - {:lifecycle/task :write-lines - :lifecycle/calls :onyx.plugin.core-async/writer-calls - :lifecycle/doc "Lifecycle for injecting a core.async writer chan"} - {:lifecycle/task :write-lines - :lifecycle/calls ::log-calls - :lifecycle/doc "Lifecycle for printing the output of a task's batch"}]) +(defn build-lifecycles + "Put your environment-independent lifecycles here" + [ctx] + []) diff --git a/src/desdemona/plugins/http_reader.clj b/src/desdemona/plugins/http_reader.clj deleted file mode 100644 index 2df36ad..0000000 --- a/src/desdemona/plugins/http_reader.clj +++ /dev/null @@ -1,87 +0,0 @@ -(ns desdemona.plugins.http-reader - (:require [clojure.core.async :refer [chan >!! !! ch {:line x})) - (>!! ch :done)) - (catch Exception e - (.printStackTrace e))))] - {:http/fut fut - :http/chan ch - :http/retry-ch (chan 1000) - :http/drained? (atom false) - :http/pending-messages (atom {})})) - -(def reader-calls - {:lifecycle/before-task-start inject-reader}) - -(defrecord HttpReader [] - p-ext/Pipeline - (write-batch - [this event] - (function/write-batch event)) - - (read-batch [_ {:keys [onyx.core/task-map http/chan http/retry-ch http/pending-messages http/drained?] :as event}] - (let [pending (count @pending-messages) - max-pending (or (:onyx/max-pending task-map) (:onyx/max-pending defaults)) - batch-size (:onyx/batch-size task-map) - max-segments (min (- max-pending pending) batch-size) - ms (or (:onyx/batch-timeout task-map) (:onyx/batch-timeout defaults)) - step-ms (/ ms (:onyx/batch-size task-map)) - timeout-ch (timeout ms) - batch (if (zero? max-segments) - (!! retry-ch msg) - (swap! pending-messages dissoc message-id))) - - (pending? - [_ {:keys [http/pending-messages]} message-id] - (get @pending-messages message-id)) - - (drained? - [_ {:keys [http/drained?]}] - @drained?)) - -(defn reader [pipeline-data] - (->HttpReader)) diff --git a/src/desdemona/tasks/core_async.clj b/src/desdemona/tasks/core_async.clj new file mode 100644 index 0000000..58531a3 --- /dev/null +++ b/src/desdemona/tasks/core_async.clj @@ -0,0 +1,81 @@ +(ns desdemona.tasks.core-async + (:require [clojure.core.async :refer [chan sliding-buffer >!!]] + [onyx.plugin.core-async :refer [take-segments!]] + [taoensso.timbre :refer [info]] + [clojure.set :refer [join]] + [cheshire.core :as json])) + +(def channels (atom {})) + +(def default-channel-size 1000) + +(defn get-channel + ([id] (get-channel id nil)) + ([id size] + (if-let [id (get @channels id)] + id + (do (swap! channels assoc id (chan (or size default-channel-size))) + (get-channel id))))) + +(defn inject-in-ch + [_ lifecycle] + {:core.async/chan (get-channel (:core.async/id lifecycle) + (or (:core.async/size lifecycle) default-channel-size))}) +(defn inject-out-ch + [_ lifecycle] + {:core.async/chan (get-channel (:core.async/id lifecycle) + (or (:core.async/size lifecycle) default-channel-size))}) + +(def in-calls + {:lifecycle/before-task-start inject-in-ch}) + +(def out-calls + {:lifecycle/before-task-start inject-out-ch}) + +(defn get-core-async-channels + [{:keys [catalog lifecycles]}] + (let [lifecycle-catalog-join (join catalog lifecycles {:onyx/name :lifecycle/task})] + (reduce (fn [acc item] + (assoc acc + (:onyx/name item) + (get-channel (:core.async/id item)))) {} (filter :core.async/id lifecycle-catalog-join)))) + + +(defn add-core-async-input + ([job task opts] (add-core-async-input job task opts default-channel-size)) + ([job task opts chan-size] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.core-async/input + :onyx/type :input + :onyx/medium :core.async + ;:onyx/batch-size batch-size + :onyx/max-peers 1 + :onyx/doc "Reads segments from a core.async channel"} + opts)) + (update :lifecycles into [{:lifecycle/task task + :lifecycle/calls ::in-calls + :core.async/id (java.util.UUID/randomUUID) + :core.async/size chan-size} + {:lifecycle/task task + :lifecycle/calls :onyx.plugin.core-async/reader-calls}])))) + +(defn add-core-async-output + ([job task opts] (add-core-async-output job task opts default-channel-size)) + ([job task opts chan-size] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.core-async/output + :onyx/type :output + :onyx/medium :core.async + :onyx/max-peers 1 + ;:onyx/batch-size batch-size + :onyx/doc "Writes segments to a core.async channel"} + opts)) + + (update :lifecycles into [{:lifecycle/task task + :core.async/id (java.util.UUID/randomUUID) + :core.async/size (inc chan-size) + :lifecycle/calls ::out-calls} + {:lifecycle/task task + :lifecycle/calls :onyx.plugin.core-async/writer-calls}])))) diff --git a/src/desdemona/tasks/file_input.clj b/src/desdemona/tasks/file_input.clj new file mode 100644 index 0000000..28d6388 --- /dev/null +++ b/src/desdemona/tasks/file_input.clj @@ -0,0 +1,28 @@ +(ns desdemona.tasks.file-input + (:require [taoensso.timbre :refer [info]])) + +(defn inject-in-reader [event lifecycle] + (let [filename (:filename (:onyx.core/task-map event))] + {:seq/seq (read-string (slurp filename))})) + +(def in-seq-calls + {:lifecycle/before-task-start inject-in-reader}) + +(defn add-seq-file-input + "Adds task catalog entry and task lifecycles to use an edn file as an input" + [job task opts] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.seq/input + :onyx/type :input + :onyx/medium :seq + ;:onyx/batch-size batch-size + ; :filename filename + :seq/checkpoint? true + :onyx/max-peers 1 + :onyx/doc "Reads segments from seq"} + opts)) + (update :lifecycles into [{:lifecycle/task task + :lifecycle/calls ::in-seq-calls} + {:lifecycle/task task + :lifecycle/calls :onyx.plugin.seq/reader-calls}]))) diff --git a/src/desdemona/tasks/kafka.clj b/src/desdemona/tasks/kafka.clj new file mode 100644 index 0000000..00c2aa5 --- /dev/null +++ b/src/desdemona/tasks/kafka.clj @@ -0,0 +1,69 @@ +(ns desdemona.tasks.kafka + (:require [taoensso.timbre :refer [info]] + [cheshire.core :as json])) + +(defn deserialize-message-json [bytes] + (try + (json/parse-string (String. bytes "UTF-8")) + (catch Exception e + {:error e}))) + +(defn deserialize-message-edn [bytes] + (try + (read-string (String. bytes "UTF-8")) + (catch Exception e + {:error e}))) + +(defn deserialize-message-raw [bytes] + (try + {:line (String. bytes "UTF-8")} + (catch Exception e + {:error e}))) + +(defn serialize-message-json [segment] + (.getBytes (json/generate-string segment))) + +(defn serialize-message-edn [segment] + (.getBytes segment)) + +(defn add-kafka-input + "Instrument a job with Kafka lifecycles and catalog entries." + [job task opts] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.kafka/read-messages + :onyx/type :input + :onyx/medium :kafka + ;:kafka/topic "topic" + ;:kafka/group-id "group-id" + :kafka/fetch-size 307200 + :kafka/chan-capacity 1000 + ;:kafka/zookeeper "zookeeper-addr" + :kafka/offset-reset :smallest + ;:kafka/force-reset? true + :kafka/empty-read-back-off 500 + :kafka/commit-interval 500 + ;:kafka/deserializer-fn ::deserialize-message-json + ;:onyx/batch-size 100 + :onyx/doc "Reads messages from a Kafka topic"} + opts)) + (update :lifecycles conj {:lifecycle/task task + :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}))) + +(defn add-kafka-output + "Instrument a job with Kafka lifecycles and catalog entries." + [job task opts] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.kafka/write-messages + :onyx/type :output + :onyx/medium :kafka + ;:onyx/batch-size batch-size + ;:kafka/topic topic + ;:kafka/zookeeper zookeeper-addr + ;:kafka/serializer-fn (expand-serializer-fn serializer-fn) + :kafka/request-size 307200 + :onyx/doc "Writes messages to a Kafka topic"} + opts)) + (update :lifecycles conj {:lifecycle/task task + :lifecycle/calls :onyx.plugin.kafka/write-messages-calls}))) diff --git a/src/desdemona/tasks/sql.clj b/src/desdemona/tasks/sql.clj new file mode 100644 index 0000000..f52efa8 --- /dev/null +++ b/src/desdemona/tasks/sql.clj @@ -0,0 +1,52 @@ +(ns desdemona.tasks.sql + (:require [schema.core :as s] + [taoensso.timbre :refer [info]])) + +;; TODO, add read-rows function task + +(s/defn add-sql-partition-input + "Adds an sql patition input task to a job" + [job task opts] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.sql/partition-keys + :onyx/type :input + :onyx/medium :sql + ;:onyx/batch-size batch-size + ;:sql/classname classname + ;:sql/subprotocol subprotocol + ;:sql/subname subname + ;:sql/user user + ;:sql/password password + ;:sql/table table-name ; e.g. :your-table + ;:sql/id id-column ; e.g. :an-id-column + :sql/columns [:*] + ;; 500 * 1000 = 50,000 rows + ;; to be processed within :onyx/pending-timeout, 60s by default + :sql/rows-per-segment 500 + :onyx/max-pending 1000 + :onyx/max-peers 1 + :onyx/doc "Partitions a range of primary keys into subranges"} + opts)) + (update :lifecycles conj {:lifecycle/task task + :lifecycle/calls :onyx.plugin.sql/read-rows-calls}))) + +(defn add-sql-insert-output + "Adds an sql insert rows output task to a job" + [job task opts] + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.sql/write-rows + :onyx/type :output + :onyx/medium :sql + ;:onyx/batch-size batch-size + ;:sql/classname classname + ;:sql/subprotocol subprotocol + ;:sql/subname subname + ;:sql/user user + ;:sql/password password + ;:sql/table table-name + :onyx/doc "Writes segments from the :rows keys to the SQL database"} + opts)) + (update :lifecycles conj {:lifecycle/task task + :lifecycle/calls :onyx.plugin.sql/write-rows-calls}))) diff --git a/src/desdemona/utils.clj b/src/desdemona/utils.clj deleted file mode 100644 index 58a6854..0000000 --- a/src/desdemona/utils.clj +++ /dev/null @@ -1,183 +0,0 @@ -(ns desdemona.utils - (:require [clojure.test :refer [is]] - [clojure.core.async :refer [chan sliding-buffer >!!]] - [clojure.java.io :refer [resource]] - [onyx.plugin.core-async :refer [take-segments!]])) - -;;;; Test utils ;;;; - -(def zk-address "127.0.0.1") - -(def zk-port 2188) - -(def zk-str (str zk-address ":" zk-port)) - -(defn only [coll] - (assert (not (next coll))) - (if-let [result (first coll)] - result - (assert false))) - -(defn find-task - "Finds the catalog entry where the :onyx/name key equals task-name" - [catalog task-name] - (let [matches (filter #(= task-name (:onyx/name %)) catalog)] - (when-not (seq matches) - (throw (ex-info (format "Couldn't find task %s in catalog" task-name) - {:catalog catalog :task-name task-name}))) - (first matches))) - -(defn update-task - "Finds the catalog entry with :onyx/name task-name - and applies f to it, returning the full catalog with the - transformed catalog entry" - [catalog task-name f] - (mapv (fn [entry] - (if (= task-name (:onyx/name entry)) - (f entry) - entry)) - catalog)) - -(defn add-to-job - "Adds to the catalog and lifecycles of a job in form - {:workflow ... - :catalog ... - :lifecycles ...}" - [job {:keys [catalog lifecycles]}] - (-> job - (update :catalog into catalog) - (update :lifecycles into lifecycles))) - -(defn n-peers - "Takes a workflow and catalog, returns the minimum number of peers - needed to execute this job." - [catalog workflow] - (let [task-set (into #{} (apply concat workflow))] - (reduce - (fn [sum t] - (+ sum (or (:onyx/min-peers (find-task catalog t)) 1))) - 0 task-set))) - -(defn segments-equal? - "Onyx is a parallel, distributed system - so ordering isn't guaranteed. - Does an unordered comparison of segments to check for equality." - [expected actual] - (is (= (into #{} expected) (into #{} (remove (partial = :done) actual)))) - (is (= :done (last actual))) - (is (= (dec (count actual)) (count expected)))) - -(defn load-peer-config [onyx-id] - (assoc (-> "dev-peer-config.edn" resource slurp read-string) - :onyx/id onyx-id - :zookeeper/address zk-str)) - -(defn load-env-config [onyx-id] - (assoc (-> "env-config.edn" resource slurp read-string) - :onyx/id onyx-id - :zookeeper/address zk-str - :zookeeper.server/port zk-port)) - -(defn in-memory-catalog - "Takes a catalog and a set of input/output task names, - returning a new catalog with all I/O catalog entries - that were specified turned into core.async plugins. The - core.async entries preserve all non-Onyx parameters." - [catalog tasks] - (mapv - (fn [entry] - (cond (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :input)) - (merge - entry - {:onyx/plugin :onyx.plugin.core-async/input - :onyx/type :input - :onyx/medium :core.async - :onyx/max-peers 1 - :onyx/doc "Reads segments from a core.async channel"}) - (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :output)) - (merge - entry - {:onyx/plugin :onyx.plugin.core-async/output - :onyx/type :output - :onyx/medium :core.async - :onyx/max-peers 1 - :onyx/doc "Writes segments to a core.async channel"}) - :else entry)) - catalog)) - -;;;; Lifecycles utils ;;;; - -(def input-channel-capacity 10000) - -(def output-channel-capacity (inc input-channel-capacity)) - -(def get-input-channel - (memoize - (fn [id] (chan input-channel-capacity)))) - -(def get-output-channel - (memoize - (fn [id] (chan (sliding-buffer output-channel-capacity))))) - -(defn channel-id-for [lifecycles task-name] - (->> lifecycles - (filter #(= task-name (:lifecycle/task %))) - (map :core.async/id) - (remove nil?) - (first))) - -(defn inject-in-ch [event lifecycle] - {:core.async/chan (get-input-channel (:core.async/id lifecycle))}) - -(defn inject-out-ch [event lifecycle] - {:core.async/chan (get-output-channel (:core.async/id lifecycle))}) - -(def in-calls - {:lifecycle/before-task-start inject-in-ch}) - -(def out-calls - {:lifecycle/before-task-start inject-out-ch}) - -;;; Stubs lifecycles to use core.async IO, instead of, say, Kafka or Datomic. -(defn in-memory-lifecycles - [lifecycles catalog tasks] - (vec - (mapcat - (fn [{:keys [lifecycle/task lifecycle/replaceable?] :as lifecycle}] - (let [catalog-entry (find-task catalog task)] - (cond (and (some #{task} tasks) replaceable? - (= (:onyx/type catalog-entry) :input)) - [{:lifecycle/task task - :lifecycle/calls ::in-calls - :core.async/id (java.util.UUID/randomUUID)} - {:lifecycle/task task - :lifecycle/calls :onyx.plugin.core-async/reader-calls}] - (and (some #{task} tasks) replaceable? - (= (:onyx/type catalog-entry) :output)) - [{:lifecycle/task task - :lifecycle/calls ::out-calls - :core.async/id (java.util.UUID/randomUUID)} - {:lifecycle/task task - :lifecycle/calls :onyx.plugin.core-async/writer-calls}] - :else [lifecycle]))) - lifecycles))) - -(defn bind-inputs! [lifecycles mapping] - (doseq [[task segments] mapping] - (let [in-ch (get-input-channel (channel-id-for lifecycles task)) - n-segments (count segments)] - (when (< input-channel-capacity n-segments) - (throw (ex-info "Input channel capacity is smaller than bound inputs. Capacity can be adjusted in utils.clj" - {:channel-size input-channel-capacity - :n-segments n-segments}))) - (when-not ((set (map :lifecycle/task lifecycles)) task) - (throw (ex-info (str "Cannot bind input for task " task " as lifecycles are missing. Check that inputs are being bound to the correct task name.") - {:input task - :lifecycles lifecycles}))) - (doseq [segment segments] - (>!! in-ch segment)) - (>!! in-ch :done)))) - -(defn collect-outputs! [lifecycles output-tasks] - (->> output-tasks - (map #(get-output-channel (channel-id-for lifecycles %))) - (map #(take-segments! %)))) diff --git a/src/desdemona/workflows/sample_workflow.clj b/src/desdemona/workflows/sample_workflow.clj index cb54ccb..61fdc96 100644 --- a/src/desdemona/workflows/sample_workflow.clj +++ b/src/desdemona/workflows/sample_workflow.clj @@ -3,7 +3,16 @@ ;;; The workflow of an Onyx job describes the graph of all possible ;;; tasks that data can flow between. -(def workflow - [[:read-lines :format-line] - [:format-line :upper-case] - [:upper-case :write-lines]]) +(defmulti build-workflow :mode) + +(defmethod build-workflow :dev + [ctx] + [[:read-lines :extract-line-info] + [:extract-line-info :prepare-rows] + [:prepare-rows :write-lines]]) + +(defmethod build-workflow :prod + [ctx] + [[:read-lines :extract-line-info] + [:extract-line-info :prepare-rows] + [:prepare-rows :write-lines]]) diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj index 2a7d4fa..a138843 100644 --- a/test/desdemona/jobs/sample_job_test.clj +++ b/test/desdemona/jobs/sample_job_test.clj @@ -1,53 +1,28 @@ (ns desdemona.jobs.sample-job-test (:require [clojure.test :refer [deftest is]] - [clojure.core.async :refer [>!!]] - [clojure.java.io :refer [resource]] - [com.stuartsierra.component :as component] - [desdemona.launcher.dev-system :refer [onyx-dev-env]] - [desdemona.workflows.sample-workflow :refer [workflow]] - [desdemona.catalogs.sample-catalog :refer [build-catalog] :as sc] - [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles] :as sl] - [desdemona.plugins.http-reader] - [desdemona.functions.sample-functions] - [desdemona.dev-inputs.sample-input :as dev-inputs] - [desdemona.utils :as u] - [onyx.api])) + [onyx api + [test-helper :refer [feedback-exception! validate-enough-peers! load-config with-test-env]]] + [desdemona.jobs.sample-submit-job :refer [build-job]] + [desdemona.tasks.core-async :refer [get-core-async-channels]] + [onyx.plugin.core-async :refer [take-segments!]] + ; Make the plugins load + [onyx.plugin.kafka] + [onyx.plugin.seq] + [onyx.plugin.sql])) -(deftest test-sample-dev-job - (try - (let [stubs [:read-lines :write-lines] - catalog (u/in-memory-catalog (build-catalog) stubs) - lifecycles (u/in-memory-lifecycles (build-lifecycles) catalog stubs)] - (user/go (u/n-peers catalog workflow)) - (u/bind-inputs! lifecycles {:read-lines dev-inputs/lines}) - (let [peer-config (u/load-peer-config (:onyx-id user/system)) - job {:workflow workflow - :catalog catalog - :lifecycles lifecycles - :task-scheduler :onyx.task-scheduler/balanced}] - (onyx.api/submit-job peer-config job) - (let [[results] (u/collect-outputs! lifecycles [:write-lines])] - (is (seq results))))) - (catch InterruptedException e - (Thread/interrupted)) - (finally - (user/stop)))) - -(deftest test-sample-prod-job - (try - (let [catalog (build-catalog 20 500) - lifecycles (build-lifecycles)] - (user/go (u/n-peers catalog workflow)) - (u/bind-inputs! lifecycles {:read-lines dev-inputs/lines}) - (let [peer-config (u/load-peer-config (:onyx-id user/system)) - job {:workflow workflow - :catalog catalog - :lifecycles lifecycles - :task-scheduler :onyx.task-scheduler/balanced}] - (onyx.api/submit-job peer-config job) - (let [[results] (u/collect-outputs! lifecycles [:write-lines])] - (is (seq results))))) - (catch InterruptedException e - (Thread/interrupted)) - (finally - (user/stop)))) +(deftest onyx-dev-job-test + (let [id (java.util.UUID/randomUUID) + config (load-config) + env-config (assoc (:env-config config) :onyx/id id) + peer-config (assoc (:peer-config config) :onyx/id id)] + ;; Be sure to set the peer count (5 here) to a number greater than + ;; the amount of tasks in your job. + (with-test-env [test-env [5 env-config peer-config]] + (let [job (build-job :dev) + {:keys [write-lines]} (get-core-async-channels job) + _ (validate-enough-peers! test-env job) + {:keys [job-id]} (onyx.api/submit-job peer-config job)] + (feedback-exception! peer-config job-id) + (let [results (take-segments! write-lines)] + (is (= 4 (count results))) + (is (= :done (last results)))))))) From 513fd3eeef81645d0d33117c46707be35d4e7e36 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Wed, 3 Feb 2016 08:50:37 -0600 Subject: [PATCH 02/17] cljfmt --- env/dev/desdemona/dev_inputs/sample_input.clj | 3 +- src/desdemona/catalogs/sample_catalog.clj | 53 ++++++++-------- src/desdemona/functions/sample_functions.clj | 4 +- src/desdemona/jobs/sample_submit_job.clj | 26 ++++---- src/desdemona/launcher/aeron_media_driver.clj | 4 +- src/desdemona/launcher/launch_prod_peers.clj | 62 +++++++++---------- src/desdemona/tasks/core_async.clj | 1 - src/desdemona/tasks/kafka.clj | 10 +-- src/desdemona/tasks/sql.clj | 18 +++--- test/desdemona/jobs/sample_job_test.clj | 40 ++++++------ 10 files changed, 109 insertions(+), 112 deletions(-) diff --git a/env/dev/desdemona/dev_inputs/sample_input.clj b/env/dev/desdemona/dev_inputs/sample_input.clj index b8215db..33205ce 100644 --- a/env/dev/desdemona/dev_inputs/sample_input.clj +++ b/env/dev/desdemona/dev_inputs/sample_input.clj @@ -14,5 +14,4 @@ {:line ""} {:line "2016-01-29T21:44:03+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.164769137 -0600 CST"} {:line ""} - {:line "2016-01-29T21:44:04+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.669196559 -0600 CST"}] - ) + {:line "2016-01-29T21:44:04+00:00 192.168.99.1 102 <1> Jan 29 15:44:03 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:03.669196559 -0600 CST"}]) diff --git a/src/desdemona/catalogs/sample_catalog.clj b/src/desdemona/catalogs/sample_catalog.clj index 8debc6d..afbe1a4 100644 --- a/src/desdemona/catalogs/sample_catalog.clj +++ b/src/desdemona/catalogs/sample_catalog.clj @@ -1,5 +1,5 @@ (ns desdemona.catalogs.sample-catalog - (:require [desdemona.functions.sample-functions :refer [format-line upper-case transform-segment-shape prepare-rows]])) + (:require [desdemona.functions.sample-functions :refer [format-line upper-case transform-segment-shape prepare-rows]])) ;;; Catalogs describe each task in a workflow. We use ;;; them for describing input and output sources, injecting parameters, @@ -8,32 +8,31 @@ (defn build-catalog ([] (build-catalog 5 50)) ([batch-size batch-timeout] - [ - {:onyx/name :format-line - :onyx/fn :desdemona.functions.sample-functions/format-line - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Strips the line of any leading or trailing whitespace"} + [{:onyx/name :format-line + :onyx/fn :desdemona.functions.sample-functions/format-line + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :onyx/doc "Strips the line of any leading or trailing whitespace"} - {:onyx/name :upper-case - :onyx/fn :desdemona.functions.sample-functions/upper-case - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Capitalizes the first letter of the line"} + {:onyx/name :upper-case + :onyx/fn :desdemona.functions.sample-functions/upper-case + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :onyx/doc "Capitalizes the first letter of the line"} - {:onyx/name :extract-line-info - :onyx/fn :desdemona.functions.sample-functions/transform-segment-shape - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :keypath {"line" [:line]} - :onyx/params [:keypath] - :onyx/doc "Extracts the line"} + {:onyx/name :extract-line-info + :onyx/fn :desdemona.functions.sample-functions/transform-segment-shape + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout + :keypath {"line" [:line]} + :onyx/params [:keypath] + :onyx/doc "Extracts the line"} - {:onyx/name :prepare-rows - :onyx/fn :desdemona.functions.sample-functions/prepare-rows - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout}])) + {:onyx/name :prepare-rows + :onyx/fn :desdemona.functions.sample-functions/prepare-rows + :onyx/type :function + :onyx/batch-size batch-size + :onyx/batch-timeout batch-timeout}])) diff --git a/src/desdemona/functions/sample_functions.clj b/src/desdemona/functions/sample_functions.clj index 525501a..07aca4b 100644 --- a/src/desdemona/functions/sample_functions.clj +++ b/src/desdemona/functions/sample_functions.clj @@ -1,7 +1,7 @@ (ns desdemona.functions.sample-functions (:require [clojure - [string :refer [capitalize trim]] - [walk :refer [postwalk]]])) + [string :refer [capitalize trim]] + [walk :refer [postwalk]]])) ;;; Defines functions to be used by the peers. These are located ;;; with fully qualified namespaced keywords, such as diff --git a/src/desdemona/jobs/sample_submit_job.clj b/src/desdemona/jobs/sample_submit_job.clj index c40bf19..c1298d7 100644 --- a/src/desdemona/jobs/sample_submit_job.clj +++ b/src/desdemona/jobs/sample_submit_job.clj @@ -1,15 +1,15 @@ (ns desdemona.jobs.sample-submit-job - (:require [desdemona.catalogs.sample-catalog :refer [build-catalog]] - [desdemona.tasks.kafka :refer [add-kafka-input add-kafka-output]] - [desdemona.tasks.core-async :refer [add-core-async-input add-core-async-output]] - [desdemona.tasks.sql :refer [add-sql-partition-input add-sql-insert-output]] - [desdemona.tasks.file-input :refer [add-seq-file-input]] - [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles]] - [desdemona.lifecycles.metrics :refer [add-metrics]] - [desdemona.lifecycles.logging :refer [add-logging]] - [desdemona.workflows.sample-workflow :refer [build-workflow]] - [aero.core :refer [read-config]] - [onyx.api])) + (:require [desdemona.catalogs.sample-catalog :refer [build-catalog]] + [desdemona.tasks.kafka :refer [add-kafka-input add-kafka-output]] + [desdemona.tasks.core-async :refer [add-core-async-input add-core-async-output]] + [desdemona.tasks.sql :refer [add-sql-partition-input add-sql-insert-output]] + [desdemona.tasks.file-input :refer [add-seq-file-input]] + [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles]] + [desdemona.lifecycles.metrics :refer [add-metrics]] + [desdemona.lifecycles.logging :refer [add-logging]] + [desdemona.workflows.sample-workflow :refer [build-workflow]] + [aero.core :refer [read-config]] + [onyx.api])) ;;;; ;; Lets build a job @@ -52,7 +52,7 @@ (let [config (read-config (clojure.java.io/resource "config.edn") {:profile :dev}) peer-config (get config :peer-config) job (build-job :prod)] - (println peer-config) - (println job) + (println peer-config) + (println job) (let [{:keys [job-id]} (onyx.api/submit-job peer-config job)] (println "Submitted job: " job-id)))) diff --git a/src/desdemona/launcher/aeron_media_driver.clj b/src/desdemona/launcher/aeron_media_driver.clj index 617a1b4..b6d1eea 100644 --- a/src/desdemona/launcher/aeron_media_driver.clj +++ b/src/desdemona/launcher/aeron_media_driver.clj @@ -28,11 +28,11 @@ (= threading-mode "dedicated") ThreadingMode/DEDICATED (= threading-mode "shared-network") ThreadingMode/SHARED_NETWORK) - _ (println "Starting media driver with threading mode:" threading-mode". Use -t to supply an alternative threading mode.") + _ (println "Starting media driver with threading mode:" threading-mode ". Use -t to supply an alternative threading mode.") ctx (cond-> (MediaDriver$Context.) threading-mode (.threadingMode threading-mode-obj) delete-dirs (.dirsDeleteOnStart delete-dirs)) media-driver (try (MediaDriver/launch ctx) - (catch IllegalStateException ise (throw (Exception. "Error starting media driver. This may be due to a media driver data incompatibility between versions. Check that no other media driver has been started and then use -d to delete the directory on startup" ise))))] + (catch IllegalStateException ise (throw (Exception. "Error starting media driver. This may be due to a media driver data incompatibility between versions. Check that no other media driver has been started and then use -d to delete the directory on startup" ise))))] (println "Launched the Media Driver. Blocking forever...") ( (:peer-config config) - (assoc :onyx.log/config {:appenders - {:standard-out - {:enabled? true - :async? false - :output-fn t/default-output-fn - :fn standard-out-logger}}})) - peer-group (onyx.api/start-peer-group peer-config) - env (onyx.api/start-env (:env-config config)) - peers (onyx.api/start-peers n-peers peer-group)] - (println "Attempting to connect to to Zookeeper: " (:zookeeper/address peer-config)) - (.addShutdownHook (Runtime/getRuntime) - (Thread. - (fn [] - (doseq [v-peer peers] - (onyx.api/shutdown-peer v-peer)) - (onyx.api/shutdown-peer-group peer-group) - (shutdown-agents)))) - (println "Started peers. Blocking forever.") + config (read-config (clojure.java.io/resource "config.edn") {:profile :default}) + peer-config (-> (:peer-config config) + (assoc :onyx.log/config {:appenders + {:standard-out + {:enabled? true + :async? false + :output-fn t/default-output-fn + :fn standard-out-logger}}})) + peer-group (onyx.api/start-peer-group peer-config) + env (onyx.api/start-env (:env-config config)) + peers (onyx.api/start-peers n-peers peer-group)] + (println "Attempting to connect to to Zookeeper: " (:zookeeper/address peer-config)) + (.addShutdownHook (Runtime/getRuntime) + (Thread. + (fn [] + (doseq [v-peer peers] + (onyx.api/shutdown-peer v-peer)) + (onyx.api/shutdown-peer-group peer-group) + (shutdown-agents)))) + (println "Started peers. Blocking forever.") ;; Block forever. - ( job - (update :catalog conj (merge {:onyx/name task - :onyx/plugin :onyx.plugin.sql/write-rows - :onyx/type :output - :onyx/medium :sql + (-> job + (update :catalog conj (merge {:onyx/name task + :onyx/plugin :onyx.plugin.sql/write-rows + :onyx/type :output + :onyx/medium :sql ;:onyx/batch-size batch-size ;:sql/classname classname ;:sql/subprotocol subprotocol @@ -46,7 +46,7 @@ ;:sql/user user ;:sql/password password ;:sql/table table-name - :onyx/doc "Writes segments from the :rows keys to the SQL database"} - opts)) - (update :lifecycles conj {:lifecycle/task task - :lifecycle/calls :onyx.plugin.sql/write-rows-calls}))) + :onyx/doc "Writes segments from the :rows keys to the SQL database"} + opts)) + (update :lifecycles conj {:lifecycle/task task + :lifecycle/calls :onyx.plugin.sql/write-rows-calls}))) diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj index a138843..bdd1ea1 100644 --- a/test/desdemona/jobs/sample_job_test.clj +++ b/test/desdemona/jobs/sample_job_test.clj @@ -1,28 +1,28 @@ (ns desdemona.jobs.sample-job-test (:require [clojure.test :refer [deftest is]] - [onyx api - [test-helper :refer [feedback-exception! validate-enough-peers! load-config with-test-env]]] - [desdemona.jobs.sample-submit-job :refer [build-job]] - [desdemona.tasks.core-async :refer [get-core-async-channels]] - [onyx.plugin.core-async :refer [take-segments!]] + [onyx api + [test-helper :refer [feedback-exception! validate-enough-peers! load-config with-test-env]]] + [desdemona.jobs.sample-submit-job :refer [build-job]] + [desdemona.tasks.core-async :refer [get-core-async-channels]] + [onyx.plugin.core-async :refer [take-segments!]] ; Make the plugins load - [onyx.plugin.kafka] - [onyx.plugin.seq] - [onyx.plugin.sql])) + [onyx.plugin.kafka] + [onyx.plugin.seq] + [onyx.plugin.sql])) (deftest onyx-dev-job-test (let [id (java.util.UUID/randomUUID) - config (load-config) - env-config (assoc (:env-config config) :onyx/id id) - peer-config (assoc (:peer-config config) :onyx/id id)] + config (load-config) + env-config (assoc (:env-config config) :onyx/id id) + peer-config (assoc (:peer-config config) :onyx/id id)] ;; Be sure to set the peer count (5 here) to a number greater than ;; the amount of tasks in your job. - (with-test-env [test-env [5 env-config peer-config]] - (let [job (build-job :dev) - {:keys [write-lines]} (get-core-async-channels job) - _ (validate-enough-peers! test-env job) - {:keys [job-id]} (onyx.api/submit-job peer-config job)] - (feedback-exception! peer-config job-id) - (let [results (take-segments! write-lines)] - (is (= 4 (count results))) - (is (= :done (last results)))))))) + (with-test-env [test-env [5 env-config peer-config]] + (let [job (build-job :dev) + {:keys [write-lines]} (get-core-async-channels job) + _ (validate-enough-peers! test-env job) + {:keys [job-id]} (onyx.api/submit-job peer-config job)] + (feedback-exception! peer-config job-id) + (let [results (take-segments! write-lines)] + (is (= 4 (count results))) + (is (= :done (last results)))))))) From 25aba722c7738e428f58c5e44b2ecce1754d14e8 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Wed, 3 Feb 2016 10:17:58 -0600 Subject: [PATCH 03/17] PR feedback --- docker-compose.yml | 6 ------ env/dev/desdemona/dev_inputs/sample_input.clj | 3 --- 2 files changed, 9 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 09d8b58..0b200b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,9 +44,3 @@ syslog-ng: - ./script/syslog-ng/syslog-ng.conf:/etc/syslog-ng/syslog-ng.conf ports: - "601:601" -#kafkacat: -# build: script/kafka-meetup-streamer -# links: -# - kafka:kafka -# environment: -# BROKER_LIST: kafka:9092 diff --git a/env/dev/desdemona/dev_inputs/sample_input.clj b/env/dev/desdemona/dev_inputs/sample_input.clj index 33205ce..127a022 100644 --- a/env/dev/desdemona/dev_inputs/sample_input.clj +++ b/env/dev/desdemona/dev_inputs/sample_input.clj @@ -1,8 +1,5 @@ (ns desdemona.dev-inputs.sample-input) -;; Story from http://textfiles.com/stories/advtthum.txt -;; Warning, I haven't read it. :) - (def lines [{:line "2016-01-29T21:44:01+00:00 192.168.99.1 102 <1> Jan 29 15:44:01 lips.local sean[68673]: hello ernie, it's 2016-01-29 15:44:01.150100217 -0600 CST"} {:line ""} From abb11d61003435db4d7d96de5efad4a4e20ebee6 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Fri, 5 Feb 2016 11:10:09 -0600 Subject: [PATCH 04/17] removed a tab (the only one I could find) --- src/desdemona/launcher/launch_prod_peers.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/desdemona/launcher/launch_prod_peers.clj b/src/desdemona/launcher/launch_prod_peers.clj index 9afa255..db87790 100644 --- a/src/desdemona/launcher/launch_prod_peers.clj +++ b/src/desdemona/launcher/launch_prod_peers.clj @@ -39,5 +39,5 @@ (onyx.api/shutdown-peer-group peer-group) (shutdown-agents)))) (println "Started peers. Blocking forever.") - ;; Block forever. + ;; Block forever. ( Date: Fri, 5 Feb 2016 12:08:45 -0600 Subject: [PATCH 05/17] eclint --- project.clj | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/project.clj b/project.clj index f2269a5..58a392a 100644 --- a/project.clj +++ b/project.clj @@ -4,14 +4,14 @@ :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.8.0"] - [org.onyxplatform/onyx "0.8.8"] - [org.onyxplatform/onyx-sql "0.8.8.0"] - [mysql/mysql-connector-java "5.1.18"] - [org.onyxplatform/onyx-kafka "0.8.8.0"] - [org.onyxplatform/onyx-seq "0.8.8.0"] - [cheshire "5.5.0"] - [org.clojure/tools.cli "0.3.3"] - [aero "0.1.3"]] + [org.onyxplatform/onyx "0.8.8"] + [org.onyxplatform/onyx-sql "0.8.8.0"] + [mysql/mysql-connector-java "5.1.18"] + [org.onyxplatform/onyx-kafka "0.8.8.0"] + [org.onyxplatform/onyx-seq "0.8.8.0"] + [cheshire "5.5.0"] + [org.clojure/tools.cli "0.3.3"] + [aero "0.1.3"]] :plugins [[lein-cljfmt "0.3.0"] [lein-cloverage "1.0.7-SNAPSHOT"] [lein-kibit "0.1.2"] From 9202c7677ae4aa548ea14cf473f15d2479ef933e Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Fri, 5 Feb 2016 12:22:16 -0600 Subject: [PATCH 06/17] kibit --- src/desdemona/launcher/launch_prod_peers.clj | 18 ++++++++++-------- src/desdemona/lifecycles/logging.clj | 9 ++++++--- src/desdemona/lifecycles/metrics.clj | 12 ++++++++---- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/desdemona/launcher/launch_prod_peers.clj b/src/desdemona/launcher/launch_prod_peers.clj index db87790..4e210b2 100644 --- a/src/desdemona/launcher/launch_prod_peers.clj +++ b/src/desdemona/launcher/launch_prod_peers.clj @@ -20,17 +20,19 @@ (defn -main [n & args] (let [n-peers (Integer/parseInt n) config (read-config (clojure.java.io/resource "config.edn") {:profile :default}) - peer-config (-> (:peer-config config) - (assoc :onyx.log/config {:appenders - {:standard-out - {:enabled? true - :async? false - :output-fn t/default-output-fn - :fn standard-out-logger}}})) + (assoc + (:peer-config config) + :onyx.log/config + {:appenders + {:standard-out + {:enabled? true, + :async? false, + :output-fn t/default-output-fn, + :fn standard-out-logger}}}) peer-group (onyx.api/start-peer-group peer-config) env (onyx.api/start-env (:env-config config)) peers (onyx.api/start-peers n-peers peer-group)] - (println "Attempting to connect to to Zookeeper: " (:zookeeper/address peer-config)) + (println "Attempting to connect to Zookeeper: " (:zookeeper/address peer-config)) (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] diff --git a/src/desdemona/lifecycles/logging.clj b/src/desdemona/lifecycles/logging.clj index e015b46..42a0725 100644 --- a/src/desdemona/lifecycles/logging.clj +++ b/src/desdemona/lifecycles/logging.clj @@ -14,6 +14,9 @@ "Add's logging output to a tasks output-batch. " [job task] (if-let [entry (first (filter #(= (:onyx/name %) task) (:catalog job)))] - (-> job - (update-in [:lifecycles] conj {:lifecycle/task task - :lifecycle/calls ::log-calls})))) + (update-in + job + [:lifecycles] + conj + {:lifecycle/task task, + :lifecycle/calls :desdemona.lifecycles.logging/log-calls}))) diff --git a/src/desdemona/lifecycles/metrics.clj b/src/desdemona/lifecycles/metrics.clj index 59281b4..2132f49 100644 --- a/src/desdemona/lifecycles/metrics.clj +++ b/src/desdemona/lifecycles/metrics.clj @@ -3,7 +3,11 @@ (defn add-metrics "Add's throughput and latency metrics to a task" [job task opts] - (-> job - (update-in [:lifecycles] conj (merge {:lifecycle/task task ; Or :all for all tasks in the workflow - :lifecycle/calls :onyx.lifecycle.metrics.metrics/calls} - opts)))) + (update-in + job + [:lifecycles] + conj + (merge + {:lifecycle/task task, + :lifecycle/calls :onyx.lifecycle.metrics.metrics/calls} + opts))) From 41655e1632bad02cdd69bce975861836732514ce Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Fri, 5 Feb 2016 12:29:04 -0600 Subject: [PATCH 07/17] eastwood --- src/desdemona/launcher/launch_prod_peers.clj | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/desdemona/launcher/launch_prod_peers.clj b/src/desdemona/launcher/launch_prod_peers.clj index 4e210b2..8b28030 100644 --- a/src/desdemona/launcher/launch_prod_peers.clj +++ b/src/desdemona/launcher/launch_prod_peers.clj @@ -20,15 +20,15 @@ (defn -main [n & args] (let [n-peers (Integer/parseInt n) config (read-config (clojure.java.io/resource "config.edn") {:profile :default}) - (assoc - (:peer-config config) - :onyx.log/config - {:appenders - {:standard-out - {:enabled? true, - :async? false, - :output-fn t/default-output-fn, - :fn standard-out-logger}}}) + peer-config (assoc + (:peer-config config) + :onyx.log/config + {:appenders + {:standard-out + {:enabled? true, + :async? false, + :output-fn t/default-output-fn, + :fn standard-out-logger}}}) peer-group (onyx.api/start-peer-group peer-config) env (onyx.api/start-env (:env-config config)) peers (onyx.api/start-peers n-peers peer-group)] From e70657f325b19f6a14342e17ca069e91dbc34171 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Mon, 8 Feb 2016 12:15:34 -0600 Subject: [PATCH 08/17] Adding some unit tests for a few of the functions we actually use. --- .../functions/sample_functions_test.clj | 13 +++++++++++++ test/desdemona/jobs/sample_job_test.clj | 6 +++--- test/desdemona/tasks/kafka_test.clj | 8 ++++++++ .../workflows/sample_workflow_test.clj | 17 +++++++++++++++++ 4 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 test/desdemona/functions/sample_functions_test.clj create mode 100644 test/desdemona/tasks/kafka_test.clj create mode 100644 test/desdemona/workflows/sample_workflow_test.clj diff --git a/test/desdemona/functions/sample_functions_test.clj b/test/desdemona/functions/sample_functions_test.clj new file mode 100644 index 0000000..66b6052 --- /dev/null +++ b/test/desdemona/functions/sample_functions_test.clj @@ -0,0 +1,13 @@ +(ns desdemona.functions.sample-functions-test + (:require [clojure.test :refer [deftest is]] + [desdemona.functions.sample-functions :refer [transform-segment-shape prepare-rows]])) + +(deftest transform-segment-shape-test + (let [got (transform-segment-shape {"line" [:line]} {:line "this is a log line"}) + expected {"line" "this is a log line"}] + (is (= got expected)))) + +(deftest prepare-rows-test + (let [got (prepare-rows {"line" "this is a log line"}) + expected {"rows" {"line" "this is a log line"}}] + (is (= got expected)))) diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj index bdd1ea1..a784f6a 100644 --- a/test/desdemona/jobs/sample_job_test.clj +++ b/test/desdemona/jobs/sample_job_test.clj @@ -5,7 +5,7 @@ [desdemona.jobs.sample-submit-job :refer [build-job]] [desdemona.tasks.core-async :refer [get-core-async-channels]] [onyx.plugin.core-async :refer [take-segments!]] - ; Make the plugins load + ; Make the plugins load [onyx.plugin.kafka] [onyx.plugin.seq] [onyx.plugin.sql])) @@ -15,8 +15,8 @@ config (load-config) env-config (assoc (:env-config config) :onyx/id id) peer-config (assoc (:peer-config config) :onyx/id id)] - ;; Be sure to set the peer count (5 here) to a number greater than - ;; the amount of tasks in your job. + ;; Be sure to set the peer count (5 here) to a number greater than + ;; the amount of tasks in your job. (with-test-env [test-env [5 env-config peer-config]] (let [job (build-job :dev) {:keys [write-lines]} (get-core-async-channels job) diff --git a/test/desdemona/tasks/kafka_test.clj b/test/desdemona/tasks/kafka_test.clj new file mode 100644 index 0000000..1de565c --- /dev/null +++ b/test/desdemona/tasks/kafka_test.clj @@ -0,0 +1,8 @@ +(ns desdemona.tasks.kafka-test + (:require [clojure.test :refer [deftest is]] + [desdemona.tasks.kafka :refer [deserialize-message-raw]])) + +(deftest deserialize-message-raw-test + (let [got (deserialize-message-raw (.getBytes "this is raw text")) + expected {:line "this is raw text"}] + (is (= got expected)))) diff --git a/test/desdemona/workflows/sample_workflow_test.clj b/test/desdemona/workflows/sample_workflow_test.clj new file mode 100644 index 0000000..98c7d6e --- /dev/null +++ b/test/desdemona/workflows/sample_workflow_test.clj @@ -0,0 +1,17 @@ +(ns desdemona.workflows.sample-workflow-test + (:require [clojure.test :refer [deftest is]] + [desdemona.workflows.sample-workflow :refer [build-workflow]])) + +(deftest dev-workflow-test + (let [expected [[:read-lines :extract-line-info] + [:extract-line-info :prepare-rows] + [:prepare-rows :write-lines]] + got (build-workflow {:mode :dev})] + (is (= got expected)))) + +(deftest prod-workflow-test + (let [expected [[:read-lines :extract-line-info] + [:extract-line-info :prepare-rows] + [:prepare-rows :write-lines]] + got (build-workflow {:mode :prod})] + (is (= got expected)))) From cc7275129a6ca6c889d81e021dc2e8d6ae9a66a7 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Mon, 8 Feb 2016 13:31:17 -0600 Subject: [PATCH 09/17] If I could read I would have known that this test was failing. --- test/desdemona/functions/sample_functions_test.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/desdemona/functions/sample_functions_test.clj b/test/desdemona/functions/sample_functions_test.clj index 66b6052..6ff0f11 100644 --- a/test/desdemona/functions/sample_functions_test.clj +++ b/test/desdemona/functions/sample_functions_test.clj @@ -9,5 +9,5 @@ (deftest prepare-rows-test (let [got (prepare-rows {"line" "this is a log line"}) - expected {"rows" {"line" "this is a log line"}}] + expected {:rows [{"line" "this is a log line"}]}] (is (= got expected)))) From e522e0814644c77897fd50c4cf42c02ac26ac109 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Mon, 8 Feb 2016 14:36:13 -0600 Subject: [PATCH 10/17] Updated README --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/README.md b/README.md index b94d8c8..2cd4c8a 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,45 @@ You can launch a sample job as follows: (desdemona.jobs.sample-submit-job/submit-job user/system) ``` +### Using Docker Compose + +Start the cluster: + +``` +docker-compose up +``` + +Wait until it's all started. It should say this and then wait: + +``` +peer_1 | Started peers. Blocking forever. +``` + +Make sure you create the Kafka topic: + +``` +docker run --rm -it --link desdemona_kafka_1:kafka1 kafka bash -c "\$KAFKA_HOME/bin/kafka-console-producer.sh --topic test1 --broker-list=kafka1:9092" +``` + +And the MySQL database: + +``` +docker run -it --link desdemona_db_1:mysql --rm mysql sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"' +``` + +``` +use logs; +CREATE TABLE logLines (id int primary key auto_increment, line text); +``` + +Now you can submit the job: + +``` +ZOOKEEPER=$(echo $DOCKER_HOST|cut -d ':' -f 2|sed "s/\/\///g") lein run -m desdemona.jobs.sample-submit-job +``` + +Anything you send to syslog on that Docker host (there's a syslog-ng relay running as a container) will appear in MySQL. + ### Production Mode Peers First start the Aeron media driver, which should be used in production From 9598b6ffdcda6acef6604e25756baca1c8474453 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Mon, 8 Feb 2016 14:36:21 -0600 Subject: [PATCH 11/17] Deleting some unused code from the template --- src/desdemona/catalogs/sample_catalog.clj | 18 ++---------- .../sample_flow_conditions.clj | 29 +------------------ src/desdemona/functions/sample_functions.clj | 13 --------- 3 files changed, 3 insertions(+), 57 deletions(-) diff --git a/src/desdemona/catalogs/sample_catalog.clj b/src/desdemona/catalogs/sample_catalog.clj index afbe1a4..8d7a916 100644 --- a/src/desdemona/catalogs/sample_catalog.clj +++ b/src/desdemona/catalogs/sample_catalog.clj @@ -1,5 +1,5 @@ (ns desdemona.catalogs.sample-catalog - (:require [desdemona.functions.sample-functions :refer [format-line upper-case transform-segment-shape prepare-rows]])) + (:require [desdemona.functions.sample-functions :refer [transform-segment-shape prepare-rows]])) ;;; Catalogs describe each task in a workflow. We use ;;; them for describing input and output sources, injecting parameters, @@ -8,21 +8,7 @@ (defn build-catalog ([] (build-catalog 5 50)) ([batch-size batch-timeout] - [{:onyx/name :format-line - :onyx/fn :desdemona.functions.sample-functions/format-line - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Strips the line of any leading or trailing whitespace"} - - {:onyx/name :upper-case - :onyx/fn :desdemona.functions.sample-functions/upper-case - :onyx/type :function - :onyx/batch-size batch-size - :onyx/batch-timeout batch-timeout - :onyx/doc "Capitalizes the first letter of the line"} - - {:onyx/name :extract-line-info + [{:onyx/name :extract-line-info :onyx/fn :desdemona.functions.sample-functions/transform-segment-shape :onyx/type :function :onyx/batch-size batch-size diff --git a/src/desdemona/flow_conditions/sample_flow_conditions.clj b/src/desdemona/flow_conditions/sample_flow_conditions.clj index 47adc7b..375f033 100644 --- a/src/desdemona/flow_conditions/sample_flow_conditions.clj +++ b/src/desdemona/flow_conditions/sample_flow_conditions.clj @@ -1,31 +1,4 @@ (ns desdemona.flow-conditions.sample-flow-conditions) (def flow-conditions - [{:flow/from :upper-case - :flow/to [:write-lines] - :flow/short-circuit? true - :flow/thrown-exception? true - :flow/post-transform :desdemona.flow-conditions.sample-flow-conditions/substitute-segment - :flow/predicate :desdemona.flow-conditions.sample-flow-conditions/npe? - :flow/doc "Send a canned value if this segment threw a NullPointerException."} - {:flow/from :format-line - :flow/to [:upper-case] - :param/disallow-char \B - :param/max-line-length 60 - :flow/predicate [:and - [:not [:desdemona.flow-conditions.sample-flow-conditions/starts-with? :param/disallow-char]] - [:desdemona.flow-conditions.sample-flow-conditions/within-length? :param/max-line-length]] - :flow/doc "Output the line if it doesn't start with 'B' and is less than 60 characters"}]) - -(defn starts-with? [event old {:keys [line]} all-new disallowed-char] - (= (first line) disallowed-char)) - -(defn within-length? [event old {:keys [line]} all-new max-length] - (<= (count line) max-length)) - -(defn npe? [event old ex all-new] - (= (class ex) java.lang.NullPointerException)) - -(defn substitute-segment [event ex] - {:line "<<< Blank line was here >>>"}) - + []) diff --git a/src/desdemona/functions/sample_functions.clj b/src/desdemona/functions/sample_functions.clj index 07aca4b..d74bb2b 100644 --- a/src/desdemona/functions/sample_functions.clj +++ b/src/desdemona/functions/sample_functions.clj @@ -1,21 +1,11 @@ (ns desdemona.functions.sample-functions (:require [clojure - [string :refer [capitalize trim]] [walk :refer [postwalk]]])) ;;; Defines functions to be used by the peers. These are located ;;; with fully qualified namespaced keywords, such as ;;; desdemona.functions.sample-functions/format-line -(defn format-line [segment] - (update-in segment [:line] trim)) - -(defn upper-case [{:keys [line] :as segment}] - (if (seq line) - (let [upper-cased (apply str (capitalize (first line)) (rest line))] - (assoc-in segment [:line] upper-cased)) - segment)) - (defn transform-segment-shape "Recursively restructures a segment {:new-key [paths...]}" [paths segment] @@ -27,8 +17,5 @@ (catch Exception e segment))) -(defn get-in-segment [keypath segment] - (get-in segment keypath)) - (defn prepare-rows [segment] {:rows [segment]}) From 142faa7642035150a5130628f02aab5becdb8eb4 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Mon, 8 Feb 2016 16:10:36 -0600 Subject: [PATCH 12/17] Removing unused script. --- script/run-container.sh | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 script/run-container.sh diff --git a/script/run-container.sh b/script/run-container.sh deleted file mode 100644 index e2ec7a6..0000000 --- a/script/run-container.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash -set -o errexit -set -o nounset -set -o xtrace - -if [[ "$#" -ne 2 ]]; then - echo "Usage: $0 onyx-id n-peers" - echo "Example: $0 3424312384i3423 4" - exit 1 -fi - -ONYX_ID=$1 -NPEERS=$2 - -docker run --privileged -e ONYX_ID=$ONYX_ID -e NPEERS=$NPEERS -d desdemona:0.1.0 From b3169d2cee4556d0fdbf9045d26044dbf25c659b Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 9 Feb 2016 12:47:47 -0600 Subject: [PATCH 13/17] Removed dev/prod modes, always running inside Docker. --- src/desdemona/jobs/sample_submit_job.clj | 54 ++++++++----------- src/desdemona/lifecycles/metrics.clj | 6 +-- src/desdemona/lifecycles/sample_lifecycle.clj | 1 - src/desdemona/workflows/sample_workflow.clj | 9 +--- test/desdemona/jobs/sample_job_test.clj | 41 +++++++------- .../workflows/sample_workflow_test.clj | 11 +--- 6 files changed, 49 insertions(+), 73 deletions(-) diff --git a/src/desdemona/jobs/sample_submit_job.clj b/src/desdemona/jobs/sample_submit_job.clj index c1298d7..9fe4433 100644 --- a/src/desdemona/jobs/sample_submit_job.clj +++ b/src/desdemona/jobs/sample_submit_job.clj @@ -13,46 +13,36 @@ ;;;; ;; Lets build a job -;; Depending on the mode, the job is built up in a different way -;; When :dev mode, onyx-seq will be used as an input, with the meetup data being -;; included in the onyx-seq lifecycle for easy access -;; core.async is then added as an output task -;; -;; When using :prod mode, kafka is added as an input, and onyx-sql is used as the output +;; Since we always run in Docker Compose, kafka is added as an input, and onyx-sql is used as the output -(defn build-job [mode] +(defn build-job [] (let [batch-size 1 batch-timeout 1000 base-job {:catalog (build-catalog batch-size batch-timeout) - :lifecycles (build-lifecycles {:mode mode}) - :workflow (build-workflow {:mode mode}) + :lifecycles (build-lifecycles) + :workflow (build-workflow) :task-scheduler :onyx.task-scheduler/balanced}] - (cond-> base-job - (= :dev mode) (add-core-async-output :write-lines {:onyx/batch-size batch-size}) - (= :dev mode) (add-seq-file-input :read-lines {:onyx/batch-size batch-size - :filename "resources/sample_input.edn"}) - (= :prod mode) (add-kafka-input :read-lines {:onyx/batch-size batch-size - :onyx/max-peers 1 - :kafka/topic "test1" - :kafka/group-id "onyx-consumer" - :kafka/zookeeper "zk:2181" - :kafka/deserializer-fn :desdemona.tasks.kafka/deserialize-message-raw - :kafka/offset-reset :smallest}) - (= :prod mode) (add-sql-insert-output :write-lines {:onyx/batch-size batch-size - :sql/classname "com.mysql.jdbc.Driver" - :sql/subprotocol "mysql" - :sql/subname "//db:3306/logs" - :sql/user "onyx" - :sql/password "onyx" - :sql/table :logLines}) - true (add-logging :read-lines) - true (add-logging :write-lines)))) + (-> base-job + (add-kafka-input :read-lines {:onyx/batch-size batch-size + :onyx/max-peers 1 + :kafka/topic "test1" + :kafka/group-id "onyx-consumer" + :kafka/zookeeper "zk:2181" + :kafka/deserializer-fn :desdemona.tasks.kafka/deserialize-message-raw + :kafka/offset-reset :smallest}) + (add-sql-insert-output :write-lines {:onyx/batch-size batch-size + :sql/classname "com.mysql.jdbc.Driver" + :sql/subprotocol "mysql" + :sql/subname "//db:3306/logs" + :sql/user "onyx" + :sql/password "onyx" + :sql/table :logLines}) + (add-logging :read-lines) + (add-logging :write-lines)))) (defn -main [& args] (let [config (read-config (clojure.java.io/resource "config.edn") {:profile :dev}) peer-config (get config :peer-config) - job (build-job :prod)] - (println peer-config) - (println job) + job (build-job)] (let [{:keys [job-id]} (onyx.api/submit-job peer-config job)] (println "Submitted job: " job-id)))) diff --git a/src/desdemona/lifecycles/metrics.clj b/src/desdemona/lifecycles/metrics.clj index 2132f49..e43328f 100644 --- a/src/desdemona/lifecycles/metrics.clj +++ b/src/desdemona/lifecycles/metrics.clj @@ -3,11 +3,11 @@ (defn add-metrics "Add's throughput and latency metrics to a task" [job task opts] - (update-in + (update job - [:lifecycles] + :lifecycles conj (merge - {:lifecycle/task task, + {:lifecycle/task task :lifecycle/calls :onyx.lifecycle.metrics.metrics/calls} opts))) diff --git a/src/desdemona/lifecycles/sample_lifecycle.clj b/src/desdemona/lifecycles/sample_lifecycle.clj index f029b09..56dae1c 100644 --- a/src/desdemona/lifecycles/sample_lifecycle.clj +++ b/src/desdemona/lifecycles/sample_lifecycle.clj @@ -2,5 +2,4 @@ (defn build-lifecycles "Put your environment-independent lifecycles here" - [ctx] []) diff --git a/src/desdemona/workflows/sample_workflow.clj b/src/desdemona/workflows/sample_workflow.clj index 61fdc96..68c359d 100644 --- a/src/desdemona/workflows/sample_workflow.clj +++ b/src/desdemona/workflows/sample_workflow.clj @@ -5,14 +5,7 @@ (defmulti build-workflow :mode) -(defmethod build-workflow :dev - [ctx] - [[:read-lines :extract-line-info] - [:extract-line-info :prepare-rows] - [:prepare-rows :write-lines]]) - -(defmethod build-workflow :prod - [ctx] +(defn build-workflow [] [[:read-lines :extract-line-info] [:extract-line-info :prepare-rows] [:prepare-rows :write-lines]]) diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj index a784f6a..0a78ad8 100644 --- a/test/desdemona/jobs/sample_job_test.clj +++ b/test/desdemona/jobs/sample_job_test.clj @@ -1,28 +1,29 @@ (ns desdemona.jobs.sample-job-test (:require [clojure.test :refer [deftest is]] - [onyx api - [test-helper :refer [feedback-exception! validate-enough-peers! load-config with-test-env]]] [desdemona.jobs.sample-submit-job :refer [build-job]] - [desdemona.tasks.core-async :refer [get-core-async-channels]] - [onyx.plugin.core-async :refer [take-segments!]] ; Make the plugins load [onyx.plugin.kafka] [onyx.plugin.seq] [onyx.plugin.sql])) -(deftest onyx-dev-job-test - (let [id (java.util.UUID/randomUUID) - config (load-config) - env-config (assoc (:env-config config) :onyx/id id) - peer-config (assoc (:peer-config config) :onyx/id id)] - ;; Be sure to set the peer count (5 here) to a number greater than - ;; the amount of tasks in your job. - (with-test-env [test-env [5 env-config peer-config]] - (let [job (build-job :dev) - {:keys [write-lines]} (get-core-async-channels job) - _ (validate-enough-peers! test-env job) - {:keys [job-id]} (onyx.api/submit-job peer-config job)] - (feedback-exception! peer-config job-id) - (let [results (take-segments! write-lines)] - (is (= 4 (count results))) - (is (= :done (last results)))))))) +(defn by-name [catalog name] + (first (filter (fn [x] (= name (x :onyx/name))) catalog))) + +(deftest build-job-test + (let [job (build-job) + expected-catalog-names [:extract-line-info :prepare-rows :read-lines :write-lines] + catalog (job :catalog) + workflow (job :workflow) + expected-workflow [[:read-lines :extract-line-info] + [:extract-line-info :prepare-rows] + [:prepare-rows :write-lines]] + lifecycles (job :lifecycles) + expected-lifecycles [{:lifecycle/task :write-lines :lifecycle/calls :desdemona.lifecycles.logging/log-calls} + {:lifecycle/task :read-lines :lifecycle/calls :desdemona.lifecycles.logging/log-calls} + {:lifecycle/task :write-lines :lifecycle/calls :onyx.plugin.sql/write-rows-calls} + {:lifecycle/task :read-lines, :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]] + (is (= (map :onyx/name catalog) expected-catalog-names)) + (is (= ((by-name catalog :read-lines) :kafka/topic) "test1")) + (is (= ((by-name catalog :write-lines) :sql/table) :logLines)) + (is (= workflow expected-workflow)) + (is (= lifecycles expected-lifecycles)))) diff --git a/test/desdemona/workflows/sample_workflow_test.clj b/test/desdemona/workflows/sample_workflow_test.clj index 98c7d6e..807b56a 100644 --- a/test/desdemona/workflows/sample_workflow_test.clj +++ b/test/desdemona/workflows/sample_workflow_test.clj @@ -2,16 +2,9 @@ (:require [clojure.test :refer [deftest is]] [desdemona.workflows.sample-workflow :refer [build-workflow]])) -(deftest dev-workflow-test +(deftest build-workflow-test (let [expected [[:read-lines :extract-line-info] [:extract-line-info :prepare-rows] [:prepare-rows :write-lines]] - got (build-workflow {:mode :dev})] - (is (= got expected)))) - -(deftest prod-workflow-test - (let [expected [[:read-lines :extract-line-info] - [:extract-line-info :prepare-rows] - [:prepare-rows :write-lines]] - got (build-workflow {:mode :prod})] + got (build-workflow)] (is (= got expected)))) From c54da58d2dc6f561a70509806c2589987f2747a0 Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 9 Feb 2016 12:50:14 -0600 Subject: [PATCH 14/17] Don't need to define build-workflow twice. --- src/desdemona/workflows/sample_workflow.clj | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/desdemona/workflows/sample_workflow.clj b/src/desdemona/workflows/sample_workflow.clj index 68c359d..2f30771 100644 --- a/src/desdemona/workflows/sample_workflow.clj +++ b/src/desdemona/workflows/sample_workflow.clj @@ -3,8 +3,6 @@ ;;; The workflow of an Onyx job describes the graph of all possible ;;; tasks that data can flow between. -(defmulti build-workflow :mode) - (defn build-workflow [] [[:read-lines :extract-line-info] [:extract-line-info :prepare-rows] From 93e9418458aea32da8c99a1e29a0eea74b9768af Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 9 Feb 2016 13:16:55 -0600 Subject: [PATCH 15/17] Removing a bunch of unused utils --- src/desdemona/jobs/sample_submit_job.clj | 2 - src/desdemona/utils.clj | 131 +---------------------- 2 files changed, 4 insertions(+), 129 deletions(-) diff --git a/src/desdemona/jobs/sample_submit_job.clj b/src/desdemona/jobs/sample_submit_job.clj index 9fe4433..5960c80 100644 --- a/src/desdemona/jobs/sample_submit_job.clj +++ b/src/desdemona/jobs/sample_submit_job.clj @@ -1,9 +1,7 @@ (ns desdemona.jobs.sample-submit-job (:require [desdemona.catalogs.sample-catalog :refer [build-catalog]] [desdemona.tasks.kafka :refer [add-kafka-input add-kafka-output]] - [desdemona.tasks.core-async :refer [add-core-async-input add-core-async-output]] [desdemona.tasks.sql :refer [add-sql-partition-input add-sql-insert-output]] - [desdemona.tasks.file-input :refer [add-seq-file-input]] [desdemona.lifecycles.sample-lifecycle :refer [build-lifecycles]] [desdemona.lifecycles.metrics :refer [add-metrics]] [desdemona.lifecycles.logging :refer [add-logging]] diff --git a/src/desdemona/utils.clj b/src/desdemona/utils.clj index a30f3bc..29dce09 100644 --- a/src/desdemona/utils.clj +++ b/src/desdemona/utils.clj @@ -1,8 +1,6 @@ (ns desdemona.utils - (:require [clojure.test :refer [is]] - [clojure.core.async :refer [chan sliding-buffer >!!]] - [clojure.java.io :refer [resource]] - [onyx.plugin.core-async :refer [take-segments!]])) + (:require [clojure.core.async :refer [chan sliding-buffer >!!]] + [clojure.java.io :refer [resource]])) ;;;; Test utils ;;;; @@ -12,12 +10,6 @@ (def zk-str (str zk-address ":" zk-port)) -(defn only [coll] - (assert (not (next coll))) - (if-let [result (first coll)] - result - (assert false))) - (defn find-task "Finds the catalog entry where the :onyx/name key equals task-name" [catalog task-name] @@ -27,27 +19,6 @@ {:catalog catalog :task-name task-name}))) (first matches))) -(defn update-task - "Finds the catalog entry with :onyx/name task-name - and applies f to it, returning the full catalog with the - transformed catalog entry" - [catalog task-name f] - (mapv (fn [entry] - (if (= task-name (:onyx/name entry)) - (f entry) - entry)) - catalog)) - -(defn add-to-job - "Adds to the catalog and lifecycles of a job in form - {:workflow ... - :catalog ... - :lifecycles ...}" - [job {:keys [catalog lifecycles]}] - (-> job - (update :catalog into catalog) - (update :lifecycles into lifecycles))) - (defn n-peers "Takes a workflow and catalog, returns the minimum number of peers needed to execute this job." @@ -58,52 +29,6 @@ (+ sum (or (:onyx/min-peers (find-task catalog t)) 1))) 0 task-set))) -(defn segments-equal? - "Onyx is a parallel, distributed system - so ordering isn't guaranteed. - Does an unordered comparison of segments to check for equality." - [expected actual] - (is (= (set expected) (set (remove (partial = :done) actual)))) - (is (= :done (last actual))) - (is (= (dec (count actual)) (count expected)))) - -(defn load-peer-config [onyx-id] - (assoc (-> "dev-peer-config.edn" resource slurp read-string) - :onyx/id onyx-id - :zookeeper/address zk-str)) - -(defn load-env-config [onyx-id] - (assoc (-> "env-config.edn" resource slurp read-string) - :onyx/id onyx-id - :zookeeper/address zk-str - :zookeeper.server/port zk-port)) - -(defn in-memory-catalog - "Takes a catalog and a set of input/output task names, - returning a new catalog with all I/O catalog entries - that were specified turned into core.async plugins. The - core.async entries preserve all non-Onyx parameters." - [catalog tasks] - (mapv - (fn [entry] - (cond (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :input)) - (merge - entry - {:onyx/plugin :onyx.plugin.core-async/input - :onyx/type :input - :onyx/medium :core.async - :onyx/max-peers 1 - :onyx/doc "Reads segments from a core.async channel"}) - (and (some #{(:onyx/name entry)} tasks) (= (:onyx/type entry) :output)) - (merge - entry - {:onyx/plugin :onyx.plugin.core-async/output - :onyx/type :output - :onyx/medium :core.async - :onyx/max-peers 1 - :onyx/doc "Writes segments to a core.async channel"}) - :else entry)) - catalog)) - ;;;; Lifecycles utils ;;;; (def input-channel-capacity 10000) @@ -118,13 +43,6 @@ (memoize (fn [id] (chan (sliding-buffer output-channel-capacity))))) -(defn channel-id-for [lifecycles task-name] - (->> lifecycles - (filter #(= task-name (:lifecycle/task %))) - (map :core.async/id) - (remove nil?) - (first))) - (defn inject-in-ch [event lifecycle] {:core.async/chan (get-input-channel (:core.async/id lifecycle))}) @@ -138,46 +56,5 @@ {:lifecycle/before-task-start inject-out-ch}) ;;; Stubs lifecycles to use core.async IO, instead of, say, Kafka or Datomic. -(defn in-memory-lifecycles - [lifecycles catalog tasks] - (vec - (mapcat - (fn [{:keys [lifecycle/task lifecycle/replaceable?] :as lifecycle}] - (let [catalog-entry (find-task catalog task)] - (cond (and (some #{task} tasks) replaceable? - (= (:onyx/type catalog-entry) :input)) - [{:lifecycle/task task - :lifecycle/calls ::in-calls - :core.async/id (java.util.UUID/randomUUID)} - {:lifecycle/task task - :lifecycle/calls :onyx.plugin.core-async/reader-calls}] - (and (some #{task} tasks) replaceable? - (= (:onyx/type catalog-entry) :output)) - [{:lifecycle/task task - :lifecycle/calls ::out-calls - :core.async/id (java.util.UUID/randomUUID)} - {:lifecycle/task task - :lifecycle/calls :onyx.plugin.core-async/writer-calls}] - :else [lifecycle]))) - lifecycles))) - -(defn bind-inputs! [lifecycles mapping] - (doseq [[task segments] mapping] - (let [in-ch (get-input-channel (channel-id-for lifecycles task)) - n-segments (count segments)] - (when (< input-channel-capacity n-segments) - (throw (ex-info "Input channel capacity is smaller than bound inputs. Capacity can be adjusted in utils.clj" - {:channel-size input-channel-capacity - :n-segments n-segments}))) - (when-not ((set (map :lifecycle/task lifecycles)) task) - (throw (ex-info (str "Cannot bind input for task " task " as lifecycles are missing. Check that inputs are being bound to the correct task name.") - {:input task - :lifecycles lifecycles}))) - (doseq [segment segments] - (>!! in-ch segment)) - (>!! in-ch :done)))) - -(defn collect-outputs! [lifecycles output-tasks] - (->> output-tasks - (map #(get-output-channel (channel-id-for lifecycles %))) - (map take-segments!))) + + From b5731463a075a8f6f58a1702489cff66f04a2e0a Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 9 Feb 2016 13:24:10 -0600 Subject: [PATCH 16/17] Using one of the utils for a test --- test/desdemona/jobs/sample_job_test.clj | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/desdemona/jobs/sample_job_test.clj b/test/desdemona/jobs/sample_job_test.clj index 0a78ad8..f1794dd 100644 --- a/test/desdemona/jobs/sample_job_test.clj +++ b/test/desdemona/jobs/sample_job_test.clj @@ -1,14 +1,12 @@ (ns desdemona.jobs.sample-job-test (:require [clojure.test :refer [deftest is]] [desdemona.jobs.sample-submit-job :refer [build-job]] + [desdemona.utils :refer [find-task]] ; Make the plugins load [onyx.plugin.kafka] [onyx.plugin.seq] [onyx.plugin.sql])) -(defn by-name [catalog name] - (first (filter (fn [x] (= name (x :onyx/name))) catalog))) - (deftest build-job-test (let [job (build-job) expected-catalog-names [:extract-line-info :prepare-rows :read-lines :write-lines] @@ -23,7 +21,7 @@ {:lifecycle/task :write-lines :lifecycle/calls :onyx.plugin.sql/write-rows-calls} {:lifecycle/task :read-lines, :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]] (is (= (map :onyx/name catalog) expected-catalog-names)) - (is (= ((by-name catalog :read-lines) :kafka/topic) "test1")) - (is (= ((by-name catalog :write-lines) :sql/table) :logLines)) + (is (= ((find-task catalog :read-lines) :kafka/topic) "test1")) + (is (= ((find-task catalog :write-lines) :sql/table) :logLines)) (is (= workflow expected-workflow)) (is (= lifecycles expected-lifecycles)))) From cb9762a4b558e4f0730c18270b88a6c61868baca Mon Sep 17 00:00:00 2001 From: Sean Schulte Date: Tue, 9 Feb 2016 13:35:54 -0600 Subject: [PATCH 17/17] Testing error case for raw deserializer --- test/desdemona/tasks/kafka_test.clj | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/desdemona/tasks/kafka_test.clj b/test/desdemona/tasks/kafka_test.clj index 1de565c..18fa437 100644 --- a/test/desdemona/tasks/kafka_test.clj +++ b/test/desdemona/tasks/kafka_test.clj @@ -6,3 +6,7 @@ (let [got (deserialize-message-raw (.getBytes "this is raw text")) expected {:line "this is raw text"}] (is (= got expected)))) + +(deftest deserialize-message-raw-fails-test + (let [got (deserialize-message-raw "this should be bytes")] + (is (= (.getMessage (got :error)) "No matching ctor found for class java.lang.String"))))