In [2]:
import boto3
import os
import json
import pandas as pd
import shutil
import logging
from botocore.exceptions import BotoCoreError, ClientError
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


# Automate Production Center Submissions

In [3]:

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Set up configurations
BUCKET_NAME = 'human-pangenomics'

with open('config/dev/prefixes.json', 'r') as f:
    config = json.load(f)
    PREFIXES = config['PREFIXES']

# Metadata files mapping
metadata_files = {
    'data/production-metadata/HPRC_RU_Y2_Sequel_Metadata_PacBio_HiFi_Submission.csv': 'RU_Y2_HIFI',
    'data/production-metadata/HPRC_RU_Y2_topoff_Metadata_Submission.tsv': 'RU_Y2_topoff',
    'data/production-metadata/HPRC_RU_Y3_HiFi_Metadata_Submission.csv': 'RU_Y3_HIFI',
    'data/production-metadata/HPRC_RU_Y3_topoff_Metadata_Submission.tsv': 'RU_Y3_topoff',
    'data/production-metadata/HPRC_RU_Y4_Metadata_Submission.tsv': 'RU_Y4'
}

INPUT_MAPPING_PATH = '/private/groups/hprc/human-pangenomics/hprc-synapse/config/dev/hifi_qc_input_mapping.csv'

SRA_TABLE = pd.read_csv('/private/groups/hprc/human-pangenomics/hprc-synapse-1/HPRC_metadata/data/SRA/table_download.tsv', sep='\t')

In [7]:
# Initialize the S3 client
s3 = boto3.client('s3')

def list_files_with_size(bucket, prefix):
    """List files in an S3 bucket with size details."""
    try:
        file_list = []
        paginator = s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

        for page in pages:
            if 'Contents' in page:
                for obj in page['Contents']:
                    file_list.append({
                        'Key': obj['Key'],
                        'Size_in_Bytes': obj['Size'],
                        'Size_in_GB': obj['Size'] / (1024 ** 3)  # Convert to GB
                    })

        return file_list
    except (BotoCoreError, ClientError) as e:
        logging.error(f"Failed to list files in bucket {bucket} with prefix {prefix}: {e}")
        return []

def create_directory_structure(project_name):
    """Create main and hifi_qc directories for each project under the submission folder."""
    main_dir = os.path.join('submissions', project_name)
    hifi_qc_dir = os.path.join(main_dir, 'hifi_qc')
    
    os.makedirs(hifi_qc_dir, exist_ok=True)
    logging.info(f"Created directory structure: {hifi_qc_dir}")
    
    return hifi_qc_dir

def copy_metadata_files():
    """Copy each metadata file to its corresponding project folder within the submission directory."""
    for metadata_file, project_folder in metadata_files.items():
        # Define the source path (metadata file already located in production-metadata directory)
        source_path = metadata_file

        # Define the destination path within the submission directory
        destination_dir = os.path.join('submissions', project_folder)
        destination_path = os.path.join(destination_dir, os.path.basename(metadata_file))

        # Ensure the destination project directory exists
        os.makedirs(destination_dir, exist_ok=True)
        
        # Copy the file
        try:
            shutil.copy(source_path, destination_path)
            logging.info(f"Copied {metadata_file} to {destination_path}")
        except IOError as e:
            logging.error(f"Failed to copy {metadata_file} to {destination_path}: {e}")


def save_aws_df_to_csv(aws_df, output_path):
    """Save the S3 file list DataFrame to CSV."""
    try:
        aws_df.to_csv(output_path, index=False)
        logging.info(f"Data saved to {output_path}")
    except IOError as e:
        logging.error(f"Failed to save DataFrame to {output_path}: {e}")

def copy_template_file(dest_dir):
    """Copy the template file to the destination directory."""
    try:
        shutil.copy(INPUT_MAPPING_PATH, os.path.join(dest_dir, "hifi_qc_input_mapping.csv"))
        logging.info(f"Template CSV copied to {dest_dir}")
    except IOError as e:
        logging.error(f"Failed to copy template file to {dest_dir}: {e}")


def prepare_qc_script(project_name):
    """Copy and customize the QC template script for each project."""
    # Define paths for the template and the destination script
    template_script_path = 'config/dev/hifi_qc_template.py'
    destination_script_path = os.path.join('submissions', project_name, f"{project_name}_qc.py")

    # Copy the template script to the destination
    try:
        shutil.copy(template_script_path, destination_script_path)
        
        # Read in the script and replace placeholder with project name
        with open(destination_script_path, 'r') as file:
            script_content = file.read()
        
        # Replace "PROJECT_NAME" placeholder with the actual project name in the script content
        script_content = script_content.replace("PROJECT_NAME", project_name)
        
        # Write the customized script back to the file
        with open(destination_script_path, 'w') as file:
            file.write(script_content)
        
        logging.info(f"QC script prepared for {project_name} at {destination_script_path}")
    except IOError as e:
        logging.error(f"Failed to prepare QC script for {project_name}: {e}")



def process_prefix(project_name, prefix):
    """Process each prefix: fetch data from S3, save to CSV, and set up directories."""
    logging.info(f"Processing project: {project_name}")
    
    # Create necessary directories
    hifi_qc_dir = create_directory_structure(project_name)
    
    # Get the list of files with their sizes for each prefix
    aws_files_with_size = list_files_with_size(BUCKET_NAME, prefix)
    if not aws_files_with_size:
        logging.warning(f"No files found for prefix {prefix}")
        return

    # Convert the list to a DataFrame
    aws_df = pd.DataFrame(aws_files_with_size)
    aws_df['Key'] = 's3://' + BUCKET_NAME + '/' + aws_df['Key']
    aws_df = aws_df[aws_df['Key'].str.contains(r'\.bam$|\.fastq\.gz$', regex=True)]
    aws_df = aws_df[aws_df['Key'].str.contains(r'/(HG\d+|NA\d+)/', regex=True)]
    aws_df['Prefix_Label'] = project_name
    
    # Save the DataFrame to CSV under the main project directory
    output_path = os.path.join('submissions', project_name, f"{project_name}_aws_submissions_files.csv")
    save_aws_df_to_csv(aws_df, output_path)

    # Copy template file to hifi_qc directory
    copy_template_file(hifi_qc_dir)

# Run the process for each prefix
for project_name, prefix in PREFIXES.items():
    process_prefix(project_name, prefix)
    # Prepare the QC script for the project
    prepare_qc_script(project_name)

# Copy metadata files to the main project directories
copy_metadata_files()



2024-11-05 16:05:41,706 - INFO - Found credentials in shared credentials file: ~/.aws/credentials


2024-11-05 16:05:42,054 - INFO - Processing project: RU_Y2_HIFI
2024-11-05 16:05:42,056 - INFO - Created directory structure: submissions/RU_Y2_HIFI/hifi_qc
  aws_df = aws_df[aws_df['Key'].str.contains(r'/(HG\d+|NA\d+)/', regex=True)]
2024-11-05 16:05:42,443 - INFO - Data saved to submissions/RU_Y2_HIFI/RU_Y2_HIFI_aws_submissions_files.csv
2024-11-05 16:05:42,446 - ERROR - Failed to copy template file to submissions/RU_Y2_HIFI/hifi_qc: [Errno 2] No such file or directory: '/private/groups/hprc/human-pangenomics/hprc-synapse/config/dev/hifi_qc_input_mapping.csv'
2024-11-05 16:05:42,465 - INFO - QC script prepared for RU_Y2_HIFI at submissions/RU_Y2_HIFI/RU_Y2_HIFI_qc.py
2024-11-05 16:05:42,467 - INFO - Processing project: RU_Y2_topoff
2024-11-05 16:05:42,468 - INFO - Created directory structure: submissions/RU_Y2_topoff/hifi_qc
  aws_df = aws_df[aws_df['Key'].str.contains(r'/(HG\d+|NA\d+)/', regex=True)]
2024-11-05 16:05:42,539 - INFO - Data saved to submissions/RU_Y2_topoff/RU_Y2_topof

 automate output sample table check

Required
- Metadata_Submission
- aws_submissions_files
- aws_transfer_working


# Check Production Center Submission Files Released to Working

In [4]:


# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize the S3 client
s3 = boto3.client('s3')

def check_s3_file_exists(s3_path):
    """Check if a specific file exists in the S3 bucket using its full S3 path."""
    # Parse the bucket and key from the S3 path
    if not s3_path.startswith("s3://"):
        raise ValueError("Invalid S3 path format.")
    
    path_parts = s3_path[5:].split('/', 1)
    bucket_name = path_parts[0]
    key = path_parts[1]

    try:
        # Attempt to retrieve metadata for the object to check existence
        s3.head_object(Bucket=bucket_name, Key=key)
        return True
    except ClientError as e:
        # If a 404 error is raised, the file does not exist
        if e.response['Error']['Code'] == "404":
            return False
        else:
            # Raise any other errors encountered
            raise e

def check_aws_transfer_files(project_name, prefix):
    """Check if all files in <project>.aws_transfer_working.csv are present in the S3 bucket."""
    transfer_csv_path = os.path.join('submissions', project_name, f"{project_name}.aws_transfer_working.csv")
    
    # Load the list of paths from the transfer CSV without any modification
    try:
        transfer_df = pd.read_csv(transfer_csv_path, header=None)
        transfer_paths = transfer_df.iloc[:, 1].tolist()  # Direct list of paths from the CSV
        
        # Check each file in the transfer list and print only missing files
        missing_files = [s3_path for s3_path in transfer_paths if not check_s3_file_exists(s3_path)]
        
        if missing_files:
            logging.warning(f"Missing files for project {project_name}:")
            for path in missing_files:
                print(path)
        else:
            logging.info(f"All files in {project_name}.aws_transfer_working.csv are present in AWS.")
    
    except FileNotFoundError:
        logging.error(f"{transfer_csv_path} not found.")

# Run the check for each project and prefix
for project_name, prefix in PREFIXES.items():
    check_aws_transfer_files(project_name, prefix)


2024-11-06 10:41:52,400 - INFO - All files in RU_Y2_HIFI.aws_transfer_working.csv are present in AWS.
2024-11-06 10:41:52,680 - INFO - All files in RU_Y2_topoff.aws_transfer_working.csv are present in AWS.
2024-11-06 10:41:56,596 - INFO - All files in RU_Y3_HIFI.aws_transfer_working.csv are present in AWS.
2024-11-06 10:41:58,078 - INFO - All files in RU_Y3_topoff.aws_transfer_working.csv are present in AWS.
2024-11-06 10:42:13,717 - INFO - All files in RU_Y4.aws_transfer_working.csv are present in AWS.


# Check SRA Submission

In [6]:
import glob
import os
import pandas as pd

# Define the required columns
required_columns = [
    'filename', 'sample_ID', 'path', 'accession', 'study', 'biosample_accession',
    'total_reads', 'total_bp', 'total_Gbp', 'min', 'max', 'mean', 'quartile_25',
    'quartile_50', 'quartile_75', 'N25', 'N50', 'N75', 'library_ID',
    'library_strategy', 'library_source', 'library_selection', 'library_layout',
    'platform', 'instrument_model', 'design_description', 'data_type',
    'shear_method', 'size_selection', 'DeepConsensus_version', 'polymerase_version',
    'seq_plate_chemistry_version', 'generator_facility', 'generator_contact',
    'notes', 'ccs_algorithm', 'MM_tag'
]

# Get list of all submission project directories
submission_dirs = glob.glob('submissions/*/')
# Get list of all data table files across submissions
data_table_files = glob.glob('submissions/*/*_data_table.csv')

# Extract project names that have data tables
data_table_projects = {os.path.dirname(file) for file in data_table_files}

# List to hold DataFrames, track missing columns, NaNs, and missing projects
data_frames = []
missing_data_table_projects = []
missing_columns_files = []
nan_columns_files = []

# Loop through each data table file
for file_path in data_table_files:
    try:
        # Read each file into a DataFrame
        df = pd.read_csv(file_path)  # adjust 'sep' if necessary, e.g., ',' for CSV
        data_frames.append(df)
        
        # Check for missing columns
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            print(f"Missing columns in {file_path}: {missing_columns}")
            missing_columns_files.append((file_path, missing_columns))
        
        # Check for NaN values in required columns
        nan_columns = df[required_columns].isna().any()
        nan_columns = nan_columns[nan_columns].index.tolist()
        if nan_columns:
            print(f"NaN values found in columns {nan_columns} for {file_path}")
            nan_columns_files.append((file_path, nan_columns))
        else:
            print(f"Loaded data from {file_path} with all required columns and no NaN values")
    except Exception as e:
        print(f"Error loading {file_path}: {e}")

# Identify projects missing a data table file
for project_dir in submission_dirs:
    if project_dir.rstrip('/') not in data_table_projects:
        missing_data_table_projects.append(project_dir)

# Print missing project directories
if missing_data_table_projects:
    print("\nDirectories missing a data table file:")
    for missing_dir in missing_data_table_projects:
        print(missing_dir)
else:
    print("All project directories contain a data table file.")

# Print files missing specific columns
if missing_columns_files:
    print("\nFiles with missing columns:")
    for file_path, missing_columns in missing_columns_files:
        print(f"{file_path} missing columns: {missing_columns}")

# Print files with NaN values in specific columns
if nan_columns_files:
    print("\nFiles with NaN values in specific columns:")
    for file_path, nan_columns in nan_columns_files:
        print(f"{file_path} has NaN values in columns: {nan_columns}")

# Concatenate all DataFrames into a single DataFrame if they have the required columns
if data_frames:
    aggregated_df = pd.concat(data_frames, ignore_index=True)
    # Save the aggregated DataFrame to a new CSV file
    aggregated_df[required_columns].to_csv("data/hprc-data-explorer-tables/HPRC_PacBio_HiFi.file.index.csv", index=False)
    print("\nAggregated data saved to 'HPRC-aggregated_data_table.csv'")
else:
    print("No data tables found to aggregate.")
# SRA 
assert [filename for filename in aggregated_df.filename.tolist() if filename not in SRA_TABLE['SRA.filename'].tolist()] == []

NaN values found in columns ['DeepConsensus_version', 'notes'] for submissions/RU_Y2_topoff/RU_Y2_topoff_data_table.csv
NaN values found in columns ['DeepConsensus_version', 'notes'] for submissions/RU_Y3_HIFI/RU_Y3_HIFI_data_table.csv
NaN values found in columns ['DeepConsensus_version', 'notes'] for submissions/RU_Y2_HIFI/RU_Y2_HIFI_data_table.csv
NaN values found in columns ['DeepConsensus_version', 'notes'] for submissions/RU_Y3_topoff/RU_Y3_topoff_data_table.csv
NaN values found in columns ['DeepConsensus_version', 'notes'] for submissions/RU_Y4/RU_Y4_data_table.csv
All project directories contain a data table file.

Files with NaN values in specific columns:
submissions/RU_Y2_topoff/RU_Y2_topoff_data_table.csv has NaN values in columns: ['DeepConsensus_version', 'notes']
submissions/RU_Y3_HIFI/RU_Y3_HIFI_data_table.csv has NaN values in columns: ['DeepConsensus_version', 'notes']
submissions/RU_Y2_HIFI/RU_Y2_HIFI_data_table.csv has NaN values in columns: ['DeepConsensus_version',

# Sample Aggregation

In [17]:
# Aggregation rules where filename and path align by index within each sample_ID
aggregation_rules = {
     'sample_ID': 'first', 
    'filename': lambda x: list(x),           # Collect filenames in a list, preserving order
    'path': lambda x: list(x),               # Collect paths in a list, preserving order
                      # Take the first sample_ID since it should be consistent per group
    
    # Additional fields with aggregation rules
    'library_ID':  lambda x: list(x),
    'library_strategy':  lambda x: list(x),
    'library_source':  lambda x: list(x),
    'library_selection':  lambda x: list(x),
    'library_layout':  lambda x: list(x),
    'platform':  lambda x: list(x),
    'instrument_model':  lambda x: list(x),
    'design_description':  lambda x: list(x),
    'data_type':  lambda x: list(x),
    'shear_method':  lambda x: list(x),
    'size_selection':  lambda x: list(x),
    'ccs_algorithm':  lambda x: list(x),
    'polymerase_version':  lambda x: list(x),
    'seq_plate_chemistry_version':  lambda x: list(x),
    'generator_facility':  lambda x: list(x),
    'generator_contact':  lambda x: list(x),
    'notes': lambda x: list(x.unique()),     # Unique notes combined into a list
    
    # Numerical fields with aggregation functions
    'total_reads': 'sum',
    'total_bp': 'sum',
    'total_Gbp': 'sum',
    'min': 'mean',
    'max': 'mean',
    'quartile_25': 'mean',
    'quartile_50': 'mean',
    'quartile_75': 'mean',
    'N25': 'mean',
    'N50': 'mean',
    'N75': 'mean'
}

# Apply aggregation with alignment maintained for lists
aggregated_sample_df = aggregated_df.groupby('sample_ID', as_index=False).agg(aggregation_rules).reset_index()

# Resulting DataFrame has lists in 'filename' and 'path' that align by index for each sample_ID


In [20]:
aggregated_sample_df.drop(columns=['index'],inplace=True)

In [21]:
aggregated_sample_df.to_csv("data/hprc-data-explorer-tables/HPRC_PacBio_HiFi.sample.index.csv", index=False)