In [135]:
import os
from typing import Optional, List, Dict

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan, streaming_bulk

In [None]:
type_map = {
    "int":"integer",
    "float":"float",
    "double":"double",
    "str": "text",
    "bool": "boolean",
    "datetime": "date",
    "list[int]":"integer",
    "list[str]":"text",
    "list[float]": "float",
    "list[double]": "double",
    "torch.tensor": "dense_vector",
    "numpy.ndarray": "dense_vector"
}

In [244]:
def traverse_map (map_dict):
    original_map_dict = dict(map_dict)
    for k, v in map_dict.items():
        if isinstance(v, dict):
            traverse_map(v)
        else:
            try:
                map_dict[k] = {"type":TYPE_MAP[v.lower()]}
            except Exception as e:
                print(f'{e.__class__.__name__}: Key {v} not found in TYPE_MAP. Mapping not updated')
                return original_map_dict
    return map_dict


In [7]:
TYPE_MAP =  {
    "int":"integer",
    "float":"float",
    "double":"double",
    "str": "text",
    "bool": "boolean",
    "datetime": "date",
    "list[int]":"integer",
    "list[str]":"text",
    "list[float]": "float",
    "list[double]": "double",
    "torch.tensor": "dense_vector",
    "numpy.ndarray": "dense_vector"
}

In [34]:
user_map = {
    "name":"str",
    "age":"int",
    "education":{
        "primary":{
            "school":"str"
        },
        "secondary":"str",
        "tertiary":"str"
    }
}

In [247]:
final_map = {"mappings":{"properties":{}}}

In [248]:
updated_map = traverse_map(user_map)

KeyError: Key meow not found in TYPE_MAP. Mapping not updated


In [249]:
updated_map

{'name': {'type': 'text'},
 'age': {'type': 'integer'},
 'education': {'primary': {'school': 'meow'},
  'secondary': {'type': 'text'},
  'tertiary': {'type': 'text'}}}

In [8]:
TYPE_MAP =  {
    "int":"integer",
    "float":"float",
    "double":"double",
    "str": "text",
    "bool": "boolean",
    "datetime": "date",
    "list[int]":"integer",
    "list[str]":"text",
    "list[float]": "float",
    "list[double]": "double",
    "torch.tensor": "dense_vector",
    "numpy.ndarray": "dense_vector"
}

In [254]:
user_map = {
    "name":"str",
    "age":"int",
    "education":{
        "primary":{
            "school":"str"
        },
        "secondary":"str",
        "tertiary":"str"
    }
}

user_map = {
    "owner_name": "str",
    "owner_id": "str",
    "pets":{
        "cats": "int",
        "dogs": "int"
    }
}

In [288]:
MAX_BULK_SIZE=10000
class DocMgr():
    
    def __init__(self):
        self.url = f"https://{os.environ.get('ELASTICSEARCH_HOST')}:{os.environ.get('ELASTICSEARCH_C_PORT')}"
        self.username = os.environ.get('ELASTIC_USERNAME')
        self.password = os.environ.get('ELASTIC_PASSWORD')
        self.client = Elasticsearch(self.url, 
                                    verify_certs=False, 
                                    basic_auth=(self.username, self.password))
        
        self.consolidated_actions=[]

    def _check_data_type(self, var, var_type):
        try:
            assert type(var)==var_type
        except:
            return False
        return True
    
        
    def _check_valid_values(self, map_dict:dict) -> int:
        """
        Traverse mapping dictionary to ensure that all types are valid types within TYPE_MAP

        Args:
            map_dict (dict): Mapping to be checked

        Returns:
            int: 0 if there is invalid types, 1 otherwise

        """
        ret_val = 1
        for k, v in map_dict.items():
            if isinstance(v, dict):
                ret_val = self._check_valid_values(v)
            else:
                if not v in TYPE_MAP:
                    print(f"'{v}' type for '{k}' NOT FOUND")
                    return 0

        return ret_val * 1
    
    def _traverse_map (self, map_dict:Dict) -> Dict:
        """
        Traverse mapping dictionary to convert data type into framework specific type

        Args:
            map_dict (dict): Mapping to be used to create ES index

        Returns:
            dict: updated mapping dictionary

        """
        dictionary ={"properties":dict()}
        for k, v in map_dict.items():
            if isinstance(v, dict):
                dictionary['properties'][k]= self._traverse_map(v)
            else:
                dictionary['properties'][k]={"type":TYPE_MAP[v]}       
        return dictionary
    
    def _flush(self):
        errors = []
        list_of_es_ids = []
        for ok, item in streaming_bulk(self.client, self.consolidated_actions):
            if not ok:
                errors.append(item)
            else:
                list_of_es_ids.append(item['index']['_id'])
        print("List of faulty documents:", errors)
        self.consolidated_actions=[] # Reset List
        return list_of_es_ids
        
    def _flatten(self, d, parent_key='', sep='.'):
        """
        Flatten nested dictionary keys to dotted parameters because Elasticsearch. 
        """
        items = []
        for k, v in d.items():
            new_key = parent_key + sep + k if parent_key else k
            if isinstance(v, collections.MutableMapping):
                items.extend(flatten(v, new_key, sep=sep).items())
            else:
                items.append((new_key, v))
        return dict(items)
    
    def create_collection(self, collection_name: str, schema: Dict) -> Dict:
        """
        Create the index on ElasticSearch

        Args:
            collection_name (str): Index name of ES
            schema (dict): Mapping to be used to create ES index

        Returns:
            dict: response of error, or 200 if no errors caught
            
        """
        if not self._check_data_type(schema, dict):
            return {"response":"Type of 'schema' is not dict"}
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}

        mapping_validity = self._check_valid_values(schema)
        if not mapping_validity:
            return {"response": "KeyError: data type not found in TYPE_MAP"}
        updated_mapping = self._traverse_map(schema)
        try:
            self.client.indices.create(index=collection_name, mappings=updated_mapping)
        except Exception as e:
            return {"response":f"{e}"}
        return {"response":"200"}
    
    def delete_collection(self, collection_name: str) -> dict:
        """
        Create the index on ElasticSearch

        Args:
            collection_name (str): Index name of ES
            schema (dict): Mapping to be used to create ES index

        Returns:
            dict: response of error, or 200 if no errors caught

        """
        try:
            self.client.indices.delete(index=collection_name)
        except Exception as e:
            return {"response": f"{e}"}
        return {"response":"200"}
    
    def create_document(self, collection_name: str, documents: List[Dict], id_field: str=None) -> dict:
        """
        Upload document(s) in the specified index within ElasticSearch

        Args:
            collection_name (str): Index name of ES
            documents (list): A list of document objects to be ingested
            id_field (str, Optional): Specify the key amongst the document object to be the id field. If not specified, id will be generated by ES. 

        Returns:
            dict: response of error along with the faulty document, or code 200 along with the ids of ingested document if no errors caught

        """
        if not self._check_data_type(documents, list):
            return {"response":"Type of 'documents' is not dict"}
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not id_field is None:
            if not self._check_data_type(id_field, str):
                return {"response":"Type of 'id_field' is not str"}
        
        # If id_field is specified, verify that all documents possess the id_field. 
        if id_field != None:
            for doc in documents:
                if not id_field in doc.keys():
                    print("Fix document, or set 'id_field' to None. No documents uploaded.")
                    return {"response": "Fix document, or set 'id_field' to None. No documents uploaded.",
                           "error_doc": doc}
                try:
                    doc[id_field] = str(doc[id_field])
                except Exception as e:
                    return {"response": "id cannot be casted to String type. No documents uploaded.",
                           "error_doc": doc}
        all_id = []
        for doc in documents:
            action_dict={}
            action_dict['_op_type']= 'index'
            action_dict['_index']=collection_name
            if id_field != None:
                action_dict['_id']=doc[id_field]
                doc.pop(id_field)
            action_dict['_source']=doc
            self.consolidated_actions.append(action_dict)
            if len(self.consolidated_actions) == MAX_BULK_SIZE:
                all_id = all_id+self._flush()
        
        all_id = all_id+self._flush()
        
        return {"response":"200", "ids": all_id}
    
    def delete_document(self, collection_name: str, doc_id:str) -> dict:
        """
        Delete document from index based on the specified document id. 

        Args:
            collection_name (str): Index name of ES
            doc_id (str): id of doc to be deleted

        Returns:
            dict: response of error along with the faulty document, or code 200 along with elastic API response

        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not self._check_data_type(doc_id, str):
            return {"response":"Type of 'doc_id' is not str"}
        
        # Check for document's existence     
        search_result = self.client.search(index=collection_name, query={"match":{"_id":doc_id}})
        result_count = search_result['hits']['total']['value']
        
        if result_count == 0:
            return {"response": f"Document '{doc_id}' not found!"}
        
        try:
            resp = self.client.delete(index="meow", id=doc_id)
        except Exception as e:
            return {"response":f"{e.__class__.__name__}. Document Deletion failed"}
        
        return {"response":"200", "api_resp": resp}

    
    def update_document(self, collection_name: str, doc_id:str, properties: dict) -> dict:
        """
        Delete document from index based on the specified document id. 

        Args:
            collection_name (str): Index name of ES
            doc_id (str): id of doc to be updated
            properties (dict): key and values of fields to be updated.

        Returns:
            dict: response of error along with the faulty document, or code 200 along with elastic API response

        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not self._check_data_type(doc_id, str):
            return {"response":"Type of 'doc_id' is not str"}
        if not self._check_data_type(properties, dict):
            return {"response":"Type of 'properties' is not dict"}
        
        
        # Check for document's existence     
        search_result = self.client.search(index=collection_name, query={"match":{"_id":doc_id}})
        result_count = search_result['hits']['total']['value']
        
        if result_count == 0:
            return {"response": f"Document '{doc_id}' not found, create document first"}
        
        try:
            resp = self.client.update(index=collection_name, id=doc_id, doc=properties)
        except Exception as e:
            return {"response":f"{e.__class__.__name__}. Document Deletion failed"}
        
        return {"response":"200", "api_resp": resp}
    
    def read_document(self, collection_name: str, doc_id:str) -> dict:
        """
        Read document from index based on the specified document id. 

        Args:
            collection_name (str): Index name of ES
            doc_id (str): id of doc to be deleted

        Returns:
            dict: response of error along with the faulty document, or code 200 along with the retrieved document

        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not self._check_data_type(doc_id, str):
            return {"response":"Type of 'doc_id' is not str"}
        
        # Check for document's existence     
        search_result = self.client.search(index=collection_name, query={"match":{"_id":doc_id}})
        result_count = search_result['hits']['total']['value']
        
        if result_count == 0:
            return {"response": f"Document '{doc_id}' not found!"}
        
        doc_body = search_result['hits']['hits']
        
        return {"response":"200", "api_resp": doc_body}
    
    def query_collection(self, collection_name: str, field_value_dict:dict) -> dict:
        """
        Read document from index based on the specified document id. 

        Args:
            collection_name (str): Index name of ES
            field_value_dict (dict): A dictionary with the field to be queried as the key, and the value to be queried as the value of the dictionary

        Returns:
            dict: response of error along with the faulty document, or code 200 along with the list of retrieved document

        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not self._check_data_type(field_value_dict, dict):
            return {"response":"Type of 'field_value_dict' is not dict"}
        
        # Check for document's existence
        field_value_dict = self._flatten(field_value_dict)
        search_result = self.client.search(index=collection_name, query={"match":field_value_dict})
        result_count = search_result['hits']['total']['value']
        
        if result_count == 0:
            return {"response": f"No documents found."}
        
        docs = search_result['hits']['hits']
        
        return {"response":"200", "api_resp": docs}
    
    def custom_query(self, collection_name: str, query:dict) -> dict:
        """
        Read document from index based on the specified document id. 

        Args:
            collection_name (str): Index name of ES
            query (dict): Custom query for ES users who are familiar with the query format

        Returns:
            dict: response of error along with the faulty document, or code 200 along with the list of retrieved document

        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        if not self._check_data_type(query, dict):
            return {"response":"Type of 'field_value_dict' is not dict"}
        
        # Check for document's existence
        search_result = self.client.search(index=collection_name, query=query)
        result_count = search_result['hits']['total']['value']
        
        if result_count == 0:
            return {"response": f"No documents found."}
        
        docs = search_result['hits']['hits']
        
        return {"response":"200", "api_resp": docs}
    
    def get_all_documents(self, collection_name: str) -> dict:
        """
        Generator method to retrieve 

        Args:
            collection_name (str): Index name of ES
            
        Returns:
            Generator Object: Iterable object containing all documents within index specified by 
        """
        if not self._check_data_type(collection_name, str):
            return {"response":"Type of 'collection_name' is not str"}
        docs_response = scan(self.client, index=collection_name, query={"query":{"match_all":{}}})
        for item in docs_response:
            yield item
    

In [289]:
es_mgr = DocMgr()

  _transport = transport_class(


In [258]:
create_res = es_mgr.create_collection(collection_name = "meow", schema = user_map)
print(create_res)

{'response': '200'}




In [257]:
del_res = es_mgr.delete_collection("meow")
print(del_res)

{'response': '200'}




In [259]:
import random
import string

# printing lowercase

rand_data = []
user_map = {
    "owner_name": "str",
    "owner_id": "str",
    "cats": "int",
    "dogs": "int"
}

letters = string.ascii_lowercase
for i in range(1000):
    fake_data = {}
    fake_data["owner_name"] = ''.join(random.choice(letters) for i in range(10))
    fake_data['owner_id'] = i
    fake_data['pets']={}
    fake_data['pets']['cats'] = random.randint(0,5)
    fake_data['pets']['dogs'] = random.randint(0,5)
    rand_data.append(fake_data)

In [260]:
rand_data

[{'owner_name': 'lnrhlqcncv', 'owner_id': 0, 'pets': {'cats': 0, 'dogs': 3}},
 {'owner_name': 'clkwivkdhw', 'owner_id': 1, 'pets': {'cats': 3, 'dogs': 0}},
 {'owner_name': 'gstnqvfwtp', 'owner_id': 2, 'pets': {'cats': 5, 'dogs': 1}},
 {'owner_name': 'vkoodqrllo', 'owner_id': 3, 'pets': {'cats': 0, 'dogs': 0}},
 {'owner_name': 'vrowmvilat', 'owner_id': 4, 'pets': {'cats': 3, 'dogs': 4}},
 {'owner_name': 'yuxkhbetfh', 'owner_id': 5, 'pets': {'cats': 3, 'dogs': 1}},
 {'owner_name': 'ocmvaylszk', 'owner_id': 6, 'pets': {'cats': 3, 'dogs': 5}},
 {'owner_name': 'usnkgkzjyt', 'owner_id': 7, 'pets': {'cats': 0, 'dogs': 0}},
 {'owner_name': 'vpebtbldvw', 'owner_id': 8, 'pets': {'cats': 4, 'dogs': 2}},
 {'owner_name': 'asjxnxdoqn', 'owner_id': 9, 'pets': {'cats': 1, 'dogs': 0}},
 {'owner_name': 'hontqlnwoo', 'owner_id': 10, 'pets': {'cats': 1, 'dogs': 5}},
 {'owner_name': 'txmqlfkdke', 'owner_id': 11, 'pets': {'cats': 3, 'dogs': 0}},
 {'owner_name': 'qiaflgwrqr', 'owner_id': 12, 'pets': {'cats':

In [261]:

upload_res = es_mgr.create_document(collection_name = "meow", documents=rand_data, id_field='owner_name')

List of faulty documents: []




In [262]:
upload_res

{'response': '200',
 'ids': ['lnrhlqcncv',
  'clkwivkdhw',
  'gstnqvfwtp',
  'vkoodqrllo',
  'vrowmvilat',
  'yuxkhbetfh',
  'ocmvaylszk',
  'usnkgkzjyt',
  'vpebtbldvw',
  'asjxnxdoqn',
  'hontqlnwoo',
  'txmqlfkdke',
  'qiaflgwrqr',
  'doxtystqry',
  'aubrezdwhk',
  'rjavtsciam',
  'grixpvzhmj',
  'gmamwrpxuj',
  'nfprhshykn',
  'nsqpyievxs',
  'oekpkuejbc',
  'vnljigkuij',
  'ylazhhnxee',
  'blppqhutbl',
  'fqdtrlimjl',
  'fqkbtyzydj',
  'qpyxpklavp',
  'myoslmgokh',
  'pigxupacue',
  'wagrfowora',
  'tlzvopqxxy',
  'olzlgvkokh',
  'kxfeeehjds',
  'ybqgwlxjgo',
  'ivkbmsddel',
  'ljxgzhgfnb',
  'pohvjerfpt',
  'uzmlgshecx',
  'spntypcgwn',
  'nypljtytum',
  'libjncsdul',
  'ryzljmlmbk',
  'nrwwgmqrms',
  'docarprobg',
  'uytngdqfis',
  'wcgzuafwxl',
  'ykbriycorx',
  'wjbjnqjxho',
  'zvzupffxzc',
  'mvkhoncwjp',
  'gkuzbktjci',
  'rnaewdrqdm',
  'oejtmseksm',
  'ydfxouhasj',
  'jthgsmmmgn',
  'ohbqdlrqhi',
  'jivrppmnaq',
  'zycfisacxr',
  'dvsqltxgxt',
  'uozjkdgyal',
  'spmascbazv

In [225]:

delete_res = es_mgr.delete_document(collection_name = "meow", doc_id='Yzy8E4UBimE7927zEki1')



In [277]:
delete_res

{'response': '200',
 'api_resp': ObjectApiResponse({'_index': 'meow', '_id': 'Yzy8E4UBimE7927zEki1', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1002, '_primary_term': 1})}

In [234]:
some_text = "meow"
_check_data_type(some_text, int)

False

In [266]:
es_mgr.client.update(index="meow", id='kxfeeehjds', doc={'pets':{'cats':4}})



ObjectApiResponse({'_index': 'meow', '_id': 'kxfeeehjds', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1000, '_primary_term': 1})

In [274]:
search_res['hits']['hits'][0]['_source']

{'owner_id': 32, 'pets': {'cats': 4, 'dogs': 1}}

In [280]:
es_mgr.read_document(collection_name='meow', doc_id='kxfeeehjds')



{'response': '200',
 'api_resp': [{'_index': 'meow',
   '_id': 'kxfeeehjds',
   '_score': 1.0,
   '_source': {'owner_id': 32, 'pets': {'cats': 4, 'dogs': 1}}}]}

In [292]:
es_mgr.query_collection(collection_name='meow', field_value_dict={'owner_id':3})



{'response': '200',
 'api_resp': [{'_index': 'meow',
   '_id': 'vkoodqrllo',
   '_score': 6.5042877,
   '_source': {'owner_id': 3, 'pets': {'cats': 0, 'dogs': 0}}}]}

In [286]:
import collections

def flatten(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

In [287]:
flatten({'pets':{'cats':4}})

  if isinstance(v, collections.MutableMapping):


{'pets.cats': 4}

In [300]:
docs_response = scan(es_mgr.client, index='meow', query={"query":{"match":{'pets.cats':3}}})


In [299]:
for i in docs_response:
    print(i)

{'_index': 'meow', '_id': 'clkwivkdhw', '_score': None, '_source': {'owner_id': 1, 'pets': {'cats': 3, 'dogs': 0}}, 'sort': [1]}
{'_index': 'meow', '_id': 'vrowmvilat', '_score': None, '_source': {'owner_id': 4, 'pets': {'cats': 3, 'dogs': 4}}, 'sort': [4]}
{'_index': 'meow', '_id': 'yuxkhbetfh', '_score': None, '_source': {'owner_id': 5, 'pets': {'cats': 3, 'dogs': 1}}, 'sort': [5]}
{'_index': 'meow', '_id': 'ocmvaylszk', '_score': None, '_source': {'owner_id': 6, 'pets': {'cats': 3, 'dogs': 5}}, 'sort': [6]}
{'_index': 'meow', '_id': 'txmqlfkdke', '_score': None, '_source': {'owner_id': 11, 'pets': {'cats': 3, 'dogs': 0}}, 'sort': [11]}
{'_index': 'meow', '_id': 'ylazhhnxee', '_score': None, '_source': {'owner_id': 22, 'pets': {'cats': 3, 'dogs': 0}}, 'sort': [22]}
{'_index': 'meow', '_id': 'ybqgwlxjgo', '_score': None, '_source': {'owner_id': 33, 'pets': {'cats': 3, 'dogs': 5}}, 'sort': [33]}
{'_index': 'meow', '_id': 'dvsqltxgxt', '_score': None, '_source': {'owner_id': 58, 'pets':



In [301]:
docs_response

<generator object scan at 0x7fdfb3b9a7b0>