# In [None]:
# SocketIO Upload Client for Jupyter Notebook
import socketio
import json
import os
import base64
import time
import uuid
import pandas as pd
from IPython.display import display
import logging
import threading
from typing import Optional, Dict, Any

# --- Configuration ---
# Project identifier; it is only used to build PROJECT_NAMESPACE below.
PROJECT_ID = "80"
SERVER_URL = "http://localhost:5055"  # Note: http not ws for SocketIO
# SocketIO namespace used for every event of this project.
PROJECT_NAMESPACE = f"/project/{PROJECT_ID}"

# Path of the CSV file that the main section previews and uploads.
LARGE_CSV_FILE_PATH = "generated_cells_data_large_1gb.csv"

# Upload parameters
CHUNK_SIZE = 256 * 1024  # 256KB chunks (raw bytes read per chunk, before base64 encoding)
DATASOURCE_NAME = None  # Set to None to use filename, or specify a name string
REPLACE_DATASOURCE = True  # Set to True to overwrite if datasource with same name exists

# Retry parameters
MAX_RETRIES = 50  # Upper bound on upload attempts made by the main section
RETRY_DELAY_SECONDS = 15  # Wait time between retries

# Configure logging for the client
# NOTE: basicConfig configures the root logger, so it affects every logger in this process.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - SOCKETIO_CLIENT - %(levelname)s - %(message)s')
client_logger = logging.getLogger(__name__)

class SocketIOUploader:
    """
    SocketIO-based file uploader with resumability support.

    One instance drives the upload of a single file: it connects to the
    server namespace, asks the server what it already knows about
    ``file_id`` (``upload_query``), then sends ``upload_start``, a sequence of
    base64-encoded ``upload_chunk`` messages and ``upload_end``, and finally
    waits for ``upload_success`` / ``upload_error``.

    Server events are handled by the callbacks registered in
    ``_setup_event_handlers``; they update the state attributes and the
    ``threading.Event`` objects that ``upload()`` waits on.
    """

    def __init__(self, server_url: str, namespace: str, file_path: str, name: str,
                 file_id: Optional[str] = None, view: str = "default",
                 replace: bool = True, supplied_only: bool = False):
        """Initialize the SocketIO uploader.

        Args:
            server_url: URL passed to the SocketIO client's ``connect``.
            namespace: SocketIO namespace used for every event.
            file_path: Path of the file to upload. Its size is read here, so
                the file must already exist.
            name: Datasource name sent in the ``upload_start`` message.
            file_id: Identifier used for tracking and resuming; a random
                UUID4 string is generated when omitted.
            view: Sent as ``view`` in ``upload_start``.
            replace: Sent as ``replace`` in ``upload_start``.
            supplied_only: Sent as ``supplied_only`` in ``upload_start``.

        Raises:
            OSError: If the size of ``file_path`` cannot be read.
        """
        self.server_url = server_url
        self.namespace = namespace
        self.file_path = file_path
        self.file_size = os.path.getsize(file_path)
        self.file_name = os.path.basename(file_path)
        self.name = name  # Datasource name
        self.view = view
        self.replace = replace
        self.supplied_only = supplied_only

        # File ID for tracking and resuming
        self.file_id = file_id or str(uuid.uuid4())
        client_logger.info(f"Initialized SocketIO uploader for file: {self.file_name}, File ID: {self.file_id}")

        # State variables
        self.progress = 0
        self.uploaded_bytes = 0
        self.resume_offset = 0
        self.start_time = None
        self.end_time = None
        self.processing_complete = False
        self.upload_transfer_complete = False
        self.server_will_process = False
        self.is_resuming = False
        self.final_result = None
        # Highest progress value logged so far; -1 means nothing logged yet.
        self._last_logged_progress = -1

        # Threading events
        self.connection_established = threading.Event()
        self.upload_acknowledged = threading.Event()
        self.server_responded_to_query = threading.Event()
        self.stop_event = threading.Event()
        self.lock = threading.Lock()

        # SocketIO client
        self.sio = socketio.Client(logger=False, engineio_logger=False)
        self._setup_event_handlers()

    def _is_own_event(self, data: Any, allow_missing_id: bool = False) -> bool:
        """Return True when a server payload refers to this uploader's file.

        Args:
            data: Payload received with the event.
            allow_missing_id: When True, a payload without a ``file_id``
                (or a payload that is not a dict) is also accepted.
        """
        event_file_id = data.get('file_id') if isinstance(data, dict) else None
        if event_file_id == self.file_id:
            return True
        return allow_missing_id and not event_file_id

    @staticmethod
    def _is_success(result: Optional[Dict[str, Any]]) -> bool:
        """Return True when ``result`` is a payload whose ``type`` marks success."""
        return result is not None and result.get('type') in ('success', 'upload_success')

    def _setup_event_handlers(self):
        """Set up SocketIO event handlers."""

        @self.sio.on('connect', namespace=self.namespace)
        def on_connect():
            client_logger.info(f"Connected to SocketIO server at {self.server_url}{self.namespace}")
            self.connection_established.set()

        @self.sio.on('disconnect', namespace=self.namespace)
        def on_disconnect():
            client_logger.info("Disconnected from SocketIO server")

        @self.sio.on('connected', namespace=self.namespace)
        def on_connected(data):
            client_logger.info(f"Server connection acknowledged: {data}")

        @self.sio.on('upload_start_ack', namespace=self.namespace)
        def on_upload_start_ack(data):
            client_logger.info(f"Upload start acknowledged: {data}")
            if self._is_own_event(data):
                self.upload_acknowledged.set()

        @self.sio.on('upload_resume_ack', namespace=self.namespace)
        def on_upload_resume_ack(data):
            client_logger.info(f"Upload resume acknowledged: {data}")
            if self._is_own_event(data):
                self.resume_offset = data.get('received_bytes', 0)
                self.is_resuming = True
                self.upload_acknowledged.set()

        @self.sio.on('upload_end_ack', namespace=self.namespace)
        def on_upload_end_ack(data):
            client_logger.info(f"Upload end acknowledged: {data}")
            # Acks without a file_id are still accepted; acks that name a
            # different upload are ignored.
            if self._is_own_event(data, allow_missing_id=True):
                self.upload_transfer_complete = True

        @self.sio.on('upload_progress', namespace=self.namespace)
        def on_upload_progress(data):
            if not self._is_own_event(data):
                return
            current_progress = data.get('progress', 0)
            # Log only multiples of 10 (this includes 0 and 100), and never
            # log a value that is not higher than the last one logged.
            if current_progress % 10 == 0 and current_progress > self._last_logged_progress:
                client_logger.info(f"Upload progress: {current_progress}% ({data.get('received')} / {data.get('total')} bytes)")
                self._last_logged_progress = current_progress

        @self.sio.on('upload_processing_initiated', namespace=self.namespace)
        def on_upload_processing_initiated(data):
            client_logger.info(f"Server initiated processing: {data}")
            if self._is_own_event(data):
                with self.lock:
                    self.processing_complete = False
                    self.server_will_process = True
                self.server_responded_to_query.set()

        @self.sio.on('upload_processing', namespace=self.namespace)
        def on_upload_processing(data):
            client_logger.info(f"Server processing file: {data}")
            # Ignore notifications that name another upload so they cannot
            # reset a result already recorded for this file.
            if self._is_own_event(data, allow_missing_id=True):
                with self.lock:
                    self.processing_complete = False

        @self.sio.on('upload_success', namespace=self.namespace)
        def on_upload_success(data):
            client_logger.info(f"Processing successful: {data}")
            if self._is_own_event(data):
                with self.lock:
                    self.processing_complete = True
                    self.final_result = data

        @self.sio.on('upload_error', namespace=self.namespace)
        def on_upload_error(data):
            client_logger.error(f"Server error: {data}")
            # Errors without a file_id are treated as belonging to this upload.
            if self._is_own_event(data, allow_missing_id=True):
                with self.lock:
                    self.processing_complete = True
                    self.final_result = data

        @self.sio.on('upload_resume_info', namespace=self.namespace)
        def on_upload_resume_info(data):
            client_logger.info(f"Resume info received: {data}")
            if self._is_own_event(data):
                self.resume_offset = data.get('received_bytes', 0)
                self.is_resuming = True
                self.upload_transfer_complete = False
                self.server_will_process = False
                self.server_responded_to_query.set()

        @self.sio.on('upload_not_found', namespace=self.namespace)
        def on_upload_not_found(data):
            client_logger.info(f"Server does not have state for file: {data}")
            if self._is_own_event(data):
                self.resume_offset = 0
                self.is_resuming = False
                self.upload_transfer_complete = False
                self.server_will_process = False
                self.server_responded_to_query.set()

        @self.sio.on('pong', namespace=self.namespace)
        def on_pong(data):
            client_logger.debug("Received pong from server")

    def _disconnect_quietly(self) -> bool:
        """Disconnect the client if it is connected, never raising.

        Returns:
            True when a disconnect was actually issued, False otherwise.
        """
        try:
            if self.sio.connected:
                self.sio.disconnect()
                return True
        except Exception as e:
            # Best-effort cleanup: a failing disconnect must not mask the
            # outcome of the upload itself.
            client_logger.debug(f"Ignoring error while disconnecting: {e}")
        return False

    def _connect_with_retries(self, connection_attempts: int = 3) -> None:
        """Connect to the server namespace, retrying a few times.

        Args:
            connection_attempts: Maximum number of connection attempts.

        Raises:
            Exception: When no attempt produced a ``connect`` event for the
                namespace within 15 seconds.
        """
        for attempt_no in range(1, connection_attempts + 1):
            try:
                client_logger.info(f"Connection attempt {attempt_no}/{connection_attempts}")
                self.sio.connect(self.server_url, namespaces=[self.namespace])

                # Wait for connection with timeout
                if not self.connection_established.wait(timeout=15):
                    raise Exception("Connection timeout")
                client_logger.info("Connection established successfully")
                return
            except Exception as e:
                client_logger.warning(f"Connection attempt {attempt_no} failed: {e}")
                # Drop any connection the failed attempt left behind so it
                # cannot interfere with the next connect() call.
                self._disconnect_quietly()
                if attempt_no == connection_attempts:
                    raise Exception(f"Failed to establish connection after {connection_attempts} attempts") from e
                time.sleep(2)  # Wait before retry

    def _query_upload_status(self) -> bool:
        """Query server for upload status.

        Returns:
            True if the client needs to send data (fresh upload, resume, or
            no answer within 20 seconds). False if the server announced it
            will process the file itself, or if the query could not be sent;
            in the latter case ``final_result`` holds an error payload.
        """
        self.server_will_process = False
        self.server_responded_to_query.clear()

        query_msg = {"file_id": self.file_id}
        client_logger.info(f"Querying server status for file_id: {self.file_id}")

        try:
            self.sio.emit('upload_query', query_msg, namespace=self.namespace)
        except Exception as e:
            client_logger.error(f"Failed to send query: {e}")
            with self.lock:
                self.processing_complete = True
                self.final_result = {'type': 'error', 'message': f'Failed to send query: {e}'}
            return False

        # Wait for response
        if self.server_responded_to_query.wait(timeout=20):
            with self.lock:
                if self.server_will_process:
                    client_logger.info("Server will handle processing. No data transfer needed.")
                    return False
            client_logger.info(f"Query processed. Resuming: {self.is_resuming}, Offset: {self.resume_offset}")
            return True

        # No answer: fall back to a full upload from the beginning.
        client_logger.error("Timeout waiting for server response to query")
        self.resume_offset = 0
        self.is_resuming = False
        return True

    def _start_upload(self):
        """Send upload start message and record the start time."""
        self.start_time = time.time()
        start_msg = {
            "file_id": self.file_id,
            "filename": self.file_name,
            "size": self.file_size,
            "content_type": "text/csv",
            "name": self.name,
            "view": self.view,
            "replace": self.replace,
            "supplied_only": self.supplied_only
        }
        client_logger.info(f"Sending upload start for file ID {self.file_id}")
        self.sio.emit('upload_start', start_msg, namespace=self.namespace)

    def _send_file_chunks(self):
        """Send file chunks to server.

        Reads the file in pieces of at most ``CHUNK_SIZE`` bytes, starting at
        ``resume_offset`` when resuming, and emits each piece base64-encoded
        as an ``upload_chunk`` event. Never sends more than ``file_size``
        bytes in total and stops early when ``stop_event`` is set.

        Raises:
            Exception: Any error raised while reading or emitting is logged
                and re-raised.
        """
        chunk_num = 0
        client_logger.info(f"Starting file transmission from offset {self.resume_offset}")

        try:
            with open(self.file_path, 'rb') as file:
                if self.is_resuming and self.resume_offset > 0:
                    if self.resume_offset >= self.file_size:
                        client_logger.warning(f"Resume offset {self.resume_offset} >= file size {self.file_size}")
                        self.uploaded_bytes = self.resume_offset
                        return
                    client_logger.info(f"Seeking to resume offset: {self.resume_offset}")
                    file.seek(self.resume_offset)
                    self.uploaded_bytes = self.resume_offset
                else:
                    self.uploaded_bytes = 0

                while not self.stop_event.is_set():
                    remaining = self.file_size - self.uploaded_bytes
                    if remaining <= 0:
                        client_logger.info("File transfer complete")
                        break

                    # Cap the read so a file that grew after __init__ is
                    # never sent beyond the size announced in upload_start.
                    chunk = file.read(min(CHUNK_SIZE, remaining))
                    if not chunk:
                        client_logger.info("Reached end of file")
                        break

                    chunk_msg = {
                        "file_id": self.file_id,
                        "chunk_num": chunk_num,
                        "data": base64.b64encode(chunk).decode('utf-8')
                    }
                    self.sio.emit('upload_chunk', chunk_msg, namespace=self.namespace)

                    self.uploaded_bytes += len(chunk)
                    chunk_num += 1

                    # Small delay to prevent overwhelming the server
                    time.sleep(0.01)

            client_logger.info(f"Finished sending chunks. Total bytes uploaded: {self.uploaded_bytes}")

        except Exception as e:
            client_logger.exception(f"Error sending file chunks: {e}")
            raise

    def _end_upload(self):
        """Send upload end message, but only if the whole file was sent."""
        if self.uploaded_bytes >= self.file_size:
            end_msg = {"file_id": self.file_id}
            client_logger.info(f"Sending upload end for file_id: {self.file_id}")
            self.sio.emit('upload_end', end_msg, namespace=self.namespace)
        else:
            client_logger.warning(f"Upload incomplete ({self.uploaded_bytes}/{self.file_size})")

    def _wait_for_completion(self, timeout_seconds: float = 3600) -> None:
        """Block until a final result arrives, the upload is cancelled, or waiting fails.

        Args:
            timeout_seconds: Maximum time to wait (default: 1 hour).

        Raises:
            Exception: When ``timeout_seconds`` elapse without a result.
            ConnectionError: When the connection drops before a result
                was recorded.
        """
        deadline = time.monotonic() + timeout_seconds
        while not self.processing_complete and not self.stop_event.is_set():
            if time.monotonic() > deadline:
                client_logger.error(f"Timeout ({timeout_seconds}s) waiting for processing")
                raise Exception("Client timeout waiting for completion")

            # The connection state is checked on every pass (not once before
            # the loop) so that a drop fails this attempt promptly and the
            # caller can reconnect and re-query the status.
            if not self.sio.connected:
                if self.processing_complete:
                    # The result landed just before the disconnect.
                    break
                client_logger.error("Connection lost while waiting for server to process the file.")
                raise ConnectionError("Connection lost while waiting for processing result")

            time.sleep(1)

    def upload(self):
        """Run one upload attempt from connect to final result.

        Returns:
            A ``(success, result)`` tuple. ``success`` is True only when the
            final payload's ``type`` is ``'success'`` or ``'upload_success'``.
            ``result`` is the server's final payload, an error dict built by
            the client, or None when the upload was cancelled before any
            result arrived.
        """
        client_logger.info(f"Starting SocketIO upload process for file ID: {self.file_id}")

        # Reset state
        self.stop_event.clear()
        self.processing_complete = False
        self.upload_transfer_complete = False
        self.server_will_process = False
        self.final_result = None
        self.connection_established.clear()
        self.upload_acknowledged.clear()
        self.server_responded_to_query.clear()
        self._last_logged_progress = -1

        try:
            # Ensure we're disconnected first; pause only if there actually
            # was a connection to tear down.
            if self._disconnect_quietly():
                time.sleep(1)  # Brief pause before reconnecting

            # Connect to server with retries
            self._connect_with_retries()

            # Verify connection is still active
            if not self.sio.connected:
                raise Exception("Connection lost after establishment")

            # Query status
            if self._query_upload_status():
                client_logger.info("Proceeding with file data transfer")

                # Start upload
                self._start_upload()

                # Wait for acknowledgment
                if not self.upload_acknowledged.wait(timeout=15):
                    raise Exception("Did not receive upload acknowledgment")

                self._send_file_chunks()

                # End upload (_end_upload itself refuses to send the end
                # message for an incomplete transfer and logs a warning).
                if not self.stop_event.is_set():
                    self._end_upload()
            else:
                with self.lock:
                    should_wait = self.server_will_process
                    early_result = self.final_result
                if should_wait:
                    client_logger.info("Server will handle processing. Waiting for completion.")
                elif early_result is not None:
                    # The query could not be sent (or a final event already
                    # arrived): report that result instead of hiding it
                    # behind a generic "no transfer needed" message.
                    return self._is_success(early_result), early_result
                else:
                    client_logger.info("No data transfer needed.")
                    return False, {'type': 'info', 'message': 'No transfer needed'}

            # Wait for processing to complete, but fail fast on disconnection.
            self._wait_for_completion()

            with self.lock:
                result = self.final_result
            return self._is_success(result), result

        except Exception as e:
            client_logger.exception(f"Error during upload: {e}")
            with self.lock:
                # Keep a result the server already delivered; otherwise
                # record this exception as the outcome.
                if not self.processing_complete:
                    self.processing_complete = True
                    self.final_result = {'type': 'error', 'message': f'Upload error: {e}'}
                result = self.final_result
            return False, result

        finally:
            self.end_time = time.time()
            self._disconnect_quietly()

    def cancel_upload(self):
        """Cancel the current upload.

        ``stop_event`` is set before the server is notified, so the local
        chunk and wait loops stop even when the ``upload_cancel`` message
        cannot be delivered.
        """
        self.stop_event.set()
        try:
            cancel_msg = {"file_id": self.file_id}
            self.sio.emit('upload_cancel', cancel_msg, namespace=self.namespace)
        except Exception as e:
            client_logger.error(f"Error cancelling upload: {e}")

# --- Helper Functions ---
def preview_csv(file_path):
    """Log the size of a CSV file and display its first rows.

    Returns None when ``file_path`` does not exist, True when the preview
    was shown, and False when reading or displaying the file failed.
    """
    if os.path.exists(file_path):
        try:
            preview_frame = pd.read_csv(file_path, nrows=10)
            size_in_mb = os.path.getsize(file_path) / (1024 * 1024)
            base_name = os.path.basename(file_path)
            client_logger.info(f"CSV File: {base_name} | Size: {size_in_mb:.2f} MB")
            print("\n--- CSV Preview (first 5 rows) ---")
            display(preview_frame.head())
            print("----------------------------------\n")
        except Exception as preview_error:
            client_logger.error(f"Error previewing CSV: {preview_error}")
            return False
        return True

    # Missing file: reported separately from a failed preview.
    client_logger.error(f"File not found: {file_path}")
    return None

# --- Main Execution Logic ---
# Driver: preview the CSV, then attempt the upload up to MAX_RETRIES times.
# The guard requires the name "get_ipython" to be present in the module
# namespace, presumably to restrict execution to IPython/Jupyter sessions.
if __name__ == "__main__" and "get_ipython" in locals():
    print(f"--- Starting SocketIO Large File Upload Script ---")
    print(f"Target Server: {SERVER_URL}{PROJECT_NAMESPACE}")
    print(f"File to Upload: {LARGE_CSV_FILE_PATH}")

    # A single file ID is created here and passed to every attempt below.
    file_id_for_upload = str(uuid.uuid4())

    if not os.path.exists(LARGE_CSV_FILE_PATH):
        print(f"\nERROR: File not found at {LARGE_CSV_FILE_PATH}")
    else:
        preview_csv(LARGE_CSV_FILE_PATH)
        # Fall back to the file name without its extension when no datasource name is configured.
        ds_name = DATASOURCE_NAME or os.path.splitext(os.path.basename(LARGE_CSV_FILE_PATH))[0]
        print(f"Using Datasource Name: {ds_name}")
        print(f"Replace if exists: {REPLACE_DATASOURCE}")

        # Outcome of the retry loop; neither variable is read within this section.
        overall_success = False
        final_upload_result = None

        for attempt in range(MAX_RETRIES):
            client_logger.info(f"--- Upload Attempt {attempt + 1} of {MAX_RETRIES} ---")

            # NOTE: the file ID is NOT regenerated per attempt; the ID created
            # before the loop is reused, so every attempt queries and uploads
            # under the same file ID.

            print(f"Generated File ID for attempt {attempt + 1}: {file_id_for_upload}")

            # A fresh uploader (and therefore a fresh SocketIO client) is built for each attempt.
            uploader = SocketIOUploader(
                server_url=SERVER_URL,
                namespace=PROJECT_NAMESPACE,
                file_path=LARGE_CSV_FILE_PATH,
                name=ds_name,
                file_id=file_id_for_upload,
                replace=REPLACE_DATASOURCE,
                supplied_only=False
            )

            success, result = uploader.upload()
            final_upload_result = result

            if success:
                client_logger.info(f"Attempt {attempt + 1} successful!")
                overall_success = True
                break
            else:
                client_logger.warning(f"Upload attempt {attempt + 1} failed: {result}")

                # Improved error classification for retries
                # A result without a 'message' key (or a None result) is
                # treated as non-retriable: should_retry stays False.
                should_retry = False
                if result and 'message' in result:
                    msg_lower = result['message'].lower()

                    # Expanded list of retriable error patterns
                    # (matched as case-insensitive substrings of the message)
                    retriable_error_patterns = [
                        'connection', 'timeout', 'websocket error', 'disconnect', 
                        'namespace', 'network', 'socket', 'broken pipe', 
                        'connection reset', 'connection refused', 'connection aborted',
                        'bad namespace', 'not a connected namespace'
                    ]

                    # Check if this is a retriable error
                    if any(pattern in msg_lower for pattern in retriable_error_patterns):
                        should_retry = True
                        client_logger.info(f"Detected retriable error: {result['message']}")
                    else:
                        client_logger.warning(f"Non-retriable error detected: {result['message']}")

                # Retriable errors are retried until MAX_RETRIES is exhausted;
                # non-retriable errors get a limited number of extra tries.
                if attempt < MAX_RETRIES - 1:
                    if should_retry:
                        client_logger.info(f"Retrying in {RETRY_DELAY_SECONDS} seconds... (attempt {attempt + 1}/{MAX_RETRIES})")
                        time.sleep(RETRY_DELAY_SECONDS)
                    else:
                        # Even for "non-retriable" errors, give it a few more tries for large files
                        # `attempt` is zero-based: failures of attempts 1-3 are retried,
                        # a non-retriable failure of attempt 4 or later stops the loop.
                        if attempt < 3:
                            client_logger.info(f"Retrying anyway for large file in {RETRY_DELAY_SECONDS} seconds...")
                            time.sleep(RETRY_DELAY_SECONDS)
                        else:
                            client_logger.error("Non-retriable error and max retry attempts for non-retriable reached")
                            break
                else:
                    client_logger.error("Max retries reached")
                    break