In [1]:
import os
import openai
import tiktoken
import json
from PIL import Image
import io
import base64
from dotenv import load_dotenv, find_dotenv
import PyPDF2



_ = load_dotenv(find_dotenv()) 

# Functions

In [2]:
# Load the API key from a JSON file
with open('config.json', 'r') as file:
    config = json.load(file)
    api_key = config['openai_api_key']

openai.api_key = str(api_key)

In [3]:
def resize_compress_encode_image(image_path, output_size=(300, 100), quality=60):
    # Resize and compress the image
    with Image.open(image_path) as img:
        img = img.resize(output_size, Image.Resampling.LANCZOS)
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality)
        buffer.seek(0)
        encoded_image = base64.b64encode(buffer.read()).decode()
    return encoded_image

In [4]:
def get_completion(prompt, model="gpt-4-1106-preview"):
    messages = [{"role": "user", "content": prompt}]
    response = openai.ChatCompletion.create(
        model=model,
        messages=messages,
        temperature=0, # this is the degree of randomness of the model's output 
    )
    return response.choices[0].message["content"]

In [5]:
def send_image_query_to_openai(encoded_image):
    # Prepare the chat message payload with the encoded image
    payload = {
        "model": "gpt-4-vision-preview",  
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What’s in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{encoded_image}"
                        },
                    },
                ],
            }
        ]
    }

    # Send the request to the API
    response = openai.ChatCompletion.create(**payload)

    # Return the content of the response
    return response.choices[0].message["content"]

In [6]:
def query_openai_with_image_url(image_url, question="What’s in this image?"):
    # Prepare the chat message payload with the image URL
    payload = {
        "model": "gpt-4-vision-preview",  
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": question},
                    {
                        "type": "image_url",
                        "image_url": {"url": image_url},
                    },
                ],
            }
        ]
    }

    # Send the request to the API
    response = openai.ChatCompletion.create(**payload)
    return response.choices[0].message["content"] if 'choices' in response and response.choices else "No content found"


In [7]:
def read_python_file_to_string(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            code_string = file.read()
        return code_string
    except FileNotFoundError:
        return "File not found."
    except Exception as e:
        return f"An error occurred: {str(e)}"

In [8]:
def notebook_to_string(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            notebook_content = json.load(file)
        return json.dumps(notebook_content, indent=4)  # Convert JSON content to a formatted string
    except FileNotFoundError:
        return "File not found: {}".format(file_path)
    except json.JSONDecodeError:
        return "Error decoding JSON from file: {}".format(file_path)
    except Exception as e:
        return "An error occurred: {}".format(str(e))

In [9]:
def string_to_notebook(json_string, file_path):
    try:
        # Parse the JSON string
        notebook_content = json.loads(json_string)

        # Write the JSON object to an .ipynb file
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(notebook_content, file, indent=4)

        return f"Successfully created notebook: {file_path}"
    except json.JSONDecodeError:
        return "Invalid JSON string"
    except Exception as e:
        return f"An error occurred: {str(e)}"

In [10]:
def markdown_to_string(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return content
    except FileNotFoundError:
        return f"File not found: {file_path}"
    except Exception as e:
        return f"An error occurred: {str(e)}"


In [11]:
def string_to_markdown(content, file_path):
    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(content)
        return f"Successfully written to {file_path}"
    except Exception as e:
        return f"An error occurred: {str(e)}"



In [12]:
def chunk_text_by_question(text):
    # Split text using a positive lookbehind on the pattern "### Question X"
    # This keeps the pattern in the resulting list
    questions = re.split(r'(?<=\n### Question \d+\n)', text)

    # Remove the first empty element if it exists
    if questions and questions[0].strip() == '':
        questions = questions[1:]

    return questions

In [13]:
import re

def chunk_text_by_question(text):
    # Split text on the pattern, but keep the pattern in the result
    questions = re.split(r'(\n### Question \d+\n)', text)

    # Reattach the split pattern to each chunk
    combined_questions = []
    for i in range(1, len(questions), 2):
        combined_question = questions[i] + questions[i+1]
        combined_questions.append(combined_question.strip())

    return combined_questions



In [14]:
def append_to_markdown(file_path, text_to_append):
    try:
        with open(file_path, 'a', encoding='utf-8') as file:
            file.write(text_to_append)
        return f"Text appended successfully to {file_path}"
    except FileNotFoundError:
        return "File not found."
    except Exception as e:
        return f"An error occurred: {str(e)}"

# Example usage
# file_path = 'test_2.md'  # Replace with your Markdown file path
# text_to_append = question_list[29]
# result = append_to_markdown(file_path, text_to_append)
# print(result)


In [15]:
import nbformat
import os

def notebook_to_markdown(notebook_path, markdown_path):
    try:
        # Read the notebook
        with open(notebook_path, 'r', encoding='utf-8') as file:
            notebook = nbformat.read(file, as_version=4)

        # Process each cell and extract content
        markdown_content = []
        for cell in notebook.cells:
            if cell.cell_type == 'markdown':
                # Directly add markdown content
                markdown_content.append(''.join(cell.source))
            elif cell.cell_type == 'code':
                # Add code in Markdown code block format
                code_block = '```python\n' + ''.join(cell.source) + '\n```'
                markdown_content.append(code_block)

        # Write content to the markdown file
        with open(markdown_path, 'w', encoding='utf-8') as md_file:
            md_file.write('\n'.join(markdown_content))

        return f"Markdown file created successfully: {markdown_path}"
    except FileNotFoundError:
        return "Notebook file not found."
    except Exception as e:
        return f"An error occurred: {str(e)}"




In [16]:
def resize_compress_encode_image(image_path, output_size=(300, 100), quality=60):
    # Resize and compress the image
    with Image.open(image_path) as img:
        img = img.resize(output_size, Image.Resampling.LANCZOS)
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality)
        buffer.seek(0)
        encoded_image = base64.b64encode(buffer.read()).decode()
    return encoded_image

# Actions

In [9]:
input_file = "test_1.ipynb"
output_file = "test_1.md"
content = notebook_to_string(input_file)
string_to_markdown(content, output_file)

'Successfully written to test_1.md'

# Prompt 1: Create ebook

In [23]:
files = [ "test_5_part_2.md"]
for i, file in enumerate(files, start=1):
    print(file)
    content_string = markdown_to_string(file)

    prompt = f'''Please read practice exam for the Databricks Certified Data Engineer Associate exam and create 
    a similar multiple-choice test. However, modify it by changing questions and the correct answers and slightly altering 
    the knowledge topics.
    But if question 5 is lakehouse, then you create a lakehouse question for new question 5, 
    if question is about data governance question, then create data governance question. 
    After each question, add correct answers, exam topic (Databricks Lakehouse Platform or ELT with Spark SQL and Python or
    Incremental Data Processing or Production Pipelines or Data Governance), and explanation. 
    I would like the new test to be structured in the same format as the original, and returned to me a markdown format too. 
    My input json {content_string}'''

    completion = get_completion(prompt)
    if completion:
        output_path = f"test_11_part_2.md"
        print(output_path)
        result = string_to_markdown(completion, output_path)

test_5_part_2.md
test_11_part_2.md


In [23]:
files = ["test_10_part_1.md", "test_10_part_2.md", "test_10_part_3.md", "test_10_part_4.md"]
for i, file in enumerate(files, start=1):
    print(file)
    content_string = markdown_to_string(file)

    prompt = f'''Please read practice exam for the Databricks Certified Data Engineer Associate exam and create 
    a similar multiple-choice test. However, modify it by changing questions and the correct answers and slightly altering 
    the knowledge topics.
    But if question 5 is lakehouse, then you create a lakehouse question for new question 5, 
    if question is about data governance question, then create data governance question. 
    After each question, add correct answers, exam topic (Databricks Lakehouse Platform or ELT with Spark SQL and Python or
    Incremental Data Processing or Production Pipelines or Data Governance), and explanation. 
    I would like the new test to be structured in the same format as the original, and returned to me a markdown format too. 
    My input json {content_string}'''

    completion = get_completion(prompt)
    if completion:
        output_path = f"test_12_part_{i}.md"
        print(output_path)
        result = string_to_markdown(completion, output_path)

test_10_part_1.md
test_12_part_1.md
test_10_part_2.md
test_12_part_2.md
test_10_part_3.md
test_12_part_3.md
test_10_part_4.md
test_12_part_4.md


# Prompt 2: Test image AI

In [42]:
# Example usage
encoded_image = resize_compress_encode_image('ai_chart.png')  # Make sure to use your function to get the encoded image
response_content = send_image_query_to_openai(encoded_image)
print(response_content)

The image shows a stylized 3D representation of data visualization models. It


In [44]:
# Example usage
image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
response_content = query_openai_with_image_url(image_url)
print(response_content)

{
  "id": "chatcmpl-8Vp3uiYFyj6Da6aVQbmwcluWCq2sD",
  "object": "chat.completion",
  "created": 1702595142,
  "model": "gpt-4-1106-vision-preview",
  "usage": {
    "prompt_tokens": 1118,
    "completion_tokens": 16,
    "total_tokens": 1134
  },
  "choices": [
    {
      "message": {
        "role": "assistant",
        "content": "The image shows a picturesque natural landscape featuring a wooden boardwalk extending through a lush"
      },
      "finish_details": {
        "type": "max_tokens"
      },
      "index": 0
    }
  ]
}
The image shows a picturesque natural landscape featuring a wooden boardwalk extending through a lush


In [29]:
def extract_text_from_pdf(pdf_path):
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
    return text

In [34]:
pdf_text = extract_text_from_pdf('test_1.pdf')

In [36]:
prompt = f'''
How many quesiton are here? {pdf_text}
'''
completion = get_completion(prompt)
if completion:
    print(completion)

There are 45 questions listed in your message.


# Combine files

In [27]:
file_patterns = []
for i in range(1, 13):
    fname = f'test_{i}_part_*'
    print(fname)
    file_patterns.append(fname)

test_1_part_*
test_2_part_*
test_3_part_*
test_4_part_*
test_5_part_*
test_6_part_*
test_7_part_*
test_8_part_*
test_9_part_*
test_10_part_*
test_11_part_*
test_12_part_*


In [33]:
import glob
import os

# Define the directory where the combined files will be saved
output_directory = 'tests'

# Create the directory if it does not exist
if not os.path.exists(output_directory):
    os.makedirs(output_directory)


for pattern in file_patterns:
    # Find all files matching the pattern
    files_to_combine = glob.glob(f'{pattern}.md')

    # Sort the files to maintain order
    files_to_combine.sort()

    # Define the name of the combined file, saved in the 'tests' directory
    combined_file_name = os.path.join(output_directory, pattern.split('_part_')[0] + '.md')

    # Combine the contents of the files
    with open(combined_file_name, 'w', encoding='utf-8') as combined_file:
        for file_name in files_to_combine:
            with open(file_name, 'r', encoding='utf-8') as file:
                # Write the content of each file to the combined file
                combined_file.write(file.read() + '\n')

print("Files combined successfully.")



Files combined successfully.


In [34]:
file_patterns = []
for i in range(1, 13):
    fname = f'test_*'
    print(fname)
    file_patterns.append(fname)

test_*
test_*
test_*
test_*
test_*
test_*
test_*
test_*
test_*
test_*
test_*
test_*


In [37]:
import os

# Define the directory containing the files and the output file name
input_directory = 'tests'
output_file = 'tests.md'

# Create or open the output file
with open(output_file, 'w', encoding='utf-8') as outfile:
    # Iterate through each file in the specified directory
    for filename in os.listdir(input_directory):
        filepath = os.path.join(input_directory, filename)
        # Check if it's a file, not a directory
        if os.path.isfile(filepath):
            with open(filepath, 'r', encoding='utf-8') as infile:
                # Read the contents of the file and write it to the output file
                outfile.write(infile.read() + '\n')

print(f"All files in {input_directory} have been combined into {output_file}.")



All files in tests have been combined into tests.md.


In [38]:
import subprocess

def markdown_to_pdf_pandoc(markdown_file, pdf_file, margin='1in', fontsize='10pt', landscape=False):
    try:
        command = ["pandoc", markdown_file, "-o", pdf_file, "-V", f"geometry:margin={margin}", "-V", f"fontsize={fontsize}"]
        if landscape:
            command.extend(["-V", "geometry:landscape"])
        
        subprocess.run(command, check=True)

        return f"PDF created successfully: {pdf_file}"
    except FileNotFoundError:
        return "Markdown file not found or Pandoc is not installed."
    except subprocess.CalledProcessError as e:
        return f"An error occurred during conversion: {str(e)}"
    except Exception as e:
        return f"An error occurred: {str(e)}"

# Example usage
markdown_file = 'tests.md'
pdf_file = 'tests.pdf'
result = markdown_to_pdf_pandoc(markdown_file, pdf_file, landscape=True)  # Change landscape to False if not needed
print(result)


An error occurred during conversion: Command '['pandoc', 'tests.md', '-o', 'tests.pdf', '-V', 'geometry:margin=1in', '-V', 'fontsize=10pt', '-V', 'geometry:landscape']' returned non-zero exit status 47.
