In [39]:
import os

# Accessing environment variables

# # Adobe OAuth - for generating bearer token
AUTH_URL = os.environ.get("AUTH_URL")
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")

# # AEP stream params
POST_URL = os.environ.get("POST_URL")
SANDBOX_NAME = os.environ.get("SANDBOX_NAME")
FLOW_ID = os.environ.get("FLOW_ID")
SCHEMA_ID = os.environ.get("SCHEMA_ID")
IMS_ORG_ID = os.environ.get("IMS_ORG_ID")
DATASET_ID = os.environ.get("DATASET_ID")
SOURCE_NAME = os.environ.get("SOURCE_NAME")

In [40]:
import time
import requests

class TokenManager:
    _instance = None

    def __new__(cls, *args, **kwargs):
        print('current instance is ', cls._instance)
        if not cls._instance:
            cls._instance = super(TokenManager, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, auth_url, client_id, client_secret):
        if self._initialized:
            return
        self.auth_url = auth_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = None
        self.token_expiry = 0
        self._initialized = True

    def get_bearer_token(self):
        # Data for the POST request
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'scope': 'AdobeID,openid,read_organizations,additional_info.job_function,additional_info.projectedProductContext,additional_info.roles'
        }
        
        # Make the POST request
        response = requests.post(self.auth_url, headers={'Content-Type': 'application/x-www-form-urlencoded'}, data=data)
        
        if response.status_code == 200:
            data = response.json()
            self.token = data['access_token']
            # Store the expiry time (current time + expires_in)
            self.token_expiry = time.time() + data['expires_in']
            print("Token retrieved successfully")
            print(f"Token expiry time set to: {self.token_expiry} (current time: {time.time()})")
        else:
            print("Failed to retrieve token")
            response.raise_for_status()

    def is_token_expired(self):
        return time.time() >= self.token_expiry

    def get_valid_token(self):
        if self.token is None or self.is_token_expired():
            print("Token expired or not available, fetching a new one")
            self.get_bearer_token()
        return self.token


In [41]:
class APIClient:
    def __init__(self, token_manager, post_url):
        self.token_manager = token_manager
        self.post_url = post_url

    def make_post_request(self, payload, sandbox_name, flow_id):
        token = self.token_manager.get_valid_token()
        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'sandbox-name': sandbox_name,
            'x-adobe-flow-id': flow_id
        }
        
        response = requests.post(self.post_url, headers=headers, json=payload)
        
        if response.status_code == 200:
            print("POST request successful")
            return response.json()
        else:
            print("POST request failed")
            response.raise_for_status()

In [42]:
class IdentityData:
    @staticmethod
    def get_identity(email, phone, user_id):
        return {
            "emailId": email,
            "phoneNumber": phone,
            "userID": user_id
        }

In [43]:
class Person:
    def __init__(self, birth_date, birth_year, gender, marital_status, first_name, last_name, nationality):
        self.birth_date = birth_date
        self.birth_year = birth_year
        self.gender = gender
        self.marital_status = marital_status
        self.name = {
            "firstName": first_name,
            "lastName": last_name
        }
        self.nationality = nationality

    @staticmethod
    def get_person(birth_date, birth_year, gender, marital_status, first_name, last_name, nationality):
        return {
            "birthDate": birth_date,
            "birthYear": birth_year,
            "gender": gender,
            "maritalStatus": marital_status,
            "name": {
                "firstName": first_name,
                "lastName": last_name
            },
            "nationality": nationality
        }

In [44]:
class PersonalEmail:
    def __init__(self, email_address):
        self.email_address = email_address

    def get_address(self):
        return {
            "address": self.email_address
        }

In [45]:
class TestProfile:
    
    @staticmethod
    def is_test_profile(test_profile_flag):
        if (test_profile_flag):
            return True

In [46]:
class Subscription:
    def __init__(self, SKU, subscription_id, billing_period, billing_start_date,
                 category, end_date, payment_method, payment_status, plan_name, reason,
                 renew, revision, start_date, status, sub_category, subscriber_name, term, term_unit_of_time, top_up, sub_type):
        self.SKU = SKU
        self.subscription_id = subscription_id
        self.billing_period = billing_period
        self.billing_start_date = billing_start_date
        self.category = category
        self.end_date = end_date
        self.payment_method = payment_method
        self.payment_status = payment_status
        self.plan_name = plan_name
        self.reason = reason
        self.renew = renew
        self.revision = revision
        self.start_date = start_date
        self.status = status
        self.sub_category = sub_category
        self.subscriber = {
            "name": {
                "courtesyTitle": subscriber_name['courtesy_title'],
                "firstName": subscriber_name['first_name'],
                "fullName": subscriber_name['full_name'],
                "lastName": subscriber_name['last_name'],
                "middleName": subscriber_name['middle_name'],
                "suffix": subscriber_name['suffix']
            }
        }
        self.term = term
        self.term_unit_of_time = term_unit_of_time
        self.top_up = top_up
        self.type = sub_type

    @staticmethod
    def get_subscription(SKU, subscription_id, billing_period, billing_start_date, category,
                                   end_date, payment_method, payment_status, plan_name, reason, renew, revision, start_date,
                                   status, sub_category, subscriber_name, term, term_unit_of_time, top_up, sub_type):
        return {
            "SKU": SKU,
            "_id": subscription_id,
            "billingPeriod": billing_period,
            "billingStartDate": billing_start_date,
            "category": category,
            "endDate": end_date,
            "paymentMethod": payment_method,
            "paymentStatus": payment_status,
            "planName": plan_name,
            "reason": reason,
            "renew": renew,
            "revision": revision,
            "startDate": start_date,
            "status": status,
            "subCategory": sub_category,
            "subscriber": {
                "name": {
                    "courtesyTitle": subscriber_name['courtesy_title'],
                    "firstName": subscriber_name['first_name'],
                    "fullName": subscriber_name['full_name'],
                    "lastName": subscriber_name['last_name'],
                    "middleName": subscriber_name['middle_name'],
                    "suffix": subscriber_name['suffix']
                }
            },
            "term": term,
            "termUnitOfTime": term_unit_of_time,
            "topUp": top_up,
            "type": sub_type
        }

In [47]:
class XDMEntity:
    def __init__(self, identity, person, personalEmail, subscriptions):
        self.identity = identity
        self.person = person
        self.subscriptions = subscriptions
        self.personalEmail = personalEmail

    def get_xdm_entity(self):
        return {
            "_adlsplatformemea": {
                "customUserData": self.identity
            },
            "person": self.person,
            "personalEmail": self.personalEmail,
            "subscriptions": self.subscriptions
        }

In [48]:
class XDMPayloadGenerator:
    def __init__(self, schema_id, ims_org_id, dataset_id, source_name):
        self.schema_id = schema_id
        self.ims_org_id = ims_org_id
        self.dataset_id = dataset_id
        self.source_name = source_name

    def generate_payload(self, xdm_entity):
        return {
            "header": {
                "schemaRef": {
                    "id": self.schema_id,
                    "contentType": "application/vnd.adobe.xed-full+json;version=1.0"
                },
                "imsOrgId": self.ims_org_id,
                "datasetId": self.dataset_id,
                "source": {
                    "name": self.source_name
                }
            },
            "body": {
                "xdmMeta": {
                    "schemaRef": {
                        "id": self.schema_id,
                        "contentType": "application/vnd.adobe.xed-full+json;version=1.0"
                    }
                },
                "xdmEntity": xdm_entity
            }
        }

In [56]:
import json

# Function to load user data from JSON file
def load_user_data(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

# Load user data
user_data_list = load_user_data('users_data.json')

# Function to generate customer data payload
def generate_customer_data(user):
    identity = IdentityData.get_identity(
        email=user["email"],
        phone=user["phone"],
        user_id=user["user_id"]
    )
    
    person = Person.get_person(
        birth_date=user["birth_date"],
        birth_year=user["birth_year"],
        gender=user["gender"],
        marital_status=user["marital_status"],
        first_name=user["first_name"],
        last_name=user["last_name"],
        nationality=user["nationality"]
    )

    personalEmail = PersonalEmail(user["email"]).get_address()
    
    subscriptions = [Subscription.get_subscription(**sub) for sub in user["subscriptions"]]
    
    customer_data = {
        "customUserData": identity,
        "person": person,
        "personalEmail": personalEmail,
        "subscriptions": subscriptions
    }
    
    if user.get("test_profile", False):
        customer_data["testProfile"] = True
    
    return customer_data

In [57]:
# Data Stream 2
if __name__ == "__main__":

    # Initialize token manager
    token_manager = TokenManager(AUTH_URL, CLIENT_ID, CLIENT_SECRET)

    # Initialize API client
    api_client = APIClient(token_manager, POST_URL)
    
    # Generate XDM payload
    xdm_generator = XDMPayloadGenerator(SCHEMA_ID, IMS_ORG_ID, DATASET_ID, SOURCE_NAME)

    # Generate and send data for each user
    for user in user_data_list:
        customer_data = generate_customer_data(user)
        xdm_entity = XDMEntity(customer_data["customUserData"],
                               customer_data["person"],
                               customer_data["personalEmail"],
                               customer_data["subscriptions"])
        xdm_entity_data = xdm_entity.get_xdm_entity()
        if customer_data.get("testProfile", False):
            xdm_entity_data["testProfile"] = True
        print(xdm_entity_data, "\n")
        payload = xdm_generator.generate_payload(xdm_entity_data)
        data = api_client.make_post_request(payload, SANDBOX_NAME, FLOW_ID)
        print(data, "\n ")

current instance is  <__main__.TokenManager object at 0x107d60ec0>
{'_adlsplatformemea': {'customUserData': {'emailId': 'ackm_ajo_testing_1234500459905@yopmail.com', 'phoneNumber': '+1234500459905', 'userID': '1234500459905'}}, 'person': {'birthDate': '1983-01-12', 'birthYear': 1983, 'gender': 'male', 'maritalStatus': 'single', 'name': {'firstName': '1234500459905 AA', 'lastName': 'BB'}, 'nationality': 'US'}, 'personalEmail': {'address': 'ackm_ajo_testing_1234500459905@yopmail.com'}, 'subscriptions': [{'SKU': 'SampleSKU1', '_id': 'Sub1', 'billingPeriod': 'Monthly', 'billingStartDate': '2023-01-01', 'category': 'Software', 'endDate': '2024-01-01', 'paymentMethod': 'Credit Card', 'paymentStatus': 'Paid', 'planName': 'Standard Plan', 'reason': 'No reason', 'renew': 'Yes', 'revision': '1', 'startDate': '2023-01-01', 'status': 'Active', 'subCategory': 'Pro', 'subscriber': {'name': {'courtesyTitle': 'Mr.', 'firstName': '1234500459905', 'fullName': '1234500459905 Doe', 'lastName': '1234500459