In [None]:
!pip install apache_beam --quiet

In [2]:
import os
# 'XXX' is a placeholder: replace it with your JSON credentials before running
# the cells that create a storage client.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'XXX' #specify your json credentials

In [3]:
from google.cloud import storage

# Source bucket and object-name prefix used by the listing and download cells.
# NOTE: `bucket_name` is rebound later by the upload cell's configuration.
bucket_name = 'fh-public'  # Replace with your bucket name
prefix = 'wikicrow2/'

# Shared client and bucket handle; `bucket` is reused by download_file().
client = storage.Client()
bucket = client.bucket(bucket_name)

In [4]:
# Collect the object name of every blob listed under `prefix`.
blobs = client.list_blobs(bucket, prefix=prefix)
# NOTE(review): no filtering happens here, so any non-.txt object under the
# prefix is included; the file_keys cell filters on '.txt' separately.
data_files = [blob.name for blob in blobs ]

In [None]:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

# Local directory that receives the downloaded files (read by download_file());
# created up front, and an already existing directory is not an error.
download_dir = 'wikicrow2'
os.makedirs(download_dir, exist_ok=True)

def download_file(file_name):
    """
    Download a single object from the bucket into the local download directory.

    The object is stored under ``download_dir`` using only the final path
    component of ``file_name``. A name whose final component is empty (for
    example a "folder/" placeholder object) has no local file name to write
    to, so it is skipped up front instead of failing inside the download call.

    Args:
        file_name (str): The name/path of the object in the bucket.

    Returns:
        str | None: ``file_name`` when the download succeeded; ``None`` when
        the object was skipped or the download failed (the reason is printed).
    """
    base_name = os.path.basename(file_name)
    if not base_name:
        print(f"Skipping {file_name}: not a downloadable file object")
        return None

    try:
        blob = bucket.blob(file_name)
        local_path = os.path.join(download_dir, base_name)

        # Download the file
        blob.download_to_filename(local_path)
        print(f"Downloaded {file_name} to {local_path}")
        return file_name
    except Exception as e:
        # Best effort: one failing object must not abort the whole batch.
        print(f"Failed to download {file_name}: {e}")
        return None

# Specify the number of threads.
# You can adjust this number based on your system's capabilities and network bandwidth.
MAX_WORKERS = 10

# download_file() signals a failure by returning None rather than raising, so the
# results have to be inspected to know whether the batch really succeeded.
failed_downloads = []

# Use ThreadPoolExecutor to manage concurrent downloads
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # Submit all download tasks to the executor
    future_to_file = {executor.submit(download_file, file_name): file_name for file_name in data_files}

    # As each task completes, record whether it produced a file
    for future in as_completed(future_to_file):
        file_name = future_to_file[future]
        try:
            result = future.result()
        except Exception as exc:
            print(f"{file_name} generated an exception: {exc}")
            failed_downloads.append(file_name)
            continue
        if not result:
            failed_downloads.append(file_name)

if failed_downloads:
    print(f"Downloads finished: {len(failed_downloads)} of {len(future_to_file)} objects were not downloaded.")
else:
    print("All downloads completed.")

In [None]:
# Derive one key per .txt object name: drop only the trailing '.txt' extension and
# only the leading 'wikicrow2/' prefix. (str.replace would also delete those
# substrings when they occur in the middle of a name.)
file_keys = [file[:-len('.txt')] for file in data_files if file.endswith('.txt')]
file_keys = [file[len('wikicrow2/'):] if file.startswith('wikicrow2/') else file for file in file_keys]

In [None]:
import concurrent.futures
import requests
import os
import sqlite3
import threading
from tqdm import tqdm  # Import tqdm for progress bar

# Function to map gene names to UniProt IDs
def map_gene_to_uniprot(gene_name, organism_id=9606, timeout=30):
    """
    Look up the primary UniProt accession for a gene via the UniProt REST search.

    Args:
        gene_name (str): Gene name, matched with the ``gene_exact`` query field.
        organism_id (int): Organism filter for the query (default 9606,
            presumably the taxonomy ID for human - confirm).
        timeout (float): Seconds to wait for the HTTP request before giving up,
            so a stalled connection cannot block a worker thread forever.

    Returns:
        tuple: ``(gene_name, accession)``; ``accession`` is ``None`` when no
        entry was found or the request failed (a message is written via tqdm).
    """
    base_url = "https://rest.uniprot.org/uniprotkb/search"
    query = f"gene_exact:{gene_name} AND organism_id:{organism_id}"
    params = {
        "query": query,
        "format": "json",
        "fields": "accession",
        "size": 1  # Only top result
    }

    try:
        response = requests.get(base_url, params=params, timeout=timeout)
        response.raise_for_status()
        # A payload without a 'results' key is treated like an empty result list.
        results = response.json().get('results')

        if results:
            uniprot_id = results[0]['primaryAccession']
            return (gene_name, uniprot_id)
        else:
            tqdm.write(f"No UniProt accession found for gene '{gene_name}'.")
            return (gene_name, None)

    except requests.exceptions.HTTPError as http_err:
        tqdm.write(f"HTTP error for gene '{gene_name}': {http_err}")
        return (gene_name, None)
    except Exception as err:
        # Covers timeouts, connection errors and malformed payloads alike.
        tqdm.write(f"Error for gene '{gene_name}': {err}")
        return (gene_name, None)

# Function to construct AlphaFold PDB URL
def construct_alphafold_pdb_url(uniprot_id, model, version):
    """
    Build the AlphaFold download URL for one PDB file.

    Args:
        uniprot_id: UniProt accession placed in the file name.
        model: Model/fragment label placed in the file name (e.g. 'F1').
        version: Model version placed after ``model_v`` in the file name.

    Returns:
        str: ``https://alphafold.ebi.ac.uk/files/AF-<id>-<model>-model_v<version>.pdb``
    """
    pdb_name = f"AF-{uniprot_id}-{model}-model_v{version}.pdb"
    return "https://alphafold.ebi.ac.uk/files" + "/" + pdb_name

# Function to download PDB files
def download_pdb(gene_uniprot_tuple, output_dir='pdb_files', timeout=30):
    """
    Download the first available AlphaFold PDB file for a gene's UniProt ID.

    Candidate files are tried from model version 4 down to 1 and, within each
    version, from fragment F1 to F5; the first URL answering with HTTP 200 is
    saved and the search stops.

    Args:
        gene_uniprot_tuple (tuple): ``(gene, uniprot_id)``; a falsy
            ``uniprot_id`` skips the download.
        output_dir (str): Directory the file is written to (created if missing).
        timeout (float): Seconds to wait on each HTTP request, so a stalled
            connection cannot block a worker thread forever.

    Returns:
        tuple: ``(gene, absolute_path)`` on success, ``(gene, None)`` when the
        ID is missing or no candidate could be downloaded.
    """
    gene, uniprot_id = gene_uniprot_tuple

    if not uniprot_id:
        tqdm.write(f"Skipping download for gene '{gene}' due to missing UniProt ID.")
        return (gene, None)

    versions = ['4', '3', '2', '1']
    models = ['F1', 'F2', 'F3', 'F4', 'F5']

    os.makedirs(output_dir, exist_ok=True)

    for version in versions:
        for model in models:
            pdb_url = construct_alphafold_pdb_url(uniprot_id, model, version)
            filename = f"AF-{uniprot_id}-{model}-model_v{version}.pdb"
            save_path = os.path.join(output_dir, filename)

            response = None
            partial_file = False  # True once this attempt has started writing save_path
            try:
                response = requests.get(pdb_url, stream=True, timeout=timeout)
                if response.status_code != 200:
                    continue
                with open(save_path, 'wb') as file:
                    partial_file = True
                    for data in response.iter_content(1024):
                        file.write(data)
                return (gene, os.path.abspath(save_path))  # Return absolute path
            except Exception as err:
                # Do not leave a truncated file behind that looks like a valid download.
                if partial_file:
                    try:
                        os.remove(save_path)
                    except OSError:
                        pass
                tqdm.write(f"Error downloading {filename} for UniProt ID '{uniprot_id}': {err}")
                continue
            finally:
                # Streamed responses hold their connection until closed.
                if response is not None:
                    response.close()

    tqdm.write(f"No PDB files found for UniProt ID '{uniprot_id}' for gene '{gene}'.")
    return (gene, None)

# Lock for database access: process_gene() runs in several worker threads and
# holds this lock around its use of the shared cursor and connection.
db_lock = threading.Lock()

def process_gene(x_and_y):
    """
    Process one gene: read its description, resolve its UniProt ID, download
    its PDB file and store the record in the ``genes`` table.

    Args:
        x_and_y (tuple): ``(gene_name, description_path)``, where
            ``description_path`` is the text file holding the description.

    Returns:
        tuple: ``(gene_name, description, uniprot_id, pdb_file_path)``.
        ``description`` is ``""`` when the file cannot be read; ``uniprot_id``
        and ``pdb_file_path`` are ``None`` when the lookup or download failed.
    """
    gene_name, description_path = x_and_y
    try:
        with open(description_path, "r") as f:
            description = f.read()
    except Exception as e:
        tqdm.write(f"Error reading file {description_path}: {e}")
        description = ""

    gene_uniprot_tuple = map_gene_to_uniprot(gene_name)
    _, uniprot_id = gene_uniprot_tuple
    # The path must come from download_pdb's return value; unpacking the
    # UniProt tuple here would store the accession in the pdb_file column.
    _, pdb_file_path = download_pdb(gene_uniprot_tuple)

    # Insert into SQL database (cursor and connection are shared across threads)
    with db_lock:
        cursor.execute('''
            INSERT INTO genes (gene_name, description, uniprot_id, pdb_file)
            VALUES (?, ?, ?, ?)
        ''', (gene_name, description, uniprot_id, pdb_file_path))
        conn.commit()

    return (gene_name, description, uniprot_id, pdb_file_path)

# Your lists of gene names and data file

# Create the database connection.
# check_same_thread=False because the cursor is used from the worker threads
# (serialized by db_lock inside process_gene).
conn = sqlite3.connect('genes.db', check_same_thread=False)
cursor = conn.cursor()

try:
    # Create the table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS genes (
        gene_name TEXT,
        description TEXT,
        uniprot_id TEXT,
        pdb_file TEXT
    )
''')
    conn.commit()

    # file_keys was built only from the '.txt' entries of data_files, so pair each
    # key with the matching '.txt' entry; zipping against the unfiltered listing
    # would shift every gene onto the wrong file as soon as one other object exists.
    description_objects = [name for name in data_files if name.endswith('.txt')]
    # download_file() stored each object under download_dir using its base name.
    description_paths = [
        os.path.join(download_dir, os.path.basename(name)) for name in description_objects
    ]
    gene_inputs = list(zip(file_keys, description_paths))

    # Use ThreadPoolExecutor for concurrency
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Wrap the executor.map call with tqdm for progress bar
        results = list(tqdm(
            executor.map(process_gene, gene_inputs),
            total=len(gene_inputs),
            desc="Processing Genes",
            unit="gene"
        ))
finally:
    # Close the database connection even when processing fails part-way
    conn.close()

print("Processing complete. Data stored in 'genes.db' database.")


In [None]:
import os
import apache_beam as beam
from google.cloud import storage

class UploadToGCS(beam.DoFn):
    """
    DoFn that uploads each incoming local file path to a GCS bucket.

    Every file is stored as ``<destination_path>/<file base name>`` in
    ``bucket_name``; one status string is yielded per uploaded file.
    """

    def __init__(self, bucket_name, destination_path):
        # The storage client is created in setup(), not here.
        self.client = None
        self.destination_path = destination_path
        self.bucket_name = bucket_name

    def setup(self):
        # Initialize the GCS client once per worker
        self.client = storage.Client()

    def process(self, file_path):
        # Destination object name: target folder plus the file's base name
        target_blob_name = f"{self.destination_path}/{os.path.basename(file_path)}"
        # Resolve bucket -> blob and push the local file
        target_bucket = self.client.bucket(self.bucket_name)
        target_bucket.blob(target_blob_name).upload_from_filename(file_path)
        yield f"Uploaded {file_path} to gs://{self.bucket_name}/{target_blob_name}"


# Configuration
# NOTE: this rebinds the module-level `bucket_name` that the download cells set
# to the source bucket.
bucket_name = "xxxx"  # Replace with your GCS bucket name
destination_path = "pdb_files"  # The target folder in GCS
# NOTE(review): absolute path, while download_pdb() writes to the relative
# 'pdb_files' directory by default; the two only coincide when the working
# directory is /content - confirm for your environment.
local_folder = "/content/pdb_files"  # Folder containing your .pdb files

# List all files in the local folder
def list_files(folder):
    """
    Yield the path of every file found under ``folder``, including files in
    nested subdirectories.

    Args:
        folder (str): Directory to walk.

    Yields:
        str: ``os.path.join`` of the containing directory and the file name.
    """
    for dirpath, _subdirs, filenames in os.walk(folder):
        yield from (os.path.join(dirpath, name) for name in filenames)


# Apache Beam Pipeline
# Sends every path yielded by list_files(local_folder) through UploadToGCS and
# prints each status string the DoFn yields.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "List Files" >> beam.Create(list_files(local_folder))  # Step 1: List all files
        | "Upload Files" >> beam.ParDo(UploadToGCS(bucket_name, destination_path))  # Step 2: Upload files
        | "Print Results" >> beam.Map(print)  # Step 3: Print results
    )
