In [1]:
from viadot.tasks import C4CToDF, AzureSQLToDF, AzureSQLUpsert, SalesforceUpsert
from viadot.task_utils import add_ingestion_metadata_task
from viadot.utils import get_flow_last_run_date
from prefect import Flow, task, Parameter
import prefect
import pandas as pd
from prefect.executors import LocalDaskExecutor
from prefect.storage import Git
from prefect.run_configs import DockerRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.engine import signals
from prefect.engine.state import Failed, Skipped
from prefect.utilities.notifications import slack_notifier
from typing import List

# Shared viadot task instances. Each one is called (some of them twice) inside
# the flow definition further down, once for accounts and once for contacts.
c4c_to_df_task = C4CToDF()
query_to_df_task = AzureSQLToDF()
azure_sql_upsert_task = AzureSQLUpsert()
sf_upsert_task = SalesforceUpsert()

# Deployment settings. Only EXECUTOR is currently passed to the Flow; the
# schedule, Slack handler, storage and run config are commented out there.

# Cron expression "*/15 * * * *": every 15 minutes.
schedule = Schedule(clocks=[CronClock("*/15 * * * *")])
# State handler restricted to the Failed state.
SLACK_HANDLER = slack_notifier(only_states=[Failed])
STORAGE = Git(
    flow_path="c4c_to_sfdc/flows/sync/corporate_account_incremental.py",
    # NOTE(review): repo is an empty string; presumably the clone URL secret
    # below is what actually identifies the repository - confirm.
    repo="",
    git_clone_url_secret_name="integrations_repo_url",
)
RUN_CONFIG = DockerRun(
    image="docker.pkg.github.com/dyvenia/viadot/viadot:dev", labels=["prod"]
)
EXECUTOR = LocalDaskExecutor()


# Per-environment settings read by the flow definition: C4C instance id, the
# names of the credential secrets, and the Azure SQL schema to write to.
ENVIRONMENTS = {
    "qa": {
        "name": "qa",
        "C4C_INSTANCE": "my336539",
        "C4C_CREDENTIALS_SECRET": "aia-c4c-qa",
        "AZURE_SQL_SCHEMA": "sandbox",
        "AZURE_SQL_CREDENTIALS_SECRET": "azuwevelcrsql-s-001",
        "SALESFORCE_CREDENTIALS_SECRET": "salesforce-qa",
        "SALESFORCE_DOMAIN": "test",
    },
    # "prod" has no SALESFORCE_DOMAIN key; the flow reads it with env.get(),
    # so the domain passed to the Salesforce upsert is None for prod.
    "prod": {
        "name": "prod",
        "C4C_INSTANCE": "my341115",
        "C4C_CREDENTIALS_SECRET": "aia-c4c-prod",
        "AZURE_SQL_SCHEMA": "raw",
        "AZURE_SQL_CREDENTIALS_SECRET": "azuwevelcesql-p-001",
        "SALESFORCE_CREDENTIALS_SECRET": "salesforce-prod",
    }
}

# Active environment. NOTE(review): hard-coded to "prod", so running this
# notebook as-is targets the production secrets and schema.
env = ENVIRONMENTS["prod"]


@task
def get_account_c4c_query_params(
    cursor_field: str = "EntityLastChangedOn",
    cursor_value: str = None,
    filters: str = None,
) -> dict:
    """
    Create parameters for a C4C API query.

    Args:
        cursor_field (str): The cursor field to use for filtering the data. By default,
        EntityLastChangedOn. It is also used as the $orderby value.
        cursor_value (str, optional): The cursor value to use for filtering the data. This
        is a datetime value by which we filter C4C records. Has to be in this exact format:
        YYYY-MM-DDT00:00:00Z (e.g. 2022-04-15T00:00:00Z). If empty or None, no cursor
        condition is added.
        filters (str, optional): Any additional filters to apply to the query. Note that
        mathematical symbols should be replaced by equivalent words, eg. 'eq', 'lt', etc.
        Will be passed to the $filter query parameter.

    Returns:
        dict: Always contains "$orderby"; contains "$filter" only when a cursor value
        and/or additional filters were provided.
    """

    query_params = {"$orderby": cursor_field}

    conditions = []
    if filters:
        conditions.append(filters)
    if cursor_value:
        conditions.append(f"{cursor_field} ge datetimeoffset'{cursor_value}'")

    if len(conditions) == 2:
        # Parenthesize the caller's filter before AND-ing the cursor condition onto
        # it. Without this, a top-level "or" in `filters` would capture the cursor
        # condition only for its last operand ("a or b and c" == "a or (b and c)").
        conditions[0] = f"({filters})"

    if conditions:
        query_params["$filter"] = " and ".join(conditions)

    return query_params


@task
def get_contact_c4c_query_params(
    acc_ids: List[str], cursor_field: str = "EntityLastChangedOn"
) -> dict:
    """
    Create parameters for a C4C API query that selects contacts by account.

    Args:
        acc_ids (List[str]): Account IDs whose contacts should be returned. Each ID is
        converted with str() and wrapped in single quotes; the per-ID conditions are
        OR-ed together.
        cursor_field (str): Field used for the $orderby parameter. By default,
        EntityLastChangedOn.

    Returns:
        dict: Always contains "$orderby". "$filter" is only present when `acc_ids`
        is non-empty.
    """
    # NOTE(review): with an empty `acc_ids` no $filter is sent at all, so the
    # request is not restricted by account - confirm this is intended.
    params = {"$orderby": cursor_field}

    if acc_ids:
        id_clauses = (f"AccountID eq '{account_id!s}'" for account_id in acc_ids)
        params["$filter"] = " or ".join(id_clauses)

    return params


@task
def add_updated_at_field(c4c_df: pd.DataFrame) -> pd.DataFrame:
    """
    Add an `updated_at` datetime column derived from `EntityLastChangedOn`.

    `EntityLastChangedOn` is expected to hold strings of the form
    "/Date(<epoch milliseconds>)/"; the value between the parentheses is
    extracted and interpreted as milliseconds since the Unix epoch.

    Args:
        c4c_df (pd.DataFrame): Input data; must contain `EntityLastChangedOn`.
        It is not modified.

    Returns:
        pd.DataFrame: A copy of `c4c_df` with the additional `updated_at` column.
        Rows whose `EntityLastChangedOn` is missing or does not match the
        pattern get NaT.
    """
    result = c4c_df.copy()

    epoch_ms_text = result["EntityLastChangedOn"].str.extract(
        "(?:[/]Date\\((.*?)\\)[/])", expand=False
    )
    # Cast to a numeric dtype explicitly: passing digit *strings* together with
    # `unit=` to pd.to_datetime is deprecated (pandas >= 2.1) and will be parsed
    # as date strings rather than epoch numbers in future versions.
    epoch_ms = pd.to_numeric(epoch_ms_text)
    result["updated_at"] = pd.to_datetime(epoch_ms, unit="ms")

    return result


@task
def remove_c4c_metadata_cols(
    c4c_df: pd.DataFrame, metadata_cols: List[str]
) -> pd.DataFrame:
    """
    Drop the "__metadata" column and the given extra columns from a C4C DataFrame.

    Args:
        c4c_df (pd.DataFrame): Data as returned from C4C.
        metadata_cols (List[str]): Additional column names to drop.

    Returns:
        pd.DataFrame: A new DataFrame without the dropped columns.

    Raises:
        signals.ENDRUN: With a Skipped state ("No data to process.") when
        `c4c_df` is empty.
    """
    # Nothing to clean: end this task run in a Skipped state.
    if c4c_df.empty:
        raise signals.ENDRUN(Skipped("No data to process."))

    columns_to_drop = ["__metadata"] + metadata_cols
    return c4c_df.drop(columns=columns_to_drop)


@task
def get_account_transform_query(
    cursor_field: str, cursor_value: str, schema: str, table: str
) -> str:
    """
    Build the SQL that reads the freshly upserted accounts for loading into Salesforce.

    The SELECT renames the source columns to the target field names and keeps
    only rows where `cursor_field` >= `cursor_value`.

    Args:
        cursor_field (str): Column compared against `cursor_value`.
        cursor_value (str): Lower bound (inclusive); inserted as a quoted literal.
        schema (str): Schema of the source table.
        table (str): Name of the source table.

    Returns:
        str: The query text. All arguments are interpolated verbatim, without
        escaping.
    """
    sql_template = """
    SELECT
        a.Name,
        a.LanguageCode AS Language__c,
        a.City AS BillingCity,
        a.StateCodeText AS Region__c,
        a.StreetPostalCode AS BillingPostalCode,
        a.CountryCode AS BillingCountry,
        a.Street AS BillingStreet,
        a.IndustrialSectorCodeText AS Industry,
        a.ZACCOUNT_VMS_VMSClassification_KUT AS Classification__c,
        a.Phone,
        a.Fax,
        a.RoleCode AS AccountGroup__c,
        a.Email AS BusinessEmail__c,
        CASE
            WHEN a.CountryCode = 'FR' THEN '1100'
            WHEN a.CountryCode = 'DE' THEN '1010'
        END AS SalesOrg__c,
        a.AccountID AS SAPCustomerId__c,
        a.StateCodeText AS BillingState,
        a.Z_Attribute10_KUT AS AvailableInCC__c,
        '0122o000000coQxAAI' AS RecordTypeId
    FROM {schema}.{table} a
    WHERE {cursor_field} >= '{cursor_value}'
    """
    return sql_template.format(
        schema=schema,
        table=table,
        cursor_field=cursor_field,
        cursor_value=cursor_value,
    )



@task
def get_contact_transform_query(schema: str, table: str, acc_ids: List[str]) -> str:
    """
    Build the SQL that reads the contacts of the given accounts for loading into Salesforce.

    Args:
        schema (str): Schema of the source table.
        table (str): Name of the source table.
        acc_ids (List[str]): Account IDs to select contacts for. Each ID is
        converted with str() and embedded as a quoted SQL string literal.

    Returns:
        str: The query text. When `acc_ids` is empty the WHERE clause is
        "1 = 0", so the query is valid and returns no rows.
    """
    if acc_ids:
        # Double embedded single quotes so an ID cannot terminate its literal.
        quoted_ids = ", ".join(
            "'" + str(aid).replace("'", "''") + "'" for aid in acc_ids
        )
        account_predicate = f"c.AccountID IN ({quoted_ids})"
    else:
        # "IN ()" is a syntax error; select nothing instead.
        account_predicate = "1 = 0"

    query = f"""
    SELECT
        c.LanguageCode AS Language__c,
        c.Phone,
        c.ExternalID AS SAPContactId__c,
        c.AccountID AS RelatedAccountSAPCustID__c,
        c.TitleCodeText AS Salutation,
        c.Email,
        c.Fax,
        c.Mobile AS MobilePhone,
        c.LastName,
        c.FirstName,
        c.JobTitle AS Title,
        c.DepartmentCodeText AS Department,
        c.FunctionCodeText AS Function__c
    FROM {schema}.{table} c
    WHERE {account_predicate}
    """
    return query


@task
def evaluate_cursor_value(cursor_value: str, flow_name: str):
    """
    Pick the cursor value to use for this run.

    Args:
        cursor_value (str): Manually supplied cursor value; may be None or empty.
        flow_name (str): Flow name handed to `get_flow_last_run_date` when no
        manual value was supplied.

    Returns:
        `cursor_value` if it is truthy, otherwise the result of
        `get_flow_last_run_date(flow_name)`.
    """
    if cursor_value:
        return cursor_value
    return get_flow_last_run_date(flow_name)


@task
def get_account_ids(accounts_df: pd.DataFrame) -> List[str]:
    """
    Return the values of the `SAPCustomerId__c` column as a plain list.

    Args:
        accounts_df (pd.DataFrame): Accounts data containing `SAPCustomerId__c`.

    Returns:
        List[str]: One entry per row, in row order.
    """
    sap_customer_ids = accounts_df.SAPCustomerId__c
    return sap_customer_ids.tolist()


# Flow definition. Accounts changed since the cursor are pulled from C4C,
# upserted into Azure SQL, read back through a transform query and upserted
# into Salesforce; the same steps are then repeated for the contacts of those
# accounts.
# Storage, run config, Slack state handler and schedule are commented out, so
# the flow only runs when triggered explicitly (see the f.run() cell).
with Flow(
    "incr_c4c_corporate_account_sfdc_account",
    # storage=STORAGE,
    # run_config=RUN_CONFIG,
    # state_handlers=[SLACK_HANDLER],
    executor=EXECUTOR,
    # schedule=schedule,
) as f:
    """Integration of Cloud for Customers CorporateAccountCollection endpoint into Salesforce Account object"""

    # Constants
    ENV_NAME = env["name"]
    C4C_INSTANCE = env["C4C_INSTANCE"]
    URL = f"https://{C4C_INSTANCE}.crm.ondemand.com/sap/c4c/odata/v1/c4codataapi/"
    ACCOUNT_ENDPOINT = "CorporateAccountCollection"
    # NOTE(review): this filter admits FR, DK, SE and NO, while the CASE in
    # get_account_transform_query only maps FR and DE to a SalesOrg__c value;
    # DK/SE/NO rows therefore get a NULL SalesOrg__c - confirm this is intended.
    FILTERS = "(CountryCode eq 'FR' or CountryCode eq 'DK' or CountryCode eq 'SE' or CountryCode eq 'NO') and RoleCode eq 'Z040' and Z_Attribute10_KUT eq 'Z1' and LifeCycleStatusCode eq '2'"
    # Columns dropped from the account data by remove_c4c_metadata_cols
    # (in addition to "__metadata").
    C4C_ACCOUNT_METADATA_COLS = [
        "CorporateAccountAddress",
        "CorporateAccountAttachmentFolder",
        "CorporateAccountHasContactPerson",
        "CorporateAccountIdentification",
        "CorporateAccountInternationalVersion",
        "CorporateAccountSalesData",
        "CorporateAccountSkills",
        "CorporateAccountTaxNumber",
        "CorporateAccountTeam",
        "CorporateAccountTextCollection",
        "CorporateAccountVisitingHours",
        "CorporateAccountVisitingInformationDetails",
        "OwnerEmployeeBasicData",
        "ParentAccount",
    ]

    CONTACT_ENDPOINT = "ContactCollection"
    # Columns dropped from the contact data by remove_c4c_metadata_cols
    # (in addition to "__metadata").
    C4C_CONTACT_METADATA_COLS = [
        "ContactAttachmentFolder",
        "ContactInternationalVersion",
        "ContactIsContactPersonFor",
        "ContactOwnerEmployeeBasicData",
        "ContactPersonalAddress",
        "ContactTextCollection",
        "CorporateAccount",
    ]

    # Column produced by add_updated_at_field; used as the cursor column when
    # reading accounts back from Azure SQL.
    AZURE_SQL_CURSOR_FIELD = "updated_at"
    AZURE_SQL_SCHEMA = env["AZURE_SQL_SCHEMA"]

    ACCOUNT_AZURE_SQL_TABLE = "c4c_corporate_account"
    ACCOUNT_PRIMARY_KEY = "AccountID"
    ACCOUNT_SF_TABLE = "Account"
    ACCOUNT_SF_EXTERNAL_ID_FIELD = "SAPCustomerId__c"

    CONTACT_AZURE_SQL_TABLE = "c4c_contact"
    CONTACT_PRIMARY_KEY = "ExternalID"
    CONTACT_SF_TABLE = "Contact"
    CONTACT_SF_EXTERNAL_ID_FIELD = "SAPContactId__c"

    C4C_CREDENTIALS_SECRET = env["C4C_CREDENTIALS_SECRET"]
    AZURE_SQL_CREDENTIALS_SECRET = env["AZURE_SQL_CREDENTIALS_SECRET"]
    SALESFORCE_CREDENTIALS_SECRET = env["SALESFORCE_CREDENTIALS_SECRET"]

    # Optional key: None when the selected environment does not define it.
    SALESFORCE_DOMAIN = env.get("SALESFORCE_DOMAIN")

    # Cursor
    # A manually supplied "cursor_value" parameter wins; otherwise
    # evaluate_cursor_value falls back to get_flow_last_run_date(f.name).
    manual_cursor_value = Parameter("cursor_value", default=None)
    cursor_value = evaluate_cursor_value(manual_cursor_value, f.name)

    # Account
    account_c4c_query_params = get_account_c4c_query_params(cursor_value=cursor_value, filters=FILTERS)
    df_account = c4c_to_df_task(
        url=URL,
        endpoint=ACCOUNT_ENDPOINT,
        params=account_c4c_query_params,
        credentials_secret=C4C_CREDENTIALS_SECRET,
    )
    # Raises ENDRUN with a Skipped state when C4C returned no rows.
    df_account_cleaned = remove_c4c_metadata_cols(df_account, metadata_cols=C4C_ACCOUNT_METADATA_COLS)
    df_account_with_updated_at_field = add_updated_at_field(df_account_cleaned)
    df_account_with_metadata = add_ingestion_metadata_task(df_account_with_updated_at_field)
    account_upsert_stg_into_prod = azure_sql_upsert_task(
        df=df_account_with_metadata,
        schema=AZURE_SQL_SCHEMA,
        table=ACCOUNT_AZURE_SQL_TABLE,
        on=ACCOUNT_PRIMARY_KEY,
        credentials_secret=AZURE_SQL_CREDENTIALS_SECRET,
    )
    account_transform_query = get_account_transform_query(
        cursor_field=AZURE_SQL_CURSOR_FIELD,
        cursor_value=cursor_value,
        schema=AZURE_SQL_SCHEMA,
        table=ACCOUNT_AZURE_SQL_TABLE,
    )
    final_accounts_data = query_to_df_task(
        query=account_transform_query, credentials_secret=AZURE_SQL_CREDENTIALS_SECRET
    )
    # The query-building task takes no data from the upsert, so the ordering is
    # declared explicitly: build (and then run) the read query only after the
    # upsert task.
    account_transform_query.set_upstream(account_upsert_stg_into_prod)
    to_sf_account = sf_upsert_task(
        final_accounts_data,
        table=ACCOUNT_SF_TABLE,
        external_id=ACCOUNT_SF_EXTERNAL_ID_FIELD,
        domain=SALESFORCE_DOMAIN,
        env=ENV_NAME,
        credentials_secret=SALESFORCE_CREDENTIALS_SECRET,
    )

    # Upsert related contacts
    # Contacts are selected by the account IDs read back from Azure SQL above.
    account_ids = get_account_ids(final_accounts_data)
    contact_c4c_query_params = get_contact_c4c_query_params(acc_ids=account_ids)
    df_contact = c4c_to_df_task(
        url=URL,
        endpoint=CONTACT_ENDPOINT,
        params=contact_c4c_query_params,
        credentials_secret=C4C_CREDENTIALS_SECRET,
    )
    df_contact_cleaned = remove_c4c_metadata_cols(df_contact, metadata_cols=C4C_CONTACT_METADATA_COLS)
    df_contact_with_updated_at_field = add_updated_at_field(df_contact_cleaned)
    df_contact_with_metadata = add_ingestion_metadata_task(df_contact_with_updated_at_field)
    contact_upsert_stg_into_prod = azure_sql_upsert_task(
        df=df_contact_with_metadata,
        schema=AZURE_SQL_SCHEMA,
        table=CONTACT_AZURE_SQL_TABLE,
        on=CONTACT_PRIMARY_KEY,
        credentials_secret=AZURE_SQL_CREDENTIALS_SECRET,
    )
    contact_transform_query = get_contact_transform_query(
        schema=AZURE_SQL_SCHEMA, table=CONTACT_AZURE_SQL_TABLE, acc_ids=account_ids
    )
    final_contacts_data = query_to_df_task(
        query=contact_transform_query, credentials_secret=AZURE_SQL_CREDENTIALS_SECRET
    )
    # Same explicit ordering as for accounts: read back only after the upsert.
    contact_transform_query.set_upstream(contact_upsert_stg_into_prod)
    # NOTE(review): to_sf_contacts has no declared dependency on to_sf_account,
    # so the two Salesforce upserts are not ordered relative to each other. If
    # contacts must reference already-existing accounts, add an upstream - confirm.
    to_sf_contacts = sf_upsert_task(
        final_contacts_data,
        table=CONTACT_SF_TABLE,
        external_id=CONTACT_SF_EXTERNAL_ID_FIELD,
        domain=SALESFORCE_DOMAIN,
        env=ENV_NAME,
        credentials_secret=SALESFORCE_CREDENTIALS_SECRET,
    )






In [None]:
# Manual local run with an explicit cursor value; because it is non-empty,
# evaluate_cursor_value uses it instead of looking up the last run date.
f.run(cursor_value="2022-04-15T00:00:00Z")

[2022-05-11 11:18:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'incr_c4c_corporate_account_sfdc_account'
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'cursor_value': Starting task run...
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'cursor_value': Finished task run for task with final state: 'Success'
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'evaluate_cursor_value': Starting task run...
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'evaluate_cursor_value': Finished task run for task with final state: 'Success'
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'get_account_c4c_query_params': Starting task run...
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'get_account_c4c_query_params': Finished task run for task with final state: 'Success'
[2022-05-11 11:18:17+0000] INFO - prefect.TaskRunner | Task 'c4c_to_df': Starting task run...
[2022-05-11 11:18:17+0000] ERROR - prefect.TaskRunne