In [12]:
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
from collections import defaultdict
import logging
import time
import pyarrow as pa 

def calculate_mutual_information(row):
    """
    Calculate classical mutual information for a given state vector using natural logarithm.
    
    Args:
        row: DataFrame row containing state information including subsystem mask and probabilities
        
    Returns:
        float: Mutual information value in nats
    """
    start_time = time.time()
    logger = logging.getLogger(__name__)
    
    # Convert binary mask to indices and log initial setup
    mask = [int(x) for x in row['Subsystem_Mask']]
    N = len(mask)
    A_indices = [i for i, m in enumerate(mask) if m == 1]
    
    logger.debug(f"Processing system with {N} qubits, subsystem A size: {len(A_indices)}")
    
    # Get probabilities and states
    probs = row['Top_Probabilities']
    states = row['Top_Indices']
    logger.debug(f"Working with {len(states)} top states covering {sum(probs):.4f} probability")
    
    # Initialize probability distributions
    p_A = defaultdict(float)
    p_B = defaultdict(float)
    p_AB = defaultdict(float)
    
    # Calculate joint and marginal probabilities
    joint_prob_start = time.time()
    for state, prob in zip(states, probs):
        # Convert state to binary representation
        binary = format(state, f'0{N}b')
        
        # Get subsystem A and B configurations
        config_A = ''.join(binary[i] for i in A_indices)
        config_B = ''.join(binary[i] for i in range(N) if i not in A_indices)
        
        # Update probabilities
        p_AB[(config_A, config_B)] += prob
        p_A[config_A] += prob
        p_B[config_B] += prob
    
    joint_prob_time = time.time() - joint_prob_start
    logger.debug(f"Joint probability calculation took {joint_prob_time:.4f} seconds")
    logger.debug(f"Found {len(p_AB)} unique joint configurations")
    
    # Calculate mutual information using natural logarithm
    I_AB = 0
    mi_start = time.time()
    
    # Monitor numerical stability
    min_prob = float('inf')
    max_prob = 0
    
    for (config_A, config_B), p_joint in p_AB.items():
        marginal_product = p_A[config_A] * p_B[config_B]
        
        # Update probability bounds for monitoring
        min_prob = min(min_prob, p_joint, marginal_product)
        max_prob = max(max_prob, p_joint, marginal_product)
        
        # Use natural logarithm (ln) instead of log2
        I_AB += p_joint * np.log(p_joint / marginal_product)
    
    mi_time = time.time() - mi_start
    total_time = time.time() - start_time
    
    # Log detailed information about the calculation
    logger.debug(f"""Mutual Information calculation completed:
        Total time: {total_time:.4f} seconds
        Probability range: [{min_prob:.2e}, {max_prob:.2e}]
        Number of subsystem A configurations: {len(p_A)}
        Number of subsystem B configurations: {len(p_B)}
        Mutual Information: {I_AB:.6f} nats
    """)
    
    return I_AB

def process_parquet():
    """Process the parquet file and calculate mutual information for each row."""
    logger = logging.getLogger(__name__)
    start_time = time.time()
    
    logger.info("Starting parquet file processing")
    
    # Read input parquet
    read_start = time.time()
    df = pq.read_table('Rydberg1.5M1-8.parquet').to_pandas()
    logger.info(f"Read parquet file with {len(df)} rows in {time.time() - read_start:.2f} seconds")
    
    # Calculate mutual information for each row
    mi_start = time.time()
    total_rows = len(df)
    
    for idx, row in df.iterrows():
        if idx % 10000 == 0:  # Log progress every 100 rows
            elapsed = time.time() - mi_start
            avg_time = elapsed / (idx + 1) if idx > 0 else 0
            estimated_remaining = avg_time * (total_rows - idx - 1)
            
            logger.info(f"""Progress: {idx+1}/{total_rows} rows ({(idx+1)/total_rows*100:.1f}%)
                Average time per row: {avg_time:.4f} seconds
                Estimated remaining time: {estimated_remaining/60:.1f} minutes
            """)
            
        df.at[idx, 'Classical_Mutual_Info'] = calculate_mutual_information(row)
    
    mi_time = time.time() - mi_start
    logger.info(f"Mutual information calculation completed in {mi_time:.2f} seconds")
    
    # Write new parquet with added column
    write_start = time.time()
    pq.write_table(
        table=pa.Table.from_pandas(df),
        where='Rydberg1.5M1-8_with_MI.parquet'
    )
    logger.info(f"Wrote output parquet file in {time.time() - write_start:.2f} seconds")
    
    total_time = time.time() - start_time
    logger.info(f"""
    Processing complete:
        Total time: {total_time:.2f} seconds
        Average time per row: {total_time/len(df):.4f} seconds
        Output file: Rydberg1.5M1-8_with_MI.parquet
    """)

if __name__ == "__main__":
    # Set up logging configuration
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("mutual_information.log"),
            logging.StreamHandler()
        ]
    )
    
    process_parquet()

2025-01-23 04:14:14,238 - INFO - Starting parquet file processing
2025-01-23 04:16:09,147 - INFO - Read parquet file with 1500000 rows in 114.90 seconds
2025-01-23 04:16:10,049 - INFO - Progress: 1/1500000 rows (0.0%)
                Average time per row: 0.0000 seconds
                Estimated remaining time: 0.0 minutes
            
2025-01-23 04:17:36,702 - INFO - Progress: 10001/1500000 rows (0.7%)
                Average time per row: 0.0087 seconds
                Estimated remaining time: 217.2 minutes
            
2025-01-23 04:18:57,661 - INFO - Progress: 20001/1500000 rows (1.3%)
                Average time per row: 0.0084 seconds
                Estimated remaining time: 207.7 minutes
            
2025-01-23 04:20:16,758 - INFO - Progress: 30001/1500000 rows (2.0%)
                Average time per row: 0.0083 seconds
                Estimated remaining time: 202.2 minutes
            
2025-01-23 04:21:36,187 - INFO - Progress: 40001/1500000 rows (2.7%)
                Aver

In [6]:
import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd

# Paths to the input and output Parquet files
input_file = 'Rydberg1.5M1-8_with_MI.parquet'
output_file = 'filtered_Rydberg1.5M1-8_with_MI_ex_filtered.parquet'  # Use a different name to avoid conflicts

# Open the input Parquet file
parquet_file = pq.ParquetFile(input_file)

# Initialize a ParquetWriter for the output file as None
writer = None

# Define a batch size (number of rows per batch)
# Adjust this number based on your available memory
batch_size = 100000  # Example: 100,000 rows per batch

# Iterate over each row group in the Parquet file
for rg in range(parquet_file.num_row_groups):
    try:
        # Read the current row group as a PyArrow Table
        table = parquet_file.read_row_group(rg)
        
        # Convert the table to record batches
        batches = table.to_batches(max_chunksize=batch_size)
        
        for batch in batches:
            # Convert the batch to a Pandas DataFrame
            df = batch.to_pandas()
            
            # Apply the filter condition
            filtered_df = df[df['Classical_Mutual_Info'] > df['Von_Neumann_Entropy']]
            
            if not filtered_df.empty:
                # Convert the filtered DataFrame back to a PyArrow Table
                filtered_table = pa.Table.from_pandas(filtered_df)
                
                # Initialize the writer with the schema from the first filtered table
                if writer is None:
                    writer = pq.ParquetWriter(output_file, filtered_table.schema)
                
                # Write the filtered table to the output Parquet file
                writer.write_table(filtered_table)
        
        # Optional: Print progress
        print(f"Processed row group {rg + 1}/{parquet_file.num_row_groups}")
    
    except Exception as e:
        print(f"Error processing row group {rg}: {e}")

# Close the writer to finalize the Parquet file
if writer:
    writer.close()

print(f"Filtered data has been saved to {output_file}")


Error processing row group 0: malloc of size 8589934592 failed
Processed row group 2/2
Filtered data has been saved to filtered_Rydberg1.5M1-8_with_MI_ex_filtered.parquet
