Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions aws_lambda_powertools/utilities/parser/models/cloudwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,55 @@


class CloudWatchLogsLogEvent(BaseModel):
id: str # noqa AA03 VNE003
timestamp: datetime
message: Union[str, Type[BaseModel]]
id: str = Field(
description="Unique identifier for the log event within the batch.",
examples=["eventId1", "abc123def456"],
)
timestamp: datetime = Field(
description="The time when the event occurred in milliseconds since Jan 1, 1970 00:00:00 UTC.",
examples=[1673779200000],
)
message: Union[str, Type[BaseModel]] = Field(
description="The actual log message string or structured JSON payload emitted by the service or application.",
examples=["This is a sample log message", '{"statusCode":200,"path":"/hello"}'],
)


class CloudWatchLogsDecode(BaseModel):
messageType: str
owner: str
logGroup: str
logStream: str
subscriptionFilters: List[str]
logEvents: List[CloudWatchLogsLogEvent]
policyLevel: Optional[str] = None
messageType: str = Field(
description="The type of CloudWatch Logs message.",
examples=["DATA_MESSAGE", "CONTROL_MESSAGE"],
)
owner: str = Field(description="The AWS account ID of the originating log data.", examples=["123456789012"])
logGroup: str = Field(
description="The name of the log group that contains the log stream.",
examples=["/aws/lambda/my-function", "/aws/apigateway/my-api"],
)
logStream: str = Field(
description="The name of the log stream that stores the log events.",
examples=["2023/01/15/[$LATEST]abcdef1234567890", "i-1234567890abcdef0"],
)
subscriptionFilters: List[str] = Field(
description="List of subscription filter names associated with the log group.",
examples=[["LambdaStream_cloudwatch", "AlertFilter"]],
)
logEvents: List[CloudWatchLogsLogEvent] = Field(
description="Array of log events included in the message.",
examples=[[{"id": "eventId1", "timestamp": 1673779200000, "message": "Sample log line"}]],
)
policyLevel: Optional[str] = Field(
default=None,
description="Optional field specifying the policy level applied to the subscription filter, if present.",
examples=["ACCOUNT", "LOG_GROUP"],
)


class CloudWatchLogsData(BaseModel):
decoded_data: CloudWatchLogsDecode = Field(..., alias="data")
decoded_data: CloudWatchLogsDecode = Field(
...,
alias="data",
description="Decoded CloudWatch log data payload after base64 decoding and decompression.",
)

@field_validator("decoded_data", mode="before")
def prepare_data(cls, value):
Expand All @@ -42,4 +74,6 @@ def prepare_data(cls, value):


class CloudWatchLogsModel(BaseModel):
awslogs: CloudWatchLogsData
awslogs: CloudWatchLogsData = Field(
description="Top-level CloudWatch Logs model containing the AWS logs data section.",
)