Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at [customer@eteexr.com](customer@eteexr.com). All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident. + + + +## About the Project + +This API enables easy device data reading and communication. + +In this repository, you can access: +- etee Python API package source code +- etee Python API example scripts +- Setup files + +With this API, you will be able to retrieve a wide range of [data from the eteeControllers][url-documentation-data]: +* **Tactile data**: individual finger curling and pressure, trackpad, slider LED, tracker (connection, proximity sensor; no positional tracking) and system button. + +

+ +
+ Tactile and sensing data from the eteeController. +

+ +* **Gestures**: grip, pinch and point gestures. + +

+ +
+ Main eteeController gestures. +

+ +* **Inertial Measuring Units (IMU) and 3D rotation**: raw 9-axis IMU data (accelerometer, gyroscope, magnetometer data) and 3D rotation estimations (quaternion, euler angles). +* **Device State**: right or left hand. +* **Battery State**: charge level, charge status. + +For more information, please, visit our documentation site: +[Developer Documentation - etee Python API][url-documentation] + +

(back to top)

## Getting Started


### Step 1: Clone the repository

1. Open a command prompt.
2. Change the current working directory to the location where you want the cloned directory.
3. Type git clone, and then paste the URL for the Github repository.
   ```sh
   git clone https://github.com/eteeXR/etee-Python-API.git
   ```


### Step 2: Package Installation & Environment
This repository comes ready with two different setup files:
- `setup_repo.py` will create a virtual environment (venv) using the dependencies listed in `requirements.txt`.
- `setup.py` will automate the ***etee-api*** package installation.

> **Note:** You will need the pip package to run the installation.

#### 2.1. Setting Up a Virtual Environment

> **If you do not want to run the example scripts**, please skip this step and move to:
> 2.2. Installing the Package.

After cloning the repository, you will need to install all the project dependencies to run the example code.
Unlike *step 2.2*., these steps will install extra dependencies required only when running the example scripts
(e.g. `keyboard`, `matplotlib`).

1. Open a command prompt.
2. Navigate to the directory where the `setup_repo.py` is located.
3. Run the script.
   ```sh
   python setup_repo.py
   ```
4. This should have created a venv inside your project directory. You can activate it at any time by navigating
to the directory containing the /venv folder and entering the following command line:
   ```sh
   .venv\Scripts\activate.bat
   ```


#### 2.2. Installing the Package

You can automate the installation of the ***etee-api*** package and all its dependencies in
your Python environment, by using the `setup.py` script.

1. Open a command prompt.
2. If you followed *step 2.1.*, make sure that you activate the venv (see previous section).
3. Navigate to the directory where the `setup.py` is located.
4. Use the pip method to run the installation file.
   ```sh
   pip install .
   ```
5. To check that the package has been successfully installed, you can run the pip list command.
   ```sh
   pip list
   ```
   If the package was installed, you will see its name and version listed:
   ```text
   Package Version
   --------------- -------
   etee-api 1.0.0
   ```

(back to top)

## Usage

### Hardware Setup

To get started, set up the hardware. This involves connecting the eteeDongle to your
PC or laptop, and turning ON the eteeControllers. You will know they are connected if the eteeDongle keeps blinking in
the following pattern: pink-pink-blue.

### Example Scripts

If you followed the previous steps on installation, you should be able to run any example scripts.

1. Open a command prompt.
2. In case you followed *step 2.1.*, make sure that you activate the venv.
3. Navigate to the directory where the example script is located.
4. Use the python method to run the example script.
   ```sh
   python example_script_name.py
   ```

### Further Information
We also have quickstart and more detailed developer guides
in our documentation page. This might help you understand the different API functionalities and how to integrate them
in your custom applications.

_To learn more about the API and the eteeController data, visit our Developer Documentation Site_

(back to top)

## Contributing

### How to Contribute

Contributions are what make the open source community such an amazing place to learn, inspire, and create.
Any contributions you make are **greatly appreciated**.

If you have a suggestion that would make this better, please fork the repo and create a pull request.
You can also simply open an issue to describe your suggestion or report a bug.

1. Fork the Project
2. Create your Feature Branch (`git checkout -b feature/AmazingFeature`)
3. Commit your Changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the Branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request

### Semantic Type Prefixes

To help us and the community easily identify the nature of your *commit* or *issue*, use **semantic type prefixes**
to precede your message / title.

Some common type prefixes include:

- `feat`: A new feature for the user, not a new feature for a build script.
- `fix`: Bug fix for the user, not a fix for a build scripts.
- `enhanc`: Enhancement or improvement to existing feature.
- `perf`: Code improved in terms of processing performance.
- `refactor`: Refactoring production code (e.g. renaming a variable).
- `chore`: Changes to the build process or auxiliary tools and libraries.
- `docs`: Changes to documentation.
- `style`: Formatting, missing semicolons, etc. No code change.
- `vendor`: Update version for dependencies and other packages.
- `test`: Adding missing tests or refactoring tests. No production code change.

**Format**: `(): `, where < scope > is optional.

For example, your commit message header might look like this:
```text
feat(imu): Implemented Euler angles estimation from quaternion
```

(back to top)

## License

Distributed under the Apache 2.0 License. See `LICENSE.txt` for more information.

(back to top)

## Authors

This repository was created by the TG0 team, for the etee brand.

Code and documentation authors include:
- **Dimitri Chikhladze** (API development and documentation)
- **Pilar Zhang Qiu** (API development, documentation and releases)

(back to top)

## Contact

For any queries or reports about the API, please use the **Issues tab** in this repository.
When possible, use an identifier to help us describe your query, report or request.

For further support or queries, you can contact us:
- Support e-mail: [customer@eteexr.com](customer@eteexr.com)
- Support Form: [https://eteexr.com/pages/support-form](https://eteexr.com/pages/support-form)

(back to top)

+ + + + +[url-documentation]: https://tg0-etee-python-api.readthedocs-hosted.com/en/latest/index.html# +[url-documentation-setup-hw]: https://tg0-etee-python-api.readthedocs-hosted.com/en/latest/setup.html +[url-documentation-quickstart]: https://tg0-etee-python-api.readthedocs-hosted.com/en/latest/quickstart.html +[url-documentation-api-functions]: https://tg0-etee-python-api.readthedocs-hosted.com/en/latest/guide.html +[url-documentation-data]: https://tg0-etee-python-api.readthedocs-hosted.com/en/latest/serial.html#etee-packet-elements +[url-issues-tab]: https://github.com/eteeXR/etee-Python-API/issues +[url-python-venv]: https://docs.python.org/3/tutorial/venv.html +[url-python-package-installation]: https://packaging.python.org/en/latest/tutorials/installing-packages/ +[url-tg0-website]: https://tg0.co.uk/ +[url-eteexr-website]: https://eteexr.com/ +[url-semantic-type-prefixes-1]: https://dev.to/puritanic/how-are-you-writing-a-commit-message-1ih7 +[url-semantic-type-prefixes-2]: http://karma-runner.github.io/1.0/dev/git-commit-msg.html diff --git a/etee/__init__.py b/etee/__init__.py new file mode 100644 index 0000000..8d70ac6 --- /dev/null +++ b/etee/__init__.py @@ -0,0 +1,8 @@ +# Import subpackage methods +from .tangio_for_etee import * + +# Import main package methods +from .quaternion import * +from .ahrs import * +from .driver_eteecontroller import * +from ._version import __version__ diff --git a/etee/_version.py b/etee/_version.py new file mode 100644 index 0000000..5becc17 --- /dev/null +++ b/etee/_version.py @@ -0,0 +1 @@ +__version__ = "1.0.0" diff --git a/etee/ahrs.py b/etee/ahrs.py new file mode 100644 index 0000000..e3676e6 --- /dev/null +++ b/etee/ahrs.py @@ -0,0 +1,234 @@ +""" +License: +-------- +Copyright 2022 Tangi0 Ltd. (trading as TG0) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +File description: +----------------- +Class and methods for the Attitude and Heading Reference System (AHRS) calculations. +Includes methods to set the IMU sensor offsets, update the AHRS or calculate the device quaternions. + +""" + +import math +from queue import Queue +import time +import warnings + +import numpy as np +from numpy.linalg import norm +from etee import Quaternion + + +class Ahrs: + """ + Class implementing Attitude and Heading Reference System (AHRS) calculations. + """ + beta = 0.0315 + accel_sensitivity = 4.0 / 32768.0 + gyro_sensitivity = (2000.0 / 32768.0) * (math.pi/180) + mag_sensitivity = [0.38, 0.38, 0.61] + + @staticmethod + def _current_seconds_time(): + """ + Get current time. + + :return: current time as float + """ + return time.time() + + def __init__(self, sampleperiod=None, quaternion=None, beta=None): + """ + Initialize the class with the given parameters. + + :param float sampleperiod: the sample period + :param list[float] quaternion: initial quaternion + :param float beta: algorithm gain beta + """ + if sampleperiod is not None: + self.samplePeriod = sampleperiod + if quaternion is not None: + self.quaternion = quaternion + if beta is not None: + self.beta = beta + + self.samplePeriod = 1/97 + self.quaternion = Quaternion(1, 0, 0, 0) + self.euler = [0, 0, 0] + self.dynamicFrequencyQueue = Queue(100) + + self.gyro_offset = [0, 0, 0] + self.mag_offset = [0, 0, 0] + + def set_gyro_offset(self, value): + """ + Set gyroscope offsets to given values. + + :param list[float] value: New gyroscope offset values + """ + self.gyro_offset = value + + def set_mag_offset(self, value): + """ + Set magnetometer offsets to given values. + + :param list[float] value: New magnetometer offset values + """ + self.mag_offset = value + + def update(self, gyroscope, accelerometer, magnetometer): + """ + Perform one update step with data from a AHRS sensor array. + + :param list[float] gyroscope: A three-element array containing the gyroscope data in radians per second. + :param list[float] accelerometer: A three-element array containing the accelerometer data. + Can be any unit since a normalized value is used. + :param list[float] magnetometer: A three-element array containing the magnetometer data. + Can be any unit since a normalized value is used. + """ + q = self.quaternion + + gyroscope = np.array(gyroscope, dtype=float).flatten() + accelerometer = np.array(accelerometer, dtype=float).flatten() + magnetometer = np.array(magnetometer, dtype=float).flatten() + + # Normalise accelerometer measurement + if norm(accelerometer) == 0: + warnings.warn("accelerometer is zero") + return + accelerometer /= norm(accelerometer) + + # Normalise magnetometer measurement + if norm(magnetometer) == 0: + warnings.warn("magnetometer is zero") + return + magnetometer /= norm(magnetometer) + + h = q * (Quaternion(0, magnetometer[0], magnetometer[1], magnetometer[2]) * q.conj()) + b = np.array([0, norm(h[1:3]), 0, h[3]]) + + # Gradient descent algorithm corrective step + f = np.array([ + 2*(q[1]*q[3] - q[0]*q[2]) - accelerometer[0], + 2*(q[0]*q[1] + q[2]*q[3]) - accelerometer[1], + 2*(0.5 - q[1]**2 - q[2]**2) - accelerometer[2], + 2*b[1]*(0.5 - q[2]**2 - q[3]**2) + 2*b[3]*(q[1]*q[3] - q[0]*q[2]) - magnetometer[0], + 2*b[1]*(q[1]*q[2] - q[0]*q[3]) + 2*b[3]*(q[0]*q[1] + q[2]*q[3]) - magnetometer[1], + 2*b[1]*(q[0]*q[2] + q[1]*q[3]) + 2*b[3]*(0.5 - q[1]**2 - q[2]**2) - magnetometer[2] + ]) + j = np.array([ + [-2*q[2], 2*q[3], -2*q[0], 2*q[1]], + [2*q[1], 2*q[0], 2*q[3], 2*q[2]], + [0, -4*q[1], -4*q[2], 0], + [-2*b[3]*q[2], 2*b[3]*q[3], -4*b[1]*q[2]-2*b[3]*q[0], -4*b[1]*q[3]+2*b[3]*q[1]], + [-2*b[1]*q[3]+2*b[3]*q[1], 2*b[1]*q[2]+2*b[3]*q[0], 2*b[1]*q[1]+2*b[3]*q[3], -2*b[1]*q[0]+2*b[3]*q[2]], + [2*b[1]*q[2], 2*b[1]*q[3]-4*b[3]*q[1], 2*b[1]*q[0]-4*b[3]*q[2], 2*b[1]*q[1]] + ]) + step = j.T.dot(f) + step /= norm(step) # normalise step magnitude + + # Compute rate of change of quaternion + qdot = (q * Quaternion(0, gyroscope[0], gyroscope[1], gyroscope[2])) * 0.5 - self.beta * step.T + + # Integrate to yield quaternion + q += qdot * self.samplePeriod + self.quaternion = Quaternion(q / norm(q)) # normalise quaternion + + def update_imu(self, gyroscope, accelerometer): + """ + Perform one update step with data from an IMU sensor array. + + :param list[float] gyroscope: A three-element array containing the gyroscope data in radians per second. + :param list[float] accelerometer: A three-element array containing the accelerometer data. + Can be any unit since a normalized value is used. + """ + q = self.quaternion + + gyroscope = np.array(gyroscope, dtype=float).flatten() + accelerometer = np.array(accelerometer, dtype=float).flatten() + + # Normalise accelerometer measurement + if norm(accelerometer) == 0: + return + accelerometer /= norm(accelerometer) + + # Gradient descent algorithm corrective step + f = np.array([ + 2*(q[1]*q[3] - q[0]*q[2]) - accelerometer[0], + 2*(q[0]*q[1] + q[2]*q[3]) - accelerometer[1], + 2*(0.5 - q[1]**2 - q[2]**2) - accelerometer[2] + ]) + j = np.array([ + [-2*q[2], 2*q[3], -2*q[0], 2*q[1]], + [2*q[1], 2*q[0], 2*q[3], 2*q[2]], + [0, -4*q[1], -4*q[2], 0] + ]) + step = j.T.dot(f) + step /= norm(step) # normalise step magnitude + + # Compute rate of change of quaternion + qdot = (q * Quaternion(0, gyroscope[0], gyroscope[1], gyroscope[2])) * 0.5 - self.beta * step.T + + # Integrate to yield quaternion + q += qdot * self.samplePeriod + self.quaternion = Quaternion(q / norm(q)) # normalise quaternion + + def get_quaternion(self, gyroscope, accelerometer, magnetometer=None): + """ + Calculate and return the quaternion values given the IMU sensors values. + + :param list[float] gyroscope: A three-element array containing the gyroscope data in radians per second. + :param list[float] accelerometer: A three-element array containing the accelerometer data. + Can be any unit since a normalized value is used. + :param list[float] magnetometer: A three-element array containing the magnetometer data. + Can be any unit since a normalized value is used. + :return: Quaternion calculated from the given data. + """ + if self.dynamicFrequencyQueue.full(): + start_time = self.dynamicFrequencyQueue.get() + current_time = self._current_seconds_time() + self.dynamicFrequencyQueue.put(current_time) + self.samplePeriod = (current_time - start_time)/100 + else: + self.dynamicFrequencyQueue.put(self._current_seconds_time()) + + gyroscope = np.array(gyroscope) - np.array(self.gyro_offset) + gyroscope = gyroscope * self.gyro_sensitivity + accelerometer = np.array(accelerometer) * self.accel_sensitivity + + if magnetometer is None: + self.update_imu(gyroscope, accelerometer) + else: + magnetometer = np.array(magnetometer) - np.array(self.mag_offset) + magnetometer = magnetometer * np.array(self.mag_sensitivity) + self.update(gyroscope, accelerometer, magnetometer) + return self.quaternion + + def get_euler(self, gyroscope, accelerometer, magnetometer=None): + """ + Estimate and return the euler angles for the given IMU sensor values. + + :param list[float] gyroscope: A three-element array containing the gyroscope data in radians per second. + :param list[float] accelerometer: A three-element array containing the accelerometer data. + Can be any unit since a normalized value is used. + :param list[float] magnetometer: A three-element array containing the magnetometer data. + Can be any unit since a normalized value is used. + :return: Euler angles estimated from the given data. + """ + self.get_quaternion(gyroscope, accelerometer, magnetometer) + self.euler = self.quaternion.to_euler() + return self.euler diff --git a/etee/config/etee_controller.yaml b/etee/config/etee_controller.yaml new file mode 100644 index 0000000..1b1294a --- /dev/null +++ b/etee/config/etee_controller.yaml @@ -0,0 +1,218 @@ +total_bytes: + data_bytes: 42 + end_bytes: 2 + +widgets: + # -------------- Finger data -------------- + # Pinky + pinky_pull: + bit: [47, 46, 45, 44, 43, 42, 41] + pinky_force: + bit: [ 183, 182, 181, 180, 179, 178, 177 ] + pinky_touched: + byte: 5 + bit: 0 + pinky_clicked: + byte: 0 + bit: 7 + + # Ring + ring_pull: + bit: [39, 38, 37, 36, 35, 34, 33] + ring_force: + bit: [ 175, 174, 173, 172, 171, 170, 169 ] + ring_touched: + byte: 4 + bit: 0 + ring_clicked: + byte: 0 + bit: 6 + + # Middle + middle_pull: + bit: [31, 30, 29, 28, 27, 26, 25] + middle_force: + bit: [ 167, 166, 165, 164, 163, 162, 161 ] + middle_touched: + byte: 3 + bit: 0 + middle_clicked: + byte: 0 + bit: 5 + + # Index + index_pull: + bit: [23, 22, 21, 20, 19, 18, 17] + index_force: + bit: [ 159, 158, 157, 156, 155, 154, 153 ] + index_touched: + byte: 2 + bit: 0 + index_clicked: + byte: 0 + bit: 4 + + # Thumb + thumb_pull: + bit: [15, 14, 13, 12, 11, 10, 9] + thumb_force: + bit: [ 151, 150, 149, 148, 147, 146, 145 ] + thumb_touched: + byte: 1 + bit: 0 + thumb_clicked: + byte: 0 + bit: 3 + + # -------------- Trackpad data -------------- + trackpad_x: # Location + byte: 6 + trackpad_y: + byte: 7 + + trackpad_pull: # Pressure + bit: [111, 110, 109, 108, 107, 106, 105] + trackpad_force: + bit: [ 143, 142, 141, 140, 139, 138, 137 ] + + trackpad_touched: # Touch/Click + byte: 0 + bit: 2 + trackpad_clicked: + byte: 0 + bit: 1 + + # -------------- Slider -------------- + slider_value: # Y-axis location + bit: [79, 78, 77, 76, 75, 74, 73] + slider_touched: # Touch + byte: 9 + bit: 0 + slider_up_touched: # Slider Up/Down Buttons + byte: 11 + bit: 5 + slider_down_touched: + byte: 11 + bit: 6 + + # -------------- Proximity sensor (in tracker) -------------- + tracker_on: # Whether a VR tracker is connected + byte: 11 + bit: 2 + proximity_value: # Analog range + bit: [71, 70, 69, 68, 67, 66, 65] + proximity_touched: # Touch/Click + byte: 8 + bit: 0 + proximity_clicked: + byte: 11 + bit: 1 + + # -------------- IMU -------------- + # Accelerometer + accel_x: + byte: [23, 24] + single_value: + signed: + + accel_y: + byte: [25, 26] + single_value: + signed: + + accel_z: + byte: [27, 28] + single_value: + signed: + + # Magnetometer + mag_x: + byte: [29, 30] + single_value: + signed: + + mag_y: + byte: [31, 32] + single_value: + signed: + + mag_z: + byte: [33, 34] + single_value: + signed: + + # Gyroscope + gyro_x: + byte: [35, 36] + single_value: + signed: + + gyro_y: + byte: [37, 38] + single_value: + signed: + + gyro_z: + byte: [39, 40] + single_value: + signed: + + # -------------- Gestures -------------- + # Grip gesture + grip_pull: + bit: [87, 86, 85, 84, 83, 82, 81] + grip_force: + bit: [ 119, 118, 117, 116, 115, 114, 113 ] + grip_touched: # Gestures + byte: 10 + bit: 0 + grip_clicked: + byte: 11 + bit: 0 + + # Pinch with trackpad + pinch_trackpad_pull: + bit: [ 127, 126, 125, 124, 123, 122, 121 ] + pinch_trackpad_clicked: + byte: 15 + bit: 0 + + # Pinch with thumb finger + pinch_thumbfinger_pull: + bit: [ 135, 134, 133, 132, 131, 130, 129 ] + pinch_thumbfinger_clicked: + byte: 16 + bit: 0 + + # Point where trackpad can be used alongside the gesture. """
License:
--------
Copyright 2022 Tangi0 Ltd. (trading as TG0)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


File description:
-----------------
Main file for package: etee.
Classes and methods for eteeController events, communication and data retrieval.

""" (trading as TG0) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +File description: +----------------- +Main file for package: etee. +Classes and methods for eteeController events, communication and data retrieval. + +""" + +import os +import time + +from .tangio_for_etee import TG0Driver, serial_ports, parse_utf8 +from . import Ahrs + +ETEE_CONTROLLER_DATA_CONFIG = os.path.join(os.path.dirname(__file__), "config", "etee_controller.yaml") + + +class EteeControllerEvent: + """ + This class manages eteeController driver events by allowing callback functions to be connected or disconnected from them. + """ + def __init__(self): + """ + Class constructor method. + """ + self.callbacks = list() + + def connect(self, callback): + """ + Connect a callback function to the event. + + :param callable callback: Callback function. + """ + self.callbacks.append(callback) + + def disconnect(self, callback): + """ + Remove a function from the event callbacks. + + :param callable callback: Callback function. + """ + self.callbacks.remove(callback) + + def emit(self): + """ + Emit event. + """ + for cb in self.callbacks: + cb() + + +class EteeController: + """ + This class manages the communication between the driver and the eteeControllers, through the eteeDongle. + It also handles the data loop which retrieves and stores the eteeControllers data into an internal buffer, + allowing for their retrieval through its class methods. + """ + ETEE_DONGLE_VID = 9114 + ETEE_DONGLE_PID = None + + def __init__(self): + """ + Class constructor method. + """ + self._hand_last_on_left = 0 + self._hand_last_on_right = 0 + self._api_data_left = None + self._api_data_right = None + self._frameno_left = 0 + self._frameno_right = 0 + self._ahrs_left = Ahrs() + self._ahrs_right = Ahrs() + self._quaternion_left = None + self._quaternion_right = None + self._absolute_imu_on = False + self._euler_left = None + self._euler_right = None + + self.driver = TG0Driver(ETEE_CONTROLLER_DATA_CONFIG) + self.driver.add_callback(self._api_data_callback) + self.driver.add_print_callback(self._print_callback) + self.driver.add_serial_exception_callbacks(self._serial_exception_callback) + + self.connection_port = None + self.dongle_connection = False + + # ---------------- Events ---------------- + self.left_hand_received = EteeControllerEvent() + """Event for receiving left controller data. + + :type: Event """ + + self.right_hand_received = EteeControllerEvent() + """Event for receiving right controller data. + + :type: Event """ + + self.hand_received = EteeControllerEvent() + """Event for receiving data from any controller. + + :type: Event """ + + self.left_hand_lost = EteeControllerEvent() + """Event for losing left controller connection. Occurs when data is not received for more than 0.5 seconds. + + :type: Event """ + + self.right_hand_lost = EteeControllerEvent() + """Event for losing right controller connection. Occurs when data is not received for more than 0.5 seconds. + + :type: Event """ + + self.data_lost = EteeControllerEvent() + """Event for losing data from both controllers. Occurs when data is not received for more than 0.5 seconds. + + :type: Event """ + + self.left_connected = EteeControllerEvent() + """Event for left controller connection detected by the dongle. + + :type: Event """ + + self.right_connected = EteeControllerEvent() + """Event for right controller connection detected by the dongle. + + :type: Event """ + + self.left_disconnected = EteeControllerEvent() + """Event for left controller disconnection from the dongle. + + :type: Event """ + + self.right_disconnected = EteeControllerEvent() + """Event for right controller disconnection from the dongle. + + :type: Event """ + + self.dongle_disconnected = EteeControllerEvent() + """Event for dongle disconnection. + + :type: Event """ + + # ---------------- Utility ---------------- + def connect_port(self, port=None): + """ + Attempt to establish serial connection to an etee dongle port. If a COM port argument is provided, + connection is attempted with the specified port. If the port argument is None, the driver automatically detects + any COM ports with an etee dongle and connects to the first available one. Default port value is None. + + :param str or None port: etee dongle COM port. + :return: Success flag - True if the connection is successful, False if otherwise + :rtype: bool + """ + if port[0:3] == 'COM': + return self.driver.connect(port) + else: + raise ValueError("The port value should be of the form 'COMx', where x is the COM port number.") + + def connect(self): + """ + Establish serial connection to an etee dongle. This function automatically detects etee dongles connected to a COM port + and connects to the first available one. + """ + available_ports = self.get_available_etee_ports() + print("The following ports found: {}".format(available_ports)) + if len(available_ports) > 0: + port = available_ports[0] + connected = self.connect_port(port) + if connected: + self.connection_port = port + self.dongle_connection = True + print("Connection to etee dongle successful.") + else: + self.connection_port = None + self.dongle_connection = False + print("Connection to etee dongle unsuccessful.") + else: + self.dongle_connection = False + + def get_number_available_etee_ports(self): + """ + Get the number of available etee dongle COM ports. + + :return: Number of available etee dongle ports. + :rtype: int + """ + available_ports = self.get_available_etee_ports() + return len(available_ports) + + def get_available_etee_ports(self): + """ + Get all available etee dongle COM ports. Other devices are automatically filtered out through a + VID and PID filtering method. + + :return: List of COM port names with etee dongles connected. + :rtype: list[str] + """ + return [x[0] for x in serial_ports(self.ETEE_DONGLE_VID, self.ETEE_DONGLE_PID)] + + def disconnect(self): + """ + Close serial connection to etee dongle. + + :return: Success flag - True if the connection was closed successfully, False if otherwise + :rtype: bool + """ + return self.driver.disconnect() + + def run(self): + """ + Initiates the data loop in a separate thread. The data loop reads serial data, parses it and stores it in an internal buffer. + The data loop also listens to serial and data events and manages event callback functions. + """ + self.driver.run() + + def stop(self): + """ + Stops the data loop. + """ + self.driver.stop() + + def start_data(self): + """ + Sends command to the etee controller to start the data stream. + """ + self.driver.send_command(b"BP+AG\r\n") + + def stop_data(self): + """ + Sends command to the etee controller to stop the data stream. + """ + self.driver.send_command(b"BP+AS\r\n") + + def _api_data_callback(self, frameno, data): + """ + Manages part of the data loop. Parses the argument data and stores it in the corresponding hand's + internal buffer and update the API events (e.g. hand received or hand lost). + + :param dict data: Dictionary of the parsed controller data. + """ + if data["hand"] == 0: + # print(time.time() - self.hand_last_on_left) + self._api_data_left = data + self._hand_last_on_left = time.time() + self._frameno_left += 1 + self._update_quaternion_left() + self.left_hand_received.emit() + + elif data["hand"] == 1: + self._api_data_right = data + self._hand_last_on_right = time.time() + self._frameno_right += 1 + self._update_quaternion_right() + self.right_hand_received.emit() + + self.hand_received.emit() + + if time.time() - self._hand_last_on_left > 0.1 and self._api_data_left is not None: + self._api_data_left = None + self.left_hand_lost.emit() + if time.time() - self._hand_last_on_right > 0.1 and self._api_data_left is not None: + self._api_data_right = None + self.right_hand_lost.emit() + + def _serial_exception_callback(self): + """ + Emit a disconnection event if the etee dongle connection is lost. + """ + ports = self.get_available_etee_ports() + if self.driver.serial_reader.port not in ports: + self.driver.disconnect() + self.dongle_disconnected.emit() + + def _print_callback(self, reading): + """ + Print messages received from the dongle, which are not data packets, and emits the corresponding connection events. + + :param bytes reading: Print message received from dongle. + """ + if reading == b"R connection complete\r\n": + self.right_connected.emit() + elif reading == b"L connection complete\r\n": + self.left_connected.emit() + elif reading == b"R disconnected\r\n": + self._api_data_right = None + self.right_disconnected.emit() + elif reading == b"L disconnected\r\n": + self._api_data_left = None + self.left_disconnected.emit() + + def _rest_callback(self, reading): + """ + Handles controller connection loss events. + Either controller's connection is lost if no data from the controller is received in 100ms. + + :param bytes reading: Print message received from dongle. + """ + if reading is not None: + return + left_lost = time.time() - self._hand_last_on_left > 0.1 and self._api_data_left is not None + right_lost = time.time() - self._hand_last_on_right > 0.1 and self._api_data_right is not None + if left_lost: + self._api_data_left = None + self.left_hand_lost.emit() + if right_lost: + self._api_data_right = None + self.right_hand_lost.emit() + if left_lost and right_lost: + self.data_lost.emit() + + # ---------------- IMU Processing ---------------- + def absolute_imu_enabled(self, on): + """ + Enables or disables absolute orientation. Absolute orientation uses data from the accelerometer, + gyroscope and magnetometer sensors for quaternion calculations. If disabled, the default mode will be enabled, + which uses relative orientation, calculated only though the accelerometer and gyroscope data. + + :param bool on: True to switch to absolute orientation, False for relative orientation. + """ + self._absolute_imu_on = on + + def _update_quaternion_left(self): + """ + Calculates and updates the left controller's quaternion and euler. + """ + accel = [ + self.get_left("accel_x"), + self.get_left("accel_y"), + self.get_left("accel_z")] + gyro = [ + self.get_left("gyro_x"), + self.get_left("gyro_y"), + self.get_left("gyro_z")] + if None in gyro or None in accel: + return + if self._absolute_imu_on: + mag = [ + self.get_left("mag_x"), + self.get_left("mag_y"), + self.get_left("mag_z")] + if None in mag: + return + else: + mag = None + self._quaternion_left = self._ahrs_left.get_quaternion(gyro, accel, mag) + self._euler_left = self._ahrs_left.get_euler(gyro, accel, mag) + + def _update_quaternion_right(self): + """ + Calculates and updates the right controller's quaternion and euler. + """ + accel = [self.get_right("accel_x"), self.get_right("accel_y"), self.get_right("accel_z")] + gyro = [self.get_right("gyro_x"), self.get_right("gyro_y"), self.get_right("gyro_z")] + if None in gyro or None in accel: + return + if self._absolute_imu_on: + mag = [self.get_right("mag_x"), self.get_right("mag_y"), self.get_right("mag_z")] + if None in mag: + return + else: + mag = None + self._quaternion_right = self._ahrs_right.get_quaternion(gyro, accel, mag) + self._euler_right = self._ahrs_right.get_euler(gyro, accel, mag) + + def update_gyro_offset_left(self): + """ + Retrieves the gyroscope calibration parameters saved on the left etee controller, and updates the calibration + offsets in the driver model. + """ + response = self.driver.send_command(b"BL+gf\r\n") + try: + x = float(response.split(b"X:")[1].split(b" ")[0].decode()) + y = float(response.split(b"Y:")[1].split(b" ")[0].decode()) + z = float(response.split(b"Z:")[1].split(b"\r\n")[0].decode()) + self._ahrs_left.set_gyro_offset([x, y, z]) + except: + pass + + def update_gyro_offset_right(self): + """ + Retrieves the gyroscope calibration parameters saved on the right etee controller, and updates the calibration + offsets in the driver model. + """ + response = self.driver.send_command(b"BR+gf\r\n") + try: + x = float(response.split(b"X:")[1].split(b" ")[0].decode()) + y = float(response.split(b"Y:")[1].split(b" ")[0].decode()) + z = float(response.split(b"Z:")[1].split(b"\r\n")[0].decode()) + self._ahrs_right.set_gyro_offset([x, y, z]) + except: + pass + + def update_mag_offset_left(self): + """ + Retrieves the magnetometer calibration parameters saved on the left etee controller, and updates the calibration + offsets in the driver model. + """ + response = self.driver.send_command(b"BL+mf\r\n") + try: + x = float(response.split(b"X:")[1].split(b" ")[0].decode()) + y = float(response.split(b"Y:")[1].split(b" ")[0].decode()) + z = float(response.split(b"Z:")[1].split(b"\r\n")[0].decode()) + self._ahrs_left.set_mag_offset([x, y, z]) + except: + pass + + def update_mag_offset_right(self): + """ + Retrieves the magnetometer calibration parameters saved on the right etee controller, and updates the calibration + offsets in the driver model. + """ + response = self.driver.send_command(b"BR+mf\r\n") + try: + x = float(response.split(b"X:")[1].split(b" ")[0].decode()) + y = float(response.split(b"Y:")[1].split(b" ")[0].decode()) + z = float(response.split(b"Z:")[1].split(b"\r\n")[0].decode()) + self._ahrs_right.set_mag_offset([x, y, z]) + except: + pass + + def update_imu_offsets(self): + """ + Retrieves the gyroscope and magnetometer calibration parameters from both controllers, and updates the calibration + offsets in the driver model. + """ + print("Updating gyro and magnetometer offsets. Please, wait...") + time.sleep(2) + self.update_gyro_offset_left() + self.update_gyro_offset_right() + self.update_mag_offset_left() + self.update_mag_offset_right() + print("Gyro and magnetometer offsets updated!") + + # ---------------- Firmware Versions ---------------- + def get_dongle_version(self): + """ + Retrieve the firmware version of the connected dongle. + + :return: Returns the dongle firmware version if a dongle is connected. + If no dongle is connected, the firmware version value will be None. + :rtype: str + """ + ret = None + response = self.driver.send_command(b"AT+AB\r\n") + response = parse_utf8(response) + if "NRF" in response: + ret = response.split("NRF")[1].split("\r\n")[0] + return ret + + def get_etee_versions(self): + """ + Retrieve the firmware version from the connected controllers. + + :return: Returns the firmware versions of the connected controllers. + If a controller is not connected, its firmware version value will be None. + :rtype: list[str] + """ + ret = [None, None] + response = self.driver.send_command(b"BP+AB\r\n", response_keys=[b"R:AB=etee", b"L:AB=etee"], verbose=True) + if response is None: + return ret + if response[b"R:AB=etee"] is not None and b'-' in response[b"R:AB=etee"]: + ret[1] = response[b"R:AB=etee"].decode().split("-")[1] + if response[b"L:AB=etee"] is not None and b'-' in response[b"L:AB=etee"]: + ret[0] = response[b"L:AB=etee"].decode().split("-")[1] + return ret + + # ---------------- Get controller data by key ---------------- + def get_left(self, w): + """ + Get a key value in the current internal data buffer for the left device. + + :param str w: Key for the device data to be retrieved, as defined in the YAML file. + :return: Left controller's value for the key provided. + """ + if self._api_data_left: + return self._api_data_left[w] + else: + return None + + def get_right(self, w): + """ + Get a key value in the current internal data buffer for the right device. + + :param str w: Key for the device data to be retrieved, as defined in the YAML file. + :return: Right controller's value for the key provided. + """ + if self._api_data_right: + return self._api_data_right[w] + else: + return None + + def get_data(self, dev, w): + """ + Get a key value in the current internal data buffer for the specified device (left or right). + + :param str dev: Selected controller hand. Possible values: "left", "right". + :param str w: Key for the device data to be retrieved, as defined in the YAML file. + :return: Selected controller's value for the key provided. + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left(w) + elif dev == "right": + return self.get_right(w) + else: + raise ValueError("Input 'dev' must be: 'left' or 'right'") + + # ---------------- Get hand/controller connection status ---------------- + def all_hands_on(self): + """ + Check if both left and right controllers are connected. + + :return: Returns true if data has been recently received from both controllers. + :rtype: bool + """ + return (self.get_left is not None) and (self.get_right is not None) + + def any_hand_on(self): + """ + Check if either left or right controller is connected. + + :return: Returns true if data has been recently received from any of the controller. + :rtype: bool + """ + return (self._api_data_left is not None) or (self._api_data_right is not None) + + def left_hand_on(self): + """ + Check if the left controller is connected. + + :return: Returns true if data has been recently received from the left controller. + :rtype: bool + """ + return self._api_data_left is not None + + def right_hand_on(self): + """ + Check if the right controller is connected. + + :return: Returns true if data has been recently received from the right controller. + :rtype: bool + """ + return self._api_data_right is not None + + # ================ Getter functions for sensor data ================ + # ---------------- Get pinky finger data ---------------- + def get_pinky_pull(self, dev): + """ + Returns the pinky finger pull value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Pinky finger pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinky_pull") + elif dev == "right": + return self.get_right("pinky_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_pinky_force(self, dev): + """ + Returns the pinky finger force value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Pinky finger force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinky_force") + elif dev == "right": + return self.get_right("pinky_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_pinky_touched(self, dev): + """ + Returns the pinky finger touch value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's pinky finger is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinky_touched") + elif dev == "right": + return self.get_right("pinky_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_pinky_clicked(self, dev): + """ + Returns the pinky finger click value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's pinky finger is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinky_clicked") + elif dev == "right": + return self.get_right("pinky_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get ring finger data ---------------- + def get_ring_pull(self, dev): + """ + Returns the ring finger pull value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Ring finger pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("ring_pull") + elif dev == "right": + return self.get_right("ring_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_ring_force(self, dev): + """ + Returns the ring finger force value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Ring finger force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("ring_force") + elif dev == "right": + return self.get_right("ring_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_ring_touched(self, dev): + """ + Returns the ring finger touch value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's ring finger is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("ring_touched") + elif dev == "right": + return self.get_right("ring_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_ring_clicked(self, dev): + """ + Returns the ring finger click value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's ring finger is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("ring_clicked") + elif dev == "right": + return self.get_right("ring_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get middle finger data ---------------- + def get_middle_pull(self, dev): + """ + Returns the middle finger pull value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Middle finger pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("middle_pull") + elif dev == "right": + return self.get_right("middle_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_middle_force(self, dev): + """ + Returns the middle finger force value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Middle finger force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("middle_force") + elif dev == "right": + return self.get_right("middle_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_middle_touched(self, dev): + """ + Returns the middle finger touch value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's middle finger is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("middle_touched") + elif dev == "right": + return self.get_right("middle_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_middle_clicked(self, dev): + """ + Returns the middle finger click value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's middle finger is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("middle_clicked") + elif dev == "right": + return self.get_right("middle_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get index finger data ---------------- + def get_index_pull(self, dev): + """ + Returns the index finger pull value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Index finger pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("index_pull") + elif dev == "right": + return self.get_right("index_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_index_force(self, dev): + """ + Returns the index finger force value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Index finger force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("index_force") + elif dev == "right": + return self.get_right("index_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_index_touched(self, dev): + """ + Returns the index finger touch value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's index finger is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("index_touched") + elif dev == "right": + return self.get_right("index_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_index_clicked(self, dev): + """ + Returns the index finger click value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's index finger is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("index_clicked") + elif dev == "right": + return self.get_right("index_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get thumb finger data ---------------- + def get_thumb_pull(self, dev): + """ + Returns the thumb finger pull value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Thumb finger pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("thumb_pull") + elif dev == "right": + return self.get_right("thumb_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_thumb_force(self, dev): + """ + Returns the thumb finger force value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Thumb finger force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("thumb_force") + elif dev == "right": + return self.get_right("thumb_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_thumb_touched(self, dev): + """ + Returns the thumb finger touch value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's thumb finger is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("thumb_touched") + elif dev == "right": + return self.get_right("thumb_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_thumb_clicked(self, dev): + """ + Returns the thumb finger click value for the selected device/controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected controller's thumb finger is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("thumb_clicked") + elif dev == "right": + return self.get_right("thumb_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get all fingers data ---------------- + def get_device_finger_pressures(self, dev): + """ + Returns all the fingers pull and force pressure values. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Two arrays containing the pull and force pressures for the selected controller's five finger sensors. + The first element of each array will be the thumb, and the las the pinky. For example: fingers_pull[2] = index finger pull. + Pull and force values range: 0-126. Base values: 0. + :rtype: list[int], list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left" or "right": + fingers_pull = [self.get_data(dev, "thumb_pull"), self.get_data(dev, "index_pull"), + self.get_data(dev, "middle_pull"), + self.get_data(dev, "ring_pull"), self.get_data(dev, "pinky_pull")] + fingers_force = [self.get_data(dev, "thumb_force"), self.get_data(dev, "index_force"), + self.get_data(dev, "middle_force"), + self.get_data(dev, "ring_force"), self.get_data(dev, "pinky_force")] + return fingers_pull, fingers_force + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get tracker data ---------------- + def get_tracker_connections(self): + """ + Checks if both eteeTrackers are connected to the controllers. + + :return: Returns True if both trackers are connected. + :rtype: bool + """ + return self.get_left("tracker_on") and self.get_right("tracker_on") + + def get_tracker_connection(self, dev): + """ + Checks if the selected controller has an eteeTracker connected. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Returns True if the selected tracker is connected. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("tracker_on") + elif dev == "right": + return self.get_right("tracker_on") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get proximity sensor data from tracker ---------------- + def get_proximity(self, dev): + """ + Returns the proximity sensor analog value for the selected controller. + This sensor is only available when an eteeTracker is connected. + If disconnected, the value will always be 0, even when the sensor is interacted with. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Value of the selected tracker's proximity sensor. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("proximity_value") + elif dev == "right": + return self.get_right("proximity_value") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_proximity_touched(self, dev): + """ + Returns the proximity sensor touch value for the selected controller. + This sensor is only available when an eteeTracker is connected. + If disconnected, the value will always be false, even when the sensor is touched. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected tracker's proximity sensor value is at touch level. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("proximity_touched") + elif dev == "right": + return self.get_right("proximity_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_proximity_clicked(self, dev): + """ + Returns the proximity sensor click value for the selected controller. + This sensor is only available when an eteeTracker is connected. + If disconnected, the value will always be false, even when the sensor is touched. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected tracker's proximity sensor value is at click level. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("proximity_clicked") + elif dev == "right": + return self.get_right("proximity_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get trackpad data ---------------- + def get_trackpad_x(self, dev): # Location + """ + Returns the trackpad x-axis position for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Trackpad X (horizontal) coordinate for the selected controller. + Range: 0-255. If not touched, the value is 126. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_x") + elif dev == "right": + return self.get_right("trackpad_x") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_y(self, dev): + """ + Returns the trackpad y-axis position for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Trackpad Y (vertical) coordinate for the selected controller. + Range: 0-255. If not touched, the value is 126. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_y") + elif dev == "right": + return self.get_right("trackpad_y") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_xy(self, dev): + """ + Returns the trackpad x-axis and y-axis positions for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Trackpad XY coordinates for the selected controller. + Range for each axis coordinate: 0-255. If not touched, the value is 126. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left" or "right": + xy = [self.get_data(dev, "trackpad_x"), self.get_data(dev, "trackpad_y")] + if None in xy: + return None + else: + return xy + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_pull(self, dev): # Pressure + """ + Returns the trackpad pull pressure value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Trackpad pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_pull") + elif dev == "right": + return self.get_right("trackpad_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_force(self, dev): + """ + Returns the trackpad force pressure value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Trackpad force pressure (i.e. second pressure range, corresponding to hard press) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_force") + elif dev == "right": + return self.get_right("trackpad_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_touched(self, dev): + """ + Returns the trackpad touch value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected trackpad is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_touched") + elif dev == "right": + return self.get_right("trackpad_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_trackpad_clicked(self, dev): + """ + Returns the trackpad click value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected trackpad is clicked. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("trackpad_clicked") + elif dev == "right": + return self.get_right("trackpad_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get slider data ---------------- + def get_slider_value(self, dev): # Location in Y-axis + """ + Returns the slider positional value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: LED slider position, alongside its Y-axis (vertical), for the selected controller. + Range: 0-126. If not touched, the slider value is 126. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("slider_value") + elif dev == "right": + return self.get_right("slider_value") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_slider_touched(self, dev): # Touch + """ + Returns the slider touch value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected LED light is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("slider_touched") + elif dev == "right": + return self.get_right("slider_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_slider_up_button(self, dev): # Slider Up/Down Button + """ + Returns the slider UP button value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the upper part of selected LED is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("slider_up_touched") + elif dev == "right": + return self.get_right("slider_up_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_slider_down_button(self, dev): + """ + Returns the slider DOWN button value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the lower part of selected LED is touched. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("slider_down_touched") + elif dev == "right": + return self.get_right("slider_down_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get grip gesture data ---------------- + def get_grip_pull(self, dev): + """ + Returns the grip gesture's pull pressure value for the selected controller. + If the gesture is not performed, the pull value will be 0. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Grip gesture's pull pressure (i.e. first pressure range, corresponding to light touch) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("grip_pull") + elif dev == "right": + return self.get_right("grip_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_grip_force(self, dev): + """ + Returns the grip gesture's force pressure value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Grip gesture's force pressure (i.e. second pressure range, corresponding to squeeze levels) for the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("grip_force") + elif dev == "right": + return self.get_right("grip_force") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_grip_touched(self, dev): + """ + Returns the grip gesture's touch value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the grip gesture reaches touch level in the selected controller. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("grip_touched") + elif dev == "right": + return self.get_right("grip_touched") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_grip_clicked(self, dev): + """ + Returns the grip gesture's click value for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the grip gesture reaches click level in the selected controller. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("grip_clicked") + elif dev == "right": + return self.get_right("grip_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get standard pinch (using trackpad) gesture data ---------------- + def get_pinch_trackpad_pull(self, dev): + """ + Returns the pull pressure value for the pinch with trackpad gesture in the selected controller. + If the gesture is not performed, the pull value will be 0. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Pull pressure for the pinch gesture (trackpad variation) in the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinch_trackpad_pull") + elif dev == "right": + return self.get_right("pinch_trackpad_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_pinch_trackpad_clicked(self, dev): + """ + Returns the click value for the pinch with trackpad gesture in the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the pinch gesture (trackpad variation) reaches click level in the selected controller. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinch_trackpad_clicked") + elif dev == "right": + return self.get_right("pinch_trackpad_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get alternative pinch (using thumb finger) gesture data ---------------- + def get_pinch_thumbfinger_pull(self, dev): + """ + Returns the pull pressure value for the pinch with thumb finger gesture in the selected controller. + If the gesture is not performed, the pull value will be 0. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Pull pressure for the pinch gesture (thumb finger variation) in the selected controller. + Range: 0-126. Base value: 0. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinch_thumbfinger_pull") + elif dev == "right": + return self.get_right("pinch_thumbfinger_pull") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_pinch_thumbfinger_clicked(self, dev): + """ + Returns the click value for the pinch with thumb finger gesture in the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the pinch gesture (thumb finger variation) reaches click level in the selected controller. + Range: 0-126. Base value: 0. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("pinch_thumbfinger_clicked") + elif dev == "right": + return self.get_right("pinch_thumbfinger_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get independent point (trackpad can be used) gesture data ---------------- + def get_point_independent_clicked(self, dev): + """ + This is the main point gesture used in VR and XBOX-controller based games. + + Returns the click value for the independent point (trackpad can be touched) gesture in the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the independent point gesture variation is detected in the selected controller. In this variation, the trackpad can be used alongside the point gesture. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("point_independent_clicked") + elif dev == "right": + return self.get_right("point_independent_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get exclude-trackpad point (trackpad must not be touched) gesture data ---------------- + def get_point_excl_tp_clicked(self, dev): + """ + This is the alternative point gesture. + + Returns the click value for the exclude-trackpad point (trackpad must not be touched) gesture in the selected controller. + In this variation, if the user touches the trackpad while doing the point gesture, the gesture will be cancelled. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the exclude-trackpad point gesture variation (i.e. where the trackpad is not touched) is detected in the selected controller. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("point_exclude_trackpad_clicked") + elif dev == "right": + return self.get_right("point_exclude_trackpad_clicked") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get IMU and quaternions ---------------- + def get_quaternion(self, dev): + """ + Returns the quaternion for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Rotation quaternion for the selected controller. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self._quaternion_left + elif dev == "right": + return self._quaternion_right + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_euler(self, dev): + """ + Returns the euler angles for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Euler angles (roll, pitch, yaw) for the selected controller. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self._euler_left + elif dev == "right": + return self._euler_right + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_accel(self, dev): + """ + Returns the accelerometer values for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Acceleration vector for the selected controller. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left" or "right": + accel = [self.get_data(dev, "accel_x"), self.get_data(dev, "accel_y"), self.get_data(dev, "accel_z")] + if None in accel: + return None + else: + return accel + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_gyro(self, dev): + """ + Returns the gyroscope values for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Angular acceleration vector for the selected controller. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left" or "right": + gyro = [self.get_data(dev, "gyro_x"), self.get_data(dev, "gyro_y"), self.get_data(dev, "gyro_z")] + if None in gyro: + return None + else: + return gyro + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_mag(self, dev): + """ + Returns the magnetometer values for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Magnetic flux density vector for the selected controller. + :rtype: list[int] + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left" or "right": + mag = [self.get_data(dev, "mag_x"), self.get_data(dev, "mag_y"), self.get_data(dev, "mag_z")] + if None in mag: + return None + else: + return mag + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get battery data ---------------- + def get_battery_level(self, dev): + """ + Returns the battery level for the selected controller. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: Battery fuel gauge level for the selected controller. + Range: 0-100. + :rtype: int + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("battery_level") + elif dev == "right": + return self.get_right("battery_level") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_charging_in_progress_status(self, dev): + """ + Checks if the selected controller is charging. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected battery is charging. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("battery_charging") + elif dev == "right": + return self.get_right("battery_charging") + else: + raise ValueError("Input must be: 'left' or 'right'") + + def get_charging_complete_status(self, dev): + """ + Checks if the selected controller has finished charging (battery level is 100%). + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected battery charging has been completed. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("battery_charging_complete") + elif dev == "right": + return self.get_right("battery_charging_complete") + else: + raise ValueError("Input must be: 'left' or 'right'") + + # ---------------- Get system/power button data ---------------- + def get_system_button_pressed(self, dev): + """ + Checks if the system button is pressed. + + :param str dev: Selected device hand. Possible values: "left", "right". + :return: True if the selected system button is pressed. + :rtype: bool + :raises ValueError: if the dev input is not "left" or "right" + """ + if dev == "left": + return self.get_left("system_button") + elif dev == "right": + return self.get_right("system_button") + else: + raise ValueError("Input must be: 'left' or 'right'") diff --git a/etee/quaternion.py b/etee/quaternion.py new file mode 100644 index 0000000..db467da --- /dev/null +++ b/etee/quaternion.py @@ -0,0 +1,206 @@ +""" +License: +-------- +Copyright 2022 Tangi0 Ltd. (trading as TG0) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +File description: +----------------- +Class and methods for quaternion arithmetics and euler angle estimations. + +""" + +import numbers +import numpy as np +import math + + +class Quaternion: + """ + Class implementing basic quaternion arithmetics and euler angle estimations. + """ + def __init__(self, w_or_q, x=None, y=None, z=None): + """ + Initializes a Quaternion object. + + :param float w_or_q: A scalar representing the real part of the quaternion, another Quaternion object or a + four-element array containing the quaternion values. + :param float x: The first imaginary part if w_or_q is a scalar. + :param float y: The second imaginary part if w_or_q is a scalar. + :param float z: The third imaginary part if w_or_q is a scalar. + """ + self._q = np.array([1, 0, 0, 0]) + + if x is not None and y is not None and z is not None: + w = w_or_q + q = np.array([w, x, y, z]) + elif isinstance(w_or_q, Quaternion): + q = np.array(w_or_q.q) + else: + q = np.array(w_or_q) + if len(q) != 4: + raise ValueError("Expecting a 4-element array or w x y z as parameters") + + self._set_q(q) + + # ---------------- Quaternion specific interfaces ---------------- + def conj(self): + """ + Returns the conjugate of the quaternion + + :return: conjugate of the quaternion + """ + return Quaternion(self._q[0], -self._q[1], -self._q[2], -self._q[3]) + + def to_angle_axis(self): + """ + Returns the quaternion's 3D rotation in an axis-angle representation. + + If the quaternion is the identity quaternion (1, 0, 0, 0), a rotation along the x-axis with angle 0 is returned. + + :return: rad, x, y, z --> where rad is the magnitude of rotation around the rotation axis, and x,y,z represent + the rotation axis' direction vector. + """ + if self[0] == 1 and self[1] == 0 and self[2] == 0 and self[3] == 0: + return 0, 1, 0, 0 + rad = np.arccos(self[0]) * 2 + imaginary_factor = np.sin(rad / 2) + if abs(imaginary_factor) < 1e-8: + return 0, 1, 0, 0 + x = self._q[1] / imaginary_factor + y = self._q[2] / imaginary_factor + z = self._q[3] / imaginary_factor + return rad, x, y, z + + @staticmethod + def from_angle_axis(rad, x, y, z): + """ + Returns the quaternion given the axis-angle representation. + + :param float rad: Magnitude of rotation about the rotation axis, in radians. + :param float x: x-component of the rotation axis' direction vector. + :param float y: y-component of the rotation axis' direction vector. + :param float z: z-component of the rotation axis' direction vector. + :return: quaternion + """ + s = np.sin(rad / 2) + return Quaternion(np.cos(rad / 2), x*s, y*s, z*s) + + def to_euler(self): + """ + Convert the quaternion into euler angles (roll, pitch, yaw). + + Roll is rotation around x in radians (counterclockwise), + pitch is rotation around y in radians (counterclockwise), and + yaw is rotation around z in radians (counterclockwise) + + :return: roll, pitch, yaw + """ + w, x, y, z = self._q[0], self._q[1], self._q[2], self._q[3] + + t0 = +2.0 * (w * x + y * z) + t1 = +1.0 - 2.0 * (x * x + y * y) + roll_x = math.atan2(t0, t1) + + t2 = +2.0 * (w * y - z * x) + t2 = +1.0 if t2 > +1.0 else t2 + t2 = -1.0 if t2 < -1.0 else t2 + pitch_y = math.asin(t2) * 2 + + t3 = +2.0 * (w * z + x * y) + t4 = +1.0 - 2.0 * (y * y + z * z) + yaw_z = math.atan2(t3, t4) + + return roll_x, pitch_y, yaw_z # in radians + + # ---------------- Quaternion operations ---------------- + def __mul__(self, other): + """ + Multiply the given quaternion with another quaternion or a scalar + + :param other: Quaternion object or number. + :return: Resultant Quaternion from the operation. + """ + if isinstance(other, Quaternion): + w = self._q[0]*other._q[0] - self._q[1]*other._q[1] - self._q[2]*other._q[2] - self._q[3]*other._q[3] + x = self._q[0]*other._q[1] + self._q[1]*other._q[0] + self._q[2]*other._q[3] - self._q[3]*other._q[2] + y = self._q[0]*other._q[2] - self._q[1]*other._q[3] + self._q[2]*other._q[0] + self._q[3]*other._q[1] + z = self._q[0]*other._q[3] + self._q[1]*other._q[2] - self._q[2]*other._q[1] + self._q[3]*other._q[0] + + return Quaternion(w, x, y, z) + elif isinstance(other, numbers.Number): + q = self._q * other + return Quaternion(q) + + def __add__(self, other): + """ + Add two quaternions element-wise or add a scalar to each element of the quaternion. + + :param other: Quaternion object or number. + :return: Resultant Quaternion from the operation. + """ + if not isinstance(other, Quaternion): + if len(other) != 4: + raise TypeError("Quaternions must be added to other quaternions or a 4-element array") + q = self.q + other + else: + q = self.q + other.q + + return Quaternion(q) + + # ---------------- Implementing other interfaces to ease working with the class ---------------- + def _set_q(self, q): + """ + Set quaternion values. + + :param q: New quaternion value, in ndarray format. + """ + self._q = q + + def _get_q(self): + """ + Return the current quaternion + + :return: current quaternion + """ + return self._q + + q = property(_get_q, _set_q) + + def __getitem__(self, item): + """ + Return specified quaternion component. + + :param item: key for quaternion item to be retrieved + :return: quaternion item value + """ + return self._q[item] + + def __array__(self): + """ + Return a copy of the quaternion ndarray. + + :return: quaternion + """ + return self._q + + def tolist(self): + """ + Convert and return quaternion as a list. + + :return: quaternion as list + """ + return self._q.tolist() + diff --git a/etee/tangio_for_etee/__init__.py b/etee/tangio_for_etee/__init__.py new file mode 100644 index 0000000..0b9f43c --- /dev/null +++ b/etee/tangio_for_etee/__init__.py @@ -0,0 +1,2 @@ +from .utilities import * +from .driver_base import * diff --git a/etee/tangio_for_etee/driver_base.py b/etee/tangio_for_etee/driver_base.py new file mode 100644 index 0000000..3456487 --- /dev/null +++ b/etee/tangio_for_etee/driver_base.py @@ -0,0 +1,643 @@ +""" +License: +-------- +Copyright 2022 Tangi0 Ltd. (trading as TG0) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +File description: +----------------- +Main file for subpackage: tangio_for_etee. +Methods for low-level TG0 device communication. + +""" + +from builtins import object +import threading +from bitstring import BitArray +import atexit +import serial +import warnings +import yaml +import os +import time + +from . import serial_ports + +yaml.warnings({'YAMLLoadWarning': False}) +warnings.filterwarnings("ignore", category=DeprecationWarning) +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' + +DEFAULT_READLINE_TIMEOUT = 1 +DEFAULT_READ_DATA_TIMEOUT = 1 +DEFAULT_READ_SERIAL_TIMEOUT = 1 +DEFAULT_READ_RESPONSE_TIMEOUT = 1 + + +class SerialReader(object): + """ + This class is in charge of connecting to the hardware device through serial communication, and start reading and + parsing the data it receives to the defined data structure. + """ + + def __init__(self, config_file=None, *, baud_rate=115200, data_bytes=None, end_bytes=None, widgets=None): + """ + Initializes SerialReader class with the given parameters. + + :param str config_file: path to the file which contains the data structure definition. + :param int baud_rate: serial connection baud_rate. + :param int data_bytes: byte length of data to be received from the device. + :param int end_bytes: length of data packet delimiter characters (\xff). + :param dict widgets: dictionary defining the data structure. + """ + super().__init__() + self.serial = None + self.baud_rate = baud_rate + self.serial_lock = threading.Lock() + self.port = None + + if config_file is not None or widgets is not None: + self.data_bytes = None + self.end_bytes = None + self.widgets = None + self.configure(config_file, data_bytes=data_bytes, end_bytes=end_bytes, widgets=widgets) + + def connect(self, port=None): + """ + Opens a connection through a serial reader instance through a specified port or the first available TG0 port. + + :param str port: string representation the port of connection. + If None is passed, the driver will connect to the first available TG0 port. + """ + available_ports = serial_ports() + if not available_ports: + raise Exception("No TG0 device is found!") + if port is None: + for i in range(len(available_ports)): + port = available_ports[i][0] + try: + self.serial = serial.Serial(port, self.baud_rate, timeout=0.1, write_timeout=0.1) + print("Connected to port {}".format(port)) + self.port = port + break + except: + print("Failed to connect to port {}, try other port...".format(port)) + elif port in [i[0] for i in available_ports]: + try: + self.serial = serial.Serial(port, self.baud_rate, timeout=1, write_timeout=1) + print("connected to port {}".format(port)) + self.port = port + except: + print("Failed to connect to port {}, try other port...".format(port)) + else: + raise Exception("No TG0 device found at port {}!".format(port)) + + def close_connection(self): + """ + Closes the connection to hardware through the serial port maintained by the serial reader instance. + """ + if self.serial is not None: + self.serial.close() + if self.serial_lock.locked(): + self.serial_lock.release() + self.port = None + print("Connection closed") + + def reset_input(self): + """ + Empty the driver input buffer, which stores the device transmitted data. + """ + self.serial.reset_input_buffer() + + def readline(self, delim=b"\r\n", num=None, timeout=DEFAULT_READ_DATA_TIMEOUT): + """ + Reads bytes from the serial until a delimiter. + + :param bytes delim: delimiter until to which bytes are read. + :param int num: maximum number of characters to be read. + :param float timeout: timeout duration in seconds. + :return: read byte string. + """ + line = b"" + elapsed = 0 + time_start = time.time() + self.serial_lock.acquire() + while elapsed < timeout: + try: + char = self.serial.read() + line = line + char + except serial.SerialException as e: + self.serial_lock.release() + raise e + if isinstance(delim, list): + delim_found = False + for item in delim: + if line[-len(item):] == item: + delim_found = True + break + if delim_found: + break + else: + if line[-len(delim):] == delim: + break + if len(line) == num: + break + elapsed = time.time() - time_start + self.serial_lock.release() + return line + + def write(self, message): + """ + Writes a message to the serial, but does not read the device's response. + + :param bytes message: message written to the serial. + """ + self.serial.write(message) + + def send_command(self, message, response_start=b"OK", response_end=b"END", response_keys=None, timeout=DEFAULT_READ_RESPONSE_TIMEOUT, verbose=False): + """ + Writes a message to the serial, and reads response. + + :param bytes message: message written to the serial. + :param bytes response_start: a delimiter starting from which the response is read. + :param bytes response_end: a delimiter until which the response is read. + :param bytes response_keys: the commands leaves as soon as all these keys are read. + :param float timeout: read timeout duration in seconds. + :param bool verbose: enable or disable verbose mode, which prints messages sent and received through serial. + :return: read response + """ + if verbose: + print("write: ", message) + self.serial.write(message) + previous_timeout = self.serial.timeout + self.serial.timeout = 1 + response = b"" + response_started = (response_start is None) + if response_keys is not None: + keys_received = [False] * len(response_keys) + elapsed = 0 + time_start = time.time() + while elapsed < timeout: + elapsed = time.time() - time_start + line = self.readline(delim=b"\r\n") + if verbose: + print("readline: ", line) + if line == b'': + continue + elif b"\r\n" not in line: + print("Line was not read.") + continue + elif (response_start + b"\r\n") in line: + response += (response_start + b"\r\n") + response_started = True + else: + if response_started: + response += line + if response_end is not None: + if (response_end + b"\r\n") in line: + break + if response_keys is not None: + for i, key in enumerate(response_keys): + if key in line: + keys_received[i] = True + if all(keys_received): + break + if response_keys is not None: + response_dict = dict() + for key in response_keys: + if key in response: + response_dict[key] = response.split(key)[1].split(b"\r\n")[0] + else: + response_dict[key] = None + print(key, response_dict[key]) + response = response_dict + + self.serial.timeout = previous_timeout + + return response + + def configure(self, config_file=None, *, data_bytes=None, end_bytes=None, widgets=None): + """ + Update the information on data structure configuration. + This provides information on data packet structures, and byte-bit location of specific device parameters, + allowing for their retrieval during device communication. + + :param str config_file: path to the file which contains the data structure definition. + :param int data_bytes: byte length of data to be received from the device. + :param int end_bytes: length of data packet delimiter characters (\xff). + :param dict widgets: dictionary defining the data structure. + """ + if config_file is not None: + with open(config_file, 'r') as fstream: + conf_dict = yaml.safe_load(fstream) + self.data_bytes = conf_dict["total_bytes"]["data_bytes"] + self.end_bytes = conf_dict["total_bytes"]["end_bytes"] + self.widgets = conf_dict["widgets"] + else: + if data_bytes is not None and end_bytes is not None and widgets is not None: + self.data_bytes = data_bytes + self.end_bytes = end_bytes + self.widgets = widgets + else: + raise Exception("Since config_file is None, data_bytes, end_bytes and widgets are required arguments.") + + def raw2data(self, raw): + """ + Parses raw data to the specified data structure. + + :param bytes raw: raw binary data from the serial port. + :return: data structure parsed from the raw data following the data structure specification. + """ + events = {} + for name, properties in self.widgets.items(): + if "byte" in properties: + indices = properties["byte"] + is_signed = "signed" in properties + if isinstance(indices, list): + if any([x > len(raw) for x in indices]): + raise Exception("Widget index our of range") + if "single_value" in properties: + event = int.from_bytes(b"".join([raw[x:x + 1] for x in indices]), byteorder='little', signed=is_signed) + else: + event = [int.from_bytes(raw[x:x + 1], byteorder='big', signed=is_signed) for x in indices] + else: + if indices > len(raw): + raise Exception("Widget index our of range") + if 'bit' not in properties: + event = int.from_bytes(raw[indices:indices + 1], byteorder='big', signed=is_signed) + else: + event = BitArray(hex="0x" + raw[indices:indices + 1].hex()).bin[::-1] + event = int(event[properties['bit']]) + elif "bit" in properties: + indices = properties["bit"] + if any([x > len(raw) * 8 for x in indices]): + raise Exception("Widget index our of range") + bit_array = BitArray(hex="0x" + raw[::-1].hex()).bin[::-1] + event = int("".join([bit_array[i] for i in indices]), 2) + + events[name] = event + return events + + def read_widgets_and_text(self, timeout=DEFAULT_READ_SERIAL_TIMEOUT): + """ + Reads binary data from the serial port and calls raw2data to parse it. + + :param float timeout: time after which the method will stop trying to read the widget value, in seconds. + :return: data structure instance parsed using the raw2data method from a serial port raw data reading. + """ + events = None + start_time = time.time() + while time.time() - start_time < timeout: + data = self.readline(delim=[b"\xff\xff", b"\r\n"]) + if b"\xff\xff" == data[-self.end_bytes:] and len(data) == self.data_bytes + self.end_bytes: + data = data[0:-self.end_bytes] + events = self.raw2data(data) + break + if b"\r\n" == data[-2:]: + events = data + break + return events + + +class TG0Driver: + """ + This driver connects to TG0 devices through serial communication and retrieves data in a loop. It then stores this + data in a separate container for each device, overwriting them periodically as new data is fetched from the hardware. + This class is usually used for inheritance from a child class specific to the hardware device that it's been used for. + """ + def __init__(self, config_file=None, *, data_bytes=None, end_bytes=None, widgets=None, keep_alive_period=5): + """ + Initializes the TG0Driver class with the given parameters. + + :param str config_file: path to the file which contains the data structure definition. + :param int data_bytes: byte length of data to be received from the device. + :param int end_bytes: length of data packet delimiter characters (\xff). + :param dict widgets: dictionary defining the data structure. + :param float keep_alive_period: duration of time to keep the connection alive even when no data is transmitted. + """ + self.baud_rate = 115200 + self.serial_reader = None + self.port = None + self.thread = None + self.keep_alive_period = keep_alive_period + self.last_alive_time = 0 + self.config_file = config_file + self.data_bytes = data_bytes + self.end_bytes = end_bytes + self.widgets = widgets + self.run_mode = False + self.sleep_mode = False + self.callbacks = list() + self.print_callbacks = list() + self.rest_callbacks = list() + self.serial_exception_callbacks = list() + self.connection_callbacks = list() + self.current_data = None + self.frameno = -1 + self.read_text = False + self.loop_is_running = False + + def connect(self, port=None, close_at_exit=True): + """ + Attempts to open a connection between the driver and the hardware device. + + :param str port: string representation of the hardware port to be used to establish the connection to hardware. + This is an optional parameter. connect2hardware can be invoked with None port, and it will choose the + first available COM port. + :param bool close_at_exit: boolean to define whether the connection to the device will be closed at exit. + :return: success flag; true if the connection was successful, false otherwise. + """ + try: + self.serial_reader = SerialReader(self.config_file, baud_rate=self.baud_rate, data_bytes=self.data_bytes, + end_bytes=self.end_bytes, widgets=self.widgets) + self.serial_reader.connect(port=port) + if close_at_exit: + self.close_connection_at_exit() + self.last_alive_time = time.time() + self.connection_handler(state=1) + self.port = self.serial_reader.port + return True + except: + self.connection_handler(state=2) + raise Exception("Could not connect to hardware!") + + def disconnect(self): + """ + Commands the SerialReader instance to close the connection to hardware. + + :return: true if the connection to hardware was closed correctly, false otherwise. + """ + self.stop() + try: + self.serial_reader.close_connection() + self.connection_handler(state=0) + return True + except: + return False + + def configure(self, config_file=None, data_bytes=None, end_bytes=None, widgets=None): + """ + Update the information on data structure definition for specific data retrieval during device communication. + + :param str config_file: path to the file which contains the data structure definition. + :param int data_bytes: byte length of data to be received from the device. + :param int end_bytes: length of data packet delimiter characters (\xff). + :param dict widgets: dictionary defining the data structure. + """ + self.serial_reader.configure(config_file, data_bytes=data_bytes, end_bytes=end_bytes, widgets=widgets) + + def run(self): + """ + Launches a separate thread that reads data from the serial connection cyclically, and makes it available + through the getter methods. + """ + self.read_text = True + self.run_mode = True + self.thread = threading.Thread(target=self.loop) + self.thread.daemon = True + self.thread.start() + + def stop(self): + """ + Stops the data thread. + """ + self.run_mode = False + + def sleep(self, timeout=3): + """ + Suspends the data thread. During sleep, no data is read from serial. + + :param timeout: duration of the driver sleep. + :return: false is the driver loop is running. + """ + self.sleep_mode = True + start_time = time.time() + while (time.time() - start_time < timeout): + if not self.loop_is_running: + break + time.sleep(0.01) + return not self.loop_is_running + + def clear_callbacks(self): + """ + Clear all active callbacks. + """ + self.callbacks.clear() + + def add_callback(self, cb): + """ + Adds a specific callback for handling data messages received from the dongle. Note: Data messages contain sensor + information from the controller, and can be identified by their '\\\\xff\\\\xff' end flag. + + :param cb: callback. + """ + self.callbacks.append(cb) + + def add_print_callback(self, cb): + """ + Adds a specific callback for handling print messages received from the dongle, which are not data packets. + Note: Print messages contain information such as connection status or commands, and can be identified by + their '\\\\r\\\\n' end flag. + + :param cb: callback. + """ + self.print_callbacks.append(cb) + + def add_connection_callback(self, cb): + """ + Adds a specific callback for driver to device connection events. + + :param cb: callback. + """ + self.connection_callbacks.append(cb) + + def add_serial_exception_callbacks(self, cb): + """ + Adds a specific callback for raising exceptions errors. + + :param cb: callback. + """ + self.serial_exception_callbacks.append(cb) + + def add_rest_callback(self, cb): + """ + Adds any other callback that is not a data, print, connection or exception callback. + + :param cb: callback. + """ + self.rest_callbacks.append(cb) + + def data_handler(self, frameno, data): + """ + Handles the active callbacks for data parsing. Data received from the devices can be classified in data and + print messages. Data messages contain sensor information from the controller, and can be identified by their + b'\\\\xff\\\\xff' end flag. + + :param int frameno: Frame number. + :param bytes data: Data message received from the device. + """ + for cb in self.callbacks: + cb(frameno, data) + + def print_handler(self, reading): + """ + Handles the active callbacks for device status and command prints. Data received from the devices can be + classified in data and print messages. Print messages contain information such as connection status or + commands, and can be identified by their b'\r\n' end flag. + + :param int frameno: Frame number. + :param bytes data: Print message received from the device. + """ + for cb in self.print_callbacks: + cb(reading) + + def connection_handler(self, state): + """ + Handles the active connection callbacks. + + :param int state: + state 0: not connected, + state 1: connected, + state 2: failed building connection, + state 3: waiting/trying to rebuild previously connected COM. + """ + for cb in self.connection_callbacks: + cb(state) + + def serial_exception_handler(self): + """ + Handles the active callbacks for exception errors. + """ + for cb in self.serial_exception_callbacks: + cb() + + def rest_handler(self, reading): + """ + Handles any other active callbacks that are not data, print, connection or exception callbacks. + + :param bytes reading: Message received. + """ + for cb in self.rest_callbacks: + cb(reading) + + def next(self): + """ + Reads widget data from the data as specified by the data structure and stores it in a data holder. + """ + try: + reading = self.serial_reader.read_widgets_and_text() + except serial.SerialException: + self.serial_exception_handler() + return + + if reading is not None: + self.last_alive_time = time.time() + + if isinstance(reading, dict): + self.current_data = reading + self.frameno += 1 + self.data_handler(self.frameno, reading) + elif isinstance(reading, bytes): + self.print_handler(reading) + else: + self.rest_handler(reading) + + def loop(self): + """ + Method that loops infinitely parsing data read from the serial connection and storing it. + """ + while self.run_mode: + if not self.sleep_mode: + self.next() + self.loop_is_running = True + else: + self.loop_is_running = False + time.sleep(0.01) + + def write(self, message): + """ + Send the message to the hardware device. This is usually an instruction or command to be executed. + + :param bytes message: message to be passed. + """ + self.serial_reader.write(message) + + def send_command(self, command, sleep=True, *args, **kargs): + """ + Suspends reading data from the device. Sends command and reads response. + + :param bytes command: Command to be sent. + :param bool sleep: Boolean to toggle a sleep routine before the command is sent. True by default. + :return: Read response. + """ + if sleep and self.run_mode: + self.sleep() + try: + response = self.serial_reader.send_command(command, *args, **kargs) + except Exception as e: + self.sleep_mode = False + print("Sending command failed") + return + self.sleep_mode = False + return response + + def close_connection_at_exit(self): + """ + Register a callback to close the connection to hardware when the program is about to exit. + """ + atexit.register(self.disconnect) + + def set_alive_period(self, period): + """ + Set a different value for the duration of time to keep the connection alive even when no data is transmitted. + + :param float period: duration of alive period, in seconds. + """ + self.keep_alive_period = period + + def is_alive(self): + """ + Returns if the device connection is alive. + + :return: false if data hs not been received for a time longer than the alive period time. + """ + return (time.time() - self.last_alive_time) < self.keep_alive_period + + def last_alive(self): + """ + Returns the last time when hardware device data was received. + + :return: how long ago the data was last read, in seconds. + """ + return time.time() - self.last_alive_time + + +class _TG0DataQueue: + """ + This class handles the queue for data received from the hardware device. + """ + def __init__(self, queue): + """ + Initialise the class. + """ + self.queue = queue + + def put_frame(self, frameno, data): + """ + Put items into the queue. + + :param int frameno: frame number + :param bytes data: data passed + """ + self.queue.put((frameno, data)) diff --git a/etee/tangio_for_etee/utilities.py b/etee/tangio_for_etee/utilities.py new file mode 100644 index 0000000..d7b8800 --- /dev/null +++ b/etee/tangio_for_etee/utilities.py @@ -0,0 +1,90 @@ +""" +License: +-------- +Copyright 2022 Tangi0 Ltd. (trading as TG0) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +File description: +----------------- +Utility methods to retrieve COM port data and perform parsing/decoding. + +""" + +from serial.tools import list_ports + + +# ------------------------- COM Ports ------------------------- +def get_ports(predicate=None): + """ + Returns list available COM ports and their information. + + :param function predicate: predicate filtering criteria for ports. + :return: list of available COM ports meeting the filtering criteria + """ + if predicate is None: + predicate = lambda x: True + infos = list(filter(predicate, list_ports.comports())) + ports = [(info.device, info.vid) for info in infos] + return ports + + +def _get_port_info_predicate(vid, pid): + """ + Filter available COM ports by VID and/or PID + + :param int vid: device VID to filter. + :param int pid: device PID to filter. + + :return: true if a port fits the VID and/or PID criteria. + """ + if isinstance(vid, list): + return lambda port_info: port_info.vid in vid + elif isinstance(pid, list): + return lambda port_info: port_info.pid in pid + elif vid is not None and pid is not None: + return lambda port_info: port_info.vid == vid and port_info.pid == pid + elif vid is not None: + return lambda port_info: port_info.vid == vid + elif pid is not None: + return lambda port_info: port_info == pid + else: + return None + + +def serial_ports(vid=None, pid=None): + """ + Returns the available serial ports. + If VID and/or PID values are passed, the method will filter out COM ports that do not meet the VID and/or PID criteria. + + :param int vid: Device VID to filter. By default, it is None. + :param int pid: Device PID to filter. By default, it is None. + :return: List of available COM ports after VID/PID filtering. + """ + return get_ports(_get_port_info_predicate(vid, pid)) + + +# ------------------------- Parse Bytestring ------------------------- +def parse_utf8(bytestring): + """ + Decodes and parses a given bytestring. + + :param byte bytestring: bytestring to be parsed + :return: parsed data as string + """ + parsed = b"" + for i, byta in enumerate(bytestring): + if byta < 0x80: + parsed += bytestring[i:i + 1] + return parsed.decode() diff --git a/examples/01_Retrieving_Data/right_index_events_based.py b/examples/01_Retrieving_Data/right_index_events_based.py new file mode 100644 index 0000000..495d709 --- /dev/null +++ b/examples/01_Retrieving_Data/right_index_events_based.py @@ -0,0 +1,68 @@ +""" +Example code: +------------- +This script shows how to use event-based methods to retrieve controller data. +In this case, the process_right_index() callback function is connected to the right_hand_received event. +This will cause the callback function (which prints controller data) to be called every time that data from the +right controller is received by the dongle, and subsequently transmitted to the driver. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController + + +def process_right_index(): + right_index_pull = etee.get_index_pull('right') + right_index_force = etee.get_index_force('right') + current_time = datetime.now().strftime("%H:%M:%S.%f") + print(current_time, f"The right index pressure is: pull = {right_index_pull:>3} | force = {right_index_force:>3}") + + +if __name__ == "__main__": + # Initialise the etee driver + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.right_hand_received.connect(process_right_index) # Add the process_right_index() function + # as callbacks when right controller data is received + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("---") + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + # If no data received from controller, retry controller connection + # If no dongle is connected, exit application + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available == 0: + print("---") + print("Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") \ No newline at end of file diff --git a/examples/01_Retrieving_Data/right_index_getter_function.py b/examples/01_Retrieving_Data/right_index_getter_function.py new file mode 100644 index 0000000..cc3da89 --- /dev/null +++ b/examples/01_Retrieving_Data/right_index_getter_function.py @@ -0,0 +1,77 @@ +""" +Example code: +------------- +This script shows how to use getter functions to retrieve controller data. +In this case, getter functions are called every loop. Getter functions pull data from the driver internal buffer, which +gets updated every time data is received from the controllers. If event-based methods are used, the frequency of data +received from the controllers can be irregular (e.g. 5ms, or 10ms). Unlike the event-based method, getter functions +allow the user to have better control in the timing of data retrieval. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController + + +def process_right_index(): + right_index_pull = etee.get_index_pull('right') + right_index_force = etee.get_index_force('right') + return right_index_pull, right_index_force + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("---") + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + # If dongle is connected, print index values + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + num_dongles_available = etee.get_number_available_etee_ports() + current_time = datetime.now().strftime("%H:%M:%S.%f") + + if num_dongles_available > 0: + right_index_pull, right_index_force = process_right_index() + if right_index_pull is None: + print("---") + print(current_time, "Right etee controller not detected. Please reconnect controller.") + etee.start_data() # Retry reconnection and data stream access in controllers + time.sleep(0.05) + else: + print(current_time, f"The right index pressure is: pull = {right_index_pull:>3} | force = {right_index_force:>3}") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_euler_angles.py b/examples/02_Print_Data/print_etee_euler_angles.py new file mode 100644 index 0000000..8a5e102 --- /dev/null +++ b/examples/02_Print_Data/print_etee_euler_angles.py @@ -0,0 +1,99 @@ +""" +Example code: +------------- +This script prints the estimated Euler angles (roll, pitch, yaw) from the selected eteeController, +using getter functions. +""" + +import time +import sys +import keyboard +from math import pi +from datetime import datetime +from etee import EteeController + +rad2deg_factor = 180/pi + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print the Euler angles from one controller here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + # If a dongle is connected, try to print finger pressure values + if num_dongles_available > 0: + euler_angles_rad = etee.get_euler(controller_selected) + if euler_angles_rad is None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, + # it will start the etee controller data stream + time.sleep(0.5) + else: + euler_angles_deg = [euler_angles_rad[0] * rad2deg_factor + 180, + euler_angles_rad[1] * rad2deg_factor + 180, + euler_angles_rad[2] * rad2deg_factor + 180] + print(current_time, f"Controller: {controller_selected} --> " + f"Euler Angles: roll({round(euler_angles_deg[0],3):>8}), " + f"pitch({round(euler_angles_deg[1],3):>8}), " + f"yaw({round(euler_angles_deg[2],3):>8})") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_finger_data.py b/examples/02_Print_Data/print_etee_finger_data.py new file mode 100644 index 0000000..1fd8f70 --- /dev/null +++ b/examples/02_Print_Data/print_etee_finger_data.py @@ -0,0 +1,114 @@ +""" +Example code: +------------- +This script prints the finger values (pressure: pull and force, touch, click) from the selected finger in an +eteeController, using getter functions. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController + + +def process_finger(dev, finger): + finger_pull = finger + "_pull" + finger_force = finger + "_force" + finger_touch = finger + "_touched" + finger_click = finger + "_clicked" + + finger_data = [etee.get_data(dev, finger_pull), etee.get_data(dev, finger_force), + etee.get_data(dev, finger_touch), etee.get_data(dev, finger_click)] + return finger_data + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print an individual finger data here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + # Prompt for user to select the finger to print values from + print("---") + print("Please, enter a finger name. Valid options: thumb, index, middle, ring, pinky.") + valid_fingers = ["thumb", "index", "middle", "ring", "pinky"] + finger_selected = input("Enter finger name: ") + while finger_selected not in valid_fingers: + print("Input not valid! Please enter a valid input: thumb, index, middle, ring, pinky.") + finger_selected = input("--> Enter finger name: ") + print("Your selected finger: ", finger_selected) + + # If dongle is connected, print index values + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + if num_dongles_available > 0: + selected_finger = process_finger(controller_selected, finger_selected) + if selected_finger[0] == None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + else: + print(current_time, f"Selected: {controller_selected} {finger_selected} --> " + f"Pressure (pull: {selected_finger[0]:<3}, force: {selected_finger[1]:<3}) | " + f"Touch: {'True' if selected_finger[2] else 'False':<5} | " + f"Click: {'True' if selected_finger[3] else 'False':<5}") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_fingers_all_data.py b/examples/02_Print_Data/print_etee_fingers_all_data.py new file mode 100644 index 0000000..7eef2fe --- /dev/null +++ b/examples/02_Print_Data/print_etee_fingers_all_data.py @@ -0,0 +1,96 @@ +""" +Example code: +------------- +This script prints the finger pressure values (pull and force) from all the fingers in the selected eteeController, +using getter functions. +""" + +import time +import sys +import keyboard + +from datetime import datetime +from etee import EteeController + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print the data for all fingers in one controller here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + # If a dongle is connected, try to print finger pressure values + if num_dongles_available > 0: + selected_pull, selected_force = etee.get_device_finger_pressures(controller_selected) + if selected_pull[0] is None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, + # it will start the etee controller data stream + time.sleep(0.1) + else: + print(current_time, f"Controller: {controller_selected} --> " + f"Thumb: p({selected_pull[0]:>3}), f({selected_force[0]:>3}) " + f"| Index: p({selected_pull[1]:>3}), f({selected_force[1]:>3}) " + f"| Middle: p({selected_pull[2]:>3}), f({selected_force[2]:>3}) " + f"| Ring: p({selected_pull[3]:>3}), f({selected_force[3]:>3}) " + f"| Pinky: p({selected_pull[4]:>3}), f({selected_force[4]:>3})") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_imu.py b/examples/02_Print_Data/print_etee_imu.py new file mode 100644 index 0000000..ce7b360 --- /dev/null +++ b/examples/02_Print_Data/print_etee_imu.py @@ -0,0 +1,119 @@ +""" +Example code: +------------- +This script prints the IMU sensor values from the selected eteeController, using getter functions. +""" + +import time +import sys +import keyboard + +from datetime import datetime +from etee import EteeController + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print the IMU data from one controller here.") + print("======================================================") + + +def adjust_imu(original_arr, offsets_arr): + adj_arr = [0, 0, 0] + if original_arr is None or offsets_arr is None: + pass + else: + for i in range(3): + adj_arr[i] = original_arr[i] - offsets_arr[i] + return adj_arr + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + # IMU offsets + print("Don't move, calculating IMU offsets...") + time.sleep(2) + accel_offset = etee.get_accel(controller_selected) + gyro_offset = etee.get_gyro(controller_selected) + mag_offset = etee.get_mag(controller_selected) + print("Offsets calculated and applied.") + + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + # If a dongle is connected, try to print finger pressure values + if num_dongles_available > 0: + accel = etee.get_accel(controller_selected) + adj_accel = adjust_imu(accel, accel_offset) + + gyro = etee.get_gyro(controller_selected) + adj_gyro = adjust_imu(gyro, gyro_offset) + + mag = etee.get_mag(controller_selected) + adj_mag = adjust_imu(mag, mag_offset) + + if adj_accel is None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, + # it will start the etee controller data stream + time.sleep(0.1) + else: + print(current_time, f"Controller: {controller_selected} --> " + f"Accel: X({adj_accel[0]:>6}), Y({adj_accel[1]:>6}), Z({adj_accel[2]:>6}) " + f"| Gyro: X({adj_gyro[0]:>6}), Y({adj_gyro[1]:>6}), Z({adj_gyro[2]:>6}) " + f"| Magnetometer: X({adj_mag[0]:>6}), Y({adj_mag[1]:>6}), Z({adj_mag[2]:>6})") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_quaternions.py b/examples/02_Print_Data/print_etee_quaternions.py new file mode 100644 index 0000000..475a737 --- /dev/null +++ b/examples/02_Print_Data/print_etee_quaternions.py @@ -0,0 +1,94 @@ +""" +Example code: +------------- +This script prints the quaternions (representing spatial orientation and rotation) from the selected eteeController, +using getter functions. +""" + +import time +import sys +import keyboard + +from datetime import datetime +from etee import EteeController + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print the quaternion data from one controller here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + # If a dongle is connected, try to print finger pressure values + if num_dongles_available > 0: + quaternions = etee.get_quaternion(controller_selected) + + if quaternions is None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, + # it will start the etee controller data stream + time.sleep(0.1) + else: + print(current_time, f"Controller: {controller_selected} --> " + f"Quaternions: q0({quaternions[0]:>6}), q1({quaternions[1]:>6}), " + f"q2({quaternions[2]:>6}), q3({quaternions[3]:>6})") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_slider_data.py b/examples/02_Print_Data/print_etee_slider_data.py new file mode 100644 index 0000000..a73dbba --- /dev/null +++ b/examples/02_Print_Data/print_etee_slider_data.py @@ -0,0 +1,102 @@ +""" +Example code: +------------- +This script prints the slider values (position, touch, UP and DOWN buttons) from the selected eteeController, +using getter functions. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController + + +def process_slider(dev): + slider = [etee.get_slider_value(dev), + etee.get_slider_touched(dev), + etee.get_slider_up_button(dev), etee.get_slider_down_button(dev)] + return slider + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print a controller's slider data here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("You selected controller hand: ", controller_selected) + + # If dongle is connected, print index values + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + if num_dongles_available > 0: + selected_value = process_slider(controller_selected) + if selected_value[0] == None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + + else: + print(current_time, f"Controller: {controller_selected} --> " + f"LED Slider Y-Value: {selected_value[0]:>3} " + f"| Slider Touched: {'True' if selected_value[1] else 'False':<5} " + f"| Slider Up Button: {'True' if selected_value[2] else 'False':<5} " + f"| Slider Down Button: {'True' if selected_value[3] else 'False':<5}") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_sliders_buttons.py b/examples/02_Print_Data/print_etee_sliders_buttons.py new file mode 100644 index 0000000..c352736 --- /dev/null +++ b/examples/02_Print_Data/print_etee_sliders_buttons.py @@ -0,0 +1,116 @@ +""" +Example code: +------------- +This script prints the slider UP/DOWN button values from both eteeControllers, using getter functions. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController +# -*- coding: utf-8 -*- + + +def process_left_slider_buttons(): + left_slider = [etee.get_slider_up_button('left'), etee.get_slider_down_button('left')] + return left_slider + + +def process_right_slider_buttons(): + right_slider = [etee.get_slider_up_button('right'), etee.get_slider_down_button('right')] + return right_slider + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print both controller's slider UP/DOWN data here.") + print("======================================================") + + +def check_controller_connection(left_data, right_data): + connection = True + if left_data[0] == None and right_data[0] == None: + print("---") + print(current_time, f"The left and right etee controllers were not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + connection = False + elif left_data[0] == None: + print("---") + print(current_time, f"The left etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + connection = False + elif right_data[0] == None: + print("---") + print(current_time, f"The right etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + connection = False + return connection + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + print_title() + + # If dongle is connected, print index values + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + if num_dongles_available > 0: + left_buttons = process_left_slider_buttons() + right_buttons = process_right_slider_buttons() + controllers_connected = check_controller_connection(left_buttons, right_buttons) + if controllers_connected: + print(current_time, f"| Left slider - Up {'●' if left_buttons[0] else '○'} " + f", Down {'●' if left_buttons[1] else '○'} " + f"| Right slider - Up {'●' if right_buttons[0] else '○'} " + f", Down {'●' if right_buttons[1] else '○'}") + time.sleep(0.05) + else: + print("Please, connect both controllers.") + + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/02_Print_Data/print_etee_trackpad_data.py b/examples/02_Print_Data/print_etee_trackpad_data.py new file mode 100644 index 0000000..c389b2b --- /dev/null +++ b/examples/02_Print_Data/print_etee_trackpad_data.py @@ -0,0 +1,101 @@ +""" +Example code: +------------- +This script prints the trackpad values (location: x-position and y-position, pressure: pull and force, touch, click) +from the selected eteeController, using getter functions. +""" + +import time +import sys +import keyboard +from datetime import datetime +from etee import EteeController + + +def process_trackpad(dev): + loc = [etee.get_data(dev, "trackpad_x"), etee.get_data(dev, "trackpad_y")] + pressure = [etee.get_data(dev, "trackpad_pull"), etee.get_data(dev, "trackpad_force"), + etee.get_data(dev, "trackpad_touched"), etee.get_data(dev, "trackpad_clicked")] + return loc, pressure + + +def print_title(): + print("======================================================") + print(r" __ ___ ____ ____") + print(r" ___ / /____ ___ / | / __ \/ _/") + print(r" / _ \/ __/ _ \/ _ \ / /| | / /_/ // / ") + print(r"/ __/ /_/ __/ __/ / ___ |/ ____// / ") + print(r"\___/\__/\___/\___/ /_/ |_/_/ /___/ ") + print(" ") + print("Welcome to this etee CLI application.\nYou can print trackpad data here.") + print("======================================================") + + +if __name__ == "__main__": + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + print_title() + + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + + # If dongle is connected, print index values + while True: + # If 'Esc' key is pressed while printing data, stop controller data stream, data loop and exit application + if keyboard.is_pressed('Esc'): + print("\n'Esc' key was pressed. Exiting application...") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit(0) # Exit driver + + # Else continue printing controller data + else: + current_time = datetime.now().strftime("%H:%M:%S.%f") + num_dongles_available = etee.get_number_available_etee_ports() + + if num_dongles_available > 0: + selected_loc, selected_pressure = process_trackpad(controller_selected) + if selected_loc[0] == None: + print("---") + print(current_time, f"The {controller_selected} etee controller was not detected. Please reconnect controller.") + etee.start_data() # If a controller has reconnected with the dongle, it will start etee controller data stream + time.sleep(0.05) + else: + print(current_time, f"Selected Trackpad: {controller_selected} --> " + f"X-Axis: {selected_loc[0]:<3}, Y-Axis: {selected_loc[1]:<3} |" + f"Pressure (pull: {selected_pressure[0]:<3}, force: {selected_pressure[1]:<3}), " + f"Touch: {'True' if selected_pressure[2] else 'False':<5}, " + f"Click: {'True' if selected_pressure[3] else 'False':<5}") + time.sleep(0.05) + else: + print("---") + print(current_time, "Dongle disconnected. Please, re-insert the dongle and re-run the application.") + + etee.stop_data() # Stop controller data stream + print("Controller data stream stopped.") + etee.stop() # Stop data loop + print("Data loop stopped.") + + time.sleep(0.05) + sys.exit("Exiting application...") diff --git a/examples/03_Plot_Data/plot_etee_euler_angles.py b/examples/03_Plot_Data/plot_etee_euler_angles.py new file mode 100644 index 0000000..3ad8718 --- /dev/null +++ b/examples/03_Plot_Data/plot_etee_euler_angles.py @@ -0,0 +1,113 @@ +""" +Example code: +------------- +This script starts a liveplot of the selected eteeController's Euler angles. +""" + +import matplotlib.pyplot as plt +import matplotlib.animation as animation +import sys +import time +from etee import EteeController +from math import pi +rad2deg_factor = 180/pi + + +# Ask user to select between left or right controller +def select_hand(): + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + return controller_selected + + +# This function is called periodically from FuncAnimation +def animate(i, y_1, y_2, y_3): + # Retrieve euler angles + euler_angles_rad = etee.get_euler(device) + + # Get euler angles + if euler_angles_rad is not None: + euler_angles_deg = [euler_angles_rad[0] * rad2deg_factor + 180, + euler_angles_rad[1] * rad2deg_factor + 180, + euler_angles_rad[2] * rad2deg_factor + 180] + else: + euler_angles_deg = [0, 0, 0] + + roll = round(euler_angles_deg[0], 3) + pitch = round(euler_angles_deg[1], 3) + yaw = round(euler_angles_deg[2], 3) + + # Add y to list + y_1.append(roll) + y_2.append(pitch) + y_3.append(yaw) + + # Limit y list to set number of items + y_1 = y_1[-x_len:] + y_2 = y_2[-x_len:] + y_3 = y_3[-x_len:] + + # Update line with new Y values + line_1.set_ydata(y_1) + line_2.set_ydata(y_2) + line_3.set_ydata(y_3) + + return line_1, line_2, line_3, + + +if __name__ == "__main__": + # ------------ Initialise etee ------------ + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + # Ask user for the hand data to plot + device = select_hand() + + # Turn absolute IMU off + etee.absolute_imu_enabled(False) + + # ------------ Initialise graph ------------ + # Parameters + x_len = 200 # Number of points to display + y_range = [0, 360] # Range of possible Y values to display + + # Create figure for plotting + fig = plt.figure() + ax = fig.add_subplot(1, 1, 1) + xs = list(range(0, 200)) + ys_1 = [0] * x_len + ys_2 = [0] * x_len + ys_3 = [0] * x_len + + ax.set_ylim(y_range) + + # Create a blank line. We will update the line in animate + line_1, = ax.plot(xs, ys_1) + line_2, = ax.plot(xs, ys_2) + line_3, = ax.plot(xs, ys_3) + + # Add labels + plt.title('Live Plot of Euler angles') + plt.xlabel('Samples') + plt.ylabel('Euler angles (degrees)') + plt.legend([line_1, line_2, line_3], ['Roll', 'Pitch', 'Yaw']) + + # ------------ Plot live values ------------ + # Set up plot to call animate() function periodically + ani = animation.FuncAnimation(fig, animate, fargs=(ys_1, ys_2, ys_3,), interval=50, blit=True) + plt.show() diff --git a/examples/03_Plot_Data/plot_etee_quaternions.py b/examples/03_Plot_Data/plot_etee_quaternions.py new file mode 100644 index 0000000..192f27c --- /dev/null +++ b/examples/03_Plot_Data/plot_etee_quaternions.py @@ -0,0 +1,108 @@ +""" +Example code: +------------- +This script starts a liveplot of the selected eteeController's quaternion values. +""" + +import matplotlib.pyplot as plt +import matplotlib.animation as animation +import sys +import time +from etee import EteeController + + +# Ask user to select between left or right controller +def select_hand(): + # Prompt for user to select the hand to print values from + print("Please, enter what controller hand you would like to print the values from. Valid options: right, left.") + valid_controllers = ["right", "left"] + controller_selected = input("--> Enter controller hand: ") + while controller_selected not in valid_controllers: + print("Input not valid! Please enter a valid input: right, left.") + controller_selected = input("--> Enter controller hand: ") + print("Your selected controller hand: ", controller_selected) + return controller_selected + + +# This function is called periodically from FuncAnimation +def animate(i, y_1, y_2, y_3, y_4): + # Retrieve euler angles + quaternions = etee.get_quaternion(device) + + # Get euler angles + if quaternions is None: + quaternions = [0, 0, 0, 0] + + # Add y to list + y_1.append(quaternions[0]) + y_2.append(quaternions[1]) + y_3.append(quaternions[2]) + y_4.append(quaternions[3]) + + # Limit y list to set number of items + y_1 = y_1[-x_len:] + y_2 = y_2[-x_len:] + y_3 = y_3[-x_len:] + y_4 = y_4[-x_len:] + + # Update line with new Y values + line_1.set_ydata(y_1) + line_2.set_ydata(y_2) + line_3.set_ydata(y_3) + line_4.set_ydata(y_4) + + return line_1, line_2, line_3, line_4 + + +if __name__ == "__main__": + # ------------ Initialise etee ------------ + # Initialise the etee driver and find dongle + etee = EteeController() + num_dongles_available = etee.get_number_available_etee_ports() + if num_dongles_available > 0: + etee.connect() # Attempt connection to etee dongle + time.sleep(1) + etee.start_data() # Attempt to send a command to etee controllers to start data stream + etee.run() # Start data loop + else: + print("No dongle found. Please, insert an etee dongle and re-run the application.") + sys.exit("Exiting application...") + + # Ask user for the hand data to plot + device = select_hand() + + # Turn absolute IMU off + etee.absolute_imu_enabled(False) + + # ------------ Initialise graph ------------ + # Parameters + x_len = 200 # Number of points to display + y_range = [-1, 1] # Range of possible Y values to display + + # Create figure for plotting + fig = plt.figure() + ax = fig.add_subplot(1, 1, 1) + xs = list(range(0, 200)) + ys_1 = [0] * x_len + ys_2 = [0] * x_len + ys_3 = [0] * x_len + ys_4 = [0] * x_len + + ax.set_ylim(y_range) + + # Create a blank line. We will update the line in animate + line_1, = ax.plot(xs, ys_1) + line_2, = ax.plot(xs, ys_2) + line_3, = ax.plot(xs, ys_3) + line_4, = ax.plot(xs, ys_4) + + # Add labels + plt.title('Live Plot of Quaternion Values') + plt.xlabel('Samples') + plt.ylabel('Quaternion Values') + plt.legend([line_1, line_2, line_3, line_4], ['q0', 'q1', 'q2', 'q3']) + + # ------------ Plot live values ------------ + # Set up plot to call animate() function periodically + ani = animation.FuncAnimation(fig, animate, fargs=(ys_1, ys_2, ys_3, ys_4), interval=50, blit=True) + plt.show() diff --git a/img/etee-controller-sensors.jpg b/img/etee-controller-sensors.jpg new file mode 100644 index 0000000..a984e22 Binary files /dev/null and b/img/etee-controller-sensors.jpg differ diff --git a/img/etee-gestures.jpg b/img/etee-gestures.jpg new file mode 100644 index 0000000..26f271c Binary files /dev/null and b/img/etee-gestures.jpg differ diff --git a/img/logo-banner-dark-mode.png b/img/logo-banner-dark-mode.png new file mode 100644 index 0000000..f84dea2 Binary files /dev/null and b/img/logo-banner-dark-mode.png differ diff --git a/img/logo-banner-light-mode.png b/img/logo-banner-light-mode.png new file mode 100644 index 0000000..74bb435 Binary files /dev/null and b/img/logo-banner-light-mode.png differ diff --git a/img/logo-dark-mode.png b/img/logo-dark-mode.png new file mode 100644 index 0000000..7efa8e7 Binary files /dev/null and b/img/logo-dark-mode.png differ diff --git a/img/logo-light-mode.png b/img/logo-light-mode.png new file mode 100644 index 0000000..ee5f40a Binary files /dev/null and b/img/logo-light-mode.png differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4b8cf7a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +numpy>=1.22.3 +bitstring>=3.1.9 +pyserial>=3.5 +PyYAML>=3.12 +keyboard>=0.13.5 +matplotlib>=3.6.2 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..bbbdc8a --- /dev/null +++ b/setup.py @@ -0,0 +1,33 @@ +""" +Script to automate the etee_api package installation. +To use this file, navigate to the directory where setup.py is located and use the command:. + ---------------- + $ pip install . + ---------------- +This will automatically install the package in your environment. +""" + +from setuptools import setup, find_packages +from etee import __version__ + +setup( + name="etee-api", + version=__version__, + python_requires='>=3.8, <4', + packages=find_packages(), + install_requires=[ + 'numpy>=1.22.3', + 'bitstring>=3.1.9', + 'pyserial>=3.5', + 'PyYAML>=3.12', + ], + package_data={'etee': ['config/*.yaml']}, + author="Dimitri Chikhladze, Pilar Zhang Qiu", + author_email="pilar@tg0.co.uk", + description="Official Python API for the eteeController devices. " + "This API enables easy device data reading and communication. " + "To learn more about the controllers, visit: eteexr.com .", + url='https://github.com/eteexr', + license='Apache-2.0', + license_files='LICENSE.txt' +) diff --git a/setup_repo.py b/setup_repo.py new file mode 100644 index 0000000..e91fe35 --- /dev/null +++ b/setup_repo.py @@ -0,0 +1,67 @@ +""" +Script to automate setting up of skeleton repository by setting up a virtual environment and installing requirements. +By default this is a python 3 virtual environment called venv. The script will install any missing dependencies. +""" + +import os +import subprocess +import sys +import platform + + +# Check OS and use proper command +if platform.system() == "Windows": + which_command = "where" +else: + which_command = "which" + +# Check that virtualenv is installed, and if not install it. +print("Looking for virtualenv...") +which_venv = subprocess.call([which_command, "virtualenv"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) +if which_venv == 1: + print("Virtualenv not installed, installing...") + install_venv = subprocess.run(["pip", "install", "virtualenv"]) + if install_venv.returncode != 0: + print("Virtualenv installation failed, please install manually and then run this script again") + exit(1) + else: + print("Virtualenv installation successful") +elif which_venv == 0: + which_venv = subprocess.check_output([which_command, "virtualenv"]) + print("Virtualenv found at {}".format(which_venv.decode('utf-8').rstrip())) +else: + print("ERROR") + exit(1) + +# If it doesn't exist already, create the virtual environment +print("Looking for venv...") +if not os.path.isdir("venv/"): + print("Not found, creating...") + # System independent way of finding the python executable + # Linux should return /usr/bin/python3 + python_source = "--python={}".format(str(sys.executable)) + create_venv = subprocess.run(["virtualenv", python_source, "venv"]) + if create_venv.returncode != 0: + print("Virtual environment 'venv' creation failed, please create it manually and then run this script again") + exit(1) + else: + print("Virtual environment 'venv' created successfully") +else: + print("Virtual environment 'venv' already exists") + +# Activate the virtual environment (so we're running using the python in venv rather than the global one) +print("Activating virtual environment venv") +if platform.system() == "Windows": + exec(open("venv/Scripts/activate_this.py").read(), dict(__file__="venv/Scripts/activate_this.py")) +else: + # Linux and MacOS (untested) + exec(open("venv/bin/activate_this.py").read(), dict(__file__="venv/bin/activate_this.py")) + +# Install the requirements from the file 'requirements.txt' +install_reqs = subprocess.run(["pip", "install", "-r", "requirements.txt"]) +if install_reqs.returncode != 0: + print("Failed to install some requirements, please install them manually and then run this script again") + exit(1) +else: + print("Requirements successfully installed") +