## Definindo máximo de byter por Trigger

Se você estiver ingerindo arquivos grandes que causam tempos longos de processamento em micro-batches ou problemas de memória, você pode usar a opção cloudFiles.maxBytesPerTrigger para controlar a quantidade máxima de dados processados em cada micro-batch. Isso melhora a estabilidade e mantém a duração dos batches mais previsível. Por exemplo, para limitar cada micro-batch a 1 GB de dados, você pode configurar seu stream da seguinte forma:


In [0]:
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", <source_format>)
         .option("cloudFiles.maxBytesPerTrigger", "1g") #Expressão que define o tamanho máximo de dados processados por trigger
         .load("/path/to/files")

## Lidando com registros ruins

Ao trabalhar com arquivos JSON ou CSV, você pode utilizar a opção badRecordsPath para capturar e isolar registros inválidos em um local separado para análise posterior. Registros com sintaxe malformada, como colchetes ausentes ou vírgulas extras, ou com incompatibilidades de esquema, como erros de tipo de dados ou campos ausentes, são redirecionados para o caminho especificado.


In [0]:
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "json")
         .option("badRecordsPath", "/path/to/quarantine") #Expressão que define o caminho para armazenar os registros inválidos
         .schema("id int, value double")
         .load("/path/to/files")

## Filtro de tipagem de arquivos

Para filtrar arquivos de entrada com base em um padrão específico, como por exemplo *.png, você pode utilizar a opção pathGlobFilter. Esta configuração permite que o seu processo de ingestão ignore arquivos irrelevantes no diretório de origem e processe apenas aqueles que atendem ao critério definido, otimizando o tempo de execução e o consumo de recursos do cluster.

In [0]:
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "binaryFile")
         .option("pathGlobfilter", "*.png") #Expressão que define o filtro de arquivos(*.png, *.jpg, *.gif...)
         .load("/path/to/files")

## Evolução do Esquema (Schema)
O Auto Loader detecta a adição de novas colunas em arquivos de entrada durante o processamento. Para controlar como essa mudança de esquema é gerenciada, você pode configurar a opção cloudFiles.schemaEvolutionMode. Essa funcionalidade permite que o seu pipeline se adapte automaticamente a mudanças nos dados de origem sem a necessidade de intervenção manual ou reinicialização do stream.

In [0]:
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", <source_format>) #Expressão que define o formato de dados
         .option("cloudFiles.schemaEvolutionMode", <mode>) #Expressão que define o modo de evolução do esquema
         .load("/path/to/files")

### Modos de evolução de esquema

| Modo | Comportamento ao ler uma nova coluna |
| :--- | :--- |
| `addNewColumns` (padrão) | O fluxo falha. Novas colunas são adicionadas ao esquema. |
| `rescue` | O esquema nunca evolui e o fluxo não falha devido a mudanças de esquema. Todas as novas colunas são registradas na coluna de dados recuperados (`rescued data column`). |
| `failOnNewColumns` | O fluxo falha. O fluxo não reinicia a menos que o esquema fornecido seja atualizado ou o arquivo de dados problemático seja removido. |
| `none` | Não evolui o esquema, novas colunas são ignoradas e os dados não são recuperados. O fluxo não falha devido a mudanças de esquema. |


O modo padrão é o addNewColumns, portanto, quando o Auto Loader detecta uma nova coluna, o fluxo (stream) é interrompido com uma exceção do tipo UnknownFieldException. Antes que o seu fluxo lance esse erro, o Auto Loader atualiza o local do esquema com a versão mais recente, mesclando as novas colunas ao final do esquema. A próxima execução do fluxo ocorrerá com sucesso utilizando o esquema atualizado.

É importante observar que o modo addNewColumns é o padrão quando um esquema não é fornecido, mas o modo none torna-se o padrão quando você fornece um esquema manualmente. Além disso, o uso de addNewColumns não é permitido quando o esquema do fluxo já foi explicitamente definido.