In [1]:
import cohere
import json
import io
import warnings

import pandas as pd
from IPython.display import display
from PIL import Image

from stability_sdk import client
import stability_sdk.interfaces.gooseai.generation.generation_pb2 as generation

import sys
# Make the parent directory importable so the `app.*` imports further down resolve.
if '..' not in sys.path:
    sys.path.append('..')

import os
# Load the API keys from local files into environment variables.
# The content is stripped because key files usually end with a newline, which
# would otherwise become part of the key. Writing straight into os.environ
# means the secrets never linger as notebook globals.
for env_name, key_path in (("COHERE_KEY", '../cohere_api_key.txt'),
                           ("STABILITY_KEY", '../stability_api_key.txt')):
    with open(key_path, 'r') as f:
        os.environ[env_name] = f.read().strip()
del env_name, key_path

import random
import numpy as np
import re

from app.story_generator.segmentation import StoryDivider
from app.story_generator.generation import StoryGenerator, PromptGenerator, add_newline_at_the_end
# from tests.image_generation_test import get_segmented_story, summarize_story

import time

In [2]:
# import cohere
# with open('../cohere_api_key.txt', 'r') as f:
#     cohere_api_key = f.read()
# co = cohere.Client(cohere_api_key)

# n_try = 1
# while True:
#     print(n_try)
#     n_try += 1
#     response = co.generate(
#       prompt='xd',
#         num_generations=5,
#         max_tokens=300
#     )
    
#     for r in response.generations:
#         pass

In [3]:
# import cohere
# with open('../cohere_api_key.txt', 'r') as f:
#     cohere_api_key = f.read()
# co = cohere.Client(cohere_api_key)


# response = co.tokenize(
#   text=''' '''
# )


# response

<h1>Generation</h1>

In [4]:
def get_n_stories(n=3):
    """Pick ``n`` distinct random stories from the fairy-tales JSON database.

    The database is read from ``../data/stories/fairy_tales.json`` relative to
    the current working directory. Sampling uses numpy's global random state.

    Args:
        n: number of stories to draw (without replacement).

    Returns:
        A list with ``n`` entries taken from the loaded JSON collection.

    Raises:
        AssertionError: if ``n`` exceeds the number of stories in the database.
    """
    database_path = os.path.join(os.path.abspath(''), os.pardir,
                                 "data", "stories", "fairy_tales.json")
    with open(database_path, "rb") as database_file:
        all_stories = json.load(database_file)

    n_available = len(all_stories)
    assert n <= n_available, f'Tried to get {n} stories while is only {n_available} stories in the database'

    sampled = np.random.choice(all_stories, size=n, replace=False)
    return list(sampled)

def get_segment_of_stories(stories: list, segment_idx: int, handle_too_big_index=True,
                           n_pages=None, sentences_per_page=3):
    """Return copies of ``stories`` whose 'text' is replaced by one segment of it.

    Each story is split with ``StoryDivider.divide_story_into_segments`` and the
    segment at ``segment_idx`` is stored under the 'text' key of a shallow copy
    of the story dict. The input dicts are not modified.

    Args:
        stories: list of dicts with at least the keys 'title' and 'text'.
        segment_idx: index of the segment to pick; negative values count from
            the end, as with normal list indexing.
        handle_too_big_index: when True, an index outside the valid range is
            clamped to the nearest valid segment (last one for too-large
            indices, first one for too-negative indices) instead of raising.
        n_pages: forwarded to ``divide_story_into_segments``.
        sentences_per_page: forwarded to ``divide_story_into_segments``.

    Returns:
        A list of story dicts, one per input story, in the same order.

    Raises:
        IndexError: if the index is out of range and ``handle_too_big_index``
            is False, or if a story yields no segments at all.
    """
    segmented_stories = []
    for s in stories:
        seg = StoryDivider(s["title"], s["text"])
        segmented = seg.divide_story_into_segments(n_pages=n_pages, sentences_per_page=sentences_per_page)

        index = segment_idx
        if handle_too_big_index and len(segmented) > 0:
            # Clamp into [-len, len - 1]. The original computed a clamped index
            # but then indexed with the raw `segment_idx`, so the flag had no effect.
            index = max(-len(segmented), min(index, len(segmented) - 1))

        result = s.copy()
        result.update({'text': segmented[index]})
        segmented_stories.append(result)

    return segmented_stories

def get_segments_and_continuations(stories: list, n_pages=None, sentences_per_page=3):
    """Build (previous part, continuation) example pairs from full stories.

    Every story is split into segments and a random pair of consecutive
    segments is selected using numpy's global random state. The returned dicts
    are shallow copies of the input stories in which 'text' is removed and the
    keys 'Previous Part' and 'Continuation' are added.

    Note: the start index is drawn with ``np.random.randint(0, len - 2)`` whose
    upper bound is exclusive, so the last segment is never used as the
    continuation, and a story with fewer than three segments makes numpy raise
    ``ValueError``.

    Args:
        stories: list of dicts with at least the keys 'title' and 'text'.
        n_pages: forwarded to ``divide_story_into_segments``.
        sentences_per_page: forwarded to ``divide_story_into_segments``.

    Returns:
        A list with one dict per input story, in the same order.
    """
    pairs = []
    for story in stories:
        divider = StoryDivider(story["title"], story["text"])
        pages = divider.divide_story_into_segments(n_pages=n_pages, sentences_per_page=sentences_per_page)

        start = np.random.randint(0, len(pages) - 2)

        entry = story.copy()
        entry['Previous Part'] = pages[start]
        entry['Continuation'] = pages[start + 1]
        entry.pop('text')
        pairs.append(entry)

    return pairs

def hard_filter_results(results, stop_seq, disallowed_strings):
    """Drop generations that contain any of the disallowed substrings.

    Args:
        results: DataFrame with a string column 'generation'.
        stop_seq: currently unused; kept so existing callers keep working
            (an earlier version filtered on the presence of the stop sequence).
        disallowed_strings: iterable of literal substrings (not regular
            expressions). A row is removed if its generation contains any of them.

    Returns:
        A new DataFrame (copy) holding only the rows that passed the filter.
        With an empty ``disallowed_strings`` every row is kept.
    """
    # Start from "keep everything" so an empty `disallowed_strings` is a no-op
    # instead of indexing with `None`.
    mask = pd.Series(True, index=results.index)
    for s in disallowed_strings:
        mask &= ~results['generation'].str.contains(s, regex=False)

    # Return a copy so later column assignments do not write into a slice of `results`.
    return results.loc[mask].copy()

def postprocess_results(results, stop_seq):
    """Clean up generated texts: drop the stop sequence and fix the ending.

    For every entry of the 'generation' column this
      1. removes all literal occurrences of ``stop_seq``,
      2. strips trailing whitespace,
      3. appends a '.' unless the text already ends with '.', '!', '?' or '"'.

    Args:
        results: DataFrame with a string column 'generation'.
        stop_seq: literal stop sequence to remove from the generations.

    Returns:
        A new DataFrame with the cleaned 'generation' column; the input frame
        is left untouched.
    """
    # Work on a copy so a filtered slice passed in by the caller is not mutated.
    results = results.copy()

    generation = results['generation'].str.replace(stop_seq, '', regex=False)
    # Raw string: '\s' in a normal literal is an invalid escape sequence.
    generation = generation.str.replace(r'\s+$', '', regex=True)

    # Ensure there is sentence-ending punctuation at the end of the text.
    needs_dot = ~generation.str[-1].isin(['.', '!', '?', '"'])
    results['generation'] = generation.mask(needs_dot, generation + '.')
    return results

def print_result(result, parameters, keys_used, max_words=1500):
    """Print every generation together with the prompt fields that preceded it.

    A prefix of the form "<Key>: <param>" is built for each (parameter, key)
    pair, where the keys are all but the last entry of ``keys_used``; after each
    pair the prefix is passed through ``add_newline_at_the_end``. If a parameter
    is empty, the prefix stops right after that key's label. Otherwise the
    label of the last key is appended, and the generation follows it.

    Args:
        result: DataFrame with the columns 'likelihood' and 'generation'.
        parameters: values the generation was conditioned on.
        keys_used: names of the prompt fields; the last one names the generated field.
        max_words: texts with more space-separated words than this are replaced
            by the message 'Text too long'.
    """
    prefix = ''
    all_params_given = True
    for param, key in zip(parameters, keys_used[:-1]):
        prefix = prefix + key.title() + ': '
        if not param:
            all_params_given = False
            break
        prefix = add_newline_at_the_end(prefix + param)

    if all_params_given:
        prefix += keys_used[-1].title() + ':'

    separator = '-' * 50
    for _, row in result.iterrows():
        print(separator)
        print('likelihood:', row['likelihood'])

        text = prefix + row["generation"]
        n_words = len(text.split(' '))
        if n_words <= max_words:
            print(text)
        else:
            print('Text too long')

<h2>Step 0: Set internal parameters</h2>

In [5]:
# Generation settings; they are forwarded to StoryGenerator when it is constructed.
MAX_TOKENS = 150
STOP_SEQUENCES = ['--']  # the first entry is also used as the prompt's stop token
TEMPERATURE = 0.75
MODEL = 'xlarge'
MIN_P = 0.8
FREQ_PENALTY = 0.0 #0.3
PRESENCE_PENALTY = 1.0 #0.5
# NOTE(review): only used in SUMM_GEN_HEADER below, while the notebook samples
# its example stories with an explicit count -- confirm the two are meant to agree.
N_EXAMPLE_STORIES = 5

# Manually collected tokens that sometimes cause problems in our generations,
# as a mapping of token text -> token id.
DISALLOWED_TOKENS = {
    'https': 1099,
    ' https': 1595,
    '://': 695,
    '::': 5280,
    ' ::': 13361,
    '/': 48,
    ' /': 1040,
    'http': 2676,
    ' http': 2930,
    '#': 36,
    ' #': 1462,
    '(': 41,
    ' (': 367,
    ')': 42,
    ' )': 3479,
}
# The same tokens as plain strings with one leading/trailing whitespace character
# removed; used for filtering finished generations. The pattern is a raw string
# because '\s' in a normal literal is an invalid escape sequence.
DISALLOWED_STRINGS = {re.sub(r'^\s|\s$', '', key) for key in DISALLOWED_TOKENS}

# Prompt headers and the story fields used in each generation step.
# The '{}' placeholder is filled with the number of stories in the prompt.
SUMM_GEN_HEADER = f'Assignment: Write a short summary of {N_EXAMPLE_STORIES+1} stories for children based on titles given. Your stories should be in a old-school book format.'
KEYS_TO_USE_FOR_SUMMARY = ['title', 'summary']

BEG_GEN_HEADER = 'Assignment: Write a beginning of {} stories for children based on titles and summary of the story given. Your stories should be in a old-school book format.'
KEYS_TO_USE_FOR_BEGINNING = ['title', 'summary', 'text']

KEYS_TO_USE_FOR_CONTINUATION = ['title', 'summary', 'Previous Part', 'Continuation']
CONT_GEN_HEADER = 'Assignment: Write a continuations of {} stories for children based on titles, summary and the previous part given the story given. Your stories should be in a old-school book format.'

KEYS_TO_USE_FOR_ENDING = ['title', 'summary', 'Previous Part', 'Ending']
END_GEN_HEADER = 'Assignment: Write an ending of {} stories for children based on titles, summary and the previous part given the story given. Your stories should be in a old-school book format.'

In [6]:
def choose_best_result(results):
    """Return the generation whose '.'-separated piece count is closest to 5.

    The text of each row in the 'generation' column is split on '.', and the row
    whose number of pieces differs least from 5 is selected; on ties the first
    such row wins.

    Args:
        results: non-empty DataFrame with a string column 'generation'.

    Returns:
        The selected generation text.
    """
    n_pieces = results['generation'].str.split('.').str.len()
    distance_from_target = (n_pieces - 5).abs()
    best_label = distance_from_target.idxmin()
    return results.loc[best_label, 'generation']

def generate_segment(example_stories, keys_to_use, header, parameters, story_generator=None, check_n_tokens=True):
    """Generate one story segment (summary, beginning, continuation or ending).

    A few-shot prompt is built from ``example_stories`` and ``parameters``, then
    generations are requested repeatedly until at least one of them survives
    the hard filter. The survivors are post-processed and the best one is chosen.

    Args:
        example_stories: example story dicts used as few-shot examples.
        keys_to_use: story fields to include in the prompt; the last one is the
            field to be generated.
        header: assignment text placed at the top of the prompt.
        parameters: already known field values of the story being generated.
        story_generator: generator to use; when None, a new ``StoryGenerator``
            is created from the module-level settings.
        check_n_tokens: forwarded to ``PromptGenerator.generate_prompt_for_story``.

    Returns:
        Tuple ``(result, prompt, results)``: the chosen generation text, the
        prompt that was sent, and the DataFrame of all post-processed candidates.

    NOTE(review): callers catch ``AssertionError`` from this function, presumably
    raised by the prompt token check -- confirm against PromptGenerator.
    NOTE(review): there is no upper bound on the number of retries.
    """
    prompt_gen = PromptGenerator(stories=example_stories, keys_to_use=keys_to_use, header=header,
                                 parameters=parameters, stop_token=STOP_SEQUENCES[0])
    prompt = prompt_gen.generate_prompt_for_story(check_n_tokens=check_n_tokens)

    if story_generator is None:
        # Assign to `story_generator` itself: the original stored the new generator
        # under a different name and then called `.generate` on None.
        story_generator = StoryGenerator(
            prompt=None, model=MODEL, max_tokens=MAX_TOKENS, stop_sequences=STOP_SEQUENCES, temperature=TEMPERATURE,
            min_p=MIN_P, frequency_penalty=FREQ_PENALTY, presence_penalty=PRESENCE_PENALTY, disallowed_tokens=DISALLOWED_TOKENS.values())

    # Generate in a loop in order to always have at least one viable generated output
    results = []
    try_number = 1
    while len(results) == 0:
        if try_number > 1:
            print('Try:', try_number)

        # generating
        results = story_generator.generate(prompt=prompt, num_generations=5)
        try_number += 1

        # filtering
        results = hard_filter_results(results, STOP_SEQUENCES[0], DISALLOWED_STRINGS)

    # postprocessing results
    results = postprocess_results(results, STOP_SEQUENCES[0])

    # choosing best results
    result = choose_best_result(results)

    return result, prompt, results

<h2>Step 1: Get params from user</h2>

In [7]:
# Inputs for this run: the few-shot example stories, the requested story length
# in pages, and the (optional) title of the story to generate.
example_stories = get_n_stories(3) # could be based on some params (try tags?)
n_pages = 5 # pages to generate
title = 'Knight Ulrich And A Fearsome Dragon' # title for the story (could be empty)

<h2>Step 2: Generate short description</h2> 

In [8]:
# The summary is conditioned on the title only (or on nothing if no title was given).
parameters = [title] if title else []

# One generator instance is created here and passed to generate_segment below.
story_gen = StoryGenerator(
    prompt=None,
    model=MODEL,
    max_tokens=MAX_TOKENS,
    stop_sequences=STOP_SEQUENCES,
    temperature=TEMPERATURE,
    min_p=MIN_P,
    frequency_penalty=FREQ_PENALTY,
    presence_penalty=PRESENCE_PENALTY,
    disallowed_tokens=DISALLOWED_TOKENS.values(),
)

summary, summ_prompt, summ_results = generate_segment(
    example_stories, KEYS_TO_USE_FOR_SUMMARY, SUMM_GEN_HEADER, parameters,
    story_generator=story_gen,
)

[0m


<h3>Next Step: Serve our summary to user</h3>

Here we have 4 possibilities:
<ul>
    <li><strong>User accepts the summary and goes to Step 3</strong></li>
    <li>User edits the summary, then accepts it and goes to Step 3</li>
    <li>User (re)generates the summary once more by going to Step 2</li>
    <li>User rejects the summary and goes back to Step 1 (home page)</li>
</ul>

<h2>Step 3: Generate story beginning</h2> 

In [9]:
# The beginning is conditioned on the summary and, when given, the title.
parameters = [title, summary] if title else [summary]

# Few-shot examples: the first segment of every example story.
story_beginnings = get_segment_of_stories(example_stories, 0)
header = BEG_GEN_HEADER.format(len(story_beginnings)+1)

beginning, beg_prompt, beg_results = generate_segment(
    story_beginnings, KEYS_TO_USE_FOR_BEGINNING, header, parameters,
    story_generator=story_gen,
)

[0m


<h3>Next Step: Serve our beginning to user</h3>

Here we have 4 possibilities:
<ul>
    <li><strong>User accepts the beginning and goes to Step 4</strong></li>
    <li>User edits the beginning, then accepts it and goes to Step 4</li>
    <li>User (re)generates the beginning once more by going to Step 3</li>
    <li>User rejects the beginning and goes back to Step 1 (home page)</li>
</ul>

<h2>Step 4: Generate story continuation</h2> 

In [10]:
# Two of the pages are the beginning and the ending; the rest are continuations.
continuations_to_generate = n_pages - 2

# The first continuation follows the beginning; later ones follow the previous continuation.
parameters = [title, summary, beginning] if title else [summary, beginning]

continuations = []
cont_prompts = []
cont_results_list = []

for i in range(continuations_to_generate):
    example_continuations = get_segments_and_continuations(example_stories)
    while True:
        header = CONT_GEN_HEADER.format(len(example_continuations)+1)
        try:
            continuation, cont_prompt, cont_results = generate_segment(example_continuations, KEYS_TO_USE_FOR_CONTINUATION, header, parameters, story_generator=story_gen)
        except AssertionError:
            # Retry with one example fewer. If no examples are left to drop,
            # re-raise: slicing an empty list changes nothing, so retrying
            # would loop forever.
            if not example_continuations:
                raise
            example_continuations = example_continuations[:-1]
            continue
        break # break if there was no error

    continuations.append(continuation)
    # temporary also save prompts and results for debugging
    cont_prompts.append(cont_prompt)
    cont_results_list.append(cont_results)

    parameters = [title, summary, continuation] if title else [summary, continuation]

There was an error with Cohere server, retrying after 10 seconds


[0m
[0m
[0m


<h2>Step 5: Generate story ending</h2> 

In [11]:
# Condition the ending on the most recently generated part. When no
# continuation was generated (n_pages <= 2) fall back to the beginning instead
# of relying on the loop variable `continuation`, which would be undefined.
previous_part = continuations[-1] if continuations else beginning
parameters = [title, summary, previous_part] if title else [summary, previous_part]

# Few-shot examples: the second-to-last segment of each example story as
# 'Previous Part' and its last segment as 'Ending'.
story_endings = [
    {'Previous Part': story_bef_end.pop('text'), 'Ending': story_end['text'], **story_bef_end} \
    for story_bef_end, story_end in zip(get_segment_of_stories(example_stories, -2), get_segment_of_stories(example_stories, -1))
]

# Count the examples actually used in this prompt rather than the ones from Step 3.
header = END_GEN_HEADER.format(len(story_endings)+1)

ending, end_prompt, end_results = generate_segment(story_endings, KEYS_TO_USE_FOR_ENDING, header, parameters, story_generator=story_gen, check_n_tokens=False)

[0m


In [12]:
# Stitch the generated parts together in reading order and print the whole story.
story_body = beginning + ''.join(continuations) + ending
story = f'''
Title: {title}
Summary: {summary}

Story:

{story_body}
'''
print(story)


Title: Knight Ulrich And A Fearsome Dragon
Summary:  A brave knight is traveling through the forest when he comes across a giant dragon, whose body takes up most of the path. At first, the dragon is very friendly, and they start chatting. But then the dragon gets angry and tries to attack the knight! The knight draws his sword and kills the dragon. Then he continues on his way. But soon, he meets another dragon!

Story:

 A knight was traveling in the woods. It was dark and he didn't know where he was going. As he passed by a cave, he saw something move. "Who's there?" called the knight. He walked into the cave and saw a giant dragon curled up and snoring. The knight was frightened, but he couldn't leave the dragon behind. So he decided to hide behind a tree and wait. Just then, the dragon woke up and started roaring. The knight had never killed anyone before, but he had no choice but to kill the dragon. His sword struck the dragon's heart. The dragon screamed and fumbled about. His e

In [13]:
# Inspect the generated beginning on its own.
beginning

' A knight was traveling in the woods. It was dark and he didn\'t know where he was going. As he passed by a cave, he saw something move. "Who\'s there?" called the knight.'

In [14]:
# Inspect the first generated continuation.
continuations[0]

" He walked into the cave and saw a giant dragon curled up and snoring. The knight was frightened, but he couldn't leave the dragon behind. So he decided to hide behind a tree and wait. Just then, the dragon woke up and started roaring."

In [15]:
# Inspect the second generated continuation.
continuations[1]

" The knight had never killed anyone before, but he had no choice but to kill the dragon. His sword struck the dragon's heart. The dragon screamed and fumbled about. His enormous body hit the roof of the cave and fell to the ground."