## DynamoDB Batch Operations: Fast Mode

Let us see how to handle batch inserts into a DynamoDB table using the boto3 batch writer.
* We can use `batch_writer` to load data into a DynamoDB table in batches.
* It can be used for deletes as well.
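
As background, `batch_writer` wraps the `BatchWriteItem` API, which accepts at most 25 put or delete requests per call, so the writer buffers items and flushes them in groups of up to that size. The sketch below only illustrates that grouping in plain Python; the `chunk` helper is hypothetical and not part of boto3.

```python
def chunk(items, size=25):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# 98 placeholder items stand in for the repo records loaded later on
batches = list(chunk(list(range(98))))
print([len(b) for b in batches])  # [25, 25, 25, 23]
```

With 98 items this amounts to four batched requests rather than 98 individual `put_item` calls, which is where most of the speed-up comes from.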

### 1. Read data from GitHub

In [1]:
import requests

In [2]:
import json

In [3]:
def list_repos(token, since='758759529'):
    res = requests.get(
        f'https://api.github.com/repositories?since={since}',
        headers={'Authorization': f'token {token}'}
    )
    return json.loads(res.content.decode('utf-8'))

In [4]:
def get_repo_details(owner, name, token):
    repo_details = json.loads(requests.get(
        f'https://api.github.com/repos/{owner}/{name}',
        headers={'Authorization': f'token {token}'}
    ).content.decode('utf-8'))
    return repo_details

In [5]:
def extract_repo_fields(repo_details):
    repo_fields = {
        'id': repo_details['id'],
        'node_id': repo_details['node_id'],
        'name': repo_details['name'],
        'full_name': repo_details['full_name'],
        'owner': {
            'login': repo_details['owner']['login'],
            'id': repo_details['owner']['id'],
            'node_id': repo_details['owner']['node_id'],
            'type': repo_details['owner']['type'],
            'site_admin': repo_details['owner']['site_admin']
        },
        'html_url': repo_details['html_url'],
        'description': repo_details['description'],
        'fork': repo_details['fork'],
        'created_at': repo_details['created_at']
    }
    return repo_fields

In [6]:
def get_repos(repos, token):
    repos_details = []
    for repo in repos:
        try:
            owner = repo['owner']['login']
            name = repo['name']
            repo_details = get_repo_details(owner, name, token)
            repo_fields = extract_repo_fields(repo_details)
            repos_details.append(repo_fields)
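        except Exception as e:
            # Skip repos whose details are missing or cannot be fetched, but report them
            print(f'Skipping repo: {e}')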
    return repos_details

In [7]:
import os
token = os.environ['GITHUB_TOKEN']  # assumed variable name; never hard-code a personal access token
repos = list_repos(token)

In [8]:
repos_details = get_repos(repos, token)  # This variable holds everything we need to write to DynamoDB

### 2. Populate existing DynamoDB table

In [9]:
import boto3

In [10]:
import os

In [11]:
os.environ.setdefault('AWS_PROFILE', 'itvgithub')

'itvgithub'

In [12]:
#os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')

In [13]:
dynamodb = boto3.resource('dynamodb', region_name="ap-southeast-1")

In [14]:
ghrepos_table = dynamodb.Table('ghrepos')

#### 2.1 Delete old data in table

In [15]:
ghrepos_table.delete_item?

Signature: ghrepos_table.delete_item(*args, **kwargs)
Docstring:
Deletes a single item in a table by primary key. You can perform a conditional delete operation that deletes the item if it exists, or if it has an expected attribute value.

In addition to deleting an item, you can also return the item's attribute values in the same operation, using the ``ReturnValues`` parameter.

Unless you specify conditions, the ``DeleteItem`` is an idempotent operation; running it multiple times on the same item or attribute does *not* result in an error response.

Conditional deletes are useful for deleting items only if specific conditions are met. If those conditions are met, DynamoDB performs the delete. Otherwise, the item is not deleted.

See also: `AWS API Documentation <https://docs.aws.amazon.com/goto/WebAPI/dynamodb-2012-08-10/DeleteIt

In [16]:
%%time

for repo in ghrepos_table.scan()['Items']:
    print(f'Deleting entry with repo id {repo["id"]}')
    ghrepos_table.delete_item(Key={'id': repo['id']})

Deleting entry with repo id 758759772
Deleting entry with repo id 758759577
Deleting entry with repo id 758759663
Deleting entry with repo id 758759606
Deleting entry with repo id 758759670
Deleting entry with repo id 758759738
Deleting entry with repo id 758759674
Deleting entry with repo id 758759755
Deleting entry with repo id 758759718
Deleting entry with repo id 758759782
Deleting entry with repo id 758759777
Deleting entry with repo id 758759662
Deleting entry with repo id 758759599
Deleting entry with repo id 758759724
Deleting entry with repo id 758759783
Deleting entry with repo id 758759720
Deleting entry with repo id 758759667
Deleting entry with repo id 758759763
Deleting entry with repo id 758759566
Deleting entry with repo id 758759679
Deleting entry with repo id 758759634
Deleting entry with repo id 758759579
Deleting entry with repo id 758759656
Deleting entry with repo id 758759603
Deleting entry with repo id 758759770
Deleting entry with repo id 758759597
Deleting ent
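
A single `scan()` call returns at most 1 MB of data, so the loop above only sees the first page of a larger table. A minimal sketch of following `LastEvaluatedKey` across pages is shown below; `scan_all` is a hypothetical helper name, and it assumes `table` behaves like a boto3 `Table` resource.

```python
def scan_all(table, **scan_kwargs):
    """Yield every item in the table, fetching one scan page at a time."""
    while True:
        page = table.scan(**scan_kwargs)
        yield from page['Items']
        # LastEvaluatedKey is only present when more pages remain
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key
```

It could replace the direct `scan()['Items']` access, for example `for repo in scan_all(ghrepos_table): ...`.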

#### 2.2 Batch write new data

In [17]:
batch_writer = ghrepos_table.batch_writer()

In [18]:
type(batch_writer)

boto3.dynamodb.table.BatchWriter

In [19]:
help(batch_writer.put_item)

Help on method put_item in module boto3.dynamodb.table:

put_item(Item) method of boto3.dynamodb.table.BatchWriter instance



In [20]:
batch_writer.put_item?

Signature: batch_writer.put_item(Item)
Docstring: <no docstring>
File:      ~/Projects/Internal/GenlogsS3/GenLogsS3-venv/lib/python3.8/site-packages/boto3/dynamodb/table.py
Type:      method

In [21]:
def load_repos(repos_details, ghrepos_table, batch_size=50):
    with ghrepos_table.batch_writer() as batch:
        repos_count = len(repos_details)
        # range(start, stop, step): with 100 repos and batch_size=50, i takes the values 0 and 50
        for i in range(0, repos_count, batch_size):
            print(f'Processing from {i} to {i+batch_size}')
            # put_item is called once per repo; the batch writer buffers the items and writes them out in batches
            for repo in repos_details[i:i+batch_size]:
                batch.put_item(Item=repo)

In [22]:
list(range(0, 100, 50))

[0, 50]

In [23]:
%%time
load_repos(repos_details, ghrepos_table)

Processing from 0 to 50
Processing from 50 to 100
CPU times: user 89.8 ms, sys: 8.02 ms, total: 97.8 ms
Wall time: 371 ms
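
`BatchWriteItem` rejects a request that contains the same key twice, so a list with repeated `id` values could fail partway through the load. One possible way around this, assuming `id` is the table's only key attribute (as the earlier `delete_item` call suggests), is the `overwrite_by_pkeys` argument of `batch_writer`, which keeps only the most recently buffered item per key. `load_repos_dedup` is a hypothetical name for this sketch.

```python
def load_repos_dedup(repos_details, table, pkeys=('id',)):
    """Write all repos, letting the batch writer drop buffered duplicates by key."""
    # overwrite_by_pkeys lists the key attributes used to detect duplicates
    with table.batch_writer(overwrite_by_pkeys=list(pkeys)) as batch:
        for repo in repos_details:
            batch.put_item(Item=repo)
```

Usage would mirror the call above: `load_repos_dedup(repos_details, ghrepos_table)`.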


In [24]:
rs = ghrepos_table.scan()

In [25]:
len(rs['Items'])

98

In [26]:
rs['Items'][0]

{'created_at': '2024-02-17T02:22:41Z',
 'owner': {'site_admin': False,
  'id': Decimal('87507176'),
  'login': 'BrianHCordova',
  'type': 'User',
  'node_id': 'MDQ6VXNlcjg3NTA3MTc2'},
 'full_name': 'BrianHCordova/css-portfolio',
 'html_url': 'https://github.com/BrianHCordova/css-portfolio',
 'description': 'portfolio with advanced CSS',
 'id': Decimal('758759772'),
 'fork': False,
 'name': 'css-portfolio',
 'node_id': 'R_kgDOLTnBXA'}
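
As the output shows, boto3 returns DynamoDB numbers as `Decimal`, which `json.dumps` cannot serialize by default. If the items need to go back to JSON, a small `default` hook is one option; `decimal_default` is a hypothetical helper, not part of boto3, and the sample item is a trimmed copy of the record above.

```python
import json
from decimal import Decimal


def decimal_default(value):
    """json.dumps hook: emit Decimal as int when integral, otherwise as float."""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError(f'Object of type {type(value).__name__} is not JSON serializable')


item = {'id': Decimal('758759772'), 'name': 'css-portfolio', 'fork': False}
print(json.dumps(item, default=decimal_default))
# {"id": 758759772, "name": "css-portfolio", "fork": false}
```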

#### 2.3 Batch delete new data

In [27]:
def delete_repos(repos_details, ghrepos_table, batch_size=50):
    with ghrepos_table.batch_writer() as batch:
    
        repos_count = len(repos_details)
        for i in range(0, repos_count, batch_size):
            print(f'Processing from {i} to {i+batch_size}')
            for repo in repos_details[i:i+batch_size]:
                key = {'id': repo['id']}
                batch.delete_item(Key=key)  

In [28]:
%%time
delete_repos(rs['Items'], ghrepos_table)

Processing from 0 to 50
Processing from 50 to 100
CPU times: user 37.2 ms, sys: 562 µs, total: 37.8 ms
Wall time: 182 ms


In [29]:
ghrepos_table.scan()

{'Items': [],
 'Count': 0,
 'ScannedCount': 0,
 'ResponseMetadata': {'RequestId': 'MB2N93BBA4MLFMFE2LBDV74MVNVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sat, 17 Feb 2024 14:34:39 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '39',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'MB2N93BBA4MLFMFE2LBDV74MVNVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '3413411624'},
  'RetryAttempts': 0}}