diff --git a/README.md b/README.md
index acde3d8..7fbcaec 100644
--- a/README.md
+++ b/README.md
@@ -4,12 +4,18 @@ A React + TypeScript project demonstrating different approaches to file upload c

 ## Overview

-This project explores various concurrency patterns for file uploads to S3, starting with sequential uploads and progressing to more sophisticated parallel upload strategies. It uses LocalStack to simulate AWS S3 locally for development and testing.
+This project explores three distinct concurrency patterns for file uploads to S3:
+1. **Sequential Upload** - `O(3n)` complexity with blocking phases
+2. **Async Batch Upload** - `O(phases)` complexity with parallel processing within phases
+3. **Streaming Upload** - `O(1)` complexity with real-time progress tracking and mutex protection
+
+Each implementation showcases a different level of concurrency control, from basic sequential processing to advanced streaming with race condition management using async-mutex.

 ## Tech Stack

 - **Frontend**: React 19 + TypeScript + Vite
 - **AWS SDK**: @aws-sdk/client-s3 + @aws-sdk/s3-request-presigner
+- **Concurrency**: async-mutex for race condition management
 - **Local Infrastructure**: LocalStack + Docker + Nginx (CORS proxy)
 - **Development**: ESLint + TypeScript strict mode

@@ -19,12 +25,17 @@ This project explores various concurrency patterns for file uploads to S3, start
 src/
 ├── infra/
 │   ├── S3Client.ts            # LocalStack S3 client configuration
-│   ├── S3UploadService.ts     # Upload service implementations
 │   └── crypto.ts              # MD5 integrity checking utilities
 ├── useCases/
 │   ├── useSequentialUpload.ts # Sequential upload implementation
-│   └── useAsyncBatchUpload.ts # Batch async upload implementation
-├── App.tsx                    # Main application component
+│   ├── useAsyncBatchUpload.ts # Batch async upload implementation
+│   └── useStreamingUpload.ts  # Streaming upload with mutex protection
+├── types/
+│   └── uploadProgress.ts      # Unified progress tracking interfaces
+├── blog/
+│   ├── PT-BR.md               # Portuguese technical article
+│   └── EN-US.md               # English technical article
+├── App.tsx                    # Main application with centralized file display
 └── main.tsx                   # Application entry point
 ```

@@ -80,28 +91,21 @@ Nginx proxy handles CORS issues between the frontend and LocalStack:
 - Nginx proxies to LocalStack with proper CORS headers
 - Supports file uploads up to 100MB

-## Features
-
-### Upload Interface
-- **Strategy Selection**: Radio buttons to switch between Sequential and Batch async
-- **Performance Tracking**: Live results table showing run times and comparisons
-- **File Management**: Multi-file selection with reset functionality
-- **Upload States**: Loading indicators and disabled states during uploads

 ### Security Features
-
 - Pre-signed URLs for secure uploads
-- MD5 integrity checking
+- MD5 integrity checking with Content-MD5 headers
 - File validation and size limits

 ## Usage

 1. **Start the application**: Follow the Development Setup steps above
 2. **Select files**: Use the file input to choose multiple files
-3. **Choose strategy**: Select either "Sequential Upload" or "Batch Upload (Async)"
+3. **Choose strategy**: Select from "Sequential", "Batch (Async)", or "Streaming" upload
 4. **Run test**: Click the upload button and observe the performance
-5. **Compare results**: Switch strategies and run again to compare performance
-6. **View metrics**: Check the results table for detailed timing comparisons
+5. **Monitor progress**: For streaming uploads, watch real-time file-by-file progress
+6. **Compare results**: Switch strategies and run again to compare performance
+7. **View metrics**: Check the results table for detailed timing comparisons and percentage improvements

 ## Troubleshooting

@@ -124,9 +128,18 @@ Nginx proxy handles CORS issues between the frontend and LocalStack:
 - LocalStack resource constraints in Docker
 - Network saturation with many concurrent uploads

+## Key Dependencies
+
+- **async-mutex**: Race condition management for concurrent operations
+- **crypto-js**: MD5 hash generation for file integrity
+- **@aws-sdk/client-s3**: AWS S3 client for bucket operations
+- **@aws-sdk/s3-request-presigner**: Pre-signed URL generation
+
 ## Resources

 - [AWS S3 Upload Documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)
 - [Pre-signed URLs Guide](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-presigned-url.html)
 - [Object Integrity Checking](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html)
 - [LocalStack Documentation](https://docs.localstack.cloud/)
+- [async-mutex Library](https://github.com/DirtyHairy/async-mutex)
+- [XMLHttpRequest Progress Events](https://developer.mozilla.org/en-US/docs/Web/API/XMLHttpRequest/upload)
diff --git a/blog/EN-US.md b/blog/EN-US.md
new file mode 100644
index 0000000..6e577d0
--- /dev/null
+++ b/blog/EN-US.md
@@ -0,0 +1,442 @@
+# Frontend Concurrency Control
+![alt text](robot-delivery.png)
+
+> Raw Portuguese version is here:
+
+Hello there! Hope you're doing well.
+
+The purpose of Tech Pills is to start a series of articles that solve or discuss technical problems we run into with clients on a daily basis.
+
+Yes, we know that JavaScript essentially works with single-threaded execution, relying on callbacks for asynchronous behavior.
+
+Today we're going to climb a few steps to work with concurrency at different levels.
+
+We're faced with uploading files to storage (Amazon S3 in our case) and with how we could improve the upload of a large number of files, since the frontend is responsible for these requests in our initial scenario.
+
+There are N ways to do this today. We'll start with sequential and climb two steps of complexity and performance.
+
+This is the guide project we'll use. Any information related to technologies and setup is centralized in it, but the most important thing now is to understand the concepts and what problem we want to solve.
+
+We'll simulate an S3 server locally with [localstack](https://github.com/localstack/localstack), but that doesn't matter: for a real case, we just need to make the necessary edits to our client, because we'll consume the same AWS S3 SDK.
+
+You'll notice that our s3-client uses the AWS S3 SDKs in the usual way, following Amazon's own documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
+
+We also know that we need an `S3Client` with our specific configurations, and for our case we'll also use [pre-signed URLs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-presigned-url.html)
+
+As already mentioned, the implementation of the methods is not trivial and is in the repository for reference (whenever we have ... ellipses, it means I've hidden the implementation).
+
+```ts
+// src/infra/S3Client.ts
+
+class LocalS3Simulator {
+  readonly client: S3Client;
+  readonly bucket: string;
+  readonly baseUrl: string;
+
+  constructor(options: LocalS3SimulatorOptions = {}) {
+    this.client = new S3Client({
+      region: options.region || "us-east-1",
+      endpoint: options.endpoint || "http://localhost:4566",
+      forcePathStyle: true,
+      credentials: {
+        accessKeyId: options.accessKeyId || "test",
+        secretAccessKey: options.secretAccessKey || "test",
+      },
+    });
+
+    this.bucket = options.bucket || "test-uploads";
+    this.baseUrl = options.endpoint || "http://localhost:4566";
+  }
+
+  async bucketExists(): Promise<boolean> {/* ... */}
+  async createBucket(): Promise<void> {/* ... */}
+  async generateSignedUploadUrl(fileName: string): Promise<string> {/* ... */}
+}
+```
+
+For this case we'll also check the [integrity](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html) of our upload by providing an [MD5](https://docs.aws.amazon.com/pt_br/redshift/latest/dg/r_MD5.html) hash, and with that we can finally begin our journey of sending files.
+
+## Sequential
+
+> Implementation of this step here https://github.com/Tech-Pills/frontend-concurrency-control/pull/1
+
+In our initial, sequential approach, we'll create the MD5 hashes, then the temporary links, and only then perform the uploads.
+
+An important point is that the frontend implementation and its components are trivial. We'll focus on the part that concerns the upload.
+
+So, as mentioned above, we'll follow three steps:
+1 - Generate MD5 hashes
+2 - Generate pre-signed URLs
+3 - Perform uploads
+
+And we'll execute each of them sequentially. In this scenario we'll send 1,000 files at a time.
+Notice that we only start generating pre-signed URLs when we have all the hashes, and we also only start uploads when we have all the pre-signed URLs.
+
+We'll have this class responsible for generating pre-signed URLs and making a simple fetch for the upload:
+
+```ts
+// infra/S3UploadService.ts
+export class S3UploadService {
+  private s3Client: LocalS3Simulator;
+
+  constructor(bucket = "test-uploads") {
+    this.s3Client = new LocalS3Simulator({ bucket });
+  }
+
+  async generateSignedUploadUrl(fileName: string): Promise<string> {/* ... */}
+  async uploadSingleFile(
+    customFile: CustomFile,
+    signedUrl: string
+  ): Promise<Response> {/* ... */}
+}
+```
+
+We could, of course, run the entire process for each file before moving on to the next one, but I wanted to highlight the problem of queuing dependent tasks in a completely sequential manner, something we often let happen when iterating through lists to execute some processing.
+Since each iteration is responsible for one processing step, it will be simple to modify in the next steps. This way, our user will only be able to observe their uploads once all hashes and pre-signed URLs have been generated.
+
+![alt text](sequential-upload-timeline.png)
+
+We'll have a use case for each of the types I'll show here. Remember not to focus too much on the writing decisions; some were designed to better exemplify the phases we'll go through.
+
+```ts
+// useCases/useSequentialUpload.ts
+
+export const useSequentialUpload = () => {
+  const [uploadService] = useState(() => new S3UploadService());
+  /* ... */
+
+  async function uploadFilesSequentially(files: File[]): Promise<Response[]> {
+    const results: Response[] = [];
+    const customFiles: CustomFile[] = [];
+
+    for (const file of files) {
+      const md5Hash = await generateMD5Base64(file);
+      customFiles.push({ file, md5: md5Hash, preSignedUrl: "" });
+    }
+
+    for (const customFile of customFiles) {
+      const signedUrl = await uploadService.generateSignedUploadUrl(
+        customFile.file.name
+      );
+      customFile.preSignedUrl = signedUrl;
+    }
+
+    for (const customFile of customFiles) {
+      const result = await uploadService.uploadSingleFile(
+        customFile,
+        customFile.preSignedUrl
+      );
+      results.push(result);
+    }
+
+    return results;
+  }
+}
+```
+
+At this moment we can observe a time complexity of `O(3n)`. The 3 is asymptotically disposable, but illustrating it is important for our understanding, because it makes evident that each phase we have here has the potential to run in parallel.
+
+![alt text](time-complexity-sequential.png)
+
+To be honest, if we were thinking about a user who is waiting for the upload and would like some feedback, we could have structured things so that all steps run inside a "wrapper", so we wouldn't need to wait for all hashes and pre-signed URLs to be created before starting the first upload. We would make a batch handle the 3 processes and create a queue for that. I forced this decision for a study scenario, for this and the next example.
+
+If you happen to be using my repository as a base, you can check the uploaded files at `http://localhost:4566/test-uploads/`
+
+## Async Batch
+
+> Implementation of this step here https://github.com/Tech-Pills/frontend-concurrency-control/pull/2
+
+Now in each of the phases we use [Promises](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise), so each action can happen at the same time as another, and as soon as they all resolve we can move to the next step in our flow. It's still necessary to wait for all hash generations to finish, for example, but no longer sequentially.
+
+We add a new use case for our asynchronous batch upload:
+
+```ts
+// useCases/useAsyncBatchUpload.ts
+export const useBatchUpload = () => {
+  // ...
+  async function uploadFilesBatch(files: File[]): Promise<Response[]> {
+    const results: Response[] = [];
+    const customFiles: CustomFile[] = [];
+
+    const md5Promises = files.map(async (file, index) => {
+      const md5Hash = await generateMD5Base64(file);
+      console.log(`MD5 generated for file ${index + 1}: ${file.name}`);
+      return { file, md5: md5Hash, preSignedUrl: "" };
+    });
+
+    const resolvedCustomFiles = await Promise.all(md5Promises);
+    customFiles.push(...resolvedCustomFiles);
+
+    const urlPromises = customFiles.map(async (customFile, index) => {
+      const signedUrl = await uploadService.generateSignedUploadUrl(
+        customFile.file.name
+      );
+      customFile.preSignedUrl = signedUrl;
+      console.log(`Signed URL generated for file ${index + 1}: ${customFile.file.name}`);
+    });
+
+    await Promise.all(urlPromises);
+
+    const uploadPromises = customFiles.map(async (customFile, index) => {
+      console.log(`Uploading file ${index + 1}: ${customFile.file.name}`);
+      return await uploadService.uploadSingleFile(
+        customFile,
+        customFile.preSignedUrl
+      );
+    });
+
+    const uploadResults = await Promise.all(uploadPromises);
+    results.push(...uploadResults);
+
+    return results;
+  }
+  // ...
+}
+```
+
+I also made some non-trivial modifications to our main UI to have some comparisons between each execution and algorithm.
+
+![alt text](ui-sequential-vs-async.png)
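+
+To give a rough idea of how each run could feed that comparison table, here is a minimal timing sketch. The `measure` helper is illustrative only, not the repo's actual code (the real table lives in `App.tsx`):
+
+```ts
+// Hypothetical sketch: timing one strategy run for the results table
+async function measure(run: () => Promise<Response[]>): Promise<number> {
+  const start = performance.now();
+  await run(); // e.g. uploadFilesSequentially(files) or uploadFilesBatch(files)
+  return performance.now() - start; // elapsed time in milliseconds
+}
+```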
+
+To illustrate further: we now have several people responsible for executing actions at the same time, not just one.
+
+If we were a restaurant, in the sequential form we would have only 1 chef:
+- Reading 1 order -> reading 1 order -> reading 1 order
+- Preparing 1 dish -> preparing 1 dish -> preparing 1 dish
+- Delivering 1 order -> delivering 1 order -> delivering 1 order
+
+![alt text](batch-async-upload.png)
+
+In the asynchronous batch example, we hire multiple chefs:
+- All chefs start reading orders
+- As soon as the last one finishes, all chefs start preparing dishes
+- As soon as the last one finishes, all chefs deliver the orders
+
+I added some logs to show that some actions within each phase can happen before others.
+
+![alt text](log-async-example.png)
+
+Since everything within each phase happens in parallel, our algorithm now drops to `O(phases)`. In this case we have 3 phases happening.
+
+![alt text](time-complexity-async.png)
+
+Keep in mind that the measured performance falls short of what the complexity analysis suggests, because we have bottlenecks at various points:
+- Browser connection limits;
+- LocalStack limits some resources;
+- A single Nginx proxy saturating our connection;
+- Limits in JavaScript's [event loop](https://nodejs.org/id/learn/asynchronous-work/event-loop-timers-and-nexttick#what-is-the-event-loop).
+
+That's why, overall, we only achieve about ~20% performance improvement.
+
+![alt text](async-limit.png)
+
+I don't want to go deep into the concept of [Threads](https://developer.mozilla.org/en-US/docs/Glossary/Thread), but we know that JavaScript was initially designed as single-threaded, executing one operation at a time, although today we know it's possible to create additional threads.
+
+## Streaming + Mutex
+
+> Implementation of this step here https://github.com/Tech-Pills/frontend-concurrency-control/pull/3
+
+Finally, we're going to start working with pipeline processing. As mentioned earlier, each phase of our processing needs the previous one, but we don't need all actions of one phase to finish before starting the next.
+
+Following the restaurant example with multiple chefs: in the previous scenario the chefs could only move to the next step when ALL the others had finished, but now:
+- All chefs start reading orders
+- Each chef can start preparing their dish as soon as they finish reading
+- Each chef can deliver their order as soon as it's ready
+
+```ts
+export const useStreamingUpload = () => {
+  // ...
+  async function uploadFilesStreaming(files: File[]): Promise<Response[]> {
+    const results: Response[] = new Array(files.length);
+
+    const processFile = async (file: File, index: number) => {
+      try {
+        const md5Hash = await generateMD5Base64(file);
+        const signedUrl = await uploadService.generateSignedUploadUrl(file.name);
+
+        const customFile: CustomFile = {
+          file,
+          md5: md5Hash,
+          preSignedUrl: signedUrl
+        };
+
+        const result = await uploadService.uploadSingleFile(customFile, signedUrl);
+        results[index] = result;
+
+        return result;
+      } catch (error) {
+        const failedResponse = new Response(null, {
+          status: 500,
+          statusText: error instanceof Error ? error.message : 'Unknown error'
+        });
+
+        results[index] = failedResponse;
+        return failedResponse;
+      }
+    };
+    const streamPromises = files.map((file, index) => processFile(file, index));
+    await Promise.all(streamPromises);
+    return results;
+  }
+  // ...
+}
+```
+
+Can you identify any problems? Some of these actions may compete for resources: for example, trying to access our `results` at the same time, errors interfering with processes still running, and also having no visibility into the state of each pending execution. But remember, as we've already discussed, JavaScript is single-threaded and we're not creating other threads at the moment, so many scenarios that seem to compete for resources may not even exist. Regardless, we'll proceed to solve the "problem".
+
+When we compete for resources, we fall into a [race condition](https://en.wikipedia.org/wiki/Race_condition), but we have something that solves this problem: the [Mutex - mutual exclusion](https://en.wikipedia.org/wiki/Mutual_exclusion), which blocks access to a shared resource, avoiding unexpected behaviors.
+
+We'll use [useRef](https://react.dev/reference/react/useRef) to create references where necessary, and [async-mutex](https://github.com/DirtyHairy/async-mutex), the library that today provides the most stability for this type of implementation in JavaScript.
+
+Let's break the explanation into parts:
+```ts
+// useCases/useStreamingUpload.ts
+export const useStreamingUpload = () => {
+  const completedCountRef = useRef(0);
+  const failedCountRef = useRef(0);
+  const counterMutex = useRef(new Mutex()).current;
+
+  const resultsRef = useRef<Response[]>([]);
+  const [fileProgress, setFileProgress] = useState<FileProgress[]>([]);
+  const stateMutex = useRef(new Mutex()).current;
+  // ...
+}
+```
+
+We now have a reference for counting the actions that completed or failed, with a `Mutex` to control access to them, plus references for the results and for what's still pending, controlled by another `Mutex`.
+
+The `useRef(new Mutex()).current` pattern is an optimization that ensures a unique and persistent mutex instance across all component re-renders:
+1. `useRef()` creates persistent storage - the ref object survives new renders
+2. `.current` extracts the mutex immediately - no need to access .current everywhere
+3. Single initialization - the Mutex constructor runs only once, during the first render.
+
+Besides initializing each context, whenever we need to update our pending batches, `stateMutex.runExclusive` will protect their mutual access.
+```ts
+// useCases/useStreamingUpload.ts
+// ...
+async function uploadFilesStreaming(files: File[]): Promise<Response[]> {
+  completedCountRef.current = 0;
+  failedCountRef.current = 0;
+  resultsRef.current = new Array(files.length);
+
+  const initialProgress: FileProgress[] = files.map((file, index) => ({
+    id: index,
+    fileName: file.name,
+    phase: 'waiting',
+    progress: 0,
+    startTime: performance.now()
+  }));
+
+  setFileProgress(initialProgress);
+
+  const updateFileProgress = async (index: number, phase: FileProgress['phase'], progress?: number) => {
+    await stateMutex.runExclusive(async () => {
+      setFileProgress(prev => prev.map((file, i) =>
+        i === index ? { ...file, phase, progress: progress ?? file.progress } : file
+      ));
+    });
+  };
+// ...
+```
+
+And for writing the results as well:
+```ts
+// useCases/useStreamingUpload.ts
+// ...
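+// setResult (below) records each file's Response. Writing into
+// resultsRef.current[index] can stay outside the mutex because each call
+// owns a distinct index; the shared success/failure counters are the part
+// that actually needs counterMutex.runExclusive.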
+const setResult = async (index: number, result: Response) => {
+  resultsRef.current[index] = result;
+
+  await counterMutex.runExclusive(async () => {
+    if (result.ok) {
+      completedCountRef.current += 1;
+      console.log(
+        `MUTEX ACCESS: Incremented completed count to ${completedCountRef.current}`
+      );
+    } else {
+      failedCountRef.current += 1;
+      console.log(
+        `MUTEX ACCESS: Incremented failed count to ${failedCountRef.current}`
+      );
+    }
+  });
+
+  await updateFileProgress(
+    index,
+    result.ok ? "completed" : "failed",
+    result.ok ? 100 : 0
+  );
+};
+// ...
+```
+
+And for the file processing itself, we have the call to each phase with its state update immediately afterwards, each one guaranteeing safe access and updates (I decided to use fixed values for the progress updates just so we can analyze this in the simplest possible way).
+```ts
+// useCases/useStreamingUpload.ts
+// ...
+const processFile = async (file: File, index: number) => {
+  try {
+    console.log(
+      `Starting streaming process for file ${index + 1}: ${file.name}`
+    );
+
+    await updateFileProgress(index, "md5", 25);
+    console.log(
+      `Phase 1 - MD5 generation for file ${index + 1}: ${file.name}`
+    );
+    const md5Hash = await generateMD5Base64(file);
+
+    await updateFileProgress(index, "url", 50);
+    console.log(
+      `Phase 2 - URL generation for file ${index + 1}: ${file.name}`
+    );
+
+    const signedUrl = await uploadService.generateSignedUploadUrl(
+      file.name
+    );
+
+    await updateFileProgress(index, "upload", 75);
+    console.log(
+      `Phase 3 - Upload starting for file ${index + 1}: ${file.name}`
+    );
+    const customFile: CustomFile = {
+      file,
+      md5: md5Hash,
+      preSignedUrl: signedUrl,
+    };
+
+    const result = await uploadService.uploadSingleFile(
+      customFile,
+      signedUrl
+    );
+
+    await setResult(index, result);
+
+    console.log(
+      `Streaming upload completed for file ${index + 1}: ${file.name}`
+    );
+    return result;
+// ...
+```
+These are the main changes. As mentioned, besides guaranteeing safe accesses and updates, we also initialize all streams at the same time, and they may "compete" for these resources.
+
+![alt text](streaming-timeline.png)
+
+You must have noticed that we didn't gain performance, but now we process each upload as soon as its previous steps have finished, getting closer to what we expect from a real-world implementation and delivering more perceived speed and a better experience to the user.
+
+Time complexity `O(1)`?
+Constant time, regardless of file count, because:
+- All streams start at t=0 simultaneously;
+- Each file processes independently through its pipeline;
+- No waiting for other files to complete phases;
+- Total time = time to process the slowest individual file
+
+But as in the previous example, all the resource limitations already mentioned still apply, so in our local context the overall performance (the time to complete all files) will be similar to the previous implementation.
+
+We updated the UI to observe how each phase is now happening at different moments:
+
+![alt text](interface-algorithms.png)
+
+It's also important to note that we went through several types of implementations and used file upload as an example, but all the concepts used here can be applied in any other context, with bananas or apples.
+
+For a next article, we'll talk about this same implementation with Web Workers and Multi-threading.
+
+See you on GitHub, see you soon! :rocket:
diff --git a/blog/PT-BR.md b/blog/PT-BR.md
new file mode 100644
index 0000000..34db2c9
--- /dev/null
+++ b/blog/PT-BR.md
@@ -0,0 +1,442 @@
+# Frontend Concurrency Control
+![alt text](robot-delivery.png)
+
+> LLM improved English version is here: REPLACEME
+
+Fala, turma! Espero que esteja tudo bem.
+
+O intuito do Tech Pills aqui é começar com uma série de artigos para resolver ou discutir problemas técnicos que encontramos nos clientes no dia a dia.
+
+Sim, nós sabemos que o JavaScript, em suma, trabalha com execução single thread, dependendo de callbacks para comportamentos assíncronos.
+
+Hoje vamos subir algumas escadas para trabalharmos com concorrência em alguns níveis.
+
+Nos deparamos com um upload de arquivos para um storage (Amazon S3 no nosso caso) e com a questão de como poderíamos melhorar o upload de uma quantidade grande de arquivos, já que o frontend seria responsável por essas requests no nosso cenário inicial.
+
+Temos N maneiras de fazer isso hoje; começaremos pela sequencial e subiremos duas escadas de complexidade e também de performance.
+
+Esse é o projeto guia que usaremos. Qualquer informação relacionada às tecnologias e ao setup está centralizada nele, mas o mais importante agora é entendermos os conceitos e qual problema queremos resolver.
+
+Iremos simular um servidor S3 localmente com [localstack](https://github.com/localstack/localstack), mas isso não importa: para um caso real, é só fazermos as edições necessárias no nosso client, porque iremos consumir a mesma SDK da aws-s3.
+
+Você vai perceber que o nosso s3-client também utiliza normalmente os SDKs da aws-s3, seguindo a própria documentação da Amazon: https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
+
+Também sabemos que precisamos de um `S3Client` com nossas configurações específicas e, para o nosso caso, também iremos usar [URLs pré-assinadas](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-presigned-url.html)
+
+Como já dito, a implementação dos métodos não é trivial e está no repositório para consulta (sempre que tivermos ... reticências quer dizer que eu escondi a implementação).
+
+```ts
+// src/infra/S3Client.ts
+
+class LocalS3Simulator {
+  readonly client: S3Client;
+  readonly bucket: string;
+  readonly baseUrl: string;
+
+  constructor(options: LocalS3SimulatorOptions = {}) {
+    this.client = new S3Client({
+      region: options.region || "us-east-1",
+      endpoint: options.endpoint || "http://localhost:4566",
+      forcePathStyle: true,
+      credentials: {
+        accessKeyId: options.accessKeyId || "test",
+        secretAccessKey: options.secretAccessKey || "test",
+      },
+    });
+
+    this.bucket = options.bucket || "test-uploads";
+    this.baseUrl = options.endpoint || "http://localhost:4566";
+  }
+
+  async bucketExists(): Promise<boolean> {/* ... */}
+  async createBucket(): Promise<void> {/* ... */}
+  async generateSignedUploadUrl(fileName: string): Promise<string> {/* ... */}
+}
+```
+
+Para esse caso também vamos checar a [integridade](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html) do nosso upload provendo uma hash [MD5](https://docs.aws.amazon.com/pt_br/redshift/latest/dg/r_MD5.html) e, com isso, poderemos finalmente começar nossa jornada no envio dos arquivos.
+
+## Sequencial
+
+> Implementação dessa etapa aqui https://github.com/Tech-Pills/frontend-concurrency-control/pull/1
+
+Na nossa forma inicial, de maneira sequencial, iremos criar as hashes MD5, os links temporários e então faremos os uploads.
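+
+Só para materializar a fase de hashing, segue um esboço de como a `generateMD5Base64` poderia ser escrita com a crypto-js. Trate este código como uma aproximação minha; a implementação real está em `src/infra/crypto.ts` no repositório:
+
+```ts
+// Esboço aproximado (a versão oficial está no repositório)
+import CryptoJS from "crypto-js";
+
+export async function generateMD5Base64(file: File): Promise<string> {
+  const buffer = await file.arrayBuffer();
+  const wordArray = CryptoJS.lib.WordArray.create(buffer);
+  // O S3 espera o MD5 em Base64 no header Content-MD5
+  return CryptoJS.MD5(wordArray).toString(CryptoJS.enc.Base64);
+}
+```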
+
+Um ponto importante é que a implementação frontend e seus componentes são triviais; vamos focar mais na parte que tange o upload.
+
+Então, como citado acima, vamos seguir três passos:
+1 - Gerar os MD5 hashes
+2 - Gerar as URLs pré-assinadas
+3 - Fazer os uploads
+
+E cada um deles será feito de forma sequencial. Nesse cenário vamos enviar 1.000 (MIL) arquivos por vez.
+Veja que só começamos a gerar as URLs pré-assinadas quando temos todas as hashes, e também só começamos os uploads quando tivermos todas as URLs pré-assinadas.
+
+Teremos essa classe responsável por gerar as URLs pré-assinadas e fazer um fetch simples para o upload:
+
+```ts
+// infra/S3UploadService.ts
+export class S3UploadService {
+  private s3Client: LocalS3Simulator;
+
+  constructor(bucket = "test-uploads") {
+    this.s3Client = new LocalS3Simulator({ bucket });
+  }
+
+  async generateSignedUploadUrl(fileName: string): Promise<string> {/* ... */}
+  async uploadSingleFile(
+    customFile: CustomFile,
+    signedUrl: string
+  ): Promise<Response> {/* ... */}
+}
+```
+
+Poderíamos sim fazer o processo todo para cada arquivo antes de chamar o próximo, mas queria evidenciar o problema de enfileirarmos tarefas dependentes de forma totalmente sequencial, algo que muitas vezes deixamos acontecer ao iterar por listas para executarmos algum processamento.
+Como cada iteração é responsável por um processamento, ficará simples de modificarmos nos próximos passos. Dessa forma, nosso usuário só vai conseguir observar os seus uploads quando todas as hashes e URLs pré-assinadas forem geradas.
+
+![alt text](sequential-upload-timeline.png)
+
+Teremos um caso de uso para cada um dos tipos que vou mostrar aqui. Lembrando: não se atenha muito às decisões de escrita; algumas foram pensadas para exemplificar melhor as fases pelas quais vamos passar.
+
+```ts
+// useCases/useSequentialUpload.ts
+
+export const useSequentialUpload = () => {
+  const [uploadService] = useState(() => new S3UploadService());
+  /* ... */
+
+  async function uploadFilesSequentially(files: File[]): Promise<Response[]> {
+    const results: Response[] = [];
+    const customFiles: CustomFile[] = [];
+
+    for (const file of files) {
+      const md5Hash = await generateMD5Base64(file);
+      customFiles.push({ file, md5: md5Hash, preSignedUrl: "" });
+    }
+
+    for (const customFile of customFiles) {
+      const signedUrl = await uploadService.generateSignedUploadUrl(
+        customFile.file.name
+      );
+      customFile.preSignedUrl = signedUrl;
+    }
+
+    for (const customFile of customFiles) {
+      const result = await uploadService.uploadSingleFile(
+        customFile,
+        customFile.preSignedUrl
+      );
+      results.push(result);
+    }
+
+    return results;
+  }
+}
+```
+
+Nesse momento podemos observar uma complexidade de tempo de `O(3n)`. O 3 é assintoticamente descartável, mas ilustrá-lo é importante para o nosso entendimento, porque fica evidente que cada fase que temos aqui tem potencial para trabalhar em paralelo.
+
+![alt text](time-complexity-sequential.png)
+
+Para ser honesto, se estivéssemos pensando que o nosso usuário está esperando pelo upload e gostaria de algum feedback, poderíamos ter estruturado para que todos os passos fossem executados dentro de um "embrulho", para que não precisássemos esperar todas as hashes e URLs pré-assinadas serem criadas para então iniciarmos o primeiro upload: faríamos um batch cuidar dos 3 processos e criaríamos uma fila disso. Forcei essa decisão para um cenário de estudo, para esse e o próximo exemplo.
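+
+Apenas para ilustrar esse "embrulho" (é um desenho hipotético meu, não o código do repositório), o processo inteiro caberia em uma função só por arquivo, como veremos na etapa de streaming:
+
+```ts
+// Esboço hipotético: as 3 fases encadeadas por arquivo, em vez de por lote
+async function uploadFilePipeline(file: File): Promise<Response> {
+  const md5 = await generateMD5Base64(file); // fase 1: hash
+  const preSignedUrl = await uploadService.generateSignedUploadUrl(file.name); // fase 2: URL
+  return uploadService.uploadSingleFile({ file, md5, preSignedUrl }, preSignedUrl); // fase 3: upload
+}
+```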
+
+Se por acaso está usando meu repositório como base, você pode checar os arquivos enviados em `http://localhost:4566/test-uploads/`
+
+## Async Batch
+
+> Implementação dessa etapa aqui https://github.com/Tech-Pills/frontend-concurrency-control/pull/2
+
+Agora, em cada uma das fases, nós usamos [Promises](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise), portanto cada ação pode acontecer ao mesmo tempo que outra e, assim que todas elas resolverem, podemos passar para a próxima etapa no nosso fluxo. Ainda é necessário esperar que todas as gerações de hashes aconteçam, por exemplo, mas agora não mais de forma sequencial.
+
+Adicionamos um novo caso de uso para o nosso upload em batches assíncronos:
+
+```ts
+// useCases/useAsyncBatchUpload.ts
+export const useBatchUpload = () => {
+  // ...
+  async function uploadFilesBatch(files: File[]): Promise<Response[]> {
+    const results: Response[] = [];
+    const customFiles: CustomFile[] = [];
+
+    const md5Promises = files.map(async (file, index) => {
+      const md5Hash = await generateMD5Base64(file);
+      console.log(`MD5 generated for file ${index + 1}: ${file.name}`);
+      return { file, md5: md5Hash, preSignedUrl: "" };
+    });
+
+    const resolvedCustomFiles = await Promise.all(md5Promises);
+    customFiles.push(...resolvedCustomFiles);
+
+    const urlPromises = customFiles.map(async (customFile, index) => {
+      const signedUrl = await uploadService.generateSignedUploadUrl(
+        customFile.file.name
+      );
+      customFile.preSignedUrl = signedUrl;
+      console.log(`Signed URL generated for file ${index + 1}: ${customFile.file.name}`);
+    });
+
+    await Promise.all(urlPromises);
+
+    const uploadPromises = customFiles.map(async (customFile, index) => {
+      console.log(`Uploading file ${index + 1}: ${customFile.file.name}`);
+      return await uploadService.uploadSingleFile(
+        customFile,
+        customFile.preSignedUrl
+      );
+    });
+
+    const uploadResults = await Promise.all(uploadPromises);
+    results.push(...uploadResults);
+
+    return results;
+  }
+  // ...
+}
+```
+
+Também fiz algumas modificações não triviais na nossa UI principal para termos algumas comparações entre cada execução e algoritmo.
+
+![alt text](ui-sequential-vs-async.png)
+
+Exemplificando mais ainda: agora temos vários responsáveis por executar as ações ao mesmo tempo, e não apenas um.
+
+Se fôssemos um restaurante, na forma sequencial teríamos apenas 1 chef:
+- Leitura de 1 pedido -> leitura de 1 pedido -> leitura de 1 pedido
+- Preparo de 1 prato -> preparo de 1 prato -> preparo de 1 prato
+- Entrega de 1 pedido -> entrega de 1 pedido -> entrega de 1 pedido
+
+![alt text](batch-async-upload.png)
+
+E no exemplo de batches assíncronos, contratamos vários chefs:
+- Todos os chefs começam a leitura dos pedidos
+- Assim que o último termina, todos os chefs começam o preparo dos pratos
+- Assim que o último termina, todos os chefs entregam os pedidos
+
+Adicionei alguns logs para evidenciar que algumas ações dentro de cada fase podem acontecer antes de outras.
+
+![alt text](log-async-example.png)
+
+Como em cada fase tudo acontece em paralelo, agora o nosso algoritmo cai para `O(fases)`; nesse caso temos 3 etapas acontecendo.
+
+![alt text](time-complexity-async.png)
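+
+Um detalhe do `Promise.all` que vale lembrar (observação minha, não algo que o código do repositório trata): ele rejeita na primeira falha. Se quiséssemos tolerar falhas individuais dentro de uma fase, um esboço com `Promise.allSettled` ficaria assim:
+
+```ts
+// Esboço: tolerando falhas individuais dentro da fase de upload
+const settled = await Promise.allSettled(uploadPromises);
+const sucessos = settled.filter((r) => r.status === "fulfilled");
+console.log(`${sucessos.length}/${settled.length} uploads concluídos`);
+```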
+
+Tenha em mente que a performance não é a esperada se considerarmos a complexidade atingida, mas temos alguns gargalos acontecendo em diversos pontos:
+- Limite de conexões do browser;
+- LocalStack limita alguns recursos;
+- Um único Nginx proxy saturando a nossa conexão;
+- Limites no [event loop](https://nodejs.org/id/learn/asynchronous-work/event-loop-timers-and-nexttick#what-is-the-event-loop) do JavaScript.
+
+Por isso, no geral, obtemos apenas cerca de ~20% de ganho de performance.
+
+![alt text](async-limit.png)
+
+Eu não quero ir a fundo no conceito de [Threads](https://developer.mozilla.org/en-US/docs/Glossary/Thread), mas sabemos que o JavaScript foi desenhado inicialmente como single thread, executando uma operação por vez, embora hoje saibamos que é possível criar threads adicionais.
+
+## Streaming + Mutex
+
+> Implementação dessa etapa aqui https://github.com/Tech-Pills/frontend-concurrency-control/pull/3
+
+Finalmente vamos começar a trabalhar com processamento em pipeline. Como citado anteriormente, cada fase do nosso processamento precisa da anterior, mas não precisamos que todas as ações de uma fase terminem para começarmos a próxima.
+
+Seguindo o exemplo do restaurante com vários chefs: concordamos que, no cenário anterior, os chefs só podiam trabalhar na próxima etapa quando TODOS os outros tinham terminado, mas agora:
+- Todos os chefs começam a leitura dos pedidos
+- Cada chef pode começar o preparo do seu prato assim que termina a sua leitura
+- Cada chef pode entregar o seu pedido assim que termina o seu preparo
+
+```ts
+export const useStreamingUpload = () => {
+  // ...
+  async function uploadFilesStreaming(files: File[]): Promise<Response[]> {
+    const results: Response[] = new Array(files.length);
+
+    const processFile = async (file: File, index: number) => {
+      try {
+        const md5Hash = await generateMD5Base64(file);
+        const signedUrl = await uploadService.generateSignedUploadUrl(file.name);
+
+        const customFile: CustomFile = {
+          file,
+          md5: md5Hash,
+          preSignedUrl: signedUrl
+        };
+
+        const result = await uploadService.uploadSingleFile(customFile, signedUrl);
+        results[index] = result;
+
+        return result;
+      } catch (error) {
+        const failedResponse = new Response(null, {
+          status: 500,
+          statusText: error instanceof Error ? error.message : 'Unknown error'
+        });
+
+        results[index] = failedResponse;
+        return failedResponse;
+      }
+    };
+    const streamPromises = files.map((file, index) => processFile(file, index));
+    await Promise.all(streamPromises);
+    return results;
+  }
+  // ...
+}
+```
+
+Consegue identificar algum problema? Algumas dessas ações podem concorrer por recursos: por exemplo, tentar acessar o nosso `results` ao mesmo tempo, termos erros interferindo em processos ainda em execução e também não termos nenhuma visibilidade do estado de cada execução em lote pendente. Mas lembrando que, como já conversamos, o JavaScript é single-threaded e não estamos criando outras threads no momento, então muitos cenários que parecem competir por recursos podem nem existir. Independente disso, seguiremos para resolver o "problema".
+Quando disputamos por recursos, caímos em uma [race condition](https://en.wikipedia.org/wiki/Race_condition), mas temos algo que resolve esse problema: o [Mutex - mutual exclusion](https://pt.wikipedia.org/wiki/Exclus%C3%A3o_m%C3%BAtua), com o qual bloqueamos o acesso a algum recurso público, evitando comportamentos inesperados.
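+
+Em sua forma mínima, fora do nosso caso de uso, o async-mutex funciona assim (exemplo isolado, só para fixar o conceito):
+
+```ts
+import { Mutex } from "async-mutex";
+
+const mutex = new Mutex();
+let contador = 0;
+
+async function incrementaComSeguranca() {
+  // runExclusive garante que apenas um "chef" executa este bloco por vez;
+  // os demais aguardam a liberação do mutex
+  await mutex.runExclusive(async () => {
+    contador += 1;
+  });
+}
+```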
+
+Vamos usar [useRef](https://react.dev/reference/react/useRef) para criarmos referências onde é necessário e o [async-mutex](https://github.com/DirtyHairy/async-mutex), a biblioteca que hoje entrega mais estabilidade nesse tipo de implementação para JavaScript.
+
+Vamos quebrar a explicação em algumas partes:
+```ts
+// useCases/useStreamingUpload.ts
+export const useStreamingUpload = () => {
+  const completedCountRef = useRef(0);
+  const failedCountRef = useRef(0);
+  const counterMutex = useRef(new Mutex()).current;
+
+  const resultsRef = useRef<Response[]>([]);
+  const [fileProgress, setFileProgress] = useState<FileProgress[]>([]);
+  const stateMutex = useRef(new Mutex()).current;
+  // ...
+}
+```
+
+Temos agora uma referência para contarmos as ações que completaram ou falharam e um `Mutex` para fazermos o controle do acesso a elas, além de referências para os resultados e para o que ainda está pendente, controladas por um outro `Mutex`.
+
+O padrão `useRef(new Mutex()).current` é uma otimização que garante uma instância única e persistente de mutex em todas as re-renderizações do componente:
+1. `useRef()` cria armazenamento persistente - o objeto ref sobrevive a novas renderizações
+2. `.current` extrai o mutex imediatamente - não há necessidade de acessar .current em todos os lugares
+3. Inicialização única - o construtor do Mutex é executado apenas uma vez, durante a primeira renderização.
+
+Além de iniciarmos cada contexto, quando precisarmos atualizar as nossas pending batches, o `stateMutex.runExclusive` vai proteger o seu acesso mútuo.
+```ts
+// useCases/useStreamingUpload.ts
+// ...
+async function uploadFilesStreaming(files: File[]): Promise<Response[]> {
+  completedCountRef.current = 0;
+  failedCountRef.current = 0;
+  resultsRef.current = new Array(files.length);
+
+  const initialProgress: FileProgress[] = files.map((file, index) => ({
+    id: index,
+    fileName: file.name,
+    phase: 'waiting',
+    progress: 0,
+    startTime: performance.now()
+  }));
+
+  setFileProgress(initialProgress);
+
+  const updateFileProgress = async (index: number, phase: FileProgress['phase'], progress?: number) => {
+    await stateMutex.runExclusive(async () => {
+      setFileProgress(prev => prev.map((file, i) =>
+        i === index ? { ...file, phase, progress: progress ?? file.progress } : file
+      ));
+    });
+  };
+// ...
+```
+
+E para a escrita dos resultados também:
+```ts
+// useCases/useStreamingUpload.ts
+// ...
+const setResult = async (index: number, result: Response) => {
+  resultsRef.current[index] = result;
+
+  await counterMutex.runExclusive(async () => {
+    if (result.ok) {
+      completedCountRef.current += 1;
+      console.log(
+        `MUTEX ACCESS: Incremented completed count to ${completedCountRef.current}`
+      );
+    } else {
+      failedCountRef.current += 1;
+      console.log(
+        `MUTEX ACCESS: Incremented failed count to ${failedCountRef.current}`
+      );
+    }
+  });
+
+  await updateFileProgress(
+    index,
+    result.ok ? "completed" : "failed",
+    result.ok ? 100 : 0
+  );
+};
+// ...
+```
+
+E para o processamento do arquivo em si, temos a chamada de cada uma das fases com a atualização do seu estado logo em sequência, cada uma delas garantindo acessos e atualizações de forma segura (decidi usar valores fixos para a atualização do progresso só para podermos analisar isso da forma mais simples possível).
+```ts
+// useCases/useStreamingUpload.ts
+// ...
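+// A processFile (abaixo) encadeia as 3 fases por arquivo e, antes de cada fase,
+// publica o progresso com valores fixos (25/50/75) via updateFileProgress;
+// o resultado final é registrado com setResult, sob proteção dos mutexes.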
+const processFile = async (file: File, index: number) => {
+  try {
+    console.log(
+      `Starting streaming process for file ${index + 1}: ${file.name}`
+    );
+
+    await updateFileProgress(index, "md5", 25);
+    console.log(
+      `Phase 1 - MD5 generation for file ${index + 1}: ${file.name}`
+    );
+    const md5Hash = await generateMD5Base64(file);
+
+    await updateFileProgress(index, "url", 50);
+    console.log(
+      `Phase 2 - URL generation for file ${index + 1}: ${file.name}`
+    );
+
+    const signedUrl = await uploadService.generateSignedUploadUrl(
+      file.name
+    );
+
+    await updateFileProgress(index, "upload", 75);
+    console.log(
+      `Phase 3 - Upload starting for file ${index + 1}: ${file.name}`
+    );
+    const customFile: CustomFile = {
+      file,
+      md5: md5Hash,
+      preSignedUrl: signedUrl,
+    };
+
+    const result = await uploadService.uploadSingleFile(
+      customFile,
+      signedUrl
+    );
+
+    await setResult(index, result);
+
+    console.log(
+      `Streaming upload completed for file ${index + 1}: ${file.name}`
+    );
+    return result;
+// ...
+```
+Essas são as principais mudanças. Como dito, além de garantirmos os acessos e atualizações de forma segura, também inicializamos todas as streams ao mesmo tempo, e elas poderão "brigar" por esses recursos.
+
+![alt text](streaming-timeline.png)
+
+Você deve ter percebido que não ganhamos em performance, mas agora processamos cada upload assim que as suas etapas anteriores já finalizaram, chegando mais próximo do que esperamos de uma implementação na vida real, entregando mais velocidade percebida e uma experiência melhor para o usuário.
+
+Complexidade de tempo `O(1)`?
+Tempo constante, independentemente da contagem de arquivos, porque:
+- Todos os fluxos começam em t=0 simultaneamente;
+- Cada arquivo processa independentemente por meio de seu pipeline;
+- Sem espera para que outros arquivos concluam fases;
+- Tempo total = tempo para processar o arquivo individual mais lento
+
+Mas, como no outro exemplo, todas as limitações de recurso já citadas continuam valendo, e a performance, no nosso contexto rodando local, vai se assemelhar à da implementação anterior (em termos de velocidade para conclusão de todos os arquivos).
+
+Atualizamos a UI para observarmos como cada fase agora está acontecendo em momentos diferentes:
+
+![alt text](interface-algorithms.png)
+
+Também é importante notar que passamos por alguns tipos de implementações e usamos o file upload como exemplo, mas todos os conceitos utilizados aqui podem ser aplicados em qualquer outro contexto, com bananas ou maçãs.
+
+Para um próximo artigo, vamos falar dessa mesma implementação com Web Workers e Multi-threading.
+
+Nos vemos no GitHub, até breve!
:rocket: diff --git a/blog/async-limit.png b/blog/async-limit.png new file mode 100644 index 0000000..bc34ed5 Binary files /dev/null and b/blog/async-limit.png differ diff --git a/blog/batch-async-upload.png b/blog/batch-async-upload.png new file mode 100644 index 0000000..cb31840 Binary files /dev/null and b/blog/batch-async-upload.png differ diff --git a/blog/interface-algorithms.png b/blog/interface-algorithms.png new file mode 100644 index 0000000..8aa1d34 Binary files /dev/null and b/blog/interface-algorithms.png differ diff --git a/blog/log-async-example.png b/blog/log-async-example.png new file mode 100644 index 0000000..6539dbe Binary files /dev/null and b/blog/log-async-example.png differ diff --git a/blog/robot-delivery.png b/blog/robot-delivery.png new file mode 100644 index 0000000..618b871 Binary files /dev/null and b/blog/robot-delivery.png differ diff --git a/blog/sequential-upload-timeline.png b/blog/sequential-upload-timeline.png new file mode 100644 index 0000000..d2fc366 Binary files /dev/null and b/blog/sequential-upload-timeline.png differ diff --git a/blog/streaming-timeline.png b/blog/streaming-timeline.png new file mode 100644 index 0000000..7be316d Binary files /dev/null and b/blog/streaming-timeline.png differ diff --git a/blog/time-complexity-async.png b/blog/time-complexity-async.png new file mode 100644 index 0000000..de1fafe Binary files /dev/null and b/blog/time-complexity-async.png differ diff --git a/blog/time-complexity-sequential.png b/blog/time-complexity-sequential.png new file mode 100644 index 0000000..aba898e Binary files /dev/null and b/blog/time-complexity-sequential.png differ diff --git a/blog/ui-sequential-vs-async.png b/blog/ui-sequential-vs-async.png new file mode 100644 index 0000000..4e4493e Binary files /dev/null and b/blog/ui-sequential-vs-async.png differ diff --git a/package-lock.json b/package-lock.json index 701cb2a..8073fb2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@aws-sdk/client-s3": "^3.850.0", "@aws-sdk/s3-request-presigner": "^3.850.0", + "async-mutex": "^0.5.0", "crypto-js": "^4.2.0", "react": "^19.1.0", "react-dom": "^19.1.0" @@ -3409,6 +3410,15 @@ "dev": true, "license": "Python-2.0" }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "license": "MIT", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", diff --git a/package.json b/package.json index 4c2c2e5..cbfdeae 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "dependencies": { "@aws-sdk/client-s3": "^3.850.0", "@aws-sdk/s3-request-presigner": "^3.850.0", + "async-mutex": "^0.5.0", "crypto-js": "^4.2.0", "react": "^19.1.0", "react-dom": "^19.1.0" diff --git a/src/App.tsx b/src/App.tsx index d1cf623..4df027e 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -4,40 +4,56 @@ import "./App.css"; import { useSequentialUpload } from "./useCases/useSequentialUpload"; import { useBatchUpload } from "./useCases/useAsyncBatchUpload"; +import { useStreamingUpload } from "./useCases/useStreamingUpload"; import { useState, useEffect } from "react"; interface UploadResult { id: number; - algorithm: 'Sequential' | 'Batch (Async)'; + algorithm: "Sequential" | "Batch (Async)" | "Streaming"; runTime: 
number; percentageFaster: string | null; timestamp: string; } function App() { - const [strategy, setStrategy] = useState<'sequential' | 'batch'>('sequential'); + const [strategy, setStrategy] = useState< + "sequential" | "batch" | "streaming" + >("sequential"); const [uploadResults, setUploadResults] = useState([]); - + const sequential = useSequentialUpload(); const batch = useBatchUpload(); - - const current = strategy === 'sequential' ? sequential : batch; + const streaming = useStreamingUpload(); + + const current = + strategy === "sequential" + ? sequential + : strategy === "batch" + ? batch + : streaming; + // TODO remove all UI logic and remove unnecessary hooks useEffect(() => { if (sequential.runTime) { const runTimeMs = parseFloat(sequential.runTime); const lastResult = uploadResults[uploadResults.length - 1]; - const percentageFaster = lastResult - ? ((lastResult.runTime - runTimeMs) / lastResult.runTime * 100).toFixed(1) + '%' + const percentageFaster = lastResult + ? ( + ((lastResult.runTime - runTimeMs) / lastResult.runTime) * + 100 + ).toFixed(1) + "%" : null; - setUploadResults(prev => [...prev, { - id: prev.length + 1, - algorithm: 'Sequential', - runTime: runTimeMs, - percentageFaster, - timestamp: new Date().toLocaleTimeString() - }]); + setUploadResults((prev) => [ + ...prev, + { + id: prev.length + 1, + algorithm: "Sequential", + runTime: runTimeMs, + percentageFaster, + timestamp: new Date().toLocaleTimeString(), + }, + ]); } }, [sequential.runTime]); @@ -45,26 +61,57 @@ function App() { if (batch.runTime) { const runTimeMs = parseFloat(batch.runTime); const lastResult = uploadResults[uploadResults.length - 1]; - const percentageFaster = lastResult - ? ((lastResult.runTime - runTimeMs) / lastResult.runTime * 100).toFixed(1) + '%' + const percentageFaster = lastResult + ? ( + ((lastResult.runTime - runTimeMs) / lastResult.runTime) * + 100 + ).toFixed(1) + "%" : null; - setUploadResults(prev => [...prev, { - id: prev.length + 1, - algorithm: 'Batch (Async)', - runTime: runTimeMs, - percentageFaster, - timestamp: new Date().toLocaleTimeString() - }]); + setUploadResults((prev) => [ + ...prev, + { + id: prev.length + 1, + algorithm: "Batch (Async)", + runTime: runTimeMs, + percentageFaster, + timestamp: new Date().toLocaleTimeString(), + }, + ]); } }, [batch.runTime]); + useEffect(() => { + if (streaming.runTime) { + const runTimeMs = parseFloat(streaming.runTime); + const lastResult = uploadResults[uploadResults.length - 1]; + const percentageFaster = lastResult + ? ( + ((lastResult.runTime - runTimeMs) / lastResult.runTime) * + 100 + ).toFixed(1) + "%" + : null; + + setUploadResults((prev) => [ + ...prev, + { + id: prev.length + 1, + algorithm: "Streaming", + runTime: runTimeMs, + percentageFaster, + timestamp: new Date().toLocaleTimeString(), + }, + ]); + } + }, [streaming.runTime]); + const handleFileChange = (event: React.ChangeEvent) => { const fileList = event.target.files; if (fileList && fileList.length > 0) { const selectedFiles = Array.from(fileList); sequential.setFiles(selectedFiles); batch.setFiles(selectedFiles); + streaming.setFiles(selectedFiles); } }; @@ -102,26 +149,53 @@ function App() { setStrategy(e.target.value as 'sequential' | 'batch')} + checked={strategy === "sequential"} + onChange={(e) => + setStrategy( + e.target.value as "sequential" | "batch" | "streaming" + ) + } /> Sequential Upload -