Opcua File Example #371

Closed
jadamroth opened this issue Dec 22, 2020 · 1 comment

@jadamroth

This isn't an issue; I'm posting it to help anyone else who comes along looking for an example of reading and writing OPC UA File objects, as described in the OPC UA docs.

I didn't see any other examples or issues of anyone implementing it, so I thought it might be useful. I'm new to this library and don't know the proper structure or else I'd submit a pull request to add the feature.

I didn't test the write method below because I didn't need it personally, but I think it should work (or at least be fixable if it doesn't).

Thanks for the library

from asyncua import ua

class OpcuaFile:
    """ helper class for reading and writing OPC UA File objects """

    def __init__(self, client, node=None, index=None, name_of_node=None):
        """
        :param client: opcua client (e.g. async with Client(url=url) as client)
        :param node: node of file object (e.g. node = client.get_node("ns=2;s=nameOfNode"))
        :param index: namespace, integer (ns=2)
        :param name_of_node: string, name of the node
        """
        self._client = client

        # set node
        if node is not None:
            self._node = node
        elif index is not None and name_of_node is not None:
            self._node = client.get_node("ns=" + str(index) + ";s=" + name_of_node)
        else:
            raise ValueError("pass either node, or both index and name_of_node")

    async def open(self, open_mode):
        """ open file method """
        open_node = await self._node.get_child("Open")
        arg = ua.Variant(open_mode, ua.VariantType.Byte)
        return await self._node.call_method(open_node, arg)

    async def get_size(self):
        """ gets the size of the file in bytes """
        size_node = await self._node.get_child("Size")
        # get_child already returns a Node, so it can be read directly
        return await size_node.read_value()

    async def read(self):
        """ read file method, returns the file contents as bytes """
        handle = await self.open(ua.OpenFileMode.Read.value)
        try:
            size = await self.get_size()
            read_node = await self._node.get_child("Read")
            arg1 = ua.Variant(handle, ua.VariantType.UInt32)
            arg2 = ua.Variant(size, ua.VariantType.Int32)
            return await self._node.call_method(read_node, arg1, arg2)
        finally:
            # release the file handle even if the read fails
            await self.close(handle)

    async def write(self, file_contents):
        """ writes contents (bytes) to file; untested """
        handle = await self.open(ua.OpenFileMode.Write.value)
        try:
            write_node = await self._node.get_child("Write")
            arg1 = ua.Variant(handle, ua.VariantType.UInt32)
            # the Write method takes its data as a ByteString, not a String
            arg2 = ua.Variant(file_contents, ua.VariantType.ByteString)
            await self._node.call_method(write_node, arg1, arg2)
        finally:
            # release the file handle even if the write fails
            await self.close(handle)

    async def close(self, handle):
        """ close file method """
        close_node = await self._node.get_child("Close")
        arg1 = ua.Variant(handle, ua.VariantType.UInt32)
        return await self._node.call_method(close_node, arg1)

Then call it with something like this:

import asyncio
from asyncua import Client, ua

async def read_file():
    url = "opc.tcp://192.168.1.15:4840"
    async with Client(url=url) as client:
        node = client.get_node("ns=2;s=NameOfNode")
        # option 1: build the node from the namespace index and node name
        # file = OpcuaFile(client, index=2, name_of_node="NameOfNode")
        # option 2: pass an existing node
        file = OpcuaFile(client, node)
        contents = await file.read()

asyncio.run(read_file())
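
A note on the open mode passed to open(): it is a bit mask, so modes can be combined. The sketch below mirrors the values of ua.OpenFileMode (Read=1, Write=2, EraseExisting=4, Append=8) in a standalone IntFlag so it runs without asyncua. FileOpenMode and mode_byte are hypothetical names made up for illustration, and the check that EraseExisting needs the Write bit reflects my reading of the File object description, so treat it as an assumption.

```python
from enum import IntFlag


class FileOpenMode(IntFlag):
    """Hypothetical stand-in mirroring the ua.OpenFileMode bit values."""
    Read = 1
    Write = 2
    EraseExisting = 4
    Append = 8


def mode_byte(mode: FileOpenMode) -> int:
    """Validate a mode combination and return the integer to send as a Byte."""
    # Assumption: erasing existing content only makes sense when writing.
    if mode & FileOpenMode.EraseExisting and not mode & FileOpenMode.Write:
        raise ValueError("EraseExisting requires the Write bit")
    return int(mode)


overwrite = mode_byte(FileOpenMode.Write | FileOpenMode.EraseExisting)
append = mode_byte(FileOpenMode.Write | FileOpenMode.Append)
print(overwrite, append)  # 6 10
```

With the class above, something like `await file.open(overwrite)` should then open the file for writing and clear its previous contents, provided the server supports that mode.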
@jadamroth

I should note that there are other ways to optimize this, such as using the well-known Identifiers (e.g. Identifiers.FileType_Open is i=11580) so we don't have to look up the node by name each time. If I get around to it, I'll update this.
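
A different and simpler way to cut down the repeated lookups, without relying on the well-known identifiers, is to cache the result of each get_child call. This is only a sketch of that caching idea, not the identifier approach mentioned above. ChildCache and FakeNode are hypothetical names; FakeNode stands in for an asyncua node so the example runs without a server.

```python
import asyncio


class ChildCache:
    """Caches child lookups for a node-like object (hypothetical helper)."""

    def __init__(self, node):
        self._node = node
        self._children = {}

    async def get(self, name):
        # Look the child up on first use only; later calls reuse the result.
        if name not in self._children:
            self._children[name] = await self._node.get_child(name)
        return self._children[name]


class FakeNode:
    """Stand-in for an asyncua Node so the sketch runs without a server."""

    def __init__(self):
        self.lookups = 0

    async def get_child(self, name):
        self.lookups += 1
        return f"child:{name}"


async def demo():
    node = FakeNode()
    cache = ChildCache(node)
    first = await cache.get("Open")
    second = await cache.get("Open")
    return first, second, node.lookups


print(asyncio.run(demo()))  # ('child:Open', 'child:Open', 1)
```

In the OpcuaFile class, the same idea would mean creating one cache in __init__ and replacing each `await self._node.get_child("Open")` style call with a cache lookup.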
