## Retrieve Data

The aim of this notebook is to use AWS Lambda functions to retrieve data from the S3 bucket "usgs-landsat".

### Step 1: create event_list

In [20]:
import boto3
import json
import mysql.connector
import requests
import re
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

In [52]:
# Path/row grid cells covering eastern Congo
# (presumably Landsat WRS-2 path/row indices rather than raw longitude/latitude -- TODO confirm).
paths = [174, 175, 176]
rows = [58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68]

# One event per (path, row) combination, ordered by path first and row second.
events = [{"path": path, "row": row} for path in paths for row in rows]

# JSON rendering of the full event list, handy for inspection or export.
json_data = json.dumps(events, indent=4)

In [7]:
# Number of Lambda invocations you plan to use
num_lambdas = 10

# Split the events into `num_lambdas` contiguous batches without dropping any.
# Plain floor division would silently discard the remainder
# (len(events) % num_lambdas events), so the trailing batches each take one
# extra event until the remainder is used up.
base_size, remainder = divmod(len(events), num_lambdas)

batch_events = []
start_index = 0
for i in range(num_lambdas):
    # The last `remainder` batches carry one additional event.
    size = base_size + (1 if i >= num_lambdas - remainder else 0)
    end_index = start_index + size
    batch_events.append(events[start_index:end_index])
    start_index = end_index

# Every event must land in exactly one batch.
assert sum(len(batch) for batch in batch_events) == len(events)

In [8]:
# Display the second batch as a quick sanity check of the partitioning.
batch_events[1]

[{'path': 174, 'row': 61}, {'path': 174, 'row': 62}, {'path': 174, 'row': 63}]

### Step 2: Deploy Lambda

#### Step 2.1: upload Lambda + dependencies zip

- upload dependencies

In [14]:
# another way
import boto3
from botocore.client import Config
from botocore import UNSIGNED

# Destination bucket, the local archive, and the key it is stored under in S3.
bucket_name = 'africamining'
file_path = 'deployment_lambda2.zip'  # local path of the deployment package
object_key = 'deployment_lambda2.zip'  # name the archive gets inside the bucket

# Anonymous (unsigned) S3 client.
# NOTE(review): an unsigned upload can only succeed if the bucket accepts
# anonymous writes -- confirm that this is intended.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Stream the archive to S3; the context manager closes the file afterwards.
with open(file_path, 'rb') as data:
    s3.put_object(Body=data, Key=object_key, Bucket=bucket_name)

print(f'File {object_key} has been uploaded to bucket {bucket_name}')

File deployment_lambda2.zip has been uploaded to bucket africamining


- check that the zip has been successfully uploaded to the S3 bucket: africamining

In [24]:
import boto3
from botocore import UNSIGNED
from botocore.client import Config

# Create an S3 client with unsigned (anonymous) requests
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Define the bucket name
bucket_name = 'africamining'

# A single list_objects_v2 call returns only one page of keys, so iterate
# over every page to make sure the whole bucket is listed.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
    # 'Contents' is absent from a page that holds no objects.
    for obj in page.get('Contents', []):
        print(obj['Key'])

Congo-local-roads.geojson
Congo-main-roads.geojson
Congo-villages.geojson
NDVI-sample-simple.csv
NDVI-sample-with-distances.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 58}.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 61}.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 67}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 59}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 62}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 65}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 68}.csv
NDVI_loss_filtered_coordinates-{'path': 176, 'row': 60}.csv
NDVI_loss_filtered_coordinates-{'path': 176, 'row': 63}.csv
cod_mines_curated_all_opendata_p_ipis.csv
complete_data.csv
complete_data_y0.csv
complete_data_y0_200k.csv
complete_data_y1.csv
complete_data_y1_radius5k.csv
deployment_lambda.zip
deployment_lambda1.zip
deployment_lambda2.zip
hotosm_cod_waterways_lines_shp.shp
jupyter/jovyan/.s3keep
jupyter/jovyan/Sample Mining.ipynb
jupyter/jovya

#### Step 2.2: Initialize Lambda

- create the Lambda function from deployment_lambda2.zip

In [19]:
import boto3
from botocore.config import Config

# Generous client-side timeouts so long-running synchronous invocations are
# not cut off by the default botocore limits.
config = Config(
    read_timeout=900,  # Adjust as needed
    connect_timeout=900,  # Adjust as needed
    retries={'max_attempts': 3}
)

# Create Lambda client with increased timeout
aws_lambda = boto3.client('lambda', config=config)

# Define the S3 bucket and object key of the deployment package
bucket_name = 'africamining'
object_key = 'deployment_lambda2.zip'

# Role ARN for Lambda execution role.
# Looked up here so the cell works on a fresh kernel instead of relying on a
# `role` variable that is only created by a later cell.
iam = boto3.client('iam')
role = iam.get_role(RoleName='LabRole')

try:
    # First deployment: create the function from the zip stored in S3.
    response = aws_lambda.create_function(
        FunctionName='q3',
        Runtime='python3.11',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code={
            'S3Bucket': bucket_name,
            'S3Key': object_key
        },
        MemorySize=10240,
        Timeout=600
    )
except aws_lambda.exceptions.ResourceConflictException:
    # The function already exists: only refresh its code from the same zip.
    response = aws_lambda.update_function_code(
        FunctionName='q3',
        S3Bucket=bucket_name,
        S3Key=object_key
    )

lambda_arn = response['FunctionArn']
print(lambda_arn)

arn:aws:lambda:us-east-1:928332558160:function:q3


- test lambda using an example batch event 

In [20]:
# Synchronously invoke the function with the first batch as a smoke test.
event = batch_events[0]
r = aws_lambda.invoke(
    Payload=json.dumps(event),
    InvocationType='RequestResponse',
    FunctionName='q3',
)

# The payload is a stream: read it once, then decode and display the JSON body.
payload_bytes = r['Payload'].read()
json.loads(payload_bytes)


- check whether the Lambda function's results (NDVI_loss_filtered_coordinates-*.csv) have been uploaded to the S3 bucket as expected

In [16]:
# Create an S3 client with unsigned (anonymous) requests
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Define the bucket name
bucket_name = 'africamining'

# A single list_objects_v2 call returns only one page of keys, so iterate
# over every page to make sure newly written result files are not missed.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
    # 'Contents' is absent from a page that holds no objects.
    for obj in page.get('Contents', []):
        print(obj['Key'])

Congo-local-roads.geojson
Congo-main-roads.geojson
Congo-villages.geojson
NDVI-sample-simple.csv
NDVI-sample-with-distances.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 58}.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 61}.csv
NDVI_loss_filtered_coordinates-{'path': 174, 'row': 67}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 59}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 62}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 65}.csv
NDVI_loss_filtered_coordinates-{'path': 175, 'row': 68}.csv
NDVI_loss_filtered_coordinates-{'path': 176, 'row': 60}.csv
NDVI_loss_filtered_coordinates-{'path': 176, 'row': 63}.csv
cod_mines_curated_all_opendata_p_ipis.csv
complete_data.csv
complete_data_y0.csv
complete_data_y0_200k.csv
complete_data_y1.csv
complete_data_y1_radius5k.csv
deployment_lambda.zip
deployment_lambda1.zip
deployment_lambda2.zip
hotosm_cod_waterways_lines_shp.shp
jupyter/jovyan/.s3keep
jupyter/jovyan/Sample Mining.ipynb
jupyter/jovya

- define step function

In [42]:
def make_def(lambda_arn, max_concurrency=10):
    """Build the Step Functions definition that fans input items out to a Lambda.

    The state machine consists of a single Map state. Each item of the
    execution input is passed as the payload of one Lambda invocation, and the
    invocation's ``Payload`` becomes that iteration's output.

    Args:
        lambda_arn: ARN (or name) of the Lambda function invoked per item.
        max_concurrency: Upper bound on Map iterations running in parallel.
            Defaults to 10.

    Returns:
        dict: The state machine definition, ready to be passed to
        ``json.dumps``. A new dict is built on every call.
    """
    # Retry policy applied to each Lambda invocation: up to 6 attempts,
    # starting at 2 seconds and doubling the wait each time.
    retry_policy = {
        "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException",
            "States.TaskFailed",
            "Lambda.Unknown"
        ],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2
    }

    # Task state executed once per input item.
    invoke_state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        # Keep only the function's own return value as the iteration output.
        "OutputPath": "$.Payload",
        "Parameters": {
            # Forward the whole item as the Lambda event.
            "Payload.$": "$",
            "FunctionName": lambda_arn
        },
        "Retry": [retry_policy],
        "TimeoutSeconds": 900,
        "End": True
    }

    definition = {
        "Comment": "Q2 State Machine",
        "StartAt": "Map",
        "States": {
            "Map": {
                "Type": "Map",
                "End": True,
                "MaxConcurrency": max_concurrency,
                "Iterator": {
                    "StartAt": "Lambda Invoke",
                    "States": {
                        "Lambda Invoke": invoke_state
                    }
                }
            }
        }
    }
    return definition

- deploy step function

In [43]:
if __name__ == '__main__':
    iam = boto3.client('iam')
    sfn = boto3.client('stepfunctions')

    # Generous client-side timeouts for long-running Lambda calls
    config = Config(
        read_timeout=900,  # Adjust as needed
        connect_timeout=900,  # Adjust as needed
        retries={'max_attempts': 3}
    )

    # Create Lambda client with increased timeout
    aws_lambda = boto3.client('lambda', config=config)

    role = iam.get_role(RoleName='LabRole')
    # The advertised default matches the value actually used below ('q3').
    lambda_function_name = input("What is the name of the Lambda function you would like to invoke in your Step Functions Map State? (default: 'q3'): ")
    if len(lambda_function_name) == 0:
        lambda_function_name = 'q3'

    # Get the Lambda function ARN (assumes the function already exists).
    # A direct lookup avoids scanning a paginated function listing and fails
    # with a descriptive service error when the function is missing.
    lambda_arn = aws_lambda.get_function_configuration(
        FunctionName=lambda_function_name
    )['FunctionArn']

    # Throttle concurrent executions to 10
    response = aws_lambda.put_function_concurrency(
            FunctionName=lambda_function_name,
            ReservedConcurrentExecutions=10
        )

    sfn_function_name = input("What would you like to name your Step Function State Machine? (default: 'q3'): ")
    if len(sfn_function_name) == 0:
        sfn_function_name = 'q3'

    # Use Lambda ARN to create State Machine Definition
    sf_def = make_def(lambda_arn)

    # Create Step Function State Machine if it doesn't already exist.
    # NOTE(review): the state machine is EXPRESS while the task timeout in the
    # definition is 900 s -- confirm the workflow type allows runs that long.
    try:
        response = sfn.create_state_machine(
            name=sfn_function_name,
            definition=json.dumps(sf_def),
            roleArn=role['Role']['Arn'],
            type='EXPRESS'
        )
    except sfn.exceptions.StateMachineAlreadyExists:
        # Find the existing machine by name across every page of results,
        # then overwrite its definition and role.
        state_machine_arn = next(
            sm['stateMachineArn']
            for page in sfn.get_paginator('list_state_machines').paginate()
            for sm in page['stateMachines']
            if sm['name'] == sfn_function_name
        )
        response = sfn.update_state_machine(
            stateMachineArn=state_machine_arn,
            definition=json.dumps(sf_def),
            roleArn=role['Role']['Arn']
        )
    else:
        # Freshly created: the ARN is part of the create response.
        state_machine_arn = response['stateMachineArn']

In [45]:
# List the Step Functions state machines and display the raw response.
response = sfn.list_state_machines()
response

{'stateMachines': [{'stateMachineArn': 'arn:aws:states:us-east-1:928332558160:stateMachine:q3',
   'name': 'q3',
   'type': 'EXPRESS',
   'creationDate': datetime.datetime(2024, 5, 25, 3, 54, 49, 399000, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': '5890ced0-007b-4f4c-b669-54a983cc0f91',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5890ced0-007b-4f4c-b669-54a983cc0f91',
   'date': 'Sat, 25 May 2024 19:56:25 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '156',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [46]:
# Collect the ARN of every listed state machine whose name matches the one
# chosen above, then keep the first match.
matching_arns = []
for sm in response['stateMachines']:
    if sm['name'] == sfn_function_name:
        matching_arns.append(sm['stateMachineArn'])
state_machine_arn = matching_arns[0]

In [50]:
# Display the resolved state machine ARN.
state_machine_arn

'arn:aws:states:us-east-1:928332558160:stateMachine:q3'

In [47]:
# Use every batch rather than a hard-coded count of 10, so the input stays
# correct if the number of batches changes.
data1 = list(batch_events)

- send all batch_events to lambda workers

In [51]:
# One Map item per batch; take every batch rather than a hard-coded count of
# 10 so the input stays correct if the number of batches changes.
data1 = list(batch_events)

# Run the state machine synchronously and wait for the result.
sfn_response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='sync_test',
    input=json.dumps(data1)
)

# 'output' is only present for a successful execution; on failure or timeout
# report the status together with the error and cause instead of raising
# a KeyError.
if sfn_response['status'] == 'SUCCEEDED':
    print(sfn_response['output'])
else:
    print(f"Execution {sfn_response['status']}: "
          f"{sfn_response.get('error')} - {sfn_response.get('cause')}")

KeyboardInterrupt: 