In [118]:
import os
import time
import json
import requests
import random
import string
from google.cloud import bigquery
import urllib.parse
import hmac
import hashlib
import base64
import dlt
from dotenv import load_dotenv
from datetime import datetime, timedelta

In [41]:
# Load environment variables from secrets.env
load_dotenv("secrets.env")

# Authenticate with Google Cloud.
# os.environ only accepts str values, so assigning a missing (None) key path would
# fail with an opaque "str expected, not NoneType" TypeError. Fail early instead,
# with a message that names the variable that has to be provided.
_gcp_key_path = os.getenv("GCP_KEY_PATH")
if _gcp_key_path is None:
    raise RuntimeError(
        "GCP_KEY_PATH is not set; define it in secrets.env (or the environment) "
        "so GOOGLE_APPLICATION_CREDENTIALS can point at the credentials file."
    )
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _gcp_key_path

# BigQuery account info (used below to build fully-qualified table IDs)
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
DATASET_ID_STAGING = os.getenv("DATASET_ID_STAGING")

# NetSuite access secrets (used by get_netsuite_data to build the endpoint URL
# and to sign each request with OAuth 1.0)
NETSUITE_ACCOUNT_ID = os.getenv("NETSUITE_ACCOUNT_ID")
NETSUITE_REALM = os.getenv("NETSUITE_REALM")
NETSUITE_CONSUMER_KEY = os.getenv("NETSUITE_CONSUMER_KEY")
NETSUITE_CONSUMER_SECRET = os.getenv("NETSUITE_CONSUMER_SECRET")
NETSUITE_TOKEN = os.getenv("NETSUITE_TOKEN")
NETSUITE_TOKEN_SECRET = os.getenv("NETSUITE_TOKEN_SECRET")

In [42]:
# Initialize the BigQuery client shared by the load / merge / create / drop helpers below.
# No arguments are passed, so credentials and the default project are resolved by the
# library from the environment (GOOGLE_APPLICATION_CREDENTIALS is set in the config cell).
client = bigquery.Client()

In [57]:
# Fully-qualified BigQuery table IDs in the form "<project>.<dataset>.<table>".
# The main and test tables live in DATASET_ID; the staging table lives in DATASET_ID_STAGING.
# NOTE: if any of the env values is missing, the f-string silently yields e.g. "None.None.Account".
ACCOUNT_TABLE = f"{PROJECT_ID}.{DATASET_ID}.Account"
ACCOUNT_TABLE_TEST = f"{PROJECT_ID}.{DATASET_ID}.Account_test"
ACCOUNT_TABLE_STAGING = f"{PROJECT_ID}.{DATASET_ID_STAGING}.Account_staging"

# TODO: Add the other tables here

In [44]:
# NetSuite SuiteQL REST endpoint for the configured account.
# HTTP_METHOD is also the first component of the OAuth signature base string built in
# get_netsuite_data, so it must match the verb actually used to send the request (POST).
HTTP_METHOD = "POST"
BASE_URL = f"https://{NETSUITE_ACCOUNT_ID}.suitetalk.api.netsuite.com/services/rest/query/v1/suiteql"

In [99]:
"""
Input params in the form of:
params = {
    "limit": "5",
    "offset": "0"
}

Input SQL query in the form of:
query_body = {
    "q": "SELECT acctnumber, fullname, generalrate, currency FROM account"
}

"""
def get_netsuite_data(params, query, timeout=120):
    """
    POST a SuiteQL query to the NetSuite REST endpoint, signed with OAuth 1.0 (HMAC-SHA256).

    Args:
        params (dict): URL query parameters, e.g. {"limit": "5", "offset": "0"}.
            Values may be str or int; they are converted to str before signing.
        query (dict): JSON request body, e.g.
            {"q": "SELECT acctnumber, fullname FROM account"}.
        timeout (float | tuple | None): passed straight to requests.post. Pass None
            to wait indefinitely, which is what this function did before the
            parameter existed.

    Returns:
        The response object returned by requests.post, only when its status code is 200.

    Raises:
        Exception: if the status code is anything other than 200; the message
            carries the response body.
    """
    # Normalise values to str: urllib.parse.quote() rejects ints, so a caller passing
    # {"limit": 5} would otherwise crash while the signature is being built.
    params = {key: str(value) for key, value in params.items()}

    # Full URL with encoded query params, used in the POST request
    url = f"{BASE_URL}?{urllib.parse.urlencode(params)}"

    # Generate OAuth parameters (a fresh timestamp and nonce for every request)
    timestamp = str(int(time.time()))
    nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=11))

    oauth_params = {
        "oauth_consumer_key": NETSUITE_CONSUMER_KEY,
        "oauth_token": NETSUITE_TOKEN,
        "oauth_signature_method": "HMAC-SHA256",
        "oauth_timestamp": timestamp,
        "oauth_nonce": nonce,
        "oauth_version": "1.0"
    }

    # The signature covers the OAuth parameters AND the URL query parameters,
    # percent-encoded and sorted alphabetically by key.
    all_params = {**oauth_params, **params}
    encoded_param_string = "&".join(
        f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(v, safe='')}"
        for k, v in sorted(all_params.items())
    )

    # Signature base string: METHOD & encoded URL (without query string) & encoded params
    base_string = f"{HTTP_METHOD}&{urllib.parse.quote(BASE_URL, safe='')}&{urllib.parse.quote(encoded_param_string, safe='')}"

    # HMAC-SHA256 signature keyed with "<consumer secret>&<token secret>"
    signing_key = f"{NETSUITE_CONSUMER_SECRET}&{NETSUITE_TOKEN_SECRET}"
    hashed = hmac.new(signing_key.encode(), base_string.encode(), hashlib.sha256)
    signature = base64.b64encode(hashed.digest()).decode()

    # The base64 signature may contain '+', '/' and '=', so it is URL-encoded for the header
    encoded_signature = urllib.parse.quote(signature, safe="")

    # Construct OAuth header
    auth_header = (
        f'OAuth realm="{NETSUITE_REALM}", '
        f'oauth_consumer_key="{NETSUITE_CONSUMER_KEY}", '
        f'oauth_token="{NETSUITE_TOKEN}", '
        f'oauth_signature_method="HMAC-SHA256", '
        f'oauth_timestamp="{timestamp}", '
        f'oauth_nonce="{nonce}", '
        f'oauth_version="1.0", '
        f'oauth_signature="{encoded_signature}"'
    )

    headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json",
        "Prefer": "transient",
        "Cache-Control": "no-cache",
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }

    # Send the request; the timeout keeps a stalled connection from hanging the notebook
    response = requests.post(url, json=query, headers=headers, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"NetSuite API Error: {response.text}")
    return response

In [100]:
# Page size: rows requested from NetSuite per call. load_data_to_bq sends it as the
# "limit" URL parameter and also uses it as the step by which "offset" advances.
LIMIT = 20

In [110]:
def load_data_to_bq(query, max_offset=20):
    """
    Page through a SuiteQL query LIMIT rows at a time and collect the rows in memory.

    Despite its name, this function does not write to BigQuery yet: it returns the
    collected rows and the caller loads them.
    TODO: stream each batch to BigQuery instead of accumulating `all_data`.

    Each returned record has its 'links' key removed (if present) and gains an
    'updated_at' key holding the time.time() value captured once at the start of
    the call, so every row of one run shares the same value.

    Args:
        query (dict): SuiteQL request body, e.g. {"q": "SELECT ... FROM account"}.
        max_offset (int | None): testing cutoff. Paging stops as soon as the next
            offset would be >= this value. The default of 20 keeps the original
            hard-coded "temporary testing cutoff"; pass None to page until
            NetSuite reports there is no more data.

    Returns:
        list[dict]: all records collected across the requested pages.
    """
    timestamp = time.time()
    all_data = []

    offset = 0
    while True:
        # Build fresh parameters for every page
        params = {
            "limit": str(LIMIT),
            "offset": str(offset)
        }

        response = get_netsuite_data(params, query)
        response.raise_for_status()  # Make sure request was successful
        response_json = response.json()

        # Drop the per-record 'links' entry and stamp the load time
        batch = response_json.get("items", [])
        for record in batch:
            record.pop('links', None)
            record['updated_at'] = timestamp

        all_data.extend(batch)
        # load_data_to_bigquery(DESTINATION_TABLE, batch)

        # Stop when the API says there is no more data. A response without a
        # "hasMore" key is treated as the last page, rather than as "more to come",
        # so a malformed reply cannot keep the loop requesting pages forever.
        if not response_json.get("hasMore"):
            break

        offset += LIMIT

        # Optional testing cutoff
        if max_offset is not None and offset >= max_offset:
            break

    return all_data

In [104]:
"""
Inputs:
    Name of table in NetSuite: String
    List of NetSuite columns to retrieve: List[String]

Functionality:
    Get LIMIT rows at a time, continuing until the entire table is retrieved
    Filter based on input columns

Returns:
    the list of row dicts returned by load_data_to_bq (not a request/response object)
    
"""
def load_full_netsuite_table(table_name, columns):
    """
    Select the given columns of a NetSuite table and fetch them via load_data_to_bq.

    Args:
        table_name (str): NetSuite table placed in the FROM clause.
        columns (list[str]): column names (or ["*"]) joined into the SELECT list.

    Returns:
        Whatever load_data_to_bq returns for the generated query.
    """
    select_list = ', '.join(columns)
    suiteql = f"SELECT {select_list} FROM {table_name}"
    return load_data_to_bq({"q": suiteql})



In [126]:
def load_recent_netsuite_data(table_name, columns, days=2):
    """
    Fetch rows of a NetSuite table whose lastmodifieddate falls within the last `days` days.

    The cutoff is computed in SuiteQL as TO_DATE(<today>, 'MM/DD/YYYY') - days, where
    <today> is this machine's local date (datetime.now()).

    Args:
        table_name (str): NetSuite table placed in the FROM clause; the query
            filters on its lastmodifieddate column.
        columns (list[str]): column names (or ["*"]) joined into the SELECT list.
        days (int): how many days back to look. Defaults to 2, the window that
            used to be hard-coded.

    Returns:
        Whatever load_data_to_bq returns for the generated query.

    Raises:
        ValueError: if `days` is negative or cannot be converted to int.
    """
    # Coerce to int so only a plain number can ever be interpolated into the query text
    days = int(days)
    if days < 0:
        raise ValueError("days must be >= 0")

    # Get the current date and format it in MM/DD/YYYY
    current_date = datetime.now().strftime('%m/%d/%Y')

    # Build query with recent data condition
    query = {
        "q": f"""
        SELECT {', '.join(columns)} 
        FROM {table_name}
        WHERE lastmodifieddate >= TO_DATE('{current_date}', 'MM/DD/YYYY') - {days}
        """
    }

    return load_data_to_bq(query)

In [106]:
# Fetch the listed columns of the NetSuite "account" table.
# The result is kept in `data`, which later cells pass to load_data_to_bigquery.
columns = ["acctnumber", "fullname", "generalrate", "currency", "lastmodifieddate", "id"]
# columns = ["*"]
table_name = "account"
data = load_full_netsuite_table(table_name, columns)

In [49]:
# Insert collected data into BigQuery.
# This is not a merge, and will send rows regardless of whether they are already in BigQuery.
def load_data_to_bigquery(TABLE_ID, data):
    """
    Insert rows into a BigQuery table with client.insert_rows_json.

    Args:
        TABLE_ID (str): The table the rows are inserted into.
        data (list): A list of dictionaries representing the records to load.

    Returns:
        None. The outcome is reported with print(); insert errors returned by
        the client are printed, not raised.
    """
    # Nothing to send: skip the API call instead of issuing an insert with no rows
    if not data:
        print("⚠️ No rows to load into BigQuery; skipping insert.")
        return

    errors = client.insert_rows_json(TABLE_ID, data)
    if not errors:
        print(f"✅ Successfully loaded {len(data)} rows into BigQuery.")
    else:
        print(f"❌ Failed to load data into BigQuery: {errors}")

In [50]:
# Insert the rows held in `data` into the main Account table (plain insert, no de-duplication).
load_data_to_bigquery(ACCOUNT_TABLE, data)

✅ Successfully loaded 60 rows into BigQuery.


In [None]:
# LOAD TO STAGING TABLE

In [73]:
# Columns to merge in. The fetched rows are kept in `merge_data` for the staging load.
# NOTE(review): this list omits fullname and generalrate, which merge_into_bigquery copies
# from the staging table on matched rows — presumably they arrive as NULL in staging and
# would overwrite the target's values; verify before merging into the main table.
columns = ["acctnumber", "currency", "lastmodifieddate", "id", "accountsearchdisplayname", "accountsearchdisplaynamecopy", "isinactive"]
# columns = ["*"]
table_name = "account"
merge_data = load_full_netsuite_table(table_name, columns)


In [None]:
# Insert the rows held in `merge_data` into the Account staging table.
load_data_to_bigquery(ACCOUNT_TABLE_STAGING, merge_data)

In [89]:
# Merge data into BigQuery, so that existing rows are not duplicated
def merge_into_bigquery(target_table, staging_table, unique_key="id"):
    """
    Merges data from a staging table into the target table using BigQuery's MERGE statement.

    Rows are matched on `unique_key`; matched rows are updated from the staging row
    and unmatched staging rows are inserted. The column list in the statement is
    hard-coded for the Account tables in this notebook, so both tables must have
    those columns.

    Args:
        target_table (str): The full target table name (e.g., "your_project.your_dataset.target_table").
        staging_table (str): The full staging table name (e.g., "your_project.your_dataset.staging_table").
        unique_key (str): The column that uniquely identifies a row (default is "id").

    Returns:
        bool: True if the MERGE job completed, False if it raised. Failures are
        printed rather than re-raised, so callers should check this value before
        doing anything destructive such as clearing the staging table.
    """
    
    query = f"""
        MERGE `{target_table}` AS T
        USING `{staging_table}` AS S
        ON T.{unique_key} = S.{unique_key}
        
        WHEN MATCHED THEN 
            UPDATE SET
                T.fullname = S.fullname,
                T.acctnumber = S.acctnumber,
                T.generalrate = S.generalrate,
                T.currency = S.currency,
                T.lastmodifieddate = S.lastmodifieddate,
                T.id = S.id,
                T.accountsearchdisplayname = S.accountsearchdisplayname,
                T.accountsearchdisplaynamecopy = S.accountsearchdisplaynamecopy,
                T.isinactive = S.isinactive,
                T.updated_at = S.updated_at
    
        WHEN NOT MATCHED THEN
            INSERT (id, fullname, acctnumber, generalrate, currency, lastmodifieddate, 
                    accountsearchdisplayname, accountsearchdisplaynamecopy, isinactive, updated_at)
            VALUES (S.id, S.fullname, S.acctnumber, S.generalrate, S.currency, S.lastmodifieddate, 
                    S.accountsearchdisplayname, S.accountsearchdisplaynamecopy, S.isinactive, S.updated_at)
"""

    # Run the merge query
    try:
        job = client.query(query)
        job.result()  # Wait for the query to complete
    except Exception as e:
        print(f"❌ Failed to merge data into {target_table}: {e}")
        return False

    print(f"✅ Successfully merged data into {target_table}.")
    return True



In [None]:
# Merge the staging table into the main Account table.
# merge_into_bigquery reports a failure without raising, so read the printed result
# before running the staging-clear cell that follows.
merge_into_bigquery(ACCOUNT_TABLE, ACCOUNT_TABLE_STAGING)

In [None]:
# Clear staging table after merge (to avoid duplicate processing).
# This deletes every staged row unconditionally: merge_into_bigquery reports a failure
# without raising, so running this cell after a failed merge discards the staged rows.
# NOTE(review): rows recently written with insert_rows_json may still be in BigQuery's
# streaming buffer, where DML such as DELETE may be rejected — confirm.
client.query(f"DELETE FROM `{ACCOUNT_TABLE_STAGING}` WHERE TRUE").result()
print("✅ Staging table cleared.")

In [60]:
# Input full dataset ID, including project ID
def create_dataset(dataset_id, location="US"):
    """
    Create a BigQuery dataset, doing nothing if it already exists (exists_ok=True).

    Args:
        dataset_id (str): full dataset ID, including the project ID.
        location (str): location assigned to the dataset before it is created.
            Defaults to "US", the value that used to be hard-coded.
    """
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    client.create_dataset(dataset, exists_ok=True)
    print(f"✅ Dataset {dataset_id} created successfully.")

In [61]:
def create_table(table_id, schema):
    """
    Create a BigQuery table with the given schema.

    The table is created with exists_ok=True, so calling this for a table that is
    already there does not raise. The confirmation message is printed either way.

    Args:
        table_id: full table ID, including dataset ID and project ID.
        schema: table schema handed to bigquery.Table.
    """
    client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
    confirmation = f"✅ Table {table_id} recreated successfully."
    print(confirmation)
    

In [76]:
# Schema used for the Account test and staging tables: every NetSuite column is
# stored as STRING, followed by the load timestamp column.
ACCOUNT_TEST_SCHEMA = [
    *(
        bigquery.SchemaField(column_name, "STRING")
        for column_name in (
            "fullname",
            "acctnumber",
            "generalrate",
            "currency",
            "lastmodifieddate",
            "id",
            "accountsearchdisplayname",
            "accountsearchdisplaynamecopy",
            "isinactive",
        )
    ),
    bigquery.SchemaField("updated_at", "TIMESTAMP"),
]

In [63]:
def drop_dataset(dataset_id):
    """
    Delete a BigQuery dataset together with its contents.

    delete_contents=True and not_found_ok=True are passed to the client, so a
    dataset that does not exist is not treated as an error.

    Args:
        dataset_id: dataset identifier handed to client.delete_dataset.
    """
    delete_options = {"delete_contents": True, "not_found_ok": True}
    client.delete_dataset(dataset_id, **delete_options)
    print(f"✅ Dataset {dataset_id} dropped successfully.")

In [64]:
def drop_table(table_id):
    """
    Delete a BigQuery table; a missing table is ignored (not_found_ok=True).

    Args:
        table_id: table identifier handed to client.delete_table.
    """
    client.delete_table(table_id, not_found_ok=True)
    confirmation = f"✅ Table {table_id} dropped successfully."
    print(confirmation)

In [77]:
# Drop the Account test table so it can be created again with the current schema.
drop_table(ACCOUNT_TABLE_TEST)

✅ Table high-essence-450000-t7.NetSuite.Account_test dropped successfully.


In [78]:
# Create the Account test table with ACCOUNT_TEST_SCHEMA.
create_table(ACCOUNT_TABLE_TEST, ACCOUNT_TEST_SCHEMA)

✅ Table high-essence-450000-t7.NetSuite.Account_test recreated successfully.


In [80]:
# Insert the rows held in `data` into the Account test table.
load_data_to_bigquery(ACCOUNT_TABLE_TEST, data)

✅ Successfully loaded 60 rows into BigQuery.


In [81]:
# Drop the Account staging table so it can be created again with the current schema.
drop_table(ACCOUNT_TABLE_STAGING)

✅ Table high-essence-450000-t7.NetSuite_Staging.Account_staging dropped successfully.


In [82]:
# Create the staging table with the same schema as the test table (ACCOUNT_TEST_SCHEMA),
# which covers every column the MERGE statement reads from staging.
create_table(ACCOUNT_TABLE_STAGING, ACCOUNT_TEST_SCHEMA)

✅ Table high-essence-450000-t7.NetSuite_Staging.Account_staging recreated successfully.


In [84]:
# Insert the rows held in `merge_data` into the freshly created staging table.
load_data_to_bigquery(ACCOUNT_TABLE_STAGING, merge_data)

✅ Successfully loaded 120 rows into BigQuery.


In [108]:
# Merge the staging rows into the Account test table (matched on the default key, "id").
merge_into_bigquery(ACCOUNT_TABLE_TEST, ACCOUNT_TABLE_STAGING)

✅ Successfully merged data into high-essence-450000-t7.NetSuite.Account_test.


In [127]:
# Fetch every column ("*") of recently modified rows from the NetSuite "transaction" table.
# The call is the last expression of the cell and its result is not assigned, so the
# returned list of records is echoed in full as the cell output.
# columns = ["acctnumber", "currency", "lastmodifieddate", "id", "accountsearchdisplayname", "accountsearchdisplaynamecopy", "isinactive"]
columns = ["*"]
table_name = "transaction"

load_recent_netsuite_data(table_name, columns)

[{'abbrevtype': 'BINTRNFR',
  'balsegstatus': '5',
  'billingstatus': 'T',
  'createdby': '28739',
  'createddate': '3/25/2025',
  'currency': '1',
  'customtype': '-1',
  'daysopen': '1',
  'exchangerate': '1',
  'id': '19818281',
  'isfinchrg': 'F',
  'ismultishipto': 'F',
  'isreversal': 'F',
  'lastmodifiedby': '28739',
  'lastmodifieddate': '3/25/2025',
  'memo': 'RF-SMART Bin Xfer',
  'memorized': 'F',
  'needsbill': 'F',
  'nexus': '1',
  'number': '408921',
  'ordpicked': 'F',
  'paymenthold': 'F',
  'posting': 'T',
  'postingperiod': '281',
  'prevdate': '3/25/2025',
  'printedpickingticket': 'F',
  'recordtype': 'bintransfer',
  'status': 'Y',
  'tobeprinted': 'F',
  'trandate': '3/25/2025',
  'trandisplayname': 'Bin Transfer #BINT408921',
  'tranid': 'BINT408921',
  'transactionnumber': 'BINTRNFR408921',
  'type': 'BinTrnfr',
  'typebaseddocumentnumber': 'BINT408921',
  'userevenuearrangement': 'F',
  'visibletocustomer': 'T',
  'void': 'F',
  'voided': 'F',
  'updated_at': 