# Generic File Ingestion RAG Pipeline

This notebook contains a comprehensive RAG pipeline that analyzes any supported data file and finds the top 10 best-matching database tables for ingesting its data, using semantic search and LLM analysis.

## Features:
- **Multi-format support**: CSV, Excel, JSON, TSV, TXT files
- **Automatic domain detection** from column names
- **Context-aware semantic query generation**
- **SQL agent optimized output format**
- **Confidence scoring for automation decisions**

## Setup and Dependencies

In [3]:
# Import required libraries
import os
import json
import pandas as pd
import hashlib
from uuid import uuid4
from dotenv import load_dotenv

# Vector store and embeddings
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct, PayloadSchemaType
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from openai import OpenAI

# Load environment variables.
# The .env location can be overridden with the AGENT_ENV_FILE environment variable,
# so the notebook does not have to be edited on another machine.
ENV_FILE = os.getenv("AGENT_ENV_FILE", r'C:\Users\axel.grille\Documents\rules-engine-agent\Agent\.env')
if not load_dotenv(ENV_FILE):
    # load_dotenv returns False when the file is missing or defines no variables.
    print(f"⚠️  .env file not found or empty: {ENV_FILE}")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")

print("✅ Dependencies loaded successfully")

✅ Dependencies loaded successfully


## Initialize Vector Store and Clients

In [4]:
# OpenAI API client and the embedding model used for all vectors in this notebook
client = OpenAI()
encoder = OpenAIEmbeddings()

# Qdrant client configured from the values read from the .env file
qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)

print("✅ Clients initialized")
print("📊 Available collections:", [col.name for col in qdrant_client.get_collections().collections])

✅ Clients initialized
📊 Available collections: ['maxo_vector_store_v2', 'sandbox05_database_schema', 'maxo_vector_store']


## Database Setup (Run Once)

In [5]:
# DICO API call to get database schema
import requests

def efficy_api_call(base_url="https://sandbox-5.efficytest.cloud/crm", customer="SANDBOX05",
                    user=None, password=None, timeout=30):
    """
    Log in to the Efficy CRM and fetch the DICO (database schema) payload.

    Credentials are never hard-coded. They default to the EFFICY_USER and
    EFFICY_PASSWORD environment variables, e.g. added to the .env file loaded
    at the top of the notebook.

    Args:
        base_url: CRM base URL; "/logon" and "/system/dico" are appended to it.
        customer: value sent in the X-Efficy-Customer login header.
        user, password: login credentials. When omitted, they are read from the
            environment.
        timeout: seconds to wait for each HTTP request before giving up.

    Returns:
        The decoded JSON of the DICO response, or None on any failure. The
        failure reason is printed.
    """
    user = user or os.getenv("EFFICY_USER")
    password = password or os.getenv("EFFICY_PASSWORD")
    if not user or not password:
        print("❌ Missing Efficy credentials: set EFFICY_USER and EFFICY_PASSWORD (e.g. in the .env file)")
        return None

    ajax_headers = {
        'X-Requested-By': 'User',
        'X-Requested-With': 'XMLHttpRequest'
    }

    try:
        # The context manager closes the session's pooled connections when done.
        with requests.Session() as session:
            login_response = session.post(
                f"{base_url}/logon",
                headers={
                    'X-Efficy-Customer': customer,
                    **ajax_headers,
                    'Content-Type': 'application/x-www-form-urlencoded'
                },
                # A dict lets requests URL-encode special characters in the credentials.
                data={'user': user, 'password': password},
                timeout=timeout,
            )
            if login_response.status_code != 200:
                print(f"❌ Login failed: {login_response.status_code}")
                return None
            print("✅ Login successful")

            # The session keeps the login cookies for the DICO request.
            dico_response = session.get(
                f"{base_url}/system/dico",
                headers=ajax_headers,
                timeout=timeout,
            )
            if dico_response.status_code != 200:
                print(f"❌ DICO request failed: {dico_response.status_code}")
                return None
            print("✅ DICO data retrieved")
            return dico_response.json()

    except (requests.RequestException, ValueError) as e:
        # ValueError covers an invalid JSON body in the DICO response.
        print(f"❌ Request error: {e}")

    return None

# Execute the API call (comment out if dico_data already exists)
dico_data = efficy_api_call()
if dico_data:
    print(f"✅ Retrieved {len(dico_data['data']['tables'])} tables from database schema")

✅ Login successful
✅ DICO data retrieved
✅ Retrieved 409 tables from database schema


In [20]:
# Create stable ID function
def stable_id(*parts, length=32):
    """
    Build a deterministic identifier from arbitrary parts.

    Each part is converted with ``str``, the parts are joined with "|", and the
    SHA-256 hex digest of the UTF-8 bytes is truncated to ``length`` characters.
    The same parts always give the same ID.
    """
    joined = "|".join(map(str, parts))
    digest = hashlib.sha256(joined.encode()).hexdigest()
    return digest[:length]

# Generate table chunks for vector store
from Agent.RAG.chunk_generator import generate_table_ingestion_chunks

# Chunks are cached in the kernel. Delete `table_chunks` (or restart the kernel)
# to rebuild them after dico_data changes.
if 'table_chunks' not in globals():
    if not globals().get('dico_data'):
        # Fail fast with a clear message instead of passing None to the generator.
        raise RuntimeError(
            "dico_data is empty or undefined — run the DICO API cell (efficy_api_call) first"
        )
    table_chunks = generate_table_ingestion_chunks(dico_data)

print(f"✅ Generated {len(table_chunks)} table chunks")

# Create vector store collection
collection_name = "maxo_vector_store_v2"

existing_collections = [col.name for col in qdrant_client.get_collections().collections]
if collection_name not in existing_collections:
    qdrant_client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            # Probe the encoder once to learn its embedding dimension.
            size=len(encoder.embed_query("Hello world")),
            distance=models.Distance.COSINE,
        ),
    )
    print(f"✅ Created collection: {collection_name}")
else:
    print(f"✅ Using existing collection: {collection_name}")

✅ Generated 286 table chunks
✅ Using existing collection: maxo_vector_store_v2


In [21]:
# Create and upload table points to vector store.
# All chunk texts are embedded in one batched embed_documents call instead of
# one embed_query request per chunk.
chunk_texts = [chunk.page_content for chunk in table_chunks]
chunk_embeddings = encoder.embed_documents(chunk_texts) if chunk_texts else []
assert len(chunk_embeddings) == len(table_chunks), "one embedding expected per chunk"

table_points = []
for chunk, embedding in zip(table_chunks, chunk_embeddings):
    meta = chunk.metadata
    # Deterministic ID: re-running this cell upserts the same point IDs instead of adding new ones.
    chunk_id = stable_id(meta['chunk_type'], meta['primary_table'], meta['table_code'])
    table_points.append(
        PointStruct(
            id=chunk_id,
            vector=embedding,
            payload={
                'content': chunk.page_content,
                'chunk_type': meta['chunk_type'],
                'primary_table': meta['primary_table'],
                'table_code': meta['table_code'],
                'table_kind': meta['table_kind'],
                'field_count': meta['field_count'],
                'metadata': meta
            }
        )
    )

# Upsert points to Qdrant (skipped when there is nothing to upload)
if not table_points:
    print("⚠️  No table chunks to upsert")
else:
    try:
        result = qdrant_client.upsert(
            collection_name=collection_name,
            points=table_points
        )
        print(f"✅ Successfully upserted {len(table_points)} table chunks")

        collection_info = qdrant_client.get_collection(collection_name)
        print(f"📊 Collection now contains {collection_info.points_count} total points")
    except Exception as e:
        print(f"❌ Error during upsert: {e}")

# Create payload indexes for efficient filtering on the fields used in search filters
try:
    for field_name in ("chunk_type", "primary_table"):
        qdrant_client.create_payload_index(
            collection_name=collection_name,
            field_name=field_name,
            field_schema=PayloadSchemaType.KEYWORD
        )
    print("✅ Payload indexes created")
except Exception as e:
    print(f"⚠️  Index creation (may already exist): {e}")

✅ Successfully upserted 286 table chunks
📊 Collection now contains 286 total points
✅ Payload indexes created


## Generic File Ingestion RAG Pipeline

In [None]:
class GenericFileIngestionRAGPipeline:
    """
    Generic RAG pipeline for analyzing any data file and finding the top 5 best
    database tables for data ingestion using semantic search and LLM analysis.
    """
    
    def __init__(self, qdrant_client, encoder, collection_name="maxo_vector_store_v2"):
        self.qdrant_client = qdrant_client
        self.encoder = encoder
        self.collection_name = collection_name
        self.llm_client = OpenAI()
        self.supported_formats = ['.csv', '.xlsx', '.xls', '.json', '.txt', '.tsv']
    
    def analyze_file_structure(self, file_path):
        """Analyze any supported file structure and content"""
        try:
            if not os.path.exists(file_path):
                return {'error': f'File not found: {file_path}'}
            
            file_extension = os.path.splitext(file_path)[1].lower()
            file_name = os.path.basename(file_path)
            
            if file_extension not in self.supported_formats:
                return {'error': f'Unsupported file format: {file_extension}'}
            
            # Handle different file types
            if file_extension == '.csv':
                return self._analyze_csv(file_path, file_name)
            elif file_extension in ['.xlsx', '.xls']:
                return self._analyze_excel(file_path, file_name)
            elif file_extension == '.json':
                return self._analyze_json(file_path, file_name)
            elif file_extension in ['.txt', '.tsv']:
                return self._analyze_text(file_path, file_name)
            
        except Exception as e:
            return {'error': f'Failed to analyze file: {str(e)}'}
    
    def _analyze_csv(self, file_path, file_name):
        """Analyze CSV files"""
        df = pd.read_csv(file_path)
        return self._create_file_analysis(df, file_name, 'CSV')
    
    def _analyze_excel(self, file_path, file_name):
        """Analyze Excel files"""
        df = pd.read_excel(file_path)
        return self._create_file_analysis(df, file_name, 'Excel')
    
    def _analyze_json(self, file_path, file_name):
        """Analyze JSON files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
            df = pd.DataFrame(data)
            return self._create_file_analysis(df, file_name, 'JSON Array')
        elif isinstance(data, dict):
            df = pd.DataFrame([data])
            return self._create_file_analysis(df, file_name, 'JSON Object')
        else:
            return {'error': 'JSON structure not suitable for tabular analysis'}
    
    def _analyze_text(self, file_path, file_name):
        """Analyze text/TSV files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            first_line = f.readline()
        
        delimiter = '\t' if '\t' in first_line else ',' if ',' in first_line else ';'
        df = pd.read_csv(file_path, delimiter=delimiter)
        return self._create_file_analysis(df, file_name, 'Text/TSV')
    
    def _create_file_analysis(self, df, file_name, file_type):
        """Create standardized file analysis from DataFrame"""
        file_info = {
            'file_name': file_name,
            'file_type': file_type,
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'columns': df.columns.tolist(),
            'sample_data': df.head(2).to_dict('records') if len(df) > 0 else []
        }
        
        # Analyze column types and content
        column_analysis = {}
        for col in df.columns:
            column_analysis[col] = {
                'dtype': str(df[col].dtype),
                'non_null_count': df[col].notna().sum(),
                'null_count': df[col].isna().sum(),
                'unique_values': df[col].nunique(),
                'sample_values': df[col].dropna().head(3).tolist()
            }
        
        file_info['column_analysis'] = column_analysis
        return file_info
    
    def _infer_data_domain(self, columns):
        "Enhanced data domain inference with comprehensive business entity detection"
        columns_lower = [col.lower() for col in columns]
        
        # Comprehensive domain detection patterns
        domain_patterns = {
            # Core CRM entities
            'leads': [
                'lead', 'prospect', 'lead_status', 'source', 'qualification', 'score', 'conversion',
                'lead_id', 'prospect_id', 'qualified', 'unqualified', 'mql', 'sql', 'nurture'
            ],
            'opportunities': [
                'opportunity', 'deal', 'pipeline', 'stage', 'probability', 'close_date', 'forecast',
                'opp_id', 'deal_id', 'sales_stage', 'win_probability', 'expected_revenue', 'deal_value'
            ],
            'contacts': [
                'contact', 'person', 'individual', 'first_name', 'last_name', 'title', 'relationship',
                'contact_id', 'person_id', 'full_name', 'job_title', 'phone', 'mobile', 'email'
            ],
            'companies': [
                'company', 'organization', 'enterprise', 'business', 'industry', 'sector', 'headquarters',
                'company_id', 'org_id', 'business_name', 'company_name', 'industry_type', 'company_size'
            ],
            'activities': [
                'activity', 'action', 'event', 'log', 'history', 'timeline', 'interaction',
                'activity_id', 'event_id', 'action_type', 'activity_type', 'interaction_type', 'follow_up'
            ],
            'meetings': [
                'meeting', 'appointment', 'schedule', 'calendar', 'attendee', 'agenda', 'duration',
                'meeting_id', 'appointment_id', 'scheduled', 'start_time', 'end_time', 'location'
            ],
            'campaigns': [
                'campaign', 'marketing', 'promotion', 'advertising', 'channel', 'target', 'response',
                'campaign_id', 'promo_id', 'marketing_campaign', 'campaign_name', 'campaign_type'
            ],
            'tickets': [
                'ticket', 'issue', 'support', 'incident', 'priority', 'resolution', 'escalation',
                'ticket_id', 'issue_id', 'support_ticket', 'incident_id', 'case_id', 'help_desk'
            ],
            'users': [
                'user', 'username', 'login', 'profile', 'role', 'permission', 'access',
                'user_id', 'account', 'user_name', 'login_name', 'user_role', 'access_level'
            ],
            
            # Extended business domains
            'communication': [
                'message', 'email', 'mail', 'subject', 'sender', 'recipient', 'date',
                'corps du message', 'mail expéditeur', 'mail destinataire', 'objet', 'visite_mail'
            ],
            'sales_orders': ['order', 'product', 'price', 'quantity', 'total', 'invoice', 'payment'],
            'financial': ['amount', 'cost', 'revenue', 'budget', 'transaction', 'account', 'currency'],
            'hr_employee': ['employee', 'staff', 'salary', 'department', 'position', 'hire', 'manager'],
            'inventory': ['item', 'stock', 'warehouse', 'supplier', 'category', 'sku', 'unit'],
            'project': ['project', 'task', 'milestone', 'deadline', 'status', 'resource', 'team'],
            'logistics': ['shipment', 'delivery', 'tracking', 'carrier', 'destination', 'weight']
        }
        
        # Calculate domain scores with weighted importance
        domain_scores = {}
        for domain, keywords in domain_patterns.items():
            score = 0
            matched_keywords = []
            
            for keyword in keywords:
                for col in columns_lower:
                    if keyword in col:
                        # Weight exact matches higher
                        if keyword == col:
                            score += 3
                        # Weight ID fields higher (strong indicators)
                        elif keyword.endswith('_id') and keyword in col:
                            score += 2.5
                        # Regular substring matches
                        else:
                            score += 1
                        matched_keywords.append(keyword)
                        break
            
            domain_scores[domain] = {
                'score': score,
                'matched_keywords': list(set(matched_keywords))
            }
        
        # Get best matching domain
        best_domain = max(domain_scores, key=lambda x: domain_scores[x]['score']) if domain_scores else 'general'
        best_score = domain_scores[best_domain]['score'] if best_domain != 'general' else 0
        
        # Enhanced domain mapping with confidence indicators
        domain_mapping = {
            'leads': {
                'primary_domain': 'lead management and prospecting', 
                'business_area': 'sales lead generation', 
                'data_category': 'leads',
                'table_hints': ['lead', 'prospect', 'qualification']
            },
            'opportunities': {
                'primary_domain': 'sales opportunity tracking', 
                'business_area': 'sales pipeline management', 
                'data_category': 'opportunities',
                'table_hints': ['opportunity', 'deal', 'sales_pipeline']
            },
            'contacts': {
                'primary_domain': 'contact and person management', 
                'business_area': 'relationship management', 
                'data_category': 'contacts',
                'table_hints': ['contact', 'person', 'individual']
            },
            'companies': {
                'primary_domain': 'company and organization management', 
                'business_area': 'corporate data management', 
                'data_category': 'companies',
                'table_hints': ['company', 'organization', 'enterprise']
            },
            'activities': {
                'primary_domain': 'activity and event tracking', 
                'business_area': 'interaction management', 
                'data_category': 'activities',
                'table_hints': ['activity', 'event', 'action', 'visit']
            },
            'meetings': {
                'primary_domain': 'meeting and calendar management', 
                'business_area': 'scheduling and appointments', 
                'data_category': 'meetings',
                'table_hints': ['meeting', 'appointment', 'schedule']
            },
            'campaigns': {
                'primary_domain': 'marketing campaign management', 
                'business_area': 'marketing operations', 
                'data_category': 'campaigns',
                'table_hints': ['campaign', 'marketing', 'promotion']
            },
            'tickets': {
                'primary_domain': 'ticketing and support management', 
                'business_area': 'customer support', 
                'data_category': 'tickets',
                'table_hints': ['ticket', 'support', 'incident']
            },
            'users': {
                'primary_domain': 'user and account management', 
                'business_area': 'system administration', 
                'data_category': 'users',
                'table_hints': ['user', 'account', 'profile']
            },
            'communication': {
                'primary_domain': 'communication and messaging', 
                'business_area': 'correspondence', 
                'data_category': 'communication',
                'table_hints': ['mail', 'email', 'message', 'visit']
            },
            'sales_orders': {'primary_domain': 'sales and order management', 'business_area': 'sales operations', 'data_category': 'transactional', 'table_hints': ['order', 'sale', 'invoice']},
            'financial': {'primary_domain': 'financial and accounting', 'business_area': 'finance', 'data_category': 'financial', 'table_hints': ['financial', 'accounting', 'budget']},
            'hr_employee': {'primary_domain': 'human resources', 'business_area': 'HR management', 'data_category': 'employee', 'table_hints': ['employee', 'staff', 'hr']},
            'inventory': {'primary_domain': 'inventory and stock management', 'business_area': 'supply chain', 'data_category': 'inventory', 'table_hints': ['inventory', 'stock', 'product']},
            'project': {'primary_domain': 'project management', 'business_area': 'project operations', 'data_category': 'project', 'table_hints': ['project', 'task', 'milestone']},
            'logistics': {'primary_domain': 'logistics and shipping', 'business_area': 'operations', 'data_category': 'logistics', 'table_hints': ['shipment', 'delivery', 'logistics']},
            'general': {'primary_domain': 'business data', 'business_area': 'general operations', 'data_category': 'business', 'table_hints': ['data', 'general']}
        }
        
        result = domain_mapping.get(best_domain, domain_mapping['general'])
        
        # Add detection metadata for debugging and confidence assessment
        result['detection_confidence'] = min(best_score / 5.0, 1.0)  # Normalize to 0-1
        result['matched_keywords'] = domain_scores.get(best_domain, {}).get('matched_keywords', [])
        result['all_scores'] = {k: v['score'] for k, v in domain_scores.items() if v['score'] > 0}
        
        return result
    
    def generate_semantic_queries(self, file_info, user_context=None):
        """Generate semantic queries based on file content and optional user context"""
        columns = file_info.get('columns', [])
        file_name = file_info.get('file_name', 'data file')
        file_type = file_info.get('file_type', 'file')
        
        domain_hints = self._infer_data_domain(columns)
        
        # Enhanced email/mail detection
        email_indicators = self._detect_email_signals(file_name, columns)
        
        query_templates = [
            f"database table for storing {domain_hints['primary_domain']} data with fields like {', '.join(columns[:6])}",
            f"{file_type} data ingestion into relational database with columns {', '.join(columns[:8])}",
            f"business data management system for {domain_hints['business_area']} information",
            f"data warehouse table for {domain_hints['data_category']} records and analytics",
            f"structured data storage for {file_name} content in enterprise database"
        ]
        
        # If strong email signals detected, prioritize email/mail queries
        if email_indicators['is_email_data']:
            email_queries = [
                f"mail table for email message storage with sender recipient fields {', '.join(email_indicators['email_columns'])}",
                f"email communication table for message tracking with mail fields from {file_name}",
                f"mail system database table for email correspondence management",
                f"email message storage table with mail content and headers",
                f"visit mail table for email tracking with fields {', '.join(columns[:4])}"
            ]
            # Insert email-specific queries at the beginning for higher priority
            query_templates = email_queries + query_templates
            print(f"🔍 Enhanced with {len(email_queries)} email-specific queries (confidence: {email_indicators['confidence']:.2f})")
        
        if user_context:
            context_query = f"{user_context} with data structure: {', '.join(columns[:10])}"
            query_templates.insert(0, context_query)
        
        return query_templates

    
    def search_relevant_tables(self, queries, top_k=10):
        """Search for relevant tables using multiple semantic queries"""
        all_results = {}
        
        for i, query in enumerate(queries):
            query_embedding = self.encoder.embed_query(query)
            
            try:
                search_results = self.qdrant_client.query_points(
                    collection_name=self.collection_name,
                    query=query_embedding,
                    query_filter=models.Filter(
                        must=[
                            models.FieldCondition(
                                key="chunk_type",
                                match=models.MatchValue(value="table_ingestion_profile")
                            )
                        ]
                    ),
                    limit=top_k
                )
                
                for point in search_results.points:
                    table_name = point.payload['primary_table']
                    
                    if table_name not in all_results:
                        all_results[table_name] = {
                            'table_name': table_name,
                            'table_code': point.payload['table_code'],
                            'table_kind': point.payload['table_kind'],
                            'field_count': point.payload['field_count'],
                            'content': point.payload['content'],
                            'metadata': point.payload['metadata'],
                            'scores': [],
                            'queries_matched': []
                        }
                    
                    all_results[table_name]['scores'].append(point.score)
                    all_results[table_name]['queries_matched'].append(i+1)
                
            except Exception as e:
                print(f"Search error for query {i+1}: {e}")
        
        return all_results
    
    def rank_tables_by_relevance(self, search_results):
        """Rank tables by multiple relevance criteria"""
        ranked_tables = []
        
        for table_name, data in search_results.items():
            scores = data['scores']
            avg_score = sum(scores) / len(scores) if scores else 0
            max_score = max(scores) if scores else 0
            query_coverage = len(set(data['queries_matched']))
            
            composite_score = (max_score * 0.4) + (avg_score * 0.4) + (query_coverage * 0.2)
            
            ranked_tables.append({
                'table_name': table_name,
                'table_code': data['table_code'],
                'table_kind': data['table_kind'],
                'field_count': data['field_count'],
                'content': data['content'],
                'avg_score': avg_score,
                'max_score': max_score,
                'query_coverage': query_coverage,
                'composite_score': composite_score,
                'total_matches': len(scores),
                'queries_matched': data['queries_matched']
            })
        
        ranked_tables.sort(key=lambda x: x['composite_score'], reverse=True)
        return ranked_tables
    
    def run_complete_pipeline(self, file_path, user_context=None):
        """Run the complete RAG pipeline for any file type"""
        print("=== GENERIC FILE INGESTION RAG PIPELINE ===")
        print(f"📁 Analyzing: {os.path.basename(file_path)}")
        print()
        
        # Step 1: Analyze file structure
        print("Step 1: Analyzing file structure...")
        file_info = self.analyze_file_structure(file_path)
        if 'error' in file_info:
            return file_info
        
        print(f"✓ {file_info['file_type']} file: {file_info['total_columns']} columns, {file_info['total_rows']} rows")
        print(f"✓ Detected domain: {self._infer_data_domain(file_info['columns'])['primary_domain']}")
        print()
        
        # Step 2: Generate semantic search queries
        print("Step 2: Generating semantic search queries...")
        queries = self.generate_semantic_queries(file_info, user_context)
        print(f"✓ Generated {len(queries)} queries for database search")
        print()
        
        # Step 3: Search for relevant tables
        print("Step 3: Searching for relevant database tables...")
        search_results = self.search_relevant_tables(queries)
        print(f"✓ Found {len(search_results)} unique tables across all queries")
        print()
        
        # Step 4: Rank tables by relevance
        print("Step 4: Ranking tables by relevance...")
        ranked_tables = self.rank_tables_by_relevance(search_results)
        print(f"✓ Ranked {len(ranked_tables)} tables by composite relevance score")
        print()
        
        # Compile final results
        final_results = {
            'file_analysis': file_info,
            'inferred_domain': self._infer_data_domain(file_info['columns']),
            'user_context': user_context,
            'search_queries_used': queries,
            'total_tables_found': len(search_results),
            'top_10_tables': ranked_tables[:10],  # Changed from top_5_tables to top_10_tables
            'ingestion_summary': {
                'recommended_table': ranked_tables[0]['table_name'] if ranked_tables else None,
                'confidence_level': self._calculate_confidence_level(ranked_tables[0] if ranked_tables else None),
                'requires_review': self._requires_review(ranked_tables[0] if ranked_tables else None),
                'sql_agent_ready': len(ranked_tables) > 0 and ranked_tables[0]['composite_score'] > 0.6
            }
        }
        
        return final_results
    
    def _calculate_confidence_level(self, best_table):
        """Calculate confidence level for SQL agent"""
        if not best_table:
            return 'None'
        score = best_table['composite_score']
        if score > 0.6:  # Lowered from 0.8
            return 'High'
        elif score > 0.4:  # Lowered from 0.6
            return 'Medium'
        else:
            return 'Low'
    
    def _requires_review(self, best_table):
        """Determine if human review is needed before SQL generation"""
        if not best_table:
            return True
        return best_table['composite_score'] < 0.7
    
    def display_results_summary(self, results):
        """Display a formatted summary optimized for SQL agent consumption"""
        if 'error' in results:
            print(f"❌ Pipeline Error: {results['error']}")
            return
        
        file_info = results['file_analysis']
        summary = results['ingestion_summary']
        
        print("=" * 80)
        print(f"🎯 INGESTION ANALYSIS: {file_info['file_name']}")
        print("=" * 80)
        print(f"📊 File: {file_info['file_type']} | {file_info['total_rows']} rows | {file_info['total_columns']} columns")
        print(f"🏷️  Domain: {results['inferred_domain']['primary_domain']}")
        print(f"🎯 Best Table: {summary['recommended_table']}")
        print(f"🔍 Confidence: {summary['confidence_level']}")
        print(f"⚠️  Review Required: {'Yes' if summary['requires_review'] else 'No'}")
        print(f"🤖 SQL Agent Ready: {'Yes' if summary['sql_agent_ready'] else 'No'}")
        print()
        
        print("📋 TOP 10 DATABASE TABLES:") 
        for i, table in enumerate(results['top_10_tables'], 1):  
            print(f"{i}. {table['table_name']} ({table['table_kind']})")
            print(f"   Score: {table['composite_score']:.3f} | Fields: {table['field_count']} | Matches: {table['total_matches']}")
        print()
        
        if summary['sql_agent_ready']:
            print("✅ Ready for SQL Agent Processing")
        else:
            print("⚠️  Requires review before SQL generation")
    
    def export_for_sql_agent(self, results, output_file=None):
        """Write pipeline results to a JSON file shaped for the SQL-generation agent.

        Args:
            results: Output of the pipeline run. If it contains an 'error' key it is
                returned unchanged; otherwise it must provide 'file_analysis',
                'ingestion_summary' and 'top_10_tables'.
            output_file: Destination path. Defaults to
                "<input file stem>_ingestion_analysis.json" (relative to the CWD).

        Returns:
            {'success': True, 'output_file': path} on success,
            {'error': 'Export failed: ...'} if writing fails, or `results`
            itself when it already carries an error.
        """
        if 'error' in results:
            return results

        file_analysis = results['file_analysis']
        summary = results['ingestion_summary']

        if not output_file:
            base_name = os.path.splitext(file_analysis['file_name'])[0]
            output_file = f"{base_name}_ingestion_analysis.json"

        sql_agent_data = {
            'source_file': file_analysis['file_name'],
            'file_structure': {
                'columns': file_analysis['columns'],
                'total_rows': file_analysis['total_rows'],
                'column_types': {col: analysis['dtype'] for col, analysis in file_analysis['column_analysis'].items()}
            },
            'recommended_ingestion': {
                'primary_table': summary['recommended_table'],
                'confidence': summary['confidence_level'],
                'ready_for_sql': summary['sql_agent_ready']
            },
            'table_options': [
                {
                    'table_name': table['table_name'],
                    'table_code': table['table_code'],
                    'relevance_score': round(table['composite_score'], 3),
                    'field_count': table['field_count'],
                    'table_schema': table['content']
                }
                for table in results['top_10_tables']
            ],
            'generation_timestamp': pd.Timestamp.now().isoformat()
        }

        def _to_json_compatible(value):
            """json.dump fallback: numpy scalars/arrays -> Python values, anything else -> str()."""
            # Values such as np.int64 counts or numpy dtype objects are not
            # JSON-native; without this hook json.dump raises TypeError and the
            # whole export is reported as failed.
            if hasattr(value, 'tolist'):
                return value.tolist()
            return str(value)

        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(sql_agent_data, f, indent=2, ensure_ascii=False, default=_to_json_compatible)
            print(f"\n💾 SQL Agent data exported to: {output_file}")
            return {'success': True, 'output_file': output_file}
        except Exception as e:
            return {'error': f'Export failed: {str(e)}'}

# Confirms the class-definition cell executed to the end.
print("✅ GenericFileIngestionRAGPipeline class complete with all methods")

✅ GenericFileIngestionRAGPipeline class complete with all methods


In [24]:
# Initialize the pipeline against the table-profile collection
pipeline = GenericFileIngestionRAGPipeline(qdrant_client, encoder, "maxo_vector_store_v2")

# File to analyze (example with Mails.csv). Set INGESTION_FILE_PATH to point at
# another file without editing the notebook.
file_path = os.getenv(
    "INGESTION_FILE_PATH",
    r"C:\Users\axel.grille\Documents\rules-engine-agent\Mails.csv",
)
user_context = "email correspondence and customer communication tracking"

# Run the complete pipeline
results = pipeline.run_complete_pipeline(file_path, user_context)

# Display results
pipeline.display_results_summary(results)

# Export for SQL agent. Report export failures instead of skipping them silently.
if 'error' not in results:
    export_result = pipeline.export_for_sql_agent(results)
    if 'success' in export_result:
        print(f"✅ Ready for SQL Agent: {export_result['output_file']}")
    else:
        print(f"❌ SQL Agent export failed: {export_result.get('error', export_result)}")

=== GENERIC FILE INGESTION RAG PIPELINE ===
📁 Analyzing: Mails.csv

Step 1: Analyzing file structure...
✓ CSV file: 25 columns, 5 rows
✓ Detected domain: communication and messaging

Step 2: Generating semantic search queries...
🔍 Enhanced with 5 email-specific queries (confidence: 1.00)
✓ Generated 11 queries for database search

Step 3: Searching for relevant database tables...
✓ Found 48 unique tables across all queries

Step 4: Ranking tables by relevance...
✓ Ranked 48 tables by composite relevance score

🎯 INGESTION ANALYSIS: Mails.csv
📊 File: CSV | 5 rows | 25 columns
🏷️  Domain: communication and messaging
🎯 Best Table: Mail
🔍 Confidence: High
⚠️  Review Required: No
🤖 SQL Agent Ready: Yes

📋 TOP 10 DATABASE TABLES:
1. Mail (Entity)
   Score: 2.029 | Fields: 31 | Matches: 7
2. Contact (Entity)
   Score: 1.996 | Fields: 70 | Matches: 7
3. Sms (Entity)
   Score: 1.814 | Fields: 9 | Matches: 6
4. Docu_Docu (Relation)
   Score: 1.806 | Fields: 6 | Matches: 6
5. Kbas_Kbas (Relation)

In [5]:
# Rebuild the Qdrant client from the env-provided URL/API key (same construction
# as the initialization cell) and display the collections on the server.
qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)

qdrant_client.get_collections().collections

[CollectionDescription(name='maxo_vector_store_v2'),
 CollectionDescription(name='sandbox05_database_schema'),
 CollectionDescription(name='maxo_vector_store')]

In [8]:
# Embedding client for the ad-hoc query cell; constructed the same way as `encoder`.
embedding = OpenAIEmbeddings()

In [None]:
# Ad-hoc semantic search: which stored table profile best matches an e-mail dataset?
search_collection = "sandbox_vector_store"

# NOTE(review): this collection is not in the list printed by the client cell
# (maxo_vector_store_v2, sandbox05_database_schema, maxo_vector_store). Fail early
# with a readable message rather than an opaque server error after paying for an
# embedding call.
available_collections = {c.name for c in qdrant_client.get_collections().collections}
if search_collection not in available_collections:
    raise ValueError(
        f"Collection {search_collection!r} not found; available: {sorted(available_collections)}"
    )

query = "Which table should I use to store email messages with sender recipient subject and body content"
query_embedding = embedding.embed_query(query)

response = qdrant_client.query_points(
    collection_name=search_collection,
    query=query_embedding,
)
print(response)

points=[ScoredPoint(id='6ba91d04-d51e-49ee-3cc1-e86d6e6094d6', version=62, score=0.75811076, payload={'content': "=== TABLE INGESTION PROFILE: Mail (mail) ===\nType: Entity Table\nPurpose: Manage and track email interactions for effective communication and customer relationship management.\n\n# SCHEMA & CONSTRAINTS\nTotal fields: 31\nRequired fields: (none detected)\n\n# FIELD DEFINITIONS\n- mailKey (None): FOB | NULLABLE\n- mailInteractionKey (None): FOB | NULLABLE\n- mailSubject (None): ALPHANUMERIC | NULLABLE\n- mailSysCreatedDate (None): DATE | NULLABLE\n- mailSysUpdatedDate (None): DATE | NULLABLE\n- mailSysCreatedUserKey (None): ALPHANUMERIC | NULLABLE\n- mailSysUpdatedUserKey (None): ALPHANUMERIC | NULLABLE\n- mailArchived (None): BOOLEAN | NULLABLE\n- mailReceivedDate (None): DATE | NULLABLE\n- mailMailFrom (None): ALPHANUMERIC | NULLABLE\n- mailMailTo (None): MEMO | NULLABLE\n- mailMailCc (None): MEMO | NULLABLE\n- mailBody (None): BLOB | NULLABLE\n- mailHtmlBody (None): BLOB 

V2

In [6]:
# Fetch the DICO schema payload (its 'data' holds 'fields' and 'tables') from the Efficy API.
dico_data = efficy_api_call()

✅ Login successful
✅ DICO data retrieved


In [7]:
# Raw DICO payload. It is very large; prefer inspecting dico_data['data'].keys() or a slice.
dico_data

{'data': {'fields': {'0001000U000003fh': {'sfldKey': '0001000U000003fh',
    'sfldKTable': 10,
    'sfldName': 'stblKey',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Key',
    'sfldDefaultValue': 'KEYGEN',
    'sfldMassAction': True,
    'sfldType': 'FOB',
    'sfldUsage': 'K',
    'sfldAvailableInSegments': True},
   '0001000U000003fj': {'sfldKey': '0001000U000003fj',
    'sfldKTable': 10,
    'sfldName': 'stblKTable',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Integer',
    'sfldMassAction': True,
    'sfldType': 'INTEGER',
    'sfldAvailableInSegments': True,
    'isInlineEditAllowed': True},
   '0001000U000003fl': {'sfldKey': '0001000U000003fl',
    'sfldKTable': 10,
    'sfldName': 'stblName',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Alphanumeric',
    'sfldDataLength': 30,
    'sfldMassAction': True,
    'sfldType': 'ALPHANUMERIC',
    'sfldAvailableInSegments': True,
    'isInlineEditAllowed': True},
   '00010

In [8]:
# Sanity check: the API helper should return a parsed dict, not raw text.
type(dico_data)

dict

In [28]:
# NOTE(review): duplicate of the earlier raw-payload display; consider removing.
dico_data

{'data': {'fields': {'0001000U000003fh': {'sfldKey': '0001000U000003fh',
    'sfldKTable': 10,
    'sfldName': 'stblKey',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Key',
    'sfldDefaultValue': 'KEYGEN',
    'sfldMassAction': True,
    'sfldType': 'FOB',
    'sfldUsage': 'K',
    'sfldAvailableInSegments': True},
   '0001000U000003fj': {'sfldKey': '0001000U000003fj',
    'sfldKTable': 10,
    'sfldName': 'stblKTable',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Integer',
    'sfldMassAction': True,
    'sfldType': 'INTEGER',
    'sfldAvailableInSegments': True,
    'isInlineEditAllowed': True},
   '0001000U000003fl': {'sfldKey': '0001000U000003fl',
    'sfldKTable': 10,
    'sfldName': 'stblName',
    'sfldQuery': True,
    'sfldGrid': True,
    'sfldDataType': 'Alphanumeric',
    'sfldDataLength': 30,
    'sfldMassAction': True,
    'sfldType': 'ALPHANUMERIC',
    'sfldAvailableInSegments': True,
    'isInlineEditAllowed': True},
   '00010

In [35]:
# One row per DICO table definition (only the dict values of the payload are used).
tables_df = pd.DataFrame(list(dico_data['data']['tables'].values()))
tables_df

Unnamed: 0,stblKind,stblName,stblCode,stblSecured,stblFrontIcon,stblRuleEngineEnabled,stblFrontSidePanelActive,stblFrontHomePageActive,stblFrontObjectPageActive,stblFrontMappedTableCode,...,stblKTable,stblCrmModule,stblPrivilege,stblIndexSearch,stblIndexActive,stblFavorite,linkedBeans,stblNotify,stblImportEnabled,categoryParentKTable
0,V,intg,intg,0,code,True,,,,,...,,,,,,,,,,
1,V,request_status,reqstatus,0,traffic-light-stop,True,True,True,True,,...,,,,,,,,,,
2,V,database,database,0,database,True,,True,True,,...,,,,,,,,,,
3,V,delegated_security,delegsecu,0,user-unlock,True,True,True,True,user,...,,,,,,,,,,
4,V,fmly,fmly,0,folder,True,True,True,True,fldr,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
404,E,PopulationDependency,pode,2,,,,,,,...,434000.0,marketing,,,,,,,,
405,E,AIPrompt,aipr,0,sparkles,,True,True,,,...,435000.0,,,,,,,,,
406,E,ExternalLink,elik,0,,,,,,,...,604170.0,administration,,,,,,,,
407,R,Fldr_Rule,fldrrule,0,,True,,,,,...,604180.0,relations,,,,,,,,


In [38]:
# Drop tables without a stblKTable id (field rows reference tables by that id).
if 'stblKTable' in tables_df.columns:
    tables_df = tables_df.dropna(subset=['stblKTable'])

In [41]:
# Keep only entity ('E') and relation ('R') table definitions.
cleaned_tables = [
    table_def
    for table_def in dico_data['data']['tables'].values()
    if table_def.get('stblKind') in ('E', 'R')
]
len(cleaned_tables)

286

In [40]:
# Preview the entity/relation subset of the table frame.
tables_df.loc[tables_df['stblKind'].isin(('E', 'R'))]

Unnamed: 0,stblKind,stblName,stblCode,stblSecured,stblFrontIcon,stblRuleEngineEnabled,stblFrontSidePanelActive,stblFrontHomePageActive,stblFrontObjectPageActive,stblFrontMappedTableCode,...,stblKTable,stblCrmModule,stblPrivilege,stblIndexSearch,stblIndexActive,stblFavorite,linkedBeans,stblNotify,stblImportEnabled,categoryParentKTable
79,E,SecureToken,stkn,0,,,,,,,...,10120.0,administration,,,,,,,,
83,E,TracerEvent,trev,0,,,,,,,...,23000.0,administration,,,,,,,,
84,E,TracerJob,trjb,0,,,,,,,...,23500.0,administration,,,,,,,,
85,E,TracerRule,trru,0,,,,,,,...,23600.0,administration,,,,,,,,
86,E,LogEvent,loge,0,,,,,,,...,24000.0,administration,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
403,E,ConfigBundleOperation,cobo,0,,,,True,True,,...,433000.0,,,,,,,,,
404,E,PopulationDependency,pode,2,,,,,,,...,434000.0,marketing,,,,,,,,
405,E,AIPrompt,aipr,0,sparkles,,True,True,,,...,435000.0,,,,,,,,,
406,E,ExternalLink,elik,0,,,,,,,...,604170.0,administration,,,,,,,,


In [44]:
# One row per DICO field definition (only the dict values of the payload are used).
fields_df = pd.DataFrame(list(dico_data['data']['fields'].values()))
fields_df

Unnamed: 0,sfldKey,sfldKTable,sfldName,sfldQuery,sfldGrid,sfldDataType,sfldDefaultValue,sfldMassAction,sfldType,sfldUsage,...,isInlineEditAllowed,sfldDataLength,sfldAllowNull,sfldFobKTable,sfldRefrTable,sfldRefrField,sfldMultivalue,sfldIsCustomCreated,sfldLabelPos,sfldTranslate
0,0001000U000003fh,10,stblKey,True,True,Key,KEYGEN,True,FOB,K,...,,,,,,,,,,
1,0001000U000003fj,10,stblKTable,True,True,Integer,,True,INTEGER,,...,True,,,,,,,,,
2,0001000U000003fl,10,stblName,True,True,Alphanumeric,,True,ALPHANUMERIC,,...,True,30.0,,,,,,,,
3,0001000U000003fn,10,stblSysCreatedDate,True,True,Date,DATEGMT,True,DATE,,...,True,,,,,,,,,
4,0001000U000003fp,10,stblSysUpdatedDate,True,True,Date,DATEGMT,True,DATE,,...,True,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4215,0001000U000003nj,604180,fldrruleSysUpdatedDate,True,True,DateTime,DATEGMT,True,DATE,,...,True,,True,,,,,True,,
4216,0001000U000003nl,604180,fldrruleMain,True,True,Logical,0,True,BOOLEAN,,...,True,,True,,,,,True,,
4217,0001000U000003nn,604180,fldrruleRuleKey,True,True,Key,KEYGEN,True,FOB,K,...,,,,,,,,True,,
4218,0001000U000003np,604180,fldrruleFolderKey,True,True,Key,KEYGEN,True,FOB,K,...,,,,,,,,True,,


In [45]:
# Group field rows by their owning table id; falls back to an empty dict when
# the sfldKTable column is absent.
if 'sfldKTable' in fields_df.columns:
    fields_index = fields_df.groupby('sfldKTable')
else:
    fields_index = {}

In [None]:
def build_table_chunks(tables=None, fields_by_table=None):
    """Build the ingestion-profile text for every table that has field rows.

    Defined as a function (rather than a bare loop at the top of this cell) so
    that `_build_table_chunk` and its helpers, which are defined further down in
    the same cell, are looked up at call time instead of raising NameError on a
    fresh kernel.

    Args:
        tables: Iterable of DICO table dicts. Defaults to `cleaned_tables`.
        fields_by_table: pandas GroupBy of field rows keyed by sfldKTable.
            Defaults to `fields_index`.

    Returns:
        dict mapping each table's stblKTable to its profile string. Tables
        with no stblKTable or no field rows are skipped.
    """
    if tables is None:
        tables = cleaned_tables
    if fields_by_table is None:
        fields_by_table = fields_index
    # `fields_index` falls back to {} when the field frame has no sfldKTable column.
    if not hasattr(fields_by_table, 'get_group'):
        return {}

    chunks = {}
    for table in tables:
        ktable = table.get('stblKTable')
        if ktable is None:
            continue
        try:
            table_fields = fields_by_table.get_group(ktable)
        except KeyError:
            continue  # no field definitions registered for this table
        chunks[ktable] = _build_table_chunk(table, table_fields)
    return chunks

def _build_table_chunk(table, table_fields):
    """Render the plain-text ingestion profile for one DICO table.

    Sections: header/purpose, schema & constraints, field definitions,
    validation rules, foreign-key relationships, mapping examples,
    insertion-order guidance and risk/ambiguity notes.

    Args:
        table: DICO table dict; reads 'stblName', 'stblCode' and 'stblKind'
            ('E' -> Entity, 'R' -> Relation, anything else -> Other).
        table_fields: DataFrame holding this table's DICO field rows.

    Returns:
        The profile as one newline-joined string.
    """
    name = table.get('stblName')
    code = table.get('stblCode')
    kind_flag = table.get('stblKind')
    kind = 'Entity' if kind_flag == 'E' else 'Relation' if kind_flag == 'R' else 'Other'

    required, optional, foreign_keys, all_defs = _classify_fields(table_fields)

    parent_dependencies = _infer_parents(foreign_keys)

    purpose_line = _infer_purpose(table, table_fields)

    parts = []
    parts.append(f"TABLE INGESTION PROFILE: {name} ({code}) - {kind}")
    parts.append(f"Purpose: {purpose_line}")
    parts.append("")
    parts.append("# SCHEMA & CONSTRAINTS")
    parts.append(f"Total fields: {len(table_fields)}")
    if required:
        parts.append(f"Required fields ({len(required)}): " + ", ".join(str(f[0]) for f in required))
    else:
        parts.append("Required fields: (none detected)")
    if foreign_keys:
        parts.append(f"Foreign keys ({len(foreign_keys)}): " + ", ".join(str(f[0]) for f in foreign_keys))
    if parent_dependencies:
        parts.append("Depends on parent tables: " + ", ".join(sorted(parent_dependencies)))
    if kind == 'Relation':
        parts.append("Insertion rule: parent entity rows must exist before inserting this relation row.")

    # Primary-key detection must run on this table's own field frame.
    primary_key_info = _detect_primary_key(table_fields)
    if primary_key_info:
        parts.append(f"Primary Key: {primary_key_info['field_name']} ({primary_key_info['code']})" + (" [AUTO_INCREMENT]" if primary_key_info.get('auto_increment') else ""))
    parts.append("")

    parts.append("# FIELD DEFINITIONS")
    for f in all_defs:
        extra = []
        if primary_key_info and f['code'] == primary_key_info['code']:
            extra.append("PRIMARY KEY")
        if f['fk_flag']:
            extra.append("FOREIGN KEY")
        marker = (' [' + ', '.join(extra) + ']') if extra else ''
        parts.append(f"- {f['name']} ({f['code']}): {f['type']}{f['size']} | {'REQUIRED' if not f['nullable'] else 'NULLABLE'}{marker}")
    parts.append("")

    parts.append("# VALIDATION RULES")
    parts.append("- All REQUIRED fields must be provided.")
    if foreign_keys:
        parts.append("- Foreign key fields must reference existing primary keys in their parent tables.")
    parts.append("- Respect maximum sizes for VARCHAR-like fields.")
    parts.append("- Ensure data types (numeric/date/text) align with field expectations.")
    parts.append("")

    parts.append("# FOREIGN KEY RELATIONSHIPS")
    if foreign_keys:
        for fk_name, fk_code, fk_target in foreign_keys:
            parts.append(f"- {fk_name} ({fk_code}) -> references parent table {fk_target or 'UNKNOWN'}")
    else:
        parts.append("(No foreign keys detected)")
    parts.append("")

    parts.append("# COMMON DATA MAPPING EXAMPLES")
    mapping_examples = _mapping_examples(name, required, optional)
    for ex in mapping_examples:
        parts.append(ex)
    parts.append("")

    parts.append("# INSERTION ORDER GUIDANCE")
    if kind == 'Entity' and foreign_keys:
        parts.append("- Insert parent tables first, then this entity (due to foreign key dependencies).")
    elif kind == 'Entity':
        parts.append("This entity can be inserted independently (no foreign key dependencies detected).")
    else:
        parts.append("Insert all referenced parent entity rows before relation rows.")
    parts.append("")

    parts.append("# RISK / AMBIGUITY AREAS")
    if not required:
        parts.append("- No required fields detected: verify if primary key is auto-generated.")
    if len(required) > 15:
        parts.append("- High number of required fields: consider staged ingestion.")
    # Field codes can be missing (None); coerce before calling .lower().
    if any('date' in str(f[1] or '').lower() for f in required):
        parts.append("- Required date fields: ensure correct date formats. (YYYY-MM-DD)")
    parts.append("")

    return "\n".join(parts)

def _classify_fields(table_fields: pd.DataFrame, legacy_table_fields: pd.DataFrame = None):
    """Split one table's DICO field rows into required / optional / foreign-key groups.

    Args:
        table_fields: DataFrame of field rows for a single table; reads
            'sfldName', 'sfldCode', 'sfldType', 'sfldSize', 'sfldNullable'
            (missing columns fall back to None / nullable=True).
        legacy_table_fields: Compatibility shim for the former
            ``(self, table_fields)`` signature. When given, it is used as the
            field frame and the first argument is ignored.

    Returns:
        (required, optional, foreign_keys, all_defs):
        - required / optional: lists of ``(name, code, None)`` tuples.
        - foreign_keys: list of ``(name, code, target_guess)`` tuples.
        - all_defs: list of dicts with 'name', 'code', 'type', 'size',
          'nullable' and 'fk_flag' (' [FOREIGN KEY]' or '').
    """
    if legacy_table_fields is not None:
        table_fields = legacy_table_fields

    required = []
    optional = []
    foreign_keys = []
    all_defs = []
    # NOTE(review): `parent_candidates` (table name -> candidate parent names) is a
    # module-level mapping not defined in this cell; confirm it exists before use.
    parent_name_lower = {n.lower() for n in parent_candidates.get(_current_table_name(table_fields), [])}
    for _, row in table_fields.iterrows():
        name = row.get('sfldName')
        code = row.get('sfldCode')
        if not isinstance(code, str):
            # Missing codes arrive as None or NaN. NaN is truthy and has no
            # .lower()/.startswith(), so normalise both to None.
            code = None
        ftype = row.get('sfldType')
        size_val = row.get('sfldSize')
        # NaN is a truthy float, and int(NaN) raises ValueError, so exclude it explicitly.
        has_size = isinstance(size_val, (int, float)) and not pd.isna(size_val) and size_val
        size = f"({int(size_val)})" if has_size else ""
        nullable = bool(row.get('sfldNullable', True))

        is_primary_key = code in ('Key', 'KEY') or (code is not None and code.lower().endswith('id') and not nullable)
        is_foreign_key = False
        target_guess = None
        if code and code.startswith('K') and code.lower() != 'key':
            # Codes of the form 'K<fragment>' are treated as references to another table.
            is_foreign_key = True
            target_guess = _match_table_by_code_fragment(code[1:].lower())
        elif code and any(parent in code.lower() for parent in parent_name_lower):
            is_foreign_key = True
            target_guess = _match_table_by_code_fragment(code.lower())

        if is_foreign_key:
            foreign_keys.append((name, code, target_guess))
        elif not nullable and not is_primary_key:
            required.append((name, code, None))
        else:
            optional.append((name, code, None))
        all_defs.append({
            'name': name,
            'code': code,
            'type': ftype,
            'size': size,
            'nullable': nullable,
            'fk_flag': ' [FOREIGN KEY]' if is_foreign_key else ''
        })

    return required, optional, foreign_keys, all_defs


def _current_table_name(table_fields): 
    try: 
        k = table_fields.iloc[0]['sfldKTable']
        table = _table_by_k.get(k)
        return table.get('stblName') if table else '' 
    except Exception:
        return '' 
    
def _detect_primary_key(table_fields):
    for _, row in table_fields.iterrows():
        code = row.get('sfldcode')
        name = row.get('sfldName') 
        if code in ('Key', 'KEY') or (code and code.lower().endswith('id')):
            auto_increment = _infer_auto_increment(row)
            return {'field_name': name, "code": code, "auto_increment": auto_increment}
    return None

def _infer_auto_increment(row): 
    field_type = str(row.get('sfldType', '')).lower()
    code = str(row.get('sfldCode', '')).lower()
    nullable = bool(row.get('sfldNullable', True))
    if code == 'key' and (('int' in field_type) or ('number' in field_type) and not nullable):
        return True 
    return False 

def _infer_purpose(table, table_fields):
    """Ask the LLM for a short business-purpose statement for a DICO table.

    Sends the table name and up to the first 12 'sfldName' values to
    gpt-4o-mini. Returns the '<ADD BUSINESS PURPOSE HERE>' placeholder when
    `enable_llm_summary` is falsy, `client` is unavailable, the call fails, or
    the model returns empty content.
    """
    placeholder = '<ADD BUSINESS PURPOSE HERE>'
    if not enable_llm_summary or not client:
        return placeholder
    field_names = [str(field_name) for field_name in table_fields["sfldName"].head(12)]
    prompt = (
        "You are analysing a CRM database schema.\n"
        "Given the table name and its field names, produce a concise business purpose statement (max 20 words).\n"
        f"Table: {table.get('stblName')}\nFields: {', '.join(field_names)}\nPurpose:"
    )
    try:
        # OpenAI v1 SDK endpoint is `chat.completions` (plural). The singular
        # attribute raised AttributeError, which this handler hid, so every call
        # silently returned the placeholder.
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=60
        )
        text = (response.choices[0].message.content or '').strip()
        return text or placeholder
    except Exception:
        # Best-effort enrichment: never let an LLM failure break profile generation.
        return placeholder
    
def _build_relationship_graph():
    """Map each table in `cleaned_tables` to the tables it links to.

    Linked tables are the 'detailEntity' values found in the table's
    'stblBeans' lists. Empty targets and self-references are dropped; each
    value is a sorted, de-duplicated list of names.
    """
    relationship_graph = {}
    for tbl in cleaned_tables:
        tbl_name = tbl.get('stblName')
        beans_by_group = tbl.get('stblBeans', {}) or {}
        linked_targets = (
            bean.get('detailEntity')
            for bean_list in beans_by_group.values()
            for bean in bean_list
        )
        relationship_graph[tbl_name] = sorted({t for t in linked_targets if t and t != tbl_name})
    return relationship_graph


