# This is a sample Jupyter Notebook

Below is an example of a code cell. 
Put your cursor into the cell and press Shift+Enter to execute it and select the next one, or click 'Run Cell' button.

Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.

To learn more about Jupyter Notebooks in PyCharm, see [help](https://www.jetbrains.com/help/pycharm/ipython-notebook-support.html).
For an overview of PyCharm, go to Help -> Learn IDE features or refer to [our documentation](https://www.jetbrains.com/help/pycharm/getting-started.html).

In [None]:
print("Hello World!")


extrct data

In [None]:
import pandas as pd
import os
from pathlib import Path
import logging
import re
import json
from typing import Dict, List, Optional

# Setup logging with a detailed format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)

class ABSDataProcessor:
    """Process ABS (Asset-Backed Securities) data from Excel files."""

    def __init__(
        self,
        input_dir: str = "data/total_data_demo1",
        output_dir: str = "processed_data",
        structured_dir: str = "structured_data"
    ):
        """
        Initialize the data processor.

        Args:
            input_dir: Directory containing input Excel files
            output_dir: Directory for processed individual files
            structured_dir: Directory for combined/structured output
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.structured_dir = Path(structured_dir)
        self.raw_data: Dict[str, pd.DataFrame] = {}
        self.processed_data: Dict[str, pd.DataFrame] = {}

        # Create necessary directories
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.structured_dir.mkdir(parents=True, exist_ok=True)

    def normalize_columns(self, columns: List[str]) -> List[str]:
        """
        Normalize column names for consistent processing.

        Args:
            columns: List of column names to normalize

        Returns:
            List of normalized column names
        """
        normalized = []
        for c in columns:
            if not isinstance(c, str):
                c = str(c)
            c_norm = c.strip()
            c_norm = re.sub(r'\s+', '_', c_norm)  # Replace spaces with underscores
            c_norm = re.sub(r'[^\w\s-]', '', c_norm)  # Remove special characters
            normalized.append(c_norm)
        return normalized

    def process_excel(self, file_path: str) -> Optional[Dict[str, pd.DataFrame]]:
        """
        Process Excel file sheets and clean data.

        Args:
            file_path: Path to the Excel file

        Returns:
            Dictionary mapping sheet names to processed DataFrames, or None if processing fails
        """
        try:
            sheets = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')
            processed_data = {}

            for sheet_name, df in sheets.items():
                # Normalize column names
                df.columns = self.normalize_columns(df.columns)

                # Drop fully empty rows and columns
                df = df.dropna(how='all')
                df = df.dropna(axis=1, how='all')

                # Handle Unnamed columns if present
                if 'Unnamed_0' in df.columns or any(col.startswith('Unnamed') for col in df.columns):
                    unnamed_cols = [col for col in df.columns if col.startswith('Unnamed')]
                    for unnamed_col in unnamed_cols:
                        parts = sheet_name.split('_', 1)
                        new_col_name = parts[1] if len(parts) > 1 else f"value_{unnamed_col.lower()}"
                        df.rename(columns={unnamed_col: new_col_name}, inplace=True)

                if df.empty:
                    logger.info(f"Sheet '{sheet_name}' is empty, skipping.")
                    continue

                # Add metadata columns
                df['source_file'] = Path(file_path).name
                df['sheet_name'] = sheet_name
                df['process_date'] = pd.Timestamp.now().strftime('%Y-%m-%d')

                logger.info(f"Processing sheet: {sheet_name}")
                processed_data[sheet_name] = df

            return processed_data

        except Exception as e:
            logger.error(f"Error processing file '{file_path}': {str(e)}")
            return None

    def save_data(
        self,
        processed_data: Dict[str, pd.DataFrame],
        output_dir: Path,
        prefix: str
    ) -> None:
        """
        Save processed data to CSV and JSON files.

        Args:
            processed_data: Dictionary of processed DataFrames
            output_dir: Directory to save the files
            prefix: Prefix for the output filenames
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        for sheet_name, df in processed_data.items():
            try:
                # Clean sheet name for filename
                cleaned_sheet_name = re.sub(r'\W+', '_', sheet_name)
                base_name = f"{prefix}_{cleaned_sheet_name}"

                # Save as CSV
                csv_file = output_dir / f"{base_name}.csv"
                df.to_csv(csv_file, index=False, encoding='utf-8-sig')
                logger.info(f"Saved CSV: {csv_file}")

                # Save as JSON
                json_file = output_dir / f"{base_name}.json"
                df.to_json(json_file, orient='records', force_ascii=False, indent=2)
                logger.info(f"Saved JSON: {json_file}")

            except Exception as e:
                logger.error(f"Error saving sheet '{sheet_name}': {str(e)}")

    def process_directory(self, input_dir: Path, output_dir: Path) -> None:
        """
        Process all Excel files in the directory structure.

        Args:
            input_dir: Root directory containing Excel files
            output_dir: Directory to save processed files
        """
        combined_data = {}

        for root, _, files in os.walk(input_dir):
            for file in files:
                if file.endswith('.xlsx') and "资产池特征分布" in file:
                    file_path = Path(root) / file
                    relative_path = Path(root).relative_to(input_dir)
                    output_subdir = output_dir / relative_path
                    prefix = file_path.parent.name

                    logger.info(f"Processing file: {file_path}")
                    processed_data = self.process_excel(str(file_path))

                    if processed_data:
                        # Save individual files
                        self.save_data(processed_data, output_subdir, prefix)

                        # Accumulate data for combined files
                        for sheet_name, df in processed_data.items():
                            if sheet_name not in combined_data:
                                combined_data[sheet_name] = []
                            combined_data[sheet_name].append(df)

        # Create combined files
        self._save_combined_data(combined_data)

    def _save_combined_data(self, combined_data: Dict[str, List[pd.DataFrame]]) -> None:
        """
        Save combined data files.

        Args:
            combined_data: Dictionary mapping sheet names to lists of DataFrames
        """
        combined_dir = self.structured_dir / "combined"
        combined_dir.mkdir(parents=True, exist_ok=True)

        for sheet_name, dfs in combined_data.items():
            try:
                # Combine all DataFrames for this sheet type
                combined_df = pd.concat(dfs, ignore_index=True)

                # Clean sheet name for filename
                cleaned_name = re.sub(r'\W+', '_', sheet_name)
                output_file = combined_dir / f"combined_{cleaned_name}_distribution.csv"

                # Save combined file
                combined_df.to_csv(output_file, index=False, encoding='utf-8-sig')
                logger.info(f"Saved combined file: {output_file}")

            except Exception as e:
                logger.error(f"Error creating combined file for '{sheet_name}': {str(e)}")

class DataOrganizer:
    """Organizes processed data into structured formats."""

    def __init__(self, data_dir: str = "structured_data/combined"):
        """
        Initialize the data organizer.

        Args:
            data_dir: Directory containing combined CSV files
        """
        self.data_dir = Path(data_dir)
        self.raw_data: Dict[str, pd.DataFrame] = {}

        # Ensure directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def load_raw_data(self) -> None:
        """Load all CSV files from the data directory."""
        for file_path in self.data_dir.glob("combined_*.csv"):
            try:
                # Extract distribution type from filename
                dist_type = re.search(r'combined_(.+)_distribution\.csv', file_path.name)
                if dist_type:
                    key = dist_type.group(1)

                    # Read CSV and validate required columns
                    df = pd.read_csv(file_path, encoding='utf-8')
                    if 'source_file' not in df.columns:
                        logger.warning(f"Missing 'source_file' column in {file_path}")
                        continue

                    # Clean the data
                    df = df.dropna(subset=['source_file'])

                    self.raw_data[key] = df
                    logger.info(f"Loaded {key} distribution data")

            except Exception as e:
                logger.error(f"Error loading {file_path}: {str(e)}")

    def _extract_product_details(self, filename: str) -> dict:
        """
        Extract product year and series from filename.

        Args:
            filename: Source filename to extract details from

        Returns:
            Dictionary containing year and series information
        """
        if pd.isna(filename):
            return {'year': None, 'series': None}

        match = re.search(r'(\d{2})(\dC)', str(filename))
        if match:
            return {
                'year': f"20{match.group(1)}",
                'series': match.group(2)
            }
        return {'year': None, 'series': None}

    def organize_by_product(self) -> Dict:
        """
        Organize data by product (year and series).

        Returns:
            Dictionary containing organized data structure
        """
        product_data = {}

        for dist_type, df in self.raw_data.items():
            # Skip empty DataFrames
            if df.empty:
                continue

            # Process each row
            for _, row in df.iterrows():
                try:
                    product = self._extract_product_details(row['source_file'])
                    year = product['year']
                    series = product['series']

                    if not year or not series:
                        continue

                    # Build nested dictionary structure
                    product_data.setdefault(year, {})
                    product_data[year].setdefault(series, {})
                    product_data[year][series].setdefault(dist_type, [])

                    # Add row data
                    row_dict = row.to_dict()
                    product_data[year][series][dist_type].append(row_dict)

                except Exception as e:
                    logger.error(f"Error processing row in {dist_type}: {str(e)}")
                    continue

        return product_data

    def save_structured_data(self, output_dir: str = "organized_data") -> None:
        """
        Save structured data to JSON files.

        Args:
            output_dir: Directory to save organized data files
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        try:
            # Get organized data
            product_data = self.organize_by_product()

            # Save by year and series
            for year in product_data:
                year_dir = output_path / year
                year_dir.mkdir(exist_ok=True)

                for series in product_data[year]:
                    filename = f"{year}_{series}_data.json"
                    file_path = year_dir / filename

                    with open(file_path, 'w', encoding='utf-8') as f:
                        json.dump(
                            product_data[year][series],
                            f,
                            ensure_ascii=False,
                            indent=2
                        )
                    logger.info(f"Saved structured data: {file_path}")

        except Exception as e:
            logger.error(f"Error saving structured data: {str(e)}")

def main():
    """Execute the complete data processing pipeline."""
    try:
        # Initialize processor
        processor = ABSDataProcessor(
            input_dir="data/total_data_demo1",
            output_dir="processed_data",
            structured_dir="structured_data"
        )

        # Process all Excel files
        processor.process_directory(
            input_dir=Path("data/total_data_demo1"),
            output_dir=Path("processed_data")
        )

        # Initialize organizer
        organizer = DataOrganizer(data_dir="structured_data/combined")

        # Load and organize the data
        organizer.load_raw_data()
        organizer.save_structured_data(output_dir="organized_data")

        logger.info("Data processing pipeline completed successfully")

    except Exception as e:
        logger.error(f"Pipeline execution failed: {str(e)}")

if __name__ == "__main__":
    main()