In [1]:
import os
import scenic
scenic_script = "./examples/carla/experiment3.scenic"
scenario = scenic.scenarioFromFile(scenic_script)



In [32]:
import itertools
i = [1,2,4,3]
j = [4,5]
p = set(itertools.permutations(i))
print(p)
g = removeInfeasiblePermutation([1], p)
print(g)

{(1, 3, 2), (1, 2, 3), (2, 1, 3), (3, 2, 1), (3, 1, 2), (2, 3, 1)}
(1, 3, 2)
(1, 2, 3)
infeasible_permutation:  {(1, 3, 2), (1, 2, 3)}
p_tuple
{(2, 3, 1), (3, 2, 1), (2, 1, 3), (3, 1, 2)}


In [30]:
def removeInfeasiblePermutation(failed_list, permutation_set):
    infeasible_permutation = set()
    for p_tuple in permutation_set:
        if tupleStartsWith(failed_list, p_tuple):
            infeasible_permutation.add(p_tuple)
    return permutation_set.difference(infeasible_permutation)
            
def tupleStartsWith(failed_list, permutation_tuple):
    for i in range(len(failed_list)):
        if failed_list[i] != permutation_tuple[i]:
            return False
    return True

In [35]:
from scenic.core.vectors import OrientedVector, Vector
import matplotlib.pyplot as plt
from scenic.core.type_support import toVector
from scenic.core.regions import PolygonalRegion, PointInRegionDistribution, SectorRegion
import os.path as path
from scenic.core.regions import SectorRegion
from scenic.core.geometry import triangulatePolygon, normalizeAngle
from shapely.geometry.polygon import Polygon
from shapely.geometry.multipolygon import MultiPolygon
from scenic.core.distributions import *
import math
import subprocess
import os


# TODO : need to handle jointly dependent objs as well
def isFeatureValid(feature, label, smt_file_path, cached_variables, featureType, debug=False, falseTesting=False):
    
    # TODO : need to handle jointly dependent objs as well
    if os.path.isfile(smt_file_path):
        os.remove(smt_file_path)
    
    open(smt_file_path, 'w').close()
    writeSMTtoFile(smt_file_path, '(set-logic QF_NRA)')

    # Instantiate cached_variables dictionary
    x, y, label_feature = None,None,None
    
    if not falseTesting:
        if featureType == 'position':
            label_feature = (str(label.x), str(label.y))
        if featureType == 'heading':
            label_feature = str(label)
    else:
        x = 1144.11438466291
        y = 1085.6681557273737
        heading = -0.293950447126917
        if featureType == 'position':
            label_feature = (str(x), str(y))
        
        
    # Encode object's position to SMT formula
    smt_var = feature.encodeToSMT(smt_file_path, cached_variables, debug = debug)
    
    if featureType == 'position':
        (x_cond, y_cond) = vector_operation_smt(label_feature, "equal", smt_var)
        writeSMTtoFile(smt_file_path, smt_assert(None, smt_and(x_cond, y_cond)))
    else:
        writeSMTtoFile(smt_file_path, smt_assert(None, smt_equal(label_feature, smt_var)))
    writeSMTtoFile(smt_file_path, "(check-sat)")
    writeSMTtoFile(smt_file_path, "(exit)")

    if not debug:
        if subprocess.call("./run_smt_encoding.sh") == 1:
            return True
        else:
            return False
    else:
        print("DEBUG MODE: NO OUTPUT WILL BE RETURNED")
            
    return None

def validateLabelTesting(scenario, debug=False, falseTesting=False):
    ego_visibleDistance = 10 #meters
    ego_viewAngle = 135 #deg
    smt_file_path = './test_smt_encoding.smt2'
    
    resetScenarioDependency(scenario)
    
    if not falseTesting:
        sample = scenario.generate()
        scene = sample[0]
        label_ego_pos = scene.objects[0].position
        label_ego_heading = scene.objects[0].heading
    else:
        x = 1144.11438466291
        y = 1085.6681557273737
        label_ego_pos = Vector(x,y)
        label_ego_heading = -0.293950447126917
    
    # Create Ego's VisibleRegion
    cached_variables = {}
    cached_variables['variables'] = []
    egoVisibleRegion = SectorRegion(label_ego_pos, ego_visibleDistance, label_ego_heading, \
                                    math.radians(ego_viewAngle))
    cached_variables['egoVisibleRegion'] = egoVisibleRegion

    for i in range(len(scenario.objects)):
        print("i: ",i)
        obj = scenario.objects[i]
        label_feature = scene.objects[i].position
        print("position: ", label_feature)
        feature = obj.position
        if isFeatureValid(feature, label_feature, smt_file_path,cached_variables, featureType='position', \
                          debug=debug, falseTesting=falseTesting):
            print("position is valid")
            # condition the validated feature with the label's feature
            feature.conditionTo(label_feature)
        else:
            print("NOT VALID: POSITION")
            return False

        label_feature = scene.objects[i].heading
        print("heading: ", label_feature)
        feature = obj.heading
        if isFeatureValid(feature, label_feature, smt_file_path, cached_variables, featureType='heading', \
                          debug=debug, falseTesting=falseTesting): 
            print("heading is valid")
            # condition the validated feature with the label's feature
            feature.conditionTo(Constant(label_feature))
        else:
            print("NOT VALID: HEADING")
            return False
        
    ## Check Hard Constraint Satisfaction
    if not scenario.checkRequirements():
        return False
    
    return True 

def validateLabel(scenario, img_name, debug=False, falseTesting=False):
    ego_visibleDistance = 10 #meters
    ego_viewAngle = 135 #deg
    smt_file_path = './test_smt_encoding.smt2'
    
    resetScenarioDependency(scenario)
    
    label = nusc.get_img_data(img_name)
    (x_ego, y_ego) = label['EgoCar']['position']
    label_ego_pos = Vector(x_ego, y_ego)
    label_ego_heading = normalizeAngle(math.radians(label['EgoCar']['heading']))
    
    # Create Ego's VisibleRegion
    cached_variables = {}
    cached_variables['variables'] = []
    egoVisibleRegion = SectorRegion(label_ego_pos, ego_visibleDistance, label_ego_heading, \
                                    math.radians(ego_viewAngle))
    cached_variables['egoVisibleRegion'] = egoVisibleRegion
    
    ## Validate EgoCar label
    if not checkPosHeading(scenario.objects[0], label['EgoCar'],smt_file_path, cached_variables, \
                           debug, falseTesting):
        return False
    
    ## Validate Other agents' labels
    # create a permutation of all label's Vehicles
    permutation = permutedIndexSet(label)
    feasible_permuation = permutation
    
    # try permutations of label matching
    for p in permutation:
        if p not in feasible_permutation:
            continue
            
        (boolean, failed_permutation) = tryPermutation(p, label, scenario)
        if not boolean:
            feasible_permuation = removeInfeasiblePermutation(failed_permutation, permutation)
            # Need reset conditioned objects with wrong permutation of labels
            resetScenarioDependency(scenario)
            # reset cached_variables dictionary
            resetDictionary(cached_variables, egoVisibleRegion)
            # Ego's label is valid so keep that conditioned obj
            checkPosHeading(scenario.objects[0], label['EgoCar'], smt_file_path, cached_variables, \
                           debug, falseTesting)
        else:
            break
            
    ## Check Hard Constraint Satisfaction
    if not scenario.checkRequirements():
        return False
    return True 

def tryPermutation(permutation_tuple, label, scenario, smt_file_path, cached_variables, \
                           debug=False, falseTesting=False):
    failed_index_sequence = []
    for i in range(len(scenario.objects)-1):
        index = permutation_tuple[i]
        label_obj = label['Vehicles'][index]
        scenic_obj= scenario.objects[i+1]
        if checkPosHeading(scenic_obj, label_obj, smt_file_path, cached_variables, \
                           debug, falseTesting):
            failed_index_sequence.append(index)
        else:
            failed_index_sequence.append(index)
            return (False, failed_index_sequence)
    return (True, None)
    
def checkPosHeading(scenic_obj, label_obj, smt_file_path, cached_variables, debug=False, falseTesting=False):
    (x,y) = label_obj['position']
    label_feature = Vector(x,y)
    print("position: ", label_feature)
    feature = scenic_obj.position
    if isFeatureValid(feature, label_feature, smt_file_path,cached_variables, \
                      featureType='position', \
                      debug=debug, falseTesting=falseTesting):
        print("position is valid")
        # condition the validated feature with the label's feature
        feature.conditionTo(label_feature)
    else:
        print("NOT VALID: POSITION")
        return False

    label_feature = normalizeAngle(math.radians(label_obj['heading']))
    print("heading: ", label_feature)
    feature = scenic_obj.heading
    if isFeatureValid(feature, label_feature, smt_file_path, cached_variables, \
                      featureType='heading', \
                      debug=debug, falseTesting=falseTesting): 
        print("heading is valid")
        # condition the validated feature with the label's feature
        feature.conditionTo(Constant(label_feature))
    else:
        print("NOT VALID: HEADING")
        return False
    return True

def permutedIndexSet(label):
    index_list = [i for i in range(len(label['Vehicles']))]
    return set(itertools.permutations(index_list))

def removeInfeasiblePermutation(failed_list, permutation_set):
    infeasible_permutation = set()
    for p_tuple in permutation_set:
        if tupleStartsWith(failed_list, p_tuple):
            infeasible_permutation.add(p_tuple)
    return permutation_set.difference(infeasible_permutation)
            
def tupleStartsWith(failed_list, permutation_tuple):
    for i in range(len(failed_list)):
        if failed_list[i] != permutation_tuple[i]:
            return False
    return True

        
def resetDictionary(cached_variables, egoVisibleRegion):
    cached_variables = {}
    cached_variables['variables'] = []
    cached_variables['egoVisibleRegion'] = egoVisibleRegion
        
def resetScenarioDependency(scenario):
    for obj in scenario.objects:
        resetConditionedVar(obj.position)
        resetConditionedVar(obj.heading)

def resetConditionedVar(obj):
    obj._conditioned = obj
    if (obj._dependencies is None):
        return None
    for dep in obj._dependencies:
        resetConditionedVar(dep)
    return None

# ego_visibleDistance = 5 #meters
# ego_viewAngle = 135 #deg
# smt_file_path = './test_smt_encoding.smt2'
directory = '/Users/edwardkim/Desktop/Scenic_Query/nuscenes_data/experiment_folder/experiment'
filenames = os.listdir(directory)
print("label validity: ", validateLabel(scenario, filenames[0] ,debug=False, falseTesting=False))

position:  (1316.9486451760638 @ 1037.9575936034432)


NameError: name 'smt_file_path' is not defined

In [3]:
from scenic.simulators.carla.nusc_query_api import NuscQueryAPI
nusc = NuscQueryAPI(version='v1.0-trainval', \
                    dataroot='/Users/edwardkim/Desktop/Scenic_Query/nuscenes_data')

Loading NuScenes tables for version v1.0-trainval...
Loading nuScenes-lidarseg...
32 category,
8 attribute,
4 visibility,
64386 instance,
12 sensor,
10200 calibrated_sensor,
2631083 ego_pose,
68 log,
850 scene,
34149 sample,
2631083 sample_data,
1166187 sample_annotation,
4 map,
34149 lidarseg,
Done loading in 41.350 seconds.
Reverse indexing ...
Done reverse indexing in 13.7 seconds.


In [8]:
directory = '/Users/edwardkim/Desktop/Scenic_Query/nuscenes_data/experiment_folder/experiment'
import os
filenames = os.listdir(directory)
# print(filenames)
print((nusc.get_img_data(filenames[0])['Vehicles']))

[{'heading': 140.23800000002723, 'position': (1311.723, 1037.931), 'box': <shapely.geometry.polygon.Polygon object at 0x12f1edd00>}, {'heading': 233.8929999962449, 'position': (1302.584, 1072.481), 'box': <shapely.geometry.polygon.Polygon object at 0x12f1edcd0>}, {'heading': 232.85499999984137, 'position': (1298.531, 1074.182), 'box': <shapely.geometry.polygon.Polygon object at 0x1323e3d90>}]


In [None]:

from scenic.core.distributions import *
from scenic.core.vectors import Vector
from scenic.core.regions import SectorRegion
import math
import subprocess
import os


# # TODO : need to handle jointly dependent objs as well
# def isPositionValid(obj, label, smt_file_path, ego_info, debug=False, falseTesting=False):
    
#     # TODO : need to handle jointly dependent objs as well
#     if os.path.isfile(smt_file_path):
#         os.remove(smt_file_path)
    
#     open(smt_file_path, 'w').close()
#     writeSMTtoFile(smt_file_path, '(set-logic QF_NRA)')

#     # Instantiate cached_variables dictionary
#     cached_variables = {}
#     cached_variables['variables'] = []
#     if not falseTesting:
#         x = label.position.x
#         y = label.position.y
#         heading = ego_info['headingAngle']
#     else:
#         x = 838.6186494157367
#         y = 1541.506284067565
#         heading = -1.7422135918874386
        
#     # Create Ego's VisibleRegion
#     print("sampled position: "+ str(x)+" "+ str(y))
#     print("sampled heading : "+ str(heading))
#     ego_labelled_position = Vector(x,y)
#     ego_visibleDistance = ego_info['visibleDistance']
#     ego_viewAngle = ego_info['viewAngle']
#     ego_labelled_heading = heading
#     egoVisibleRegion = SectorRegion(ego_labelled_position, ego_visibleDistance, \
#                                     ego_labelled_heading, ego_viewAngle)
#     cached_variables['egoVisibleRegion'] = egoVisibleRegion
    
#     # instantiate smt_var
#     x_var = findVariableName(smt_file_path, cached_variables, 'x', debug=debug)
#     y_var = findVariableName(smt_file_path, cached_variables, 'y', debug=debug)
#     smt_var = (x_var, y_var)
#     writeSMTtoFile(smt_file_path, smt_assert("equal", x_var, str(x)))
#     writeSMTtoFile(smt_file_path, smt_assert("equal", y_var, str(y)))
        
#     # Encode object's position to SMT formula
#     obj.position.encodeToSMT(smt_file_path, cached_variables, smt_var, debug = debug)

#     writeSMTtoFile(smt_file_path, "(check-sat)")
#     writeSMTtoFile(smt_file_path, "(exit)")

#     if not debug:
#         if subprocess.call("./run_smt_encoding.sh") == 1:
#             return True
#         else:
#             return False
#     else:
#         print("DEBUG MODE: NO OUTPUT WILL BE RETURNED")
            
#     return None


# def testPositionSMTEncoding(scenario, obj, testNum=1, debug=False, falseTest=False):
#     ego_visibleDistance = 5 #meters
#     ego_viewAngle = 135 #deg
#     smt_file_path = './test_smt_encoding.smt2'
    
#     for i in range(testNum):
#         resetConditionedObj(scenario)
#         label = obj.sample()
#         print("label.position: ", label.position)
#         print("label.heading: ", label.heading)
        
#         ego_info = {
#             'visibleDistance' : ego_visibleDistance,
#             'viewAngle' : math.radians(ego_viewAngle), #radians,
#             'headingAngle' : label.heading #radians
#         }
        
#         valid = isPositionValid(obj, label, smt_file_path, ego_info, debug=debug, falseTesting=falseTest)
#         print(valid)
#         if not valid and not debug:
#             return False
#     return True


# valid = testPositionSMTEncoding(scenario, scenario.egoObject, 1, debug=False, falseTest=False)
# print("final: ", valid)