In [30]:
import os
import dotenv
import psycopg2
from psycopg2 import sql
import pandas as pd
from groq import Groq
from pyspark.sql import SparkSession

In [31]:
def get_llama_assistance(prompt, formatted_metadata, table_name, model="llama3-70b-8192", temperature=0.0):
    """Ask a Groq-hosted Llama model to turn a natural-language question into SQL.

    The model is instructed to answer with the query only. If it still wraps the
    answer in a Markdown code fence (optionally tagged, e.g. ```sql), only the
    fenced body is returned, without the fence or its language tag.

    Args:
        prompt: The question(s) to translate into SQL.
        formatted_metadata: Column descriptions inserted verbatim into the instructions.
        table_name: Name of the table the generated query should target.
        model: Groq model id; defaults to the model this function always used.
        temperature: Sampling temperature. Defaults to 0.0 to keep the SQL as
            repeatable as possible; pass 1.4 to reproduce the old hard-coded value.

    Returns:
        The generated SQL text, with code fences and surrounding whitespace removed.

    Errors raised by the Groq client (request failures, missing credentials, ...)
    propagate unchanged.
    """
    main_purpose = f"""
    As an SQL Query Expert, your primary role is to understand the given data, answer the questions based on the provided input and generate accurate SQL queries ONLY. 
    Remember, you only have to answer the Query for the given input, don't give any explanation, just the query. 
    Here are the column names with respect to their information: 
    {formatted_metadata}
    The table name is {table_name}
    Here is/are the Questions:"""

    client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
    completion = client.chat.completions.create(
        model=model,
        # Only the user turn is sent: the former trailing assistant message had
        # empty content, added nothing, and empty message content may be rejected
        # by OpenAI-compatible endpoints.
        messages=[
            {
                "role": "user",
                "content": f"{main_purpose} {prompt}"
            },
        ],
        temperature=temperature,
        max_tokens=8192,
        top_p=1,
        stream=True,
        stop=None,
    )

    # Streamed chunks may carry None content, hence the `or ""`.
    response_text = "".join(chunk.choices[0].delta.content or "" for chunk in completion)

    return _strip_code_fences(response_text)


def _strip_code_fences(text):
    """Return the body of the first Markdown code fence in `text`, or `text` without stray fences."""
    import re

    # An optional language tag (```sql) is only treated as a tag when it is
    # followed by a newline, so inline fences like ```SELECT 1``` keep their SQL.
    match = re.search(r"```(?:[\w+-]*\n)?(.*?)```", text, flags=re.DOTALL)
    if match:
        return match.group(1).strip()
    # No complete fence pair (e.g. a truncated reply): drop a dangling opening
    # fence together with its tag, then any remaining backtick runs.
    text = re.sub(r"^\s*```(?:[\w+-]*\n)?", "", text)
    return text.replace("```", "").strip()

### PySpark: load and query the test data

In [32]:
# Start (or reuse) the Spark session and read the CSV, taking column names from its header row.
# NOTE(review): schema inference is not requested, so every column is read as `string`
# (see the printSchema output below). Aggregates such as MAX/MIN on numeric-looking
# columns would then compare lexicographically; confirm before relying on them.
spark = SparkSession.builder.getOrCreate()
d = spark.read.option("header", True).csv("test_data.csv")
d.show()

+------+--------+--------+--------------------+-----------------+------------------+-----------+-------------+---------------------+---------------+-----------+---------+---------+-------------------------------+----------------+------------+--------------+-----------------+-------------------+-------------+-------------------------+--------------------------+---------+----------------------+-----------------+------+-------------+-----------------------+--------------------+------------------------+-----------------+
|ref_no|children|age_band|              status|       occupation|occupation_partner|home_status|self_employed|self_employed_partner|year_last_moved|     tvarea|post_code|post_area|average_credit_card_transaction|balance_transfer|term_deposit|life_insurance|medical_insurance|average_a_c_balance|personal_loan|investment_in_mutual_fund|investment_tax_saving_bond|home_loan|online_purchase_amount|discount_offering|gender|       region|investment_in_commudity|investment_in_equit

In [33]:
# Print the schema Spark assigned; as the output shows, every column comes back as string.
d.printSchema()

root
 |-- ref_no: string (nullable = true)
 |-- children: string (nullable = true)
 |-- age_band: string (nullable = true)
 |-- status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- occupation_partner: string (nullable = true)
 |-- home_status: string (nullable = true)
 |-- self_employed: string (nullable = true)
 |-- self_employed_partner: string (nullable = true)
 |-- year_last_moved: string (nullable = true)
 |-- tvarea: string (nullable = true)
 |-- post_code: string (nullable = true)
 |-- post_area: string (nullable = true)
 |-- average_credit_card_transaction: string (nullable = true)
 |-- balance_transfer: string (nullable = true)
 |-- term_deposit: string (nullable = true)
 |-- life_insurance: string (nullable = true)
 |-- medical_insurance: string (nullable = true)
 |-- average_a_c_balance: string (nullable = true)
 |-- personal_loan: string (nullable = true)
 |-- investment_in_mutual_fund: string (nullable = true)
 |-- investment_tax_saving_bond: str

In [34]:
# Register the Spark DataFrame as the temp view `d` so it can be queried with SQL.
d.createOrReplaceTempView("d")
# Keep the DataFrame itself: `.show()` only prints and returns None, so chaining it
# onto the assignment left `new_query` bound to None.
# AVG over these string-typed columns relies on Spark implicitly casting them to a
# numeric type (the numeric averages in the output confirm the cast happens).
new_query = spark.sql('''SELECT 
  status, 
  AVG(portfolio_balance) AS avg_portfolio_balance, 
  AVG(medical_insurance) AS avg_medical_insurance
FROM 
  d
GROUP BY 
  status;''')
new_query.show()

+--------------------+---------------------+---------------------+
|              status|avg_portfolio_balance|avg_medical_insurance|
+--------------------+---------------------+---------------------+
|             Partner|    91.48552990011672|    19.34997924503847|
|Single/Never Married|    88.66623978201653|    18.46096276112623|
|             Unknown|             65.86375|   13.132291666666669|
|  Divorced/Separated|     91.4188070692194|   19.051767304860014|
|             Widowed|    81.72394822006467|   18.327686084142346|
+--------------------+---------------------+---------------------+



In [38]:
# Per-status averages (the same query as the cell above), collected into a pandas
# DataFrame so the notebook renders it as a table.
status_summary_sql = '''SELECT 
  status, 
  AVG(portfolio_balance) AS avg_portfolio_balance, 
  AVG(medical_insurance) AS avg_medical_insurance
FROM 
  d
GROUP BY 
  status;'''
spark.sql(status_summary_sql).toPandas()

Unnamed: 0,status,avg_portfolio_balance,avg_medical_insurance
0,Partner,91.48553,19.349979
1,Single/Never Married,88.66624,18.460963
2,Unknown,65.86375,13.132292
3,Divorced/Separated,91.418807,19.051767
4,Widowed,81.723948,18.327686


In [None]:
# Collect every column as a (name, Spark SQL type string) pair, e.g. ('ref_no', 'string'),
# echoing each pair as it is added.
schema = d.schema
columns = []
for field in schema:
    columns.append((field.name, field.dataType.simpleString()))
    print(columns[-1])

('ref_no', 'string')
('children', 'string')
('age_band', 'string')
('status', 'string')
('occupation', 'string')
('occupation_partner', 'string')
('home_status', 'string')
('self_employed', 'string')
('self_employed_partner', 'string')
('year_last_moved', 'string')
('tvarea', 'string')
('post_code', 'string')
('post_area', 'string')
('average_credit_card_transaction', 'string')
('balance_transfer', 'string')
('term_deposit', 'string')
('life_insurance', 'string')
('medical_insurance', 'string')
('average_a_c_balance', 'string')
('personal_loan', 'string')
('investment_in_mutual_fund', 'string')
('investment_tax_saving_bond', 'string')
('home_loan', 'string')
('online_purchase_amount', 'string')
('discount_offering', 'string')
('gender', 'string')
('region', 'string')
('investment_in_commudity', 'string')
('investment_in_equity', 'string')
('investment_in_derivative', 'string')
('portfolio_balance', 'string')


In [None]:
# Version Mismatch, use 3.11
table_metadata = {}
for column_name, data_type in columns:
    column_name = column_name.lower().replace(" ", "_").replace("/", "_")        
    unique_values = []
    if data_type == 'string':
        unique_values = d.select(column_name).distinct().limit(10).rdd.flatMap(lambda x: x).collect()        
        table_metadata[column_name] = {
                'data_type': data_type,
                'unique_values': unique_values
            }
        
print(table_metadata)

{'ref_no': {'data_type': 'string', 'unique_values': ['296', '467', '675', '691', '829', '1090', '1436', '1512', '1572', '2069']}, 'children': {'data_type': 'string', 'unique_values': ['3', '4+', 'Zero', '1', '2']}, 'age_band': {'data_type': 'string', 'unique_values': ['41-45', '22-25', '45-50', '18-21', 'Unknown', '31-35', '71+', '51-55', '61-65', '55-60']}, 'status': {'data_type': 'string', 'unique_values': ['Partner', 'Single/Never Married', 'Unknown', 'Divorced/Separated', 'Widowed']}, 'occupation': {'data_type': 'string', 'unique_values': ['Student', 'Unknown', 'Other', 'Business Manager', 'Professional', 'Manual Worker', 'Housewife', 'Secretarial/Admin', 'Retired']}, 'occupation_partner': {'data_type': 'string', 'unique_values': ['Student', 'Unknown', 'Other', 'Business Manager', 'Professional', 'Manual Worker', 'Housewife', 'Secretarial/Admin', 'Retired']}, 'home_status': {'data_type': 'string', 'unique_values': ['Own Home', 'Rent Privately', 'Live in Parental Hom', 'Rent from Co

In [None]:
# Look up the temp view "d" (registered via createOrReplaceTempView) as a Spark DataFrame.
new_table = spark.table("d")

In [None]:
# Display the DataFrame's repr, which lists only column names and types (see output).
new_table

DataFrame[ref_no: string, children: string, age_band: string, status: string, occupation: string, occupation_partner: string, home_status: string, self_employed: string, self_employed_partner: string, year_last_moved: string, tvarea: string, post_code: string, post_area: string, average_credit_card_transaction: string, balance_transfer: string, term_deposit: string, life_insurance: string, medical_insurance: string, average_a_c_balance: string, personal_loan: string, investment_in_mutual_fund: string, investment_tax_saving_bond: string, home_loan: string, online_purchase_amount: string, discount_offering: string, gender: string, region: string, investment_in_commudity: string, investment_in_equity: string, investment_in_derivative: string, portfolio_balance: string]

### Session 2: reload the test data as a pandas DataFrame

In [None]:
# Reload the CSV through Spark (header row gives column names) and convert it to pandas.
# NOTE(review): this rebinds `d` from a Spark DataFrame to a pandas DataFrame, so
# re-running the earlier Spark cells (e.g. `d.createOrReplaceTempView`) after this
# one will fail. Consider a distinct name such as `test_data_pd`.
spark = SparkSession.builder.getOrCreate()
d = (
    spark.read
    .option("header", True)
    .csv("test_data.csv")
    .toPandas()
)

In [None]:
# Column labels of the pandas DataFrame loaded in this session.
d.columns

Index(['ref_no', 'children', 'age_band', 'status', 'occupation',
       'occupation_partner', 'home_status', 'self_employed',
       'self_employed_partner', 'year_last_moved', 'tvarea', 'post_code',
       'post_area', 'average_credit_card_transaction', 'balance_transfer',
       'term_deposit', 'life_insurance', 'medical_insurance',
       'average_a_c_balance', 'personal_loan', 'investment_in_mutual_fund',
       'investment_tax_saving_bond', 'home_loan', 'online_purchase_amount',
       'discount_offering', 'gender', 'region', 'investment_in_commudity',
       'investment_in_equity', 'investment_in_derivative',
       'portfolio_balance'],
      dtype='object')