# Melhorias (Função lambda)

Durante o desenvolvimento do desafio foi observado que esta etapa teve algumas falhas, de modo que voltei para prosseguir nas demais etapas. O primeiro ponto é que a quantidade de threads utilizada superava o limite de requisições da API fornecidos para meu usuário por segundo, o qual era 50. Dessa forma, muitas requisições foram bloqueadas e houve uma imensa perda de dados, dos 59 mil filmes pesquisados cerca de 26 mil foram trazidos. Após as melhorias aplicadas foram obtidos dados de 50 mil filmes.

Outro ponto criado para otimizar a ação do código foi fazer com que a função de obter a data atual fosse chamada apenas uma vez, fora das funções criadas, pois, sendo chamada a cada vez que um dado iria ser salvo na nuvem havia uma grande perda de tempo de processamento.

Com a redução do número threads foi necessário fazer adaptações para que o código pudesse buscar todos os filmes na API antes que acabasse o limite de tempo de execução da função lambda. Esse processo de adaptação resultou na criação de 3 funções lambdas: trazer ID's dos filmes de imdb para tmdb, dividir conteúdo trazido em dois únicos arquivos e buscar filmes na api.

### Código para buscar os ID's de filmes no TMDB

In [None]:
import pandas
import requests
import json
import boto3
from datetime import datetime
import numpy
import threading
import os


Authorization = os.getenv('Authorization')
headers = {
    "Authorization": Authorization,
    "accept": "application/json"
}

bucketName = 'data-lake-do-berg'
s3_client = boto3.client('s3')

# Função para gravar os filmes no bucket s3

def gravarMoviesID(objeto, nomeArquivo):

    key = f'Raw/TMDB/JSON/Movies/IDs/{nomeArquivo}'

    if len(objeto) > 1:
        jsonMovies = "\n".join(objeto)
    else:
        jsonMovies = objeto[0]

    s3_client.put_object(Bucket=bucketName, Key=key, Body=jsonMovies)

# Função que retorna filmes da API

def retornarMoviesID(imdbID):
    url = f'https://api.themoviedb.org/3/find/{imdbID}?external_source=imdb_id'
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()

        if data.get("movie_results"):
            obraID = data["movie_results"][0]["id"]
            IDs = {
                "imdb": imdbID,
                "tmdb": obraID
            }
            return IDs                    
    return None

# Função para buscar filmes e salvá-los no bucket s3

def buscarMoviesID(moviesIDs, threadName):    
    movies = []
    contador = 0
    numeroArquivo = 0

    ultimoIndice = len(moviesIDs) - 1
    for indice, id in enumerate(moviesIDs):        

        movieJson = retornarMoviesID(id)
        if movieJson is not None:
            movies.append(json.dumps(movieJson))
            contador += 1

        if indice == ultimoIndice and len(movies) > 0:
            numeroArquivo += 1            
            arquivo = f'Movies-IDs-{threadName}-{numeroArquivo}.json'
            gravarMoviesID(movies, arquivo)

# Função para criação de threads de execução concorrente

def executarThreads(dfMovies):
    valores = ['Drama,Romance', 'Drama', 'Romance']
    condicao = dfMovies['genero'].isin(valores)
    
    dfMoviesNormalizado = dfMovies[condicao].drop_duplicates()

    colunaMovieID = dfMoviesNormalizado["id"].to_numpy()
    arraysMoviesID = numpy.array_split(colunaMovieID, 50)
    arraysMoviesID = numpy.array(arraysMoviesID, dtype=object)

    threads = []
    
    for i in range(50):
        threadName = f'T{i}'
        thread = threading.Thread(target=buscarMoviesID, args=(arraysMoviesID[i], threadName))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

#  Função lambda de inicialização do código
 
def lambda_handler(event, context):
    s3_file_name = 'Raw/Local/CSV/Movies/2024/05/09/movies.csv'
    objeto = s3_client.get_object(Bucket=bucketName, Key=s3_file_name)
    dfMovies = pandas.read_csv(objeto['Body'], sep='|', usecols=['id', 'genero'])
    executarThreads(dfMovies)
 
    return {
        'statusCode': 200,
        'body': "Dados salvos com sucesso!"
    }

### Código para juntar os arquivos de ID's criados em dois únicos arquivos

In [None]:
import pandas as pd
import json
import boto3
import os
from io import BytesIO


Authorization = os.getenv('Authorization')
headers = {
    "Authorization": Authorization,
    "accept": "application/json"
}


bucketName = 'data-lake-do-berg'
s3 = boto3.client('s3')

# Salvar arquivos JSON no S3
def gravar_arquivos(df, filename):
    json_buffer = BytesIO()
    df.to_json(json_buffer, orient='records', lines=True)
    json_buffer.seek(0)
    s3_file_name = f'Raw/TMDB/JSON/Movies/BuscaIDs/{filename}.json'
    s3.put_object(Bucket=bucketName, Key=s3_file_name, Body=json_buffer.getvalue())

# Função lambda de inicialização do código
def lambda_handler(event, context):
    
    # Obter arquivos na pasta
    s3_path_name = 'Raw/TMDB/JSON/Movies/IDs'
    response = s3.list_objects_v2(Bucket=bucketName, Prefix=s3_path_name)
    
    # Capturar o nome dos arquivos JSON
    json_keys = [content['Key'] for content in response.get('Contents', []) if content['Key'].endswith('.json')]

    # Inicializar uma lista para armazenar os DataFrames
    dataFrames = []
    
    # Ler os arquivos dentro do DataFrame e adicionar ao array de DataFrames
    for key in json_keys:
        response = s3.get_object(Bucket=bucketName, Key=key)
        content = response['Body'].read().decode('utf-8')
        
        # Processar cada linha como um JSON separado
        for line in content.strip().split('\n'):
            data = json.loads(line)
            df = pd.json_normalize(data)
            dataFrames.append(df)
    
    # Dividir a lista de DataFrames em duas partes iguais
    meio = len(dataFrames) // 2
    dataFrames_1 = dataFrames[:meio]
    dataFrames_2 = dataFrames[meio:]
    
    # Concatenar cada parte em um único DataFrame
    dataFrames_1 = pd.concat(dataFrames_1, ignore_index=True)
    dataFrames_2 = pd.concat(dataFrames_2, ignore_index=True)
    
    # Salvar as partes no S3
    gravar_arquivos(dataFrames_1, 'Movies-1')
    gravar_arquivos(dataFrames_2, 'Movies-2')
 
    return {
        'statusCode': 200,
        'body': "Dados salvos com sucesso!"
    }


### Código para buscar os filmes a partir dos arquivos gerados

O código foi executado duas vezes, uma para cada arquivo.

In [None]:
import pandas
import requests
import json
import boto3
from datetime import datetime
import numpy
import threading
import os


Authorization = os.getenv('Authorization')
headers = {
    "Authorization": Authorization,
    "accept": "application/json"
}

bucketName = 'data-lake-do-berg'
s3_client = boto3.client('s3')


data_atual = datetime.now()
data_formatada = data_atual.strftime('%Y/%m/%d')


# Função para gravar os filmes no bucket s3
def gravarMovies(objeto, nomeArquivo):

    key = f'Raw/TMDB/JSON/Movies/{data_formatada}/{nomeArquivo}'

    if len(objeto) > 1:
        jsonMovies = "\n".join(objeto)
    else:
        jsonMovies = objeto[0]

    s3_client.put_object(Bucket=bucketName, Key=key, Body=jsonMovies)


# Função que retorna filmes da API
def retornarMovies(tmdbID):
    url = f'https://api.themoviedb.org/3/movie/{tmdbID}?language=pt-BR'
    response = requests.get(url, headers=headers)
    

    if response.status_code == 200:
        data = response.json()
        return data
    
    return None
        

# Função para buscar filmes e salvá-los no bucket s3
def buscarMovies(moviesIDs, threadName):    
    movies = []
    contador = 0
    numeroArquivo = 0

    ultimoIndice = len(moviesIDs) - 1
    for indice, id in enumerate(moviesIDs):        

        movieJson = retornarMovies(id)
        if movieJson is not None:
            movies.append(json.dumps(movieJson))
            contador += 1

        if contador == 100:
            numeroArquivo += 1            
            arquivo = f'Movies-B-{threadName}-{numeroArquivo}.json'
            gravarMovies(movies, arquivo)
            movies = []
            contador = 0

        elif indice == ultimoIndice and len(movies) > 0:
            numeroArquivo += 1            
            arquivo = f'Movies-B-{threadName}-{numeroArquivo}.json'
            gravarMovies(movies, arquivo)


# Função para criação de threads de execução concorrente
def executarThreads(dfMovies):

    colunaMovieID = dfMovies["tmdb"].to_numpy()
    arraysMoviesID = numpy.array_split(colunaMovieID, 50)
    arraysMoviesID = numpy.array(arraysMoviesID, dtype=object)

    threads = []
    
    for i in range(50):
        threadName = f'T{i}'
        thread = threading.Thread(target=buscarMovies, args=(arraysMoviesID[i], threadName))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
        


#  Função lambda de inicialização do código
def lambda_handler(event, context):
    s3_file_name = 'Raw/TMDB/JSON/Movies/BuscaIDs/Movies-2.json'
    objeto = s3_client.get_object(Bucket=bucketName, Key=s3_file_name)
    json_body = objeto['Body'].read().decode('utf-8')
    
    json_data = []
    for linha in json_body.splitlines():
        if linha.strip():
            json_data.append(json.loads(linha))
    
    dfMovies = pandas.DataFrame(json_data)
    dfMovies = dfMovies[['tmdb']]
    executarThreads(dfMovies)
 
    return {
        'statusCode': 200,
        'body': "Dados salvos com sucesso!"
    }
