In [2]:
import logging
import math
import operator
import os
from time import sleep

import colorama
import matplotlib.pyplot as plt
import numpy as np
import openai
import tiktoken
from colorama import Fore, Style
from scipy.ndimage import gaussian_filter1d
from scipy.optimize import curve_fit
from tqdm import tqdm


# Tokenizer used later in the notebook to split documents into token ids.
encoding = tiktoken.get_encoding("p50k_base")

# Read the OpenAI key from the environment so it never has to be committed with
# the notebook; the original placeholder is kept as the fallback value.
api_key = os.environ.get("OPENAI_API_KEY", "YOUR-OPENAI-API")

# Default prompt and its human-written continuation used by the experiments below.
prompt = "Beginners BBQ Class Taking Place in Missoula! Do you want to get better at making delicious BBQ? You will have the opportunity, put this on your calendar now. Thursday, September 22nd join World Class BBQ Champion, Tony Balay from Lonestar Smoke Rangers. "
real_completion = "He will be teaching a beginner level class for everyone who wants to get better with their culinary skills. He will teach you everything you need to know to compete in a KCBS BBQ competition, including techniques, recipes, timelines, meat selection and trimming, plus smoker and fire information. The cost to be in the class is $35 per person, and for spectators it is free. Included in the cost will be either a t-shirt or apron and you will be tasting samples of each meat that is prepared."
# Default sampling temperature; individual cells below override it.
temperature = 0

class gpt_logp:
    """ Language Model.

    Thin wrapper around ``openai.Completion.create`` that scores existing text:
    each request echoes the prompt with ``max_tokens=0`` and ``logprobs=5`` and
    the per-token log probabilities are read from the response.
    """

    def __init__(self, api_key: str, model: str, sleep_time: int = 10, max_retries: int = None):
        """ Language Model.

        @param api_key: OpenAI API key.
        @param model: OpenAI model.
        @param sleep_time: Seconds to wait before retrying a failed request.
            ``None`` or ``0`` disables retrying: the first error is re-raised.
        @param max_retries: Maximum number of retries per input text. ``None``
            (the default) retries without limit.
        """
        logging.info(f'Loading Model: `{model}`')
        openai.api_key = api_key
        self.model = model
        self.sleep_time = sleep_time
        self.max_retries = max_retries

    def get_logprobs(self, input_texts: "str | list", *args, **kwargs):
        """ Compute the log probabilities and return corresponding tokens on recurrent LM.

        :param input_texts: A string or list of input texts for the encoder.
        :return: A tuple ``(all_logprobs, completion)``. ``all_logprobs`` holds one
            list of token log probabilities per input text; ``completion`` is the
            raw API response of the LAST input text (``None`` when no text was given).
        :raises Exception: The error raised by the API call is re-raised when
            retrying is disabled or the retry budget is exhausted.
        """
        single_input = isinstance(input_texts, str)
        input_texts = [input_texts] if single_input else input_texts
        all_logprobs = []
        # Stays None for an empty input list so the return statement cannot fail.
        completion = None
        for text in tqdm(input_texts):
            failed_attempts = 0
            while True:
                try:
                    completion = openai.Completion.create(
                        model=self.model,
                        prompt=text,
                        logprobs=5,
                        max_tokens=0,
                        temperature=0,
                        echo=True
                    )
                    break
                except Exception as exc:
                    failed_attempts += 1
                    retries_exhausted = (
                        self.max_retries is not None and failed_attempts > self.max_retries
                    )
                    if self.sleep_time is None or self.sleep_time == 0 or retries_exhausted:
                        logging.exception('OpenAI internal error')
                        # Propagate the real error instead of terminating the interpreter.
                        raise
                    # WARNING level so the reason for the wait is visible with the
                    # default logging configuration; not every failure is a rate limit.
                    logging.warning(
                        f'OpenAI request failed ({exc!r}). Waiting for {self.sleep_time} seconds.'
                    )
                    sleep(self.sleep_time)
            logprobs = completion['choices'][0]['logprobs']['token_logprobs']
            all_logprobs.append(logprobs)
        return all_logprobs, completion


def calculate_ppl(log_probs):
    """ Calculate the perplexity of a sequence of log probabilities.

    Entries that are ``None`` are ignored, so a ``token_logprobs`` list whose
    first entry is ``None`` can be passed without slicing it first.

    :param log_probs: List of log probabilities for each token in the text,
                      or a list of lists of log probabilities.
    :return: The perplexity of the text, ``exp(-mean(log_probs))``;
             ``float('inf')`` when no log probability is available.
    """
    flat_log_probs = []
    for entry in log_probs:
        # Flatten one level of nesting, element by element.
        if isinstance(entry, (list, tuple)):
            flat_log_probs.extend(entry)
        else:
            flat_log_probs.append(entry)

    flat_log_probs = [lp for lp in flat_log_probs if lp is not None]

    if not flat_log_probs:
        return float('inf')
    return math.exp(-sum(flat_log_probs) / len(flat_log_probs))

def generate_text_without_pattern(api_key, prompt, temperature, max_length):
    """
    Generate text using OpenAI GPT-3 model without a specific pattern.

    :param api_key: The API key for OpenAI.
    :param prompt: The prompt to send to the model.
    :param temperature: The temperature setting for the model.
    :param max_length: Maximum number of tokens to generate (sent as ``max_tokens``).
    :return: Generated text from the model.
    """
    openai.api_key = api_key
    response = openai.Completion.create(
            engine="davinci-002",
            prompt=prompt,
            # Use the caller's limit; the previous code read an unrelated global
            # named `max_tokens`, which fails when that global does not exist.
            max_tokens=max_length,
            temperature=temperature,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
    model_gen_text = response['choices'][0]['text']
    return model_gen_text


# gpt-3.5-turbo-instruct,
def generate_text_with_pattern(api_key, prompt, pattern, temperature, max_retries=3):
    """
    Generate text one token at a time, choosing each token by its rank.

    For every entry of ``pattern`` one single-token completion is requested with
    ``logprobs=5``; the returned candidates are sorted by log probability and the
    candidate at the 1-based rank given by the pattern entry is appended.

    :param api_key: The API key for OpenAI.
    :param prompt: Text the generation continues from.
    :param pattern: Sequence of 1-based ranks, one per generated token.
    :param temperature: The temperature setting for the model.
    :param max_retries: Attempts per position before the default token "." is used.
    :return: The generated text. Every pattern position contributes exactly one
             piece: the selected token, or "." when the selected token is
             ``<|endoftext|>`` or all attempts failed.
    """
    openai.api_key = api_key

    model_gen_text = ""

    for target_rank in pattern:
        retries = 0
        while retries < max_retries:
            try:
                response = openai.Completion.create(
                    model="gpt-3.5-turbo-instruct",
                    prompt=prompt + model_gen_text,
                    max_tokens=1,
                    logprobs=5,
                    temperature=temperature,
                    frequency_penalty=0,
                    presence_penalty=0
                )

                if response['choices'][0]['logprobs']['top_logprobs']:
                    top_logprobs = response['choices'][0]['logprobs']['top_logprobs'][0]
                    if top_logprobs:
                        next_token_rank = target_rank - 1
                        ranked = sorted(top_logprobs.items(), key=lambda x: x[1], reverse=True)
                        # An out-of-range rank raises IndexError, handled below as a retry.
                        next_token = ranked[next_token_rank][0]

                        if next_token == "<|endoftext|>":
                            model_gen_text += "."  # insert the default token
                            logging.warning(" in response, inserting default token.")
                        else:
                            model_gen_text += next_token
                        # This position is filled either way. Without leaving the
                        # loop here the end-of-text case would add further tokens
                        # for the same pattern entry and shift all later ranks.
                        break

                    else:
                        logging.warning("Empty top_logprobs, retrying.")
                else:
                    logging.warning("No top_logprobs in response, retrying.")

                retries += 1

            except Exception as e:
                logging.exception(f"Exception occurred: {e}")
                retries += 1

        # Only reached without a `break` when every attempt failed.
        if retries == max_retries:
            logging.warning("Max tries limit")
            model_gen_text += "."

    return model_gen_text
def analyze_text_watermark(input_text, api_key):
    """
    Score ``input_text`` with davinci-002 and rank every token among its candidates.

    :param input_text: Text to analyse.
    :param api_key: The API key for OpenAI.
    :return: ``(ranks, token_logprobs)``. ``ranks`` has one entry per token after
             the first: the 1-based position of the token in that position's
             candidate list sorted by log probability, or 3 when the token is not
             among the candidates. ``token_logprobs`` is the response's
             ``token_logprobs`` list.
    """
    scorer = gpt_logp(api_key=api_key, model="davinci-002")
    _, echoed = scorer.get_logprobs(input_text)

    token_logprobs = echoed['choices'][0]['logprobs']['token_logprobs']

    # One candidate list per position, most likely first; None entries are skipped.
    ranked_candidates = []
    for candidates in echoed.choices[0].logprobs.top_logprobs:
        if candidates is None:
            continue
        ordered = sorted(candidates.items(), key=lambda pair: pair[1], reverse=True)
        ranked_candidates.append([token for token, _ in ordered])

    # The first token is dropped before ranking.
    observed_tokens = echoed['choices'][0]['logprobs']['tokens'][1:]

    ranks = []
    for position, token in enumerate(observed_tokens):
        options = ranked_candidates[position]
        if token in options:
            ranks.append(options.index(token) + 1)
        else:
            # Token is not among the returned candidates: use the fixed rank 3.
            ranks.append(3)

    return ranks, token_logprobs


def generate_qpsk_signal(binary_data, sample_n=20, samples_per_symbol=100):
    """
    Turn a bit string into a sequence of integer ranks via a QPSK-style waveform.

    Each pair of bits selects a phase; one sine segment per pair is generated,
    scaled with ``2 * signal + 3``, down-sampled and rounded to integers.

    :param binary_data: String of '0'/'1' characters with an even length.
    :param sample_n: Number of samples kept per symbol (default 20).
    :param samples_per_symbol: Number of waveform points generated per symbol
        before down-sampling (default 100). Must be a multiple of ``sample_n``.
    :return: 1-D integer numpy array with ``sample_n`` values per symbol.
    :raises ValueError: If ``binary_data`` has an odd length or contains a symbol
        other than '00', '01', '10', '11', or if the sampling parameters are
        inconsistent.
    """
    symbol_mapping = {
        '00': np.pi,       # 180°
        '01': np.pi/2,     # 90°
        '10': 3*np.pi/2,   # 270°
        '11': 0,           # 0°
    }
    carrier_freq = 1

    if len(binary_data) % 2 != 0:
        raise ValueError("binary_data must contain an even number of bits")
    if sample_n <= 0 or samples_per_symbol % sample_n != 0:
        raise ValueError("samples_per_symbol must be a positive multiple of sample_n")

    symbols = [binary_data[i:i + 2] for i in range(0, len(binary_data), 2)]
    invalid = [symbol for symbol in symbols if symbol not in symbol_mapping]
    if invalid:
        raise ValueError(f"binary_data contains invalid symbol(s): {invalid!r}")

    total_symbols = len(symbols)
    x = np.linspace(0, 2 * np.pi * total_symbols, total_symbols * samples_per_symbol)

    # One phase value per waveform point, so the whole signal is built in one
    # vectorised call instead of growing an array segment by segment.
    phases = np.repeat(
        np.array([symbol_mapping[symbol] for symbol in symbols], dtype=float),
        samples_per_symbol,
    )
    qpsk_signal = np.sin(carrier_freq * x + phases)

    # Map the [-1, 1] sine range onto [1, 5].
    scaled_qpsk_signal = 2 * qpsk_signal + 3

    sampling_interval = samples_per_symbol // sample_n
    sampled_qpsk_signal = scaled_qpsk_signal[::sampling_interval]

    sampled_qpsk_signal_rounded = np.round(sampled_qpsk_signal).astype(int)

    return sampled_qpsk_signal_rounded





def decode_to_ascii(rank_outputs_W, sample_n, window_size=3, sigma=1):
    """
    Decode a sequence of token ranks back into one character.

    The ranks are split into chunks of ``sample_n`` values. Each chunk is
    smoothed with a Gaussian filter and fitted with
    ``amplitude * sin(x + phase) + offset``; the fitted phase selects two bits.
    The concatenated bits are interpreted as a binary number and converted
    with ``chr``.

    :param rank_outputs_W: Sequence of numeric ranks.
    :param sample_n: Number of ranks per symbol.
    :param window_size: Unused; kept so existing calls keep working.
    :param sigma: Standard deviation passed to ``gaussian_filter1d``.
    :return: The decoded character.
    :raises ValueError: If no symbol could be decoded.
    """
    def sin_wave(x, amplitude, phase, offset):
        return amplitude * np.sin(x + phase) + offset

    def phase_to_bits(amplitude, phase):
        # A negative amplitude equals a positive one shifted by pi. Wrapping to
        # [0, 2) half-turns maps every equivalent fit result to the same symbol.
        turns = (phase / np.pi + (1.0 if amplitude < 0 else 0.0)) % 2.0
        if 0.25 < turns <= 0.75:
            return '01'
        if 0.75 < turns <= 1.25:
            return '00'
        if 1.25 < turns <= 1.75:
            return '10'
        return '11'

    # Float dtype: gaussian_filter1d returns the dtype of its input, so integer
    # ranks would otherwise be smoothed into integers.
    data = np.asarray(rank_outputs_W, dtype=float)

    # An incomplete trailing chunk cannot be fitted against the sample_n-point
    # grid, so only complete chunks are decoded.
    total_symbols = len(data) // sample_n

    x = np.linspace(0, 2 * np.pi, sample_n)
    initial_phases = [0, np.pi/2, np.pi, 3*np.pi/2]

    decode_bin = ''
    for symbol_index in range(total_symbols):
        chunk = data[symbol_index * sample_n:(symbol_index + 1) * sample_n]
        smoothed = gaussian_filter1d(chunk, sigma)

        best_params = None
        smallest_error = float('inf')
        for initial_phase in initial_phases:
            try:
                params, _ = curve_fit(sin_wave, x, smoothed, p0=[2, initial_phase, 3])
            except RuntimeError:
                continue
            error = np.sum((sin_wave(x, *params) - smoothed) ** 2)
            if error < smallest_error:
                # Keep the parameters of the best fit, not of the last one tried.
                best_params = params
                smallest_error = error

        if best_params is None:
            logging.warning(f"Curve fit failed for symbol {symbol_index}; symbol skipped.")
            continue

        decode_bin += phase_to_bits(best_params[0], best_params[1])

    if not decode_bin:
        raise ValueError("no symbol could be decoded from the given ranks")

    print(decode_bin)

    ascii_character = chr(int(decode_bin, 2))
    return ascii_character


Single-letter

In [3]:
prompt = "Beginners BBQ Class Taking Place in Missoula! Do you want to get better at making delicious BBQ? You will have the opportunity, put this on your calendar now. Thursday, September 22nd join World Class BBQ Champion, Tony Balay from Lonestar Smoke Rangers. "

# Characters to round-trip: A-Z, a-z and the digit characters '0'-'9'.
# The digits are taken from their character codes (ord('0')..ord('9')); using
# the numbers 0..9 directly would encode control characters instead.
test_chars = (
    [chr(code) for code in range(ord('A'), ord('Z') + 1)]
    + [chr(code) for code in range(ord('a'), ord('z') + 1)]
    + [chr(code) for code in range(ord('0'), ord('9') + 1)]
)

# 8-bit binary string of each character's code point.
binary_data_list = [format(ord(char), '08b') for char in test_chars]

print(binary_data_list)
test_res = []
preamble_len = 40                      # rank-1 tokens generated before the signal
pre = np.ones(preamble_len, dtype=int)
temperature=0.1
sample_n=20
symbols_per_char = 4                   # 8 bits, two bits per symbol
expected_len = preamble_len + symbols_per_char * sample_n

for i in binary_data_list:
    sampled_qpsk_signal = generate_qpsk_signal(i)
    pattern = np.concatenate([pre, sampled_qpsk_signal])
    model_gen_text_W = generate_text_with_pattern(api_key, prompt, pattern, temperature)
    print(model_gen_text_W)
    rank_outputs_W, samples_preview_W = analyze_text_watermark(model_gen_text_W, api_key)
    print(len(rank_outputs_W))
    # Pad with rank 3 when fewer ranks than expected came back.
    offset = expected_len-len(rank_outputs_W)
    ascii_character = decode_to_ascii(rank_outputs_W[preamble_len:] + offset*[3], sample_n)
    test_res.append(ascii_character)
    print("Decoded ASCII Character:", ascii_character)


ERROR:root:Exception occurred: 

You tried to access openai.Completion, but this is no longer supported in openai>=1.0.0 - see the README at https://github.com/openai/openai-python for the API.

You can run `openai migrate` to automatically upgrade your codebase to use the 1.0.0 interface. 

Alternatively, you can pin your installation to the old version, e.g. `pip install openai==0.28`

A detailed migration guide is available here: https://github.com/openai/openai-python/discussions/742
Traceback (most recent call last):
  File "/tmp/ipykernel_1023521/2591772479.py", line 123, in generate_text_with_pattern
    response = openai.Completion.create(
  File "/home/zhenyu/anaconda3/envs/AIcodedect/lib/python3.8/site-packages/openai/lib/_old_api.py", line 39, in __call__
    raise APIRemovedInV1(symbol=self._symbol)
openai.lib._old_api.APIRemovedInV1: 

You tried to access openai.Completion, but this is no longer supported in openai>=1.0.0 - see the README at https://github.com/openai/opena

['01000001', '01000010', '01000011', '01000100', '01000101', '01000110', '01000111', '01001000', '01001001', '01001010', '01001011', '01001100', '01001101', '01001110', '01001111', '01010000', '01010001', '01010010', '01010011', '01010100', '01010101', '01010110', '01010111', '01011000', '01011001', '01011010']


ERROR:root:Exception occurred: 

You tried to access openai.Completion, but this is no longer supported in openai>=1.0.0 - see the README at https://github.com/openai/openai-python for the API.

You can run `openai migrate` to automatically upgrade your codebase to use the 1.0.0 interface. 

Alternatively, you can pin your installation to the old version, e.g. `pip install openai==0.28`

A detailed migration guide is available here: https://github.com/openai/openai-python/discussions/742
Traceback (most recent call last):
  File "/tmp/ipykernel_1023521/2591772479.py", line 123, in generate_text_with_pattern
    response = openai.Completion.create(
  File "/home/zhenyu/anaconda3/envs/AIcodedect/lib/python3.8/site-packages/openai/lib/_old_api.py", line 39, in __call__
    raise APIRemovedInV1(symbol=self._symbol)
openai.lib._old_api.APIRemovedInV1: 

You tried to access openai.Completion, but this is no longer supported in openai>=1.0.0 - see the README at https://github.com/openai/opena

........................................................................................................................


  0%|                                                                                                                     | 0/1 [01:28<?, ?it/s]


KeyboardInterrupt: 

In [None]:
!pip install openai==0.28

Multi-letter

In [None]:
# 8-bit binary string of each character of the message to embed.
binary_data_list = [format(ord(char), '08b') for char in ['G','P','T','3']]

print(binary_data_list)
test_res = []
temperature=0
sample_n=20
ranks_per_char = 4 * sample_n   # four 2-bit symbols per 8-bit character
prompt = "Beginners BBQ Class Taking Place in Missoula! Do you want to get better at making delicious BBQ? You will have the opportunity, put this on your calendar now. Thursday, September 22nd join World Class BBQ Champion, Tony Balay from Lonestar Smoke Rangers. Continue..."
model_gen_text_W=''
NW=''   # all generated text, without the original prompt
for i in binary_data_list:
    sampled_qpsk_signal = generate_qpsk_signal(i)
    pattern = sampled_qpsk_signal
    model_gen_text_W = generate_text_with_pattern(api_key, prompt, pattern, temperature)
    print(model_gen_text_W)
    # The next character continues from everything generated so far.
    prompt = prompt + model_gen_text_W
    NW = NW + model_gen_text_W
    rank_outputs_W, samples_preview_W = analyze_text_watermark(prompt, api_key)
    print(len(rank_outputs_W))
    # Only the most recent ranks are decoded for the character just embedded.
    ascii_character = decode_to_ascii(rank_outputs_W[-ranks_per_char:], sample_n)
    print(len(rank_outputs_W[-ranks_per_char:]))
    test_res.append(ascii_character)
    print("Decoded ASCII Character:", ascii_character)


Calculate PPL

In [None]:
# Create the scorer in this cell: on a fresh kernel no earlier cell defines it.
scorer = gpt_logp(api_key=api_key, model="davinci-002")

# Text generated without a rank pattern, continuing the current `prompt`.
NP=generate_text_without_pattern(api_key,prompt,temperature, 320)
log_probs, _ = scorer.get_logprobs(NP)
# Drop the first entry before computing the perplexity.
ppl = calculate_ppl(log_probs[0][1:])
ppl

C4 dataset

In [None]:
# Compare the perplexity of pattern-generated text, plainly generated text and
# the original continuation on documents from the C4 "realnewslike" split.
from datasets import load_dataset
dataset = load_dataset("c4", "realnewslike", split="train")
scorer = gpt_logp(api_key=api_key, model="davinci-002")

# Token budget passed to generate_text_without_pattern below.
max_tokens=320
real=[]      # original continuations
result=[]    # text generated with the rank pattern
resultN=[]   # text generated without a pattern
for i in range(3):
    all_index=encoding.encode(dataset[i]['text'])
    # Skip documents of 120 tokens or fewer.
    if len(all_index) <= 120:
        continue 
    # The last 100 tokens are the reference completion; everything before is the prompt.
    prompt_index=all_index[0:-100]
    real_completion_index=all_index[-100:]
    prompt=encoding.decode(prompt_index)
    real_completion=encoding.decode(real_completion_index)
    real.append(real_completion)
    # NOTE(review): `pattern` and `temperature` are not assigned in this cell;
    # they are whatever an earlier cell left in the kernel - confirm this is intended.
    model_gen_text_W = generate_text_with_pattern(api_key, prompt, pattern, temperature)
    result.append(model_gen_text_W)
    model_gen_text_NW=generate_text_without_pattern(api_key,prompt,temperature, max_tokens)
    resultN.append(model_gen_text_NW)
    
    log_probs1, _ = scorer.get_logprobs(model_gen_text_W)
    log_probs2, _ = scorer.get_logprobs(model_gen_text_NW)
    log_probs3, _ = scorer.get_logprobs(real_completion)

    # The first log-prob entry of each text is dropped before computing perplexity.
    ppl1 = calculate_ppl(log_probs1[0][1:])
    ppl2 = calculate_ppl(log_probs2[0][1:])
    ppl3 = calculate_ppl(log_probs3[0][1:])
    # Printed order: patterned text, un-patterned text, original continuation.
    print(f"The perplexity of the text is: {ppl1}, {ppl2}, {ppl3}")