## Add initial data to COS from local directory

In [None]:
import os

import requests
import xmltodict
import io

import pandas as pd

from dotenv import load_dotenv

# Pull settings from .env_load into the process environment so the
# os.getenv(...) calls in the following cells can read them.
load_dotenv(dotenv_path='./.env_load')

## Configuration and supporting functions

### Read environment variables

In [None]:
# --- Endpoints -----------------------------------------------------------
# Identity (token) endpoint that generate_token() posts the API key to.
identityURL = os.environ.get("IDENTITY_URL")

# The COS buckets live in the same region as watsonx.data; the regional
# S3-style endpoint is derived from that location.
buckets_location = os.environ.get("COS_BUCKETS_LOCATION")
bucket_endpoint = "https://s3.{}.cloud-object-storage.appdomain.cloud".format(buckets_location)

print("COS endpoint - the same as watsonx.data location", bucket_endpoint)

# --- Local source directories --------------------------------------------
# Tabular files that are converted to Parquet/JSON and uploaded to the Hive bucket.
files_directory = '../../data/files'
# PDF documents that are uploaded as-is to the input-data bucket.
pdfs_directory = '../../data/pdfs'

In [None]:
# Secrets are taken from the environment (see .env_load) instead of being
# hard-coded in the notebook.
cloud_api_key = os.environ.get("CLOUD_API_KEY")
cos_instance_crn = os.environ.get("COS_INSTANCE_CRN")

### Generate token and use it for the session

In [None]:
def generate_token(*, verify=True, timeout=30):
    """Exchange the API key for an access token used by later requests.

    Posts an ``apikey`` grant (with the module-level ``cloud_api_key``) to
    ``identityURL`` and returns the ``access_token`` field of the JSON reply.

    Args:
        verify: TLS certificate verification setting passed to ``requests``.
            Defaults to True so the API key is never sent over an unverified
            connection; pass False or a CA-bundle path only on trusted networks.
        timeout: Seconds to wait for the identity service before giving up.

    Returns:
        The access token string.

    Raises:
        ValueError: If ``cloud_api_key`` or ``identityURL`` is not configured.
        requests.HTTPError: If the identity service answers with a 4xx/5xx status.
    """
    # Without these checks a missing env var is sent as the literal "None".
    if not cloud_api_key:
        raise ValueError("CLOUD_API_KEY is not set; check the .env_load file")
    if not identityURL:
        raise ValueError("IDENTITY_URL is not set; check the .env_load file")

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    payload = {
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": cloud_api_key,
    }

    res = requests.post(identityURL, headers=headers, data=payload,
                        verify=verify, timeout=timeout)
    if res.status_code in (200, 201, 202):
        print("Successfully generated token")
    else:
        print("Code for token generation", res.status_code)
        print("Message", res.text)
        # Fail with the HTTP error here rather than a confusing KeyError on
        # 'access_token' below.
        res.raise_for_status()

    return res.json()["access_token"]

### Function to create the COS session

In [None]:
def create_cos_session(access_token):
    """Return a ``requests.Session`` pre-configured for the COS S3 API.

    The session's default headers are cleared first, so the session-level
    headers consist only of the bearer token and the COS service-instance id
    (taken from the module-level ``cos_instance_crn``).

    Args:
        access_token: Bearer token, e.g. the value returned by ``generate_token()``.

    Returns:
        The configured ``requests.Session``.
    """
    session = requests.Session()
    session.headers.clear()
    session.headers.update({
        "Authorization": f"Bearer {access_token}",
        "ibm-service-instance-id": cos_instance_crn,
    })
    print("COS API session created")
    return session

### Create COS session and find hive and input-data buckets

In [None]:
# Authenticate once, then reuse the token-bearing session for every COS call below.
cur_token = generate_token()
s_cos = create_cos_session(access_token=cur_token)

In [None]:
# Find the input-data and Hive bucket names among the buckets listed by the
# COS endpoint (if several match, the last one listed wins).
input_bucket = ""
hive_bucket = ""

r = s_cos.get(bucket_endpoint)
# Surface auth/endpoint errors directly instead of failing on a missing XML key.
r.raise_for_status()

# force_list keeps "Bucket" a list even when exactly one bucket exists;
# otherwise xmltodict returns a bare dict and the loop would iterate its keys.
parsed_listing = xmltodict.parse(r.text, force_list=("Bucket",))
# An empty <Buckets/> element parses to None, so fall back to an empty mapping.
buckets_list = parsed_listing['ListAllMyBucketsResult']['Buckets'] or {}

for bucket_name in buckets_list.get("Bucket", []):
    if bucket_name['Name'].startswith("input-data"):
        input_bucket = bucket_name['Name']
    if "hive" in bucket_name['Name']:
        hive_bucket = bucket_name['Name']

if input_bucket == "":
    print("Input bucket was not found, add name manually")
else:
    print(f"Identified input bucket as {input_bucket} in {bucket_endpoint}")

if hive_bucket == "":
    print("Hive bucket was not found, add name manually")
else:
    print(f"Identified hive bucket as {hive_bucket} in {bucket_endpoint}")

## Upload tabular files (Parquet/JSON) to the Hive bucket

In [None]:
# Report the local source directory and the COS destination before uploading.
print(f"Will save data from {files_directory} to {hive_bucket} at endpoint {bucket_endpoint}")

In [None]:
# Load each source CSV into a DataFrame keyed by its table name; the key is
# later used to build the object path in COS.
files_dfs = {
    table_name: pd.read_csv(f'{files_directory}/{table_name}.csv')
    for table_name in ('accounts', 'holdings_up_2023')
}

In [None]:
def save_data_cos(df_out: pd.DataFrame, output_path: str, convert_parquet=True) -> None:
    """Serialize ``df_out`` and upload it to the Hive bucket in COS.

    Uses the module-level ``s_cos`` session, ``bucket_endpoint`` and
    ``hive_bucket``.

    Args:
        df_out: DataFrame to upload.
        output_path: Object key inside ``hive_bucket``
            (e.g. ``input_data_hive/accounts_ht/accounts.parquet``).
        convert_parquet: If True, upload Parquet without the index; otherwise
            upload newline-delimited JSON records (one object per row).
    """
    if convert_parquet:
        with io.BytesIO() as output:
            df_out.to_parquet(output, index=False)
            data_object = output.getvalue()
    else:
        # orient='records' never writes the index, and passing index=False
        # with it raises ValueError on pandas < 2.0, so omit it. Serialize to
        # str and encode explicitly to get the request body bytes.
        data_object = df_out.to_json(orient='records', lines=True).encode('utf-8')

    url_new_file = f"{bucket_endpoint}/{hive_bucket}/{output_path}"
    r = s_cos.put(url_new_file, data=data_object)
    print("Status code", r.status_code)
    if r.status_code not in (200, 201, 202):
        print("Upload failed, response is", r.text)

### Convert data types and save as parquet in COS `HIVE_BUCKET`

In [None]:
# Column -> target dtype casts, applied to every table that has the column.
# NOTE(review): the int32 casts assume the ids contain no missing values
# (astype raises on NaN) and fit in 32 bits — confirm against the source data.
dtype_casts = {
    'customer_id': 'int32',
    'account_id': 'int32',
    'tax_liability': 'float',
}

for d_n, cur_df in files_dfs.items():
    # data types as read from the CSV
    display(cur_df.head())
    # DataFrame.info() prints its report itself and returns None, so call it
    # directly rather than passing its None result to display().
    cur_df.info()

    # Assigning columns mutates cur_df in place, so files_dfs[d_n] is updated too.
    for col, dtype in dtype_casts.items():
        if col in cur_df.columns:
            cur_df[col] = cur_df[col].astype(dtype)

    print('After datatypes conversions')
    display(cur_df.head())
    cur_df.info()

    save_data_cos(cur_df, output_path=f'input_data_hive/{d_n}_ht/{d_n}.parquet')

### Save `tax_liability.json` to COS `HIVE_BUCKET`

In [None]:
# tax_liability is read as a JSON array of records and re-uploaded as
# line-delimited JSON (not Parquet) under its own *_ht prefix.
json_f_name = 'tax_liability'
json_path = '{}/{}.json'.format(files_directory, json_f_name)
cur_df = pd.read_json(json_path, orient="records")
target_key = f'input_data_hive/{json_f_name}_ht/{json_f_name}.json'
save_data_cos(cur_df, output_path=target_key, convert_parquet=False)

## Upload PDFs for RAG to the input bucket

In [None]:
# Upload every PDF found (recursively) under pdfs_directory to the input-data
# bucket under the "pdfs/" prefix.
# NOTE(review): only the file name is kept in the key, so PDFs with the same
# name in different subdirectories overwrite each other in COS — confirm intended.
from urllib.parse import quote

print(f"Will be adding data {pdfs_directory} to {input_bucket} in {bucket_endpoint}")

for root, dirs, files in os.walk(pdfs_directory):
    for file in files:
        if not file.lower().endswith(".pdf"):
            continue
        local_path = os.path.join(root, file)
        output_path = os.path.join('pdfs', file).replace("\\", "/")
        # Percent-encode the key (keeping '/') so names containing '#', '?'
        # or '%' are not truncated or misread when placed in the URL path.
        url_new_file = f"{bucket_endpoint}/{input_bucket}/{quote(output_path)}"
        print(f"Uploading {local_path} to {url_new_file}")
        with open(local_path, "rb") as b_file:
            r = s_cos.put(url_new_file, data=b_file)
        print(f"Status code for {local_path} is {r.status_code}")
        if r.status_code not in (200, 201, 202):
            print("Didn't succeed, response is", r.text)