# Generalized validation class


## Version - 3

In [79]:
import geopandas
import numpy as np
import threading
import time
import json

class ConfigValidator:
    def __init__(self, config, shapefile):
        self.config = config
        self.shapefile = shapefile
        self.gdf = geopandas.read_file(shapefile)
        # self.st = time.time()
        # for i in range(10):
            # self.gdf = self.gdf.append(self.gdf)
        # for i in range(4):
        #     self.gdf = self.gdf.append(self.gdf)
        # self.gdf = self.gdf.append(self.gdf)
        self.now = time.time()        
        # print(len(self.gdf), self.now - self.st)
        self.NUM_THREADS = 4
        batch_size = len(self.gdf)//self.NUM_THREADS
        self.gdf_batches = [self.gdf[i:len(self.gdf) if i == self.NUM_THREADS-1 else i+batch_size] for i in range(0, len(self.gdf), batch_size)]

    def validate_config_structure(self):
        config = self.config
        if 'attributes' not in config: 
            raise ValueError('Invalid Config - Property "attributes" not found')
        if 'geometry' not in config:
            raise ValueError('Invalid Config - Property "geometry" not found')
        if 'dtypes' in config['attributes']:
            valid_types = ['int', 'int64', 'float', 'double', 'text', 'objectID', 'date', 'json']
            for key in config['attributes']['dtypes'].keys():
                if key not in valid_types:
                    raise ValueError(f'Invalid Config - invalid dtype - "{key}"')
        # we can have more validations on this, like valid functions, valid featurename lists etc 
        # not needed as config will be generated from template or stored

    def parallel_execution(self, function, *args):
        threads = []
        results = [True]*self.NUM_THREADS

        def worker(*args):
            results[args[0]] = function(*args)

        for i in range(self.NUM_THREADS):
            thread = threading.Thread(target=worker, args=(i,) + args)
            threads.append(thread)

        # start the threads
        for thread in threads:
            thread.start()

        # wait for all threads to finish
        for thread in threads:
            thread.join()

        return all(results)

    def dtypes_validation(self):
        config = self.config
        if 'dtypes' in config['attributes']:
            # gdf = self.gdf
            # read shapefile in geopandas - dtype in pandas - object, int64, float64, datetime64, bool
            shapefile_dtypes = self.gdf.dtypes
            # mp to map standard types to pandas types
            mp = {
                'int' : np.dtype('int'),
                'int64' : np.dtype('int64'),
                'float' : np.dtype('float'),
                'float64' : np.dtype('float64'),
                'double' : np.dtype('float'),
                'text' : np.dtype('object_'),
                'objectID' : np.dtype('object_'),
                'date' : np.dtype('datetime64')
            }
            for dtype in config['attributes']['dtypes']:
                for featurename in config['attributes']['dtypes'][dtype]:
                    if(dtype == 'json'):
                        continue
                    if(shapefile_dtypes[featurename] != mp[dtype]):
                        raise ValueError(f'Invalid data type for {featurename}, should be {mp[dtype]} but is {shapefile_dtypes[featurename]}')
    
    # TODO rows where it is failing
    def check_json_structure(self, val):
        try:
            json.loads(val)
            return True
        except:
            return False

    def json_structure_validation(self, thread_id, featurename):
        if(self.gdf_batches[thread_id][featurename].apply(self.check_json_structure).all() == False):
            return False
        return True
    
    def json_validation(self):
        config = self.config
        if 'dtypes' in config['attributes']:
            if 'json' in config['attributes']['dtypes']:
                for featurename in config['attributes']['dtypes']['json']:
                    if(self.parallel_execution(self.json_structure_validation, featurename)==False):
                        raise ValueError(f'Invalid json found in feature - {featurename}')

    def inclusive_range_validation(self, thread_id, featurename):
        # lower <= all_vals <= upper 
        bounds = self.config['attributes']['ranges']['inclusive'][featurename]
        lower, upper = bounds[0], bounds[1]

        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & ((self.gdf_batches[thread_id][featurename] < lower) | (self.gdf_batches[thread_id][featurename] > upper))]
        if(len(filtered_gdf) > 0):
            return False
        return True

    def exclusive_range_validation(self, thread_id, featurename):
        # all_vals < lower or all_vals > upper 
        bounds = self.config['attributes']['ranges']['exclusive'][featurename]
        lower, upper = bounds[0], bounds[1]
        
        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & ((self.gdf_batches[thread_id][featurename] >= lower) & (self.gdf_batches[thread_id][featurename] <= upper))]
        if(len(filtered_gdf) > 0):
            return False
        return True
        
    def ranges_validation(self):
        config = self.config
        if 'ranges' in config['attributes']:
            if 'inclusive' in config['attributes']['ranges']:
                # for all featurename in inclusive, we parallelly execute validator function 
                for featurename in config['attributes']['ranges']['inclusive'].keys():
                    if(self.parallel_execution(self.inclusive_range_validation, featurename) == False):
                        bounds = self.config['attributes']['ranges']['inclusive'][featurename]
                        lower, upper = bounds[0], bounds[1]
                        raise ValueError(f'Invalid value for {featurename}, value outside [{lower},{upper}] found')

            if 'exclusive' in config['attributes']['ranges']:
                # for all featurename in exclusive, we parallelly execute validator function 
                for featurename in config['attributes']['ranges']['exclusive'].keys():
                    if(self.parallel_execution(self.exclusive_range_validation, featurename) == False):
                        bounds = self.config['attributes']['ranges']['exclusive'][featurename]
                        lower, upper = bounds[0], bounds[1]
                        raise ValueError(f'Invalid value for {featurename}, value found in range [{lower}, {upper}]')

    def equal_value_validation(self, thread_id, featurename):
        # all_vals = val
        val = self.config['attributes']['values']['equal'][featurename]
        
        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & (self.gdf_batches[thread_id][featurename] != val)]
        if(len(filtered_gdf) > 0):
            return False
        return True

    def not_equal_value_validation(self, thread_id, featurename):
        # all_vals != val
        val = self.config['attributes']['values']['not_equal'][featurename]
        
        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & (self.gdf_batches[thread_id][featurename] == val)]
        if(len(filtered_gdf) > 0):
            return False
        return True
        
    def values_validation(self):
        config = self.config
        if 'values' in config['attributes']:
            if 'equal' in config['attributes']['values']:
                for featurename in config['attributes']['values']['equal'].keys():
                    if(self.parallel_execution(self.equal_value_validation, featurename) == False):
                        val = self.config['attributes']['values']['equal'][featurename]
                        raise ValueError(f'Invalid value for {featurename}, value found not equal to {val}')

            if 'not_equal' in config['attributes']['values']:
                for featurename in config['attributes']['values']['not_equal'].keys():
                    if(self.parallel_execution(self.not_equal_value_validation, featurename) == False):
                        val = self.config['attributes']['values']['not_equal'][featurename]
                        raise ValueError(f'Invalid value for {featurename}, value found equal to {val}')

    def inclusive_subset_validation(self, thread_id, featurename):
        # all_vals belongs to vals
        vals = self.config['attributes']['subsets']['inclusive'][featurename]
        # print(self.gdf_batches[thread_id][featurename])
        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & (~self.gdf_batches[thread_id][featurename].isin(vals))]
        if(len(filtered_gdf) > 0):
            return False
        return True
        
    def exclusive_subset_validation(self, thread_id, featurename):
        # all_vals not belongs to vals
        vals = self.config['attributes']['subsets']['exclusive'][featurename]
        
        filtered_gdf = self.gdf_batches[thread_id][(self.gdf_batches[thread_id][featurename].notnull()) & (self.gdf_batches[thread_id][featurename].isin(vals))]
        if(len(filtered_gdf) > 0):
            return False
        return True
    
    def subsets_validation(self):
        config = self.config
        if 'subsets' in config['attributes']:
            if 'inclusive' in config['attributes']['subsets']:
                for featurename in config['attributes']['subsets']['inclusive'].keys():
                    if(self.parallel_execution(self.inclusive_subset_validation, featurename) == False):
                        vals = self.config['attributes']['subsets']['inclusive'][featurename]
                        raise ValueError(f'Invalid value for {featurename}, value found which does not belong to the {vals}')

            if 'exclusive' in config['attributes']['subsets']:
                for featurename in config['attributes']['subsets']['exclusive'].keys():
                    if(self.parallel_execution(self.exclusive_subset_validation, featurename) == False):
                        vals = self.config['attributes']['subsets']['exclusive'][featurename]
                        raise ValueError(f'Invalid value for {featurename}, value found which belongs to the {vals}')
        
    def not_null_validation(self):
        config = self.config
        if 'not_null' in config['attributes']:
            for featurename in config['attributes']['not_null']:
                if self.gdf[featurename].isnull().any():
                    raise ValueError(f'Invalid value for {featurename}, null value found')
    
    def create_function(self, code):
        func_dict = {}
        exec(code, globals(), func_dict)
        return func_dict['fun']

    def run_attributes_check_functions_validation(self, thread_id, featurename):
        funcs = self.config['attributes']['check_functions'][featurename]
        for func in funcs:
            if(self.gdf_batches[thread_id][featurename].apply(self.create_function(func)).all() == False):
                return False
        return True
    
    def attributes_check_functions_validation(self):
        config = self.config
        if 'check_functions' in config['attributes']:
            for featurename in config['attributes']['check_functions'].keys():
                if(self.parallel_execution(self.run_attributes_check_functions_validation, featurename) == False):
                    raise ValueError(f'Invalid value for {featurename} - A function failed')

    def crs_validation(self):
        config = self.config
        if 'crs' in config['geometry']:
            if(str(self.gdf.crs) != config['geometry']['crs']):
                raise ValueError(f'Invalid crs {str(self.gdf.crs)} found')
            
    def geometry_types_validation(self):
        config = self.config
        # workaround for now...
        if 'types' in config['geometry']:
            valid_types = config['geometry']['types']
            types_found = set(self.gdf.geom_type)
            for type in types_found:
                if type not in valid_types:
                    raise ValueError(f'Invalid geometry type {type} found, it should be from {valid_types}')

    def run_geometry_check_functions_validation(self, thread_id, featurename = 'geometry'):
        funcs = self.config['attributes']['check_functions']
        for func in funcs:
            if(self.gdf_batches[thread_id][featurename].apply(self.create_function(func)).all() == False):
                return False
        return True

    def geometry_check_function_validation(self):
        config = self.config
        if 'check_functions' in config['geometry']:
            if(self.parallel_execution(self.run_geometry_check_functions_validation) == False):
                raise ValueError(f'Invalid value for geometry - A function failed')

    def validate(self):
        self.validate_config_structure()
        #### ATTRIBUTES ####
        # dtypes validation,
        self.dtypes_validation()
        # json validation,
        self.json_validation()
        # ranges validation,
        # self.ranges_validation()
        # # values validation,
        # self.values_validation()
        # # subsets validation, (considered for only belonging condition)
        # self.subsets_validation()
        # # not_null validation,
        # self.not_null_validation()
        # # check_functions validation, (run function for all values in feature, all must be true)
        # self.attributes_check_functions_validation()
        
        # #### GEOMETRY VALIDATION ####
        # # crs validation,
        # self.crs_validation()
        # # types validation
        # self.geometry_types_validation()
        # # check_functions validation
        # self.geometry_check_function_validation()
        print('end', time.time() - self.now)



### Example - testing

In [80]:
config = {
    'attributes' : {
        'dtypes' : {
            'int64' : ['Division'],
            'text' : ['FeatureNam', 'FeatureSta', 'Condition', 'Visible', 'Legible', 'Reflective',
                      'images', 'bboxes', 'geohash', 'fpath', 'RouteName', 'StreetName', 'UUID', 
                      'RouteMaint', 'RouteID', 'BeginFeatu', 'EndFeature', 'MaintCnt', 'LocCntyC', 
                      'RouteCla', 'RouteInv', 'Direction', 'TravelDir', 'UniqueID', 'SyncID'],
            'float' : ['lat', 'lon', 'MPLength', 'Length', 'Width', 'Area', 'BeginMp1', 'EndMp1',
                       'MaxMp1', 'Shape_Leng'],
            'json' : ['bboxes', 'images', 'Length']
        },
        'ranges' : {
            'inclusive' : {
                'lat' : [0, 100],
                'lon' : [-100, 100],
                'Division' : [0, 100],
                'Length' : [0, 10000],
                'Width' : [0, 10000],
                'BeginMp1' : [0, 10000],
                'EndMp1' : [0, 10000],
                'MaxMp1' : [0, 10000],
                'MPLength' : [0, 10000],
            },
            'exclusive' : {
                'lat' : [100, 100],
                'lon' : [100, 100],
                'Division' : [100, 100],
                'Length' : [10000, 10000],
                'Width' : [10000, 10000],
                'BeginMp1' : [10000, 10000],
                'EndMp1' : [10000, 10000],
                'MaxMp1' : [10000, 10000],
                'MPLength' : [10000, 10000],
            }
        },
        'values' : {
            'equal' : {
                'RouteMaint' : 'System'
            },
            'not_equal' : {
                'FeatureNam' : '-1',
                'FeatureSta' : '-1', 
                'Condition' : '-1', 
                'Visible' : '-1', 
                'Legible' : '-1', 
                'Reflective' : '-1',
                'images' : '-1',
                'bboxes' : '-1', 
                'geohash' : '-1', 
                'fpath' : '-1', 
                'RouteName' : '-1', 
                'StreetName' : '-1', 
                'UUID' : '-1', 
                'RouteMaint' : '-1', 
                'RouteID' : '-1', 
                'BeginFeatu' : '-1', 
                'EndFeature' : '-1', 
                'MaintCnt' : '-1', 
                'LocCntyC' : '-1', 
                'RouteCla' : '-1', 
                'RouteInv' : '-1', 
                'Direction' : '-1', 
                'TravelDir' : '-1', 
                'UniqueID' : '-1', 
                'SyncID' : '-1',
                'EndFeature' : '-1', 
                'MaintCnt' : '-1', 
                'LocCntyC' : '-1', 
                'RouteCla' : '-1', 
                'RouteInv' : '-1', 
                'Direction' : '-1', 
                'TravelDir' : '-1', 
                'UniqueID' : '-1', 
                'SyncID' : '-1',
            }
        },
        'subsets' : {
            'inclusive' : {
                'Condition' : ['good', 'damaged'],
                'Visible' : ['0', '1'],
                'Legible' : ['0', '1'],
                'Reflective' : ['0', '1'],
            },
            'exclusive' : {
                'FeatureNam' : ['a', 'b'],
                'FeatureSta' : ['a', 'b'], 
                'Condition' : ['a', 'b'], 
                'Visible' : ['a', 'b'], 
                'Legible' : ['a', 'b'], 
                'Reflective' : ['a', 'b'],
                'images' : ['a', 'b'],
                'bboxes' : ['a', 'b'], 
                'geohash' : ['a', 'b'], 
                'fpath' : ['a', 'b'], 
                'RouteName' : ['a', 'b'], 
                'StreetName' : ['a', 'b'], 
                'UUID' : ['a', 'b'], 
                'RouteMaint' : ['a', 'b'], 
                'RouteID' : ['a', 'b'], 
                'BeginFeatu' : ['a', 'b'], 
                'EndFeature' : ['a', 'b'], 
                'MaintCnt' : ['a', 'b'], 
                'LocCntyC' : ['a', 'b'], 
                'RouteCla' : ['a', 'b'], 
                'RouteInv' : ['a', 'b'], 
                'Direction' : ['a', 'b'], 
                'TravelDir' : ['a', 'b'], 
                'UniqueID' : ['a', 'b'], 
                'SyncID' : ['a', 'b'],
            }
        },
        'not_null' : ['lat', 'lon', 'images', 'bboxes', 'geohash', 'fpath', 'Length', 'Width', 'Area'], #much more
        'check_functions' : {
            'lat' : [
                    '''def fun(val):
                        return val >= 0 and val <= 100
                    ''',
                    # '''def fun(val):
                    #     return val < 35
                    # '''
                    ]
        }
    },
    'geometry' : {
        'types' : ['Point']
    }
}
validator = ConfigValidator(config=config, shapefile='../shapefile/point.shp')
validator.validate()

Exception in thread Exception in thread Thread-9109 (worker):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.2_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
Exception in thread Thread-9107 (worker):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.2_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
Thread-9108 (worker):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.2_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Cellar/python@3.11/3.11.2_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 975, in run
  result = super().__getitem__(key)
    self.run()
  File "/opt/homebrew/Cellar/python@3.11/3.11.2_1/Frameworks/Python.framework/Versions/3.11/lib/python3.1

end 0.040538787841796875


## Version 2
#### num_threads as argument now and parallellism on featurenames list..

#### changes on version 1 - num_threads as argument, and included validation helper function to achieve parallellism on featurenames list i.e. concurrently apply validation on featurenames in chunks


In [None]:
import geopandas
import numpy as np
import threading
import time
class ConfigValidator:
    def __init__(self, config, shapefile):
        self.config = config
        self.shapefile = shapefile
        self.gdf = geopandas.read_file(shapefile)
        self.st = time.time()
        for i in range(12):
            self.gdf = self.gdf.append(self.gdf)
        # for i in range(4):
        #     self.gdf = self.gdf.append(self.gdf)
        # self.gdf = self.gdf.append(self.gdf)
        self.now = time.time()        
        print(len(self.gdf), self.now - self.st)

    def validate_config_structure(self):
        config = self.config
        if 'attributes' not in config: 
            raise ValueError('Invalid Config - Property "attributes" not found')
        if 'geometry' not in config:
            raise ValueError('Invalid Config - Property "geometry" not found')
        if 'dtypes' in config['attributes']:
            valid_types = ['int', 'int64', 'float', 'double', 'text', 'objectID', 'date']
            for key in config['attributes']['dtypes'].keys():
                if key not in valid_types:
                    raise ValueError(f'Invalid Config - invalid dtype - "{key}"')
        # we can have more validations on this, like valid functions, valid featurename lists etc 
        # not needed as config will be generated from template or stored

    def parallel_execution(self, num_threads, function, *args):
        threads = []
        for i in range(num_threads):
            thread = threading.Thread(target=function, args=(num_threads, i,) + args)
            threads.append(thread)

        # start the threads
        for thread in threads:
            thread.start()

        # wait for all threads to finish
        try:
            for thread in threads:
                thread.join()
        except:
            for thread in threads:
                thread._stop()
             
    def dtypes_validation(self):
        config = self.config
        if 'dtypes' in config['attributes']:
            # gdf = self.gdf
            # read shapefile in geopandas - dtype in pandas - object, int64, float64, datetime64, bool
            shapefile_dtypes = self.gdf.dtypes
            # mp to map standard types to pandas types
            mp = {
                'int' : np.dtype('int'),
                'int64' : np.dtype('int64'),
                'float' : np.dtype('float'),
                'float64' : np.dtype('float64'),
                'double' : np.dtype('float'),
                'text' : np.dtype('object_'),
                'objectID' : np.dtype('object_'),
                'date' : np.dtype('datetime64')
            }
            for dtype in config['attributes']['dtypes']:
                for featurename in config['attributes']['dtypes'][dtype]:
                    if(shapefile_dtypes[featurename] != mp[dtype]):
                        raise ValueError(f'Invalid data type for {featurename}, should be {mp[dtype]} but is {shapefile_dtypes[featurename]}')

    # this is to achieve parallellism in list featurenames.. run for batches parallelly.. 
    def validation_helper(self, num_threads, thread_id, function, featurenames):
        batch_size = len(featurenames)//num_threads
        l = thread_id * batch_size
        r = min(len(featurenames), l + batch_size) - 1
        # 7, 4 - 1,1,1,4 
        if(thread_id == num_threads - 1):
            r = len(featurenames) - 1 
        for ind in range(l, r + 1):
            featurename = featurenames[ind]
            function(1, 0, featurename)
            # self.parallel_execution(1, function, featurename)
        pass

    def inclusive_range_validation(self, num_threads, thread_id, featurename):
        # lower <= all_vals <= upper 
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        bounds = self.config['attributes']['ranges']['inclusive'][featurename]
        lower, upper = bounds[0], bounds[1]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & ((self.gdf[featurename] < lower) | (self.gdf[featurename] > upper))]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value outside [{lower}, {upper}] found')

    def exclusive_range_validation(self, num_threads, thread_id, featurename):
        # all_vals < lower or all_vals > upper 
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        bounds = self.config['attributes']['ranges']['exclusive'][featurename]
        lower, upper = bounds[0], bounds[1]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & ((self.gdf[featurename] >= lower) & (self.gdf[featurename] <= upper))]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value found in range [{lower}, {upper}]')

    def ranges_validation(self):
        config = self.config
        if 'ranges' in config['attributes']:
            if 'inclusive' in config['attributes']['ranges']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.inclusive_range_validation,
                    list(config['attributes']['ranges']['inclusive'].keys())
                    )

            if 'exclusive' in config['attributes']['ranges']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.exclusive_range_validation,
                    list(config['attributes']['ranges']['exclusive'].keys())
                    )
                for featurename in config['attributes']['ranges']['exclusive'].keys():
                    self.parallel_execution(1, self.exclusive_range_validation, featurename)

    def equal_value_validation(self, num_threads, thread_id, featurename):
        # all_vals = val
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        val = self.config['attributes']['values']['equal'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & (self.gdf[featurename] != val)]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value found not equal to {val}')

    def not_equal_value_validation(self, num_threads, thread_id, featurename):
        # all_vals != val
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        val = self.config['attributes']['values']['not_equal'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & (self.gdf[featurename] == val)]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value found equal to {val}')
    
    def values_validation(self):
        config = self.config
        if 'values' in config['attributes']:
            if 'equal' in config['attributes']['values']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.equal_value_validation,
                    list(config['attributes']['values']['equal'].keys())
                    )

            if 'not_equal' in config['attributes']['values']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.not_equal_value_validation,
                    list(config['attributes']['values']['not_equal'].keys())
                    )
    
    def inclusive_subset_validation(self, num_threads, thread_id, featurename):
        # all_vals belongs to vals
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        vals = self.config['attributes']['subsets']['inclusive'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & (~self.gdf[featurename].isin(vals))]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value found which does not belong to the {vals}')
        
    def exclusive_subset_validation(self, num_threads, thread_id, featurename):
        # all_vals not belongs to vals
        # gdf = self.gdf
        batch_size = len(self.gdf)//num_threads
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if(thread_id == num_threads - 1):
            r = len(self.gdf) - 1
        vals = self.config['attributes']['subsets']['exclusive'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (self.gdf[featurename].notnull()) & (self.gdf[featurename].isin(vals))]
        if(len(filtered_gdf) > 0):
            raise ValueError(f'Invalid value for {featurename}, value found which belongs to the {vals}')

    def subsets_validation(self):
        config = self.config
        if 'subsets' in config['attributes']:
            if 'inclusive' in config['attributes']['subsets']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.inclusive_subset_validation,
                    list(config['attributes']['subsets']['inclusive'].keys())
                    )

            if 'exclusive' in config['attributes']['subsets']:
                self.parallel_execution(
                    4,
                    self.validation_helper,
                    self.exclusive_subset_validation,
                    list(config['attributes']['subsets']['exclusive'].keys())
                    )
        
    def not_null_validation(self):
        config = self.config
        if 'not_null' in config['attributes']:
            for featurename in config['attributes']['not_null']:
                if self.gdf[featurename].isnull().any():
                    raise ValueError(f'Invalid value for {featurename}, null value found')

    def attributes_check_functions_validation(self):
        config = self.config
        if 'check_functions' in config['attributes']:
            pass

    def crs_validation(self):
        config = self.config
        if 'crs' in config['geometry']:
            if(str(self.gdf.crs) != config['geometry']['crs']):
                raise ValueError(f'Invalid crs {str(self.gdf.crs)} found')
            
    def geometry_types_validation(self):
        config = self.config
        # workaround for now...
        if 'types' in config['geometry']:
            valid_types = config['geometry']['types']
            types_found = set(self.gdf.geom_type)
            for type in types_found:
                if type not in valid_types:
                    raise ValueError(f'Invalid geometry type {type} found, it should be from {valid_types}')
            
    def geometry_check_function_validation(self):
        config = self.config
        if 'check_functions' in config['geometry']:
            pass

    def validate(self):
        self.validate_config_structure()
        #### ATTRIBUTES ####
        # dtypes validation,
        self.dtypes_validation()
        # ranges validation,
        self.ranges_validation()
        # values validation,
        self.values_validation()
        # subsets validation, (considered for only belonging condition)
        self.subsets_validation()
        # not_null validation,
        self.not_null_validation()
        # check_functions validation, (run function for all values in feature, all must be true)
        self.attributes_check_functions_validation()
        
        #### GEOMETRY VALIDATION ####
        # crs validation,
        self.crs_validation()
        # types validation
        self.geometry_types_validation()
        # check_functions validation
        self.geometry_check_function_validation()
        print('end', time.time() - self.now)

    

## Version 1 - no threading

In [6]:
import geopandas
import numpy as np
import threading
import time


class ConfigValidator:
    def __init__(self, config, shapefile):
        self.config = config
        self.shapefile = shapefile
        self.gdf = geopandas.read_file(shapefile)
        self.st = time.time()
        for i in range(12):
            self.gdf = self.gdf.append(self.gdf)
        # for i in range(4):
        #     self.gdf = self.gdf.append(self.gdf)
        # self.gdf = self.gdf.append(self.gdf)
        self.now = time.time()
        print(len(self.gdf), self.now - self.st)
        self.NUM_THREADS = 1

    def validate_config_structure(self):
        config = self.config
        if 'attributes' not in config:
            raise ValueError(
                'Invalid Config - Property "attributes" not found')
        if 'geometry' not in config:
            raise ValueError('Invalid Config - Property "geometry" not found')
        if 'dtypes' in config['attributes']:
            valid_types = ['int', 'int64', 'float',
                           'double', 'text', 'objectID', 'date']
            for key in config['attributes']['dtypes'].keys():
                if key not in valid_types:
                    raise ValueError(
                        f'Invalid Config - invalid dtype - "{key}"')
        # we can have more validations on this, like valid functions, valid featurename lists etc
        # not needed as config will be generated from template or stored

    def parallel_execution(self, function, *args):
        threads = []
        for i in range(self.NUM_THREADS):
            thread = threading.Thread(target=function, args=(i,) + args)
            threads.append(thread)

        # start the threads
        for thread in threads:
            thread.start()

        # wait for all threads to finish
        try:
            for thread in threads:
                thread.join()
        except:
            for thread in threads:
                thread._stop()

    def dtypes_validation(self):
        config = self.config
        if 'dtypes' in config['attributes']:
            # gdf = self.gdf
            # read shapefile in geopandas - dtype in pandas - object, int64, float64, datetime64, bool
            shapefile_dtypes = self.gdf.dtypes
            # mp to map standard types to pandas types
            mp = {
                'int': np.dtype('int'),
                'int64': np.dtype('int64'),
                'float': np.dtype('float'),
                'float64': np.dtype('float64'),
                'double': np.dtype('float'),
                'text': np.dtype('object_'),
                'objectID': np.dtype('object_'),
                'date': np.dtype('datetime64')
            }
            for dtype in config['attributes']['dtypes']:
                for featurename in config['attributes']['dtypes'][dtype]:
                    if (shapefile_dtypes[featurename] != mp[dtype]):
                        raise ValueError(
                            f'Invalid data type for {featurename}, should be {mp[dtype]} but is {shapefile_dtypes[featurename]}')

    def inclusive_range_validation(self, thread_id, featurename):
        # lower <= all_vals <= upper
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        bounds = self.config['attributes']['ranges']['inclusive'][featurename]
        lower, upper = bounds[0], bounds[1]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & ((self.gdf[featurename] < lower) | (self.gdf[featurename] > upper))]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value outside [{lower}, {upper}] found')

    def exclusive_range_validation(self, thread_id, featurename):
        # all_vals < lower or all_vals > upper
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        bounds = self.config['attributes']['ranges']['exclusive'][featurename]
        lower, upper = bounds[0], bounds[1]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & (self.gdf[featurename] >= lower) & (self.gdf[featurename] <= upper)]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value found in range [{lower}, {upper}]')

    def ranges_validation(self):
        config = self.config
        if 'ranges' in config['attributes']:
            if 'inclusive' in config['attributes']['ranges']:
                # for all featurename in inclusive, we parallelly execute validator function
                for featurename in config['attributes']['ranges']['inclusive'].keys():
                    self.parallel_execution(
                        self.inclusive_range_validation, featurename)

            if 'exclusive' in config['attributes']['ranges']:
                # for all featurename in exclusive, we parallelly execute validator function
                for featurename in config['attributes']['ranges']['exclusive'].keys():
                    self.parallel_execution(
                        self.exclusive_range_validation, featurename)

    def equal_value_validation(self, thread_id, featurename):
        # all_vals = val
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        val = self.config['attributes']['values']['equal'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & (self.gdf[featurename] != val)]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value found not equal to {val}')

    def not_equal_value_validation(self, thread_id, featurename):
        # all_vals != val
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        val = self.config['attributes']['values']['not_equal'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & (self.gdf[featurename] == val)]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value found equal to {val}')

    def values_validation(self):
        config = self.config
        if 'values' in config['attributes']:
            if 'equal' in config['attributes']['values']:
                for featurename in config['attributes']['values']['equal'].keys():
                    self.parallel_execution(
                        self.equal_value_validation, featurename)

            if 'not_equal' in config['attributes']['values']:
                for featurename in config['attributes']['values']['not_equal'].keys():
                    self.parallel_execution(
                        self.not_equal_value_validation, featurename)

    def inclusive_subset_validation(self, thread_id, featurename):
        # all_vals belongs to vals
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        vals = self.config['attributes']['subsets']['inclusive'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & (~self.gdf[featurename].isin(vals) & (self.gdf[featurename] != None))]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value found which does not belong to the {vals}')

    def exclusive_subset_validation(self, thread_id, featurename):
        # all_vals not belongs to vals
        # gdf = self.gdf
        batch_size = len(self.gdf)//self.NUM_THREADS
        # thread_id represents l to r in gdf
        l = thread_id * batch_size
        r = min(len(self.gdf), l + batch_size) - 1
        if (thread_id == self.NUM_THREADS - 1):
            r = len(self.gdf) - 1
        vals = self.config['attributes']['subsets']['exclusive'][featurename]
        filtered_gdf = self.gdf[(self.gdf.index >= l) & (self.gdf.index <= r) & (
            self.gdf[featurename].notnull()) & (self.gdf[featurename].isin(vals))]
        if (len(filtered_gdf) > 0):
            raise ValueError(
                f'Invalid value for {featurename}, value found which belongs to the {vals}')

    def subsets_validation(self):
        config = self.config
        if 'subsets' in config['attributes']:
            if 'inclusive' in config['attributes']['subsets']:
                for featurename in config['attributes']['subsets']['inclusive'].keys():
                    self.parallel_execution(
                        self.inclusive_subset_validation, featurename)

            if 'exclusive' in config['attributes']['subsets']:
                for featurename in config['attributes']['subsets']['exclusive'].keys():
                    self.parallel_execution(
                        self.exclusive_subset_validation, featurename)

    def not_null_validation(self):
        config = self.config
        if 'not_null' in config['attributes']:
            for featurename in config['attributes']['not_null']:
                if self.gdf[featurename].isnull().any():
                    raise ValueError(
                        f'Invalid value for {featurename}, null value found')

    def attributes_check_functions_validation(self):
        config = self.config
        if 'check_functions' in config['attributes']:
            pass

    def crs_validation(self):
        config = self.config
        if 'crs' in config['geometry']:
            if (str(self.gdf.crs) != config['geometry']['crs']):
                raise ValueError(f'Invalid crs {str(self.gdf.crs)} found')

    def geometry_types_validation(self):
        config = self.config
        # workaround for now...
        if 'types' in config['geometry']:
            valid_types = config['geometry']['types']
            types_found = set(self.gdf.geom_type)
            for type in types_found:
                if type not in valid_types:
                    raise ValueError(
                        f'Invalid geometry type {type} found, it should be from {valid_types}')

    def geometry_check_function_validation(self):
        config = self.config
        if 'check_functions' in config['geometry']:
            pass

    def validate(self):
        self.validate_config_structure()
        #### ATTRIBUTES ####
        # dtypes validation,
        self.dtypes_validation()
        # ranges validation,
        self.ranges_validation()
        # values validation,
        self.values_validation()
        # subsets validation, (considered for only belonging condition)
        self.subsets_validation()
        # not_null validation,
        self.not_null_validation()
        # check_functions validation, (run function for all values in feature, all must be true)
        self.attributes_check_functions_validation()

        #### GEOMETRY VALIDATION ####
        # crs validation,
        self.crs_validation()
        # types validation
        self.geometry_types_validation()
        # check_functions validation
        self.geometry_check_function_validation()
        print('end', time.time() - self.now)


## Comments
#### Experiment 1
Time taken is expected to be higher than we got as reading a shapefile in geopandas may take time

// bigger config - 

version, num of rows, additional time to increase rows, end - total time taken for validation 

V1 - 7225344 1.3997633457183838
end 19.2275447845459 

V2 - 7225344 1.4392359256744385
end 25.137534856796265

V3 - 7225344 1.1619951725006104
end 17.309794902801514
#### Experiment 2
//smaller config - 

V3 - 28901376 8.447794914245605
end 24.983617067337036

V2 - 28901376 8.805611848831177
end 49.451152324676514

V1 - 28901376 8.536412954330444
end 36.259761095047
#### Experiment 3
//bigger config -

V3 - 28901376 8.579703092575073
end 88.4816601276397

V2 - 4 minutes

V1 - 95 sec
#### Experiment 4
//bigger

V3 - 3612672 0.56266188621521
end 8.773582220077515

V2 - 3612672 0.560664176940918
end 10.410000085830688

V1 - 3612672 0.5555088520050049
end 9.491483211517334
#### Experiment 5
//smaller

V3 - 3612672 0.5471899509429932
end 2.008450031280517

V2 - 3612672 0.5653510093688965
end 2.100717067718506

V1 - 3612672 0.5493388175964355
end 2.0935730934143066

#### Version 4 is a mix of v2 and v3, v3 is small optimisation of v1, only v3 is complete (json structure validation remains)

In [7]:
# General validation config for a shapefile 
config = {
    'attributes' : {
        'dtypes' : {
            'int' : ['featurename'],
            'int64' : [],
            'float' : [],
            'float64' : [],
            'double' : [],
            'text' : [],
            'objectID' : [],
            'date' : []
        },
        'ranges' : {
            # all values
            'inclusive' : {
                'featurename' : ['lower', 'uppper'], #lower <= val <= upper
                # ...
                # can have mulitple ranges
            },
            'exclusive' : {
                'featurename' : ['lower', 'upper'], #val < lower, val > upper
                # ...
                # can have multiple ranges
            }
        },
        'values' : {
            'equal' : {
                'featurename' : 'val', 
                # ...
            },
            'not_equal' : {
                'featurename' : 'val', 
                # ...
            }
        },
        'subsets' : {
            # all values
            'inclusive' : {
                'featurename' : ['values'],
                # ...
            },
            'exclusive' : {
                'featurename' : ['values'],
                # ...
            }
        },
        'not_null' : ['features which must not have null, missing values'],
        # 'null' : ['features which can be null'],
        # custom functions for complex checks like, for town = oldtown, all featurestatus should be active, something like that
        'check_functions' : {
            'featurename' : ['function definitions in python maybe']
        }
    },
    'geometry' : {
        # there will be only one column for geometry, which can be of type point, polygon, etc
        'crs' : '',
        'types' : ['Point'],
        'check_functions' : []
        # it depends, maybe area, distance, intersection pairwise, number of overlaps, it can be anything
        # which is passed as functions, so will it be a good idea? 
    }
}