#União arquivos particionados

Os arquivos foram particionados no disco local e foi feito upload de forma manual via interface gráfica do Databricks. Provavelmente essa não era a abordagem mais eficaz de subir o arquivo original no ambiente, mas serviu como aprendizado

##Abordagem 1

Essa não foi a primeira abordagem. A primeira de fato demorou muito e não avançou quase nada, por isso foi descartada.

Como o primeiro código (que não foi apresentado) estava demorando demais, comecei a buscar outras formas de fazer a união e pensei em diminuir a quantidade de partições (por se tratar de um cluster) do arquivo para tentar ser mais rápido. De fato o resultado foi melhor, porém por um erro no particionamento feito no disco local tive que adaptar a leitura das partes e consequentemente a união.

No disco local eu simplesmente quebrei o arquivo original em partes levando em conta o número de linhas, porém o cabeçalho estava presente apenas na parte 1. Isso gerou alguns problemas na união, que foram corrigidos na Abordagem 2.

In [0]:
import pyspark.dbutils
import pyspark.pandas as ps
import datetime

In [0]:
path = "dbfs:/FileStore/"

files = dbutils.fs.ls(path)

df_res = ps.DataFrame()

In [0]:
from pyspark.sql import SparkSession
from functools import reduce
import os

In [0]:

# Inicializa a sessão do Spark
spark = SparkSession.builder.appName("União de Arquivos").getOrCreate()

# Caminho do diretório onde os arquivos divididos estão localizados
diretorio_entrada = "dbfs:/FileStore/"

# Obtém a lista de arquivos no diretório de entrada
arquivos = [arquivo.path for arquivo in dbutils.fs.ls(diretorio_entrada) if arquivo.path.endswith(".csv")]

# Ordena a lista de arquivos com base no número da parte
arquivos_ordenados = sorted(arquivos, key=lambda x: int(x.split("_")[-1].split(".")[0]))

# Lista para armazenar os DataFrames
dataframes = []

# Lê cada arquivo CSV e cria um DataFrame Spark para cada um
for arquivo in arquivos_ordenados:
    df = spark.read.csv(arquivo, header=True, inferSchema=True, sep=';')
    dataframes.append(df)

print("fim da leitura")

# Reduz o número de partições antes de unir os DataFrames
num_partitions = 10  # Defina um número adequado de partições
dataframes_coalesced = [df.coalesce(num_partitions) for df in dataframes]

print("fim da Coalesced")



fim da leitura
fim da Coalesced


In [0]:
# Concatena os DataFrames em um único DataFrame
df_consolidado = reduce(lambda x, y: x.union(y), dataframes_coalesced)

print("fim da lambda")

# Exibe o DataFrame consolidado
#df_consolidado.show()

# Salva o DataFrame consolidado em formato CSV
caminho_saida_csv = "dbfs:/dbfs/FileStore/tables/moduloVI/consolidado.csv"
df_consolidado.write.csv(caminho_saida_csv, header=True, mode="overwrite")

# Salva o DataFrame consolidado em formato Parquet
caminho_saida_parquet = "dbfs:/dbfs/FileStore/tables/moduloVI/consolidado.parquet"
df_consolidado.write.parquet(caminho_saida_parquet, mode="overwrite")

# Encerra a sessão do Spark
spark.stop()


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1660463438321834>:2[0m
[1;32m      1[0m [38;5;66;03m# Concatena os DataFrames em um único DataFrame[39;00m
[0;32m----> 2[0m df_consolidado [38;5;241m=[39m reduce([38;5;28;01mlambda[39;00m x, y: x[38;5;241m.[39munion(y), dataframes_coalesced)
[1;32m      4[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mfim da lambda[39m[38;5;124m"[39m)
[1;32m      6[0m [38;5;66;03m# Exibe o DataFrame consolidado[39;00m
[1;32m      7[0m [38;5;66;03m#df_consolidado.show()[39;00m
[1;32m      8[0m 
[1;32m      9[0m [38;5;66;03m# Salva o DataFrame consolidado em formato CSV[39;00m

File [0;32m<command-1660463438321834>:2[0m, in [0;36m<lambda>[0;34m(x, y)[0m
[1;32m      1[0m [38;5;66;03m# Concatena os DataFrames em um único DataFrame[39;00m
[0;32m----> 2[0m df_consolidad

##Abordagem 2

Nessa abordagem a grande diferença foi a leitura separada da primeira parte, que tinha o cabeçalho, para as outras partes, que não tinham.

In [0]:
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.types import StructType, StringType  # Importa os tipos de dados

# Inicializa a sessão do Spark
spark = SparkSession.builder.appName("União de Arquivos").getOrCreate()

# Caminho do diretório onde os arquivos divididos estão localizados
diretorio_entrada = "dbfs:/FileStore/"

# Obtém a lista de arquivos no diretório de entrada
arquivos = [arquivo.path for arquivo in dbutils.fs.ls(diretorio_entrada) if arquivo.path.endswith(".csv")]

# Ordena a lista de arquivos com base no número da parte
arquivos_ordenados = sorted(arquivos, key=lambda x: int(x.split("_")[-1].split(".")[0]))

# Lista para armazenar os DataFrames
dataframes = []

# Lê o primeiro arquivo para obter o cabeçalho (esquema)
primeiro_arquivo = arquivos_ordenados[0]
primeiro_df = spark.read.option("header", "true").option("delimiter", ";").csv(primeiro_arquivo)
esquema = primeiro_df.schema

# Lê os outros arquivos usando o mesmo esquema do primeiro arquivo
for arquivo in arquivos_ordenados:
    if arquivo != primeiro_arquivo:
        df = spark.read.option("header", "false").option("delimiter", ";").schema(esquema).csv(arquivo)
        dataframes.append(df)




In [0]:
# Concatena os DataFrames em um único DataFrame
df_consolidado = reduce(lambda x, y: x.union(y), dataframes)

# Salva o DataFrame consolidado em formato CSV
caminho_saida_csv = "dbfs:/dbfs/FileStore/tables/moduloVI/consolidado.csv"  # Substitua pelo seu caminho de saída CSV
df_consolidado.write.csv(caminho_saida_csv, header=True, mode="overwrite")

# Salva o DataFrame consolidado em formato Parquet
caminho_saida_parquet = "dbfs:/dbfs/FileStore/tables/moduloVI/consolidado.parquet"  # Substitua pelo seu caminho de saída Parquet
df_consolidado.write.parquet(caminho_saida_parquet, mode="overwrite")

# Encerra a sessão do Spark
spark.stop()