<a href="https://colab.research.google.com/github/AlessandraMayumi/python-generator/blob/main/python_generator_aws.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

When developing a solution, start with the simplest approach and well-controlled scenarios, then extend the implementation to handle corner cases. However creative you are, production usage is usually even more creative, stressing resources and requiring new ideas.

I want to share how to develop an implementation from the high-level description of a problem to the solution, adding complexity as new cases arise.

## Problem
Generate a response containing the number of empty cells in each column of a CSV file stored in an S3 bucket. This process has to run in an AWS Lambda function.

# Hands on


## Create a project
I started this Python study project in the *GitHub* repository https://github.com/AlessandraMayumi/python-generator, which is also where this Colab notebook is saved.

## Reading a csv file with pandas
Before thinking about AWS resources, let's try to get the result described in the problem from a local CSV file.

#### References
- w3schools: https://www.w3schools.com/python/pandas/pandas_csv.asp



In [None]:
import pandas as pd

df = pd.read_csv('/content/sample_data/california_housing_test.csv')
print(df.head())

   longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
0    -122.05     37.37                27.0       3885.0           661.0   
1    -118.30     34.26                43.0       1510.0           310.0   
2    -117.81     33.78                27.0       3589.0           507.0   
3    -118.36     33.82                28.0         67.0            15.0   
4    -119.67     36.33                19.0       1241.0           244.0   

   population  households  median_income  median_house_value  
0      1537.0       606.0         6.6085            344700.0  
1       809.0       277.0         3.5990            176500.0  
2      1484.0       495.0         5.7934            270500.0  
3        49.0        11.0         6.1359            330000.0  
4       850.0       237.0         2.9375             81700.0  


## Prepare a test csv file
Occasionally, preparing the test environment can take more time than implementing the initial ideas for the solution.

In this case, I used a website that generates CSV files, but what I wanted to validate was empty cells, so I created a script to clear some values.

#### References
- python open: https://www.pythontutorial.net/python-basics/python-read-text-file/
- csv generator: https://extendsclass.com/csv-generator.html

In [None]:
import pandas as pd
import random

""" The test csv file should have some empty cells """

filename_test = 'mock_empty_register.csv'
filepath_test = f'/content/sample_data/{filename_test}'
filepath_original = '/content/sample_data/mock_register.csv'

def modify_empty_cells():
  df = pd.read_csv(filepath_original)

  num_row, num_col = df.shape

  for i in range(100):
    x = random.randrange(num_row)
    y = random.randrange(1, num_col)
    df.loc[x, df.columns[y]] = None

  # index=False keeps the row index from being written as an extra unnamed column
  df.to_csv(filepath_test, index=False)

  print(f'Test csv file generated: {filename_test}')


modify_empty_cells()


Test csv file generated: mock_empty_register.csv


# Working on the solution
After the test file is generated, let's work on the solution.

## Count empty cells
Pandas already has an easy way to do it: `isna()` flags every empty cell and `sum()` adds the flags up per column.

## Convert dataframe into dict

In [None]:
df_test = pd.read_csv(filepath_test)
df_test.isna().sum().to_dict()

Sucessfully count empty cells for each column: {'firstname': 22, 'lastname': 18, 'email': 21, 'email2': 22, 'profession': 17}
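
To see what `isna().sum().to_dict()` does without the generated file, here is a small sketch on an inline CSV. The sample rows are made up for illustration:

```python
import io

import pandas as pd

# three rows, with empty cells spread across the columns
csv_text = (
    "firstname,lastname,email\n"
    "Ana,,ana@email.com\n"
    ",Silva,\n"
    "Bia,,bia@email.com\n"
)

df = pd.read_csv(io.StringIO(csv_text))

# isna() gives a True/False table, sum() counts the True values per column
empty_count = df.isna().sum().to_dict()
print(empty_count)
```

Here `firstname` and `email` each have one empty cell and `lastname` has two.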


## Create an AWS Lambda
Create an AWS Lambda function triggered by the S3 bucket, so that every time a file is uploaded, the Lambda runs.

#### References
- aws lambda: https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html

In [None]:
import json

def lambda_handler(event, context):
    print(f'Lambda event: {event}')

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }


### Add a Test
Instead of uploading and removing the same CSV file over and over, print the Lambda event to learn how a test can mock a new file arriving in the S3 bucket.

The event JSON looks something like this:

```json
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2023-07-02T19:25:52.815Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "A25Y..."
      },
      "requestParameters": {
        "sourceIPAddress": "187.106.35.106"
      },
      "responseElements": {
        "x-amz-request-id": "XKK...",
        "x-amz-id-2": "PAT..."
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "7331....",
        "bucket": {
          "name": "my-bucket",
          "ownerIdentity": {
            "principalId": "A25..."
          },
          "arn": "arn:aws:s3:::my-bucket"
        },
        "object": {
          "key": "mock_empty_register.csv",
          "size": 81685,
          "eTag": "6fc5f1...",
          "sequencer": "0064..."
        }
      }
    }
  ]
}
```
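
A mock event only needs the fields the handler actually reads. Below is a minimal sketch of such a test event, together with a helper that extracts the bucket and key. `parse_s3_event` and the key `mock+empty+register.csv` are hypothetical names used for illustration. S3 URL-encodes object keys in event notifications, so the helper decodes them with `unquote_plus`:

```python
from urllib.parse import unquote_plus


def parse_s3_event(event):
    """Return a (bucket, key) pair for every record in an S3 event."""
    objects = []
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        # keys arrive URL-encoded: a space is sent as '+', for example
        key = unquote_plus(record['s3']['object']['key'])
        objects.append((bucket, key))
    return objects


# trimmed-down version of the event printed by the Lambda
test_event = {
    'Records': [
        {
            'eventSource': 'aws:s3',
            'eventName': 'ObjectCreated:Put',
            's3': {
                'bucket': {'name': 'my-bucket'},
                'object': {'key': 'mock+empty+register.csv', 'size': 81685},
            },
        }
    ]
}

print(parse_s3_event(test_event))
# [('my-bucket', 'mock empty register.csv')]
```

Looping over `Records` instead of reading only index `0` is a design choice of this sketch: it keeps working if a notification ever carries more than one record.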

## Add permission to read S3 object

Before accessing the S3 file, it is necessary to configure the Lambda to allow the `get object` operation.

Navigate to *Configuration > Permissions*.
Then click on the Lambda role to edit it.
On the role page, there is a Permissions policies section; add the permission **AmazonS3ReadOnlyAccess**.
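
**AmazonS3ReadOnlyAccess** grants read access to every bucket in the account. If that is broader than needed, a narrower policy could allow only `s3:GetObject` on a single bucket. The sketch below builds such a policy document, assuming the bucket is called `my-bucket` as in the event above. The helper name `build_read_policy` is hypothetical:

```python
import json


def build_read_policy(bucket_name):
    """Return an IAM policy document allowing s3:GetObject on one bucket."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': 's3:GetObject',
                # objects are addressed as <bucket ARN>/<key>
                'Resource': f'arn:aws:s3:::{bucket_name}/*',
            }
        ],
    }


policy = build_read_policy('my-bucket')
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the role as an inline policy in place of the managed one.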


## Get object from S3 bucket and read as csv

Boto3 is the Amazon Web Services (AWS) Software Development Kit (SDK) for Python. To retrieve an object from S3, parse the handler event, which contains the bucket name and object key. The `Body` of the response is a `StreamingBody`, which has to be read into an in-memory buffer before pandas can parse it.

#### References
- aws sdk boto3 for python: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_object.html

In [None]:
import boto3
import io
import pandas as pd

def lambda_handler(event, context):
    print(f'Lambda event: {event}')

    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    client = boto3.client('s3')
    obj = client.get_object(Bucket=bucket, Key=object_key)
    body_bytes = obj['Body'].read()
    body_io = io.BytesIO(body_bytes)

    df = pd.read_csv(body_io)
    empty_count = df.isna().sum().to_json()
    print(f'Successfully counted empty cells for each column: {empty_count}')

    return {
        'statusCode': 200,
        'body': [empty_count]
    }

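## Big files against Lambda memory

AWS Lambda's default memory is *128 MB*, so if the CSV file is 100 MB, the Lambda cannot load the entire file and then evaluate it: the raw bytes and the dataframe built from them have to fit in memory alongside the Python runtime and the imported libraries.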

#### References
- aws lambda memory: https://docs.aws.amazon.com/lambda/latest/operatorguide/computing-power.html
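
The difference between loading a whole file and iterating over it can be measured locally before touching Lambda. This is a rough sketch using only the standard library: it writes a few megabytes of CSV to a temporary file, then compares the peak memory traced by `tracemalloc` for both strategies. The helper names and the file contents are made up for illustration, and the absolute numbers will differ from what Lambda reports:

```python
import os
import tempfile
import tracemalloc


def read_whole(path):
    """Load the entire file into memory, then count its lines."""
    with open(path, 'rb') as f:
        data = f.read()
    return data.count(b'\n')


def read_by_line(path):
    """Count lines while holding only one line at a time."""
    count = 0
    with open(path, 'rb') as f:
        for _ in f:
            count += 1
    return count


def peak_memory(func, path):
    """Return the peak number of bytes allocated while func(path) runs."""
    tracemalloc.start()
    func(path)
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak


# build a temporary CSV file of a few megabytes
with tempfile.NamedTemporaryFile('wb', suffix='.csv', delete=False) as tmp:
    tmp.write(b'id,firstname,lastname\n')
    for i in range(200000):
        tmp.write(b'%d,name,surname\n' % i)
    path = tmp.name

peak_whole = peak_memory(read_whole, path)
peak_lines = peak_memory(read_by_line, path)
os.remove(path)

print(f'whole file: {peak_whole} bytes, line by line: {peak_lines} bytes')
```

The peak for the whole-file read grows with the file size, while the line-by-line peak stays roughly constant.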

### Create test files using a script
Instead of using sites to generate new files, let's create a script in order to have more control over test files.

In [None]:
""" CSV FILE GENERATOR
To generate test csv files with empty cells, call the python functions

generate_csv(lines)
modify_empty_cells()
"""
import uuid
import random
import csv
import pandas as pd

FILENAME = '../data/mock_big_register.csv'


def _add_headers():
    headers = "id,firstname,lastname,email,email2,phone,profession\n"
    # the context manager closes the file even if the write fails
    with open(FILENAME, 'w') as empty_file:
        empty_file.write(headers)


def _get_occupations():
    occupations = []
    with open('occupations.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            occupations.append(row['Occupations'])
    return occupations


def _generate_line(professions):
    name = f'name{random.randrange(1000)}'
    surname = f'surname{random.randrange(1000)}'
    email = f'{name}@email.com'
    email2 = f'{name}@email2.com'
    phone = random.randrange(100000, 999999)
    profession = random.choice(professions)
    # no spaces after the commas, so values do not start with a stray blank
    return f'{uuid.uuid4()},{name},{surname},{email},{email2},{phone},{profession}\n'


def generate_csv(lines):
    """
    Generate a test csv file specifying how many lines
    :param lines: quantity of lines the csv should have
    """
    _add_headers()
    occupation_list = _get_occupations()

    with open(FILENAME, 'a') as f:
        for i in range(lines):
            line = _generate_line(occupation_list)
            f.write(line)

    print('Successfully generated')


def modify_empty_cells():
    df = pd.read_csv(FILENAME)

    num_row, num_col = df.shape

    for i in range(100):
        x = random.randrange(num_row)
        y = random.randrange(2, num_col)
        df.loc[x, df.columns[y]] = None

    empty = df.isna().sum()
    print('Checking empty cells', empty)
    # index=False keeps the row index from being written as an extra unnamed column
    df.to_csv(FILENAME, index=False)

    print(f'Test csv file generated: {FILENAME}')


generate_csv(100000)
modify_empty_cells()


## Iterate lines instead of reading the entire file body

Sometimes a solution changes completely due to resource limitations. To read a larger file within the same memory limit, let's read each line and evaluate it separately. The side effect is an increase in duration.

In this case, the implementation started with dataframes only because they offer a simple, ready-to-use operation. However, iterating over lines does not allow a dataframe to be created directly.

This new solution parses each line itself.

#### References
- boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/get_object.html

In [None]:
import boto3


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    client = boto3.client('s3')

    obj = client.get_object(Bucket=bucket, Key=object_key)
    generator = obj['Body'].iter_lines()

    headers = next(generator).decode('utf-8').split(',')
    # initialize dictionary with one counter per column
    empty_count = {h: 0 for h in headers}
    # count empty cells
    # note: a plain split(',') assumes no quoted fields containing commas
    for line in generator:
        row = line.decode('utf-8').split(',')
        # zip stops at the shorter sequence, so a row with extra
        # values cannot index past the headers
        for header, value in zip(headers, row):
            if not value:
                empty_count[header] += 1

    print(f'Successfully counted empty cells for each column: {empty_count}')

    return {
        'statusCode': 200,
        'body': [empty_count]
    }
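
Splitting on `','` miscounts columns as soon as a value is quoted and contains a comma, for example `"Engineer, civil"`. One possible alternative, shown here as a sketch rather than the notebook's method, is to feed the decoded lines to the standard library `csv.reader`. The function name `count_empty_cells` and the sample lines are hypothetical. The argument can be any iterable of byte lines, which is what `iter_lines()` produces:

```python
import csv


def count_empty_cells(byte_lines):
    """Count empty cells per column from an iterable of encoded CSV lines."""
    # decode lazily so only one line is held in memory at a time
    text_lines = (line.decode('utf-8') for line in byte_lines)
    reader = csv.reader(text_lines)

    headers = next(reader)
    empty_count = {h: 0 for h in headers}
    for row in reader:
        for header, value in zip(headers, row):
            if not value:
                empty_count[header] += 1
    return empty_count


# stand-in for obj['Body'].iter_lines()
sample_lines = [
    b'id,firstname,profession',
    b'1,Ana,"Engineer, civil"',
    b'2,,',
    b'3,Bia,',
]

print(count_empty_cells(sample_lines))
# {'id': 0, 'firstname': 1, 'profession': 2}
```

A quoted value that contains a line break would still be split across two lines by `iter_lines()`, so this sketch assumes one record per line.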

## Using dataframes in chunks

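What if using dataframes made a real difference compared with evaluating each CSV line? The stream can still be read in chunks, building one small dataframe per chunk and adding up the partial counts.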



In [None]:
import boto3
import pandas as pd


def _get_csv_chunk(stream_body):
    columns = []
    pending = b''
    # increase chunk size to optimize duration while keeping low memory usage
    for chunk in stream_body.iter_chunks(chunk_size=1000000):
        # split the raw bytes and decode only complete lines, so a multi-byte
        # character cut by the chunk boundary cannot break the decoding
        lines = (pending + chunk).split(b'\n')
        # the last element is an unfinished line (or b'' when the chunk ends
        # with '\n'), so keep it to be completed by the next iteration
        pending = lines.pop()
        rows_list = [line.decode('utf-8').rstrip('\r') for line in lines]
        # to create each chunk dataframe, it needs column names
        # that are present only in the first chunk
        # so store a list of column names and use it in each iteration
        if not columns and rows_list:
            columns = rows_list[0].split(',')
            rows_list = rows_list[1:]
        if rows_list:
            yield [row.split(',') for row in rows_list], columns
    # a file that does not end with '\n' still has its last line in pending
    if pending and columns:
        yield [pending.decode('utf-8').rstrip('\r').split(',')], columns


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    client = boto3.client('s3')

    obj = client.get_object(Bucket=bucket, Key=object_key)
    stream_body = obj['Body']

    empty_count = {}
    for rows, columns in _get_csv_chunk(stream_body):
        df = pd.DataFrame(rows, columns=columns)

        # a cell is empty when it holds '' or is missing from a short row
        empty_count_chunk = (df.isna() | df.eq('')).sum().to_dict()
        # add the chunk totals to the running totals; starting from zero
        # keeps the first chunk from being counted twice
        for key, value in empty_count_chunk.items():
            empty_count[key] = empty_count.get(key, 0) + int(value)

    print(f'Successfully counted empty cells for each column: {empty_count}')

    return {
        'statusCode': 200,
        'body': [empty_count]
    }
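
pandas can also do the chunking itself: `read_csv` accepts a `chunksize` argument and then returns an iterator of dataframes instead of a single one. The sketch below tries the idea on an in-memory file. The function name `count_empty_in_chunks` and the sample data are hypothetical, and whether the S3 `StreamingBody` can be passed to `read_csv` directly is an assumption that should be checked against the installed boto3 and pandas versions:

```python
import io

import pandas as pd


def count_empty_in_chunks(file_like, chunksize):
    """Count empty cells per column, reading chunksize rows at a time."""
    total = None
    for chunk in pd.read_csv(file_like, chunksize=chunksize):
        counts = chunk.isna().sum()
        # Series addition aligns on the column names
        total = counts if total is None else total + counts
    if total is None:
        return {}
    return {column: int(n) for column, n in total.items()}


csv_text = (
    "id,firstname,email\n"
    "1,Ana,\n"
    "2,,b@email.com\n"
    "3,,\n"
    "4,Bia,d@email.com\n"
    "5,Caio,\n"
)

# chunksize=2 forces three chunks for the five rows
print(count_empty_in_chunks(io.StringIO(csv_text), chunksize=2))
# {'id': 0, 'firstname': 2, 'email': 3}
```

With this approach pandas takes care of line boundaries, quoting and column names, and `chunksize` becomes the only knob for trading memory against duration.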