In [1]:
import io
import urllib3
import boto3
from botocore.errorfactory import ClientError
from threading import Thread
import queue
import time

### TESTE 

In [8]:
class S3_dict:
    
    def __init__(self,bucket_name,region,access_key,secret_access_key):
         #Construtor responsável por aceder aos dados do bucket.Acesso a objetos públicos na ausêcia de keys é feito através de urllib3 em vez de boto3

        self.bucket_name=bucket_name
        self.region=region

        if access_key==None or secret_access_key==None:
            self.public=True

        else:
            self.public=False
            self.s3=boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_access_key)

    def get(self,key):
        #Método responsável por devolver um objeto do bucket associado a uma determinada key

        if self.__contains__(key)==False:
            raise Error("A chave não está associada a nenhum objeto.")

        else:
            return self.__getitem__(key)

    def put(self,key,value):
        #Método responsável por inserir um novo objeto no bucket associado a uma chave. É de tomar atenção que este método tem 2
        #argumentos: chave e conteudo associado à chave

        if not self.__contains__(key):
            self.__setitem__(key,value)
        else:
            raise Error("A chave dada já pertence ao bucket ou não existe.")

    def pop(self,key):
        #Remove um objeto associado a uma chave do bucket e retorna o próprio objeto x

        if self.__contains__(key):
            x=self.__getitem__(key)
            self.__delitem__(key)
            return x
        
        raise Error("A chave dada já foi removida do bucket ou não existe.")

    def __getitem__(self,key):
         #Acede ao acess point do bucket e obtém o objeto com uma determinada chave

        if self.public:
            http=urllib3.PoolManager() #This object handles all of the details of connection pooling and thread safety so that you don’t have to
            x=http.urllib3.request('GET','https://'+self.bucket_name+'.s3.'+self.region+'.amazonaws.com/'+key)
            
            if x.status==200: #an HTTP status code response of 200 represents the request was successful.
                return io.BytesIO(x.data) 
            #Binary I/O (also called buffered I/O) expects bytes-like objects and produces bytes objects. No encoding, decoding, or newline translation is performed. This category of streams can be used for all kinds of non-text data, and also when manual control over the handling of text data is desired.

            else:
                raise Error("Não foi possivel obter o objeto pedido.")

        return io.BytesIO(self.s3.get_object(Bucket=self.bucket_name,Key=key)['Body'].read())

    def __setitem__(self,key,value):
        #Acede ao acess point do bucket e adiciona um novo item com chave e conteudo correspondente. 

        #No caso de os objetos serem públicos nao temos acesso às chaves AWS, logo nao temos argumentos para chamar a função e retorna um erro
        if self.public:
            raise Error("Não foi possivel obter o objeto pedido.")
            
        #No caso de termos acesso às AWS-keys o acesso é feito com sucesso
        else:
            self.s3.put_object(Bucket=self.bucket_name,Key=key,Body=value.read())

    def __delitem__(self,key):
        #Acede ao acess point do bucket e retira um objeto acedendo apenas à chave. 

        #Mais uma vez, de modo a poder ser possivel chamar a funçao é necessário ter acesso às chaves AWS, logo se os objetos forem publicos nao temos acesso as chaves AWS e nao é possivel chamar a função
        if self.public:
            raise Error("Não foi possivel retirar o objeto pedido.")
        
        #No caso de termos acesso às AWS-keys é possível retirar o objeto do bucket e a sua informação
        else:
            self.s3.delete_object(Bucket=self.bucket_name,Key=key)

    def __contains__(self,key):
        #Acede ao bucket e verifica se um determinado objeto associado a uma chave está presente no bucket e retorna um valor booleano. Para este método não temos interesse aceder ao conteúdo do objeto mas apenas verificar a sua existência
        #para tal queremos apenas aceder à metadata do objeto
        if not self.public:
            try:
                self.s3.head_object(Bucket=self.bucket_name,Key=key)
            except ClientError:
                return False
            return True
        
        #The HEAD action retrieves metadata from an object without returning the object itself. This action is useful if you're only interested in an object's metadata. To use HEAD, you must have READ access to the object.
        
        if self.public:
            http=urllib3.PoolManager()
            x=http.urllib3.request('HEAD','https://'+self.bucket_name+'.s3.'+self.region+'-3.amazonaws.com/'+key)
            if x.status==200:
                return True

        return False
    
    def keys(self,prefix=''):
        # Faz yield de um determinado objeto associado a uma chave através de um prefixo
        
        # When the Python yield statement is hit, the program suspends function execution and returns the yielded value to the caller. (In contrast, return stops function execution completely.) 
        # When a function is suspended, the state of that function is saved. This includes any variable bindings local to the generator, the instruction pointer, the internal stack, and any exception handling.
        
        # Desta forma o yield permite guardar variáveis locais e o estado da função na sua memória e não cessa o seu funcionamento, contrário ao return.
        
        # O yield vai ser usado no caso de serem filtrados grandes quantidades de dados e o utilizador não querer perder a informação adquirida pelo método a cada execução.
        
        # When managing acess IAM acess keys the best method for listing user's acess keys is paginator method:
        # paginate(**kwargs) : Creates an iterator that will paginate through responses from IAM.Client.list_access_keys() with the following statement: paginator = client.get_paginator('list_access_keys')
        # Since the list of objects is limited to 1000 elements by default, the paginator is used to seemingly get trough this limitation
        # This method also has a lenght limitation of 128 characters of UserName, neste caso, prefixo
        
        paginator=self.s3.get_paginator("list_objects")

        kwargs={'Bucket':self.bucket_name,'Prefix':prefix} #kwargs são usados para aceder a objetos através de uma string (key word arguments)

        for page in paginator.paginate(**kwargs): #vou ter de procurar pelas chaves e conteudos com os nomes que estão apresentados na consola s3, 'Key' e 'Contents'
            contents=page["Contents"]

            for obje in contents:
                yield obje["Key"]
                
    def items(self,prefix=''):
        # Dá yield de chaves associadas a um determinado prefixo 

        paginator=self.s3.get_paginator("list_objects")

        kwargs={'Bucket':self.bucket_name,'Prefix':prefix}
        
        # Tentei utilizar um método de multiprocessing usando a class Pool mas não consegui implementar o código

        # Queues são utilizados para guardar valores de cada uma das threads por ordem
        que=queue.Queue()

        for page in paginator.paginate(**kwargs):
            contents=page["Contents"]
            
            # Thread library doesnt allow acess to thread return value
            # Tentei usar target=exec mas não consegui meter a funcionar
            # O único método que consegui usar para aceder aos valores da thread foi usando a função lambda, que funciona como um gerador e permite definir diretamente o argumento da função

            for i,obje in enumerate(contents):        
                
                if i != len(contents):
                    thread = Thread(target=lambda a, arg: a.put(self.__getitem__(arg)), args=(que, contents[i]["Key"])) # Valores a guardar na thread são os conteudos de uma chave e a sua posição na página
                    thread.start()                     #começar thread
                    thread.join()                      #esperar que a thread acabe
                    tp= (obje["Key"],que.get())        #criar tuplo que guarde uma chave e o respetivo conteudo por ordem na queue
                yield tp
                

In [9]:
# TESTS

# init
bucket = S3_dict(bucket_name='smartex-test', access_key='AKIASBFPUZX4JL6G2RVU', secret_access_key='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', region='eu-west-3')
# put and get
try:
    bucket.get('texto.txt')
except:
    print('Tried to get object with key \'texto.txt\' but it failed (as it should). Uploading file to the bucket\n')


bucket.put('texto.txt', io.BytesIO(open('texto.txt', 'rb').read())) #The easiest way to create a binary stream is with open() with 'b' in the mode string:f = open("myfile.jpg", "rb")
print('File uploaded. Getting it again to check if the content is correct.\n')
file = bucket.get('texto.txt')

print('File content: ' + file.read().decode('UTF-8'))

# keys and items

print('Getting all keys with \'test\' prefix: ' + str(list(bucket.keys(prefix='teste'))))

print('\nNow getting a tuple with key-value pairs of objects with \'test\' prefix as well and checking their contents\n')

for obje in bucket.items(prefix='teste'):
    print('Key: ' + obje[0] + ' ; Content: ' + obje[1].read().decode('UTF-8'))
    print('\n')

print('If \'texto.txt\' didnt appear on the last two tests and \'test#.txt\' 1 through 4 did, everything went fine :)\nTo finish, let\'s remove the initial file once again.')
bucket.pop('texto.txt')

Tried to get object with key 'texto.txt' but it failed (as it should). Uploading file to the bucket

File uploaded. Getting it again to check if the content is correct.

File content: O Sporting vai ser campeão este ano

Getting all keys with 'test' prefix: ['teste1.txt', 'teste2.txt', 'teste3.txt']

Now getting a tuple with key-value pairs of objects with 'test' prefix as well and checking their contents

Key: teste1.txt ; Content: Sporting em primeiro


Key: teste2.txt ; Content: Porto em segundo


Key: teste3.txt ; Content: Benfica em ultimo


If 'texto.txt' didnt appear on the last two tests and 'test#.txt' 1 through 4 did, everything went fine :)
To finish, let's remove the initial file once again.


<_io.BytesIO at 0x20590e0ad10>

In [4]:
#https://s3.console.aws.amazon.com/s3/buckets/smartex-test?region=eu-west-3&tab=objects
#https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-inventory.html#configure-inventory-kms-key-policy
#https://www.stackvidhya.com/write-a-file-to-s3-using-boto3/
#https://boto3.amazonaws.com/v1/documentation/api/latest/guide/session.html
#https://www.geeksforgeeks.org/access-modifiers-in-python-public-private-and-protected/
#https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
#https://urllib3.readthedocs.io/en/latest/user-guide.html
#https://docs.python.org/3/library/io.html
#https://www.kite.com/python/answers/how-to-get-the-status-code-of-a-website-using-urllib-in-python
#https://docs.python.org/3/library/exceptions.html
#https://docs.python.org/3/howto/urllib2.html
#https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object

In [5]:
#https://realpython.com/introduction-to-python-generators/#understanding-the-python-yield-statement
#https://docs.aws.amazon.com/IAM/latest/APIReference/API_ListAccessKeys.html
#https://www.tutorialspoint.com/python/python_multithreading.html
#https://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Lock_Objects_Acquire_Release.php
#https://www.bogotobogo.com/python/python_functions_lambda.php
#https://www.geeksforgeeks.org/multithreading-python-set-1/
#https://stackoverflow.com/questions/35244577/is-it-possible-to-use-an-inline-function-in-a-thread-call
#https://www.novixys.com/blog/python-threading-tutorial/
#https://singularitykchen.github.io/blog/2021/03/05/Code-Study-Python-multiprocessing/