In [None]:
# pip install langchain langchain-anthropic langchain-google-genai langchain-community anthropic openai

In [1]:
# Print the integers 2 through 6, one per line; `i` is left at 6 afterwards.
for i in range(2, 7):
    print(i)

2
3
4
5
6


In [None]:
from abc import ABC, abstractmethod
import openai
from typing import Any, Dict
from langchain.chat_models import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.llms import Ollama


# ✔ Best Overall (SQL Correction & Generation): GPT-4-Turbo
# ✔ Best for Large Schema & Context: Gemini 1.5-Pro
# ✔ Best Open-Source (Privacy Focused): Llama 3-70B


class DatabaseConnector(ABC):
    """Abstract base class for LLM-assisted database connectors.

    All keyword arguments given to the constructor are stored verbatim in
    ``self.credentials``; each subclass reads the keys it needs from there.
    Subclasses must implement ``connect``, ``fetch_schema`` and
    ``execute_query`` before they can be instantiated.
    """

    def __init__(self, **kwargs):
        """Store the credentials and build the LLM client.

        Args:
            **kwargs: Connection settings and API keys. ``google_api_key`` is
                read here; everything else is consumed by the subclass.
        """
        self.credentials = kwargs
        self.connection = None
        # Alternative LLM back ends; swap the active line to change provider.
        # self.llm = ChatOpenAI(model_name="gpt-4", temperature=0)
        # self.llm = ChatAnthropic(model_name="claude-3-opus-2024-03-12", anthropic_api_key=self.credentials.get("anthropic_api_key"))
        # self.llm = ChatOpenAI(model_name="gpt-4-turbo", openai_api_key=self.credentials.get("openai_api_key"))
        self.llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", google_api_key=self.credentials.get("google_api_key"))
        # self.llm = Ollama(model="llama3-70b")

    @abstractmethod
    def connect(self):
        """Establish a connection to the database."""

    @abstractmethod
    def fetch_schema(self):
        """Fetch schema information."""

    @abstractmethod
    def execute_query(self, query: str):
        """Execute a query on the database."""

    def close(self):
        """Close the database connection, if one is open.

        The handle is cleared afterwards so that calling ``close()`` again is
        a no-op instead of closing an already-closed connection.
        """
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def generate_sql_query(self, prompt: str, schema_info: Dict[str, Any]) -> str:
        """Generate a SQL query with the LLM from schema info and a user prompt.

        Args:
            prompt: Natural-language description of the desired query.
            schema_info: Mapping of table name to its columns. The columns may
                be a ``{column: type}`` dict or any iterable of column names.

        Returns:
            The text produced by the LLM with surrounding whitespace removed.
        """
        # Iterating the columns directly yields the keys of a dict and also
        # accepts lists or dict views of column names.
        schema_text = "\n".join(
            f"{table}: {', '.join(str(column) for column in columns)}"
            for table, columns in schema_info.items()
        )
        full_prompt = f"Database Schema:\n{schema_text}\nUser Query: {prompt}\nGenerate SQL:"

        response = self.llm.invoke(full_prompt)
        # Chat models return a message object exposing ``content``; plain LLM
        # wrappers return the generated string itself.
        content = getattr(response, "content", response)
        if isinstance(content, list):
            # Message content may be a list of strings and/or {"text": ...} blocks.
            content = "".join(
                part if isinstance(part, str)
                else str(part.get("text", "")) if isinstance(part, dict)
                else ""
                for part in content
            )
        return str(content).strip()

# MySQL Connector
import mysql.connector

class MySQLConnector(DatabaseConnector):
    """Connector for MySQL built on ``mysql.connector``.

    Reads ``host``, ``user``, ``password`` and ``database`` from the
    credentials passed to the constructor.
    """

    def connect(self):
        """Open a MySQL connection and store it on ``self.connection``."""
        self.connection = mysql.connector.connect(
            host=self.credentials["host"],
            user=self.credentials["user"],
            password=self.credentials["password"],
            database=self.credentials["database"]
        )

    def fetch_schema(self):
        """Return ``{table: {column: data_type}}`` for the current database."""
        cursor = self.connection.cursor()
        try:
            cursor.execute("SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE();")
            schema_info = {}
            for table, column, dtype in cursor.fetchall():
                schema_info.setdefault(table, {})[column] = dtype
            return schema_info
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

    def execute_query(self, query: str):
        """Run ``query`` and return all rows from ``cursor.fetchall()``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

# MongoDB Connector
from pymongo import MongoClient

class MongoDBConnector(DatabaseConnector):
    """Connector for MongoDB built on ``pymongo.MongoClient``.

    Reads ``host`` and ``database`` from the credentials; ``port`` is optional
    and is passed as ``None`` when absent.
    """

    def connect(self):
        """Create the ``MongoClient`` and store it on ``self.connection``."""
        self.connection = MongoClient(self.credentials["host"], self.credentials.get("port"))

    def fetch_schema(self):
        """Return ``{collection: {field: python_type_name}}``.

        The fields are taken from a single sample document per collection
        (``find_one()``), so fields absent from that document are not listed.
        An empty collection maps to an empty dict.
        """
        db = self.connection[self.credentials["database"]]
        schema_info = {}
        for collection in db.list_collection_names():
            sample = db[collection].find_one()
            # find_one() returns None when the collection has no documents.
            if sample is None:
                schema_info[collection] = {}
            else:
                schema_info[collection] = {
                    field: type(value).__name__ for field, value in sample.items()
                }
        return schema_info

    def execute_query(self, query: Dict[str, Any]):
        """Run a ``find`` described by ``query`` and return the documents.

        Args:
            query: Dict with a required ``"collection"`` name and optional
                ``"filter"`` (defaults to ``{}``) and ``"projection"``
                (defaults to ``None``) entries.

        Returns:
            A list of the matching documents.
        """
        db = self.connection[self.credentials["database"]]
        collection = db[query["collection"]]
        return list(collection.find(query.get("filter", {}), query.get("projection")))

# Amazon Athena Connector
import boto3

class AthenaConnector(DatabaseConnector):
    """Connector for Amazon Athena built on the ``boto3`` Athena client.

    Reads ``region``, ``database`` and ``output_location`` from the
    credentials; ``catalog`` is optional and defaults to ``"AwsDataCatalog"``.
    """

    def connect(self):
        """Create the Athena client and store it on ``self.connection``."""
        self.connection = boto3.client("athena", region_name=self.credentials["region"])

    def fetch_schema(self):
        """Return ``{table: {column: type}}`` for the configured database.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated. Table metadata is
        read page by page through ``list_table_metadata``; partition keys are
        listed alongside regular columns.
        """
        request = {
            "CatalogName": self.credentials.get("catalog", "AwsDataCatalog"),
            "DatabaseName": self.credentials["database"],
        }
        schema_info = {}
        while True:
            page = self.connection.list_table_metadata(**request)
            for table in page.get("TableMetadataList", []):
                columns = table.get("Columns", []) + table.get("PartitionKeys", [])
                schema_info[table["Name"]] = {
                    column["Name"]: column.get("Type") for column in columns
                }
            token = page.get("NextToken")
            if not token:
                return schema_info
            request["NextToken"] = token

    def execute_query(self, query: str):
        """Start ``query`` and return its ``QueryExecutionId``.

        Only the execution is started here; the id is returned without
        waiting for or fetching any results.
        """
        response = self.connection.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": self.credentials["database"]},
            ResultConfiguration={"OutputLocation": self.credentials["output_location"]}
        )
        return response["QueryExecutionId"]

# Neo4j Connector
from neo4j import GraphDatabase

class Neo4jConnector(DatabaseConnector):
    """Connector for Neo4j built on the official ``neo4j`` driver.

    Reads ``uri``, ``user`` and ``password`` from the credentials.
    """

    def connect(self):
        """Create the driver and store it on ``self.connection``."""
        self.connection = GraphDatabase.driver(
            self.credentials["uri"],
            auth=(self.credentials["user"], self.credentials["password"])
        )

    def fetch_schema(self):
        """Return ``{node_labels: {property: types}}`` for the graph.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated. The data comes from
        the ``db.schema.nodeTypeProperties()`` procedure; multiple labels are
        joined with ``":"`` and multiple property types with ``", "``.
        """
        schema_info = {}
        for row in self.execute_query("CALL db.schema.nodeTypeProperties()"):
            label = ":".join(row.get("nodeLabels") or [])
            properties = schema_info.setdefault(label, {})
            name = row.get("propertyName")
            # Node types without properties are reported with a null name.
            if name is not None:
                properties[name] = ", ".join(row.get("propertyTypes") or [])
        return schema_info

    def execute_query(self, query: str):
        """Run ``query`` in a new session and return the records as dicts."""
        with self.connection.session() as session:
            return session.run(query).data()

# InfluxDB Connector
from influxdb import InfluxDBClient

class InfluxDBConnector(DatabaseConnector):
    """Connector for InfluxDB built on ``influxdb.InfluxDBClient``.

    Reads ``host``, ``port``, ``user``, ``password`` and ``database`` from the
    credentials.
    """

    def connect(self):
        """Create the client and store it on ``self.connection``."""
        self.connection = InfluxDBClient(
            host=self.credentials["host"],
            port=self.credentials["port"],
            username=self.credentials["user"],
            password=self.credentials["password"],
            database=self.credentials["database"]
        )

    def fetch_schema(self):
        """Return ``{measurement: {field_key: field_type}}``.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated. The data comes from
        a ``SHOW FIELD KEYS`` query against the configured database.
        """
        result = self.connection.query("SHOW FIELD KEYS")
        schema_info = {}
        # ResultSet.items() yields ((series_name, tags), points) pairs.
        for (measurement, _tags), points in result.items():
            schema_info[measurement] = {
                point["fieldKey"]: point["fieldType"] for point in points
            }
        return schema_info

    def execute_query(self, query: str):
        """Run ``query`` and return the resulting points as a list."""
        return list(self.connection.query(query).get_points())

# Example usage:
# db = MySQLConnector(host='localhost', user='root', password='pass', database='testdb')
# db.connect()
# schema = db.fetch_schema()
# query = db.generate_sql_query("Get all customers", schema)
# result = db.execute_query(query)
# print(result)
# db.close()

# db = MongoDBConnector(host='localhost', port=27017, database='testdb')
# db.connect()
# result = db.execute_query({"collection": "customers", "filter": {}, "projection": {"_id": 0}})
# print(result)
# db.close()

# db = AthenaConnector(region='us-east-1', database='testdb', output_location='s3://query-results/')
# db.connect()
# result = db.execute_query("SELECT * FROM customers;")
# print(result)
# db.close()

# db = Neo4jConnector(uri='bolt://localhost:7687', user='neo4j', password='pass')
# db.connect()
# result = db.execute_query("MATCH (c:Customer) RETURN c")
# print(result)
# db.close()

# db = InfluxDBConnector(host='localhost', port=8086, user='admin', password='pass', database='testdb')
# db.connect()
# result = db.execute_query("SELECT * FROM customers")
# print(result)
# db.close()


In [None]:
from abc import ABC, abstractmethod
import openai
from typing import Any, Dict
from langchain.chat_models import ChatOpenAI

# PostgreSQL Connector
import psycopg2

class PostgreSQLConnector(DatabaseConnector):
    """Connector for PostgreSQL built on ``psycopg2``.

    Reads ``host``, ``user``, ``password`` and ``database`` from the
    credentials; ``schema`` is optional and defaults to ``"public"``.
    """

    def connect(self):
        """Open a PostgreSQL connection and store it on ``self.connection``."""
        self.connection = psycopg2.connect(
            host=self.credentials["host"],
            user=self.credentials["user"],
            password=self.credentials["password"],
            database=self.credentials["database"]
        )

    def fetch_schema(self):
        """Return ``{table: {column: data_type}}`` for the configured schema."""
        cursor = self.connection.cursor()
        try:
            # The schema name is passed as a bound parameter, not interpolated.
            cursor.execute(
                "SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema=%s;",
                (self.credentials.get("schema", "public"),),
            )
            schema_info = {}
            for table, column, dtype in cursor.fetchall():
                schema_info.setdefault(table, {})[column] = dtype
            return schema_info
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

    def execute_query(self, query: str):
        """Run ``query`` and return all rows from ``cursor.fetchall()``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

# SQL Server Connector
import pyodbc

class SQLServerConnector(DatabaseConnector):
    """Connector for Microsoft SQL Server built on ``pyodbc``.

    Reads ``server``, ``database``, ``user`` and ``password`` from the
    credentials; ``driver`` is optional and defaults to ``"SQL Server"``.
    """

    def connect(self):
        """Open an ODBC connection and store it on ``self.connection``."""
        driver = self.credentials.get("driver", "SQL Server")
        self.connection = pyodbc.connect(
            f"DRIVER={{{driver}}};"
            f"SERVER={self.credentials['server']};"
            f"DATABASE={self.credentials['database']};"
            f"UID={self.credentials['user']};"
            f"PWD={self.credentials['password']}"
        )

    def fetch_schema(self):
        """Return ``{table: {column: data_type}}`` from ``INFORMATION_SCHEMA``.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute("SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS")
            schema_info = {}
            for table, column, dtype in cursor.fetchall():
                schema_info.setdefault(table, {})[column] = dtype
            return schema_info
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

    def execute_query(self, query: str):
        """Run ``query`` and return all rows from ``cursor.fetchall()``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

# Cassandra Connector
from cassandra.cluster import Cluster

class CassandraConnector(DatabaseConnector):
    """Connector for Apache Cassandra built on the ``cassandra`` driver.

    Reads ``host`` and ``keyspace`` from the credentials. ``self.connection``
    holds the driver session; the owning cluster is kept on ``self._cluster``
    so it can be shut down in ``close()``.
    """

    def connect(self):
        """Connect to the cluster and open a session on the keyspace."""
        self._cluster = Cluster([self.credentials["host"]])
        self.connection = self._cluster.connect(self.credentials["keyspace"])

    def fetch_schema(self):
        """Return ``{table: {column: type}}`` for the configured keyspace.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated. The data is read
        from ``system_schema.columns``.
        """
        rows = self.connection.execute(
            "SELECT table_name, column_name, type FROM system_schema.columns WHERE keyspace_name = %s",
            (self.credentials["keyspace"],),
        )
        schema_info = {}
        # NOTE(review): tuple unpacking assumes the session's row factory
        # yields tuple-like rows in select order — confirm if it is customized.
        for table, column, dtype in rows:
            schema_info.setdefault(table, {})[column] = dtype
        return schema_info

    def execute_query(self, query: str):
        """Run ``query`` and return the resulting rows as a list."""
        return list(self.connection.execute(query))

    def close(self):
        """Shut down the cluster (and with it the session).

        Overrides the base implementation, which calls ``close()`` on
        ``self.connection``; the driver's session and cluster objects are
        released with ``shutdown()`` instead.
        """
        cluster = getattr(self, "_cluster", None)
        try:
            if cluster is not None:
                cluster.shutdown()
            elif self.connection is not None:
                self.connection.shutdown()
        finally:
            self._cluster = None
            self.connection = None

# Snowflake Connector
import snowflake.connector

class SnowflakeConnector(DatabaseConnector):
    """Connector for Snowflake built on ``snowflake.connector``.

    Reads ``user``, ``password``, ``account`` and ``database`` from the
    credentials.
    """

    def connect(self):
        """Open a Snowflake connection and store it on ``self.connection``."""
        self.connection = snowflake.connector.connect(
            user=self.credentials["user"],
            password=self.credentials["password"],
            account=self.credentials["account"],
            database=self.credentials["database"]
        )

    def fetch_schema(self):
        """Return ``{table: {column: data_type}}`` for the current database.

        Implemented because the base class declares ``fetch_schema`` abstract;
        without it the connector cannot be instantiated. The metadata views
        of ``INFORMATION_SCHEMA`` itself are filtered out of the result.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(
                "SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
                "WHERE TABLE_SCHEMA <> 'INFORMATION_SCHEMA'"
            )
            schema_info = {}
            for table, column, dtype in cursor.fetchall():
                schema_info.setdefault(table, {})[column] = dtype
            return schema_info
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

    def execute_query(self, query: str):
        """Run ``query`` and return all rows from ``cursor.fetchall()``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() raises.
            cursor.close()

# Example usage:
# db = PostgreSQLConnector(host='localhost', user='postgres', password='pass', database='testdb')
# db.connect()
# schema = db.fetch_schema()
# query = db.generate_sql_query("Get all customers", schema)
# result = db.execute_query(query)
# print(result)
# db.close()

# db = SQLServerConnector(server='localhost', user='sa', password='pass', database='testdb')
# db.connect()
# result = db.execute_query("SELECT * FROM Customers")
# print(result)
# db.close()

# db = CassandraConnector(host='localhost', keyspace='testkeyspace')
# db.connect()
# result = db.execute_query("SELECT * FROM customers")
# print(result)
# db.close()

# db = SnowflakeConnector(user='user', password='pass', account='account', database='testdb')
# db.connect()
# result = db.execute_query("SELECT * FROM customers")
# print(result)
# db.close()
