# 1. Configurations

In [None]:
# Cell 1: Class Definition
"""
# BigQuery Connector Utility Class

This notebook contains the main BigQueryConnector class.
Import this notebook into other notebooks using: %run utils/bigquery_connector.ipynb
"""


## 1.1 Import Library

In [None]:
import os
import pandas as pd
from google.cloud import bigquery
from typing import List, Optional
from IPython.display import display, HTML
import json
import inspect
import types
import sys

## 1.2 Main Class

In [None]:
# Cell 3: Main Class
class BigQueryConnector:
    """
    Professional BigQuery Connector Class for Jupyter Notebooks

    Print-friendly wrapper around ``bigquery.Client`` that is configured from
    environment variables:

    - PROJECT_ID (required): project the client is created for
    - DATASET_ID (optional): default dataset used by the table helpers
    - GOOGLE_APPLICATION_CREDENTIALS (optional): only reported here

    Features:
    - Easy connection setup
    - Interactive data exploration
    - Query validation and execution
    - DataFrame integration
    - Notebook-friendly outputs

    Apart from ``__init__`` (which re-raises), the methods report failures
    with ``print`` and return ``None`` / ``[]`` / ``False`` instead of raising.
    Table, column and clause arguments are interpolated into SQL text as-is,
    so they must come from a trusted source.
    """

    # USD per TiB scanned, used only for the dry-run cost estimate.
    # NOTE(review): hard-coded; confirm against current on-demand pricing.
    _USD_PER_TIB = 5

    def __init__(self):
        """Create the BigQuery client from environment variables.

        Raises:
            ValueError: if PROJECT_ID is not set.
            Exception: any other initialization error is printed and re-raised.
        """
        try:
            # The path is only reported; the Google client libraries read
            # GOOGLE_APPLICATION_CREDENTIALS from the environment themselves.
            credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
            if credentials_path:
                print(f"✓ Credentials loaded from: {credentials_path}")

            self.project_id = os.getenv('PROJECT_ID')
            self.dataset_id = os.getenv('DATASET_ID')

            if not self.project_id:
                raise ValueError("PROJECT_ID not found in environment variables")

            self.client = bigquery.Client(project=self.project_id)
            print(f"✓ BigQuery client initialized for project: {self.project_id}")

        except Exception as e:
            print(f"✗ Initialization failed: {e}")
            raise

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _dataset_configured(self, hint: str = "") -> bool:
        """Return True if DATASET_ID is set; otherwise print the warning (plus *hint*)."""
        if self.dataset_id:
            return True
        print(f"⚠️ DATASET_ID not configured{hint}")
        return False

    def _table_id(self, table_name: str) -> str:
        """Return the fully-qualified ``project.dataset.table`` ID for *table_name*."""
        return f"{self.project_id}.{self.dataset_id}.{table_name}"

    def _dry_run(self, query: str) -> bool:
        """Dry-run *query*, print the scan/cost estimate, return True if it validated."""
        try:
            job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
            job = self.client.query(query, job_config=job_config)
        except Exception as e:
            print(f"✗ Query failed: {e}")
            return False

        scanned = job.total_bytes_processed or 0
        print("✓ Query validation successful")
        print(f"📊 Estimated bytes processed: {scanned:,}")
        print(f"💰 Estimated cost: ${(scanned / 1024**4) * self._USD_PER_TIB:.4f}")
        return True

    # ------------------------------------------------------------------
    # Connection / discovery
    # ------------------------------------------------------------------
    def test_connection(self) -> bool:
        """Run a trivial query and return True if it succeeds."""
        try:
            query = "SELECT 1 as test, CURRENT_DATETIME() as timestamp"
            result = self.client.query(query).to_dataframe()
            print("✓ Connection successful!")
            print(f"📅 Server time: {result.iloc[0]['timestamp']}")
            return True
        except Exception as e:
            print(f"✗ Connection failed: {e}")
            return False

    def get_tables(self) -> List[str]:
        """Print and return the table names of the configured dataset.

        Returns:
            List of table IDs; ``[]`` if DATASET_ID is unset or listing fails.
        """
        try:
            if not self._dataset_configured(". Use list_datasets() to see available datasets."):
                return []

            # A "project.dataset" string replaces the deprecated Client.dataset().
            dataset = f"{self.project_id}.{self.dataset_id}"
            table_names = [table.table_id for table in self.client.list_tables(dataset)]

            print(f"📊 Dataset: {self.dataset_id}")
            print(f"📋 Available tables ({len(table_names)}):")
            for i, table in enumerate(table_names, 1):
                print(f"  {i:2d}. {table}")

            return table_names
        except Exception as e:
            print(f"✗ Failed to list tables: {e}")
            return []

    def list_datasets(self) -> List[str]:
        """Print every dataset in the project with its table count and return the IDs.

        Returns:
            List of dataset IDs; ``[]`` if any listing call fails.
        """
        try:
            datasets = list(self.client.list_datasets())

            print(f"📁 PROJECT: {self.project_id}")
            print(f"📂 Available datasets ({len(datasets)}):")

            for i, dataset in enumerate(datasets, 1):
                # list_tables accepts the reference directly, so no extra
                # get_dataset() round-trip is needed per dataset.
                tables_count = sum(1 for _ in self.client.list_tables(dataset.reference))
                print(f"  {i:2d}. {dataset.dataset_id:<30} ({tables_count} tables)")

            return [dataset.dataset_id for dataset in datasets]

        except Exception as e:
            print(f"✗ Failed to list datasets: {e}")
            return []

    def search_tables(self, search_term: str) -> List[str]:
        """Return tables whose name contains *search_term* (case-insensitive).

        Note: this calls ``get_tables()``, which also prints the full table list.
        """
        try:
            if not self._dataset_configured():
                return []

            needle = search_term.lower()
            matching_tables = [table for table in self.get_tables() if needle in table.lower()]

            print(f"🔍 Tables matching '{search_term}':")
            for i, table in enumerate(matching_tables, 1):
                print(f"  {i}. {table}")

            return matching_tables

        except Exception as e:
            print(f"✗ Search failed: {e}")
            return []

    # ------------------------------------------------------------------
    # Query execution
    # ------------------------------------------------------------------
    def query_data(self, query: str, dry_run: bool = False) -> Optional[pd.DataFrame]:
        """
        Execute query and return DataFrame

        Args:
            query (str): SQL query to execute
            dry_run (bool): If True, only validate query without running

        Returns:
            The result as a DataFrame, or None when the query fails.
            A dry run always returns None (use ``validate_query`` for a
            True/False answer).
        """
        if dry_run:
            self._dry_run(query)
            return None

        try:
            print("🔄 Executing query...")
            df = self.client.query(query).to_dataframe()
            print("✓ Query executed successfully")
            print(f"📊 Rows returned: {len(df):,}")
            print(f"📋 Columns: {len(df.columns)}")
            print(f"💾 Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
            return df

        except Exception as e:
            print(f"✗ Query failed: {e}")
            return None

    def validate_query(self, query: str) -> bool:
        """Validate SQL with a dry run (nothing is executed); True if it is accepted."""
        # query_data(dry_run=True) always returns None, so its result cannot
        # signal success; ask the dry-run helper directly.
        return self._dry_run(query)

    def quick_query(self, table_name: str, limit: int = 10) -> Optional[pd.DataFrame]:
        """Return the first *limit* rows of *table_name* (``SELECT * ... LIMIT``)."""
        if not self._dataset_configured():
            return None

        query = f"SELECT * FROM `{self._table_id(table_name)}` LIMIT {limit}"
        return self.query_data(query)

    # ------------------------------------------------------------------
    # Table metadata
    # ------------------------------------------------------------------
    def get_table_info(self, table_name: str) -> Optional["bigquery.Table"]:
        """Print size, timestamps, schema and a 5-row sample of a table.

        The sample is fetched with ``quick_query`` and therefore runs a query.

        Returns:
            The ``bigquery.Table`` metadata object, or None on failure.
        """
        try:
            if not self._dataset_configured():
                return None

            table = self.client.get_table(self._table_id(table_name))

            # Basic table info; the counters are guarded because a None
            # value would break the numeric formatting below.
            print("\n" + "=" * 60)
            print(f"📊 TABLE INFORMATION: {table_name}")
            print("=" * 60)
            print(f"📈 Total Rows: {(table.num_rows or 0):,}")
            print(f"💾 Size: {(table.num_bytes or 0) / (1024*1024):.2f} MB")
            print(f"📅 Created: {table.created}")
            print(f"🔄 Modified: {table.modified}")

            if table.description:
                print(f"📝 Description: {table.description}")

            # Schema information (descriptions are cut to the column width)
            print(f"\n📋 SCHEMA ({len(table.schema)} columns):")
            print(f"{'No.':<4} {'Column Name':<25} {'Data Type':<15} {'Mode':<10} {'Description':<30}")
            print("-" * 90)

            for i, field in enumerate(table.schema, 1):
                mode = field.mode if field.mode else "NULLABLE"
                description = field.description[:30] if field.description else ""
                print(f"{i:<4} {field.name:<25} {field.field_type:<15} {mode:<10} {description:<30}")

            # Sample data
            print("\n🔍 SAMPLE DATA (first 5 rows):")
            sample_df = self.quick_query(table_name, limit=5)
            if sample_df is not None and not sample_df.empty:
                display(sample_df)

            return table

        except Exception as e:
            print(f"✗ Failed to get table info: {e}")
            return None

    def get_table_schema(self, table_name: str) -> Optional[List[dict]]:
        """Return the schema as dicts with keys name, type, mode, description."""
        try:
            if not self._dataset_configured():
                return None

            table = self.client.get_table(self._table_id(table_name))
            return [
                {
                    'name': field.name,
                    'type': field.field_type,
                    'mode': field.mode or 'NULLABLE',
                    'description': field.description or '',
                }
                for field in table.schema
            ]

        except Exception as e:
            print(f"✗ Failed to get schema: {e}")
            return None

    def count_rows(self, table_name: str, where_clause: str = "") -> Optional[int]:
        """Count rows in a table.

        Args:
            table_name: Table inside the configured dataset.
            where_clause: Optional condition, without the ``WHERE`` keyword.

        Returns:
            The count as a plain ``int``, or None on failure.
        """
        try:
            if not self._dataset_configured():
                return None

            where_part = f" WHERE {where_clause}" if where_clause else ""
            query = f"SELECT COUNT(*) as row_count FROM `{self._table_id(table_name)}`{where_part}"

            result = self.query_data(query)
            if result is None:
                return None

            # Convert the pandas/numpy scalar so callers get a builtin int.
            count = int(result.iloc[0]['row_count'])
            print(f"📊 Row count for {table_name}: {count:,}")
            return count

        except Exception as e:
            print(f"✗ Failed to count rows: {e}")
            return None

    def get_column_stats(self, table_name: str, column_name: str) -> Optional[pd.DataFrame]:
        """Return (and display) count, non-null, distinct, min and max of a column."""
        try:
            if not self._dataset_configured():
                return None

            query = f"""
                SELECT
                    COUNT(*) as total_count,
                    COUNT({column_name}) as non_null_count,
                    COUNT(DISTINCT {column_name}) as unique_count,
                    MIN({column_name}) as min_value,
                    MAX({column_name}) as max_value
                FROM `{self._table_id(table_name)}`
            """

            stats = self.query_data(query)
            if stats is not None:
                print(f"📊 Statistics for column '{column_name}' in table '{table_name}':")
                display(stats)
            return stats

        except Exception as e:
            print(f"✗ Failed to get column stats: {e}")
            return None

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------
    def export_table_to_csv(self, table_name: str, filename: str = None, limit: int = None) -> Optional[str]:
        """Export a table (optionally only *limit* rows) to CSV.

        The file is written by the module-level ``save_query_result`` helper.

        Args:
            table_name: Table inside the configured dataset.
            filename: Base name without extension; defaults to ``<table>_export``.
            limit: Optional maximum number of rows.

        Returns:
            Path reported by ``save_query_result``, or None if the query or
            the save failed.
        """
        try:
            if not self._dataset_configured():
                return None

            limit_clause = f" LIMIT {limit}" if limit else ""
            query = f"SELECT * FROM `{self._table_id(table_name)}`{limit_clause}"

            df = self.query_data(query)
            if df is None:
                return None

            if not filename:
                filename = f"{table_name}_export"

            # Return what was actually written instead of a hard-coded path,
            # so a failed save is visible to the caller as None.
            return save_query_result(df, filename, format='csv')

        except Exception as e:
            print(f"✗ Export failed: {e}")
            return None

    # ------------------------------------------------------------------
    # Interactive selection
    # ------------------------------------------------------------------
    def select_table_by_index(self, index: int) -> Optional[str]:
        """Return the table at *index* (0-based) of ``get_tables()``, or None."""
        try:
            tables = self.get_tables()
            if 0 <= index < len(tables):
                selected_table = tables[index]
                print(f"✅ Selected table at index {index}: {selected_table}")
                return selected_table

            if tables:
                print(f"❌ Index {index} out of range. Available: 0-{len(tables)-1}")
            else:
                # Avoid printing a meaningless "0--1" range.
                print(f"❌ Index {index} out of range. No tables available")
            return None
        except Exception as e:
            print(f"✗ Failed to select table: {e}")
            return None

    def select_table_by_number(self, number: int) -> Optional[str]:
        """Return the table with display *number* (1-based) of ``get_tables()``, or None."""
        try:
            tables = self.get_tables()
            if 1 <= number <= len(tables):
                selected_table = tables[number - 1]  # Convert to 0-based index
                print(f"✅ Selected table #{number}: {selected_table}")
                return selected_table

            if tables:
                print(f"❌ Invalid number. Choose between 1-{len(tables)}")
            else:
                # Avoid printing a meaningless "1-0" range.
                print("❌ Invalid number. No tables available")
            return None
        except Exception as e:
            print(f"✗ Failed to select table: {e}")
            return None

    def explore_table_by_index(self, index: int):
        """Select a table by 0-based index and print its info and sample rows.

        Returns:
            The selected table name, or None if the index was invalid.
        """
        selected_table = self.select_table_by_index(index)
        if selected_table:
            print(f"\n🔍 Exploring table: {selected_table}")
            # get_table_info already queries and displays the 5-row sample,
            # so no second sample query is issued here.
            self.get_table_info(selected_table)
        return selected_table

    # ------------------------------------------------------------------
    # Flexible extraction
    # ------------------------------------------------------------------
    def extract_table_data(self, table_name: str,
                          columns: Optional[List[str]] = None,
                          where_clause: str = None,
                          order_by: str = None,
                          limit: int = None,
                          sample_percent: float = None) -> Optional[pd.DataFrame]:
        """
        Extract data from a single table with flexible options

        Args:
            table_name (str): Name of the table to extract from
            columns (list): List of columns to select (default: all columns);
                a single string is treated as one column expression
            where_clause (str): WHERE condition (without 'WHERE' keyword)
            order_by (str): ORDER BY clause (without 'ORDER BY' keyword)
            limit (int): Maximum number of rows to return
            sample_percent (float): Percentage passed to TABLESAMPLE SYSTEM,
                greater than 0 and at most 100 (1.0 = 1%, 10 = 10%)

        Returns:
            pd.DataFrame: Extracted data or None if failed

        Examples:
            # Extract all data
            df = bq.extract_table_data('customers')

            # Extract specific columns
            df = bq.extract_table_data('customers', columns=['id', 'name', 'email'])

            # Extract with conditions
            df = bq.extract_table_data('orders',
                                     where_clause="status = 'completed' AND amount > 100",
                                     order_by="created_at DESC",
                                     limit=1000)

            # Extract a 1% sample
            df = bq.extract_table_data('large_table', sample_percent=1.0)
        """
        try:
            if not self._dataset_configured():
                return None

            # A bare string would otherwise be iterated character by character.
            if isinstance(columns, str):
                columns = [columns]

            select_clause = ", ".join(col.strip() for col in columns) if columns else "*"

            from_clause = f"`{self._table_id(table_name)}`"
            if sample_percent:
                if not 0 < sample_percent <= 100:
                    print("⚠️ sample_percent must be between 0 and 100")
                    return None
                from_clause += f" TABLESAMPLE SYSTEM ({sample_percent} PERCENT)"

            # Only the clauses that were requested end up in the statement.
            clauses = [f"SELECT {select_clause}", f"FROM {from_clause}"]
            if where_clause:
                clauses.append(f"WHERE {where_clause}")
            if order_by:
                clauses.append(f"ORDER BY {order_by}")
            if limit:
                clauses.append(f"LIMIT {limit}")
            query = "\n".join(clauses)

            print(f"🔄 Extracting data from table: {table_name}")
            if where_clause:
                print(f"📋 Filter: {where_clause}")
            if columns:
                print(f"📊 Columns: {len(columns)} selected")
            if limit:
                print(f"📏 Limit: {limit:,} rows")
            if sample_percent:
                print(f"🎲 Sample: {sample_percent}%")

            df = self.query_data(query)
            if df is None:
                print(f"❌ Failed to extract data from {table_name}")
                return None

            print(f"✅ Successfully extracted {len(df):,} rows from {table_name}")
            if columns and len(df) > 0:
                print(f"📋 Extracted columns: {list(df.columns)}")
            return df

        except Exception as e:
            print(f"✗ Error extracting data from {table_name}: {e}")
            return None

## 1.3 Helper Functions for Notebooks

In [None]:
# Cell 4: Helper Functions for Notebooks
def display_query_results(df: pd.DataFrame, title: str = "Query Results") -> pd.DataFrame:
    """Print a short summary of *df* and show its first rows.

    Prints the title, row/column counts, deep memory usage and the column
    dtypes, then displays up to the first 10 rows (or a "no data" notice for
    an empty frame).

    Args:
        df: DataFrame to summarize.
        title: Heading printed above the summary.

    Returns:
        The same DataFrame object, unchanged.
    """
    print(f"\n📊 {title}")
    print("=" * 50)

    row_total = len(df)
    print(f"📈 Rows: {row_total:,} | Columns: {len(df.columns)}")

    megabytes = df.memory_usage(deep=True).sum() / 1024**2
    print(f"💾 Memory usage: {megabytes:.2f} MB")

    print(f"📋 Column types: {dict(df.dtypes)}")
    print("\n🔍 Sample data:")

    # Guard clause for the empty case; otherwise hand the preview to the
    # notebook's rich display.
    if row_total <= 0:
        print("No data to display")
        return df

    display(df.head(10))
    return df

def save_query_result(df: pd.DataFrame, filename: str, format: str = 'csv',
                      output_dir: str = '../outputs/exports') -> Optional[str]:
    """Save a DataFrame to ``<output_dir>/<filename>.<extension>``.

    Args:
        df: Data to write (the index is not written).
        filename: Base file name without extension.
        format: One of 'csv', 'parquet', 'excel', 'json'.
        output_dir: Target directory, created if missing. Defaults to the
            notebook-relative exports folder.

    Returns:
        The path of the written file, or None if the format is unsupported
        or writing failed (the error is printed, not raised).
    """
    writers = {
        'csv': lambda path: df.to_csv(path, index=False),
        'parquet': lambda path: df.to_parquet(path, index=False),
        'excel': lambda path: df.to_excel(path, index=False),
        'json': lambda path: df.to_json(path, orient='records', indent=2),
    }
    # pandas picks the Excel engine from the file extension, so the 'excel'
    # format must be written to a .xlsx path ('.excel' is not recognized).
    extensions = {'excel': 'xlsx'}

    try:
        # Validate first so an unsupported format leaves no directory behind.
        if format not in writers:
            raise ValueError(f"Unsupported format: {format}")

        os.makedirs(output_dir, exist_ok=True)
        filepath = f"{output_dir}/{filename}.{extensions.get(format, format)}"
        writers[format](filepath)

        print(f"💾 Data saved to: {filepath}")
        print(f"📊 Saved {len(df):,} rows and {len(df.columns)} columns")
        return filepath

    except Exception as e:
        print(f"✗ Failed to save file: {e}")
        return None

def create_sample_queries(bq_connector) -> dict:
    """Build example SQL statements for the first table of the configured dataset.

    Args:
        bq_connector: Object exposing ``project_id``, ``dataset_id`` and
            ``get_tables()`` (e.g. a BigQueryConnector).

    Returns:
        Dict mapping a short name ("basic_select", "count_rows",
        "column_info", "distinct_count") to a SQL string, or ``{}`` when no
        dataset is configured or the dataset has no tables. Nothing is executed.
    """
    if not bq_connector.dataset_id:
        print("⚠️ DATASET_ID not configured")
        return {}

    tables = bq_connector.get_tables()
    if not tables:
        print("⚠️ No tables found")
        return {}

    first_table = tables[0]
    project_id = bq_connector.project_id
    dataset_id = bq_connector.dataset_id
    table_ref = f"`{project_id}.{dataset_id}.{first_table}`"

    sample_queries = {
        "basic_select": f"SELECT * FROM {table_ref} LIMIT 10",
        "count_rows": f"SELECT COUNT(*) as total_rows FROM {table_ref}",
        "column_info": f"""
            SELECT 
                column_name,
                data_type,
                is_nullable
            FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
            WHERE table_name = '{first_table}'
        """,
        # COUNT(DISTINCT *) is not valid BigQuery SQL; serializing the whole
        # row via the table alias gives one comparable value per row.
        "distinct_count": f"""
            SELECT 
                COUNT(*) as total_rows,
                COUNT(DISTINCT TO_JSON_STRING(t)) as unique_rows
            FROM {table_ref} AS t
        """,
    }

    print("📝 Sample queries generated:")
    for name in sample_queries:
        print(f"  - {name}")

    return sample_queries

def quick_data_profile(df: pd.DataFrame) -> None:
    """Print a compact profile of *df*.

    Reports the shape, deep memory usage, per-column missing-value counts
    (with percentage of rows), the number of columns per dtype and, when
    numeric columns exist, displays their ``describe()`` table.
    """
    print("📊 QUICK DATA PROFILE")
    print("=" * 50)

    # Shape and memory footprint
    n_rows, n_cols = df.shape[0], df.shape[1]
    print(f"📈 Shape: {n_rows:,} rows × {n_cols} columns")
    print(f"💾 Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

    # Null counts, listing only the columns that actually have gaps
    null_counts = df.isnull().sum()
    if null_counts.sum() <= 0:
        print("\n✓ No missing values")
    else:
        print("\n🚨 Missing values:")
        affected = null_counts[null_counts > 0]
        for column, n_missing in affected.items():
            share = n_missing / len(df) * 100
            print(f"  {column}: {n_missing:,} ({share:.1f}%)")

    # How many columns there are of each dtype
    print("\n📋 Data types:")
    for dtype, n_columns in df.dtypes.value_counts().items():
        print(f"  {dtype}: {n_columns} columns")

    # Descriptive statistics for numeric columns only
    numeric_cols = df.select_dtypes(include=['number']).columns
    if len(numeric_cols) == 0:
        return
    print("\n📊 Numeric columns summary:")
    display(df[numeric_cols].describe())

## 1.4 Utility Functions for BigQuery Operations

In [None]:
# Cell 5: Utility Functions for BigQuery Operations
def setup_environment_check():
    """Print a checklist of the environment this notebook relies on.

    Reports whether PROJECT_ID, DATASET_ID and GOOGLE_APPLICATION_CREDENTIALS
    are set (and, for the credentials variable, whether the referenced file
    exists), then whether each required package can be imported.
    """
    import importlib  # local import: only this helper needs it

    print("🔍 Environment Configuration Check")
    print("="*50)

    # Check environment variables
    env_vars = ['PROJECT_ID', 'DATASET_ID', 'GOOGLE_APPLICATION_CREDENTIALS']
    for var in env_vars:
        value = os.getenv(var)
        status = "✓ Set" if value else "❌ Missing"
        print(f"{var}: {status}")
        if value and var == 'GOOGLE_APPLICATION_CREDENTIALS':
            file_exists = os.path.exists(value)
            print(f"  File exists: {'✓ Yes' if file_exists else '❌ No'}")

    print("\n📦 Required packages:")
    # Distribution name -> importable module. The two differ for BigQuery:
    # 'google-cloud-bigquery' is imported as 'google.cloud.bigquery', so
    # deriving the module name by swapping '-' for '_' can never succeed.
    required_packages = {
        'google-cloud-bigquery': 'google.cloud.bigquery',
        'pandas': 'pandas',
        'IPython': 'IPython',
    }
    for package, module_name in required_packages.items():
        try:
            importlib.import_module(module_name)
        except ImportError:
            print(f"  {package}: ❌ Missing")
        else:
            print(f"  {package}: ✓ Installed")

def format_sql_query(query: str) -> str:
    """Apply naive line breaks to a SQL string for readability.

    Surrounding whitespace is stripped, every comma is followed by a newline
    plus four spaces, and the upper-case keywords FROM, WHERE, GROUP BY,
    ORDER BY and HAVING (when surrounded by single spaces) are moved to the
    start of a new line. This is plain text substitution, not SQL parsing
    (could be enhanced with sqlparse).
    """
    text = query.strip().replace(',', ',\n    ')

    # Applied in this fixed order; each keyword's leading space becomes a newline.
    for keyword in ('FROM', 'WHERE', 'GROUP BY', 'ORDER BY', 'HAVING'):
        text = text.replace(f' {keyword} ', f'\n{keyword} ')

    return text

# Load-time confirmation: printed whenever this cell runs (including when the
# notebook is pulled into another one via %run), listing the names it defines.
print("✓ BigQueryConnector class and helper functions loaded!")
print("✓ Ready to use: bq = BigQueryConnector()")
print("✓ Available helper functions: display_query_results, save_query_result, quick_data_profile")
print("✓ Utility functions: setup_environment_check, create_sample_queries, format_sql_query")

In [None]:
# Heading for this cell's demo: listing the connector's methods while
# filtering out underscore-prefixed names.
print("\n🎯 METHOD 2: Filter only custom methods")
print("="*50)

def get_custom_methods(obj_or_class):
    """Return the names of the callable, non-underscore attributes of an object.

    Names come from ``dir()``, so they are in sorted order and include
    inherited attributes. Every name starting with an underscore is skipped,
    with ``__init__`` as the single exception.
    """
    def _wanted(attr_name):
        # Keep __init__, drop every other underscore-prefixed name.
        return attr_name == '__init__' or not attr_name.startswith('_')

    return [
        attr_name
        for attr_name in dir(obj_or_class)
        if _wanted(attr_name) and callable(getattr(obj_or_class, attr_name))
    ]

# From the class: collect the filtered method names of BigQueryConnector
# and print them as a numbered list.
custom_methods = get_custom_methods(BigQueryConnector)
print("Custom methods in BigQueryConnector:")
for i, method in enumerate(custom_methods, 1):
    print(f"  {i:2d}. {method}")
