In [2]:
# Use local airsim package (DO NOT pip install airsim, UNINSTALL IF ALREADY INSTALLED > "pip uninstall airsim")
import airsim 

import numpy as np
import math

# To view images
from PIL import Image
import io

# To estimate code time, gets timestamp
import time

#gets timestamp
import datetime

# To play sounds
import winsound

import threading
import sys

################################################################################################################################################################################
################################################################################################################################################################################

#Feedback Haptic effects
from pathlib import Path
sys.path.append(str(Path().resolve().parent).replace('\\','/') + '/Nitho SDK')
import sdl2 #Local sdl2 dir which is a wrapper to SDL2.dll

# Initialise SDL's joystick subsystem and open the joystick at index 0.
# The resulting module-level handle `joystick` is what getBrakeVal() reads.
# NOTE(review): neither return value is checked — confirm a device is attached.
sdl2.SDL_Init(sdl2.SDL_INIT_JOYSTICK)
joystick = sdl2.SDL_JoystickOpen(0)
def getBrakeVal():
    """Return the current reading of joystick axis 1, scaled by 1/32767.

    SDL events are pumped first so the axis state is fresh. Any scaled
    reading below 0.001 (which includes every negative reading) is
    reported as 0.
    """
    sdl2.SDL_PumpEvents()
    raw_axis = sdl2.SDL_JoystickGetAxis(joystick, 1)
    scaled = raw_axis / 32767
    # Dead zone: tiny or negative readings are treated as "not pressed".
    return scaled if scaled >= 0.001 else 0

class Haptic:
    """Force-feedback helper bound to the first SDL haptic device (index 0).

    Construction initialises SDL, lists the haptic devices, opens device 0 and
    registers one effect for every effect type the device reports as
    supported, in this fixed order: Sine Wave, Triangle, Sawtooth Up,
    Sawtooth Down, Ramp, Constant Force. The "effect N" numbers printed during
    construction are the values accepted by ``start_haptic``.

    Attributes:
        haptic: handle returned by ``SDL_HapticOpen``.
        nefx:   number of effects registered.
        efx:    effect descriptions, indexed by effect number (12 slots).
        id:     ids returned by ``SDL_HapticNewEffect``, same indexing.

    Raises:
        RuntimeError: if no haptic device is present or device 0 cannot be
            opened. (This used to call ``exit(0)``, which reports success and
            tears down the notebook kernel.)
    """

    def __init__(self):
        sdl2.SDL_Init(sdl2.SDL_INIT_TIMER | sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_HAPTIC)

        print("Trying to find haptics")
        device_count = sdl2.SDL_NumHaptics()
        if device_count <= 0:
            print("No haptic devices found")
            sdl2.SDL_Quit()
            # Stop here: SDL has been shut down, so opening a device cannot work.
            raise RuntimeError("No haptic devices found")

        for index in range(0, device_count):
            print("Found", index, ":", sdl2.SDL_HapticName(index))

        index = 0

        self.haptic = sdl2.SDL_HapticOpen(index)
        # `not` catches a None return as well as a falsy NULL handle.
        # NOTE(review): assumes the local sdl2 wrapper returns one of those on
        # failure (a ctypes NULL pointer is falsy but never `== None`) — confirm.
        if not self.haptic:
            print("Unable to open device")
            sdl2.SDL_Quit()
            raise RuntimeError("Unable to open haptic device %d" % index)
        print("Using device", index)

        self.nefx = 0
        self.efx = [0] * 12
        self.id = [0] * 12
        supported = sdl2.SDL_HapticQuery(self.haptic)

        if (supported & sdl2.SDL_HAPTIC_SINE):
            self._add_effect("Sine Wave", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_SINE,
                periodic=sdl2.SDL_HapticPeriodic(
                    type=sdl2.SDL_HAPTIC_SINE,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_POLAR, dir=(9000,0,0)),
                    period=1000, magnitude=0x4000, length=5000, attack_length=1000, fade_length=1000)))

        if (supported & sdl2.SDL_HAPTIC_TRIANGLE):
            # The periodic sub-struct carries its own `type`; it previously said
            # SDL_HAPTIC_SINE (copy-paste from the block above), so the effect
            # labelled "Triangle" was described as a sine wave.
            self._add_effect("Triangle", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_TRIANGLE,
                periodic=sdl2.SDL_HapticPeriodic(
                    type=sdl2.SDL_HAPTIC_TRIANGLE,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_CARTESIAN, dir=(1,0,0)),
                    period=1000, magnitude=0x4000, length=5000, attack_length=1000, fade_length=1000)))

        if (supported & sdl2.SDL_HAPTIC_SAWTOOTHUP):
            self._add_effect("Sawtooth Up", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_SAWTOOTHUP,
                periodic=sdl2.SDL_HapticPeriodic(
                    type=sdl2.SDL_HAPTIC_SAWTOOTHUP,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_POLAR, dir=(9000,0,0)),
                    period=500, magnitude=0x5000, length=5000, attack_length=1000, fade_length=1000)))

        if (supported & sdl2.SDL_HAPTIC_SAWTOOTHDOWN):
            self._add_effect("Sawtooth Down", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_SAWTOOTHDOWN,
                periodic=sdl2.SDL_HapticPeriodic(
                    type=sdl2.SDL_HAPTIC_SAWTOOTHDOWN,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_CARTESIAN, dir=(1,0,0)),
                    period=500, magnitude=0x5000, length=5000, attack_length=1000, fade_length=1000)))

        if (supported & sdl2.SDL_HAPTIC_RAMP):
            self._add_effect("Ramp", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_RAMP,
                ramp=sdl2.SDL_HapticRamp(
                    type=sdl2.SDL_HAPTIC_RAMP,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_POLAR, dir=(9000,0,0)),
                    start=0x5000, end=0x0000, length=5000, attack_length=1000, fade_length=1000)))

        if (supported & sdl2.SDL_HAPTIC_CONSTANT):
            self._add_effect("Constant Force", sdl2.SDL_HapticEffect(
                type=sdl2.SDL_HAPTIC_CONSTANT,
                constant=sdl2.SDL_HapticConstant(
                    type=sdl2.SDL_HAPTIC_CONSTANT,
                    direction=sdl2.SDL_HapticDirection(type=sdl2.SDL_HAPTIC_CARTESIAN, dir=(1,0,0)),
                    length=5000, level=0x4000, attack_length=1000, fade_length=1000)))

    def _add_effect(self, label, effect):
        """Announce `effect`, upload it to the device and store it under the next effect number."""
        print("   effect", self.nefx, label)
        self.efx[self.nefx] = effect
        self.id[self.nefx] = sdl2.SDL_HapticNewEffect(self.haptic, effect)
        self.nefx += 1

    def start_haptic(self,haptic_number):
        """Run the registered effect with number `haptic_number` for one iteration.

        Raises:
            IndexError: if `haptic_number` is not the number of a registered
                effect. Unregistered slots hold the placeholder id 0, which
                would otherwise silently run the first effect.
        """
        if not 0 <= haptic_number < self.nefx:
            raise IndexError("haptic effect %r is not registered (valid: 0..%d)"
                             % (haptic_number, self.nefx - 1))
        sdl2.SDL_HapticRunEffect(self.haptic, self.id[haptic_number], 1)

    def stop_haptic(self):
        """Stop every effect currently running on the device."""
        sdl2.SDL_HapticStopAll(self.haptic)

import time
# Module-level Haptic instance used by the braking helpers; constructing it
# opens haptic device 0 and prints the effects that were registered.
h = Haptic()

#h.start_haptic(5) #Starts Haptic Feedback, Value is haptic effect number
#h.stop_haptic() #Stops Haptic Feedback

################################################################################################################################################################################
################################################################################################################################################################################

# (Jupyter notebook only) Calling "airsim.CarClient()" directly has been seen to leave the Jupyter kernel stuck, so that no other commands can be run.
# To work around this, the client is created from a new thread.

def Find_BADAS_ClientNormal():
    """Create an ``airsim.CarClient`` and publish it as the module-level ``client``."""
    global client
    connection = airsim.CarClient()
    client = connection

def Find_BADAS_Client(): #Thread
    """Run ``Find_BADAS_ClientNormal`` on a new thread and return immediately.

    The thread is not joined, so ``client`` only exists once that thread has
    finished creating the connection.
    """
    connector = threading.Thread(target=Find_BADAS_ClientNormal)
    connector.start()

# Starts the background connection thread; the global `client` is assigned
# once airsim.CarClient() returns on that thread.
Find_BADAS_Client() #Connect to client, call one time

Trying to find haptics
Found 0 : b'LS-USBMX1/2/3 Steering\xc2\xa1\xc2\xad'
Using device 0
   effect 0 Sine Wave
   effect 1 Triangle
   effect 2 Sawtooth Up
   effect 3 Sawtooth Down
   effect 4 Ramp
   effect 5 Constant Force


# Emergency Brake Functions

In [None]:
def BeepAlert():
    """Sound five back-to-back ``winsound.Beep(1000, 100)`` tones."""
    beeps_left = 5
    while beeps_left > 0:
        winsound.Beep(1000, 100)
        beeps_left -= 1
        
def StartFullBrake():
    """Send a brake input of 1 to the client and start ``BeepAlert`` on a background thread."""
    client.setBrakeInput(1)
    alert_thread = threading.Thread(target=BeepAlert)
    alert_thread.start()

def StopBrake():
    """Send a brake input of 0 to the client, undoing ``StartFullBrake``'s input of 1."""
    released = 0
    client.setBrakeInput(released)
        
# Overrides brake control and brakes until speed is less than 1
def EmergencyBrake_TillCarStop(stop_speed=1, poll_interval=0.1):
    """Hold full brake, with haptic feedback, until the car has almost stopped.

    Starts haptic effect 5, applies full brake via ``StartFullBrake`` and polls
    ``client.getCarState().speed`` until it is no longer above ``stop_speed``.

    Args:
        stop_speed: speed at or below which braking ends (default 1, as before).
        poll_interval: seconds to sleep between speed checks (default 0.1).

    The brake is released and the haptic effect stopped even if polling raises
    or the cell is interrupted; previously either would leave the brake input
    at 1 and the wheel effect running.
    """
    h.start_haptic(5) # Start Steering wheel Haptic
    try:
        StartFullBrake()

        # Keep brakes on till car speed almost zero
        while client.getCarState().speed > stop_speed:
            time.sleep(poll_interval)
    finally:
        try:
            StopBrake()
        finally:
            # Runs even when releasing the brake itself fails.
            h.stop_haptic()

# Brake test after a 6 second delay

In [None]:
# Wait 6 seconds after the cell is started, then run the emergency brake routine.
time.sleep(6)
EmergencyBrake_TillCarStop()

# Prints car speed every half second

In [None]:
# Never terminates on its own — interrupt the kernel to stop it.
while(True):
    time.sleep(0.5)
    print(client.getCarState().speed)  # value of the `speed` field of the car state

# Take one image and show it

In [None]:
# Request one Scene image from camera "1" and display it with PIL's Image.show().
# NOTE(review): assumes simGetImage returns encoded image bytes that PIL can open — confirm it cannot return None.
png_image = client.simGetImage("1", airsim.ImageType.Scene)
Image.open(io.BytesIO(png_image)).show()

# Helpful

In [None]:
# Get multiple images in one call (Faster than getting a single image in a for loop)
# The batch is six identical Scene requests for camera 0.
responses = client.simGetImages(
    [airsim.ImageRequest(0, airsim.ImageType.Scene) for _ in range(6)]
)



# Reset driver control: hand the car back to the human driver.
client.enableApiControl(False) # Enables keyboard/joystick controls

# Reference snippet (kept commented out): driving the car purely through the API,
# which fully disables player control while API control is enabled.
#     car_controls = airsim.CarControls()
#     client.enableApiControl(True) # Immediately disables keyboard/joystick controls
#     car_controls.brake = 1 # Will not immediately set brakes to true, still needs "client.setCarControls(car_controls)"
#     client.setCarControls(car_controls)  # Must be set after changing properties of "car_controls"
#     # WAIT X SECONDS
#     car_controls.brake = 0
#     client.setCarControls(car_controls)
#     client.enableApiControl(False) # Enables back keyboard/joystick controls

# Template for estimating how long a piece of code takes (wall-clock seconds).
start = time.time()
# ... put the code to be timed here ...
print(time.time() - start)

# Run cmd command at proj root dir
import os
# A notebook has no __file__, and the previous `dirname(dirname("__file__"))`
# used a name that is never imported (NameError) on a string literal. Use the
# parent of the working directory, the same location the notebook's sys.path
# setup treats as the project root.
project_root = os.path.dirname(os.getcwd())
# os.system("start / wait cmd " + project_root + "/k test.exe")
# Windows `start "" /D <dir> <command>`: the empty "" is the window title and
# /D sets the start directory. The path used to be glued onto "cmd" directly.
os.system('start "" /D "' + project_root + '" cmd /k test.exe')

# Print how many frames per second (Scene view) are captured from 1 camera

In [None]:
# Runs until interrupted. Each outer pass counts how many simGetImage calls
# for camera "0" complete within about one second, then prints that count.
while(True):

    x = 0  # images fetched in the current one-second window
    start = time.time()

    while(True):
        png_image = client.simGetImage("0", airsim.ImageType.Scene)
        if(time.time() - start > 1):
            break  # the request that crosses the one-second mark is not counted
        x+=1

    print(x)

# Print how many frame pairs per second (Scene view) are captured from 2 cameras

In [None]:
# Runs until interrupted. Each outer pass counts how many batched requests
# (one Scene image each from cameras 1 and 2) complete within about one second.
while(True):

    x = 0  # batched requests completed in the current one-second window
    start = time.time()

    while(True):
        responses = client.simGetImages([ airsim.ImageRequest(1, airsim.ImageType.Scene)
                                        , airsim.ImageRequest(2, airsim.ImageType.Scene)
                                        ]) 
        if(time.time() - start > 1):
            break  # the request that crosses the one-second mark is not counted
        x+=1

    print(x)

In [None]:
# DEPRECATED, USE DISTANCE FROM ENGINE INSTEAD
# Estimate frequency of calculating nearest objects distances per second

#Rotate camera upwards to prevent getting distances from ground 
client.simSetCameraOrientation(0, airsim.to_quaternion(0.5, 0, 0)); #radians

# Runs until interrupted. Each outer pass counts how many "fetch depth image +
# compute minimum" iterations complete within about one second, then prints it.
while(True):

    x = 0  # iterations completed in the current one-second window
    start = time.time()

    while(True):
        responses = client.simGetImages([ airsim.ImageRequest(0, airsim.ImageType.DepthPerspective, True)])
        # Smallest value in the returned float data of the single response.
        minDist = min(float(s) for s in responses[0].image_data_float)
#         print(minDist)
        
        if(time.time() - start > 1):
            break  # the iteration that crosses the one-second mark is not counted
        x+=1
        
    print(x)

# Calculate the actual distance between two positions of the car

In [None]:
#Run at position 1
# Stores the vehicle pose's position components as x1, y1, z1 for the distance cell below.
v = client.simGetVehiclePose().position
x1 = v.x_val
y1 = v.y_val
z1 = v.z_val

In [None]:
#Run at position 2 to get distance
# Stores the vehicle pose's position components as x2, y2, z2 (x1, y1, z1 come from the previous cell).
v = client.simGetVehiclePose().position
x2 = v.x_val
y2 = v.y_val
z2 = v.z_val

def calculateDistance(x1,y1,z1,x2,y2,z2):
    """Return the straight-line (Euclidean) distance between (x1, y1, z1) and (x2, y2, z2)."""
    dx = x2 - x1
    dy = y2 - y1
    dz = z2 - z1
    return math.sqrt(dx**2 + dy**2 + dz**2)

# The last argument is z2; it used to be y2, which made the result depend on
# y2 - z1 instead of the real change in height.
calculateDistance(x1,y1,z1,x2,y2,z2)

# New functions for BADAS
Only works with the BADAS Sim versions. Make sure to have the latest version.

In [None]:
#Testing getAdasPacket()
# Runs until interrupted: once a second, fetch the ADAS packet and print its
# first four entries under the labels used below.
# NOTE(review): the meaning of each index is taken from these labels only — confirm against the BADAS client.
while (True):
    time.sleep(1)
    x = client.getAdasPacket()
    print('Event Number Cars: ' , x[0])
    print('Distance Cars: ' , x[1])
    print('Event Number Pedestrian: ' , x[2])
    print('Distance Pedestrian: ' , x[3])

In [None]:
#Get brake value
# Runs until interrupted: once a second, print the 'brake' entry of the car controls.
# NOTE(review): indexing with ['brake'] assumes getCarControls() returns a mapping — confirm.
while (True):
    time.sleep(1)
    x = client.getCarControls() #CarState vs CarControls?
    print(x['brake'])

In [None]:
# #Test maneuver
# #Steering without braking
# maxSteerAngle = 50
# def Steer(angle):
#     car_controls = airsim.CarControls()
#     car_controls.throttle = 0
#     car_controls.steering = angle/maxSteerAngle
#     client.enableApiControl(True)
#     client.setCarControls(car_controls)
#     time.sleep(10)
#     client.enableApiControl(False)
# time.sleep(5)    
# Steer(25)

In [3]:
def unit_vector(vector):
    """Return ``vector`` divided by its norm (``np.linalg.norm``), i.e. scaled to length one."""
    magnitude = np.linalg.norm(vector)
    return vector / magnitude

def angle_between(v1, v2):
    """Return the angle in radians between ``v1`` and ``v2``.

    Both vectors are normalised with ``unit_vector``; their dot product is
    clipped to [-1, 1] before ``np.arccos`` so rounding error cannot push it
    outside the function's domain.
    """
    cosine = np.dot(unit_vector(v1), unit_vector(v2))
    return np.arccos(np.clip(cosine, -1.0, 1.0))

#BADAS v2.6+
def SteerByAngle(angle, direction):
    """Take over steering until the car's heading has changed by ``angle`` degrees.

    The forward vector is sampled once at the start. API control is then
    enabled with throttle 0 and full steering lock (1 for 'right', -1 for
    'left'). Steering stops when the angle between the initial and current
    forward vectors (X/Y components only) reaches ``angle``, or when the car's
    speed drops below 1.

    Args:
        angle: required heading change in degrees.
        direction: 'right' or 'left'.

    Raises:
        ValueError: if ``direction`` is neither 'right' nor 'left'. Such a
            value used to leave the steering untouched and hold API control
            until the car slowed below speed 1.

    API control is always handed back to the driver, even if a client call
    raises or the cell is interrupted while steering.
    """
    if(direction == 'right'):
        steering = 1
    elif(direction == 'left'):
        steering = -1
    else:
        # Reject bad input before the driver's controls are touched.
        raise ValueError("direction must be 'right' or 'left', got %r" % (direction,))

    FirstVec = client.getSuvFwdVec() #Get first forward vector

    car_controls = airsim.CarControls()
    car_controls.throttle = 0 # Change later, need to meet certain velocity depending on manuever equation
    car_controls.steering = steering

    #Start steering
    client.enableApiControl(True)
    try:
        client.setCarControls(car_controls)

        #Keep steering until required angle is met
        while (True):
            SecVec = client.getSuvFwdVec()
            radians = angle_between( (FirstVec['X'], FirstVec['Y']), (SecVec['X'], SecVec['Y'])) #Get angle betwen first and second forward vectors

            if(math.degrees(radians) >= angle): #Stop steering if required angle is met
                break

            if(client.getCarState().speed < 1): #Stop steering in case of errors
                break

            time.sleep(0.01)
    finally:
        client.enableApiControl(False) # Return driver full controls



In [4]:
#Test
# After a 5 s delay, steer 15 degrees to the right, pause 0.2 s, then steer 15 degrees to the left.
time.sleep(5)    
SteerByAngle(15,'right')
time.sleep(0.2)   
SteerByAngle(15,'left')

In [19]:
# Scratch cell: show the current time.time() value (seconds, as a float) as the cell output.
import time

time.time()

1550253117.417256

In [32]:
# Scratch cell: measure a 0.1 s sleep using whole-millisecond timestamps.
import time
millis1 = int(round(time.time() * 1000))  # timestamp before, in ms

time.sleep(0.1)

millis2 = int(round(time.time() * 1000))  # timestamp after, in ms

print(millis2-millis1)

100


In [2]:
# Starts the background connection thread; the global `client` is assigned
# once airsim.CarClient() returns on that thread.
Find_BADAS_Client() #Connect to client, call one time

In [8]:
# Measures the delay between an ADAS event and the first non-zero brake reading.
# Despite its name, timeSinceBrakeEvent holds the time.time() timestamp of the event.
timeSinceBrakeEvent = 0
timeOfBrakePressed = 0
# Phase 1: poll (without sleeping) until the first entry of the ADAS packet equals 1.
while True:
    AdasPacket = client.getAdasPacket()
    if AdasPacket[0] == 1:
        timeSinceBrakeEvent = time.time()
        print('event happened')
        break
# Phase 2: poll (without sleeping) until getBrakeVal() reports a value above 0.
# NOTE(review): if the brake is already held when the event fires, this exits on the first pass and T is close to 0.
while True:
    BrakeVal = getBrakeVal()
    if BrakeVal > 0:
        timeOfBrakePressed = time.time()
        print('brake happened')
        break
# Seconds elapsed between the two timestamps.
T = timeOfBrakePressed - timeSinceBrakeEvent
print('Time to brake after emerg: ' ,  T)

event happened
brake happened
Time to brake after emerg:  0.5652458667755127
