# import and setup

In [1]:
import os
from pathlib import Path 
import datetime as dt 
import os, json, time, math, requests
from intuitlib.client import AuthClient
import subprocess, shlex

In [2]:
from pyspark.sql import SparkSession, Row, types as T, functions as F 
import re

In [3]:
import orjson
from typing import TypedDict, Literal

In [4]:
config_path = "projects/ETLPipeline/ETLCodeBase_Spark/config/info.json"
# pipeline-wide settings: system state, company lists and the directory layout
config = json.loads((Path.home() / config_path).read_text())
config

{'systeminfo': {'last_load_date': '2000-01-01', 'last_fx': 1.4116},
 'companyinfo': {'us_companies': ['MFUSA', 'MFAZ', 'MSUSA', 'MPUSA'],
  'ca_companies': ['MSL', 'NexGen', 'MFBC', 'MPL', 'MFL']},
 'directories': {'base': 'projects/ETLPipeline/Database',
  'credentials': {'QBO': '.inputs/dev'},
  'bronze': {'QBO': {'PL': 'Bronze/QBO/PL', 'GL': 'Bronze/QBO/GL'}},
  'silver': {'QBO': {'PLV0': 'Silver/QBO/PL_v0', 'GL': 'Silver/QBO/GL'}}}}

In [5]:
project_config_path = "projects/ETLPipeline/ETLCodeBase_Spark/config/project_config.json"
# per-report settings: target columns/types and the QBO column-key -> standard-name map
project_config = json.loads((Path.home() / project_config_path).read_text())
project_config

{'PL': {'columns': {'TransactionDate': 'date',
   'TransactionType': 'str',
   'TransactionID': 'str',
   'DocNumber': 'str',
   'Name': 'str',
   'NameID': 'str',
   'Memo': 'str',
   'SplitAcc': 'str',
   'SplitAccID': 'str',
   'Amount': 'float',
   'Balance': 'float',
   'Location': 'str',
   'LocationID': 'str',
   'Class': 'str',
   'ClassID': 'str'},
  'locations': {},
  'columns_QBO_meta': {'tx_date': 'TransactionDate',
   'txn_type': 'TransactionType',
   'doc_num': 'DocNumber',
   'name': 'Name',
   'dept_name': 'Location',
   'klass_name': 'Class',
   'memo': 'Memo',
   'split_acc': 'SplitAcc',
   'subt_nat_amount': 'Amount',
   'subt_nat_amount_nt': 'Amount',
   'rbal_nat_amount': 'Balance',
   'rbal_nat_amount_nt': 'Balance'}}}

In [6]:
today = dt.date.today()
# stamp this run's load date on the in-memory config (nothing in this notebook writes it back to disk)
config["systeminfo"].update(last_load_date=today.isoformat())

# Job Class - task creator

In [7]:
class Job:
    """Decide which fiscal years to load and hold run-level helpers.

    The fiscal year rolls over in November: from November onward the current
    fiscal year is ``calendar year + 1``.

    Args:
        light_load: if True, load only the current fiscal year (plus the previous
            one when ``lastFY`` is True); if False, load every fiscal year from
            ``full_load_start`` up to the current one.
        lastFY: on a light load, also include the previous fiscal year. Ignored
            when ``light_load`` is False.
        full_load_start: first fiscal year of a full load (defaults to 2019).

    Attributes:
        today: date the job object was created.
        scope: ascending list of fiscal years to load.
        fx: USD -> CAD rate, set by ``get_fx``.
    """

    def __init__(self, light_load: bool = True, lastFY: bool = False, full_load_start: int = 2019):
        self.today = dt.date.today()
        current_FY = self.today.year + 1 if self.today.month >= 11 else self.today.year
        if not light_load:
            first_year = full_load_start
        elif lastFY:
            first_year = current_FY - 1
        else:
            first_year = current_FY
        self.scope = list(range(first_year, current_FY + 1))

    def get_fx(self) -> float:
        """Fetch the realtime USD -> CAD rate from Alpha Vantage, store it on ``self.fx`` and return it.

        Raises:
            RuntimeError: if ALPHAVANTAGE_KEY is unset, or the response has no exchange rate
                (e.g. an API limit / error message instead of data).
            requests.HTTPError: on a non-2xx HTTP status.
        """
        key = os.getenv("ALPHAVANTAGE_KEY")
        if not key:
            # without a key the API answers with an error body, which used to surface as a KeyError
            raise RuntimeError("ALPHAVANTAGE_KEY environment variable is not set")
        resp = requests.get(
            "https://www.alphavantage.co/query",
            params={
                "function": "CURRENCY_EXCHANGE_RATE",
                "from_currency": "USD",
                "to_currency": "CAD",
                "apikey": key,
            },
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
        try:
            rate = float(payload["Realtime Currency Exchange Rate"]["5. Exchange Rate"])
        except (KeyError, TypeError, ValueError) as exc:
            raise RuntimeError(f"Unexpected Alpha Vantage response: {payload}") from exc
        self.fx = rate
        return rate

    def check_file(self, path: Path) -> None:
        """Ensure the directory ``path`` exists, creating parents as needed.

        Despite the name, it does not inspect files; an existing directory is left untouched.
        """
        path.mkdir(parents=True, exist_ok=True)

In [8]:
# NOTE(review): a module-level variable named `self` is confusing; later cells read `self.scope`,
# so any rename has to be applied to those cells as well.
self = Job(light_load=True,lastFY=True)
self.scope

[2025, 2026]

In [9]:
today.year  # calendar year; `self.scope` above is in fiscal years (FY rolls over in November)

2025

## create jobs

In [10]:
# Build (company, quarter_start, quarter_end) windows; MFL jobs are kept in their own list.
jobs_MFL = []
jobs_others = []
last_day = {3: 31, 6: 30, 9: 30, 12: 31}   # last day of each quarter-end month
all_companies = config["companyinfo"]["us_companies"] + config["companyinfo"]["ca_companies"]
for company in all_companies:
    jobs = jobs_MFL if company == "MFL" else jobs_others
    first_fy = self.scope[0]
    # prepend Oct-Dec of the prior calendar year for fiscal year consistency
    jobs.append((company, dt.date(first_fy - 1, 10, 1), dt.date(first_fy - 1, 12, 31)))
    for year in self.scope:
        for q_start in (1, 4, 7, 10):
            if dt.date(year, q_start, 1) > today:
                continue   # quarter has not started yet
            q_end = q_start + 2
            jobs.append((company, dt.date(year, q_start, 1), dt.date(year, q_end, last_day[q_end])))
len(jobs_MFL), len(jobs_others)

(5, 40)

In [11]:
jobs_MFL  # inspect MFL's (company, start, end) extract windows

[('MFL', datetime.date(2024, 10, 1), datetime.date(2024, 12, 31)),
 ('MFL', datetime.date(2025, 1, 1), datetime.date(2025, 3, 31)),
 ('MFL', datetime.date(2025, 4, 1), datetime.date(2025, 6, 30)),
 ('MFL', datetime.date(2025, 7, 1), datetime.date(2025, 9, 30)),
 ('MFL', datetime.date(2025, 10, 1), datetime.date(2025, 12, 31))]

## append tokens for auth

In [12]:
# Root of the local database folders; every bronze/silver path below is built from it.
BASE_DIR = Path.home() / config["directories"]["base"]
token_path = BASE_DIR / config["directories"]["credentials"]["QBO"]
# sanity check: _refresh_auth_client reads client_secrets.json and tokens.json from this folder
os.listdir(token_path)

['client_secrets.json', 'tokens.json', 'copies_Linux']

In [13]:
def _refresh_auth_client(company: str, config:dict) -> AuthClient:
    """
        Build an intuitlib AuthClient for `company` from the locally stored credentials.

        Client id/secret come from client_secrets.json: the "USA" entry for companies listed in
        config["companyinfo"]["us_companies"], the "CA" entry otherwise. Access/refresh tokens
        and the realm id come from tokens.json, keyed by company code.

        NOTE: despite the name, tokens are NOT refreshed here - the refresh-and-save step is
        disabled (see the comment at the end), so the access token in tokens.json must still
        be valid when the extract runs.

        args:
            company - company code, e.g. "MFL"
            config  - pipeline config (uses "directories" and "companyinfo")

        returns:
            AuthClient with access_token, refresh_token and realm_id assigned

        raises:
            KeyError - if the company is missing from tokens.json or its region from client_secrets.json
    """
    mode = "production"
    redirect_uri = "https://developer.intuit.com/v2/OAuth2Playground/RedirectUrl"
    token_path = Path.home() / config["directories"]["base"] / config["directories"]["credentials"]["QBO"]
    with open(token_path / "client_secrets.json", "r") as f:
        secret = json.load(f)
    # one Intuit app per region; the region follows the configured US company list
    region = "USA" if company in config["companyinfo"]["us_companies"] else "CA"
    auth_client = AuthClient(client_id=secret[region]["client_id"],
                             client_secret=secret[region]["client_secret"],
                             redirect_uri=redirect_uri,
                             environment=mode)
    # assign stored tokens
    with open(token_path / "tokens.json", "r") as f:
        tokens = json.load(f)
    company_tokens = tokens[company]
    auth_client.access_token = company_tokens["access_token"]
    auth_client.refresh_token = company_tokens["refresh_token"]
    auth_client.realm_id = company_tokens["realm_id"]
    # Refresh is disabled. To re-enable: call auth_client.refresh(), then write the new
    # access_token / refresh_token / realm_id back into tokens[company] and json.dump(tokens, ...,
    # indent=4) to token_path/"tokens.json".
    return auth_client

In [14]:
# One extract task per (company, quarter) job. Jobs arrive grouped by company, so the
# auth client is only rebuilt when the company changes.
extract_MFL = []
extract_others = []
current_company = ""
pl_dir = BASE_DIR / config["directories"]["bronze"]["QBO"]["PL"]
for company, start, end in jobs_MFL + jobs_others:
    if company != current_company:
        auth_client = _refresh_auth_client(company, config)
        current_company = company
    target = extract_MFL if company == "MFL" else extract_others
    # NOTE(review): the month is not zero-padded here ("2025_1.json"), while
    # switch_data_source pads GL paths ("2025_01.json").
    target.append(dict(
        company=company,
        realm_id=auth_client.realm_id,
        token=auth_client.access_token,
        start=start.isoformat(),
        end=end.isoformat(),
        report="ProfitAndLossDetail",
        out_path=pl_dir / company / f"{start.year}_{start.month}.json",
    ))
len(extract_MFL) + len(extract_others)

45

In [17]:
extract_MFL[0]  # NOTE: this output shows the live QBO access token - clear outputs before sharing

{'company': 'MFL',
 'realm_id': 123146146745069,
 'token': '<redacted>',
 'start': '2024-10-01',
 'end': '2024-12-31',
 'report': 'ProfitAndLossDetail',
 'out_path': PosixPath('/home/zhe_rao/projects/ETLPipeline/Database/Bronze/QBO/PL/MFL/2024_10.json')}

In [89]:
# Despite the name, `partitions` is one flat list of extract tasks; Spark slices it when it
# is parallelized below. (Earlier idea: MFL in 3 partitions and the others in 5, via
# [extract_MFL[i::3] for i in range(3)] + [extract_others[i::5] for i in range(5)].)

partitions = extract_MFL + extract_others
len(partitions)

45

In [18]:
def switch_data_source(extract: list[dict[str,str]]) -> None:
    """
        Re-point every extract task, in place, from the P&L detail report to the General Ledger.

        For each task: "report" becomes "GeneralLedger" and "out_path" is rebuilt under the
        bronze GL folder as <company>/<YYYY>_<MM>.json, with YYYY and MM sliced from the
        task's ISO "start" date (so the month is zero-padded).

        NOTE(review): PL task paths are built with an unpadded month ("2025_1.json").
    """
    for task in extract:
        start_iso = task["start"]
        year_part, month_part = start_iso[:4], start_iso[5:7]
        gl_dir = BASE_DIR / config["directories"]["bronze"]["QBO"]["GL"]
        task["report"] = "GeneralLedger"
        task["out_path"] = gl_dir / task["company"] / f"{year_part}_{month_part}.json"


In [20]:
# switch_data_source(extract_MFL)

In [21]:
extract_MFL[0]  # NOTE: this output shows the live QBO access token - clear outputs before sharing

{'company': 'MFL',
 'realm_id': 123146146745069,
 'token': '<redacted>',
 'start': '2024-10-01',
 'end': '2024-12-31',
 'report': 'ProfitAndLossDetail',
 'out_path': PosixPath('/home/zhe_rao/projects/ETLPipeline/Database/Bronze/QBO/PL/MFL/2024_10.json')}

# Start Spark

In [None]:
# spark.stop()

In [94]:
# get Poetry's python path
PY = subprocess.check_output(shlex.split("poetry env info --path"), text=True).strip() + "/bin/python"
spark = (
    SparkSession.builder
      .appName("test")
      .master("local[*]")                           # use all cores during dev
      .config("spark.local.ip", "127.0.0.1")        # silences loopback complaints
      .config("spark.driver.bindAddress", "127.0.0.1")
      .config("spark.driver.host", "127.0.0.1")
      .config("spark.pyspark.driver.python", PY)    # ensure Poetry python on driver
      .config("spark.pyspark.python", PY)           # ...and executors
      # .config("spark.python.use.daemon", "true")  # default; faster
      .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
25/11/12 16:27:05 WARN Utils: Your hostname, MFARM-AI resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/12 16:27:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/12 16:27:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# PL Pipeline

## extract

In [None]:
def extract_partition(it, timeout: float = 60.0, max_retries: int = 3) -> None:
    """
        Run the QBO report extract tasks of one Spark partition.

        For each task dict (keys used: "token", "realm_id", "start", "end", "report",
        "out_path"), call GET /v3/company/{realm_id}/reports/{report} and write the raw
        response body to task["out_path"], creating parent folders as needed.

        Throttled (HTTP 429) and 5xx responses are retried up to `max_retries` times with
        exponential backoff, honouring a numeric Retry-After header. Any remaining non-2xx
        response raises requests.HTTPError, so an error payload (e.g. from an expired token)
        is never written to bronze as if it were report data; this fails the Spark job.

        args:
            it          - iterator of task dicts (one Spark partition)
            timeout     - per-request timeout in seconds
            max_retries - retries for 429/5xx responses before giving up
    """
    BASE_URL = "https://quickbooks.api.intuit.com"
    minor_version = 75
    retry_status = {429, 500, 502, 503, 504}

    with requests.Session() as session:     # closes pooled connections when the partition is done
        session.headers.update({"Accept": "application/json"})

        for task in it:
            url = f"{BASE_URL}/v3/company/{task['realm_id']}/reports/{task['report']}"
            params = {
                "minorversion": minor_version,
                "start_date": task["start"],
                "end_date": task["end"],
                "columns": "all"
            }
            # per-request header: each task may belong to a different company/token
            headers = {"Authorization": f'Bearer {task["token"]}'}

            for attempt in range(max_retries + 1):
                resp = session.get(url, params=params, headers=headers, timeout=timeout)
                if resp.status_code not in retry_status or attempt == max_retries:
                    break
                retry_after = resp.headers.get("Retry-After", "")
                time.sleep(float(retry_after) if retry_after.isdigit() else 2 ** attempt)
            resp.raise_for_status()

            out_path = task["out_path"]
            out_path.parent.mkdir(parents=True, exist_ok=True)
            with open(out_path, "wb") as f:
                f.write(resp.content)
    

45

In [None]:
cores = 8   # cap on concurrent QBO callers - QBO throttles, so do not raise this
# One Spark task runs per slice, so the slice count bounds how many extract_partition
# calls (each with its own HTTP session) run at once. The previous cores * 3 slices on
# local[*] allowed up to min(24, CPU count) concurrent callers, above the intended cap.
rdd = spark.sparkContext.parallelize(partitions, cores)
rdd.foreachPartition(extract_partition)

                                                                                

## transform - stage 1

In [18]:
path = extract_MFL[0]["out_path"]  # first MFL bronze file, used below to explore the raw report shape
path

PosixPath('/home/zhe_rao/projects/ETLPipeline/Database/Bronze/QBO/PL/MFL/2024_10.json')

In [46]:
# load one raw bronze report for exploration
data = orjson.loads(path.read_bytes())

In [20]:
# three levels down the nested Rows/Row tree; a section node carries Header, Rows, Summary and type
data["Rows"]["Row"][0]["Rows"]["Row"][1]["Rows"]["Row"][0].keys()

dict_keys(['Header', 'Rows', 'Summary', 'type'])

In [21]:
# NOTE: the output below is stale - it lists FarmID and no LocationID, unlike the current project_config; re-run to refresh
dict.fromkeys(list(project_config["PL"]["columns"].keys()))

{'TransactionDate': None,
 'TransactionType': None,
 'TransactionID': None,
 'DocNumber': None,
 'Name': None,
 'NameID': None,
 'Memo': None,
 'SplitAcc': None,
 'SplitAccID': None,
 'Amount': None,
 'Balance': None,
 'Location': None,
 'FarmID': None,
 'Class': None,
 'ClassID': None}

In [None]:
class col_MetaData(TypedDict):
    """One metadata entry of a report column, e.g. {"Name": "ColKey", "Value": "tx_date"}."""
    Name: str
    Value: str 

class col_types(TypedDict):
    """One entry of a QBO report's json["Columns"]["Column"] list."""
    ColTitle: str 
    ColType: Literal["Date", "String", "Money"]
    MetaData: list[col_MetaData]

def report_col(col_meta:list[col_types], col_map:dict[str,str]) -> list[str]:
    """ 
        Extract and standardize the column names of one QBO report, in report order.

        Mapping each column by its metadata key (instead of by position) prevents positional
        drift, e.g. a Class value being recorded under the Location column when a report
        variant has a different column set.

        The raw key is MetaData[0]["Value"] (e.g. "tx_date"); a column without MetaData falls
        back to its ColTitle. Keys with no entry in col_map are kept unchanged so the pipeline
        keeps running (to be logged later).

        args:    
            col_meta - json_file["Columns"]["Column"]: list of column descriptors at the start of each QBO report
            col_map  - "raw_name" : "standardized_name" mapping from the project configuration

        returns:
            list of standardized column names, one per report column
    """
    standardized_columns = []
    for column in col_meta:
        meta = column.get("MetaData") or []
        raw_name = meta[0]["Value"] if meta else column.get("ColTitle", "")
        standardized_columns.append(col_map.get(raw_name, raw_name))
    return standardized_columns

In [102]:
def _split_account(fullname: str) -> tuple:
    """Split an account header such as "400000 Sales" into ("400000", "Sales"); (None, fullname) if it has no leading number."""
    number, sep, name = fullname.partition(" ")
    if sep and number[:1].isdigit():
        return number, name
    return None, fullname


def crawler(json_level:dict, cols:list[str], company:str, acc_info:tuple[str]=None):
    """ 
        Recursively walk one node of a QBO report and yield one dict per leaf ("Data") row.

        args:
            json_level  - current node of the report JSON; children are json_level["Rows"]["Row"]
            cols        - standardized column names of the report (from report_col), aligned with ColData
            company     - company code of the current report
            acc_info    - (acc_ID, acc_fullname) from the nearest enclosing header that has both

        yields:
            dict keyed by `cols` (empty cells stay None), plus
              - "<col>ID" for cells that carry an "id" ("TransactionType" id is stored as "TransactionID")
              - "AccID", "AccNum", "AccName" when account info is known; AccNum is None when the
                header text has no leading account number
            DocNumber, IDs and account fields are prefixed with the company code.
    """
    if json_level.get("type") == "Data":
        row = dict.fromkeys(cols)
        col_data = json_level["ColData"]
        for i, col_name in enumerate(cols):
            cell = col_data[i]
            text = cell["value"]
            if text != '':      # record value only if it is not an empty string
                if col_name == "DocNumber":
                    # doc numbers repeat across companies; prefix them and skip the id check
                    row[col_name] = company + "-" + text
                    continue
                row[col_name] = text
            value_id = cell.get("id", '')
            if value_id:
                id_col = "TransactionID" if col_name == "TransactionType" else col_name + "ID"
                row[id_col] = company + value_id
        if acc_info:    # should always exist for P&L detail lines - log if not
            acc_id, acc_fullname = acc_info
            acc_num, acc_name = _split_account(acc_fullname)
            row["AccID"] = company + acc_id
            row["AccNum"] = company + acc_num if acc_num is not None else None
            row["AccName"] = company + acc_name
        yield row
        return      # Data rows are leaves

    # a section header with both id and value identifies the account for everything below it
    header = json_level.get("Header")
    if isinstance(header, dict):
        header_cells = header.get("ColData") or []
        if header_cells:
            first = header_cells[0]
            if "id" in first and "value" in first:
                acc_info = (first["id"], first["value"])

    # keep crawling into child rows, if any
    for child in json_level.get("Rows", {}).get("Row", []):
        yield from crawler(json_level=child, cols=cols, company=company, acc_info=acc_info)


In [103]:
def flatten_partition(it):
    """ 
        Flatten the bronze QBO report files of one Spark partition into row dicts.

        For each task (keys used: "company", "out_path") the JSON file is loaded, its columns
        are standardized with report_col (using project_config["PL"]["columns_QBO_meta"]), and
        crawler is run over every top-level row. Empty payloads and reports without rows
        yield nothing.

        raises:
            ValueError - if a file holds a QBO error payload ("Fault") or has no "Columns",
                         naming the file instead of failing with a bare KeyError
    """
    col_map = project_config["PL"]["columns_QBO_meta"]
    for task in it:
        company = task["company"]
        out_path = task["out_path"]
        with open(out_path, "rb") as f:
            data = orjson.loads(f.read())
        if not data:
            continue
        if "Fault" in data or "Columns" not in data:
            raise ValueError(f"{out_path}: not a QBO report payload (top-level keys: {sorted(data)})")
        cols = report_col(data["Columns"]["Column"], col_map)
        for item in data.get("Rows", {}).get("Row", []):
            yield from crawler(json_level=item, cols=cols, company=company)

In [104]:
len(partitions)  # number of bronze files to flatten (one per extract task)

45

In [109]:
# Stage-1 schema for flattened P&L detail rows. All values stay raw strings from the report;
# NOTE(review): presumably cast later per project_config["PL"]["columns"] - confirm.
# AccID / AccNum / AccName are produced by crawler from the enclosing account header. Keys
# missing from the schema are ignored by createDataFrame, so without these fields the
# account of every line would be lost.
TARGET_SCHEMA = T.StructType([
    T.StructField("TransactionDate", T.StringType(), nullable=False),
    T.StructField("TransactionType", T.StringType(), nullable=False),
    T.StructField("TransactionID", T.StringType(), nullable=False),
    T.StructField("DocNumber", T.StringType(), nullable=True),
    T.StructField("Name", T.StringType(), nullable=True),
    T.StructField("NameID", T.StringType(), nullable=True),
    T.StructField("Memo", T.StringType(), nullable=True),
    T.StructField("SplitAcc", T.StringType(), nullable=True),
    T.StructField("SplitAccID", T.StringType(), nullable=True),
    T.StructField("Amount", T.StringType(), nullable=True),
    T.StructField("Balance", T.StringType(), nullable=True),
    T.StructField("Location", T.StringType(), nullable=True),
    T.StructField("LocationID", T.StringType(), nullable=True),
    T.StructField("Class", T.StringType(), nullable=True),
    T.StructField("ClassID", T.StringType(), nullable=True),
    T.StructField("AccID", T.StringType(), nullable=True),
    T.StructField("AccNum", T.StringType(), nullable=True),
    T.StructField("AccName", T.StringType(), nullable=True)
])

In [110]:
cores = 8       # slice multiplier only; flattening reads local files, so the QBO throttle does not apply here
# mapPartitions / createDataFrame are lazy: nothing is read until an action (e.g. df.count()) runs.
# NOTE(review): dict rows are presumably matched to TARGET_SCHEMA by field name (extra keys ignored) - confirm.
rdd = spark.sparkContext.parallelize(partitions, cores*4).mapPartitions(flatten_partition)
df = spark.createDataFrame(rdd, schema=TARGET_SCHEMA)


## create fiscal year and write out

                                                                                

64294