# Setup

In [None]:
from dotenv import load_dotenv


# Read environment variables (the OpenAI API key among them) from the local .env file
ENV_FILE_PATH = ".env"
load_dotenv(dotenv_path=ENV_FILE_PATH, override=True)

In [None]:
import json
import numpy as np
from typing import Dict, List, Tuple
import os
import zlib
import plotly.express as px
import pandas as pd
import logging
from collections import defaultdict
import concurrent.futures as cf
import itertools

import utils.functions as functions
from utils.openai_handler import OpenAIHandler

In [None]:
# Hand the API key found in the environment to the OpenAI handler
OpenAIHandler.set_api_key(os.environ.get("OPENAI_API_KEY"))

# Show INFO-level (and more severe) log records
logging.basicConfig(level=logging.INFO)

In [None]:
# Read every .txt file in the literary corpus directory into memory, keyed by file name.
LITERARY_TEXT_DIR = 'text_data/literary'
file_name_to_original_text: Dict[str, str] = {}
# os.listdir returns entries in arbitrary order; sorting keeps the dict order (and so the
# order of rows in later frames and charts) the same on every run.
for f_name in sorted(os.listdir(LITERARY_TEXT_DIR)):
    if not f_name.endswith('.txt'):
        logging.warning(f'File {f_name} is not a text file. Skipping.')
        continue
    # Decode explicitly instead of relying on the platform default: later cells re-encode
    # these texts as UTF-8 to measure their size.
    # NOTE(review): assumes the corpus files are stored as UTF-8 - confirm.
    with open(os.path.join(LITERARY_TEXT_DIR, f_name), 'r', encoding='utf-8') as f:
        file_name_to_original_text[f_name] = f.read()

# Generating Base Compression Examples

In [None]:
# Prompt templates for the compression / decompression experiment. All four are
# str.format templates; the leading and trailing whitespace inside the literals is part
# of the prompt text.

# System message for compression. `{model_type}` is a named placeholder.
base_compression_system_prompt = """ 
You are LLM text compression system. Given inputted text, compress it into the smallest possible character representation.
You should compress the text into a latent space representation, that only needs to be able to be reconstructed with a different {model_type}.
Do not simply abbreviate words or remove spaces and do not use any compression algorithms. 
Return only the compressed text as a string. """

# User message for compression: the full system prompt followed by a section holding the
# `{original_text}` placeholder (so it needs both `model_type` and `original_text`).
base_compression_user_prompt = base_compression_system_prompt + \
"""
### Text To Compress ###
{original_text}
"""

# System message for decompression. `{model_type}` is a named placeholder.
base_decompression_system_prompt = """ 
You are LLM text decompression system. Given inputted text, decompress it into its original form.
A different {model_type} has compressed the text into a latent space representation such that it can be reconstructed by you.
Do not simply abbreviate words or remove spaces and do not use any decompression algorithms.
Return only the decompressed text as a string. """

# User message for decompression: the full system prompt followed by a section holding the
# `{compressed_text}` placeholder, which must be supplied when this template is formatted.
base_decompression_user_prompt = base_decompression_system_prompt + \
"""
### Text To Decompress ###
{compressed_text}
"""

In [None]:
# Chat model identifiers used in this notebook
GPT_35_MODEL = 'gpt-3.5-turbo'
GPT_4_MODEL = 'gpt-4'
GPT_4_TURBO_MODEL = 'gpt-4-1106-preview'

# Every model each text is sent to, in a fixed order
MODEL_TYPES = [
    GPT_35_MODEL,
    GPT_4_MODEL,
    GPT_4_TURBO_MODEL,
]

In [None]:
def call_model(original_text: str, model_type: str, system_prompt: str, user_prompt: str) -> str:
    """Format the prompt templates for one text and return the model's first completion.

    Args:
        original_text: Text substituted into the user prompt. For a compression prompt
            this is the source text; for a decompression prompt it is the text to expand.
        model_type: Model identifier. Passed to the API call and substituted for
            ``{model_type}`` in both templates.
        system_prompt: ``str.format`` template for the system message.
        user_prompt: ``str.format`` template for the user message. It may reference
            ``{original_text}`` or ``{compressed_text}``; both resolve to ``original_text``.

    Returns:
        The first element of the value returned by ``OpenAIHandler.get_chat_completion``.

    Raises:
        KeyError: If a template references a placeholder other than the ones listed above.
    """
    system_prompt = system_prompt.format(model_type=model_type)
    # The decompression user template names its field {compressed_text}; supplying both
    # keys lets the same function serve either template instead of raising KeyError.
    user_prompt = user_prompt.format(
        original_text=original_text,
        compressed_text=original_text,
        model_type=model_type)
    messages = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': user_prompt}]
    return OpenAIHandler.get_chat_completion(messages, model=model_type)[0]

In [None]:
MAX_THREADS = 10
# Number of submission rounds before giving up on jobs that keep failing, so a
# persistent API error cannot turn the retry loop into an endless one.
MAX_COMPRESSION_ROUNDS = 3

# model name -> file name -> compressed text
model_to_compressed_text: Dict[str, Dict[str, str]] = defaultdict(dict)
# One job per (file, model) pair: (file_name, text, model_type, system_prompt, user_prompt).
# Everything after file_name is passed positionally to call_model.
compression_func_args = [
    (file_name, text, model_type, base_compression_system_prompt, base_compression_user_prompt)
    for file_name, text in file_name_to_original_text.items() for model_type in MODEL_TYPES]

# future -> the job tuple it was submitted with
compression_futures: Dict[cf.Future[str], Tuple[str, str, str, str, str]] = {}
with cf.ThreadPoolExecutor(MAX_THREADS) as executor:
    rounds_run = 0
    while compression_func_args and rounds_run < MAX_COMPRESSION_ROUNDS:
        rounds_run += 1
        # Rebuilt each round so every still-pending job is submitted exactly once per round
        compression_futures = {
            executor.submit(call_model, *comp_args[1:]): comp_args
            for comp_args in compression_func_args}

        for future in cf.as_completed(compression_futures):
            # Resolve the job before the try block so the log lines below always refer
            # to the job that actually finished.
            comp_args = compression_futures[future]
            file_name, model_type = comp_args[0], comp_args[2]
            try:
                compressed_text = future.result()
            except Exception as exc:
                # The job stays in compression_func_args and is resubmitted next round
                logging.error(f'Compression generated an exception: {(file_name, model_type)} {exc}')
            else:
                model_to_compressed_text[model_type][file_name] = compressed_text
                # Remove the same 5-tuple that was queued; a shorter tuple would not match
                compression_func_args.remove(comp_args)
                logging.info(f'Succesfully compressed: {(file_name, model_type)}')
            finally:
                logging.info(f'{len(compression_func_args)} left to compress')

if compression_func_args:
    logging.warning(
        f'Giving up on {len(compression_func_args)} compression jobs '
        f'after {MAX_COMPRESSION_ROUNDS} rounds')

# model name -> file name -> decompressed text.
# NOTE(review): decompression is not executed in this cell, so this mapping stays empty here.
model_to_decompressed_text: Dict[str, Dict[str, str]] = defaultdict(dict)
# Decompression jobs take each model's own compressed output as input (not the original text)
decompression_func_args = [
    (file_name, compressed_text, model_type, base_decompression_system_prompt, base_decompression_user_prompt)
    for model_type, compressed_by_file in model_to_compressed_text.items()
    for file_name, compressed_text in compressed_by_file.items()]
       

# Data Analysis

### Loading text data

### Getting Embeddings

In [None]:
# Embed every original text, keyed by file name
file_name_to_original_text_embeddings: Dict[str, np.ndarray] = {
    file_name: OpenAIHandler.get_text_embedding(original_text)
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# Embed every GPT-4 decompressed text, keyed by file name.
# NOTE(review): file_name_to_chatGPT4_decompressed_text is not assigned anywhere in the
# visible notebook - confirm where it comes from before running on a fresh kernel.
file_name_to_chatGPT4_decompressed_text_embeddings: Dict[str, np.ndarray] = {
    file_name: OpenAIHandler.get_text_embedding(decompressed_text)
    for file_name, decompressed_text in file_name_to_chatGPT4_decompressed_text.items()}

### Saving GPT-4 Data

In [None]:
# Path template for the GPT-4 result files. `{data_type}` is a *named* str.format
# placeholder, so it has to be filled with a keyword argument.
BASE_FILE_PATH = "experiment_data/gpt4_{data_type}.json"

In [None]:
# Saving the compressed text
# The template's field is named, so it must be filled by keyword; a positional
# .format("compressed_text") raises KeyError: 'data_type'.
GPT4_COMPRESSED_DATA_PATH = BASE_FILE_PATH.format(data_type="compressed_text")
with open(GPT4_COMPRESSED_DATA_PATH, 'w') as f:
    json.dump(file_name_to_chatGPT4_compressed_text, f)

In [None]:
# Saving the decompressed text
# The template's field is named, so it must be filled by keyword; a positional
# .format("decompressed_text") raises KeyError: 'data_type'.
GPT4_DECOMPRESSED_DATA_PATH = BASE_FILE_PATH.format(data_type="decompressed_text")
with open(GPT4_DECOMPRESSED_DATA_PATH, 'w') as f:
    json.dump(file_name_to_chatGPT4_decompressed_text, f)

In [None]:
# Saving the original text embeddings
# The template's field is named, so it must be filled by keyword; a positional
# .format("embeddings") raises KeyError: 'data_type'.
GPT4_EMBEDDINGS_DATA_PATH = BASE_FILE_PATH.format(data_type="embeddings")
with open(GPT4_EMBEDDINGS_DATA_PATH, 'w') as f:
    # Arrays are not JSON serializable; store each embedding as a plain list
    json_serializable = {
        file_name: embedding.tolist()
        for file_name, embedding in file_name_to_original_text_embeddings.items()}
    json.dump(json_serializable, f)

In [None]:
# Saving the decompressed text embeddings
# The template's field is named, so it must be filled by keyword; a positional
# .format("decompressed_embeddings") raises KeyError: 'data_type'.
GPT4_DECOMPRESSED_EMBEDDINGS_DATA_PATH = BASE_FILE_PATH.format(data_type="decompressed_embeddings")
with open(GPT4_DECOMPRESSED_EMBEDDINGS_DATA_PATH, 'w') as f:
    # Arrays are not JSON serializable; store each embedding as a plain list
    json_serializable = {
        file_name: embedding.tolist()
        for file_name, embedding in file_name_to_chatGPT4_decompressed_text_embeddings.items()}
    json.dump(json_serializable, f)

### Applying zlib (DEFLATE) Compression

In [None]:
# zlib baseline at level 9: the compressed bytes for every text ...
file_name_to_zlib_most_compressed_bytes: Dict[str, bytes] = {
    file_name: zlib.compress(original_text.encode('utf-8'), level=9)
    for file_name, original_text in file_name_to_original_text.items()}
# ... and the text recovered by decompressing those bytes again
file_name_to_zlib_most_decompressed_text: Dict[str, str] = {
    file_name: zlib.decompress(compressed_bytes).decode('utf-8')
    for file_name, compressed_bytes in file_name_to_zlib_most_compressed_bytes.items()}

In [None]:
# zlib baseline at level 1: the compressed bytes for every text ...
file_name_to_zlib_least_compressed_bytes: Dict[str, bytes] = {
    file_name: zlib.compress(original_text.encode('utf-8'), level=1)
    for file_name, original_text in file_name_to_original_text.items()}
# ... and the text recovered by decompressing those bytes again
file_name_to_zlib_least_decompressed_text: Dict[str, str] = {
    file_name: zlib.decompress(compressed_bytes).decode('utf-8')
    for file_name, compressed_bytes in file_name_to_zlib_least_compressed_bytes.items()}

### Computing Entropy

In [None]:
# UTF-8 bytes of each GPT-4 compressed text, so it can be sized like the zlib outputs.
# NOTE(review): file_name_to_chatGPT4_compressed_text is not assigned anywhere in the
# visible notebook - confirm where it comes from before running on a fresh kernel.
file_name_to_chatGPT4_compressed_bytes: Dict[str, bytes] = {
    name: text.encode('utf-8')
    for name, text in file_name_to_chatGPT4_compressed_text.items()}

In [None]:
# ChatGPT4 Entropy
# str() on a bytes object yields its Python repr ("b'...\\x9c...'"), which adds the b''
# wrapper and turns single bytes into multi-character escapes. Decoding as latin-1 maps
# every byte to exactly one character, so the statistic is taken over the bytes themselves.
# NOTE(review): assumes functions.entropy is computed per character of a str - confirm.
file_name_to_chatGPT4_compressed_bytes_entropy = {
    file_name: functions.entropy(compressed_bytes.decode('latin-1'))
    for file_name, compressed_bytes in file_name_to_chatGPT4_compressed_bytes.items()}

In [None]:
# zlib Most Compressed Entropy
# str() on a bytes object yields its Python repr ("b'...\\x9c...'"), which adds the b''
# wrapper and turns single bytes into multi-character escapes. Decoding as latin-1 maps
# every byte to exactly one character, so the statistic is taken over the bytes themselves.
# NOTE(review): assumes functions.entropy is computed per character of a str - confirm.
file_name_to_zlib_most_compressed_bytes_entropy = {
    file_name: functions.entropy(compressed_bytes.decode('latin-1'))
    for file_name, compressed_bytes in file_name_to_zlib_most_compressed_bytes.items()}

In [None]:
# zlib Least Compressed Entropy
# str() on a bytes object yields its Python repr ("b'...\\x9c...'"), which adds the b''
# wrapper and turns single bytes into multi-character escapes. Decoding as latin-1 maps
# every byte to exactly one character, so the statistic is taken over the bytes themselves.
# NOTE(review): assumes functions.entropy is computed per character of a str - confirm.
file_name_to_zlib_least_compressed_bytes_entropy = {
    file_name: functions.entropy(compressed_bytes.decode('latin-1'))
    for file_name, compressed_bytes in file_name_to_zlib_least_compressed_bytes.items()}

### Saving Entropy Data

In [None]:
# Write the per-file entropy of the zlib level-9 output to JSON
ZLIB_MOST_COMPRESSED_DATA_PATH = "experiment_data/zlib_most_compressed_bytes_entropy_sim.json"
with open(ZLIB_MOST_COMPRESSED_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_most_compressed_bytes_entropy, fp=out_file)

In [None]:
# Write the per-file entropy of the zlib level-1 output to JSON
ZLIB_LEAST_COMPRESSED_DATA_PATH = "experiment_data/zlib_least_compressed_bytes_entropy_sim.json"
with open(ZLIB_LEAST_COMPRESSED_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_least_compressed_bytes_entropy, fp=out_file)

### Graphing Entropy

In [None]:
# Gather the per-file entropies into one frame: one row per text, one column per method
combined_df = pd.DataFrame(data={
    'ChatGPT4': file_name_to_chatGPT4_compressed_bytes_entropy,
    'zlib Most Compressed': file_name_to_zlib_most_compressed_bytes_entropy,
    'zlib Least Compressed': file_name_to_zlib_least_compressed_bytes_entropy,
})

# Scale by the single largest value in the whole frame rather than column by column
overall_max_entropy = combined_df.max().max()
combined_df = combined_df / overall_max_entropy

In [None]:
# Grouped bar chart of the normalized entropies: one cluster per text, one bar per method
px.bar(
    data_frame=combined_df,
    title='Relative Entropy of Compressed Bytes',
    labels={
        'index': 'Text',
        'value': 'Entropy',
        'variable': 'Compression Method'},
    barmode='group')

In [None]:
# Average over all texts: one mean value per compression method
transposed_df = combined_df.T.mean(axis=1)

In [None]:
# Bar chart of the per-method mean entropy, each bar annotated with its rounded value
px.bar(
    data_frame=transposed_df,
    title='Averaged Entropy of Compressed Bytes',
    x=transposed_df.index,
    y=transposed_df.values,
    text=transposed_df.values.round(3),
    color=transposed_df.index,
    labels={
        'index': 'Compression Method',
        'y': 'Relative Entropy'})

### Computing Compression Ratio

In [None]:
# ChatGPT Compression Ratio
# Stored value is 1 - compressed_size / original_size, i.e. the fraction of UTF-8 bytes saved
file_name_to_chatGPT4_compression_ratio: Dict[str, float] = {
    file_name: 1 - (
        len(file_name_to_chatGPT4_compressed_bytes[file_name])
        / len(original_text.encode('utf-8')))
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# zlib Most Compressed Compression Ratio
# Stored value is 1 - compressed_size / original_size, i.e. the fraction of UTF-8 bytes saved
file_name_to_zlib_most_compression_ratio: Dict[str, float] = {
    file_name: 1 - (
        len(file_name_to_zlib_most_compressed_bytes[file_name])
        / len(original_text.encode('utf-8')))
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# zlib Least Compressed Compression Ratio
# Stored value is 1 - compressed_size / original_size, i.e. the fraction of UTF-8 bytes saved
file_name_to_zlib_least_compression_ratio: Dict[str, float] = {
    file_name: 1 - (
        len(file_name_to_zlib_least_compressed_bytes[file_name])
        / len(original_text.encode('utf-8')))
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# Write the per-file compression ratio of the zlib level-9 run to JSON
ZLIB_MOST_COMPRESSION_RATIO_DATA_PATH = "experiment_data/zlib_most_compression_ratio.json"
with open(ZLIB_MOST_COMPRESSION_RATIO_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_most_compression_ratio, fp=out_file)

In [None]:
# Write the per-file compression ratio of the zlib level-1 run to JSON
ZLIB_LEAST_COMPRESSION_RATIO_DATA_PATH = "experiment_data/zlib_least_compression_ratio.json"
with open(ZLIB_LEAST_COMPRESSION_RATIO_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_least_compression_ratio, fp=out_file)

### Graphing Compression Ratio

In [None]:
# One row per text, one column per compression method
combined_ratio_df = pd.DataFrame(data={
    'ChatGPT4': file_name_to_chatGPT4_compression_ratio,
    'zlib Most Compressed': file_name_to_zlib_most_compression_ratio,
    'zlib Least Compressed': file_name_to_zlib_least_compression_ratio,
})

In [None]:
# Grouped bar chart of the compression ratio: one cluster per text, one bar per method
px.bar(
    data_frame=combined_ratio_df,
    barmode='group',
    title='Compression Ratio',
    labels={
        'index': 'Text',
        'value': 'Compression Ratio',
        'variable': 'Compression Method'})

In [None]:
# Average over all texts: one mean compression ratio per method
transposed_ratio_df = combined_ratio_df.T.mean(axis=1)

In [None]:

# Plot the averaged compression ratio, each bar annotated with its rounded value
px.bar(
    transposed_ratio_df,
    x=transposed_ratio_df.index,
    y=transposed_ratio_df.values,
    color=transposed_ratio_df.index,
    text=transposed_ratio_df.values.round(3),
    title='Averaged Compression Ratio',
    labels={
        # y is passed as a bare array, so its label key is 'y' (the averaged entropy
        # chart does the same); a 'value' key only matches wide-form input.
        'y': 'Average Compression Ratio',
        'index': 'Compression Method'})

### Computing Edit Distance

In [None]:
# ChatGPT4 Compression Edit Distance
# functions.edit_distance between the GPT-4 decompressed text and its original, per file
file_name_to_chatGPT4_compression_edit_distance: Dict[str, float] = {
    file_name: functions.edit_distance(
        file_name_to_chatGPT4_decompressed_text[file_name], original_text)
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# zlib Most Compressed Compression Edit Distance
# functions.edit_distance between the zlib level-9 round-tripped text and its original, per file
file_name_to_zlib_most_compression_edit_distance: Dict[str, float] = {
    file_name: functions.edit_distance(
        file_name_to_zlib_most_decompressed_text[file_name], original_text)
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# zlib Least Compressed Compression Edit Distance
# functions.edit_distance between the zlib level-1 round-tripped text and its original, per file
file_name_to_zlib_least_compression_edit_distance: Dict[str, float] = {
    file_name: functions.edit_distance(
        file_name_to_zlib_least_decompressed_text[file_name], original_text)
    for file_name, original_text in file_name_to_original_text.items()}

In [None]:
# Write the per-file edit distance of the zlib level-9 round trip to JSON
ZLIB_MOST_COMPRESSION_EDIT_DISTANCE_DATA_PATH = "experiment_data/zlib_most_compression_edit_distance.json"
with open(ZLIB_MOST_COMPRESSION_EDIT_DISTANCE_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_most_compression_edit_distance, fp=out_file)

In [None]:
# Write the per-file edit distance of the zlib level-1 round trip to JSON
ZLIB_LEAST_COMPRESSION_EDIT_DISTANCE_DATA_PATH = "experiment_data/zlib_least_compression_edit_distance.json"
with open(ZLIB_LEAST_COMPRESSION_EDIT_DISTANCE_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_least_compression_edit_distance, fp=out_file)

### Graphing Edit Distance

In [None]:
# One row per text, one column per compression method
combined_edit_distance_df = pd.DataFrame(data={
    'ChatGPT4': file_name_to_chatGPT4_compression_edit_distance,
    'zlib Most Compressed': file_name_to_zlib_most_compression_edit_distance,
    'zlib Least Compressed': file_name_to_zlib_least_compression_edit_distance,
})

In [None]:
# Per-method mean of the raw edit distances, taken before the frame is rescaled below
transposed_edit_distance_df_2 = combined_edit_distance_df.T.mean(axis=1)
# Rescale the whole frame by its single largest entry (not column by column)
overall_max_edit_distance = combined_edit_distance_df.max().max()
combined_edit_distance_df = combined_edit_distance_df / overall_max_edit_distance

In [None]:
# Grouped bar chart of the normalized edit distance: one cluster per text, one bar per method
px.bar(
    data_frame=combined_edit_distance_df,
    barmode='group',
    title='Compression Edit Distance',
    labels={
        'index': 'Text',
        'value': 'Compression Edit Distance'})

In [None]:
# Average over all texts: one mean normalized edit distance per method
transposed_edit_distance_df = combined_edit_distance_df.T.mean(axis=1)

In [None]:
# Bar chart of the per-method mean edit distance, each bar annotated with its rounded value
px.bar(
    data_frame=transposed_edit_distance_df,
    text=transposed_edit_distance_df.values.round(3),
    color=transposed_edit_distance_df.index,
    title='Compression Edit Distance',
    labels={
        'index': 'Compression Method',
        'value': 'Edit Distance'})

### Computing Embedding Cosine Distance

In [None]:
# ChatGPT4 Decompression Cosine Similarity
# Compares the embedding of each GPT-4 decompressed text with that of its original.
# NOTE(review): values come from functions.cosine_distance but are stored under a
# "similarity" name - confirm which quantity the helper returns.
file_name_to_chatGPT4_decompression_cosine_similarity: Dict[str, float] = {
    file_name: functions.cosine_distance(
        file_name_to_chatGPT4_decompressed_text_embeddings[file_name],
        file_name_to_original_text_embeddings[file_name])
    for file_name in file_name_to_original_text}

In [None]:
# zlib Most Compressed Decompression Cosine Similarity
# zlib is lossless, so the original embedding is compared with itself instead of
# embedding the round-tripped text a second time.
# NOTE(review): values come from functions.cosine_distance but are stored under a
# "similarity" name - confirm which quantity the helper returns.
file_name_to_zlib_most_decompression_cosine_similarity: Dict[str, float] = {
    file_name: functions.cosine_distance(
        file_name_to_original_text_embeddings[file_name],
        file_name_to_original_text_embeddings[file_name])
    for file_name in file_name_to_original_text}

In [None]:
# zlib Least Compressed Decompression Cosine Similarity
# zlib is lossless, so the original embedding is compared with itself instead of
# embedding the round-tripped text a second time.
# NOTE(review): values come from functions.cosine_distance but are stored under a
# "similarity" name - confirm which quantity the helper returns.
file_name_to_zlib_least_decompression_cosine_similarity: Dict[str, float] = {
    file_name: functions.cosine_distance(
        file_name_to_original_text_embeddings[file_name],
        file_name_to_original_text_embeddings[file_name])
    for file_name in file_name_to_original_text}

In [None]:
# Write the per-file cosine value of the zlib level-9 run to JSON
ZLIB_MOST_COMPRESSION_COSINE_SIM_DATA_PATH = "experiment_data/zlib_most_cosine_sim.json"
with open(ZLIB_MOST_COMPRESSION_COSINE_SIM_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_most_decompression_cosine_similarity, fp=out_file)

In [None]:
# Write the per-file cosine value of the zlib level-1 run to JSON
ZLIB_LEAST_COMPRESSION_COSINE_SIM_DATA_PATH = "experiment_data/zlib_least_cosine_sim.json"
with open(ZLIB_LEAST_COMPRESSION_COSINE_SIM_DATA_PATH, mode='w') as out_file:
    json.dump(file_name_to_zlib_least_decompression_cosine_similarity, fp=out_file)

### Graphing Embedding Cosine Distance

In [None]:
# One row per text, one column per compression method
combined_cosine_similarity_df = pd.DataFrame(data={
    'ChatGPT4': file_name_to_chatGPT4_decompression_cosine_similarity,
    'zlib Most Compressed': file_name_to_zlib_most_decompression_cosine_similarity,
    'zlib Least Compressed': file_name_to_zlib_least_decompression_cosine_similarity,
})

In [None]:
# Grouped bar chart of the embedding cosine values: one cluster per text, one bar per method
px.bar(
    data_frame=combined_cosine_similarity_df,
    barmode='group',
    title='Decompression Embedding Cosine Similarity',
    labels={
        'index': 'Text',
        'value': 'Cosine Similarity'})

In [None]:
# Average over all texts: one mean cosine value per method
transposed_cosine_similarity_df = combined_cosine_similarity_df.T.mean(axis=1)

In [None]:
# Bar chart of the per-method mean cosine value, each bar annotated with its rounded value
px.bar(
    data_frame=transposed_cosine_similarity_df,
    text=transposed_cosine_similarity_df.values.round(3),
    color=transposed_cosine_similarity_df.index,
    title='Averaged Decompression Cosine Similarity',
    labels={
        'index': 'Compression Method',
        'value': 'Decompression Cosine Similarity'})