In [None]:
import os
import re

from typing import List

import google.cloud.client

from google.cloud import storage
# Module-level GCS client (constructed with no explicit arguments); it is the
# client handed to the GCS helpers in the cells further down.
dev_storage_client = storage.Client()

import ftplib

In [None]:
import pprint
# Shared pretty-printer (4-space indent) used to display nested dicts in later cells.
pp = pprint.PrettyPrinter(indent=4)

# Overall "environment" variables

In [None]:
# Flags put in front of every generated `wget2` command, kept one per entry
# so that individual options are easy to add or drop.
wget2_flags = ' '.join([
    '-b',
    '-c',
    '-nv',
    '-nH',
    '--report-speed=bytes',
    '--progress=dot',
])

In [None]:
# Host name of the FTP server that is being mirrored.
ftp_site = 'ftp.1000genomes.ebi.ac.uk'

# Directory on the FTP server under which the dataset directories are looked up.
ftp_data_root = '/vol1/ftp/data_collections/HGSVC2/working'

In [None]:
# Destination bucket, and the 'folder' inside it that mirrors `ftp_data_root`.
gcs_bucket_name = "broad-dsde-methods-long-reads"
gcs_root_prefix = 'datasets/HGSVC2'

In [None]:
# Local directory name used for the bucket in the generated download commands
# (presumably a gcsfuse mount point, judging by the name -- confirm).
gcsfuse_local_dir = 'gs'
# Both prefixes are normalised to end with exactly one '/'.
gcs_leading_prefix = f'gs://{gcs_bucket_name}/{gcs_root_prefix}'.rstrip('/') + '/'
gcsfuse_local_leading_prefix = f'{gcsfuse_local_dir}/{gcs_root_prefix}'.rstrip('/') + '/'

In [None]:
# Dataset directories that are checked against the FTP site in the
# "real check" loop further down (no trailing slash).
gcs_dirs = ['20190925_PUR_PacBio_HiFi',
            '20191005_YRI_PacBio_NA19240_HiFi',
            '20191031_CHS_PacBio_HG00512_HiFi',
            '20191031_CHS_PacBio_HG00513_HiFi',
            '20191205_YRI_PacBio_NA19238_HIFI',
            '20191205_YRI_PacBio_NA19239_HIFI',
            '20200108_PacBio_CLR_JAX',
            '20200203_PacBio_CLR_EEE',
            '20200212_PacBio_CLR_Devine',
            '20200722_PUR_PacBio_HG00732_HiFi',
            '20200731_CHS_PacBio_HG00514_CLR_reseq',
            '20200731_CHS_PacBio_HG00514_HiFi_reseq',
            '20210509_UW_HiFi',
            '2021_PacBio_HIFI_JAX'
            ]

In [None]:
# Directory names recorded for the FTP site (with trailing slash). This is a
# superset of `gcs_dirs`; the extra entries are the ONT directories.
# NOTE(review): not referenced by any other cell visible in this notebook.
known_data_directories_on_ftp = ['20190925_PUR_PacBio_HiFi/',
                                 '20191005_YRI_PacBio_NA19240_HiFi/',
                                 '20191031_CHS_PacBio_HG00512_HiFi/',
                                 '20191031_CHS_PacBio_HG00513_HiFi/',
                                 '20191205_YRI_PacBio_NA19238_HIFI/',
                                 '20191205_YRI_PacBio_NA19239_HIFI/',
                                 '20200108_PacBio_CLR_JAX/',
                                 '20200203_PacBio_CLR_EEE/',
                                 '20200212_PacBio_CLR_Devine/',
                                 '20200722_PUR_PacBio_HG00732_HiFi/',
                                 '20200731_CHS_PacBio_HG00514_CLR_reseq/',
                                 '20200731_CHS_PacBio_HG00514_HiFi_reseq/',
                                 '20210509_UW_HiFi/',
                                 '20210822_UW_EEE_ONT_UL/',
                                 '20210920_ONT_Rebasecalled/',
                                 '20211013_ONT_Rebasecalled/',
                                 '2021_ONT_UltraLong_JAX/',
                                 '2021_PacBio_HIFI_JAX/'
                                 ]

In [None]:
# Dataset directory used by the smoke-test cells: the first entry of `gcs_dirs`.
test_dir = gcs_dirs[0]

# Utilities for exploring FTP site

In [None]:
def traverse_ftp_data_dir(configured_ftp: ftplib.FTP, working_dir: str, depth: int=0):
    """
    Return a recursive listing of an FTP server's contents under `working_dir`.

    The listing is parsed from the output of the FTP `LIST` command, and the
    entry type is taken from the first character of the first column
    ('d' for a directory), i.e. a unix-style listing is assumed.

    Side effect: the connection's current directory is changed to
    `working_dir` and is left there when the function returns.

    :param configured_ftp: a logged-in ftplib.FTP object
    :param working_dir: directory to explore (absolute, or relative to the
                        connection's current directory)
    :param depth: depth into the origin dir
    :return: a recursive dictionary, where
                each key is the name of an entity (a file or a sub-directory), and
                its value is its size in bytes in case of a file, or
                its contents (a dict) in case of a sub-directory.
    :raises ValueError: if the tree is nested more than 10 levels deep
    """

    if depth > 10:
        raise ValueError("Tree is too deep, I give up.")
    level = {}
    listing = []
    configured_ftp.cwd(working_dir)
    configured_ftp.retrlines('LIST', callback=listing.append)
    for line in listing:
        # A unix-style LIST line has 8 metadata columns followed by the name.
        # Capping the split at 8 keeps a name that contains spaces in one piece.
        fields = line.split(None, 8)
        if not fields or fields[0] == 'total':
            continue  # blank line, or the "total N" summary some servers emit
        mode = fields[0]
        name = fields[-1]
        if mode.startswith('l'):
            # Symlinks are listed as "name -> target"; keep the link's own name.
            name = name.split(' -> ', 1)[0]
        if name in ('.', '..'):
            continue  # following these would recurse until the depth guard trips
        if mode.startswith('d'):  # it's a sub-dir (note this is a hack, check this out: https://bit.ly/31zQWLS)
            level[name] = traverse_ftp_data_dir(configured_ftp, name, depth+1)
            # the recursive call left us inside the sub-dir; step back out
            configured_ftp.cwd('..')
        else:  # it's a regular file (or a symlink, which is treated as a file)
            level[name] = configured_ftp.size(name)
    return level

def flatten_ftp_dir_tree(nested_dict_tree: dict,
                         configured_ftp: ftplib.FTP,
                         data_dir: str,
                         data_root: str = None):
    """
    Given the nested dict returned by `traverse_ftp_data_dir`, flatten it to a 1-D
    dict where the key is the path to a file, and value is its size in bytes.

    Each key has the form `<host><data_root>/<data_dir>/<path inside the tree>`,
    with no URL scheme in front of the host.

    :param nested_dict_tree: the structure returned by `traverse_ftp_data_dir`
    :param configured_ftp: the ftplib.FTP object the nested dict was generated on;
                           only its `host` attribute is read
    :param data_dir: the origin dir where `traverse_ftp_data_dir` was run on
    :param data_root: directory on the server that contains `data_dir`; when left
                      as None the module-level `ftp_data_root` is used
    :return: a dict mapping each file's full path to its size in bytes
    """
    root = ftp_data_root if data_root is None else data_root
    flat_files = dict()
    for name, value in nested_dict_tree.items():
        if isinstance(value, dict):
            # sub-directory: flatten it with the path extended by its name
            flat_files.update(flatten_ftp_dir_tree(value,
                                                   configured_ftp=configured_ftp,
                                                   data_dir=f'{data_dir}/{name}',
                                                   data_root=root))
        else:
            flat_files[f'{configured_ftp.host}{root}/{data_dir}/{name}'] = value
    return flat_files

#### Test

In [None]:
# Smoke test: walk one dataset directory on the FTP site and keep both the
# nested tree and its flattened form for inspection in the next cells.
with ftplib.FTP(ftp_site) as ftp_socket:
    ftp_socket.login()
    to_explore = f'{ftp_data_root}/{test_dir}'
    print(f"Exploring {to_explore}")
    test_tree = traverse_ftp_data_dir(ftp_socket, working_dir=to_explore)
    test_flat = flatten_ftp_dir_tree(test_tree, ftp_socket, test_dir)
    ftp_socket.quit()

In [None]:
# Nested view: sub-directories appear as nested dicts, files map to their size.
pp.pprint(test_tree)

In [None]:
# Flat view: one entry per file, full FTP path -> size.
test_flat

# Utilities for exploring GCS buckets and "folders"

In [None]:
def list_gcs_subdirectories(client: google.cloud.client.Client, bucket_name: str, prefix: str):
    """
    List sub-directories under 'gs://[bucket_name]/[prefix]/'
    Based on https://github.com/googleapis/google-cloud-python/issues/920#issuecomment-653823847
    :param client: GCS client
    :param bucket_name: name of the bucket
    :param prefix: the prefix, or the full path to the directory to list into.
                   Note: don't include a leading slash; a trailing slash is optional.
                   Pass '' to list the top-level 'directories' of the bucket.
    :return: A sorted list of the names of the sub-directories directly under the
             prefix 'directory' in the provided bucket
    """

    # An empty prefix stands for the bucket root. Turning it into '/' would
    # restrict the listing to object names that start with a slash.
    if prefix and not prefix.endswith('/'):
        local_prefix = prefix + '/'
    else:
        local_prefix = prefix
    iterator = client.list_blobs(bucket_name, prefix=local_prefix, delimiter='/')
    prefixes = set()
    # the sub-directory prefixes are only available page by page
    for page in iterator.pages:
        prefixes.update(page.prefixes)

    # every collected prefix ends with '/', so the directory name is the
    # second-to-last component; sort to make the result order deterministic
    return sorted(s.split('/')[-2] for s in prefixes)

In [None]:
def list_file_base_names(client: google.cloud.client.Client, bucket_name: str, prefix: str):
    """
    Give the names, relative to the prefix, of files under 'gs://[bucket_name]/[prefix]/'
    :param client: GCS client
    :param bucket_name: name of the bucket
    :param prefix: the prefix, or the full path to the directory to list into.
                   Note: don't include a leading slash; a trailing slash is optional.
    :return: names of the files under the prefix 'directory' in the provided bucket,
             with the prefix removed (files in nested 'directories' keep their
             relative path)
    """

    # List with a trailing slash so that sibling 'directories' sharing the same
    # leading characters (e.g. 'a/b' vs 'a/bc') are not picked up.
    local_prefix = prefix + '/' if prefix and not prefix.endswith('/') else prefix
    full_paths = [b.name for b in client.list_blobs(bucket_or_name=bucket_name, prefix=local_prefix)]
    # Every listed name starts with the prefix, so it can be sliced off directly.
    # The prefix must not be used as a regular expression: characters such as
    # '.' or '+' in it would be interpreted as regex syntax.
    return [path[len(local_prefix):].lstrip('/') for path in full_paths]

In [None]:
class GcsPath:
    """
    Modeling after GCS storage object, offering simplistic way of
        * checking if the paths exists, and if exists,
        * represents a file or
        * emulates a 'directory'.

    Attributes set by the constructor:
        * bucket: the bucket name
        * prefix: everything between the bucket and the last path component
                  ('' when there is none)
        * file:   the last path component ('' when the path ends with a slash
                  or names only the bucket)
    """

    def __init__(self, gs_path: str):
        """
        :param gs_path: a path of the form 'gs://bucket/some/object'
        :raises ValueError: if the path doesn't start with 'gs://' or has no bucket
        """

        if not gs_path.startswith("gs://"):
            raise ValueError(f"Provided gs path isn't valid: {gs_path}")

        bucket, _, object_path = gs_path[len("gs://"):].partition('/')
        if not bucket:
            raise ValueError(f"Provided gs path isn't valid: {gs_path}")
        self.bucket = bucket
        self.prefix, _, self.file = object_path.rpartition('/')

    def _blob_name(self) -> str:
        """Object name inside the bucket; no leading slash for objects at the bucket root."""
        return f'{self.prefix}/{self.file}' if self.prefix else self.file

    def _has_children(self, client: storage.client.Client) -> bool:
        """Whether at least one object lives 'under' this path."""
        name = self._blob_name()
        # The trailing slash keeps siblings such as 'x.bam.bai' from making
        # 'x.bam' look like a directory.
        dir_prefix = name if (not name or name.endswith('/')) else name + '/'
        # One hit is enough, so ask the API for a single result only.
        listing = client.list_blobs(client.bucket(self.bucket), prefix=dir_prefix, max_results=1)
        return any(True for _ in listing)

    def exists(self, client: storage.client.Client) -> bool:
        """Whether the path is an existing object or an emulated 'directory'."""
        return self.is_file(client=client) or self._has_children(client=client)

    def is_file(self, client: storage.client.Client) -> bool:
        """Whether an object with exactly this name exists in the bucket."""
        name = self._blob_name()
        if not name:
            # the bucket itself is not an object
            return False
        return storage.Blob(bucket=client.bucket(self.bucket), name=name).exists(client)

    def is_emulate_dir(self, client: storage.client.Client) -> bool:
        """Whether the path is not an object itself but has objects 'under' it."""
        if self.is_file(client=client):
            return False
        return self._has_children(client=client)

    def size(self, client: storage.client.Client) -> int:
        """
        :return: size in bytes of the object, or 0 if no such object exists
        """
        name = self._blob_name()
        if not name:
            return 0
        # get_blob fetches the metadata in one request and returns None when
        # the object doesn't exist.
        blob = client.bucket(self.bucket).get_blob(name, client=client)
        return 0 if blob is None else blob.size

# Traverse each dir already on GCS, and check which files are missing compared to its mirror folder on FTP

In [None]:
def collect_files_to_transfer(gcs_dir_to_check: str, gcs_client: google.cloud.storage.Client,
                              ftp_server: str, ftp_data_root_dir: str,
                              gcs_bucket_name: str, gcs_root_prefix: str):

    """
    Given a 'folder' on GCS to check data consistency (against a FTP folder), collect which files
       1) have been successfully downloaded,
       2) have been only partially downloaded,
       3) have not been downloaded at all
    The way this is checked is by using file sizes (a compromise compared to MD5 check).
    A file that is missing on GCS, or present there with size 0, counts as not downloaded.
    :param gcs_dir_to_check: 'folder' on GCS to check
    :param gcs_client: GCS python API client
    :param ftp_server: FTP server address
    :param ftp_data_root_dir: root dir holding data, which is supposed to be mirrored by gcs_dir_to_check
    :param gcs_bucket_name: GCS bucket hosting the dir
    :param gcs_root_prefix: root prefix, i.e. 'parent folder' of gcs_dir_to_check, under the hosting bucket
    :return: a tuple-3 of (fully downloaded, incompletely downloaded, not yet downloaded) files:
             a list of FTP paths,
             a dict FTP path -> {'full': FTP size, 'has': GCS size, 'gcs_path': GCS path}, and
             a dict FTP path -> GCS path
    :raises ValueError: if an FTP path can't be mapped to a GCS path, or GCS reports no size
    """

    # The context manager closes the connection on exit, so no explicit quit() is needed.
    with ftplib.FTP(ftp_server) as ftp_socket:
        ftp_socket.login()
        ftp_socket.cwd(ftp_data_root_dir)

        print(f"Traversing test data dir {ftp_data_root_dir}/{gcs_dir_to_check} on FTP server {ftp_server} ...")
        ftp_dir_tree = traverse_ftp_data_dir(ftp_socket, working_dir=gcs_dir_to_check)
        ftp_files_flat = flatten_ftp_dir_tree(ftp_dir_tree, ftp_socket, gcs_dir_to_check)
        print(f"Found {len(ftp_files_flat)} files under {gcs_dir_to_check}")

    print("Matching FTP files with GCS files...")

    # The keys are built by flatten_ftp_dir_tree as '<host><ftp_data_root>/...',
    # so that is the prefix to swap for the GCS one.
    # NOTE(review): this relies on the module-level `ftp_data_root`; it is assumed
    # to equal `ftp_data_root_dir` -- confirm.
    # Plain string slicing is used instead of a regex because the host name
    # contains dots, which a regex would treat as wildcards.
    ftp_prefix = f'{ftp_server}{ftp_data_root}'
    gcs_prefix = f'gs://{gcs_bucket_name}/{gcs_root_prefix}'

    finished = list()
    incomplete_download = dict()
    not_yet_on_gcs = dict()
    for ff, sz in ftp_files_flat.items():
        if not ff.startswith(ftp_prefix):
            raise ValueError(f"FTP path {ff} doesn't start with the expected prefix {ftp_prefix}")
        gs_path = gcs_prefix + ff[len(ftp_prefix):]
        # GcsPath.size returns 0 when the object doesn't exist, so a separate
        # existence check (and its extra API calls) isn't needed.
        gcs_sz = GcsPath(gs_path).size(client=gcs_client)
        if gcs_sz is None:
            raise ValueError(f"{gs_path} exist on GCS but doesn't have a size...")
        if 0 == gcs_sz:
            not_yet_on_gcs[ff] = gs_path
        elif sz > gcs_sz:
            incomplete_download[ff] = {'full': sz, 'has': gcs_sz, 'gcs_path': gs_path}
        else:
            finished.append(ff)

    return finished, incomplete_download, not_yet_on_gcs


#### Test

In [None]:
# Smoke test on a single dataset directory; returns (finished, incomplete, missing).
ok, corrupt, jobs = collect_files_to_transfer(gcs_dir_to_check=test_dir, gcs_client=dev_storage_client,
                                              ftp_server=ftp_site, ftp_data_root_dir=ftp_data_root,
                                              gcs_bucket_name=gcs_bucket_name, gcs_root_prefix=gcs_root_prefix)

In [None]:
# Files that are on GCS but smaller than their FTP counterpart.
corrupt

#### Test one dir that has sub-dirs

In [None]:
# NOTE: this rebinds `test_dir` (set to gcs_dirs[0] in an earlier cell), so
# re-running earlier test cells after this one uses the new value.
test_dir = '20200108_PacBio_CLR_JAX'
ok, corrupt, jobs = collect_files_to_transfer(gcs_dir_to_check=test_dir, gcs_client=dev_storage_client,
                                              ftp_server=ftp_site, ftp_data_root_dir=ftp_data_root,
                                              gcs_bucket_name=gcs_bucket_name, gcs_root_prefix=gcs_root_prefix)
# display the partially downloaded files
corrupt

# Now the real check

In [None]:
# Run the FTP-vs-GCS comparison for every configured dataset directory.
# Per dataset, only the partially downloaded ('corrupt') and the missing
# ('fresh') files are kept; the fully downloaded ones are discarded.
missing_files_per_dataset = dict()
for d in gcs_dirs:
    print("====================================================================================================")
    ok, corrupt, fresh = collect_files_to_transfer(gcs_dir_to_check=d, gcs_client=dev_storage_client,
                                                   ftp_server=ftp_site, ftp_data_root_dir=ftp_data_root,
                                                   gcs_bucket_name=gcs_bucket_name, gcs_root_prefix=gcs_root_prefix)
    missing_files_per_dataset[d] = {'corrupt': corrupt, 'fresh': fresh}
    print("====================================================================================================")

In [None]:
def keep_file(file_full_name: str) -> bool:
    """
    Project-specific filter deciding whether a file is worth transferring.

    A file is rejected when
      * its name ends with '.fastq.gz', or
      * its path mentions CCS/HiFi data and also 'subreads' or 'scraps'.

    :param file_full_name: full path (or name) of the file
    :return: True if the file should be kept, False otherwise
    """

    ccs_markers = ('ccs', 'CCS', 'HiFi', 'HIFI')
    non_ccs_markers = ('subreads', 'scraps')

    mentions_ccs = False
    for marker in ccs_markers:
        if marker in file_full_name:
            mentions_ccs = True
            break

    mentions_non_ccs = False
    for marker in non_ccs_markers:
        if marker in file_full_name:
            mentions_non_ccs = True
            break

    if mentions_ccs and mentions_non_ccs:
        return False
    if file_full_name.endswith('.fastq.gz'):
        return False
    return True

In [None]:
# Per dataset, the files that are entirely absent from GCS (FTP path -> GCS path),
# restricted to those accepted by `keep_file`; datasets left with nothing are dropped.
all_missing_files = {
    dataset: wanted
    for dataset, wanted in (
        (name, {ftp_path: gs_path
                for ftp_path, gs_path in outcome['fresh'].items()
                if keep_file(gs_path)})
        for name, outcome in missing_files_per_dataset.items()
    )
    if wanted
}
print(f"{sum(len(files) for files in all_missing_files.values())} files in total to download, afresh.")
pp.pprint(all_missing_files)

In [None]:
# Per dataset, the partially downloaded files (FTP path -> GCS path),
# restricted to those accepted by `keep_file`; datasets left with nothing are dropped.
all_corrupt_files = {
    dataset: wanted
    for dataset, wanted in (
        (name, {ftp_path: info['gcs_path']
                for ftp_path, info in outcome['corrupt'].items()
                if keep_file(info['gcs_path'])})
        for name, outcome in missing_files_per_dataset.items()
    )
    if wanted
}
print(f"{sum(len(files) for files in all_corrupt_files.values())} files in total to download, again.")
pp.pprint(all_corrupt_files)

#### Print out the file download commands

In [None]:
# Print one wget2 command per file that is entirely missing from GCS.
# The commands are only printed here, not executed.

# Prefix shared by all FTP paths collected above ('<host><root>/'). Stripping it
# (rather than dropping a fixed number of path components) keeps the log-file
# names correct if `ftp_data_root` gets deeper or shallower.
ftp_key_prefix = f'{ftp_site}{ftp_data_root}/'

for dataset_files in all_missing_files.values():
    for ftp_path, gs_path in dataset_files.items():

        # path of the file relative to the mirrored root; plain prefix removal,
        # the prefix must not be interpreted as a regular expression
        if gs_path.startswith(gcs_leading_prefix):
            local_relative_path = gs_path[len(gcs_leading_prefix):]
        else:
            local_relative_path = gs_path
        # directory (under the local mount) the file gets downloaded into
        local_prefix = gcsfuse_local_leading_prefix + '/'.join(local_relative_path.split('/')[:-1]) + '/'

        # log file named after the path below the FTP data root, '/' replaced by '__'
        if ftp_path.startswith(ftp_key_prefix):
            relative_ftp_path = ftp_path[len(ftp_key_prefix):]
        else:
            relative_ftp_path = ftp_path
        log_file = relative_ftp_path.replace('/', '__') + ".wget-log"

        cmd = f'wget2 {wget2_flags} \\\n  -P {local_prefix} \\\n  -o {log_file} \\\n  {ftp_path}'
        print(cmd)

In [None]:
# Print one wget2 command per file that is only partially present on GCS.
# The commands are only printed here, not executed.

# # remove corrupt files from cloud first:
# # !!!!!!!!!! MAKE SURE THEY ARE NOT IN THE PROCESS OF BEING DOWNLOADED !!!!!!!!!!
# for k, v in all_corrupt_files.items():
#     for _, gs_path in v.items():
#         cmd = f'gsutil rm {gs_path}'
#         os.system(cmd)

# Prefix shared by all FTP paths collected above ('<host><root>/'). Stripping it
# (rather than dropping a fixed number of path components) keeps the log-file
# names correct if `ftp_data_root` gets deeper or shallower.
ftp_key_prefix = f'{ftp_site}{ftp_data_root}/'

for dataset_files in all_corrupt_files.values():
    for ftp_path, gs_path in dataset_files.items():

        # path of the file relative to the mirrored root; plain prefix removal,
        # the prefix must not be interpreted as a regular expression
        if gs_path.startswith(gcs_leading_prefix):
            local_relative_path = gs_path[len(gcs_leading_prefix):]
        else:
            local_relative_path = gs_path
        # directory (under the local mount) the file gets downloaded into
        local_prefix = gcsfuse_local_leading_prefix + '/'.join(local_relative_path.split('/')[:-1]) + '/'

        # log file named after the path below the FTP data root, '/' replaced by '__'
        if ftp_path.startswith(ftp_key_prefix):
            relative_ftp_path = ftp_path[len(ftp_key_prefix):]
        else:
            relative_ftp_path = ftp_path
        log_file = relative_ftp_path.replace('/', '__') + ".wget-log"

        cmd = f'wget2 {wget2_flags} \\\n  -P {local_prefix} \\\n  -o {log_file} \\\n  {ftp_path}'
        print(cmd)