In [None]:
import time
import boto3
from opcua import Client
from datetime import datetime

class UmatiDynamoDBIntegration:
    """Copy stack-light states from an OPC UA server into a DynamoDB table.

    Every call to :meth:`collect_and_store` opens a fresh OPC UA session,
    reads three stack-light nodes, writes a single item to the configured
    table, and closes the session again.
    """

    def __init__(self, server_url: str, table_name: str, region: str = 'eu-central-1') -> None:
        """Remember the OPC UA endpoint and bind to the DynamoDB table.

        Args:
            server_url: OPC UA endpoint URL passed to ``opcua.Client``.
            table_name: Name of the DynamoDB table that receives the items.
            region: AWS region used to create the DynamoDB resource.
        """
        self.server_url = server_url
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        self.table = self.dynamodb.Table(table_name)

    def collect_and_store(self) -> bool:
        """Read the three stack-light values once and store them as one item.

        All calls are synchronous (blocking). Errors are reported on stdout
        and never propagate to the caller.

        Returns:
            ``True`` when the item was written and the session was closed
            cleanly, ``False`` when any step raised.
        """
        # Local import: the file-level import only brings in `datetime`.
        from datetime import timezone

        try:
            # Connect to OPC UA server (blocking call)
            client = Client(self.server_url)
            client.connect()
            try:
                # Retrieve data from stack light nodes (L1, L2, L3)
                l1_node = client.get_node("ns=14;i=58676")  # L1
                l2_node = client.get_node("ns=14;i=58682")  # L2
                l3_node = client.get_node("ns=14;i=58665")  # L3

                # Get the state of each light
                l1_state = l1_node.get_value()
                l2_state = l2_node.get_value()
                l3_state = l3_node.get_value()

                # datetime.utcnow() is deprecated; take an aware UTC "now"
                # and drop the tzinfo so the stored string keeps the exact
                # same naive ISO-8601 format (and sort order) as before.
                timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

                # Prepare data to be stored in DynamoDB
                item = {
                    "machineId": "machine_4",  # Partition key (machine ID)
                    "timestamp": timestamp,  # Sort key with current UTC timestamp
                    "L1_state": l1_state,  # State of L1 stack light
                    "L2_state": l2_state,  # State of L2 stack light
                    "L3_state": l3_state,  # State of L3 stack light
                    "status": "running"  # Machine status (this can be dynamic if needed)
                }

                # Store in DynamoDB
                self.table.put_item(Item=item)
            finally:
                # Always release the OPC UA session, even when a read or the
                # DynamoDB write failed; otherwise every failed cycle would
                # leave a session open on the server.
                client.disconnect()  # Blocking call

        except Exception as e:
            print(f"Error collecting or storing data: {e}")
            return False

        return True

def main(interval_seconds: float = 30, retry_seconds: float = 5) -> None:
    """Run the collect-and-store cycle forever.

    Args:
        interval_seconds: Pause after a successful cycle.
        retry_seconds: Shorter pause after a failed cycle before retrying.

    A cycle counts as failed when ``collect_and_store()`` raises or returns
    exactly ``False``; any other return value (including ``None``) counts as
    success. The loop only ends when the process is interrupted.
    """
    # Initialize the integration
    integration = UmatiDynamoDBIntegration(
        server_url='opc.tcp://opcua.umati.app:4840',  # OPC UA server URL
        table_name='MachineStackLights'  # New DynamoDB table name
    )

    # Run continuous data collection and storage loop
    while True:
        try:
            outcome = integration.collect_and_store()  # Synchronous call
        except Exception as e:
            # In case of an error, continue retrying after the retry delay
            print(f"Error in main loop: {e}")
            time.sleep(retry_seconds)
            continue

        if outcome is False:
            # Do not claim success for a cycle that reported a failure.
            print(f"Data collection failed; retrying in {retry_seconds} seconds.")
            time.sleep(retry_seconds)
            continue

        print("Data collected and stored successfully.")
        # Wait before the next collection
        time.sleep(interval_seconds)

# Script entry point: start the collection loop only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()


Requested secure channel timeout to be 3600000ms, got 600000ms instead
  "timestamp": datetime.utcnow().isoformat(),  # Sort key with current timestamp


Data collected and stored successfully.


Requested secure channel timeout to be 3600000ms, got 600000ms instead


Data collected and stored successfully.


KeyboardInterrupt: 