# Monitoring

## Overview

The **Monitoring-Events App** lets you set up Monitors with an **Area of Interest (AOI)** and a set of criteria that you specify, and then watches for newly acquired matching imagery or other events. When a Monitor finds a matching event, the app displays it in a consistent format with data that you can use. The app also lets you view the monitors you have already created and update, disable, or enable them.

The source for the events is the **MGP Discovery** catalog. When an image is added or modified, this generates an event in the **Monitoring** system. This means that a "new" image may have an older collection date. However, the **Monitoring** system does not go back in time to find older imagery or events.

## OAuth Validation Process

The following cell validates your Maxar ```OAuth2``` credentials. With the ```OAuth2``` password flow, you exchange your username and password for a bearer token that is used to authenticate requests. Perform the following tasks to validate your authentication:

1. **Install Python dependencies**:  
    ```pip install requests pyjwt```

2. **User Input Credentials**:  
    The user is prompted to securely input their Maxar credentials, which are required to access Maxar services.  

3. **Send the ```OAuth``` request**:  
    MGP uses OpenID Connect, and the majority of MGP API calls require bearer token authentication. A basic OAuth password request is sent to acquire an access token, which can then be sent as a ```Bearer``` token in the ```Authorization``` header of requests to any Maxar MGP endpoint.
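
To make the validation step more concrete, the sketch below reads the `exp` and `sub` claims from a JWT payload using only the standard library. It does not verify the signature. The helper names `make_unsigned_token` and `decode_claims` are hypothetical, and the token is a locally built sample, not a real Maxar token.

```python
import base64
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, the way JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_unsigned_token(claims: dict) -> str:
    """Build a sample three-part token for local experiments only."""
    header = _b64url(json.dumps({"alg": "none", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    return f"{header}.{payload}.sample-signature"


def decode_claims(token: str) -> dict:
    """Return the payload claims of a JWT without verifying its signature."""
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped padding
    return json.loads(base64.urlsafe_b64decode(payload))


# Hypothetical claims: a subject and an expiry five minutes from now.
sample = make_unsigned_token({"sub": "user-123", "exp": int(time.time()) + 300})
claims = decode_claims(sample)
print(claims["sub"], claims["exp"] > time.time())
```

The `sub` claim is what the notebook stores as `CREATOR_ID`, and comparing `exp` with the current time is the same expiry check the validation cell performs.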


In [None]:
import getpass
import time
import requests
import jwt

CLIENT_ID = "mgp"  
AUTH_URL = "https://account.maxar.com/auth/realms/mds/protocol/openid-connect/token" 

TOKEN = None          
CREATOR_ID = None     

def get_token(email: str, password: str) -> str:
    data = {
        "client_id": CLIENT_ID,
        "username": email,
        "password": password,
        "grant_type": "password"
    }
    response = requests.post(AUTH_URL, data=data)
    if not response.ok:
        raise Exception(f"Login failed with status code {response.status_code}")

    token_data = response.json()
    tok = token_data.get("access_token")
    if not tok:
        raise Exception("No access token received.")
    return tok

print("Please enter your credentials.")
email = input("Email (username): ")
password = getpass.getpass("Password: ")

try:
    TOKEN = get_token(email, password)
    print("\nLogin succeeded! Access token retrieved.\n")
except Exception as e:
    print(f"\nError: {e}")
    TOKEN = None

decoded_token = None

def is_token_valid(decoded: dict) -> bool:
    if not decoded or "exp" not in decoded:
        return False
    return decoded["exp"] > int(time.time())

if TOKEN:
    decoded_token = jwt.decode(TOKEN, options={"verify_signature": False})
    
    if is_token_valid(decoded_token):
        print("\nYour token is valid.")
    else:
        print("\nYour token has expired or is invalid.")
    
    if "sub" in decoded_token:
        CREATOR_ID = decoded_token["sub"]
    else:
        print("\nNo 'sub' field found in the token.")


## Default Interactive Map

This is the **default map** displayed when starting the app. It is centered on Denver, Colorado.

**Prerequisite**:  

Install the following libraries to use the Leaflet map:  
* ```pip install ipyleaflet```
* ```pip install ipywidgets```

In [None]:
from ipyleaflet import Map, basemaps, basemap_to_tiles

default_map = Map(center=(39.57, -105.045), zoom=8)
tile_layer = basemap_to_tiles(basemaps.OpenStreetMap.Mapnik)
default_map.add_layer(tile_layer)

default_map

## Create Monitor

Create a **Monitor** to watch for new imagery within an **Area of Interest** that matches the specified criteria.

Follow these steps to create a **Monitor**:  
1. Click the ```DrawControl``` button (the square one), then draw a bounding box (Bbox) over your **Area of Interest** by dragging the mouse and releasing it.
2. After you release the mouse, the Bbox coordinates are displayed below the map and a modal with input fields opens.
3. Fill in all the required inputs, then click the ```Create Monitor``` button, or click ```Close``` to dismiss the modal.
4. If the request is successful, the modal closes and the result is displayed below the map.  

**Note**: The OAuth Validation Process must be run before this step.
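
The bounding box from the steps above has to be sent as a GeoJSON polygon. As a minimal sketch, the hypothetical helper `bbox_to_polygon` below turns `[minX, minY, maxX, maxY]` into a closed ring: GeoJSON expects all four corners, with the first position repeated at the end. The sample coordinates are illustrative values near Denver.

```python
def bbox_to_polygon(min_x, min_y, max_x, max_y):
    """Return a GeoJSON Polygon for a bounding box (closed, counterclockwise ring)."""
    ring = [
        [min_x, min_y],
        [max_x, min_y],
        [max_x, max_y],
        [min_x, max_y],
        [min_x, min_y],  # repeat the first position to close the ring
    ]
    return {"type": "Polygon", "coordinates": [ring]}


# Illustrative bounding box in longitude/latitude order.
aoi = bbox_to_polygon(-105.2, 39.5, -104.8, 39.8)
print(aoi["coordinates"][0])
```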

In [None]:
import requests
from ipyleaflet import Map, basemaps, basemap_to_tiles, DrawControl
from ipywidgets import (
    VBox, HBox, Output, Button, Text, FloatText, SelectMultiple,
    Layout, Box, HTML
)

default_map = Map(center=(39.57, -105.045), zoom=8)
tile_layer = basemap_to_tiles(basemaps.OpenStreetMap.Mapnik)
default_map.add_layer(tile_layer)

rectangle_style = {
    "shapeOptions": {
        "color": "#ff7800",
        "weight": 2,
        "fillOpacity": 0.1
    }
}
draw_control = DrawControl(edit=False, remove=True)
draw_control.polyline = {}
draw_control.polygon = {}
draw_control.circle = {}
draw_control.marker = {}
draw_control.circlemarker = {}
draw_control.rectangle = rectangle_style

out = Output()

bbox_coords = []
rectangle_created = False

def handle_draw(self, action, geo_json):
    global rectangle_created, bbox_coords
    with out:
        out.clear_output()
        if action == 'created':
            if rectangle_created:
                print("A bounding box already exists. Please delete it first.")
                return
            coords_poly = geo_json['geometry']['coordinates'][0]
            xs = [c[0] for c in coords_poly]
            ys = [c[1] for c in coords_poly]
            minX, maxX = min(xs), max(xs)
            minY, maxY = min(ys), max(ys)
            bbox_coords = [minX, minY, maxX, maxY]
            rectangle_created = True

            print(f"Rectangle created. BBOX = {bbox_coords}")
            draw_control.rectangle = {}
            show_modal()  # Open the modal

        elif action == 'deleted':
            rectangle_created = False
            bbox_coords = []
            print("Bbox deleted. You may draw a new one.")
            draw_control.rectangle = rectangle_style

draw_control.on_draw(handle_draw)
default_map.add_control(draw_control)

modal_overlay = Box(
    layout=Layout(
        position='fixed',
        top='0px',
        left='0px',
        width='100%',
        height='100%',
        align_items='center',
        justify_content='center',
        background_color='rgba(0,0,0,0.5)',
        display='none',
        z_index='9999'
    )
)

# FORM FIELDS
description_input = Text(description="Description:", layout={"width": "300px"})
cloud_coverage_input = FloatText(description="Cloud (0-100):", layout={"width": "300px"})
off_nadir_input = FloatText(description="Off Nadir (0-90):", layout={"width": "300px"})

platform_list = ["wv01", "wv02", "wv03", "geoeye1", "pleiades", "kompsat"]
platform_select = SelectMultiple(
    options=platform_list,
    value=platform_list,
    description="Platforms:",
    layout={"width": "200px", "height": "100px"}
)

store_name_input = Text(description="Store Name*:", layout={"width": "300px"})
store_address_input = Text(description="Store Address:", layout={"width": "300px"})
market_segment_input = Text(description="Market Segment:", layout={"width": "300px"})

create_monitor_out = Output()  # This was inside the modal
request_result_out = Output()  # NEW: Output for success messages outside the modal

create_button = Button(description="Create Monitor", button_style="success")
close_modal_button = Button(description="Close", button_style="warning")

modal_box = VBox([
    HTML("<h3>Create Monitor</h3>"),
    description_input,
    cloud_coverage_input,
    off_nadir_input,
    platform_select,
    store_name_input,
    store_address_input,
    market_segment_input,
    HBox([close_modal_button, create_button]),
    create_monitor_out  # still here if you want ephemeral messages inside the modal
], layout=Layout(
    padding='20px',
    background_color='white',
    border='2px solid black',
    width='400px'
))

modal_overlay.children = [modal_box]

def show_modal():
    modal_overlay.layout.display = 'flex'

def hide_modal():
    modal_overlay.layout.display = 'none'

# TOKEN and CREATOR_ID are set by the OAuth validation cell; they are not reset here.

def create_monitor_action(b):
    with create_monitor_out:
        create_monitor_out.clear_output()

    with request_result_out:
        request_result_out.clear_output()
        print("Sending request to create monitor...")

    if not store_name_input.value.strip():
        with create_monitor_out:
            print("Error: Store Name is required.")
        return

    global TOKEN  
    if not TOKEN:
        with create_monitor_out:
            print("Error: No token available!")
        return

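    # Build request
    if len(bbox_coords) != 4:
        with create_monitor_out:
            print("Error: Draw a bounding box first.")
        return

    minX, minY, maxX, maxY = bbox_coords
    # A GeoJSON ring lists all four corners and repeats the first one to close.
    coordinates = [[
        [minX, minY],
        [maxX, minY],
        [maxX, maxY],
        [minX, maxY],
        [minX, minY],
    ]]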

    request_body = {
        "source": "discovery/catalog",
        "description": description_input.value,
        "aoi_geojson": {
            "type": "Polygon",
            "coordinates": coordinates
        },
        "match_criteria": {
            "platform": {
                "in": list(platform_select.value)
            },
            "eo:cloud_cover": {
                "eq": cloud_coverage_input.value
            },
            "view:off_nadir": {
                "eq": off_nadir_input.value
            }
        },
        "metadata": {
            "store_name": store_name_input.value,
            "address": store_address_input.value,
            "market_segment": market_segment_input.value
        }
    }

    url = "https://api.maxar.com/monitoring/v1/monitors"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}"
    }

    # Make the request
    try:
        resp = requests.post(url, json=request_body, headers=headers, timeout=30)
        with request_result_out:  # print outside the modal
            if resp.ok:
                data = resp.json()
                print("Monitor created successfully!")
                print("Response:", data)
                hide_modal()
            else:
                print(f"Failed to create monitor. HTTP {resp.status_code}")
                try:
                    print("Response:", resp.json())
                except ValueError:
                    print("Error parsing response JSON.")
    except Exception as e:
        with request_result_out:
            print("Network error or request failed:", str(e))

create_button.on_click(create_monitor_action)
close_modal_button.on_click(lambda b: hide_modal())

ui = VBox([
    default_map,
    out,
    modal_overlay,
    request_result_out  
])

ui


## Manage Existing Monitors

In order to update an existing monitor you will only need the desired monitor ```ID```.  

These are all the fields that you can update: 
* ```description```
* ```start_datetime```
* ```end_datetime```
* ```metadata```  
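
As a rough sketch of how the list above could be enforced before sending an update, the hypothetical helper `build_update_body` below keeps only non-empty values and rejects any field that is not in the updatable set. The field names come from the list; the helper itself and the sample values are assumptions for illustration.

```python
UPDATABLE_FIELDS = {"description", "start_datetime", "end_datetime", "metadata"}


def build_update_body(**changes):
    """Keep non-empty values and reject fields outside the updatable set."""
    unknown = set(changes) - UPDATABLE_FIELDS
    if unknown:
        raise ValueError(f"Fields cannot be updated: {sorted(unknown)}")
    # Drop values the user left empty so they are not sent at all.
    return {key: value for key, value in changes.items() if value not in (None, "", {})}


body = build_update_body(description="Denver store", end_datetime=None)
print(body)
```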

**Requirements**  
```pip install pandas```

**Note**: The following code will return a list of created monitors associated with your account. Make sure that you have authenticated by running the first cell.



In [None]:
import requests
import pandas as pd

MONITORS = []  

def fetch_monitors():
    global MONITORS, CREATOR_ID, TOKEN
    
    if not TOKEN:
        print("ERROR: No token available.")
        return
    
    if not CREATOR_ID:
        print("ERROR: No creator_id found.")
        return
    
    url = f"https://api.maxar.com/monitoring/v1/monitors?limit=1000&sort=asc&filter=creator_id:{CREATOR_ID}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}"
    }
    
    print(f"Fetching monitors for creator_id={CREATOR_ID} ...")
    resp = requests.get(url, headers=headers)
    if not resp.ok:
        print(f"Error {resp.status_code}: {resp.reason}")
        try:
            print("Details:", resp.json())
        except ValueError:
            pass
        return
    
    data = resp.json()
    all_monitors = data.get("data", {}).get("monitors", [])
    
    # Filter only monitors that have a 'store_name' in their metadata
    filtered_monitors = [
        m for m in all_monitors
        if m.get("metadata", {}).get("store_name")
    ]
    
    # Build a list of organized fields
    organized_monitors = []
    for m in filtered_monitors:
        metadata = m.get("metadata", {})
        organized_monitors.append({
            "id": m["id"],
            "description": m.get("description", ""),
            "store_name": metadata.get("store_name", ""),
            "market_segment": metadata.get("market_segment", ""),
            "address": metadata.get("address", ""),
            "enabled": m.get("enabled", False)
        })
    
    MONITORS = organized_monitors
    
    if MONITORS:
        df = pd.DataFrame(MONITORS)
        display(df[["id", "description", "store_name", "market_segment", "address", "enabled"]])
    else:
        print("No monitors with 'store_name' found.")

fetch_monitors()


## How to disable/enable a Monitor

To disable or enable a ```Monitor```, you only need the monitor's ```ID```.  

Follow these steps:  
1. Run the previous code cell to get the list of monitors.
2. Once the list is displayed in the table, copy the ```ID``` of the monitor that you want to disable or enable.
3. Run the following code and enter the monitor ```ID``` and the ```action``` (```enable``` or ```disable```). 
4. After the code runs, the response is displayed below the cell.
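
The request itself only depends on the monitor ID and the action. A minimal sketch of validating both before any network call is shown below; `build_toggle_url` is a hypothetical helper, the base URL is the one used elsewhere in this notebook, and the sample ID is made up.

```python
from urllib.parse import quote

# Base URL as used by the other cells in this notebook.
BASE_URL = "https://api.maxar.com/monitoring/v1/monitors"


def build_toggle_url(monitor_id: str, action: str) -> str:
    """Validate the inputs and return the enable/disable URL for a monitor."""
    action = action.strip().lower()
    if action not in ("enable", "disable"):
        raise ValueError("action must be 'enable' or 'disable'")
    monitor_id = monitor_id.strip()
    if not monitor_id:
        raise ValueError("monitor_id is required")
    # Escape the ID so stray characters cannot alter the URL path.
    return f"{BASE_URL}/{quote(monitor_id, safe='')}/{action}"


print(build_toggle_url(" abc123 ", "Disable"))
```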

In [None]:
import requests
import json

def toggle_monitor_status():
    monitor_id = input("Enter the Monitor ID: ")
    action = input("Enter 'enable' or 'disable': ").strip().lower() or "enable"
    
    if action not in ["enable", "disable"]:
        print("Invalid action. Please enter 'enable' or 'disable'.")
        return

    url = f"https://api.maxar.com/monitoring/v1/monitors/{monitor_id}/{action}"
    
    print(f"\nSending POST request to: {url}")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}"
    }
    
    try:
        response = requests.post(url, headers=headers)
        response.raise_for_status()  
        data = response.json()
        print("Success! Server responded with:")
        print(json.dumps(data, indent=2))
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")

toggle_monitor_status()


## Update Monitors

To update a ```monitor```, keep in mind the following prerequisites:
* start date cannot occur more than an hour in the past
* end date must occur at least five minutes into the future
* start date cannot be updated if monitor is enabled
* monitor cannot be updated if it already started
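
The first two rules can be checked locally before sending a request. The sketch below is one way to do that with the hypothetical helper `check_update_window`; the last two rules depend on the monitor's state on the server and are not covered here.

```python
from datetime import datetime, timedelta, timezone


def check_update_window(start=None, end=None, now=None):
    """Return a list of rule violations; an empty list means the dates look acceptable."""
    now = now or datetime.now(timezone.utc)
    problems = []
    if start is not None and start < now - timedelta(hours=1):
        problems.append("start date is more than an hour in the past")
    if end is not None and end < now + timedelta(minutes=5):
        problems.append("end date is less than five minutes in the future")
    return problems


# A fixed reference time keeps the example reproducible.
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(check_update_window(start=now - timedelta(hours=2), end=now + timedelta(days=1), now=now))
```

All datetimes passed in should be timezone-aware so that they can be compared with the UTC reference time.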

Follow these steps to update a monitor:
1. Use the Manage Existing Monitors walkthrough to get the ID of the ```monitor``` that you want to update.
2. Run the code below and fill in the inputs. To leave an input blank, just press 'Enter'.
3. Once all the inputs are set, the code sends the request and the response is displayed below the code cell.

In [None]:
import requests
import json
from ipywidgets import Text, Button, VBox, Output, HBox, DatePicker, HTML
from datetime import datetime



update_out = Output()

def update_monitor():
    # Prompt the user for inputs
    monitor_id = input("Monitor ID to update: ").strip()
    
    new_description = input("New Description (leave blank to keep existing): ")
    start_dt_str = input("New Start Datetime (YYYY-MM-DD, or blank): ")
    end_dt_str = input("New End Datetime (YYYY-MM-DD, or blank): ")
    
    store_name = input("Store Name: ")
    store_address = input("Address: ")
    market_segment = input("Market Segment: ")
    
    start_datetime = None
    end_datetime = None
    
    if start_dt_str.strip():
        start_datetime = f"{start_dt_str}T00:00:00Z"
    if end_dt_str.strip():
        end_datetime = f"{end_dt_str}T00:00:00Z"
    
    # Build the request body
    body = {}
    if new_description.strip():
        body["description"] = new_description.strip()
    if start_datetime:
        body["start_datetime"] = start_datetime
    if end_datetime:
        body["end_datetime"] = end_datetime

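    # Only send metadata fields the user actually filled in, so blank
    # inputs do not overwrite existing values with empty strings.
    metadata = {
        key: value.strip()
        for key, value in {
            "store_name": store_name,
            "address": store_address,
            "market_segment": market_segment,
        }.items()
        if value.strip()
    }
    if metadata:
        body["metadata"] = metadata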
    
    url = f"https://api.maxar.com/monitoring/v1/monitors/{monitor_id}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}"
    }
    
    print("\nSending PATCH request to update monitor...")
    try:
        resp = requests.patch(url, headers=headers, json=body)
        if resp.ok:
            print("Monitor updated successfully!")
            print("Response JSON:")
            print(json.dumps(resp.json(), indent=2))
        else:
            print(f"Update failed. HTTP {resp.status_code}")
            try:
                print("Error response:", resp.json())
            except ValueError:
                print("Could not parse error response as JSON.")
    except Exception as e:
        print("Request failed:", str(e))

update_monitor()


## Filter/Sort monitors

```Sort``` and ```filter``` your monitors by specifying additional parameters in the request. The Monitoring API supports the following parameters:
* limit: the number of monitors to return in the response. Default amount is 10.
* filter: one or more filters in the format ```key:value```. Filters are logically ANDed, so a given filter can only be queried once.
* sort: the order in which the result will return (```asc``` or ```desc```).
* starting_after: The token returned when the sort order is ```asc```. Use this to fetch the next page of results.
* ending_before: The token returned when the sort order is ```desc```. Use this to fetch the next page of results.
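
As a sketch of how these parameters can be combined into a query string, the hypothetical helper `build_monitor_query` below uses `urllib.parse.urlencode` and repeats `filter` once per `key:value` pair. Keeping the colon unescaped via `safe=":"` is a readability choice made here, not something the API is known to require.

```python
from urllib.parse import urlencode


def build_monitor_query(filters=(), sort="desc", limit=10,
                        starting_after=None, ending_before=None):
    """Encode the list parameters, repeating `filter` once per key:value pair."""
    pairs = [("limit", limit), ("sort", sort)]
    pairs += [("filter", f) for f in filters]
    if starting_after:
        pairs.append(("starting_after", starting_after))
    if ending_before:
        pairs.append(("ending_before", ending_before))
    # safe=":" keeps the key:value separator readable; other reserved
    # characters (spaces, &, =) are still escaped.
    return urlencode(pairs, safe=":")


query = build_monitor_query(["creator_id:ABCD123", "metadata.store_name:Walmart"])
print(query)
```

Compared with concatenating strings by hand, this escapes characters such as `&` or spaces inside filter values, so they cannot be mistaken for parameter separators.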

Follow these steps to sort and filter your monitors:
1. Run the OAuth Validation process (in the first code cell) if you have not already done so, so that you have a valid ```TOKEN``` variable.
2. Run the code below and fill in the inputs. To leave an input blank, just press 'Enter'. These are some examples for the parameters:
    * Filters: ```creator_id:ABCD123```, or ```metadata.store_name:Walmart```
    * Sort Order: ```asc``` or ```desc```
    * Limit: How many results to be returned (default 10).
    * starting_after/ending_before: Provide the token from a previous response to paginate results.
3. Once all the inputs are set, the code will run the request and the response will display below the code cell.
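
Pagination can be sketched as a loop that keeps requesting pages until no token is returned. In the example below, `iter_monitors` and `fetch_page` are hypothetical names, the response shape (`data.monitors` and `data.next_page_token`) mirrors what the notebook's cells read from the response, and how the token is passed back to the API (for example as `starting_after`) is an assumption left to `fetch_page`. The pages are made-up stand-ins for real responses.

```python
def iter_monitors(fetch_page, max_pages=50):
    """Yield monitors page by page until no next_page_token is returned."""
    token = None
    for _ in range(max_pages):  # hard cap so a bad token cannot loop forever
        payload = fetch_page(token).get("data", {})
        yield from payload.get("monitors", [])
        token = payload.get("next_page_token")
        if not token:
            break


# Stand-in for a real request: made-up pages keyed by the token that fetches them.
_PAGES = {
    None: {"data": {"monitors": [{"id": "m1"}, {"id": "m2"}], "next_page_token": "t1"}},
    "t1": {"data": {"monitors": [{"id": "m3"}]}},
}

ids = [m["id"] for m in iter_monitors(_PAGES.__getitem__)]
print(ids)
```

In a real notebook, `fetch_page` would wrap the `requests.get` call and add the token to the query string.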


In [None]:
import requests
import pandas as pd

def fetch_monitors_with_params():
    # Prompt user for inputs ( filter(s), limit, sorting order & pagination tokens) to fetch monitors accordingly.

    global TOKEN

    if not TOKEN:
        print("Error: No token available. Please authenticate.")
        return
    
    user_filter = input("Enter filter(s) in 'key:value' format, comma-separated (or blank): ")
    sort_order = input("Enter the sorting order ('asc' or 'desc'), default desc: ").lower() or "desc"
    limit_amount = input("Enter the limit (integer), default 10: ")
    starting_after = input("Enter the starting_after token (optional): ")
    ending_before = input("Enter the ending_before token (optional): ")

    try:
        limit = int(limit_amount) if limit_amount else 10
    except ValueError:
        print("Invalid limit, using default amount.")
        limit = 10
    
    # Build base URL
    base_url = "https://api.maxar.com/monitoring/v1/monitors"
    url = f"{base_url}?limit={limit}&sort={sort_order}"

    if user_filter:
        # Split by comma, then add &filter= for each chunk.
        filter_parts = [f.strip() for f in user_filter.split(",") if f.strip()]
        for fp in filter_parts:
            url += f"&filter={fp}"
    
    if starting_after:
        url += f"&starting_after={starting_after}"
    if ending_before:
        url += f"&ending_before={ending_before}"
    
    print(f"\nFetching monitors with URL:\n{url}\n")
    
    # Build request 
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}"
    }
    
    try:
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return
    
    data = resp.json() 
    filtered_monitors = data.get("data", {}).get("monitors", [])
    
    if not filtered_monitors:
        print("No monitors found for the given parameters.")
        return
    
    # Build the list of fields from the filtered monitors
    organized_monitors = []
    for m in filtered_monitors:
        metadata = m.get("metadata", {})
        organized_monitors.append({
            "id": m["id"],
            "description": m.get("description", ""),
            "store_name": metadata.get("store_name", ""),
            "market_segment": metadata.get("market_segment", ""),
            "address": metadata.get("address", ""),
            "enabled": m.get("enabled", False)
        })

    MONITORS = organized_monitors


    # Display the results
    df = pd.DataFrame(MONITORS)
    display(df[["id", "description", "store_name", "market_segment", "address", "enabled"]])

    
    # If you need the next_page_token for subsequent pagination, it can be retrieved.
    next_page_token = data.get("data", {}).get("next_page_token")
    if next_page_token:
        print("\nUse this next_page_token for further pagination:")
        print("next_page_token =", next_page_token)

# Use example
fetch_monitors_with_params()


     

## List Monitor Events

Display the events of all your monitors, optionally narrowing the results with additional parameters. Follow these steps to display your events:
1.  Run the OAuth Validation process (in the first code cell) if you have not already done so, so that you have a valid ```TOKEN``` variable.
2.  Run the code below and fill in the inputs. To leave an input blank, just press 'Enter'. These are some examples for the parameters:
    * Start Date: provide a start date in the format ```YYYY-MM-DD```.
    * End Date: provide an end date in the format ```YYYY-MM-DD```.
    * Sort Order: ```asc``` or ```desc```
3. Once all the inputs are set, the code will run the request and the response will display below the code cell.
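
Before the dates are sent, it can help to convert them to timestamps and confirm that the range is not reversed. The sketch below uses the hypothetical helper `to_iso_range`; the `YYYY-MM-DDTHH:MM:SSZ` output mirrors the format produced by the notebook's own date validation, and the ordering check is an addition made here.

```python
from datetime import datetime


def to_iso_range(start_date=None, end_date=None):
    """Convert optional YYYY-MM-DD strings to midnight timestamps and check their order."""

    def parse(value):
        return datetime.strptime(value, "%Y-%m-%d") if value else None

    start, end = parse(start_date), parse(end_date)
    if start and end and start > end:
        raise ValueError("start date must not be after end date")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return (start.strftime(fmt) if start else None,
            end.strftime(fmt) if end else None)


print(to_iso_range("2025-01-01", "2025-01-31"))
```

A malformed date raises `ValueError` from `strptime`, so the caller can decide whether to skip the filter or ask for the input again.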

In [None]:
import requests
import pandas as pd
from datetime import datetime

# TOKEN and CREATOR_ID come from the OAuth validation cell.

if not TOKEN:
    raise ValueError("Error: No token available. Please authenticate.")

base_url =  "https://api.maxar.com"
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {TOKEN}"
}

# Fetch all monitors
def fetch_monitors():
    url = f"{base_url}/monitoring/v1/monitors?sort=desc&filter=creator_id:{CREATOR_ID}"
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        monitors = data.get("data", {}).get("monitors", [])
        return monitors
    except requests.exceptions.RequestException as e:
        print("Error fetching monitors.")
        return []

# Fetch a single monitor's events
def fetch_single_monitor_events(monitor_id, start_date, end_date, event_type, limit=100, sort="desc"):

    params = [
        f"limit={limit}",
        f"sort={sort}"
    ]

    if start_date:
        params.append(f"start_date={start_date}")
    if end_date:
        params.append(f"end_date={end_date}")
    if event_type:
        params.append(f"event_type={event_type}")
    
    query_s = "&".join(params)
    url = f"{base_url}/monitoring/v1/monitors/{monitor_id}/events?{query_s}"

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        events = data.get("data", {}).get("events", [])
        return events
    except requests.exceptions.RequestException as e:
        #print(f"Error fetching events for monitor {monitor_id}.")
        return []
    
# Fetch all events from all monitors
def fetch_all_events(start_date, end_date, event_type):
    all_events = []
    monitors = fetch_monitors()

    for m in monitors:
        monitor_id = m["id"]
        monitor_events = fetch_single_monitor_events(monitor_id, start_date, end_date, event_type)

        # Add displayed fields in events feed page
        store_name = m.get("metadata", {}).get("store_name", "")
        market_segment = m.get("metadata", {}).get("market_segment", "")
        store_address = m.get("metadata", {}).get("address", "")

        # Flatten event list
        for evt in monitor_events:
            displayed_event = {
                "monitor_id": monitor_id,
                "store_name": store_name,
                "market_segment": market_segment,
                "address": store_address,
                "event_id": evt["id"],
                "event_timestamp": evt.get("event_timestamp", ""),
                # Read the type from the nested "event" object, if present.
                "type": evt.get("event", {}).get("type", ""),
            }

            all_events.append(displayed_event)
    
    return all_events

def get_input(prompt, default=None):
    
    user_input = input(f"{prompt} [{'Enter' if default is None else default}]: ").strip()
    return user_input if user_input else default

# Validate date fields
def validate_date(date_s):
    try:
        return datetime.strptime(date_s, "%Y-%m-%d").strftime("%Y-%m-%dT%H:%M:%SZ")
    except (ValueError, TypeError):
        return None

#Get user inputs and validate date parameters
start_date_filter = validate_date(get_input("Enter start date (YYYY-MM-DD) or press Enter to skip"))
end_date_filter = validate_date(get_input("Enter end date (YYYY-MM-DD) or press Enter to skip"))
event_type_filter = get_input("Enter event type or press Enter to skip")

events = fetch_all_events(start_date_filter, end_date_filter, event_type_filter)
print(f"Fetched {len(events)} events in total.")

df = pd.DataFrame(events)

display(df)