### Step-1 Get Access to Raw Error Log Dataset

In [0]:
# spark get access to filesystem's authorization 
access_key = ""
spark.conf.set("fs.azure.account.key.emmadatalakesa.dfs.core.windows.net", access_key)

In [0]:
# load error log raw data from specified error log path with schma app:string, timestamp: YYYY-MM-dd, log_content: string
# our training is daily model training, so, timestamp is YYYY-MM-dd  
error_log_path = "abfss://idm@emmadatalakesa.dfs.core.windows.net/error_log/"
display(dbutils.fs.ls(error_log_path))

path,name,size,modificationTime
abfss://idm@emmadatalakesa.dfs.core.windows.net/error_log/error_log,error_log,3092,1688130446000


### Step-2 Load Raw Error Log Dataset from FileSystem to Dataframe

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# app: string, 
# timestamp: string,
# log_content: string
error_log_schema = StructType(fields = [
    StructField("app", StringType(), False),
    StructField("timestamp", StringType(), True),
    StructField("log_content", StringType(), True)
])

In [0]:
error_log_df = spark.read.option("header", True)\
    .schema(error_log_schema) \
    .csv(error_log_path) 
# after load dataset by specific schema, print schema and print data from dataframe 
error_log_df.printSchema()
error_log_df = error_log_df.filter(error_log_df.app == 'idm')
display(error_log_df)

root
 |-- app: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- log_content: string (nullable = true)



app,timestamp,log_content
idm,2023/6/30,Invalid user name/password or user account is locked or expired
idm,2023/6/30,Invalid credential with user name and password
idm,2023/6/30,DB CONNECTION URL IS :jdbc:postgresql://usg4-smax-rds-prod.clkmmx1qw2cg.us-gov-west-1.rds.amazonaws.com:5432/itom-cdf-idm?ssl=true&sslmode=verify-full&sslfactory=com.hp.ccue.identity.installer.factory.PostgresFactory&sslrootcert=/tmp/dbCerts.crt\nUsing user: cdfidm\nExceptions thrown: SSL error: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target\nException found when creating the connection and try again after 5 seconds\nChecking time left -2000 milli seconds\n\nFail to execute command! [ sh idm.sh -h ] can show the usage of the CLI\nStarting db build @ Mon May 23 23:52:00 UTC 2022\n2022-05-23T23:52:00.909160500+00:00 INFO DATABASE_CONNECTION_URL is not defined.\n2022-05-23T23:52:00.926541625+00:00 INFO build DB with ADMIN_DATABASE_CONNECTION_URL as jdbc:postgresql://usg4-smax-rds-prod.clkmmx1qw2cg.us-gov-west-1.rds.amazonaws.com:5432/postgres?ssl=true&sslmode=verify-full&sslfactory=com.hp.ccue.identity.database.postgresql.PostgresFactory&sslrootcert=/tmp/dbCerts.crt\n2022-05-23T23:52:00.939900353+00:00 INFO Start to build postgres schema...\nBuild postgres...\nSSL error: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target\n\nFail to execute command! [ sh idm.sh -h ] can show the usage of the CLI\n
idm,2023/6/30,IdM pods don't restart automatically after a broken database recovers
idm,2023/6/30,sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
idm,2023/6/30,ERROR: value too long for type character varying(255)
idm,2023/6/30,The request token is invalid
idm,2023/6/30,SMAL errors during login The request token is invalid
idm,2023/6/30,The request token is invalid. It may have already been used or expired. (Code: 40002)
idm,2023/6/30,ERROR [https-abcd-efg-8443-exec-41] com.company.ccue.identity.saml2.authentication.IdmSamlAuthenticationProvider [] - Validate assertion - An error occurred while validate the assertion: Authentication statement is too old to be used with value


### Step -3 Load Json(embedded) Datasets from Local file to Redis

#### step 3-1 Install Python 3rd Libraries

In [0]:
%pip install redis > nohup
%pip install jsonpickle > nohup 

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.


#### step 3-2 Execute Python Scripts to Load Error Diagnoise Related Datasets to Redis Hash Set 

##### solutions_dataset.json 
* Dataset Description: solutions_dataset.json stores different errors' solutoin steps and causes
* Redis Description: data will be stored in Redis's hashset with hash_name = `solutions`, key = `solutoin_id`, value = `solution_content`.

##### error_keyword_dataset.json 
* Dataset Description: error_keyword_dataset.json stores different error and exceptoin logs' keywrods that are generated from different applications' product environment. 
* Redis: error_keyword_dataset.json will be stored in Redis's hashset with hash_name=`error_keyword`, key = `error_keyword_id`, value = `error_keyword_content`.

##### solution_error_keyword_mapping.json
* Dataset Description: solution_error_keyword_mapping.json stores `solution` and `error_keyword` mapping relationships. In ER-Diagram Design they are Multiple to Multiple. 
* Redis: solution_error_keyword_mapping.json will be stored in Redis's hashset with hash_name=`solution_error_keyword_mapping`, key = `error_keyword_id`, value = `{s_id_1, s_id_2, s_id_3, ...}`

In [0]:
import json 
import redis 
import jsonpickle

def get_redis_client(host, port, password):
    redis_client = redis.Redis(host = host, port = port, db = 0, password = password, ssl=True)
    if redis_client is not None: 
        print("Success retrieve redis client")
    else:
        print("Failed to retrieve redis client")

def load_solution_dataset(solution_dataset_path):
    json_data = None 
    with open(solution_dataset_path, 'r') as dataset_file:
        json_data = json.load(dataset_file)
    
    print(f"#load_solution_dataset ret json_data non-none status: {json_data is not None}")
    return json_data

def clean_hash_table(redis_client, hash_name):
    if redis_client is None:
        print(f"#clean_hash_table with hash_name {hash_name} recv redis handler is None return!")
        return 
    redis_client.delete(hash_name)

def traverse_hash_table(redis_client, hash_name):
    if redis_client is None:
        print(f"traverse_hash_table with hash_name {hash_name} recv redis handler is None return!")
        return 
    k_set = redis_client.hkeys(hash_name)
    if len(k_set) == 0:
        print(f"traverse_hash_table with hash_name {hash_name} got empty key set, return!")
        return 
    for key in k_set:
        value = redis_client.hget(hash_name, key)
        print(f"traverse_hash_table key: {key}, value: {value}")


def insert_data_to_redis(redis_client, json_data_arr, hash_name):
    print(f"#insert_data_to_redis recv redis_client non-none status: {redis_client is not None}\n json_data_arr non-none status {json_data_arr is not None}\n, hash_name non-none status: {hash_name is not None}")
    
    if redis_client is None:
        print("#insert_data_to_redis recv redis client is None, return!")
        return 
    
    if json_data_arr is None:
        print("#insert_data_to_redis recv json_data_arr is None, return!")
        return 
    
    if hash_name is None:
        print("#insert_data_to_redis recv hash_name is None, return!")
        return 
    
    w_cnt = 0
    r_cnt = 0
    for item in json_data_arr:
        index = str(item["id"])
        json_data = json.dumps(item)
        w_cnt += 1
        print(f"write to redis with index {index}, with value len {len(json_data)}")
        redis_client.hset(hash_name, index, json_data)
        value = redis_client.hget(hash_name, index)
        if value is not None:
            r_cnt += 1
        #print(f"#insert_data_to_redis retrieve with hash_name: {hash_name}, index: {index}, value={value}")    
    return (r_cnt, w_cnt)
  
def insert_keyword_solutions_mapping(redis_client, json_data_arr, hash_name):
    print(f"insert_keyword_solutions_mapping recv redis_client non-none status: {redis_client is not None}\njson_data_arr non-none status: {json_data_arr is not None}\nhash_name non-none status {hash_name is not None}")

    # hash_name = 'keyword_solutions_mapping'
    # key: keyword_id 
    # value: set(solution_id)
    for item in json_data_arr: 
        # step-1: find by given hash_name && keyword_id to get the hash set or create a new has set 
        # step-2: append current solution_id to the find/new created hash set 
        # step-3: save the key & value pair to redis (key: keyword_id, value: hash set(solution_id_1, solution_id_2,...))

        # s_id: solution id, k_id: error_keyword id
        # store key: k_id, value: {s_id_1, s_id_2,}
        if item is None or item['s_id'] is None or item['k_id'] is None:
            print(f"insert_keyword_solutions_mapping recv invalid item ignore and continue")
            continue 
        s_id = item['s_id']
        k_id = item['k_id']
        ret_set_str = redis_client.hget(hash_name, k_id)
        ret_set = None
        if ret_set_str is None:
            ret_set = set()
        else:
            print(f"hash set is not none {ret_set_str}")
            ret_set = jsonpickle.decode(ret_set_str)
            print(f"hash set str is converted into set {type(ret_set)}")
        
        ret_set.add(s_id)
        set_json_str = jsonpickle.encode(ret_set)
        print(f"#insert_keyword_solutions_mapping write key: {k_id}, value: {set_json_str} to redis")
        redis_client.hset(hash_name, k_id, set_json_str)
    print(f"insert_keyword_solutions_mapping finish && return")
    return 

def insert_keyword_modules_mapping(redis_client, json_data_arr, hash_name):
    print(f"insert_keyword_modules_mapping recv redis_client non-none status: {redis_client is not None}\njson_data_arr non-none status: {json_data_arr is not None}\nhash_name non-none status {hash_name is not None}")

    # hash_name = 'keyword_modules_mapping'
    # key: keyword_id
    # value: set(module_id)
    for item in json_data_arr:
        # step-1: find by given hash_name && keyword_id to get the hash set or create a new hash set  
        # {"m_id": "xxx", "k_id": "xxx"}
        m_id = item['m_id']
        k_id = item['k_id']
        print (f"#insert_keyword_modules_mapping recv m_id:{m_id}, k_id: {k_id}\n")
        if m_id is not None and k_id is not None:
            print(f"#insert")
        else:
            print(f"#insert_keyword_modules_mapping")
        #>>  redis_client.hget(hash_name, )
        # step-2: append current module_id to the find/new create hahs set
        # step-3: save the key & value pair to tredis (key: keyword_id, value: hash set(module_id_1, module_id_2, ...))
        print (item)

    print("insert_keyword_modules_mapping finish && return")
    return

def write_solution_dataset_to_redis():
    print("BEGIN write_solution_dataset_to_redis ")
    # redis hash table specific hash_name 
    hash_name = 'solutions'
    
    # dataset local directory  
    local_dataset_path ='/Workspace/idm_chatbot/dataset/solutions_dataset.json'

    # here we convert json data content into json array 
    json_data_arr = load_solution_dataset(local_dataset_path)
    
    # redis basic configuration 
    redis_host = ""
    redis_port = 6380
    redis_password = "="
    
    # create redis client by given redis configuration items 
    redis_client = redis.Redis(host = redis_host, port = redis_port, db = 0, password = redis_password, ssl=True)
    print(f"redis_client non-null status {redis_client is not None}")

    # write json item to correspoinding hash table with specific hash name 
    (r_cnt, w_cnt) = insert_data_to_redis(redis_client, json_data_arr, hash_name=hash_name)
    
    # print final data results 
    print(f"we total write to redis with hash value [{hash_name}] total write count: {w_cnt}, success read count {r_cnt}")

    # do not forget close redis client connection 
    redis_client.close()
    print("END write_solution_dataset_to_redis ")


def write_error_keyword_dataset_to_redis():
    print("BEGIN write_error_keyword_dataset_to_redis ")
    hash_name = 'error_keyword'
    local_dataset_path = '/Workspace/idm_chatbot/dataset/error_keyword_dataset.json'

    # here we convert json data content into json array 
    json_data_arr = load_solution_dataset(local_dataset_path)

    # redis basic configuration 
    redis_host = ""
    redis_port = 6380
    redis_password = ""

    # create redis client by given redis configuration items 
    redis_client = redis.Redis(host = redis_host, port = redis_port, db = 0, password = redis_password, ssl=True)
    print(f"redis_client non-null status {redis_client is not None}")

    # write json items to corresponding hash table with speicfic hash name 
    (r_cnt, w_cnt) = insert_data_to_redis(redis_client, json_data_arr, hash_name=hash_name)

    # do not forget close redis client connection 
    redis_client.close()
    print("END write_error_keyword_dataset_to_redis ")

def write_error_solution_mapping_to_redis():
    print("BEGIN write_solution_dataset_to_redis ")
    hash_name = "solution_error_keyword_mapping"
    local_dataset_path = '/Workspace/idm_chatbot/dataset/solution_error_keyword_mapping.json'

    # here we convert json data content into json array 
    json_data_arr = load_solution_dataset(local_dataset_path)

    # redis basic configuration 
    redis_host = "hostname"
    redis_port = 6380
    redis_password = "password"

    # create redis client by given redis configuration items 
    redis_client = redis.Redis(host = redis_host, port = redis_port, db = 0, password = redis_password, ssl=True)
    print(f"redis_client non-null status {redis_client is not None}")

    # traverse each item in json arrary and insert each item to redis by given specific hash_name 
    insert_keyword_solutions_mapping(redis_client, json_data_arr, hash_name=hash_name)

    # do not forget close redis client connection 
    redis_client.close()
    print("END write_solution_dataset_to_redis ")

#print("write_solution_dataset_to_redis")
#write_solution_dataset_to_redis()

#print("write_error_keyword_dataset_to_redis write data")
#write_error_keyword_dataset_to_redis()

print("write_error_solution_mapping_to_redis")
#write_error_solution_mapping_to_redis()

hash_table_name = 'solution_error_keyword_mapping'
redis_host = ""
redis_port = 6380
redis_password = "="
redis_client = redis.Redis(host = redis_host, port = redis_port, db = 0, password = redis_password, ssl=True)

print(f"traverse redis hash table with name {hash_table_name}")
traverse_hash_table(redis_client, hash_table_name)
print(f"clean redis hash table with name {hash_table_name}")
clean_hash_table(redis_client, hash_table_name)

print(f"traverse redis hash table with name {hash_table_name}")
traverse_hash_table(redis_client, hash_table_name)


write_error_solution_mapping_to_redis
traverse redis hash table with name solution_error_keyword_mapping
traverse_hash_table key: b'1', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'2', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'3', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'4', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'5', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'6', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'7', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'8', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'9', value: b'{"py/set": ["1", "2"]}'
traverse_hash_table key: b'10', value: b'{"py/set": ["3"]}'
traverse_hash_table key: b'11', value: b'{"py/set": ["3"]}'
traverse_hash_table key: b'12', value: b'{"py/set": ["3"]}'
traverse_hash_table key: b'13', value: b'{"py/set": ["3"]}'
traverse_hash_table key: b'14', value: b'{"py/set": ["3"]}'
traverse_hash_table

#### 3-3 Read Dataset from Redis to DataFrame

%md 
#### step 3-2 Execute Python Scripts to Load Error Diagnoise Related Datasets to Redis Hash Set 

##### solutions_dataset.json 
* Dataset Description: solutions_dataset.json stores different errors' solutoin steps and causes
* Redis Description: data will be stored in Redis's hashset with hash_name = `solutions`, key = `solutoin_id`, value = `solution_content`.

##### error_keyword_dataset.json 
* Dataset Description: error_keyword_dataset.json stores different error and exceptoin logs' keywrods that are generated from different applications' product environment. 
* Redis: error_keyword_dataset.json will be stored in Redis's hashset with hash_name=`error_keyword`, key = `error_keyword_id`, value = `error_keyword_content`.

##### solution_error_keyword_mapping.json
* Dataset Description: solution_error_keyword_mapping.json stores `solution` and `error_keyword` mapping relationships. In ER-Diagram Design they are Multiple to Multiple. 
* Redis: solution_error_keyword_mapping.json will be stored in Redis's hashset with hash_name=`solution_error_keyword_mapping`, key = `error_keyword_id`, value = `{s_id_1, s_id_2, s_id_3, ...}`

In [0]:
# solution_dataset.json -> redis[solutions] -> dataframe[solutions]

# error_keyword_dataset.json -> redis[error_keywrod] -> dataframe[error_keywords]

# solution_error_keyword_mapping.json -> redis[solution_error_keyword_mapping] -> dataframe[solution_error_keyword_mapping]

#### step 3-4 Execute Python Scripts to Load Module Diagnoise Related Datasets to Redis Hash Set 

##### modules_dataset.json 
* Dataset Description: solutions_dataset.json stores different errors' solutoin steps and causes
* Redis: data will be stored in Redis's hashset with hash_name = `solutions`, key = `solutoin_id`, value = `solution_content`

##### error_keyword_dataset.json 
* Dataset Description: error_keyword_dataset.json stores different error and exceptoin logs' keywrods that are generated from different applications' product environment. 
* Redis: error_keyword_dataset.json will be stored in Redis's hashset with hash_name=`error_keyword`, key = `error_keyword_id`, value = `error_keyword_content`.

##### module_error_keyword_mapping.json
* Dataset Description: module_error_keyword_mapping.json stores `module` and `error_keywrod` mapping relationships. In ER-Diagram Design they are Multiple to Multiple.
* Redis module_error_keyword_mapping.json will be stored in Redis's hashset with hash_name=`module_error_keyword_mapping`, key=`error_keyword_id`, value=`{m_id_1, m_id_2, m_id_3, ...}`

In [0]:
# here we create connection to our Azure Redis && try to insert data to redis 
import json 
import redis 

def write_json_to_redis(redis_client, hash_name, field_name, json_data):
    if redis_client is None:
        print("redis client is None return!")
        return 
    print(f"begin write data {json_data} to redis")
    redis_client.hset(hash_name, field_name, json_data)
    ret_value = redis_client.hget(hash_name, field_name)
    print(f"ret_value {ret_value}")


print("create redis client ")
redis_host = "hostname"
redis_port = 6380
redis_password = "="
redis_client = redis.Redis(host=redis_host, port=redis_port, db=0, password=redis_password, ssl=True)

if redis_client is None: 
    print("Fail to retrieve redis client")
else:
    print("Success retrieve redis client")

hash_value='hash_value'
hash_field='hash_field'
value = 'value'
(w_cnt, r_cnt) = write_json_to_redis(redis_client, hash_value, hash_field, value)

  


create redis client 
Success retrieve redis client
begin write data value to redis
ret_value b'value'
