
# DataSet Analysis Langchain Batch



## Details
This notebook uses Azure OpenAI to produce an individual data analysis for each specified table, in batch. This demonstration focuses on tables residing in Unity Catalog, though any table that can be loaded into a dataframe can be analyzed.

In [0]:
# Force reinstall a specific version of numpy package
%pip install --force-reinstall numpy==1.26.2 

In [0]:
# Install the latest version of the langchain package quietly
%pip install -qU langchain 

In [0]:
# Install the latest version of the langchain-openai package quietly
%pip install -qU langchain-openai

In [0]:
# (Disabled) Install the latest version of the langchain-community package quietly
# %pip install -qU langchain-community

In [0]:
# Install the latest version of the langchain-experimental package quietly
%pip install -qU langchain-experimental

In [0]:
# Install the latest version of the tabulate package quietly
%pip install -qU tabulate

In [0]:
# %pip install git+https://github.com/serena-ruan/mlflow.git@main

In [0]:
# Restart the Python process for the current notebook session
dbutils.library.restartPython()

In [0]:
import logging  # For logging information
import pandas as pd  # For data manipulation and analysis
import pyspark.sql.functions as F
import time  # For time-related functions
import warnings  # For handling warnings
from datetime import datetime  # For manipulating dates and times
from IPython.display import display, Markdown  # For displaying rich content in notebooks
from langchain_openai import AzureChatOpenAI  # For Azure OpenAI model integration
from langchain.agents import AgentExecutor  # For executing agents
from langchain.agents.agent_types import AgentType  # For defining agent types
from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent, create_spark_dataframe_agent # For creating pandas dataframe agents
from pyspark.sql.types import *  # For defining Spark data types
from tabulate import tabulate  # For tabulating data

# Suppress FutureWarning messages so they do not clutter the notebook output
warnings.simplefilter(action='ignore', category=FutureWarning)

# Notebook-level logger: INFO and above, kept off the root logger so messages
# are not emitted a second time by any root handlers
logger = logging.getLogger(__name__)
logger.propagate = False
logger.setLevel(logging.INFO)

# Log line layout: timestamp - logger name - level - message
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# logging.getLogger returns the same logger object every time this cell runs,
# so an unconditional addHandler would stack one more handler per re-run and
# duplicate every message. Reuse an already-attached stream handler if present.
stream_handler = next(
    (handler for handler in logger.handlers if isinstance(handler, logging.StreamHandler)),
    None,
)
if stream_handler is None:
    stream_handler = logging.StreamHandler()
    logger.addHandler(stream_handler)

# (Re)apply level and format so the handler configuration is the same on every run
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)

# Prepend a value (e.g. a heading) to a markdown string
def insert_at_top(markdown_string, value_to_insert):
    """Return ``markdown_string`` with ``value_to_insert`` placed above it.

    The two parts are separated by a blank line so the inserted value is
    rendered as its own markdown block.
    """
    return "{}\n\n{}".format(value_to_insert, markdown_string)

# Function to map PySpark data types to T-SQL data types
def get_tsql_dtype(data_type):
    """Return the T-SQL data type name for a PySpark data type string.

    The lookup ignores surrounding whitespace and letter case. Parameterized
    decimals such as ``"decimal(10,2)"`` are accepted and keep their
    precision/scale in the result, since the ``decimal(p,s)`` notation is
    written the same way in T-SQL.

    Args:
        data_type: PySpark type string, e.g. ``"string"``, ``"bigint"`` or
            ``"decimal(10,2)"``.

    Returns:
        The corresponding T-SQL type as a string.

    Raises:
        KeyError: If there is no mapping for ``data_type``.
    """
    # Dictionary mapping PySpark data types to T-SQL data types
    pyspark_to_tsql = {
        "string": "varchar",
        "int": "int",
        "bigint": "bigint",
        "long": "bigint",
        "float": "float",
        "double": "float",
        "boolean": "bit",
        "date": "date",
        "timestamp": "datetime",
        "binary": "varbinary",
        "decimal": "decimal",
    }

    # Split "decimal(10,2)" into the base name and its "(...)" suffix so the
    # base name can be looked up; plain names have an empty suffix
    normalized = data_type.strip().lower()
    base_type, open_paren, params = normalized.partition("(")
    base_type = base_type.strip()

    try:
        field_type = pyspark_to_tsql[base_type]
    except KeyError:
        # Same exception type as a plain dict lookup, but with a readable message
        raise KeyError(f"No T-SQL mapping for PySpark data type: {data_type!r}") from None

    # Keep precision/scale for decimals; dropping them would silently change
    # the column definition
    if base_type == "decimal" and open_paren:
        field_type = f"{field_type}({params}"
    return field_type

In [0]:
# Databricks secret scope that stores the Azure OpenAI credentials
secret_scope = "djsdbsecrets"

# Read the Azure OpenAI API key and endpoint URI from Databricks secrets
api_key = dbutils.secrets.get(scope=secret_scope, key="openai-key")
azure_endpoint = dbutils.secrets.get(scope=secret_scope, key="openai-uri")

# Azure OpenAI deployment name (also passed as the model name below)
deployment = "gpt-4o"

# Catalog and schema that contain the tables to analyze
catalog = "generaldata"
schema = 'dataanalysis'

# Settings for the chat model; temperature 0 is used for the analysis runs
llm_settings = {
    "azure_deployment": deployment,
    "azure_endpoint": azure_endpoint,
    "openai_api_version": "2024-12-01-preview",
    "openai_api_key": api_key,
    "model": deployment,
    "temperature": 0,
}

# Initialize the AzureChatOpenAI model used by the dataframe agent
llm = AzureChatOpenAI(**llm_settings)

In [0]:
# Table to analyze within the configured catalog and schema
table = "air_quality"

# Three-part name of the table: catalog.schema.table
full_table_name = f"{catalog}.{schema}.{table}"

# Log the start of processing for the current table
logger.info(f"Processing table {full_table_name}")

# Timestamp of this run in the format "YYYY-MM-DD_HH-MM-SS"
file_date_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Report file name built from the timestamp and the table name (dots replaced by underscores)
analysis_report_name = f"analysis_output_{file_date_stamp}_{full_table_name.replace('.','_')}.md"

# Query selecting every row and column of the table (no row limit is applied)
table_select_query = f"SELECT * FROM {full_table_name}"

# Load the table into a Spark DataFrame
analysis_df = spark.sql(table_select_query)

# Collect the names of all binary-typed columns; they are excluded from the analysis
columns_to_drop = [col_name for col_name, col_type in analysis_df.dtypes if col_type == 'binary']

# Log each binary column that is about to be removed
for col_name in columns_to_drop:
    logger.info(f"The following column(s) will be dropped from analysis dataframe as they are binary data types: {col_name}")

# Remove the binary columns from the DataFrame
analysis_df = analysis_df.drop(*columns_to_drop)

# # Convert Spark DataFrame to Pandas DataFrame
# open_ai_df = analysis_df.toPandas()

# Pause execution for 60 seconds to avoid hitting rate limits or overloading the API
time.sleep(60)

# Build a Spark DataFrame agent backed by the Azure OpenAI model
agent = create_spark_dataframe_agent(
    llm,  # Azure OpenAI model initialized in the previous cell
    analysis_df,  # DataFrame to be analyzed
    agent_type=AgentType.OPENAI_FUNCTIONS,  # Specify the agent type
    agent_executor_kwargs={"handle_parsing_errors": True},  # Handle parsing errors
    allow_dangerous_code=True,  # Allow execution of potentially dangerous code
)

# Prompt instructing the agent to run an exploratory data analysis (EDA)
eda_input = """
You are a data analyst given the provided dataframe. Perform and display an Exploratory Data Analysis on the provided dataframe. Please focus on providing a thorough analysis of the dataset. Ensure your analysis is detailed, tailored, and user-centric to help the user understand the data. ALWAYS FINISH THE OUTPUT. Never send partial responses. STRICTLY use the context provided to generate your response. DO NOT GENERATE ANYTHING FROM YOUR PRE-TRAINED MEMORY.  
"""

# Run the agent on the prompt (surrounding whitespace removed) to get the analysis
response = agent.invoke({"input": eda_input.strip()})

# Wrap the agent's answer as a Markdown object; its text is read back via .data below
output = Markdown(response['output'])

# Heading for the analysis report, naming the analyzed table
output_header = f"# Data Analysis for table {full_table_name}"

# Put the heading above the analysis text
updated_output = insert_at_top(output.data, output_header)

# Optional (disabled): save the report to a Unity Catalog volume
# vol_path = "/Volumes/generaldata/dataanalysis/markdowndocs"

# with open(f"{vol_path}/{analysis_report_name}", "w") as file:
#     file.write(str(updated_output))

# logging.info(f"Analysis report saved to {vol_path}/{analysis_report_name}")

# Render the finished report in the notebook
display(Markdown(updated_output))