In [115]:
import logging
import re
import time
import uuid
from datetime import datetime

import boto3
from botocore.exceptions import ClientError



In [109]:
class Transaction:
    """A transfer of ``amount`` from ``src`` to ``dst``.

    Instances carry exactly five attributes, set in this order:
    ``transaction_id``, ``src``, ``dst``, ``amount`` and ``signed``.
    ``create_transaction`` stores ``__dict__`` verbatim as the DynamoDB item,
    so adding instance attributes here changes what gets persisted.
    """

    def __init__(self, src, dst, amount, signed=False, transaction_id=None):
        """Build a transaction, generating an id when none is supplied.

        Args:
            src: Sender of the transfer.
            dst: Receiver of the transfer.
            amount: Amount being transferred.
            signed: Whether the transaction is signed.
            transaction_id: Explicit id to use; a fresh unique string is
                generated when ``None``.
        """
        if transaction_id is None:
            # A random UUID replaces the former hash of a second-resolution
            # timestamp: two transactions created within the same second got
            # the same id, and str hashes are salted per interpreter process.
            transaction_id = str(uuid.uuid4())

        self.transaction_id = transaction_id
        self.src = src
        self.dst = dst
        self.amount = amount
        self.signed = signed

    def __repr__(self):
        return f'transaction_id:{self.transaction_id} ' \
               f'src:{self.src}, ' \
               f'dst:{self.dst}, ' \
               f'amount:{self.amount}, ' \
               f'signed:{self.signed}'

    @staticmethod
    def _attribute(value, data_type):
        """Build one message-attribute entry with ``value`` rendered as text."""
        return {'StringValue': f'{value}', 'DataType': data_type}

    def serialize(self):
        """Return the transaction as a message-attributes mapping.

        Each field becomes ``{'StringValue': ..., 'DataType': ...}``, the
        shape passed as ``MessageAttributes`` when the transaction is sent to
        the queue. ``signed`` is encoded as the number ``0`` or ``1``.
        """
        return {
            'transaction_id': self._attribute(self.transaction_id, 'String'),
            'src': self._attribute(self.src, 'String'),
            'dst': self._attribute(self.dst, 'String'),
            'amount': self._attribute(self.amount, 'Number'),
            'signed': self._attribute(int(self.signed), 'Number'),
        }


def convert_transaction(item):
    """Rebuild a ``Transaction`` from a mapping of its fields.

    Args:
        item: Mapping with the keys ``amount``, ``src``, ``transaction_id``,
            ``dst`` and ``signed``. ``amount`` is converted with ``int``;
            the other values are passed through unchanged.

    Returns:
        A ``Transaction`` populated from ``item``.
    """
    # Keys are read in this order so a missing key surfaces the same error.
    fields = {
        'amount': int(item['amount']),
        'src': item['src'],
        'transaction_id': item['transaction_id'],
        'dst': item['dst'],
        'signed': item['signed'],
    }
    return Transaction(**fields)


def create_transaction(transaction, table_name='transactions'):
    """Store a transaction as an item in a DynamoDB table.

    Args:
        transaction: Object whose instance attributes (its ``__dict__``)
            form the item to write.
        table_name: Name of the destination table.

    Returns:
        Whatever ``put_item`` returns for the write.
    """
    target_table = boto3.resource('dynamodb').Table(table_name)
    item = transaction.__dict__
    return target_table.put_item(Item=item)

def create_sqs(name):
    """Create an SQS queue called ``name`` and return the created queue."""
    sqs_resource = boto3.resource('sqs')
    queue = sqs_resource.create_queue(QueueName=name)
    return queue


def create_nodes_db(table_name):
    """Create the DynamoDB table ``table_name`` and wait until it exists.

    The key schema and attribute definitions are derived from the
    module-level ``KEYS`` list of ``(COL_NAME, KEY_TYPE, ATTR_TYPE)`` tuples.
    Both the read and write capacity are set to ``RW_CAPACITY``.

    Args:
        table_name: Name of the table to create.

    Returns:
        The table resource for ``table_name``: the newly created table, or
        the existing one when the name is already in use.
    """
    dynamodb = boto3.resource('dynamodb')
    try:
        table = dynamodb.create_table(
                    TableName=table_name,
                    KeySchema=[{'AttributeName': k[0],
                                'KeyType': k[1]} for k in KEYS],

                    AttributeDefinitions=[{'AttributeName': k[0],
                                           'AttributeType': k[2]} for k in KEYS],

                    ProvisionedThroughput={
                        'ReadCapacityUnits': RW_CAPACITY,
                        'WriteCapacityUnits': RW_CAPACITY})

        # Wait on the table that was actually requested; the global
        # TABLE_NAME only matches when the caller happens to pass it.
        table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
        return table
    except dynamodb.meta.client.exceptions.ResourceInUseException:
        print(f'{table_name} DB Already exists')
        return dynamodb.Table(table_name)
     
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket, optionally in a specific region.

    Args:
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. When ``None`` the client is
            built without an explicit region and no location constraint is
            sent.

    Returns:
        ``True`` if the bucket was created, ``False`` if the request raised a
        ``ClientError`` (the error is logged via ``logging.error``).
    """
    client_kwargs = {} if region is None else {'region_name': region}
    create_kwargs = {'Bucket': bucket_name}
    # us-east-1 is S3's default location and is rejected when passed as an
    # explicit LocationConstraint, so only other regions send the constraint.
    if region is not None and region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3_client = boto3.client('s3', **client_kwargs)
        s3_client.create_bucket(**create_kwargs)
    except ClientError as e:
        # Relies on the top-of-file `import logging`; without it this path
        # raised NameError instead of returning False.
        logging.error(e)
        return False
    return True

# Create Queue, Table and Bucket

In [110]:
# Names and region of the AWS resources this notebook creates and uses.
Q_NAME = 'transactions'        # SQS queue
TABLE_NAME = 'transactions'    # DynamoDB table
BUCKET_NAME = 'dsblocks'       # S3 bucket that receives block files
REGION = 'eu-central-1'        # region passed to create_bucket

# DynamoDB key columns; each entry is (COL_NAME, KEY_TYPE, ATTR_TYPE).
KEYS = [
    ('transaction_id', 'HASH', 'S'),
]
# Used for both the read and the write capacity units of the table.
RW_CAPACITY = 10

In [111]:
# Provision the queue, the table and the bucket. The final bare call is the
# cell's displayed value (True when the bucket was created).
ledger = create_sqs(name=Q_NAME)
table = create_nodes_db(table_name=TABLE_NAME)
create_bucket(bucket_name=BUCKET_NAME, region=REGION)

transactions DB Already exists


True

# Sending messages

In [117]:
# Sample transfers as (src, dst, transaction_id); every one moves 30 and is
# already signed.
transactions = [
    Transaction(src, dst, 30, transaction_id=tid, signed=True)
    for src, dst, tid in (
        ('Ricky1', 'Ricky2', '2'),
        ('Ricky5', 'Ricky2', '1'),
        ('Ricky4', 'Ricky2', '3'),
        ('Ricky9', 'Ricky2', '4'),
        ('Ricky6', 'Ricky2', '5'),
        ('Ricky1', 'Ricky2', '6'),
        ('Ricky1', 'Ricky7', '7'),
        ('Ricky1', 'Ricky9', '8'),
        ('Ricky1', 'Ricky9', '10'),
        ('Ricky1', 'Ricky9', '11'),
        ('Ricky1', 'Ricky9', '9'),
    )
]

In [122]:
# Persist every sample transaction in DynamoDB, then announce it on the queue
# with its fields attached as message attributes.
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=Q_NAME)

# Counting from 1 keeps the bodies transaction1, transaction2, ...; the name
# `seq` avoids rebinding the builtin `id` for the rest of the notebook.
for seq, tr in enumerate(transactions, start=1):
    create_transaction(tr)
    response = queue.send_message(MessageBody=f'transaction{seq}',
                                  MessageAttributes=tr.serialize())


# Get Messages and write attributes to file
Cron function: intended to be run on a schedule.

In [123]:
# add_block() stops polling the queue once this many messages were consumed.
TRANSACTION_LIMIT = 20

def transactions_to_file(block):
    """Write one line per transaction to a timestamp-named text file.

    The file name is the current local time formatted as
    ``month/day/year_hour:minute:second`` with every ``,``, ``/`` and ``:``
    replaced by ``_``, plus ``.txt``. It is a relative path, so the file is
    created in the current working directory.

    Args:
        block: Iterable of mappings holding ``transaction_id``, ``src``,
            ``dst`` and ``amount`` entries, each with a ``'StringValue'``.

    Returns:
        Tuple ``(file_name, tids)`` where ``tids`` lists the transaction ids
        in the order they were written.
    """
    stamp = datetime.now().strftime("%m/%d/%Y_%H:%M:%S")
    file_name = re.sub(r'[,/:]', '_', stamp) + '.txt'

    written_ids = []
    with open(file_name, "w") as out:
        for attrs in block:
            tid = attrs['transaction_id']['StringValue']
            src = attrs['src']['StringValue']
            dst = attrs['dst']['StringValue']
            amount = attrs['amount']['StringValue']
            out.write(f"{tid}, src: {src}, dst: {dst}, amount: {amount}\n")
            written_ids.append(tid)
    return file_name, written_ids

def upload_block(file_name, object_name=None):
    """Upload a local file to the ``BUCKET_NAME`` S3 bucket.

    Args:
        file_name: Path of the file to upload.
        object_name: Key to store the file under; defaults to ``file_name``
            when ``None``.

    Returns:
        ``True`` if the upload call completed, ``False`` if it raised a
        ``ClientError`` (the error is printed).
    """
    key = file_name if object_name is None else object_name

    client = boto3.client('s3')
    try:
        client.upload_file(file_name, BUCKET_NAME, key)
    except ClientError as err:
        print(err)
        return False
    else:
        return True

def delete_transactions(tids, table_name='transactions'):
    """Delete the given transactions from a DynamoDB table.

    Args:
        tids: Iterable of ``transaction_id`` key values to delete.
        table_name: Table to delete from. Defaults to ``'transactions'``,
            the same default ``create_transaction`` writes to.
    """
    table = boto3.resource('dynamodb').Table(table_name)
    for tid in tids:
        table.delete_item(Key={'transaction_id': tid})

def add_block():
    """Collect queued transactions into one block file and upload it.

    Polls the ``Q_NAME`` queue until it returns no messages or
    ``TRANSACTION_LIMIT`` messages have been received. The message attributes
    of the received messages are written to a file by
    ``transactions_to_file`` and the file is uploaded with ``upload_block``.

    Only after a successful upload are the messages deleted from the queue
    and the matching records removed via ``delete_transactions``. If the
    upload fails nothing is deleted, so the transactions are not lost.

    Prints a one-line summary and returns ``None``.
    """
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=Q_NAME)

    received = []  # every message pulled, with or without attributes
    block = []     # attribute mappings that go into the block file

    while len(received) < TRANSACTION_LIMIT:
        batch = queue.receive_messages(MessageAttributeNames=['All'])
        if not batch:
            break
        for message in batch:
            if message.message_attributes is not None:
                block.append(message.message_attributes)
            received.append(message)

    message_count = len(received)
    if message_count == 0:
        print("NO TRANSACTIONS")
        return

    block_name, tids = transactions_to_file(block)
    if not upload_block(block_name):
        # Messages were not deleted; SQS should make them receivable again
        # once the queue's visibility timeout expires, so a later run retries.
        print(f"BLOCK UPLOAD FAILED, {message_count} TRANSACTIONS LEFT IN QUEUE")
        return

    # The block is stored: now it is safe to drop the queue messages and the
    # pending records.
    for message in received:
        message.delete()
    delete_transactions(tids)
    print(f"{message_count} TRANSACTIONS SIGNED TO BLOCK")
            
# Run one block-building pass; add_block() prints its own summary line.
add_block()

20 TRANSACTIONS SIGNED TO BLOCK
