# mParticle Real Time Data + Personalize

In this module you will add the ability to maintain a real-time dataset that represents the latest behavior of Retail Demo Store users.  You will then connect that dataset to the Personalize dataset groups that you built in the first part of the workshop, so that your Personalize models are kept up to date with the latest events your users perform.

This workshop uses the mParticle Web SDK npm library (https://github.com/mParticle/mparticle-web-sdk) to collect real-time data from the Retail Demo Store and feed that event data into the mParticle platform. From there, the events can be routed to a Personalize event tracker and used to keep your user-item interaction data current.

*Recommended Time: 45 Minutes*

## Prerequisites
To complete this workshop, you must first complete the 1.1-Personalize workbook in this directory.  You will also need an mParticle workspace.  If you are doing this workshop as part of a live workshop event, ask your moderator how to set up an mParticle workspace.

If you are running this workshop on your own, you can go to https://www.mparticle.com/contact to request an mParticle account.  We do not recommend using your production mParticle workspace for this workshop.

## mParticle Platform Overview
mParticle is a customer data platform (CDP) that helps you collect, clean, and control your customer data. mParticle provides several types of Sources for collecting data, and you can choose among them based on the needs of your app or site. For websites, you can use a JavaScript library. For mobile apps, you can embed one of mParticle’s Mobile SDKs. If you’d like to create messages directly on a server (for example, a dedicated .NET server that processes payments), mParticle has several server-based libraries that you can embed directly into your backend code. You can also use cloud sources to import data about your app or site from other tools, such as Zendesk or Salesforce, to enrich the data sent through mParticle. By using mParticle to decouple data collection from data use, you can create a centralized data supply chain based on organized and modular data.

![mParticle Overview](images/mparticle/mparticle_overview.png)

## Setup
If you have already entered your mParticle API key and secret into your CloudFormation deployment, you can skip to the next section.

mParticle uses *connections* as a way to organize data inputs into the platform.  Configuring an input will allow you to collect real-time event data from the Retail Demo Store user interface and pass that information to mParticle.  You need to be signed in to your mParticle workspace to begin this process.  Once you are signed in to the mParticle console (https://app.mparticle.com), click on your workspace, and then ‘Setup’ in the left-hand navigation bar. Then, click ‘Inputs’.

![mParticle Input Setup](images/mparticle/mparticle-step-1.png)

Select the ‘Web’ type within Platforms.

![mParticle Platform Catalog](images/mparticle/mparticle-step-2.png)

Click ‘Web’, then click ‘Issue Keys’.

![mParticle Input Setup Add JS Source](images/mparticle/mparticle-step-3.png)

mParticle will generate a key and secret pair. This is the pair that the CloudFormation template asks for when you deploy the Retail Demo Store.

![mParticle Input Setup Key_Secret](images/mparticle/mparticle-step-4.png)

Now that you are here, paste the API key and secret for your new input into the variables in the cell below.

You will need them in a few minutes, when you enable mParticle event collection in the Retail Demo Store.

Make sure you run the cell after you paste the key and secret.  This will allow us to set the mParticle API key in the web UI deployment, and pass the keys to other back-end services via SSM.

In [1]:
# THIS IS ONLY REQUIRED IF YOU DID NOT SET THE mPARTICLE API KEY AND SECRET IN YOUR ORIGINAL DEPLOYMENT

# IF YOU ARE RUNNING THIS IN A GUIDED WORKSHOP, YOU WILL NEED TO SET THESE VALUES BEFORE CONTINUING

# Paste the key and secret issued for your Web input; do not commit real credentials
mparticle_api_key = "<YOUR_MPARTICLE_API_KEY>"
mparticle_secret_key = "<YOUR_MPARTICLE_SECRET_KEY>"

import boto3
import json

ssm = boto3.client('ssm')
iam = boto3.client('iam')
sts = boto3.client('sts')

In [2]:

aws_account_id = sts.get_caller_identity().get('Account')
region_name = boto3.Session().region_name

# Store the key and secret in SSM so the web UI build and back-end services can read them
if mparticle_api_key:
    response = ssm.put_parameter(
        Name='retaildemostore-mparticle-api-key',
        Value=mparticle_api_key,
        Type='String',
        Overwrite=True
    )

if mparticle_secret_key:
    response = ssm.put_parameter(
        Name='retaildemostore-mparticle-secret-key',
        Value=mparticle_secret_key,
        Type='String',
        Overwrite=True
    )


print("mParticle API Key:")
print(mparticle_api_key)
print("mParticle Secret Key:")
print(mparticle_secret_key)
print("AWS Account ID:")
print(aws_account_id)

mParticle API Key:
<YOUR_MPARTICLE_API_KEY>
mParticle Secret Key:
<YOUR_MPARTICLE_SECRET_KEY>
AWS Account ID:
683819462896


The key and secret are now stored in SSM, where the web UI build can pick them up to enable the mParticle data collection library in the code in `AnalyticsHandler.js`.  All we need to do now is force a re-deploy of the Retail Demo Store.

To do that, go back to your AWS Console tab or window, and select CodePipeline from the Services search.  Then, find the pipeline whose name contains `WebUIPipeline` and click that link.

Then, select the ‘Release Change’ button, and confirm the release when the popup appears.  You will see a confirmation that the pipeline is re-deploying the web UI for the Retail Demo Store.

This process should complete in a few minutes.  Once this is done, you will see the bottom tile confirm that your deployment has completed.
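
If you would rather trigger the release from the notebook than from the console, the same step can be sketched with boto3. This is a minimal sketch, not part of the workshop code: the helper names `find_pipeline_name` and `release_web_ui` are hypothetical, the CodePipeline client is passed in so the functions can be tried without AWS access, and pagination of `list_pipelines` is ignored on the assumption that the account has only a few pipelines.

```python
def find_pipeline_name(pipelines, fragment="WebUIPipeline"):
    """Return the first pipeline name containing `fragment`, or None."""
    for pipeline in pipelines:
        if fragment in pipeline["name"]:
            return pipeline["name"]
    return None


def release_web_ui(codepipeline, fragment="WebUIPipeline"):
    """Start a new execution of the web UI pipeline and return its execution ID.

    `codepipeline` is expected to be a boto3 CodePipeline client.
    """
    # list_pipelines returns a dict with a "pipelines" list of summaries.
    # Only the first page is inspected (assumption: few pipelines exist).
    pipelines = codepipeline.list_pipelines()["pipelines"]
    name = find_pipeline_name(pipelines, fragment)
    if name is None:
        raise LookupError(f"No pipeline name contains {fragment!r}")
    response = codepipeline.start_pipeline_execution(name=name)
    return response["pipelineExecutionId"]
```

In the notebook this would be called as `release_web_ui(boto3.client('codepipeline'))`; the console steps above remain the documented path.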

Now that you have a working source, let's check whether events from the Retail Demo Store are flowing into mParticle.

## Sending Real-Time Events via the Retail Demo Store

Navigate to your Retail Demo Store Web App and refresh the screen to reload the user interface.  This will load the libraries you just deployed, and will allow your instance of the Retail Demo Store to send events to mParticle.

mParticle provides a variety of ways to collect real-time events, and a full discussion of how this works is beyond the scope of this document. The Retail Demo Store represents a fairly typical deployment for web applications, in that it uses the mParticle Web SDK library, loaded via npm, to add mParticle's code to the web application.

To verify that the mParticle SDK is fully instantiated within the Retail Demo Store, open the developer console of your web browser and type:
```javascript
window.mParticle.Identity.getCurrentUser().getMPID()
```

You should get the following response:

![mParticle Verification Setup3](images/mparticle/mparticle-verification-step.png)

Then, open another tab to the mParticle console, and select Live Stream under Data Master:

![mParticle Livestream View](images/mparticle/mparticle-live-stream.png)

You should see events collected by the mParticle SDK being streamed in real time to your mParticle instance. Feel free to open individual events and inspect the information each one holds.

You can also try logging in or creating an account in the Retail Demo Store web application. Compare the events sent for a logged-in user with those sent for a guest user.



## Configure the mParticle Personalize Destination

mParticle uses Outputs to route real-time event data to a data consumer application.  In this case, you will be using Amazon Kinesis as the destination.  This destination will take real-time events from mParticle, pass them through an AWS Lambda function, and then into the user-item interactions dataset in your Retail Demo Store.

For this use case you will need to bring together mParticle and three AWS resources:

1. An Amazon Kinesis stream to receive real-time events from mParticle
2. An Amazon Personalize campaign to create product recommendations
3. A Lambda function to act as a broker that transforms data from Kinesis into a format accepted by Amazon Personalize (and ingested via a Personalize Event Tracker)

When you deployed the Retail Demo Store, a CloudFormation template deployed a Kinesis stream and Lambda function for this workshop, as well as the necessary IAM account and policy for mParticle to write to your Kinesis stream. Let's connect these to your mParticle environment.

### Connect mParticle to Kinesis

mParticle offers an "event" output for streaming event data to Kinesis in real time. This can be set up and controlled from the mParticle dashboard without writing code. You can read an overview of event outputs in the mParticle docs (https://docs.mparticle.com/guides/getting-started/connect-an-event-output/).

Amazon Kinesis is an AWS service for processing streaming data. mParticle will forward commerce event data to Kinesis, where it will be picked up by the Lambda function you will configure in a moment.

Click ‘Directory’ in the left-hand navigation bar, and then search for ‘Amazon Kinesis’.

![mParticle Output Setup](images/mparticle/mparticle-step-5.png)

#### Create configuration

First, you will need to create an overall configuration for Kinesis. This holds all the settings that will remain the same for every input you connect.

![mParticle Kinesis Setup1](images/mparticle/mparticle-step-6.png)

To obtain an Access Key ID and Secret Access Key, run the following cell and enter the generated Access Key ID and Secret Access Key in mParticle.


In [None]:
# Create programmatic access keys that mParticle will use to write to Kinesis

# Policy JSON; the region and account ID come from the variables set in the earlier cell
kinesisarn = "arn:aws:kinesis:"+region_name+":"+aws_account_id+":stream/mParticlePersonalizeEventsKinesisStream"
customPolicy = {
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kinesis:PutRecord"
           ],
           "Resource": [
              kinesisarn
           ]
       }
   ]
}

# Create the policy
policy = iam.create_policy(
  PolicyName='KinesismParticlePolicy',
  PolicyDocument=json.dumps(customPolicy)
)

policy_arn = policy['Policy']['Arn']

print(policy_arn)

# Create the IAM user
created_user = iam.create_user(
    UserName='mParticleRetailDemoStoreKinesis'
)

# create_user returns a dict; later calls need the user name string
user_name = created_user['User']['UserName']

print(user_name)

# Attach the policy to the user
response = iam.attach_user_policy(
    UserName=user_name,
    PolicyArn=policy_arn
)

print(response)

# Create a programmatic access key for mParticle
response = iam.create_access_key(
    UserName=user_name
)

# Enter these two values in the mParticle Kinesis configuration
print("Access Key ID:", response['AccessKey']['AccessKeyId'])
print("Secret Access Key:", response['AccessKey']['SecretAccessKey'])



In [3]:
# The AWS region you are running in is:

print(f'AWS Region: {region_name}')


AWS Region: us-east-1
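
Before entering the region and stream name in mParticle, it can help to confirm that the stream exists and is active in that region. The following is a minimal sketch under stated assumptions: `stream_arn` and `stream_is_active` are hypothetical helper names, and the Kinesis client is passed in (for example `boto3.client('kinesis')`) so the logic can be exercised without AWS access. The stream name is the one used throughout this workshop.

```python
STREAM_NAME = "mParticlePersonalizeEventsKinesisStream"  # name used by this workshop


def stream_arn(region_name, aws_account_id, stream_name=STREAM_NAME):
    """Build the Kinesis stream ARN from its parts."""
    return f"arn:aws:kinesis:{region_name}:{aws_account_id}:stream/{stream_name}"


def stream_is_active(kinesis, stream_name=STREAM_NAME):
    """Return True if the stream exists and reports the ACTIVE status.

    `kinesis` is expected to be a boto3 Kinesis client.
    """
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    return summary["StreamDescriptionSummary"]["StreamStatus"] == "ACTIVE"
```

If the stream is missing in the client's region, boto3 raises an error from `describe_stream_summary` instead of returning `False`, which usually points to a region mismatch.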


#### Connect all sources

Next, you will connect the Retail Demo Store Web UI as an input: Web to Kinesis. To do that, click Connections, then Connect. Select JS Web Platform as the Input.

![mParticle Kinesis Setup1](images/mparticle/mparticle-step-7.png)

Click + Connect Output.

![mParticle Kinesis Setup1](images/mparticle/mparticle-step-8.png)

Select Kinesis and the configuration you created earlier.

![mParticle Kinesis Setup1](images/mparticle/mparticle-step-9.png)


The settings you need to provide here are the Amazon Region in which you deployed the Retail Demo Store and the name of the Kinesis stream in your environment.  The region will depend on which region you are using in your AWS account or workshop account.  The name of the Kinesis stream will be `mParticlePersonalizeEventsKinesisStream`.  This was deployed for you when you deployed the workshop environment.




![mParticle Kinesis Setup1](images/mparticle/mparticle-step-10.png)

After setting the Kinesis service region, untick every checkbox except Send eCommerce Events, which should stay ticked. Click Save to save your settings. Only eCommerce events need to be sent to Kinesis, as these are the only events relevant to Amazon Personalize.

![mParticle Kinesis Setup1](images/mparticle/mparticle-kinesis-connection-config2.png)



## Configure Lambda Parameters and Review Code

Before the destination can send events to your Amazon Personalize event tracker, you will need to tell the destination Lambda where to send the events.  It looks for an environment variable called `PERSONALISE_TRACKING_ID`.

Let's set that.  Run the following cell to look up the relevant Amazon Personalize tracker ID and campaign ARN created in the Personalize workbook.

We can then set the appropriate values in the destination Lambda.

In [9]:
ssm = boto3.client('ssm')

# First, let's look up the appropriate tracking string
response = ssm.get_parameter(
    Name='retaildemostore-personalize-event-tracker-id'
)

tracking_id = response['Parameter']['Value']

print(tracking_id)

# Get the Campaign ARN
response = ssm.get_parameter(
    Name='retaildemostore-product-recommendation-campaign-arn'
)

product_recommendation_arn = response['Parameter']['Value']

print(product_recommendation_arn)

d56d258d-0b24-421e-9fda-fdb7398d85da
arn:aws:personalize:us-east-1:683819462896:campaign/retaildemostore-product-personalization


Within the mParticle platform, navigate to Directory and search for Custom Feed.

![mParticle Custom_Feed](images/mparticle/CustomFeed.png)


Click Setup to generate a key and secret pair. You will use this key and secret in your Lambda environment configuration.

![mParticle Custom_Feed](images/mparticle/CustomFeedDetails.png)




Go to your AWS console tab or window, and select Lambda from the Services menu.

Find the mParticlePersonalize function, and click on it in the list.

![mParticle Events Destination Lambda](images/mparticle/mparticle-find-lambda-function.png)

Then, navigate to Configuration and click Environment variables.

![mParticle Events Destination Lambda](images/mparticle/mparticle-events-lambda-params.png)

If the environment variables are not already set up, you may need to type in the names and values below manually.

`PERSONALISE_TRACKING_ID` should be set to the tracker ID printed by the cell above.
`PERSONALISE_CAMPAIGN_ARN` should be set to the campaign ARN printed by the cell above.
`MPARTICLE_S2S_API_KEY` and `MPARTICLE_S2S_SECRET_KEY` should be set to the key and secret generated for the Custom Feed.


Click the Edit button, then paste in the values for `PERSONALISE_TRACKING_ID`, `PERSONALISE_CAMPAIGN_ARN`, `MPARTICLE_S2S_API_KEY`, and `MPARTICLE_S2S_SECRET_KEY`, and click Save.
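
The same variables could also be set from the notebook rather than the console. The sketch below is an assumption-laden illustration, not workshop code: `merged_environment` and `set_lambda_variables` are hypothetical helpers, the Lambda client is passed in (for example `boto3.client('lambda')`), and the full deployed function name must be supplied by you, since it may be longer than `mParticlePersonalize`. Existing variables are read first because updating a function's environment replaces the whole variable map.

```python
def merged_environment(current, updates):
    """Return a copy of the `current` variables with `updates` applied on top."""
    merged = dict(current)
    merged.update(updates)
    return merged


def set_lambda_variables(lambda_client, function_name, updates):
    """Merge `updates` into the function's environment variables and save them.

    `lambda_client` is expected to be a boto3 Lambda client.
    """
    config = lambda_client.get_function_configuration(FunctionName=function_name)
    # A function with no variables has no "Environment" key in the response.
    current = config.get("Environment", {}).get("Variables", {})
    variables = merged_environment(current, updates)
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Environment={"Variables": variables},
    )
    return variables
```

With the values from the cell above, a call might pass `{"PERSONALISE_TRACKING_ID": tracking_id, "PERSONALISE_CAMPAIGN_ARN": product_recommendation_arn}` plus the two Custom Feed credentials as `updates`.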

Take some time to look at the code that this Lambda uses to send events to Personalize.  You can use this code in your own deployment; however, you may need to change the event parameters sent to Amazon Personalize depending on the dataset you set up.

```javascript
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({ storeAsString: true });
const mParticle = require('mparticle');
const trackingId = process.env.PERSONALISE_TRACKING_ID;
const campaignArn = process.env.PERSONALISE_CAMPAIGN_ARN;
const report_actions = ["purchase", "view_detail", "add_to_cart", "checkout","add_to_wishlist"];
const mp_api_key = process.env.MPARTICLE_S2S_API_KEY;
const mp_api_secret = process.env.MPARTICLE_S2S_SECRET_KEY;
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
const personalizeruntime = new AWS.PersonalizeRuntime({apiVersion: '2018-05-22'});
// Log only the tracking ID; avoid writing the API key and secret to the logs
console.log("ENVIRONMENT VARIABLES: trackingId=" + trackingId);
const mp_api = new mParticle.EventsApi(new mParticle.Configuration(mp_api_key, mp_api_secret));
const req = require('request');
const axios = require('axios');

var eventList = [];
var mpid;

//new file

exports.handler = function (event, context) {

    console.log(event);
    console.log(event.Records);
   // const record = event.Records;
    for (const record of event.Records) {
        const payloadString = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        console.log(payloadString);
        // Parse with JSONBig so the 64-bit mpid is kept as a string instead of losing precision
        const payload = JSONBig.parse(payloadString);
        console.log(payload);
        const events = payload.events;
        mpid = payload.mpid.toString();
        var amazonPersonalizeId = mpid;
        if(payload.user_attributes && payload.user_attributes.amazonPersonalizeId)
            amazonPersonalizeId = payload.user_attributes.amazonPersonalizeId;

        var amazonUserId = mpid;
        if(payload.user_identities){
            for (const identityRecord of payload.user_identities)
            {
                if(identityRecord.identity_type==="customer_id")
                    amazonUserId = identityRecord.identity;
            }
        }
        const sessionId = payload.message_id;
        let params = {
            sessionId: sessionId,
            userId: amazonPersonalizeId,
            trackingId: trackingId
        };
        console.log(params);
        // Check for variant and assign one if not already assigned
        // Declared with const so each record's callbacks see that record's own values
        const variant_assigned = Boolean(payload.user_attributes && payload.user_attributes.ml_variant);
        const variant = variant_assigned ? payload.user_attributes.ml_variant : (Math.random() > 0.5 ? "A" : "B");
        console.log(events);
        for (const e of events) {
            if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
                const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
                const action = e.data.product_action.action;
                const event_id = e.data.event_id;
                const discount = Math.random() > 0.5 ? "Yes" : "No";
                for (const product of e.data.product_action.products) {
                    const obj = {itemId: product.id,discount: discount};

                    if(eventList.length > 10){
                        eventList.shift();
                        
                    }
                    eventList.push({
                        properties: obj,
                        sentAt: timestamp,
                        eventId: event_id,
                        eventType: action
                    });
                }
            }
        }

        console.log("Event List:");
        console.log(eventList);
        console.log(eventList.length);
        if(eventList.length > 10)
        {
            console.log("eventList more than 10");
            console.log(eventList);
            var lastTenRecords = eventList.length / 2;
            eventList = eventList.slice(lastTenRecords);
            console.log("eventList after slice");
            console.log(eventList);
        }
        if (eventList.length > 0) {
            params.eventList = eventList;
                personalizeevents.putEvents(params, async function(err, data) {
                if (err) 
                {
                    console.log(err);
                    console.log(err, err.stack);
                }
                else {
                    //getProductPersonalization
                    let params = {
                      // Select campaign based on variant
                      campaignArn: campaignArn,
                      numResults: '5',
                      userId: amazonPersonalizeId
                    };
                    console.log(params);
                    personalizeruntime.getRecommendations(params, async function(err, data) {
                      if (err)
                      {
                        console.log(err);
                          console.log(err, err.stack);
                      }
                      else {
                          
                          let batch = new mParticle.Batch(mParticle.Batch.Environment.development);
                          batch.mpid = mpid;
                          let itemList = [];
                          var productNameList = [];
                          let promises = [];
                          for (let item of data.itemList) {
                              itemList.push(item.itemId);
                              var url = "https://products.stridesolution.com/products/id/"+item.itemId;
                              
                              // Look up the product name for each recommended item (one request per item)
                              promises.push(
                                axios.get(url).then(response => {
                                  productNameList.push(response.data.name);
                                })
                              );
                          }

                          await Promise.all(promises).then(() => console.log(productNameList));
                          batch.user_attributes = {};
                          batch.user_attributes.product_recs = itemList;
                          console.log("ItemList");
                          console.log(itemList);
                          console.log("productNameList");
                          console.log(productNameList);
                          // Record variant on mParticle user profile
                          if (!variant_assigned) {
                              batch.user_attributes.ml_variant = variant
                              batch.user_attributes.product_recs_name=productNameList;
                          }
    
                          let event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Product Personalization Recs Update');
                          event.custom_attributes = {product_recs: itemList.join()};
                          batch.addEvent(event);
                          var body = [batch]; // {[Batch]} Up to 100 Batch objects
                          
                          let mp_callback = async function(error, data, response) {
                              if (error) {
                                  console.error(error);
                                } else {
                                  console.log('API called successfully.');
                                }
                              };
                        
                          mp_api.bulkUploadEvents(body, mp_callback);
                          //uploadEvents(body, batch);
                         // return 'Success';
                      }
                    });
    
                }
            });
        }
    }

    console.log("Successfully processed");
 
   

};
```
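
If your own pipeline is written in Python, the core of the transformation above can be sketched as follows. This is an illustrative sketch rather than a port of the deployed function: the helper names are hypothetical, the random `discount` property from the Lambda is left out, and `properties` is serialized to a JSON string on the assumption that the events are later passed to the boto3 `put_events` call. The payload field names (`events`, `event_type`, `product_action`, `timestamp_unixtime_ms`) are the ones the Lambda code reads.

```python
import base64
import json

# Product actions forwarded to Personalize, mirroring report_actions in the Lambda
REPORT_ACTIONS = {"purchase", "view_detail", "add_to_cart", "checkout", "add_to_wishlist"}


def decode_kinesis_record(record):
    """Decode the base64 `kinesis.data` field of a Lambda Kinesis record into a dict."""
    raw = base64.b64decode(record["kinesis"]["data"])
    # Python integers are arbitrary precision, so a 64-bit mpid survives parsing.
    return json.loads(raw.decode("utf-8"))


def to_personalize_events(payload):
    """Build one Personalize event per product in each reportable commerce event."""
    events = []
    for event in payload.get("events", []):
        data = event.get("data", {})
        action = data.get("product_action", {}).get("action")
        if event.get("event_type") != "commerce_event" or action not in REPORT_ACTIONS:
            continue
        for product in data["product_action"].get("products", []):
            events.append({
                "eventId": str(data["event_id"]),
                "eventType": action,
                "sentAt": data["timestamp_unixtime_ms"] // 1000,  # epoch seconds
                "properties": json.dumps({"itemId": product["id"]}),
            })
    return events


def chunks(items, size=10):
    """Yield slices of at most `size` items (PutEvents accepts up to 10 events per call)."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

Each chunk would then be sent as the `eventList` of one `put_events` call, together with the `trackingId`, `userId`, and `sessionId` that the Lambda builds in `params`.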


## Validate that Real-Time Events are Flowing to AWS Kinesis

To validate that events captured by mParticle are being sent to Kinesis, go back to the mParticle platform and click Data Master, then Live Stream.

Under Message Direction, select Both In and Out.

![mParticle Verification Setup3](images/mparticle/mparticle-verification-step-kinesis.png)

Go back to the Retail Demo Store web app and trigger an eCommerce event. You can do this by viewing a product, adding it to the cart, checking out, or completing a purchase.

You should see entries like the following in Live Stream, showing Amazon Kinesis with an outbound arrow.

![mParticle Verification Setup3](images/mparticle/mparticle-verification-step-kinesis2.png)

If you do not see any outbound events to Amazon Kinesis, you may need to wait a few minutes for the settings to take effect.
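
You can also look at the stream from the AWS side. The sketch below reads a few records from the first shard of the stream; it is a rough check under stated assumptions, not workshop code. `read_recent_records` is a hypothetical helper, the Kinesis client is passed in (for example `boto3.client('kinesis')`), only the first shard is read, and a single `get_records` call may return no records even when the stream contains data, so an empty result is not conclusive.

```python
import json


def read_recent_records(kinesis, stream_name, limit=10):
    """Return up to `limit` JSON-decoded records from the first shard of the stream.

    `kinesis` is expected to be a boto3 Kinesis client.
    """
    shards = kinesis.list_shards(StreamName=stream_name)["Shards"]
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shards[0]["ShardId"],
        ShardIteratorType="TRIM_HORIZON",  # start from the oldest retained record
    )["ShardIterator"]
    response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
    # boto3 returns each record's Data as raw bytes, which json.loads accepts.
    return [json.loads(record["Data"]) for record in response["Records"]]
```

Reading the stream this way does not remove records, so it does not interfere with the Lambda function that consumes the same stream.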


## Save AWS Personalize Recommended Products back to mParticle

Besides the events that flow from the Retail Demo Store into mParticle, the Lambda function above sends the commerce events on to Amazon Personalize. This allows Amazon Personalize to receive commerce events from both anonymous and known users and to return product recommendations for them. After the events are recorded, the Lambda function requests recommendations for the user from the Personalize campaign and sends them back to mParticle using the mParticle Node.js SDK. The excerpt below, taken from the Lambda function, sets the recommended product IDs as a user attribute (`product_recs`) on the user's profile.

```javascript
  if (eventList.length > 0) {
            params.eventList = eventList;
                personalizeevents.putEvents(params, async function(err, data) {
                if (err) 
                {
                    console.log(err);
                    console.log(err, err.stack);
                }
                else {
                    //getProductPersonalization
                    let params = {
                      // Select campaign based on variant
                      campaignArn: campaignArn,
                      numResults: '5',
                      userId: amazonPersonalizeId
                    };
                    console.log(params);
                    personalizeruntime.getRecommendations(params, async function(err, data) {
                      if (err)
                      {
                        console.log(err);
                          console.log(err, err.stack);
                      }
                      else {
                          
                          let batch = new mParticle.Batch(mParticle.Batch.Environment.development);
                          batch.mpid = mpid;
                          let itemList = [];
                          var productNameList = [];
                          let promises = [];
                          for (let item of data.itemList) {
                              itemList.push(item.itemId);
                              var url = "https://products.stridesolution.com/products/id/"+item.itemId;
                              
                              // Look up the product name for each recommended item (one request per item)
                              promises.push(
                                axios.get(url).then(response => {
                                  productNameList.push(response.data.name);
                                })
                              );
                          }

                          await Promise.all(promises).then(() => console.log(productNameList));
                          batch.user_attributes = {};
                          batch.user_attributes.product_recs = itemList;
                          console.log("ItemList");
                          console.log(itemList);
                          console.log("productNameList");
                          console.log(productNameList);
                          // Record variant on mParticle user profile
                          if (!variant_assigned) {
                              batch.user_attributes.ml_variant = variant
                              batch.user_attributes.product_recs_name=productNameList;
                          }
    
                          let event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Product Personalization Recs Update');
                          event.custom_attributes = {product_recs: itemList.join()};
                          batch.addEvent(event);
                          var body = [batch]; // {[Batch]} Up to 100 Batch objects
                          
                          let mp_callback = async function(error, data, response) {
                              if (error) {
                                  console.error(error);
                                } else {
                                  console.log('API called successfully.');
                                }
                              };
                        
                          mp_api.bulkUploadEvents(body, mp_callback);
                          //uploadEvents(body, batch);
                         // return 'Success';
                      }
                    });
    
                }
            });
        }
```

![mParticle product_recs](images/mparticle/mparticle-product_recs1.png)
![mParticle product_recs](images/mparticle/mparticle-product_recs2.png)
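
For readers who are not using the Node.js SDK, the shape of the batch that the excerpt builds can be sketched as a plain dictionary. This is an approximation under stated assumptions: `build_recs_batch` is a hypothetical helper, and the JSON field names (`environment`, `mpid`, `user_attributes`, `events`, `custom_event_type`) reflect my understanding of mParticle's JSON batch format, so check them against the mParticle Events API reference before relying on them. The sketch only builds the payload and does not send it.

```python
import json


def build_recs_batch(mpid, item_ids, environment="development"):
    """Build a batch that stores recommendations on the user's profile.

    Mirrors the Lambda excerpt: `product_recs` is set as a user attribute, and a
    custom event records the same IDs as a comma-separated string.
    """
    item_ids = list(item_ids)
    return {
        "environment": environment,
        "mpid": mpid,
        "user_attributes": {"product_recs": item_ids},
        "events": [
            {
                "event_type": "custom_event",
                "data": {
                    "event_name": "AWS Product Personalization Recs Update",
                    "custom_event_type": "other",
                    "custom_attributes": {"product_recs": ",".join(item_ids)},
                },
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical MPID and item IDs, for illustration only
    print(json.dumps(build_recs_batch("1234567890", ["item-1", "item-2"]), indent=2))
```

Keeping the payload construction separate from the HTTP call makes it easy to inspect what would be written to the profile before any credentials are involved.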



