In [None]:
import boto3
import requests
import hashlib
import json
import botocore
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import pandas as pd

In [None]:
boto_session= boto3.Session(profile_name="") #add profile of account that can assume role in all child accounts
CHILD_ROLE="" #name of role depoyed to all accounts in the organization



In [None]:
def get_aws_accounts():

    """
    This function retrieves a list of all AWS accounts within the organization, 
    filtered by account status (active or inactive).

    Parameters:
    None

    Returns:
    A list of AWS accounts, each represented as a dictionary with the following 
    keys: 'Id', 'Name', and 'Status'.

    Raises:
    botocore.exceptions.ClientError: If there is an error making the API call.
    """
    org_client = boto_session.client('organizations')
    filters = [{'Type': 'ACCOUNT_STATUS', 'Condition': 'EQ', 'Value': 'ACTIVE'}]

    # Get the list of accounts with pagination
    accounts = []
    paginator = org_client.get_paginator('list_accounts')
    page_iterator = paginator.paginate()

    for page in page_iterator:
        active_accounts = [account for account in page['Accounts'] if account['Status'] == 'ACTIVE']

        accounts.extend(active_accounts)
    return accounts

In [None]:
def get_access_keys(account_id, role_name):
    sts_client = boto_session.client('sts')
    
    role_arn = f'arn:aws:iam::{account_id}:role/{role_name}'
    
    assumed_role = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName='AssumeRoleSession'
    )
    assumed_credentials=assumed_role['Credentials']
    
    client={
        'access_key' : assumed_credentials['AccessKeyId'],
        'secret_key': assumed_credentials['SecretAccessKey'],
        'token': assumed_credentials['SessionToken']
        }
  
    return client

In [None]:
def get_marketplace_subscriptions(account):
    account_id=account['Id']
    creds=get_access_keys(account_id,CHILD_ROLE)
    payload_data = "{\"domain\":\"AWSMarketplace\",\"filters\":[{\"name\":\"STATUS\",\"values\":[\"Active\"]},{\"name\":\"ACTOR\",\"values\":[\"ACCEPTOR\"]}],\"maxResults\":50}"
    service='aws-marketplace'
    url='https://commerce.us-east-1.marketplace.aws.a2z.com/'
    method='POST'
    path="/"
    headers = {
    "X-Amz-Content-sha256": hashlib.sha256(payload_data.encode('utf-8')).hexdigest(),  # Replace with actual request body
    'host':'commerce.us-east-1.marketplace.aws.a2z.com',
    'content-length':str(len(payload_data)),
    'X-Amz-Target': 'AWSMPCommerceService_v20200301.SearchAgreements',
  'Content-Type': 'application/x-amz-json-1.0',
    }
    session = botocore.session.Session()
    session.set_credentials(**creds)
    session.set_config_variable('region', 'us-east-1')
    sigv4 = SigV4Auth(session.get_credentials(), service, 'us-east-1')
    request = AWSRequest(method='POST', url=url, data=payload_data, headers=headers)
    #request.context["payload_signing_enabled"] = False # payload signing is not supported
    sigv4.add_auth(request)
    
    prepped = request.prepare()
    response = requests.post(prepped.url, headers=prepped.headers, data=payload_data) 
    #print(response.status_code)
    # print(response.text)
    return json.loads(response.text)
 

In [None]:
def get_marketplace_product(account,product_ids):
    account_id=account['Id']
    products=[]
    if len(product_ids)<1:
      return []
    creds=get_access_keys(account_id,CHILD_ROLE)
    payload_data = {"filters":[{"type":"ProductId","values":product_ids}]}
    service='aws-marketplace'
    url='https://discovery.marketplace.us-east-1.amazonaws.com/'
    method='POST'
    path="/"
    headers = {
    "X-Amz-Content-sha256": hashlib.sha256(json.dumps(payload_data).encode('utf-8')).hexdigest(),  # Replace with actual request body
    'host':'discovery.marketplace.us-east-1.amazonaws.com',
    'content-length':str(len(json.dumps(payload_data))),
    'X-Amz-Target': 'AWSMPControlPlane.DescribeListings',
    'Content-Type': 'application/x-amz-json-1.1',
    'Accept-Encoding':'gzip, deflate, br'
    }
    session = botocore.session.Session()
    session.set_credentials(**creds)
    session.set_config_variable('region', 'us-east-1')
    sigv4 = SigV4Auth(session.get_credentials(), service, 'us-east-1')
    request = AWSRequest(method='POST', url=url, data=payload_data, headers=headers)
    #request.context["payload_signing_enabled"] = False # payload signing is not supported
    sigv4.add_auth(request)
    
    prepped = request.prepare()
    response = requests.post(prepped.url, headers=prepped.headers, data=json.dumps(payload_data)) 
    res=json.loads(response.text)
    
    for product in res['listings']:
       
       product_details=json.loads(product['detail'])
       product_type=product_details['products'][0]['fulfillmentOptionTypes'][0]['displayName']
       products.append({"account_name":account['Name'],"account_id":account_id,"product_id":product_details['id'],"product_title":product_details['title'],"product_vendor":product_details['vendor']['name'],"product_description":product_details['shortDescription'],"product_type":product_type})
    
    try:
      return products
    except:
       print(json.loads(response.text))
       return []


In [None]:
accounts=get_aws_accounts()
all_products=[]
for account in accounts:
    subscriptions=get_marketplace_subscriptions(account)['agreementViewSummaries']
    #print(subscriptions)
    products=[]
    for sub in subscriptions:
        products.extend(sub['entitledProducts'])
    #print(products)
    products=get_marketplace_product(account,products)
    all_products.extend(products)

In [None]:
len(all_products)

In [None]:
df=pd.DataFrame(all_products)

In [None]:
df.head(35)

In [None]:
df['account_id']=df['account_id'].astype(str)
df.to_excel('marketplace_subscriptions.xlsx')