# Scope of Notebook

The goal of this notebook is to showcase how you can use, in your own environment, a pre-trained model along with some profile data extracted from the Adobe Experience Platform to generate propensity scores and ingest those back to enrich the Unified Profile.

![EndToEndDesign](images/CMLE-Notebooks-Week4-Workflow-Watsonx.png)

We'll go through several steps:
- **Reading the featurized data** from the Data Landing Zone
- Generating the __scores__
- Creating a __target dataset__
- Creating a __dataflow__ to deliver data in the right format to that dataset.

# Setup
This notebook requires some configuration data to properly authenticate to your Adobe Experience Platform instance. You should be able to find all the required values by following the Setup section of the __README__.
The next cell looks for your configuration file under your project assets and fetches the values used throughout this notebook. See the __Setup__ section of the __README__ for details on how to create your configuration file.
NOTE: ensure the cluster type for this notebook is set to Python 3.10 with Spark.

In [0]:
from project_lib import Project
from configparser import ConfigParser
import io

project = Project.access()
config_file = project.get_file('config.ini')

config = ConfigParser()
config.read_string(config_file.read().decode('utf-8'))

ims_org_id = config.get("Platform", "ims_org_id")
sandbox_name = config.get("Platform", "sandbox_name")
environment = config.get("Platform", "environment")
client_id = config.get("Authentication", "client_id")
client_secret = config.get("Authentication", "client_secret")
scopes = config.get("Authentication", "scopes")
dataset_id = config.get("Platform", "dataset_id")
featurized_dataset_id = config.get("Platform", "featurized_dataset_id")
export_path = config.get("Cloud", "export_path")
import_path = config.get("Cloud", "import_path")
data_format = config.get("Cloud", "data_format")
compression_type = config.get("Cloud", "compression_type")
model_name = config.get("Cloud", "model_name")

watson_username = config.get("Watsonx", "watson_username")
watson_apikey = config.get("Watsonx", "watson_apikey")
model_id = config.get("Watsonx", "model_id")
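
As a rough sketch of what the cell above expects, the snippet below parses a sample `config.ini` and reports any missing keys. The section and key names mirror the `config.get(...)` calls above; every value is a placeholder, and `missing_keys` is a hypothetical helper that is not part of the notebook.

```python
from configparser import ConfigParser

# Placeholder values only; section and key names mirror the config.get(...) calls above.
SAMPLE_CONFIG = """
[Platform]
ims_org_id = <IMS_ORG_ID>
sandbox_name = <SANDBOX_NAME>
environment = <ENVIRONMENT>
dataset_id = <DATASET_ID>
featurized_dataset_id = <FEATURIZED_DATASET_ID>

[Authentication]
client_id = <CLIENT_ID>
client_secret = <CLIENT_SECRET>
scopes = <SCOPES>

[Cloud]
export_path = <EXPORT_PATH>
import_path = <IMPORT_PATH>
data_format = <DATA_FORMAT>
compression_type = <COMPRESSION_TYPE>
model_name = <MODEL_NAME>

[Watsonx]
watson_username = <WATSON_USERNAME>
watson_apikey = <WATSON_APIKEY>
model_id = <MODEL_ID>
"""

REQUIRED_KEYS = {
    "Platform": ["ims_org_id", "sandbox_name", "environment", "dataset_id", "featurized_dataset_id"],
    "Authentication": ["client_id", "client_secret", "scopes"],
    "Cloud": ["export_path", "import_path", "data_format", "compression_type", "model_name"],
    "Watsonx": ["watson_username", "watson_apikey", "model_id"],
}


def missing_keys(config: ConfigParser) -> list:
    """Return 'Section.key' for every required key absent from the config."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        for key in keys:
            if not config.has_option(section, key):
                missing.append(f"{section}.{key}")
    return missing


sample = ConfigParser()
sample.read_string(SAMPLE_CONFIG)
print(missing_keys(sample))
```

Running a check like this before the rest of the notebook can surface a missing key early instead of failing halfway through.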

Some utility functions that will be used throughout this notebook:

In [0]:
def get_ui_link(tenant_id, resource_type, resource_id):
    if environment == "prod":
        prefix = f"https://experience.adobe.com"
    else:
        prefix = f"https://experience-{environment}.adobe.com"
    return f"{prefix}/#/@{tenant_id}/sname:{sandbox_name}/platform/{resource_type}/{resource_id}"
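
To make the URL shape concrete, here is a self-contained variant of the helper that takes the environment and sandbox as arguments instead of reading notebook globals. `build_ui_link` is a hypothetical name, and the sample values are taken from outputs shown elsewhere in this notebook.

```python
def build_ui_link(environment, sandbox_name, tenant_id, resource_type, resource_id):
    """Same logic as get_ui_link, but without relying on notebook-level globals."""
    if environment == "prod":
        prefix = "https://experience.adobe.com"
    else:
        prefix = f"https://experience-{environment}.adobe.com"
    return f"{prefix}/#/@{tenant_id}/sname:{sandbox_name}/platform/{resource_type}/{resource_id}"


link = build_ui_link(
    "stage", "cmle-datarobot", "cloudmlecosystem", "dataset/browse", "663bb3d786def12c699779a1"
)
print(link)
```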

To ensure uniqueness of the resources created by this notebook, we include your system-provisioned username in each resource title to avoid conflicts. It is __recommended to supply a more readable one__ so you can easily identify the resources created in AEP by this notebook.

In [0]:
import re

username = watson_username  # supply your custom one, e.g. foo@bar.com
unique_id = re.sub("[^0-9a-zA-Z]+", "_", username)  # derive the ID from the (possibly customized) username

print(f"Username: {username}")
print(f"Unique ID: {unique_id}")

Username: mndymuqvx34peqwqz-ydi68gcdn1kj9ugzqs-towtum
Unique ID: mndymuqvx34peqwqz_ydi68gcdn1kj9ugzqs_towtum
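
The substitution replaces every run of non-alphanumeric characters with a single underscore. A small illustration follows, where `make_unique_id` is a hypothetical wrapper around the same regular expression:

```python
import re


def make_unique_id(name: str) -> str:
    """Collapse each run of characters outside [0-9a-zA-Z] into one underscore."""
    return re.sub("[^0-9a-zA-Z]+", "_", name)


print(make_unique_id("foo@bar.com"))
print(make_unique_id("mndymuqvx34peqwqz-ydi68gcdn1kj9ugzqs-towtum"))
```

Note that distinct usernames can collapse to the same ID (for example `a.b` and `a-b`), so the result is readable rather than strictly unique.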


Before we run anything, make sure to install the following required libraries for this notebook. They are all publicly available libraries and the latest version should work fine.

In [0]:
!pip install mlflow
!pip install aepp
!pip install adlfs
!pip install s3fs
!pip install fsspec

Collecting mlflow
  Downloading mlflow-2.12.2-py3-none-any.whl.metadata (29 kB)
Collecting Flask<4 (from mlflow)
  Downloading flask-3.0.3-py3-none-any.whl.metadata (3.2 kB)
Collecting alembic!=1.10.0,<2 (from mlflow)
  Downloading alembic-1.13.1-py3-none-any.whl.metadata (7.4 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.0.0-py3-none-any.whl.metadata (3.5 kB)
Collecting gitpython<4,>=3.1.9 (from mlflow)
  Downloading GitPython-3.1.43-py3-none-any.whl.metadata (13 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.3-py2.py3-none-any.whl.metadata (7.7 kB)
Collecting querystring-parser<2 (from mlflow)
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl.metadata (559 bytes)
Collecting sqlparse<1,>=0.4.0 (from mlflow)
  Downloading sqlparse-0.5.0-py3-none-any.whl.metadata (3.9 kB)
Collecting gunicorn<23 (from mlflow)
  Downloading gunicorn-22.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting Mako (from alembic!=1.10.0,<2->mlflow)
  Downloading M

Now let's initialize the APIClient so we can interact with the platform.

In [0]:
wml_credentials = {
    "instance_id": "openshift",
    "version": "4.8",
    "url": "https://cpd-cpd-instance.apps.p712zf6h.eastus2.aroapp.io",
    "username": watson_username,
    "apikey": watson_apikey
}

In [0]:
from ibm_watsonx_ai import APIClient

project_id = project.get_metadata()['metadata']['guid']
print("Project ID:", project_id)    

client = APIClient(wml_credentials)
client.set.default_project(project_id)

Project ID: 506b7b7a-ecf6-454f-8931-6d1aab37044f


'SUCCESS'

We'll be using the [aepp Python library](https://github.com/pitchmuc/aepp) here to interact with AEP APIs and create a schema and dataset suitable for ingesting our propensity scores further down the line. This library simply provides a programmatic interface around the REST APIs, but all these steps could be completed similarly using the raw APIs directly or even in the UI.
For more information on the underlying APIs please see the [API reference guide](https://developer.adobe.com/experience-platform-apis/).

Before any calls can take place, we need to configure the library and set up authentication credentials. For this you'll need the following pieces of information. For details on how to get them, please refer to the __Setup__ section of the __README__:

- Client ID
- Client secret

In [0]:
import aepp

aepp.configure(
  org_id=ims_org_id,
  secret=client_secret,
  scopes=scopes,
  client_id=client_id,
  environment=environment,
  sandbox=sandbox_name
)

# 1. Generating Propensity Scores Using the Trained Model

### 1.1 Reading the Featurized Data from the Data Landing Zone

In the second weekly assignment we wrote our featurized data into the Data Landing Zone, and in the third assignment we read only a sampled
portion of it to train our model. At this point we want to score all of the profiles, so we need to read everything.

The featurized data exported into the Data Landing Zone follows the layout `cmle/egress/$DATASETID/exportTime=$EXPORTTIME`.
We know the dataset ID, which is in your config under `featurized_dataset_id`, so the only missing piece is the export time.
To get it we can simply list the files in the DLZ and read the value from the path. The first step is to retrieve the credentials for the DLZ container used for destinations:

In [0]:
import aepp
from aepp import flowservice

flow_conn = flowservice.FlowService()
credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination')

Now we use some Python libraries to authenticate and issue listing commands so we can get the paths and extract the time from it.

In [0]:
import fsspec
from fsspec import AbstractFileSystem

def getDLZFSPath(credentials: dict):
    # Testing a list against a string raises TypeError, so check each marker individually
    if 'dlzProvider' in credentials and any(p in credentials['dlzProvider'] for p in ('Amazon', 's3')):
        aws_credentials = {
            'key' : credentials['credentials']['awsAccessKeyId'],
            'secret' : credentials['credentials']['awsSecretAccessKey'],
            'token' : credentials['credentials']['awsSessionToken']
        }
        return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName']
    else:
        abs_credentials = {
            'account_name' : credentials['storageAccountName'],
            'sas_token' : credentials['SASToken']
        }
        return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName']

    
def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str):
  featurized_data_base_path = f"{container_name}/{base_path}/{dataset_id}"
  featurized_data_export_paths = fs.ls(featurized_data_base_path)
  
  if len(featurized_data_export_paths) == 0:
    raise Exception(f"Found no exports for featurized data from dataset ID {dataset_id} under path {featurized_data_base_path}")
  elif len(featurized_data_export_paths) > 1:
    print(f"Found {len(featurized_data_export_paths)} exports from dataset ID {dataset_id} under path {featurized_data_base_path}, using most recent one")
  
  # Listing order is not guaranteed; sorting the exportTime=<timestamp> folder names puts the most recent export last
  featurized_data_export_path = sorted(featurized_data_export_paths)[-1]
  featurized_data_export_time = featurized_data_export_path.strip().split("/")[-1].split("=")[-1]
  return featurized_data_export_time

fs, container = getDLZFSPath(credentials)

export_time = get_export_time(fs, container, export_path, featurized_dataset_id)
print(f"Using featurized data export time of {export_time}")

Using featurized data export time of 20240506204134
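
The path handling can be exercised without any cloud access. The sketch below is a minimal, standalone version of the extraction step: the helper names and the second sample path are hypothetical, and the `%Y%m%d%H%M%S` timestamp format is an assumption inferred from the sample value printed above.

```python
from datetime import datetime


def export_time_from_path(path: str) -> str:
    """Take the last path segment and keep what follows 'exportTime='."""
    return path.strip().rstrip("/").split("/")[-1].split("=")[-1]


def latest_export_time(paths) -> str:
    """Pick the most recent export; fixed-width timestamps compare correctly as strings."""
    if not paths:
        raise ValueError("no export paths found")
    return max(export_time_from_path(p) for p in paths)


paths = [
    "container/cmle/egress/<DATASET_ID>/exportTime=20240506204134",
    "container/cmle/egress/<DATASET_ID>/exportTime=20240501101500",  # hypothetical older export
]
latest = latest_export_time(paths)
parsed = datetime.strptime(latest, "%Y%m%d%H%M%S")  # assumed format, see lead-in
print(latest, parsed)
```

Using `max` rather than the last element of the listing avoids depending on the order in which the filesystem returns entries.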


At this point we're ready to read the data. We're using Spark since the data could be pretty large now that we're not doing any sampling.
Depending on the provisioned account, the Landing Zone is configured to use either Azure or AWS. With Azure, the following properties are used to authenticate with a SAS token:

- `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.
- `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`.
- `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier.

With AWS, the following properties are used to access the data stored in S3:

- `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be set to the S3 access key
- `fs.s3a.secret.key` and `spark.hadoop.fs.s3a.secret.key` should be set to the S3 secret key
- `fs.s3a.session.token` and `spark.hadoop.fs.s3a.session.token` should be set to the S3 session token
- `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider`
- `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem`

The properties above are derived from the landing zone credentials; the following utility function sets them up:

In [0]:
def configureSparkSessionAndGetPath(credentials):
    # Testing a list against a string raises TypeError, so check each marker individually
    if 'dlzProvider' in credentials and any(p in credentials['dlzProvider'] for p in ('Amazon', 's3')):
        aws_key = credentials['credentials']['awsAccessKeyId']
        aws_secret = credentials['credentials']['awsSecretAccessKey']
        aws_token = credentials['credentials']['awsSessionToken']
        aws_bucket = credentials['dlzPath']['bucketName']
        dlz_folder = credentials['dlzPath']['dlzFolder']
        spark.conf.set("fs.s3a.access.key", aws_key)
        spark.conf.set("fs.s3a.secret.key", aws_secret)
        spark.conf.set("fs.s3a.session.token", aws_token)
        spark.conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
        spark.conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        spark.conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
        spark.conf.set("spark.hadoop.fs.s3a.access.key", aws_key)
        spark.conf.set("spark.hadoop.fs.s3a.secret.key", aws_secret)
        spark.conf.set("spark.hadoop.fs.s3a.session.token", aws_token)  # was a duplicate of the fs.s3a.session.token line
        return f"s3a://{aws_bucket}/{dlz_folder}/"
    else:
        dlz_storage_account = credentials['storageAccountName']
        dlz_sas_token = credentials['SASToken']
        dlz_container = credentials['containerName']    
        spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS")
        spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
        spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token)
        return f"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/"
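
One possible way to make these settings easier to inspect is to compute them as a plain dictionary first and apply them to Spark afterwards. The sketch below does only the first half and needs no Spark session. `dlz_spark_properties` is a hypothetical helper, the sample credentials are placeholders, and it assumes `dlzProvider` is a string when present.

```python
S3A_PROVIDER = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
SAS_PROVIDER = "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"


def dlz_spark_properties(credentials: dict) -> dict:
    """Return the Spark/Hadoop properties implied by the DLZ credentials."""
    provider = credentials.get("dlzProvider", "")  # assumed to be a string
    if "Amazon" in provider or "s3" in provider:
        creds = credentials["credentials"]
        base = {
            "fs.s3a.access.key": creds["awsAccessKeyId"],
            "fs.s3a.secret.key": creds["awsSecretAccessKey"],
            "fs.s3a.session.token": creds["awsSessionToken"],
            "fs.s3a.aws.credentials.provider": S3A_PROVIDER,
            "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        }
        props = {}
        for key, value in base.items():
            props[key] = value
            props[f"spark.hadoop.{key}"] = value  # mirror each key under spark.hadoop.*
        return props
    host = f"{credentials['storageAccountName']}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "SAS",
        f"fs.azure.sas.token.provider.type.{host}": SAS_PROVIDER,
        f"fs.azure.sas.fixed.token.{host}": credentials["SASToken"],
    }


azure_props = dlz_spark_properties(
    {"storageAccountName": "examplestorage", "SASToken": "<SAS_TOKEN>", "containerName": "<CONTAINER>"}
)
aws_props = dlz_spark_properties({
    "dlzProvider": "Amazon S3",
    "credentials": {"awsAccessKeyId": "<KEY>", "awsSecretAccessKey": "<SECRET>", "awsSessionToken": "<TOKEN>"},
    "dlzPath": {"bucketName": "<BUCKET>", "dlzFolder": "<FOLDER>"},
})
print(sorted(azure_props))
print(sorted(aws_props))
```

In the notebook the resulting dictionary could then be applied with a loop such as `for k, v in props.items(): spark.conf.set(k, v)`.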

Let's put that into practice and create a Spark dataframe containing the entire featurized data:

In [0]:
cloud_base_path = configureSparkSessionAndGetPath(credentials)
input_path = cloud_base_path + f"{export_path}/{featurized_dataset_id}/exportTime={export_time}/"

df = spark.read.parquet(input_path)
df.printSchema()

We can verify it matches what we had written out in the second weekly assignment:

In [0]:
df.count()

399624

And also do a sanity check on the data to make sure it looks good:

In [0]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+--------------------+-------------+-----------------------+-------------------------+------------------------+-------------------------+---------------------------------+--------------------------------+-----------------------+--------------------------+
|              userId|           eventType|           timestamp|subscriptionOccurred|emailsReceived|emailsOpened|emailsClicked|productsViewed|propositionInteracts|propositionDismissed|webLinkClicks|minutes_since_emailSent|minutes_since_emailOpened|minutes_since_emailClick|minutes_since_productView|minutes_since_propositionInteract|minutes_since_propositionDismiss|minutes_since_linkClick|random_row_number_for_user|
+--------------------+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+-

In [0]:
df = df.fillna(0)
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+--------------------+-------------+-----------------------+-------------------------+------------------------+-------------------------+---------------------------------+--------------------------------+-----------------------+--------------------------+
|              userId|           eventType|           timestamp|subscriptionOccurred|emailsReceived|emailsOpened|emailsClicked|productsViewed|propositionInteracts|propositionDismissed|webLinkClicks|minutes_since_emailSent|minutes_since_emailOpened|minutes_since_emailClick|minutes_since_productView|minutes_since_propositionInteract|minutes_since_propositionDismiss|minutes_since_linkClick|random_row_number_for_user|
+--------------------+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+-

### 1.2 Scoring the Profiles

For scoring we need 2 things:

- The __data__ to score
- The __trained model__ that will be used to do the scoring

We just created a dataframe containing the first one, and in the previous weekly assignment we created a production model that can operate on this data, so let's fetch this model from our repository and turn it into a Spark UDF so it can interact with our data easily:

In [0]:
model = client.repository.load(model_id)
model

![sklearn_pipeline](images/sklearn_pipeline.png)

Now we're ready to apply our trained model on top of the entire dataset. For that, we need to create a UDF which will use the model above for scoring.

In [0]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType
import numpy as np


def model_predict(features):
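    # predict_proba returns one row per input: column 0 is the probability of the negative outcome,
    # column 1 the probability of the positive outcome. Index the single element explicitly
    # instead of converting a 1-element array to float.
    return float(model.predict_proba(np.array([features]))[0, 1])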

predict_udf = udf(model_predict, FloatType())
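
To illustrate which value the UDF keeps, the sketch below uses a hypothetical stand-in model written in plain Python. `StubModel` and its scoring rule are invented for illustration only and have nothing to do with the real trained pipeline; the point is just the `[negative, positive]` layout of each `predict_proba` row.

```python
import math


class StubModel:
    """Hypothetical stand-in for a binary classifier; NOT the trained model."""

    def predict_proba(self, rows):
        # One [P(negative), P(positive)] pair per input row.
        result = []
        for row in rows:
            positive = 1.0 / (1.0 + math.exp(-(sum(row) - 3)))
            result.append([1.0 - positive, positive])
        return result


def positive_probability(model, features) -> float:
    """Score a single feature vector and keep the positive-class probability."""
    probabilities = model.predict_proba([features])
    return float(probabilities[0][1])


stub = StubModel()
score = positive_probability(stub, [1, 2, 0])
print(score)
```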

Now let's define the UDF inputs, which in our case are all the columns that the model needs to operate on (all except `subscriptionOccurred`, which the model does not expect as input since it is the outcome it is trying to predict).
We can get these easily as the Spark dataframe contains metadata about its columns:

In [0]:
df_to_score = df.drop('subscriptionOccurred') # drop label column since model is not expecting it for scoring purposes

udf_inputs = struct(*(df_to_score.columns))

df_scored = df_to_score.withColumn(
  "prediction",
  predict_udf(udf_inputs)
)

df_scored.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- emailsReceived: long (nullable = true)
 |-- emailsOpened: long (nullable = true)
 |-- emailsClicked: long (nullable = true)
 |-- productsViewed: long (nullable = true)
 |-- propositionInteracts: long (nullable = true)
 |-- propositionDismissed: long (nullable = true)
 |-- webLinkClicks: long (nullable = true)
 |-- minutes_since_emailSent: integer (nullable = true)
 |-- minutes_since_emailOpened: integer (nullable = true)
 |-- minutes_since_emailClick: integer (nullable = true)
 |-- minutes_since_productView: integer (nullable = true)
 |-- minutes_since_propositionInteract: integer (nullable = true)
 |-- minutes_since_propositionDismiss: integer (nullable = true)
 |-- minutes_since_linkClick: integer (nullable = true)
 |-- random_row_number_for_user: integer (nullable = true)
 |-- prediction: float (nullable = true)



If we look at the data we should see a new column called prediction which corresponds to the score generated by the model for this particular profile based on all the features computed earlier.

In [0]:
df_scored.show()

+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+--------------------+-------------+-----------------------+-------------------------+------------------------+-------------------------+---------------------------------+--------------------------------+-----------------------+--------------------------+------------+
|              userId|           eventType|           timestamp|emailsReceived|emailsOpened|emailsClicked|productsViewed|propositionInteracts|propositionDismissed|webLinkClicks|minutes_since_emailSent|minutes_since_emailOpened|minutes_since_emailClick|minutes_since_productView|minutes_since_propositionInteract|minutes_since_propositionDismiss|minutes_since_linkClick|random_row_number_for_user|  prediction|
+--------------------+--------------------+--------------------+--------------+------------+-------------+--------------+--------------------+--------------------+-------------+---

In [0]:
print(f"{df_scored.count()} - rows in dataset")
unique_users = df_scored.select('userId').distinct().count()
print(f"{unique_users} - unique users in dataset")

399624 - rows in dataset
100000 - unique users in dataset


Let's now group by `userId` and keep the `MAX` prediction, which discards the duplicated records.
The result should have as many rows as there are unique user IDs.

In [0]:
from pyspark.sql import functions as F
df_user_predictions = df_scored.groupBy("userId").agg(F.max("prediction").alias("max_prediction"))
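
The aggregation keeps one row per user with that user's highest score. A plain-Python sketch of the same idea on a few made-up rows (the user IDs and scores below are illustrative, not notebook data):

```python
def max_prediction_per_user(rows):
    """Keep the highest prediction seen for each user ID."""
    best = {}
    for user_id, prediction in rows:
        if user_id not in best or prediction > best[user_id]:
            best[user_id] = prediction
    return best


sample_rows = [
    ("user_a", 0.12),
    ("user_a", 0.87),
    ("user_b", 0.40),
    ("user_a", 0.33),
]
per_user = max_prediction_per_user(sample_rows)
print(per_user)
```

Taking the maximum is one possible policy; depending on the use case, the most recent row or the mean could be more appropriate.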

When bringing the scored profiles back into Adobe Experience Platform, we don't need to bring back all the features. In fact, we only really need 2 columns:

- The user ID, so we know which profile in the Unified Profile this row corresponds to.
- The score for this user ID.

In [0]:
df_to_ingest = df_user_predictions.select(
  "userId",
  "max_prediction"
).cache()

df_to_ingest.printSchema()

root
 |-- userId: string (nullable = true)
 |-- max_prediction: float (nullable = true)



At this point we have the scored profiles and exactly what we need to bring back into Adobe Experience Platform. But we're not quite ready to write the results yet; there's a bit of setup that needs to happen first:

- We need to create and configure a destination dataset in Adobe Experience Platform where our data will end up.
- We need to set up a data flow that will be able to take this data, convert it into an XDM format, and deliver it to this dataset.


# 2. Bringing the Scores back into Unified Profile

### 2.1 Create ingestion schema and dataset

The first step is to define where this propensity data we are creating as the output of our model should end up in the Unified Profile. We need to create a few entities for that:

- A **fieldgroup** that will define the XDM for where propensity scores should be stored.
- A **schema** based on that field group that will tie it back to the concept of profile.
- A **dataset** based on that schema that will hold the data.

As for the structure itself it's pretty simple, we just need 2 fields:

- The **propensity** itself as a decimal number.
- The **user ID** to which this propensity score relates.

Let's put that into practice and create the field group. Note that because we are creating custom fields here, they need to be nested under the tenant ID corresponding to your organization.


In [0]:
from aepp import schema

schema_conn = schema.Schema()

tenant_id = schema_conn.getTenantId()
tenant_id

'cloudmlecosystem'

In [0]:
fieldgroup_res = schema_conn.createFieldGroup({
  	"type": "object",
	"title": f"[CMLE][Week4] Fieldgroup for user propensity (created by {username})",
	"description": "This mixin is used to define a propensity score that can be assigned to a given profile.",
	"allOf": [{
		"$ref": "#/definitions/customFields"
	}],
	"meta:containerId": "tenant",
	"meta:resourceType": "mixins",
	"meta:xdmType": "object",
	"definitions": {
      "customFields": {
        "type": "object",
        "properties": {
          f"_{tenant_id}": {
            "type": "object",
            "properties": {
              "propensity": {
                "title": "Propensity",
                "description": "This refers to the propensity of a user towards an outcome.",
                "type": "number"
              },
              "userid": {
                "title": "User ID",
                "description": "This refers to the user having a propensity towards an outcome.",
                "type": "string"
              }
            }
          }
        }
      }
	},
	"meta:intendedToExtend": ["https://ns.adobe.com/xdm/context/profile"]
})

fieldgroup_id = fieldgroup_res["$id"]
fieldgroup_id

'https://ns.adobe.com/cloudmlecosystem/mixins/c4ef85f3bdd05fb971b8603596ebb000113f97a7e34485f9'

From this field group ID we can add it to a brand new schema that will be marked for profiles.

In [0]:
schema_res = schema_conn.createProfileSchema(
  name=f"[CMLE][Week4] Schema for user propensity ingestion (created by {username})",
  mixinIds=[
    fieldgroup_id
  ],
  description="Schema generated by CMLE for user propensity score ingestion",
)

schema_id = schema_res["$id"]
schema_alt_id = schema_res["meta:altId"]

print(f"Schema ID: {schema_id}")
print(f"Schema Alt ID: {schema_alt_id}")

Schema ID: https://ns.adobe.com/cloudmlecosystem/schemas/cd4be6112bc779d88f46cf0539a296f0207de2caf219f02f
Schema Alt ID: _cloudmlecosystem.schemas.cd4be6112bc779d88f46cf0539a296f0207de2caf219f02f


Because we eventually intend for these scores to end up in the Unified Profile, we need to specify which field of the schema corresponds to an identity so it can resolve the corresponding profile. In our case, the `userid` field is an ECID and we mark it as such.

In [0]:
identity_type = "ECID"
descriptor_res = schema_conn.createDescriptor(
  descriptorObj = {
    "@type": "xdm:descriptorIdentity",
    "xdm:sourceSchema": schema_id,
    "xdm:sourceVersion": 1,
    "xdm:sourceProperty": f"/_{tenant_id}/userid",
    "xdm:namespace": identity_type,
    "xdm:property": "xdm:id",
    "xdm:isPrimary": True
  }
)
descriptor_res

{'@id': 'abb9cf5e95479ef68bb095515d4607fb5ba4c2fe0c6d404b',
 '@type': 'xdm:descriptorIdentity',
 'xdm:sourceSchema': 'https://ns.adobe.com/cloudmlecosystem/schemas/cd4be6112bc779d88f46cf0539a296f0207de2caf219f02f',
 'xdm:sourceVersion': 1,
 'xdm:sourceProperty': '/_cloudmlecosystem/userid',
 'imsOrg': '3ADF23C463D98F640A494032@AdobeOrg',
 'version': '1',
 'xdm:namespace': 'ECID',
 'xdm:property': 'xdm:id',
 'xdm:isPrimary': True,
 'meta:containerId': '21f60eb6-7c13-4074-b60e-b67c13b0740c',
 'meta:sandboxId': '21f60eb6-7c13-4074-b60e-b67c13b0740c',
 'meta:sandboxType': 'production'}

And of course that schema needs to be enabled for Unified Profile consumption, so it can be added to the profile union schema.

In [0]:
enable_res = schema_conn.enableSchemaForRealTime(schema_alt_id)
enable_res

{'$id': 'https://ns.adobe.com/cloudmlecosystem/schemas/cd4be6112bc779d88f46cf0539a296f0207de2caf219f02f',
 'meta:altId': '_cloudmlecosystem.schemas.cd4be6112bc779d88f46cf0539a296f0207de2caf219f02f',
 'meta:resourceType': 'schemas',
 'version': '1.1',
 'title': '[CMLE][Week4] Schema for user propensity ingestion (created by mndymuqvx34peqwqz-ydi68gcdn1kj9ugzqs-towtum)',
 'type': 'object',
 'description': 'Schema generated by CMLE for user propensity score ingestion',
 'allOf': [{'$ref': 'https://ns.adobe.com/xdm/context/profile',
   'type': 'object',
   'meta:xdmType': 'object'},
  {'$ref': 'https://ns.adobe.com/cloudmlecosystem/mixins/c4ef85f3bdd05fb971b8603596ebb000113f97a7e34485f9',
   'type': 'object',
   'meta:xdmType': 'object'}],
 'refs': ['https://ns.adobe.com/xdm/context/profile',
  'https://ns.adobe.com/cloudmlecosystem/mixins/c4ef85f3bdd05fb971b8603596ebb000113f97a7e34485f9'],
 'imsOrg': '3ADF23C463D98F640A494032@AdobeOrg',
 'additionalInfo': {'numberOfIdentities': 1,
  'numb

At this point we're ready to create the dataset that will hold our propensity scores. This dataset is based on the schema we just created and nothing more.

In [0]:
from aepp import catalog

cat_conn = catalog.Catalog()

ingestion_dataset_res = cat_conn.createDataSets(
  name=f"[CMLE][Week4] Dataset for user propensity ingestion (created by {username})",
  schemaId=schema_id
)

ingestion_dataset_id = ingestion_dataset_res[0].split("/")[-1]
ingestion_dataset_id

'663bb3d786def12c699779a1'

And similarly that dataset needs to be enabled for Unified Profile consumption, so that any batch of data written to this dataset is automatically picked up and processed to insert into the individual profiles and create new fragments.

In [0]:
# TODO: this is currently failing due to invalid content type, need to fix in aepp, see https://github.com/pitchmuc/aepp/issues/15
# for now just enable in the UI...
cat_conn.enableDatasetProfile(ingestion_dataset_id)

['@/dataSets/663bb3d786def12c699779a1']

You should be able to see your dataset in the UI at the link below, and it should match the new schema created as shown in the following screenshot.

In [0]:
ingestion_dataset_link = get_ui_link(tenant_id, "dataset/browse", ingestion_dataset_id)
print(f"Dataset ID {ingestion_dataset_id} available under {ingestion_dataset_link}")

Dataset ID 663bb3d786def12c699779a1 available under https://experience-stage.adobe.com/#/@cloudmlecosystem/sname:cmle-datarobot/platform/dataset/browse/663bb3d786def12c699779a1


### 2.2 Setup ingestion data flow

Now that all the dataset and schema setup is completed, we're ready to define our Data Flow. The Data Flow defines the contract between the source and destination dataset.

For the purposes of this notebook we will be using the [Data Landing Zone (DLZ)](https://experienceleague.adobe.com/docs/experience-platform/sources/api-tutorials/create/cloud-storage/data-landing-zone.html?lang=en) as the source filesystem under which the scoring results will be written. Every Adobe Experience Platform instance has a DLZ already set up as an [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs) container or an [AWS S3](https://aws.amazon.com/s3/) bucket. We'll be using that as the delivery mechanism for the scored data, but this step can be customized to pick up the data from any cloud storage filesystem.
To set up the ingestion pipeline, we'll be using the Flow Service, which will be responsible for picking up the scored data from the DLZ and ingesting it into the dataset. There are a few steps involved:

- Creating a source connection.
- Creating a target connection.
- Creating a transformation.
- Creating a data flow.

Note that, although we already got DLZ credentials earlier in this notebook, they were for a different container where all the destination data is written (`dlz-destination`). Here we want credentials for the container corresponding to your user drop zone (`dlz-user-container`).

For that, again we use aepp to abstract all the APIs:

In [0]:
from aepp import flowservice

flow_conn = flowservice.FlowService()
dlz_credentials = flow_conn.getLandingZoneCredential()

def getDLZPath(credentials: dict):
    # Testing a list against a string raises TypeError, so check each marker individually
    if 'dlzProvider' in credentials and any(p in credentials['dlzProvider'] for p in ('Amazon', 's3')):
        return credentials['dlzPath']['bucketName'] + '/' + credentials['dlzPath']['dlzFolder']
    else:
        return credentials['containerName']

dlz_container = getDLZPath(dlz_credentials)
print(dlz_container)

The __source connection__ is responsible for connecting to your cloud storage account (in our case here, the Data Landing Zone) so that the resulting Data Flow will know from where data needs to be picked up.

For reference, here is a list of all the connection specs available for the most popular cloud storage accounts (these IDs are global across every single customer account and sandbox):
    
| Cloud Storage Type | Connection Spec ID | Connection Spec Name 
|--------------------|--------------------------------------|---------------------- 
| Amazon S3          | ecadc60c-7455-4d87-84dc-2a0e293d997b | amazon-s3 
| Azure Blob Storage | d771e9c1-4f26-40dc-8617-ce58c4b53702 | google-adwords 
| Azure Data Lake    | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 | adls-gen2 
| Data Landing Zone  | 26f526f2-58f4-4712-961d-e41bf1ccc0e8 | landing-zone 
| Google Cloud Storage| 32e8f412-cdf7-464c-9885-78184cb113fd | google-cloud 
| SFTP               | b7bf2577-4520-42c9-bae9-cad01560f7bc | sftp    
    

In [0]:
connection_spec_id = "26f526f2-58f4-4712-961d-e41bf1ccc0e8"
source_res = flow_conn.createSourceConnection({
  "name": "[CMLE][Week4] Data Landing Zone source connection for propensity scores",
  "data": {
      "format": "delimited"
  },
  "params": {
    "path": f"{dlz_container}/{import_path}",
    "type": "folder",
    "recursive": True
  },
  "connectionSpec": {
      "id": connection_spec_id,
      "version": "1.0"
  }
})

source_connection_id = source_res["id"]
source_connection_id

'cc8479a9-c04f-4672-9729-1af5b1bedced'
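
The `path` parameter of the source connection is the DLZ root (the container on Azure, or bucket plus folder on AWS) joined with `import_path`. A minimal sketch of that composition follows. The helper name, the sample credentials and the `cmle/ingress` import path are all hypothetical, and `dlzProvider` is assumed to be a string when present.

```python
def dlz_source_path(credentials: dict, import_path: str) -> str:
    """Build the folder path the source connection should watch."""
    provider = credentials.get("dlzProvider", "")  # assumed to be a string
    if "Amazon" in provider or "s3" in provider:
        root = credentials["dlzPath"]["bucketName"] + "/" + credentials["dlzPath"]["dlzFolder"]
    else:
        root = credentials["containerName"]
    return f"{root}/{import_path}"


azure_path = dlz_source_path({"containerName": "dlz-user-container"}, "cmle/ingress")
aws_path = dlz_source_path(
    {"dlzProvider": "Amazon S3", "dlzPath": {"bucketName": "example-bucket", "dlzFolder": "dlz-user-container"}},
    "cmle/ingress",
)
print(azure_path)
print(aws_path)
```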

The __target connection__ is responsible for connecting to your Adobe Experience Platform dataset so that the resulting Data Flow will know where the data needs to be written. Because we already created our ingestion dataset in the previous section, we can simply tie it to that dataset ID and the corresponding schema.

In [0]:
target_res = flow_conn.createTargetConnectionDataLake(
  name="[CMLE][Week4] User Propensity Target Connection",
  datasetId=ingestion_dataset_id,
  schemaId=schema_id
)

target_connection_id = target_res["id"]
target_connection_id

'e45cfb70-ea1e-4301-bcfa-20b81c00362b'

We're still missing one step. If you look back to the previous cells, this is what we have as the schema of our scored dataframe:

 - `userId`
 - `prediction`

And this is what we have as the schema of our ingestion dataset:

 - `_$TENANTID.userid`
 - `_$TENANTID.propensity`

Although it may look obvious to us, we still need to let the platform know which field maps to which. This can be achieved using the Data Prep service, which allows you to specify a set of transformations to map one field to another. In our case the transformation is pretty simple, we just need to match the schemas without making any changes, but you can do much more extensive transformations using this service if needed.

In [0]:
from aepp import dataprep

dataprep_conn = dataprep.DataPrep()

In [0]:
mapping_res = dataprep_conn.createMappingSet(
  schemaId=schema_id,
  validate=True,
  mappingList=[
    {
      "sourceType": "ATTRIBUTE",
      "source": "prediction",
      "destination": f"_{tenant_id}.propensity"
    },
    {
      "sourceType": "ATTRIBUTE",
      "source": "userId",
      "destination": f"_{tenant_id}.userid"
    }
  ]
)

mapping_id = mapping_res["id"]
mapping_id

'a15b7a9686c4468aacc642b7019110ae'
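
If the scored dataframe had more columns, the mapping list could be generated from a dictionary rather than written out by hand. The following is a minimal sketch, not part of `aepp`: the helper name `build_mapping_list` and the tenant ID `mytenant` are hypothetical, and the entry layout simply mirrors the one used in the cell above.

```python
def build_mapping_list(field_map, tenant_id):
    """Turn {source_column: tenant_field_name} into Data Prep mapping entries.

    Hypothetical helper: it only reproduces the dictionary layout used in
    the createMappingSet call above.
    """
    return [
        {
            "sourceType": "ATTRIBUTE",
            "source": source,
            "destination": f"_{tenant_id}.{target}",
        }
        for source, target in field_map.items()
    ]


# "mytenant" is a made-up tenant ID used for illustration only.
example_mappings = build_mapping_list(
    {"prediction": "propensity", "userId": "userid"},
    tenant_id="mytenant",
)
for entry in example_mappings:
    print(entry["source"], "->", entry["destination"])
```

The resulting list could then be passed as the `mappingList` argument shown above.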

At this point we have everything we need to create a __Data Flow__. A data flow is the "recipe" that describes where the data comes from and where it should end up. We can also specify how often the flow checks for new data, but the interval currently cannot be shorter than 15 minutes for platform stability reasons. A data flow is tied to a flow spec ID, which contains the instructions for transferring data in an optimized way between a source and a destination.

For reference, here is a list of all the flow specs available for the most popular cloud storage accounts (these IDs are global across every single customer account and sandbox):
    
| Cloud Storage Type | Flow Spec ID | Flow Spec Name 
|-----------------------|--------------------------------------|------------------
| Amazon S3 | 9753525b-82c7-4dce-8a9b-5ccfce2b9876 | CloudStorageToAEP 
| Azure Blob Storage | 14518937-270c-4525-bdec-c2ba7cce3860 | CRMToAEP 
| Azure Data Lake | 9753525b-82c7-4dce-8a9b-5ccfce2b9876 | CloudStorageToAEP 
| Data Landing Zone | 9753525b-82c7-4dce-8a9b-5ccfce2b9876 | CloudStorageToAEP 
| Google Cloud Storage | 9753525b-82c7-4dce-8a9b-5ccfce2b9876 | CloudStorageToAEP 
| SFTP | 9753525b-82c7-4dce-8a9b-5ccfce2b9876 | CloudStorageToAEP

In [0]:
flow_spec = flow_conn.getFlowSpecs("name==CloudStorageToAEP")
flow_spec_id = flow_spec[0]["id"]
flow_spec_id

'9753525b-82c7-4dce-8a9b-5ccfce2b9876'

In [0]:
import time

# TODO: cleanup in aepp, first param should not be required
flow_res = flow_conn.createFlow(flow_spec_id, obj={
  "name": f"[CMLE][Week4] DLZ to AEP for user propensity (created by {username})",
  "flowSpec": {
      "id": flow_spec_id,
      "version": "1.0"
  },
  "sourceConnectionIds": [
      source_connection_id
  ],
  "targetConnectionIds": [
      target_connection_id
  ],
  "transformations": [
      {
          "name": "Mapping",
          "params": {
              "mappingId": mapping_id,
              "mappingVersion": 0
          }
      }
  ],
  "scheduleParams": {
      "startTime": str(int(time.time())),
      "frequency": "minute",
      "interval": "15"
  }
})
dataflow_id = flow_res["id"]
dataflow_id

'c4103810-5443-401e-8812-a8ec9eb3f6b5'

Note that the name of the transformation has to be set to `Mapping` or the job will fail.
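
The schedule block used in the flow definition can be built by a small function that rejects intervals below the 15-minute minimum mentioned earlier. This is a minimal sketch under that assumption: `build_schedule_params` and `MIN_INTERVAL_MINUTES` are hypothetical names, and the check only covers the minute-based frequency used in this notebook.

```python
import time

# Lower bound taken from the text of this notebook, not from an API response.
MIN_INTERVAL_MINUTES = 15


def build_schedule_params(interval_minutes=15, start_time=None):
    """Build a scheduleParams dict with a minute-based frequency.

    Hypothetical helper: raises ValueError if the interval is shorter
    than the documented minimum.
    """
    if interval_minutes < MIN_INTERVAL_MINUTES:
        raise ValueError(
            f"interval must be at least {MIN_INTERVAL_MINUTES} minutes, "
            f"got {interval_minutes}"
        )
    if start_time is None:
        start_time = time.time()
    return {
        # Epoch seconds as a string, matching the cell above.
        "startTime": str(int(start_time)),
        "frequency": "minute",
        "interval": str(interval_minutes),
    }


# Fixed start time so the example is reproducible.
example_schedule = build_schedule_params(interval_minutes=30, start_time=1715268603)
print(example_schedule)
```

Failing early in the notebook is usually easier to diagnose than a rejected API request.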

You should be able to see your Data Flow in the UI at the link below. Because the flow runs on a schedule, you may already see some executions depending on when you check: a run is recorded even when there was no data to process, as shown in the screenshot below.

In [0]:
dataflow_link = get_ui_link(tenant_id, "source/dataflows", dataflow_id)
print(f"Data Flow created as ID {dataflow_id} available under {dataflow_link}")

Data Flow created as ID c4103810-5443-401e-8812-a8ec9eb3f6b5 available under https://experience-stage.adobe.com/#/@cloudmlecosystem/sname:cmle-datarobot/platform/source/dataflows/c4103810-5443-401e-8812-a8ec9eb3f6b5


Note: If you would like to switch to a different cloud storage, update the `flow_spec_id` variable above to the matching value from the table earlier in this section. You can use the flow spec name from that table to look up the ID.

## 2.3 Ingest the scored users into the Unified Profile

At this point we have successfully set up a Data Flow that listens for any files written to the DLZ under our import path, automatically converts them to XDM, and delivers them to our dataset, where they will be picked up for ingestion into the Unified Profile. All that is left to do is to actually deliver the data. For that, we take the Spark dataframe computed earlier, `df_to_ingest`, and write it to the DLZ under the aforementioned folder.

Before we can do that we need to update the credentials, because we'll be writing to a different container in the DLZ (`dlz-user-container` instead of `dlz-destination`) so the credentials are different. This should not cause an issue with the lazy computation of our dataframe because we used `.cache()` to cache it so it should already be in memory right now.

In [0]:
dlz_path = configureSparkSessionAndGetPath(dlz_credentials)
dlz_path

'abfss://dlz-user-container@sndbxdtlnd5m4pyy87h4d027.dfs.core.windows.net/'

Now let's determine the full path where we need to write. We use a convention similar to the one used when the data was egressed, which is `cmle/ingress/$DATASETID/exportTime=$EXPORTTIME`.

In [0]:
from datetime import datetime

scoring_export_time = datetime.utcnow().strftime('%Y%m%d%H%M%S')
output_path = f"{dlz_path}{import_path}/{ingestion_dataset_id}/exportTime={scoring_export_time}/"
output_path

'abfss://dlz-user-container@sndbxdtlnd5m4pyy87h4d027.dfs.core.windows.net/cmle/ingress/663bb3d786def12c699779a1/exportTime=20240509153003/'
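
The path convention above can be wrapped in a small function so the timestamp can be fixed for reruns or testing. This is a minimal sketch: `build_ingress_path` is a hypothetical helper, the `cmle/ingress` import path is inferred from the output shown above, and a timezone-aware UTC timestamp is used here in place of `datetime.utcnow()`.

```python
from datetime import datetime, timezone


def build_ingress_path(dlz_path, import_path, dataset_id, export_time=None):
    """Return <dlz_path>/<import_path>/<dataset_id>/exportTime=<timestamp>/.

    Hypothetical helper that normalizes slashes between the segments.
    """
    if export_time is None:
        export_time = datetime.now(timezone.utc)
    stamp = export_time.strftime("%Y%m%d%H%M%S")
    base = dlz_path.rstrip("/")
    folder = import_path.strip("/")
    return f"{base}/{folder}/{dataset_id}/exportTime={stamp}/"


# Values copied from the cell output above, with a fixed export time.
example_path = build_ingress_path(
    "abfss://dlz-user-container@sndbxdtlnd5m4pyy87h4d027.dfs.core.windows.net/",
    "cmle/ingress",
    "663bb3d786def12c699779a1",
    datetime(2024, 5, 9, 15, 30, 3, tzinfo=timezone.utc),
)
print(example_path)
```

Each write lands in its own `exportTime=` folder, so repeated scoring runs do not overwrite earlier files.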

Now we just need to write the dataframe to this output path. Because we chose the delimited format in our Data Flow setup, we write the resulting files as CSV and include the header so the transformation knows which column corresponds to which field.

In [0]:
df_to_ingest \
  .write \
  .option("header", True) \
  .format("csv") \
  .save(output_path)

Because the Data Flow is executed asynchronously every 15 minutes, it may take a few minutes before the data is ingested into the dataset. The cell below polls the status of the runs until one has completed, then prints some summary statistics.

In [0]:
import time

# TODO: handle that more gracefully in aepp
# Poll once a minute until the run metrics can be read without error.
finished = False
while not finished:
  try:
      runs = flow_conn.getRuns(prop=f"flowId=={dataflow_id}")
      for run in runs:
          run_id = run["id"]
          # Start and end timestamps are in milliseconds, hence the division by 1000.
          run_started_at = run["metrics"]["durationSummary"]["startedAtUTC"]
          run_ended_at = run["metrics"]["durationSummary"]["completedAtUTC"]
          run_duration_secs = (run_ended_at - run_started_at) / 1000
          # Convert the output size from bytes to megabytes.
          run_size_mb = run["metrics"]["sizeSummary"]["outputBytes"] / 1024. / 1024.
          run_num_rows = run["metrics"]["recordSummary"]["outputRecordCount"]
          run_num_files = run["metrics"]["fileSummary"]["outputFileCount"]
          print(f"Run ID {run_id} completed with: duration={run_duration_secs} secs; size={run_size_mb} MB; num_rows={run_num_rows}; num_files={run_num_files}")
      finished = True
  except Exception as e:
      # Any error (for example, metrics not yet available) is treated as "not ready yet".
      print(f"No runs completed yet for flow {dataflow_id}")
      time.sleep(60)

No runs completed yet for flow c4103810-5443-401e-8812-a8ec9eb3f6b5
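
The loop above waits indefinitely and treats every error as "not ready yet". A bounded variant is sketched below under a few assumptions: `summarize_run` and `wait_for_runs` are hypothetical helpers, the metric field names are the ones read in the cell above, and the fake responses are made-up data that stand in for the real API so the example runs on its own.

```python
import time


def summarize_run(run):
    """Extract summary statistics from one run dict (raises KeyError if metrics are missing)."""
    metrics = run["metrics"]
    duration = metrics["durationSummary"]
    return {
        "id": run["id"],
        # Timestamps are in milliseconds.
        "duration_secs": (duration["completedAtUTC"] - duration["startedAtUTC"]) / 1000,
        "size_mb": metrics["sizeSummary"]["outputBytes"] / 1024 / 1024,
        "num_rows": metrics["recordSummary"]["outputRecordCount"],
        "num_files": metrics["fileSummary"]["outputFileCount"],
    }


def wait_for_runs(fetch_runs, max_attempts=30, sleep_secs=60, sleep=time.sleep):
    """Call fetch_runs() until it yields runs with complete metrics, or give up."""
    for _ in range(max_attempts):
        try:
            summaries = [summarize_run(run) for run in fetch_runs()]
        except (KeyError, TypeError):
            # Metrics not populated yet, or no iterable result returned.
            summaries = []
        if summaries:
            return summaries
        sleep(sleep_secs)
    raise TimeoutError(f"no completed runs after {max_attempts} attempts")


# Made-up responses: the first has no metrics yet, the second is complete.
fake_responses = iter([
    [{"id": "run-1"}],
    [{"id": "run-1", "metrics": {
        "durationSummary": {"startedAtUTC": 1000, "completedAtUTC": 61000},
        "sizeSummary": {"outputBytes": 2 * 1024 * 1024},
        "recordSummary": {"outputRecordCount": 100},
        "fileSummary": {"outputFileCount": 1},
    }}],
])
run_summaries = wait_for_runs(lambda: next(fake_responses), sleep=lambda secs: None)
print(run_summaries)
```

In the notebook, `fetch_runs` could be `lambda: flow_conn.getRuns(prop=f"flowId=={dataflow_id}")`, reusing the call from the cell above.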


Once this is done, you should be able to go back to your dataset at the same link as before and see a batch created successfully in it. You should also notice that the records ingested in that batch show up under **Existing Profile Fragments**, which means they have been ingested into the Unified Profile successfully.

## 2.4 Storing the scoring dataset ID in the configuration

Now that everything is working, we just need to save the `ingestion_dataset_id` variable in the original configuration file so we can refer to it in the following weekly assignment. To do that, execute the code below:

In [0]:
config.set("Platform", "scoring_dataset_id", ingestion_dataset_id)
config_string = io.StringIO()
config.write(config_string)
project.save_data(file_name="config.ini", data=config_string.getvalue(), overwrite=True)

{'file_name': 'config.ini',
 'message': 'File saved to project storage.',
 'asset_id': '3b0346fe-24e6-42cf-9f33-38d3b9bc87a7'}
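
To see what the cell above writes, the same `ConfigParser` round trip can be run entirely in memory. This is a minimal sketch with made-up values: the sandbox name is hypothetical, and `project.save_data` is left out so the example has no dependency on the project storage.

```python
import io
from configparser import ConfigParser

# Made-up configuration content standing in for config.ini.
example_config = ConfigParser()
example_config.read_string("[Platform]\nsandbox_name = my-sandbox\n")

# ConfigParser.set expects a string value and an existing section.
example_config.set("Platform", "scoring_dataset_id", "663bb3d786def12c699779a1")

# Serialize to a string, as done before saving the file above.
buffer = io.StringIO()
example_config.write(buffer)
serialized = buffer.getvalue()
print(serialized)

# Reload the serialized text to confirm the new key survives the round trip.
reloaded = ConfigParser()
reloaded.read_string(serialized)
print(reloaded.get("Platform", "scoring_dataset_id"))
```

Existing keys in the section are preserved, so only the new `scoring_dataset_id` entry is added to the file.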