Discovery Streams

Discovery Streams dynamically discovers Kafka topics and exposes a corresponding channel for each one on a public websocket. Channels are named streaming:{dataset systemName} (for example, streaming:central_ohio_transit_authority__cota_stream).
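The naming rule above can be sketched as a pair of small helpers. This is an illustrative client-side sketch only; the function names topic_for and system_name_from are hypothetical and are not part of Discovery Streams.

```python
PREFIX = "streaming:"


def topic_for(system_name: str) -> str:
    """Build the channel topic for a dataset system name."""
    return f"{PREFIX}{system_name}"


def system_name_from(topic: str) -> str:
    """Recover the dataset system name from a channel topic."""
    if not topic.startswith(PREFIX):
        raise ValueError(f"not a streaming topic: {topic!r}")
    return topic[len(PREFIX):]


if __name__ == "__main__":
    print(topic_for("central_ohio_transit_authority__cota_stream"))
```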

Using Discovery Streams

Connecting to Websocket

  1. Install websocat.
  2. Start websocat:
    websocat wss://streams.smartcolumbusos.com/socket/websocket -H='User-Agent: websocat'
  3. Connect to a dataset:
    {"topic": "streaming:central_ohio_transit_authority__cota_stream","event":"phx_join","payload":{},"ref":"1"}
  4. The following response indicates a successful connection:
    {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"streaming:central_ohio_transit_authority__cota_stream"}
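For scripted clients, the join message and the success check from the steps above could be produced and verified along these lines. This is a minimal sketch using only the Python standard library; join_frame and join_succeeded are hypothetical helper names, and the sketch builds and inspects JSON text without opening a connection.

```python
import json


def join_frame(system_name, payload=None, ref="1"):
    """Serialize a phx_join message for the given dataset system name."""
    return json.dumps({
        "topic": f"streaming:{system_name}",
        "event": "phx_join",
        "payload": payload or {},
        "ref": ref,
    })


def join_succeeded(reply_text, ref="1"):
    """Return True if reply_text is an 'ok' phx_reply for the given ref."""
    reply = json.loads(reply_text)
    return (
        reply.get("event") == "phx_reply"
        and reply.get("ref") == ref
        and reply.get("payload", {}).get("status") == "ok"
    )


if __name__ == "__main__":
    # Paste the printed line into a running websocat session.
    print(join_frame("central_ohio_transit_authority__cota_stream"))
```

Matching on ref as well as status ties the reply to the join that was actually sent, which matters once a client joins more than one topic on the same socket.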

Running Locally

  1. In the smartcitiesdata directory, install dependencies:

    mix deps.get
  2. In this directory, start Docker:

    MIX_ENV=integration mix docker.start
  3. Start the Phoenix server:

    MIX_ENV=integration mix phx.server
    
    # ...or start interactively:
    MIX_ENV=integration iex -S mix phx.server

Setting up to stream data locally

  1. Install websocat.

  2. Start Docker from Andi. Run Andi and Reaper locally.

  3. In discovery_streams > config > config.exs, under config :discovery_streams, set topic_prefix to "raw-".

    • Make sure to change it back to "transformed-" when finished with local testing!
  4. Run Discovery Streams by starting the Phoenix server:

    MIX_ENV=integration iex -S mix phx.server
  5. Make a PUT request to the Andi API to add a dataset.

    • For help with the API, see the Postman collection located here.
    • Use the central_ohio_transit_authority__cota_stream dataset found here, or create your own.
    • Before making the PUT request, make sure the dataset has technical.sourceType set to "stream".
  6. Start websocat:

    websocat ws://127.0.0.1:4001/socket/websocket -H='User-Agent: websocat'
  7. Connect to your dataset:

    {"topic": "streaming:central_ohio_transit_authority__cota_stream","event":"phx_join","payload":{},"ref":"1"}
    • If using your own dataset, replace central_ohio_transit_authority__cota_stream with the system name of your dataset.
  8. You should see the following success response:

    {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"streaming:central_ohio_transit_authority__cota_stream"}

    Every ten seconds (by default), you should see data events appear in the console.

Connecting to a private dataset

A private dataset can be streamed by a client within the organization that owns the dataset. An API key can be provided in the phx_join event by including the key-value pair "api_key":"<client API key>" in the payload:

   {"topic": "streaming:central_ohio_transit_authority__cota_stream","event":"phx_join","payload":{"api_key":"1234567890abcdefg"},"ref":"1"}
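To avoid hard-coding a key in scripts, the join message for a private dataset could be assembled from an environment variable. This is only a sketch: the variable name STREAMS_API_KEY and the helper private_join_frame are assumptions made for illustration, not something Discovery Streams defines.

```python
import json
import os


def private_join_frame(system_name, api_key, ref="1"):
    """Serialize a phx_join message that carries an API key in its payload."""
    if not api_key:
        raise ValueError("no API key supplied for the private dataset join")
    return json.dumps({
        "topic": f"streaming:{system_name}",
        "event": "phx_join",
        "payload": {"api_key": api_key},
        "ref": ref,
    })


if __name__ == "__main__":
    # STREAMS_API_KEY is a hypothetical variable name chosen for this sketch.
    key = os.environ.get("STREAMS_API_KEY", "")
    if key:
        print(private_join_frame("central_ohio_transit_authority__cota_stream", key))
    else:
        print("Set STREAMS_API_KEY to build the join message.")
```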

Setting a Filter

A filter can be provided in the phx_join event by giving a filter key and value as the payload:

# Stream only vehicles with an id of 11409
websocat wss://streams.smartcolumbusos.com/socket/websocket -H='User-Agent: websocat'
{"topic": "streaming:central_ohio_transit_authority__cota_stream","event":"phx_join","payload":{"vehicle.vehicle.id":"11409"},"ref":"1"}

# Include both a filter and an API key
websocat wss://streams.smartcolumbusos.com/socket/websocket -H='User-Agent: websocat'
{"topic": "streaming:central_ohio_transit_authority__cota_stream","event":"phx_join","payload":{"api_key":"1234567890abcdefg","vehicle.vehicle.id":"11409"},"ref":"1"}
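The dotted filter key reads like a path into a nested message. The sketch below shows that interpretation on the client side so a filter can be sanity-checked before it is sent. It rests on several assumptions: that each dot descends one level, that values are compared as exact strings, that api_key is not treated as a filter, and that the sample message shape is representative. None of these is confirmed here as the server's actual matching logic.

```python
def lookup(message, dotted_key):
    """Follow a dotted key such as 'vehicle.vehicle.id' into nested dicts."""
    current = message
    for part in dotted_key.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def matches(message, payload):
    """Return True if every filter in the join payload matches the message."""
    # Assumption: api_key is a credential, not a filter, so it is skipped.
    filters = {k: v for k, v in payload.items() if k != "api_key"}
    for key, expected in filters.items():
        actual = lookup(message, key)
        if actual is None or str(actual) != str(expected):
            return False
    return True


if __name__ == "__main__":
    # Hypothetical message shape, invented for this sketch.
    sample = {"vehicle": {"vehicle": {"id": "11409"}}}
    payload = {"api_key": "1234567890abcdefg", "vehicle.vehicle.id": "11409"}
    print(matches(sample, payload))
```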

Environment Variables

| Variable | Description | Example |
| --- | --- | --- |
| MIX_ENV | Environment for the Mix build | dev, test, integration, or prod |
| KAFKA_BROKERS | Comma-delimited list of Kafka brokers | kafka1.com:9092,kafka2.com:9092 |
| SECRET_KEY_BASE | Phoenix uses this to verify cookies | Generate with mix phx.gen.secret or pass in your own |
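The KAFKA_BROKERS format can be checked before starting the service. The sketch below only illustrates the host:port,host:port shape shown in the example; parse_brokers is a hypothetical helper, and the application itself is written in Elixir and reads the variable in its own way.

```python
import os


def parse_brokers(value):
    """Split a comma-delimited broker list into (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas and surrounding whitespace
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


if __name__ == "__main__":
    raw = os.environ.get("KAFKA_BROKERS", "kafka1.com:9092,kafka2.com:9092")
    for host, port in parse_brokers(raw):
        print(host, port)
```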

To run the tests

  • Run mix test to run the tests a single time
  • Run mix test.watch to re-run the tests when a file changes
  • Run mix test.watch --stale to re-run only the tests for modules that have changed
  • Run mix test.integration to run the integration tests