In [1]:
#@title Импорт библиотек.

# Фиксируем версии библиотек на 22.11.2023
!pip  install  tiktoken==0.5.1
!pip  install  langchain==0.0.339
!pip  install  openai==1.3.4
!pip  install  faiss-cpu==1.7.4
!pip install gspread==3.4.2

from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import MarkdownHeaderTextSplitter

import requests
import os
import re
import getpass
import openai
import tiktoken
from openai import OpenAI
import zipfile
from IPython import display
import timeit

import gspread                  # Импортируем API для работы с Google таблицами
from google.colab import auth   # Импортируем модуль для аутентификации
from google.auth import default # Импортируем модуль для работы с учетными данными

# Очистить экран.
display.clear_output()

In [2]:
#@title Ввод ключа к API OpenAI.

openai.api_key = getpass.getpass("Введите OpenAi API key:")
os.environ["OPENAI_API_KEY"] = openai.api_key

Введите OpenAi API key:··········


# Внутренности.

In [3]:
#@title Вспомогательные функции

def num_tokens_from_string(string: str) -> int:
    """Возвращает количество токенов в строке"""
    # Выбор кодировщика. `cl100k_base`используется для `gpt-4`, `gpt-3.5-turbo`, `text-embedding-ada-002`
    encoding = tiktoken.get_encoding("cl100k_base")
    # Разбивка строки на токены и подсчет из количества.
    num_tokens = len(encoding.encode(string))
    return num_tokens

def split_text(text, verbose=0):
    """ Функция разбивает текст на чанки. """
    # Шаблон MarkdownHeaderTextSplitter по которому будет делится переданный
    # текст в формате Markdown.
    headers_to_split_on = [ ("#",    "Header 1"),
                            ("##",   "Header 2"),
                            ("###",  "Header 3"),
                            ("####", "Header 4")
                        ]
    # Создаем экземпляр спилиттера.
    markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    # Получаем список чанков.
    source_chunks = markdown_splitter.split_text(text)

    # Обработка чанков.
    chank_count = len(source_chunks)
    for number, chank in enumerate(source_chunks):
        # Добавление информации в чанк о его номере в базе.
        chank.page_content = f'Chank#{number}/{chank_count}. {chank.page_content}'
        # Вывод количества слов/токенов в фрагменте, если включен режим verbose.
        if verbose:
            count = num_tokens_from_string(chank.page_content)
            print(f"\nChank#{number}/{chank_count}. Tokens in text = {count}\n{'-' * 20}\n{insert_newlines(str(chank))}\n{'=' * 20}")

    # Возвращение списка фрагментов текста.
    return source_chunks

def create_embedding(data, verbose=0):
    """Функция преобразует текстовую Базу знаний в векторную."""
    # Разбивка текста на чанки.
    source_chunks = []
    source_chunks = split_text(text=data, verbose=verbose)

    # Создание векторной Базы знаний на основе чанков.
    search_index = FAISS.from_documents(source_chunks, OpenAIEmbeddings(), )
    # Подсчет общего количества токенов во всех чанках.
    count_token = num_tokens_from_string(' '.join([x.page_content for x in source_chunks]))
    # Печать сводной информации по созданию векторной Базы знаний.
    print('\n==================== ')
    print('Количество токенов в документе :', count_token)
    # Стоимость эмбэндинга согласно прайса на 22.11.2023 - 0,0001/1К токенов.
    # https://openai.com/pricing#language-models
    print('ЦЕНА запроса:', 0.0001*(count_token/1000), ' $')
    return search_index

def load_file(url: str):
    """ Функция загрузки документа по url как текст."""
    try:
        response = requests.get(url) # Получение документа по url.
        response.raise_for_status()  # Проверка ответа и если была ошибка - формирование исключения.
        return response.text
    except Exception as e:
        print(e)

def load_search_indexes(url: str, verbose=0):
    """Функция загружает текстовую Базу знаний и преобразует ее в векторную."""
    try:
        return create_embedding(load_file(url), verbose=verbose)
    except Exception as e:
        print(e)

def insert_newlines(text: str, max_len: int = 120) -> str:
    """ Функция форматирует переданный текст по длине
    для лучшего восприятия на экране."""
    words = text.split()
    lines = []
    current_line = ""
    for word in words:
        if len(current_line + " " + word) > max_len:
            lines.append(current_line)
            current_line = ""
        current_line += " " + word
    lines.append(current_line)
    return "\n".join(lines)

def answer_index(model, system, topic, query, search_index, temp = 1, verbose_documents = 0,  verbose_price = 0, top_documents = 3):
    """ Основная функция которая формирует запрос и получает ответ от OpenAI по заданному вопросу
    на основе векторной Базы знаний. """

    # Выбор варианта вопроса. Если есть query то вопрос задан из группового запроса.
    question = query["question"] if bool(query) else topic
    # Выборка релевантных чанков.
    #`docs = search_index.similarity_search(question, k=top_documents)
    docs = search_index.similarity_search_with_score(question, k=top_documents)

    message_content = ""            # Контекст для GPT.
    message_content_display = ""    # Контекст для вывода на экран.
    for i, doc in enumerate(docs):
        # Формирование контекста для запроса GPT и показа на экран отобранных чанков.
        message_content = message_content + f'Отрывок документа №{i+1}:{doc[0].page_content}'
        message_content_display = message_content_display + f'\nОтрывок документа №{i+1}. Score({str(round(doc[1], 3))})\n-----------------------\n{insert_newlines(doc[0].page_content)}\n'

        # Сбор информации для группого запроса.
        if bool(query):
            # Выделение из строки метаданных ссылки. Если нет присваиваем пустую строку.
            search_link_h1 = re.search(r'\[(.*?)\]', doc[0].metadata.get('Header 1')) if bool(doc[0].metadata.get('Header 1')) else ""
            search_link_h2 = re.search(r'\[(.*?)\]', doc[0].metadata.get('Header 2')) if bool(doc[0].metadata.get('Header 2')) else ""
            search_link_h3 = re.search(r'\[(.*?)\]', doc[0].metadata.get('Header 3')) if bool(doc[0].metadata.get('Header 3')) else ""
            search_link_h4 = re.search(r'\[(.*?)\]', doc[0].metadata.get('Header 4')) if bool(doc[0].metadata.get('Header 4')) else ""
            # Выбор самой внутренней ссылки. Если несколько ссылок, то самая внутренняя ссылается на расположение чанка на сайте.
            link = ''
            if bool(search_link_h4):
                link = search_link_h4.group(1)
            elif bool(search_link_h3):
                link = search_link_h3.group(1)
            elif bool(search_link_h2):
                link = search_link_h2.group(1)
            elif bool(search_link_h1):
                link = search_link_h1.group(1)
            # Заполнение запроса выбранными чанками.
            query[f"chank_{i+1}"] = f"Score({str(round(doc[1], 3))}). {doc[0].page_content}.\n--------\n{link}"

    # Вывод на экран отобранных чанков.
    if (verbose_documents):
        print(message_content_display)

    # Отправка запроса к Open AI.
    completion = OpenAI().chat.completions.create(
        model = model[0],
        messages = [
            {"role": "system", "content": system + f"{message_content}"},
            {"role": "user", "content": question}
        ],
        temperature=temp
    )

    # Подсчет токенов и стоимости.
    prompt_tokens = completion.usage.prompt_tokens
    total_tokens = completion.usage.total_tokens
    price_promt_tokens = prompt_tokens * model[1]/1000
    price_answer_tokens = (total_tokens - prompt_tokens) * model[2]/1000
    price_total_token = price_promt_tokens + price_answer_tokens
    # Сбор информации для группого запроса.
    if bool(query):
        query["price_query"] = price_total_token
        query["price_question"] = price_promt_tokens
        query["price_answer"] = price_answer_tokens
        query["token_query"] = total_tokens
        query["token_question"] = prompt_tokens
        query["token_answer"] = total_tokens - prompt_tokens
    # Вывод на экран стоимости запроса.
    if (verbose_price):
        print('\n======================================================= ')
        print(f'{prompt_tokens} токенов использовано на вопрос. Цена: {round(price_promt_tokens, 6)} $.')
        print(f'{total_tokens - prompt_tokens} токенов использовано на ответ.  Цена: {round(price_answer_tokens, 6)} $.')
        print(f'{total_tokens} токенов использовано всего.     Цена: {round(price_total_token, 6)} $')
        print('======================================================= ')

    # Ответ OpenAI.
    return completion.choices[0].message.content


In [4]:
#@title Вспомогательные функции для загрузки Базы знаний.

def load_bd_text(url: str, verbose=0):
    """ Функция загружает текстовую Базу знаний и
        преобразует ее в векторную."""
    response = requests.get(url) # Получение документа по url.
    response.raise_for_status()  # Проверка ответа и если была ошибка - формирование исключения.
    return create_embedding(response.text, verbose=verbose)

def load_bd_vect(url: str, verbose=0):
    """ Функция загружает векторную Базу знаний."""
    name_bd = 'federallab_bd_index.zip'
    # Скачивание архива Базы знаний
    response = requests.get(url) # Получение документа по url.
    response.raise_for_status()  # Проверка ответа и если была ошибка - формирование исключения.
    # Сохранение архива.
    with open(name_bd, 'wb') as file:
        file.write(response.content)
    # Разархивирование Базы знаний.
    with zipfile.ZipFile(name_bd, 'r') as zip:
        zip.extractall()
    # Загрузка векторной Базы знаний.
    federallab_bd = FAISS.load_local(f'federallab_bd_index', OpenAIEmbeddings())
    return federallab_bd

def load_bd (url_vect: str, url_text: str, verbose=0):
    """ Функция организует очередность загрузки Базы знаний.
        Сначала идет загрузка векторной базы, если она не загружается,
        то загружается база в текстовом формате и потом преобразуется в векторную."""
    try:
        federallab_bd = load_bd_vect(url_vect, verbose)
        print("Загрузка векторной Базы знаний выполнена успешно.")
        return federallab_bd
    except Exception as e:
        print("По указанной ссылке векторной Базы знаний нет.")
        print(e)
        print("\nИдет загрузка текстовой Базы знаний...")
        try:
            federallab_bd = load_bd_text(url_text, verbose)
            print("\nЗагрузка текстовой Базы знаний выполнена успешно.")
            return federallab_bd
        except Exception as e:
            print("\nПо указанной ссылке текстовой Базы знаний нет.")
            print(e)
            print("\nОшибка загрузки!!")

In [5]:
#@title Архивирование Базы знаний (при необходимости).

# В дальнейшем архив нужно поместить на GitHub
archive = False
if archive:
    folder_to_zip = 'federallab_bd_index'
    output_filename = 'federallab_bd_index.zip'
    # Сохранение папки с векторной Базой знаний.
    federallab_bd_index.save_local(folder_to_zip)
    # Архивирование папки с векторной Базой знаний
    with zipfile.ZipFile(output_filename, 'w') as zip:
        for root, dirs, files in os.walk(folder_to_zip):
            for file in files:
                zip.write(os.path.join(root, file))

# Загрузка Базы знаний.

In [None]:
# Сылка на Базу знаний на Github в текстовом формате.
link_bd_text = "https://raw.githubusercontent.com/terrainternship/GPT_labsud/main/Datadase/LabSudDB_v1.md"
# Сылка на Базу знаний на Github в векторном формате.
link_bd_vect = 'https://github.com/terrainternship/GPT_labsud/raw/main/federallab_bd_index.zip'
# Показывать полученные чанки только при загрузке Базы знаний в текстовом формате.
verbose_bd = 1

# Загрузка Базы знаний.
federallab_bd_index = load_bd(link_bd_vect, link_bd_text, verbose=verbose_bd )

# Параметры запроса нейро-консультанту.

In [7]:
 # Параметры запроса нейро-консультанту.

# Данные по названиям модели и стоимости токена на 22.11.2023.
# https://openai.com/pricing#language-models
# Псевдоним = ['Имя модели', 'Цена токена - вопроса', 'Цена токена - ответа'].
MODEL_GPT_4_1106_PREVIEW = ['gpt-4-1106-preview', 0.01, 0.03]   # 128K tokens
MODEL_GPT_3_5_TURBO_1106 = ['gpt-3.5-turbo-1106', 0.001, 0.002] #  16K tokens

# Общие настройки для одиночного и группового запроса.
SELECT_MODEL_GPT = MODEL_GPT_3_5_TURBO_1106 # Выбранная модель.
top_documents = 3                           # Количество полученных релевантных чанков после запроса.
temp = 0                                    # Вариативность ответа.
#federallab_chat_promt = load_file("https://raw.githubusercontent.com/terrainternship/GPT_labsud/main/Dokumov/%D0%A2%D1%8B%20%D1%81%D0%B0%D0%BC%D1%8B%D0%B9%20%D0%BA%D0%BE%D0%BC%D0%BF%D0%B5%D1%82%D0%B5%D0%BD%D1%82%D0%BD%D1%8B%D0%B9%20%D0%BD%D0%B5%D0%B9%D1%80%D0%BE-%D0%BA%D0%BE%D0%BD%D1%81%D1%83%D0%BB%D1%8C.txt")
federallab_chat_promt = load_file("https://raw.githubusercontent.com/terrainternship/GPT_labsud/main/Galina/FLSE_promt")

# Настройки только для одиночного запроса.
verbose_documents = 1   # Показывать отобранные чанки.
verbose_price = 1       # Показывать стоимость запроса.

# Одиночный запрос.

In [55]:
#@title Вопрос нейро-консультанту:
question = 'Стоимость экспертизы технического состояния ТС?'

In [None]:
#@title Отправка вопроса нейро-консультанту.

answer = answer_index(
    model = SELECT_MODEL_GPT,
    system = federallab_chat_promt,
    topic = question,
    search_index = federallab_bd_index,
    temp = temp,
    verbose_documents = verbose_documents,
    verbose_price = verbose_price,
    top_documents = top_documents,
    query = '',
)

print()
print(f'ВОПРОС:\n{insert_newlines(question)}\n')
print(f'ОТВЕТ:\n{insert_newlines(answer)}')

# Групповой запрос.

In [8]:
auth.authenticate_user()        # Аутентифицируем текущего пользователя Colab
creds, _ = default()            # Создаем объект учетных данных на основе аутентификации
gc = gspread.authorize(creds)   # Создаем клиент для таблиц на основе учетных данных

In [9]:
# Название файла с вопросами.
# Ссылка на общий файл.
try:
    spreadsheet = gc.open_by_url('https://docs.google.com/spreadsheets/d/1p69Ma_vcEU86_lde61l5RtawJIbGwpAhpXHo2DEKNDo/edit#gid=1834702311')
    print(f'Подключились к документу - {spreadsheet.title}')
except Exception as e:
    print(e)
    print(f'Ошибка подключения к документу. Проверьте ссылку.')

Подключились к документу - FederalLab


In [10]:
# Получаем список всех страниц файла.
worksheet_list = spreadsheet.worksheets()
print('Страницы документа:')
print('-------------------')
for i, worksheet in enumerate(worksheet_list):
        print(f'  {i}. {worksheet.title}')

Страницы документа:
-------------------
  0. Инструкции
  1. Страницы
  2. Исключенные
  3. Шаблон
  4. Докумов
  5. Макеев
  6. Галина
  7. Петрунин
  8. Шляпников
  9. Бугаев
  10.  A.Куцинс


In [11]:
# Устанавливаем номер рабочего листа по списку выше.
number_sheet = 6 # Петрунин=6
#---------------------------------------------------
worksheet = worksheet_list[number_sheet]
# Проверка текущей страницы.
print(f'Текущая страница - "{worksheet.title}"\n')
# Список всех столбцов на странице.
print('-----№---------Название---')
for i, col in enumerate(worksheet.row_values(1)):
    print(f'Колонка № {i}. {col}')

Текущая страница - "Петрунин"

-----№---------Название---
Колонка № 0. Ответственный
Колонка № 1. URL
Колонка № 2. Название
Колонка № 3. Вопрос
Колонка № 4. Ожидаемый ответ
Колонка № 5. Ответ GPT
Колонка № 6. Оценка
Колонка № 7. Ошибка
Колонка № 8. Комментарий
Колонка № 9. Чанк №1
Колонка № 10. Чанк №2
Колонка № 11. Чанк №3


In [None]:
# Устанавливаем номер колонки с вопросами. Проверить со списком.
column_question = 3

In [12]:
""" В данном блоке происходит заполнение списка запросов вопросами и дополнительной
информацией с выбранного листа. Загрузка выполняется ВСЕХ строк с вопросами для
последующей корректной работы. Далее из этого списка выбирается нужный диапазон
вопросов для группового запроса к GPT."""

# Выбор всех вопросов.
column = worksheet.col_values(column_question)
# Создаем пустой список запросов.
list_query = []
# Заполнение списка запросов информацией с выбранного листа.
for i in range(len(column)-1):
    row = worksheet.row_values(i+2)
    # Считывание ячеек с контролем их наличия для избежании ошибки чтения.
    # Проверить индексы row со списком в ячейке выше.
    person = row[0] if len(row)>=1 else ""      # Автор вопроса.
    link = row[1] if len(row)>=2 else ""        # Ссылка на тему.
    subject = row[2] if len(row)>=3 else ""     # Тема вопроса.
    question = row[3] if len(row)>=4 else ""    # Вопрос.
    answer = row[4] if len(row)>=5 else ""      # Ожидаемый ответ.
    # Словарь запроса.
    query = {
        "line": i+2,            # Номер строки в документе. Берем строки с вопросами.
        "person": person,       # Автор вопроса.
        "subject": subject,     # Тема вопроса.
        "link": link,           # Ссылка на тему.
        "question": question,   # Вопрос.
        "answer": question,     # Ожидаемый ответ.
        "answer_gpt": "",       # Ответ GPT.
        "appraisal": None,      # Оценка ответа.
        "bug": "",              # Ошибка.
        "comments": "",         # Комментарии.
        "chank_1": "",          # Чанк №1.
        "chank_2": "",          # Чанк №2.
        "chank_3": "",          # Чанк №3.
        "price_query": 0,       # Стоимость запроса общая.
        "price_question": 0,    # Стоимость вопроса с контекстом.
        "price_answer": 0,      # Стоимость ответа.
        "token_query": 0,       # Количество токенов всего вопро-ответ.
        "token_question": 0,    # Количество токенов в вопросе с контекстом.
        "token_answer": 0,      # Количество токенов в ответе.
    }
    list_query.append(query)
print(f'Загрузка списка вопросов завершена. Количество вопросов: {len(column)-1}.')

Загрузка списка вопросов завершена. Количество вопросов: 60.


In [13]:
# Пример считанного первого запроса.
# Установкой среза можно вывести интересующий интервал.
for query in list_query[:1]:
    print(f'Строка №{query["line"]}. Вопрос: {query["question"]}')
    print(query)

Строка №2. Вопрос: Какие элементы ТС подвергаются экспертизе техничестого состоянияТС?
{'line': 2, 'person': 'Петрунин', 'subject': 'Экспертиза технического состояния ТС', 'link': 'https://federallab.ru/uslugi-ekspertizyi/avtotexnicheskaya-ekspertiza/ekspertiza-texnicheskogo-sostoyaniya-ts/', 'question': 'Какие элементы ТС подвергаются экспертизе техничестого состоянияТС?', 'answer': 'Какие элементы ТС подвергаются экспертизе техничестого состоянияТС?', 'answer_gpt': '', 'appraisal': None, 'bug': '', 'comments': '', 'chank_1': '', 'chank_2': '', 'chank_3': '', 'price_query': 0, 'price_question': 0, 'price_answer': 0, 'token_query': 0, 'token_question': 0, 'token_answer': 0}


In [14]:
# Диапазон строк с вопросами на выбранной странице.
print(f"Диапазон номеров строк с вопросами (включительно): [{list_query[0]['line']}:{list_query[-1]['line']}].")

Диапазон номеров строк с вопросами (включительно): [2:61].


In [None]:
# Определение диапазона строк с вопросами. Начало диапазона, конец диапазона.
row_first, row_end = 2, 10

In [15]:
#@title Отправка группового запроса нейро-консультанту.
""" Если подвис ответ от GPT можно прервать обработку, установить row_first
в значение на которой была прервана обработка и запустить обработку заново.
Ранее обработанные вопросы будут сохранены в файле. """

# Обнуление общих затрат на групповой запрос.
total_price_query, total_token_query, total_query = 0, 0, 0
# Фиксация времени.
start_group = timeit.default_timer()

for query in list_query[row_first-2:row_end-1]:
    # Проверка на пустой вопрс. Если пустой пропуск цикла.
    if not bool(query['question'].strip ()):
        print(f'На строке № {query["line"]} - вопроса нет.')
        continue
    # Отправка запроса. Фиксация времени.
    start = timeit.default_timer()
    try:
        query["answer_gpt"]= answer_index(
            model = SELECT_MODEL_GPT,
            system = federallab_chat_promt,
            topic = "",
            search_index = federallab_bd_index,
            temp = temp,
            verbose_documents = 0,
            verbose_price = 0,
            top_documents = top_documents,
            query = query,
        )
        total_price_query += query['price_query']
        total_token_query += query['token_query']
        total_query +=1
    except Exception as e:
        print(f'Ошибка ответа GPT на строке №{query["line"]}. - {e}')
    end = timeit.default_timer()
    # Сообщение об успешности ответа от GPT.
    print(f'Строка №{query["line"]}. Ответ на вопрос получен за - {round(end-start, 3)} сек.')

    # Запись ответа в файл.
    try:
        if bool(query['answer_gpt']):
            worksheet.update_cell(query['line'], 6, query['answer_gpt']) # Ответ GPT.
            worksheet.update_cell(query['line'], 10, query['chank_1'])   # Чанк №1.
            worksheet.update_cell(query['line'], 11, query['chank_2'])   # Чанк №2.
            worksheet.update_cell(query['line'], 12, query['chank_3'])   # Чанк №3.
            print(f'Строка №{query["line"]}. Ответ записан в файл.')
    except Exception as e:
        print('\n========================================================')
        print(f'!!!Ошибка записи строки №{query["line"]} в файл. - {e}')
        print('========================================================')

end_group = timeit.default_timer()
print()
print('-------------------------------------------')
print(f'Вопросов в пакетной обработке - {total_query} шт.')
print(f'Время пакетной обработки      - {round(end_group-start_group, 1)} сек.')
print(f'Стоимость пакетной обработки  - {round(total_price_query, 4)} $.')
print(f'Токенов в пакетной обработке  - {total_token_query} шт.')

Строка №5. Ответ на вопрос получен за - 7.29 сек.
Строка №5. Ответ записан в файл.

-------------------------------------------
Вопросов в пакетной обработке - 1 шт.
Время пакетной обработки      - 9.0 сек.
Стоимость пакетной обработки  - 0.0024 $.
Токенов в пакетной обработке  - 2134 шт.


In [16]:
# Вывод вопросов и ответов.
for i, query in enumerate(list_query):
    if query['answer_gpt']:
        print()
        print(insert_newlines(f"ВОПРОС №{i+2}: {query['question']}"))
        print('---------------------------')
        print(insert_newlines(f"\nОТВЕТ: {query['answer_gpt']}"))
        print('===========================\n')
        print(query['chank_1'])
        print()
        print(query['chank_2'])
        print()
        print(query['chank_3'])


 ВОПРОС №5: Какие виды экспертиз проводите при оценке авто?
---------------------------
 ОТВЕТ: При оценке автомобиля ФЛСЭ проводит следующие виды экспертиз: 1. Оценка автотранспорта, включая осмотр, расчет
 общей стоимости и определение точной стоимости. 2. Независимая автотехническая экспертиза, которая помогает разрешить
 спорные вопросы в случае дорожно-транспортных происшествий или при купле-продаже автомобилей. 3. Автотовароведческая
 экспертиза, сочетающая в себе товароведческую и автотехническую экспертизу для получения ответов на вопросы
 относительно соответствия качества автомобиля как товара, его узлов и агрегатов данным, указанным в договоре
 купли-продажи, а также условиям безопасности эксплуатации.

Score(0.218). Chank#67/217. Оценка автотранспорта. Этапы оценки.  
Чаще всего экспертизу назначает суд для определения точной стоимости автомобиля на рынке. Однако в частных ситуациях водители сами обращаются за помощью. И в том, и в другом случае специалистам необходимо про