In [1]:
import configparser
import csv
import json
import logging
import os
import shutil
from datetime import datetime

import pandas as pd
import psycopg2
import requests

class CKANDatastoreLoader:
    """Load pending CSV/XLSX files into CKAN and its DataStore.

    For every supported file in ``<pending_dir>/files`` the loader:

    1. reads the file into a pandas DataFrame,
    2. creates or updates the CKAN dataset described by the metadata template,
    3. uploads the file as a resource of that dataset,
    4. moves the file (and its metadata file) to the ``completed`` tree,
    5. pushes the rows to the DataStore and, when the template carries a
       schema, updates the data dictionary.

    One row per processed file is collected in ``self.report_data`` and written
    to a CSV report by :meth:`generate_report`.
    """

    # (connect, read) timeout in seconds used for every CKAN API call.
    # Without a timeout ``requests`` waits indefinitely on a stalled server.
    REQUEST_TIMEOUT = (10, 600)

    # File extensions (lower-case) the loader knows how to read.
    SUPPORTED_EXTENSIONS = ('.csv', '.xlsx')

    # Header row of the generated report; each entry of ``report_data``
    # follows this column order.
    REPORT_COLUMNS = [
        "Filename", "Table Name", "File Loaded",
        "Metadata Loaded", "Data Dictionary Loaded",
    ]

    def __init__(self, config_path='config.ini'):
        """Read the configuration, set up logging and create the work directories.

        Args:
            config_path: Path of an INI file providing the ``Database``,
                ``CKAN`` and ``Paths`` sections read below.

        Raises:
            configparser.Error: If a required section or option is missing
                (``ConfigParser.read`` silently ignores unreadable files, so a
                wrong path surfaces here as a missing section).
        """
        # Load configuration
        self.config = configparser.ConfigParser()
        self.config.read(config_path)

        # Configure logging (file + console)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s: %(message)s',
            handlers=[
                logging.FileHandler('datastore_loader.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Database connection parameters (stored only; no method of this
        # class opens a database connection)
        self.db_params = {
            'dbname': self.config.get('Database', 'dbname'),
            'user': self.config.get('Database', 'user'),
            'password': self.config.get('Database', 'password'),
            'host': self.config.get('Database', 'host')
        }

        # CKAN API details
        self.ckan_api_url = self.config.get('CKAN', 'api_url')
        self.ckan_api_key = self.config.get('CKAN', 'api_key')

        # Directory paths
        self.pending_root_dir = self.config.get('Paths', 'pending_dir')
        self.pending_file_dir = os.path.join(self.pending_root_dir, 'files')
        self.pending_metadata_dir = os.path.join(self.pending_root_dir, 'metadata')
        self.completed_root_dir = self.config.get('Paths', 'completed_dir')
        self.completed_file_dir = os.path.join(self.completed_root_dir, 'files')
        self.completed_metadata_dir = os.path.join(self.completed_root_dir, 'metadata')
        self.completed_report_dir = os.path.join(self.completed_root_dir, 'report')

        # Ensure directories exist
        for path in [
            self.pending_root_dir, self.pending_file_dir, self.pending_metadata_dir,
            self.completed_root_dir, self.completed_file_dir, self.completed_metadata_dir,
            self.completed_report_dir
        ]:
            os.makedirs(path, exist_ok=True)

        # Report file setup: one timestamped report per loader instance
        self.report_file = os.path.join(
            self.completed_report_dir,
            f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        self.report_data = []

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    def _sanitize_column_name(self, name):
        """Return *name* lower-cased with every non-alphanumeric, non-underscore character replaced by ``_``."""
        return ''.join(c if c.isalnum() or c == '_' else '_' for c in name).lower()

    def _infer_postgres_type(self, dtype):
        """Map a pandas dtype to a PostgreSQL type name, defaulting to ``TEXT``."""
        type_mapping = {
            'int64': 'BIGINT',
            'float64': 'FLOAT',
            'object': 'TEXT',
            'datetime64[ns]': 'TIMESTAMP',
            'bool': 'BOOLEAN'
        }
        return type_mapping.get(str(dtype), 'TEXT')

    @staticmethod
    def _normalize_tags(tags):
        """Return *tags* as a list of ``{"name": ...}`` dicts.

        Accepts a comma-separated string, a single tag dict, or an iterable
        mixing plain strings and tag dicts. Blank names and dicts without a
        ``name`` are dropped.
        """
        if not tags:
            return []
        if isinstance(tags, str):
            candidates = tags.split(",")
        elif isinstance(tags, (list, tuple, set)):
            candidates = list(tags)
        else:
            candidates = [tags]

        normalized = []
        for tag in candidates:
            if isinstance(tag, dict):
                if tag.get("name"):
                    normalized.append(tag)
                continue
            name = str(tag).strip()
            if name:
                normalized.append({"name": name})
        return normalized

    @staticmethod
    def _build_resource_payload(resource_template):
        """Return a copy of *resource_template* that is safe to send as form fields.

        The resource is uploaded as multipart form data, which cannot
        represent nested containers; dict and list values (e.g. ``schema``)
        are therefore serialised to JSON strings. The template itself is
        left untouched.
        """
        payload = {}
        for key, value in (resource_template or {}).items():
            if isinstance(value, (dict, list)):
                payload[key] = json.dumps(value)
            else:
                payload[key] = value
        return payload

    @staticmethod
    def _json_safe(value):
        """Convert a single DataFrame cell into a JSON-serialisable value."""
        if value is None:
            return None
        # NaN / NaT / pd.NA have no JSON representation -> null
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        # Timestamps, dates, times, timedeltas
        if hasattr(value, "isoformat"):
            return value.isoformat()
        # numpy scalars -> native Python numbers/bools
        if hasattr(value, "item") and not isinstance(value, (str, bytes)):
            return value.item()
        return value

    def _dataframe_to_records(self, df):
        """Return the rows of *df* as a list of JSON-serialisable dicts keyed by column name."""
        return [
            {str(key): self._json_safe(value) for key, value in row.items()}
            for row in df.to_dict(orient="records")
        ]

    def _auth_headers(self):
        """Return the HTTP headers carrying the CKAN API key."""
        return {"Authorization": self.ckan_api_key}

    def _log_request_failure(self, error):
        """Log a ``requests`` exception and, when one exists, the response body."""
        self.logger.error(f"API request failed: {error}")
        # ``response`` is None for connection errors and timeouts.
        response = getattr(error, "response", None)
        if response is not None:
            self.logger.error(f"Response content: {response.text}")

    # ------------------------------------------------------------------
    # File-system helpers
    # ------------------------------------------------------------------

    def _read_dataframe(self, file_path):
        """Read *file_path* into a DataFrame, or return ``None`` for an unsupported extension."""
        lowered = file_path.lower()
        if lowered.endswith('.csv'):
            return pd.read_csv(file_path)
        if lowered.endswith('.xlsx'):
            return pd.read_excel(file_path)
        return None

    def _find_latest_metadata(self, filename):
        """Return the name of the pending metadata file for *filename*, or ``None``.

        Candidates are files in the pending metadata directory whose name
        starts with ``metadata_<stem>``; the lexicographically greatest one is
        chosen.
        """
        # NOTE(review): prefix matching means "metadata_Foo1..." also matches
        # a data file named "Foo.csv" - confirm the naming convention.
        prefix = f"metadata_{os.path.splitext(filename)[0]}"
        matches = [
            f for f in os.listdir(self.pending_metadata_dir)
            if f.startswith(prefix)
        ]
        return max(matches, default=None)

    # ------------------------------------------------------------------
    # CKAN dataset / resource helpers
    # ------------------------------------------------------------------

    def _upsert_dataset(self, dataset_payload):
        """Create the dataset, or update it when it already exists.

        Returns:
            The dataset id, or ``None`` when ``package_create`` answered 409.

        Raises:
            requests.exceptions.RequestException: On transport errors or any
                other unsuccessful HTTP status.
        """
        headers = self._auth_headers()
        existing = requests.get(
            f"{self.ckan_api_url}/package_show",
            params={"id": dataset_payload["name"]},
            headers=headers,
            timeout=self.REQUEST_TIMEOUT
        )

        if existing.status_code == 200:
            dataset_payload["id"] = existing.json()["result"]["id"]
            response = requests.post(
                f"{self.ckan_api_url}/package_update",
                json=dataset_payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            # Logged only after the update is known to have succeeded.
            self.logger.info(f"Updated existing dataset: {dataset_payload['name']}")
        else:
            response = requests.post(
                f"{self.ckan_api_url}/package_create",
                json=dataset_payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 409:
                self.logger.warning(
                    f"Dataset '{dataset_payload['name']}' already exists. Skipping creation."
                )
                return None
            response.raise_for_status()
            self.logger.info(f"Created new dataset: {dataset_payload['name']}")

        return response.json()["result"]["id"]

    def _find_resource_id(self, dataset_id, filename):
        """Return the id of the dataset's resource named *filename*, or ``None``.

        Raises:
            requests.exceptions.RequestException: If the dataset cannot be
                fetched; guessing "not found" here could create duplicates.
        """
        response = requests.get(
            f"{self.ckan_api_url}/package_show",
            params={"id": dataset_id},
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        for resource in response.json()["result"]["resources"]:
            if resource.get("name") == filename:
                return resource["id"]
        return None

    def _upsert_resource(self, dataset_id, filename, file_path, resource_template):
        """Upload *file_path* as a resource of the dataset and return the resource id.

        An existing resource named *filename* is updated in place; otherwise a
        new resource is created.
        """
        payload = self._build_resource_payload(resource_template)
        # Resources are looked up by file name on later runs, so make sure an
        # unnamed template still yields a findable resource.
        payload.setdefault("name", filename)

        resource_id = self._find_resource_id(dataset_id, filename)
        if resource_id is not None:
            payload["id"] = resource_id
            action = "resource_update"
        else:
            payload["package_id"] = dataset_id
            action = "resource_create"

        # Close the handle before the caller moves the file.
        with open(file_path, "rb") as upload:
            response = requests.post(
                f"{self.ckan_api_url}/{action}",
                data=payload,
                files={"upload": (filename, upload)},
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT
            )
        response.raise_for_status()

        if resource_id is not None:
            self.logger.info(f"Updated existing resource: {filename}")
        else:
            resource_id = response.json()["result"]["id"]
            self.logger.info(f"Created new resource: {filename}")
        return resource_id

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def process_file(self, filename, metadata_template=None):
        """Load one pending file into CKAN and record the outcome in the report.

        Args:
            filename: Name of a file inside the pending files directory.
            metadata_template: Parsed metadata dict with optional ``dataset``
                and ``resource`` keys. ``None`` is accepted but the file is
                then rejected because the required dataset fields are missing.

        Returns:
            ``True`` when the dataset and resource were created/updated and
            the files were moved; ``False`` otherwise. Every call appends
            exactly one row to ``self.report_data``.
        """
        try:
            file_path = os.path.join(self.pending_file_dir, filename)
            file_loaded = False
            metadata_loaded = False
            data_dictionary_loaded = False

            # Read the file based on its extension
            df = self._read_dataframe(file_path)
            if df is None:
                self.logger.error(f"Unsupported file type: {filename}")
                self.report_data.append([filename, None, False, False, False])
                return False

            # Sanitize table name
            table_name = f"datastore_{self._sanitize_column_name(os.path.splitext(filename)[0])}"

            template = metadata_template or {}

            # Create or update CKAN dataset and resource
            try:
                # Work on a copy so the caller's template is not modified.
                dataset_payload = dict(template.get("dataset") or {})
                dataset_payload["tags"] = self._normalize_tags(dataset_payload.get("tags"))

                # Validate required fields
                required_fields = ["name", "title", "notes", "owner_org"]
                missing_fields = [field for field in required_fields if not dataset_payload.get(field)]
                if missing_fields:
                    self.logger.error(
                        f"Missing required dataset fields: {', '.join(missing_fields)}. Cannot create dataset."
                    )
                    self.report_data.append([filename, table_name, file_loaded, False, False])
                    return False

                # Handle tags safely
                if not dataset_payload["tags"]:
                    self.logger.warning("Tags array is empty. Using default tags.")
                    dataset_payload["tags"] = [{"name": "default_tag"}]

                dataset_id = self._upsert_dataset(dataset_payload)
                if dataset_id is None:
                    # package_create reported a conflict; still report the file.
                    self.report_data.append([filename, table_name, file_loaded, False, False])
                    return False

                resource_id = self._upsert_resource(
                    dataset_id, filename, file_path, template.get("resource") or {}
                )
                file_loaded = True

                # Move processed file to completed directory
                completed_path = os.path.join(self.completed_file_dir, filename)
                shutil.move(file_path, completed_path)
                self.logger.info(f"Moved {filename} to {completed_path}")

                # Move processed metadata file to completed/metadata directory
                if metadata_template:
                    latest_metadata = self._find_latest_metadata(filename)
                    if latest_metadata:
                        metadata_path = os.path.join(self.pending_metadata_dir, latest_metadata)
                        completed_metadata_path = os.path.join(self.completed_metadata_dir, latest_metadata)
                        shutil.move(metadata_path, completed_metadata_path)
                        self.logger.info(
                            f"Moved metadata file {latest_metadata} to {completed_metadata_path}"
                        )
                # The dataset metadata reached CKAN via package_create/update.
                metadata_loaded = True

                # Push resource to DataStore
                self.push_to_datastore(resource_id, df)

            except Exception as e:
                self.logger.error(f"Failed to create CKAN dataset/resource for {filename}: {str(e)}")
                self.report_data.append([filename, table_name, file_loaded, False, False])
                return False

            # Update CKAN datastore schema with field descriptions
            try:
                if metadata_template and "resource" in metadata_template and "schema" in metadata_template["resource"]:
                    schema_fields = metadata_template["resource"]["schema"].get("fields", [])
                    if schema_fields:
                        self.logger.info(f"Attempting to update data dictionary for resource ID: {resource_id}")
                        # Only report success when the update actually went through.
                        data_dictionary_loaded = bool(
                            self.update_schema_dictionary(resource_id, schema_fields)
                        )
                    else:
                        self.logger.warning(
                            "No schema fields found in metadata template. Skipping data dictionary update."
                        )
                else:
                    self.logger.warning("No valid metadata template found. Skipping data dictionary update.")
            except Exception as e:
                self.logger.error(f"Failed to update data dictionary for {filename}: {str(e)}")

            # Append to report data
            self.report_data.append([filename, table_name, file_loaded, metadata_loaded, data_dictionary_loaded])
            return True

        except Exception as e:
            self.logger.error(f"An error occurred while processing {filename}: {str(e)}")
            self.report_data.append([filename, None, False, False, False])
            return False

    def push_to_datastore(self, resource_id, df):
        """Push the rows of *df* to the CKAN DataStore table of *resource_id*.

        Returns:
            ``True`` if ``datastore_create`` answered 200, ``False`` on any
            HTTP or transport failure (which is logged, not raised).
        """
        datastore_create_payload = {
            "resource_id": resource_id,
            "force": True,  # Override read-only restriction
            "fields": [
                {"id": str(col), "type": self._infer_postgres_type(df[col].dtype)}
                for col in df.columns
            ],
            # NaN/NaT become null and timestamps ISO strings so the payload
            # is valid JSON.
            "records": self._dataframe_to_records(df)
        }
        try:
            response = requests.post(
                f"{self.ckan_api_url}/datastore_create",
                json=datastore_create_payload,
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT
            )
        except requests.exceptions.RequestException as e:
            self._log_request_failure(e)
            return False

        if response.status_code == 200:
            self.logger.info(f"Resource {resource_id} successfully pushed to DataStore.")
            return True
        self.logger.error(f"Failed to push resource to DataStore: {response.text}")
        return False

    def update_schema_dictionary(self, resource_id, schema_fields):
        """Update the data dictionary (column descriptions) via ``datastore_create``.

        Args:
            resource_id: Id of a resource that already has a DataStore table.
            schema_fields: Iterable of dicts with an ``id`` (column name) and
                an optional ``description``.

        Returns:
            ``True`` if the dictionary was updated, ``False`` if the current
            schema could not be fetched, no field matched, or the update
            request failed. Failures are logged, not raised.
        """
        headers = self._auth_headers()
        try:
            # Fetch the current schema from the Datastore
            response = requests.post(
                f"{self.ckan_api_url}/datastore_search",
                json={"resource_id": resource_id, "limit": 0},
                headers=headers,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                self.logger.error(f"Failed to fetch current schema: {response.text}")
                return False

            technical_fields = {"_id", "_full_text"}  # Exclude technical fields
            current_by_id = {}
            for current in response.json()["result"]["fields"]:
                if current["id"] not in technical_fields:
                    # Keep the first occurrence of each id.
                    current_by_id.setdefault(current["id"], current)

            # Merge existing fields with new descriptions
            updated_fields = []
            for field in schema_fields:
                field_id = field.get("id")
                if not field_id:
                    self.logger.warning("Schema field without an 'id' found. Skipping.")
                    continue
                matching_field = current_by_id.get(field_id)
                if matching_field is None:
                    self.logger.warning(f"Field '{field_id}' not found in current schema. Skipping.")
                    continue
                matching_field["info"] = {
                    "label": field_id,
                    "notes": field.get("description", "")
                }
                updated_fields.append(matching_field)

            if not updated_fields:
                self.logger.warning("No schema fields matched the current schema. Nothing to update.")
                return False

            # Update the schema
            response = requests.post(
                f"{self.ckan_api_url}/datastore_create",
                json={
                    "resource_id": resource_id,
                    "fields": updated_fields,
                    "force": True  # Override read-only restriction
                },
                headers=headers,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                self.logger.info("✅ Data dictionary updated successfully using datastore_create")
                return True
            self.logger.error(f"❌ Failed to update dictionary: {response.text}")
            return False
        except requests.exceptions.RequestException as e:
            self._log_request_failure(e)
            return False

    def generate_report(self):
        """Write ``self.report_data`` to ``self.report_file`` as CSV.

        Values are written through ``csv.writer`` so that file names
        containing commas or quotes do not corrupt the row. Errors are logged,
        not raised.
        """
        try:
            # newline="" lets the csv module control line endings.
            with open(self.report_file, "w", encoding="utf-8", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(self.REPORT_COLUMNS)
                writer.writerows(
                    [str(item) for item in row] for row in self.report_data
                )
            self.logger.info(f"Generated report: {self.report_file}")
        except Exception as e:
            self.logger.error(f"Failed to generate report: {str(e)}")

    def run(self, use_metadata_templates=True):
        """Process every supported file in the pending directory, then write the report.

        Args:
            use_metadata_templates: When true, the latest matching JSON file
                from the pending metadata directory is loaded and passed to
                :meth:`process_file` as the metadata template.
        """
        self.logger.info("Starting file processing...")
        files = sorted(
            f for f in os.listdir(self.pending_file_dir)
            if os.path.isfile(os.path.join(self.pending_file_dir, f))
            and f.lower().endswith(self.SUPPORTED_EXTENSIONS)
        )
        self.logger.info(f"Found {len(files)} files to process")
        for filename in files:
            try:
                metadata_template = None
                if use_metadata_templates:
                    latest_metadata = self._find_latest_metadata(filename)
                    if latest_metadata:
                        metadata_path = os.path.join(self.pending_metadata_dir, latest_metadata)
                        with open(metadata_path, "r", encoding="utf-8") as f:
                            metadata_template = json.load(f)
                self.logger.info(f"Processing file: {filename}")
                if self.process_file(filename, metadata_template):
                    self.logger.info(f"Successfully processed {filename}")
                else:
                    self.logger.error(f"Failed to process {filename}")
            except Exception as e:
                # process_file handles its own errors, so this is reached when
                # the metadata file cannot be read or parsed; keep the file
                # visible in the report.
                self.logger.error(f"An unexpected error occurred while processing {filename}: {str(e)}")
                self.report_data.append([filename, None, False, False, False])
        # Generate the final report
        self.generate_report()

if __name__ == "__main__":
    # Script entry point: build a loader from the default configuration file
    # and process every pending file together with its metadata template.
    loader = CKANDatastoreLoader(config_path='config.ini')
    loader.run(use_metadata_templates=True)

2025-02-07 20:52:04,819 - INFO: Starting file processing...
2025-02-07 20:52:04,819 - INFO: Found 1 files to process
2025-02-07 20:52:04,820 - INFO: Processing file: Memorials11.csv
2025-02-07 20:52:04,849 - ERROR: Missing required dataset fields: name, title, notes, owner_org. Cannot create dataset.
2025-02-07 20:52:04,849 - ERROR: Failed to process Memorials11.csv
2025-02-07 20:52:04,851 - INFO: Generated report: ./Completed\report\report_20250207_205204.csv
