In [None]:
%run 'common-functions.ipynb'

In [None]:
from botocore.config import Config
from boto3.dynamodb.conditions import Attr

# AWS client configuration: botocore retries use "adaptive" mode with
# max_attempts set to 10. The Lambda client is created once here and reused by
# every invocation made later in the notebook.
BOTO3_CONFIG = Config(retries=dict(max_attempts=10, mode="adaptive"))
lambda_client = boto3.client(
    "lambda",
    region_name="eu-west-1",
    config=BOTO3_CONFIG,
)

# Account IDs to leave out of processing; this list is handed to
# fetch_accounts_from_metadata as `exempted_account_ids`.
EXEMPTED_ACCOUNT_IDS = ["157234191091"]


In [None]:
@dask.delayed
def invoke_lambda(account_id, log_group_name, account_region):
    """Invoke the `<HUB_NAME>-LMD_CW_LOG_GROUP_OPTIMIZER` Lambda for one log group.

    Wrapped in `dask.delayed`, so calling this function only schedules the
    invocation; the request is sent when the delayed object is computed.

    Args:
        account_id: Value placed under the payload's "account" key.
        log_group_name: Value placed under
            "detail" -> "requestParameters" -> "logGroupName".
        account_region: Value placed under "detail" -> "awsRegion".

    Returns:
        Whatever `lambda_client.invoke` returns for the request.
    """
    # DRY_RUN selects the "DryRun" invocation type; otherwise "Event" is used.
    if DRY_RUN:
        invocation_type = "DryRun"
    else:
        invocation_type = "Event"

    # Key order is kept as-is so the serialized JSON payload is unchanged.
    event = {
        "account": account_id,
        "detail": {
            "requestParameters": {
                "logGroupName": log_group_name,
            },
            "awsRegion": account_region,
        },
    }

    return lambda_client.invoke(
        FunctionName=f"{HUB_NAME}-LMD_CW_LOG_GROUP_OPTIMIZER",
        InvocationType=invocation_type,
        Payload=json.dumps(event),
    )


@dask.delayed
def get_log_groups(client, account_id, account_region):
    """Collect every log group visible to `client` into a single DataFrame.

    Wrapped in `dask.delayed`, so calling this function only schedules the
    work; the pagination happens when the delayed object is computed.

    Args:
        client: Object exposing `get_paginator("describe_log_groups")`
            (the notebook passes the result of
            `assume_return_boto_client("logs", ...)`).
        account_id: Stored in every row under the "account_id" column.
        account_region: Stored in every row under the "region" column.

    Returns:
        A DataFrame with one row per entry found in the pages' "logGroups"
        lists, plus the "account_id" and "region" columns.
    """
    pages = client.get_paginator("describe_log_groups").paginate()

    # Flatten the per-page "logGroups" lists into one list of records.
    log_groups = []
    for page in pages:
        log_groups.extend(page["logGroups"])

    frame = pd.DataFrame(log_groups)
    frame["account_id"] = account_id
    frame["region"] = account_region
    return frame


In [None]:
# Load the account list from metadata a single time, up front, so that
# re-running later cells after a failure does not query DynamoDB (ddb) again.
accounts = fetch_accounts_from_metadata(exempted_account_ids=EXEMPTED_ACCOUNT_IDS)


In [None]:
%%time
# Fetch all CloudWatch log groups across all spokes, then keep only those that
# don't have a retentionInDays value set.
dfs = []
print(f"Accounts to process: {len(accounts)}")
for account in accounts:
    account_id = account.get("account")
    account_region = account.get("region")
    # Accounts without a region value are skipped: no client can be built.
    if account_region:
        client = assume_return_boto_client("logs", account_id, account_region)
        dfs.append(get_log_groups(client, account_id, account_region))

# Bundle the per-account delayed tasks into one graph and run them together.
graph = dask.delayed()(dfs)
results = graph.compute()

# Frames for accounts with no log groups carry no rows; leaving them out keeps
# pd.concat from being handed only-empty (or zero) inputs, which would
# otherwise raise "No objects to concatenate" when nothing was fetched.
non_empty_frames = [frame for frame in results if not frame.empty]

if non_empty_frames:
    # ignore_index gives every row a unique label instead of repeating each
    # per-account 0..n range.
    master_df = pd.concat(non_empty_frames, ignore_index=True)
    # The column only exists when at least one returned log group carried a
    # "retentionInDays" key. If it is absent, no group has a retention value,
    # so every row already matches the filter and is kept as-is.
    if "retentionInDays" in master_df.columns:
        master_df = master_df[master_df["retentionInDays"].isnull()]
else:
    # Nothing to process: expose an empty frame with the columns the
    # invocation cell reads, so downstream cells run as a no-op.
    master_df = pd.DataFrame(columns=["logGroupName", "account_id", "region"])

In [None]:
# Display the filtered frame (log groups with no retentionInDays value) for
# review before any Lambda invocations are made.
master_df


In [None]:
%%time
# Schedule one delayed Lambda invocation per row of master_df.
lambda_process = [
    invoke_lambda(
        account_id=record['account_id'],
        log_group_name=record['logGroupName'],
        account_region=record['region'],
    )
    for _, record in master_df.iterrows()
]

# Bundle the scheduled invocations into one graph and run them together.
lambda_graph = dask.delayed()(lambda_process)
responses = lambda_graph.compute()