In [24]:
# -*- coding: utf-8 -*-
#
#     ||          ____  _ __
#  +------+      / __ )(_) /_______________ _____  ___
#  | 0xBC |     / __  / / __/ ___/ ___/ __ `/_  / / _ \
#  +------+    / /_/ / / /_/ /__/ /  / /_/ / / /_/  __/
#   ||  ||    /_____/_/\__/\___/_/   \__,_/ /___/\___/
#
#  Copyright (C) 2014 Bitcraze AB
#
#  Crazyflie Nano Quadcopter Client
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simple example that connects to the first Crazyflie found, triggers
reading of all the parameters and displays their values. It then modifies
one parameter and reads back it's value. Finally it disconnects.
"""
import logging
import random
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.utils import uri_helper

address = uri_helper.uri_from_env(default='radio://0/27/2M/E7E7E7E702')

# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)


class ParamExample:
    """
    Simple logging example class that logs the Stabilizer from a supplied
    link uri and disconnects after 5s.
    """

    def __init__(self, link_uri):
        """ Initialize and run the example with the specified link_uri """

        self._cf = Crazyflie(rw_cache='./cache')

        # Connect some callbacks from the Crazyflie API
        #self._cf.connected.add_callback(self._connected)
        # self._cf.fully_connected.add_callback(self._fully_connected)
        # self._cf.disconnected.add_callback(self._disconnected)
        # self._cf.connection_failed.add_callback(self._connection_failed)
        # self._cf.connection_lost.add_callback(self._connection_lost)

        print('Connecting to %s' % link_uri)

        # Try to connect to the Crazyflie
        self._cf.open_link(link_uri)

        # Variable used to keep main loop occupied until disconnect
        self.is_connected = True

        self._param_check_list = []
        self._param_groups = []

        random.seed()
        self._cf.param.add_update_callback(group='locSrv', name=None, cb=self._loc_srv_callback)

    # def _connected(self, link_uri):
    #     """ This callback is called form the Crazyflie API when a Crazyflie
    #     has been connected and the TOCs have been downloaded. Parameter values are not downloaded yet."""
    #     print('Connected to %s' % link_uri)

    #     # Print the param TOC
    #     p_toc = self._cf.param.toc.toc
    #     for group in sorted(p_toc.keys()):
    #         # print('{}'.format(group))
    #         for param in sorted(p_toc[group].keys()):
    #             # print('\t{}'.format(param))
    #             self._param_check_list.append('{0}.{1}'.format(group, param))
    #         self._param_groups.append('{}'.format(group))
    #         # For every group, register the callback
    #         self._cf.param.add_update_callback(group=group, name=None, cb=self._param_callback)

    #     # You can also register a callback for a specific group.name combo
    #     self._cf.param.add_update_callback(group='cpu', name='flash',
    #                                        cb=self._cpu_flash_callback)

    #     print('')
    def _loc_srv_callback(self, name, value):
        """Callback for locSrv group"""
        if name.startswith('locSrv'):
            print(f'{name}: {value}')

    def _fully_connected(self, link_uri):
        """This callback is called when the Crazyflie has been connected and all parameters have been
        downloaded. It is now OK to set and get parameters."""
        print(f'Parameters downloaded to {link_uri}')

        # We can get a parameter value directly without using a callback
        value = self._cf.param.get_value('posEstAlt.estAlphaZr')
        print(f'Value read with get() is {value}')

        # When a parameter is set, the callback is called with the new value
        self._cf.param.add_update_callback(group='posEstAlt', name='estAlphaZr', cb=self._a_pitch_kd_callback)
        # When setting a value the parameter is automatically read back
        # and the registered callbacks will get the updated value
        self._cf.param.set_value('pid_attitude.pitch_kd', 0.1234)

    def _cpu_flash_callback(self, name, value):
        """Specific callback for the cpu.flash parameter"""
        print('The connected Crazyflie has {}kb of flash'.format(value))

    def _param_callback(self, name, value):
        """Generic callback registered for all the groups"""
        print('{0}: {1}'.format(name, value))

        # Remove each parameter from the list when fetched
        self._param_check_list.remove(name)
        if len(self._param_check_list) == 0:
            print('Have fetched all parameter values.')

            # Remove all the group callbacks
            for g in self._param_groups:
                self._cf.param.remove_update_callback(group=g, cb=self._param_callback)

    def _a_pitch_kd_callback(self, name, value):
        """Callback for pid_attitude.pitch_kd"""
        print('Read back: {0}={1}'.format(name, value))

        # This is the end of the example, close link
        self._cf.close_link()

    def _connection_failed(self, link_uri, msg):
        """Callback when connection initial connection fails (i.e no Crazyflie
        at the specified address)"""
        print('Connection to %s failed: %s' % (link_uri, msg))
        self.is_connected = False

    def _connection_lost(self, link_uri, msg):
        """Callback when disconnected after a connection has been made (i.e
        Crazyflie moves out of range)"""
        print('Connection to %s lost: %s' % (link_uri, msg))

    def _disconnected(self, link_uri):
        """Callback when the Crazyflie is disconnected (called in all cases)"""
        print('Disconnected from %s' % link_uri)
        self.is_connected = False
    def get_parameter_value(self, param_name):
        """
        Fetch the value of a specific parameter.

        :param param_name: The name of the parameter to fetch.
        """
        try:
            value = self._cf.param.get_value(param_name)
            print(f'{param_name}: {value}')
        except KeyError:
            print(f'Parameter {param_name} not found.')


if __name__ == '__main__':
    import logging
    logging.getLogger().setLevel(logging.INFO)
    # Initialize the low-level drivers
    cflib.crtp.init_drivers()
    pe = ParamExample(address)


    # The Crazyflie lib doesn't contain anything to keep the application
    # alive, so this is where your application should do something. In our
    # case we are just waiting until we are disconnected.
    pe.get_parameter_value('locSrv.x')
    pe._cf.close_link()



INFO:cflib.crazyflie:Callback->Connection initialized[radio://0/27/2M/E7E7E7E702]


Connecting to radio://0/27/2M/E7E7E7E702


INFO:cflib.crazyflie:We are connected[radio://0/27/2M/E7E7E7E702], request connection setup
INFO:cflib.crazyflie.platformservice:Request _request_protocol_version()
INFO:cflib.crazyflie:Callback->Connected to [radio://0/27/2M/E7E7E7E702]
INFO:cflib.crazyflie.platformservice:_crt_service_callback
INFO:cflib.crazyflie.platformservice:Request protocol version
INFO:cflib.crazyflie.platformservice:_platform_callback
INFO:cflib.crazyflie.platformservice:Protocol version (platform): 6
INFO:cflib.crazyflie.toc:TOC for port [5] found in cache
INFO:cflib.crazyflie:Log TOC finished updating
INFO:cflib.crazyflie.mem:6 memories found
INFO:cflib.crazyflie:Memories finished updating
INFO:cflib.crazyflie.toc:TOC for port [2] found in cache
INFO:cflib.crazyflie:Param TOC finished updating
INFO:cflib.crazyflie:Callback->Connection setup finished [radio://0/27/2M/E7E7E7E702]


locSrv.enRangeStreamFP32: 0
locSrv.enLhAngleStream: 0
locSrv.extPosStdDev: 0.009999999776482582
locSrv.extQuatStdDev: 0.0044999998062849045


INFO:cflib.crazyflie:All parameters updated
INFO:cflib.crazyflie:Closing link
INFO:cflib.crazyflie:Callback->Connection completed [radio://0/27/2M/E7E7E7E702]
INFO:cflib.crazyflie:Callback->Disconnected from [radio://0/27/2M/E7E7E7E702]


Parameter locSrv.x not found.


In [7]:
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
import logging

uri = 'radio://0/27/2M/E7E7E7E702'


def wait_for_position_estimator(scf):
    print('Waiting for estimator to find position...')

    log_config = LogConfig(name='Kalman Variance', period_in_ms=500)
    log_config.add_variable('kalman.varPX', 'float')
    log_config.add_variable('kalman.varPY', 'float')
    log_config.add_variable('kalman.varPZ', 'float')

    var_y_history = [1000] * 10
    var_x_history = [1000] * 10
    var_z_history = [1000] * 10

    threshold = 0.001

    with SyncLogger(scf, log_config) as logger:
        for log_entry in logger:
            data = log_entry[1]

            var_x_history.append(data['kalman.varPX'])
            var_x_history.pop(0)
            var_y_history.append(data['kalman.varPY'])
            var_y_history.pop(0)
            var_z_history.append(data['kalman.varPZ'])
            var_z_history.pop(0)

            min_x = min(var_x_history)
            max_x = max(var_x_history)
            min_y = min(var_y_history)
            max_y = max(var_y_history)
            min_z = min(var_z_history)
            max_z = max(var_z_history)

            # print('{} {} {}'.
            #       format(max_x - min_x, max_y - min_y, max_z - min_z))

            if (max_x - min_x) < threshold and (
                    max_y - min_y) < threshold and (
                    max_z - min_z) < threshold:
                break


In [9]:
with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
    scf.cf.positioning


Exception: No driver found or malformed URI: radio://0/27/2M/E7E7E7E702

In [2]:
# -*- coding: utf-8 -*-
#
#     ||          ____  _ __
#  +------+      / __ )(_) /_______________ _____  ___
#  | 0xBC |     / __  / / __/ ___/ ___/ __ `/_  / / _ \
#  +------+    / /_/ / / /_/ /__/ /  / /_/ / / /_/  __/
#   ||  ||    /_____/_/\__/\___/_/   \__,_/ /___/\___/
#
#  Copyright (C) 2019 Bitcraze AB
#
#  Crazyflie Nano Quadcopter Client
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simple example that connects to one crazyflie, sets the initial position/yaw
and flies a trajectory.

The initial pose (x, y, z, yaw) is configured in a number of variables and
the trajectory is flown relative to this position, using the initial yaw.

This example is intended to work with any absolute positioning system.
It aims at documenting how to take off with the Crazyflie in an orientation
that is different from the standard positive X orientation and how to set the
initial position of the kalman estimator.
"""
import math
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.utils import uri_helper

# URI to the Crazyflie to connect to
uri = uri_helper.uri_from_env(default='radio://0/27/2M/E7E7E7E702')

# Only output errors from the logging framework
#logging.basicConfig(level=logging.ERROR)

# Change the sequence according to your setup
#             x    y    z
sequence = [
    (0, 0, 0.7),
    (-0.7, 0, 0.7),
    (0, 0, 0.7),
    (0, 0, 0.2),
]


def wait_for_position_estimator(scf):
    print('Waiting for estimator to find position...')

    log_config = LogConfig(name='Kalman Variance', period_in_ms=500)
    log_config.add_variable('kalman.varPX', 'float')
    log_config.add_variable('kalman.varPY', 'float')
    log_config.add_variable('kalman.varPZ', 'float')

    var_y_history = [1000] * 10
    var_x_history = [1000] * 10
    var_z_history = [1000] * 10

    threshold = 0.001

    with SyncLogger(scf, log_config) as logger:
        for log_entry in logger:
            data = log_entry[1]

            var_x_history.append(data['kalman.varPX'])
            var_x_history.pop(0)
            var_y_history.append(data['kalman.varPY'])
            var_y_history.pop(0)
            var_z_history.append(data['kalman.varPZ'])
            var_z_history.pop(0)

            min_x = min(var_x_history)
            max_x = max(var_x_history)
            min_y = min(var_y_history)
            max_y = max(var_y_history)
            min_z = min(var_z_history)
            max_z = max(var_z_history)

            # print("{} {} {}".
            #       format(max_x - min_x, max_y - min_y, max_z - min_z))
            print(f"{data}")

            if (max_x - min_x) < threshold and (
                    max_y - min_y) < threshold and (
                    max_z - min_z) < threshold:
                break


def set_initial_position(scf, x, y, z, yaw_deg):
    scf.cf.param.set_value('kalman.initialX', x)
    scf.cf.param.set_value('kalman.initialY', y)
    scf.cf.param.set_value('kalman.initialZ', z)

    yaw_radians = math.radians(yaw_deg)
    scf.cf.param.set_value('kalman.initialYaw', yaw_radians)


def reset_estimator(scf):
    cf = scf.cf
    #cf.param.set_value('kalman.resetEstimation', '1')
    #time.sleep(0.1)
    #cf.param.set_value('kalman.resetEstimation', '0')

    wait_for_position_estimator(cf)


def run_sequence(scf, sequence, base_x, base_y, base_z, yaw):
    cf = scf.cf

    for position in sequence:
        print('Setting position {}'.format(position))

        x = position[0] + base_x
        y = position[1] + base_y
        z = position[2] + base_z

        for i in range(50):
            cf.commander.send_position_setpoint(x, y, z, yaw)
            time.sleep(0.1)

    cf.commander.send_stop_setpoint()
    # Hand control over to the high level commander to avoid timeout and locking of the Crazyflie
    cf.commander.send_notify_setpoint_stop()

    # Make sure that the last packet leaves before the link is closed
    # since the message queue is not flushed before closing
    time.sleep(0.1)


if __name__ == '__main__':
    cflib.crtp.init_drivers()

    # Set these to the position and yaw based on how your Crazyflie is placed
    # on the floor
    initial_x = 1.0
    initial_y = 1.0
    initial_z = 0.0
    initial_yaw = 90  # In degrees
    # 0: positive X direction
    # 90: positive Y direction
    # 180: negative X direction
    # 270: negative Y direction

    with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
        #set_initial_position(scf, initial_x, initial_y, initial_z, initial_yaw)
        reset_estimator(scf)
        #run_sequence(scf, sequence,initial_x, initial_y, initial_z, initial_yaw)


Error no LogEntry to handle id=14
Error no LogEntry to handle id=13


Waiting for estimator to find position...
{'kalman.varPX': 0.00014963423018343747, 'kalman.varPY': 0.00028474637656472623, 'kalman.varPZ': 0.000789106241427362}
{'kalman.varPX': 0.00014963423018343747, 'kalman.varPY': 0.00028474637656472623, 'kalman.varPZ': 0.000789106241427362}
{'kalman.varPX': 7.136446220101789e-05, 'kalman.varPY': 7.526645640609786e-05, 'kalman.varPZ': 0.00023681431775912642}
{'kalman.varPX': 7.136446220101789e-05, 'kalman.varPY': 7.526645640609786e-05, 'kalman.varPZ': 0.00023681431775912642}
{'kalman.varPX': 6.370241317199543e-05, 'kalman.varPY': 0.00019446038641035557, 'kalman.varPZ': 0.0006831511855125427}
{'kalman.varPX': 6.370241317199543e-05, 'kalman.varPY': 0.00019446038641035557, 'kalman.varPZ': 0.0006831511855125427}
{'kalman.varPX': 5.623718971037306e-05, 'kalman.varPY': 0.00027858075918629766, 'kalman.varPZ': 0.001077299821190536}
{'kalman.varPX': 5.623718971037306e-05, 'kalman.varPY': 0.00027858075918629766, 'kalman.varPZ': 0.001077299821190536}
{'kalman

In [3]:
# -*- coding: utf-8 -*-
#
#     ||          ____  _ __
#  +------+      / __ )(_) /_______________ _____  ___
#  | 0xBC |     / __  / / __/ ___/ ___/ __ `/_  / / _ \
#  +------+    / /_/ / / /_/ /__/ /  / /_/ / / /_/  __/
#   ||  ||    /_____/_/\__/\___/_/   \__,_/ /___/\___/
#
#  Copyright (C) 2022 Bitcraze AB
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
This functionality is experimental and may not work properly!

Script to run a full base station geometry estimation of a lighthouse
system. The script records data from a Crazyflie that is moved around in
the flight space and creates a solution that minimizes the error
in the measured positions.

The execution of the script takes you through a number of steps, please follow
the instructions.

This script works with 2 or more base stations (if supported by the CF firmware).

This script is a temporary implementation until similar functionality has been
added to the client.

Prerequisites:
1. The base station calibration data must have been
received by the Crazyflie before this script is executed.

2. 2 or more base stations
'''
from __future__ import annotations

import logging
import pickle
import time
from threading import Event

import numpy as np

import cflib.crtp  # noqa
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.mem.lighthouse_memory import LighthouseBsGeometry
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.localization.lighthouse_bs_vector import LighthouseBsVectors
from cflib.localization.lighthouse_config_manager import LighthouseConfigWriter
from cflib.localization.lighthouse_geometry_solver import LighthouseGeometrySolver
from cflib.localization.lighthouse_initial_estimator import LighthouseInitialEstimator
from cflib.localization.lighthouse_sample_matcher import LighthouseSampleMatcher
from cflib.localization.lighthouse_sweep_angle_reader import LighthouseSweepAngleAverageReader
from cflib.localization.lighthouse_sweep_angle_reader import LighthouseSweepAngleReader
from cflib.localization.lighthouse_system_aligner import LighthouseSystemAligner
from cflib.localization.lighthouse_system_scaler import LighthouseSystemScaler
from cflib.localization.lighthouse_types import LhCfPoseSample
from cflib.localization.lighthouse_types import LhDeck4SensorPositions
from cflib.localization.lighthouse_types import LhMeasurement
from cflib.localization.lighthouse_types import Pose
from cflib.utils import uri_helper

REFERENCE_DIST = 1.0


def record_angles_average(scf: SyncCrazyflie) -> LhCfPoseSample:
    """Record angles and average over the samples to reduce noise"""
    recorded_angles = None

    is_ready = Event()

    def ready_cb(averages):
        nonlocal recorded_angles
        recorded_angles = averages
        is_ready.set()

    reader = LighthouseSweepAngleAverageReader(scf.cf, ready_cb)
    reader.start_angle_collection()
    is_ready.wait()

    angles_calibrated = {}
    for bs_id, data in recorded_angles.items():
        angles_calibrated[bs_id] = data[1]

    result = LhCfPoseSample(angles_calibrated=angles_calibrated)

    visible = ', '.join(map(lambda x: str(x + 1), recorded_angles.keys()))
    print(f'  Position recorded, base station ids visible: {visible}')

    if len(recorded_angles.keys()) < 2:
        print('Received too few base stations, we need at least two. Please try again!')
        result = None

    return result


def record_angles_sequence(scf: SyncCrazyflie, recording_time_s: float) -> list[LhCfPoseSample]:
    """Record angles and return a list of the samples"""
    result: list[LhCfPoseSample] = []

    bs_seen = set()

    def ready_cb(bs_id: int, angles: LighthouseBsVectors):
        now = time.time()
        measurement = LhMeasurement(timestamp=now, base_station_id=bs_id, angles=angles)
        result.append(measurement)
        bs_seen.add(str(bs_id + 1))

    reader = LighthouseSweepAngleReader(scf.cf, ready_cb)
    reader.start()
    end_time = time.time() + recording_time_s

    while time.time() < end_time:
        time_left = int(end_time - time.time())
        visible = ', '.join(sorted(bs_seen))
        print(f'{time_left}s, bs visible: {visible}')
        bs_seen = set()
        time.sleep(0.5)

    reader.stop()

    return result


def parse_recording_time(recording_time: str, default: int) -> int:
    """Interpret recording time input by user"""
    try:
        return int(recording_time)
    except ValueError:
        return default


def print_base_stations_poses(base_stations: dict[int, Pose]):
    """Pretty print of base stations pose"""
    for bs_id, pose in sorted(base_stations.items()):
        pos = pose.translation
        print(f'    {bs_id + 1}: ({pos[0]}, {pos[1]}, {pos[2]})')


def set_axes_equal(ax):
    '''Make axes of 3D plot have equal scale so that spheres appear as spheres,
    cubes as cubes, etc..  This is one possible solution to Matplotlib's
    ax.set_aspect('equal') and ax.axis('equal') not working for 3D.

    Input
    ax: a matplotlib axis, e.g., as output from plt.gca().
    '''

    x_limits = ax.get_xlim3d()
    y_limits = ax.get_ylim3d()
    z_limits = ax.get_zlim3d()

    x_range = abs(x_limits[1] - x_limits[0])
    x_middle = np.mean(x_limits)
    y_range = abs(y_limits[1] - y_limits[0])
    y_middle = np.mean(y_limits)
    z_range = abs(z_limits[1] - z_limits[0])
    z_middle = np.mean(z_limits)

    # The plot bounding box is a sphere in the sense of the infinity
    # norm, hence I call half the max range the plot radius.
    plot_radius = 0.5*max([x_range, y_range, z_range])

    ax.set_xlim3d([x_middle - plot_radius, x_middle + plot_radius])
    ax.set_ylim3d([y_middle - plot_radius, y_middle + plot_radius])
    ax.set_zlim3d([z_middle - plot_radius, z_middle + plot_radius])


def visualize(cf_poses: list[Pose], bs_poses: list[Pose]):
    """Visualize positions of base stations and Crazyflie positions"""
    # Set to True to visualize positions
    # Requires PyPlot
    visualize_positions = False
    if visualize_positions:
        import matplotlib.pyplot as plt

        positions = np.array(list(map(lambda x: x.translation, cf_poses)))

        fig = plt.figure()
        ax = fig.add_subplot(projection='3d')

        x_cf = positions[:, 0]
        y_cf = positions[:, 1]
        z_cf = positions[:, 2]

        ax.scatter(x_cf, y_cf, z_cf)

        positions = np.array(list(map(lambda x: x.translation, bs_poses)))

        x_bs = positions[:, 0]
        y_bs = positions[:, 1]
        z_bs = positions[:, 2]

        ax.scatter(x_bs, y_bs, z_bs, c='red')

        set_axes_equal(ax)
        print('Close graph window to continue')
        plt.show()


def write_to_file(name: str,
                  origin: LhCfPoseSample,
                  x_axis: list[LhCfPoseSample],
                  xy_plane: list[LhCfPoseSample],
                  samples: list[LhCfPoseSample]):
    with open(name, 'wb') as handle:
        data = (origin, x_axis, xy_plane, samples)
        pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)


def load_from_file(name: str):
    with open(name, 'rb') as handle:
        return pickle.load(handle)


def estimate_geometry(origin: LhCfPoseSample,
                      x_axis: list[LhCfPoseSample],
                      xy_plane: list[LhCfPoseSample],
                      samples: list[LhCfPoseSample]) -> dict[int, Pose]:
    """Estimate the geometry of the system based on samples recorded by a Crazyflie"""
    matched_samples = [origin] + x_axis + xy_plane + LighthouseSampleMatcher.match(samples, min_nr_of_bs_in_match=2)
    initial_guess, cleaned_matched_samples = LighthouseInitialEstimator.estimate(
        matched_samples, LhDeck4SensorPositions.positions)

    print('Initial guess base stations at:')
    print_base_stations_poses(initial_guess.bs_poses)

    print(f'{len(cleaned_matched_samples)} samples will be used')
    visualize(initial_guess.cf_poses, initial_guess.bs_poses.values())

    solution = LighthouseGeometrySolver.solve(initial_guess, cleaned_matched_samples, LhDeck4SensorPositions.positions)
    if not solution.success:
        print('Solution did not converge, it might not be good!')

    start_x_axis = 1
    start_xy_plane = 1 + len(x_axis)
    origin_pos = solution.cf_poses[0].translation
    x_axis_poses = solution.cf_poses[start_x_axis:start_x_axis + len(x_axis)]
    x_axis_pos = list(map(lambda x: x.translation, x_axis_poses))
    xy_plane_poses = solution.cf_poses[start_xy_plane:start_xy_plane + len(xy_plane)]
    xy_plane_pos = list(map(lambda x: x.translation, xy_plane_poses))

    print('Raw solution:')
    print('  Base stations at:')
    print_base_stations_poses(solution.bs_poses)
    print('  Solution match per base station:')
    for bs_id, value in solution.error_info['bs'].items():
        print(f'    {bs_id + 1}: {value}')

    # Align the solution
    bs_aligned_poses, transformation = LighthouseSystemAligner.align(
        origin_pos, x_axis_pos, xy_plane_pos, solution.bs_poses)

    cf_aligned_poses = list(map(transformation.rotate_translate_pose, solution.cf_poses))

    # Scale the solution
    bs_scaled_poses, cf_scaled_poses, scale = LighthouseSystemScaler.scale_fixed_point(bs_aligned_poses,
                                                                                       cf_aligned_poses,
                                                                                       [REFERENCE_DIST, 0, 0],
                                                                                       cf_aligned_poses[1])

    print()
    print('Final solution:')
    print('  Base stations at:')
    print_base_stations_poses(bs_scaled_poses)

    visualize(cf_scaled_poses, bs_scaled_poses.values())

    return bs_scaled_poses


def upload_geometry(scf: SyncCrazyflie, bs_poses: dict[int, Pose]):
    """Upload the geometry to the Crazyflie"""
    geo_dict = {}
    for bs_id, pose in bs_poses.items():
        geo = LighthouseBsGeometry()
        geo.origin = pose.translation.tolist()
        geo.rotation_matrix = pose.rot_matrix.tolist()
        geo.valid = True
        geo_dict[bs_id] = geo

    event = Event()

    def data_written(_):
        event.set()

    helper = LighthouseConfigWriter(scf.cf)
    helper.write_and_store_config(data_written, geos=geo_dict)
    event.wait()


def estimate_from_file(file_name: str):
    origin, x_axis, xy_plane, samples = load_from_file(file_name)
    estimate_geometry(origin, x_axis, xy_plane, samples)


def connect_and_estimate(uri: str, file_name: str | None = None):
    """Connect to a Crazyflie, collect data and estimate the geometry of the system"""
    print(f'Step 1. Connecting to the Crazyflie on uri {uri}...')
    with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
        print('  Connected')
        print('')
        print('In the 3 following steps we will define the coordinate system.')

        print('Step 2. Put the Crazyflie where you want the origin of your coordinate system.')

        origin = None
        do_repeat = True
        while do_repeat:
            input('Press return when ready. ')
            print('  Recording...')
            measurement = record_angles_average(scf)
            do_repeat = False
            if measurement is not None:
                origin = measurement
            else:
                do_repeat = True

        print(f'Step 3. Put the Crazyflie on the positive X-axis, exactly {REFERENCE_DIST} meters from the origin. ' +
              'This position defines the direction of the X-axis, but it is also used for scaling of the system.')
        x_axis = []
        do_repeat = True
        while do_repeat:
            input('Press return when ready. ')
            print('  Recording...')
            measurement = record_angles_average(scf)
            do_repeat = False
            if measurement is not None:
                x_axis = [measurement]
            else:
                do_repeat = True

        print('Step 4. Put the Crazyflie somehere in the XY-plane, but not on the X-axis.')
        print('Multiple samples can be recorded if you want to, type "r" before you hit enter to repeat the step.')
        xy_plane = []
        do_repeat = True
        while do_repeat:
            do_repeat = 'r' == input('Press return when ready. ').lower()
            print('  Recording...')
            measurement = record_angles_average(scf)
            if measurement is not None:
                xy_plane.append(measurement)
            else:
                do_repeat = True

        print()
        print('Step 5. We will now record data from the space you plan to fly in and optimize the base station ' +
              'geometry based on this data. Move the Crazyflie around, try to cover all of the space, make sure ' +
              'all the base stations are received and do not move too fast.')
        default_time = 20
        recording_time = input(f'Enter the number of seconds you want to record ({default_time} by default), ' +
                               'recording starts when you hit enter. ')
        recording_time_s = parse_recording_time(recording_time, default_time)
        print('  Recording started...')
        samples = record_angles_sequence(scf, recording_time_s)
        print('  Recording ended')

        if file_name:
            write_to_file(file_name, origin, x_axis, xy_plane, samples)
            print(f'Wrote data to file {file_name}')

        print('Step 6. Estimating geometry...')
        bs_poses = estimate_geometry(origin, x_axis, xy_plane, samples)
        print('  Geometry estimated')

        print('Step 7. Upload geometry to the Crazyflie')
        input('Press enter to upload geometry. ')
        upload_geometry(scf, bs_poses)
        print('Geometry uploaded')


# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)

if __name__ == '__main__':
    # Initialize the low-level drivers
    cflib.crtp.init_drivers()

    uri = uri_helper.uri_from_env(default='radio://0/27/2M/E7E7E7E702')

    # Set a file name to write the measurement data to file. Useful for debugging
    file_name = None
    # file_name = 'lh_geo_estimate_data.pickle'

    connect_and_estimate(uri, file_name=file_name)

    # Run the estimation on data from file instead of live measurements
    # estimate_from_file(file_name)

Step 1. Connecting to the Crazyflie on uri radio://0/27/2M/E7E7E7E702...
  Connected

In the 3 following steps we will define the coordinate system.
Step 2. Put the Crazyflie where you want the origin of your coordinate system.


In [1]:
import pickle
def load_from_file(name: str):
    with open(name, 'rb') as handle:
        return pickle.load(handle)
loaded = load_from_file('CF_Aligned_POSES.pickle')
print(loaded)

[<cflib.localization.lighthouse_types.Pose object at 0x7f493c0d7e20>, <cflib.localization.lighthouse_types.Pose object at 0x7f493c0d6140>, <cflib.localization.lighthouse_types.Pose object at 0x7f48ef708ac0>, <cflib.localization.lighthouse_types.Pose object at 0x7f48ef708250>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08580>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f087c0>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08a00>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08970>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f088e0>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08850>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f098d0>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f09450>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08520>, <cflib.localization.lighthouse_types.Pose object at 0x7f48c6f08610>, <cflib.localization.lighthouse_ty

In [7]:
print(loaded[100].translation)

[0.40196904 0.21358562 0.69415261]
