# Fetch Credentials

In [0]:
# Fetch NetSuite credentials from the Azure Key Vault-backed secret scope
# (the NS-SB-* keys — presumably the sandbox account; confirm).
# Secrets are fetched one at a time, in the order listed. VERSION is a plain
# literal (the REST record API version segment), not a secret.
ACCOUNT_ID = dbutils.secrets.get(scope="azure_key_vault", key="NS-SB-ACCOUNT-ID")
VERSION = "v1"
REALM, CONSUMER_KEY, CONSUMER_SECRET, TOKEN_KEY, TOKEN_SECRET = (
    dbutils.secrets.get(scope="azure_key_vault", key=secret_key)
    for secret_key in (
        "NS-SB-REALM",
        "NS-SB-CONSUMER-KEY",
        "NS-SB-CONSUMER-SECRET",
        "NS-SB-TOKEN-ID",  # the token ID is used as the OAuth1 resource-owner key
        "NS-SB-TOKEN-SECRET",
    )
)

# Define Logging and Handler

In [0]:
import logging
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Configure the root logger’s level & format
logging.basicConfig(
    level=logging.INFO,
    format= "%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

# Grab the root logger once for the whole module
root_logger = logging.getLogger()

# Custom handler that turns each log record into a dict and appends it to a list.
class ListHandler(logging.Handler):
    """
    Logging handler that appends every handled record to ``target_list``.

    Each entry is a dict with:
        "run_ts":  timezone-aware UTC datetime of when the record was created,
        "level":   the record's level name (e.g. "INFO"),
        "message": the fully %-formatted message text.
    """

    def __init__(self, target_list):
        super().__init__()  # sets up level, filters and the handler lock
        self.target = target_list

    def emit(self, record):
        # Per the logging.Handler contract, a record that cannot be rendered
        # (e.g. mismatched %-args in getMessage) must go to handleError instead
        # of raising into the code that called logging.info()/error().
        try:
            entry = {
                # record.created is a POSIX timestamp; keep the result in UTC
                "run_ts": datetime.fromtimestamp(record.created, tz=timezone.utc),
                "level": record.levelname,
                "message": record.getMessage(),
            }
        except Exception:
            self.handleError(record)
            return

        self.target.append(entry)

# Define Functions

The newer cluster runtime does not include `requests_oauthlib`,
so install it before it is imported below.

In [0]:
%pip install requests_oauthlib

Collecting requests_oauthlib
  Downloading requests_oauthlib-2.0.0-py2.py3-none-any.whl.metadata (11 kB)
Downloading requests_oauthlib-2.0.0-py2.py3-none-any.whl (24 kB)
Installing collected packages: requests_oauthlib
Successfully installed requests_oauthlib-2.0.0
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import requests
from requests_oauthlib import OAuth1

def set_netsuite_connection(
    account_id: str,
    version: str,
    realm: str,
    limit: int = 1000,
    params: dict = None
):
    """
    Build the REST base URL, OAuth1 signer, request headers and default query params.

    Args:
        account_id: NetSuite account ID. It is normalised for the hostname
            (lower-cased, "_" replaced by "-", e.g. "1234567_SB1" -> "1234567-sb1");
            IDs already in hostname form are unchanged.
        version: REST record API version segment, e.g. "v1".
        realm: OAuth1 realm, passed through unchanged.
        limit: page size, applied only when ``params`` does not already set "limit".
        params: optional extra query parameters; copied, never mutated.

    Returns:
        tuple: (base_url, oauth, headers, call_params)

    The OAuth1 credentials are read from the module-level CONSUMER_KEY,
    CONSUMER_SECRET, TOKEN_KEY and TOKEN_SECRET globals.
    """
    # NetSuite REST hostnames use the account ID lower-cased with "_" -> "-"
    # (sandbox IDs such as 1234567_SB1 become 1234567-sb1); the realm keeps its form.
    host_account_id = account_id.strip().lower().replace("_", "-")

    # Base URL for REST record calls; endpoints (record types) are appended to it
    base_url = f"https://{host_account_id}.suitetalk.api.netsuite.com/services/rest/record/{version}/"

    # Token-based authentication (OAuth1, HMAC-SHA256 signatures)
    oauth = OAuth1(
        client_key=CONSUMER_KEY,
        client_secret=CONSUMER_SECRET,
        resource_owner_key=TOKEN_KEY,
        resource_owner_secret=TOKEN_SECRET,
        signature_method="HMAC-SHA256",
        realm=realm
    )

    # Standard NetSuite REST headers
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    # Copy params so the caller's dict is never shared/mutated; only fill in
    # "limit" when the caller did not set it explicitly.
    call_params = dict(params) if params else {}
    call_params.setdefault("limit", limit)

    return base_url, oauth, headers, call_params


def get_all_netsuite_page(endpoint: str, base_url: str, oauth: OAuth1, headers: dict, params: dict, timeout=(30, 300)) -> list:
    """
    Follow a NetSuite list endpoint's "next" links and collect every item summary.

    Args:
        endpoint: record type appended to ``base_url`` (e.g. "account").
        base_url: REST base URL ending in "/".
        oauth: OAuth1 signer applied to every request.
        headers: HTTP headers sent with every request.
        params: query parameters for the FIRST request only; later pages are
            requested via the "next" href returned by NetSuite.
        timeout: ``requests`` timeout per request (seconds, or a (connect, read)
            tuple) so a stalled connection cannot hang the worker forever.
            Pass None to wait indefinitely.

    Returns:
        list: the concatenated "items" arrays of all pages (summaries: id + links).

    Raises:
        requests.RequestException: a page request failed or returned an HTTP error.
        ValueError: a page body was not valid JSON.
        RuntimeError: NetSuite returned a "next" link that was already requested.
    """
    url = base_url + endpoint

    logging.info("Starting GET all NetSuite Pages for endpoint '%s' with parameters %s at URL: %s", endpoint, params, url)

    all_page_items = []
    requested_urls = set()

    while True:
        # A "next" link pointing back at an already-fetched page would loop forever
        if url in requested_urls:
            logging.error("Next link %s was already requested; aborting pagination.", url)
            raise RuntimeError(f"NetSuite pagination loop detected at {url}")
        requested_urls.add(url)

        try:
            page_resp = requests.get(url=url, auth=oauth, headers=headers, params=params, timeout=timeout)
            logging.info("Received HTTP %d for page URL: %s", page_resp.status_code, url)
            page_resp.raise_for_status()
            page = page_resp.json()
        except (requests.RequestException, ValueError) as e:
            logging.error("GET page request to %s failed: %s", url, e)
            raise

        # Collect the item summaries (id + links) of this page
        all_page_items.extend(page.get("items", []))

        # First link with rel == "next", if any
        next_url = next(
            (link["href"] for link in page.get("links", []) if link.get("rel") == "next"),
            None,
        )
        if next_url is None:
            logging.info("No next link found. Completed GET all NetSuite Pages.")
            break

        logging.info("Found next link, will request: %s", next_url)
        url = next_url

        # The initial params apply to the first request only; don't append them to the next href
        params = {}

    return all_page_items


def get_netsuite_page_details(all_page_items: list, oauth: OAuth1, headers: dict, timeout=(30, 300)) -> list:
    """
    Fetch the full JSON record for each item summary (best effort).

    Args:
        all_page_items: item summaries as returned by get_all_netsuite_page; each
            is expected to carry "id" and a "links" list of {"rel", "href"} dicts.
        oauth: OAuth1 signer applied to every request.
        headers: HTTP headers sent with every request.
        timeout: ``requests`` timeout per request (seconds or (connect, read)).

    Returns:
        list: the parsed JSON of every item that was fetched successfully. Items
        without a usable link, or whose request/parse fails, are logged and skipped.
    """
    logging.info("Starting GET NetSuite page details")

    all_item_details = []

    # If empty input, return empty result immediately
    if not all_page_items:
        logging.info("No pages found. End.")
        return all_item_details

    failed_count = 0

    for item in all_page_items:
        item_id = item.get("id", "(no-id)")
        links = item.get("links") or []

        # Prefer the explicit "self" link; fall back to the first link
        item_detail_url = next(
            (link.get("href") for link in links if link.get("rel") == "self"),
            None,
        )
        if item_detail_url is None and links:
            item_detail_url = links[0].get("href")

        # A malformed summary must not abort the whole entity
        if not item_detail_url:
            logging.error("Item %s has no detail link; skipping.", item_id)
            failed_count += 1
            continue

        # One request per item; the detail endpoint takes no "limit" parameter
        try:
            item_resp = requests.get(url=item_detail_url, auth=oauth, headers=headers, timeout=timeout)
            logging.info("Received HTTP %d for item URL: %s", item_resp.status_code, item_detail_url)
            item_resp.raise_for_status()
            item_detail = item_resp.json()
        except (requests.RequestException, ValueError) as e:
            # Best effort: log and continue with the next item
            logging.error("GET item request to %s failed: %s", item_detail_url, e)
            failed_count += 1
            continue

        all_item_details.append(item_detail)

    if failed_count:
        logging.warning("%d of %d item detail requests failed or were skipped", failed_count, len(all_page_items))

    logging.info("Completed GET NetSuite page details")
    return all_item_details

In [0]:
import json

def write_to_lakehouse(
    data: list            =None,
    partition_number: int =1,
    medal: str            ="bronze",
    subfolder_path: str   ="netsuite",
    entity: str           ="",
    file_format: str      ="delta",
    write_mode: str       ="overwrite",
    merge_schema: bool    =True
):
    """
    Write a list of JSON-serialisable records to ADLS and register it as a table.

    The records are first written as JSON text to a temporary folder and read back
    with ``spark.read.json`` so Spark infers the full nested schema (the shared
    cluster cannot use RDDs). The result is saved to
    ``abfss://{medal}@qydatalake.dfs.core.windows.net/{subfolder_path}/{entity}/``
    and registered in Unity Catalog as ``{catalog}.{medal}.{table}``, where
    ``catalog`` is the first segment of ``subfolder_path`` and any further segments
    are appended to the table name (``{entity}_{seg2}_{seg3}...``).

    Args:
        data: records to write; nothing happens when empty or None.
        partition_number: number of output files (passed to ``coalesce``).
        medal: ADLS container name and Unity Catalog schema name.
        subfolder_path: folder under the container; leading/trailing "/" ignored.
        entity: folder and table name for this dataset.
        file_format: Spark writer format (default "delta").
        write_mode: Spark save mode (default "overwrite").
        merge_schema: value of the writer's "mergeSchema" option.
    """
    if not data:
        logging.info("No data for %s. Skip write.", entity)
        return

    # Imported here so this function doesn't depend on a later notebook cell having run
    from pyspark.sql.types import StructType, StructField, StringType

    # Normalise once so the temp folder, the table location and the table name agree
    subfolder = subfolder_path.strip("/")
    storage_root = f"abfss://{medal}@qydatalake.dfs.core.windows.net/{subfolder}"
    tmp_dir = f"{storage_root}/_tmp_json/{entity}/"
    path = f"{storage_root}/{entity}/"

    # First path segment is the catalog; the remaining segments suffix the table name
    catalog_name, *suffix_parts = subfolder.split("/")
    table_name = f"{entity}_{'_'.join(suffix_parts)}" if suffix_parts else entity

    # One-column DataFrame of JSON strings (shared clusters can't use RDDs)
    json_schema = StructType([StructField("value", StringType(), True)])
    json_df = spark.createDataFrame([(json.dumps(rec),) for rec in data], json_schema)

    try:
        # Stage the JSON as text, then read it back so Spark infers the nested schema
        json_df.coalesce(1).write.mode("overwrite").text(tmp_dir)
        df = spark.read.json(tmp_dir)

        (
            df
            .coalesce(partition_number)  # data is small; few files avoid metadata scan overhead
            .write
            .format(file_format)
            .mode(write_mode)
            .option("mergeSchema", merge_schema)  # allow new columns when the table already exists
            .option("path", path)  # external location in ADLS
            .saveAsTable(f"{catalog_name}.{medal}.{table_name}")  # register/refresh in Unity Catalog
        )
    finally:
        # Remove the staging JSON even when the write fails, so reruns start clean
        dbutils.fs.rm(tmp_dir, recurse=True)

    logging.info("Wrote %d records of %s to %s", len(data), entity, path)

In [0]:
from pyspark.sql.types import TimestampType, StructField, StructType, StringType
import pyspark.sql.functions as F

def log_progress(
    log_records: list     =None,
    medal: str            ="bronze",
    subfolder_path: str   ="netsuite",
    entity: str           =""
):
    """
    Append captured log entries to the entity's Delta log table in ADLS.

    Args:
        log_records: dicts with "run_ts" (datetime), "level" and "message" keys, as
            produced by ListHandler; nothing happens when empty or None.
        medal: ADLS container name.
        subfolder_path: folder under the container; leading/trailing "/" ignored.
        entity: entity name; logs go to ``{subfolder_path}/logs/{entity}_log``.
    """
    if not log_records:
        return

    # Strip slashes the same way write_to_lakehouse does, so paths never contain "//"
    subfolder = subfolder_path.strip("/")
    log_path = f"abfss://{medal}@qydatalake.dfs.core.windows.net/{subfolder}/logs/{entity}_log"

    # Explicit schema: records are dicts keyed by these field names
    schema = StructType([
        StructField("run_ts", TimestampType(), nullable=False),
        StructField("level", StringType(), nullable=False),
        StructField("message", StringType(), nullable=False)
    ])

    # Log volume per run is small: write a single file per append
    log_df = spark.createDataFrame(log_records, schema).coalesce(1)

    (
        log_df
        .write
        .format("delta")
        .mode("append")
        .option("mergeSchema", True)
        .save(log_path)
    )

    logging.info("Logged %d entries for %s", len(log_records), entity)

In [0]:
def fetch_entity_data(
    entity: str, base_url: str, oauth: OAuth1, headers: dict, params: dict
):
    """
    Fetch one NetSuite entity (item summaries + full details) and capture its logs.

    Intended to run in a worker thread. A ListHandler is attached to the root
    logger for the duration of the fetch. The root logger receives records from
    every thread, so the handler is filtered to the calling thread. Each entity's
    log then contains only its own records, not those of concurrent fetches or
    of the main thread's writes.

    Returns:
        tuple: (entity, page_item_details, entity_logs)

    Raises:
        Propagates any exception from the fetch helpers (e.g. the
        requests.RequestException re-raised by get_all_netsuite_page); the
        capture handler is removed in every case.
    """
    import threading

    entity_logs = []
    worker_thread_id = threading.get_ident()

    list_handler = ListHandler(entity_logs)
    list_handler.setLevel(logging.INFO)
    # Handler.addFilter accepts a callable; record.thread is the emitting thread's
    # ident (None only if logging.logThreads was disabled).
    list_handler.addFilter(lambda record: record.thread in (worker_thread_id, None))
    root_logger.addHandler(list_handler)

    try:
        # 1. Fetch all pages (item summaries: id + self-link)
        page_items = get_all_netsuite_page(endpoint=entity, base_url=base_url, oauth=oauth, headers=headers, params=params)
        # 2. Fetch the full JSON for each item (a list of dicts)
        page_item_details = get_netsuite_page_details(all_page_items=page_items, oauth=oauth, headers=headers)
    finally:
        root_logger.removeHandler(list_handler)

    return entity, page_item_details, entity_logs

# 1. Fetch entities from NetSuite by REST API
# 2. Write the output to Lakehouse
# 3. Log progress

In [0]:
from concurrent.futures import ThreadPoolExecutor, as_completed

# === MAIN PIPELINE ===
if __name__ == "__main__":
    # Prepare the NetSuite connection once; it is shared by all worker threads
    base_url, oauth, headers, params = set_netsuite_connection(account_id=ACCOUNT_ID, version=VERSION, realm=REALM)

    # Entities we want to load
    entities = ["account", "subsidiary", "department"]
    failed_entities = []

    # Parallel HTTP fetch; lakehouse writes then run sequentially on this thread
    with ThreadPoolExecutor(max_workers=len(entities)) as pool:
        futures = {
            pool.submit(fetch_entity_data, entity, base_url, oauth, headers, params): entity
            for entity in entities
        }

        for future in as_completed(futures):  # futures is a Dict[Future, str]
            entity = futures[future]
            # Isolate failures so one bad entity doesn't stop the others from being written
            try:
                entity, data, logs = future.result()

                # Write to Lakehouse
                write_to_lakehouse(data=data, partition_number=1, medal="bronze", subfolder_path="netsuite", entity=entity)

                # Log progress
                log_progress(log_records=logs, medal="bronze", subfolder_path="netsuite", entity=entity)
            except Exception:
                logging.exception("Loading NetSuite entity '%s' failed", entity)
                failed_entities.append(entity)

    # Still fail the run if any entity could not be loaded
    if failed_entities:
        raise RuntimeError(f"NetSuite load failed for entities: {', '.join(sorted(failed_entities))}")

# --- End of netsuite_loader.py ---

2025-07-13 20:25:13 INFO Starting GET all NetSuite Pages for endpoint 'account' with parameters {'limit': 1000} at URL: https://[REDACTED].suitetalk.api.netsuite.com/services/rest/record/v1/account
2025-07-13 20:25:13 INFO Starting GET all NetSuite Pages for endpoint 'subsidiary' with parameters {'limit': 1000} at URL: https://[REDACTED].suitetalk.api.netsuite.com/services/rest/record/v1/subsidiary
2025-07-13 20:25:13 INFO Starting GET all NetSuite Pages for endpoint 'department' with parameters {'limit': 1000} at URL: https://[REDACTED].suitetalk.api.netsuite.com/services/rest/record/v1/department
2025-07-13 20:25:13 INFO Received HTTP 200 for page URL: https://[REDACTED].suitetalk.api.netsuite.com/services/rest/record/v1/account
2025-07-13 20:25:13 INFO No next link found. Completed GET all NetSuite Pages.
2025-07-13 20:25:13 INFO Starting GET NetSuite page details
2025-07-13 20:25:13 INFO Received HTTP 200 for page URL: https://[REDACTED].suitetalk.api.netsuite.com/services/rest/rec