# Timestream Lambda Function Sample Application

This notebook demonstrates generating data, according to a schema defined by the user; deploying an AWS Lambda function to process it; and visualizing the data using Grafana.

## Step 1: Generate Data

## Imports

In [7]:
from abc import ABC, abstractmethod
import random
import time
from datetime import datetime, timedelta
import json

## Data Generator Base Class Definition

In [17]:
class DataGenerator(ABC):
    start_date: datetime
    end_date: datetime
    frequency: timedelta
    num_entities: int

    def __init__(self, start_date: datetime, end_date: datetime, frequency: timedelta, num_entities: int):
        self.start_date = start_date
        self.end_date = end_date
        self.frequency = frequency
        self.num_entities = num_entities

    @abstractmethod
    def generate(self, num_records, entities) -> list:
        pass

    def _generate_date_range(self):
        return
    
    def _generate_random_string(self, length):
        """Generate a random alphanumeric string of a given length."""
        letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        return ''.join(random.choice(letters) for _ in range(length))
    

## DevOps Data Generator Subclass Definition

In [32]:
class DevOpsDataGenerator(DataGenerator):
    def generate(self) -> list:
        dates = self._generate_date_range()
        entities = []
        for _ in range(self.num_entities):
            entity = {
                "device_id": self._generate_random_string(9),
                "region": random.choice(["us-west-1", "us-east-1"]),
                # This allows the entity's measure values to be incremented without doing a costly
                # scan over previous records in the records list
                "most_recent_measures": {}
            }
            entities.append(entity)

        records = []
        current_date = self.start_date
        max_measure_variance = 1.2
        while current_date <= self.end_date:
            # Each entity has a record for a single timestamp
            for entity in entities:
                dimensions = []
                for dimension_name in entity:
                    if dimension_name != "most_recent_measures":
                        dimension = {
                            "Name": dimension_name,
                            "Value": entity[dimension_name]
                        }
                        dimensions.append(dimension)

                # Set initial values for measures
                measures = {
                    "energy_usage": random.uniform(1, 10),
                    "cpu_usage": random.uniform(2, 20)
                } if current_date == self.start_date else {
                    "energy_usage": entity["most_recent_measures"]["energy_usage"] + random.uniform(-max_measure_variance, max_measure_variance),
                    "cpu_usage": entity["most_recent_measures"]["cpu_usage"] + random.uniform(-max_measure_variance, max_measure_variance)
                }
                entity["most_recent_measures"] = measures

                record = {
                    "Dimensions": dimensions,
                    "Time": str(int(current_date.timestamp())), # Millisecond precision
                    "Measures": measures
                }
                records.append(record)
            current_date += self.frequency
        return records


In [33]:
# Generate data using the new schema format
generator = DevOpsDataGenerator(start_date=datetime(2024, 1, 1, 1), end_date=datetime(2024, 9, 10, 11), frequency=timedelta(days=1), num_entities=4)
sample_data = generator.generate()

# Print generated data
print(json.dumps(sample_data, indent=2))

[
  {
    "Dimensions": [
      {
        "Name": "device_id",
        "Value": "UmSzYKCaL"
      },
      {
        "Name": "region",
        "Value": "us-east-1"
      }
    ],
    "Time": "1704099600",
    "Measures": {
      "energy_usage": 4.079206209313118,
      "cpu_usage": 16.71543965248445
    }
  },
  {
    "Dimensions": [
      {
        "Name": "device_id",
        "Value": "WoqqgLg29"
      },
      {
        "Name": "region",
        "Value": "us-east-1"
      }
    ],
    "Time": "1704099600",
    "Measures": {
      "energy_usage": 4.170465147220728,
      "cpu_usage": 3.7209129631883977
    }
  },
  {
    "Dimensions": [
      {
        "Name": "device_id",
        "Value": "Cl5E4YTJF"
      },
      {
        "Name": "region",
        "Value": "us-west-1"
      }
    ],
    "Time": "1704099600",
    "Measures": {
      "energy_usage": 4.175119484097989,
      "cpu_usage": 15.836309009332224
    }
  },
  {
    "Dimensions": [
      {
        "Name": "device_id",
     

In [None]:


def generate_random_string(length=6):
    """Generate a random alphanumeric string of a given length."""
    letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    return ''.join(random.choice(letters) for _ in range(length))

def generate_timestream_data(schema, num_records=10):
    """
    Generate mock Timestream records with random dimension values and measures based on an input schema.
    
    Parameters:
    - schema: A dictionary containing 'dimensions' and 'measures' lists.
      Dimensions define 'DimensionName' and can optionally define a list of 'options' for random selection.
      Measures define 'MeasureName', 'MeasureValueType', and optionally 'MeasureValueRange' or 'options'.
      Example:
      {
          "dimensions": [
              {"DimensionName": "region", "options": ["us-west-2", "us-east-1"]},
              {"DimensionName": "device_id"}  # Will randomly generate a value if no options are given
          ],
          "measures": [
              {"MeasureName": "cost", "MeasureValueType": "DOUBLE", "MeasureValueRange": [100.0, 500.0]},
              {"MeasureName": "energy_consumption", "MeasureValueType": "DOUBLE", "MeasureValueRange": [50.0, 300.0]},
              {"MeasureName": "status", "MeasureValueType": "VARCHAR", "options": ["OK", "FAIL"]}
          ]
      }
    - num_records: Number of records to generate.

    Returns:
    - List of records suitable for ingestion into Timestream.
    """

    records = []

    for _ in range(num_records):
        record_time = str(int(time.time_ns()))  # Use a common nanosecond timestamp for all measures in the record

        # Extract dimensions from the schema and generate random values if needed
        record_dimensions = []
        for dim in schema.get("dimensions", []):
            dimension_name = dim['DimensionName']
            dimension_value = random.choice(dim.get("options", [generate_random_string()]))
            
            record_dimensions.append({
                "Name": dimension_name,
                "Value": dimension_value
            })

        # Extract measures from the schema and generate random values
        record_measures = []
        for measure in schema.get("measures", []):
            measure_name = measure.get("MeasureName")
            measure_type = measure.get("MeasureValueType")
            
            if measure_type == "DOUBLE":
                measure_value = round(random.uniform(*measure.get("MeasureValueRange", [0.0, 100.0])), 2)
            elif measure_type == "VARCHAR":
                measure_value = random.choice(measure.get("options", ["default"]))
            else:
                raise ValueError(f"Unsupported MeasureValueType: {measure_type}")

            record_measures.append({
                "MeasureName": measure_name,
                "MeasureValue": str(measure_value),
                "MeasureValueType": measure_type
            })

        # Construct the final record with multiple measures and dimensions
        record = {
            "Dimensions": record_dimensions,  # Use the generated dimensions
            "Time": record_time,  # Same timestamp for all measures in this record
            "Measures": record_measures  # Use the collected measures
        }

        records.append(record)

    return records

# Example input schema with separated dimensions and measures
schema = {
    "dimensions": [
        {"DimensionName": "region", "options": ["us-west-2", "us-east-1"]},
        {"DimensionName": "device_id", "options": ["fhncipr94k", "1o30plfurn", "1f49apz086"]}  # Random string if no options are provided
    ],
    "measures": [
        {"MeasureName": "cost", "MeasureValueType": "DOUBLE", "MeasureValueRange": [100.0, 500.0]},
        {"MeasureName": "energy_consumption", "MeasureValueType": "DOUBLE", "MeasureValueRange": [50.0, 300.0]},
        {"MeasureName": "status", "MeasureValueType": "VARCHAR", "options": ["OK", "FAIL"]}
    ]
}

dataset = SERVER_DATASET

# Generate data using the new schema format
sample_data = generate_timestream_data(schema, num_records=1000)

# Print generated data
print(json.dumps(sample_data, indent=2))

## Step 2: Deploy AWS Lambda Function

The following code will construct and deploy a Lambda function that ingests data to Timestream.

In [36]:
# AWS SDK and Lambda deployment
import boto3
import zipfile
import os
import json

REGION_NAME='us-west-2'
DATABASE_NAME = 'sample_app_database'
TABLE_NAME = 'sample_app_table'

# Initialize clients

iam_client = boto3.client('iam', region_name=REGION_NAME)
lambda_client = boto3.client('lambda', region_name=REGION_NAME)
sts_client = boto3.client('sts', region_name=REGION_NAME)
account_id = sts_client.get_caller_identity()['Account']

# Step 1: Create IAM Role for Lambda
role_name = "TimestreamLambdaRole"
assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }
    ]
}

role_arn = ""

try:
    create_role_response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy),
        Description="Role for Lambda to write to Timestream"
    )
    print(f"Created IAM Role: {role_name}")
    role_arn = create_role_response['Role']['Arn']
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"IAM Role {role_name} already exists")
    try:
        role_arn = iam_client.get_role(RoleName=role_name)['Role']['Arn']
    except iam_client.exceptions.NoSuchEntityException:
        print("IAM Role could not be found")
        raise

# CloudWatch logs policy to be added to the role
cloudwatch_logs_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": f"arn:aws:logs:{REGION_NAME}:{account_id}:log-group:/aws/lambda/TimestreamIoTLambda*"
        }
    ]
}

# Add the CloudWatch logs policy to the role
try:
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName='CloudWatchLogsPolicy',
        PolicyDocument=json.dumps(cloudwatch_logs_policy)
    )
    print(f"Attached CloudWatch logs policy to role: {role_name}")
except Exception as e:
    print(f"Error attaching CloudWatch logs policy: {e}")

# Step 2: Attach Policy to the IAM Role
policy_arn = "arn:aws:iam::aws:policy/AmazonTimestreamFullAccess"
iam_client.attach_role_policy(
    RoleName=role_name,
    PolicyArn=policy_arn
)

print(f"Attached Timestream write policy to {role_name}")

# Step 3: Create Lambda Function Code
lambda_function_code = '''
import json
import os
import boto3
from botocore.exceptions import ClientError

REGION_NAME = os.environ['REGION_NAME']
DATABASE_NAME = os.environ['DATABASE_NAME']
TABLE_NAME = os.environ['TABLE_NAME']

# Initialize the Timestream client
timestream_client = boto3.client('timestream-write', REGION_NAME)

# Define your table retention properties
RETENTION_PROPERTIES = {
    'MemoryStoreRetentionPeriodInHours': 24,  # Adjust as needed
    'MagneticStoreRetentionPeriodInDays': 365  # Adjust as needed
}

def create_timestream_database_and_table():
    """
    Create Timestream database and table if they do not exist.
    """
    try:
        # Create database if it does not exist
        timestream_client.create_database(DatabaseName=DATABASE_NAME)
        print(f"Database '{DATABASE_NAME}' created successfully.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConflictException':
            print(f"Database '{DATABASE_NAME}' already exists.")
        else:
            raise e  # Raise if it's a different error

    try:
        # Create table if it does not exist
        timestream_client.create_table(
            DatabaseName=DATABASE_NAME,
            TableName=TABLE_NAME,
            RetentionProperties=RETENTION_PROPERTIES
        )
        print(f"Table '{TABLE_NAME}' created successfully in database '{DATABASE_NAME}'.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConflictException':
            print(f"Table '{TABLE_NAME}' already exists in database '{DATABASE_NAME}'.")
        else:
            raise e  # Raise if it's a different error

def lambda_handler(event, context):
    """
    Lambda function to process the request and ingest records into Timestream.
    The function accepts a list of records, handles MULTI measure types, 
    and sends data in batches of 100 records to Timestream.
    """

    # Create the database and table if they do not exist
    create_timestream_database_and_table()

    try:
        # Extract the records from the event
        body = event.get('body', '{}')
        parsed_body = json.loads(body)
        records = parsed_body.get('records', [])
        print(records)
        if not records:
            return {
                "statusCode": 400,
                "body": json.dumps("No records found in the request.")
            }

        # Process records in batches of 100
        for i in range(0, len(records), 100):
            records_batch = records[i:i + 100]
            # Prepare the records for Timestream
            prepared_records = []

            for record in records_batch:
                dimensions = record.get("Dimensions", [])
                time_value = record.get("Time")
                measures = record.get("Measures", [])

                # Check if there are multiple measures, in which case we'll use MULTI
                if len(measures) > 1:
                    measure_value_type = 'MULTI'
                    multi_value_measure = {
                        'MeasureName': measures[0]['MeasureName'],  # Example MeasureName; Timestream expects just one for MULTI
                        'MeasureValues': [
                            {
                                'Name': m['MeasureName'],
                                'Value': m['MeasureValue'],
                                'Type': m['MeasureValueType']
                            }
                            for m in measures
                        ]
                    }
                    prepared_record = {
                        'Dimensions': dimensions,
                        'Time': time_value,
                        'TimeUnit': 'MILLISECONDS',
                        'MeasureName': multi_value_measure['MeasureName'],  # Set MULTI MeasureName
                        'MeasureValueType': measure_value_type,
                        'MeasureValues': multi_value_measure['MeasureValues']
                    }
                else:
                    # Handle the case where there is only one measure
                    measure = measures[0]
                    prepared_record = {
                        'Dimensions': dimensions,
                        'Time': time_value,
                        'TimeUnit': 'MILLISECONDS',
                        'MeasureName': measure['MeasureName'],
                        'MeasureValue': measure['MeasureValue'],
                        'MeasureValueType': measure['MeasureValueType']
                    }

                prepared_records.append(prepared_record)

            # Write to Timestream using the `write_records` API
            response = timestream_client.write_records(
                DatabaseName=DATABASE_NAME,
                TableName=TABLE_NAME,
                Records=prepared_records
            )
            print(f"Batch write successful for records {i} to {i + len(records_batch) - 1}: {response}")

        return {
            "statusCode": 200,
            "body": json.dumps(f"Successfully ingested {len(records)} records into Timestream.")
        }
    
    except ClientError as e:
        print(f"Failed to write to Timestream: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps(f"Error writing to Timestream: {str(e)}")
        }
'''

lambda_name = "TimestreamIoTLambda"

# Save the Lambda function code to a file
lambda_function_file = "lambda_function.py"
with open(lambda_function_file, 'w') as f:
    f.write(lambda_function_code)

# Create a deployment package (zip file)
lambda_zip = "lambda_function.zip"
with zipfile.ZipFile(lambda_zip, 'w') as zipf:
    zipf.write(lambda_function_file)

# Step 4: Deploy Lambda Function
lambda_name = "TimestreamIoTLambda"
try:
    with open(lambda_zip, 'rb') as f:
        lambda_client.create_function(
            FunctionName=lambda_name,
            Runtime='python3.12',
            Role=role_arn,
            Handler='lambda_function.lambda_handler',
            Architectures=['arm64'],
            Code={'ZipFile': f.read()},
            Environment={
                'Variables': {
                    'REGION_NAME': REGION_NAME,
                    'DATABASE_NAME': DATABASE_NAME,
                    'TABLE_NAME': TABLE_NAME
                }
            },
            Timeout=30,
            MemorySize=128
        )
    print(f"Lambda function {lambda_name} created successfully.")
except lambda_client.exceptions.ResourceConflictException:
    print(f"Lambda function {lambda_name} already exists.")

# Clean up the files
os.remove(lambda_function_file)
os.remove(lambda_zip)

# Step 4: Add a resource policy to allow invocation via the function URL

try:
    lambda_client.add_permission(
        FunctionName=lambda_name,
        StatementId='FunctionURLAllowInvoke',
        Action='lambda:InvokeFunctionUrl',
        Principal=role_arn,
        FunctionUrlAuthType='AWS_IAM'
    )
    print(f"Added resource policy to allow function URL invocation for {lambda_name}.")
except lambda_client.exceptions.ResourceConflictException:
    print(f"Resource policy for {lambda_name} already exists.")

# Create or get the Lambda Function URL
try:
    response = lambda_client.create_function_url_config(
        FunctionName=lambda_name,
        AuthType='AWS_IAM'
    )
    function_url = response['FunctionUrl']
    print(f"Lambda Function URL: {function_url}")
except lambda_client.exceptions.ResourceConflictException:
    # If the URL configuration already exists, retrieve it
    response = lambda_client.get_function_url_config(FunctionName=lambda_name)
    function_url = response['FunctionUrl']
    print(f"Lambda Function URL (existing): {function_url}")


IAM Role TimestreamLambdaRole already exists
Attached CloudWatch logs policy to role: TimestreamLambdaRole
Attached Timestream write policy to TimestreamLambdaRole
Lambda function TimestreamIoTLambda created successfully.
Added resource policy to allow function URL invocation for TimestreamIoTLambda.
Lambda Function URL (existing): https://stki3begraoqpcarqe7l4olplm0kpwjv.lambda-url.us-west-2.on.aws/


## Step 3: Send Data to the Lambda Function

The following code will send the generated sample data to the Lambda function's URL with SigV4 authenticated requests, ensuring requests do not exceed Lambda's limit of 6 MB.

In [37]:
import os
import json
import requests
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

MAX_REQUEST_SIZE = 6 * 1024 * 1024  # 6 MB in bytes

def send_data_to_lambda(data):
    """Sends generated data to the Lambda function in chunks."""
    service = 'lambda'
    region = REGION_NAME
    method = "POST"

    session = boto3.Session(region_name=region)

    # Calculate the size of the entire data payload
    data_payload = json.dumps({'records': data})
    total_size = len(data_payload.encode('utf-8'))

    # Check if the total size exceeds the maximum request size
    if total_size <= MAX_REQUEST_SIZE:
        send_request(session, method, data_payload)
    else:
        # Chunk the data if it's too large
        chunk_size = MAX_REQUEST_SIZE - len(b'{"records":[]}')  # Reserve space for the JSON structure
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
        for chunk in chunks:
            chunk_payload = json.dumps({'records': chunk})
            send_request(session, method, chunk_payload)

def send_request(session, method, payload):
    """Sends the request to the Lambda function."""
    request = AWSRequest(
        method=method,
        url=function_url,
        headers={'Content-Type': 'application/json'},
        data=payload
    )

    SigV4Auth(session.get_credentials(), 'lambda', REGION_NAME).add_auth(request)

    try:
        response = requests.request(method, function_url, headers=dict(request.headers), data=payload, timeout=30)
        response.raise_for_status()
        print(f'Response Status: {response.status_code}')
        print(f'Response Body: {response.content.decode("utf-8")}')
    except Exception as e:
        print(f'Error: {e}')

# Send sample data to the Lambda function
send_data_to_lambda(sample_data)


Error: 502 Server Error: Bad Gateway for url: https://stki3begraoqpcarqe7l4olplm0kpwjv.lambda-url.us-west-2.on.aws/


## Step 4: Configure Grafana

### Grafana Configuration Steps
1. Open Grafana.
2. Add a new data source with the name "timestream_sample_app_database".
3. Create a new dashboard using the provided `dashboard.json`.