In [1]:
##Primary Install
# System packages: a JDK for PySpark, and a C/C++ toolchain for packages that
# build native extensions from source (e.g. fasttext).
!sudo apt update -y

!sudo apt install default-jdk -y

!sudo apt install build-essential -y

# %pip installs into the running kernel's environment (!pip may target another interpreter).
# `ollama` is the official client that provides `from ollama import Client`;
# `ipywidgets` is what tqdm's notebook progress bars need ("IProgress not found" warning);
# `requests` and `scipy` are imported directly by this notebook.
%pip install ollama psutil datasets langdetect huggingface-hub sentence-transformers transformers pandas h5py tqdm pyarrow ipywidgets pyspark scikit-learn hf-transfer pyyaml numpy matplotlib==3.7.3 fasttext torch cupy-cuda12x seaborn umap-learn requests scipy



In [1]:
import gc
import json
import os
import re
import socket
import sys
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import multiprocessing
from multiprocessing import Pool, Manager, Process, Value
from multiprocessing.pool import ThreadPool
import fasttext
import h5py
import matplotlib.pyplot as plt
import numpy as np
import ollama
import pandas as pd
import psutil
import pyarrow as pa
import pyarrow.parquet as pq
import requests
import seaborn as sns
import torch
import torch.nn.functional as F
import yaml
from datasets import load_dataset, DatasetDict
from langdetect import detect, LangDetectException
from matplotlib import cm, colors
from ollama import Client
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_json, monotonically_increasing_id, rand, row_number, hash, pandas_udf, udf
from pyspark.sql.types import ArrayType, FloatType, StructType
from pyspark.sql.window import Window
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.sparse import lil_matrix, save_npz, csr_matrix, load_npz
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel, AutoConfig
from umap import UMAP


class Logger(object):
    def __init__(self, filename="logfile.log"):
        self.terminal = sys.stdout
        self.log = open(filename, "a")

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)
        self.flush()

    def flush(self):
        self.terminal.flush()
        self.log.flush()

sys.stdout = Logger("output.log")
sys.stderr = Logger("output.log")

def load_config(file_path):
    """Read ``file_path`` and return its contents parsed with ``yaml.safe_load``.

    The result is whatever the YAML document holds (normally a mapping).
    """
    with open(file_path, 'r') as handle:
        parsed = yaml.safe_load(handle)
    return parsed

def create_default_configs(config_dir):
    """Create default configuration files if they do not exist.

    Writes ``main_config.yml``, ``user_config.yml``, ``dataset_config.yml`` and
    ``phrase_lists.yml`` into ``config_dir`` (created if needed). Existing files
    are left untouched.

    ``core_count`` defaults to the string ``'auto'``: the value is not evaluated
    when the YAML is read. Consumers should treat any non-positive-int value as
    "derive from the CPU count".
    """
    os.makedirs(config_dir, exist_ok=True)

    main_config_path = os.path.join(config_dir, 'main_config.yml')
    user_config_path = os.path.join(config_dir, 'user_config.yml')
    dataset_config_path = os.path.join(config_dir, 'dataset_config.yml')
    phrase_lists_path = os.path.join(config_dir, 'phrase_lists.yml')

    default_main = {
        'chunk_size': 10000,
        # Sentinel rather than a Python expression string, which YAML would
        # store verbatim and nothing would ever evaluate.
        'core_count': 'auto',
        'processed_chunks_dir': './processed_chunks',
        'script_version': 'DMKv3.5',
        'rewrite_sys_prompt': False,
        'sys_prompt_min_token_length': 50
    }
    default_user = {
        'Dataset_Name': 'Default_Dataset',
        'User': 'Default_User'
    }
    default_dataset = {
        'key_name': 'conversations',
        'acceptable_languages': ['en'],
        'filtered_data': ['function-call', 'function-response', 'assistant'],
        'tokenizer_used': 'NousResearch/Meta-Llama-3-70B-Instruct',
        'datasets': [
            {
                'name': 'default_dataset_name',
                'use_percentage': 100,
                'phrase_list_key': 1
            }
        ]
    }
    default_phrases = {
        'phrase_lists': {
            1: [
                "example phrase 1",
                "example phrase 2"
            ]
        }
    }

    config_files = {
        main_config_path: default_main,
        user_config_path: default_user,
        dataset_config_path: default_dataset,
        phrase_lists_path: default_phrases
    }

    for path, default_config in config_files.items():
        if not os.path.exists(path):
            with open(path, 'w') as file:
                yaml.safe_dump(default_config, file)

    print("INFO: Checked and/or created default configuration files in", config_dir)

def update_config_file(config_path, new_options):
    """Back-fill missing keys from ``new_options`` into a YAML config file.

    Keys already present in the file are never overwritten. The file is
    rewritten (and a message printed) only when at least one key was added.
    A missing or empty file is treated as an empty mapping.

    Returns:
        The resulting config dict.
    """
    config = {}
    if os.path.exists(config_path):
        with open(config_path, 'r') as file:
            # safe_load returns None for an empty document; treat that as {}.
            config = yaml.safe_load(file) or {}

    missing = {key: value for key, value in new_options.items() if key not in config}
    if missing:
        config.update(missing)
        with open(config_path, 'w') as file:
            yaml.safe_dump(config, file)
        print(f"Updated configuration file: {config_path}")

    return config

config_directory = './config_files'
create_default_configs(config_directory)

main_config_path = os.path.join(config_directory, 'main_config.yml')
# Back-fill options introduced after a config file was first generated.
new_options = {
    'rewrite_sys_prompt': False,
    'sys_prompt_min_token_length': 50
}
main_config = update_config_file(main_config_path, new_options)

user_config = load_config(os.path.join(config_directory, 'user_config.yml'))
dataset_config = load_config(os.path.join(config_directory, 'dataset_config.yml'))
phrase_lists_config = load_config(os.path.join(config_directory, 'phrase_lists.yml'))

key_name = dataset_config['key_name']
chunk_size = main_config['chunk_size']
core = main_config.get('core_count', 'auto')
processed_chunks_dir = main_config['processed_chunks_dir']
script_version = main_config['script_version']
User = user_config['User']
Dataset_Name = user_config['Dataset_Name']
rewrite_sys_prompt = main_config['rewrite_sys_prompt']
sys_prompt_min_token_length = main_config['sys_prompt_min_token_length']

# Honour an explicit positive integer from main_config.yml. Anything else
# ('auto', or an older un-evaluated expression string) falls back to the
# heuristic: leave 4 CPUs free, but use at least 2.
if isinstance(core, int) and not isinstance(core, bool) and core > 0:
    core_count = core
else:
    core_count = max(2, multiprocessing.cpu_count() - 4)

datasets = dataset_config['datasets']
filtered_data = dataset_config['filtered_data']
phrase_lists = phrase_lists_config['phrase_lists']
acceptable_languages = dataset_config['acceptable_languages']
tokenizer_used = dataset_config['tokenizer_used']

os.makedirs(processed_chunks_dir, exist_ok=True)

def ensure_model_downloaded(model_path, download_url, timeout=60, chunk_bytes=1 << 20):
    """Make sure ``model_path`` exists locally, downloading it from ``download_url`` if not.

    The download is streamed to ``<model_path>.part`` and renamed into place only
    once it has fully completed. An interrupted or failed download therefore never
    leaves a truncated file that later runs would mistake for a valid model.

    Args:
        model_path: destination file path; its parent directory is created if needed.
        download_url: URL to fetch when the file is missing.
        timeout: seconds passed to ``requests.get`` as its timeout.
        chunk_bytes: size of each streamed block written to disk.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    model_dir = os.path.dirname(model_path)
    if model_dir:  # os.makedirs('') raises for a bare filename
        os.makedirs(model_dir, exist_ok=True)
    if os.path.isfile(model_path):
        return

    print(f"Model not found at {model_path}. Downloading from {download_url}...")
    tmp_path = model_path + ".part"
    try:
        with requests.get(download_url, stream=True, timeout=timeout) as response:
            # Without this check an HTML error page would be saved as the model.
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for block in response.iter_content(chunk_size=chunk_bytes):
                    f.write(block)
        os.replace(tmp_path, model_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print("Download complete.")

model_path = './model/lid.176.bin'
download_url = 'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin'
ensure_model_downloaded(model_path, download_url)

# fastText language-ID model, plus the tokenizer named by `tokenizer_used` in dataset_config.yml.
# NOTE(review): `language_model` is not referenced in the visible cells (langdetect is used instead).
language_model = fasttext.load_model(model_path)
tokenizer = AutoTokenizer.from_pretrained(tokenizer_used)

# A single print with sep="\n" produces exactly the same text as one print per line.
print(
    "Full_Configuration:",
    "\nUser Configuration:",
    f"User: {User}",
    f"Merged_Dataset_name: {Dataset_Name}",
    "\nSystem Configuration:",
    f"  Key Name: {key_name}",
    f"  CPU Threads Used: {core_count}",
    f"  Chunk Size: {chunk_size}",
    f"  Processed Chunks Directory: {processed_chunks_dir}",
    f"  Script Version: {script_version}",
    f"  Rewrite System Prompt: {rewrite_sys_prompt}",
    f"  System Prompt Minimum Token Length: {sys_prompt_min_token_length}",
    sep="\n",
)

def format_dataset_info(datasets):
    """Render the configured datasets as an indented, human-readable block.

    Each entry must provide ``name``, ``use_percentage`` and ``phrase_list_key``.
    """
    parts = ["Configured Datasets:\n"]
    for entry in datasets:
        parts.append(
            f"  - Name: {entry['name']}\n"
            f"    Use Percentage: {entry['use_percentage']}%\n"
            f"    Phrase List Key: {entry['phrase_list_key']}\n"
        )
    return "".join(parts)

print(format_dataset_info(datasets))
# YAML loads bare numbers/booleans as non-str scalars (and an empty list entry as None);
# str()/`or []` keep join() from raising TypeError on such config values.
print(f"\nFiltered Data Types: {', '.join(map(str, filtered_data or []))}")
print(f"\nacceptable_languages: {acceptable_languages}")

print("\nPhrase Lists:")
for key, phrases in phrase_lists.items():
    print(f"  List {key}: {', '.join(map(str, phrases or []))}")

def load_mappings_from_config(dataset_config, phrase_lists_config):
    """Derive per-dataset lookup tables from the loaded config dicts.

    Returns a 6-tuple:
        (dataset names in config order,
         phrase_lists mapping,
         {name: phrase_list_key},
         filtered_data roles,
         {name: use_percentage},
         acceptable_languages)
    """
    entries = dataset_config['datasets']

    def keyed_by_name(field):
        return {entry['name']: entry[field] for entry in entries}

    names = [entry['name'] for entry in entries]
    phrase_keys = keyed_by_name('phrase_list_key')
    use_percentages = keyed_by_name('use_percentage')
    return (
        names,
        phrase_lists_config['phrase_lists'],
        phrase_keys,
        dataset_config['filtered_data'],
        use_percentages,
        dataset_config['acceptable_languages'],
    )

# Expand the config into the notebook-level lookup tables used by the processing cells.
(dataset_names, phrase_lists, dataset_phrase_keys,
 filtered_data, dataset_use_percentage, acceptable_languages) = load_mappings_from_config(
    dataset_config, phrase_lists_config)

  from .autonotebook import tqdm as notebook_tqdm


INFO: Checked and/or created default configuration files in ./config_files
Full_Configuration:

User Configuration:
User: TheSkullery
Merged_Dataset_name: Aether-Lite-v2

System Configuration:
  Key Name: conversations
  CPU Threads Used: 20
  Chunk Size: 10000
  Processed Chunks Directory: ./processed_chunks
  Script Version: DMKv3
  Rewrite System Prompt: True
  System Prompt Minimum Token Length: 50
Configured Datasets:
  - Name: jondurbin/airoboros-3.2
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: LLM-Experiments/Steel-Med
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: mpasila/no-robots-sharegpt-edit
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: nothingiisreal/Kalomaze-Opus-Instruct-25k-filtered
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: mrfakename/Pure-Dove-ShareGPT
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: Undi95/Capybara-ShareGPT
    Use Percentage: 100%
    Phrase List Key: 1
  - Name: PJMixers/grimulkan_theory-o

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/02 22:44:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


🌟 Initializing merging and sampling process...

📂 Loading and sampling dataset jondurbin/airoboros-3.2 with file pattern: processed_chunks/jondurbin_airoboros-3.2_chunk_*.parquet
🔀 Shuffling dataset to randomize row order...


24/08/02 22:44:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


🔢 Initial row count for jondurbin/airoboros-3.2: 53310
📉 Row count after sampling jondurbin/airoboros-3.2: 53310

📂 Loading and sampling dataset LLM-Experiments/Steel-Med with file pattern: processed_chunks/LLM-Experiments_Steel-Med_chunk_*.parquet
🔀 Shuffling dataset to randomize row order...
🔢 Initial row count for LLM-Experiments/Steel-Med: 77827
📉 Row count after sampling LLM-Experiments/Steel-Med: 77827

📂 Loading and sampling dataset mpasila/no-robots-sharegpt-edit with file pattern: processed_chunks/mpasila_no-robots-sharegpt-edit_chunk_*.parquet
🔀 Shuffling dataset to randomize row order...
🔢 Initial row count for mpasila/no-robots-sharegpt-edit: 9765
📉 Row count after sampling mpasila/no-robots-sharegpt-edit: 9765

📂 Loading and sampling dataset nothingiisreal/Kalomaze-Opus-Instruct-25k-filtered with file pattern: processed_chunks/nothingiisreal_Kalomaze-Opus-Instruct-25k-filtered_chunk_*.parquet
🔀 Shuffling dataset to randomize row order...
🔢 Initial row count for nothingiisr

24/08/02 22:45:33 WARN DAGScheduler: Broadcasting large task binary with size 1513.0 KiB
24/08/02 22:45:34 WARN DAGScheduler: Broadcasting large task binary with size 1514.3 KiB
24/08/02 22:45:36 WARN DAGScheduler: Broadcasting large task binary with size 1519.2 KiB
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
  return torch.load(io.BytesIO(b))
                                 

Processing Embeddings: 100%|##########| 306309/306309 [00:00<00:00, 4493321.90it/s]
Number of embeddings: 306309
Calculating cosine similarity matrix...
Number of embeddings: 306,309, Embedding dimension: 768
Total System Memory: 125.72 GB
Available Memory (75%): 51.30 GB
Single Embedding Size: 3.00 KB
Maximum matrix size that can fit in memory: 117347 x 117347
Total number of embeddings in dataset: 306,309
Calculated chunk size: 117,347
Estimated memory usage per chunk: 51.30 GB
Creating memory-mapped file for embeddings...
Total number of chunks to process: 6
Processing chunks:   0%|          | 0/6 [00:00<?, ?it/s]Calculated sub-chunk size: 116744
Estimated memory usage per sub-chunk: 50.77 GB


In [3]:
import gc
import json
import os
import re
import socket
import sys
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import multiprocessing
from multiprocessing import Pool, Manager, Process, Value
from multiprocessing.pool import ThreadPool
import fasttext
import h5py
import matplotlib.pyplot as plt
import numpy as np
import ollama
import pandas as pd
import psutil
import pyarrow as pa
import pyarrow.parquet as pq
import requests
import seaborn as sns
import torch
import torch.nn.functional as F
import yaml
from datasets import load_dataset, DatasetDict
from langdetect import detect, LangDetectException
from matplotlib import cm, colors
from ollama import Client
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_json, monotonically_increasing_id, rand, row_number, hash, pandas_udf, udf
from pyspark.sql.types import ArrayType, FloatType, StructType
from pyspark.sql.window import Window
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.sparse import lil_matrix, save_npz, csr_matrix, load_npz
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel, AutoConfig
from umap import UMAP


# Retry policy for each Ollama generate() call made by request_worker.
max_retries = 3
retry_delay = 1  # seconds to sleep between failed attempts
num_request_workers = 20  # number of request_worker processes
# Host and model can be overridden from the environment without editing this cell.
# NOTE(review): Ollama's stock port is 11434; confirm that 1143 is intentional.
ollama_host = os.environ.get('OLLAMA_HOST', 'http://0.0.0.0:1143')
ollama_model = os.environ.get('OLLAMA_MODEL', '[model name]')

def initialize_token_distribution(start, end, step):
    """Return an ordered dict of zeroed ``"lo-hi"`` histogram buckets.

    Buckets start at ``start`` and every ``step``, stopping before ``end``.
    Each upper bound is ``lo + step - 1``, capped at ``end``.
    """
    buckets = {}
    for lower in range(start, end, step):
        upper = min(lower + step - 1, end)
        buckets[f"{lower}-{upper}"] = 0
    return buckets

def update_token_distribution(token_distribution, token_count):
    """Increment, in place, the first ``"lo-hi"`` bucket whose inclusive range holds ``token_count``.

    Counts that fall outside every bucket are silently ignored. Returns ``None``.
    """
    for bucket in token_distribution:
        low, high = (int(bound) for bound in bucket.split('-'))
        if low <= token_count <= high:
            token_distribution[bucket] += 1
            return

def create_regex_pattern(dataset_name, dataset_phrase_keys, phrase_lists):
    """Build a case-insensitive regex matching any phrase configured for a dataset.

    Each phrase is matched literally, except that a space in a phrase matches
    one or more whitespace characters.

    Returns:
        A compiled pattern, or ``None`` when the dataset has no (truthy) phrase
        list key, or when its phrase list contains no non-blank phrases.
    """
    phrase_key = dataset_phrase_keys.get(dataset_name)
    if not phrase_key:
        return None
    # YAML may yield None for an empty list and non-str scalars for bare numbers.
    phrases = [
        str(phrase)
        for phrase in (phrase_lists.get(phrase_key) or [])
        if phrase is not None and str(phrase).strip()
    ]
    if not phrases:
        # An empty alternation ('' or 'a|') matches every string, which would
        # make filter_conversation discard every single record.
        return None
    return re.compile(
        '|'.join(re.escape(phrase).replace('\\ ', '\\s+') for phrase in phrases),
        re.IGNORECASE,
    )

def sanitize_text(text):
    """Turn every CR and LF into a space, then trim surrounding whitespace."""
    flattened = text.translate(str.maketrans({'\n': ' ', '\r': ' '}))
    return flattened.strip()

def filter_conversation(conversations, regex_pattern, filtered_data, acceptable_languages):
    """Decide whether a conversation should be dropped.

    The conversation is rejected (``(None, True)``) as soon as any message:
      * comes from ``human`` and langdetect labels it with a language not in
        ``acceptable_languages`` (text langdetect cannot classify is kept), or
      * has a ``from`` role listed in ``filtered_data``, or
      * contains a match for ``regex_pattern`` (when one is given).
    Otherwise the input is returned unchanged as ``(conversations, False)``.
    """
    for msg in conversations or ():
        value = msg.get('value')
        # Some datasets store null (or non-text) message values; treat them as
        # empty text instead of crashing the pool worker in sanitize_text.
        content = sanitize_text(value) if isinstance(value, str) else ''
        role = msg.get("from")
        if role == "human":
            try:
                if detect(content) not in acceptable_languages:
                    return None, True
            except LangDetectException:
                # langdetect raises when it cannot classify the text (e.g. no features); keep it.
                pass
        if role in filtered_data or (regex_pattern and regex_pattern.search(content)):
            return None, True
    return conversations, False

def transform_record(record, dataset_name, script_version, tokenizer):
    """Normalise one record into the processed-chunk output schema.

    Keeps a strictly alternating human -> gpt -> human ... sequence: a turn is
    kept only if its speaker is the one expected next. ``system``/``tools`` turns
    are skipped. Token counts come from ``tokenizer.tokenize``.

    NOTE(review): the per-role token totals and the histogram count every
    human/gpt turn tokenised, including turns dropped by the alternation check.

    Returns:
        ``(output_dict, False)`` on success, ``(None, True)`` if anything raised.
    """
    try:
        system = record.get('system', '')
        tools = record.get('tools', '')
        kept_turns = []
        totals = {'human': 0, 'gpt': 0}
        per_message_bins = initialize_token_distribution(0, 32000, 16)

        for turn in record.get('conversations', []):
            speaker = turn['from']
            if speaker in ('system', 'tools'):
                continue
            n_tokens = len(tokenizer.tokenize(turn['value']))
            expects_human = len(kept_turns) % 2 == 0
            if speaker == 'human':
                totals['human'] += n_tokens
                if expects_human:
                    kept_turns.append(turn)
            elif speaker == 'gpt':
                totals['gpt'] += n_tokens
                if not expects_human:
                    kept_turns.append(turn)
            update_token_distribution(per_message_bins, n_tokens)

        output = {
            'conversations': kept_turns,
            'system': system,
            'tools': tools,
            'origin': dataset_name,
            'script_version': script_version,
            'human_token_count': totals['human'],
            'gpt_token_count': totals['gpt'],
            'token_distribution': per_message_bins,
        }
        return output, False
    except Exception as e:
        print(f"Error in transform_record: {e}")
        return None, True

def write_chunk_to_file(chunk_data, dataset_name, chunk_index, processed_chunks_dir):
    """Write one processed chunk to ``<dir>/<dataset>_chunk_<i>.parquet``.

    ``/`` in the dataset name becomes ``_``. ``token_distribution`` keys are
    coerced to ``str``. Rows are copied, so the caller's records are not
    mutated.

    The table is first written to ``<file>.tmp`` and then atomically renamed.
    A crash mid-write therefore never leaves a truncated ``*_chunk_*.parquet``
    that the merge step's glob would pick up.
    """
    rows = [
        {**record, 'token_distribution': {str(k): v for k, v in record['token_distribution'].items()}}
        for record in chunk_data
    ]

    filename = os.path.join(
        processed_chunks_dir, f"{dataset_name.replace('/', '_')}_chunk_{chunk_index}.parquet"
    )
    tmp_filename = filename + ".tmp"
    table = pa.Table.from_pandas(pd.DataFrame(rows))
    try:
        pq.write_table(table, tmp_filename)
        os.replace(tmp_filename, filename)
    except BaseException:
        if os.path.exists(tmp_filename):
            os.remove(tmp_filename)
        raise

def request_worker(input_queue, output_dict, worker_id):
    """Serve system-prompt rewrite requests against Ollama until told to stop.

    Reads ``(prompt_id, prompt)`` pairs from ``input_queue``. Each answer is
    stored in ``output_dict[prompt_id]``: the stripped model response, or
    ``None`` once ``max_retries`` attempts have failed. A ``prompt_id`` of
    ``None`` is the stop sentinel.

    ``task_done()`` is called exactly once for every item actually taken from
    the queue, and never when ``get()`` itself failed.
    """
    client = Client(host=ollama_host)
    while True:
        try:
            item = input_queue.get()
        except (EOFError, OSError) as e:
            # The queue's owner (e.g. the Manager process) is gone. Nothing more
            # can arrive, and there is no item to mark as done.
            print(f"Request worker {worker_id} lost its input queue, stopping: {e}")
            break
        try:
            prompt_id, prompt = item
            if prompt_id is None:  # Sentinel value to stop the worker
                break
            for attempt in range(max_retries):
                try:
                    response = client.generate(ollama_model, prompt)
                    output_dict[prompt_id] = response['response'].strip()
                    break
                except Exception as e:
                    if attempt == max_retries - 1:
                        print(f"Error in request after {max_retries} attempts: {e}")
                        output_dict[prompt_id] = None
                    else:
                        time.sleep(retry_delay)
        except Exception as e:
            print(f"Unexpected error in request worker {worker_id}: {e}")
        finally:
            input_queue.task_done()

def rewrite_system_prompt(conversations, tokenizer, input_queue, output_dict, prompt_id,
                          timeout=60, poll_interval=0.1):
    """Ask a request worker to generate a system prompt for a conversation.

    The first five turns are used as context. The request is enqueued as
    ``(prompt_id, prompt)`` and ``output_dict`` is polled for ``prompt_id``
    for up to ``timeout`` seconds. The answer is removed from ``output_dict``
    once read, so the shared dict does not grow without bound.

    Returns:
        The generated prompt if it has at least ``sys_prompt_min_token_length``
        tokens (module-level setting), otherwise ``None``.
    """
    context = "\n".join(f"{msg['from']}: {msg['value']}" for msg in conversations[:5])
    prompt = f"Based on the following conversation, write a detailed system prompt that sets the context and guidelines for an AI model. This model can be used for various purposes, including role-playing (RP), erotic role-playing (ERP), intelligence tasks, general AI assistance, code generation, and more. Ensure the system prompt is comprehensive and specific to the task at hand. Only output the system prompt based on the context:\n\n{context}\n\nSystem prompt:"

    # Drop any stale answer stored under this id (e.g. a late reply to an earlier
    # timed-out request) so it cannot be mistaken for the reply to this prompt.
    output_dict.pop(prompt_id, None)
    input_queue.put((prompt_id, prompt))

    deadline = time.time() + timeout
    while prompt_id not in output_dict and time.time() < deadline:
        time.sleep(poll_interval)

    new_system_prompt = output_dict.pop(prompt_id, None)
    if new_system_prompt and len(tokenizer.tokenize(new_system_prompt)) >= sys_prompt_min_token_length:
        return new_system_prompt
    else:
        print("Timeout or invalid result while waiting for system prompt rewrite. Using original system prompt.")
        return None

def process_chunk(args):
    """Filter, normalise and optionally rewrite system prompts for one chunk, then write it.

    ``args`` is a single tuple so the function can be submitted with
    ``Pool.apply_async``:
        (dataset_name, chunk_index, regex_pattern, filtered_data, data_chunk,
         processed_chunks_dir, script_version, acceptable_languages, tokenizer,
         input_queue, output_dict)

    Reads the module-level ``rewrite_sys_prompt`` and
    ``sys_prompt_min_token_length`` settings.

    Returns:
        A summary dict with the keys ``chunk_index``, ``results`` (kept-record
        count), ``removed_count``, the summed ``token_distribution`` and
        ``rewrites_count``.
    """
    (dataset_name, chunk_index, regex_pattern, filtered_data, data_chunk,
     processed_chunks_dir, script_version, acceptable_languages, tokenizer,
     input_queue, output_dict) = args
    results = []
    removed_count = 0
    token_distribution = initialize_token_distribution(0, 32000, 16)
    rewrites_count = 0

    for record in data_chunk:
        _, was_removed = filter_conversation(
            record['conversations'], regex_pattern, filtered_data, acceptable_languages
        )
        if was_removed:
            removed_count += 1
            continue

        transformed_record, transform_error = transform_record(record, dataset_name, script_version, tokenizer)
        if transformed_record:
            if rewrite_sys_prompt:
                # Tokenise the system prompt only when a rewrite is possible;
                # `or ""` guards against null system fields.
                current_sys_prompt_length = len(tokenizer.tokenize(transformed_record.get('system') or ""))
                if current_sys_prompt_length < sys_prompt_min_token_length:
                    # Qualify the id with the dataset name. Chunk indices restart at 0
                    # for every dataset, and output_dict is shared for the whole run.
                    prompt_id = f"{dataset_name}_{chunk_index}_{len(results)}"
                    new_system_prompt = rewrite_system_prompt(
                        transformed_record['conversations'], tokenizer, input_queue, output_dict, prompt_id
                    )
                    if new_system_prompt:
                        transformed_record['system'] = new_system_prompt
                        rewrites_count += 1

            results.append(transformed_record)
            for key, count in transformed_record['token_distribution'].items():
                token_distribution[key] += count
        if transform_error:
            removed_count += 1

    if results:
        write_chunk_to_file(results, dataset_name, chunk_index, processed_chunks_dir)

    return {
        'chunk_index': chunk_index,
        'results': len(results),
        'removed_count': removed_count,
        'token_distribution': token_distribution,
        'rewrites_count': rewrites_count
    }

def load_data_chunk_hf(dataset_name, split='train', chunk_size=None, key_name='conversations'):
    """Yield lists of normalised records from a Hugging Face dataset split.

    Each yielded record has exactly the keys ``conversations``, ``system`` and
    ``tools``. A ``system``/``tools`` turn found inside the conversation
    overrides the top-level field of the same name (the last such turn wins),
    and is removed from ``conversations``.

    Args:
        dataset_name: name passed to ``datasets.load_dataset``.
        split: dataset split to load.
        chunk_size: records per yielded list. Falsy means one list with everything.
        key_name: column holding the message list (default ``'conversations'``).

    Yields:
        Lists of at most ``chunk_size`` records. The last list may be shorter.
    """
    dataset = load_dataset(dataset_name, split=split)
    chunk = []
    for example in dataset:
        if isinstance(example, str):
            example = json.loads(example)

        system = example.get('system', '')
        tools = example.get('tools', '')
        kept_turns = []
        # A null message column is treated as an empty conversation.
        for item in example.get(key_name) or []:
            role = item['from']
            if role == 'system':
                system = item['value']
            elif role == 'tools':
                tools = item['value']
            else:
                kept_turns.append(item)

        chunk.append({
            'conversations': kept_turns,
            'system': system,
            'tools': tools
        })
        if chunk_size and len(chunk) == chunk_size:
            yield chunk
            chunk = []

    if chunk:
        yield chunk

def process_datasets_in_chunks(dataset_names, chunk_size, filtered_data, core_count, dataset_phrase_keys, phrase_lists, processed_chunks_dir, script_version, acceptable_languages, tokenizer):
    """Run process_chunk over every chunk of every dataset and print a summary.

    Chunks are submitted to a ``Pool(core_count)``. When the module-level
    ``rewrite_sys_prompt`` is enabled, ``num_request_workers`` request_worker
    processes serve rewrite requests through a Manager queue and dict. Workers
    and the manager are always stopped, even if processing raises.

    Returns:
        ``{dataset_name: {'processed', 'removed', 'token_distribution',
        'rewrites_count'}}``. Empty histogram buckets are dropped.
    """
    dataset_summary = {}
    manager = Manager()
    input_queue = manager.Queue()
    output_dict = manager.dict()

    # Request workers are only used by process_chunk when rewriting is enabled.
    worker_count = num_request_workers if rewrite_sys_prompt else 0
    workers = []
    try:
        for i in range(worker_count):
            p = Process(target=request_worker, args=(input_queue, output_dict, i))
            p.start()
            workers.append(p)

        with Pool(processes=core_count) as pool:
            for dataset_name in dataset_names:
                summary = {
                    'processed': 0,
                    'removed': 0,
                    'token_distribution': initialize_token_distribution(0, 32000, 16),
                    'rewrites_count': 0
                }
                dataset_summary[dataset_name] = summary
                regex_pattern = create_regex_pattern(dataset_name, dataset_phrase_keys, phrase_lists)

                # Single pass over the dataset. Submit every chunk while counting
                # *records* (not chunks), so the progress total matches the
                # per-record updates below.
                tasks = []
                total_records = 0
                for chunk_index, data_chunk in enumerate(load_data_chunk_hf(dataset_name, chunk_size=chunk_size)):
                    total_records += len(data_chunk)
                    args = (dataset_name, chunk_index, regex_pattern, filtered_data, data_chunk, processed_chunks_dir, script_version, acceptable_languages, tokenizer, input_queue, output_dict)
                    tasks.append(pool.apply_async(process_chunk, (args,)))

                with tqdm(total=total_records, desc=f"Processing {dataset_name}") as pbar:
                    for task in tasks:
                        result = task.get()
                        summary['processed'] += result['results']
                        summary['removed'] += result['removed_count']
                        summary['rewrites_count'] += result['rewrites_count']
                        for key, count in result['token_distribution'].items():
                            summary['token_distribution'][key] += count
                        pbar.update(result['results'] + result['removed_count'])
    finally:
        for _ in workers:
            input_queue.put((None, None))
        for p in workers:
            p.join()
        manager.shutdown()
        gc.collect()

    for name in dataset_summary:
        dataset_summary[name]['token_distribution'] = {
            k: v for k, v in dataset_summary[name]['token_distribution'].items() if v > 0
        }

    print("\nDataset Summary (Processed / Removed / Rewrites):")
    for name, summary in dataset_summary.items():
        print(f"{name}: {summary['processed']} / {summary['removed']} / {summary['rewrites_count']}")

    return dataset_summary

if __name__ == "__main__":
    print("Starting dataset processing...")

    print("\nConfiguration:")
    print(f"Rewrite System Prompt: {rewrite_sys_prompt}")
    print(f"System Prompt Minimum Token Length: {sys_prompt_min_token_length}")
    print(f"Chunk Size: {chunk_size}")
    print(f"Core Count: {core_count}")
    print(f"Processed Chunks Directory: {processed_chunks_dir}")
    print(f"Script Version: {script_version}")
    print(f"Ollama Host: {ollama_host}")
    print(f"Ollama Model: {ollama_model}")

    # Probe Ollama only when it will actually be used. On failure, disable
    # rewriting for this run so no records are sent to an unreachable service.
    if rewrite_sys_prompt:
        try:
            client = Client(host=ollama_host)
            client.generate(ollama_model, 'Test connection')
            print("Successfully connected to the Ollama service.")
        except Exception as e:
            print(f"Warning: Unable to connect to the Ollama service: {e}")
            print("The script will continue without rewriting system prompts.")
            rewrite_sys_prompt = False

    process_datasets_in_chunks(
        dataset_names,
        chunk_size,
        filtered_data,
        core_count,
        dataset_phrase_keys,
        phrase_lists,
        processed_chunks_dir,
        script_version,
        acceptable_languages,
        tokenizer
    )

    print("\nDataset processing completed.")

In [None]:
#deduplicate and merge
import gc
import json
import os
import re
import socket
import sys
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import multiprocessing
from multiprocessing import Pool, Manager, Process, Value
from multiprocessing.pool import ThreadPool
import fasttext
import h5py
import matplotlib.pyplot as plt
import numpy as np
import ollama
import pandas as pd
import psutil
import pyarrow as pa
import pyarrow.parquet as pq
import requests
import seaborn as sns
import torch
import torch.nn.functional as F
import yaml
from datasets import load_dataset, DatasetDict
from langdetect import detect, LangDetectException
from matplotlib import cm, colors
from ollama import Client
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_json, monotonically_increasing_id, rand, row_number, hash, pandas_udf, udf
from pyspark.sql.types import ArrayType, FloatType, StructType
from pyspark.sql.window import Window
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.sparse import lil_matrix, save_npz, csr_matrix, load_npz
from scipy.sparse import eye
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel, AutoConfig
from umap import UMAP
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import normalize
from scipy.stats import gaussian_kde
from matplotlib.colors import LinearSegmentedColormap

# NOTE(review): this rebinds the notebook-global `tokenizer` (loaded from
# `tokenizer_used` in the config cell and passed to process_datasets_in_chunks)
# to the embedding tokenizer. Re-running the processing cell after this one
# would tokenise with GIST instead.
tokenizer = AutoTokenizer.from_pretrained("avsolatorio/GIST-Embedding-v0")
model = AutoModel.from_pretrained("avsolatorio/GIST-Embedding-v0")

# Prefer the GPU whenever PyTorch can see one.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using device:", device)
model.to(device)

def initialize_spark_session(core_count, spark_memory="15g", driver_max_result_size="2g"):
    """Create (or reuse) a local Spark session sized for ``core_count`` cores.

    Args:
        core_count: number of local cores (``local[core_count]``). Shuffle
            partitions are set to twice this value.
        spark_memory: value for ``spark.driver.memory`` and
            ``spark.executor.memory`` (default ``"15g"``, the previously
            hard-coded value; adjust to the machine).
        driver_max_result_size: value for ``spark.driver.maxResultSize``.

    NOTE(review): ``getOrCreate()`` returns an existing session if one is
    already running. JVM-level settings such as driver memory generally cannot
    change after the JVM has started, so restart the kernel after changing them.
    """
    return (
        SparkSession.builder
        .appName("Process and Merge Conversations")
        .master(f"local[{core_count}]")
        .config("spark.driver.memory", spark_memory)
        .config("spark.executor.memory", spark_memory)
        .config("spark.executor.cores", str(core_count))
        .config("spark.sql.shuffle.partitions", str(core_count * 2))
        .config("spark.driver.maxResultSize", driver_max_result_size)
        .enableHiveSupport()
        .getOrCreate()
    )
    
def adapt_schema(sdf, expected_columns=None, optional_columns=("processing_time_ms",)):
    """Report whether a Spark DataFrame has the expected processed-chunk columns.

    Only a diagnostic is printed; ``sdf`` is returned unchanged.

    Args:
        sdf: anything exposing a ``columns`` list (a Spark DataFrame here).
        expected_columns: column names to check against. Defaults to the
            processed-chunk schema.
        optional_columns: expected columns whose absence is tolerated. By
            default this is ``processing_time_ms``, which transform_record /
            write_chunk_to_file in this notebook do not emit.
    """
    if expected_columns is None:
        expected_columns = ["conversations", "system", "tools", "origin", "script_version", "human_token_count", "gpt_token_count", "token_distribution", "processing_time_ms"]
    actual = set(sdf.columns)
    expected = set(expected_columns)
    missing = expected - set(optional_columns or ()) - actual
    unexpected = actual - expected

    if not missing and not unexpected:
        print("✔️ Schema matches the expected format. No adaptation needed.")
    else:
        print("❗ Schema does not match expected format. Adaptation might be required.")
        if missing:
            print(f"   Missing columns: {sorted(missing)}")
        if unexpected:
            print(f"   Unexpected columns: {sorted(unexpected)}")

    return sdf

def perform_reduction(method, embeddings):
    """Fit ``method`` on ``embeddings`` and return its ``fit_transform`` output.

    ``method`` is any estimator exposing ``fit_transform`` (such as the PCA,
    TSNE and UMAP classes imported in this notebook).
    """
    reduced = method.fit_transform(embeddings)
    return reduced

def get_available_memory():
    """Return the bytes of memory psutil currently reports as available."""
    snapshot = psutil.virtual_memory()
    return snapshot.available

def calculate_chunk_size(n, embedding_dim, dtype=np.float32, memory_fraction=0.5):
    """Pick the side length of square similarity-matrix tiles that fit in RAM.

    A ``chunk_size x chunk_size`` matrix of ``dtype`` is budgeted against
    ``memory_fraction`` of the currently available memory.

    Args:
        n: number of embeddings.
        embedding_dim: embedding width (used only for reporting).
        dtype: element type of the similarity matrix.
        memory_fraction: share of available memory to budget. The default 0.5
            matches the value previously hard-coded.

    Returns:
        An int in ``[1, max(n, 1)]``. It is never 0, because callers use it as a
        ``range`` step.
    """
    def bytes_to_human_readable(bytes_value):
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        # Values of 1024 TB or more would otherwise fall through and return None.
        return f"{bytes_value:.2f} PB"

    itemsize = np.dtype(dtype).itemsize
    embedding_size = itemsize * embedding_dim

    total_memory = psutil.virtual_memory().total
    available_memory = get_available_memory() * memory_fraction

    print(f"Total System Memory: {bytes_to_human_readable(total_memory)}")
    # Label with the fraction actually used (the old label said 75% while using 50%).
    print(f"Available Memory ({memory_fraction:.0%}): {bytes_to_human_readable(available_memory)}")
    print(f"Single Embedding Size: {bytes_to_human_readable(embedding_size)}")

    # Largest square matrix of `dtype` that fits in the budget.
    max_matrix_size = int(np.sqrt(available_memory / itemsize))

    # Clamp to [1, n]: a 0 chunk size would make range(0, n, chunk_size) raise.
    chunk_size = max(1, min(n, max_matrix_size))

    print(f"Maximum matrix size that can fit in memory: {max_matrix_size} x {max_matrix_size}")
    print(f"Total number of embeddings in dataset: {n:,}")
    print(f"Calculated chunk size: {chunk_size:,}")

    memory_usage = chunk_size * chunk_size * itemsize
    print(f"Estimated memory usage per chunk: {bytes_to_human_readable(memory_usage)}")

    return chunk_size

def calculate_sub_chunk_size(batch_size, embedding_dim, dtype=np.float32, memory_fraction=0.25, available_memory=None):
    """Return the side length of the square sub-block multiplied at once inside a chunk.

    Picks the largest ``s`` in ``[1, batch_size]`` such that an ``s x s``
    matrix of `dtype` fits in the memory budget. It returns 1 when even that
    does not fit, or when `batch_size` is 0, so the result is always a valid
    ``range`` step.

    Args:
        batch_size: number of rows in the larger of the two batches compared.
        embedding_dim: embedding length (not used in the calculation; kept for
            call compatibility).
        dtype: element dtype of the similarity values.
        memory_fraction: share of currently available RAM to budget
            (default 0.25). Ignored when `available_memory` is given.
        available_memory: explicit budget in bytes. When None, the budget is
            ``psutil.virtual_memory().available * memory_fraction``.

    Returns:
        int sub-chunk size >= 1.
    """
    import math

    if available_memory is None:
        available_memory = psutil.virtual_memory().available * memory_fraction

    element_size = np.dtype(dtype).itemsize

    # Largest s with s * s * element_size <= budget, i.e. isqrt(budget // element_size).
    max_elements = max(0, int(available_memory // element_size))
    fits = math.isqrt(max_elements)

    sub_chunk_size = max(1, min(batch_size, fits))

    print(f"Calculated sub-chunk size: {sub_chunk_size}")
    print(f"Estimated memory usage per sub-chunk: {sub_chunk_size * sub_chunk_size * element_size / (1024**3):.2f} GB")

    return sub_chunk_size

def cosine_similarity(embeddings, core_count, save_path):
    """Compute all pairwise cosine similarities of `embeddings` as block files in `save_path`.

    NOTE: this definition shadows ``sklearn.metrics.pairwise.cosine_similarity``
    imported at the top of the notebook. After this cell runs, the name refers
    to this function.

    The embeddings are copied into a temporary memory-mapped file
    (``embeddings.mmap`` in `save_path`, always deleted afterwards). Only block
    pairs ``(i, j)`` with ``j >= i`` (the block upper triangle) are computed,
    each written to ``similarity_chunk_{i}_{j}.npz``. Blocks whose file already
    exists are skipped, so an interrupted run can be resumed by calling again.

    Args:
        embeddings: list or array of equal-length vectors, one per row.
        core_count: unused; kept for call compatibility.
        save_path: directory for the block files (created if missing).

    Raises:
        ValueError: if `embeddings` is not a non-empty 2-D collection, or if
            any block file is missing after processing. A missing block would
            otherwise be read back as all-zero similarity and hide duplicates.
    """
    embeddings = np.asarray(embeddings, dtype=np.float32)
    if embeddings.ndim != 2 or embeddings.shape[0] == 0:
        raise ValueError(f"Expected a non-empty 2-D array of embeddings, got shape {embeddings.shape}.")

    n, embedding_dim = embeddings.shape
    print(f"Number of embeddings: {n:,}, Embedding dimension: {embedding_dim}")

    chunk_size = calculate_chunk_size(n, embedding_dim, dtype=np.float32)
    ensure_directory_exists(save_path)

    def chunk_file(i, j):
        return os.path.join(save_path, f"similarity_chunk_{i}_{j}.npz")

    mmap_path = os.path.join(save_path, 'embeddings.mmap')
    print("Creating memory-mapped file for embeddings...")
    mmap_embeddings = np.memmap(mmap_path, dtype=np.float32, mode='w+', shape=embeddings.shape)
    try:
        mmap_embeddings[:] = embeddings
        mmap_embeddings.flush()

        chunks = [(i, j) for i in range(0, n, chunk_size) for j in range(i, n, chunk_size)]
        print(f"Total number of chunks to process: {len(chunks)}")

        for i, j in tqdm(chunks, desc="Processing chunks"):
            result_path = chunk_file(i, j)
            if os.path.exists(result_path):
                continue  # already computed by an earlier (possibly interrupted) run
            end_i = min(i + chunk_size, n)
            end_j = min(j + chunk_size, n)
            try:
                calculate_and_save_cosine_similarity(mmap_embeddings, i, end_i, j, end_j, result_path, chunk_size)
            except Exception as e:
                # Keep going so the other blocks are still produced; missing ones are reported below.
                print(f"Error processing chunk {i},{j}: {str(e)}")

        missing = [(i, j) for i, j in chunks if not os.path.exists(chunk_file(i, j))]
        if len(missing) == len(chunks):
            raise ValueError("No similarity chunks were successfully created.")
        if missing:
            raise ValueError(
                f"{len(missing)} of {len(chunks)} similarity chunks are missing "
                f"(first: {missing[:5]}); re-run to compute only the missing chunks."
            )
    finally:
        # Always release and remove the temporary memmap, even on failure.
        del mmap_embeddings
        try:
            os.unlink(mmap_path)
            print(f"Successfully deleted {mmap_path}")
        except Exception as e:
            print(f"Failed to delete {mmap_path}: {str(e)}")

    print("Cosine similarity calculation completed.")

def calculate_and_save_cosine_similarity(mmap_embeddings, start_i, end_i, start_j, end_j, result_path, chunk_size):
    """Compute the cosine-similarity block between two row ranges and save it as a sparse ``.npz``.

    Rows ``start_i:end_i`` and ``start_j:end_j`` of `mmap_embeddings` are
    L2-normalised, with ``+ 1e-8`` guarding against zero-norm rows. They are
    multiplied in square sub-blocks sized by ``calculate_sub_chunk_size``. The
    result is saved with ``save_npz`` in CSR format, which stores only the
    non-zero similarities.

    Args:
        mmap_embeddings: 2-D array (the memmap built by ``cosine_similarity``).
        start_i, end_i: row range of the first batch.
        start_j, end_j: row range of the second batch.
        result_path: destination ``.npz`` path.
        chunk_size: unused; kept for call compatibility.

    Raises:
        Any exception raised while computing or saving, after it is printed.
    """
    try:
        batch_i = mmap_embeddings[start_i:end_i]
        batch_j = mmap_embeddings[start_j:end_j]

        # Normalize so that a plain dot product equals cosine similarity.
        batch_i = batch_i / (np.linalg.norm(batch_i, axis=1, keepdims=True) + 1e-8)
        batch_j = batch_j / (np.linalg.norm(batch_j, axis=1, keepdims=True) + 1e-8)

        rows_i, rows_j = batch_i.shape[0], batch_j.shape[0]
        sub_chunk_size = calculate_sub_chunk_size(max(rows_i, rows_j), batch_i.shape[1])

        # Fill a dense float32 block. This is the chunk_size x chunk_size footprint
        # that calculate_chunk_size budgets for. Filling a lil_matrix element-wise
        # instead stores every value as a Python object and is far slower and
        # larger for a fully populated block.
        similarity = np.empty((rows_i, rows_j), dtype=np.float32)
        for i in range(0, rows_i, sub_chunk_size):
            for j in range(0, rows_j, sub_chunk_size):
                similarity[i:i + sub_chunk_size, j:j + sub_chunk_size] = (
                    batch_i[i:i + sub_chunk_size] @ batch_j[j:j + sub_chunk_size].T
                )

        # CSR drops exact zeros, matching what the LIL -> CSR conversion stored.
        save_npz(result_path, csr_matrix(similarity))

        print(f"Saved similarity chunk to {result_path}")
    except Exception as e:
        print(f"Error in calculate_and_save_cosine_similarity: {str(e)}")
        raise

def ensure_directory_exists(directory_path):
    """Create `directory_path` (and any missing parents) if needed, then log it.

    NOTE(review): ``ensure_directory_exists`` is defined a second time further
    down in this cell without the log line. Once the cell has run, that later
    definition is the one callers get.
    """
    os.makedirs(directory_path, exist_ok=True)
    message = f"Ensured directory exists: {directory_path}"
    print(message)

def load_complete_similarity_matrix(save_path):
    """Assemble the similarity block files in `save_path` into one CSR matrix.

    Every file named ``*_chunk_<i>_<j>.npz`` (the layout written by
    ``cosine_similarity``) is loaded exactly once. It is placed with its
    top-left corner at row `i`, column `j`. The matrix shape is the smallest
    one covering every block. Positions not covered by any block, such as the
    lower block triangle, are zero. Blocks are assumed not to overlap; if they
    did, overlapping entries would be summed.

    Args:
        save_path: directory containing the block files.

    Returns:
        ``csr_matrix`` of dtype float32.

    Raises:
        ValueError: if no usable block file is found.
    """
    print("Starting to load similarity matrix...")

    chunk_files = []
    for filename in sorted(os.listdir(save_path)):
        if not filename.endswith('.npz'):
            continue
        parts = filename[:-4].split('_')
        if len(parts) >= 4 and parts[-3] == 'chunk' and parts[-2].isdigit() and parts[-1].isdigit():
            # Keep the real path: the name prefix is not guaranteed to be "similarity".
            chunk_files.append((int(parts[-2]), int(parts[-1]), os.path.join(save_path, filename)))

    rows, cols, values = [], [], []
    n_rows = n_cols = 0
    for i, j, filepath in tqdm(chunk_files, desc="Loading chunks"):
        block = load_npz(filepath).tocoo()
        # int64 so that adding the block offset cannot overflow int32 indices.
        rows.append(block.row.astype(np.int64) + i)
        cols.append(block.col.astype(np.int64) + j)
        values.append(block.data.astype(np.float32, copy=False))
        n_rows = max(n_rows, i + block.shape[0])
        n_cols = max(n_cols, j + block.shape[1])

    print(f"Full matrix dimensions: {n_rows} x {n_cols}")

    if n_rows == 0 or n_cols == 0:
        raise ValueError("No valid similarity matrix chunks found.")

    # Build once from COO triplets; slice-assigning into a CSR matrix is very slow.
    sim_matrix = csr_matrix(
        (np.concatenate(values), (np.concatenate(rows), np.concatenate(cols))),
        shape=(n_rows, n_cols),
        dtype=np.float32,
    )

    print("Similarity matrix loaded successfully.")
    print(f"Final similarity matrix shape: {sim_matrix.shape}")
    return sim_matrix

def ensure_directory_exists(directory_path):
    """Create `directory_path` and any missing parents; do nothing if it already exists.

    NOTE(review): this duplicates the definition earlier in this cell, which
    also prints a log line. Because it is defined later, this silent version
    overrides the earlier one. Consider deleting one of the two.
    """
    os.makedirs(name=directory_path, exist_ok=True)

def get_embedding(text):
    """Return a mean-pooled embedding for a single text as a NumPy array.

    Relies on module-level ``tokenizer``, ``model`` and ``device`` defined
    outside this section (presumably a Hugging Face tokenizer/model pair
    loaded via the ``transformers`` imports; confirm). The input is coerced
    to ``str`` and truncated to 512 tokens. The vector is the mean of
    ``last_hidden_state`` over dim 1, assuming the usual
    (batch, seq_len, hidden) layout (TODO confirm).
    """
    text = text if isinstance(text, str) else str(text)

    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

    # Inference only: no autograd graph needed.
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state

    vector = hidden_states.mean(dim=1).squeeze().cpu().numpy()

    # Release cached GPU memory between calls.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return vector

@pandas_udf(ArrayType(FloatType()))
def text_to_embeddings(texts: pd.Series) -> pd.Series:
    """Spark pandas UDF: map each text in the batch to its ``get_embedding`` vector."""
    vectors = []
    for conversation in texts:
        vectors.append(get_embedding(conversation))
        # Collect garbage after every row to keep worker memory flat.
        gc.collect()
    return pd.Series(vectors)


def _dedup_keep_mask(sim_matrix, threshold, n_rows):
    """Return a boolean mask of length `n_rows` that is False for rows duplicating an earlier row."""
    from scipy.sparse import triu

    print("Identifying duplicates based on threshold...")
    # Use only the strict upper triangle (row < col). The diagonal is
    # self-similarity. The chunked computation also stores BOTH (a, b) and
    # (b, a) for pairs inside the same diagonal chunk, so dropping every column
    # with any hit would remove both members of such a pair. With row < col only
    # the later row is dropped, and the first occurrence is kept. A row is
    # dropped if it matches any earlier row, even one that was itself dropped.
    duplicates_mask = triu(sim_matrix, k=1, format="csr")
    duplicates_mask.data[duplicates_mask.data < threshold] = 0
    duplicates_mask.eliminate_zeros()

    print(f"Shape of duplicates_mask: {duplicates_mask.shape}")
    print(f"Number of True values in duplicates_mask: {duplicates_mask.nnz}")

    # In CSR, `indices` holds the column of every stored entry; a column with
    # any entry is a row that has an earlier near-duplicate.
    hits_per_column = np.bincount(duplicates_mask.indices, minlength=duplicates_mask.shape[1])
    to_keep = hits_per_column == 0

    print(f"Shape of to_keep mask: {to_keep.shape}")
    print(f"Number of True values in to_keep: {np.sum(to_keep)}")

    if len(to_keep) != n_rows:
        print(f"Warning: Mismatch in lengths: to_keep ({len(to_keep)}) != pdf ({n_rows})")
        print("Adjusting to_keep array to match pdf length...")
        if len(to_keep) < n_rows:
            # Rows not covered by the matrix are kept.
            to_keep = np.pad(to_keep, (0, n_rows - len(to_keep)), 'constant', constant_values=True)
        else:
            to_keep = to_keep[:n_rows]
        print(f"Adjusted to_keep length: {len(to_keep)}")

    return to_keep


def _clear_directory(path):
    """Best-effort removal of every file, link and sub-directory inside `path`; failures are printed."""
    import shutil  # not part of the notebook's top-level imports

    print("Cleaning up temporary files...")
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print(f"Failed to delete {file_path}. Reason: {e}")


def deduplicate_embeddings_sdf(spark, sdf, core_count, chunk_size, before_count, threshold, save_path="processed_database/"):
    """Drop near-duplicate rows of `sdf` using cosine similarity of their embeddings.

    Each ``conversations`` value is embedded with ``text_to_embeddings``, and
    the whole frame is collected to pandas. Pairwise similarities are computed
    block-wise into `save_path`. A row is dropped when its similarity to an
    *earlier* row is >= `threshold`, so the first occurrence of each group
    survives.

    Args:
        spark: SparkSession used to rebuild the result DataFrame.
        sdf: input Spark DataFrame with a ``conversations`` column.
        core_count: forwarded to ``cosine_similarity``.
        chunk_size: unused; kept for call compatibility.
        before_count: unused; kept for call compatibility.
        threshold: similarity at or above which two rows count as duplicates.
        save_path: scratch directory for similarity chunks. EVERY file and
            sub-directory in it is deleted at the end.

    Returns:
        Spark DataFrame of the surviving rows, without the ``embeddings`` column.

    Raises:
        ValueError: if the loaded similarity matrix has no stored entries.
    """
    print("Adding embeddings to DataFrame...")
    sdf = sdf.withColumn("embeddings", text_to_embeddings(sdf["conversations"]))

    print("Converting DataFrame to Pandas...")
    pdf = sdf.toPandas()

    embeddings = list(tqdm(pdf['embeddings'], desc="Processing Embeddings"))
    print(f"Number of embeddings: {len(embeddings)}")

    print("Calculating cosine similarity matrix...")
    cosine_similarity(embeddings, core_count, save_path)

    print("Loading the full cosine similarity matrix from multiple files.")
    sim_matrix = load_complete_similarity_matrix(save_path)
    print(f"Shape of similarity matrix: {sim_matrix.shape}")

    if sim_matrix.nnz == 0:
        raise ValueError("Similarity matrix is empty. Check the cosine similarity calculation.")

    to_keep = _dedup_keep_mask(sim_matrix, threshold, len(pdf))

    print("Deduplicating DataFrame...")
    deduped_pdf = pdf[to_keep]
    print(f"Shape of deduped_pdf: {deduped_pdf.shape}")

    deduped_sdf = spark.createDataFrame(deduped_pdf.drop(columns=['embeddings']))
    print(f"Reduced from {len(pdf)} to {len(deduped_pdf)} entries.")

    _clear_directory(save_path)

    return deduped_sdf
    

def load_chunk(filepath):
    """Load one saved similarity chunk (a SciPy sparse matrix written with ``save_npz``)."""
    chunk = load_npz(filepath)
    return chunk

def convert_to_array(embeddings):
    """Return `embeddings` as a NumPy array.

    Lists are converted to a new float16 array; any other input (e.g. an
    existing array) is returned unchanged, as the same object.
    """
    if not isinstance(embeddings, list):
        return embeddings
    return np.array(embeddings, dtype=np.float16)

def shuffle_dataset(sdf):
    """
    Shuffle a dataset, interleaving rows of different origins.

    Rows are numbered in random order within each ``origin`` and then sorted
    globally by that number, with random tie-breaking. The result is
    round-robin-like across origins: the first row of each origin, then the
    second of each, and so on. The temporary ``unique_id`` and ``row_num``
    columns are dropped before returning.
    """
    print("🔀 Shuffling dataset to randomize row order...")
    per_origin_order = Window.partitionBy("origin").orderBy(rand())
    return (
        sdf.withColumn("unique_id", monotonically_increasing_id())
        .withColumn("row_num", row_number().over(per_origin_order))
        .orderBy("row_num", rand())
        .drop("row_num", "unique_id")
    )

def shuffle_final_dataset(final_sdf, seed=None):
    """
    Shuffle the final dataset randomly.

    Args:
        final_sdf: Spark DataFrame to shuffle.
        seed: optional seed forwarded to ``rand``. None (the default) lets
            Spark pick one, so the order differs between runs. Pass an int
            for a reproducible shuffle.

    Returns:
        The DataFrame ordered by a random column.
    """
    print("🔀 Shuffling final dataset to randomize row order...")
    return final_sdf.orderBy(rand(seed))

def sample_datasets(sdf, dataset_name, dataset_use_percentage):
    """
    Sample a percentage of `sdf` as configured for `dataset_name`.

    ``dataset_use_percentage[dataset_name]`` is a percentage (0-100). It is
    applied as a sampling fraction without replacement, with seed 42. If the
    dataset has no entry, `sdf` is returned unchanged. Row counts are printed
    before and, when sampling, after; each count triggers a Spark job.
    """
    initial_count = sdf.count()
    print(f"🔢 Initial row count for {dataset_name}: {initial_count}")

    if dataset_name not in dataset_use_percentage:
        print(f"⚠️ No sampling percentage defined for {dataset_name}, using full dataset.")
        return sdf

    fraction = dataset_use_percentage[dataset_name] / 100.0
    sampled_sdf = sdf.sample(withReplacement=False, fraction=fraction, seed=42)
    print(f"📉 Row count after sampling {dataset_name}: {sampled_sdf.count()}")
    return sampled_sdf
        
def print_filtered_schema(df, exclude_field):
    """Print `df`'s schema in a ``printSchema``-like layout, omitting `exclude_field`.

    The field is skipped at the top level and among the direct children of
    struct-typed columns. Only one level of struct nesting is expanded; other
    nested types (e.g. arrays of structs) are printed via ``simpleString()``.
    """
    for field in df.schema:
        if field.name == exclude_field:
            continue
        if not isinstance(field.dataType, StructType):
            print(f"|-- {field.name}: {field.dataType.simpleString()} (nullable = {field.nullable})")
            continue
        print(f"|-- {field.name}: struct (nullable = {field.nullable})")
        for subfield in field.dataType.fields:
            if subfield.name != exclude_field:
                print(f" |    |-- {subfield.name}: {subfield.dataType.simpleString()} (nullable = {subfield.nullable})")
                
def process_and_merge_files(input_directory, temp_output_dir, final_output_file, dataset_use_percentage, core_count, chunk_size, threshold=0.95):
    """Sample, merge, deduplicate and save all configured datasets as one Parquet file.

    For every key of `dataset_use_percentage`, the files
    ``<name with '/' replaced by '_'>_chunk_*.parquet`` in `input_directory`
    are read, shuffled and sampled. The samples are unioned by column name
    and shuffled again. They then go through ``adapt_schema`` and
    ``deduplicate_embeddings_sdf``. Spark writes the result as a single
    partition to `temp_output_dir`, and pandas/pyarrow re-writes it to
    `final_output_file`. The Spark session is stopped even if a step fails.

    Args:
        input_directory: directory holding the per-dataset chunk files.
        temp_output_dir: scratch directory for Spark's Parquet output.
        final_output_file: path of the single consolidated Parquet file.
        dataset_use_percentage: mapping of dataset name -> percentage to keep.
        core_count: passed to ``initialize_spark_session`` and the dedup step.
        chunk_size: passed through to ``deduplicate_embeddings_sdf``.
        threshold: cosine-similarity cutoff for duplicates (default 0.95).

    Raises:
        ValueError: if `dataset_use_percentage` is empty.
    """
    if not dataset_use_percentage:
        raise ValueError("dataset_use_percentage is empty; there is nothing to merge.")

    spark = initialize_spark_session(core_count)
    try:
        print("🌟 Initializing merging and sampling process...")

        sampled_sdfs = []
        for dataset_name in dataset_use_percentage:
            file_pattern = os.path.join(input_directory, dataset_name.replace('/', '_') + '_chunk_*.parquet')
            print(f"\n📂 Loading and sampling dataset {dataset_name} with file pattern: {file_pattern}")
            specific_sdf = spark.read.parquet(file_pattern)
            specific_sdf = shuffle_dataset(specific_sdf)
            sampled_sdfs.append(sample_datasets(specific_sdf, dataset_name, dataset_use_percentage))

        print("\n🔗 Unioning all sampled dataframes...")
        final_sdf = sampled_sdfs[0]
        for sdf in sampled_sdfs[1:]:
            # Match columns by name: positional union() silently mixes columns
            # if two datasets list them in a different order.
            final_sdf = final_sdf.unionByName(sdf)

        print("")
        print("\n📋 Files successfully loaded. Inspecting schema...")
        print_filtered_schema(final_sdf, "token_distribution")
        print("")

        final_sdf = shuffle_final_dataset(final_sdf)

        before_count = final_sdf.count()
        print(f"\n✅ Before deduplication row count: {before_count}")
        final_sdf = adapt_schema(final_sdf)
        final_sdf = deduplicate_embeddings_sdf(spark, final_sdf, core_count, chunk_size, before_count, threshold=threshold)
        final_count = final_sdf.count()

        print(f"\n✅ Deduplication completed. Final row count: {final_count}")
        # Derive the output directory from the file path itself instead of a global.
        output_dir = os.path.dirname(final_output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        print(f"💾 Saving deduplicated data to temporary directory: {temp_output_dir}...")
        final_sdf.coalesce(1).write.mode('overwrite').parquet(temp_output_dir)
        print("🏁 Data saved. Preparing to consolidate into a single file...")
        pd_df = pd.read_parquet(temp_output_dir, engine='pyarrow')
        pd_df.to_parquet(final_output_file, engine='pyarrow')

        print(f"🎉 Merged data saved to: {final_output_file}")
    finally:
        spark.stop()

if __name__ == "__main__":
    # Where the per-dataset chunk files are read from.
    input_directory = 'processed_chunks/'
    # Spark writes its single-partition output here before consolidation.
    temp_output_dir = '/tmp/merged_data'
    # Directory and path of the final single Parquet file.
    final_output_dir = 'final/'
    final_output_file = os.path.join(final_output_dir, 'merged_cleaned_shuffled_deduped.parquet')

    # dataset_use_percentage, core_count and chunk_size are not defined in this
    # cell; presumably they are set in an earlier cell.
    process_and_merge_files(
        input_directory,
        temp_output_dir,
        final_output_file,
        dataset_use_percentage,
        core_count,
        chunk_size,
    )

In [11]:
## Upload to Hugging Face. Authenticate first, e.g. run `notebook_login()` in its own cell
## (it is imported below but not called here).

from datasets import load_dataset
from huggingface_hub import notebook_login

# TODO: replace the placeholder with your "<username>/<dataset-name>" repo id.
HF_REPO_ID = "[user/model]"

# NOTE(review): this reads final/modified_file.parquet, while the merge step
# writes final/merged_cleaned_shuffled_deduped.parquet. Confirm which file
# is meant to be uploaded.
PARQUET_PATH = "./final/modified_file.parquet"

dataset = load_dataset("parquet", data_files=PARQUET_PATH)
dataset.push_to_hub(HF_REPO_ID)

#created by Steel-skull

CommitInfo(commit_url='https://huggingface.co/datasets/TheSkullery/Aether-Lite-v1.8.1/commit/cc93afcaa867ad2b63f4f5a9060552728c5198cb', commit_message='Upload dataset', commit_description='', oid='cc93afcaa867ad2b63f4f5a9060552728c5198cb', pr_url=None, pr_revision=None, pr_num=None)