Imports

In [23]:
import boto3
from boto3.dynamodb.conditions import Key, Attr
import ipywidgets as widgets
from IPython.display import display as ipy_display, clear_output as ipy_clear_output


Initialization

In [24]:
# --- Connection inputs ------------------------------------------------------
# Free-text fields naming the AWS profile and the DynamoDB table to query.
profile_input = widgets.Text(
    placeholder="e.g., musarisaas",
    description="AWS Profile:",
    style=dict(description_width='150px'),
)
table_input = widgets.Text(
    placeholder="e.g., CoreUser",
    description="Table Name:",
    style=dict(description_width='150px'),
)

# --- Partition keys: bulk entry ---------------------------------------------
# Multi-line box so several values can be pasted at once, one per line.
partition_keys_textarea = widgets.Textarea(
    layout=widgets.Layout(width='600px'),
    rows=5,
    placeholder="Paste multiple values (one per line) or use buttons below to add individually",
    description="Partition Keys:",
    style=dict(description_width='150px'),
)

# --- Partition keys: one-at-a-time entry ------------------------------------
# `partition_key_list` holds the dynamically added input rows (appended by
# add_partition_key); `partition_keys_container` is the box that renders them.
partition_key_list = []
partition_keys_container = widgets.VBox([])

def add_partition_key(b=None):
    """Append one partition-key input row (text box + remove button) to the UI.

    Used as the click handler of the "+ Add Single Key" button, but can also be
    called directly with no argument.

    Args:
        b: The widget that fired the click event, if any (unused).
    """
    key_text = widgets.Text(
        placeholder="e.g., PEPSICO-PRE",
        layout=widgets.Layout(width='400px')
    )
    remove_button = widgets.Button(
        description="❌",
        button_style='danger',
        layout=widgets.Layout(width='40px')
    )
    # run_query reads the key value from children[0], so the text box stays first.
    new_input = widgets.HBox([key_text, remove_button])

    def remove_this_key(b):
        """Drop this row from the list and refresh the container."""
        # A click can arrive for a row that is already gone (e.g. after
        # "Clear All" or a second queued click); list.remove would then raise
        # ValueError, so stale clicks are ignored instead.
        if new_input in partition_key_list:
            partition_key_list.remove(new_input)
            update_partition_keys_display()

    remove_button.on_click(remove_this_key)

    partition_key_list.append(new_input)
    update_partition_keys_display()

def update_partition_keys_display():
    """Re-render the individual-key container from `partition_key_list`.

    Shows the current input rows, or an italic hint when there are none.
    """
    if not partition_key_list:
        empty_hint = widgets.HTML(
            "<i>No individual keys added. Use the text area above or click 'Add Single Key'.</i>"
        )
        partition_keys_container.children = [empty_hint]
        return

    partition_keys_container.children = partition_key_list

# Action buttons shown above the individual partition-key rows.
# "+ Add Single Key" appends a new input row via add_partition_key.
add_partition_button = widgets.Button(
    icon='plus',
    description="+ Add Single Key",
    layout=widgets.Layout(width='150px'),
    button_style='info',
)
add_partition_button.on_click(add_partition_key)

# "Clear All" button; its click handler is attached further down the cell.
clear_all_button = widgets.Button(
    icon='trash',
    description="Clear All",
    layout=widgets.Layout(width='150px'),
    button_style='warning',
)

def clear_all_keys(b):
    """Reset partition-key entry: empty the textarea and drop every input row.

    Args:
        b: The widget that fired the click event (unused).
    """
    partition_keys_textarea.value = ""
    # Empty the shared list in place so every reference to it sees the change.
    del partition_key_list[:]
    update_partition_keys_display()

# Attach the reset handler to the "Clear All" button.
clear_all_button.on_click(clear_all_keys)

# Optional prefix; run_query applies it as a begins_with condition on the
# table's sort key when both a sort key and a prefix are present.
sort_key_prefix_input = widgets.Text(
    placeholder="e.g., 2024 (optional)",
    description="Sort Key Prefix:",
    style=dict(description_width='150px'),
)

# Button that starts the count, and the output area its handler writes into.
run_button = widgets.Button(
    icon='play',
    description="Run Query",
    layout=widgets.Layout(width='150px'),
    button_style="success",
)
output = widgets.Output()

Query and count

In [25]:
# Maximum number of items DynamoDB evaluates per Query request (one "page").
_QUERY_PAGE_LIMIT = 1000


def _collect_partition_values():
    """Return the partition key values from the textarea and the single-key rows.

    Values are stripped, empty entries are dropped, and duplicates are removed
    while keeping first-seen order (textarea values first).
    """
    pasted = partition_keys_textarea.value.split('\n')
    typed = [row.children[0].value for row in partition_key_list]
    cleaned = (raw.strip() for raw in pasted + typed)
    # dict.fromkeys keeps insertion order, giving an order-preserving dedupe.
    return list(dict.fromkeys(value for value in cleaned if value))


def _iter_count_pages(table, key_condition):
    """Yield ``(page_number, items_in_page, running_total)`` per Query page.

    Follows ``LastEvaluatedKey`` until the result set is exhausted. The query
    uses ``Select='COUNT'`` so only the match count is returned, not the items.
    """
    query_params = {
        'KeyConditionExpression': key_condition,
        'Select': 'COUNT',
        'Limit': _QUERY_PAGE_LIMIT,
    }
    page_number = 0
    running_total = 0
    while True:
        response = table.query(**query_params)
        items_in_page = response['Count']
        page_number += 1
        running_total += items_in_page
        yield page_number, items_in_page, running_total

        last_evaluated_key = response.get('LastEvaluatedKey')
        if not last_evaluated_key:
            return
        query_params['ExclusiveStartKey'] = last_evaluated_key


def run_query(b):
    """Count the items stored under each requested partition key.

    Click handler for the "Run Query" button. Reads the AWS profile, table
    name, partition key values and optional sort-key prefix from the input
    widgets, detects the table's key schema, pages through a Query per
    partition value and prints progress plus a final summary into `output`.

    Status text is written with ``print`` rather than ``display``: ``display``
    renders a plain string by its repr, which shows surrounding quotes and a
    literal ``\\n`` instead of the intended text.

    Args:
        b: The widget that fired the click event (unused).

    Returns:
        None. Every outcome, including errors, is reported inside `output`.
    """
    with output:
        ipy_clear_output()
        try:
            # Read and validate inputs
            profile_name = profile_input.value.strip()
            table_name = table_input.value.strip()
            sort_key_prefix = sort_key_prefix_input.value.strip()
            partition_values = _collect_partition_values()

            if not profile_name or not table_name:
                print("❌ Error: AWS Profile and Table Name are required.")
                return

            if not partition_values:
                print("❌ Error: At least one partition key value is required.")
                print("💡 Tip: Paste values in the text area (one per line) or use '+ Add Single Key' button.")
                return

            print("🔄 Connecting to DynamoDB...")

            # Initialize boto3 session and DynamoDB table
            session = boto3.Session(profile_name=profile_name)
            table = session.resource('dynamodb').Table(table_name)

            # Detect the key attribute names from the table metadata
            print("🔍 Detecting table schema...")
            table.load()
            key_names = {entry['KeyType']: entry['AttributeName']
                         for entry in table.key_schema}
            partition_key_name = key_names['HASH']
            sort_key_name = key_names.get('RANGE')  # None for tables without a sort key
            total_partitions = len(partition_values)

            ipy_clear_output(wait=True)
            print(f"📊 Table: {table_name}")
            print(f"🔑 Partition Key: '{partition_key_name}'")
            print(f"📝 Querying {total_partitions} partition value(s):")
            for pv in partition_values:
                print(f"   - {pv}")
            if sort_key_name:
                if sort_key_prefix:
                    print(f"🔑 Sort Key: '{sort_key_name}' begins_with '{sort_key_prefix}'")
                else:
                    print(f"🔑 Sort Key: '{sort_key_name}' (no filter applied)")
            print("\n" + "=" * 60)

            # Overall counters
            total_record_count = 0
            total_page_count = 0
            partition_results = {}

            for idx, partition_value in enumerate(partition_values, 1):
                ipy_clear_output(wait=True)
                print(f"📊 Table: {table_name}")
                print(f"🔄 Processing partition {idx}/{total_partitions}: '{partition_value}'")
                print("=" * 60)

                # Key condition: exact partition match, optionally narrowed by prefix
                key_cond = Key(partition_key_name).eq(partition_value)
                if sort_key_name and sort_key_prefix:
                    key_cond = key_cond & Key(sort_key_name).begins_with(sort_key_prefix)

                record_count = 0
                page_count = 0
                for page_count, items_in_page, record_count in _iter_count_pages(table, key_cond):
                    # Redraw the progress block after every page
                    ipy_clear_output(wait=True)
                    print(f"📊 Table: {table_name}")
                    print(f"🔄 Processing partition {idx}/{total_partitions}: '{partition_value}'")
                    print(f"   Page {page_count} | Items: {items_in_page} | Subtotal: {record_count}")
                    print("=" * 60)

                partition_results[partition_value] = {
                    'records': record_count,
                    'pages': page_count
                }
                total_record_count += record_count
                total_page_count += page_count

            # Final results
            ipy_clear_output(wait=True)
            print(f"📊 Table: {table_name}")
            print(f"🔑 Partition Key: '{partition_key_name}'")
            if sort_key_name and sort_key_prefix:
                print(f"🔑 Sort Key Filter: '{sort_key_name}' begins_with '{sort_key_prefix}'")
            print("\n" + "=" * 60)
            print("✅ Query Complete!")
            print("\n📈 OVERALL SUMMARY:")
            print(f"   Total Records: {total_record_count}")
            print(f"   Total Pages: {total_page_count}")
            print(f"   Partitions Queried: {total_partitions}")

            if total_partitions > 1:
                print("\n📊 BREAKDOWN BY PARTITION:")
                for partition_value, results in partition_results.items():
                    print(f"   '{partition_value}':")
                    print(f"      Records: {results['records']}")
                    print(f"      Pages: {results['pages']}")

        except Exception as e:
            ipy_clear_output(wait=True)
            # An unknown profile surfaces as botocore's ProfileNotFound, not as
            # AttributeError. It is matched by class name so no extra import is
            # needed; everything else is reported verbatim with its type.
            if type(e).__name__ == 'ProfileNotFound':
                print(f"❌ Error: AWS profile '{profile_name}' not found.")
                print("💡 Check your AWS credentials configuration.")
            else:
                print(f"❌ Error: {e}")
                print(f"🔍 Error Type: {type(e).__name__}")

# Register run_query as the click handler of the "Run Query" button.
# NOTE(review): re-executing this cell without re-creating run_button likely
# stacks a second handler (the new function object is a different callback),
# so one click would run the query twice — confirm and re-run the widget cell too.
run_button.on_click(run_query)

In [26]:
# Display UI
# The individual-key container is created empty, so populate it once before
# rendering; otherwise its "No individual keys added" hint only appears after
# the first clear/remove. The call is idempotent when rows already exist.
update_partition_keys_display()

ipy_display(widgets.HTML("<h3>🔍 DynamoDB Query Counter</h3>"))
ipy_display(widgets.HTML("<p>Query any DynamoDB table and count matching records.</p>"))
ipy_display(
    widgets.VBox([
        profile_input,
        table_input,
        widgets.HTML("<b>Option 1: Paste Multiple Values (one per line)</b>"),
        partition_keys_textarea,
        widgets.HTML("<b>Option 2: Add Individual Keys</b>"),
        widgets.HBox([add_partition_button, clear_all_button]),
        partition_keys_container,
        sort_key_prefix_input,
        run_button,
        output
    ], layout=widgets.Layout(padding='10px'))
)

HTML(value='<h3>🔍 DynamoDB Query Counter</h3>')

HTML(value='<p>Query any DynamoDB table and count matching records.</p>')

VBox(children=(Text(value='', description='AWS Profile:', placeholder='e.g., musarisaas', style=TextStyle(desc…