In [None]:
pip install pyspark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, when, col
from pyspark.sql.functions import count as _count
from pyspark.sql.functions import min as _min
from datetime import datetime
import psutil
import multiprocessing
import findspark

In [None]:
total_memory = psutil.virtual_memory().total / (1024**3)
memory_alocated = round(total_memory / 2)
num_cores = multiprocessing.cpu_count()
print(f'{total_memory} disponível. {memory_alocated} será alocada nesse processo. {num_cores} núcleos do processador serão envolvidos no processo')
spark = ""

findspark.init()
spark = SparkSession.builder.master("local[*]") \
    .appName("source_archive") \
    .config("spark.executor.memory", f"{memory_alocated}g") \
    .config("spark.executor.instances", f"{num_cores}") \
    .getOrCreate()

In [None]:
import os
import csv
import pandas as pd
import chardet

root_path = '/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/input_caracteres'
output_path = '/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/output_caracteres'

csv_files = []

parquet_files = []

data = []

total_columns = 0

providers = ['Agger', 'Infocap', 'Quiver', 'Segfy']

columns = ['Provider', 'ArchiveName', 'Headers', 'Cols','CountColumns']

def valid_names(provider, sep):
    for folder in os.listdir(root_path):
    # Para cada diretório no caminho 'root_path'
        if folder.startswith(provider):
        # Se o início do nome da pasta corresponder com os nomes da lista 'providers'  
            files_list = os.listdir(os.path.join(root_path, folder))
            # file_list será a junção do caminho 'root_path' com a o nome da pasta 'folder'
            for archive in files_list:
            # Lista os arquivos da lista 'files_list'    
                if archive.endswith('.csv'):
                # Se o arquivo começa com '.csv'    
                    csv_path = os.path.join(root_path, folder, archive)
                    # 'csv_path' será a junção do caminho 'root_path', da pasta 'folder', e do arquivo 'file'.
                    print(f'Caminho: {csv_path} \n')
                    df = spark.read.csv(csv_path, sep=sep, encoding='utf-8', header=True)
                    # Lê o arquivo CSV usando Spark e especifica o separador e a codificação
                    for column in df.columns:
                    # Para cada coluna do DataFrame:
                        df = df.withColumn(f'{column}_caract', length(df[column]))
                        # Adiciona uma nova coluna com o sufixo '_caract' que contém o tamanho de cada valor da coluna original

                elif archive.endswith('.parquet'):
                # Se o arquivo começa com .parquet
                    parquet_path = os.path.join(root_path, folder, archive)
                    # 'parquet_path' será a junção do caminho 'root_path', da pasta 'folder', e do arquivo 'file'.
                    print(f'Caminho: {parquet_path} \n')
                    
                    df = spark.read.parquet(parquet_path)
                    # Lê o arquivo PARQUET usando Spark e especifica o separador e a codificação
                    for column in df.columns:
                    # Para cada coluna do DataFrame: 
                        df = df.withColumn(f'{column}_caract', length(df[column]))
                        # Adiciona uma nova coluna com o sufixo '_caract' que contém o tamanho de cada valor da coluna original

                print(df)
                colunas_selecionadas = [col for col in df.columns if col.endswith('_caract')]
                # Seleciona apenas as colunas que terminam com '_caract'
                df = df.select(*colunas_selecionadas)
                # o DataFrame df é atualizado para conter apenas as colunas especificadas em colunas_selecionadas

                temp_paths = []
                # Cria uma lista de caminhos temporários

                now = datetime.now().strftime("%d.%m.%Y_%H.%M.%S")
                # A variável 'now' obtém a data e horário do sistema
                temp_save_path = os.path.join(output_path, f"_temp_{now}")
                # Concatena o caminho do diretório de saída (output_path) com o nome do arquivo temporário. 
                temp_paths.append(temp_save_path)
                # Adiciona o diretório temporário à lista 'temp_paths'

                print('Salvando o novo prepared...')
                # Salva o DataFrame resultante como um arquivo CSV em um diretório temporário
                df.coalesce(1).write.format("csv").option("header", True).save(temp_save_path)
                # O DataFrame df é modificado usando o método coalesce(1) que é utilizado para reduzir o número de partições do DataFrame e cria um arquivo CSV no caminho de salvamento temporário especificado.
                temp_paths.append(temp_save_path)
                # Adiciona o diretório temporário à lista 'temp_paths'
                print(temp_paths)

                temp_saved_paths = []

                for path in temp_paths:
                # Para cada pasta em 'temp_paths':
                    now = datetime.now().strftime("%d.%m.%Y_%H.%M.%S")
                    # A variável 'now' obtém a data e horário do sistema
                    files = [file for file in os.listdir(path) if file.endswith('.csv')]
                    # Seleciona apenas os arquivos que terminam com '.csv'
                    for i, file in enumerate(files):
                    # Percorre cada arquivo na lista files e acessa tanto o índice quanto o valor do arquivo.
                        temp_file_path = os.path.join(path, file)
                        # Obtém todos os arquivos CSV no diretório temporário
                        new_file_name = f"{provider}_count_{archive}.csv"
                        # Cria o novo nome do arquivo concatenando as variáveis provider, archive.
                        print(f'Este é o: {new_file_name}')
                        temp_saved_path = os.path.join(output_path, new_file_name)
                        # Concatena o caminho do diretório de saída (output_path) com o nome do arquivo de destino (new_file_name). O resultado é o caminho completo para o arquivo com o novo nome.
                        os.rename(temp_file_path, temp_saved_path)
                        # Renomeia o arquivo temporário com o novo nome e move para o diretório de saída
                        temp_saved_paths.append(temp_saved_path)
                        # Adiciona o caminho completo do arquivo salvo à lista temp_saved_paths.


In [None]:
valid_names('Agger',',')
# Chama a função e especifica os parâmentros 'fornecedor', e 'separador'

Essas são as principais ações realizadas pelo código:

As bibliotecas necessárias, como os, csv e pandas, são importadas.

Variáveis como root_path, output_path, csv_files, parquet_files, data, total_columns, providers e columns são definidas.

A função valid_names(provider, sep) é definida.

O código itera sobre os diretórios no caminho root_path usando os.listdir(root_path).

Verifica se o nome do diretório começa com um dos nomes na lista providers usando folder.startswith(provider).

Dentro do diretório, itera sobre os arquivos no diretório usando os.listdir(os.path.join(root_path, folder)).

Se o arquivo tiver a extensão .csv, lê o arquivo usando Spark e adiciona uma nova coluna para cada coluna existente com o sufixo _caract, que contém o tamanho de cada valor da coluna original.

Se o arquivo tiver a extensão .parquet, lê o arquivo usando Spark e adiciona uma nova coluna para cada coluna existente com o sufixo _caract, que contém o tamanho de cada valor da coluna original.

Seleciona apenas as colunas que terminam com _caract.

Cria um diretório temporário usando a data e hora atual.

Salva o DataFrame resultante como um arquivo CSV no diretório temporário.

Renomeia os arquivos CSV no diretório temporário com um novo nome e move-os para o diretório de saída.

In [None]:
import os
import pandas as pd
import numpy as np

root_path = r'/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/output_caracteres'

df_list = []

for file in os.listdir(root_path):
# Itera sobre os arquivos no diretório root_path
    provider = file.split('_')[0]
    # Extrai o nome do fornecedor a partir do nome do arquivo, fatiando o nome em partes separadas pelo caractere "_", e selecionando a primeira ocorrência encontrada.
    print(provider)
    df = pd.read_csv(os.path.join(root_path, file))
    # Combina o caminho da pasta principal (root_path) com o nome do arquivo (file), formando o caminho completo para o arquivo CSV e armazena no DataFrame 'df'.
    df['provider'] = provider
    # Adiciona uma coluna 'provider' com o nome do fornecedor.
    df_list.append(df)

for df in df_list:
# Itera sobre os DataFrames na lista df_list
    fornecedor = str(df['provider'].iloc[0])
    # Obtém o valor da primeira linha dessa coluna.
    print(f'Este é o fornecedor: {fornecedor} \n')
    conteudo_df = []
    for col in df.columns:
    # Itera sobre as colunas do DataFrame
        if pd.api.types.is_numeric_dtype(df[col]):
        # Verifica se a coluna contém valores numéricos
            valores_numericos = pd.to_numeric(df[col], errors='coerce')
            # Converte os valores para tipo numérico.
            # O parâmetro errors='coerce' é utilizado para lidar com valores não numéricos. Caso ocorram valores não numéricos na coluna, eles serão convertidos para NaN (valor ausente). 
            linha = (fornecedor, col, round(df[col].mean(),0), round(df[col].max(),0))
            # Cria uma tupla contendo informações estatísticas para cada coluna do DataFrame df.
            # Em seguida, aplica round() para arredondar o resultado para zero casas decimais.
            conteudo_df.append(linha)
            # Adiciona a tupla linha à lista conteudo_df. 

df_final = pd.DataFrame(conteudo_df, columns=['provider','variavel','media_caract','max_caract'])
# Cria um novo DataFrame com as informações coletadas

df_final.to_csv(rf'/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/dicionario_colunas/{file}_numeros.csv', index=False) 
# Salva o DataFrame como um arquivo CSV


Aqui está um resumo das principais ações realizadas pelo código:

O código itera sobre os arquivos no diretório especificado em root_path.

O nome do fornecedor é extraído do nome do arquivo usando o método split e armazenado na variável provider.

O arquivo CSV é lido usando o Pandas e armazenado em um DataFrame chamado df.

Uma coluna chamada 'provider' é adicionada ao DataFrame df com o nome do fornecedor.

O DataFrame df é adicionado à lista df_list.

O código itera sobre os DataFrames na lista df_list.

O nome do fornecedor é extraído do DataFrame.

O código cria uma lista vazia chamada conteudo_df para armazenar as informações sobre as colunas.

O código itera sobre as colunas do DataFrame.

Verifica se a coluna contém valores numéricos usando a função is_numeric_dtype do Pandas.

Se a coluna contiver valores numéricos, os valores são convertidos para tipo numérico usando a função to_numeric do Pandas.

O código cria uma linha com informações sobre a coluna, como o fornecedor, nome da variável, média dos valores e máximo dos valores.

A linha é adicionada à lista conteudo_df.

O código cria um novo DataFrame chamado df_final com base nas informações coletadas.

O DataFrame df_final é salvo como um arquivo CSV em um diretório específico.

In [None]:
# Empilhar os arquivos


root_path = r'/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/dicionario_colunas/'
# Caminho para os arquivos

df_list = []
# Lista vazia para armazenar os DataFrames

columns = ['provider','variavel','media_caract','max_caract']
# Nomes das colunas do DataFrame final

for file in os.listdir(root_path):
# Itera sobre os arquivo do diretório 'root_path'
  file_path = os.path.join(root_path, file)
  # file_path será a junção do caminho 'root_path' com o nome do arquivo 'file'
  if file_path.endswith('.csv'):
  # Se o arquivo começa com '.csv'    
    df = pd.read_csv(file_path)
    # Faz a leitura dos arquivos e armazena na variável 'df'
    rows = df.iloc[1:]
    # Pega apenas da segunda linha em dia, não pega o cabeçalho das colunas.
    print(rows)
    df_list.append(df)
    # Adiciona as linhas à lista 'df_list'

combined_df = pd.concat(df_list)
# Concatena os DataFrames individuais da lista 'df_list' e atribui o resultado à variável 'combined_df'.
combined_df.columns = columns
# Especifica que as colunas do DataFrame terão os nomes que constam na lista 'columns'

print(combined_df)


combined_df.to_csv(r'/content/drive/MyDrive/HDI_DR/Arquivos_Source_Individuais/dicionario_colunas/amostra_final.csv',index=False, sep=',', encoding='utf-8',header=True)
# Exporta o arquivo para csv.