# A Complex `value_from:` Policy

Policies that use `value_from:` are harder to test than most, because evaluating one involves four separate components.

1. A CEL policy

2. A Resource

3. A Current Time

4. Additional Data that must be used as part of the policy

This fourth component requires a more sophisticated mock function for testing purposes.


In [1]:
policy_text = """
name: asg-invalid-asset-value-mark
resource: asg
comment: 'Report on any ASGs that use an ASSET that isn''t valid.

  '
filters:
- tag:custodian_asset: absent
- key: CreatedTime
  op: greater-than
  type: value
  value: 0.011
  value_type: age
- key: tag:ASSET
  op: not-in
  type: value
  value_from:
    expr: all_values.*
    format: json
    url: s3://cloud-governance/custodian/asset_list.json
actions:
- tag: custodian_asset
  type: tag
  value: AutoScaling Group does not have a valid ASSET tag value
- action_desc: The ASSET tag should be updated, both on the ASG and in any template
    or config files.
  cc:
  - custodian@capitalone.com
  from: custodian@capitalone.com
  policy_url: https://enterprise.com/docs/cloud_policy.html
  subject: '[custodian][{{ account }}] AutoScaling Group with invalid ASSET value -
    {{ region }}'
  template: fs-default.html
  to:
  - resource-owner
  transport:
    topic: arn:aws:sns:{region}:123456789012:c7n-notifications
    type: sns
  type: notify
  violation_desc: 'The following AutoScaling Group(s) are tagged with an invalid ASSET
    value:'
"""

In [2]:
from xlate.c7n_to_cel import C7N_Rewriter
cel_text = C7N_Rewriter.c7n_rewrite(policy_text)
print(cel_text)

absent(Resource["Tags"].filter(x, x["Key"] == "custodian_asset")[0]["Value"]) && Now - duration("15m50s") > timestamp(Resource["CreatedTime"]) && ! value_from("s3://cloud-governance/custodian/asset_list.json", "json").jmes_path('all_values.*').contains(Resource["Tags"].filter(x, x["Key"] == "ASSET")[0]["Value"])


Reformatted for readability:

    absent(Resource["Tags"].filter(x, x["Key"] == "custodian_asset")[0]["Value"]) 
    && Now - duration("15m50s") > timestamp(Resource["CreatedTime"]) 
    && ! value_from("s3://cloud-governance/custodian/asset_list.json", "json")
        .jmes_path('all_values.*').contains(
            Resource["Tags"].filter(x, x["Key"] == "ASSET")[0]["Value"]
        )
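
The second clause comes from the `value: 0.011` / `value_type: age` filter. The sketch below shows the arithmetic, assuming the age is measured in days and truncated to whole seconds (an assumption that happens to match the generated `duration("15m50s")`). The helper name `age_to_duration` is hypothetical and is not part of `xlate` or `celpy`. The timestamps are chosen to match the sample resource and evaluation time used in the test cells of this notebook.

```python
from datetime import datetime, timedelta, timezone


def age_to_duration(days: float) -> str:
    """Render a fractional number of days as a duration string such as "15m50s".

    Hypothetical helper for illustration; sub-second precision is truncated.
    """
    total_seconds = int(timedelta(days=days).total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    parts = []
    if hours:
        parts.append(f"{hours}h")
    if minutes:
        parts.append(f"{minutes}m")
    if seconds or not parts:
        parts.append(f"{seconds}s")
    return "".join(parts)


print(age_to_duration(0.011))  # 15m50s

# The same comparison the CEL clause makes, in plain datetime arithmetic.
now_dt = datetime(2020, 10, 17, 18, 19, 20, tzinfo=timezone.utc)
created = datetime(2020, 10, 17, 18, 0, 0, tzinfo=timezone.utc)
old_enough = now_dt - timedelta(seconds=950) > created
print(old_enough)  # True
```

With these timestamps the resource is about 19 minutes old, so the age clause on its own is satisfied.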

How can we test a policy that uses `value_from`? Do we really need access to an S3 bucket?

It turns out we don't. We can replace the `value_from()` function with our own extension function that returns appropriate test data.

In [4]:
import celpy.celtypes
from celpy.adapter import json_to_cel

def my_value_from(url, format):
    print(f"Reading {url} in {format}")
    the_document = {"all_values": {"name": "in_the_list"}}
    return json_to_cel(the_document)
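
When several URLs are involved, the stub above can be generalized into a small factory that serves canned documents by URL. This is only a sketch: `make_value_from` and `asset_stub` are hypothetical names, and the stub returns plain Python objects. In this notebook the result would still need to be passed through `json_to_cel()`, as `my_value_from()` does.

```python
from typing import Any, Callable, Dict


def make_value_from(documents: Dict[str, Any]) -> Callable[[str, str], Any]:
    """Build a value_from() stand-in that serves canned documents keyed by URL."""

    def fake_value_from(url: str, format: str) -> Any:
        # Fail loudly when a policy asks for a document the test did not provide.
        if url not in documents:
            raise KeyError(f"No test document registered for {url}")
        return documents[url]

    return fake_value_from


asset_stub = make_value_from(
    {
        "s3://cloud-governance/custodian/asset_list.json": {
            "all_values": {"name": "in_the_list"}
        },
    }
)
doc = asset_stub("s3://cloud-governance/custodian/asset_list.json", "json")
print(doc)  # {'all_values': {'name': 'in_the_list'}}
```

Raising on an unknown URL keeps a typo in the policy from silently evaluating against an empty document.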

In [8]:
import celpy
import celpy.c7nlib
from typing import Dict, Any

class CELFilter:
    decls = {
        "Resource": celpy.celtypes.MapType,
        "Now": celpy.celtypes.TimestampType,
    }

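    def __init__(self, expr: str) -> None:
        # Compile the CEL expression against the declared variable types.
        env = celpy.Environment(annotations=CELFilter.decls)
        ast = env.compile(expr)
        # Start from the c7nlib functions, then swap in the test double
        # so that no S3 access is needed. This updates the shared table.
        self.functions = celpy.c7nlib.FUNCTIONS
        self.functions["value_from"] = my_value_from
        self.prgm = env.program(ast, self.functions)

    def process(self, resource: celpy.celtypes.Value, now: str) -> bool:
        # Bind the resource and the evaluation time, then run the program.
        activation = {
            "Resource": resource,
            "Now": celpy.celtypes.TimestampType(now),
        }
        return self.prgm.evaluate(activation)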

We can update `c7nlib.FUNCTIONS` with the new function.

    import celpy.c7nlib
    my_functions = celpy.c7nlib.FUNCTIONS
    my_functions["value_from"] = my_value_from

Now, we can compile the CEL expression and bind the updated function table to the program.

    import celpy
    decls = {
        "Resource": celpy.celtypes.MapType,
        "Now": celpy.celtypes.TimestampType,
    }
    env = celpy.Environment(annotations=decls)
    ast = env.compile(cel_text)
    # Bind my_functions with the CEL code...
    prgm = env.program(ast, functions=my_functions)

In [9]:
cel_text = """
absent(Resource["Tags"].filter(x, x["Key"] == "custodian_asset")[0]["Value"]) 
&& Now - duration("15m50s") > timestamp(Resource["CreatedTime"]) 
&& ! value_from("s3://cloud-governance/custodian/asset_list.json", "json").jmes_path('all_values.*').contains(Resource["Tags"].filter(x, x["Key"] == "ASSET")[0]["Value"])
"""
filter_1 = CELFilter(cel_text)

In [11]:
sample_doc_in = {
    "Tags": [
        {"Key": "ASSET", "Value": "in_the_list"},
    ],
    "CreatedTime": "2020-10-17T18:00:00Z"
}
now = "2020-10-17T18:19:20Z"
filter_1.process(json_to_cel(sample_doc_in), now)

Reading s3://cloud-governance/custodian/asset_list.json in json


BoolType(False)

In [13]:
sample_doc_not_in = {
    "Tags": [
        {"Key": "ASSET", "Value": "not_in_the_list"},
    ],
    "CreatedTime": "2020-10-17T18:00:00Z"
}
filter_1.process(json_to_cel(sample_doc_not_in), now)

Reading s3://cloud-governance/custodian/asset_list.json in json


False

## Debugging Approach

One debugging approach is to break the CEL expression (or the resource description) into pieces and evaluate each piece on its own.

In [14]:
jmes_path_cel_1 = """
value_from("s3://cloud-governance/custodian/asset_list.json", "json")
"""
CELFilter(jmes_path_cel_1).process(json_to_cel(sample_doc_not_in), now)

Reading s3://cloud-governance/custodian/asset_list.json in json


MapType({StringType('all_values'): MapType({StringType('name'): StringType('in_the_list')})})

In [15]:
jmes_path_cel_2 = """
value_from("s3://cloud-governance/custodian/asset_list.json", "json")
.jmes_path('all_values.*')
"""
CELFilter(jmes_path_cel_2).process(json_to_cel(sample_doc_not_in), now)

Reading s3://cloud-governance/custodian/asset_list.json in json


ListType([StringType('in_the_list')])
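
The `all_values.*` expression is a JMESPath wildcard over an object: it collects the values of every key under `all_values` into a list. A rough plain-Python equivalent for the test document, with no `celpy` or JMESPath library involved, looks like this:

```python
# The same canned document that my_value_from() returns.
the_document = {"all_values": {"name": "in_the_list"}}

# Roughly what `all_values.*` selects: the values of the nested object.
asset_values = list(the_document["all_values"].values())
print(asset_values)  # ['in_the_list']

# The membership test that .contains() performs on that list.
print("in_the_list" in asset_values)      # True
print("not_in_the_list" in asset_values)  # False
```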

In [16]:
jmes_path_cel_3 = """
value_from("s3://cloud-governance/custodian/asset_list.json", "json")
.jmes_path('all_values.*').contains("in_the_list")
"""
CELFilter(jmes_path_cel_3).process(json_to_cel(sample_doc_not_in), now)

Reading s3://cloud-governance/custodian/asset_list.json in json


BoolType(True)
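
The cells above repeat the same steps for each sub-expression, so they can be folded into a loop. The sketch below is one way to do that; `evaluate_pieces` is a hypothetical helper, and `int` stands in for the real evaluator so the example runs without `celpy`.

```python
from typing import Any, Callable, Dict, List


def evaluate_pieces(pieces: List[str], evaluate: Callable[[str], Any]) -> Dict[str, Any]:
    """Evaluate each expression separately, recording either its value or its error."""
    results: Dict[str, Any] = {}
    for piece in pieces:
        try:
            results[piece] = evaluate(piece)
        except Exception as ex:
            # Keep the exception so a failing piece is visible next to the others.
            results[piece] = ex
    return results


# Stand-in evaluator: int() succeeds on digits and raises ValueError otherwise.
results = evaluate_pieces(["1", "2", "x"], int)
for piece, outcome in results.items():
    print(f"{piece!r} -> {outcome!r}")
```

In this notebook, the `evaluate` argument could be something like `lambda text: CELFilter(text).process(json_to_cel(sample_doc_not_in), now)`, applied to `jmes_path_cel_1`, `jmes_path_cel_2`, and `jmes_path_cel_3`.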