In [33]:
import pymongo, re
from bson import ObjectId

Connection to MongoDB

In [34]:
class Connection:
    """Class that connects to the database creating all necessary methods for connection and closing connection, as well as the base database for adding new HTS records and the string_dict collection too
    """

    def __init__(self, db_path: str):
        """_init_ function of the class, defines the connection variables

        Args:
            db_path (str): Path to the database connection on MongoDB
        """

        self.client = pymongo.MongoClient(db_path)
        self.db = self.client['hts']
        self.collection_records = self.db['hts_records']
        self.collection_string_dict = self.db['string_dict']

Definition of pipelines and criterias for pymongo queries, as well as conversion of the hts query string

-Here are the methods to process the string input from user, as well as patterns created:

In [35]:
#Patterns to clean up and organize input of HTS number from user
remove_punctuation = r'[!\"#$%&\'()*+,-./:;<=>?@\[\]\^_`{|}~—]'
gather_hts_number = [
    (
        r'(^[\d]{4})([\d]{2})([\d]{2})([\d]{2})$', 'Complete_record'
    ),
    (
        r'(^[\d]{4})([\d]{2})([\d]{2})$', 'Base_semifull'
    ),
    (
        r'(^[\d]{4})([\d]{2})$', 'Base_subrecord'
    ),
    (
        r'(^[\d]{4})$', 'Base_chapter'
    )
]

def processString(test_string: str) -> dict[str, any]:
    """Processes the string using the patterns for removing symbols and grouping the numbers according HTS syntax

    Args:
        test_string (str): Test string HTS number from user

    Returns:
        dict: Creates an object with 'type' of query and 'groups' Match object with the HTS grouped
    """

    string_no_symbols = re.sub(remove_punctuation, '', test_string)

    for pattern in gather_hts_number:

        matched_str = re.match(pattern=pattern[0], string=string_no_symbols)

        if matched_str:
            return {
                'type': pattern[1],
                'groups': matched_str
            }
        
def processGroups(processed_str: dict) -> dict[str, any]:
    """Processes the groups from the originally processed string to add the HTS numbers for further database query

    Args:
        processed_str (dict): Object with the type and groups originally gathered from user input

    Returns:
        dict: Returns an object with 'type', 'main_group' and 'sub_groups' for DB query
    """

    query_chap = processed_str['groups'].group(1)
    groups = gatherGroups(processed_str['groups'])
    
    if len(groups) == 0:
        return {
            'type': processed_str['type'],
            'main_group': query_chap
        }
    else:
        return {
            'type': processed_str['type'],
            'main_group': query_chap,
            'sub_groups': groups
        }


def gatherGroups(groups: re.Match) -> list:
    """Helper method for the processGroups() that formats the HTS subrecords adding the previous numbers for database query

    Args:
        groups (re.Match): Match object containing each individual sub_record from user initial input

    Returns:
        list: Returns a list of hts sub_records for DB query
    """

    list_of_groups = []
    previous_group = ''
    first_run = True

    for i in range(1, len(groups.groups()) + 1):

        if first_run:
            previous_group = groups.group(i)
            first_run = False
        else:
            result = previous_group + '.' + groups.group(i)
            list_of_groups.append(result)
            previous_group = previous_group + '.' + groups.group(i)

    return list_of_groups


In [36]:
def createQueryGroups(test_list: list) -> list[dict[str, any]]:
    """Main function that creates the query groups for DB query as a list of objects (for bulk query of several records)

    Args:
        test_list (list): List of strings or string in single element list containing raw input from user for hts query

    Returns:
        list: Returns a list of resulting objects\n \tThese objects are with type of query, main chapter, and sub_records if applicable for DB query, as follows \n
        {
            'type': (str),
            'main_group': (str),
            'sub_groups': (list)
        }
    """

    list_of_results = []

    for element in test_list:
        string_processed = processString(element)
        groups_processed = processGroups(string_processed)
        list_of_results.append(groups_processed)
        
    return list_of_results

Function definition for executing the queries with the Connection class

In [37]:
#Query handling.
#----------------------------------

def queryComplete(document: dict, query_sub_groups: list[dict[str, any]]) -> list[dict[str, any]]:
    """Function that performs the processing of the document queried in the hts_records collection when the query is of 'Complete_record' for each of the documents gathered

    Args:
        document (dict): Individual document inside the 'data' key in the document queried
        query_sub_groups (list): Original query sub_groups

    Returns:
        list[dict[str, any]]: Returns a list of dictionaries with the relevant records based on the sub_groups on the original query
    """

    data = document['data']

    if len(data) == 1:

        return [data[0]]

    result = [data[0]]

    for sub_group in query_sub_groups:

        for d in data:
            if d['htsno'] == sub_group:
                result.append(d)
    
    return result

def processRawQuery(raw_results: list[dict[str, any]]) -> list[dict[str, any]]:
    """Function that removes all null value keys from the original raw_query on the document gathered from the db.

    Args:
        raw_results (list[dict[str, any]]): Raw list of documents gathered on the first query of the db with the main and sub_groups

    Returns:
        list[dict[str, any]]: Returns a list of documents parsed without empty keys for the final result of query
    """

    processed_result = []

    for result in raw_results:

        new_result = {}

        for key in result.keys():

            if result[key]:
                
                new_result[key] = result[key]
        
        processed_result.append(new_result)

    return processed_result

def completeRecordArming(groups: dict, raw_results: list[dict[str, any]], processed_results: list[dict[str, any]]) -> dict:

    return {
        'query': groups,
        'raw_results': raw_results,
        'processed_results': processed_results
    }



Main function that pipelines with the previous query functions

In [39]:
def queryHTSDB(query_groups: list[dict[str, any]], hts_records: pymongo.collection.Collection) -> list[dict[str, any]]:
    """Function that handles the query on the db of the original information parsed in the query string as query_groups and handles the different case scenarios of type of query:\n
    'Complete_record', 'Base_semifull', 'Base_subrecord', 'Base_chapter'

    Args:
        query_groups (list[dict[str, any]]): Query groups parsed from initial user query
        hts_records (pymongo.collection.Collection): DB hts_record collection connection to perform query

    Returns:
        list[dict[str, any]]: Returns final list with the query results processed according to the type of query
    """

    result = []

    for group in query_groups:

        document = hts_records.find_one({'header': group['main_group']})

        if group['type'] != 'Base_chapter':

            raw_results = queryComplete(document, group['sub_groups'])

            result.append(completeRecordArming(group, raw_results, processRawQuery(raw_results)))

        elif group['type'] == 'Base_chapter':

            raw_results = queryComplete(document, group['main_group'])

            result.append(completeRecordArming(group, raw_results, processRawQuery(raw_results)))

    return result




In [40]:
query_base = ['0101']
print(createQueryGroups(query_base))

[{'type': 'Base_chapter', 'main_group': '0101'}]


Testing execution

In [41]:
connection = Connection('mongodb+srv://mrgoodkato:OflF6rW69vlr5RK5@cluster0.uv65fyv.mongodb.net/')

query_strings = ['0101.21.00.20', '0202.30.80.00', '0202.30.06.00', '3001.90.01.50', '5001.00.00.00', '0101.21.00', '0101.21', '0202.30']

result = queryHTSDB(createQueryGroups(query_base), connection.collection_records)

print(result)

for r in result:
    print()
    print(r['query'])
    print(r['raw_results'])
    print(r['processed_results'])
    print()


[{'query': {'type': 'Base_chapter', 'main_group': '0101'}, 'raw_results': [{'htsno': '0101', 'indent': 0, 'description': 'Live horses, asses, mules and hinnies:', 'superior': None, 'units': [], 'general': '', 'special': '', 'other': '', 'footnotes': [], 'quotaQuantity': '', 'additionalDuties': '', 'addiitionalDuties': None}], 'processed_results': [{'htsno': '0101', 'description': 'Live horses, asses, mules and hinnies:'}]}]

{'type': 'Base_chapter', 'main_group': '0101'}
[{'htsno': '0101', 'indent': 0, 'description': 'Live horses, asses, mules and hinnies:', 'superior': None, 'units': [], 'general': '', 'special': '', 'other': '', 'footnotes': [], 'quotaQuantity': '', 'additionalDuties': '', 'addiitionalDuties': None}]
[{'htsno': '0101', 'description': 'Live horses, asses, mules and hinnies:'}]

