Scenario 1 - REST source, KSQL to wrangle, stream to GCS & GBQ, viz with GDS

Environment Data

Other REST data sources to play with

Current datetime (useful smoke test)

ccloud topic create current-datetime
# Current time
curl -i -X POST -H "Accept:application/json" \
          -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
          -d '{
    "name": "source_rest_current-datetime",
    "config": {
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "tasks.max": "1",
      "rest.source.poll.interval.ms": "60000",
      "rest.source.method": "GET",
      "rest.source.url": "http://worldclockapi.com/api/json/utc/now",
      "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
      "rest.source.properties": "Content-Type:application/json,Accept:application/json",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.destination.topics": "current-datetime"
    }
  }'

Check status:

$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
source_rest_current-datetime        |  RUNNING  |  RUNNING
source_rest_flood-monitoring-L2404  |  RUNNING  |  RUNNING

Check the data:

"2018-08-07T09:55Z"
"2018-08-07T09:56Z"

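The command behind the output above is not shown. One way to get a similar view, assuming the worldclockapi payload carries the timestamp in a top-level currentDateTime field and that jq is installed, is to pipe the consumer through a small filter. The function name extract_datetime is hypothetical.

```shell
#!/bin/bash
# Print only the timestamp from each JSON message read on stdin.
# Assumption: the payload has a top-level "currentDateTime" field.
extract_datetime() {
  jq '.currentDateTime'
}

# Usage (needs the ccloud CLI and a populated topic):
#   ccloud consume --from-beginning --topic current-datetime | extract_datetime
```
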
Weather

#!/bin/bash

curl -i -X POST -H "Accept:application/json" \
          -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
          -d '{
    "name": "source-rest-weather_york",
    "config": {
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "tasks.max": "1",
      "rest.source.poll.interval.ms": "900000",
      "rest.source.method": "GET",
      "rest.source.url": "https://api.openweathermap.org/data/2.5/weather?q=York,uk&appid=5139ef0dd688cdb7d864f4e118445aa3",
      "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
      "rest.source.properties": "Content-Type:application/json,Accept:application/json",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.destination.topics": "york-weather"
    }
  }'
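
Query the data in KSQL. The stream below reads from the london-weather topic, so it expects a connector like the one above with q=London,uk in the URL and rest.source.destination.topics set to london-weather.

ksql> create stream weather (weather array<struct<icon varchar, description varchar, main varchar, id int>>, main struct<temp double, pressure bigint, humidity bigint>, visibility bigint, wind struct<speed double, deg int>, name varchar) with (kafka_topic='london-weather', value_format='json');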

 Message
----------------
 Stream created
----------------
ksql> select name,weather[0], main->temp, main->temp - 273.15 as temp_c from weather;
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.64 | 18.49000000000001
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.63 | 18.480000000000018
London | {ICON=04n, DESCRIPTION=overcast clouds, MAIN=Clouds, ID=804} | 291.69 | 18.54000000000002
^CQuery terminated
ksql> select name,weather[0]->description, main->temp, main->temp - 273.15 as temp_c from weather;
London | overcast clouds | 291.64 | 18.49000000000001
London | overcast clouds | 291.63 | 18.480000000000018
London | overcast clouds | 291.69 | 18.54000000000002
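
The temp_c column is the Kelvin reading minus 273.15, and the trailing digits in the output are ordinary double-precision noise. As a quick sanity check outside KSQL, the same arithmetic can be sketched in the shell with awk, rounding to two decimal places. The function name kelvin_to_celsius is hypothetical.

```shell
#!/bin/bash
# Convert a Kelvin reading to Celsius, rounded to two decimal places.
# LC_ALL=C keeps the decimal separator a dot regardless of locale.
kelvin_to_celsius() {
  LC_ALL=C awk -v k="$1" 'BEGIN { printf "%.2f\n", k - 273.15 }'
}

# The three readings from the query output above
kelvin_to_celsius 291.64   # prints 18.49
kelvin_to_celsius 291.63   # prints 18.48
kelvin_to_celsius 291.69   # prints 18.54
```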

Stock details

ccloud topic create iex-stock-aapl-quote
ccloud topic create iex-stock-aapl-company
# IEX AAPL quote
# https://iextrading.com/developer/
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
    "name": "source_rest_iex-stock-aapl-quote",
    "config": {
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "tasks.max": "1",
      "rest.source.method": "GET",
      "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
      "rest.source.properties": "Content-Type:application/json,Accept:application/json",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.url": "https://api.iextrading.com/1.0/stock/aapl/quote",
      "rest.source.poll.interval.ms": "60000",
      "rest.source.destination.topics": "iex-stock-aapl-quote"
    }}'
# IEX AAPL company
# https://iextrading.com/developer/
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
    "name": "source_rest_iex-stock-aapl-company",
    "config": {
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "tasks.max": "1",
      "rest.source.method": "GET",
      "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
      "rest.source.properties": "Content-Type:application/json,Accept:application/json",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.url": "https://api.iextrading.com/1.0/stock/aapl/company",
      "rest.source.poll.interval.ms": "600000",
      "rest.source.destination.topics": "iex-stock-aapl-company"
    }}'
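
The connector definitions in this section differ only in name, source URL, poll interval, and destination topic. A minimal sketch of a helper that generates the JSON from those four values is shown here. The function name rest_source_config is hypothetical, the remaining settings are copied from the definitions above, and the header list is written with single colons.

```shell
#!/bin/bash
# Hypothetical helper: print a REST source connector definition as JSON.
# Arguments: connector name, source URL, poll interval (ms), destination topic.
rest_source_config() {
  local name="$1" url="$2" interval_ms="$3" topic="$4"
  cat <<EOF
{
  "name": "${name}",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.poll.interval.ms": "${interval_ms}",
    "rest.source.method": "GET",
    "rest.source.url": "${url}",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.destination.topics": "${topic}"
  }
}
EOF
}

# Usage (needs a Kafka Connect worker on localhost:8083):
#   rest_source_config source_rest_iex-stock-aapl-quote \
#       https://api.iextrading.com/1.0/stock/aapl/quote 60000 iex-stock-aapl-quote \
#     | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#         http://localhost:8083/connectors/ -d @-
```

The values are substituted as-is, so this sketch only suits names, URLs, and topics that contain no double quotes or backslashes.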

Check the data:

$ ccloud consume --from-beginning --topic iex-stock-aapl-quote
{"symbol":"AAPL","companyName":"Apple Inc.","primaryExchange":"Nasdaq Global Select","sector":"Technology","calculationPrice":"close","open":207.93,"openTime":1533562200581,"close":209.07,"closeTime":1533585600168,"high":209.25,"low":207.07,"latestPrice":209.07,"latestSource":"Close","latestTime":"August 6, 2018","latestUpdate":1533585600168,"latestVolume":25390079,"iexRealtimePrice":null,"iexRealtimeSize":null,"iexLastUpdated":null,"delayedPrice":209.06,"delayedPriceTime":1533585600229,"extendedPrice":209.02,"extendedChange":-0.05,"extendedChangePercent":-0.00024,"extendedPriceTime":1533589186272,"previousClose":207.99,"change":1.08,"changePercent":0.00519,"iexMarketPercent":null,"iexVolume":null,"avgTotalVolume":23922439,"iexBidPrice":null,"iexBidSize":null,"iexAskPrice":null,"iexAskSize":null,"marketCap":1009792628820,"peRatio":20.18,"week52High":209.25,"week52Low":149.16,"ytdChange":0.22852624924298495}
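
The quote payload is wide, so when eyeballing the topic it can help to trim each message to a few fields. A minimal sketch, assuming jq is installed; the function name summarise_quote is hypothetical, and the field names are taken from the message above.

```shell
#!/bin/bash
# Reduce each quote message on stdin to symbol, latest price, and latest time,
# printed as one compact JSON object per line.
summarise_quote() {
  jq -c '{symbol, latestPrice, latestTime}'
}

# Usage (needs the ccloud CLI and a populated topic):
#   ccloud consume --from-beginning --topic iex-stock-aapl-quote | summarise_quote
```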

Check the data:

$ ccloud consume --from-beginning --topic iex-stock-aapl-company
{"symbol":"AAPL","companyName":"Apple Inc.","exchange":"Nasdaq Global Select","industry":"Computer Hardware","website":"http://www.apple.com","description":"Apple Inc is designs, manufactures and markets mobile communication and media devices and personal computers, and sells a variety of related software, services, accessories, networking solutions and third-party digital content and applications.","CEO":"Timothy D. Cook","issueType":"cs","sector":"Technology","tags":["Technology","Consumer Electronics","Computer Hardware"]}