<a href="https://colab.research.google.com/github/LopeWale/Emmanuel_SideProjects/blob/main/streamlit_auto_coder_openAi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
"""
Create an automated coding system with a Streamlit user interface that uses the OpenAI Codex model API to generate Python code from a prompt given by the user.
"""
from __future__ import absolute_import, division, print_function

import argparse
import glob
import logging
import os
import pickle
import random
from typing import Dict, List, Tuple

import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm, trange

from transformers import WEIGHTS_NAME, BertConfig, BertForMaskedLM, BertTokenizer
from transformers import CamembertConfig, CamembertForMaskedLM, CamembertTokenizer
from transformers import DistilBertConfig, DistilBertForMaskedLM, DistilBertTokenizer
from transformers import RobertaConfig, RobertaForMaskedLM, RobertaTokenizer
from transformers import XLMRobertaConfig, XLMRobertaForMaskedLM, XLMRobertaTokenizer
from transformers import XLMConfig, XLMForMaskedLM, XLMTokenizer
from transformers import XLNetConfig, XLNetForMaskedLM, XLNetTokenizer

from transformers import AdamW, get_linear_schedule_with_warmup

import streamlit as st


ModuleNotFoundError: ignored

In [None]:
# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)

# Earlier candidate defaults, kept for reference:
#PYTHON_CODE = "int main(){\n\tint x=5;\n\tint y=6;\n\tint z;\n\tz=x+y;\n\treturn z;\n}"
#PYTHON_CODE = "import numpy as np\nimport pandas as pd\nimport matplotlib.pyplot as plt\nimport seaborn as sns\nimport warnings\nwarnings.filterwarnings('ignore')\ndata = pd.read_csv('../input/train.csv')\ndata.head(5)"

# Fallback snippet returned by get_input_code() when the user submits an
# empty text box. Written as adjacent literals, one source line per literal.
PYTHON_CODE = (
    "import numpy as np\n"
    "import pandas as pd\n"
    "import matplotlib.pyplot as plt\n"
    "import seaborn as sns\n"
    "import warnings\n"
    "warnings.filterwarnings('ignore')"
)

def get_input_code(text):
    """Return ``text`` unchanged, or ``PYTHON_CODE`` when ``text`` is ``""``.

    Only the exact empty string triggers the fallback; any other value
    (including whitespace-only strings) is passed through as-is.
    """
    return PYTHON_CODE if text == "" else text

def get_predictions(text):
    """Return the code generated for ``text`` by delegating to ``generate_code``."""
    prediction = generate_code(text)
    return prediction

def generate_code(text):
    """Greedily extend ``text`` with tokens predicted by a pretrained LM-head model.

    Model selection and decoding options are parsed from the process command
    line (``sys.argv``) with ``argparse``; ``--model_type``,
    ``--model_name_or_path`` and ``--output_dir`` are required there. The
    tokenizer and model are loaded on every call.

    Decoding appends one token per step for ``--length`` steps: the logits at
    the last position are temperature-scaled, passed through
    ``top_k_top_p_filtering`` and the most probable remaining token is taken.

    Args:
        text: Prompt to continue. It must encode to at least one token.

    Returns:
        str: The decoded newly generated tokens of the first sample (the
        prompt itself is not repeated), cut at the first occurrence of
        ``--stop_token`` when one is given and present in the output.

    Raises:
        KeyError: If ``--model_type`` is not a key of ``MODEL_CLASSES``.
        ValueError: If ``text`` encodes to zero tokens.
    """
    parser = argparse.ArgumentParser()

    ## Required parameters
    parser.add_argument("--model_type", default=None, type=str, required=True,
                        help="Model type selected in the list: " + ", ".join(MODEL_CLASSES.keys()))
    parser.add_argument("--model_name_or_path", default=None, type=str, required=True,
                        help="Path to pre-trained model or shortcut name selected in the list: " + ", ".join(ALL_MODELS))
    parser.add_argument("--output_dir", default=None, type=str, required=True,
                        help="The output directory where the model predictions and checkpoints will be written.")

    ## Other parameters
    # NOTE: --prompt is accepted for CLI compatibility; the prompt actually
    # used is the ``text`` argument of this function.
    parser.add_argument("--prompt", type=str, default="")
    parser.add_argument("--length", type=int, default=20)
    parser.add_argument("--stop_token", type=str, default=None,
                        help="Token at which text generation is stopped")

    parser.add_argument("--temperature", type=float, default=1.0,
                        help="temperature of 0 implies greedy sampling")
    parser.add_argument("--repetition_penalty", type=float, default=1.0,
                        help="primarily useful for CTRL model; in that case, use 1.2")
    parser.add_argument("--k", type=int, default=0)
    parser.add_argument("--p", type=float, default=0.9)

    parser.add_argument("--num_return_sequences", type=int, default=1)

    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument('--no_cuda', action='store_true',
                        help="Avoid using CUDA when available")
    parser.add_argument('--num_samples', type=int, default=20)

    args = parser.parse_args()

    args.device = torch.device("cuda" if torch.cuda.is_available() and not args.no_cuda else "cpu")
    args.n_gpu = torch.cuda.device_count()

    set_seed(args)

    args.model_type = args.model_type.lower()
    # MODEL_CLASSES values are (config, model, tokenizer) triples; the config
    # class is not needed because from_pretrained() loads it implicitly.
    _config_class, model_class, tokenizer_class = MODEL_CLASSES[args.model_type]
    tokenizer = tokenizer_class.from_pretrained(args.model_name_or_path)
    model = model_class.from_pretrained(args.model_name_or_path)
    model.to(args.device)
    model.eval()

    if args.length < 0 and model.config.max_position_embeddings > 0:
        args.length = model.config.max_position_embeddings
    elif 0 < model.config.max_position_embeddings < args.length:
        args.length = model.config.max_position_embeddings  # No generation bigger than model size
    elif args.length < 0:
        args.length = MAX_LENGTH  # avoid infinite loop

    logger.info(args)

    context_tokens = tokenizer.encode(text, add_special_tokens=False)
    if args.model_type in ('xlnet', 'xlm'):
        # These two model types get the tokenizer's special tokens added
        # around the prompt; the others are fed the bare prompt tokens.
        context_tokens = tokenizer.build_inputs_with_special_tokens(context_tokens)
    if not context_tokens:
        raise ValueError("Prompt text produced no tokens; nothing to condition generation on.")

    # One identical row per requested sample; rows grow by one column per step.
    generated = torch.tensor([context_tokens], dtype=torch.long, device=args.device)
    generated = generated.repeat(args.num_samples, 1)
    prompt_length = generated.size(1)

    temperature = args.temperature if args.temperature > 0 else 1.

    with torch.no_grad():
        for _ in range(args.length):
            # No labels are passed, so the first output is the logits tensor.
            # The default attention mask attends to every position.
            outputs = model(generated)
            next_token_logits = outputs[0][:, -1, :] / temperature
            next_token_logits = top_k_top_p_filtering(next_token_logits, top_k=args.k, top_p=args.p)
            probs = torch.softmax(next_token_logits, dim=-1)

            # Greedy pick; kept on the model's device so it can be concatenated.
            next_token = torch.topk(probs, 1)[1]
            generated = torch.cat((generated, next_token), dim=1)

    # Only the first sample is returned so the function keeps returning a str.
    new_tokens = generated[0, prompt_length:].tolist()
    result = tokenizer.decode(new_tokens, clean_up_tokenization_spaces=True)

    if args.stop_token:
        stop_at = result.find(args.stop_token)
        if stop_at != -1:  # find() returns -1 when absent; don't chop the last char
            result = result[:stop_at]

    return result

def set_seed(args):
    """Seed the ``random``, NumPy and PyTorch generators from ``args.seed``.

    When ``args.n_gpu`` is positive, all CUDA generators are seeded as well.
    """
    seed = args.seed
    for seed_rng in (random.seed, np.random.seed, torch.manual_seed):
        seed_rng(seed)
    if args.n_gpu <= 0:
        return
    torch.cuda.manual_seed_all(seed)

def top_k_top_p_filtering(logits, top_k=0, top_p=0.0, filter_value=-float('Inf')):
    """ Filter a distribution of logits using top-k and/or nucleus (top-p) filtering

        Filtering is applied independently along the last dimension, so both a
        single distribution of shape (vocabulary size,) and a batch of shape
        (batch size, vocabulary size) are supported. ``logits`` is modified in
        place and also returned.

        Args:
            logits: logits distribution, last dimension is the vocabulary
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).
                Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751)
            filter_value: value written over every removed logit
        Returns:
            The same ``logits`` tensor with removed entries set to ``filter_value``.
        From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317
    """
    top_k = min(top_k, logits.size(-1))  # Safety check
    if top_k > 0:
        # Remove all tokens with a probability less than the last token of the top-k
        kth_best = torch.topk(logits, top_k)[0][..., -1, None]
        logits[logits < kth_best] = filter_value

    if top_p > 0.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)

        # Remove tokens with cumulative probability above the threshold
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift the indices to the right to keep also the first token above the threshold
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False

        # Map the mask from sorted order back to vocabulary order per row;
        # flat indexing here would index rows instead of vocabulary entries
        # whenever the logits are batched.
        indices_to_remove = sorted_indices_to_remove.scatter(
            -1, sorted_indices, sorted_indices_to_remove)
        logits[indices_to_remove] = filter_value
    return logits


# Supported model types, each mapped to its (config, model, tokenizer) classes.
# NOTE(review): XLNetForMaskedLM and XLMForMaskedLM are imported at the top of
# the file, but the LM-head classes transformers documents for these models
# are XLNetLMHeadModel and XLMWithLMHeadModel -- confirm these imports resolve.
MODEL_CLASSES = {
    'bert': (BertConfig, BertForMaskedLM, BertTokenizer),
    'xlnet': (XLNetConfig, XLNetForMaskedLM, XLNetTokenizer),
    'xlm': (XLMConfig, XLMForMaskedLM, XLMTokenizer),
    'roberta': (RobertaConfig, RobertaForMaskedLM, RobertaTokenizer),
    'distilbert': (DistilBertConfig, DistilBertForMaskedLM, DistilBertTokenizer),
    'camembert': (CamembertConfig, CamembertForMaskedLM, CamembertTokenizer),
    'xlmroberta': (XLMRobertaConfig, XLMRobertaForMaskedLM, XLMRobertaTokenizer),
}

# Shortcut names advertised in the --model_name_or_path help text, gathered
# from each config class in MODEL_CLASSES order. The attribute is read with a
# default so that config classes lacking `pretrained_config_archive_map`
# (presumably newer transformers releases -- verify) contribute no names
# instead of raising AttributeError at import time.
ALL_MODELS = sum(
    (tuple(getattr(classes[0], 'pretrained_config_archive_map', {}).keys())
     for classes in MODEL_CLASSES.values()),
    (),
)

MAX_LENGTH = 10000  # Hardcoded max length to avoid infinite loop


In [None]:
def main():
    """Render the Streamlit page and run generation when the button is pressed.

    The sidebar collects a code snippet and a prompt; an empty box falls back
    to the default returned by ``get_input_code``. Either value can be echoed
    on the main page through its checkbox. Pressing "Generate Code" passes the
    prompt to ``get_predictions`` and displays the returned text.
    """
    st.title("Automated Code Generator")
    st.subheader("Python Code Generator")
    st.markdown("This application is a Streamlit dashboard that can be used to generate Python code using OpenAI's GPT-3 API.")

    st.sidebar.title("Generate Python Code")
    st.sidebar.markdown("This application is a Streamlit dashboard that can be used to generate Python code using OpenAI's GPT-3 API.")
    st.sidebar.markdown("Enter your code snippet below:")

    # Both text areas have an empty label, so each gets an explicit key to
    # keep the two widgets distinct regardless of their other parameters.
    code_input = st.sidebar.text_area("", height=200, key="code_input")
    user_input = get_input_code(code_input)

    if st.sidebar.checkbox("Show Input Code"):
        st.subheader("Inputted Code")
        st.code(user_input)

    st.sidebar.markdown("Enter the prompt below:")
    # 68 px rather than 50 px: recent Streamlit releases reject text_area
    # heights below 68 px.
    prompt_input = st.sidebar.text_area("", height=68, key="prompt_input")
    user_prompt = get_input_code(prompt_input)

    if st.sidebar.checkbox("Show Prompt"):
        st.subheader("Prompt")
        st.code(user_prompt)

    if st.sidebar.button("Generate Code"):
        st.subheader("Generated Code")
        # The spinner covers the real generation work; no artificial delay.
        with st.spinner('Generating Code...'):
            generated_code = get_predictions(user_prompt)
            st.code(generated_code)


if __name__ == '__main__':
    main()