### Data & Innovation Day - Realtime - Azure AI Services and Fabric demonstration

NOAA’s Center for Operational Oceanographic Products and Services (CO-OPS) provides real-time information on tides, water levels, currents, and other coastal oceanographic and meteorological data. This data helps the maritime transportation industry navigate waterways and ports safely and efficiently. Current trends and predictions also help people prepare for potential flooding caused by storms, tsunamis, and sea level fluctuations.

![A blue and black rectangle with a white cross](https://dataplatformblogcdn.azureedge.net/wp-content/uploads/2023/07/a-blue-and-black-rectangle-with-a-white-cross-des.png)

https://blog.fabric.microsoft.com/en-US/blog/microsoft-fabric-event-streams-generating-real-time-insights-with-python-kql-and-powerbi/

https://tidesandcurrents.noaa.gov/map/index.html


In [1]:
%pip install azure-eventhub
%pip install azure-identity
%pip install aiohttp


**Collecting Coastal Water Level and Atmospheric Data with Python**

NOAA provides real-time water level information that is updated every 6 minutes. You can use an HTTP GET call to fetch water level and location information for a specific station. The API takes a station ID as a required input (stations can be looked up on the NOAA Tides and Currents station map linked above), plus some optional parameters. A sample XML response for station 8594900 (Washington) looks like this:

```xml
<data>
<metadata id="8594900" name="Washington" lat="38.8733" lon="-77.0217"/>
<observations>
<wl t="2023-07-09 19:30" v="8.232" s="0.023" f="1,0,0,0" q="p"/>
</observations>
</data>
```
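To make the response structure concrete, here is a minimal sketch that reads the sample response above with Python's standard `xml.etree.ElementTree` module. The comments reading `t` as the observation time and `v` as the water level value are an interpretation of the sample, and the `reading` dictionary and its key names are illustrative only.

```python
import xml.etree.ElementTree as ET

# The sample response shown above, embedded as a string for illustration
SAMPLE_XML = """<data>
<metadata id="8594900" name="Washington" lat="38.8733" lon="-77.0217"/>
<observations>
<wl t="2023-07-09 19:30" v="8.232" s="0.023" f="1,0,0,0" q="p"/>
</observations>
</data>"""

root = ET.fromstring(SAMPLE_XML)
metadata = root.find("metadata").attrib      # station id, name, lat, lon
latest = root.find("observations/wl").attrib  # first water level observation

# Illustrative structure; the key names are not part of the NOAA API
reading = {
    "station_id": metadata["id"],
    "station_name": metadata["name"],
    "lat": float(metadata["lat"]),
    "lon": float(metadata["lon"]),
    "time": latest["t"],                  # assumed: observation time
    "water_level": float(latest["v"]),    # assumed: water level value
}
print(reading)
```

The rest of this notebook requests JSON instead of XML, which exposes the same fields as dictionary keys.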

**Wrap the request payload, the HTTP call, and the response handling in a Python function**

In [2]:
import requests
import time
from datetime import datetime
# Fetch water level data from tidesandcurrents.noaa.gov
def fetch_water_level(station_id):
    url = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter"
    # Collect water level for one station
    payload = {
        "product": "water_level",
        "application": "Fabric EventStream",  # Replace with your application name
        "datum": "STND",
        "station": station_id,  # Set station ID
        "time_zone": "gmt",
        "units": "english",  # Use "english" for feet, "metric" for meters
        "format": "json",
        "date": "latest",
    }
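    # A timeout keeps a stalled request from hanging the caller indefinitely
    response = requests.get(url, params=payload, timeout=30)
    if response.status_code == 200:
        return response.json()
    # Any non-200 status is reported to the caller as "no data"
    return None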
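For debugging, it can help to see the URL that the payload above turns into. The sketch below rebuilds it with the standard library only, so it makes no network call. `build_request_url` is a hypothetical helper, not part of the notebook, and the query string `requests` produces should be equivalent but is not guaranteed to match character for character.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter"

def build_request_url(station_id):
    """Hypothetical helper: return the GET URL for the latest water level."""
    params = {
        "product": "water_level",
        "application": "Fabric EventStream",
        "datum": "STND",
        "station": station_id,
        "time_zone": "gmt",
        "units": "english",
        "format": "json",
        "date": "latest",
    }
    # urlencode escapes values, e.g. the space in the application name
    return f"{BASE_URL}?{urlencode(params)}"

url = build_request_url("9445958")
print(url)
```

Pasting the printed URL into a browser is a quick way to inspect the raw JSON for a station before wiring it into the producer.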


**Unit test of API Call**

```python
{'Current_time': '2024-01-12 08:20:25', 'Water Level Value': '8.937', 'Station Latitude': '47.5617', 'Station Longitude': '-122.6230', 'Station': 'Water Station'}
```

In [3]:
stationID = "9445958"  # Sample Bremerton, WA [9445958]
station_data = fetch_water_level(stationID)

# Only build a data point when the call succeeded and returned observations
if station_data is not None and station_data.get("data"):
    # Latest observation and station location
    water_level = station_data["data"][0]["v"]
    station_latitude = station_data["metadata"]["lat"]
    station_longitude = station_data["metadata"]["lon"]
    # Timestamp the data point with the local collection time
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Generate the data structure
    water_point = {
        "Current_time": current_time,
        "Water Level Value": water_level,
        "Station Latitude": station_latitude,
        "Station Longitude": station_longitude,
        "Station": "Water Station",
    }
    print(water_point)
else:
    print("No water level data returned for station", stationID)



{'Current_time': '2024-10-28 10:00:50', 'Water Level Value': '20.371', 'Station Latitude': '47.5617', 'Station Longitude': '-122.6230', 'Station': 'Water Station'}
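The same mapping from response to data point is needed again in the producer, so it could be factored into one function that also tolerates incomplete responses. The sketch below is one way to do that. `build_water_point` is a hypothetical helper, the `sample` dictionary is hand-written to mirror the fields the notebook reads (the `t`, `s`, `f`, and `q` values are illustrative), and the `{"error": ...}` shape is an assumption about how a failed query might look.

```python
from datetime import datetime

def build_water_point(station_data, now=None):
    """Hypothetical helper: return a data point dict, or None if unusable."""
    if not station_data:
        return None
    observations = station_data.get("data") or []
    metadata = station_data.get("metadata") or {}
    if not observations or "lat" not in metadata or "lon" not in metadata:
        return None
    now = now or datetime.now()
    return {
        "Current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
        "Water Level Value": observations[0]["v"],
        "Station Latitude": metadata["lat"],
        "Station Longitude": metadata["lon"],
        "Station": "Water Station",
    }

# Hand-written sample mirroring the fields used above (not a live response)
sample = {
    "metadata": {"id": "9445958", "name": "Bremerton", "lat": "47.5617", "lon": "-122.6230"},
    "data": [{"t": "2024-10-28 17:00", "v": "20.371", "s": "0.000", "f": "0,0,0,0", "q": "p"}],
}

point = build_water_point(sample, now=datetime(2024, 10, 28, 10, 0, 50))
print(point)

# Assumed shape of a failed query: no "data" key, so no data point is built
print(build_water_point({"error": {"message": "No data was found"}}))
```

Passing `now` explicitly makes the function deterministic, which is convenient when checking it against a known output such as the one above.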


**Ingesting Data into Fabric Eventstreams (using Python)**

Now that we have tested Python code that retrieves the data we need from its source, let us see how to package that data in an `EventData` structure and send it to a Fabric Eventstream. For now we focus on the Python code; connecting it to an Eventstream is covered later.

The Python code below acts as an Eventstream producer (client) and publishes water level data from NOAA to the Eventstream every **6 minutes**. This is the interval at which NOAA refreshes its data, so polling more often would only return readings that have already been sent.

In [None]:
# code for fabric event stream
EVENTSTREAM_CONNECTION_STR = ""
EVENTSTREAM_NAME = "es_3f228009-e282-4216-852a-a4bb12080b44"
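An empty or malformed connection string typically only shows up as an error when the producer tries to connect, so a quick local check can save time. The sketch below assumes the usual Event Hubs connection string layout of `;`-separated `Key=Value` pairs. `parse_connection_string`, the `EVENTSTREAM_CONNECTION_STR` environment variable name, and the example value are all hypothetical placeholders, not real credentials.

```python
import os

# Keys assumed to be present in an Event Hubs style connection string
REQUIRED_KEYS = {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"}

def parse_connection_string(conn_str):
    """Hypothetical helper: split a connection string into a dict and validate it."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment.strip():
            continue
        # Split on the first "=" only: key values may end in "=" padding
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value
    missing = REQUIRED_KEYS - parts.keys()
    if missing:
        raise ValueError(f"Connection string is missing: {sorted(missing)}")
    return parts

# Placeholder value for illustration only
example = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=key_demo;SharedAccessKey=abc123==;EntityPath=es_demo"
)

# Reading from an environment variable avoids committing secrets to the notebook
conn_str = os.environ.get("EVENTSTREAM_CONNECTION_STR", example)
parts = parse_connection_string(conn_str)
print(parts["Endpoint"], parts.get("EntityPath"))
```

If the string carries an `EntityPath`, it should match the Eventstream name passed as `eventhub_name`.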


In [5]:
import asyncio
import json
from datetime import datetime
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

async def run():
    """
    Send water level data to an Eventstream using the Azure Event Hubs client.
    Fetches the current water level from one station and sends it as a JSON payload.
    Runs indefinitely, generating a new data point every 6 minutes.
    """

    # Create a producer client to send messages to the Eventstream.
    # Specify the Eventstream connection string and the Eventstream name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENTSTREAM_CONNECTION_STR, eventhub_name=EVENTSTREAM_NAME
    )

    # Specify the station ID
    stationID = "9445958"  # Sample Bremerton, WA [9445958]

    async with producer:
        while True:
            # Fetch current water level in feet
            station_data = fetch_water_level(stationID)

            # Skip this cycle if the request failed or returned no observations
            if station_data is not None and station_data.get("data"):
                # Latest observation and station location
                water_level = station_data["data"][0]["v"]
                station_latitude = station_data["metadata"]["lat"]
                station_longitude = station_data["metadata"]["lon"]

                # Create a data point with the time and value
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Generate the data structure
                water_point = {
                    "Current_time": current_time,
                    "Water Level Value": water_level,
                    "Station Latitude": station_latitude,
                    "Station Longitude": station_longitude,
                    "Station": "Water Station",
                }

                # Convert the data point to JSON and add it to a new batch
                event_data_batch = await producer.create_batch()
                event_data_batch.add(EventData(json.dumps(water_point)))

                # Send the batch of events to the event hub.
                await producer.send_batch(event_data_batch)

            # Wait 6 minutes before the next point, without blocking the event loop
            await asyncio.sleep(360)

await run()
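Because the polling interval equals NOAA's refresh interval, two consecutive polls can occasionally return the same observation. One option is to remember the time of the last observation sent and skip repeats. The sketch below simulates that with hand-written responses: `is_new_observation` is a hypothetical helper, the `t` key is assumed to carry the observation time in the JSON as it does in the XML sample, and the third reading is made up for illustration.

```python
def is_new_observation(station_data, last_seen_time):
    """Hypothetical helper: return (is_new, observation_time) for the latest reading."""
    observations = (station_data or {}).get("data") or []
    if not observations:
        return False, last_seen_time
    observation_time = observations[0].get("t")
    if observation_time is None or observation_time == last_seen_time:
        return False, last_seen_time
    return True, observation_time

# Simulated sequence of polls; the second one repeats the first observation
responses = [
    {"data": [{"t": "2023-07-09 19:30", "v": "8.232"}]},
    {"data": [{"t": "2023-07-09 19:30", "v": "8.232"}]},
    {"data": [{"t": "2023-07-09 19:36", "v": "8.240"}]},  # illustrative value
]

last_seen = None
sent = []
for response in responses:
    is_new, last_seen = is_new_observation(response, last_seen)
    if is_new:
        sent.append(response["data"][0]["v"])  # stand-in for send_batch

print(sent)
```

In the producer loop, the same check would sit just before the batch is created, so that a repeated reading costs one HTTP call but no event.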
