# Demonstration RAG Pipeline

## Istio

In [1]:
import re
import requests
from urllib.parse import urlsplit

def get_istio_auth_session(url: str, username: str, password: str) -> dict:
    """
    Determine if the specified URL is secured by Dex and try to obtain a session cookie.
    WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported
             (we default to using `staticPasswords` if both are enabled)

    :param url: Kubeflow server URL, including protocol
    :param username: Dex `staticPasswords` or `LDAP` username
    :param password: Dex `staticPasswords` or `LDAP` password
    :return: auth session information, a dict with the keys `endpoint_url`,
             `redirect_url`, `dex_login_url`, `is_secured` and `session_cookie`.
             When the endpoint is not secured, `is_secured` is False and the
             Dex/cookie fields stay None.
    :raises RuntimeError: if a GET does not end in HTTP 200, or if the login POST
             is not redirected (credentials probably invalid)
    """
    # define the default return object
    auth_session = {
        "endpoint_url": url,    # KF endpoint URL
        "redirect_url": None,   # KF redirect URL, if applicable
        "dex_login_url": None,  # Dex login URL (for POST of credentials)
        "is_secured": None,     # True if KF endpoint is secured
        "session_cookie": None  # Resulting session cookies in the form "key1=value1; key2=value2"
    }

    # use a persistent session (for cookies)
    # NOTE(review): none of the requests below pass a `timeout`, so an unresponsive
    # server can block this call indefinitely.
    with requests.Session() as s:

        ################
        # Determine if Endpoint is Secured
        ################
        resp = s.get(url, allow_redirects=True)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {url}"
            )

        # final URL after following redirects
        auth_session["redirect_url"] = resp.url

        # if we were NOT redirected, then the endpoint is UNSECURED
        if len(resp.history) == 0:
            auth_session["is_secured"] = False
            return auth_session
        else:
            auth_session["is_secured"] = True

        ################
        # Get Dex Login URL
        ################
        redirect_url_obj = urlsplit(auth_session["redirect_url"])

        # if we are at `/auth?=xxxx` path, we need to select an auth type
        if re.search(r"/auth$", redirect_url_obj.path):

            #######
            # TIP: choose the default auth type by including ONE of the following
            #######

            # OPTION 1: set "staticPasswords" as default auth type
            redirect_url_obj = redirect_url_obj._replace(
                path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path)
            )
            # OPTION 2: set "ldap" as default auth type
            # redirect_url_obj = redirect_url_obj._replace(
            #     path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path)
            # )

        # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST)
        if re.search(r"/auth/.*/login$", redirect_url_obj.path):
            auth_session["dex_login_url"] = redirect_url_obj.geturl()

        # else, we need to be redirected to the actual login page
        else:
            # this GET should redirect us to the `/auth/xxxx/login` path
            resp = s.get(redirect_url_obj.geturl(), allow_redirects=True)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}"
                )

            # set the login url
            auth_session["dex_login_url"] = resp.url

        ################
        # Attempt Dex Login
        ################
        resp = s.post(
            auth_session["dex_login_url"],
            data={"login": username, "password": password},
            allow_redirects=True
        )
        # a successful login redirects back to the app; no redirect means the POST was rejected
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials were probably invalid - "
                f"No redirect after POST to: {auth_session['dex_login_url']}"
            )

        # store the session cookies in a "key1=value1; key2=value2" string
        auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    return auth_session

## General Functions

In [2]:
# Created and works
def set_formatted_user(
    user: str
) -> any:
    """Collapse every run of characters outside ``[a-z0-9]`` in *user* into a single '-'.

    Produces a bucket-name-safe suffix, e.g. ``'user@example.com'`` becomes
    ``'user-example-com'``. Uppercase letters are outside the allowed set and
    are replaced too.
    """
    disallowed_run = re.compile(r'[^a-z0-9]+')
    return disallowed_run.sub('-', user)

## SWIFT Functions

In [3]:
from decouple import Config,RepositoryEnv

from keystoneauth1 import loading, session
from keystoneauth1.identity import v3
from keystoneclient.v3 import client as keystone_client

import swiftclient as sc
import pickle

In [4]:
# Works
def is_swift_client(
    storage_client: any
) -> any:
    """Return True when *storage_client* is a ``swiftclient.Connection`` instance."""
    swift_connection_type = sc.Connection
    return isinstance(storage_client, swift_connection_type)
# Works
def swift_setup_client(
    pre_auth_url: str,
    pre_auth_token: str,
    user_domain_name: str,
    project_domain_name: str,
    project_name: str,
    auth_version: str
) -> any:
    """Build a ``swiftclient.Connection`` from a pre-authenticated URL and token.

    The domain/project names are forwarded to swiftclient as ``os_options``.
    """
    connection_options = {
        'user_domain_name': user_domain_name,
        'project_domain_name': project_domain_name,
        'project_name': project_name
    }
    return sc.Connection(
        preauthurl=pre_auth_url,
        preauthtoken=pre_auth_token,
        os_options=connection_options,
        auth_version=auth_version
    )
# Works
def swift_create_bucket(
    swift_client: any,
    bucket_name: str
) -> bool:
    """Create (PUT) the container *bucket_name*; return False if the call raises."""
    try:
        swift_client.put_container(container=bucket_name)
    except Exception:
        return False
    return True
# Works
def swift_check_bucket(
    swift_client: any,
    bucket_name:str
) -> any:
    """Return ``{'metadata': headers, 'objects': listing}`` for a container, or {} on any error."""
    try:
        response = swift_client.get_container(container=bucket_name)
        return {'metadata': response[0], 'objects': response[1]}
    except Exception:
        return {}
# Refactored
def swift_delete_bucket(
    swift_client: any,
    bucket_name: str
) -> bool:
    """Delete the container *bucket_name*; return False if the call raises."""
    try:
        swift_client.delete_container(container=bucket_name)
    except Exception:
        return False
    return True
# Created
def swift_list_buckets(
    swift_client: any
) -> any:
    """Map each container in the account to ``{'amount': count, 'size': bytes}``.

    Returns {} if listing the account (or reading any entry) raises.
    """
    try:
        listing = swift_client.get_account()[1]
        return {
            entry['name']: {'amount': entry['count'], 'size': entry['bytes']}
            for entry in listing
        }
    except Exception:
        return {}
# Works
def swift_create_object(
    swift_client: any,
    bucket_name: str, 
    object_path: str, 
    object_data: any,
    object_metadata: any
) -> bool: 
    """PUT *object_data* to *bucket_name*/*object_path*, sending *object_metadata* as headers.

    Returns False if the upload raises.
    TODO: objects above Swift's single-upload limit (5 GB) need segmented uploads.
    """
    try:
        swift_client.put_object(
            container=bucket_name,
            obj=object_path,
            contents=object_data,
            headers=object_metadata
        )
    except Exception:
        return False
    return True
# Works
def swift_check_object(
    swift_client: any,
    bucket_name: str, 
    object_path: str
) -> any: 
    """Return the HEAD headers of an object, or {} if the request raises (e.g. missing object)."""
    try:
        return swift_client.head_object(container=bucket_name, obj=object_path)
    except Exception:
        return {}
# Refactored
def swift_get_object(
    swift_client:any,
    bucket_name: str,
    object_path: str
) -> any:
    """Download an object as ``{'data': body, 'info': headers}``; {} if the request raises."""
    try:
        reply = swift_client.get_object(container=bucket_name, obj=object_path)
        headers = reply[0]
        body = reply[1]
    except Exception:
        return {}
    return {'data': body, 'info': headers}
# Refactored   
def swift_remove_object(
    swift_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    """Delete *object_path* from *bucket_name*; return False if the call raises."""
    try:
        swift_client.delete_object(container=bucket_name, obj=object_path)
    except Exception:
        return False
    return True
# Works
def swift_update_object(
    swift_client: any,
    bucket_name: str, 
    object_path: str, 
    object_data: any,
    object_metadata: any
) -> bool:  
    """Replace an existing Swift object's data and metadata.

    A Swift object PUT replaces an existing object (content and user metadata)
    in a single request. The object is therefore uploaded directly instead of
    being deleted first.

    The previous delete-then-create sequence had a flaw: if the re-upload failed
    after the delete succeeded, the old data was gone and only False was
    reported.

    :return: True if the upload succeeded, False otherwise
    """
    return swift_create_object(
        swift_client = swift_client, 
        bucket_name = bucket_name, 
        object_path = object_path, 
        object_data = object_data,
        object_metadata = object_metadata
    )
# Works
def swift_create_or_update_object(
    swift_client: any,
    bucket_name: str, 
    object_path: str, 
    object_data: any,
    object_metadata: any
) -> any:
    """Ensure *bucket_name* exists, then create or update *object_path* in it.

    Returns False if the bucket is missing and cannot be created. Otherwise it
    returns the result of the create (object absent) or update (object present)
    helper.
    """
    existing_bucket = swift_check_bucket(swift_client=swift_client, bucket_name=bucket_name)
    if len(existing_bucket) == 0:
        if not swift_create_bucket(swift_client=swift_client, bucket_name=bucket_name):
            return False

    existing_object = swift_check_object(
        swift_client=swift_client,
        bucket_name=bucket_name,
        object_path=object_path
    )
    writer = swift_create_object if len(existing_object) == 0 else swift_update_object
    return writer(
        swift_client=swift_client,
        bucket_name=bucket_name,
        object_path=object_path,
        object_data=object_data,
        object_metadata=object_metadata
    )

## Storage Functions

In [5]:
# 3-2-2

# Refactored and Works
def set_encoded_metadata(
    used_client: str,
    object_metadata: any
) -> any:
    """Turn a plain metadata dict into Swift ``x-object-meta-<key>`` headers.

    Lists are encoded as ``'list=v1,v2,...'``. Every other value is stored as
    ``str(value)``. Clients other than 'swift' yield {}.
    """
    if used_client != 'swift':
        return {}
    header_prefix = 'x-object-meta'
    headers = {}
    for name, raw in object_metadata.items():
        header = header_prefix + '-' + name
        if isinstance(raw, list):
            headers[header] = 'list=' + ','.join(str(item) for item in raw)
        else:
            headers[header] = str(raw)
    return headers
# Refactored and works
def get_general_metadata(
    used_client: str,
    object_metadata: any
) -> any:
    """Keep only the headers that are NOT custom ``x-object-meta*`` metadata (swift only)."""
    if used_client != 'swift':
        return {}
    custom_prefix = 'x-object-meta'
    return {
        header: content
        for header, content in object_metadata.items()
        if header[:len(custom_prefix)] != custom_prefix
    }
# Refactored and works
def get_decoded_metadata(
    used_client: str,
    object_metadata: any
) -> any: 
    """Decode Swift ``x-object-meta-<key>`` headers back into a plain metadata dict.

    This is the inverse of ``set_encoded_metadata``:
    - ``'list=a,b'`` becomes a list. Items are ints when every item parses as
      int, otherwise strings; ``'list='`` becomes [].
    - Purely decimal strings become ints.
    - Anything else stays a string.

    Fixes over the previous version:
    - Only values that *start with* ``'list='`` are treated as lists. Before, any
      value containing "list=" was misdecoded.
    - List items keep any '=' they contain. Before, ``split('=')[1]`` truncated them.
    - ``isdecimal`` replaces ``isnumeric``, which accepted characters such as '²'
      that ``int()`` rejects (ValueError).
    - The bare ``except:`` is narrowed to ``ValueError``.

    Clients other than 'swift' yield {}.
    """
    decoded_metadata = {}
    if used_client != 'swift':
        return decoded_metadata
    key_initial = 'x-object-meta'
    list_marker = 'list='
    for key, value in object_metadata.items():
        if key[:len(key_initial)] != key_initial:
            continue
        # strip 'x-object-meta-' (prefix plus the joining dash)
        decoded_key = key[len(key_initial) + 1:]
        if value.startswith(list_marker):
            items = value[len(list_marker):].split(',')
            if items == ['']:
                decoded_metadata[decoded_key] = []
            else:
                try:
                    decoded_metadata[decoded_key] = [int(item) for item in items]
                except ValueError:
                    decoded_metadata[decoded_key] = items
            continue
        if value.isdecimal():
            decoded_metadata[decoded_key] = int(value)
            continue
        decoded_metadata[decoded_key] = value
    return decoded_metadata
# Refactored and works
def set_bucket_names(
    storage_parameters: any
) -> any:
    """Derive the four bucket names used by the pipeline.

    The result is ``[forwarder, submitter, pipeline, experiment]``. The forwarder
    bucket is shared per ice-id. The other three also carry the formatted user
    name as a suffix.
    """
    prefix = storage_parameters['bucket-prefix']
    ice_id = storage_parameters['ice-id']
    user = storage_parameters['user']
    names = [prefix + '-forwarder-' + ice_id]
    user_suffix = set_formatted_user(user = user)
    for role in ('submitter', 'pipeline', 'experiment'):
        names.append(prefix + '-' + role + '-' + ice_id + '-' + user_suffix)
    return names
# Refactored and works
def setup_storage_client(
    storage_parameters: any
) -> any:
    """Create the storage client described by *storage_parameters*.

    Only the 'swift' client is supported; any other 'used-client' gives None.
    """
    if storage_parameters['used-client'] != 'swift':
        return None
    return swift_setup_client(
        pre_auth_url=storage_parameters['pre-auth-url'],
        pre_auth_token=storage_parameters['pre-auth-token'],
        user_domain_name=storage_parameters['user-domain-name'],
        project_domain_name=storage_parameters['project-domain-name'],
        project_name=storage_parameters['project-name'],
        auth_version=storage_parameters['auth-version']
    )
# Refactored and works
def check_object_metadata(
    storage_client: any,
    bucket_name: str, 
    object_path: str
) -> any: 
    """Return ``{'general-meta': ..., 'custom-meta': ...}`` for an object.

    Both parts are empty dicts when the object is missing, the HEAD request
    fails, or the client is not a Swift client.
    """
    result = {'general-meta': {}, 'custom-meta': {}}
    if not is_swift_client(storage_client=storage_client):
        return result

    headers = swift_check_object(
        swift_client=storage_client,
        bucket_name=bucket_name,
        object_path=object_path
    )
    if len(headers) > 0:
        result['general-meta'] = get_general_metadata(used_client='swift', object_metadata=headers)
        result['custom-meta'] = get_decoded_metadata(used_client='swift', object_metadata=headers)
    return result
# Refactored and works
def get_object_content(
    storage_client: any,
    bucket_name: str,
    object_path: str
) -> any:
    """Download an object and return its unpickled data and split metadata.

    :return: ``{'data': ..., 'general-meta': ..., 'custom-meta': ...}``. Returns {}
             for non-Swift clients, and also when the download fails:
             ``swift_get_object`` returns {} on error. Previously that case
             raised KeyError('data').
    """
    object_content = {}
    if not is_swift_client(storage_client = storage_client):
        return object_content

    fetched_object = swift_get_object(
        swift_client = storage_client,
        bucket_name = bucket_name,
        object_path = object_path
    )
    if len(fetched_object) == 0:
        return object_content

    # SECURITY: pickle.loads can execute arbitrary code embedded in the payload.
    # Only read from buckets whose writers are trusted.
    object_content['data'] = pickle.loads(fetched_object['data'])
    object_content['general-meta'] = get_general_metadata(
        used_client = 'swift',
        object_metadata = fetched_object['info']
    )
    object_content['custom-meta'] = get_decoded_metadata(
        used_client = 'swift',
        object_metadata = fetched_object['info']
    )
    return object_content
# Refactored    
def remove_object(
    storage_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    """Delete an object through the matching backend; False for unsupported clients or failure."""
    if not is_swift_client(storage_client=storage_client):
        return False
    return swift_remove_object(
        swift_client=storage_client,
        bucket_name=bucket_name,
        object_path=object_path
    )
# Refactored and works
def create_or_update_object(
    storage_client: any,
    bucket_name: str, 
    object_path: str, 
    object_data: any,
    object_metadata: any
) -> any:
    """Pickle *object_data*, encode *object_metadata* as headers, and upload the result.

    Returns False for unsupported clients. Otherwise returns the backend's
    success flag.
    """
    if not is_swift_client(storage_client=storage_client):
        return False
    payload = pickle.dumps(object_data)
    headers = set_encoded_metadata(used_client='swift', object_metadata=object_metadata)
    return swift_create_or_update_object(
        swift_client=storage_client,
        bucket_name=bucket_name,
        object_path=object_path,
        object_data=payload,
        object_metadata=headers
    )
# Created and works
def format_bucket_metadata(
    used_client: str,
    bucket_metadata: any
) -> any:
    """Pick the interesting container headers and rename them to short keys (swift only)."""
    if used_client != 'swift':
        return {}
    renames = {
        'x-container-object-count': 'object-count',
        'x-container-bytes-used-actual': 'used-bytes',
        'last-modified': 'date',
        'content-type': 'type'
    }
    return {
        renames[header]: content
        for header, content in bucket_metadata.items()
        if header in renames
    }
# Created and works
def format_bucket_objects(
    used_client: str,
    bucket_objects: any
) -> any:
    """Index a Swift object listing by name, renaming hash/bytes/last_modified (swift only).

    Entries without a 'name' field are stored under the key None.
    """
    result = {}
    if used_client != 'swift':
        return result
    renames = {'hash': 'id', 'bytes': 'used-bytes', 'last_modified': 'date'}
    for entry in bucket_objects:
        fields = dict(entry.items())
        result[fields.get('name')] = {
            renames[field]: content
            for field, content in fields.items()
            if field in renames
        }
    return result
# Created and works
def format_bucket_info(
    used_client: str,
    bucket_info: any
) -> any:
    """Format a raw ``{'metadata': ..., 'objects': ...}`` bucket description.

    ``swift_check_bucket`` returns {} when the bucket does not exist or the
    request failed. That input (or None) now yields empty metadata/objects;
    previously it raised KeyError('metadata').

    :return: ``{'metadata': {...}, 'objects': {...}}``
    """
    bucket_metadata = {}
    bucket_objects = {}
    if used_client == 'swift' and bucket_info:
        bucket_metadata = format_bucket_metadata(
            used_client = used_client,
            bucket_metadata = bucket_info.get('metadata', {})
        )
        bucket_objects = format_bucket_objects(
            used_client = used_client,
            bucket_objects = bucket_info.get('objects', [])
        )
    return {'metadata': bucket_metadata, 'objects': bucket_objects}
# Created and works
def get_bucket_info(
    storage_client: any,
    bucket_name: str
) -> any:
    """Fetch and format a bucket's metadata and object listing; {} for unsupported clients."""
    if not is_swift_client(storage_client=storage_client):
        return {}
    raw_info = swift_check_bucket(swift_client=storage_client, bucket_name=bucket_name)
    return format_bucket_info(used_client='swift', bucket_info=raw_info)
# Created and works
def format_container_info(
    used_client: str,
    container_info: any
) -> any:
    """Normalize a Swift account listing to ``{bucket_name: {'amount': n, 'size': bytes}}``.

    Accepts either shape:
    - the raw listing: a list of dicts with 'name', 'count' and 'bytes';
    - a dict already in the formatted shape, which is what ``swift_list_buckets``
      returns and what ``get_container_info`` passes in.

    Before this change, the dict form made the loop iterate over bucket-name
    strings and raise TypeError as soon as the account had any bucket.
    Clients other than 'swift' yield {}.
    """
    formatted_container_info = {}
    if used_client != 'swift':
        return formatted_container_info
    if isinstance(container_info, dict):
        for bucket_name, details in container_info.items():
            formatted_container_info[bucket_name] = {
                'amount': details['amount'],
                'size': details['size']
            }
        return formatted_container_info
    for bucket in container_info:
        formatted_container_info[bucket['name']] = {
            'amount': bucket['count'],
            'size': bucket['bytes']
        }
    return formatted_container_info
# Created and works
def get_container_info( 
    storage_client: any
) -> any:
    """Return a per-bucket summary of the whole account; {} for unsupported clients."""
    if not is_swift_client(storage_client=storage_client):
        return {}
    listing = swift_list_buckets(swift_client=storage_client)
    return format_container_info(used_client='swift', container_info=listing)

## Object Functions

In [6]:
# 4-2-3

# Created and works
def set_object_path(
    object_name: str,
    path_replacers: any,
    path_names: any
):
    """Build an object path from a named template.

    Each '/'-separated template segment that appears in *path_replacers* is
    replaced by the mapped value, but only when that value is non-empty. Any
    *path_names* are then appended as extra segments. The resulting path is
    printed and returned.
    """
    templates = {
        'root': 'name',
        'code': 'CODE/name',
        'slurm': 'CODE/SLURM/name',
        'ray': 'CODE/RAY/name',
        'data': 'DATA/name',
        'artifacts': 'ARTIFACTS/name',
        'time': 'TIMES/name'
    }

    segments = []
    for segment in templates[object_name].split('/'):
        substitute = path_replacers[segment] if segment in path_replacers else ''
        segments.append(substitute if len(substitute) > 0 else segment)

    if len(path_names) != 0:
        segments.extend(path_names)

    object_path = '/'.join(segments)
    print('Used object path:' + str(object_path))
    return object_path
# created and works
def setup_storage(
    storage_parameters: any
) -> any:
    """Return ``(storage_client, bucket_names)`` for the given storage parameters."""
    client = setup_storage_client(storage_parameters=storage_parameters)
    bucket_names = set_bucket_names(storage_parameters=storage_parameters)
    return client, bucket_names
# Created and works
def check_object(
    storage_client: any,
    bucket_name: str,
    object_name: str,
    path_replacers: any,
    path_names: any
) -> dict:
    """Resolve the object's path and return its metadata.

    The return annotation previously said ``bool``, but a dict is returned:
    ``{'general-meta': {...}, 'custom-meta': {...}, 'path': str}``. Both meta
    dicts are empty when the object does not exist.
    """
    object_path = set_object_path(
        object_name = object_name,
        path_replacers = path_replacers,
        path_names = path_names
    )
    # Consider making these functions object storage agnostic
    object_metadata = check_object_metadata(
        storage_client = storage_client,
        bucket_name = bucket_name,
        object_path = object_path
    )
    object_metadata['path'] = object_path
    return object_metadata
# Created and works
def get_object(
    storage_client: any,
    bucket_name: str,
    object_name: str,
    path_replacers: any,
    path_names: any
) -> any:
    """Return the object's content dict, or None when the object does not exist."""
    located = check_object(
        storage_client=storage_client,
        bucket_name=bucket_name,
        object_name=object_name,
        path_replacers=path_replacers,
        path_names=path_names
    )
    if len(located['general-meta']) == 0:
        return None
    # Consider making these functions object storage agnostic
    return get_object_content(
        storage_client=storage_client,
        bucket_name=bucket_name,
        object_path=located['path']
    )
# Created and Works
def set_object(
    storage_client: any,
    bucket_name: str,
    object_name: str,
    path_replacers: any,
    path_names: any,
    overwrite: bool,
    object_data: any,
    object_metadata: any
):
    """Store *object_data* at the resolved path.

    If the object already exists and *overwrite* is False, the write is skipped.

    :return: True if the object was written. False if the write was skipped
             because the object exists and *overwrite* is False, or if the
             upload failed. The result was previously discarded (None was
             returned); callers that ignore it are unaffected.
    """
    checked_object = check_object(
        storage_client = storage_client,
        bucket_name = bucket_name,
        object_name = object_name,
        path_replacers = path_replacers,
        path_names = path_names
    )

    already_exists = len(checked_object['general-meta']) > 0
    if already_exists and not overwrite:
        return False

    return create_or_update_object(
        storage_client = storage_client,
        bucket_name = bucket_name,
        object_path = checked_object['path'],
        object_data = object_data,
        object_metadata = object_metadata
    )
# Created and works
def check_bucket(
    storage_client: any,
    bucket_name: str
) -> any:
    """Return the formatted metadata and object listing of *bucket_name*."""
    info = get_bucket_info(storage_client=storage_client, bucket_name=bucket_name)
    return info
# Created and works
def check_buckets(
    storage_client: any
) -> any:
    """Return the per-bucket summary of the whole storage account."""
    summary = get_container_info(storage_client=storage_client)
    return summary

## Metadata Functions

In [7]:
# Created and works
def general_object_metadata():
    """Return the default custom metadata attached to stored objects (a fresh dict per call)."""
    return {'version': 1}

## Access Functions

In [8]:
def get_storage_parameters(
    env_path: str,
    auth_url: str,
    pre_auth_url: str,
    auth_version: str,
    bucket_prefix: str,
    ice_id: str,
    user: str
):
    """Authenticate against Keystone and assemble the storage parameter dict.

    Credentials are read from the env file at *env_path* with python-decouple:
    ``CSC_USERNAME``, ``CSC_PASSWORD``, ``CSC_PROJECT_NAME`` and
    ``CSC_USER_DOMAIN_NAME``. A Keystone password session then yields a token.

    ``CSC_PROJECT_DOMAIN_NAME`` sets the project domain. If that key is absent,
    the user domain is used. Previously the project domain was always read from
    ``CSC_USER_DOMAIN_NAME`` (copy-paste slip), so existing env files behave as
    before.

    :return: dict with 'bucket-prefix', 'ice-id', 'user', 'used-client' and the
             Swift connection settings consumed by setup_storage_client
    """
    env_config = Config(RepositoryEnv(env_path))
    swift_auth_url = auth_url
    swift_user = env_config.get('CSC_USERNAME')
    swift_key = env_config.get('CSC_PASSWORD')
    swift_project_name = env_config.get('CSC_PROJECT_NAME')
    swift_user_domain_name = env_config.get('CSC_USER_DOMAIN_NAME')
    swift_project_domain_name = env_config.get(
        'CSC_PROJECT_DOMAIN_NAME',
        default = swift_user_domain_name
    )

    loader = loading.get_plugin_loader('password')
    auth = loader.load_from_options(
        auth_url = swift_auth_url,
        username = swift_user,
        password = swift_key,
        project_name = swift_project_name,
        user_domain_name = swift_user_domain_name,
        project_domain_name = swift_project_domain_name
    )

    keystone_session = session.Session(
        auth = auth
    )
    swift_token = keystone_session.get_token()

    swift_pre_auth_url = pre_auth_url
    swift_auth_version = auth_version

    storage_parameters = {
        'bucket-prefix': bucket_prefix,
        'ice-id': ice_id,
        'user': user,
        'used-client': 'swift',
        'pre-auth-url': str(swift_pre_auth_url),
        'pre-auth-token': str(swift_token),
        'user-domain-name': str(swift_user_domain_name),
        'project-domain-name': str(swift_project_domain_name),
        'project-name': str(swift_project_name),
        'auth-version': str(swift_auth_version)
    }

    return storage_parameters

## Code Functions

In [9]:
# Refactored
def set_code(
    storage_client: any,
    storage_name: str,
    file_path: str,
    overwrite: bool
):
    file_data = None
    print('User code storage:' + str(storage_name))
    print('Used code path:' + str(file_path))
    with open(file_path, 'r') as f:
        file_data = f.read()

    path_split = file_path.split('/')
    directory_name = path_split[-2]
    file_name = path_split[-1]
    
    set_object(
        storage_client = storage_client,
        bucket_name = storage_name,
        object_name = directory_name,
        path_replacers = {
            'name': file_name
        },
        path_names = [],
        overwrite = overwrite,
        object_data = file_data,
        object_metadata = general_object_metadata()
    )
# Refactored
def get_code(
    storage_client: any,
    storage_name: str,
    code_type: str,
    code_file: str
) -> any:
    print('User code storage:' + str(storage_name))
    fetched_object = get_object(
        storage_client = storage_client,
        bucket_name = storage_name,
        object_name = code_type,
        path_replacers = {
            'name': code_file
        },
        path_names = []
    )
    code_object = {
        'data': fetched_object['data'],
        'metadata': fetched_object['custom-meta']
    }
    return code_object

In [10]:
# Env file read by get_storage_parameters (CSC_* credentials).
# NOTE(review): machine-specific absolute path; adjust for other environments.
env_absolute_path = '/home/sfniila/.ssh/.env'

In [13]:
# Authenticate with Keystone using the env-file credentials and collect the Swift
# connection settings together with the bucket-naming inputs.
storage_parameters = get_storage_parameters(
    env_path = env_absolute_path,
    auth_url = 'https://pouta.csc.fi:5001/v3',
    pre_auth_url = 'https://a3s.fi:443/swift/v1/AUTH_6698ff90e6704a74930c33d6b66f1b5b',
    auth_version = '3',
    bucket_prefix = 'llm',
    ice_id = 's0-c0-u1',
    user = 'user@example.com'
)

# Create the Swift client and derive the bucket names for this ice-id/user.
storage_client, storage_names = setup_storage(
    storage_parameters = storage_parameters
)

In [14]:
# Display the derived bucket names (shown as the cell output).
storage_names

['llm-forwarder-s0-c0-u1',
 'llm-submitter-s0-c0-u1-user-example-com',
 'llm-pipeline-s0-c0-u1-user-example-com',
 'llm-experiment-s0-c0-u1-user-example-com']

## Ray

In [49]:
from ray.job_submission import JobSubmissionClient
from ray.job_submission import JobStatus
import time as t
import json
import requests

def test_url(
    target_url: str,
    timeout: int
) -> bool:
    """Return True when a HEAD request to *target_url* answers with HTTP 200.

    Any request failure counts as "not reachable" and yields False. This covers
    connection errors, read timeouts and invalid URLs. Previously only
    ``ConnectionError`` was caught, so a ``ReadTimeout`` (which is not a
    ConnectionError) escaped and aborted the caller's polling loop.
    """
    try:
        response = requests.head(
            url = target_url,
            timeout = timeout
        )
    except requests.RequestException:
        return False
    return response.status_code == 200

def setup_ray(
    services: any,
    timeout: int
):
    """Wait up to *timeout* seconds for the Ray dashboard, then return a JobSubmissionClient.

    Returns None when *services* is empty or the dashboard never answered
    (it is polled every 5 seconds).
    """
    began = t.time()
    if len(services) <= 0:
        return None
    dashboard_url = 'http://' + services['ray-dashboard']
    reachable = None
    while t.time() - began <= timeout:
        reachable = test_url(target_url=dashboard_url, timeout=5)
        if reachable:
            break
        t.sleep(5)
    if not reachable:
        return None
    return JobSubmissionClient(address=dashboard_url)

def submit_ray_job(
    ray_client: any,
    ray_parameters: any,
    ray_job_file: any,
    working_directory: str,
    ray_job_envs: any,
    ray_job_packages: any
) -> any:
    """Submit ``python <ray_job_file> [<json parameters>]`` as a Ray job and return its id.

    The JSON parameters are shell-quoted with ``shlex.quote`` because the
    entrypoint is run as a shell command. Previously they were wrapped in raw
    single quotes, which broke as soon as a parameter contained a "'". For
    parameters without single quotes the command is unchanged.
    """
    import shlex

    command = "python " + str(ray_job_file)
    if 0 < len(ray_parameters):
        command = command + " " + shlex.quote(json.dumps(ray_parameters))
    job_id = ray_client.submit_job(
        entrypoint = command,
        runtime_env = {
            'working_dir': str(working_directory),
            'env_vars': ray_job_envs,
            'pip': ray_job_packages
        }
    )
    return job_id

def wait_ray_job(
    ray_client: any,
    ray_job_id: int, 
    waited_status: any,
    timeout: int
) -> any:
    """Poll a Ray job every 5 s until it reaches one of *waited_status* or *timeout* elapses.

    :return: ``(status, logs)``. The status is None if the timeout expired first.
             The logs are always fetched.
    """
    began = t.time()
    reached = None
    while t.time() - began <= timeout:
        current = ray_client.get_job_status(ray_job_id)
        print(f"status: {current}")
        if current in waited_status:
            reached = current
            break
        t.sleep(5)
    logs = ray_client.get_job_logs(ray_job_id)
    return reached, logs

def ray_job_handler(
    ray_client: any,
    ray_parameters: any,
    ray_job_file: str,
    ray_directory: str,
    ray_job_envs: any,
    ray_job_packages: any,
    timeout: int
) -> bool:
    """Submit a Ray job, wait for a terminal state, print its logs, and report success.

    Returns True only when the job ended in ``JobStatus.SUCCEEDED`` within *timeout*.
    """
    job_id = submit_ray_job(
        ray_client=ray_client,
        ray_parameters=ray_parameters,
        ray_job_file=ray_job_file,
        working_directory=ray_directory,
        ray_job_envs=ray_job_envs,
        ray_job_packages=ray_job_packages
    )

    print('Ray batch job id: ' + str(job_id))

    terminal_states = {
        JobStatus.SUCCEEDED,
        JobStatus.STOPPED,
        JobStatus.FAILED
    }
    final_status, logs = wait_ray_job(
        ray_client=ray_client,
        ray_job_id=job_id,
        waited_status=terminal_states,
        timeout=timeout
    )
    print('Ray batch job ended:')
    succeeded = final_status == JobStatus.SUCCEEDED
    print('Ray batch job succeeded' if succeeded else 'Ray batch job failed')
    print(logs)
    return succeeded

  from .autonotebook import tqdm as notebook_tqdm
2024-11-08 12:04:28,830	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


## Job Functions

In [57]:
%%writefile ray/functions/pygithub.py

from github import Github

def pygithub_get_repo_paths(
    token: str,
    owner: str, 
    name: str
) -> any:
    """Return the paths of all files in the repository *owner*/*name* (breadth-first order).

    Directories are expanded with further ``get_contents`` calls. The GitHub
    client is closed even if a request raises; before, it leaked on error. A
    deque gives O(1) pops instead of ``list.pop(0)``.
    """
    from collections import deque

    g = Github(token)
    try:
        repo = g.get_repo(f"{owner}/{name}")
        pending = deque(repo.get_contents(""))
        paths = []
        while pending:
            file_content = pending.popleft()
            if file_content.type == 'dir':
                pending.extend(repo.get_contents(file_content.path))
            else:
                paths.append(file_content.path)
        return paths
    finally:
        g.close()

def pygithub_get_path_content(
    token: str,
    owner: str, 
    name: str, 
    path: str
) -> any:
    """Return the UTF-8 decoded content of the file *path* in repository *owner*/*name*.

    The GitHub client is closed even when a request or decoding raises.
    """
    g = Github(token)
    try:
        repo = g.get_repo(f"{owner}/{name}")
        file_content = repo.get_contents(path)
        return file_content.decoded_content.decode('utf-8')
    finally:
        g.close()

Overwriting ray/functions/pygithub.py


In [58]:
%%writefile ray/functions/mongo_db.py

from pymongo import MongoClient as mc

def mongo_is_client(
    storage_client: any
) -> any:
    """Return True when *storage_client* is a pymongo ``MongoClient``.

    The previous check used ``mc.Connection``, an attribute MongoClient does not
    have, so every call raised AttributeError.
    """
    return isinstance(storage_client, mc)

def mongo_setup_client(
    username: str,
    password: str,
    address: str,
    port: str
) -> any:
    """Create a MongoClient for ``mongodb://<username>:<password>@<address>:<port>/``.

    Username and password are percent-escaped with ``urllib.parse.quote_plus``,
    as MongoDB connection strings require. Previously, credentials containing
    reserved characters such as '@', ':', '/' or '%' produced an invalid or
    misparsed URI. The chained placeholder ``replace`` calls could also
    substitute inside already-inserted values.

    Plain alphanumeric credentials give the same URI as before.
    """
    from urllib.parse import quote_plus

    connection_address = (
        f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{address}:{port}/"
    )
    mongo_client = mc(
        host = connection_address
    )
    return mongo_client

def mongo_get_database(
    mongo_client: any,
    database_name: str
) -> any:
    """
    Look up a database handle on the client by name.

    :param mongo_client: client supporting ``client[name]``
    :param database_name: database to look up
    :return: the database object, or None if the lookup raises
    """
    try:
        return mongo_client[database_name]
    except Exception:
        # lookup failures are reported to callers as "no database"
        return None

def mongo_check_database(
    mongo_client: any, 
    database_name: str
) -> bool:
    """
    Report whether ``database_name`` appears in the server's database list.

    :return: True if listed; False if not listed or if listing fails
    """
    try:
        return database_name in mongo_client.list_database_names()
    except Exception:
        return False

def mongo_list_databases(
    mongo_client: any
) -> any:
    """
    Return the names of all databases on the server.

    :return: list of names, or an empty list if listing fails
    """
    try:
        return mongo_client.list_database_names()
    except Exception:
        return []

def mongo_remove_database(
    mongo_client: any, 
    database_name: str
) -> bool:
    """
    Drop a database.

    :return: True if ``drop_database`` returned normally, False if it raised
    """
    try:
        mongo_client.drop_database(database_name)
    except Exception:
        return False
    return True

def mongo_get_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str
) -> any:
    """
    Look up a collection handle inside a database.

    :param mongo_client: MongoDB client
    :param database_name: database containing the collection
    :param collection_name: collection to look up
    :return: the collection object, or None when the database or collection
             lookup fails (not a bool)
    """
    try:
        database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        if database is None:
            return None
        return database[collection_name]
    except Exception:
        return None
    
def mongo_check_collection(
    mongo_client: any, 
    database_name: any, 
    collection_name: any
) -> bool:
    """
    Report whether ``collection_name`` exists in ``database_name``.

    :return: True if listed; False if not listed or if any lookup fails
    """
    try:
        return collection_name in mongo_client[database_name].list_collection_names()
    except Exception:
        return False

def mongo_update_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any, 
    update_query: any
) -> any:
    """
    Apply ``update_query`` to every document matching ``filter_query``.

    :return: the ``update_many`` result, or None if anything fails
    """
    try:
        target_collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        update_result = target_collection.update_many(filter_query, update_query)
    except Exception:
        return None
    return update_result

def mongo_list_collections(
    mongo_client: any, 
    database_name: str
) -> any:
    """
    Return the collection names of a database.

    :param mongo_client: MongoDB client
    :param database_name: database to inspect
    :return: list of collection names (not a bool), or an empty list if the
             database lookup or listing fails
    """
    try:
        database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        return database.list_collection_names()
    except Exception:
        return []

def mongo_remove_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str
) -> bool:
    """
    Drop one collection from a database.

    :return: True if ``drop_collection`` returned normally, False otherwise
    """
    try:
        owning_database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        owning_database.drop_collection(collection_name)
    except Exception:
        return False
    return True

def mongo_create_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    document: any
) -> any:
    """
    Insert one document into a collection.

    :return: the ``insert_one`` result, or None if anything fails
    """
    try:
        target_collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        insert_result = target_collection.insert_one(document)
    except Exception:
        return None
    return insert_result

def mongo_get_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any
):
    """
    Fetch the first document matching ``filter_query``.

    :return: the document, or None if none matches or an error occurs (the
             error is printed)
    """
    try:
        target_collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        found = target_collection.find_one(filter_query)
    except Exception as error:
        print(error)
        return None
    return found

def mongo_list_documents(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any,
    sorting_query: any
) -> any:
    """
    Return all documents matching ``filter_query``, sorted by ``sorting_query``.

    :return: list of documents, or an empty list if anything fails
    """
    try:
        target_collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        cursor = target_collection.find(filter_query).sort(sorting_query)
        return list(cursor)
    except Exception:
        return []

def mongo_update_document(
    mongo_client: any, 
    database_name: any, 
    collection_name: any, 
    filter_query: any, 
    update_query: any
) -> any:
    """
    Apply ``update_query`` to the first document matching ``filter_query``.

    :return: the ``update_one`` result, or None if anything fails
    """
    try:
        target_collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        update_result = target_collection.update_one(filter_query, update_query)
    except Exception:
        return None
    return update_result

def mongo_remove_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any
) -> any:
    """
    Delete the first document matching ``filter_query``.

    :param mongo_client: MongoDB client
    :param database_name: database containing the collection
    :param collection_name: collection to delete from
    :param filter_query: pymongo filter selecting the document
    :return: the ``delete_one`` result object (not a bool), or None if
             anything fails
    """
    try:
        collection = mongo_get_collection(
            mongo_client = mongo_client,
            database_name = database_name,
            collection_name = collection_name
        )
        return collection.delete_one(filter_query)
    except Exception:
        return None

Overwriting ray/functions/mongo_db.py


In [59]:
%%writefile ray/functions/minio_os.py

import io
import pickle
from minio import Minio

def is_minio_client(
    storage_client: any
) -> bool:
    """
    Tell whether ``storage_client`` is a MinIO SDK client.

    :param storage_client: object to test
    :return: True if it is a ``Minio`` instance
    """
    matches = isinstance(storage_client, Minio)
    return matches

def minio_setup_client(
    endpoint: str,
    username: str,
    password: str,
    secure: bool = False
) -> any:
    """
    Create a MinIO client.

    :param endpoint: MinIO server endpoint
    :param username: access key
    :param password: secret key
    :param secure: use TLS; defaults to False (plain HTTP)
    :return: a ``Minio`` client
    """
    minio_client = Minio(
        endpoint = endpoint,
        access_key = username,
        secret_key = password,
        secure = secure
    )
    return minio_client

def minio_pickle_data(
    data: any
) -> any:
    """
    Serialise ``data`` with pickle for upload.

    :param data: any picklable object
    :return: tuple ``(buffer, length)``: a BytesIO positioned at offset 0 and
             the payload size in bytes
    """
    payload = pickle.dumps(data)
    return io.BytesIO(payload), len(payload)

def minio_unpickle_data(
    pickled: any
) -> any:
    """
    Deserialise bytes produced by ``minio_pickle_data``.

    NOTE(review): ``pickle.loads`` can execute arbitrary code. Only use this on
    objects written by trusted code into a trusted bucket.

    :param pickled: pickled bytes
    :return: the reconstructed object
    """
    restored = pickle.loads(pickled)
    return restored

def minio_create_bucket(
    minio_client: any,
    bucket_name: str
) -> bool: 
    """
    Create a bucket.

    :return: True on success; False (after printing the error) otherwise
    """
    try:
        minio_client.make_bucket(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket creation error')
        print(error)
        return False
    return True
    
def minio_check_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    """
    Report whether a bucket exists.

    :return: the ``bucket_exists`` result; False (after printing the error)
             if the check raises
    """
    try:
        exists = minio_client.bucket_exists(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket checking error')
        print(error)
        return False
    return exists
       
def minio_delete_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    """
    Remove a bucket.

    :return: True on success; False (after printing the error) otherwise
    """
    try:
        minio_client.remove_bucket(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket deletion error')
        print(error)
        return False
    return True

def minio_create_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool: 
    """
    Pickle ``data`` and upload it as ``object_path`` in ``bucket_name``.

    NOTE(review): the original author warns that very large objects (~1GB)
    may fail with a "large header" error. This is unverified; confirm against
    the MinIO limits in use.

    :return: True on success; False (after printing the error) otherwise
    """
    try:
        payload_buffer, payload_length = minio_pickle_data(data = data)
        minio_client.put_object(
            bucket_name = bucket_name,
            object_name = object_path,
            data = payload_buffer,
            length = payload_length,
            metadata = metadata
        )
    except Exception as error:
        print('MinIO object creation error')
        print(error)
        return False
    return True

def minio_check_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> any: 
    """
    Stat an object.

    :return: the ``stat_object`` result, or an empty (falsy) dict if the stat
             raises for any reason
    """
    try:
        return minio_client.stat_object(
            bucket_name = bucket_name,
            object_name = object_path
        )
    except Exception:
        return {}

def minio_delete_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    """
    Remove an object.

    :return: True on success; False (after printing the error) otherwise
    """
    try:
        minio_client.remove_object(
            bucket_name = bucket_name,
            object_name = object_path
        )
    except Exception as error:
        print('MinIO object deletion error')
        print(error)
        return False
    return True

def minio_update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool:  
    """
    Replace the contents of an existing object.

    ``put_object`` overwrites an existing key, so the new data is written
    directly instead of deleting first. This avoids a window in which the
    object is missing, and avoids losing it entirely if the upload fails
    after a delete.

    :return: True on success, False otherwise
    """
    return minio_create_object(
        minio_client = minio_client,
        bucket_name = bucket_name,
        object_path = object_path,
        data = data,
        metadata = metadata
    )

def minio_create_or_update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any, 
    metadata: dict
) -> bool:
    """
    Upload ``data`` to ``bucket_name/object_path``, creating the bucket if needed.

    :return: False if the bucket is missing and cannot be created; otherwise
             the result of creating (new object) or updating (existing object)
    """
    bucket_found = minio_check_bucket(
        minio_client = minio_client,
        bucket_name = bucket_name
    )
    if not bucket_found and not minio_create_bucket(
        minio_client = minio_client,
        bucket_name = bucket_name
    ):
        return False

    existing_object = minio_check_object(
        minio_client = minio_client,
        bucket_name = bucket_name,
        object_path = object_path
    )
    # a falsy stat result (empty dict) means the object does not exist yet
    write_object = minio_update_object if existing_object else minio_create_object
    return write_object(
        minio_client = minio_client,
        bucket_name = bucket_name,
        object_path = object_path,
        data = data,
        metadata = metadata
    )

def minio_get_object_list(
    minio_client: any,
    bucket_name: str,
    path_prefix: str
) -> any:
    """
    List every object under ``path_prefix`` in ``bucket_name`` (recursively).

    ``list_objects`` returns a lazy iterator whose requests run during
    iteration. The listing is therefore consumed here, so that listing
    errors are caught by the ``try`` rather than escaping into the caller's
    loop.

    :return: list of object entries, or None if listing fails
    """
    try:
        return list(minio_client.list_objects(
            bucket_name = bucket_name,
            prefix = path_prefix,
            recursive = True
        ))
    except Exception:
        return None
    
def minio_get_object_data_and_metadata(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> any:
    """
    Download an object, unpickle its data and read its metadata.

    The HTTP response from ``get_object`` is always closed and its connection
    released, so repeated reads do not leak pooled connections.

    :return: ``{'data': ..., 'metadata': ...}``, or None (after printing the
             error) if anything fails
    """
    response = None
    try:
        response = minio_client.get_object(
            bucket_name = bucket_name,
            object_name = object_path
        )
        given_data = minio_unpickle_data(
            pickled = response.data
        )
        given_object_info = minio_client.stat_object(
            bucket_name = bucket_name,
            object_name = object_path
        )
        return {'data': given_data, 'metadata': given_object_info.metadata}
    except Exception as e:
        print('MinIO object fetching error')
        print(e)
        return None
    finally:
        if response is not None:
            response.close()
            response.release_conn()


Overwriting ray/functions/minio_os.py


In [60]:
%%writefile ray/functions/get_documents.py

from functions.minio_os import minio_check_object, minio_create_or_update_object, minio_get_object_data_and_metadata
from functions.pygithub import pygithub_get_repo_paths

def get_github_repo_documents(
    object_client: any,
    github_token: str,
    repository_owner: str,
    repository_name: str,
    object_bucket: str,
    repo_paths_object: str,
    relevant_files: any,
    replace: bool
) -> any:
    """
    Collect repository file paths whose extension is in ``relevant_files``.

    The full path list is cached in MinIO at ``object_bucket/repo_paths_object``.
    It is fetched from GitHub (and re-cached) in three cases:
    - ``replace`` is requested, as the bool ``True`` or the string ``'true'``;
    - no cached object exists;
    - the cached object cannot be read.

    :param relevant_files: collection of extensions (without dot) to keep
    :param replace: force a refresh from GitHub
    :return: ``{'paths': [...]}`` with the filtered, right-stripped paths
    """
    print('Fetching paths')

    object_exists = minio_check_object(
        minio_client = object_client,
        bucket_name = object_bucket,
        object_path = repo_paths_object
    )

    # job parameters may arrive as strings, so accept 'true' as well as True
    replace_requested = str(replace).strip().lower() == 'true'

    repo_paths = None
    if not replace_requested and object_exists:
        print('Getting stored paths')
        stored = minio_get_object_data_and_metadata(
            minio_client = object_client,
            bucket_name = object_bucket,
            object_path = repo_paths_object
        )
        if stored is not None:
            repo_paths = stored['data']

    if repo_paths is None:
        print('Getting github paths')

        repo_paths = pygithub_get_repo_paths(
            token = github_token,
            owner = repository_owner,
            name = repository_name
        )

        print('Storing paths')

        minio_create_or_update_object(
            minio_client = object_client,
            bucket_name = object_bucket,
            object_path = repo_paths_object,
            data = repo_paths,
            metadata = {}
        )

        print('Paths stored')

    print('Filtering paths')
    relevant_paths = []
    for path in repo_paths:
        file_end = path.split('/')[-1].split('.')[-1].rstrip()
        if file_end in relevant_files:
            relevant_paths.append(path.rstrip())
    print('Paths filtered')

    return {
        'paths': relevant_paths
    }

Overwriting ray/functions/get_documents.py


In [61]:
%%writefile ray/functions/create_documents.py

import re
import yaml 

import markdown
from bs4 import BeautifulSoup

from functions.formatting_documents import extract_yaml_values, extract_jupyter_notebook_markdown_and_code, parse_jupyter_notebook_markdown_into_text
from functions.tree_sitter import tree_create_python_code_and_function_documents

def create_markdown_documents(
    markdown_text: any
) -> any:
    """
    Split Markdown into text documents, one per h2-h6 section.

    The Markdown is rendered to HTML and walked in document order. Each h2-h6
    heading starts a new document, and paragraph and list text is appended to
    the current one. Every document, including the last, has newlines removed
    and is kept only if it has at least two words. A lone one-word heading, or
    empty input, therefore yields no document.

    :param markdown_text: raw Markdown source
    :return: ``{'text': [{'index', 'sub-index', 'type', 'data'}, ...]}``
    """
    html = markdown.markdown(markdown_text)
    soup = BeautifulSoup(html, features='html.parser')
    code_block_pattern = re.compile(r"```")
    # a line holding only a code-fence language tag; matching the whole line
    # avoids also deleting the start of words such as 'should' or 'bashful'
    language_tag_pattern = re.compile(r"\n(?:sh|bash)(?=\n|$)")

    documents = []

    def store(section_text):
        # shared normalisation and filter for every finished section
        section_text = section_text.replace('\n', '')
        if len(section_text.split()) < 2:
            return
        documents.append({
            'index': len(documents) + 1,
            'sub-index': 0,
            'type': 'markdown',
            'data': section_text
        })

    document = ''
    for element in soup.descendants:
        if element.name in ['h2', 'h3', 'h4', 'h5', 'h6']:
            store(document)
            document = element.get_text(strip = True)
        elif element.name == 'p':
            text = element.get_text(strip = True)
            text = re.sub(code_block_pattern, '', text)
            text = text.rstrip('\n')
            text = re.sub(language_tag_pattern, '', text)
            document += ' ' + text
        elif element.name in ['ul', 'ol']:
            text = ''
            for li in element.find_all('li'):
                item = li.get_text(strip=True)
                # NOTE(review): items already containing '-' get no '-' prefix
                # and items are not space-separated; confirm this is intended
                text += item if '-' in item else '-' + item
            document += ' ' + text

    store(document)

    return {
        'text': documents
    }

def create_yaml_documents(
    yaml_text: any
) -> any:
    """
    Split YAML text into documents grouped by top-level key.

    Each mapping document in the YAML stream (separated by ``---``) is
    flattened with ``extract_yaml_values``, and consecutive values sharing
    the same top-level key form one document. ``index`` counts mapping
    documents; ``sub-index`` counts key groups within one.

    Empty documents (which load as None) and non-mapping documents are
    skipped instead of raising, and no empty document is emitted.

    :param yaml_text: raw YAML source
    :return: ``{'text': [{'index', 'sub-index', 'type', 'data'}, ...]}``
    """
    yaml_data = list(yaml.safe_load_all(yaml_text))

    documents = []
    index = 1
    for data in yaml_data:
        # extract_yaml_values needs a mapping (it calls .items())
        if not isinstance(data, dict):
            continue

        yaml_values = extract_yaml_values(
            section = data,
            path = '',
            values = []
        )

        previous_root = ''
        document = ''
        sub_index = 1
        for value in yaml_values:
            root = value.split('=')[0].split('/')[0]
            if root != previous_root:
                if document:
                    documents.append({
                        'index': index,
                        'sub-index': sub_index,
                        'type': 'yaml',
                        'data': document
                    })
                    sub_index += 1
                previous_root = root
                document = value
            else:
                # NOTE(review): values in one group are concatenated without a
                # separator; confirm this is the intended document format
                document += value

        if document:
            documents.append({
                'index': index,
                'sub-index': sub_index,
                'type': 'yaml',
                'data': document
            })
        index += 1

    return {
        'text': documents
    }

def create_python_documents(
    python_text: any
): 
    """
    Split Python source into code documents (global code plus one per function).

    A document whose ``function <name> code`` header names a function already
    seen earlier in this file is dropped. Filtering builds a new list instead
    of deleting while iterating, which previously skipped documents and could
    raise IndexError.

    :param python_text: source text (a str, or an iterable of str chunks that
                        are joined)
    :return: ``{'code': [{'index', 'sub-index', 'type', 'data'}, ...]}``
    """
    joined_code = ''.join(python_text)
    block_code_documents = tree_create_python_code_and_function_documents(
        code_document = joined_code
    )

    seen_function_names = set()
    unique_documents = []
    for code_doc in block_code_documents:
        function_name = None
        for row in code_doc.split('\n'):
            # header rows are exactly 'function <name> code'
            parts = row.split(' ')
            if len(parts) == 3 and parts[0] == 'function' and parts[2] == 'code':
                function_name = parts[1]
                break
        if function_name is not None:
            if function_name in seen_function_names:
                continue
            seen_function_names.add(function_name)
        unique_documents.append(code_doc)

    code_documents = [
        {
            'index': index,
            'sub-index': 0,
            'type': 'python',
            'data': code_doc
        }
        for index, code_doc in enumerate(unique_documents, start = 1)
    ]

    return {
        'code': code_documents
    }

def create_notebook_documents(
    notebook_text: any
):
    """
    Split a Jupyter notebook into markdown text documents and Python code documents.

    Markdown cells become one text document each, indexed by cell id. Code
    cells are split into global/function documents. A function whose name was
    already seen in an earlier document of this notebook is dropped. Filtering
    builds a new list rather than deleting while iterating, so no document is
    skipped or mis-indexed. When a cell yields several code documents they get
    sub-indexes 1..n; a single document gets sub-index 0.

    :param notebook_text: raw notebook JSON text
    :return: ``{'text': [...], 'code': [...]}``
    """
    notebook_documents = extract_jupyter_notebook_markdown_and_code(
        notebook_text = notebook_text
    )

    markdown_documents = []
    for block in notebook_documents['markdown']:
        markdown_text = parse_jupyter_notebook_markdown_into_text(
            markdown_text = ''.join(block['data'])
        )
        markdown_documents.append({
            'index': block['id'],
            'sub-index': 0,
            'type': 'markdown',
            'data': markdown_text
        })

    code_documents = []
    # shared across cells so a function redefined in a later cell is dropped
    seen_function_names = set()
    for block in notebook_documents['code']:
        block_code_documents = tree_create_python_code_and_function_documents(
            code_document = ''.join(block['data'])
        )

        unique_documents = []
        for code_doc in block_code_documents:
            function_name = None
            for row in code_doc.split('\n'):
                # header rows are exactly 'function <name> code'
                parts = row.split(' ')
                if len(parts) == 3 and parts[0] == 'function' and parts[2] == 'code':
                    function_name = parts[1]
                    break
            if function_name is not None:
                if function_name in seen_function_names:
                    continue
                seen_function_names.add(function_name)
            unique_documents.append(code_doc)

        use_sub_indexes = len(unique_documents) > 1
        for position, code_doc in enumerate(unique_documents, start = 1):
            code_documents.append({
                'index': block['id'],
                'sub-index': position if use_sub_indexes else 0,
                'type': 'python',
                'data': code_doc
            })

    return {
        'text': markdown_documents,
        'code': code_documents
    }

Overwriting ray/functions/create_documents.py


In [62]:
%%writefile ray/functions/formatting_documents.py

import re
import markdown
import nbformat

from bs4 import BeautifulSoup

def extract_yaml_values(
    section: any,
    path: str,
    values: any
) -> any:
    """
    Flatten a parsed YAML mapping into ``'path/to/key=value'`` strings.

    - Nested mappings extend the path with ``/key``.
    - Scalar list items get a 1-based position (``key/1=...``).
    - Mapping list items are flattened under the list's own path, with no
      position.

    Keys are converted with ``str`` because YAML allows non-string keys:
    ``on:`` loads as ``True`` and ``1:`` as an int. Without the conversion
    these break the path concatenation.

    :param section: mapping to flatten
    :param path: path prefix ('' at the top level)
    :param values: list that results are appended to (mutated and returned)
    :return: ``values``
    """
    for key, value in section.items():
        current_path = str(key) if path == '' else path + '/' + str(key)
        if isinstance(value, dict):
            extract_yaml_values(
                section = value,
                path = current_path,
                values = values
            )
        elif isinstance(value, list):
            number = 1
            for case in value:
                if isinstance(case, dict):
                    extract_yaml_values(
                        section = case,
                        path = current_path,
                        values = values
                    )
                    continue
                values.append(current_path + '/' + str(number) + '=' + str(case))
                number += 1
        else:
            values.append(current_path + '=' + str(value))

    return values

def parse_jupyter_notebook_markdown_into_text(
    markdown_text: any
) -> any:
    """
    Convert notebook Markdown into plain text.

    The Markdown is rendered to HTML and its text extracted. Triple-backtick
    fences and trailing newlines are removed. Lines consisting only of a
    code-fence language tag (``sh`` / ``bash``) are blanked, while lines that
    merely start with those letters (e.g. ``should``) are left intact.

    :param markdown_text: Markdown source of one cell
    :return: plain text
    """
    html = markdown.markdown(markdown_text)
    soup = BeautifulSoup(html, features='html.parser')
    text = soup.get_text()
    text = re.sub(r"```", '', text)
    text = text.rstrip('\n')
    text = re.sub(r"\n(?:sh|bash)(?=\n|$)", '\n', text)
    return text

def extract_jupyter_notebook_markdown_and_code(
    notebook_text: any
): 
    """
    Collect markdown and code cells from a notebook.

    Markdown and code cells share one running 1-based ``id`` counter; other
    cell types are ignored and do not consume an id.

    NOTE(review): ``as_version=2`` asks nbformat for the legacy v2 layout
    (``worksheets``, code text in ``input``). Confirm this conversion is
    intended for modern v4 notebooks.

    :param notebook_text: raw notebook JSON text
    :return: ``{'markdown': [{'id', 'data'}], 'code': [{'id', 'data'}]}``
    """
    notebook = nbformat.reads(notebook_text, as_version=2)
    markdown_cells = []
    code_cells = []
    cell_id = 0
    for cell in notebook.worksheets[0].cells:
        if cell.cell_type == 'markdown':
            cell_id += 1
            markdown_cells.append({'id': cell_id, 'data': cell.source})
        elif cell.cell_type == 'code':
            cell_id += 1
            code_cells.append({'id': cell_id, 'data': cell.input})
    return {
        'markdown': markdown_cells,
        'code': code_cells
    }

Overwriting ray/functions/formatting_documents.py


In [63]:
%%writefile ray/functions/tree_sitter.py

import re

import tree_sitter_python as tspython
from tree_sitter import Language, Parser

def tree_extract_imports(
    node: any, 
    code_text: str
) -> any:
    """
    Collect the source text of every import statement under ``node``.

    :param node: tree-sitter node to search (depth-first, pre-order)
    :param code_text: the UTF-8 bytes the tree was parsed from
    :return: list of import statement strings
    """
    found = []
    if node.type in ('import_statement', 'import_from_statement'):
        found.append(code_text[node.start_byte:node.end_byte].decode('utf8'))
    for child in node.children:
        found += tree_extract_imports(child, code_text)
    return found

def tree_extract_dependencies(
    node: any, 
    code_text: str
) -> any:
    """
    List the callee text of every ``call`` node below ``node``.

    :param node: tree-sitter node whose descendants are searched
    :param code_text: unused; kept for signature compatibility
    :return: callee names in pre-order, e.g. ``['print', 'os.path.join']``
    """
    found = []
    for child in node.children:
        if child.type == 'call':
            callee = child.child_by_field_name('function')
            found.append(callee.text.decode('utf8'))
        found.extend(tree_extract_dependencies(child, code_text))
    return found

def tree_extract_code_and_dependencies(
    node: any,
    code_text: str
) -> any:
    """
    Build a single ``'global'`` code entry for ``node`` when it defines no functions.

    Only ``node`` itself is examined (typically the module root). An entry is
    produced only when all of these hold:
    - ``node`` is not a function definition;
    - it has no ``name`` field;
    - no ``function_definition`` node exists anywhere beneath it.
    The entry holds its full source and the calls it makes.

    The check is structural. The previous substring test ``'def' in code``
    also matched identifiers such as ``default`` or ``undefined``.

    :return: ``[]`` or ``[{'name': 'global', 'code': ..., 'dependencies': [...]}]``
    """
    def contains_function(current):
        if current.type == 'function_definition':
            return True
        return any(contains_function(child) for child in current.children)

    codes = []
    if node.type == 'function_definition':
        return codes
    if node.child_by_field_name('name') is not None:
        return codes
    if contains_function(node):
        return codes

    code = code_text[node.start_byte:node.end_byte].decode('utf8')
    codes.append({
        'name': 'global',
        'code': code,
        'dependencies': tree_extract_dependencies(node, code_text)
    })
    return codes

def tree_extract_functions_and_dependencies(
    node: any, 
    code_text: str
) -> any:
    """
    Collect every function definition at or below ``node``.

    :param node: tree-sitter node to search (depth-first, pre-order)
    :param code_text: the UTF-8 bytes the tree was parsed from
    :return: list of ``{'name', 'code', 'dependencies'}`` dicts
    """
    found = []
    if node.type == 'function_definition':
        found.append({
            'name': node.child_by_field_name('name').text.decode('utf8'),
            'code': code_text[node.start_byte:node.end_byte].decode('utf8'),
            'dependencies': tree_extract_dependencies(node, code_text)
        })
    for child in node.children:
        found += tree_extract_functions_and_dependencies(child, code_text)
    return found

def tree_get_used_imports(
    general_imports: any,
    function_dependencies: any
) -> any:
    """
    Select the import statements a piece of code actually needs.

    Each import statement is split into one statement per imported name and
    keyed by the name it binds:
    - ``import a.b`` binds ``a``;
    - ``import x as y`` and ``from m import x as y`` bind ``y``.
    A dependency such as ``np.array`` is matched on its first dotted component
    (``np``). Parenthesised and backslash-continued import lists are
    supported. The text is parsed rather than split on the substring
    ``'import'``, which broke modules like ``importlib``.

    :param general_imports: import statement source strings
    :param function_dependencies: called names, e.g. ``['os.path.join', 'foo']``
    :return: matching single-name import statements, in dependency order
    """
    parsed_imports = {}
    for code_import in general_imports:
        # collapse line continuations and newlines inside '( ... )'
        normalized = ' '.join(code_import.replace('\\\n', ' ').split())
        if normalized.startswith('from '):
            module_part, separator, names = normalized.partition(' import ')
            if not separator:
                continue
            prefix = module_part + ' import '
        elif normalized.startswith('import '):
            prefix = 'import '
            names = normalized[len('import '):]
        else:
            continue

        for entry in names.strip().strip('()').split(','):
            parts = entry.split()
            if not parts:
                continue
            if len(parts) == 3 and parts[1] == 'as':
                bound_name = parts[2]
            else:
                bound_name = parts[0].split('.')[0]
            # first statement binding a name wins
            parsed_imports.setdefault(bound_name, prefix + ' '.join(parts))

    used_imports = {}
    for dependency in function_dependencies:
        initial_term = dependency.split('.')[0]
        if initial_term in parsed_imports and initial_term not in used_imports:
            used_imports[initial_term] = parsed_imports[initial_term]

    return list(used_imports.values())

def tree_get_used_functions(
    general_functions: any,
    function_dependencies: any,
    module_name: str = 'ice'
): 
    """
    Build ``from <module_name> import <name>`` lines for called sibling functions.

    Each called function appears once, in first-call order, even if it is
    called repeatedly or defined more than once.

    :param general_functions: function entries (dicts with a ``'name'`` key)
                              defined in the same source
    :param function_dependencies: names called by the code being documented
    :param module_name: module named in the generated lines; defaults to
                        ``'ice'``. NOTE(review): 'ice' looks like a
                        placeholder; confirm the intended module.
    :return: list of import lines
    """
    defined_names = {function['name'] for function in general_functions}
    used_functions = []
    emitted = set()
    for name in function_dependencies:
        if name in defined_names and name not in emitted:
            emitted.add(name)
            used_functions.append('from ' + module_name + ' import ' + name)
    return used_functions

def tree_create_code_document(
    code_imports: any,
    code_functions: any,
    function_item: any
) -> any:
    """
    Attach the imports and sibling functions a code item depends on.

    :param code_imports: all import statements of the source
    :param code_functions: all function entries of the source
    :param function_item: ``{'name', 'code', 'dependencies'}`` entry
    :return: dict with ``imports``, ``functions``, ``name``, ``dependencies``
             and ``code``
    """
    dependencies = function_item['dependencies']
    return {
        'imports': tree_get_used_imports(
            general_imports = code_imports,
            function_dependencies = dependencies
        ),
        'functions': tree_get_used_functions(
            general_functions = code_functions,
            function_dependencies = dependencies
        ),
        'name': function_item['name'],
        'dependencies': dependencies,
        'code': function_item['code']
    }
     
def _tree_strip_python_comments(
    code: str
) -> list:
    """Return ``code.splitlines()`` with each line's ``#`` comment removed."""
    import io
    import tokenize

    lines = code.splitlines()
    comment_columns = {}
    try:
        for token in tokenize.generate_tokens(io.StringIO(code).readline):
            if token.type == tokenize.COMMENT:
                comment_columns[token.start[0]] = token.start[1]
    except (tokenize.TokenError, SyntaxError):
        # snippet is not tokenizable on its own: fall back to a plain regex
        return [re.sub(r'#.*', '', line) for line in lines]
    # tokenize numbers rows by '\n'; fall back if splitlines() split elsewhere
    if len(lines) != len(io.StringIO(code).readlines()):
        return [re.sub(r'#.*', '', line) for line in lines]
    return [
        line[:comment_columns[row]] if row in comment_columns else line
        for row, line in enumerate(lines, start = 1)
    ]

def tree_format_code_document(
    code_document: any
) -> any:
    """
    Render a code document dict as plain text.

    The output is laid out in this order:
    1. one line per import;
    2. one line per sibling-function import;
    3. an optional ``code dependencies`` section listing called names;
    4. a header (``global code`` or ``function <name> code``);
    5. the code, with blank lines and comments removed.

    Comments are located with ``tokenize``, so a ``#`` inside a string literal
    (e.g. ``'#fff'``) is kept. A snippet that cannot be tokenized falls back
    to regex stripping.

    :param code_document: dict from ``tree_create_code_document``
    :return: formatted text, each line terminated by ``'\\n'``
    """
    formatted_document = ''
    for doc_import in code_document['imports']:
        formatted_document += doc_import + '\n'

    for doc_functions in code_document['functions']:
        formatted_document += doc_functions + '\n'

    if 0 < len(code_document['dependencies']):
        formatted_document += 'code dependencies\n'
        for doc_dependency in code_document['dependencies']:
            formatted_document += doc_dependency + '\n'

    if code_document['name'] == 'global':
        formatted_document += code_document['name'] + ' code\n'
    else:
        formatted_document += 'function ' + code_document['name'] + ' code\n'

    for doc_code in _tree_strip_python_comments(code_document['code']):
        # skip lines that are blank or held only a comment
        if not doc_code.strip():
            continue
        formatted_document += doc_code + '\n'
    return formatted_document

def tree_create_python_code_and_function_documents(
    code_document: any
):
    """
    Parse Python source with tree-sitter and render it as code documents.

    The global-code entry (if any) comes first, followed by one document per
    function. A repeated function name is emitted only once; ``'global'``
    entries are never deduplicated.

    :param code_document: Python source text
    :return: list of formatted document strings
    """
    source_bytes = bytes(code_document, "utf8")
    parser = Parser(Language(tspython.language()))
    root_node = parser.parse(source_bytes).root_node

    code_imports = tree_extract_imports(root_node, source_bytes)
    code_global = tree_extract_code_and_dependencies(root_node, source_bytes)
    code_functions = tree_extract_functions_and_dependencies(root_node, source_bytes)

    initial_documents = [
        tree_create_code_document(
            code_imports = code_imports,
            code_functions = code_functions,
            function_item = item
        )
        for item in code_global + code_functions
    ]

    formatted_documents = []
    seen_functions = set()
    for document in initial_documents:
        name = document['name']
        if name != 'global' and name in seen_functions:
            continue
        formatted_documents.append(tree_format_code_document(code_document = document))
        seen_functions.add(name)

    return formatted_documents

Overwriting ray/functions/tree_sitter.py


In [64]:
%%writefile ray/functions/store_documents.py

from functions.pygithub import pygithub_get_path_content
from functions.mongo_db import mongo_check_collection, mongo_create_document
from functions.create_documents import create_markdown_documents, create_python_documents, create_notebook_documents, create_yaml_documents

def store_repository_path_documents(
    mongo_client: any,
    github_token: any,
    repository_owner: str,
    repository_name: str,
    repository_path: str,
    database_name: str,
    collection_name: str
):
    """
    Fetch one repository file, split it into documents and insert them into MongoDB.

    Files are dispatched on extension: ``md``, ``yaml``/``yml``, ``py`` and
    ``ipynb``; other extensions produce nothing. Fetch and parse errors are
    printed, not raised.

    :return: True if documents were produced and inserted. False if any of
             these hold:
             - the collection already exists;
             - the content is empty or cannot be fetched;
             - the type is unsupported or failed to parse.
    """
    collection_exists = mongo_check_collection(
        mongo_client = mongo_client,
        database_name = database_name,
        collection_name = collection_name
    )

    if collection_exists:
        return False

    target_content = ''
    try:
        target_content = pygithub_get_path_content(
            token = github_token,
            owner = repository_owner,
            name = repository_name,
            path = repository_path
        )
    except Exception as e:
        print(repository_path)
        print('Get content error')
        print(e)

    if target_content == '':
        return False

    # extension -> (document builder, its keyword argument, label in error output)
    creators = {
        'md': (create_markdown_documents, 'markdown_text', 'markdown'),
        'yaml': (create_yaml_documents, 'yaml_text', 'yaml'),
        'yml': (create_yaml_documents, 'yaml_text', 'yaml'),
        'py': (create_python_documents, 'python_text', 'python'),
        'ipynb': (create_notebook_documents, 'notebook_text', 'notebook'),
    }
    target_type = repository_path.split('/')[-1].split('.')[-1]

    target_documents = {}
    if target_type in creators:
        creator, argument_name, label = creators[target_type]
        try:
            target_documents = creator(**{argument_name: target_content})
        except Exception as e:
            print(repository_path)
            print('Create ' + label + ' document error')
            print(e)

    if not target_documents:
        return False

    for docs in target_documents.values():
        for document in docs:
            mongo_create_document(
                mongo_client = mongo_client,
                database_name = database_name,
                collection_name = collection_name,
                document = document
            )
    return True

def get_github_storage_prefix(
    repository_owner: str,
    repository_name: str
) -> str:
    """
    Build the storage prefix ``<owner>|<name>|`` for a GitHub repository.

    :param repository_owner: GitHub account that owns the repository
    :param repository_name: repository name
    :return: owner and name, each followed by ``|``
    """
    # Joining with a trailing empty element yields the final separator.
    return '|'.join([repository_owner, repository_name, ''])

def store_github_repository_documents(
    mongo_client: any,
    github_token: str,
    repository_owner: str,
    repository_name: str,
    repository_paths: any
) -> any:
    """
    Store the documents of every listed repository path in MongoDB.

    Each path is routed to a database and a collection:
    the database is ``<owner>|<name>|<file extension>``, and the collection
    is the first two characters of every directory component (each followed
    by ``|``) plus the file name up to its first dot.

    :param mongo_client: MongoDB client used for the inserts
    :param github_token: token used to read file contents from GitHub
    :param repository_owner: GitHub account that owns the repository
    :param repository_name: repository name
    :param repository_paths: dict whose ``'paths'`` entry lists file paths
    :return: None
    """
    for file_path in repository_paths['paths']:
        *directories, file_name = file_path.split('/')
        extension = file_name.split('.')[-1]
        stem = file_name.split('.')[0]

        target_database = get_github_storage_prefix(repository_owner, repository_name) + extension
        target_collection = ''.join(directory[:2] + '|' for directory in directories) + stem

        store_repository_path_documents(
            mongo_client = mongo_client,
            github_token = github_token,
            repository_owner = repository_owner,
            repository_name = repository_name,
            repository_path = file_path,
            database_name = target_database,
            collection_name = target_collection
        )

Overwriting ray/functions/store_documents.py


## Fetching and Storing Data

In [65]:
%%writefile ray/fetch-store-rag-data.py
import sys
import ray
import json

from functions.mongo_db import mongo_setup_client
from functions.minio_os import minio_setup_client
from functions.get_documents import get_github_repo_documents
from functions.store_documents import store_github_repository_documents

from importlib.metadata import version

@ray.remote
def fetch_and_store_data(
    storage_parameters: any,
    data_parameters: any
):
    """
    Ray task: fetch a GitHub repository's documents and store them in MongoDB.

    Steps:
    1. Build a MongoDB client and a MinIO client from ``storage_parameters``.
    2. Resolve the repository file paths with ``get_github_repo_documents``,
       which receives the MinIO client and the bucket/object settings.
    3. Hand those paths to ``store_github_repository_documents``.

    :param storage_parameters: dict with 'mongo-username', 'mongo-password',
        'mongo-address', 'mongo-port', 'minio-endpoint', 'minio-username'
        and 'minio-password'
    :param data_parameters: dict with 'github-token', 'repository-owner',
        'repository-name', 'object-bucket', 'repo-paths-object',
        'relevant-files' and 'replace'
    :return: True if every step ran without raising; otherwise False. The
        exception is printed, not re-raised.
    """
    try:
        print('Creating mongo client')
        mongo_client = mongo_setup_client(
            username = storage_parameters['mongo-username'],
            password = storage_parameters['mongo-password'],
            address = storage_parameters['mongo-address'],
            port = storage_parameters['mongo-port']
        )
        print('Mongo client created')

        print('Creating minio client')
        minio_client = minio_setup_client(
            endpoint = storage_parameters['minio-endpoint'],
            username = storage_parameters['minio-username'],
            password = storage_parameters['minio-password']
        )
        print('Minio client created')

        # The fetch and store steps share the repository identity.
        repository = {
            'github_token': data_parameters['github-token'],
            'repository_owner': data_parameters['repository-owner'],
            'repository_name': data_parameters['repository-name']
        }
        # Only the fetch step uses these options.
        fetch_options = {
            'object_bucket': data_parameters['object-bucket'],
            'repo_paths_object': data_parameters['repo-paths-object'],
            'relevant_files': data_parameters['relevant-files'],
            'replace': data_parameters['replace']
        }

        print('Getting repository paths')
        repository_paths = get_github_repo_documents(
            object_client = minio_client,
            **repository,
            **fetch_options
        )

        print('Storing repository documents')
        store_github_repository_documents(
            mongo_client = mongo_client,
            repository_paths = repository_paths,
            **repository
        )
        print('Documents stored')
    except Exception as error:
        print('Fetch and store error')
        print(error)
        return False
    return True

if __name__ == "__main__":
    # Ray job entry point: log the runtime environment, then run
    # fetch_and_store_data on the cluster with the JSON payload given as the
    # first command-line argument.
    print('Starting ray job')
    print('Python version is:' + str(sys.version))
    # Library versions are logged to make job output easier to debug.
    print('Ray version is:' + version('Ray'))
    print('PyGithub version is:' + version('PyGithub'))
    print('PyMongo version is:' + version('PyMongo'))
    print('Markdown version is:' + version('Markdown'))
    print('Tree-sitter version is:' + version('tree-sitter'))
    print('Tree-sitter-python version is:' + version('tree-sitter-python'))
    print('BeautifulSoup version is:' + version('beautifulsoup4'))
    print('NBformat version is:' + version('nbformat'))

    # The payload is a JSON object with 'storage-parameters' and
    # 'data-parameters' keys. It is named job_input rather than `input`
    # so it does not shadow the builtin.
    job_input = json.loads(sys.argv[1])

    storage_parameters = job_input['storage-parameters']
    data_parameters = job_input['data-parameters']

    print('Running fetch and store')

    # Block until the remote task finishes; it returns a bool status.
    fetch_store_status = ray.get(fetch_and_store_data.remote(
        storage_parameters = storage_parameters,
        data_parameters = data_parameters
    ))

    # NOTE(review): a False status is only printed, so the Ray job still
    # finishes as SUCCEEDED. Consider a non-zero exit if that is undesired.
    print('Fetch and store success:' + str(fetch_store_status))

    print('Ray job Complete')

Overwriting ray/fetch-store-rag-data.py


In [50]:
# Create the Ray client used to submit jobs, pointing it at the Ray
# dashboard address (127.0.0.1:8265 — presumably a local port-forward to the
# cluster's dashboard; TODO confirm). setup_ray is defined in an earlier cell.
ray_client = setup_ray(
    services = {
        'ray-dashboard': '127.0.0.1:8265'
    },
    timeout = 10  # units depend on setup_ray (presumably seconds) — TODO confirm
)

In [51]:
import os

# Connection settings for the in-cluster MongoDB and MinIO services
# (addresses are Kubernetes `*.svc.cluster.local` service names).
# Any value can be overridden through an environment variable, so real
# credentials need not be committed with the notebook. The literals below
# are the previous hard-coded values and remain the defaults.
storage_parameters = {
    'mongo-username': os.environ.get('MONGO_USERNAME', 'mongo123'),
    'mongo-password': os.environ.get('MONGO_PASSWORD', 'mongo456'),
    'mongo-address': os.environ.get('MONGO_ADDRESS', 'mongodb-service.llm.svc.cluster.local'),
    'mongo-port': os.environ.get('MONGO_PORT', '27017'),
    'minio-endpoint': os.environ.get('MINIO_ENDPOINT', 'mlflow-minio-service.mlflow.svc.cluster.local:9000'),
    'minio-username': os.environ.get('MINIO_USERNAME', 'minioadmin'),
    'minio-password': os.environ.get('MINIO_PASSWORD', 'minioadmin')
}

In [52]:
from decouple import Config,RepositoryEnv
# Load the GitHub access token from a local .env file with python-decouple
# rather than hard-coding it in the notebook.
# NOTE(review): the path is specific to one user's machine; adjust locally.
env_path = '/home/sfniila/.ssh/.env'
env_config = Config(RepositoryEnv(env_path))
github_token = env_config.get('GITHUB_TOKEN')

In [53]:
# Repository to ingest, plus the MinIO bucket and object name under which
# its path listing is cached.
repository_owner, repository_name = 'K123AsJ0k1', 'cloud-hpc-oss-mlops-platform'
object_bucket = 'llm-rag'
repo_paths_object = f'{repository_name}-paths'

In [54]:
# Job inputs: which repository to ingest, where its path listing is cached,
# and which file extensions to collect.
# 'replace' is the string 'false', not a bool; how it is interpreted is
# up to get_github_repo_documents.
data_parameters = dict([
    ('github-token', github_token),
    ('repository-owner', repository_owner),
    ('repository-name', repository_name),
    ('object-bucket', object_bucket),
    ('repo-paths-object', repo_paths_object),
    ('relevant-files', ['md', 'yaml', 'py', 'ipynb']),
    ('replace', 'false')
])

In [55]:
# Payload for the fetch-and-store Ray job. The job script reads exactly
# these two top-level keys.
fetch_store_parameters = dict([
    ('storage-parameters', storage_parameters),
    ('data-parameters', data_parameters)
])

In [66]:
# Submit ray/fetch-store-rag-data.py as a Ray job. fetch_store_parameters is
# handed to the job (presumably serialized into the JSON argv[1] the script
# parses — TODO confirm in ray_job_handler). The listed pip packages make up
# the job's runtime environment.
# NOTE(review): ray_directory is an absolute path on one developer's machine.
job_status = ray_job_handler(
    ray_client = ray_client,
    ray_parameters = fetch_store_parameters,
    ray_job_file = 'fetch-store-rag-data.py',
    ray_directory = '/home/sfniila/Project/cloud-hpc-oss-mlops-platform/applications/development/LLMs/pipeline/preprocessing/ray',
    ray_job_envs = {},
    ray_job_packages = [
        'pymongo',
        'minio',
        'PyGithub',
        'Markdown',
        'tree-sitter==0.23.0',  # pinned; the job log reports 0.23.0
        'tree-sitter-python==0.23.0',
        'beautifulsoup4',
        'nbformat'
    ],
    timeout = 8000  # units depend on ray_job_handler (presumably seconds) — TODO confirm
)

2024-11-08 12:09:27,789	INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_de37e93f8a241cd3.zip.
2024-11-08 12:09:27,792	INFO packaging.py:530 -- Creating a file package for local directory '/home/sfniila/Project/cloud-hpc-oss-mlops-platform/applications/development/LLMs/pipeline/preprocessing/ray'.


Ray batch job id: raysubmit_JQRmyEAcV9VZBh1J
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: SUCCEEDED
Ray batch job ended:
Ray batch job succeeded
2024-11-08 02:09:27,897	INFO job_manager.py:529 -- Runtime env is setting up.
Starting ray job
Python version is:3.12.7 | packaged by Anaconda, Inc. | (main, Oct  4 2024, 13:27:36) [GCC 11.2.0]
Ray version is:2.38.0
PyGithub version is:2.5.0
PyMongo version is:4.10.1
Markdown version is:3.7
Tree-sitter version is:0.23.0
Tree-sitter-python version is:0.23.0
BeautifulSoup version is:4.12.3
NBformat version is:5.10.4
Running fetch and store
2024-11-08 02:09:31,332	INFO worker.py:1491 -- Using address 10.244.0.40:6379 set in the environment variable RAY_ADDRESS
2024-11-08 02:09:31,332	INFO worker.py:1631 -- Connecting to existing Ray cluster at address: 10.244.0.40:6379...
2024-1

In [None]:
%%writefile ray/preprocess-rag-data.py
import sys
import ray
import json

from pymongo import MongoClient as mc

import re

from pymongo import ASCENDING

from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
)
from langchain_huggingface import HuggingFaceEmbeddings
import hashlib
import uuid
import re
from qdrant_client import QdrantClient as qc
from qdrant_client import models
from qdrant_client.models import VectorParams, Distance
from qdrant_client.models import PointStruct
import spacy

import io
import pickle
from minio import Minio
import meilisearch as ms

def get_github_storage_prefix(
    repository_owner: str,
    repository_name: str
) -> str:
    """
    Return the storage prefix ``<owner>|<name>|`` for a GitHub repository.

    :param repository_owner: GitHub account that owns the repository
    :param repository_name: repository name
    :return: owner and name, each terminated by ``|``
    """
    parts = [repository_owner, repository_name, '']
    return '|'.join(parts)

def mongo_is_client(
    storage_client: any
) -> any:
    """
    Return True if ``storage_client`` is a pymongo ``MongoClient``.

    The previous check used ``MongoClient.Connection``. That attribute does
    not exist on the class, so every call raised AttributeError.

    :param storage_client: object to test
    :return: bool
    """
    return isinstance(storage_client, mc)

def mongo_setup_client(
    username: str,
    password: str,
    address: str,
    port: str
) -> any:
    """
    Create a MongoClient for ``mongodb://<username>:<password>@<address>:<port>/``.

    The username and password are percent-escaped before they go into the
    URI, because pymongo rejects credentials that contain reserved characters
    such as ``:``, ``/`` or ``@``. Escaping leaves plain alphanumeric
    credentials unchanged.

    The previous ``.replace`` chain had two problems: it failed on a
    non-string port, and it could substitute placeholder text that appeared
    inside the credentials.

    :param username: MongoDB user name
    :param password: MongoDB password
    :param address: host name or IP of the MongoDB server
    :param port: server port (str or int)
    :return: pymongo MongoClient
    """
    from urllib.parse import quote_plus

    connection_address = 'mongodb://{}:{}@{}:{}/'.format(
        quote_plus(username),
        quote_plus(password),
        address,
        port
    )
    mongo_client = mc(
        host = connection_address
    )
    return mongo_client

def mongo_get_database(
    mongo_client: any,
    database_name: str
) -> any:
    """
    Look up ``mongo_client[database_name]``.

    :param mongo_client: MongoDB client (anything indexable by name)
    :param database_name: database to select
    :return: the database handle, or None if the lookup raises
    """
    try:
        handle = mongo_client[database_name]
    except Exception:
        return None
    return handle

def mongo_check_database(
    mongo_client: any, 
    database_name: str
) -> bool:
    """
    Tell whether ``database_name`` appears in the server's database list.

    :param mongo_client: MongoDB client
    :param database_name: database to look for
    :return: True if listed; False if absent or if listing fails
    """
    try:
        return database_name in mongo_client.list_database_names()
    except Exception:
        return False

def mongo_list_databases(
    mongo_client: any
) -> any:
    """
    List the database names known to the server.

    :param mongo_client: MongoDB client
    :return: list of names, or an empty list if listing fails
    """
    try:
        return mongo_client.list_database_names()
    except Exception:
        return []

def mongo_remove_database(
    mongo_client: any, 
    database_name: str
) -> bool:
    """
    Drop a database.

    :param mongo_client: MongoDB client
    :param database_name: database to drop
    :return: True if the drop call returned normally, False if it raised
    """
    try:
        mongo_client.drop_database(database_name)
    except Exception:
        return False
    return True

def mongo_get_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str
) -> any:
    """
    Return a collection handle from the named database.

    The return annotation was previously ``bool``, but this function returns
    a collection object (or None).

    :param mongo_client: MongoDB client
    :param database_name: database containing the collection
    :param collection_name: collection to select
    :return: the collection, or None if the database or collection lookup raises
    """
    try:
        database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        # mongo_get_database returns None on failure; indexing None raises
        # TypeError, which is caught below.
        return database[collection_name]
    except Exception:
        return None
    
def mongo_check_collection(
    mongo_client: any, 
    database_name: any, 
    collection_name: any
) -> bool:
    """
    Tell whether a collection exists in the given database.

    :param mongo_client: MongoDB client
    :param database_name: database to inspect
    :param collection_name: collection to look for
    :return: True if listed; False if absent or on any error
    """
    try:
        existing = mongo_client[database_name].list_collection_names()
        return collection_name in existing
    except Exception:
        return False

def mongo_update_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any, 
    update_query: any
) -> any:
    """
    Apply ``update_query`` to every document matching ``filter_query``.

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to update
    :param filter_query: MongoDB filter document
    :param update_query: MongoDB update document
    :return: the ``update_many`` result, or None on any error
    """
    try:
        target = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        return target.update_many(filter_query, update_query)
    except Exception:
        return None

def mongo_list_collections(
    mongo_client: any, 
    database_name: str
) -> list:
    """
    List the collection names in a database.

    The return annotation was previously ``bool``, but this function returns
    a list.

    :param mongo_client: MongoDB client
    :param database_name: database to inspect
    :return: list of collection names, or an empty list on any error
    """
    try:
        database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        return database.list_collection_names()
    except Exception:
        return []

def mongo_remove_collection(
    mongo_client: any, 
    database_name: str, 
    collection_name: str
) -> bool:
    """
    Drop a collection from a database.

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to drop
    :return: True if the drop call returned normally, False on any error
    """
    try:
        target_database = mongo_get_database(
            mongo_client = mongo_client,
            database_name = database_name
        )
        target_database.drop_collection(collection_name)
    except Exception:
        return False
    return True

def mongo_create_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    document: any
) -> any:
    """
    Insert a single document into a collection.

    :param mongo_client: MongoDB client
    :param database_name: target database
    :param collection_name: target collection
    :param document: document to insert
    :return: the ``insert_one`` result, or None on any error
    """
    try:
        target = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        return target.insert_one(document)
    except Exception:
        return None

def mongo_get_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any
):
    """
    Fetch the first document matching ``filter_query``.

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to search
    :param filter_query: MongoDB filter document
    :return: the matching document, None if nothing matches, or None after
        printing the error if the lookup raises
    """
    try:
        target = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        return target.find_one(filter_query)
    except Exception as error:
        print(error)
        return None

def mongo_list_documents(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any,
    sorting_query: any
) -> any:
    """
    Return all documents matching a filter, in the requested sort order.

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to search
    :param filter_query: MongoDB filter document
    :param sorting_query: sort specification passed to ``Cursor.sort``
    :return: list of documents, or an empty list on any error
    """
    try:
        target = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        cursor = target.find(filter_query).sort(sorting_query)
        return list(cursor)
    except Exception:
        return []

def mongo_update_document(
    mongo_client: any, 
    database_name: any, 
    collection_name: any, 
    filter_query: any, 
    update_query: any
) -> any:
    """
    Apply ``update_query`` to the first document matching ``filter_query``.

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to update
    :param filter_query: MongoDB filter document
    :param update_query: MongoDB update document
    :return: the ``update_one`` result, or None on any error
    """
    try:
        target = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        return target.update_one(filter_query, update_query)
    except Exception:
        return None

def mongo_remove_document(
    mongo_client: any, 
    database_name: str, 
    collection_name: str, 
    filter_query: any
) -> any:
    """
    Delete the first document matching ``filter_query``.

    The return annotation was previously ``bool``, but this function returns
    the driver's delete result (or None).

    :param mongo_client: MongoDB client
    :param database_name: database holding the collection
    :param collection_name: collection to delete from
    :param filter_query: MongoDB filter document
    :return: the ``delete_one`` result, or None on any error
    """
    try:
        collection = mongo_get_collection(
            mongo_client = mongo_client, 
            database_name = database_name, 
            collection_name = collection_name
        )
        return collection.delete_one(filter_query)
    except Exception:
        return None
    
def get_stored_documents(
    mongo_client: any,
    database_prefix: str
) -> any:
    """
    Collect every stored document from the databases that share a name prefix.

    A database is selected when its name starts with ``database_prefix``.
    Every collection in a selected database is read in full, sorted by
    ``index`` then ``sub-index``, both ascending.

    :param mongo_client: MongoDB client
    :param database_prefix: leading part of the database names to include
        (presumably the ``owner|repo|`` prefix used when storing — verify
        against the caller)
    :return: ``{database_name: {collection_name: [document, ...]}}``
    """
    sorting_query = [
        ('index', ASCENDING),
        ('sub-index', ASCENDING)
    ]

    storage_documents = {}
    database_list = mongo_list_databases(
        mongo_client = mongo_client
    )
    for database_name in database_list:
        # The argument is a prefix, so use startswith. A substring test
        # would also select unrelated databases that merely contain it.
        if not database_name.startswith(database_prefix):
            continue
        collection_list = mongo_list_collections(
            mongo_client = mongo_client,
            database_name = database_name
        )
        database_documents = {}
        for collection_name in collection_list:
            database_documents[collection_name] = mongo_list_documents(
                mongo_client = mongo_client,
                database_name = database_name,
                collection_name = collection_name,
                filter_query = {},
                sorting_query = sorting_query
            )
        storage_documents[database_name] = database_documents

    return storage_documents

def is_minio_client(
    storage_client: any
) -> bool:
    """Tell whether ``storage_client`` is a ``minio.Minio`` instance."""
    matches = isinstance(storage_client, Minio)
    return matches

def setup_minio(
    endpoint: str,
    username: str,
    password: str
) -> any:
    """
    Build a MinIO client that talks plain HTTP (``secure=False``).

    :param endpoint: ``host:port`` of the MinIO server
    :param username: access key
    :param password: secret key
    :return: ``minio.Minio`` client
    """
    client_options = {
        'endpoint': endpoint,
        'access_key': username,
        'secret_key': password,
        'secure': False
    }
    return Minio(**client_options)

def pickle_data(
    data: any
) -> any:
    """
    Pickle ``data`` into an in-memory stream.

    :param data: any picklable object
    :return: ``(stream, length)``, where ``stream`` is a BytesIO positioned
        at offset 0 and ``length`` is the byte length of the payload
    """
    payload = pickle.dumps(data)
    # BytesIO initialised with bytes starts at position 0.
    stream = io.BytesIO(payload)
    return stream, len(payload)

def unpickle_data(
    pickled: any
) -> any:
    """
    Deserialize bytes produced by ``pickle.dumps``.

    SECURITY NOTE(review): ``pickle.loads`` can execute arbitrary code.
    Only feed it objects this pipeline wrote itself, never untrusted data.

    :param pickled: pickled bytes
    :return: the reconstructed object
    """
    restored = pickle.loads(pickled)
    return restored

def minio_create_bucket(
    minio_client: any,
    bucket_name: str
) -> bool: 
    """
    Create a MinIO bucket.

    :param minio_client: MinIO client
    :param bucket_name: bucket to create
    :return: True on success; False, after printing the error, on failure
    """
    try:
        minio_client.make_bucket(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket creation error')
        print(error)
        return False
    return True
    
def minio_check_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    """
    Check whether a MinIO bucket exists.

    :param minio_client: MinIO client
    :param bucket_name: bucket to look for
    :return: the client's ``bucket_exists`` answer; False, after printing
        the error, if the check raises
    """
    try:
        return minio_client.bucket_exists(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket checking error')
        print(error)
        return False
       
def minio_delete_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    """
    Remove a MinIO bucket.

    :param minio_client: MinIO client
    :param bucket_name: bucket to remove
    :return: True on success; False, after printing the error, on failure
    """
    try:
        minio_client.remove_bucket(bucket_name = bucket_name)
    except Exception as error:
        print('MinIO bucket deletion error')
        print(error)
        return False
    return True
# Works
def minio_create_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool: 
    """
    Pickle ``data`` and upload it as a MinIO object.

    NOTE(review): the original author warned about a 1GB object size limit
    and a possible "large header" error. Verify this against the MinIO and
    minio-py limits actually in use (large ``metadata`` is the likelier
    source of header-size errors).

    :param minio_client: MinIO client
    :param bucket_name: destination bucket
    :param object_path: object name within the bucket
    :param data: any picklable object
    :param metadata: user metadata stored with the object
    :return: True on success; False, after printing the error, on failure
    """
    try:
        stream, size = pickle_data(data = data)
        minio_client.put_object(
            bucket_name = bucket_name,
            object_name = object_path,
            data = stream,
            length = size,
            metadata = metadata
        )
    except Exception as error:
        print('MinIO object creation error')
        print(error)
        return False
    return True
# Works
def minio_check_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> any: 
    """
    Stat a MinIO object.

    :param minio_client: MinIO client
    :param bucket_name: bucket holding the object
    :param object_path: object name
    :return: the ``stat_object`` result, or an empty dict if the stat
        raises (e.g. the object is missing)
    """
    try:
        return minio_client.stat_object(
            bucket_name = bucket_name,
            object_name = object_path
        )
    except Exception:
        return {}
# Works
def minio_delete_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    """
    Remove a MinIO object.

    :param minio_client: MinIO client
    :param bucket_name: bucket holding the object
    :param object_path: object name
    :return: True on success; False, after printing the error, on failure
    """
    try:
        minio_client.remove_object(
            bucket_name = bucket_name, 
            object_name = object_path
        )
    except Exception as error:
        print('MinIO object deletion error')
        print(error)
        return False
    return True
# Works
def minio_update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool:  
    """
    Replace a MinIO object by deleting it and then re-creating it.

    :param minio_client: MinIO client
    :param bucket_name: bucket holding the object
    :param object_path: object name
    :param data: new object contents (pickled on upload)
    :param metadata: user metadata for the new object
    :return: False if the delete fails; otherwise the create result
    """
    removed = minio_delete_object(
        minio_client = minio_client,
        bucket_name = bucket_name,
        object_path = object_path
    )
    if not removed:
        return False
    return minio_create_object(
        minio_client = minio_client, 
        bucket_name = bucket_name, 
        object_path = object_path, 
        data = data,
        metadata = metadata
    )
# works
def minio_create_or_update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any, 
    metadata: dict
) -> bool:
    """
    Upsert a MinIO object, creating the bucket first if needed.

    :param minio_client: MinIO client
    :param bucket_name: bucket to write into
    :param object_path: object name
    :param data: object contents (pickled on upload)
    :param metadata: user metadata for the object
    :return: False if the bucket cannot be created; otherwise the result
        of the create or update call
    """
    bucket_ready = minio_check_bucket(
        minio_client = minio_client,
        bucket_name = bucket_name
    )
    # The create call only runs when the bucket is missing.
    if not bucket_ready and not minio_create_bucket(
        minio_client = minio_client,
        bucket_name = bucket_name
    ):
        return False

    existing = minio_check_object(
        minio_client = minio_client,
        bucket_name = bucket_name, 
        object_path = object_path
    )
    writer = minio_update_object if existing else minio_create_object
    return writer(
        minio_client = minio_client,
        bucket_name = bucket_name, 
        object_path = object_path, 
        data = data, 
        metadata = metadata
    )
# Works
def minio_get_object_list(
    minio_client: any,
    bucket_name: str,
    path_prefix: str
) -> any:
    """
    List the objects under a prefix, recursively.

    NOTE(review): minio-py's ``list_objects`` returns an iterator, so errors
    may only surface while it is consumed, outside this try block.

    :param minio_client: MinIO client
    :param bucket_name: bucket to list
    :param path_prefix: object-name prefix to filter by
    :return: whatever ``list_objects`` returns, or None if the call raises
    """
    try:
        return minio_client.list_objects(
            bucket_name = bucket_name, 
            prefix = path_prefix, 
            recursive = True
        )
    except Exception:
        return None
    
def minio_get_object_data_and_metadata(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> any:
    """
    Download a pickled MinIO object and return it together with its metadata.

    The HTTP response returned by ``get_object`` is now always closed and its
    connection released, as minio-py requires. Previously it was never
    released, which leaks pooled connections across repeated calls.

    SECURITY NOTE(review): the payload is unpickled, so only read objects
    that this pipeline wrote itself.

    :param minio_client: MinIO client
    :param bucket_name: bucket holding the object
    :param object_path: object name
    :return: ``{'data': <unpickled object>, 'metadata': <object metadata>}``,
        or None (after printing the error) on any failure
    """
    response = None
    try:
        response = minio_client.get_object(
            bucket_name = bucket_name, 
            object_name = object_path
        )

        given_data = unpickle_data(
            pickled = response.data
        )

        given_object_info = minio_client.stat_object(
            bucket_name = bucket_name, 
            object_name = object_path
        )

        return {'data': given_data, 'metadata': given_object_info.metadata}
    except Exception as e:
        print('MinIO object fetching error')
        print(e)
        return None
    finally:
        if response is not None:
            response.close()
            response.release_conn()

def langchain_create_code_chunks(
    language: any,
    chunk_size: int,
    chunk_overlap: int,
    document: any
) -> any:
    """
    Split source code into chunks with LangChain's language-aware splitter.

    :param language: LangChain ``Language`` value that selects the separators
    :param chunk_size: maximum chunk length
    :param chunk_overlap: overlap between consecutive chunks
    :param document: source text to split
    :return: list of chunk strings
    """
    code_splitter = RecursiveCharacterTextSplitter.from_language(
        language = language,
        chunk_size = chunk_size, 
        chunk_overlap = chunk_overlap
    )
    pieces = code_splitter.create_documents([document])
    return [piece.page_content for piece in pieces]

def langchain_create_text_chunks(
    chunk_size: int,
    chunk_overlap: int,
    document: any
) -> any:
    """
    Split plain text into chunks with LangChain's recursive splitter.

    Lengths are measured with ``len``, and separators are matched literally
    (not as regexes).

    :param chunk_size: maximum chunk length in characters
    :param chunk_overlap: overlap between consecutive chunks
    :param document: text to split
    :return: list of chunk strings
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size = chunk_size, 
        chunk_overlap = chunk_overlap,
        length_function = len,
        is_separator_regex = False
    )
    pieces = text_splitter.create_documents([document])
    return [piece.page_content for piece in pieces]

# Per-process cache of embedding models, keyed by model name.
_EMBEDDING_MODEL_CACHE = {}

def langchain_create_chunk_embeddings(
    model_name: str,
    chunks: any
) -> any:
    """
    Embed text chunks with a HuggingFace sentence-embedding model.

    This runs once per document (via create_document_packet), and building
    HuggingFaceEmbeddings loads the model each time. The instance is
    therefore cached per ``model_name`` and reused by later calls in the
    same process.

    :param model_name: HuggingFace model identifier
    :param chunks: list of text chunks
    :return: list of embedding vectors, one per chunk
    """
    embedding_model = _EMBEDDING_MODEL_CACHE.get(model_name)
    if embedding_model is None:
        embedding_model = HuggingFaceEmbeddings(
            model_name = model_name
        )
        _EMBEDDING_MODEL_CACHE[model_name] = embedding_model
    return embedding_model.embed_documents(
        texts = chunks
    )

def qdrant_is_client(
    storage_client: any
) -> any:
    """
    Return True if ``storage_client`` is a ``QdrantClient``.

    The previous check used ``QdrantClient.Connection``. That attribute does
    not exist, so the swallowed AttributeError made this always return False.

    :param storage_client: object to test
    :return: bool
    """
    return isinstance(storage_client, qc)

def qdrant_setup_client(
    api_key: str,
    address: str, 
    port: str
) -> any:
    """
    Build a Qdrant client that talks plain HTTP (``https=False``).

    :param api_key: Qdrant API key
    :param address: host name of the Qdrant server
    :param port: server port as a string (converted with ``int``)
    :return: ``QdrantClient``, or None if construction raises
        (including a non-numeric port)
    """
    try:
        return qc(
            host = address,
            port = int(port),
            api_key = api_key,
            https = False
        )
    except Exception:
        return None

def qdrant_create_collection(
    qdrant_client: any, 
    collection_name: str,
    configuration: any
) -> any:
    """
    Create a Qdrant collection.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to create
    :param configuration: vectors config (e.g. ``VectorParams``)
    :return: the client's result, or None (after printing the error) on failure
    """
    try:
        return qdrant_client.create_collection(
            collection_name = collection_name,
            vectors_config = configuration
        )
    except Exception as error:
        print(error)
        return None

def qdrant_get_collection(
    qdrant_client: any, 
    collection_name: str
) -> any:
    """
    Fetch a Qdrant collection's description.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to fetch
    :return: the client's result, or None on any error
    """
    try:
        return qdrant_client.get_collection(
            collection_name = collection_name
        )
    except Exception:
        return None

def qdrant_list_collections(
    qdrant_client: any
) -> any:
    """
    List the names of all Qdrant collections.

    :param qdrant_client: Qdrant client
    :return: list of collection names, or an empty list on any error
    """
    try:
        listing = qdrant_client.get_collections()
        return [description.name for description in listing.collections]
    except Exception:
        return []

def qdrant_remove_collection(
    qdrant_client: any, 
    collection_name: str
) -> bool:
    """
    Delete a Qdrant collection.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to delete
    :return: True if the delete call returned normally, False on any error
    """
    try:
        qdrant_client.delete_collection(collection_name)
    except Exception:
        return False
    return True

def qdrant_upsert_points(
    qdrant_client: qc, 
    collection_name: str,
    points: any
) -> any:
    """
    Insert or update points in a Qdrant collection.

    :param qdrant_client: Qdrant client
    :param collection_name: target collection
    :param points: points to upsert (e.g. a list of ``PointStruct``)
    :return: the client's result, or None (after printing the error) on failure
    """
    try:
        return qdrant_client.upsert(
            collection_name = collection_name, 
            points = points
        )
    except Exception as error:
        print(error)
        return None

def qdrant_search_data(
    qdrant_client: qc,  
    collection_name: str,
    scroll_filter: any,
    limit: int
) -> any:
    """
    Scroll through points matching a payload filter.

    ``limit`` was previously annotated as ``str``; it is a point count.

    NOTE(review): on success this returns the client's ``scroll`` result
    (a ``(points, next_offset)`` pair). On failure it returns ``[]``, which
    has a different shape, so callers indexing ``[0]`` must handle it.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to scroll
    :param scroll_filter: Qdrant ``Filter`` to apply
    :param limit: maximum number of points to return
    :return: the scroll result, or ``[]`` (after printing the error) on failure
    """
    try:
        hits = qdrant_client.scroll(
            collection_name = collection_name,
            scroll_filter = scroll_filter,
            limit = limit
        )
        return hits
    except Exception as e:
        print(e)
        return []

def qdrant_search_vectors(
    qdrant_client: qc,  
    collection_name: str,
    query_vector: any,
    limit: int
) -> any:
    """
    Run a nearest-neighbour search against a Qdrant collection.

    ``limit`` was previously annotated as ``str``; it is a result count.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to search
    :param query_vector: embedding to search with
    :param limit: maximum number of hits
    :return: the client's search hits, or ``[]`` on any error
    """
    try:
        hits = qdrant_client.search(
            collection_name = collection_name,
            query_vector = query_vector,
            limit = limit
        )
        return hits
    except Exception:
        return []

def qdrant_remove_vectors(
    qdrant_client: qc,  
    collection_name: str, 
    vectors: any,
    points: any = None
) -> any:
    """
    Delete named vectors from selected points of a Qdrant collection.

    ``QdrantClient.delete_vectors`` needs both the vector names and a
    ``points`` selector. The original never passed ``points``, so every call
    failed with a TypeError, which was caught and reported as None.
    ``points`` is now an optional trailing parameter, which keeps existing
    calls working. It must be supplied for the delete to succeed.

    :param qdrant_client: Qdrant client
    :param collection_name: collection to modify
    :param vectors: names of the vectors to delete
    :param points: selector of the points to modify (ids or a filter selector)
    :return: the client's result, or None (after printing the error) on failure
    """
    try:
        results = qdrant_client.delete_vectors(
            collection_name = collection_name,
            vectors = vectors,
            points = points
        )
        return results
    except Exception as e:
        print(f"Error removing document: {e}")
        return None

def create_document_packet(
    document: any,
    configuration: any,
) -> any:
    """
    Chunk a stored document and embed the non-empty chunks.

    ``python`` documents use the language-aware code splitter. ``text``,
    ``yaml`` and ``markdown`` documents use the plain text splitter. Any
    other type produces no chunks.

    :param document: dict with ``'type'`` and ``'data'`` keys
    :param configuration: per-type settings with ``'chunk-size'``,
        ``'chunk-overlap'``, ``'model-name'`` and, for python,
        ``'language'``
    :return: ``{'chunks': [...], 'embeddings': [...]}``
    """
    kind = document['type']
    settings = configuration[kind]

    if kind == 'python':
        raw_chunks = langchain_create_code_chunks(
            language = settings['language'],
            chunk_size = settings['chunk-size'],
            chunk_overlap = settings['chunk-overlap'],
            document = document['data']
        )
    elif kind in ('text', 'yaml', 'markdown'):
        raw_chunks = langchain_create_text_chunks(
            chunk_size = settings['chunk-size'],
            chunk_overlap = settings['chunk-overlap'],
            document = document['data']
        )
    else:
        raw_chunks = []

    # Whitespace-only chunks carry nothing worth embedding.
    non_empty_chunks = [piece for piece in raw_chunks if piece.strip()]

    embeddings = langchain_create_chunk_embeddings(
        model_name = settings['model-name'],
        chunks = non_empty_chunks
    )
    return {
        'chunks': non_empty_chunks,
        'embeddings': embeddings
    }

def format_chunk(
    document_chunk: any
) -> any:
    """
    Normalise a chunk for duplicate detection.

    The chunk is processed in four steps:
    1. Strip every character that is neither a word character nor whitespace.
    2. Collapse whitespace runs to a single space.
    3. Trim the ends.
    4. Lowercase.

    This makes near-identical fragments such as ``task_id = task_id )`` and
    ``task_id = task_id`` normalise to the same string.

    :param document_chunk: raw chunk text
    :return: normalised text
    """
    without_punctuation = re.sub(r'[^\w\s]', '', document_chunk)
    single_spaced = re.sub(r'\s+', ' ', without_punctuation)
    return single_spaced.strip().lower()

def generate_chunk_hash(
    document_chunk: any
) -> any:
    """
    Hash a chunk after normalising it with ``format_chunk``.

    MD5 is used as a content fingerprint for deduplication, not for security.

    :param document_chunk: raw chunk text
    :return: hex MD5 digest of the normalised, UTF-8 encoded chunk
    """
    normalised = format_chunk(document_chunk = document_chunk)
    digest = hashlib.md5(normalised.encode('utf-8'))
    return digest.hexdigest()

def generate_document_vectors(
    qdrant_client: any,
    document_database: any,
    document_collection: any,
    document_type: any,
    document_id: str, 
    document_chunks: any,
    document_embeddings: any,
    vector_collection: any
):
    """
    Build Qdrant points for a document's chunks, skipping duplicates.

    A chunk is skipped in two cases: its normalised hash was already added
    for this document, or a point with the same ``chunk_hash`` payload
    already exists in ``vector_collection``. Point ids are UUIDv5 values
    derived from ``<document_id>-<1-based chunk index>``.

    Fixes:
    * ``qdrant_search_data`` returns ``[]`` on failure, and indexing ``[0]``
      on that raised IndexError. A failed lookup is now treated as
      "no existing duplicate".
    * Already-added hashes are tracked in a set and checked before querying
      Qdrant. This avoids O(n) list scans and needless lookups.

    :param qdrant_client: Qdrant client
    :param document_database: source MongoDB database name (payload)
    :param document_collection: source MongoDB collection name (payload)
    :param document_type: document type (payload)
    :param document_id: source document id
    :param document_chunks: chunk texts, aligned with ``document_embeddings``
    :param document_embeddings: one embedding per chunk
    :param vector_collection: Qdrant collection checked for duplicates
    :return: list of ``PointStruct`` to upsert
    """
    vector_points = []
    added_hashes = set()
    for vector_index, chunk in enumerate(document_chunks):
        vector_id = document_id + '-' + str(vector_index + 1)
        vector_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, vector_id))

        chunk_hash = generate_chunk_hash(
            document_chunk = chunk
        )
        # Duplicate within this document
        if chunk_hash in added_hashes:
            continue

        existing_chunks = qdrant_search_data(
            qdrant_client = qdrant_client,
            collection_name = vector_collection,
            scroll_filter = models.Filter(
                must = [
                    models.FieldCondition(
                        key = 'chunk_hash',
                        match = models.MatchValue(
                            value = chunk_hash
                        )
                    )
                ]
            ),
            limit = 1
        )
        # scroll() yields (points, next_offset); the helper yields [] on error.
        existing_points = existing_chunks[0] if existing_chunks else []
        # Duplicate already stored in Qdrant
        if len(existing_points) != 0:
            continue

        chunk_point = PointStruct(
            id = vector_uuid, 
            vector = document_embeddings[vector_index],
            payload = {
                'database': document_database,
                'collection': document_collection,
                'document': document_id,
                'type': document_type,
                'chunk': chunk,
                'chunk_hash': chunk_hash
            }
        )
        added_hashes.add(chunk_hash)
        vector_points.append(chunk_point)
    return vector_points

def create_document_vectors(
    qdrant_client: any,
    document_database: any,
    document_collection: any,
    document: any,
    configuration: any,
    vector_collection: str
) -> list:
    """
    Chunk, embed and convert one stored document into Qdrant points.

    If ``vector_collection`` does not exist yet, it is created with cosine
    distance, sized from the first embedding. The return annotation was
    previously ``bool``, but this function returns a list. An unused local
    holding the collection-creation result has been removed.

    :param qdrant_client: Qdrant client
    :param document_database: source MongoDB database name
    :param document_collection: source MongoDB collection name
    :param document: MongoDB document with ``'_id'``, ``'type'`` and ``'data'``
    :param configuration: per-type chunking/embedding settings
    :param vector_collection: Qdrant collection the points are meant for
    :return: list of ``PointStruct``; empty if chunking or embedding produced
        nothing or failed
    """
    document_id = str(document['_id'])
    document_type = document['type']

    document_packet = {}
    try:
        document_packet = create_document_packet(
            document = document,
            configuration = configuration
        )
    except Exception as e:
        # Best effort: report which document failed and skip it.
        print(document_database,document_collection,document_id)
        print(e)

    if 0 == len(document_packet):
        return []

    document_chunks = document_packet['chunks']
    document_embeddings = document_packet['embeddings']

    if 0 == len(document_embeddings):
        return []

    vector_collections = qdrant_list_collections(
        qdrant_client = qdrant_client
    )

    if vector_collection not in vector_collections:
        collection_configuration = VectorParams(
            size = len(document_embeddings[0]), 
            distance = Distance.COSINE
        )
        # qdrant_create_collection prints and returns None on failure; the
        # result is not otherwise used here.
        qdrant_create_collection(
            qdrant_client = qdrant_client,
            collection_name = vector_collection,
            configuration = collection_configuration
        )

    return generate_document_vectors(
        qdrant_client = qdrant_client,
        document_database = document_database,
        document_collection = document_collection,
        document_type = document_type,
        document_id = document_id,
        document_chunks = document_chunks,
        document_embeddings = document_embeddings,
        vector_collection = vector_collection
    )

def store_vectors(
    minio_client: any,
    qdrant_client: any,
    configuration: any,
    storage_documents: any
):
    """
    Embed every not-yet-processed document and upsert the points into Qdrant.

    One Qdrant collection ``<database with '|' replaced by '-'>-embeddings``
    is used per database. Processed documents are tracked as identity strings
    ``<database>-<collection>-<_id>`` in the MinIO object at
    ``configuration['object-bucket']`` / ``configuration['object-path']``.
    Documents whose identity is already stored there are skipped.

    :param minio_client: MinIO client holding the processed-identity object
    :param qdrant_client: Qdrant client receiving the points
    :param configuration: dict with ``object-bucket``, ``object-path`` and the
                          settings forwarded to ``create_document_vectors``
    :param storage_documents: mapping database -> collection -> list of documents
    """
    print('Storing vectors')

    used_object_bucket = configuration['object-bucket']
    used_object_path = configuration['object-path']

    identities_exists = minio_check_object(
        minio_client = minio_client,
        bucket_name = used_object_bucket,
        object_path = used_object_path
    )

    document_identities = []
    # truthiness instead of the previous `len(...)` check: it also works when
    # the helper reports existence as a bool or None, where len() raises
    if identities_exists:
        stored_object = minio_get_object_data_and_metadata(
            minio_client = minio_client,
            bucket_name = used_object_bucket,
            object_path = used_object_path
        )
        document_identities = list(stored_object['data'] or [])
    # set mirror of the list for O(1) membership tests; the list is what gets persisted
    seen_identities = set(document_identities)

    amount_of_databases = len(storage_documents)
    for database_index, (document_database, document_collections) in enumerate(storage_documents.items(), start = 1):
        vector_collection = document_database.replace('|','-') + '-embeddings'
        amount_of_collections = len(document_collections)
        database_vectors = []
        for collection_index, (document_collection, documents) in enumerate(document_collections.items(), start = 1):
            for document in documents:
                document_identity = document_database + '-' + document_collection + '-' + str(document['_id'])

                if document_identity in seen_identities:
                    continue

                document_vectors = create_document_vectors(
                    qdrant_client = qdrant_client,
                    document_database = document_database,
                    document_collection = document_collection,
                    document = document,
                    configuration = configuration,
                    vector_collection = vector_collection
                )

                if 0 < len(document_vectors):
                    database_vectors.extend(document_vectors)
                    document_identities.append(document_identity)
                    seen_identities.add(document_identity)
            print('Collections: ' + str(collection_index) + '|' + str(amount_of_collections))

        # nothing new for this database: skip the upsert (the collection may
        # not even exist if no document produced embeddings)
        if database_vectors:
            qdrant_upsert_points(
                qdrant_client = qdrant_client,
                collection_name = vector_collection,
                points = database_vectors
            )
        print('Databases: ' + str(database_index) + '|' + str(amount_of_databases))

    minio_create_or_update_object(
        minio_client = minio_client,
        bucket_name = used_object_bucket,
        object_path = used_object_path,
        data = document_identities,
        metadata = {}
    )

    print('Vectors stored')

# Loaded spaCy pipelines keyed by model name. Loading a model is expensive,
# so each process loads a given model at most once instead of once per call.
_SPACY_MODELS = {}

def spacy_find_keywords(
    text: str,
    model_name: str = "en_core_web_sm"
):
    """
    Extract distinct lemmatised keywords from ``text`` with spaCy.

    The text is lower-cased. Stop words, punctuation, whitespace tokens and
    single-character tokens are dropped.

    :param text: text to analyse
    :param model_name: spaCy pipeline to load (defaults to ``en_core_web_sm``)
    :return: list of distinct keyword lemmas in first-occurrence order
    """
    nlp = _SPACY_MODELS.get(model_name)
    if nlp is None:
        nlp = spacy.load(model_name)
        _SPACY_MODELS[model_name] = nlp

    formatted = nlp(text.lower())

    keywords = [
        token.lemma_ for token in formatted
        if not token.is_stop
        and not token.is_punct
        and not token.is_space
        and len(token) > 1
    ]

    # dict.fromkeys de-duplicates while keeping a deterministic order
    # (iteration order of a set of str varies between interpreter runs)
    return list(dict.fromkeys(keywords))

def meili_is_client(
    storage_client: any
) -> bool:
    """
    Tell whether ``storage_client`` is a Meilisearch client.

    :param storage_client: object to inspect
    :return: True for an ``ms.Client`` instance, False otherwise or on error
    """
    try:
        # ms.Client is the class meili_setup_client instantiates; the previous
        # check referenced ms.Connection, which meilisearch-python does not
        # define, so it always ended up returning False
        return isinstance(storage_client, ms.Client)
    except Exception as e:
        print(e)
        return False

def meili_setup_client(
    api_key: str,
    host: str
) -> any:
    """
    Create a Meilisearch client for ``host`` using ``api_key``.

    :param api_key: Meilisearch API key
    :param host: Meilisearch URL
    :return: the ``ms.Client`` instance, or None if construction raised
             (the error is printed)
    """
    try:
        client = ms.Client(url = host, api_key = api_key)
    except Exception as error:
        print(error)
        return None
    return client

def meili_get_index(
    meili_client: any,
    index_name: str
) -> any:
    """
    Return the index handle named ``index_name`` from ``meili_client``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the index
    :return: the handle from ``meili_client.index``, or None if that call raised
    """
    try:
        handle = meili_client.index(uid = index_name)
    except Exception as error:
        print(error)
        return None
    return handle
    
def meili_check_index(
    meili_client: any,
    index_name: str
) -> bool:
    """
    Check whether the index ``index_name`` can be fetched.

    :param meili_client: Meilisearch client
    :param index_name: uid of the index
    :return: True if ``get_index`` succeeded, False if it raised (the error is printed)
    """
    try:
        meili_client.get_index(uid = index_name)
    except Exception as error:
        print(error)
        return False
    return True
    
def meili_remove_index(
    meili_client: any,
    index_name: str
) -> any:
    """
    Delete the Meilisearch index ``index_name``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the index to delete
    :return: the response of ``Index.delete()``, or None if anything raised
             (the error is printed)
    """
    try:
        # Client.index() takes the index `uid`; the previous `index_name=`
        # keyword raised TypeError, so the index was never deleted
        response = meili_client.index(
            uid = index_name
        ).delete()
        return response
    except Exception as e:
        print(e)
        return None
    
def meili_list_indexes(
    meili_client: any,
    page_size: int = 100
) -> any:
    """
    List the uids of all Meilisearch indexes.

    ``get_indexes`` is paginated (offset/limit), so pages are requested until
    the reported ``total`` is reached. Without this, only the first page
    (20 indexes by default) was returned.

    :param meili_client: Meilisearch client
    :param page_size: number of indexes requested per page
    :return: list of index uids, or None if a request raised (the error is printed)
    """
    try:
        names = []
        offset = 0
        while True:
            response = meili_client.get_indexes(
                parameters = {'offset': offset, 'limit': page_size}
            )
            results = response['results']
            names.extend(index.uid for index in results)
            offset += len(results)
            total = response.get('total')
            # stop on an empty page, an unknown total, or once everything is collected
            if not results or total is None or offset >= total:
                break
        return names
    except Exception as e:
        print(e)
        return None

def meili_add_documents(
    meili_client: any,
    index_name: str,
    documents: any
) -> any:
    """
    Add ``documents`` to the Meilisearch index ``index_name``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the target index
    :param documents: documents to add
    :return: the ``add_documents`` response, or None if anything raised
             (the error is printed)
    """
    try:
        target_index = meili_get_index(
            meili_client = meili_client,
            index_name = index_name
        )
        return target_index.add_documents(documents = documents)
    except Exception as error:
        print(error)
        return None

def meili_set_filterable(
    meili_client: any,
    index_name: str,
    attributes: any
) -> any:
    """
    Set the filterable attributes of the Meilisearch index ``index_name``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the target index
    :param attributes: attribute names to make filterable
    :return: the ``update_filterable_attributes`` response, or None if
             anything raised (the error is printed)
    """
    try:
        target_index = meili_get_index(
            meili_client = meili_client,
            index_name = index_name
        )
        return target_index.update_filterable_attributes(attributes)
    except Exception as error:
        print(error)
        return None

def meili_search_documents(
    meili_client: any,
    index_name: str,
    query: any,
    options: any
) -> any:
    """
    Run a search ``query`` with ``options`` against the index ``index_name``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the index to search
    :param query: search query
    :param options: extra search parameters passed through to ``Index.search``
    :return: the search response, or None if anything raised (the error is printed)
    """
    try:
        target_index = meili_get_index(
            meili_client = meili_client,
            index_name = index_name
        )
        return target_index.search(query, options)
    except Exception as error:
        print(error)
        return None
    
def meili_update_documents(
    meili_client,
    index_name,
    documents
) -> any:
    """
    Update ``documents`` in the Meilisearch index ``index_name`` via
    ``Index.update_documents``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the target index
    :param documents: documents to update
    :return: the ``update_documents`` response, or None if anything raised
             (the error is printed)
    """
    try:
        # Client.index() takes the index `uid`; the previous `index_name=`
        # keyword raised TypeError, so no update was ever sent
        index = meili_client.index(
            uid = index_name
        )
        response = index.update_documents(
            documents = documents
        )
        return response
    except Exception as e:
        print(e)
        return None

def meili_delete_documents(
    meili_client: any,
    index_name: str,
    ids: any
) -> any:
    """
    Delete the documents with the given ids from the index ``index_name``.

    :param meili_client: Meilisearch client
    :param index_name: uid of the target index
    :param ids: primary keys of the documents to delete
    :return: the ``delete_documents`` response, or None if anything raised
             (the error is printed)
    """
    try:
        # Client.index() takes the index `uid` (the previous `index_name=`
        # keyword raised TypeError)
        index = meili_client.index(
            uid = index_name
        )
        # ids are passed positionally: Index.delete_documents has no
        # `document_ids` parameter
        response = index.delete_documents(ids)
        return response
    except Exception as e:
        print(e)
        return None

def generate_keyword_uuid(
    document_id: str,
    document_index: int
) -> str:
    """
    Derive a deterministic UUID (version 5, DNS namespace) for a keyword record.

    The UUID name is ``<document_id>-<document_index + 1>``.

    :param document_id: id of the source document (must be a str)
    :param document_index: index of the document
    :return: the UUID as a string
    """
    name = '-'.join((document_id, str(document_index + 1)))
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, name))

def create_document_keywords(
    document_database: str,
    document_collection: str,
    document: any,
    document_index: int
) -> any:
    """
    Build the Meilisearch keyword record for one stored document.

    :param document_database: database the document was read from
    :param document_collection: collection the document was read from
    :param document: stored document; must provide ``_id``, ``data`` and ``type``
    :param document_index: index used when deriving the record id
    :return: dict with ``id``, ``database``, ``collection``, ``document``,
             ``type`` and ``keywords``, or ``{}`` when no keywords were
             found or extraction raised (the error is printed)
    """
    document_id = str(document['_id'])
    document_data = document['data']
    document_type = document['type']

    try:
        keywords = spacy_find_keywords(
            text = document_data
        )
    except Exception as error:
        print(error)
        keywords = []

    if len(keywords) == 0:
        return {}

    record_id = generate_keyword_uuid(
        document_id = document_id,
        document_index = document_index
    )

    return {
        'id': record_id,
        'database': document_database,
        'collection': document_collection,
        'document': document_id,
        'type': document_type,
        'keywords': keywords
    }
    
def store_keywords(
    minio_client: any,
    meili_client: any,
    configuration: any,
    storage_documents: any
):
    """
    Extract keywords for every not-yet-processed document and add them to Meilisearch.

    One index ``<database with '|' replaced by '-'>-keywords`` is used per
    database, and its ``keywords`` attribute is made filterable. Processed
    documents are tracked in the MinIO object at
    ``configuration['object-bucket']`` / ``configuration['object-path']``.

    NOTE: ``store_vectors`` records its processed documents in the same MinIO
    object as ``<database>-<collection>-<_id>``. Keyword identities are
    therefore namespaced with a ``keywords::`` prefix. Otherwise every
    document vectorised earlier would be skipped here and never get keywords.

    :param minio_client: MinIO client holding the processed-identity object
    :param meili_client: Meilisearch client receiving the keyword records
    :param configuration: dict with ``object-bucket`` and ``object-path``
    :param storage_documents: mapping database -> collection -> list of documents
    """
    print('Storing keywords')

    identity_prefix = 'keywords::'
    used_object_bucket = configuration['object-bucket']
    used_object_path = configuration['object-path']

    identities_exists = minio_check_object(
        minio_client = minio_client,
        bucket_name = used_object_bucket,
        object_path = used_object_path
    )

    document_identities = []
    # truthiness instead of the previous `len(...)` check: it also works when
    # the helper reports existence as a bool or None, where len() raises
    if identities_exists:
        stored_object = minio_get_object_data_and_metadata(
            minio_client = minio_client,
            bucket_name = used_object_bucket,
            object_path = used_object_path
        )
        document_identities = list(stored_object['data'] or [])
    # set mirror of the list for O(1) membership tests; the list is what gets persisted
    seen_identities = set(document_identities)

    amount_of_databases = len(storage_documents)
    for database_index, (document_database, collections) in enumerate(storage_documents.items(), start = 1):
        keyword_collection = document_database.replace('|','-') + '-keywords'
        database_keywords = []
        amount_of_collections = len(collections)
        for collection_index, (document_collection, documents) in enumerate(collections.items(), start = 1):
            # NOTE(review): document_index is never advanced, so keyword record
            # ids depend only on the document id. Kept as-is because changing
            # it would change the ids already stored in Meilisearch.
            document_index = 1
            for document in documents:
                document_identity = (
                    identity_prefix + document_database + '-'
                    + document_collection + '-' + str(document['_id'])
                )

                if document_identity in seen_identities:
                    continue

                document_keywords = create_document_keywords(
                    document_database = document_database,
                    document_collection = document_collection,
                    document = document,
                    document_index = document_index
                )

                if document_keywords:
                    database_keywords.append(document_keywords)
                    document_identities.append(document_identity)
                    seen_identities.add(document_identity)
            print('Collections: ' + str(collection_index) + '|' + str(amount_of_collections))

        # nothing new for this database: skip the add request
        if database_keywords:
            meili_add_documents(
                meili_client = meili_client,
                index_name = keyword_collection,
                documents = database_keywords
            )

        meili_set_filterable(
            meili_client = meili_client,
            index_name = keyword_collection,
            attributes = ['keywords']
        )

        print('Databases: ' + str(database_index) + '|' + str(amount_of_databases))

    minio_create_or_update_object(
        minio_client = minio_client,
        bucket_name = used_object_bucket,
        object_path = used_object_path,
        data = document_identities,
        metadata = {}
    )

    print('Keywords stored')

@ray.remote
def preprocess_data(
    document_client: any,
    object_client: any,
    vector_client: any,
    search_client: any,
    repository_owner: str,
    repository_name: str,
    configuration: any
):
    """
    Ray task: load a repository's stored documents, then store their vectors
    and keywords.

    :param document_client: Mongo client holding the fetched documents
    :param object_client: MinIO client used for processed-identity bookkeeping
    :param vector_client: Qdrant client receiving vector points
    :param search_client: Meilisearch client receiving keyword records
    :param repository_owner: GitHub repository owner
    :param repository_name: GitHub repository name
    :param configuration: configuration forwarded to the storing steps
    :return: True on success, False if any step raised (the error is printed)
    """
    try:
        storage_prefix = get_github_storage_prefix(
            repository_owner = repository_owner,
            repository_name = repository_name
        )
        stored_documents = get_stored_documents(
            mongo_client = document_client,
            database_prefix = storage_prefix
        )

        store_vectors(
            minio_client = object_client,
            qdrant_client = vector_client,
            configuration = configuration,
            storage_documents = stored_documents
        )
        store_keywords(
            minio_client = object_client,
            meili_client = search_client,
            configuration = configuration,
            storage_documents = stored_documents
        )
    except Exception as error:
        print(error)
        return False
    return True

if __name__ == "__main__":
    print('Starting ray job')
    print('Ray version is:' + str(ray.__version__))
    
    input = json.loads(sys.argv[1])

    storage_parameters = input['storage-parameters']
    data_parameters = input['data-parameters']

    mongo_client = mongo_setup_client(
        username = storage_parameters['mongo-username'],
        password = storage_parameters['mongo-password'],
        address = storage_parameters['mongo-address'],
        port = storage_parameters['mongo-port']
    )

    minio_client = setup_minio(
        endpoint = storage_parameters['minio-endpoint'],
        username = storage_parameters['minio-username'],
        password = storage_parameters['minio-password']
    )

    qdrant_client = qdrant_setup_client(
        api_key = storage_parameters['qdrant-key'],
        address = storage_parameters['qdrant-address'], 
        port = storage_parameters['qdrant-port']
    )

    meili_client = meili_setup_client(
        api_key = storage_parameters['meili-key'],
        host = storage_parameters['meili-host']
    )

    github_token = data_parameters['github-token']
    repository_owner = data_parameters['repository-owner']
    repository_name = data_parameters['repository-name']
    relevant_files = data_parameters['relevant-files']
    configuration = data_parameters['configuration']

    fetch_store_status = ray.get(preprocess_data.remote(
        document_client = mongo_client,
        object_client = minio_client,
        vector_client = qdrant_client,
        search_client = meili_client,
        repository_owner = repository_owner,
        repository_name = repository_name,
        configuration = configuration
    ))

    print('Preprocess success:' + str(fetch_store_status))

    print('Ray job Complete')