# **Pepperjam API**

In [13]:
# Run-level switches for this notebook.
# IsFullLoad: when True, _SetDateStartEnd ignores any existing PSA data and
# loads from the fixed full-load start date instead of the last delta value.
IsFullLoad = False
# Debugging: when True, the logger cell uses a Python `logging` stream handler;
# when False it uses the Spark log4j logger ("Pepperjam_Notebook_Logging").
Debugging = True

#### **Import necessary libraries**

In [14]:
from dataclasses import dataclass
from datetime import datetime, timedelta, date
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, explode, max, size, when, element_at, struct, lit, udf, from_json
from datetime import datetime, date, timedelta
from notebookutils import mssparkutils
from typing import List
from pyspark.sql import SparkSession
from dateutil.relativedelta import relativedelta
from pprint import pprint

import pyspark
import requests
import json
import csv
import logging
import delta
import concurrent.futures

# This configuration setting has been added to spark pool configuration, which is applied when the pool is started.
# spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)


#### Constants 

In [15]:
# Look-back window for delta loads: _SetDateStartEnd subtracts this from the
# latest delta-column value found in PSA, so the last 60 days are re-loaded.
LOAD_BLOCK = timedelta(days=60)

#### **Create logger object to record INFO and ERROR events**
**The logger name must be added to the spark.synapse.logAnalytics.filter.loggerName.match setting in the Spark pool configuration. This notebook's logger name is "Pepperjam_Notebook_Logging", as used in: mylogger = spark_log4j.LogManager.getLogger("Pepperjam_Notebook_Logging")**

In [16]:
# Build the notebook-wide `mylogger` used by every class below.
if Debugging:
    # Interactive runs: plain Python logger that writes to the cell output.
    mylogger = logging.getLogger('azure.synapse.pyspark')
    mylogger.setLevel(logging.DEBUG) # Change this log level to reduce or increase the amount of logging: DEBUG, INFO, WARNING, ERROR.
    # Clear existing handlers first so re-running this cell does not stack
    # handlers and duplicate every message.
    mylogger.handlers.clear()
    mylogger.addHandler(logging.StreamHandler())
    # Do not forward records to ancestor loggers.
    mylogger.propagate = False
else:
    # Non-debug runs: use the JVM log4j logger exposed through the Spark context.
    # NOTE(review): `sc` is not defined in this notebook; presumably it is the
    # SparkContext injected by the Synapse runtime - confirm.
    spark_log4j = sc._jvm.org.apache.log4j
    mylogger = spark_log4j.LogManager.getLogger("Pepperjam_Notebook_Logging")

##### **Run the KeyVaultSecrets notebook to gain access to the list of key vault secrets**

In [17]:
%run KeyVaultSecrets

#### **Classes to handle the client objects and error responses**

In [18]:
@dataclass
class Client:
    """One Pepperjam (PJ) program and the credentials needed to call its API.

    Attributes:
        ProgramName: Display name of the program (used in log messages).
        apiKey: API key sent as the `apiKey` query parameter.
        ProgramId: Program identifier (used in lake paths and log messages).
        LoadParams: Settings for the load currently being run; replaced by
            each CopyDataToLake_* method before it fetches data.
    """
    ProgramName: str
    apiKey: str
    ProgramId: str
    # The annotation is a string on purpose. The field shares its name with the
    # LoadParams class, and Python performs the `= None` assignment before it
    # evaluates the annotation, so an unquoted annotation resolves to the
    # freshly assigned None instead of the type.
    LoadParams: "LoadParams | None" = None

@dataclass
class LoadParams:
    """Per-endpoint load settings attached to a Client for the duration of one load."""
    # When True, data is staged as parquet and merged into PSA; when False the
    # PSA location is overwritten (see _WriteDataFrameToLake).
    IsDelta: bool
    # Endpoint label used in log messages.
    Endpoint: str
    # Lake path of the parquet staging copy.
    DLPath_STG: str
    # Lake path of the Delta-format PSA copy.
    DLPath_PSA: str
    # Load window; populated by _SetDateStartEnd (callers initialise both to None).
    DateStart: datetime
    DateEnd: datetime
    # Merge condition passed to DeltaTable.merge, e.g. 't.id = s.id'.
    MergeKeys: str = None
    # Column whose maximum value in PSA determines the next delta start date.
    DeltaColumn: str = None
    # Chunk size in months used by _GetRelativeStartEndDates.
    RelativeDate: int = None
    # Extra query-string fragment appended to the request URL (e.g. '&categoryId=1,2').
    QueryParams: str = ''

class APIError(Exception):
    """Common ancestor of every API exception raised in this notebook.

    Catching APIError handles all of the more specific API failures at once.
    """

class APIResponseError(APIError):
    """Raised when an API call returns an unexpected HTTP status.

    Args:
        status_code: HTTP status code of the response.
        ProgramId: Program the request was made for.
        endpoint: Endpoint that was called.
        message: Short reason text for the failure.
        client: Optional Client the request belonged to.
        details: Optional extra detail appended to the message.
    """

    def __init__(self, status_code,ProgramId,endpoint, message,client=None, details=None):
        self.client = client
        self.details = details
        self.message = message
        self.endpoint = endpoint
        self.ProgramId = ProgramId
        self.status_code = status_code

    def __str__(self):
        lines = [
            f"APIResponseError for program ({self.ProgramId}) on endpoint ({self.endpoint}) raised with status {self.status_code}: {self.message}"
        ]
        # Details (and the endpoint again) are only appended when details were supplied.
        if self.details:
            lines.append(f"Details: {self.details}")
            lines.append(f"Endpoint: {self.endpoint}")
        return "\n".join(lines)

class APITokenError(APIError):
    """Raised for token/credential problems.

    Args:
        message: Description of the failure.
        client: Optional Client whose name and id are added to the message.
        details: Optional extra detail appended on a second line.
    """

    def __init__(self, message, client=None,details=None):
        self.client = client
        self.details = details
        self.message = message

    def __str__(self):
        if self.client:
            client_msg = f" for client ({self.client.ProgramName})-{self.client.ProgramId}"
        else:
            client_msg = ""

        text = f"APITokenError{client_msg} raised: {self.message}"
        if not self.details:
            return text
        return text + f"\nDetails: {self.details}"

class APIProgramIdError(APIError):
    """Raised when a ProgramId is invalid.

    Args:
        ProgramId: The offending program id.
        client: Optional Client whose name and id are added to the message.
        details: Optional extra detail appended on a second line.
    """

    def __init__(self, ProgramId,client=None,details=None):
        self.client = client
        self.details = details
        self.ProgramId = ProgramId

    def __str__(self):
        if self.client:
            client_msg = f" for client ({self.client.ProgramName})-{self.client.ProgramId}"
        else:
            client_msg = ""

        text = f'APIProgramIdError{client_msg} raised: ProgramId "{self.ProgramId}" is invalid'
        if not self.details:
            return text
        return text + f"\nDetails: {self.details}"

class APIDeserializeError(APIError):
    """Raised when API data cannot be deserialized.

    Args:
        client: Optional Client whose name and id are added to the message.
        details: Optional extra detail appended on a second line.
    """

    def __init__(self, client=None,details=None):
        # `client` must be stored: __str__ reads self.client, and without this
        # assignment formatting the exception raised AttributeError.
        self.client = client
        self.details = details

    def __str__(self):
        client_msg = f" for client ({self.client.ProgramName})-{self.client.ProgramId}" if self.client else ""
        base_msg = f'APIDeserializeError{client_msg} raised'
        if self.details:
            base_msg += f"\nDetails: {self.details}"
        return base_msg

### **Pepperjam (PJ) Data Access Class**   
_Includes all methods to retrieve data from the Pepperjam API (deserialization is handled by the PJ_Deserialize class below)_

In [19]:
class PJ_DataAccess:
    """HTTP access to the Pepperjam advertiser API.

    All public methods return the concatenated `data` records of the response(s).
    A non-200 response is logged as an APIResponseError and ends the fetch;
    whatever was collected before the failure is returned.
    """

    BASE_URL = 'https://api.pepperjamnetwork.com/20120402/advertiser'

    def _fetchPage(self, client: Client, endpoint: str, url: str) -> dict:
        """GET one URL and return the decoded JSON body.

        Raises:
            APIResponseError: when the response status is not 200.
        """
        resp = requests.get(url = url)

        # Check the status BEFORE decoding: error responses (including 429
        # rate limiting) are not guaranteed to carry a JSON body, and decoding
        # first turned them into an unhandled JSONDecodeError.
        if resp.status_code != 200:
            raise APIResponseError(status_code = resp.status_code,ProgramId = client.ProgramId, endpoint= endpoint, message=resp.reason)

        return json.loads(resp.text)

    def _fetchAllPages(self, client: Client, endpoint: str, url: str) -> list[dict]:
        """Follow `meta.pagination.next.href` links from `url`, collecting `data`."""
        records = []

        while url:
            try:
                json_response = self._fetchPage(client, endpoint, url)
            except APIError as e:
                # Best effort: report the failure and keep what was fetched so far.
                mylogger.error(str(e))
                break

            # Add records to records list
            records.extend(json_response['data'])

            # Assign next href for pagination; stop when there is no next page.
            pagination = (json_response.get('meta') or {}).get('pagination') or {}
            url = pagination['next']['href'] if 'next' in pagination else None

        return records

    # Sends a single API request to advertiser endpoint arg, when addtl query params are not needed
    def getData(self, client: Client, end_point: str) -> list[dict]:
        """Fetch every page of `end_point`, applying client.LoadParams.QueryParams."""
        mylogger.debug(f'Retrieving {end_point}: for {client.ProgramId}')

        endpoint = end_point
        base_url = f'{self.BASE_URL}/{endpoint}?apiKey={client.apiKey}{client.LoadParams.QueryParams}&format=json'

        mylogger.info(f'...Calling {endpoint} for {client.ProgramName}-{client.ProgramId}')

        return self._fetchAllPages(client, endpoint, base_url)

    # Sends API request to advertiser end_point argument, when start and end date are required
    def getParamData(self, client: Client, start_date: str, end_date: str, end_point: str) -> list[dict]:
        """Fetch every page of `end_point` restricted to [start_date, end_date]."""
        endpoint = end_point
        startDate = start_date
        endDate = end_date
        base_url = f'{self.BASE_URL}/{endpoint}?apiKey={client.apiKey}{client.LoadParams.QueryParams}&format=json&startDate={startDate}&endDate={endDate}'

        mylogger.info(f'...Calling {endpoint} for {client.ProgramName}-{client.ProgramId}. Start Date {startDate} End Date {endDate}')

        return self._fetchAllPages(client, endpoint, base_url)

    # Send an API request with no pagination (advertiser creative)
    def getNoPagination(self, client: Client, end_point:str) -> list[dict]:
        """Fetch a single, non-paginated response from `end_point`.

        Unlike the other getters this does not append LoadParams.QueryParams.
        A non-200 response is logged and yields an empty list, matching the
        error handling of getData/getParamData.
        """
        endpoint = end_point
        base_url = f'{self.BASE_URL}/{endpoint}?apiKey={client.apiKey}&format=json'

        try:
            json_response = self._fetchPage(client, endpoint, base_url)
        except APIError as e:
            mylogger.error(str(e))
            return []

        return list(json_response['data'])

### **Pepperjam (PJ) Deserialize Class**   
_Methods to convert to RDD and deserialize PepperJam API data_

In [20]:
class PJ_Deserialize:
    """Turns lists of Pepperjam API records into Spark DataFrames.

    Two families of methods:
      * "flat" readers (ds, banner_ds, coupon_ds, ad_publisher_ds) return the
        records with their nested array columns dropped;
      * "child" readers (*_promotion_ds, *_private_aff_ds, ...) return one row
        per element of a nested array, keyed by the parent record's id, or
        None when no record has any element in that array.
    """

    # (source field inside the private_affiliates struct, output column name)
    _PRIVATE_AFFILIATE_FIELDS = (
        ("affiliate_id", "private_affiliate_id"),
        ("company_name", "private_affiliate_company_name"),
        ("first_name", "private_affiliate_first_name"),
        ("last_name", "private_affiliate_last_name"),
    )

    def _ConvertToPysparkRDD(self, json_list: list[dict]) -> pyspark.rdd.PipelinedRDD:
        # Converts the list of dictionaries returned by the get call into a Resilient Distributed Dataset (rdd)
        rdd = spark.sparkContext.parallelize(json_list)
        json_rdd = rdd.map(lambda x: json.dumps(x))

        return json_rdd

    def _ReadWithoutColumns(self, json_list: list[dict], unwanted: tuple) -> pyspark.sql.dataframe.DataFrame:
        """Read the records and drop whichever of `unwanted` columns are present."""
        json_df = spark.read.json(self._ConvertToPysparkRDD(json_list))
        present = [name for name in unwanted if name in json_df.columns]
        return json_df.drop(*present) if present else json_df

    def _ExplodeChildArray(self, json_list: list[dict], array_col: str, parent_id_alias: str, field_aliases) -> pyspark.sql.dataframe.DataFrame:
        """Return one row per element of `array_col`, or None if there are none.

        Args:
            json_list: Raw API records; each is expected to carry an `id`.
            array_col: Name of the nested array-of-structs field to explode.
            parent_id_alias: Output name for the parent record's `id`.
            field_aliases: Iterable of (struct field, output column name).
        """
        # Records whose array is missing/empty are removed before schema
        # inference. If only empty arrays are read, Spark cannot infer the
        # element struct and the `array_col.field` selections below fail; with
        # the filter an all-empty input has no columns and yields None.
        non_empty = [
            record for record in json_list
            if isinstance(record, dict) and len(record.get(array_col) or []) > 0
        ]
        json_df = spark.read.json(self._ConvertToPysparkRDD(non_empty))

        if array_col not in json_df.columns:
            return None

        exploded = json_df.select(
            col("id").alias(parent_id_alias),
            explode(col(array_col)).alias(array_col)
        )
        return exploded.select(
            col(parent_id_alias),
            *[col(f"{array_col}.{src}").alias(dst) for src, dst in field_aliases]
        )

    # deserialize data
    def ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Generic reader: records without `promotions` / `private_affiliates`."""
        return self._ReadWithoutColumns(json_list, ("promotions", "private_affiliates"))

    def banner_ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Banner creatives without `promotions` / `private_affiliates`."""
        return self._ReadWithoutColumns(json_list, ("promotions", "private_affiliates"))

    def banner_promotion_ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Banner-to-promotion rows: creative_banner_id, promotion_id, promotion_name."""
        return self._ExplodeChildArray(
            json_list, "promotions", "creative_banner_id",
            (("id", "promotion_id"), ("name", "promotion_name")),
        )

    def banner_private_aff_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Banner-to-private-affiliate rows keyed by creative_banner_id."""
        return self._ExplodeChildArray(
            json_list, "private_affiliates", "creative_banner_id",
            self._PRIVATE_AFFILIATE_FIELDS,
        )

    def coupon_ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Coupon creatives without `promotions` / `private_affiliates`."""
        return self._ReadWithoutColumns(json_list, ("promotions", "private_affiliates"))

    def ad_publisher_ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Publishers without `category` / `promotional_method`."""
        return self._ReadWithoutColumns(json_list, ("category", "promotional_method"))

    def coupon_promotion_ds(self, json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Coupon-to-promotion rows: creative_coupon_id, promotion_id, promotion_name."""
        return self._ExplodeChildArray(
            json_list, "promotions", "creative_coupon_id",
            (("id", "promotion_id"), ("name", "promotion_name")),
        )

    def coupon_private_aff_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Coupon-to-private-affiliate rows keyed by creative_coupon_id."""
        return self._ExplodeChildArray(
            json_list, "private_affiliates", "creative_coupon_id",
            self._PRIVATE_AFFILIATE_FIELDS,
        )

    def ad_publisher_category_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Publisher-to-category rows: advertiser_publisher_id, category_id, category_name."""
        return self._ExplodeChildArray(
            json_list, "category", "advertiser_publisher_id",
            (("id", "category_id"), ("name", "category_name")),
        )

    def ad_publisher_promo_method_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Publisher-to-promotional-method rows keyed by advertiser_publisher_id."""
        return self._ExplodeChildArray(
            json_list, "promotional_method", "advertiser_publisher_id",
            (("id", "promotional_method_id"), ("name", "promotional_method_name")),
        )

    def creative_text_affiliates_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Text-creative-to-private-affiliate rows keyed by advertiser_id."""
        return self._ExplodeChildArray(
            json_list, "private_affiliates", "advertiser_id",
            self._PRIVATE_AFFILIATE_FIELDS,
        )

    def creative_text_promo_ds(self,json_list: list[dict]) -> pyspark.sql.dataframe.DataFrame:
        """Text-creative-to-promotion rows: advertiser_id, promotions_id, promotions_name."""
        # Output names are plural here ("promotions_id"), unlike the banner and
        # coupon variants; kept as-is because downstream tables use them.
        return self._ExplodeChildArray(
            json_list, "promotions", "advertiser_id",
            (("id", "promotions_id"), ("name", "promotions_name")),
        )



In [21]:
class PJ_ControlLogic:
    def __init__(self):
        """Wire up the collaborators and load one Client per key-vault secret."""
        # Target lake location.
        self.datalakename = 'gen3dlsprod'
        self.container = 'datalake'

        # API access and deserialization helpers.
        self.deserialize = PJ_Deserialize()
        self.dataAccess = PJ_DataAccess()

        # Reads the key vault, so this is the only step that performs I/O.
        self.clientSecretValues = self._GetClientSecretValues()

    #------------------------------- PRIVATE METHODS -------------------------------#
    def _GetClientSecretValues(self):
        """Build a Client for every key-vault secret that defines a 'Program Name'.

        Returns:
            list of Client objects; secrets without a 'Program Name' are skipped.
        """
        kvName = 'gen3-PepperJam-kv'
        kv = KeyVaultValues()
        clientSecretValues = []

        for secret in kv.GetKVSecretList(kvName):
            secretValue = kv.GetKVSecretJSON(kvName, secret['SecretName'], 'KeyVault_Master' )

            programName = secretValue.get('Program Name')
            if programName is None:
                continue

            clientSecretValues.append(
                Client(
                    # Spaces and slashes are stripped from the program name.
                    # (The former 'NotDefined' fallback was unreachable: this
                    # code only runs when the name is present.)
                    ProgramName = programName.replace(' ','').replace('/',''),
                    apiKey = secretValue.get('API Key'),
                    ProgramId = secretValue.get('Program ID')
                )
            )
        return clientSecretValues

    def _SetDateStartEnd(self, client: Client) -> None:
        """Set client.LoadParams.DateStart / DateEnd for the next load.

        DateStart is the fixed full-load date when a full load is forced, delta
        loading is off, the PSA path does not exist yet, or PSA holds no delta
        value. Otherwise it is the latest delta-column value in PSA minus
        LOAD_BLOCK. DateEnd is always today's UTC date.
        """
        fullLoadStartDate = date(2020, 8, 1)
        params = client.LoadParams

        if IsFullLoad or params.IsDelta == False or mssparkutils.fs.exists(params.DLPath_PSA) == False:
            mylogger.info(f'...delta loading unavailable or toggled off for {client.ProgramName}-{client.ProgramId} {params.Endpoint}')
            params.DateStart = fullLoadStartDate

        else:
            mylogger.info(f'...retrieving max_delta_value for {client.ProgramName}-{client.ProgramId} {params.Endpoint} from PSA')
            # Load the delta file into a dataframe
            df_destination_delta = spark.read.format("delta").load(params.DLPath_PSA)

            # Find the maximum delta column value (`max` is pyspark.sql.functions.max here)
            max_delta_value = df_destination_delta.select(max(f"{params.DeltaColumn}")).first()[0]

            if max_delta_value is None:
                # Empty PSA table (or all-null delta column): nothing to resume from.
                mylogger.info(f'...no max_delta_value found for {client.ProgramName}-{client.ProgramId} {params.Endpoint}; falling back to full load')
                params.DateStart = fullLoadStartDate
            else:
                # Parse data type to [date], truncating any timestamp.
                # datetime is checked first because it is a subclass of date.
                if isinstance(max_delta_value, datetime):
                    latest = max_delta_value.date()
                elif isinstance(max_delta_value, date):
                    latest = max_delta_value
                else:
                    latest = datetime.strptime(str(max_delta_value).split(' ')[0], '%Y-%m-%d').date()
                params.DateStart = latest - LOAD_BLOCK

        mylogger.info(f'...loading data since {params.DateStart}')
        params.DateEnd = datetime.utcnow().date()

# TIME CLASS
    def _GetRelativeStartEndDates(self, client: Client) -> list[tuple]:
        """Split [LoadParams.DateStart, today] into consecutive date windows.

        Each window spans LoadParams.RelativeDate months, the next one starts
        the day after the previous one ends, and the last is capped at today.

        Returns:
            list of (start, end) tuples formatted as 'YYYY-MM-DD' strings.
        """
        fmt = '%Y-%m-%d'
        window_start = client.LoadParams.DateStart
        step = relativedelta(months=client.LoadParams.RelativeDate)  # window size
        today = date.today()  # read once so every window is capped consistently

        windows = []
        while True:
            # Never let a window run past today.
            window_end = min(window_start + step, today)
            windows.append((window_start.strftime(fmt), window_end.strftime(fmt)))

            if not window_end < today:
                break
            window_start = window_end + relativedelta(days=1)

        return windows

    def _Get1MStartEndDates(self, client: Client) -> list[tuple]:
        """Split 2020-08-01..today into consecutive one-month date windows.

        Args:
            client: Unused; kept for signature parity with the other helpers.

        Returns:
            list of (start, end) tuples formatted as 'YYYY-MM-DD' strings. The
            next window starts the day after the previous one ends and the last
            window is capped at today.
        """
        fmt = '%Y-%m-%d'
        chunk = relativedelta(months=1)  # CHUNK SIZE
        # Use a plain date, not datetime.today(): with a time-of-day component a
        # window ending exactly on today still compared "< now", which produced
        # one extra, inverted window (tomorrow -> today).
        today = date.today()

        start = date(2020, 8, 1)
        bounds = []
        while True:
            end = min(start + chunk, today)
            bounds.append((start.strftime(fmt), end.strftime(fmt)))
            if end >= today:
                break
            start = end + relativedelta(days=1)

        return bounds

    def _Get6MStartEndDates(self, client: Client) -> list[tuple]:
        """Split 2020-08-01..today into consecutive six-month date windows.

        Args:
            client: Unused; kept for signature parity with the other helpers.

        Returns:
            list of (start, end) tuples formatted as 'YYYY-MM-DD' strings. The
            next window starts the day after the previous one ends and the last
            window is capped at today.
        """
        fmt = '%Y-%m-%d'
        chunk = relativedelta(months=6)  # CHUNK SIZE
        # Use a plain date, not datetime.today(): with a time-of-day component a
        # window ending exactly on today still compared "< now", which produced
        # one extra, inverted window (tomorrow -> today).
        today = date.today()

        start = date(2020, 8, 1)
        bounds = []
        while True:
            end = min(start + chunk, today)
            bounds.append((start.strftime(fmt), end.strftime(fmt)))
            if end >= today:
                break
            start = end + relativedelta(days=1)

        return bounds

    def _Get1YStartEndDates(self, client: Client) -> list[tuple]:
        """Split 2020-08-01..today into consecutive one-year date windows.

        Args:
            client: Unused; kept for signature parity with the other helpers.

        Returns:
            list of (start, end) tuples formatted as 'YYYY-MM-DD' strings. The
            next window starts the day after the previous one ends and the last
            window is capped at today.
        """
        fmt = '%Y-%m-%d'
        chunk = relativedelta(years=1)  # CHUNK SIZE
        # Use a plain date, not datetime.today(): with a time-of-day component a
        # window ending exactly on today still compared "< now", which produced
        # one extra, inverted window (tomorrow -> today).
        today = date.today()

        start = date(2020, 8, 1)
        bounds = []
        while True:
            end = min(start + chunk, today)
            bounds.append((start.strftime(fmt), end.strftime(fmt)))
            if end >= today:
                break
            start = end + relativedelta(days=1)

        return bounds
    
    def _getLatestMonth(self, client: Client) -> tuple[date, date]:
        """Return (start, end) dates covering the month that ends today.

        Args:
            client: Unused; kept for signature parity with the other helpers.

        Returns:
            A (startDate, endDate) tuple of `date` objects, where endDate is
            today and startDate is one calendar month earlier.
        """
        # Read the clock once so both bounds come from the same day even if the
        # call straddles midnight.
        endDate = date.today()
        startDate = endDate + relativedelta(months= -1)
        return startDate, endDate

    def _WriteDataFrameToLake(self, df, client: Client) -> None:
        """Stamp `df` with ETLImportDate and persist it for the client's endpoint.

        Delta loads are written as parquet to the STG path and then merged into
        PSA. Non-delta loads replace the PSA location with a fresh Delta write.
        """
        params = client.LoadParams
        mylogger.info(f'...writing {params.Endpoint} data to DataLake for {client.ProgramName}-{client.ProgramId}')

        # Both paths stamp the rows with the current UTC time first.
        stamped = df.withColumn('ETLImportDate', lit(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')))

        if not (params.IsDelta == True):
            # Full replacement: remove any existing PSA folder, then write Delta.
            if mssparkutils.fs.exists(params.DLPath_PSA):
                mssparkutils.fs.rm(params.DLPath_PSA, True)
            stamped.write.format('delta').mode('overwrite').save(params.DLPath_PSA)
        else:
            # Stage as parquet, then run the merge activity to move it into PSA.
            stamped.write.mode('overwrite').parquet(params.DLPath_STG)
            self._MergeDataToPSA(client)

        mylogger.info(f'File overwrite complete: {params.Endpoint} for {client.ProgramName}-{client.ProgramId}')

    def _MergeDataToPSA(self, client) -> None:
        """Move the staged parquet data for the client's endpoint into PSA.

        * PSA exists and the load is delta, with merge keys: upsert STG into
          PSA using LoadParams.MergeKeys as the match condition.
        * PSA exists and the load is delta, without merge keys: replace the PSA
          rows whose delta column falls inside [DateStart, DateEnd] with STG.
        * Otherwise (initial load): create/overwrite PSA from STG.
        """
        params = client.LoadParams

        # Load the stg Parquet file to a dataframe
        source_parquet = spark.read.load(params.DLPath_STG, format='parquet')

        if mssparkutils.fs.exists(params.DLPath_PSA) and params.IsDelta:
            mylogger.info(f'...Merging data to PSA: {params.Endpoint} for {client.ProgramName}-{client.ProgramId}')
            # Load the psa Delta file
            destination_delta = delta.DeltaTable.forPath(spark, params.DLPath_PSA)

            # Truthiness, not `!= ""`: MergeKeys defaults to None, and None must
            # take the "no merge keys" path instead of being passed to merge().
            if params.MergeKeys:
                # Merge STG into PSA
                (
                    destination_delta.alias('t')
                        .merge(
                            source_parquet.alias('s'),
                            params.MergeKeys
                            )
                        .whenMatchedUpdateAll()
                        .whenNotMatchedInsertAll()
                        .execute()
                )
            else:
                # No Merge keys available, but still delta load while replacing any records from our StartDate (to avoid duplicates in overlapping load periods).
                # !! Important - the spark configuration line in the top cell to disable the "replace where check constraint" is what allows non matching records
                #                in the replaceWhere constraint to be inserted. It must disabled for this to work properly. !!
                dateStart = params.DateStart
                dateEnd   = params.DateEnd
                deltaCol = 'cast(' + params.DeltaColumn + ' as date)'
                (
                    source_parquet
                        .write
                        .format("delta")
                        .mode("overwrite")
                        .option("replaceWhere", f"{deltaCol} >= '{dateStart}' AND {deltaCol} <= '{dateEnd}'") #update
                        .save(params.DLPath_PSA)
                )
        else:
            mylogger.info(f'...Initial Load\n ...Creating PSA file in "DELTA" format: {params.Endpoint} for {client.ProgramName}-{client.ProgramId}')
            # Create dest delta table (psa)
            (
                source_parquet
                    .write
                    .format("delta")
                    .mode("overwrite") # if exists, overwrite the delta file
                    .option("overwriteSchema", "True")
                    .save(params.DLPath_PSA) #destination
            )

        mylogger.info(f'Merge activity complete: {params.Endpoint} for {client.ProgramName}-{client.ProgramId}')

    #------------------------------- PUBLIC METHODS -------------------------------#
    
    ## COPY ACTIVITIES ##
    # we added a '_' to file endpoints with '/' so folders aren't created in the lake. endpoints with no '/' are the same as the file

    
    #------ AdvertiserPublisher -----#
    def CopyDataToLake_AdvertiserPublisher(self, client: Client):
        """Load the `publisher` endpoint and its category / promotional-method children.

        Publishers are requested for every category id returned by
        `publisher/category`. The flat publisher table is written first; the
        same records are then passed to the category and promotional-method
        loaders. Note that those loaders replace client.LoadParams.
        """
        endpoint = 'publisher'
        #Set Load Params
        client.LoadParams = LoadParams(
            IsDelta = False,
            # Plain string: the former `{endpoint}` built a one-element set, so
            # log messages printed "{'publisher'}".
            Endpoint = endpoint,
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None,
            QueryParams = ''
        )

        #Build category Id's into Load Params
        categories = self.dataAccess.getData(client = client, end_point='publisher/category')

        # str() so numeric ids do not break the join.
        categoryIds = [str(record["id"]) for record in categories]
        queryParam = "&categoryId=" + ",".join(categoryIds)

        client.LoadParams.QueryParams = queryParam

        #Build promotional_methods into Load Params
        promo_methods = self.dataAccess.getData(client = client, end_point='publisher/promotional-method')

        promoMethodIds = [str(record["id"]) for record in promo_methods]
        # NOTE(review): this filter is built but never added to
        # client.LoadParams.QueryParams, so the publisher request below is only
        # filtered by categoryId. Behavior is left unchanged - confirm whether
        # the promotionalMethodId filter should be applied or the lookup removed.
        queryParam = "&promotionalMethodId=" + ",".join(promoMethodIds)

        all_records = self.dataAccess.getData(client = client, end_point = endpoint)

        if len(all_records) > 0:
            mylogger.info(f'...Deserializing advertiser/publisher data...')
            df = self.deserialize.ad_publisher_ds(all_records)
            # Ensure the dataframe is not NULL and that all_records is the correct data type
            if df is not None and not isinstance(all_records, str):
                self._WriteDataFrameToLake(df, client)

                self.CopyDataToLake_AdvertiserPublisherCategory(client,all_records)

                self.CopyDataToLake_AdvertiserPublisherPromotionalMethod(client,all_records)
            else:
                mylogger.info(f'No {client.LoadParams.Endpoint} data available for {client.ProgramName}-{client.ProgramId}')

        else:
            mylogger.info(f'No {client.LoadParams.Endpoint} data available for {client.ProgramName}-{client.ProgramId}')

    def CopyDataToLake_AdvertiserPublisherCategory(self, client: Client, datalist):
        """Write the `publisher_category` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.ad_publisher_category_ds` and the resulting dataframe
        is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'publisher_category'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Deserializing advertiser/publisher/category data...')
        df = self.deserialize.ad_publisher_category_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserPublisherPromotionalMethod(self, client: Client, datalist):
        """Write the `publisher_promotional-method` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.ad_publisher_promo_method_ds` and the resulting
        dataframe is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'publisher_promotional-method'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing publisher/promotional-method data...')
        df = self.deserialize.ad_publisher_promo_method_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)


    #------ AdvertiserTerm -----#
    def CopyDataToLake_AdvertiserTerm(self, client: Client):
        """Fetch the `term` endpoint for one client and write it to the lake.

        Records come from `self.dataAccess.getData`, are deserialized with
        `self.deserialize.ds`, and the resulting dataframe is handed to
        `self._WriteDataFrameToLake`. When there are no records, or the
        deserializer returns `None`, an info message is logged instead.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this endpoint.
        """
        endpoint = 'term'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point = endpoint)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing term data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)


    #------ AdvertiserGroup -----#
    def CopyDataToLake_AdvertiserGroup(self, client: Client):
        """Fetch the `group` endpoint for one client and write it to the lake.

        Records come from `self.dataAccess.getData`, are deserialized with
        `self.deserialize.ds`, and the resulting dataframe is handed to
        `self._WriteDataFrameToLake`. When there are no records, or the
        deserializer returns `None`, an info message is logged instead.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this endpoint.
        """
        endpoint = 'group'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point = endpoint)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing group data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)


    #------ Advertiser Report (requires date logic/Query Params) -----#
    def CopyDataToLake_TransactionSummary(self, client: Client):
        """Fetch `report/transaction-summary` over a series of date windows and write it to the lake.

        After setting `client.LoadParams`, calls `self._SetDateStartEnd(client)`
        and then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_transaction-summary'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = True,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = 'date',
            MergeKeys = 't.publisher_id = s.publisher_id AND t.date = s.date',
            RelativeDate = 12,   #PLEASE note: The period of time formed by the start date and the end date must not exceed 1 year (12 months)
            QueryParams = '&groupBy=publisher_date'
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/transaction-summary'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/transaction-summary data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_ReportAccountTransactions(self, client: Client):
        """Fetch `report/account-transactions` over a series of date windows and write it to the lake.

        After setting `client.LoadParams`, calls `self._SetDateStartEnd(client)`
        and then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_account-transactions'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = 6    #PLEASE note: The period of time formed by the start date and the end date must not exceed 1 year
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/account-transactions'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/account-transactions data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_ReportCreativeDetails(self, client: Client):
        """Fetch `report/creative-details` over a series of date windows and write it to the lake.

        After setting `client.LoadParams`, calls `self._SetDateStartEnd(client)`
        and then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_creative-details'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = '',
            RelativeDate = 12 #PLEASE note: The period of time formed by the start date and the end date must not exceed 1 year
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/creative-details'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/creative-details data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)
    
    def CopyDataToLake_ReportItemizedTransactionSummary(self, client: Client):
        """Fetch `report/itemized-transaction-summary` over a series of date windows and write it to the lake.

        After setting `client.LoadParams`, calls `self._SetDateStartEnd(client)`
        and then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_itemized-transaction-summary'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys =  '',
            RelativeDate = 12   #Please note: The period of time formed by the start date and the end date must not exceed 1 year
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/itemized-transaction-summary'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/itemized-transaction-summary data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    # ---- Delta Loaded Tables in the solution ---- #
    def CopyDataToLake_ReportItemizedTransactions(self, client: Client):
        """Fetch `report/itemized-transactions` over a series of date windows and write it to the lake.

        Configured with `IsDelta = True` and `DeltaColumn = 'sale_date'`. After
        setting `client.LoadParams`, calls `self._SetDateStartEnd(client)` and
        then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_itemized-transactions'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = True,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = 'sale_date',
            MergeKeys = 't.transaction_id = s.transaction_id and t.item_id = s.item_id',
            RelativeDate = 6 #PLEASE note: The period of time formed by the start date and the end date must not exceed 6 months
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/itemized-transactions'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/itemized-transactions data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)
 
    def CopyDataToLake_ReportTransactionHistory(self, client: Client):
        """Fetch `report/transaction-history` over a series of date windows and write it to the lake.

        Configured with `IsDelta = True`, `DeltaColumn = 'process_date'` and an
        empty `MergeKeys`. After setting `client.LoadParams`, calls
        `self._SetDateStartEnd(client)` and then requests one batch per
        `(start, end)` pair yielded by `self._GetRelativeStartEndDates(client)`.
        All batches are concatenated, deserialized with `self.deserialize.ds`,
        and written through `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_transaction-history'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = True,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = 'process_date',
            MergeKeys = '',     #Data has no true grain. Need to append daily (I believe)
            RelativeDate = 1    #PLEASE note: The period of time formed by the start date and the end date must not exceed 1 months
        )
        self._SetDateStartEnd(client)

        # One getParamData request per (start, end) window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/transaction-history'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/transaction-history data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_TransactionDetail(self, client: Client):
        """Fetch `report/transaction-details` over a series of date windows and write it to the lake.

        Configured with `IsDelta = True` and `DeltaColumn = 'sale_date'`. After
        setting `client.LoadParams`, calls `self._SetDateStartEnd(client)` and
        then requests one batch per `(start, end)` pair yielded by
        `self._GetRelativeStartEndDates(client)`. All batches are concatenated,
        deserialized with `self.deserialize.ds`, and written through
        `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this report.
        """
        endpoint = 'report_transaction-details'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = True,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = 'sale_date',
            MergeKeys = 't.transaction_id = s.transaction_id',
            RelativeDate = 6 #PLEASE note: The period of time formed by the start date and the end date must not exceed 6 months
        )

        self._SetDateStartEnd(client)

        # _GetRelativeStartEndDates supplies the (start, end) windows; one request is made per window.
        all_records = []
        for first, last in self._GetRelativeStartEndDates(client):
            window_records = self.dataAccess.getParamData(
                client = client,
                start_date = first,
                end_date = last,
                end_point = 'report/transaction-details'
            )
            all_records.extend(window_records)

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing report/transaction-details data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)


    #------ Advertiser Creative -----#
        #no pagination
    def CopyDataToLake_AdvertiserCreativeGeneric(self, client: Client):
        """Fetch the `creative/generic` endpoint (unpaginated) and write it to the lake.

        Records come from `self.dataAccess.getNoPagination`, are deserialized
        with `self.deserialize.ds`, and the resulting dataframe is handed to
        `self._WriteDataFrameToLake`. When there are no records, or the
        deserializer returns `None`, an info message is logged instead.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this endpoint.
        """
        endpoint = 'creative_generic'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = '',
            RelativeDate = None
        )

        # The underscore form is only the lake folder name. Every other multi-segment
        # endpoint in this class passes the slash form to the data-access layer, so do the same here.
        all_records = self.dataAccess.getNoPagination(client = client, end_point = 'creative/generic')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/generic data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeText(self, client: Client):
        """Fetch `creative/text`, write it to the lake, then write its two child datasets.

        Records come from `self.dataAccess.getData` and are deserialized with
        `self.deserialize.ds`. Only when a dataframe is produced is it written
        via `self._WriteDataFrameToLake`, followed by the private-affiliates and
        promotions loads, which reuse the same records.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced here and again by each child load.
        """
        endpoint = 'creative_text'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None,
            QueryParams = ''
        )

        all_records = self.dataAccess.getData(client = client, end_point = 'creative/text')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/text data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

        # Child datasets are derived from the same records; each call overwrites client.LoadParams.
        self.CopyDataToLake_AdvertiserCreativeTextPrivateAffiliates(client, all_records)
        self.CopyDataToLake_AdvertiserCreativeTextPromotions(client, all_records)

    def CopyDataToLake_AdvertiserCreativeTextPrivateAffiliates(self, client: Client, datalist):
        """Write the `creative_text_affiliates` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.creative_text_affiliates_ds` and the resulting
        dataframe is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_text_affiliates'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Name the affiliates stage explicitly (the parent load already logs plain
        # "creative/text") and log before deserializing so a failure is traceable.
        mylogger.info('...Deserializing creative/text affiliates data...')
        df = self.deserialize.creative_text_affiliates_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeTextPromotions(self, client: Client, datalist):
        """Write the `creative_text_promotions` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.creative_text_promo_ds` and the resulting dataframe
        is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_text_promotions'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Name the promotions stage explicitly (the parent load already logs plain
        # "creative/text") and log before deserializing so a failure is traceable.
        mylogger.info('...Deserializing creative/text promotion data...')
        df = self.deserialize.creative_text_promo_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeBanner(self, client: Client):
        """Fetch `creative/banner`, write it to the lake, then write its two child datasets.

        Records come from `self.dataAccess.getData` and are deserialized with
        `self.deserialize.banner_ds`. Whenever the API returned records, the
        promotions and private-affiliates loads run on those same records,
        even if the banner dataframe itself came back as `None`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced here and again by each child load.
        """
        endpoint = 'creative_banner'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point='creative/banner')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Deserializing creative/banner data...')
        df = self.deserialize.banner_ds(all_records)

        if df is not None:
            mylogger.info('...Writing Creative Banner to lake...')
            self._WriteDataFrameToLake(df, client)
        else:
            mylogger.info(no_data_msg)

        # Child datasets are derived from the same records; each call overwrites client.LoadParams.
        self.CopyDataToLake_AdvertiserCreativeBannerPromotions(client, all_records)
        self.CopyDataToLake_AdvertiserCreativeBannerPrivateAffiliates(client, all_records)


    def CopyDataToLake_AdvertiserCreativeBannerPromotions(self, client: Client, datalist):
        """Write the `creative_banner-promotions` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.banner_promotion_ds` and the resulting dataframe is
        handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_banner-promotions'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/banner promotion data...')
        df = self.deserialize.banner_promotion_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Writing Creative Banner Promotions to lake...')
        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeBannerPrivateAffiliates(self, client: Client, datalist):
        """Write the `creative_banner-private-affiliates` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.banner_private_aff_ds` and the resulting dataframe
        is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_banner-private-affiliates'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/banner affiliate data...')
        df = self.deserialize.banner_private_aff_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Writing Creative Banner Affiliates to lake...')
        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeCoupon(self, client: Client):
        """Fetch `creative/coupon`, write it to the lake, then write its two child datasets.

        Records come from `self.dataAccess.getData` and are deserialized with
        `self.deserialize.coupon_ds`. Whenever the API returned records, the
        private-affiliates and promotions loads run on those same records,
        even if the coupon dataframe itself came back as `None`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced here and again by each child load.
        """
        endpoint = 'creative_coupon'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point = 'creative/coupon')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/coupon data...')
        df = self.deserialize.coupon_ds(all_records)

        if df is not None:
            self._WriteDataFrameToLake(df, client)
        else:
            mylogger.info(no_data_msg)

        # Child datasets are derived from the same records; each call overwrites client.LoadParams.
        self.CopyDataToLake_AdvertiserCreativeCouponPrivateAffiliates(client, all_records)
        self.CopyDataToLake_AdvertiserCreativeCouponPromotions(client, all_records)

        

    def CopyDataToLake_AdvertiserCreativeCouponPromotions(self, client: Client, datalist):
        """Write the `creative_coupon-promotions` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.coupon_promotion_ds` and the resulting dataframe is
        handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_coupon-promotions'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/coupon promotion data...')
        df = self.deserialize.coupon_promotion_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Writing Creative Coupon Promotions to lake...')
        self._WriteDataFrameToLake(df, client)

    def CopyDataToLake_AdvertiserCreativeCouponPrivateAffiliates(self, client: Client, datalist):
        """Write the `creative_coupon-private-affiliates` dataset built from caller-supplied records.

        No API call is made here: `datalist` is passed to
        `self.deserialize.coupon_private_aff_ds` and the resulting dataframe
        is handed to `self._WriteDataFrameToLake`.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this dataset.
            datalist: Records to deserialize; must support `len()`.
        """
        endpoint = 'creative_coupon-private-affiliates'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(datalist) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/coupon affiliates data...')
        df = self.deserialize.coupon_private_aff_ds(datalist)

        if df is None:
            mylogger.info(no_data_msg)
            return

        mylogger.info('...Writing Creative Coupon Affiliates to lake...')
        self._WriteDataFrameToLake(df, client)


    def CopyDataToLake_AdvertiserCreativeProduct(self, client: Client):
        """Fetch the `creative/product` endpoint for one client and write it to the lake.

        Records come from `self.dataAccess.getData`, are deserialized with
        `self.deserialize.ds`, and the resulting dataframe is handed to
        `self._WriteDataFrameToLake`. When there are no records, or the
        deserializer returns `None`, an info message is logged instead.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this endpoint.
        """
        endpoint = 'creative_product'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = '',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point='creative/product')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/product data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)



    def CopyDataToLake_AdvertiserCreativePromotion(self, client: Client):
        """Fetch the `creative/promotion` endpoint for one client and write it to the lake.

        Records come from `self.dataAccess.getData`, are deserialized with
        `self.deserialize.ds`, and the resulting dataframe is handed to
        `self._WriteDataFrameToLake`. When there are no records, or the
        deserializer returns `None`, an info message is logged instead.

        Args:
            client: Client being processed. Its `LoadParams` attribute is
                replaced with the settings for this endpoint.
        """
        endpoint = 'creative_promotion'
        #Set Load Params
        # NOTE(review): `{endpoint}` is a set literal, so Endpoint is stored as a
        # one-element set rather than a str - confirm whether a plain string was intended.
        client.LoadParams = LoadParams(
            IsDelta = False,
            Endpoint = {endpoint},
            DLPath_STG = f'/Pepperjam/{client.ProgramId}/STG/{endpoint}',
            DLPath_PSA = f'/Pepperjam/{client.ProgramId}/PSA/{endpoint}',
            DateStart = None,
            DateEnd = None,
            DeltaColumn = '',
            MergeKeys = 't.id = s.id',
            RelativeDate = None
        )

        all_records = self.dataAccess.getData(client = client, end_point = 'creative/promotion')

        # Use the plain endpoint name; formatting LoadParams.Endpoint prints the set repr.
        no_data_msg = f'No {endpoint} data available for {client.ProgramName}-{client.ProgramId}'

        if len(all_records) == 0:
            mylogger.info(no_data_msg)
            return

        # Log the stage before running it so a deserialization failure is traceable.
        mylogger.info('...Deserializing creative/promotion data...')
        df = self.deserialize.ds(all_records)

        if df is None:
            mylogger.info(no_data_msg)
            return

        self._WriteDataFrameToLake(df, client)


    
    #-------- Master Control Method - Load ALL-----#
    def LoadAllClientData(self) -> None:
        """Load every endpoint for every client in `self.clientSecretValues`, one client at a time.

        Each client is handed to `LoadClientData`, which owns the ordered list
        of endpoint loaders and the per-client start/finish log lines. Keeping
        that list in one place stops this entry point from drifting out of
        sync with it. An exception raised while loading a client is not caught
        here, so it stops the remaining clients from being processed.
        """
        for client in self.clientSecretValues:
            self.LoadClientData(client)

        mylogger.info(f'Finished copying PJ data for all available clients')

    def LoadClientData(self,client: Client) -> None:
        """Run every endpoint loader for a single client, in a fixed order.

        Logs a start line, invokes each `CopyDataToLake_*` loader with
        `client`, then logs a finish line. Exceptions raised by a loader are
        not caught here.

        Args:
            client: Client to load; passed unchanged to each loader.
        """
        mylogger.info(f'Copying PJ data for client: {client.ProgramName}...')

        # Order matters only in that it mirrors the sequence the loaders have always run in.
        loaders = (
            self.CopyDataToLake_AdvertiserPublisher,
            self.CopyDataToLake_AdvertiserTerm,
            self.CopyDataToLake_AdvertiserGroup,
            self.CopyDataToLake_ReportAccountTransactions,
            self.CopyDataToLake_ReportCreativeDetails,
            self.CopyDataToLake_ReportItemizedTransactionSummary,
            self.CopyDataToLake_TransactionSummary,
            #Delta Loaded Tables
            self.CopyDataToLake_ReportItemizedTransactions,
            self.CopyDataToLake_ReportTransactionHistory,
            self.CopyDataToLake_TransactionDetail,
            self.CopyDataToLake_AdvertiserCreativeGeneric,
            self.CopyDataToLake_AdvertiserCreativeText,
            self.CopyDataToLake_AdvertiserCreativeBanner,
            self.CopyDataToLake_AdvertiserCreativeCoupon,
            self.CopyDataToLake_AdvertiserCreativeProduct,
            self.CopyDataToLake_AdvertiserCreativePromotion,
        )
        for run_loader in loaders:
            run_loader(client)

        mylogger.info(f'Finished copying PJ data for client: {client.ProgramName} \n\n')


In [22]:
def main():
    """Create a PJ_ControlLogic instance and run its LoadAllClientData() method."""
    PJ_ControlLogic().LoadAllClientData()


## **Run the main program to retrieve all data**

In [23]:
#main()

In [None]:
# Single fetcher instance; handle_client_data reads it as a global.
PJAPIDataFetcher = PJ_ControlLogic()

def handle_client_data(client):
    """Load one client's data and report the outcome instead of raising.

    Args:
        client: The client passed through to PJAPIDataFetcher.LoadClientData.

    Returns:
        A (client, error) tuple. error is None when LoadClientData completed,
        otherwise it is the Exception instance that LoadClientData raised.
    """
    failure = None
    try:
        PJAPIDataFetcher.LoadClientData(client)
    except Exception as exc:
        # Hand the exception back to the caller rather than letting it escape.
        failure = exc
    return client, failure

# Declare or refine client list
clientsToProcess = list(PJAPIDataFetcher.clientSecretValues)
# Debug a client ID
# clientsToProcess = [client for client in PJAPIDataFetcher.clientSecretValues if client.ProgramId == '9310']

# Clients whose load returned an exception; summarised once every future has completed.
failedClients = []

# Load clients concurrently. handle_client_data returns (client, error) and hands
# Exception instances back as `error` instead of raising them.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(handle_client_data, client): client for client in clientsToProcess}

    for future in concurrent.futures.as_completed(futures):
        client = futures[future]  # Retrieve the client associated with this future
        result, error = future.result()  # result is the client echoed back by handle_client_data
        # Compare with None explicitly: an exception object is not guaranteed to be
        # truthy (a class defining __bool__ or __len__ can evaluate as False).
        if error is not None:
            failedClients.append(client)
            mylogger.error(f'An error occurred on {client.ProgramName} - {client.ProgramId}: {str(error)}')

# One-line outcome so a run with failures is visible at the end of the cell output.
# Only .info/.error are used because both the Python and log4j loggers provide them.
if failedClients:
    mylogger.error(f'PJ data load finished with errors for {len(failedClients)} of {len(clientsToProcess)} clients')
else:
    mylogger.info('All client data has been processed.\n\n')


In [25]:
#PJAPIDataFetcher = PJ_ControlLogic()
#with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
#    futures = [executor.submit(PJAPIDataFetcher.LoadClientData, client) for client in PJAPIDataFetcher.clientSecretValues]
#
#    for future in concurrent.futures.as_completed(futures):
#        try:
#            future.result()
#        except Exception as e:
#            mylogger.error(f'An error occurred on {client.ProgramName} - {client.ProgramId}: {str(e)}')
#
#mylogger.info('All client data has been processed.\n\n')


## **Testing cells**
This area houses notebook cells used to test the methods and functions defined above.

In [26]:
#Testing Cell

# old copy activities 
#PJAPIDataFetcher = PJ_ControlLogic()
## Does just what the LoadAllClientData() method does, but it explicitly calls each function for more visibility in back end logs
##------ LOAD TABLES ------#BombshellSportswear - 9943
#for client in PJAPIDataFetcher.clientSecretValues:
#    if client.ProgramId == '9943':
#        PJAPIDataFetcher.LoadClientData(client)
#        mylogger.info(f'\n\nLOADING CLIENT {client.ProgramId}\n\n')
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeBanner(client)
    #PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeCoupon(client)
#    if client.ProgramId == '2337':
#        
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserPublisher(client)
#
#
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserTerm(client)
#
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserGroup(client)
#
#        PJAPIDataFetcher.CopyDataToLake_ReportAccountTransactions(client)
#        PJAPIDataFetcher.CopyDataToLake_ReportCreativeDetails(client)
#        PJAPIDataFetcher.CopyDataToLake_ReportItemizedTransactionSummary(client)
#        PJAPIDataFetcher.CopyDataToLake_TransactionSummary(client)
#
#        #Delta Loaded Tables 
#        PJAPIDataFetcher.CopyDataToLake_ReportItemizedTransactions(client)
#        PJAPIDataFetcher.CopyDataToLake_ReportTransactionHistory(client)
#        PJAPIDataFetcher.CopyDataToLake_TransactionDetail(client)
#
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeGeneric(client)
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeText(client)

#        PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeProduct(client)
#        PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativePromotion(client)
#
##
#mylogger.info(f'Data load finished')

In [27]:


#TestClient =  PJAPIDataFetcher.clientSecretValues[1]
#data = PJAPIDataFetcher.CopyDataToLake_ReportCreativeDetails(TestClient) #TestClient is the SECOND value in the list (index 1)
#print(TestClient)
# #PJAPIDataFetcher.LoadAllClientData(TestClient)


# # publisher
# PJAPIDataFetcher.CopyDataToLake_AdvertiserPublisher(TestClient)
#data = PJAPIDataFetcher.CopyDataToLake_AdvertiserPublisherCategory(TestClient)
# PJAPIDataFetcher.CopyDataToLake_AdvertiserPublisherPromotionalMethod(TestClient)

# # term
# PJAPIDataFetcher.CopyDataToLake_AdvertiserTerm(TestClient)

# # Group
# PJAPIDataFetcher.CopyDataToLake_AdvertiserGroup(TestClient)

# #Report 
# PJAPIDataFetcher.CopyDataToLake_TransactionSummary(TestClient)
# PJAPIDataFetcher.CopyDataToLake_ReportAccountTransactions(TestClient)
# PJAPIDataFetcher.CopyDataToLake_ReportCreativeDetails(TestClient)
# PJAPIDataFetcher.CopyDataToLake_ReportItemizedTransactionSummary(TestClient)


# #Delta
# PJAPIDataFetcher.CopyDataToLake_TransactionDetail(TestClient)
# PJAPIDataFetcher.CopyDataToLake_ReportItemizedTransactions(TestClient)
# PJAPIDataFetcher.CopyDataToLake_ReportTransactionHistory(TestClient)   #Removing for now since it takes the longest

# # Creative
# PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeGeneric(TestClient) #most clients have no response
#data =  PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeText(TestClient)
#df = PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeBannerPromotions(TestClient)
#df = PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeBanner(TestClient)
#df = PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeCoupon(TestClient)
# PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativeProduct(TestClient)
# PJAPIDataFetcher.CopyDataToLake_AdvertiserCreativePromotion(TestClient)



In [28]:

#len(PJAPIDataFetcher.clientSecretValues)
#df.show()
#data

In [29]:
    #advertiser/report/order-status             = Forbidden (Code: 403)
    #advertiser/report/demand-detail            = Forbidden (Code: 403)
    #advertiser/report/demand-summary           = Forbidden (Code: 403)
    #advertiser/report/itemized-demand-detail   = Forbidden (Code: 403)
    #advertiser/report/itemized-demand-summary  = Forbidden (Code: 403)


    #advertiser/report/order-commission-rule    = dependent on transactionid
    #advertiser/itemized-list                   = 2 records
    #advertiser/itemized-list/product           = listId required
    #advertiser/group/member                    = groupId required



