In [1]:
''' The Jupyter notebook is a web-based notebook environment for interactive computing. '''
print(f"user permission via Search-Guard")

user permission via Search-Guard


In [2]:
from elasticsearch import Elasticsearch
import os
import json
import pandas as pd
import jsondiff
import logging
from dotenv import load_dotenv
import socket
import requests
import base64
import sys
import warnings
warnings.filterwarnings("ignore")

In [3]:
''' pip install python-dotenv'''
load_dotenv() # will search for .env file in local folder and load variables

True

In [4]:
ca_cert_path = './certs/qa13-es8-ca.pem'

In [5]:
# print(os.getenv('ES_DEV_V8_HOST'))

In [6]:
''' https://restfulapi.net/http-status-codes/ '''
http_status_code = {
    200 : 'Indicates that the request has succeeded.',
    201 : 'Indicates that the request has succeeded and a new resource has been created as a result.',
    400 : 'The server could not understand the request due to incorrect syntax. The client should NOT repeat the request without modifications.',
    401 : 'Unauthorized rquests. Insufficient permissions',
    403 : 'Unauthorized request. Insufficient permissions. The client does not have access rights to the content. ',
    500 : 'The server encountered an unexpected condition that prevented it from fulfilling the request.'
}

In [7]:
def get_headers(auth):
    ''' Elasticsearch Header '''
    ''' 
    Basic Authentication is a method for an HTTP user agent (e.g., a web browser) 
    to provide a username and password when making a request. 
    You can send the authorization header 
    when making requests and accessing to ES Cluster based on Search-Guard as X-pack. 
      
    Basic Auth : 
    {
        'Content-type': 'application/json', 
        'Authorization' : 'Basic base64.encode(id:password), 
        'Connection': 'close'
    }
    '''
    base64_encoded_user_credential = os.getenv(auth)
    print(f"base64_encoded_user_credential : {base64_encoded_user_credential}")
    print(f"User - {base64_decode_for_search_guard(base64_encoded_user_credential)}")
    headers = {
            'Content-type': 'application/json', 
            'Authorization' : 'Basic {}'.format(base64_encoded_user_credential),
            'Connection': 'close'
    }
    print(f"headers : {json.dumps(headers, indent=2)}")

    return headers

In [8]:
''' test base64 encode '''
def base64_encode_for_search_guard(id_pass):
    ''' format -> <id>:<password> '''
    encoded = '{}'.format(base64.b64encode(id_pass.encode('utf-8')).decode())
    # print(encoded)
    return encoded

In [9]:
''' test base64 decode '''
def base64_decode_for_search_guard(id_pass):
    ''' format -> <id>:<password> '''
    return base64.b64decode(id_pass).decode('utf-8')

### Search Guard API
* Check the overall health of Search Guard Plugin on the ES cluster

In [10]:
''' Search Guard Health '''
resp = requests.get(url=f"{os.getenv('ES_DEV_V8_HOST')}/_searchguard/health", headers=None, verify=False, timeout=600)
print(resp)
# print(resp.json())
# print(json.dumps(resp.json(), indent=2))
display(pd.DataFrame([resp.json()]))

<Response [200]>


Unnamed: 0,message,mode,status
0,,strict,UP


### User Permission based on Search Guard
* Role based User Permission check using requests library

In [11]:
class color:
   PURPLE = '\033[95m'
   CYAN = '\033[96m'
   DARKCYAN = '\033[36m'
   BLUE = '\033[94m'
   GREEN = '\033[92m'
   YELLOW = '\033[93m'
   RED = '\033[91m'
   BOLD = '\033[1m'
   UNDERLINE = '\033[4m'
   END = '\033[0m'

In [12]:
def add_documents(user, header, IDX):
    if user != 'es_spark':
        return
    
    body = [
                {"index" : { "_index" : IDX, "_id" : "1"}},
                {"ADDTS" : "03/09/2023 02:06:34.739993" },
                {"index" : { "_index" : IDX, "_id" : "2"}},
                {"ADDTS" : "03/09/2023 02:06:34.739993" }
    ]

    payload = '\n'.join([json.dumps(line) for line in body]) + '\n'
    # print(f"User : {user.split(':')[0]}")
    resp = requests.post(url="{}/_bulk".format(os.getenv('ES_DEV_V8_HOST')), headers=header, data=payload, verify=False, timeout=5)
    ''' check first records if user has permission to write'''
    status = resp.json()["items"][0]["index"]["status"]
    if "errors" in resp.json() and resp.json()["errors"] == True:
        # print(f"POST [{status, http_status_code.get(status, '')}] User ['{user.split(':')[0]}'] - CREATE INDEX Request : {resp.json()}")
        print(f"POST [{status, {http_status_code.get(status, '')}}] User ['{user.split(':')[0]}'] - Write Docs Request")
    else:
        # print(f"POST [{resp.status_code, http_status_code.get(resp.status_code, '')}] User ['{user.split(':')[0]}'] - CREATE INDEX Request : {resp.json()}")
        print(f"POST [{resp.status_code}, {http_status_code.get(resp.status_code, '')}] User ['{user.split(':')[0]}'] - Write Docs Request")


In [13]:
def verify_user_permission():
    ''' Verify user's permission using the API'''
    ''' User Permission with a different role for ES v.8.17.0 based on Searcch Guard '''
    # api_list = ['_cat/nodes?format=json', '_cat/aliases?format=json', '_cat/nodes?format=json', 
    #             'wx_test/_search', 'om_test/_search', 'logstash-2024.08.21/_search'
    #            ]
    api_list = ['wx_test/_search', 'om_test/_search']
    user_list = os.getenv('USER_LIST').split(",")
    for user in user_list:
        ''' Verify search permission '''
        print(f"\nUser : {user.split(':')[0]}")
        
        ''' Add header for each user '''
        header = {'Content-type': 'application/json', 
                  'Authorization' : 'Basic {}'.format(base64_encode_for_search_guard(user)), 
                  'Connection': 'close'}
        print('-'*50)

        ''' _search '''
        for api in api_list:
            ''' verify if user have the permission to write documents '''
            add_documents(user.split(':')[0], header, api.split('/')[0])
            ''' verify if user have the permissin to search '''
            resp = requests.get(url="{}/{}".format(os.getenv('ES_DEV_V8_HOST'), api), headers=header, verify=False, timeout=5)
            color_print = color.GREEN if resp.status_code == 200 else color.RED
            # print(f"{color.BOLD}{color_print} GET {[resp.status_code , http_status_code.get(resp.status_code, '')]} User [{user.split(':')[0]}], Urls [{api}]{color.END}")
            print(f"GET {[resp.status_code , http_status_code.get(resp.status_code, '')]} User [{user.split(':')[0]}], Urls [{api}]")

In [14]:
verify_user_permission()


User : es_admin
--------------------------------------------------
GET [200, 'Indicates that the request has succeeded.'] User [es_admin], Urls [wx_test/_search]
GET [200, 'Indicates that the request has succeeded.'] User [es_admin], Urls [om_test/_search]

User : es_spark
--------------------------------------------------
POST [200, Indicates that the request has succeeded.] User ['es_spark'] - Write Docs Request
GET [200, 'Indicates that the request has succeeded.'] User [es_spark], Urls [wx_test/_search]
POST [200, Indicates that the request has succeeded.] User ['es_spark'] - Write Docs Request
GET [200, 'Indicates that the request has succeeded.'] User [es_spark], Urls [om_test/_search]

User : wxusers
--------------------------------------------------
GET [200, 'Indicates that the request has succeeded.'] User [wxusers], Urls [wx_test/_search]
GET [403, 'Unauthorized request. Insufficient permissions. The client does not have access rights to the content. '] User [wxusers], Urls

### IPytest
* ipytest allows you to run Pytest in Jupyter notebooks.
* The pytest framework makes it easy to write small tests. It can be used to write various types of software tests, including unit tests and others

In [15]:
import pytest
import ipytest
''' https://github.com/chmp/ipytest?tab=readme-ov-file '''
''' https://github.com/chmp/ipytest/blob/main/Example.ipynb '''
''' https://docs.search-guard.com/latest/offline-tls-tool '''
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [16]:
''' To begin, you need to install pytest and ipytest, a tool designed to run pytest tests directly in Jupyter. Execute the following in a Jupyter cell '''
ipytest.autoconfig(coverage=True)

In [17]:
@pytest.mark.skip(reason="no way of currently testing this")
def test_pytest_skip_func():
    assert 42 == 42

In [18]:
def test_search_guard_es_cluster_func():
    ''' Check if search guard was installed in ES cluster via _searchguard api '''
    resp = requests.get(url=f"{os.getenv('ES_DEV_V8_HOST')}/_searchguard/health", headers=None, verify=False, timeout=600)
    assert resp.status_code == 200
    assert resp.json() == {
          "message": None,
          "mode": "strict",
          "status": "UP"
    }

In [19]:
def test_base64_basic_auth_func():
    # resp = es_search_guard.ping()
    # assert resp == True
    expected_encoded ='dGVzdDox'
    expected_decoded = 'test:1'
    encoded = base64_encode_for_search_guard(expected_decoded)
    assert "Basic {}".format(encoded) == 'Basic {}'.format(expected_encoded)
    assert base64_decode_for_search_guard(expected_encoded) == expected_decoded

In [20]:
''' Execute the tests using ipytest.run(). You can pass command-line arguments to control test behavior: '''
''' The pytest framework makes it easy to write small, readable tests, and can scale to support complex functional testing for applications and libraries. '''
ipytest.run('-vv')
# ipytest.run('-qq')

platform win32 -- Python 3.11.7, pytest-8.2.2, pluggy-1.5.0 -- C:\Users\euiyoung.hwang\Git_Workspace\ELK_Stack_Repo\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: C:\Users\euiyoung.hwang\Git_Workspace\ELK_Stack_Repo
configfile: pyproject.toml
plugins: anyio-4.8.0, cov-6.0.0
collected 3 items. [0m

[33mSKIPPED[0m (no way of currentl...)[32m [ 33%][0mfunc 
[32mPASSED[0m[32m              [ 66%][0msearch_guard_es_cluster_func 
[32mPASSED[0m[32m                    [100%][0m_basic_auth_func 

---------- coverage: platform win32, python 3.11.7-final-0 -----------
Name                                                                       Stmts   Miss  Cover
----------------------------------------------------------------------------------------------
C:\Users\euiyoung.hwang\AppData\Local\Temp\ipykernel_34208\403595424.py        4      1    75%
C:\Users\euiyoung.hwang\AppData\Local\Temp\ipykernel_34208\1132335815.py       2      1    50%
C:\Users\euiyoung.hwang\AppData\L

<ExitCode.OK: 0>