In [12]:
import os
import json
import pandas as pd
import requests
from typing import Dict, List, Any, Optional, Union
import google.generativeai as genai
from dotenv import load_dotenv

class ERPDataMapper:
    """
    A class for extracting, mapping, and standardizing data from various ERP systems
    using Google's Gemini LLM for intelligent data transformation.
    """
    
    def __init__(self, gemini_api_key: str = None):
        """
        Initialize the ERP Data Mapper.
        
        Args:
            gemini_api_key: API key for Google's Gemini LLM
        """
        # Load environment variables if API key is not provided
        if not gemini_api_key:
            load_dotenv()
            gemini_api_key = "AIzaSyCyJBxlzOnVrUIMvx6gw66i19Y32lDGNP4"
            
        if not gemini_api_key:
            raise ValueError("Gemini API key is required. Set it in .env file or pass directly.")
            
        # Configure Gemini API
        genai.configure(api_key=gemini_api_key)
        
        # Set up Gemini model
        self.model = genai.GenerativeModel('gemini-1.5-pro')
        
        # Define standard data schema templates
        self.standard_schemas = {
            "customer": {
                "id": "",
                "name": "",
                "contact_email": "",
                "contact_phone": "",
                "address": "",
                "customer_since": "",
                "status": "",
                "segment": ""
            },
            "product": {
                "id": "",
                "name": "",
                "description": "",
                "price": "",
                "category": "",
                "sku": "",
                "stock_quantity": "",
                "unit": ""
            },
            "invoice": {
                "id": "",
                "customer_id": "",
                "date": "",
                "due_date": "",
                "total_amount": "",
                "tax_amount": "",
                "status": "",
                "line_items": []
            },
            "employee": {
                "id": "",
                "name": "",
                "department": "",
                "position": "",
                "email": "",
                "phone": "",
                "hire_date": "",
                "manager_id": ""
            }
        }
        
    def connect_to_erp(self, erp_type: str, connection_params: Dict[str, Any]) -> bool:
        """
        Connect to the specified ERP system.
        
        Args:
            erp_type: Type of ERP system (e.g., 'sap', 'oracle', 'microsoft_dynamics', 'odoo')
            connection_params: Connection parameters specific to the ERP system
            
        Returns:
            bool: True if connection was successful, False otherwise
        """
        self.erp_type = erp_type.lower()
        self.connection_params = connection_params
        
        # Implement connection logic based on ERP type
        try:
            if self.erp_type == "sap":
                # SAP connection logic would go here
                # This is a placeholder - real implementation would use SAP-specific libraries
                print(f"Connecting to SAP ERP with parameters: {connection_params}")
                self.connection = {"type": "sap", "status": "connected"}
                return True
                
            elif self.erp_type == "oracle":
                # Oracle connection logic
                print(f"Connecting to Oracle ERP with parameters: {connection_params}")
                self.connection = {"type": "oracle", "status": "connected"}
                return True
                
            elif self.erp_type == "microsoft_dynamics":
                # Microsoft Dynamics connection logic
                print(f"Connecting to Microsoft Dynamics with parameters: {connection_params}")
                self.connection = {"type": "microsoft_dynamics", "status": "connected"}
                return True
                
            elif self.erp_type == "odoo":
                # Odoo connection logic
                print(f"Connecting to Odoo with parameters: {connection_params}")
                self.connection = {"type": "odoo", "status": "connected"}
                return True
                
            elif self.erp_type == "quickbooks":
                # QuickBooks connection logic
                print(f"Connecting to QuickBooks with parameters: {connection_params}")
                self.connection = {"type": "quickbooks", "status": "connected"}
                return True
                
            elif self.erp_type == "netsuite":
                # NetSuite connection logic
                print(f"Connecting to NetSuite with parameters: {connection_params}")
                self.connection = {"type": "netsuite", "status": "connected"}
                return True
                
            elif self.erp_type == "api":
                # Generic API connection
                print(f"Connecting to API with parameters: {connection_params}")
                self.connection = {"type": "api", "status": "connected"}
                return True
                
            else:
                print(f"Unsupported ERP type: {self.erp_type}")
                return False
                
        except Exception as e:
            print(f"Error connecting to {self.erp_type}: {str(e)}")
            return False
    
    def extract_data(self, data_type: str, query_params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        Extract data from the connected ERP system.
        
        Args:
            data_type: Type of data to extract (e.g., 'customers', 'products', 'invoices')
            query_params: Optional parameters to filter the data
            
        Returns:
            List of dictionaries containing the extracted data
        """
        if not hasattr(self, 'connection'):
            raise ValueError("Not connected to any ERP system. Call connect_to_erp() first.")
        
        # This is a mock implementation - real implementation would use 
        # ERP-specific APIs or database queries
        try:
            # Example mock data - in a real scenario, this would pull from the ERP
            if data_type == "customers":
                # Mock customer data
                customers = [
                    {"customer_id": "C001", "customer_name": "Acme Corp", "email": "contact@acme.com", 
                     "phone": "555-123-4567", "address": "123 Main St", "registration_date": "2020-01-15", 
                     "customer_type": "Corporate", "industry": "Manufacturing"},
                    {"customer_id": "C002", "customer_name": "TechSolutions", "email": "info@techsolutions.com", 
                     "phone": "555-987-6543", "address": "456 Tech Blvd", "registration_date": "2019-05-20", 
                     "customer_type": "Corporate", "industry": "IT Services"}
                ]
                return customers
                
            elif data_type == "products":
                # Mock product data
                products = [
                    {"product_id": "P001", "product_name": "Widget Pro", "description": "Professional grade widget", 
                     "unit_price": 29.99, "category": "Widgets", "product_code": "WDG-PRO", "current_stock": 150, 
                     "measurement": "units"},
                    {"product_id": "P002", "product_name": "Gadget Plus", "description": "Enhanced gadget", 
                     "unit_price": 49.99, "category": "Gadgets", "product_code": "GDG-PLS", "current_stock": 75, 
                     "measurement": "units"}
                ]
                return products
                
            elif data_type == "invoices":
                # Mock invoice data
                invoices = [
                    {"invoice_id": "INV001", "customer_id": "C001", "invoice_date": "2023-06-15", 
                     "payment_due": "2023-07-15", "invoice_total": 299.90, "tax": 29.99, 
                     "invoice_status": "Paid", "items": [
                         {"product_id": "P001", "quantity": 10, "price": 29.99}
                     ]},
                    {"invoice_id": "INV002", "customer_id": "C002", "invoice_date": "2023-06-20", 
                     "payment_due": "2023-07-20", "invoice_total": 499.90, "tax": 49.99, 
                     "invoice_status": "Pending", "items": [
                         {"product_id": "P002", "quantity": 10, "price": 49.99}
                     ]}
                ]
                return invoices
                
            elif data_type == "employees":
                # Mock employee data
                employees = [
                    {"employee_id": "E001", "full_name": "John Smith", "dept": "Sales", 
                     "job_title": "Sales Manager", "work_email": "john.smith@company.com", 
                     "work_phone": "555-111-2222", "start_date": "2018-03-15", "reports_to": "E005"},
                    {"employee_id": "E002", "full_name": "Jane Doe", "dept": "Marketing", 
                     "job_title": "Marketing Specialist", "work_email": "jane.doe@company.com", 
                     "work_phone": "555-333-4444", "start_date": "2019-08-01", "reports_to": "E006"}
                ]
                return employees
                
            else:
                print(f"Unsupported data type: {data_type}")
                return []
                
        except Exception as e:
            print(f"Error extracting {data_type} data: {str(e)}")
            return []
    
    def map_data_with_gemini(self, 
                             data: List[Dict[str, Any]], 
                             target_schema: str) -> List[Dict[str, Any]]:
        """
        Use Gemini LLM to map data from the ERP format to the standard schema.
        
        Args:
            data: List of data records from the ERP system
            target_schema: Target schema name (e.g., 'customer', 'product')
            
        Returns:
            List of dictionaries in the standardized format
        """
        if not data:
            return []
        
        if target_schema not in self.standard_schemas:
            raise ValueError(f"Unknown target schema: {target_schema}")
        
        # Prepare standard schema template
        standard_template = self.standard_schemas[target_schema]
        
        # Prepare prompt for Gemini
        prompt = f"""
        You are a data mapping expert. Map the following data from an {self.erp_type.upper()} ERP system 
        to the standard {target_schema} schema shown below.
        
        Source Data (from {self.erp_type.upper()} ERP):
        {json.dumps(data[0], indent=2)}
        
        Target Schema:
        {json.dumps(standard_template, indent=2)}
        
        Generate a JSON mapping that shows which source fields map to which target fields. 
        Use the exact field names from the source data and target schema. If a field doesn't have 
        a direct mapping, indicate that it should be left empty or derive it from other fields if possible.
        
        Format your response as valid JSON in this format:
        {{"mapping": {{"source_field1": "target_field1", "source_field2": "target_field2", ...}}}}
        """
        
        # Generate mapping using Gemini
        try:
            response = self.model.generate_content(prompt)
            mapping_text = response.text
            
            # Extract JSON from the response
            try:
                # Try to find JSON in the response if it's formatted with markdown or other text
                json_start = mapping_text.find('{')
                json_end = mapping_text.rfind('}') + 1
                if json_start >= 0 and json_end > json_start:
                    mapping_json = json.loads(mapping_text[json_start:json_end])
                else:
                    mapping_json = json.loads(mapping_text)
                
                field_mapping = mapping_json.get('mapping', {})
            except json.JSONDecodeError:
                print(f"Failed to parse mapping JSON from Gemini response: {mapping_text}")
                # Fallback to simple field name matching if JSON parsing fails
                field_mapping = {}
                for source_field in data[0].keys():
                    for target_field in standard_template.keys():
                        if source_field.lower() in target_field.lower() or target_field.lower() in source_field.lower():
                            field_mapping[source_field] = target_field
            
            # Apply the mapping to all data records
            mapped_data = []
            for record in data:
                mapped_record = dict(standard_template)  # Start with empty template
                
                # Apply direct field mappings
                for source_field, target_field in field_mapping.items():
                    if source_field in record and target_field in mapped_record:
                        mapped_record[target_field] = record[source_field]
                
                mapped_data.append(mapped_record)
            
            return mapped_data
            
        except Exception as e:
            print(f"Error mapping data with Gemini: {str(e)}")
            # Fallback to basic field name matching
            return self._basic_field_mapping(data, target_schema)
    
    def _basic_field_mapping(self, 
                             data: List[Dict[str, Any]], 
                             target_schema: str) -> List[Dict[str, Any]]:
        """
        Fallback method to perform basic field mapping based on field name similarity.
        
        Args:
            data: List of data records from the ERP system
            target_schema: Target schema name
            
        Returns:
            List of dictionaries in the standardized format
        """
        standard_template = self.standard_schemas[target_schema]
        mapped_data = []
        
        for record in data:
            mapped_record = dict(standard_template)  # Start with empty template
            
            # Map fields based on similar field names
            for source_field in record:
                source_field_lower = source_field.lower()
                
                # Try direct mapping first
                if source_field_lower in standard_template:
                    mapped_record[source_field_lower] = record[source_field]
                    continue
                    
                # Try matching field names
                for target_field in standard_template:
                    # Check if source field contains the target field or vice versa
                    if (target_field in source_field_lower or 
                        source_field_lower in target_field or
                        self._calculate_similarity(source_field_lower, target_field) > 0.7):
                        mapped_record[target_field] = record[source_field]
                        break
            
            mapped_data.append(mapped_record)
        
        return mapped_data
    
    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """
        Calculate string similarity between two strings.
        
        Args:
            str1: First string
            str2: Second string
            
        Returns:
            Float value between 0 and 1 representing similarity
        """
        # Simple Jaccard similarity implementation
        set1 = set(str1)
        set2 = set(str2)
        
        if not set1 or not set2:
            return 0.0
            
        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))
        
        return intersection / union
    
    def export_data(self, 
                    data: List[Dict[str, Any]], 
                    format_type: str = "json", 
                    output_path: str = None) -> Union[str, pd.DataFrame]:
        """
        Export the standardized data to the specified format.
        
        Args:
            data: List of standardized data records
            format_type: Output format ('json', 'csv', 'excel', 'df')
            output_path: Optional path to save the output file
            
        Returns:
            Path to the saved file, DataFrame, or JSON string depending on format_type
        """
        if not data:
            print("No data to export.")
            return None
            
        try:
            if format_type.lower() == "json":
                if output_path:
                    with open(output_path, 'w') as f:
                        json.dump(data, f, indent=2)
                    return output_path
                else:
                    return json.dumps(data, indent=2)
                    
            elif format_type.lower() == "csv":
                df = pd.DataFrame(data)
                if output_path:
                    df.to_csv(output_path, index=False)
                    return output_path
                else:
                    return df
                    
            elif format_type.lower() == "excel":
                if not output_path:
                    output_path = "erp_data_export.xlsx"
                df = pd.DataFrame(data)
                df.to_excel(output_path, index=False)
                return output_path
                
            elif format_type.lower() == "df":
                return pd.DataFrame(data)
                
            else:
                print(f"Unsupported export format: {format_type}")
                return None
                
        except Exception as e:
            print(f"Error exporting data: {str(e)}")
            return None
    
    def extract_and_map(self, 
                        data_type: str, 
                        target_schema: str, 
                        query_params: Dict[str, Any] = None, 
                        export_format: str = "json", 
                        output_path: str = None) -> Union[str, pd.DataFrame, List[Dict[str, Any]]]:
        """
        Extract data from the ERP system, map it to the standard schema, and optionally export it.
        
        Args:
            data_type: Type of data to extract (e.g., 'customers', 'products', 'invoices')
            target_schema: Target schema name
            query_params: Optional parameters to filter the data
            export_format: Output format ('json', 'csv', 'excel', 'df')
            output_path: Optional path to save the output file
            
        Returns:
            Exported data in the specified format
        """
        # Extract data from ERP
        raw_data = self.extract_data(data_type, query_params)
        
        if not raw_data:
            print(f"No {data_type} data extracted from {self.erp_type}.")
            return None
            
        # Map data to standard schema
        mapped_data = self.map_data_with_gemini(raw_data, target_schema)
        
        # Export data if format is specified
        if export_format and export_format.lower() != "none":
            return self.export_data(mapped_data, export_format, output_path)
        else:
            return mapped_data


# Example usage
if __name__ == "__main__":
    # Initialize the mapper
    mapper = ERPDataMapper()
    
    # Connect to an ERP system
    connection_params = {
        "server": "erp.example.com",
        "username": "api_user",
        "password": "api_password",
        "database": "prod_db"
    }
    mapper.connect_to_erp("sap", connection_params)
    
    # Extract and map customer data
    customer_data = mapper.extract_and_map(
        data_type="customers",
        target_schema="customer",
        export_format="json",
        output_path="standardized_customers.json"
    )
    
    # Extract and map product data
    product_data = mapper.extract_and_map(
        data_type="products",
        target_schema="product",
        export_format="csv",
        output_path="standardized_products.csv"
    )
    
    # Extract and map invoice data
    invoice_data = mapper.extract_and_map(
        data_type="invoices",
        target_schema="invoice",
        export_format="excel",
        output_path="standardized_invoices.xlsx"
    )
    
    print("Data extraction and mapping completed.")

Connecting to SAP ERP with parameters: {'server': 'erp.example.com', 'username': 'api_user', 'password': 'api_password', 'database': 'prod_db'}
Failed to parse mapping JSON from Gemini response: ```json
{
  "mapping": {
    "customer_id": "id",
    "customer_name": "name",
    "email": "contact_email",
    "phone": "contact_phone",
    "address": "address",
    "registration_date": "customer_since",
    "customer_type": "segment", 
    "industry": null,  // No direct mapping, could potentially be used to derive 'segment' in a more complex scenario
    "status": "" // Left empty as no corresponding field exists in the source data
  }
}
```

Data extraction and mapping completed.
