Ingesting data from the edge

In this workshop you’ll use MiNiFi to capture data from the edge and forward it to NiFi.

Preparation

To clean your environment and reset to the beginning of this lab, please SSH to your cluster host and run the following command:

Note
The command below will undo everything done in the cluster in previous workshops.
/tmp/resources/reset-to-lab.sh edge 1

Labs summary

  • Lab 1 - Run a simulator on Apache NiFi to send IoT sensor data to an MQTT broker.

  • Lab 2 - Create a flow to collect data from the MQTT broker using Cloudera Edge Flow Manager and publish it to a MiNiFi agent.

  • Lab 3 - Use the Edge Flow Manager to update existing edge flows and perform additional processing on the edge.

Lab 1 - Apache NiFi: setup machine sensors simulator

In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines and sends the data to an MQTT broker (Mosquitto). The MQTT broker plays the role of a gateway that is connected to many sensors of different types through the MQTT protocol. Your cluster comes with an embedded MQTT broker that the simulation script publishes to. For convenience, we will use NiFi to run the script rather than running it from a shell.
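The contents of `/opt/demo/simulate.py` are not shown in this workshop. As a rough illustration only, the Python sketch below builds one reading in an assumed format: a flat JSON object with one key per sensor (`sensor_0`, `sensor_1`, ...) holding a temperature in degrees Celsius, with an occasional out-of-range value on `sensor_0` or `sensor_1`. The function name `make_reading`, the number of sensors and the error rate are all hypothetical.

```python
import json
import random


def make_reading(num_sensors=12, bogus_rate=0.1, rng=None):
    """Return one simulated reading as a JSON string (hypothetical format).

    Each key sensor_0 .. sensor_<n-1> holds a temperature in degrees Celsius
    between 0 and 100. With probability bogus_rate, sensor_0 or sensor_1 is
    overwritten with an out-of-range value (500 to 1000) to mimic the bogus
    readings investigated in the labs. This is NOT the real simulate.py.
    """
    if num_sensors < 2:
        raise ValueError("need at least sensor_0 and sensor_1")
    if rng is None:
        rng = random.Random()
    # Normal readings: every sensor reports a plausible temperature.
    reading = {f"sensor_{i}": round(rng.uniform(0, 100), 2) for i in range(num_sensors)}
    # Occasionally corrupt sensor_0 or sensor_1 with a bogus value.
    if rng.random() < bogus_rate:
        reading[rng.choice(["sensor_0", "sensor_1"])] = round(rng.uniform(500, 1000), 2)
    return json.dumps(reading)
```

For example, `print(make_reading(rng=random.Random(42)))` prints one JSON reading. Publishing it to the broker would additionally need an MQTT client library such as paho-mqtt, which is left out so the sketch has no third-party dependencies.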

  1. Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas.

    simulate1
  2. Right-click the processor and select Configure (alternatively, just double-click the processor). On the PROPERTIES tab, set the properties shown below to run the Python simulator script.

    Command:           python3
    Command Arguments: /opt/demo/simulate.py
    simulate2
  3. In the SCHEDULING tab, set Run Schedule to 1 sec.

    Alternatively, you can set it to other time intervals, such as 30 sec or 1 min.

    runSimulator1or30
  4. In the SETTINGS tab:

    1. Check the "success" relationship in the AUTOMATICALLY TERMINATED RELATIONSHIPS section

    2. Set the processor name to "Generate Test Data"

    3. Click Apply.

      nifiTerminateRelationships
  5. Then right-click the processor and select Start to run the simulator.

    nifiDemoStart
  6. After a few seconds, right-click the processor, select Stop and look at the data provenance. You’ll see that it has run a number of times and produced results.

    NiFiViewDataProvenance
    NiFiDataProvenance
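For a short-lived command, ExecuteProcess roughly runs the configured command on every scheduled trigger and writes what the command prints to standard output into a new FlowFile. The sketch below approximates that loop with Python's `subprocess` module. `run_once` and `run_on_schedule` are hypothetical helpers, not NiFi APIs, and the sketch ignores features such as batching the output of long-running processes.

```python
import subprocess
import time


def run_once(command, args):
    """Run the command once and return its stdout as bytes.

    Roughly what ExecuteProcess turns into the content of a new FlowFile
    for a short-lived command (hypothetical helper, not a NiFi API).
    """
    result = subprocess.run([command, *args], capture_output=True, check=True)
    return result.stdout


def run_on_schedule(command, args, interval_s=1.0, iterations=3):
    """Call run_once every interval_s seconds, like a Run Schedule of 1 sec."""
    outputs = []
    for _ in range(iterations):
        started = time.monotonic()
        outputs.append(run_once(command, args))
        # Sleep for the remainder of the interval (never a negative duration).
        elapsed = time.monotonic() - started
        time.sleep(max(0.0, interval_s - elapsed))
    return outputs
```

Calling `run_on_schedule("python3", ["/opt/demo/simulate.py"], interval_s=1.0)` on the cluster host would loosely mimic the processor configured above, assuming the script exits after printing its output.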

Lab 2 - Configuring Edge Flow Management

Cloudera Edge Flow Management (EFM) gives you a visual overview of all MiNiFi agents in your environment, and allows you to update the flow configuration for each one, with versioning control thanks to the NiFi Registry integration. In this lab, you will create the MiNiFi flow and publish it for the MiNiFi agent to pick it up.

  1. Open the EFM Web UI at http://<public_dns>:10088/efm/ui/ and select the Dashboard tab.

  2. Click on the EVENTS header and verify that your EFM server is receiving heartbeats from the MiNiFi agent. Click on the info icon on a heartbeat record to see the details of the heartbeat.

    cem heartbeats
  3. Select the Flow Designer tab. To build a dataflow, select the desired class (iot-1) from the table and click OPEN. Alternatively, you can double-click on the desired class.

  4. Add a ConsumeMQTT processor to the canvas by dragging the processor icon to the canvas, selecting the ConsumeMQTT processor type and clicking the Add button. Once the processor is on the canvas, double-click it and configure it with the settings below:

    Broker URI:     tcp://<CLUSTER_HOSTNAME>:1883
    Client ID:      minifi-iot
    Topic Filter:   iot/#
    Max Queue Size: 60
    add consumer mqtt

    Make sure you scroll down on the properties page to set the Topic Filter and Max Queue Size:

    add consumer mqtt 2
  5. Add a Remote Process Group (RPG) to the canvas and configure it as follows:

    URL:                http://<CLUSTER_HOSTNAME>:8080/nifi
    Transport Protocol: HTTP
    add rpg
  6. At this point you need to connect the ConsumeMQTT processor to the RPG. To do this, you first need to add an Input Port to the remote NiFi server.

    1. Open the NiFi Web UI at http://<public_dns>:8080/nifi/

    2. Drag an Input Port to the canvas.

    3. When prompted for its name, call it something like "from Gateway" and click ADD.

      add input port
  7. For now, to terminate the NiFi Input Port, add a Funnel to the canvas…

    add funnel
  8. … and set up a connection from the Input Port to it. To set up a connection, hover the mouse over the Input Port until an arrow symbol is shown in the center. Click on the arrow, drag it and drop it on the Funnel to connect the two elements.

    connecting
  9. Right-click on the Input Port and start it. Alternatively, click on the Input Port to select it and then press the start ("play") button on the Operate panel:

    operate panel
  10. You will need the ID of the Input Port to complete the connection of the ConsumeMQTT processor to the RPG (NiFi). Double-click on the Input Port and copy its ID.

    input port id
  11. Back in the Flow Designer, connect the ConsumeMQTT processor to the RPG. The connection requires an ID: paste the ID you copied from the Input Port. Make sure that there are NO SPACES!

    connect to rpg
  12. Double-click the connection and update the following configuration:

    Flowfile Expiration:            60 seconds
    Back Pressure Object Threshold: 10000
    Connection Name:                Sensor data
    efm set cloude config
  13. The flow is now complete, but before publishing it, create a bucket in the NiFi Registry so that all versions of your flows are stored for review and audit. Open the NiFi Registry at http://<public_dns>:18080/nifi-registry, click the wrench/spanner icon in the top-right corner and create a bucket called IoT (ATTENTION: the bucket name is CASE-SENSITIVE).

    create bucket
  14. You can now publish the flow for the MiNiFi agent to automatically pick up. Click Publish, add a descriptive comment for your changes and click Apply.

    publish flow
    cem first version
  15. Go back to the NiFi Registry Web UI and click on the NiFi Registry name, next to the Cloudera logo. If the flow publishing was successful, you should see the flow’s version details in the NiFi Registry.

    flow in nifi registry
  16. At this point, you can test the edge flow up to NiFi. Start the NiFi simulator (ExecuteProcess processor) again and confirm you can see the messages queued in NiFi.

    queued events
  17. Right-click on the queue and select List queue to see details of the received messages.

    list queue
  18. Experiment with clicking the Info, Eye and Provenance icons for one of the messages to look at the message attributes, contents and provenance details, respectively.

    message queue details
  19. For example, the sensor readings in each message contain temperature values that should all range between 0 and 100 degrees Celsius. If you sample the contents of a few messages, you should notice that some readings from sensor_0 and sensor_1 report bogus values, as shown below. We will address this issue in the next section.

    bogus readings
  20. You can stop the simulator now (Stop the NiFi processor).
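The Topic Filter `iot/#` set on ConsumeMQTT uses MQTT's multi-level wildcard. The Python sketch below implements the MQTT 3.1.1 matching rules for `#` and `+` so you can check which topic names a filter would accept. The function name and example topics such as `iot/machine_1` are illustrative assumptions, since the exact topics the simulator publishes to are not shown here.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter.

    Follows the MQTT 3.1.1 wildcard rules: '+' matches exactly one level and
    '#' (only valid as the last level) matches the parent level plus any
    number of child levels. Topics starting with '$' are not matched by
    filters whose first level is a wildcard.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the final level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch
    # Without '#', the filter and the topic must have the same depth.
    return len(filter_levels) == len(topic_levels)
```

Note that `iot/#` also matches the parent topic `iot` itself, while a filter such as `iot/+` matches exactly one level below `iot`.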

Lab 3 - Update the flow to perform additional processing on the edge

In the previous lab we noticed that some sensors were intermittently sending erroneous measurements. If our downstream applications process these measurements, the quality of their output may suffer.

We could filter out the erroneous readings in NiFi. However, if the volume of problematic data is large, we would be wasting network bandwidth sending that data to NiFi in the first place. Instead, we will push additional logic to the edge to identify and filter out those readings in place, avoiding the overhead of sending them to NiFi.

We’ve noticed that the problem only ever affects the temperatures in measurements sensor_0 and sensor_1. If either of these two temperatures is 500 or greater, we must discard the entire sensor reading. If both temperatures are in the normal range (< 500), we can be confident that all reported temperatures are correct and the reading can be sent to NiFi.
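The rule above can be expressed as a small Python function. It sketches what the EvaluateJsonPath and RouteOnAttribute processors configured in the following steps do together; it is not their actual implementation. It assumes both `sensor_0` and `sensor_1` are present at the top level of the JSON message and hold numbers, and `route_reading` is a hypothetical name. A reading goes to `error` when `${sensor_0:ge(500):or(${sensor_1:ge(500)})}` would be true and to `unmatched` otherwise.

```python
import json


def route_reading(content, threshold=500):
    """Return the relationship a reading would be routed to: 'error' or 'unmatched'.

    Hypothetical helper; assumes the JSON message has numeric sensor_0 and
    sensor_1 values at the top level.
    """
    reading = json.loads(content)
    # EvaluateJsonPath (Destination: flowfile-attribute): copy $.sensor_0 and
    # $.sensor_1 into attributes. FlowFile attributes are strings, hence str().
    attributes = {"sensor_0": str(reading["sensor_0"]), "sensor_1": str(reading["sensor_1"])}
    # RouteOnAttribute (Route to Property name): the 'error' property is
    # ${sensor_0:ge(500):or(${sensor_1:ge(500)})}.
    is_error = (float(attributes["sensor_0"]) >= threshold
                or float(attributes["sensor_1"]) >= threshold)
    # A FlowFile that matches no property goes to the 'unmatched' relationship.
    return "error" if is_error else "unmatched"
```

Because the expression uses `ge` (greater than or equal), a reading of exactly 500 is treated as an error.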

  1. Go to the CEM Web UI and add a new processor to the canvas. In the Filter box of the dialog that appears, type "JsonPath". Select the EvaluateJsonPath processor and click Add.

  2. Double-click on the new processor and configure it with the following properties:

    Processor Name: Extract sensor_0 and sensor1 values
    Destination:    flowfile-attribute
    evaluate json path
  3. Click on the Add Property button and enter the following properties:

    Property Name   Property Value
    sensor_0        $.sensor_0
    sensor_1        $.sensor_1

    extract attributes
  4. Click Apply to save the processor configuration.

  5. Drag one more new processor to the canvas. In the Filter box of the dialog that appears, type "Route". Select the RouteOnAttribute processor and click Add.

    route on attribute
  6. Double-click on the new processor and configure it with the following properties:

    Processor Name: Filter Errors
    Route Strategy: Route to Property name
  7. Click on the Add Property button and enter the following properties:

    Property Name   Property Value
    error           ${sensor_0:ge(500):or(${sensor_1:ge(500)})}

    route on attribute config
  8. Click Apply to save the processor configuration.

  9. Reconnect the ConsumeMQTT processor to the Extract sensor_0 and sensor1 values processor:

    1. Click on the existing connection between ConsumeMQTT and the RPG to select it.

    2. Drag the destination end of the connection to the Extract sensor_0 and sensor1 values processor.

      reconnect consume mqtt
  10. Connect the Extract sensor_0 and sensor1 values processor to the Filter Errors processor. When the Create Connection dialog appears, select "matched" and click Create.

    create connection
  11. Double-click the connection, update the following configuration and Apply the changes:

    Flowfile Expiration:            60 seconds
    Back Pressure Object Threshold: 10000
    Connection Name:                Extracted attributes
  12. Double-click the Extract sensor_0 and sensor1 values processor, check the following relationships in the AUTOMATICALLY TERMINATED RELATIONSHIPS section and click Apply:

    • failure

    • unmatched

    • sensor_0

    • sensor_1

    terminations
  13. Before creating the last connection, you will need the ID of the NiFi Input Port again. Go to the NiFi Web UI, double-click the "from Gateway" Input Port and copy its ID.

    input port id
  14. Back in the CEM Web UI, connect the Filter Errors processor to the RPG:

    connect filter ro rpg
  15. In the Create Connection dialog, check the "unmatched" checkbox, enter the copied Input Port ID and click Create:

    create last connection
  16. Double-click the connection, update the following configuration and Apply the changes:

    Flowfile Expiration:            60 seconds
    Back Pressure Object Threshold: 10000
    Connection Name:                Valid data
  17. To discard the erroneous readings, double-click the Filter Errors processor, check the "error" checkbox under the AUTOMATICALLY TERMINATED RELATIONSHIPS section and click Apply:

    terminate errors
  18. Finally, click ACTIONS > Publish… on the CEM canvas, enter a descriptive comment like "Added filtering of erroneous readings" and click Publish.

  19. Start the simulator again.

  20. Go to the NiFi Web UI and confirm that data is flowing to NiFi. Examine the messages' contents, as we did before, to confirm that the problematic readings are gone.

  21. Stop the simulator once you have verified the data.
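Both labs set FlowFile Expiration to 60 seconds and Back Pressure Object Threshold to 10000 on every connection. The toy Python model below illustrates what those two settings mean: items older than the expiration age are dropped instead of delivered, and once the threshold number of items is queued the upstream component stops being scheduled, modelled here by refusing new items. The class and method names are hypothetical. The model simplifies real NiFi behaviour, where expiration age is measured from when the data entered the flow rather than the queue, and the object threshold is a soft limit that can be briefly exceeded.

```python
import time
from collections import deque


class ConnectionQueue:
    """Toy model of a connection's FlowFile Expiration and Back Pressure
    Object Threshold settings (hypothetical class, not a NiFi API)."""

    def __init__(self, max_age_s=60.0, object_threshold=10000, clock=time.monotonic):
        self.max_age_s = max_age_s                # FlowFile Expiration; 0 means never expire
        self.object_threshold = object_threshold  # Back Pressure Object Threshold
        self.clock = clock                        # injectable time source for testing
        self._items = deque()                     # (enqueue_time, item) pairs, oldest first
        self.expired = 0                          # number of items dropped as expired

    def back_pressure(self):
        """True once the queue holds object_threshold items or more."""
        return len(self._items) >= self.object_threshold

    def offer(self, item):
        """Enqueue item unless back pressure is engaged; return True if accepted."""
        if self.back_pressure():
            return False
        self._items.append((self.clock(), item))
        return True

    def poll(self):
        """Return the oldest non-expired item, or None if nothing deliverable is left."""
        now = self.clock()
        while self._items:
            enqueued_at, item = self._items.popleft()
            if self.max_age_s > 0 and now - enqueued_at > self.max_age_s:
                self.expired += 1  # expired items are dropped, never delivered
                continue
            return item
        return None
```

The `clock` parameter lets the model be exercised with a fake time source instead of waiting for real seconds to pass.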