In [259]:
import time

import asyncio
from math import floor, ceil, radians, sin, cos, sqrt, degrees, atan2, pi
# nest_asyncio allows async operations within the Jupyter Notebook
import nest_asyncio
nest_asyncio.apply()

In [260]:


def translateValue(valueIn,minIn,maxIn,minOut,maxOut):
    '''
    Translate a value linearly from one range to another.

    valueIn is clamped to the input range first, then mapped so that minIn
    lands on minOut and maxIn lands on maxOut. Either range may be given in
    reversed order (min > max) to describe an inverted axis.

    Raises:
        ZeroDivisionError: if minIn == maxIn (the input range has no width)
    '''
    # Clamp against the sorted bounds; clamping with (minIn, maxIn) directly
    # would collapse every value onto maxIn when the input range is reversed.
    if minIn <= maxIn:
        lower, upper = minIn, maxIn
    else:
        lower, upper = maxIn, minIn
    clamped = min(upper, max(lower, valueIn))

    # Linear interpolation between the two ranges
    return (clamped - minIn) / (maxIn - minIn) * (maxOut - minOut) + minOut

def normalize(angle):
    '''
    Wrap an angle in radians into the -PI to PI range.

    The angle is shifted by PI, reduced modulo one full turn and shifted
    back, so an input of exactly +PI comes out as -PI.
    '''
    full_turn = 2 * pi
    shifted = angle + pi
    wrapped = shifted % full_turn
    return wrapped - pi

def convert_dm_dd(degree :str,minutes :str, hemi :str) -> str:
    '''
    Converts "degrees, decimal.minutes" -> "degrees.decimal" as a string

    Args:
        degree:  whole degrees, e.g. "49"
        minutes: decimal minutes, e.g. "41.638187"
        hemi:    hemisphere letter; 'S' and 'W' flip the sign of the result

    Returns:
        str: decimal degrees with exactly 7 decimal places
             (truncated, not rounded)

    e.g.
    "49.6939697" = convert_dm_dd("49","41.638187","N")
    "49.0273031" = convert_dm_dd("49","01.638187","N")
    "-0.5000000" = convert_dm_dd("0","30.000000","W")

    Micropython has poor floating point precision, which makes distance and
    bearing calculations too inaccurate. The conversion is therefore done
    with integer and string operations only; no float is ever created.

    Raises:
        ValueError: if degree or minutes do not contain valid digits
    '''
    parts = minutes.split('.')
    minute_whole = parts[0]
    minute_fraction = parts[1] if len(parts) > 1 else ''

    # int(whole + fraction) is minutes * 10**len(fraction). Dividing by 6
    # (instead of 60) leaves the degree fraction expressed in units of
    # 10**-(len(fraction) + 1) degrees.
    scaled = int(minute_whole + minute_fraction) // 6
    width = len(minute_fraction) + 1

    # str() drops leading zeros (e.g. 1.638187' -> 273031), so pad back to
    # the full width; otherwise the fraction would be read 10x too large.
    digits = str(scaled)
    digits = '0' * (width - len(digits)) + digits

    # Fixed 7 decimal places: 2-3 degree digits + 7 decimals still fits the
    # 2147483647 integer limit mentioned for Micropython.
    digits = (digits + '0000000')[:7]

    degree = int(degree)
    negative = hemi in ['S','W']
    if degree < 0:
        # keep the "multiply by -1" semantics for an already signed degree
        degree = -degree
        negative = not negative

    # The sign is attached as text so that it survives a degree value of 0
    # (0 * -1 is still 0 and would silently lose the hemisphere).
    sign = '-' if negative else ''
    return sign + str(degree) + "." + digits

def convert_dd_int(degreedecimal:str) -> int:
    '''
    Converts degree decimal to an integer in units of 1e-7 degrees

    Micropython has limited floating point precision of 10 digits.
    Some functions like math.cos have results that are too imprecise for
    navigation. The poor man's fix is to do these calculations with integers.

    The scale is always 10**7, independent of how many digits the degree
    part has, so values for latitude and longitude can be subtracted from
    each other and converted with one constant factor. Extra decimals are
    truncated, missing decimals are treated as zeros.

    Args:
        degreedecimal: "49.6939697"

    Returns:
        integer 496939697 suitable for precise calculations
        (e.g. "-0.5" -> -5000000, "122.1234567" -> 1221234567)

    Raises:
        ValueError: if the text does not consist of digits, an optional
                    leading sign and an optional decimal point
    '''
    text = degreedecimal.strip()

    # Take the sign from the text itself: int("-0") is 0 and would lose it.
    negative = text.startswith('-')
    if text[:1] in ('-', '+'):
        text = text[1:]

    parts = text.split('.')
    degree = parts[0]
    decimal = parts[1] if len(parts) > 1 else ''

    # exactly 7 decimal digits -> largest magnitude 180 * 10**7 = 1800000000,
    # which stays below the 2147483647 limit noted for Micropython
    decimal = (decimal + '0000000')[:7]

    value = int(degree or '0') * 10000000 + int(decimal)
    return -value if negative else value

    
def positionDifference(position_str:str,destination_str:str) -> tuple:
    '''
        East/west and north/south offset, in meters, from a position to a
        destination.

        Both arguments are indexed as [0] = latitude and [1] = longitude,
        each a decimal degree string. The integer subtraction avoids the
        poor floating point precision of Micropython; the flat approximation
        is intended for distances of 10km or less. For larger distances the
        Haversine formula would be needed.

        Returns:
            (dx_meters, dy_meters): longitudinal and latitudinal difference
    '''
    # Integer coordinates for the precise subtraction
    lat_from = convert_dd_int(position_str[0])
    lon_from = convert_dd_int(position_str[1])
    lat_to = convert_dd_int(destination_str[0])
    lon_to = convert_dd_int(destination_str[1])

    # Raw deltas, still in the integer unit produced by convert_dd_int
    dy_arc = lat_to - lat_from
    dx_arc = lon_to - lon_from

    # Meridians converge towards the poles, so the longitude delta is
    # scaled by the cosine of the mean latitude of both points.
    mean_latitude = (float(destination_str[0]) + float(position_str[0])) / 2
    dx_arc_corrected = dx_arc * cos(radians(mean_latitude))

    # 111120 meters per arcdegree / 10000000 integer units per degree
    meters_per_unit = 0.011112
    dx_meters = dx_arc_corrected * meters_per_unit
    dy_meters = dy_arc * meters_per_unit

    return dx_meters, dy_meters

 
def distanceBearing(gps_current_position_str:tuple,gps_target_position_str:tuple) -> tuple:
    '''
    Distance and bearing from the current position to the target position.

    Returns a 4-tuple:
        distance_meters: straight line distance, rounded to 1 decimal
        bearing_rad:     atan2(dx, dy) in radians, rounded to 3 decimals
        dx_meters:       longitudinal offset from positionDifference
        dy_meters:       latitudinal offset from positionDifference
    '''
    offsets = positionDifference(gps_current_position_str,gps_target_position_str)
    dx_meters, dy_meters = offsets

    # Pythagoras on the two offsets gives the distance
    squared_sum = dx_meters**2 + dy_meters**2
    distance_meters = round(sqrt(squared_sum),1)

    # dx is passed first, so the angle is measured from the +dy axis
    bearing_rad = round(atan2(dx_meters,dy_meters),3)

    return distance_meters, bearing_rad, dx_meters, dy_meters

In [261]:
# Sample coordinates as (latitude, longitude) tuples of decimal degree strings.
# The commented-out lines are alternative destinations that are not in use.
gps_current_position =  ("49.69440585066409","10.827450473363868" ) # Home workshop
#destination = ("49.69848278709498","10.85355993252834" )  #HBurkhardtome
gps_target_position = ("49.684309507663706", "10.837448319639334") #Krausenbechhofen
#destination = ("49.694428996515875", "10.827444023181") # Fence Lin
#destination=("49.694407742568984", "10.827534547767414") # Kellerstrasse at Workshop




In [None]:

class Store(object):
    '''
    Central Point of Truth

    Singleton holding the shared navigation state. Always obtain it through
    Store.instance(); constructing Store() directly yields an object without
    any of the attributes set in _initialize().
    '''
    _instance = None # is a singleton

    @classmethod
    def instance(cls):
        ''' Return the shared instance, creating and initializing it on first use '''
        if cls._instance is None:
            # cls() instead of a hard-coded Store() so that a subclass with
            # its own _instance slot gets an object of its own type
            created = cls()
            created._initialize()
            cls._instance = created
        return cls._instance

    def _initialize( self ):
        ''' Set every state attribute to its start value '''

        #RoboBuoy
        self.current_speed = 0
        self.current_course = 0

        # GPS
        self.gps_course = 1
        self.gps_current_position =  ("49.69440585066409","10.827450473363868" ) #Home

        # Compass
        # (the "declinaiton" spelling is kept: other cells read these names)
        self.magnetic_course = 0
        self.fuse_compass_alpha = 0.3
        self.magnetic_declinaiton = 0
        self.true_magnetic_estimate = 0

        # Gyro
        self.gyro_course = 0
        self.fuse_gyro_alpha = 0.3
        self.gyro_declinaiton = 0
        self.true_gyro_estimate = 0

        # Target
        self.gps_target_position = ("49.684309507663706", "10.837448319639334") #Krausenbechhofen
        self.distance = 0
        self.bearing = 0
        self.dLon = 0
        self.dLat = 0

        
      


        

In [263]:

# Compute distance and bearing from the stored current position to the stored
# target and write the results back into the shared store. dLon / dLat receive
# the third and fourth values returned by distanceBearing.
store = Store.instance()
(distance, bearing, dLon, dLat) = distanceBearing(
    store.gps_current_position,
    store.gps_target_position,
)
store.distance, store.bearing, store.dLon, store.dLat = distance, bearing, dLon, dLat

In [264]:
# Disabled draft of an entry point. It is wrapped in a bare string so that
# nothing is executed; the cell merely echoes the text.
# NOTE(review): control_position() is not defined in the cells visible here -
# confirm it exists before re-enabling this code.
'''
async def main():
   print("Main was called")
   asyncio.create_task(control_position())

asyncio.run(main())
'''

'\nasync def main():\n   print("Main was called")\n   asyncio.create_task(control_position())\n\nasyncio.run(main())\n'

In [None]:
async def fuseMagneticCourseTask():
    ''' 
        Fuses the gps_course with the magnetic_course.

        Runs until cancelled and every 0.1 s:
          - predicts the true course as magnetic_course + magnetic_declinaiton
          - pulls that prediction towards gps_course with a complementary
            filter weighted by fuse_compass_alpha
          - stores the result as true_magnetic_estimate and updates
            magnetic_declinaiton so that it explains the estimate

        All angles are treated as radians. Differences are taken along the
        shortest arc (via normalize), so courses on either side of the
        +/-PI seam blend towards the seam instead of towards 0.

        Raises:
            asyncio.CancelledError: re-raised after the stop message so the
            task is reported as cancelled to whoever awaits it.
    '''
    store = Store.instance()
    try:
        print('starting fuseCompassTask')
        while True:
            await asyncio.sleep(0.1)

            # Read the course value to fuse
            # store.magnetic_course = readMagCourse()

            # Prediction from the compass and the declination learned so far
            predicted_course = store.magnetic_course + store.magnetic_declinaiton

            # alpha*gps + (1-alpha)*predicted == predicted + alpha*(gps - predicted);
            # written in this form so the difference can be wrapped to -PI..PI
            course_error = normalize(store.gps_course - predicted_course)

            # Estimate
            store.true_magnetic_estimate = normalize(predicted_course + store.fuse_compass_alpha * course_error)

            # Update the magnetic_declinaiton
            store.magnetic_declinaiton = normalize(store.true_magnetic_estimate - store.magnetic_course)
            print(f"gps_course={store.gps_course:.2f}_rad, magnetic_course={store.magnetic_course:.2f}_rad, magnetic_declinaiton={store.magnetic_declinaiton:.2f}, true_magnetic_estimate={store.true_magnetic_estimate:.2f}")

    except asyncio.CancelledError:
        print( "stopping fuseMagneticCourseTask" )
        # do not swallow the cancellation
        raise


In [None]:
async def fuseGyroCourseTask():
    '''
        Integrates the gyro rate into store.current_course.

        Runs until cancelled and every 0.01 s adds gyro_z * deltaT to
        current_course, keeping the result wrapped to the -PI..PI range.
        The gyro rate and time step are fixed placeholder values until the
        real sensor read (readCalibratedGyro) is wired in.

        Raises:
            asyncio.CancelledError: re-raised after the stop message so the
            task is reported as cancelled to whoever awaits it.
    '''
    store = Store.instance()
    try:
        print('starting fuseGyroCourseTask')
        while True:
            await asyncio.sleep(0.01)

            # Read the gyro value to integrate
            #_,_,gyro_z,deltaT = readCalibratedGyro()

            gyro_z = 0.01 # radians per second
            deltaT = 0.01 # dt

            # Integrate the gyro, update the current course; wrapped so the
            # course cannot drift outside the range used by the other angles
            store.current_course = normalize(store.current_course + gyro_z * deltaT)

            print(f"current_course={store.current_course:.2f}_rad, magnetic_course={store.magnetic_course:.2f}_rad, magnetic_declinaiton={store.magnetic_declinaiton:.2f}, true_magnetic_estimate={store.true_magnetic_estimate:.2f}")

    except asyncio.CancelledError:
        print( "stopping fuseGyroCourseTask" )
        # do not swallow the cancellation
        raise

In [267]:
async def main(run_seconds=5):
    '''
    Demo driver: runs fuseMagneticCourseTask in the background for
    run_seconds seconds, then cancels it and waits until it has stopped.
    '''
    print("fuseMagneticCourseTask")
    fuseMagneticCourse = asyncio.create_task(fuseMagneticCourseTask())
    try:
        await asyncio.sleep(run_seconds)
    finally:
        # cancel() only requests cancellation; awaiting the task lets it run
        # its cleanup (the "stopping ..." message) before main() returns
        fuseMagneticCourse.cancel()
        try:
            await fuseMagneticCourse
        except asyncio.CancelledError:
            pass


asyncio.run(main())

fuseMagneticCourseTask
starting fuseCompassTask
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.30, true_magnetic_estimate=0.30
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.51, true_magnetic_estimate=0.51
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.66, true_magnetic_estimate=0.66
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.76, true_magnetic_estimate=0.76
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.83, true_magnetic_estimate=0.83
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.88, true_magnetic_estimate=0.88
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.92, true_magnetic_estimate=0.92
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.94, true_magnetic_estimate=0.94
gps_course=1.00_rad, magnetic_course=0.00_rad, magnetic_declinaiton=0.96, true_magnetic_estimate=0.96
gps_course=1.00_rad, magnetic_cour