In [1]:
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
    .appName("CRM Data Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

file_path = r"C:\Users\KIIT\Desktop\synthetic_crm_data.csv"
sql_friendly_path = file_path.replace("\\", "/")

if os.path.isabs(sql_friendly_path) and not sql_friendly_path.startswith("file:///"):
    sql_friendly_path = "file:///" + sql_friendly_path

table_name = "crm_data"

spark.sql(f"DROP TABLE IF EXISTS {table_name}")

try:
    spark.sql(f"""
        CREATE TABLE {table_name}
        USING csv
        OPTIONS (path '{sql_friendly_path}', header 'true', inferSchema 'true')
    """)
    print(f"Table '{table_name}' created successfully using path: '{sql_friendly_path}'")
except Exception as e:
    print(f"Error creating table '{table_name}': {e}")
    spark.stop()
    exit()
    
print(f"Tables in database: {spark.catalog.listTables()}")

try:
    df = spark.table(table_name)
    print(f"Successfully loaded data from table '{table_name}' into DataFrame.")
except Exception as e:
    print(f"Error loading data from table '{table_name}': {e}")
    spark.stop()
    exit()

schema_df = df.schema

schema_details = []
for field in schema_df.fields:
    col_name = field.name
    col_type_str = str(field.dataType)
    
    try:
        sample_values_rows = spark.sql(f"SELECT DISTINCT `{col_name}` FROM {table_name} LIMIT 5").collect()
        sample_values = [row[0] for row in sample_values_rows]
        sample_str = ", ".join([str(val) for val in sample_values if val is not None])
        schema_details.append(f"Column: `{col_name}` (Type: {col_type_str}) - Sample distinct values: [{sample_str}]")
    except Exception as e:
        schema_details.append(f"Column: `{col_name}` (Type: {col_type_str}) - Error fetching sample values: {e}")

try:
    head_df_pandas = df.limit(5).toPandas()
    head_str = head_df_pandas.to_string(index=False)
    print(head_str)
except Exception as e:
    print(f"Error generating DataFrame head: {e}")

schema_details

Table 'crm_data' created successfully using path: 'file:///C:/Users/KIIT/Desktop/synthetic_crm_data.csv'
Tables in database: [Table(name='crm_data', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False)]
Successfully loaded data from table 'crm_data' into DataFrame.
           Name                              Email                Phone                Company   Industry   Status LastContacted
     John Lewis stevenblackburn@robbins-turner.com           1653970950            Santana LLC     Retail Prospect    2024-06-04
  William Davis            mullinswesley@yahoo.com     740-058-6169x086         Rosario-Thomas Healthcare Customer    2025-03-04
    Kelly Patel             alicejones@pittman.com   447-611-7401x94969 Young, White and Smith Technology     Lead    2024-11-21
Chris Wilkerson          angelvillanueva@gmail.com         821.900.9018                 Wu PLC     Retail     Lead    2024-12-28
   Sherry Small                    

['Column: `Name` (Type: StringType()) - Sample distinct values: [Jeffrey Hunt, Kelly Patel, Linda Meyer, Pamela Young, Lori Ramirez]',
 'Column: `Email` (Type: StringType()) - Sample distinct values: [kirbyarthur@fields-anderson.net, carrie25@jackson-christian.com, munozanthony@wilson-fuller.info, jjackson@wood.biz, melanie22@carrillo.com]',
 'Column: `Phone` (Type: StringType()) - Sample distinct values: [001-280-768-7988x797, +1-345-140-1522x36173, (055)413-6228x641, 5888393372, +1-529-506-5957x6137]',
 'Column: `Company` (Type: StringType()) - Sample distinct values: [King, Bailey and Berry, Hansen Group, Ali, Lee and Case, Pearson-Espinoza, Hernandez-Heath]',
 'Column: `Industry` (Type: StringType()) - Sample distinct values: [Education, Healthcare, Finance, Technology, Retail]',
 'Column: `Status` (Type: StringType()) - Sample distinct values: [Prospect, Lead, Customer, Churned]',
 'Column: `LastContacted` (Type: DateType()) - Sample distinct values: [2025-02-16, 2024-06-04, 2024-

In [2]:
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate, ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
import traceback


client = OpenAI(
    base_url='http://localhost:11434/v1',
    api_key='ollama'
)

user_query = [
    "Count of customers by status",
    "Count of customers by industry",
    "List of all churned customers",
    "Customers who haven't been contacted in the last 60 days"
]

system_template = """You are an expert SQL query generator specializing in Spark SQL syntax. 
Generate precise, optimized SQL queries based on the schema provided.
Always follow these rules:
1. Only return the SQL query with no explanations or markdown formatting
2. For date calculations, use date_sub(current_date(), n) for subtracting days
3. Do not use DATE('now'), NOW(), or MySQL/SQLite-style date functions
4. Use simple capital letters for main query aliases
5. Always reference the table as 'crm_data'
6. Use proper date functions compatible with Spark SQL
7. Never modify the schema
8. Be consistent in your query generation approach
"""


human_template = """Based on the schema and sample data below, write a SQL query that answers this question: {user_query}

SCHEMA DETAILS:
{schema_str}

SAMPLE DATA:
{head_str}

SQL QUERY:"""

chat_prompt = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(system_template),
    HumanMessagePromptTemplate.from_template(human_template)
])


llm1 = ChatOpenAI(
    model="gemma3",
    openai_api_base="http://localhost:11434/v1",
    openai_api_key="ollama"
)

parser = StrOutputParser()
result = chat_prompt | llm1 | parser


def generate_query(user_queries):
    sql_queries = []
    for query in user_queries:
        sql = result.invoke({
            "schema_str": "\n".join(schema_details),
            "head_str": head_str,
            "user_query": query
        })
        sql = sql.replace("```sql", "").replace("```", "").strip()
        sql_queries.append(sql)
    return sql_queries


check_query_prompt = PromptTemplate.from_template("""
Check if the following SQL query is valid for the given schema and Spark SQL. 
If any error is found, fix the query as per the schema and Spark SQL requirements.
Return only the corrected SQL query, nothing else.

Schema:
{schema_str}

SQL Query:
{sql_query}
""")

checker_chain = check_query_prompt | llm1 | parser

def check(sql_queries_tuple):
    checked_queries = []
    for query in sql_queries_tuple:
        checked_sql = checker_chain.invoke({
            "schema_str": "\n".join(schema_details),
            "sql_query": query
        })
        checked_queries.append(checked_sql)
    return tuple(checked_queries)

def execute_query_with_error_capture(sql_query, spark):
    try:
        result_df = spark.sql(sql_query)
        return {"success": True, "result": result_df}
    except Exception as e:
        tb_str = traceback.format_exc()
        return {"success": False, "error_message": str(e), "traceback": tb_str}

error_fix_prompt = PromptTemplate.from_template("""
The following SQL query was executed in Spark SQL and resulted in an error.
Please fix the query so it works, considering the schema and error message below.
Return only the corrected SQL query, in Spark SQL syntax.

Schema:
{schema_str}

SQL Query:
{sql_query}

Error Message:
{error_message}
""")

def auto_fix_and_execute(sql_query, spark, schema_details, max_attempts=5):
    attempts = 0
    current_sql = sql_query
    while attempts < max_attempts:
        result = execute_query_with_error_capture(current_sql, spark)
        if result["success"]:
            return result["result"]
        else:
            corrected_chain = error_fix_prompt | llm1 | parser
            current_sql = corrected_chain.invoke({
                "schema_str": "\n".join(schema_details),
                "sql_query": current_sql,
                "error_message": result["error_message"]
            })
        attempts += 1
    raise Exception("Failed to generate a correct SQL query after multiple attempts.")

def execute():
    results = generate_query(user_query)
    final = check(results)
    for idx, query in enumerate(final, start=1):
        print(f"\nQuery {idx}:")
        print(f"SQL: {query}")
        try:
            result_df = auto_fix_and_execute(query, spark, schema_details)
            print(result_df)
        except Exception as e:
            print(f"Failed to execute corrected query after several attempts: {e}")

execute()
spark.stop()


Query 1:
SQL: SELECT
  Status,
  COUNT(*) AS customer_count
FROM crm_data
GROUP BY
  Status;

DataFrame[Status: string, customer_count: bigint]

Query 2:
SQL: SELECT Industry, COUNT(*) AS customer_count
FROM crm_data
GROUP BY Industry;
DataFrame[Industry: string, customer_count: bigint]

Query 3:
SQL: SELECT
  Name
FROM crm_data
WHERE
  Status = 'Churned';

DataFrame[Name: string]

Query 4:
SQL: SELECT
  *
FROM crm_data
WHERE
  LastContacted < date_sub(current_date(), 60);
DataFrame[Name: string, Email: string, Phone: string, Company: string, Industry: string, Status: string, LastContacted: date]


In [3]:
# from pyhive import hive

# conn = hive.Connection(
#     host='localhost',
#     port=10000,
#     username='debasmith',
#     database='default'
# )
# cursor = conn.cursor()
# cursor.execute('SHOW TABLES')
# print(cursor.fetchall())

In [15]:
# sql_final

In [1]:
!jupyter nbextension enable --py widgetsnbextension


SyntaxError: invalid syntax (2269694994.py, line 1)