In [1]:
import os
import pandas as pd
import re
import json
from datetime import datetime
import chardet
import textract
import pyarrow
import random
from dotenv import load_dotenv, find_dotenv


In [36]:
# Load settings from the nearest .env file; override=True lets values from the
# file replace variables that are already set in the process environment.
load_dotenv(find_dotenv(), override=True)

TEXT_FILES_PATH = os.getenv('TEXT_FILES_PATH')  # root directory walked for source documents
BIRTH_DATE = os.getenv('BIRTH_DATE')  # parsed later with the '%Y-%m-%d' format
AUTHOR_NAME = os.getenv('AUTHOR_NAME')  # interpolated into the tuning prompts

# Fail fast on missing settings: otherwise BIRTH_DATE=None only surfaces as an
# opaque TypeError inside strptime, and AUTHOR_NAME=None silently ends up as
# the literal text "None" in every generated prompt.
_missing_settings = [
    name
    for name, value in (
        ('TEXT_FILES_PATH', TEXT_FILES_PATH),
        ('BIRTH_DATE', BIRTH_DATE),
        ('AUTHOR_NAME', AUTHOR_NAME),
    )
    if not value
]
if _missing_settings:
    raise EnvironmentError(
        "Missing required setting(s): " + ", ".join(_missing_settings)
        + ". Define them in the environment or in a .env file."
    )

In [3]:
def is_binary_file(file_path):
    """Heuristically decide whether a file holds binary data.

    The file is scanned in 1024-byte blocks. It is reported as binary as soon
    as a block contains a NUL byte or any control byte below 32 other than
    tab (9), line feed (10) or carriage return (13).

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True if a binary-looking byte was found, otherwise False. False is
        also returned (after printing the error) when the file cannot be read.
    """
    text_controls = (9, 10, 13)
    try:
        with open(file_path, 'rb') as handle:
            # iter() with a sentinel stops on the empty read that marks EOF
            for block in iter(lambda: handle.read(1024), b''):
                if b'\x00' in block:
                    return True
                for value in block:
                    if value < 32 and value not in text_controls:
                        return True
    except Exception as e:
        print(f"An error occurred while checking the file: {e}")

    return False

In [26]:
# Rows for the DataFrame: [path, date, year, age, text, word_count]
data = []
# Reference date used to compute the author's age for each document
comparison_date = datetime.strptime(BIRTH_DATE, '%Y-%m-%d')

# Walk the directory tree containing the source documents
for root, _dirs, files in os.walk(TEXT_FILES_PATH):
    for file in files:
        if file == '.DS_Store':
            continue
        file_path = os.path.join(root, file)

        # Default document date: the file's last-modification time
        file_timestamp = datetime.fromtimestamp(os.stat(file_path).st_mtime)

        # A 10-character name starting with six digits is read as DDMMYY and
        # takes precedence over the modification time. The two-digit year is
        # placed in the 1900s.
        if len(file) == 10 and file[:6].isdigit():
            try:
                file_timestamp = datetime(
                    1900 + int(file[4:6]), int(file[2:4]), int(file[:2])
                )
            except ValueError:
                # Not a real calendar date: keep the modification time. The
                # date fields are derived below from a single timestamp, so
                # they can no longer disagree after a failed parse.
                print(f"File: {file}, Invalid Date Format")

        file_date = file_timestamp.strftime('%Y-%m-%d')
        file_year = file_timestamp.strftime('%Y')

        # Age in whole years: subtract one if the birthday has not yet
        # occurred in the document's year (the tuple comparison yields a bool)
        year_difference = (
            file_timestamp.year
            - comparison_date.year
            - ((file_timestamp.month, file_timestamp.day)
               < (comparison_date.month, comparison_date.day))
        )

        # Split the file name to get the (lower-cased) extension, if any
        _, file_extension = os.path.splitext(file)
        file_extension = file_extension.lower()

        # Reset per file so text from a previous iteration can never be
        # attributed to the current one
        content = None

        if file_extension in ('.doc', '.docx'):
            encoding = 'iso-8859-1' if file_extension == '.doc' else 'utf-8'
            try:
                content = textract.process(file_path, input_encoding=encoding).decode('utf-8')
            except Exception as e:
                print(f"An error occurred: {file_path} {e}")
                continue
        elif file_extension in ('.txt', '') and not is_binary_file(file_path):
            # Detect the encoding from the raw bytes
            with open(file_path, 'rb') as f:
                encoding = chardet.detect(f.read())['encoding']
            # Extension-less files detected as Windows-1252 are read as MacRoman
            if encoding == 'Windows-1252' and not file_extension:
                encoding = 'MacRoman'
            # Read the content of the file, dropping undecodable bytes
            with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
                content = f.read()

        if content is None:
            # Binary file or unsupported extension: nothing to record
            print(f"Skipping {file_path}: no readable text content")
            continue

        word_count = len(content.split())

        # Append the row for this document
        data.append([file_path, file_date, file_year, year_difference, content, word_count])


In [None]:
# Create a Pandas DataFrame with one row per collected document
df = pd.DataFrame(data, columns=['source', 'date', 'year', 'age', 'text', 'word_count'])
print(df.shape)
# Inspect a random sample of up to 20 rows; the size is capped because
# DataFrame.sample raises ValueError when asked for more rows than exist
df.sample(min(20, len(df)))


In [29]:
# Save the DataFrame as a Parquet file. The output directory is created first
# so a fresh checkout without ../data does not fail at this step.
parquet_path = '../data/text_files_data.parquet'
os.makedirs(os.path.dirname(parquet_path), exist_ok=True)
df.to_parquet(parquet_path)

In [37]:
# Accumulator for the fine-tuning records (one dict per document). It is reset
# to an empty list here, so re-running this cell does not duplicate entries.
tuning_entries = []

def sanitize_text(text):
    """Strip markup-like tags from a document's text.

    Every shortest ``<...>`` span that sits on a single line is removed.

    Double quotes are deliberately left untouched: the result is serialised
    with ``json.dumps``, which already escapes them. Escaping them here as
    well would leave a literal backslash before every quote in the stored
    text.

    Args:
        text: Raw document text.

    Returns:
        The text with tag-like spans removed.
    """
    return re.sub('<.*?>', '', text)

def get_word_count_classification(word_count):
    """Map a word count to the length label used in the prompts.

    Args:
        word_count: Number of words in the document.

    Returns:
        "corta" for up to 100 words, "media" for up to 500 words and
        "larga" otherwise.
    """
    # Inclusive upper bounds, checked in ascending order
    upper_bounds = ((100, "corta"), (500, "media"))
    for limit, label in upper_bounds:
        if word_count <= limit:
            return label
    return "larga"


# Build one prompt/completion record per document in the DataFrame
for row in df.itertuples(index=False):
    age = row.age
    wc_class = get_word_count_classification(row.word_count)
    # Clean the document text before using it as the completion
    completion = sanitize_text(row.text)
    prompt = f'Escribe una entrada {wc_class} de diario por {AUTHOR_NAME} cuando tenía {age} años'
    # Generic instruction format. Alternative (Mistral) format kept for reference:
    #   {"text": f"<s>[INST]{prompt}[/INST] {text}</s>"}
    tuning_entries.append({'prompt': prompt, 'completion': completion})

# Write all records as JSON Lines (one JSON object per line)
with open('../data/tuning_entries_all.jsonl', 'w', encoding='utf-8') as jsonl_file:
    jsonl_file.writelines(json.dumps(record) + '\n' for record in tuning_entries)

print("Tuning file created successfully.")

Tuning file created successfully.


In [38]:
# Fraction of the entries held out for validation (0.2 = 20%)
validation_ratio = 0.2
# Fixed seed so that re-running this cell reproduces exactly the same split
split_seed = 42

# Number of validation samples implied by the ratio (rounded down)
num_validation_samples = int(len(tuning_entries) * validation_ratio)

# Shuffle a copy with a dedicated seeded generator: tuning_entries itself is
# left in its original order, which keeps this cell idempotent, and the
# global random state is not touched.
shuffled_entries = list(tuning_entries)
random.Random(split_seed).shuffle(shuffled_entries)

# The first num_validation_samples entries form the validation set, the rest
# the training set
training_data = shuffled_entries[num_validation_samples:]
validation_data = shuffled_entries[:num_validation_samples]

# Save the training and validation datasets to separate JSON Lines files
with open('../data/tuning_training_entries.jsonl', 'w', encoding='utf-8') as train_file:
    for item in training_data:
        train_file.write(json.dumps(item) + '\n')

with open('../data/tuning_validation_entries.jsonl', 'w', encoding='utf-8') as valid_file:
    for item in validation_data:
        valid_file.write(json.dumps(item) + '\n')

print(f"Split {len(training_data)} samples for training and {len(validation_data)} samples for validation.")

Split 2143 samples for training and 535 samples for validation.
