In [55]:
import os
from pathlib import Path 
import datetime as dt 
import os, json, time, math, requests

from intuitlib.client import AuthClient

from typing import Iterator, Tuple, Dict, Any, List 


In [9]:
from pyspark.sql import SparkSession, Row, types as T, functions as F
import ujson, os, re

In [2]:
# Scratch check: consecutive years 2018 through 2025 (upper bound is exclusive).
list(range(2018, 2026))

[2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025]

In [84]:
class Job:
    """Run configuration for the ETL: fiscal-year scope, company list and storage paths.

    Parameters
    ----------
    light_load : bool
        When True only recent fiscal years are put in scope; when False the
        scope starts at 2019.
    lastFY : bool
        Only read when ``light_load`` is True: also put the previous fiscal
        year in scope.
    today : datetime.date, optional
        Date the run is evaluated for. Defaults to ``dt.date.today()``; pass an
        explicit date to reproduce a past run or to check fiscal-year boundaries.

    Attributes
    ----------
    scope : list[int]
        Fiscal years to process, ascending, ending at the current fiscal year.
    raw_path, silver_path : dict
        Nested mappings of layer/source names to ``Path`` objects.
    """

    def __init__(self, light_load:bool = True, lastFY:bool = False, today: "dt.date | None" = None):
        base_dir = Path("c:/Users/ZheRao/OneDrive - Monette Farms/Monette Farms Team Site - Innovation Projects/Production/Database_Spark_ETL")
        base_dir_original = Path("c:/Users/ZheRao/OneDrive - Monette Farms/Monette Farms Team Site - Innovation Projects/Production/Database")
        self.json_download_path = Path("c:/Users/ZheRao/OneDrive - Monette Farms/Desktop/Work Files/Projects/5 - HP Data")
        self.base_dir = base_dir
        self.today = dt.date.today() if today is None else today
        # November and December already count toward the next fiscal year.
        current_FY = self.today.year + 1 if self.today.month >= 11 else self.today.year
        if not light_load:
            first_year = 2019
        elif lastFY:
            first_year = current_FY - 1
        else:
            first_year = current_FY
        self.scope = list(range(first_year, current_FY + 1))
        # Zero-padded month, used only for the log folder name.
        month_format = f"{self.today.month:02d}"
        self.us_companies = ["MFUSA", "MFAZ", "MSUSA", "MPUSA"]
        self.company_names = self.us_companies + ["MSL", "NexGen", "MFBC", "MPL", "MFL"]
        self.raw_path = {
            "QBO": {
                # NOTE: this folder name uses the un-padded month (e.g. "2025_3").
                "Raw": base_dir/"Bronze"/"QBO"/"Raw"/f"{self.today.year}_{self.today.month}",
                "GL": base_dir/"Bronze"/"QBO"/"GeneralLedger",
                "PL": base_dir/"Bronze"/"QBO"/"ProfitAndLoss",
                "Time":base_dir/"Bronze"/"QBOTime",
                "APAR": base_dir/"Bronze"/"QBO"/"APAR"
            },
            "Delivery": {"Traction":base_dir/"Bronze"/"Traction", "HP":base_dir/"Bronze"/"HarvestProfit"},
            # Credentials are read from the original (non-Spark) database tree.
            "Auth": {"QBO":base_dir_original/"Bronze"/"Authentication"/"QBO", "QBOTime": base_dir_original/"Bronze"/"Authentication"/"QBOTime",
                     "Harvest Profit": base_dir_original/"Bronze"/"Authentication"/"Harvest Profit"},
            "Log": base_dir/"Load_History"/f"{self.today.year}"/month_format
        }
        self.silver_path = {
            "QBO": {
                "Dimension_time": base_dir/"Silver"/"QBO"/"Dimension"/f"{self.today.year}_{self.today.month}",
                "Dimension": base_dir/"Silver"/"QBO"/"Dimension",
                "Raw": base_dir/"Silver"/"QBO"/"Fact"/"Raw",
                "PL": base_dir/"Silver"/"QBO"/"Fact"/"ProfitAndLoss",
                "GL": base_dir/"Silver"/"QBO"/"Fact"/"GeneralLedger",
                "Time": base_dir/"Silver"/"QBOTime",
                "APAR": base_dir/"Silver"/"QBO"/"Fact"/"APAR"
            },
            "Delivery": {"Traction":base_dir/"Silver"/"Traction", "HP":base_dir/"Silver"/"HarvestProfit"}
        }

    def get_fx(self):
        """Fetch the current USD to CAD rate from Alpha Vantage and store it on ``self.fx``.

        Raises
        ------
        KeyError
            If the ``ALPHAVANTAGE_KEY`` environment variable is not set (raised
            before any request is sent), or if the response does not contain
            the exchange-rate field.
        requests.HTTPError
            If the service answers with a non-2xx status.
        """
        key = os.environ["ALPHAVANTAGE_KEY"]
        resp = requests.get(
            "https://www.alphavantage.co/query",
            params={
                "function": "CURRENCY_EXCHANGE_RATE",
                "from_currency": "USD",
                "to_currency": "CAD",
                "apikey": key,
            },
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
        self.fx = float(payload["Realtime Currency Exchange Rate"]["5. Exchange Rate"])

    def create_log(self, path: Path) -> None:
        """Open ``<path>/<DD>_Log.txt`` in append mode as ``self.log``, creating ``path`` if needed."""
        self.check_file(path)
        self.log = open(path/f"{self.today.day:02d}_Log.txt", "a")

    def close_log(self):
        """Close the log opened by ``create_log``; does nothing if no log is open."""
        log = getattr(self, "log", None)
        if log is not None and not log.closed:
            log.close()

    def check_file(self, path: Path) -> None:
        """Ensure the directory ``path`` exists, creating missing parents.

        ``exist_ok=True`` avoids the check-then-create race when several
        processes prepare the same folder at once.
        """
        Path(path).mkdir(parents=True, exist_ok=True)

In [85]:
# Module-level stand-in named `self`: later cells and `_refresh_auth_client` read
# `self.raw_path`, `self.scope` and `self.company_names` from this global.
self = Job(light_load=True,lastFY=False)
self.scope

[2025]

In [86]:
# Run date used by the job-building cell to skip quarters that have not started.
# Computed separately from `self.today`, which Job.__init__ sets the same way.
today = dt.date.today()
today

datetime.date(2025, 10, 27)

In [87]:
# Build one (company, start, end) tuple per calendar quarter to extract.
# MFL is kept in its own list so it can be partitioned separately later.
jobs_MFL = []
jobs_others = []
last_day = {3: 31, 6: 30, 9: 30, 12: 31}    # last day of each quarter-ending month
for company in self.company_names:
    jobs = jobs_MFL if company == "MFL" else jobs_others
    fy = self.scope[0]
    # Prepend Oct-Dec of the prior calendar year so the earliest fiscal year
    # in scope is covered from its start.
    jobs.append((company,
                 dt.date(fy-1, 10, 1),
                 dt.date(fy-1, 12, 31)))
    for year in self.scope:
        for month in (1, 4, 7, 10):
            quarter_start = dt.date(year, month, 1)
            # Compare full dates, not only the month within the current year:
            # in November/December the scope already contains next calendar
            # year, and none of its quarters have started yet.
            if quarter_start > self.today:
                continue
            jobs.append((company,
                         quarter_start,
                         dt.date(year, month+2, last_day[month+2])))
len(jobs_MFL), len(jobs_others)

(5, 40)

In [105]:
import sys

# Start Python workers with the same interpreter that runs this kernel.
# NOTE(review): a worker started with a different or missing interpreter is a
# common cause of "Python worker failed to connect back" on Windows - confirm
# this is what happens here. The variables must be set before the first
# SparkSession is created; restart the kernel if a session already exists,
# because getOrCreate() then returns the existing one.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
os.environ.setdefault("PYSPARK_DRIVER_PYTHON", sys.executable)

spark = (
    SparkSession.builder
    .appName("test")
    .master("local[3]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.python.use.daemon", "false")
    .getOrCreate()
)

In [None]:
# TARGET_SCHEMA = T.StructType([
#     T.StructField("TransactionDate", T.StringType(), nullable=False),
#     T.StructField("TransactionType", T.StringType(), nullable=False),
#     T.StructField("TransactionID_partial", T.StringType(), nullable=False),
#     T.StructField("DocNumber", T.StringType(), nullable=True),
#     T.StructField("Name", T.StringType(), nullable=True),
#     T.StructField("NameID", T.StringType(), nullable=True),
#     T.StructField("Memo", T.StringType(), nullable=True),
#     T.StructField("SplitAcc", T.StringType(), nullable=True),
#     T.StructField("SplitAccID", T.StringType(), nullable=True),
#     T.StructField("Amount", T.StringType(), nullable=True),
#     T.StructField("Balance", T.StringType(), nullable=True),
#     T.StructField("Location", T.StringType(), nullable=True),
#     T.StructField("FarmID", T.StringType(), nullable=False),
#     T.StructField("Class", T.StringType(), nullable=True),
#     T.StructField("ClassID", T.StringType(), nullable=True)
# ])

In [None]:
# columns_all = ["TransactionDate", "TransactionType", "DocNumber", "Name", "Location", "Class", "Memo", "SplitAcc", "Amount", "Balance"]
# columns_nofarm = [col for col in columns_all if col != "Location"]
# columns_nofarmclass = [col for col in columns_nofarm if col != "Class"]


# Extract PL

In [91]:
def _refresh_auth_client(company: str, secret: dict[str, dict[str, str]]) -> AuthClient:
    """Build an ``AuthClient`` for ``company``, refresh its tokens and persist them.

    Parameters
    ----------
    company : str
        Key into ``tokens.json``. The four US companies use the "USA" app
        credentials; every other company uses "CA".
    secret : dict
        Mapping with "USA" and "CA" entries, each holding "client_id" and
        "client_secret".

    Returns
    -------
    AuthClient
        Client carrying the refreshed access token, refresh token and realm id.

    Notes
    -----
    Reads the token file location from the module-level ``self`` object
    (``self.raw_path["Auth"]["QBO"]``).
    """
    us_companies = ("MFUSA", "MPUSA", "MFAZ", "MSUSA")
    region = "USA" if company in us_companies else "CA"
    # create auth_client object
    auth_client = AuthClient(
        client_id=secret[region]["client_id"],
        client_secret=secret[region]["client_secret"],
        redirect_uri="https://developer.intuit.com/v2/OAuth2Playground/RedirectUrl",
        environment="production",
    )
    # assign tokens
    token_file = self.raw_path["Auth"]["QBO"]/"tokens.json"
    with open(token_file, "r") as f:
        tokens = json.load(f)
    company_tokens = tokens[company]
    auth_client.access_token = company_tokens["access_token"]
    auth_client.refresh_token = company_tokens["refresh_token"]
    auth_client.realm_id = company_tokens["realm_id"]
    # refresh
    auth_client.refresh()
    # save refreshed tokens
    company_tokens["access_token"] = auth_client.access_token
    company_tokens["refresh_token"] = auth_client.refresh_token
    company_tokens["realm_id"] = auth_client.realm_id
    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave tokens.json truncated and the stored tokens lost.
    tmp_file = token_file.with_name(token_file.name + ".tmp")
    with open(tmp_file, "w") as f:
        json.dump(tokens, f, indent=4)
    os.replace(tmp_file, token_file)
    return auth_client

In [92]:
# Turn every (company, start, end) job into a self-contained task dict that a
# worker can execute without access to the driver's auth objects.
extract_MFL = []
extract_others = []
current_company = ""

secrets_file = self.raw_path["Auth"]["QBO"]/"client_secrets.json"
with open(secrets_file, "r") as f:
    secret = json.load(f)

for (company, start, end) in jobs_MFL + jobs_others:
    # Jobs are grouped by company, so credentials are refreshed once per run
    # of consecutive jobs for the same company.
    if current_company != company:
        auth_client = _refresh_auth_client(company, secret)
        current_company = company
    extract = extract_MFL if company == "MFL" else extract_others
    task = dict(
        company=company,
        realm_id=auth_client.realm_id,
        token=auth_client.access_token,
        start=start.isoformat(),
        end=end.isoformat(),
        report="ProfitAndLossDetail",
        out_path=self.raw_path["QBO"]["PL"]/company/f"{start.year}_{start.month}.json",
    )
    extract.append(task)
len(extract_MFL) + len(extract_others)


45

In [93]:
# Show the first task with the bearer token masked, so the credential is not
# stored in the notebook's saved output.
{**extract_MFL[0], "token": "<redacted>"}

{'company': 'MFL',
 'realm_id': 123146146745069,
 'token': '<redacted>',
 'start': '2024-10-01',
 'end': '2024-12-31',
 'report': 'ProfitAndLossDetail',
 'out_path': WindowsPath('c:/Users/ZheRao/OneDrive - Monette Farms/Monette Farms Team Site - Innovation Projects/Production/Database_Spark_ETL/Bronze/QBO/ProfitAndLoss/MFL/2024_10.json')}

In [94]:
# MFL 3 partitions, others 5 partitions
# Round-robin split: partition k takes tasks k, k+3, k+6, ...
N_MFL_PARTITIONS = 3
MFL_partition = [extract_MFL[offset::N_MFL_PARTITIONS] for offset in range(N_MFL_PARTITIONS)]

In [95]:
# Round-robin split of the non-MFL tasks: partition k takes tasks k, k+5, k+10, ...
N_OTHER_PARTITIONS = 5
other_partition = [extract_others[offset::N_OTHER_PARTITIONS] for offset in range(N_OTHER_PARTITIONS)]

In [99]:
# MFL partitions first, then the partitions for every other company.
partitions = [*MFL_partition, *other_partition]

In [100]:
# Sanity check: 3 MFL partitions + 5 partitions for the other companies.
len(partitions)

8

In [None]:
def extract_partition(it) -> None:
    """Download one QBO report per task and save the raw response body to disk.

    Parameters
    ----------
    it : iterable of dict
        Tasks to run in order. Each task provides "token", "realm_id",
        "start", "end", "report" and "out_path".

    Raises
    ------
    requests.HTTPError
        If the API answers with a non-2xx status. Nothing is written for that
        task, so an error body is never saved as if it were report data.
    requests.Timeout
        If the API does not answer within the request timeout.
    """
    BASE_URL = "https://quickbooks.api.intuit.com"
    minor_version = 75
    request_timeout = 60    # seconds; without a timeout a stalled call blocks the worker indefinitely

    # TODO: request_with_retry

    # One session per call so connections are reused across tasks and
    # released when the work is done.
    with requests.Session() as session:
        session.headers.update({"Accept": "application/json"})

        for task in it:
            # Tokens differ per company, so the header is reset for every task.
            session.headers.update({
                "Authorization": f"Bearer {task['token']}",
            })
            realm_id = task["realm_id"]
            report_name = task["report"]

            url = f"{BASE_URL}/v3/company/{realm_id}/reports/{report_name}"
            params = {
                "minorversion": minor_version,
                "start_date": task["start"],
                "end_date": task["end"],
                "columns": "all"
            }

            resp = session.get(url, params=params, timeout=request_timeout)
            resp.raise_for_status()

            out_path = Path(task["out_path"])
            # The per-company folder may not exist on the first run.
            out_path.parent.mkdir(parents=True, exist_ok=True)
            with open(out_path, "wb") as f:
                f.write(resp.content)

    

In [106]:
# Every element of `partitions` is already one worker's list of tasks, and the
# RDD holds one such element per slice. foreach() passes each list straight to
# extract_partition, which iterates over its tasks. foreachPartition() would
# instead pass an iterator that yields the whole list, and the function would
# then index a list with "token" and raise TypeError.
rdd = spark.sparkContext.parallelize(partitions, len(partitions))
rdd.foreach(extract_partition)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 14) (MFARM-AI executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:686)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:652)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:628)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:538)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:686)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:652)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:628)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:538)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more


In [107]:
def ping(it):
    """Worker smoke test: import os and socket, read the PID, drain the partition."""
    import os
    import socket
    os.getpid()
    for _item in it:
        continue


# Minimal job with no project code, to tell environment failures apart from
# failures in the extraction logic.
rdd = spark.sparkContext.parallelize(range(10), 3)
rdd.foreachPartition(ping)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 16) (MFARM-AI executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:686)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:652)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:628)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:538)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:686)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:652)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:628)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:538)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more


In [57]:
# Load the QBO app credentials from the authentication folder.
secrets_file = self.raw_path["Auth"]["QBO"]/"client_secrets.json"
secret = json.loads(secrets_file.read_text())

In [60]:
# Refresh MFL's tokens only (this also rewrites tokens.json) for a manual, non-Spark test.
auth_MFL = _refresh_auth_client("MFL", secret)
auth_MFL

<intuitlib.client.AuthClient at 0x24e5b4a3fe0>

In [64]:
# Alias, not a copy: any change made to task_test also changes extract_MFL[0].
task_test = extract_MFL[0]
task_test

{'company': 'MFL',
 'start': '2024-10-01',
 'end': '2024-12-31',
 'report': 'ProfitAndLossDetail',
 'out_path': WindowsPath('c:/Users/ZheRao/OneDrive - Monette Farms/Monette Farms Team Site - Innovation Projects/Production/Database/Bronze/QBO/ProfitAndLoss/MFL/2024_10.json')}

In [65]:
# Attach the freshly refreshed credentials to the test task.
task_test.update({"realm_id": auth_MFL.realm_id,
                    "token": auth_MFL.access_token,})
# Display with the bearer token masked so it is not stored in the saved output.
{**task_test, "token": "<redacted>"}

{'company': 'MFL',
 'start': '2024-10-01',
 'end': '2024-12-31',
 'report': 'ProfitAndLossDetail',
 'out_path': WindowsPath('c:/Users/ZheRao/OneDrive - Monette Farms/Monette Farms Team Site - Innovation Projects/Production/Database/Bronze/QBO/ProfitAndLoss/MFL/2024_10.json'),
 'realm_id': 123146146745069,
 'token': '<redacted>'}

In [67]:
# Manual single-task run of the extraction steps, outside Spark.
session = requests.Session()
session.headers.update({
    "Authorization": f"Bearer {task_test['token']}",
    "Accept": "application/json"
})
# Unpack the task fields used to build the request.
company, realm_id, start, end, report_name = (
    task_test[field] for field in ("company", "realm_id", "start", "end", "report")
)


In [68]:
# Request the report for the test task and fail loudly on a non-2xx response.
BASE_URL = "https://quickbooks.api.intuit.com"
minor_version = 75
url = f"{BASE_URL}/v3/company/{realm_id}/reports/{report_name}"
params = dict(
    minorversion=minor_version,
    start_date=start,
    end_date=end,
    columns="all",
)
resp = session.get(url, params=params, timeout=60)
resp.raise_for_status()

In [70]:
# Keep the undecoded response bytes so they can be written to disk as received.
raw = resp.content

In [72]:
# Save the raw bytes to a scratch file in the working directory.
with open("test.json", mode="wb") as out_file:
    out_file.write(raw)

In [73]:
import orjson

In [74]:
# Read the scratch file back and parse it, to confirm the saved bytes are valid JSON.
with open("test.json", mode="rb") as in_file:
    data = orjson.loads(in_file.read())

In [75]:
# Inspect the parsed report; this prints the entire payload.
data

{'Header': {'Time': '2025-10-27T09:34:28-07:00',
  'ReportName': 'ProfitAndLossDetail',
  'ReportBasis': 'Accrual',
  'StartPeriod': '2024-10-01',
  'EndPeriod': '2024-12-31',
  'Currency': 'CAD',
  'Option': [{'Name': 'NoReportData', 'Value': 'false'}]},
 'Columns': {'Column': [{'ColTitle': 'Date',
    'ColType': 'Date',
    'MetaData': [{'Name': 'ColKey', 'Value': 'tx_date'}]},
   {'ColTitle': 'Transaction Type',
    'ColType': 'String',
    'MetaData': [{'Name': 'ColKey', 'Value': 'txn_type'}]},
   {'ColTitle': '#',
    'ColType': 'String',
    'MetaData': [{'Name': 'ColKey', 'Value': 'doc_num'}]},
   {'ColTitle': 'Name',
    'ColType': 'String',
    'MetaData': [{'Name': 'ColKey', 'Value': 'name'}]},
   {'ColTitle': 'Location',
    'ColType': 'String',
    'MetaData': [{'Name': 'ColKey', 'Value': 'dept_name'}]},
   {'ColTitle': 'Class',
    'ColType': 'String',
    'MetaData': [{'Name': 'ColKey', 'Value': 'klass_name'}]},
   {'ColTitle': 'Memo/Description',
    'ColType': 'String',