Lambda function: export recent DynamoDB bike data to S3 as CSV

In [2]:
import boto3
import csv
from datetime import datetime, timedelta
from io import StringIO
import os

# AWS handles are created once at module import, not inside the handler,
# so every call to lambda_handler in this process shares them.
dynamodb = boto3.resource("dynamodb")  # resource-level API; used for Table(...).scan
s3 = boto3.client("s3")  # client-level API; used for put_object


def lambda_handler(event, context):
    """Export the last two weeks of DynamoDB bike records to S3 as a CSV file.

    The table named by ``BIKE_TABLE_NAME`` is scanned for items whose
    ``timestamp`` attribute is greater than "now minus 14 days". All matching
    items are written to one CSV object in the bucket named by
    ``S3_BUCKET_NAME``.

    Args:
        event: Lambda invocation payload; not read by this handler.
        context: Lambda runtime context; not read by this handler.

    Returns:
        dict: A single ``"message"`` entry that either names the S3 object
        that was written or reports that no matching items were found.

    Raises:
        KeyError: If ``BIKE_TABLE_NAME`` is unset, or if ``S3_BUCKET_NAME``
            is unset when there is data to upload.
    """
    # Read from DynamoDB
    table_name = os.environ["BIKE_TABLE_NAME"]
    table = dynamodb.Table(table_name)

    # Cutoff: 14 days back from the current (naive, local) time.
    # Idea to try: drop the hh:mm:ss part of the cutoff.
    two_weeks_ago = datetime.now() - timedelta(days=14)

    # The cutoff is sent as an ISO-8601 string.
    # NOTE(review): assumes `timestamp` is stored as an ISO-8601 string so that
    # string comparison matches chronological order - confirm against the writer.
    scan_kwargs = {
        "FilterExpression": "#ts > :two_weeks_ago",
        # "timestamp" is referenced through the #ts placeholder.
        "ExpressionAttributeNames": {"#ts": "timestamp"},
        "ExpressionAttributeValues": {":two_weeks_ago": two_weeks_ago.isoformat()},
    }

    # A single scan call returns only one page of results; keep requesting
    # pages until the response no longer carries a LastEvaluatedKey.
    items = []
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get("Items") or [])
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_evaluated_key

    # if no items are found
    if not items:
        return {"message": "No items found in DynamoDB for the last two weeks."}

    # Header = union of all attribute names in first-seen order, so an item
    # with an attribute the first item lacks does not make DictWriter raise.
    fieldnames = list(dict.fromkeys(key for item in items for key in item))

    # Convert to CSV format using an in-memory text buffer.
    with StringIO() as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()  # column names
        csv_writer.writerows(items)  # one row per item; missing attributes become ""
        csv_data = csv_file.getvalue()

    # Write to S3 under a key that carries the upload time (yyyymmddHHMMSS).
    bucket_name = os.environ["S3_BUCKET_NAME"]
    timestamp_str = datetime.now().strftime("%Y%m%d%H%M%S")
    s3_key = f"testdata_{timestamp_str}.csv"
    s3.put_object(Bucket=bucket_name, Key=s3_key, Body=csv_data)

    return {
        "message": f"Data from the last two weeks saved to s3://{bucket_name}/{s3_key}"
    }


In [3]:
import aws_cdk
import boto3
from botocore.exceptions import ClientError
from aws_cdk import (
    aws_dynamodb,
    aws_lambda,
    aws_iam,
    Duration,
    aws_events,
    aws_events_targets as targets,
    aws_s3 as s3,
)


# IMPORTANT: objects are instantiated at the beginning of the code.
# This makes it look as if a variable is used before it has been created.
# Functions inside classes are called methods.

from constructs import Construct
import os


# Class definition and inheritance
class DataFetchAndSave(Construct):  # inherits from Construct
    """Construct that wires up the scheduled DynamoDB-to-S3 export.

    Instantiating it adds to the enclosing scope: an IAM role for the Lambda,
    an S3 bucket, the Lambda function itself, and an events rule that invokes
    the function every 14 days.

    Attributes:
        data_fetch_and_save_lambda: The ``aws_lambda.Function`` returned by
            ``_build_lambda``.
    """

    def __init__(
        self, scope: Construct, id_: str, stage_name: str, service_config: dict
    ) -> None:
        """Create the role, bucket, Lambda, schedule and IAM statements.

        Args:
            scope: Parent construct.
            id_: Construct id within ``scope``.
            stage_name: Deployment stage; used as a prefix in resource names.
            service_config: Must provide
                ``service_config["service"]["service_short_name"]``.
        """
        super().__init__(scope, id_)

        # The short name doubles as the service name.
        service_short_name = service_config["service"]["service_short_name"]
        service_name = service_short_name

        lambda_name = f"{stage_name}-{service_short_name}-data-fetch-and-save"
        lambda_role_name = f"{lambda_name}-role"
        lambda_role = self._build_lambda_role(lambda_role_name)

        # Environment variables handed to the Lambda; they depend on the
        # stage and service names passed in.
        env_vars = self._create_env_vars(
            stage_name=stage_name,
            service_name=service_name,
            service_short_name=service_short_name,
        )

        # Single source of truth: the IAM statements below are derived from
        # the same names the Lambda receives through its environment.
        bucket_name = env_vars["S3_BUCKET_NAME"]
        table_name = env_vars["BIKE_TABLE_NAME"]

        unique_id = f"BucketConstruct-{bucket_name}"
        s3.Bucket(self, id=unique_id, bucket_name=bucket_name)

        self.data_fetch_and_save_lambda = self._build_lambda(
            lambda_name=lambda_name,
            stage_name=stage_name,
            env_vars=env_vars,
            lambda_role=lambda_role,
        )

        # Scheduled rule that triggers the Lambda every two weeks.
        rule = aws_events.Rule(
            self, "Rule", schedule=aws_events.Schedule.rate(Duration.days(14))
        )
        rule.add_target(targets.LambdaFunction(self.data_fetch_and_save_lambda))

        # Read access to the DynamoDB table (region and account are fixed).
        lambda_role.add_to_policy(
            aws_iam.PolicyStatement(
                actions=["dynamodb:Scan", "dynamodb:GetItem"],
                resources=[
                    f"arn:aws:dynamodb:eu-north-1:796717305864:table/{table_name}"
                ],
            )
        )

        # s3:PutObject is an object-level action, so the statement must target
        # the objects in the bucket ("<bucket-arn>/*"), not the bucket ARN.
        lambda_role.add_to_policy(
            aws_iam.PolicyStatement(
                actions=["s3:PutObject"],
                resources=[f"arn:aws:s3:::{bucket_name}/*"],
            )
        )

    def _build_lambda(
        self,
        lambda_name: str,  # used as both construct id and function name
        stage_name: str,  # accepted for call-site symmetry; not read here
        env_vars: dict,  # environment variables for the function
        lambda_role: aws_iam.Role,  # execution role
        cwd: str = os.getcwd(),  # NOTE: evaluated once, when the class body runs
    ):
        """Create the Lambda function resource.

        The code asset is the ``bike_data_scraper/handlers`` directory under
        ``cwd`` and the entry point is
        ``data_fetch_and_save_lambda.lambda_handler``.

        Returns:
            The created ``aws_lambda.Function``.
        """
        lambda_function = aws_lambda.Function(
            self,
            lambda_name,
            function_name=lambda_name,
            runtime=aws_lambda.Runtime.PYTHON_3_10,
            environment=env_vars,
            code=aws_lambda.Code.from_asset(
                os.path.join(cwd, "bike_data_scraper/handlers")
            ),
            handler="data_fetch_and_save_lambda.lambda_handler",
            tracing=aws_lambda.Tracing.ACTIVE,
            retry_attempts=2,
            timeout=Duration.seconds(80),
            memory_size=128,
            role=lambda_role,
        )
        return lambda_function

    def _build_lambda_role(self, role_name: str) -> aws_iam.Role:
        """Create a role assumable by Lambda with the basic execution policy.

        Args:
            role_name: Construct id for the role.

        Returns:
            The created ``aws_iam.Role``.
        """
        lambda_role = aws_iam.Role(
            self,
            role_name,
            assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"),
            managed_policies=[
                aws_iam.ManagedPolicy.from_aws_managed_policy_name(
                    managed_policy_name="service-role/AWSLambdaBasicExecutionRole"
                )
            ],
        )
        return lambda_role

    def _create_env_vars(
        self, stage_name: str, service_name: str, service_short_name: str
    ) -> dict:
        """Build the environment variable mapping for the Lambda.

        The bucket name is derived from the stage and short service name; the
        table name is a fixed string.
        """
        return {
            "STAGE_NAME": stage_name,
            "S3_BUCKET_NAME": f"{stage_name}-{service_short_name}-processed-bike-data",
            "BIKE_TABLE_NAME": "Cyrille-dscrap-bike-data-table",
            "LOG_LEVEL": "DEBUG",
            "SERVICE_NAME": service_name,
        }


ModuleNotFoundError: No module named 'aws_cdk'

In [None]:
from aws_cdk import Stack
from constructs import Construct

from infrastructure.constructs.data_scraper import DataScraper
from infrastructure.constructs.data_fetch_and_save import DataFetchAndSave


class DataScraperStack(Stack):
    """Stack that hosts the DataScraper and DataFetchAndSave constructs."""

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        stage_name: str,
        service_config: dict,
        **kwargs,
    ) -> None:
        """Initialise the stack, then add both constructs in declaration order.

        Args:
            scope: Parent construct.
            construct_id: Id of this stack within ``scope``.
            stage_name: Prefix for the child construct ids; also forwarded.
            service_config: Forwarded unchanged to both constructs.
            **kwargs: Passed through to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Each child gets the id "<stage>-<suffix>" and the same two settings.
        children = (
            (DataScraper, "DataScraper"),
            (DataFetchAndSave, "DataFetchAndSave"),
        )
        for construct_cls, id_suffix in children:
            construct_cls(
                self,
                f"{stage_name}-{id_suffix}",
                stage_name=stage_name,
                service_config=service_config,
            )


Latest version of the Lambda function

In [None]:
import boto3
import csv
from datetime import datetime, timedelta
from io import StringIO
import os

# AWS handles are created once at module import, not inside the handler,
# so every call to lambda_handler in this process shares them.
dynamodb = boto3.resource("dynamodb")  # resource-level API; used for Table(...).scan
s3 = boto3.client("s3")  # client-level API; used for put_object


def lambda_handler(event, context):
    """Export the last two weeks of DynamoDB bike records to S3 as a CSV file.

    The table named by ``BIKE_TABLE_NAME`` is scanned for items whose
    ``timestamp`` attribute is greater than "now minus 14 days". All matching
    items are written to one CSV object in the bucket named by
    ``S3_BUCKET_NAME``.

    Args:
        event: Lambda invocation payload; not read by this handler.
        context: Lambda runtime context; not read by this handler.

    Returns:
        dict: A single ``"message"`` entry that either names the S3 object
        that was written or reports that no matching items were found.

    Raises:
        KeyError: If ``BIKE_TABLE_NAME`` is unset, or if ``S3_BUCKET_NAME``
            is unset when there is data to upload.
    """
    # Read from DynamoDB
    table_name = os.environ["BIKE_TABLE_NAME"]
    table = dynamodb.Table(table_name)

    # Cutoff: 14 days back from the current (naive, local) time.
    # Idea to try: drop the hh:mm:ss part of the cutoff.
    two_weeks_ago = datetime.now() - timedelta(days=14)

    # The cutoff is sent as an ISO-8601 string.
    # NOTE(review): assumes `timestamp` is stored as an ISO-8601 string so that
    # string comparison matches chronological order - confirm against the writer.
    scan_kwargs = {
        "FilterExpression": "#ts > :two_weeks_ago",
        # "timestamp" is referenced through the #ts placeholder.
        "ExpressionAttributeNames": {"#ts": "timestamp"},
        "ExpressionAttributeValues": {":two_weeks_ago": two_weeks_ago.isoformat()},
    }

    # A single scan call returns only one page of results; keep requesting
    # pages until the response no longer carries a LastEvaluatedKey.
    items = []
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get("Items") or [])
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_evaluated_key

    # if no items are found
    if not items:
        return {"message": "No items found in DynamoDB for the last two weeks."}

    # Header = union of all attribute names in first-seen order, so an item
    # with an attribute the first item lacks does not make DictWriter raise.
    fieldnames = list(dict.fromkeys(key for item in items for key in item))

    # Convert to CSV format using an in-memory text buffer.
    with StringIO() as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()  # column names
        csv_writer.writerows(items)  # one row per item; missing attributes become ""
        csv_data = csv_file.getvalue()

    # Write to S3 under a key that carries the upload time (yyyymmddHHMMSS).
    bucket_name = os.environ["S3_BUCKET_NAME"]
    timestamp_str = datetime.now().strftime("%Y%m%d%H%M%S")
    s3_key = f"testdata_{timestamp_str}.csv"
    s3.put_object(Bucket=bucket_name, Key=s3_key, Body=csv_data)

    return {
        "message": f"Data from the last two weeks saved to s3://{bucket_name}/{s3_key}"
    }


In [None]:
 import aws_cdk
 from botocore.exceptions import ClientError
 from aws_cdk import (
     aws_dynamodb,
     aws_lambda,
     aws_iam,
     Duration,
     aws_events,
     aws_events_targets as targets,
     aws_s3 as s3,
 )
 from constructs import Construct
 import os
 
 
 class DataFetchAndSave(Construct):
     """Construct that wires up the scheduled DynamoDB-to-S3 export.

     Instantiating it adds to the enclosing scope: an IAM role for the Lambda,
     the Lambda function itself, and an events rule that invokes the function
     every 14 days. No bucket is created here; the Lambda writes to the bucket
     named in its ``S3_BUCKET_NAME`` environment variable.

     Attributes:
         data_fetch_and_save_lambda: The ``aws_lambda.Function`` returned by
             ``_build_lambda``.
     """

     def __init__(
         self, scope: Construct, id_: str, stage_name: str, service_config: dict
     ) -> None:
         """Create the role, Lambda, schedule and IAM statements.

         Args:
             scope: Parent construct.
             id_: Construct id within ``scope``.
             stage_name: Deployment stage; used as a prefix in resource names.
             service_config: Must provide
                 ``service_config["service"]["service_short_name"]``.
         """
         super().__init__(scope, id_)

         # The short name doubles as the service name.
         service_short_name = service_config["service"]["service_short_name"]
         service_name = service_short_name

         lambda_name = f"{stage_name}-{service_short_name}-data-fetch-and-save"
         lambda_role_name = f"{lambda_name}-role"
         lambda_role = self._build_lambda_role(lambda_role_name)

         env_vars = self._create_env_vars(
             stage_name=stage_name,
             service_name=service_name,
             service_short_name=service_short_name,
         )

         # Single source of truth: the IAM statements below are derived from
         # the same names the Lambda receives through its environment, so the
         # permissions cannot drift from what the handler actually accesses.
         bucket_name = env_vars["S3_BUCKET_NAME"]
         table_name = env_vars["BIKE_TABLE_NAME"]

         self.data_fetch_and_save_lambda = self._build_lambda(
             lambda_name=lambda_name,
             stage_name=stage_name,
             env_vars=env_vars,
             lambda_role=lambda_role,
         )

         # Scheduled rule that triggers the Lambda every two weeks.
         rule = aws_events.Rule(
             self, "Rule", schedule=aws_events.Schedule.rate(Duration.days(14))
         )
         rule.add_target(targets.LambdaFunction(self.data_fetch_and_save_lambda))

         # Read access to the DynamoDB table (region and account are fixed).
         lambda_role.add_to_policy(
             aws_iam.PolicyStatement(
                 actions=["dynamodb:Scan", "dynamodb:GetItem"],
                 resources=[
                     f"arn:aws:dynamodb:eu-north-1:796717305864:table/{table_name}"
                 ],
             )
         )

         # s3:PutObject is an object-level action, so the statement must target
         # the objects in the bucket ("<bucket-arn>/*"), not the bucket ARN.
         lambda_role.add_to_policy(
             aws_iam.PolicyStatement(
                 actions=["s3:PutObject"],
                 resources=[f"arn:aws:s3:::{bucket_name}/*"],
             )
         )

     def _build_lambda(
         self,
         lambda_name: str,  # used as both construct id and function name
         stage_name: str,  # accepted for call-site symmetry; not read here
         env_vars: dict,  # environment variables for the function
         lambda_role: aws_iam.Role,  # execution role
         cwd: str = os.getcwd(),  # NOTE: evaluated once, when the class body runs
     ):
         """Create the Lambda function resource.

         The code asset is the ``bike_data_scraper/handlers`` directory under
         ``cwd`` and the entry point is
         ``data_fetch_and_save_lambda.lambda_handler``.

         Returns:
             The created ``aws_lambda.Function``.
         """
         lambda_function = aws_lambda.Function(
             self,
             lambda_name,
             function_name=lambda_name,
             runtime=aws_lambda.Runtime.PYTHON_3_10,
             environment=env_vars,
             code=aws_lambda.Code.from_asset(
                 os.path.join(cwd, "bike_data_scraper/handlers")
             ),
             handler="data_fetch_and_save_lambda.lambda_handler",
             tracing=aws_lambda.Tracing.ACTIVE,
             retry_attempts=2,
             timeout=Duration.seconds(80),
             memory_size=128,
             role=lambda_role,
         )
         return lambda_function

     def _build_lambda_role(self, role_name: str) -> aws_iam.Role:
         """Create a role assumable by Lambda with the basic execution policy.

         Args:
             role_name: Construct id for the role.

         Returns:
             The created ``aws_iam.Role``.
         """
         lambda_role = aws_iam.Role(
             self,
             role_name,
             assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"),
             managed_policies=[
                 aws_iam.ManagedPolicy.from_aws_managed_policy_name(
                     "service-role/AWSLambdaBasicExecutionRole"
                 )
             ],
         )
         return lambda_role

     def _create_env_vars(
         self, stage_name: str, service_name: str, service_short_name: str
     ) -> dict:
         """Build the environment variable mapping for the Lambda.

         The bucket name is a fixed string; the table name is derived from the
         stage and short service name.
         """
         return {
             "STAGE_NAME": stage_name,
             "S3_BUCKET_NAME": "bikedatalake",
             "BIKE_TABLE_NAME": f"{stage_name}-{service_short_name}-bike-data-table",
             "LOG_LEVEL": "DEBUG",
             "SERVICE_NAME": service_name,
         }

In [None]:
 from constructs import Construct
  
 from infrastructure.constructs.data_scraper import DataScraper
 from infrastructure.constructs.data_fetch_and_save import DataFetchAndSave
  
  
 class DataScraperStack(Stack):
             stage_name=stage_name,
             service_config=service_config,
         )
 
         DataFetchAndSave(
             self,
             f"{stage_name}-DataFetchAndSave",
	             stage_name=stage_name,
             service_config=service_config,
         )

In [None]:
from datetime import datetime, timedelta
# Cutoff instant: exactly two weeks before the moment this cell runs.
two_weeks_ago = datetime.now() - timedelta(weeks=2)
# First line: the cutoff; second line: the current time as yyyymmddHHMMSS.
print(two_weeks_ago, datetime.now().strftime("%Y%m%d%H%M%S"), sep="\n")

2023-10-02 17:22:13.219039
20231016172213


In [None]:
# ISO-8601 text form of the cutoff; the handler passes this same form to the scan filter.
two_weeks_ago.isoformat()

'2023-10-02T09:02:34.386642'

In [5]:
# Current local date and time as a naive datetime; the value is the cell's output.
datetime.today()

datetime.datetime(2023, 10, 17, 11, 12, 4, 299775)

In [13]:
import datetime as dt



# Reporting window: the two weeks leading up to the moment this cell runs.
today = datetime.today()
end_date = today
start_date = end_date - dt.timedelta(weeks=2)

# Cell output: candidate file name. str(datetime) puts a space and colons in it.
f"weather-data-{today}.csv"

'weather-data-2023-10-17 11:14:14.049364.csv'

In [17]:
# Work with whole calendar dates: take yesterday, then drop the time-of-day part.
one_day_ago = datetime.now() - dt.timedelta(days=1)
one_day_ago_date = one_day_ago.date()
# NOTE(review): counted from yesterday, so this is 15 days before today's date, and it
# rebinds `two_weeks_ago` to a `date` (an earlier cell binds it to a `datetime`).
two_weeks_ago = one_day_ago_date - dt.timedelta(days=14)

# Displayed as YYYY-MM-DD.
one_day_ago_date.isoformat()

'2023-10-16'