<a href="https://colab.research.google.com/github/cooolbabu/GoogleGemini101/blob/main/AzureDatabricks/BookstoreMedallion_BookstoreDB_OpenAI_T1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Package installs

In [62]:
%pip install openai
%pip install PyGithub



### Imports, get keys, get llm client and set model to variable MODEL_NAME

In [63]:

import json
import re
from openai import OpenAI
from pprint import pprint
from google.colab import userdata
from github import Github

# Get the OpenAI API key from Colab secrets
github_token=userdata.get('Github_Token')
openai_api_key=userdata.get('OPENAI_API_KEY')
# Initialize a GitHub instance
g = Github(github_token)

client = OpenAI(api_key=openai_api_key)
MODEL_NAME = "gpt-3.5-turbo-1106"

### Github helper functions
* read_file_as_string()
* check_in_file(repo_name, file_path, file_content, content_tag, branch)

In [64]:
def read_file_as_string(file_path):
  """
      Reads the file and return a string representation of the file contents

      Parameters:
          file_path (str): Filename including filepath
  """
  try:
      with open(file_path, 'r') as file:
          file_contents = file.read()
      return file_contents
  except FileNotFoundError:
      print(f"File '{file_path}' not found.")
      return None
  except Exception as e:
      print(f"An error occurred: {e}")
      return None

def check_in_file(repo_name, file_path, file_content, content_tag, branch):
    """
        Checks if a specific file exists in a GitHub repository and updates it with new content if it does.
        If the file does not exist, it creates a new file with the provided content.

        This function operates on a specific branch named 'test'. If updating, it will commit the changes with a given content tag as the commit message.
        In case the file needs to be created, it will also use the content tag as the commit message for the new file.

        Parameters:
        - repo_name (str): The name of the repository, formatted as 'username/repository'.
        - file_path (str): The path to the file within the repository. This should include the file name and its extension.
        - file_content (str): The content to be written to the file. This is used both for updating and creating the file.
        - content_tag (str): A message associated with the commit used for updating or creating the file.
        - branch (str): Github branch for the code

        Behavior:
        - If the file exists at the specified path, it updates the file with `file_content`, using `content_tag` as the commit message.
        - If the file does not exist, it creates a new file at the specified path with `file_content`, also using `content_tag` as the commit message for creation.
        - Upon successful update or creation, prints a success message indicating the action taken.
    """

    # Get the repository
    repo = g.get_repo(repo_name)

    try:
        # Get the contents of the file if it exists
        file = repo.get_contents(file_path, ref=branch)

        # Update the file
        repo.update_file(file_path, content_tag, file_content, file.sha, branch=branch)
        print(f"File '{file_path}' updated successfully.")
    except:
        # If the file doesn't exist, create it
        print(f"{file_path}/{file_content} does not exist")
        repo.create_file(file_path, content_tag, file_content, branch=branch)
        print(f"File '{file_path}' created successfully.")

def create_notebook(response, system_message, instructions, filename):
    # Extract summary, code, and explanation from the response JSON
    summary = response["summary"]

    # Create the notebook content
    summary_section = f"# Databricks notebook source\n# MAGIC %md\n# MAGIC # Summary\n# MAGIC {summary}\n"

    parts_section = ""
    for element in response_data['parts']:
      parts_section += f"# COMMAND ----------\n# MAGIC %md\n# MAGIC ##{element['sub_header']}\n# COMMAND ----------\n# MAGIC %md\n# MAGIC ## Explanation\n{element['explanation']}\n# COMMAND ----------\n{element['code']}\n"

    instructions_section = f"""
# COMMAND ----------
# MAGIC %md
# MAGIC # GenAI Instructions
# COMMAND ----------\n
# MAGIC %md
# MAGIC * ## System message to AI
# MAGIC   * {system_message}

# COMMAND ----------
# MAGIC %md
# MAGIC * ## Instructions to AI (Try edit mode for visualizing table structure)
# MAGIC   * {instructions}
"""
    notebook_content = summary_section + parts_section + instructions_section
    # Write the notebook content to a file
    with open(filename, "w") as f:
        f.write(notebook_content)

    print(f"Notebook '{filename}' has been created.")

    return notebook_content

In [65]:
import ast

def convert_str_to_dict(s):
    try:
        d = ast.literal_eval(s)
        if isinstance(d, dict):
            return d
        else:
            raise ValueError("Input is not a valid dictionary string")
    except (ValueError, SyntaxError):
        raise ValueError("Input is not a valid dictionary string")

import string

def strip_control_characters_old(s):
    # Create a translation table that maps all control characters to None
    control_chars = dict.fromkeys(range(0x00, 0x20), ' ')
    control_chars.update(dict.fromkeys(range(0x7f, 0xa0), ' '))

    # Translate the string using the translation table
    cleaned_str = s.translate(dict.fromkeys(control_chars, ' '))

    return cleaned_str

def strip_control_characters(s):
    # Create a translation table that maps all control characters and special characters to a space ' '
    control_chars = dict.fromkeys(range(0x00, 0x09), ' ')  # Exclude \n, \r, \f
    control_chars.update(dict.fromkeys(range(0x0B, 0x0C), ' '))
    control_chars.update(dict.fromkeys(range(0x0E, 0x20), ' '))
    control_chars.update(dict.fromkeys(range(0x7f, 0xa0), ' '))
    special_chars = dict.fromkeys(map(ord, string.punctuation.replace('\n', '').replace('\r', '').replace('\f', '')), ' ')
    control_chars.update(special_chars)

    # Translate the string using the translation table
    cleaned_str = s.translate(control_chars)

# Setup
1.   System Message
2.   User Message



In [67]:


system_message = """
You are  Azure Databricks data engineer.
    - You will be given tasks and asked to write pyspark code.
    - You will use best practices for writing code.
    - Your response will be in JSON format with keys summary, number_of_parts, sub_header, code, explanation.
    - JSON format must be summary, number_of_parts and parts. Parts must be an array containing code and explanation
  """.strip()

user_message_content = read_file_as_string("./BookstorePrompt.txt")
print(user_message_content)

Please build a pyspark program for Azure Databricks using Medallion framework.
- I will give you the table schema. I will provide general instructions and instructions for each step. 
- The schema for the tables is as follows

- customers table schema
root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- profile: string (nullable = true)
 |-- updated: string (nullable = true)

- books table schema
root
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 
- orders_bronze table schema
root
 |-- order_id: string (nullable = true)
 |-- order_timestamp: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- total: integer (nullable = true)
 |-- books: array (nullable = true)
 |--|-- element: struct (containsNull = true)
 |--|--|--- book_id: string (nullable = true)

In [68]:
print(system_message)

You are  Azure Databricks data engineer.
    - You will be given tasks and asked to write pyspark code.
    - You will use best practices for writing code.
    - Your response will be in JSON format with keys summary, number_of_parts, sub_header, code, explanation.
    - JSON format must be summary, number_of_parts and parts. Parts must be an array containing code and explanation


# Make the call to LLMs

In [69]:
# Create the message with variables

response = client.chat.completions.create(
    model="gpt-4-1106-preview",
    temperature = 0,
    response_format = {"type" : "json_object"},
    messages=[
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message_content}]
)

# Assuming you have a client setup for interaction. Ensure to configure your OpenAI client appropriately.



In [70]:
print(f"\nResponse id: {response.id}\nCreated: {response.created}\nModel: {response.model}\nCompletion Tokens: {response.usage.completion_tokens}\nPrompt Tokens: {response.usage.prompt_tokens}\nTotal tokens: {response.usage.total_tokens}")
#print(response.created)

print(response.choices[0].message.content)



Response id: chatcmpl-93YlkUL670ob8KCR4GFuWRCcCsY4C
Created: 1710635664
Model: gpt-4-1106-preview
Completion Tokens: 1348
Prompt Tokens: 1153
Total tokens: 2501
{
    "summary": "Medallion architecture implementation in Azure Databricks using PySpark",
    "number_of_parts": 3,
    "parts": [
        {
            "sub_header": "Part 1: Ingesting Data into Orders_Bronze Table",
            "code": "from pyspark.sql import SparkSession\nfrom pyspark.sql.functions import input_file_name, current_timestamp\n\n# Initialize Spark session\nspark = SparkSession.builder.appName('OrdersBronzeIngestion').getOrCreate()\n\n# Define variables\ninput_folder = 'dbfs:/mnt/bookstore/orders-raw'\ncheckpoint_location = 'dbfs:/mnt/bookstore/checkpoints/orders_bronze'\ntarget_table = 'orders_bronze'\n\n# Ingest data using AutoLoader\nbronze_df = (spark.readStream.format('cloudFiles')\n    .option('cloudFiles.format', 'parquet')\n    .option('cloudFiles.schemaLocation', checkpoint_location)\n    .option('c

# Validate response from LLM

In [71]:
response_data = json.loads(response.choices[0].message.content)

In [32]:
response_data['summary']
response_data['parts'][0]

{'sub_header': 'Part 1: Ingesting Data into Orders_Bronze Table',
 'code': "from pyspark.sql.functions import input_file_name, current_timestamp\nfrom pyspark.sql import SparkSession\n\n# Initialize Spark session\nspark = SparkSession.builder.appName('OrdersBronzeIngestion').getOrCreate()\n\n# Define variables\ninput_folder = 'dbfs:/mnt/bookstore/orders-raw'\ncheckpoint_location = 'dbfs:/mnt/bookstore/checkpoints/orders_bronze'\ntarget_table = 'orders_bronze'\n\n# Ingest data using AutoLoader\norders_bronze_df = (spark.readStream.format('cloudFiles')\n    .option('cloudFiles.format', 'parquet')\n    .option('cloudFiles.schemaLocation', checkpoint_location)\n    .option('cloudFiles.schemaEvolutionMode', 'addNewColumns')\n    .load(input_folder)\n    .withColumn('file_name', input_file_name())\n    .withColumn('processed_timestamp', current_timestamp())\n)\n\n# Write to table\n(orders_bronze_df.writeStream.format('delta')\n    .outputMode('append')\n    .option('checkpointLocation', chec

In [72]:
file_contents = create_notebook(response_data, system_message, user_message_content, "orders_bronze_notebook-t2.py")

Notebook 'orders_bronze_notebook-t2.py' has been created.


In [73]:
print(file_contents)

# Databricks notebook source
# MAGIC %md
# MAGIC # Summary
# MAGIC Medallion architecture implementation in Azure Databricks using PySpark
# COMMAND ----------
# MAGIC %md
# MAGIC ##Part 1: Ingesting Data into Orders_Bronze Table
# COMMAND ----------
# MAGIC %md
# MAGIC ## Explanation
This code initializes a Spark session and sets up the AutoLoader to ingest data from the specified input folder. The data is read in parquet format, and schema evolution is enabled to handle any new columns in the data. The input_file_name and current_timestamp functions are used to append the file_name and processed_timestamp columns. The data is then written to the orders_bronze table in append mode, with a checkpoint location specified for fault tolerance. The trigger option 'availableNow' is used to process the available files immediately.
# COMMAND ----------
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, current_timestamp

# Initialize Spark session
spark = S

# Check into Github
*   repository : "cooolbabu/GoogleGemini101"
*   filename : "AzureDatabricks/filename" - specify actual filename
*   filecontent: Contents of the file to check in
*   tag_name: give a comment. It will show in Github
* branch: branch name to check into. Ensure that branch already exists
          Future TODO: if branch doesn't exist (notify, ask, process)



In [74]:


check_in_file(repo_name="cooolbabu/GoogleGemini101",
              file_path="AzureDatabricks/ConfigureDB/orders_streams_p3.py",
              file_content=file_contents,
              content_tag='Orders streaming p3',
              branch="pyspark-genai-t3")

AzureDatabricks/ConfigureDB/orders_streams_p3.py/# Databricks notebook source
# MAGIC %md
# MAGIC # Summary
# MAGIC Medallion architecture implementation in Azure Databricks using PySpark
# COMMAND ----------
# MAGIC %md
# MAGIC ##Part 1: Ingesting Data into Orders_Bronze Table
# COMMAND ----------
# MAGIC %md
# MAGIC ## Explanation
This code initializes a Spark session and sets up the AutoLoader to ingest data from the specified input folder. The data is read in parquet format, and schema evolution is enabled to handle any new columns in the data. The input_file_name and current_timestamp functions are used to append the file_name and processed_timestamp columns. The data is then written to the orders_bronze table in append mode, with a checkpoint location specified for fault tolerance. The trigger option 'availableNow' is used to process the available files immediately.
# COMMAND ----------
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, curren

In [None]:
content_dict = json.loads(response.choices[0].message.content)

code_snippets = {}
explanations = {}
for part, details in content_dict['code'].items():
  for task, task_details in details.items():
      code_snippets[part] = task_details['code']
      explanations[part] = task_details['explanation']

print(content_dict["summary"])
print(code_snippets)
print(explanations)

Created a Pyspark program using Medallion framework for Azure Databricks following the instructions provided.
{'part1': "spark = SparkSession.builder.appName('OrderBronzeIngest').getOrCreate()\n\norders_bronze_df = spark.readStream.format('cloudFiles')\n    .option('cloudFiles.format', 'parquet')\n    .option('cloudFiles.includeExistingFiles', 'true')\n    .option('cloudFiles.useNotifications', 'true')\n    .option('cloudFiles.url', 'dbfs:/mnt/bookstore/orders-raw')\n    .option('cloudFiles.format', 'parquet')\n    .load()\n\norders_bronze_df = orders_bronze_df.withColumn('file_name', input_file_name())\n    .withColumn('processed_timestamp', current_timestamp())\n\ncheckpoint_location = 'dbfs:/mnt/bookstore/checkpoints/orders_bronze'\n\nquery = orders_bronze_df.writeStream.format('delta')\n    .outputMode('append')\n    .option('checkpointLocation', checkpoint_location)\n    .trigger(availableNow=True)\n    .table('orders_bronze')", 'part2': "customers_table = 'customers'\norders_bron