# Data Lake Example 4 - Event Notifications
Event notifications inform us about changes to, or actions taken on, objects in a bucket. They are triggered when specific actions occur (such as object creation, deletion, or update), and they can start workflows in other systems such as Node-RED or NiFi.

Since Kafka and RabbitMQ aren’t available in the current setup of the data lake system, we can focus on using webhook-based notifications with Node-RED.
* Example use case: trigger an automated workflow when a new file is uploaded. For example, if a new video file (.webm) is uploaded, Node-RED (or NiFi) can start a workflow to process the file, such as generating thumbnails or analysing metadata.
* Notification Event: s3:ObjectCreated:* (triggers for any type of object creation event).
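
To make the use case concrete, here is a minimal sketch of how a receiver could pick out new .webm uploads from a notification body. It assumes the payload follows the S3 event message structure (a `Records` list carrying `eventName`, `s3.bucket.name` and `s3.object.key`). The function name `new_webm_uploads` and the sample payload are hypothetical and only serve as an illustration.

```python
from urllib.parse import unquote_plus


def new_webm_uploads(event: dict) -> list:
    """Return (bucket, key) pairs for newly created .webm objects."""
    uploads = []
    for record in event.get("Records", []):
        # Only react to creation events (Put, Post, Copy, ...).
        if "ObjectCreated:" not in record.get("eventName", ""):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3-style event records.
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.lower().endswith(".webm"):
            uploads.append((bucket, key))
    return uploads


# Hypothetical sample payload, trimmed to the fields used above.
sample_event = {
    "Records": [
        {
            "eventName": "s3:ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "testbucket"},
                "object": {"key": "videos/demo%20clip.webm"},
            },
        },
        {
            "eventName": "s3:ObjectRemoved:Delete",
            "s3": {
                "bucket": {"name": "testbucket"},
                "object": {"key": "videos/old.webm"},
            },
        },
    ]
}

print(new_webm_uploads(sample_event))
# [('testbucket', 'videos/demo clip.webm')]
```

The same filtering could equally be done inside the Node-RED flow; the Python version is only meant to show which fields of the notification matter for this use case.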


## 0. Create a Webhook endpoint in Node-RED
There is a basic example at: https://youtu.be/HzO4wsL2Eio?si=NDiqUGyHaKv5hm4K
### a. *Open Node-RED* in your browser 

* Typically at https://nodered.scene-dl.satrdlab.upv.es unless configured otherwise.

### b. *Add an HTTP In Node*:

* Drag an http in node onto the canvas. This will act as the webhook endpoint.
* Set the method to POST and URL to /test-minio (or any path you prefer).
* This endpoint URL will be https://nodered.scene-dl.satrdlab.upv.es/test-minio (assuming Node-RED runs through the proxy).

### c. *Add a Debug Node*:

* Drag a debug node onto the canvas.
* Connect the http in node to the debug node.
* Set the debug node output to msg.payload, or to the complete msg object if you also want to inspect the request headers. This will allow us to see the incoming notification data in the debug pane.

### d. *Add an HTTP Response Node*:

* Drag an http response node to the canvas.
* Connect the http in node to the http response node.
* This node will send a response back to MinIO, confirming that the notification was received.

### e. *Deploy the Flow*:

* Click the Deploy button to save and activate the flow.
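* Once deployed, Node-RED will be ready to receive event notifications at the /test-minio path configured above (for example, http://localhost:1880/test-minio when accessed locally rather than through the proxy).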
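
Before involving MinIO, the endpoint can be exercised by hand. The sketch below builds a JSON POST request for the proxied webhook URL used in this example and only sends it when `send` is called explicitly. The helper names `build_webhook_test_request` and `send`, and the `{"ping": ...}` payload, are hypothetical.

```python
import json
import urllib.request

# Webhook URL from this example (Node-RED behind the proxy).
WEBHOOK_URL = "https://nodered.scene-dl.satrdlab.upv.es/test-minio"


def build_webhook_test_request(url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for the webhook."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


request = build_webhook_test_request(WEBHOOK_URL, {"ping": "manual test"})
print(request.get_method(), request.full_url)

# Uncomment once the flow is deployed and reachable from this machine:
# print(send(request))
```

If the flow is wired as described, the payload should show up in the Node-RED debug pane and the http response node should answer the request.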

## 1. Problems with development libraries
- Unfortunately, Boto3 does not directly support webhook notifications with MinIO.
- Sending requests directly to MinIO's API with the Python 'requests' library is not straightforward, because every request has to be signed by hand.
- Instead, it is more practical to use the mc command-line client directly.
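
As a rough illustration of the mc route, the sketch below assembles the three mc invocations that are typically involved: registering a webhook target, restarting the server so the target becomes active, and subscribing the bucket to upload events. The alias `myminio` and the target identifier `1` are assumptions, as are the helper names; the admin commands also assume credentials with administrative rights. Nothing is executed unless `run_all` is called.

```python
import shlex
import subprocess


def webhook_arn(target_id: str, region: str = "") -> str:
    """ARN of a MinIO webhook target: arn:minio:sqs:<region>:<id>:webhook."""
    return f"arn:minio:sqs:{region}:{target_id}:webhook"


def mc_webhook_commands(alias, bucket, target_id, endpoint, suffix=None):
    """Return the mc invocations (as argument lists) for a webhook setup."""
    event_add = [
        "mc", "event", "add", f"{alias}/{bucket}",
        webhook_arn(target_id), "--event", "put",
    ]
    if suffix:
        # Restrict notifications to keys ending in the given suffix.
        event_add += ["--suffix", suffix]
    return [
        # 1. Register the webhook target on the server.
        ["mc", "admin", "config", "set", alias,
         f"notify_webhook:{target_id}", f"endpoint={endpoint}"],
        # 2. Restart so the new target is loaded.
        ["mc", "admin", "service", "restart", alias],
        # 3. Subscribe the bucket to object-creation (put) events.
        event_add,
    ]


def run_all(commands) -> None:
    """Execute the commands in order, stopping at the first failure."""
    for command in commands:
        subprocess.run(command, check=True)


# "myminio" and "1" are hypothetical; adjust them to the local mc alias and target id.
commands = mc_webhook_commands(
    alias="myminio",
    bucket="testbucket",
    target_id="1",
    endpoint="https://nodered.scene-dl.satrdlab.upv.es/test-minio",
    suffix=".webm",
)
for command in commands:
    print(shlex.join(command))
```

Printing the commands first makes it easy to review them, or to paste them into a terminal, before letting the script run anything against the server.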

## 1. Load libraries and common configuration
For this example, it is better to use the minio library; otherwise, you might get errors because the notification formats are not compatible with MinIO's webhook functionality.

Example of such an incompatibility: WebhookConfigurations, CloudFunctionConfigurations, and EventBridgeConfigurations are not valid keys in the NotificationConfiguration dictionary of the native AWS S3 API. These keys are specific to MinIO's custom webhook functionality, which is not compatible with the standard boto3 API.
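
For comparison, a minimal sketch of the minio-library route could look as follows. It assumes the minio Python package (7.x) with its `minio.notificationconfig` module, and it assumes that a webhook target has already been registered on the server under a hypothetical ARN such as `arn:minio:sqs::1:webhook`. The helper name `set_webhook_notification` and the `config_id` value are illustrative, and the call has not been verified against this deployment.

```python
from typing import Optional


def set_webhook_notification(client, bucket: str, arn: str,
                             suffix: Optional[str] = None) -> None:
    """Subscribe a bucket to object-creation events for an existing target."""
    # Imported here so the sketch only needs the minio package when called.
    from minio.notificationconfig import (
        NotificationConfig,
        QueueConfig,
        SuffixFilterRule,
    )

    queue = QueueConfig(
        arn,                     # ARN of the target registered on the server
        ["s3:ObjectCreated:*"],  # same event pattern as in this example
        config_id="node-red-webhook",  # hypothetical identifier
        suffix_filter_rule=SuffixFilterRule(suffix) if suffix else None,
    )
    client.set_bucket_notification(
        bucket, NotificationConfig(queue_config_list=[queue])
    )


# Possible usage with the connection details of this example
# (the ARN is an assumption):
# from minio import Minio
# client = Minio("s3api.scene-dl.satrdlab.upv.es",
#                access_key="testuser", secret_key="testscene", secure=True)
# set_webhook_notification(client, "testbucket",
#                          "arn:minio:sqs::1:webhook", suffix=".webm")
```

The library signs the request and serialises the configuration itself, which is what makes it more convenient than building the request manually.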

In [26]:
import requests
import json
import hashlib
import hmac
import datetime

# Connection details
minio_url = 's3api.scene-dl.satrdlab.upv.es'  # Domain only
access_key = 'testuser'
secret_key = 'testscene'
bucket_name = 'testbucket'
region = 'us-east-1'

# Node-RED webhook URL
node_red_webhook_url = "https://nodered.scene-dl.satrdlab.upv.es/test-minio"

# Notification configuration
notification_config = {
    "QueueConfigurations": [
        {
            "QueueArn": "arn:minio:sqs::webhook",
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {"Name": "suffix", "Value": "*"}
                    ]
                }
            },
            "Queue": {
                "Endpoint": node_red_webhook_url,
                "QueueArn": "arn:minio:sqs::webhook"
            }
        }
    ]
}

# Convert the configuration to JSON
json_config = json.dumps(notification_config)

# Get the current date and time in UTC
current_time = datetime.datetime.now(datetime.timezone.utc)
amz_date = current_time.strftime('%Y%m%dT%H%M%SZ')
date_stamp = current_time.strftime('%Y%m%d')

# Canonical request
canonical_uri = f'/{bucket_name}'
canonical_querystring = 'notification'
canonical_headers = f'host:{minio_url}\nx-amz-date:{amz_date}\n'
signed_headers = 'host;x-amz-date'
payload_hash = hashlib.sha256(json_config.encode('utf-8')).hexdigest()

canonical_request = (
    f'PUT\n'
    f'{canonical_uri}\n'
    f'{canonical_querystring}\n'
    f'{canonical_headers}\n'
    f'{signed_headers}\n'
    f'{payload_hash}'
)

# String to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = f'{date_stamp}/{region}/s3/aws4_request'
string_to_sign = (
    f'{algorithm}\n'
    f'{amz_date}\n'
    f'{credential_scope}\n'
    f'{hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()}'
)

# Derive the signing key
def sign(key, msg):
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def get_signature_key(key, date_stamp, region_name, service_name):
    k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    k_region = sign(k_date, region_name)
    k_service = sign(k_region, service_name)
    k_signing = sign(k_service, 'aws4_request')
    return k_signing

signing_key = get_signature_key(secret_key, date_stamp, region, 's3')
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

# Authorization header
authorization_header = (
    f'{algorithm} Credential={access_key}/{credential_scope}, '
    f'SignedHeaders={signed_headers}, Signature={signature}'
)

# Request headers
headers = {
    'Content-Type': 'application/json',
    'x-amz-date': amz_date,
    'Authorization': authorization_header
}

# Send the PUT request
url = f"https://{minio_url}/{bucket_name}?notification"
response = requests.put(url, headers=headers, data=json_config)

# Request result
if response.status_code == 200:
    print("Notification configured successfully.")
else:
    print("Error configuring the notification:", response.content.decode())

Error configuring the notification: <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><BucketName>testbucket</BucketName><Resource>/testbucket</Resource><RequestId>1806E6F7D50296DD</RequestId><HostId>dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8</HostId></Error>
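
The SignatureDoesNotMatch response is consistent with at least two details of the hand-built request above. Signature Version 4 writes a valueless query parameter as `notification=` in the canonical query string, and S3-style services expect an `x-amz-content-sha256` header that is included among the signed headers. These may not be the only causes; the S3 API also expects an XML body for this call rather than JSON. The following standard-library sketch shows the signing steps with those two points addressed. The helper names and the fixed timestamp are illustrative, and the result has not been tested against this MinIO deployment.

```python
import datetime
import hashlib
import hmac
from urllib.parse import quote


def canonical_query(params: dict) -> str:
    """Sort parameters and always emit 'name=value', even for empty values."""
    return "&".join(
        f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}"
        for k, v in sorted(params.items())
    )


def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def sigv4_headers(method, host, path, params, body: bytes,
                  access_key, secret_key, region, now):
    """Return the extra headers for a SigV4-signed S3-style request."""
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")
    payload_hash = hashlib.sha256(body).hexdigest()

    # Signed headers: lowercase names, sorted, each line ending in a newline.
    signed = {"host": host, "x-amz-content-sha256": payload_hash,
              "x-amz-date": amz_date}
    canonical_headers = "".join(f"{k}:{v}\n" for k, v in sorted(signed.items()))
    signed_headers = ";".join(sorted(signed))

    canonical_request = "\n".join([
        method, quote(path, safe="/-_.~"), canonical_query(params),
        canonical_headers, signed_headers, payload_hash,
    ])

    scope = f"{date_stamp}/{region}/s3/aws4_request"
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256", amz_date, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])

    # Derive the signing key: date -> region -> service -> "aws4_request".
    key = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    for part in (region, "s3", "aws4_request"):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    return {
        "x-amz-date": amz_date,
        "x-amz-content-sha256": payload_hash,
        "Authorization": (
            f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}"
        ),
    }


# Fixed timestamp for illustration; a real request would use the current UTC time.
now = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
headers = sigv4_headers("PUT", "s3api.scene-dl.satrdlab.upv.es", "/testbucket",
                        {"notification": ""}, b"<NotificationConfiguration/>",
                        "testuser", "testscene", "us-east-1", now)
print(canonical_query({"notification": ""}))  # notification=
print(headers["Authorization"])
```

Because a single byte of difference in the canonical request changes the signature, a client library that signs requests itself is usually the safer choice.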


## 2. Configuring S3 Event Notifications

In [22]:
# Parameters
bucket_name = "testbucket"
node_red_webhook_url = "https://nodered.scene-dl.satrdlab.upv.es/test-minio"  # Node-RED webhook URL for notifications

# Define the event notification configuration
notification_config = {
    "LambdaFunctionConfigurations": [
        {
            "LambdaFunctionArn": f"arn:minio:sqs::NodeRedQueue:{node_red_webhook_url}",
            "Events": ["s3:ObjectCreated:*"]
        }
    ]
}


# Convert to JSON format
notification_json = json.dumps(notification_config)

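# Get current date and time in the required format (e.g., 20231106T123456Z)
current_date = datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%dT%H%M%SZ')

# Apply the notification configuration to the bucket
# (s3_client is assumed to be a boto3 S3 client created in an earlier cell)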
try:
    response = s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=notification_config
    )
    print("Notification configuration applied:", response)
except Exception as e:
    print("Error setting notification:", e)

Error setting notification: An error occurred (UnsupportedNotification) when calling the PutBucketNotificationConfiguration operation: MinIO server does not support Topic or Cloud Function based notifications.
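
The error message suggests that the server rejects Lambda (cloud function) and topic configurations, which leaves queue-style configurations as the remaining option. The sketch below builds a `QueueConfigurations` dictionary in the shape that `put_bucket_notification_configuration` accepts. It assumes that a webhook target has already been registered on the server and is reachable under a hypothetical ARN such as `arn:minio:sqs::1:webhook`. Whether this deployment accepts the resulting call has not been verified, and the helper name `queue_notification_config` is illustrative.

```python
import json


def queue_notification_config(queue_arn, events=("s3:ObjectCreated:*",),
                              suffix=None) -> dict:
    """Build a NotificationConfiguration dict with one queue entry."""
    entry = {"QueueArn": queue_arn, "Events": list(events)}
    if suffix:
        # Optional key filter, e.g. only objects ending in ".webm".
        entry["Filter"] = {
            "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
        }
    return {"QueueConfigurations": [entry]}


# The ARN is an assumption; it must match a target configured on the server.
notification_config = queue_notification_config(
    "arn:minio:sqs::1:webhook", suffix=".webm"
)
print(json.dumps(notification_config, indent=2))

# With a boto3 client named s3_client, it would be applied like this:
# s3_client.put_bucket_notification_configuration(
#     Bucket="testbucket",
#     NotificationConfiguration=notification_config,
# )
```

Note that the webhook URL itself does not appear in this configuration: the bucket only refers to the target by its ARN, and the URL belongs to the server-side target definition.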


## 2. List all files from a bucket within a Data Lake

In [6]:
# List objects in the bucket (client is assumed to be a boto3 S3 client created in an earlier cell)
response = client.list_objects(Bucket=bucket_name)

# Print each file name (key)
if 'Contents' in response:
    for file in response['Contents']:
        print(file['Key'])
else:
    print("No files found in the bucket.")

athens.jpg
images/athens2.png


## 3. Download file from a Data Lake

In [7]:
# File details
download_path = 'athens_download.png'
bucket_name = 'testbucket'
object_name = 'images/athens2.png'

# Download the file
client.download_file(bucket_name, object_name, download_path)
print(f"Downloaded {object_name} to {download_path}")

Downloaded images/athens2.png to athens_download.png


## 4. Delete a file from the Data Lake

In [8]:
# Delete the file
client.delete_object(Bucket=bucket_name, Key=object_name)
print("Delete successful")

Delete successful
