In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
from sqlalchemy import create_engine
from pymongo import MongoClient
import requests
import json
from typing import Dict, List, Union
import logging

class DataPipeline:
    """Ingest, preprocess and validate green-finance project data.

    Structured data is pulled from a SQL database (via SQLAlchemy), a
    MongoDB collection and configured HTTP APIs, then merged on
    ``project_id``. Unstructured data is gathered by the ``_fetch_*``
    document helpers and returned as a dict.
    """

    # Seconds to wait on an external HTTP endpoint before giving up. Without
    # a timeout ``requests`` can block indefinitely on an unresponsive server.
    DEFAULT_REQUEST_TIMEOUT = 30

    # metric_type values queried from MongoDB, in concatenation order.
    DEFAULT_ESG_METRIC_TYPES = ('emissions', 'social_impact', 'governance')

    def __init__(self, config: Dict):
        """
        Initialize data pipeline with configuration

        config: Dictionary containing (keys read by this class):
            - 'sql_connection_string': SQLAlchemy URL for the SQL database
            - 'mongo_connection_string': MongoDB connection URI
            - 'financial_apis': list of dicts with 'endpoint' and 'key'
            - 'weather_apis': list of dicts with 'endpoint'
            - 'required_fields': column names checked by validate_data
            - 'request_timeout' (optional): HTTP timeout in seconds,
              defaults to DEFAULT_REQUEST_TIMEOUT
            - 'esg_metric_types' (optional): metric_type values fetched
              from MongoDB, defaults to DEFAULT_ESG_METRIC_TYPES
        """
        self.config = config
        self.sql_engine = create_engine(config['sql_connection_string'])
        self.mongo_client = MongoClient(config['mongo_connection_string'])
        self.logger = logging.getLogger(__name__)

    def ingest_structured_data(self) -> pd.DataFrame:
        """Ingest financial, ESG and climate data and merge them.

        Returns:
            One DataFrame joined on 'project_id'. ``pd.merge`` defaults to
            an inner join, so projects missing from any source are dropped.
        """
        # 1. Financial Data
        financial_data = self._fetch_financial_metrics()

        # 2. ESG Metrics
        esg_data = self._fetch_esg_metrics()

        # 3. Climate Data
        climate_data = self._fetch_climate_data()

        # Merge all structured data
        merged_data = pd.merge(financial_data, esg_data, on='project_id')
        merged_data = pd.merge(merged_data, climate_data, on='project_id')

        return merged_data

    def ingest_unstructured_data(self) -> Dict:
        """Ingest unstructured data like documents and reports.

        Returns:
            Dict with keys 'project_documents', 'sustainability_reports'
            and 'news_data', each holding whatever the matching
            ``_fetch_*`` helper returns.
        """
        # 1. Project Documentation
        project_docs = self._fetch_project_documents()

        # 2. Sustainability Reports
        sustainability_reports = self._fetch_sustainability_reports()

        # 3. News and Media Coverage
        news_data = self._fetch_news_data()

        return {
            'project_documents': project_docs,
            'sustainability_reports': sustainability_reports,
            'news_data': news_data
        }

    def _fetch_financial_metrics(self) -> pd.DataFrame:
        """Fetch financial metrics from the internal DB and external APIs.

        The internal query covers the last year of ``financial_metrics``
        rows. Each configured financial API that responds successfully
        contributes one more frame. Failing APIs are skipped with a warning.
        """
        # NOTE(review): ``INTERVAL '1 year'`` looks PostgreSQL-specific.
        query = """
        SELECT project_id, investment_amount, expected_roi, 
               implementation_cost, maintenance_cost
        FROM financial_metrics
        WHERE data_date >= NOW() - INTERVAL '1 year'
        """
        dfs = [pd.read_sql(query, self.sql_engine)]

        # External APIs (example: Bloomberg, Reuters) authenticate by sending
        # the configured key as the Authorization header.
        dfs.extend(
            self._fetch_api_frames(self.config['financial_apis'], use_auth=True)
        )

        return pd.concat(dfs, ignore_index=True)

    def _fetch_esg_metrics(self) -> pd.DataFrame:
        """Fetch ESG metrics from MongoDB, one query per metric type.

        Frames are concatenated in the order of the configured metric types
        (default: emissions, social impact, governance). Documents keep the
        ``_id`` field that MongoDB returns by default.
        """
        esg_collection = self.mongo_client.green_finance.esg_metrics
        metric_types = self.config.get(
            'esg_metric_types', self.DEFAULT_ESG_METRIC_TYPES
        )

        frames = [
            pd.DataFrame(list(esg_collection.find({'metric_type': metric_type})))
            for metric_type in metric_types
        ]

        return pd.concat(frames, ignore_index=True)

    def _fetch_climate_data(self) -> pd.DataFrame:
        """Fetch weather API data plus recent environmental impact assessments.

        Weather frames (one per successful endpoint) come first, followed by
        ``environmental_impacts`` rows assessed in the last six months.
        """
        # 1. Weather Data APIs (unauthenticated)
        climate_data = self._fetch_api_frames(
            self.config['weather_apis'], use_auth=False
        )

        # 2. Environmental Impact Assessments
        env_impact_query = """
        SELECT project_id, impact_type, impact_score
        FROM environmental_impacts
        WHERE assessment_date >= NOW() - INTERVAL '6 months'
        """
        climate_data.append(pd.read_sql(env_impact_query, self.sql_engine))

        return pd.concat(climate_data, ignore_index=True)

    def _fetch_api_frames(self, api_configs: List[Dict],
                          use_auth: bool) -> List[pd.DataFrame]:
        """GET each configured endpoint and build a DataFrame per success.

        Args:
            api_configs: dicts with an 'endpoint' URL and, when ``use_auth``
                is true, a 'key' sent as the Authorization header.
            use_auth: whether to send the Authorization header.

        Returns:
            One DataFrame per endpoint that answered HTTP 200, in config
            order. Endpoints that time out, fail to connect or return another
            status are skipped with a logged warning. External sources are
            best-effort, as in the original per-source loops.
        """
        timeout = self.config.get('request_timeout', self.DEFAULT_REQUEST_TIMEOUT)
        frames = []
        for api_config in api_configs:
            endpoint = api_config['endpoint']
            headers = {'Authorization': api_config['key']} if use_auth else None
            try:
                response = requests.get(endpoint, headers=headers, timeout=timeout)
            except requests.RequestException as exc:
                self.logger.warning("Request to %s failed: %s", endpoint, exc)
                continue
            if response.status_code != 200:
                self.logger.warning(
                    "Request to %s returned HTTP %s; skipping",
                    endpoint, response.status_code,
                )
                continue
            frames.append(pd.DataFrame(response.json()))
        return frames

    def preprocess_data(self, structured_data: pd.DataFrame,
                       unstructured_data: Dict) -> Dict:
        """Clean structured data and preprocess documents.

        Steps run in a fixed order: missing-value handling, feature
        engineering, document preprocessing, then numeric normalization.

        Returns:
            Dict with 'structured_data' and 'processed_documents'.
        """
        # 1. Handle missing values
        structured_data = self._handle_missing_values(structured_data)

        # 2. Feature engineering
        structured_data = self._engineer_features(structured_data)

        # 3. Text preprocessing for unstructured data
        processed_docs = self._preprocess_documents(unstructured_data)

        # 4. Normalize numerical features
        structured_data = self._normalize_features(structured_data)

        return {
            'structured_data': structured_data,
            'processed_documents': processed_docs
        }

    def validate_data(self, data: Dict) -> bool:
        """Validate data quality and consistency.

        All three checks (required fields, data types, value ranges) run
        on ``data['structured_data']`` every time. The result is True only
        if every check passes.
        """
        validation_results = []

        # 1. Check for required fields
        required_fields = self.config['required_fields']
        fields_present = all(field in data['structured_data'].columns
                             for field in required_fields)
        validation_results.append(fields_present)

        # 2. Validate data types
        correct_types = self._validate_data_types(data['structured_data'])
        validation_results.append(correct_types)

        # 3. Check value ranges
        valid_ranges = self._validate_value_ranges(data['structured_data'])
        validation_results.append(valid_ranges)

        return all(validation_results)

In [None]:
class DataLoader:
    """Drive a DataPipeline through ingestion, preprocessing and validation.

    The loader holds no state of its own beyond the pipeline; it only
    sequences the pipeline's stages in a fixed order.
    """

    def __init__(self, pipeline: DataPipeline):
        # Every stage is delegated to this pipeline instance.
        self.pipeline = pipeline

    def load_data(self) -> Dict:
        """Run the full pipeline and return the validated result.

        Returns:
            The dict produced by ``pipeline.preprocess_data``.

        Raises:
            ValueError: if ``pipeline.validate_data`` returns a falsy value.
        """
        pipe = self.pipeline

        # Ingestion: structured sources first, then unstructured ones.
        tabular = pipe.ingest_structured_data()
        documents = pipe.ingest_unstructured_data()

        # Preprocessing folds both inputs into one result dict.
        result = pipe.preprocess_data(tabular, documents)

        # Validation gates what the caller receives.
        is_valid = pipe.validate_data(result)
        if is_valid:
            return result
        raise ValueError("Data validation failed")