In [1]:
import io
import os
import shutil
import tarfile
import threading

from concurrent.futures import ThreadPoolExecutor
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage
from google.oauth2 import service_account
from queue import Queue

In [2]:
def download_from_drive(file_id, destination_file_name, credentials_file='credentials.json'):
    """Download a Google Drive file to a local path using a service account.

    Args:
        file_id: Google Drive ID of the file to fetch.
        destination_file_name: Local path to write; opened in ``'wb'`` mode, so
            an existing file is overwritten.
        credentials_file: Path to the service-account JSON key file. Defaults to
            ``'credentials.json'`` (relative to the current working directory).

    The content is fetched in chunks via ``MediaIoBaseDownload``. The local file
    handle is closed when the download finishes or if a chunk request raises.
    """
    credentials = service_account.Credentials.from_service_account_file(
        credentials_file,
        scopes=['https://www.googleapis.com/auth/drive.readonly'],
    )
    drive_service = build('drive', 'v3', credentials=credentials)
    request = drive_service.files().get_media(fileId=file_id)

    # Context manager guarantees the FileIO handle is closed (previously leaked).
    with io.FileIO(destination_file_name, 'wb') as file_stream:
        downloader = MediaIoBaseDownload(file_stream, request)
        done = False
        while not done:
            # next_chunk() yields (progress status, finished flag); progress is unused here.
            _status, done = downloader.next_chunk()

In [None]:
def upload_to_gcs(bucket_name, local_file, destination_file):
    """Upload a single local file to Google Cloud Storage.

    A fresh ``storage.Client()`` is created on every call, the target blob is
    resolved as ``bucket_name``/``destination_file``, and the contents of
    ``local_file`` are uploaded to it.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_file)
    target_blob.upload_from_filename(local_file)

In [9]:
def _resolve_member_path(root, member_name):
    """Return the real path an archive member would extract to under ``root``.

    ``root`` must already be ``os.path.realpath``-resolved. Raises ``ValueError``
    if ``member_name`` (e.g. an absolute path or one with ``..`` components)
    would place the file outside ``root``.
    """
    target = os.path.realpath(os.path.join(root, member_name))
    if os.path.commonpath([root, target]) != root:
        raise ValueError(
            f"Refusing to extract {member_name!r}: it resolves outside {root!r}"
        )
    return target


def extract_and_upload_tar_bz2(source_file, destination_folder, bucket_name,
                               prefix="images/", num_workers=5, queue_size=10):
    """Stream-extract a ``.tar.bz2`` archive and upload every regular file to GCS.

    Extraction runs on the calling thread and feeds a bounded queue. A pool of
    upload threads drains that queue: each file is uploaded with
    ``upload_to_gcs`` and its local copy is deleted after a successful upload.
    Because the queue is bounded, extraction cannot run arbitrarily far ahead
    of the uploads.

    Args:
        source_file: Path to the ``.tar.bz2`` archive.
        destination_folder: Local scratch directory for extracted files. It is
            removed (if it exists) and recreated before extraction starts.
        bucket_name: Name of the target GCS bucket.
        prefix: String prepended to each member's archive path to form the
            object name. Defaults to ``"images/"``.
        num_workers: Number of concurrent upload threads.
        queue_size: Maximum number of extracted files waiting to be uploaded.

    Raises:
        ValueError: If an archive member would be written outside
            ``destination_folder``.
        RuntimeError: If one or more uploads failed. All other files are still
            processed first; files whose upload failed are left on disk.
        Any exception raised while opening or reading the archive is
        re-raised after the upload workers have shut down.
    """
    # Start from an empty scratch directory.
    if os.path.exists(destination_folder):
        shutil.rmtree(destination_folder)
    os.makedirs(destination_folder)
    root = os.path.realpath(destination_folder)

    # Use the stdlib 'data' extraction filter when this Python version supports it.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    upload_queue = Queue(maxsize=queue_size)
    failures = []  # (local_file, exception); list.append is atomic under CPython's GIL

    def upload_worker():
        # Record per-job errors instead of letting the worker die. If every
        # worker died, the producer would block forever on the full queue.
        while True:
            job = upload_queue.get()
            if job is None:
                return
            local_file, destination_file = job
            try:
                upload_to_gcs(bucket_name, local_file, destination_file)
                os.remove(local_file)
            except Exception as exc:
                failures.append((local_file, exc))

    with ThreadPoolExecutor(max_workers=num_workers) as upload_executor:
        workers = [upload_executor.submit(upload_worker) for _ in range(num_workers)]
        try:
            with tarfile.open(source_file, 'r:bz2') as tar:
                for member in tar:
                    if not member.isfile():
                        continue
                    local_file = _resolve_member_path(root, member.name)
                    tar.extract(member, path=destination_folder, **extract_kwargs)
                    # Blocks while the queue is full, throttling extraction to upload speed.
                    upload_queue.put((local_file, prefix + member.name))
        finally:
            # Exactly one sentinel per worker, even if extraction failed, so
            # every worker exits and the executor can shut down.
            for _ in range(num_workers):
                upload_queue.put(None)

    # Surface anything unexpected that escaped a worker.
    for worker in workers:
        worker.result()

    if failures:
        first_file, first_error = failures[0]
        raise RuntimeError(
            f"{len(failures)} upload(s) failed; first failure was "
            f"{first_file!r}: {first_error!r}"
        ) from first_error

In [10]:
# --- Pipeline configuration -------------------------------------------------
# Google Drive source: ID of the .tar.bz2 archive and the local file it is
# downloaded to.
file_id = '1cjY6HsHaSZuLVHywIxD5xQqng33J5S2b'
destination_file_name = 'downloaded_data.tar.bz2'

# Local scratch directory the archive is unpacked into before upload
# (extract_and_upload_tar_bz2 deletes and recreates it on every run).
extracted_folder = 'extracted_files'

# Target GCS bucket.
bucket_name = 'fake-news-data'

# NOTE(review): `destination_folder` is not referenced by the cells shown here;
# uploaded objects are written under the "images/" prefix instead. Confirm
# whether it is still needed.
destination_folder = 'fakeddit'

In [None]:
# Run the pipeline: fetch the archive from Google Drive, then unpack it and
# push every regular file it contains into the bucket.
download_from_drive(file_id=file_id, destination_file_name=destination_file_name)
extract_and_upload_tar_bz2(
    source_file=destination_file_name,
    destination_folder=extracted_folder,
    bucket_name=bucket_name,
)