
Profiling Streaming Time Series Data

  • Skill level

    Intermediate, some knowledge of windowing and pandas will help

  • Time to complete

    Approx. 25 min

Introduction: In this guide, we will show you how to combine Bytewax with ydata-profiling to profile and understand the quality of your streaming data!

Prerequisites

Python modules

bytewax==0.16.2 
ydata-profiling==4.3.1
matplotlib==3.7

Your Takeaway

You'll be able to handle and structure data streams into snapshots using Bytewax, and then analyze them with ydata-profiling to create a comprehensive report of data characteristics for each device at each time interval.

Table of contents

Resources

GitHub link

Jupyter Notebook link

Data sample link

Data Profiling

Instead of the usual approach, where data quality is assessed while the data warehouse or dashboard solution is being built, it is cheaper, more effective, and ultimately more robust to monitor quality closer to the source. This is a great fit for stream processing, since most data is created in real time, and it prevents data quality issues from multiplying in downstream tables and ending up in customer-facing services.

When it comes to data profiling, ydata-profiling has consistently been a crowd favorite for both tabular and time-series data. And no wonder: it takes one line of code to produce an extensive set of analyses and insights.

Let's see it in action!

Step 1. Environmental Sensor Telemetry Dataset

Let's download a subset of the Environmental Sensor Telemetry Dataset (License: CC0 Public Domain), which contains several measurements of temperature, humidity, carbon monoxide, liquefied petroleum gas, smoke, light, and motion from different IoT devices.

wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000

In a production environment, these measurements would be continuously generated by each device, and the input would look like what we expect in a streaming platform such as Kafka.

Step 2. Inputs and parsing

To simulate a stream of data, we will use the Bytewax CSVInput connector to read the CSV file we downloaded one line at a time. In a production use case, you could easily swap this out with the KafkaInput connector.

First, let’s make some necessary imports:

from datetime import datetime, timedelta, timezone
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput

Then, we define our dataflow object and add our CSV input.

flow = Dataflow()
flow.input("simulated_stream", CSVInput("iot_telemetry_data_1000"))

Afterward, we will use a stateless map method where we pass in a function to convert the string to a datetime object and restructure the data to the format (device_id, data).

# Parse the epoch timestamp string into a timezone-aware datetime
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)

# Remap the format to a tuple of (device_id, reading_data)
flow.map(lambda reading_data: (reading_data["device"], reading_data))

The map method will make the change to each data point in a stateless way. The reason we have modified the shape of our data is so that we can easily group the data in the next steps to profile data for each device separately rather than for all of the devices simultaneously.
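To make the reshaping concrete, here is what a single row looks like before and after these two map steps, run outside the dataflow on a made-up sample reading (the device ID and values are illustrative, not taken from the real stream):

```python
from datetime import datetime, timezone

# A hypothetical reading, as CSVInput would deliver it (all values are strings)
reading = {"ts": "1594512094.385", "device": "b8:27:eb:bf:9d:51", "temp": "22.7"}

# Same logic as parse_time in the dataflow
reading["ts"] = datetime.fromtimestamp(float(reading["ts"]), timezone.utc)

# Same logic as the keying lambda: (device_id, reading_data)
keyed = (reading["device"], reading)

print(keyed[0])        # the device id, used as the grouping key
print(keyed[1]["ts"])  # now a timezone-aware datetime
```

With the device ID as the first element of the tuple, Bytewax can route all readings from the same device to the same stateful window in the next step.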

Step 3. Windowing

Now we will take advantage of the stateful capabilities of Bytewax to gather data for each device over a duration of time that we have defined. ydata-profiling expects a snapshot of the data over time, so a window operator is the perfect fit.

from bytewax.window import EventClockConfig, TumblingWindow

# This is the accumulator function; it collects readings into a list
def acc_values(acc, reading):
    acc.append(reading)
    return acc

# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]

# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a 1 hour tumbling window, aligned to a fixed instant
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

In ydata-profiling, we can produce summary statistics for a pandas DataFrame scoped to a particular context. In our example, that means snapshots of data referring to each IoT device over a particular time frame.
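Downstream of fold_window, each item is a (device_id, list_of_readings) tuple, emitted once per device per closed window. A minimal, dataflow-free simulation of the accumulator (using hypothetical readings and a hypothetical device key) shows the shape the next step will receive:

```python
# Simulate how fold_window builds one window's state with acc_values,
# outside of Bytewax, on two hypothetical readings for a single device
def acc_values(acc, reading):
    acc.append(reading)
    return acc

state = list()  # `list` is the builder passed to fold_window
for reading in [{"ts": "t0", "temp": 22.7}, {"ts": "t1", "temp": 22.6}]:
    state = acc_values(state, reading)

# When the window closes, downstream steps see this (key, state) tuple
snapshot = ("device-1", state)
print(len(snapshot[1]))  # number of readings collected in the window
```

Each such snapshot is exactly what we will hand to ydata-profiling in the next step, as a per-device, per-hour DataFrame.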

Step 4. Profile Report

After the snapshots are defined, leveraging ydata-profiling is as simple as calling ProfileReport for each of the DataFrames we would like to analyze:

from ydata_profiling import ProfileReport
import pandas as pd

def profile(device_id__readings):
    device_id, readings = device_id__readings
    start_time = (
        readings[0]["ts"]
        .replace(minute=0, second=0, microsecond=0)
        .strftime("%Y-%m-%d %H:%M:%S")
    )
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}",
    )
    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"

flow.map(profile)

Step 5. Kicking Things Off

Once the profile is complete, the dataflow expects some output, so we can use the built-in StdOutput connector to print the device and hour returned by the profile function in the map step:

flow.output("out", StdOutput())

And we are ready to run our program! You can clone this repository to your machine and run the following commands:

python -m bytewax.run dataflow:flow

Summary

We can now use the profiling reports to validate the data quality, check for changes in schemas or data formats, and compare the data characteristics between different devices or time windows.

Being able to process and profile incoming data appropriately opens up a plethora of use cases across different domains, from the correction of errors in data schemas and formats to the highlighting and mitigation of additional issues that derive from real-world activities, such as anomaly detection (e.g., fraud or intrusion/threats detection), equipment malfunction, and other events that deviate from the expectations (e.g., data drifts or misalignment with business rules).

This guide was written with the support of the YData team.

We want to hear from you!

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #troubleshooting Slack channel!

Where to next?

See our full gallery of tutorials →

Share your tutorial progress!
