In [None]:
import pandas as pd
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import gspread
import os
from typing import Tuple, Dict, List
from datetime import datetime

# Configuration
# Worksheet titles used by the pipeline, keyed by the role of each sheet.
SHEET_NAMES = {
    'STOCK_INFLOW': 'stock_inflow',              # read by main()
    'RELEASE': 'release',                        # read by main()
    'STOCK_INFLOW_CLEAN': 'stock_inflow_clean',  # written by main()
    'OPENING_STOCK': 'opening_stock',            # written by main()
    'RELEASE_CLEAN': 'release_clean',            # written by main()
    'SUMMARY': 'summary',                        # written by main()
}

# Product labels as they appear after cell values have been lower-cased.
PRODUCT_TYPES = {
    'CHICKEN': 'chicken',
    'GIZZARD': 'gizzard',
}

class DataProcessingError(Exception):
    """Raised when a step of the sheet-processing pipeline fails."""

def connect_to_sheets(credentials_file: str) -> gspread.Spreadsheet:
    """Open the spreadsheet named by the SPREADSHEET_URL environment variable.

    Args:
        credentials_file: Path to the service-account credentials file.

    Returns:
        The spreadsheet opened through an authorized gspread client.

    Raises:
        DataProcessingError: If authorization fails, SPREADSHEET_URL is unset
            or empty, or the spreadsheet cannot be opened.
    """
    sheets_scopes = ['https://www.googleapis.com/auth/spreadsheets']
    try:
        creds = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=sheets_scopes)
        client = gspread.authorize(creds)

        url = os.getenv('SPREADSHEET_URL')
        if url:
            return client.open_by_url(url)
        raise DataProcessingError("SPREADSHEET_URL environment variable not set")
    except Exception as exc:
        # Every failure, including the missing-URL error raised just above,
        # is reported under the same "failed to connect" message.
        raise DataProcessingError(f"Failed to connect to Google Sheets: {exc}")

def read_worksheet_to_df(spreadsheet: gspread.Spreadsheet, worksheet_name: str) -> pd.DataFrame:
    """Load one worksheet into a DataFrame, using its first row as the header.

    Args:
        spreadsheet: Spreadsheet that contains the worksheet.
        worksheet_name: Title of the worksheet to read.

    Returns:
        A DataFrame whose columns are the first row of the worksheet and
        whose rows are all remaining rows, exactly as returned by
        ``get_all_values()``.

    Raises:
        DataProcessingError: If the worksheet cannot be read or has no values.
    """
    try:
        rows = spreadsheet.worksheet(worksheet_name).get_all_values()
        if not rows:
            raise DataProcessingError(f"No data found in worksheet {worksheet_name}")
        header, *records = rows
        return pd.DataFrame(records, columns=header)
    except Exception as exc:
        raise DataProcessingError(f"Failed to read worksheet {worksheet_name}: {exc}")

def standardize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with normalized headers and cell values.

    Headers are lower-cased and trimmed, with spaces and hyphens replaced by
    underscores. Every cell is converted to trimmed, lower-case text; a column
    is then replaced by its numeric form when *all* of its cells parse as
    numbers once thousands separators (commas) are removed.

    Raises:
        DataProcessingError: If any step of the normalization fails.
    """
    try:
        cleaned = df.copy()

        # Headers first, so the loop below iterates the final column names.
        headers = cleaned.columns.str.lower().str.strip()
        for separator in (' ', '-'):
            headers = headers.str.replace(separator, '_')
        cleaned.columns = headers

        for name in cleaned.columns:
            text = cleaned[name].astype(str).str.strip().str.lower()
            cleaned[name] = text
            try:
                as_number = pd.to_numeric(text.str.replace(',', ''))
            except (ValueError, TypeError):
                # At least one cell is not numeric: keep the cleaned text.
                continue
            cleaned[name] = as_number

        return cleaned
    except Exception as exc:
        raise DataProcessingError(f"Failed to standardize dataframe: {exc}")

def standardize_dates(df: pd.DataFrame) -> pd.DataFrame:
    """Parse the ``date`` column and add ``month`` / ``year_month`` columns.

    An empty frame is returned unchanged (the same object, with no columns
    added). Otherwise a copy is returned in which ``date`` is a datetime
    column, ``month`` is the lower-cased abbreviated month name and
    ``year_month`` is formatted as ``%Y-%b``.

    Raises:
        DataProcessingError: If the ``date`` column is missing or cannot be
            parsed.
    """
    if df.empty:
        return df

    try:
        result = df.copy()

        # Try the two known layouts for the whole column; the first that
        # parses every value wins.
        parsed = None
        for date_format in ('%d %b %Y', '%d/%m/%y'):
            try:
                parsed = pd.to_datetime(result['date'], format=date_format)
            except ValueError:
                continue
            break
        if parsed is None:
            # Neither layout fits the whole column: parse value by value,
            # reading ambiguous dates day-first.
            parsed = pd.to_datetime(result['date'], format='mixed', dayfirst=True)
        result['date'] = parsed

        # Derived labels; ``date`` itself stays a datetime column.
        result['month'] = result['date'].dt.strftime('%b').str.lower()
        result['year_month'] = result['date'].dt.strftime('%Y-%b')

        return result
    except Exception as exc:
        raise DataProcessingError(f"Failed to standardize dates: {exc}")

def remove_opening_stock(df: pd.DataFrame, column_name: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Separate opening stock records from main data.

    A row counts as an opening-stock record when the text of ``column_name``
    contains ``"opening stock"`` (case-insensitive).

    Args:
        df: Frame to split.
        column_name: Column whose values are searched for the marker text.

    Returns:
        ``(main_df, opening_stock_df)``: independent copies of the rows
        without and with the marker, in their original order.

    Raises:
        DataProcessingError: If ``column_name`` is missing or the split fails.
    """
    try:
        # The column is not guaranteed to hold text: a column that is entirely
        # blank or numeric may already have been converted to a numeric dtype,
        # on which the ``.str`` accessor raises. Matching on the string form
        # makes such rows simply "not opening stock".
        labels = df[column_name].astype(str)
        opening_stock_mask = labels.str.contains('opening stock', case=False, na=False)
        opening_stock_df = df[opening_stock_mask].copy()
        main_df = df[~opening_stock_mask].copy()
        return main_df, opening_stock_df
    except Exception as e:
        raise DataProcessingError(f"Failed to separate opening stock: {str(e)}") from e

def create_summary_df(stock_inflow_df: pd.DataFrame, release_df: pd.DataFrame) -> pd.DataFrame:
    """Create summary DataFrame from stock inflow and release data.

    The result has one row per distinct ``year_month`` found in either input,
    ordered chronologically, with the matching ``month`` label followed by six
    totals columns (chicken inflow/release quantity and weight, gizzard
    inflow/release weight). Periods with no matching rows get a total of 0.

    Totals are grouped by ``year_month`` rather than ``month`` so that the
    same calendar month of different years is not merged, and ``month`` /
    ``year_month`` are taken from the same source rows so the two columns
    always line up. (Building them from two independently sorted unique lists
    fails with a length mismatch as soon as the data spans more than one
    year.)

    Args:
        stock_inflow_df: Inflow rows with ``month``, ``year_month``,
            ``product_type``, ``quantity`` and ``weight`` columns.
        release_df: Release rows with ``month``, ``year_month``, ``product``,
            ``quantity`` and ``weight_in_kg`` columns.

    Returns:
        The summary frame described above, with a fresh 0..n-1 index.

    Raises:
        DataProcessingError: If a required column is missing or the
            aggregation fails.
    """
    try:
        # Distinct reporting periods, each keeping the month label that
        # accompanies it in the source rows.
        period_columns = ['month', 'year_month']
        periods = (
            pd.concat(
                [stock_inflow_df[period_columns], release_df[period_columns]],
                ignore_index=True,
            )
            .dropna(subset=['year_month'])
            .drop_duplicates(subset='year_month')
        )

        # Sort by the date the label represents, not by its text
        # ("2023-Apr" < "2023-Jan" alphabetically). Labels that do not parse
        # as %Y-%b become NaT and stay at the end in first-seen order.
        periods = periods.sort_values(
            'year_month',
            key=lambda labels: pd.to_datetime(labels, format='%Y-%b', errors='coerce'),
            kind='stable',
        )
        summary_df = periods.reset_index(drop=True)

        # Output column -> (source frame, product column, product key, metric).
        total_specs = [
            ('total_chicken_inflow_quantity',
             stock_inflow_df, 'product_type', 'CHICKEN', 'quantity'),
            ('total_chicken_inflow_weight',
             stock_inflow_df, 'product_type', 'CHICKEN', 'weight'),
            ('total_chicken_release_quantity',
             release_df, 'product', 'CHICKEN', 'quantity'),
            ('total_chicken_release_weight',
             release_df, 'product', 'CHICKEN', 'weight_in_kg'),
            ('total_gizzard_inflow_weight',
             stock_inflow_df, 'product_type', 'GIZZARD', 'weight'),
            ('total_gizzard_release_weight',
             release_df, 'product', 'GIZZARD', 'weight_in_kg'),
        ]

        for col_name, frame, product_column, product_key, metric in total_specs:
            product_rows = frame[frame[product_column] == PRODUCT_TYPES[product_key]]
            totals = product_rows.groupby('year_month')[metric].sum()
            # Periods absent from ``totals`` map to NaN and are reported as 0.
            summary_df[col_name] = summary_df['year_month'].map(totals).fillna(0)

        return summary_df
    except Exception as e:
        raise DataProcessingError(f"Failed to create summary: {str(e)}") from e

def prepare_df_for_upload(df: pd.DataFrame) -> pd.DataFrame:
    """Return an all-text copy of ``df`` suitable for writing to a sheet.

    Datetime columns are rendered as ``YYYY-MM-DD``. In every column, missing
    values become empty strings, all remaining values are converted with
    ``str``, and cells that are exactly the text ``nan`` are blanked.
    """
    prepared = df.copy()

    # Render datetimes first so they are not stringified with a time part.
    for name in prepared.select_dtypes(include=['datetime64']).columns:
        prepared[name] = prepared[name].dt.strftime('%Y-%m-%d')

    for name in prepared.columns:
        # Blank out missing values, stringify, then blank literal 'nan' text.
        prepared[name] = (
            prepared[name]
            .fillna('')
            .astype(str)
            .replace('nan', '')
        )

    return prepared

def upload_df_to_gsheet(df: pd.DataFrame, spreadsheet_id: str,
                        sheet_name: str, credentials_file: str) -> bool:
    """Replace the contents of one sheet with the given DataFrame.

    The sheet range ``A1:ZZ`` is cleared first, then the header row and all
    data rows are written starting at ``A1`` with the ``RAW`` value input
    option.

    Args:
        df: Data to upload.
        spreadsheet_id: ID of the target spreadsheet.
        sheet_name: Title of the sheet to overwrite.
        credentials_file: Path to the service-account credentials file.

    Returns:
        True on success. Any exception is printed and reported as False
        instead of being raised.
    """
    def cell_text(cell):
        # None and values that compare unequal to themselves (NaN) are blanks.
        if cell is None or not (cell == cell):
            return ''
        return str(cell)

    try:
        frame = prepare_df_for_upload(df)

        creds = service_account.Credentials.from_service_account_file(
            credentials_file,
            scopes=['https://www.googleapis.com/auth/spreadsheets'],
        )
        service = build('sheets', 'v4', credentials=creds)

        # Header row followed by the data rows, as a list of lists.
        header_row = frame.columns.tolist()
        data_rows = [[cell_text(cell) for cell in row]
                     for row in frame.values.tolist()]
        values = [header_row, *data_rows]

        # Wipe the old contents so rows beyond the new data do not linger.
        clear_request = service.spreadsheets().values().clear(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A1:ZZ',
        )
        clear_request.execute()

        update_request = service.spreadsheets().values().update(
            spreadsheetId=spreadsheet_id,
            range=f'{sheet_name}!A1',
            valueInputOption='RAW',
            body={'values': values},
        )
        result = update_request.execute()

        print(f"Updated {result.get('updatedCells')} cells in {sheet_name}")
        return True

    except Exception as exc:
        print(f"Failed to upload to {sheet_name}: {exc}")
        return False



def process_sheets_data(
    stock_inflow_df: pd.DataFrame,
    release_df: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Clean both raw sheets and derive the opening-stock and summary frames.

    Both inputs are standardized and date-parsed. Opening-stock rows are then
    split off (matched on ``purchasing_officer`` for inflow and on
    ``name_of_collector`` for release; the release opening-stock rows are
    discarded). Release rows whose ``product`` mentions gizzard get a
    ``quantity`` of 0 before the summary is built.

    Returns:
        ``(stock_inflow_main_df, opening_stock_df, release_df, summary_df)``.

    Raises:
        DataProcessingError: If any processing step fails.
    """
    try:
        # Normalize headers/values for both sheets, then parse their dates.
        inflow = standardize_dataframe(stock_inflow_df)
        released = standardize_dataframe(release_df)
        inflow = standardize_dates(inflow)
        released = standardize_dates(released)

        # Split off opening-stock rows; only the inflow ones are kept.
        inflow_main, opening_stock = remove_opening_stock(inflow, 'purchasing_officer')
        released_main = remove_opening_stock(released, 'name_of_collector')[0]

        # Gizzard releases carry no quantity: force it to 0.
        is_gizzard = released_main['product'].str.contains(
            PRODUCT_TYPES['GIZZARD'], case=False, na=False)
        released_main.loc[is_gizzard, 'quantity'] = 0

        summary = create_summary_df(inflow_main, released_main)

        return inflow_main, opening_stock, released_main, summary

    except Exception as exc:
        raise DataProcessingError(f"Failed to process sheets data: {exc}")

def main():
    """Run the pipeline: read the raw sheets, process them, upload the results.

    Requires the SPREADSHEET_ID environment variable. Any failure is printed
    and then re-raised; a failed upload of any output sheet is reported as a
    DataProcessingError after all uploads have been attempted.
    """
    credentials_path = 'credentials.json'

    try:
        spreadsheet_id = os.getenv('SPREADSHEET_ID')
        if not spreadsheet_id:
            raise DataProcessingError("SPREADSHEET_ID environment variable not set")

        # Read the two raw input sheets.
        spreadsheet = connect_to_sheets(credentials_path)
        raw_inflow = read_worksheet_to_df(spreadsheet, SHEET_NAMES['STOCK_INFLOW'])
        raw_release = read_worksheet_to_df(spreadsheet, SHEET_NAMES['RELEASE'])

        processed_frames = process_sheets_data(raw_inflow, raw_release)

        # Target sheets, in the same order as the frames returned above.
        target_sheets = [
            SHEET_NAMES[key]
            for key in ('STOCK_INFLOW_CLEAN', 'OPENING_STOCK', 'RELEASE_CLEAN', 'SUMMARY')
        ]

        # Attempt every upload even if an earlier one fails.
        any_failed = False
        for frame, sheet_name in zip(processed_frames, target_sheets):
            if upload_df_to_gsheet(frame, spreadsheet_id, sheet_name, credentials_path):
                continue
            any_failed = True
            print(f"Failed to upload {sheet_name}")

        if any_failed:
            raise DataProcessingError("Failed to upload one or more datasets")
        print("Data processing and upload completed successfully!")

    except Exception as exc:
        print(f"An error occurred: {exc}")
        raise

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()