In [None]:
import os
import re
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool
import datetime
import pandas as pd

# Folder that processed log files are moved into; a file already present here is skipped
processed_folder_path = '/content/July31'
os.makedirs(processed_folder_path, exist_ok=True)

# Folder containing the input log files
logs_folder_path = '/content/July31_log_files/July31'

# Folder that receives the Parquet batch files
parquet_output_path = '/content/Latest_ProcessedFiles'
os.makedirs(parquet_output_path, exist_ok=True)

# Pattern for the first line of a log record, e.g.
#   2023-07-31 10:00:00,123 -0400 - [INFO::Startup] - message text
# Groups: date, time (with milliseconds), code, message type, message name, message.
# The "code" group looks like a UTC offset (+hhmm / -hhmm) -- confirm against the log
# producer. Both signs are accepted so that records stamped "+hhmm" are not mistaken
# for continuation lines.
log_pattern = r'(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2},\d{3}) ([+-]\d{4}) - \[(\w+)::(\w+)\] - (.+)'

def process_log_file(file_path, pattern=None):
    """Parse one log file into rows of
    ``[date, time, code, message_type, message_name, message, gu_id]``.

    A line matching ``pattern`` starts a new record. A following line that does not
    match (e.g. a stack-trace line) is a continuation: it is stripped and appended,
    space-separated, to the previous record's message. Blank lines are ignored.
    Non-matching lines that appear before the first record have nothing to attach
    to and are skipped.

    Args:
        file_path: Path to the log file, read as ISO-8859-1. The GU_ID column is
            the part of the file name before the first underscore.
        pattern: Regex (string or compiled) with six groups: date, time, code,
            message type, message name, message. Defaults to the module-level
            ``log_pattern``.

    Returns:
        A list of row lists, one per log record, in file order.
    """
    if pattern is None:
        pattern = log_pattern
    regex = re.compile(pattern)  # compile once instead of once per line
    gu_id = os.path.basename(file_path).split('_')[0]
    rows = []
    with open(file_path, 'r', encoding='ISO-8859-1') as log_file:
        for line in log_file:
            match = regex.match(line)
            if match:
                date, time, code, message_type, message_name, message = match.groups()
                rows.append([date, time, code, message_type, message_name, message, gu_id])
                continue
            continuation = line.strip()
            if continuation and rows:
                # Multi-line entry: fold the extra line into the previous row's
                # Message column (index 5).
                rows[-1][5] = f'{rows[-1][5]} {continuation}'
    return rows

# Every regular file in the input folder is a candidate; sorting gives a stable batch order.
log_files = sorted(
    os.path.join(logs_folder_path, file_name)
    for file_name in os.listdir(logs_folder_path)
    if os.path.isfile(os.path.join(logs_folder_path, file_name))
)
total_log_files = len(log_files)
unique_records_per_run = 1  # Number of log files per batch (i.e. per Parquet file)

parquet_columns = ['Date', 'Time', 'Code', 'Message Type', 'Message Name', 'Message', 'GU_ID']

processed_count = 0
processed_data = []  # Rows of the batch currently being accumulated
batch_number = 50  # Number used in the first output file name; +1 per written batch
processed_file_names = []  # Names of log files whose rows were written and which were moved


def _safe_process(file_path):
    """Worker wrapper: return ``(rows, None)`` on success or ``(None, error_text)`` on failure.

    Returning the error instead of raising keeps one bad file from aborting the whole
    ``imap`` iteration. Sending text avoids having to pickle the exception object.
    """
    try:
        return process_log_file(file_path), None
    except Exception as exc:
        return None, f'{type(exc).__name__}: {exc}'


def _flush_batch(rows, batch_paths, batch_number):
    """Write one batch to Parquet, then move its source files to the processed folder.

    Returns None if the Parquet write failed. In that case nothing is moved, so the
    files stay in the input folder for a later run. Otherwise returns the base names
    of the files that were successfully moved.
    """
    output_parquet_file = os.path.join(parquet_output_path, f'July31_Batch_{batch_number}.parquet')
    try:
        df = pd.DataFrame(rows, columns=parquet_columns)
        pq.write_table(pa.Table.from_pandas(df), output_parquet_file)
    except Exception as e:
        print(f"Error writing {output_parquet_file}: {e}")
        return None
    print(f"Processed {len(batch_paths)} unique log files. Saved as {output_parquet_file}")

    # Move sources only after their rows are safely on disk.
    moved = []
    for path in batch_paths:
        name = os.path.basename(path)
        try:
            os.rename(path, os.path.join(processed_folder_path, name))
        except OSError as e:
            print(f"Error moving {path} (its rows are already in {output_parquet_file}): {e}")
        else:
            moved.append(name)
    return moved


# Files already present in the processed folder were handled by an earlier run.
pending_files = []
for file_path in log_files:
    if os.path.exists(os.path.join(processed_folder_path, os.path.basename(file_path))):
        print(f"Skipping {file_path}: already present in {processed_folder_path}")
    else:
        pending_files.append(file_path)
print(f"Found {total_log_files} log files; {len(pending_files)} to process")

batch_paths = []
with Pool() as pool:
    # imap parses files in parallel and yields results in input order, so each
    # result lines up with its path in pending_files.
    results = pool.imap(_safe_process, pending_files)
    for position, (file_path, (rows, error)) in enumerate(zip(pending_files, results), start=1):
        if error is None:
            processed_data.extend(rows)
            batch_paths.append(file_path)
        else:
            print(f"Error processing file {file_path}: {error}")

        # Flush a full batch, or whatever is left once the last file has been parsed.
        is_last_file = position == len(pending_files)
        if batch_paths and (len(batch_paths) >= unique_records_per_run or is_last_file):
            moved = _flush_batch(processed_data, batch_paths, batch_number)
            if moved is not None:
                batch_number += 1  # this file name is now taken
                processed_file_names.extend(moved)
                processed_count += len(moved)
            processed_data = []
            batch_paths = []

# After processing all log files, print the list of processed log file names
print(f"Processed log files: {processed_file_names}")
print(f"Total processed log files: {processed_count}")


Processed log files: []
Total processed log files: 0
