In [1]:
import io
import os
import sys
import csv
import gzip
import time
import tarfile
import subprocess
import pandas as pd

from tqdm import tqdm
from datetime import datetime

In [None]:
# One-off preprocessing that produced "metadata.h5" (note the to_hdf call below).
# Left commented out; the live code only reads the resulting file.
#df = pd.concat([pd.read_csv("spectral_earth_table.csv"), pd.read_csv("spectral_earth_table_part_6+.csv")])
#df = df.drop(columns = ["Processing Time"])
#df = df.rename(columns = {"Filename": "filename", "Size": "size_in_megabytes", "Modified Time": "last_modified"})
#df = df.drop_duplicates(subset = ["filename"])
#df["dataset"] = df["filename"].apply(lambda x: x.split("/")[-3])
#df["filename"] = df["filename"].apply(lambda x: '/'.join(x.split("/")[-2:]))
#df["size_in_megabytes"] = df["size_in_megabytes"] / 1024**2
#df = df.sort_values(by = "filename").reset_index(drop = True)
#df.to_hdf("metadata.h5", mode = "w", key = "index", format = "fixed", complevel=9, complib="zlib")
#df.to_csv("spectral_earth.csv")

# Load the file index written under the "index" key.
df = pd.read_hdf("metadata.h5", key="index")

# Per-dataset file count and total size (in megabytes).
display(
    df.groupby("dataset").agg({"filename": "count", "size_in_megabytes": "sum"})
)

# Split into one frame per dataset. The unpacking is positional: it relies on
# groupby(sort=True) yielding the groups in sorted key order and on there being
# exactly four groups.
cdl_df, corine_df, enmap_df, nlcd_df = (
    group.drop(columns="dataset").reset_index(drop=True)
    for _name, group in df.groupby("dataset", sort=True)
)


Unnamed: 0_level_0,filename,size_in_megabytes
dataset,Unnamed: 1_level_1,Unnamed: 2_level_1
cdl,2000,63.11417
corine,11000,347.2013
enmap,538927,3403156.0
nlcd,15000,473.3562


In [3]:
def get_extraction_set(df: pd.DataFrame, dataset: str) -> set:
    """
    Build the set of archive member paths to extract for one dataset.

    For every value in ``df["filename"]`` two paths are produced: one under
    ``spectral_earth/enmap/`` and one under ``spectral_earth/<dataset>/``.

    Parameters:
    df: Frame with a ``filename`` column.
    dataset: Dataset directory name used for the second path.

    Returns:
    A set of path strings (empty when ``df`` has no rows).
    """
    names = list(df["filename"])
    enmap_paths = {f"spectral_earth/enmap/{name}" for name in names}
    dataset_paths = {f"spectral_earth/{dataset}/{name}" for name in names}
    return enmap_paths | dataset_paths

In [8]:
def get_archive_total_size(parts_list):
    """
    Calculate total size of all archive parts.

    Parameters:
    parts_list: Iterable of paths to the archive parts.

    Returns:
    Sum of the on-disk sizes in bytes. Parts whose size cannot be read are
    reported with a warning and contribute nothing to the total.
    """
    total_size = 0
    for part in parts_list:
        try:
            total_size += os.path.getsize(part)
        except OSError as e:
            # Name the offending part in the warning (the loop has no index variable).
            print(f"Warning: Couldn't read size of part {part}: {e}")
    return total_size

def list_files_from_split_archive(parts_list, start_part, output_csv,
                                  part_size_bytes=300 * 1024 * 1024 * 1024,
                                  total_size=int(3.57e12)):
    """
    List files from a split tar.gz archive starting from a specific part,
    with progress tracking and CSV logging.

    All parts are concatenated with ``cat``, gunzipped and read as a tar
    stream from the beginning; members are only written to the CSV once the
    running byte counter has reached the offset derived from ``start_part``.

    Parameters:
    parts_list: Paths of all archive parts, in order.
    start_part: Which part to start listing from (1-based indexing).
    output_csv: Path to output CSV file (overwritten).
    part_size_bytes: Size assumed for each part when computing the start
        offset (default 300 GiB).
    total_size: Total used for the progress bar (default 3.57e12 bytes).

    Errors raised while streaming are caught and printed; the function then
    still prints the total processing time.
    """
    # Command to concatenate ALL parts
    cat_command = ['cat', *parts_list]

    # NOTE(review): bytes_processed counts uncompressed member sizes, while the
    # offset is derived from the per-part size; if part_size_bytes describes the
    # compressed parts, listing will start earlier than the requested part.
    target_offset = (start_part - 1) * part_size_bytes
    bytes_processed = 0
    started_listing = False
    start_time = time.time()

    # Initialised up front so the cleanup below is safe even if setup fails.
    process = None
    pbar = None

    with open(output_csv, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['Filename', 'Size', 'Modified Time', 'Processing Time'])

        try:
            # Pipe the concatenated parts through gzip into a streaming tar reader.
            process = subprocess.Popen(cat_command, stdout=subprocess.PIPE)
            decompressor = gzip.GzipFile(fileobj=process.stdout, mode='rb')
            pbar = tqdm(total=total_size, unit='B', unit_scale=True)

            # 'r|' reads the (already decompressed) tar as a forward-only stream.
            with tarfile.open(fileobj=decompressor, mode='r|') as tar:
                for member in tar:
                    elapsed = time.time() - start_time

                    # Progress is approximated as payload size plus one 512-byte header.
                    member_bytes = member.size + 512
                    bytes_processed += member_bytes
                    pbar.update(member_bytes)

                    # Start listing once we've passed our target offset
                    if bytes_processed >= target_offset:
                        started_listing = True

                    if started_listing and member.isfile():
                        csvwriter.writerow([
                            member.name,
                            member.size,
                            datetime.fromtimestamp(member.mtime).isoformat(),
                            f"{elapsed:.2f}"
                        ])
                        csvfile.flush()  # Ensure immediate write

        except Exception as e:
            print(f"Error: {str(e)}")
        finally:
            # Clean up whatever was actually created.
            if pbar is not None:
                pbar.close()
            if process is not None:
                process.stdout.close()
                process.wait()

        # Print summary
        end_time = time.time()
        print(f"\nTotal processing time: {end_time - start_time:.2f} seconds")

def estimate_offset(part_size_gb, start_part):
    """
    Print the offset, in GB, after which files will be listed.

    The offset is the combined size of the parts preceding ``start_part``
    (1-based), i.e. ``(start_part - 1) * part_size_gb``. Nothing is returned.
    """
    parts_before_start = start_part - 1
    offset_gb = parts_before_start * part_size_gb
    message = f"Will process full archive but only list files after {offset_gb}GB offset"
    print(message)

def extract_files_from_split_archive(
    parts_list: list,
    subset_to_extract: set,
    output_dir: str = "/run/media/sambhav/StorageHDD_2/datasets/spectral_earth/imagefolder",
    total_size: int = int(3.57e12),
):
    """
    Extract specific files from a split tar.gz archive.

    All parts are concatenated with ``cat``, gunzipped and read as a tar
    stream; every regular file whose member name is in ``subset_to_extract``
    is extracted below ``output_dir``. One row per attempted extraction is
    written to ``extraction_log.csv`` inside ``output_dir`` (overwritten).

    Parameters:
    parts_list: Paths of all archive parts, in order.
    subset_to_extract: Member names (full paths inside the archive) to extract.
    output_dir: Directory to extract files to; created if missing.
    total_size: Total used for the progress bar (default 3.57e12 bytes).

    A failure on a single member is logged and printed and the run continues.
    Errors raised by the stream itself are caught and printed; the summary is
    printed in either case.
    """
    # Command to concatenate ALL parts
    cat_command = ['cat', *parts_list]

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Setup counters and tracking
    bytes_processed = 0
    files_extracted = 0
    start_time = time.time()

    # Initialised up front so the cleanup below is safe even if setup fails.
    process = None
    pbar = None

    # Setup logging
    log_file = os.path.join(output_dir, 'extraction_log.csv')
    with open(log_file, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['Filename', 'Size', 'Modified Time', 'Extraction Time', 'Status'])

        try:
            # Pipe the concatenated parts through gzip into a streaming tar reader.
            process = subprocess.Popen(cat_command, stdout=subprocess.PIPE)
            decompressor = gzip.GzipFile(fileobj=process.stdout, mode='rb')
            pbar = tqdm(total=total_size, unit='B', unit_scale=True)

            # 'r|' reads the (already decompressed) tar as a forward-only stream,
            # so each member has to be extracted while it is the current one.
            with tarfile.open(fileobj=decompressor, mode='r|') as tar:
                for member in tar:
                    elapsed = time.time() - start_time

                    # Progress is approximated as payload size plus one 512-byte header.
                    member_bytes = member.size + 512
                    bytes_processed += member_bytes
                    pbar.update(member_bytes)

                    if not (member.isfile() and member.name in subset_to_extract):
                        continue

                    try:
                        tar.extract(member, output_dir)
                        files_extracted += 1
                        status = "Success"
                    except Exception as e:
                        status = f"Failed: {str(e)}"
                        print(f"\nError extracting {member.name}: {str(e)}")

                    csvwriter.writerow([
                        member.name,
                        member.size,
                        datetime.fromtimestamp(member.mtime).isoformat(),
                        f"{elapsed:.2f}",
                        status
                    ])
                    csvfile.flush()  # Ensure immediate write

        except Exception as e:
            print(f"Error: {str(e)}")
            print(f"Bytes processed when error occurred: {bytes_processed:,}")
        finally:
            # Clean up whatever was actually created.
            if pbar is not None:
                pbar.close()
            if process is not None:
                process.stdout.close()
                process.wait()

        # Print summary
        end_time = time.time()
        print(f"\nExtraction complete:")
        print(f"Total files extracted: {files_extracted}")
        print(f"Total processing time: {end_time - start_time:.2f} seconds")
        print(f"Extraction log saved to: {log_file}")

In [3]:
class MultiFileReader:
    """
    Reads multiple files sequentially as a single binary stream.

    Only ``read`` and ``close`` are provided, which is enough for tarfile's
    streaming modes. The first file is opened on construction. The object can
    also be used as a context manager, in which case ``close`` is called on
    exit.
    """
    def __init__(self, filenames):
        self.filenames = filenames
        self.file_iter = iter(self.filenames)
        self.current_file = None
        self._open_next_file()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _open_next_file(self):
        """Close the current file (if any) and open the next one in sequence."""
        if self.current_file:
            self.current_file.close()
            # Drop the reference first so a failing open() below does not
            # leave a closed handle behind.
            self.current_file = None
        try:
            next_file = next(self.file_iter)
        except StopIteration:
            return
        print(f"Opening archive part: {next_file}")
        self.current_file = open(next_file, "rb")

    def read(self, size=-1):
        """
        Read bytes from the current file, switching to the next file when needed.

        At most ``size`` bytes are returned and a single call never spans two
        files, so short reads are normal at file boundaries. With a negative
        ``size`` the remainder of the *current* file is returned. ``b""`` is
        returned once every file is exhausted.
        """
        if size == 0:
            # An empty result here is not end-of-file; do not advance.
            return b""

        while self.current_file:
            data = self.current_file.read(size)
            if data:
                return data
            # End of current file (or an empty file): move on to the next one.
            self._open_next_file()
        return b""  # No more data

    def close(self):
        """Close the last opened file."""
        if self.current_file:
            print("Closing final file.")
            self.current_file.close()

def list_tar_files(part_files, output_file="file_list.txt", start_index=0):
    """
    Lists the contents of a split tar archive by streaming through the parts,
    without loading the entire archive into memory.

    :param part_files: List of tar parts in correct order.
    :param output_file: Path to save the file listing (overwritten).
    :param start_index: Number of leading tar members to skip before listing.
        The same value is also validated against ``len(part_files)`` and used
        to pick the part name shown in the messages.
        NOTE(review): these two uses disagree (member count vs. part index);
        confirm which one is intended.
    """
    if start_index >= len(part_files):
        print("Error: Start index is out of range.")
        return

    print(f"Processing archives starting from index {start_index} ({part_files[start_index]} onwards)...")

    # Open the output file
    with open(output_file, "w") as out_file:
        print(f"Writing file list to {output_file}")

        # Open all parts as a continuous stream
        reader = MultiFileReader(part_files)

        # TODO: Try getting the member list using tar.getmembers() ?

        try:
            # "r|*" streams the archive with transparent compression detection.
            with tarfile.open(fileobj=reader, mode="r|*") as tar:
                skipped = 0
                for member in tar:
                    if skipped < start_index:
                        skipped += 1
                        continue  # Skip until we reach the desired start index

                    # Log metadata.
                    # NOTE(review): `skipped` no longer changes at this point, so the
                    # reported part is the same for every row; it is not the part
                    # the member was actually read from.
                    archive_part = part_files[min(skipped, len(part_files) - 1)]
                    file_info = f"{member.name}, Size: {member.size} bytes, Archive Part: {archive_part}\n"
                    out_file.write(file_info)
        finally:
            # Release the handle of the part that is still open.
            reader.close()

    print("File listing completed.")

def extract_file_from_tar(part_files, target_file, output_dir="."):
    """
    Extracts a specific file from a split tar archive.

    The parts are streamed in order and the first member whose name equals
    ``target_file`` is extracted; the function returns right after that.

    :param part_files: List of tar parts in correct order.
    :param target_file: Exact member name of the file to extract.
    :param output_dir: Directory to extract the file into.
    """
    print(f"Extracting {target_file} from archive...")

    # Open the multi-part tar archive as a continuous stream
    reader = MultiFileReader(part_files)
    try:
        # "r|*" streams the archive with transparent compression detection.
        with tarfile.open(fileobj=reader, mode="r|*") as tar:
            for member in tar:
                if member.name == target_file:
                    print(f"Extracting: {member.name} -> {output_dir}")
                    tar.extract(member, path=output_dir)
                    print("Extraction complete.")
                    return
    finally:
        # Runs on the early return above as well, so the open part is always released.
        reader.close()

    print("File not found in archive.")

In [9]:
# Archive parts in order.
part_files = [
    "/run/media/sambhav/StorageSSD_1/datasets/spectral_earth/spectral_earth_part_1.tar.gz",
    "/run/media/sambhav/StorageSSD_1/datasets/spectral_earth/spectral_earth_part_2.tar.gz",
    "/run/media/sambhav/StorageHDD_2/datasets/spectral_earth/spectral_earth_part_3.tar.gz",
    "/run/media/sambhav/StorageHDD_2/datasets/spectral_earth/spectral_earth_part_4.tar.gz",
    "/run/media/sambhav/StorageHDD_1/datasets/spectral_earth/spectral_earth_part_5.tar.gz",
    "/run/media/sambhav/StorageHDD_1/datasets/spectral_earth/spectral_earth_part_6.tar.gz",
    "/home/sambhav/datasets/spectral_earth/spectral_earth_part_7.tar.gz",
    "/home/sambhav/datasets/spectral_earth/spectral_earth_part_8.tar.gz",
    "/run/media/sambhav/StorageSSD_1/datasets/spectral_earth/spectral_earth_part_9.tar.gz",
]

# Extract everything referenced by the cdl, nlcd and corine frames in one pass
# over the archive.
extract_files_from_split_archive(
    part_files,
    set().union(
        get_extraction_set(cdl_df, "cdl"),
        get_extraction_set(nlcd_df, "nlcd"),
        get_extraction_set(corine_df, "corine"),
    ),
)
#list_tar_files(part_files, "spectral_earth_files_part_9", 8)
#list_files_from_split_archive(part_files, 6, "spectral_earth_table_part_6+.csv")

100%|█████████▉| 3.57T/3.57T [8:26:52<00:00, 117MB/s] 


Extraction complete:
Total files extracted: 55419
Total processing time: 30412.64 seconds
Extraction log saved to: /run/media/sambhav/StorageHDD_2/datasets/spectral_earth/imagefolder/extraction_log.csv



