In [1301]:
# Jupyter Notebook is an incredibly powerful tool for interactively developing and presenting data science projects.
# It allows you to run code, display the results, and add explanations, formulas, and charts all in one place.

# Goal: check for duplicate entries in TASK documents and, ideally, in all WMx/OMx ES indices.
# The check is run per process name, using that process's key field(s), to see whether any duplicate data exists.
print("Check the records if index has duplicate records")

Check the records if index has duplicate recores


In [1302]:
#!pip uninstall -y elasticsearch
#!pip install elasticsearch==7.13.0

In [1303]:
# !pip freeze

In [1304]:
!pip show elasticsearch

Name: elasticsearch
Version: 7.13.0
Summary: Python client for Elasticsearch
Home-page: https://github.com/elastic/elasticsearch-py
Author: Honza Král, Nick Lang
Author-email: honza.kral@gmail.com, nick@nicklang.com
License: Apache-2.0
Location: /home/biadmin/monitoring/jupyter_notebook/.venv/lib/python3.9/site-packages
Requires: certifi, urllib3
Required-by: 


In [1305]:
from elasticsearch import Elasticsearch
import os
import json
import pandas as pd
from dotenv import load_dotenv

In [1306]:
# Requires the python-dotenv package (pip install python-dotenv).
# Searches for a .env file in the local folder and loads its variables.
load_dotenv()

True

In [1307]:
def get_headers():
    '''Build the HTTP headers sent with every Elasticsearch request.

    Returns:
        dict: ``Content-type: application/json`` and ``Connection: close``,
        plus an ``Authorization`` header taken verbatim from the
        ``BASIC_AUTH`` environment variable when that variable is set.
    '''
    headers = {
        'Content-type': 'application/json',
        'Connection': 'close',
    }
    basic_auth = os.getenv('BASIC_AUTH')
    # Formatting an unset variable would send the literal text "None" as the
    # credential; leave the header out instead.
    if basic_auth:
        headers['Authorization'] = basic_auth
    return headers

In [1308]:
def get_es_instance(host):
    '''Create an Elasticsearch client for ``http://<host>``.

    Args:
        host: Host (optionally with port) of the cluster, e.g. "localhost:9200".

    Returns:
        Elasticsearch: client configured with the headers from get_headers(),
        a 5 second timeout and ``verify_certs=False``.

    Raises:
        ValueError: if ``host`` is None or blank. This typically means the
            environment variable holding the host was not set; without this
            check the client would be built for the bogus URL "http://None".
    '''
    if host is None or not str(host).strip():
        raise ValueError(
            "Elasticsearch host is missing or empty: {!r}".format(host)
        )
    es_client = Elasticsearch(
        hosts="http://{}".format(host),
        headers=get_headers(),
        timeout=5,
        verify_certs=False,
    )
    return es_client

# es_obj_s_client = get_es_instance("localhost:9200")

In [1309]:
# resp = es_obj_s_client.cluster.health()
# print(json.dumps(resp, indent=2))

In [1310]:
# Maps each "<ENV>_ES_HOST" name to the number of duplicate buckets found by
# the most recent check_duplicates_tasks call for that host.
es_host_duplicates = {}

In [1311]:
def json_value_to_transform_trim(raw_json):
    '''Recursively walk a JSON-like structure and return it.

    Nested lists and dicts are traversed in place. Every non-container dict
    value is written back under its own key unchanged, so the object handed
    in is returned with the same contents.

    NOTE(review): despite the name, no trimming is applied to any value.
    '''
    def _walk(node):
        if isinstance(node, dict):
            for key in node:
                value = node[key]
                if isinstance(value, (list, dict)):
                    _walk(value)
                else:
                    # Scalar leaf: stored back as-is.
                    node[key] = value
        elif isinstance(node, list):
            for item in node:
                _walk(item)
        return node

    return _walk(raw_json)

In [1312]:
def query_handler(process):
    '''Build the search body that finds duplicated keys for a process.

    The body is a ``terms`` aggregation named "duplicates" with
    ``min_doc_count`` 2, so only key values shared by two or more documents
    come back as buckets. For most processes the key is built by a Painless
    script; composite keys join their fields with "," (``params.param``), so
    each distinct combination forms its own bucket.

    References:
        https://stackoverflow.com/questions/53076349/script-writing-to-get-distinct-value-from-elasticsearch
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#search-aggregations-bucket-terms-aggregation-script

    Args:
        process: Process name. One of 'sample', 'wx_task', 'wx_asn',
            'wx_orderdtl', 'wx_inv_hold', 'wx_inv_holdtrans', 'om_appointment'.

    Returns:
        tuple: ``(process, query)`` - the process name unchanged, and the
        search body as a freshly built dict.

    Raises:
        ValueError: if ``process`` is not one of the supported names
            (previously this surfaced as an UnboundLocalError).
    '''
    # Painless expression that produces the uniqueness key for each process.
    key_scripts = {
        'wx_task': "doc['TASKID'].value",
        'wx_asn': "doc['ASNKEY.keyword'].value + params.param + doc['SITEID.keyword'].value",
        'wx_orderdtl': "doc['ORDERKEY.keyword'].value + params.param + doc['ORDERLINENO'].value + params.param + doc['SITEID.keyword'].value",
        'wx_inv_hold': "doc['INVHOLDID'].value",
        'wx_inv_holdtrans': "doc['INVHOLDTRANSID'].value",
        'om_appointment': "doc['CRN'].value",
    }

    if process == 'sample':
        # Plain field aggregation, no script involved.
        terms = {"field": "TASKID"}
    elif process in key_scripts:
        terms = {
            "script": {
                "source": key_scripts[process],
                "lang": "painless",
                "params": {
                    "param": ","
                }
            }
        }
    else:
        raise ValueError(
            "Unsupported process {!r}; expected 'sample' or one of {}".format(
                process, sorted(key_scripts)
            )
        )

    terms["min_doc_count"] = 2
    # "size" limits how many buckets are returned, so a result of exactly
    # 10000 buckets may be a truncated count.
    terms["size"] = 10000

    query = {
        "size": 0,
        "aggs": {
            "duplicates": {
                "terms": terms
            }
        }
    }
    return process, query

In [1313]:
def check_duplicates_tasks(env, process):
    '''Count duplicated keys for one process on one cluster and record the result.

    The cluster host is read from the environment variable "<ENV>_ES_HOST"
    (``env`` upper-cased). The index searched is the name returned by
    query_handler(process), and the duplicate count is the number of buckets
    in the "duplicates" aggregation of the response.

    Side effects: appends one entry to each of the module-level lists
    ``dataframe_column``, ``dataframe_process``, ``dataframe_db``,
    ``dataframe_es_client`` and ``dataframe_value``, and updates
    ``es_host_duplicates``. The lists must exist before this is called.

    Args:
        env: Environment name, e.g. 'prod1'.
        process: Process name understood by query_handler().

    Returns:
        None
    '''
    es_host = "{}_ES_HOST".format(str(env).upper())

    # Always produce a DB label: a process containing neither "wx" nor "om"
    # used to append nothing, leaving the parallel lists with unequal lengths.
    if 'wx' in process:
        db_label = 'WMx DB'
    elif 'om' in process:
        db_label = 'OMx DB'
    else:
        db_label = 'Unknown'

    # Index name and search body for this process.
    index_name, query = query_handler(process)

    # Client for this cluster.
    es_obj_s_client = get_es_instance(os.getenv(es_host))

    response = es_obj_s_client.search(index=index_name, body=query)
    duplicates_list = response['aggregations']['duplicates']['buckets']
    duplicates_count = len(duplicates_list)

    # Record the row only after the search succeeded, so a failed request
    # cannot leave the parallel lists half-updated.
    dataframe_column.append(es_host)
    dataframe_process.append(process)
    dataframe_db.append(db_label)
    dataframe_es_client.append(es_obj_s_client)
    dataframe_value.append(duplicates_count)

    # NOTE(review): keyed by host only, so when several processes are checked
    # on the same host the last one overwrites the earlier counts.
    es_host_duplicates.update({es_host : duplicates_count})

In [1314]:
# Full cluster list, kept for reference:
# env_list = ['prod1','prod2','prod3','prod4','prod6','prod7','prod8','prod9','prod10','prod12','prod13','prod14','prod16','prod17','prod18','prod19','prod20']
env_list = ['prod1','prod2']

# Accumulators filled by check_duplicates_tasks (one entry per call); they are
# reset here so that re-running this cell starts from a clean state.
dataframe_dict = {}
dataframe_column = []
dataframe_process = []
dataframe_es_client = []
dataframe_value = []
dataframe_db = []

# Goal: check for duplicate entries in TASK documents and, ideally, in all WMx/OMx ES indices.
# The check is run per process name, using that process's key field(s), to see whether any duplicate data exists.
# process_name = ['wx_task', 'wx_inv_hold', 'wx_inv_holdtrans', 'om_appointment', 'wx_asn']
process_name = ['wx_task', 'wx_asn', 'wx_orderdtl']

# Check every selected process on every selected cluster.
for env in env_list:
    print(f"Progressing for {env} ..")
    for each_index in process_name:
        check_duplicates_tasks(env, each_index)

Progressing for prod1 ..
Progressing for prod2 ..


In [1315]:
# Assemble the column-name -> values mapping used to build the DataFrame.
dataframe_dict.update({
    'ES_Cluster': dataframe_column,
    'ES URL': dataframe_es_client,
    'DB': dataframe_db,
    'Process_Name': dataframe_process,
    'Duplicates_Count': dataframe_value,
})

In [1316]:
# Build the results table from the collected columns; each
# check_duplicates_tasks call contributed one entry to every column.
df = pd.DataFrame(dataframe_dict)
# display(df)

In [1317]:
# writing to Excel
# df.to_csv("duplicate.csv")
# print("Export csv files successfully..")

In [1318]:
''' filter if any duplicate data exist in df dataframe '''
# display(df.filter(items=['ES_Cluster', 'Process_Name'], axis=1))

' filter if any duplicate data exist in df dataframe '

In [1319]:
# Keep only the checks that found at least one duplicated key, then show them.
df = df.loc[df['Duplicates_Count'].gt(0)]
display(df)

Unnamed: 0,ES_Cluster,ES URL,DB,Process_Name,Duplicates_Count
