In [0]:
# NOTE(review): %pylab is deprecated in IPython and star-imports numpy/matplotlib
# into the notebook namespace, which can shadow builtins such as sum/any/all.
# Nothing in this notebook section appears to use it; `%matplotlib inline` plus
# explicit imports is the usual replacement — confirm no other cell relies on it.
%pylab inline

In [0]:
import dataiku
from dataiku import pandasutils as pdu
import pandas as pd

In [0]:
# Example: load a DSS dataset as a Pandas dataframe.
# NOTE(review): this looks like notebook-template boilerplate — `mydataset_df` is
# not used by the extraction code in this notebook section, and the cell presumably
# fails unless the project has a dataset named "mydataset". Consider removing it.
mydataset = dataiku.Dataset("mydataset")
mydataset_df = mydataset.get_dataframe()

In [0]:
import concurrent.futures
import heapq
import logging
import multiprocessing
import os
import shutil
import tempfile
import threading
import time
import zipfile
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from functools import partial
from io import BytesIO

import dataiku
import pandas as pd
import pdfplumber
from docx import Document
from pptx import Presentation
from tqdm import tqdm  # For progress tracking

# Root logging setup for this notebook: INFO and above, timestamped lines.
# Per the logging docs, basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the extractors and the processor below.
logger = logging.getLogger(name=__name__)


class BaseExtractor:
    """
    Base extractor that reads files from a Dataiku managed folder.

    Downloaded file contents are kept in a bounded, thread-safe LRU cache so
    that extractors which read the same file more than once (e.g. Word text
    extraction followed by image extraction) download it only once, while the
    cache cannot grow without limit when a large folder is processed.
    """

    # Files of this size (bytes) or larger are never cached.
    MAX_CACHEABLE_FILE_BYTES = 10 * 1024 * 1024
    # Default upper bound on the total bytes held in one extractor's cache.
    DEFAULT_MAX_CACHE_BYTES = 100 * 1024 * 1024

    def __init__(self, folder_id, max_cache_bytes=DEFAULT_MAX_CACHE_BYTES, data_source=None):
        """
        Args:
            folder_id: id of the Dataiku managed folder to read from.
            max_cache_bytes: total size budget for cached file contents;
                0 effectively disables caching.
            data_source: optional pre-built folder-like object exposing
                ``get_download_stream`` and ``get_path_details``; when omitted,
                ``dataiku.Folder(folder_id)`` is used.
        """
        self.data_source = data_source if data_source is not None else dataiku.Folder(folder_id)
        # path -> bytes, ordered from least to most recently used.
        self._file_cache = OrderedDict()
        self._cache_bytes = 0
        self._max_cache_bytes = max_cache_bytes
        # FileProcessor shares one extractor instance across its worker threads.
        self._cache_lock = threading.Lock()

    def get_file_data(self, file_path):
        """
        Return the full contents of *file_path* as bytes.

        Files smaller than MAX_CACHEABLE_FILE_BYTES are kept in the LRU cache
        (least recently used entries are evicted once the cache exceeds its
        byte budget); larger files are always downloaded fresh.
        """
        with self._cache_lock:
            cached = self._file_cache.get(file_path)
            if cached is not None:
                self._file_cache.move_to_end(file_path)
                return cached

        # Download outside the lock so reads of different files are not serialized.
        with self.data_source.get_download_stream(file_path) as stream:
            data = stream.read()

        size = len(data)
        if size < self.MAX_CACHEABLE_FILE_BYTES and size <= self._max_cache_bytes:
            with self._cache_lock:
                self._cache_put(file_path, data)
        return data

    def _cache_put(self, file_path, data):
        """Insert *data* as most recently used and evict old entries over budget (caller holds the lock)."""
        previous = self._file_cache.pop(file_path, None)
        if previous is not None:
            self._cache_bytes -= len(previous)
        self._file_cache[file_path] = data
        self._cache_bytes += len(data)
        # The new entry fits the budget on its own, so eviction never removes it.
        while self._cache_bytes > self._max_cache_bytes:
            _, evicted = self._file_cache.popitem(last=False)
            self._cache_bytes -= len(evicted)

    def get_file_size(self, file_path):
        """
        Return the size in bytes reported for *file_path* without downloading it.

        Returns 0 when the size is missing/None or the lookup fails (the
        failure is logged as a warning).
        """
        try:
            file_info = self.data_source.get_path_details(file_path)
            return file_info.get('size') or 0
        except Exception as e:
            logger.warning(f"Could not get size for {file_path}: {e}")
            return 0


class WordExtractor(BaseExtractor):
    """
    Extracts text, tables, and images from Word (.docx) documents.
    """

    # Directory inside the .docx ZIP container that holds embedded media.
    _MEDIA_PREFIX = "word/media/"

    def extract_text_and_tables(self, file_path):
        """
        Extract paragraph text and table cell text from a Word document.

        Returns:
            ``{"text": ..., "tables": ...}`` where "text" is the non-empty,
            stripped paragraphs joined by newlines and "tables" holds one list
            of rows per table (each row a list of stripped cell texts), or
            ``{"error": message}`` if the document cannot be read.
        """
        try:
            doc = Document(BytesIO(self.get_file_data(file_path)))

            paragraphs = (para.text.strip() for para in doc.paragraphs)
            text_content = [text for text in paragraphs if text]

            table_data = [
                [[cell.text.strip() for cell in row.cells] for row in table.rows]
                for table in doc.tables
            ]

            return {"text": "\n".join(text_content), "tables": table_data}
        except Exception as e:
            logger.error(f"Error reading Word document {file_path}: {e}")
            return {"error": f"Error reading Word document: {e}"}

    def extract_images(self, file_path, output_folder_id):
        """
        Copy the images embedded in a Word document into a managed folder.

        Every archive entry under ``word/media/`` is written to the output
        folder as ``<document base name>_<image file name>``. Images already
        present in the output folder are not rewritten but are still listed.

        Args:
            file_path: path of the .docx file in the input folder.
            output_folder_id: id of the Dataiku managed folder receiving the images.

        Returns:
            list of image paths written or already present, or
            ``{"error": message}`` if extraction failed.
        """
        try:
            file_data = self.get_file_data(file_path)
            base_name = os.path.splitext(os.path.basename(file_path))[0]

            image_folder = dataiku.Folder(output_folder_id)
            # Compare paths without a leading "/" on either side.
            # NOTE(review): Dataiku folder listings appear to be "/"-prefixed while
            # the names written below are not; confirm against your DSS version.
            existing_images = {path.lstrip("/") for path in image_folder.list_paths_in_partition()}

            images_extracted = []
            # A .docx file is a ZIP archive: open it straight from memory instead of
            # spilling it to a temporary directory that could leak on errors.
            with zipfile.ZipFile(BytesIO(file_data), "r") as docx_zip:
                for member in docx_zip.namelist():
                    image_name = os.path.basename(member)
                    # Skip non-media entries and a bare directory entry, if present.
                    if not member.startswith(self._MEDIA_PREFIX) or not image_name:
                        continue
                    image_path = f"{base_name}_{image_name}"
                    if image_path not in existing_images:
                        # Only one decompressed image is held in memory at a time.
                        with image_folder.get_writer(image_path) as writer:
                            writer.write(docx_zip.read(member))
                    images_extracted.append(image_path)

            return images_extracted
        except Exception as e:
            logger.error(f"Error extracting images from {file_path}: {e}")
            return {"error": f"Error extracting images: {e}"}


class PDFExtractor(BaseExtractor):
    """
    Extracts the plain text of PDF documents with pdfplumber.
    """
    def extract_text(self, file_path):
        """
        Return the text of every page of a PDF, joined with newlines.

        Pages are walked in groups of 20 only to emit one debug log line per
        group; all extracted text is still accumulated in memory. Pages for
        which pdfplumber yields no text are skipped. On any failure the error
        is logged and a string starting with "Error reading PDF:" is returned
        instead of raising.
        """
        group = 20  # pages covered by each debug progress line
        try:
            with pdfplumber.open(BytesIO(self.get_file_data(file_path))) as pdf:
                pages = pdf.pages
                page_count = len(pages)
                logger.info(f"Processing PDF {file_path} with {page_count} pages")

                texts = []
                for start in range(0, page_count, group):
                    stop = min(start + group, page_count)
                    logger.debug(f"Processing PDF pages {start+1}-{stop} of {page_count}")
                    extracted = [page.extract_text() for page in pages[start:stop]]
                    # Drop pages that produced None or "".
                    texts.extend(filter(None, extracted))

            return "\n".join(texts)
        except Exception as e:
            logger.error(f"Error reading PDF {file_path}: {e}")
            return f"Error reading PDF: {e}"


class PowerPointExtractor(BaseExtractor):
    """
    Extracts text from PowerPoint presentations.

    NOTE(review): python-pptx reads the .pptx (Open XML) format; legacy .ppt
    files routed here will likely come back as "Error reading PowerPoint: ..."
    — confirm, or convert them upstream.
    """
    def extract_text(self, file_path):
        """
        Return the text of every shape on every slide, one shape per line.

        Text inside grouped shapes is included. Shape text is stripped, and
        whitespace-only text is skipped. On failure the error is logged and a
        string starting with "Error reading PowerPoint:" is returned.
        """
        try:
            prs = Presentation(BytesIO(self.get_file_data(file_path)))

            total_slides = len(prs.slides)
            logger.info(f"Processing PowerPoint {file_path} with {total_slides} slides")

            text_parts = []
            for slide_idx, slide in enumerate(prs.slides):
                if slide_idx % 20 == 0:  # Periodic progress logging for long decks
                    logger.debug(f"Processing slide {slide_idx+1}/{total_slides}")
                self._collect_shape_text(slide.shapes, text_parts)

            return "\n".join(text_parts)
        except Exception as e:
            logger.error(f"Error reading PowerPoint {file_path}: {e}")
            return f"Error reading PowerPoint: {e}"

    @classmethod
    def _collect_shape_text(cls, shapes, out):
        """Append the stripped, non-empty text of *shapes* to *out*, recursing into group shapes."""
        for shape in shapes:
            # Group shapes expose their members via ``.shapes`` and carry no text themselves.
            members = getattr(shape, "shapes", None)
            if members is not None:
                cls._collect_shape_text(members, out)
                continue
            text = getattr(shape, "text", "")
            if text and text.strip():
                out.append(text.strip())


class SpreadsheetExtractor(BaseExtractor):
    """
    Extracts data from Excel (.xls/.xlsx) and CSV files as lists of row records.
    """

    # Files above this size (bytes) get per-sheet / per-chunk progress logging.
    LARGE_FILE_BYTES = 50 * 1024 * 1024
    # Rows per chunk when reading a large CSV.
    CSV_CHUNK_ROWS = 50000

    def extract_data(self, file_path):
        """
        Read a spreadsheet into ``{sheet_name: [row_dict, ...]}``.

        Excel workbooks yield one entry per sheet, in workbook order; a CSV is
        reported under the single key "Sheet1". Failures (including unsupported
        extensions) are returned as ``{"error": message}`` rather than raised.
        """
        try:
            file_size = self.get_file_size(file_path) or 0
            file_data = self.get_file_data(file_path)
            file_ext = os.path.splitext(file_path)[1].lower()

            is_large_file = file_size > self.LARGE_FILE_BYTES
            logger.info(f"Processing spreadsheet {file_path} (size: {file_size/1024/1024:.2f}MB, large file: {is_large_file})")

            if file_ext in ('.xls', '.xlsx'):
                return self._read_excel(file_data, is_large_file)
            if file_ext == '.csv':
                return self._read_csv(file_data, is_large_file)
            return {"error": f"Unsupported spreadsheet file type: {file_ext}"}
        except Exception as e:
            logger.error(f"Error reading spreadsheet {file_path}: {e}")
            return {"error": f"Error reading spreadsheet: {e}"}

    @staticmethod
    def _read_excel(file_data, is_large_file):
        """Parse every sheet of an in-memory workbook into row records, in sheet order."""
        # Wrap the bytes in BytesIO: passing raw bytes to pandas' Excel readers is
        # deprecated. pandas' Excel readers have no ``chunksize`` option, so a large
        # workbook is parsed one sheet at a time (the workbook itself stays in memory).
        result = {}
        with pd.ExcelFile(BytesIO(file_data)) as workbook:
            for sheet_name in workbook.sheet_names:
                if is_large_file:
                    logger.debug(f"Processing large Excel sheet: {sheet_name}")
                result[sheet_name] = workbook.parse(sheet_name).to_dict(orient='records')
        return result

    @classmethod
    def _read_csv(cls, file_data, is_large_file):
        """Parse an in-memory CSV into ``{"Sheet1": [row_dict, ...]}``."""
        if not is_large_file:
            return {"Sheet1": pd.read_csv(BytesIO(file_data)).to_dict(orient='records')}

        # Large CSVs are read in row chunks (with progress logging) and stitched back together.
        chunks = []
        for chunk_num, chunk_df in enumerate(pd.read_csv(BytesIO(file_data), chunksize=cls.CSV_CHUNK_ROWS)):
            chunks.append(chunk_df)
            logger.debug(f"Processed CSV chunk {chunk_num+1}")

        if not chunks:
            return {"Sheet1": []}
        return {"Sheet1": pd.concat(chunks, ignore_index=True).to_dict(orient='records')}
        

class FileProcessor:
    """
    Routes files to the matching extractor and collects one result row per file.

    Each row is a dict with keys file_name, file_path, text_content,
    table_content and images_extracted. Failed extractions are retried with
    exponential backoff before the error is recorded in text_content.
    """

    # Prefixes of the error strings the extractors in this module return instead
    # of raising (plus the generic "Error:" form used for final failures).
    _EXTRACTOR_ERROR_PREFIXES = (
        "Error:",
        "Error reading PDF:",
        "Error reading Word document:",
        "Error reading PowerPoint:",
        "Error reading spreadsheet:",
    )

    def __init__(self, folder_id, image_output_folder_id=None, max_workers=None, max_retries=3,
                 retry_delay=1, chunk_size=25, large_file_threshold_mb=50):
        """
        Args:
            folder_id: id of the managed folder holding the input files.
            image_output_folder_id: optional folder id for images extracted from
                .docx files; when None, no images are extracted.
            max_workers: thread count used by process_all_files; defaults to
                about half the CPU cores (minimum 2).
            max_retries: extra attempts made after a file's first failure.
            retry_delay: base delay in seconds, doubled on each retry.
            chunk_size: number of files handled per batch (one batch per worker task).
            large_file_threshold_mb: files above this size (MB) are logged as large.
        """
        self.folder_id = folder_id
        self.image_output_folder_id = image_output_folder_id
        # Default to about half the cores to avoid overloading;
        # os.cpu_count() may return None when the count cannot be determined.
        self.max_workers = max_workers or max(2, (os.cpu_count() or 2) // 2)
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.chunk_size = chunk_size
        self.large_file_threshold_mb = large_file_threshold_mb

        # Shared extractors for efficiency (one instance per type, used by all threads)
        self._pdf_extractor = PDFExtractor(folder_id)
        self._word_extractor = WordExtractor(folder_id)
        self._ppt_extractor = PowerPointExtractor(folder_id)
        self._spreadsheet_extractor = SpreadsheetExtractor(folder_id)

        logger.info(f"Initialized FileProcessor with {self.max_workers} workers, "
                   f"{self.max_retries} retry attempts, chunk size of {self.chunk_size}, "
                   f"and large file threshold of {self.large_file_threshold_mb}MB")

    @classmethod
    def _is_extractor_error(cls, text):
        """Return True if *text* is one of the error strings produced by the extractors."""
        return isinstance(text, str) and text.startswith(cls._EXTRACTOR_ERROR_PREFIXES)

    def process_file(self, file_path, retry_count=0):
        """
        Extract content from one file, retrying failures with exponential backoff.

        Args:
            file_path: path of the file inside the input folder.
            retry_count: attempts already made (used by the recursive retry).

        Returns:
            dict with keys file_name, file_path, text_content, table_content,
            images_extracted. After max_retries + 1 failed attempts,
            text_content holds "Error: <reason> (failed after N attempts)".
        """
        file_name = os.path.basename(file_path)
        ext = os.path.splitext(file_name)[1].lower()
        result = {"file_name": file_name, "file_path": file_path}

        try:
            # File size is only used to flag large files in the log.
            file_size_mb = (self._pdf_extractor.get_file_size(file_path) or 0) / (1024 * 1024)
            is_large_file = file_size_mb > self.large_file_threshold_mb

            if is_large_file:
                logger.info(f"Processing large file: {file_name} ({file_size_mb:.2f}MB)")

            if ext == ".pdf":
                result["text_content"] = self._pdf_extractor.extract_text(file_path)
                result["table_content"] = ""
                result["images_extracted"] = ""

            elif ext == ".docx":
                content = self._word_extractor.extract_text_and_tables(file_path)
                result["text_content"] = content.get("text", content.get("error", ""))
                result["table_content"] = str(content.get("tables", ""))

                if self.image_output_folder_id:
                    images = self._word_extractor.extract_images(file_path, self.image_output_folder_id)
                    result["images_extracted"] = str(images)
                else:
                    result["images_extracted"] = ""

            elif ext in [".ppt", ".pptx"]:
                result["text_content"] = self._ppt_extractor.extract_text(file_path)
                result["table_content"] = ""
                result["images_extracted"] = ""

            elif ext in [".xls", ".xlsx", ".csv"]:
                data = self._spreadsheet_extractor.extract_data(file_path)
                result["text_content"] = "" if "error" not in data else data["error"]
                # Convert to string representation, handling potentially large data
                if "error" not in data:
                    # Summarize instead of stringifying extremely large record sets
                    total_records = sum(len(sheet_data) for sheet_data in data.values())
                    if total_records > 100000:
                        sheet_summaries = [f"{sheet}: {len(records)} records"
                                           for sheet, records in data.items()]
                        result["table_content"] = f"Large dataset: {total_records} total records across sheets: " + \
                                                  ", ".join(sheet_summaries)
                    else:
                        result["table_content"] = str(data)
                else:
                    result["table_content"] = ""
                result["images_extracted"] = ""

            else:
                result["text_content"] = f"Unsupported file type: {file_name}"
                result["table_content"] = ""
                result["images_extracted"] = ""

            # Extractors report failures as "Error ..." strings instead of raising;
            # turn them into exceptions so the retry logic below applies.
            # NOTE: file bytes may be served from the extractor's cache on retry, so
            # retries mainly help with transient download failures; parse errors recur.
            if self._is_extractor_error(result["text_content"]):
                raise Exception(result["text_content"])

            return result

        except Exception as e:
            logger.warning(f"Error processing file {file_name} (attempt {retry_count + 1}/{self.max_retries + 1}): {e}")

            if retry_count < self.max_retries:
                # Exponential backoff: retry_delay, 2*retry_delay, 4*retry_delay, ...
                backoff_delay = self.retry_delay * (2 ** retry_count)
                logger.info(f"Retrying file {file_name} in {backoff_delay} seconds...")
                time.sleep(backoff_delay)
                return self.process_file(file_path, retry_count + 1)
            else:
                logger.error(f"Failed to process file {file_name} after {self.max_retries + 1} attempts")
                result["text_content"] = f"Error: {e} (failed after {self.max_retries + 1} attempts)"
                result["table_content"] = ""
                result["images_extracted"] = ""
                return result

    def _process_file_batch(self, file_batch):
        """Process a batch of files sequentially and return their result rows (never raises per file)."""
        results = []
        for idx, file_path in enumerate(file_batch):
            try:
                result = self.process_file(file_path)
                results.append(result)
                logger.info(f"Processed {idx+1}/{len(file_batch)} in current batch: {file_path}")
            except Exception as e:
                logger.error(f"Unexpected error with file {file_path}: {e}")
                results.append({
                    "file_name": os.path.basename(file_path),
                    "file_path": file_path,
                    "text_content": f"Fatal error: {e}",
                    "table_content": "",
                    "images_extracted": ""
                })
        return results

    def process_all_files(self, file_list):
        """
        Process *file_list* in batches of ``chunk_size`` on a thread pool.

        Returns a DataFrame with one row per processed file; row order follows
        batch completion order, not the input order.
        """
        logger.info(f"Starting processing of {len(file_list)} files in batches of {self.chunk_size}")

        file_batches = [file_list[i:i + self.chunk_size]
                        for i in range(0, len(file_list), self.chunk_size)]

        all_results = []

        with tqdm(total=len(file_list), desc="Processing files") as pbar:
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                batch_futures = [executor.submit(self._process_file_batch, batch)
                                 for batch in file_batches]

                for future in as_completed(batch_futures):
                    try:
                        batch_results = future.result()
                        all_results.extend(batch_results)
                        pbar.update(len(batch_results))
                    except Exception as e:
                        logger.error(f"Error processing batch: {e}")

        logger.info(f"Completed processing {len(all_results)} files")
        return pd.DataFrame(all_results)


def distribute_files_by_size(file_list, extractor, num_groups=4):
    """
    Split files into ``num_groups`` lists whose total sizes are roughly balanced.

    Greedy "largest first": files are visited from largest to smallest and
    each is assigned to the group with the smallest running total (ties go to
    the lowest-numbered group). Groups may be empty when there are fewer
    files than groups.

    Args:
        file_list: iterable of file paths.
        extractor: object with ``get_file_size(path)``; missing/None sizes count as 0.
        num_groups: number of groups to produce; must be at least 1.

    Returns:
        list of ``num_groups`` lists of file paths.

    Raises:
        ValueError: if ``num_groups`` is less than 1.
    """
    if num_groups < 1:
        raise ValueError(f"num_groups must be at least 1, got {num_groups}")

    # Largest first; sorting is stable (also with reverse=True), so equal sizes keep input order.
    sized_files = sorted(
        ((file_path, extractor.get_file_size(file_path) or 0) for file_path in file_list),
        key=lambda item: item[1],
        reverse=True,
    )

    groups = [[] for _ in range(num_groups)]
    # Min-heap of (running total, group index): pops the lightest group,
    # lowest index on ties. An ascending list is already a valid heap.
    heap = [(0, idx) for idx in range(num_groups)]
    for file_path, size in sized_files:
        total, idx = heapq.heappop(heap)
        groups[idx].append(file_path)
        heapq.heappush(heap, (total + size, idx))

    return groups


def main(folder_id, output_folder_id=None):
    """
    Extract content from every file in a Dataiku managed folder.

    Files are split into size-balanced groups, one per worker, and the
    groups are processed concurrently.

    Args:
        folder_id: id of the managed folder holding the input documents.
        output_folder_id: optional id of the managed folder receiving images
            extracted from .docx files; when None, images are not extracted.

    Returns:
        pandas.DataFrame with one row per processed file (columns file_name,
        file_path, text_content, table_content, images_extracted). The frame
        is empty, but still has those columns, when there is nothing to report.
    """
    result_columns = ["file_name", "file_path", "text_content", "table_content", "images_extracted"]

    # Step 1: Initialize the processor (about half the cores; os.cpu_count() may return None)
    processor = FileProcessor(
        folder_id=folder_id,
        image_output_folder_id=output_folder_id,
        max_workers=max(2, (os.cpu_count() or 2) // 2),
        chunk_size=25,
        large_file_threshold_mb=50
    )

    # Step 2: Get the list of files
    folder = dataiku.Folder(folder_id)
    file_list = folder.list_paths_in_partition()

    # Step 3: Distribute files by size for balanced processing, dropping the
    # empty groups that appear when there are fewer files than workers.
    base_extractor = BaseExtractor(folder_id)
    file_groups = [
        group
        for group in distribute_files_by_size(file_list, base_extractor,
                                              num_groups=processor.max_workers)
        if group
    ]

    # Step 4: Process each group in parallel.
    # NOTE: process_all_files() starts its own ThreadPoolExecutor for every group,
    # so up to max_workers * max_workers threads can be active at once.
    all_results = []
    with ThreadPoolExecutor(max_workers=processor.max_workers) as executor:
        group_futures = [executor.submit(processor.process_all_files, group)
                         for group in file_groups]

        for future in as_completed(group_futures):
            try:
                all_results.append(future.result())
            except Exception as e:
                # The files of a failed group are absent from the output; only this log records it.
                logger.error(f"Error processing group: {e}")

    # pd.concat raises on an empty list (empty folder, or every group failed).
    non_empty_results = [df for df in all_results if not df.empty]
    if not non_empty_results:
        return pd.DataFrame(columns=result_columns)
    return pd.concat(non_empty_results, ignore_index=True)

# Managed-folder ids used when this cell runs — set them for your project.
INPUT_FOLDER_ID = None         # folder containing the documents to process (required)
IMAGE_OUTPUT_FOLDER_ID = None  # optional folder for images extracted from .docx files

# main() has no default for its folder_id parameter, so fail with a clear message
# instead of a bare TypeError when the input folder has not been configured.
if INPUT_FOLDER_ID is None:
    raise ValueError("Set INPUT_FOLDER_ID to the id of the Dataiku managed folder to process.")
results_df = main(INPUT_FOLDER_ID, IMAGE_OUTPUT_FOLDER_ID)