In [0]:
pip install langchain-community databricks-sql-connector databricks_langchain openai databricks-sqlalchemy~=1.0

In [0]:
# Restart the notebook's Python process; presumably needed so the packages
# installed in the previous cell are picked up by the imports that follow.
dbutils.library.restartPython()

In [0]:
from langchain.agents import create_sql_agent
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.sql_database import SQLDatabase
from langchain import OpenAI
from databricks_langchain import ChatDatabricks
from pyspark.sql import SparkSession

In [0]:
# Reuse (or create) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("TimeSeriesAnalysis").getOrCreate()

# Open the initial LangChain SQL connection against the default catalog/schema.
try:
    db = SQLDatabase.from_databricks(catalog="forecast", schema="schema1", engine_args={"pool_pre_ping": True})
except Exception as e:
    print(f"Error initializing LangChain DB: {e}")
    # Re-raise rather than calling exit(): exit() is an interactive-shell helper
    # that hides the traceback and does not fail the cell like a normal error,
    # so later cells could run without a usable `db`.
    raise

In [0]:
# Chat model served from a Databricks endpoint; the chains defined later reuse it.
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-1-70b-instruct", temperature=0.1, max_tokens=800)

# SQL toolkit and agent bound to the same model and database connection.
toolkit = SQLDatabaseToolkit(llm=llm, db=db)
agent = create_sql_agent(toolkit=toolkit, llm=llm, verbose=True)

In [0]:
# Prompt that asks the model to report the catalog and schema named in a
# free-text request, using the fixed "Catalog: ..., Schema: ..." reply format.
extract_catalog_schema_prompt = PromptTemplate(
    template="""
    You are a helpful assistant that extracts catalog and schema names from a database query.
    The user has provided the following query: "{input}"

    Please extract the catalog and schema mentioned in the query.
    If either the catalog or schema is not provided, return 'Not provided' for the missing one.
    Format the response like: Catalog: <catalog_name>, Schema: <schema_name>.
    """,
    input_variables=["input"],
)

def _extract_named_value(text, label):
    """Return the identifier that follows ``<label>:`` in ``text``, or 'Not provided'."""
    import re  # local import: the notebook's import cell does not bring in ``re``

    # Accept an optional opening quote/backtick/bracket, then either the
    # "Not provided" placeholder (any casing) or a single identifier token.
    pattern = re.escape(label) + r""":\s*[`'"<\[]?\s*((?i:not\s+provided)|[\w-]+)"""
    match = re.search(pattern, text)
    if match is None:
        return 'Not provided'
    value = match.group(1)
    if value.lower().split() == ['not', 'provided']:
        return 'Not provided'
    return value


def process_user_query(user_query):
    """Ask the LLM which catalog and schema a free-text request refers to.

    Args:
        user_query: The user's request in natural language.

    Returns:
        A ``(catalog, schema)`` tuple of strings. Either element is the literal
        ``'Not provided'`` when the model's reply does not name it; both are
        ``'Not provided'`` when the reply lacks a ``Catalog:`` or ``Schema:`` label.
    """
    catalog_schema_chain = LLMChain(prompt=extract_catalog_schema_prompt, llm=llm)
    response_text = catalog_schema_chain.invoke({"input": user_query})['text']

    # Same gate as before: only trust the reply when both labels are present.
    if 'Catalog:' not in response_text or 'Schema:' not in response_text:
        return 'Not provided', 'Not provided'

    # Extract one identifier per label instead of splitting on ',' so that a
    # newline-separated reply, trailing punctuation, or extra commentary after
    # the schema cannot leak into the returned names.
    return (
        _extract_named_value(response_text, 'Catalog'),
        _extract_named_value(response_text, 'Schema'),
    )

In [0]:
# Sample request used to exercise the catalog/schema extraction.
user_query = "connect to the forecast catalog and schema1 schema"
catalog, schema = process_user_query(user_query)

# Echo what the model extracted.
for label, extracted in (("Extracted Catalog:", catalog), ("Extracted Schema:", schema)):
    print(label, extracted)

In [0]:
# Refuse to connect with the extraction placeholder: 'Not provided' means the
# request did not name a catalog and/or schema, so there is nothing to open.
if 'Not provided' in (catalog, schema):
    raise ValueError(
        f"Cannot connect: catalog={catalog!r}, schema={schema!r} "
        "(the query did not name both a catalog and a schema)."
    )

# Ensure the database connection uses the extracted catalog and schema
db = SQLDatabase.from_databricks(
    catalog=catalog,
    schema=schema,
    engine_args={"pool_pre_ping": True}  # Prevent session expiration issues
)

# Fetch table names. Reset `tables` first so a failed lookup can never leave a
# stale list from an earlier run for the cells that iterate over it.
tables = []
try:
    tables = db.get_usable_table_names()
    print(f"Tables in {catalog}.{schema}: {tables}")
except Exception as e:
    print(f"Error retrieving tables from {catalog}.{schema}: {e}")
    # Surface the failure here instead of letting later cells fail on `tables`.
    raise


In [0]:
import pandas as pd

# Table name -> pandas DataFrame holding that table's full contents.
dataframes = {}

for table in tables:
    try:
        # Read from the same catalog/schema the table list was fetched from,
        # rather than a hard-coded location that may not match it.
        spark_df = spark.table(f"{catalog}.{schema}.{table}")
        # NOTE: toPandas() collects the whole table onto the driver.
        pandas_df = spark_df.toPandas()
        dataframes[table] = pandas_df
        print(f"Successfully retrieved data for table: {table}")
    except Exception as e:
        # Best effort: report the failure and continue with the remaining tables.
        print(f"Error retrieving data from {table}: {e}")


In [0]:
# Show the first rows of every table that was loaded.
for table_name in dataframes:
    df = dataframes[table_name]
    print(f"Data for table: {table_name}")
    print(df.head())
    print("\n")

In [0]:
import pandas as pd
import numpy as np
from scipy.stats import entropy, skew, kurtosis

def _numeric_stats(col_data):
    """Descriptive statistics for a numeric column whose nulls were already dropped."""
    modes = col_data.mode()  # computed once; an empty column has no mode
    # Plain float array for SciPy so nullable/extension numeric dtypes work too.
    values = col_data.to_numpy(dtype=float)
    lowest, highest = col_data.min(), col_data.max()
    return {
        "count": col_data.count(),
        "mean": col_data.mean(),
        "median": col_data.median(),
        "mode": modes.iloc[0] if not modes.empty else None,
        "variance": col_data.var(),
        "std_dev": col_data.std(),
        "min": lowest,
        "max": highest,
        "range": highest - lowest,
        "iqr": col_data.quantile(0.75) - col_data.quantile(0.25),
        "skewness": skew(values),
        "kurtosis": kurtosis(values),
        "sum": col_data.sum(),
    }


def _bool_stats(col_data):
    """True/false counts and ratio for a boolean column whose nulls were already dropped."""
    true_count = col_data.sum()
    return {
        "count": col_data.count(),
        "true_count": true_count,
        "false_count": len(col_data) - true_count,
        "true_ratio": col_data.mean(),
    }


def _categorical_stats(col_data):
    """Cardinality, mode and base-2 entropy for an object/string/category column."""
    modes = col_data.mode()
    value_counts = col_data.value_counts(normalize=True)
    return {
        "count": col_data.count(),
        "unique": col_data.nunique(),
        "mode": modes.iloc[0] if not modes.empty else None,
        "entropy": entropy(value_counts, base=2) if not value_counts.empty else None,
    }


def _datetime_stats(col_data):
    """Earliest/latest value and the span in whole days for a datetime column."""
    earliest, latest = col_data.min(), col_data.max()
    return {
        "count": col_data.count(),
        "min_date": earliest,
        "max_date": latest,
        "range_days": (latest - earliest).days,
    }


def compute_statistics(dataframes):
    """Compute per-column summary statistics for every table.

    Args:
        dataframes: Mapping of table name to pandas DataFrame.

    Returns:
        Nested dict ``{table_name: {column_name: {stat_name: value}}}``. The
        statistics depend on the column kind (numeric, boolean,
        object/string/category, datetime); columns of any other dtype get an
        empty dict. Null values are dropped before computing.
    """
    dtype_checks = pd.api.types
    stats = {}

    for table_name, df in dataframes.items():
        table_stats = {}

        for col in df.columns:
            col_data = df[col].dropna()
            dtype = col_data.dtype

            # pandas' dtype predicates are used instead of np.issubdtype, which
            # raises TypeError on extension dtypes (category, nullable, tz-aware).
            # Category is tested first because a categorical of booleans also
            # satisfies is_bool_dtype; bool is tested before numeric because
            # is_numeric_dtype treats booleans as numeric.
            if isinstance(dtype, pd.CategoricalDtype):
                col_stats = _categorical_stats(col_data)
            elif dtype_checks.is_bool_dtype(dtype):
                col_stats = _bool_stats(col_data)
            elif dtype_checks.is_numeric_dtype(dtype):
                col_stats = _numeric_stats(col_data)
            elif dtype_checks.is_object_dtype(dtype) or dtype_checks.is_string_dtype(dtype):
                col_stats = _categorical_stats(col_data)
            elif dtype_checks.is_datetime64_any_dtype(dtype):
                col_stats = _datetime_stats(col_data)
            else:
                col_stats = {}

            table_stats[col] = col_stats

        stats[table_name] = table_stats

    return stats


In [0]:
import pprint

# Compute statistics for every loaded table, then pretty-print one of them.
table_name = "calendar"
stats = compute_statistics(dataframes)
pprint.pprint(stats[table_name])


In [0]:
# Flatten the nested stats into one row per (table, column) pair:
# build the frame keyed by the pair, then transpose so pairs become the index.
stats_by_column = {
    (tbl, column): column_stats
    for tbl, per_column in stats.items()
    for column, column_stats in per_column.items()
}
stats_df = pd.DataFrame(stats_by_column).transpose()

In [0]:
# Preview the first rows of the per-(table, column) statistics frame.
stats_df.head()

In [0]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

# Work on a copy so `stats_df` is left untouched and this cell can be re-run.
# infer_objects() makes the object -> numeric/datetime inference explicit
# instead of depending on fillna's deprecated implicit downcasting.
stats_typed = stats_df.fillna(np.nan).infer_objects()

# Spark needs one consistent type per column: numbers as float, the rest as text.
for col in stats_typed.select_dtypes(include=[np.number]).columns:
    stats_typed[col] = stats_typed[col].astype(float)

for col in stats_typed.select_dtypes(include=[object]).columns:
    stats_typed[col] = stats_typed[col].astype(str)

# spark.createDataFrame() ignores the pandas index, which here carries the
# (table, column) identity of each row, so promote it to ordinary columns.
stats_out = stats_typed.rename_axis(index=["source_table", "source_column"]).reset_index()

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(stats_out)
catalog = "forecast"
schema = "schema1"
table_name = "stats_table"

# overwriteSchema lets the overwrite succeed when an earlier run saved the
# table with a different column set (e.g. without the two key columns).
(
    spark_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.{schema}.{table_name}")
)



In [0]:
from langchain.chat_models import ChatDatabricks
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain


In [0]:
def get_table_schema(table_name):
    """Run ``DESCRIBE <table_name>`` on the notebook's ``db`` connection and return its result."""
    return db.run(f"DESCRIBE {table_name}")

# Prompt that hands the model every table's schema and asks for stepwise
# pandas merge code operating on the `dataframes` dictionary.
merge_strategy_prompt = PromptTemplate(
    template="""
    The user has provided the schema information of the following tables:

    {tables_schema}

    Based on the provided schemas, suggest the best strategy for merging these tables. 
    Consider factors such as common columns, column data types, and potential keys for joining.
    All tables are stored in 'dataframes' which refers to a dictionary where the keys are the table names (as strings), and the values are the corresponding Pandas DataFrames that hold the data from those tables.
    Merge the tables as a pandas DataFrame by providing the appropriate python code.
    Make merges stepwise, only providing the necessary code and little explanation for each step.
    Make sure the column datatypes are correctly matched when merging.
    """,
    input_variables=["tables_schema"],
)

def generate_merge_strategy(tables):
    """Describe every table, send the schemas to the LLM, and return its merge suggestion.

    Args:
        tables: Iterable of table names to describe via ``get_table_schema``.

    Returns:
        The ``'text'`` field of the chain's response.
    """
    # One "Table <name>: <schema>" section per table, gathered before joining.
    sections = []
    for table in tables:
        schema = get_table_schema(table)
        sections.append(f"Table {table}: {schema}\n\n")

    chain = LLMChain(prompt=merge_strategy_prompt, llm=llm)
    reply = chain.invoke({"tables_schema": "".join(sections)})
    return reply['text']

# Ask the model for a merge plan covering every discovered table and show it.
merge_strategy = generate_merge_strategy(tables)
print("Recommended Merge Strategy:", merge_strategy, sep="\n")

In [0]:
import ast

# Divider printed after each table's listing.
separator = "\n" + "-"*50 + "\n"

for table in tables:
    print(f"Schema for table: {table}")

    # get_table_schema may hand back the rows as a string; parse that string
    # into Python objects before iterating.
    display_data = get_table_schema(table)
    if isinstance(display_data, str):
        display_data = ast.literal_eval(display_data)

    # One line per row: first three fields are shown as name, datatype, comment.
    for item in display_data:
        print(f"Column Name: {item[0]}, Datatype: {item[1]}, Comment: {item[2]}")

    print(separator)


In [0]:
# Inner-join the calendar table with the hobbies series on the 'date' column.
df_calendar, df_hobbies_1_test_0 = (
    dataframes[name] for name in ('calendar', 'hobbies_1_test_0')
)
df_merged_1 = df_calendar.merge(df_hobbies_1_test_0, on='date', how='inner')

In [0]:
df_sales_train_evaluation = dataframes['sales_train_evaluation']

# Print the dtypes of the candidate join keys on both sides so mismatches are
# visible before attempting the merge.
join_keys = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']
for frame in (df_merged_1, df_sales_train_evaluation):
    print(frame[join_keys].dtypes)

# Follow-up merge, currently left disabled:
# # Merge on 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id' columns
# df_merged_2 = pd.merge(df_merged_1, df_sales_train_evaluation, on=['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id'], how='inner')