In [None]:
# @title Input File Path
file_path = "testFolder.zip" # @param {type:"string"}
destination_path = file_path.split('.zip')[0]

import base64
import requests
import os
import json
import zipfile

from google.colab import userdata

user_token = userdata.get('qb_user_token')

documents_dbid = "bt4p62yi8"
jobs_dbid = "bt4p2fsen"

documents_fields = {
    "Related Job": 9,
    "Store #": 10,
    "Document": 6,
    "Document Type": 7,
}
doc_fids = [6, 7, 9, 10]

jobs_fields = {
    "Record ID#": 3,
    "Store #": 14
}
jobs_fids = [3, 14]

### QB Calls

---



In [None]:
def parse_quickbase_api_response_clean_file(api_response):
    fields_by_id = {}

    for field in api_response['fields']:
        fields_by_id[field['id']] = field

    records = []
    for record in api_response['data']:
        result = {}

        for field_id, value in record.items():
            # print(value)
            field = fields_by_id.get(int(field_id))
            if field:
                label = field['label']
                if label == 'Record ID#':
                    label = 'rid'

                # Process field based on its type
                if field['type'] == 'file':
                    file_info = value['value']['versions'][0] if 'versions' in value['value'] and len(value['value']['versions']) > 0 else None
                    # print(file_info)
                    if file_info and  value['value']['url']:
                        # Only include URL and fileName, and only if URL is not empty
                        result[label] = {'url': value['value']['url'], 'fileName': file_info['fileName'], 'fid':field_id}
                else:
                    # For non-file fields, handle as before
                    value = value['value']
                    if isinstance(value, float) and value.is_integer():
                        value = int(value)
                    if(value == ''):
                        value = 'N/A'
                    result[label] = value

        records.append(result)

    return records


def get_records(user_token, dbid, fids, query, sortField=3):
  headers = {
    'QB-Realm-Hostname': 'apeximaging',
    'Authorization': 'QB-USER-TOKEN ' + user_token
  }
  body = {
      "from": dbid,
      "select": fids,
  }

  # print(body)
  r = requests.post(
  'https://api.quickbase.com/v1/records/query',
  headers = headers,
  json = body
  )
  # print(json.dumps(r.json(),indent=4))
  data = parse_quickbase_api_response_clean_file(r.json())
  return data

def get_base64(filename):
  data = open(filename, 'rb').read()
  base64_encoded = base64.b64encode(data).decode('UTF-8')

  return base64_encoded

def uploadToProjects(tid, upload_data):
  # data = open(filename, 'rb').read()
  # base64_encoded = base64.b64encode(data).decode('UTF-8')

  headers = {
    'QB-Realm-Hostname': 'apeximaging',
    'Authorization': 'QB-USER-TOKEN ' + user_token, #real key
  }

  body = {
  "to": tid,
  'data': upload_data
  }
    # Actual API Call
  r = requests.post(
  'https://api.quickbase.com/v1/records',
  headers = headers,
  json = body
  )

  if(r.status_code != 200):
    print('Oops it looks like there was an error with your query')
    print(r.status_code)
    print(r.json())
  else:
    return r.status_code


### Get Folders/Files


In [None]:
# Unzip file
def unzip_folder(zip_file_path, extract_dir):
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)


# Get all subfolders from a folder
def get_folders(folder_path):
    folders = [item for item in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, item))]
    return folders

# Get all files from folder
def get_files(folder_path):
    files = [f'{folder_path}/{item}' for item in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, item)) and item != "Design Survey.pdf" and item!= "Punchlist Form.xlsx"]
    return files


# Move store numbers, cities, states into list
def get_store_info(folders):
  store_info = []
  for name in folders:
    store_number = name.split(' ')[0]
    store_address = name.split(' ')[1]

    new_folder_file_path = destination_path + "/" + destination_path + "/" + name
    # print(new_folder_file_path)

    store_info.append({"Store #": store_number, "Address": store_address, "File_path": new_folder_file_path})

  return store_info

# Move files into list
def get_file_info(store_info):
  file_info = []
  for store in store_info:
    file_list = get_files(store["File_path"])
    file_info.append({"Store #": store["Store #"], "File_list": file_list, "Parent_Path": store["File_path"]})

  return file_info



# Create new path for unzipped files
if not os.path.exists(destination_path):
  os.makedirs(destination_path)
  print("path did not exist")
else:
  print("path already existed")

# Unzip to new folder
unzip_folder(file_path, destination_path)
print("file should have been unzipped")

# Get all folder names
unzipped_path = str(destination_path)+"/"+str(destination_path)
folder_list = get_folders(unzipped_path)
print("all folder names obtained in folder_list")

# Get all store info
all_stores_in_week = get_store_info(folder_list)
print("\nAll Stores In Week:")
print(all_stores_in_week)

# Get all file info per store
file_list = get_file_info(all_stores_in_week)
print("\nFile List:")
print(file_list)

# Get QB Job Records
total_jobs_query = "{14.XEX.""}"    # store num XEX ""
qb_jobs = get_records(user_token, jobs_dbid, jobs_fids, total_jobs_query)
print("\nQB Jobs:")
print(qb_jobs)

path already existed
file should have been unzipped
all folder names obtained in folder_list

All Stores In Week:
[{'Store #': '33826', 'Address': 'Seattle,', 'File_path': 'testFolder/testFolder/33826 Seattle, WA'}, {'Store #': '33828', 'Address': 'Tacoma,', 'File_path': 'testFolder/testFolder/33828 Tacoma, WA'}, {'Store #': '33824', 'Address': 'Tacoma,', 'File_path': 'testFolder/testFolder/33824 Tacoma, WA'}, {'Store #': '5191', 'Address': 'Rockford,', 'File_path': 'testFolder/testFolder/5191 Rockford, IL'}, {'Store #': '33829', 'Address': 'Tacoma,', 'File_path': 'testFolder/testFolder/33829 Tacoma, WA'}]

File List:
[{'Store #': '33826', 'File_list': ['testFolder/testFolder/33826 Seattle, WA/2024.03.18_T-Ads_RMN_Installation_Guide-March_Stores.pdf'], 'Parent_Path': 'testFolder/testFolder/33826 Seattle, WA'}, {'Store #': '33828', 'File_list': [], 'Parent_Path': 'testFolder/testFolder/33828 Tacoma, WA'}, {'Store #': '33824', 'File_list': ['testFolder/testFolder/33824 Tacoma, WA/2024.03

In [None]:
# Map QB RID with the store #
qb_jobs_dict = {qb_job['Store #']: qb_job['rid'] for qb_job in qb_jobs}
print(qb_jobs_dict)

# iterate through file list to get 'rid: [store num, files]'
final_array = []
stores_not_in_AV = []

for file_dict in file_list:
  store_num = file_dict["Store #"]

  if store_num in qb_jobs_dict:
    rid = qb_jobs_dict[store_num]
    files = file_dict["File_list"]
    # print(f"Store #: {store_num}, RID: {rid}, Files: {files}")
    final_array.append({rid: [store_num, files]})
  else:
    stores_not_in_AV.append(store_num)

print('\nFinal Array: ')
print(final_array)

print('\nMissing Stores: ')
print(stores_not_in_AV)

{'33824': 1, '5191': 3, '33828': 4, '33829': 5, '33826': 6}
Store #: 33826, RID: 6, Files: ['testFolder/testFolder/33826 Seattle, WA/2024.03.18_T-Ads_RMN_Installation_Guide-March_Stores.pdf']
Store #: 33828, RID: 4, Files: []
Store #: 33824, RID: 1, Files: ['testFolder/testFolder/33824 Tacoma, WA/2024.03.18_T-Ads_RMN_Installation_Guide-March_Stores.pdf']
Store #: 5191, RID: 3, Files: []
Store #: 33829, RID: 5, Files: []

Final Array: 
[{6: ['33826', ['testFolder/testFolder/33826 Seattle, WA/2024.03.18_T-Ads_RMN_Installation_Guide-March_Stores.pdf']]}, {4: ['33828', []]}, {1: ['33824', ['testFolder/testFolder/33824 Tacoma, WA/2024.03.18_T-Ads_RMN_Installation_Guide-March_Stores.pdf']]}, {3: ['5191', []]}, {5: ['33829', []]}]

Missing Stores: 
[]


### Upload to QB

In [None]:
data_upload_array = []
stores_unable_to_upload = []

# Iterate through the final array and upload corresponding docs
for item in final_array:
  # Key - RID
  # Value[0] - Store #
  # Value[1] - File Array
  key, value = list(item.items())[0]

  for file in value[1]:
    # try to get base64 and add to upload array
    try:
      filename = file.split('/')[-1]
      base64_encoded = get_base64(file)

      temp_data = {
            '9':
            {
                'value': key
            },
            '6':
            {
              'value':
                {
                  'fileName': filename,
                  'data': base64_encoded,
                }

            }
        }
      data_upload_array.append(temp_data)

    except:
      print(f'Error: {filename} was unable to upload for store #{value[0]}')
      stores_unable_to_upload.append(value[0])

    # print(len(data_upload_array))


uploadToProjects(documents_dbid, data_upload_array)

1
2


200

### Stores Not Uploaded To

In [None]:
print("Stores not in AV:\n")
for store in stores_not_in_AV:
  print(f'Store #{store}')

print("\nStores unable to upload: ")
for store in stores_unable_to_upload:
  print(f'Store #{store}')

Stores not in AV:


Stores unable to upload: 
