In [12]:
# pip install jupyter_scheduler

### Data Ingestion Process Report

#### Objective
This report outlines the data ingestion process for a data engineering pipeline that collects raw datasets from multiple local source directories and uploads them to Google Cloud Storage (GCS). The ingestion step prepares the data for downstream processing tasks in LLM model building, including partitioning the dataset into training, validation, and test sets.

#### Process Overview
The data ingestion script automates the collection, validation, and storage of raw datasets from specified local directories into a GCS bucket. This process serves as the foundational step in data engineering, ensuring that data is readily accessible and structured within the cloud environment for further analysis and machine learning tasks.

The ingestion process encompasses the following key steps:

1. **Configuration and Authentication**  
   - The ingestion pipeline begins by authenticating to GCS with a service account credential, which gives the script authorized access to the target bucket.
   - The service account JSON key file (its path is set in `service_account_path` inside the ingestion function) is loaded into a credentials object, and a `storage.Client` is initialized with it to access the specified GCS bucket.

2. **Directory and File Validation**  
   - Each source directory is paired by position with its corresponding blob prefix (GCS folder path), so the two lists must have the same length. For each directory:
     - The script verifies that the directory exists; if it does not, a warning is logged and the directory is skipped.
     - It then lists the regular files in the directory (subdirectories are not traversed) and checks that at least one is present, so empty directories are not processed.
   - If no files are found in a directory, a message is logged and the script proceeds to the next directory.

3. **File Upload and Cloud Storage Organization**  
   - For each file in a valid directory, the object name is built as `<blob_prefix>/<file_name>`, so the files from each source directory land under their own "folder" in the bucket.
   - Each file is uploaded to its designated location in GCS. The script logs each successful upload, providing traceability and transparency in data ingestion.

4. **File Deletion for Storage Management**  
   - After each successful upload, the local file is deleted. This frees local disk space, leaves behind only the files that still need to be uploaded, and prevents the same file from being uploaded again on the next run.
   - Deletion is performed only if the upload succeeds, so a failed upload never causes the local copy to be lost.
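The validation and naming steps above can be separated from the GCS calls and checked locally. The following is a minimal sketch, not part of the original notebook: `plan_uploads` is a hypothetical helper that pairs each directory with its prefix, skips missing or empty directories, and builds object names with `/` separators (GCS object names use `/` regardless of the local operating system, which `os.path.join` does not guarantee on Windows).

```python
import os
import posixpath


def plan_uploads(local_directories, blob_prefixes):
    """Hypothetical helper: return (local_path, blob_name) pairs without touching GCS.

    Directories that do not exist or contain no regular files are skipped,
    mirroring the validation steps described in the report.
    """
    if len(local_directories) != len(blob_prefixes):
        raise ValueError("The number of local directories and blob prefixes must match.")

    plan = []
    for local_directory, blob_prefix in zip(local_directories, blob_prefixes):
        if not os.path.isdir(local_directory):
            continue  # missing directory: skipped
        # Regular files only; sorted so the plan is deterministic
        files = sorted(
            f for f in os.listdir(local_directory)
            if os.path.isfile(os.path.join(local_directory, f))
        )
        for file_name in files:
            # posixpath.join always uses '/', which is what GCS object names expect
            blob_name = posixpath.join(blob_prefix, file_name)
            plan.append((os.path.join(local_directory, file_name), blob_name))
    return plan
```

Each resulting pair could then be passed to `bucket.blob(blob_name).upload_from_filename(local_path)`, keeping the filesystem logic testable on its own.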

#### Logging and Error Handling
The ingestion process incorporates comprehensive logging and error handling to maintain a smooth pipeline:
- **Logging**: Each key step is logged, including successful uploads, file deletions, skipped directories, and any errors encountered.
- **Error Handling**: Exceptions are caught and logged with tracebacks. A failure to initialize the GCS client stops the run, while a failed upload is logged and the script moves on to the next file, leaving the local copy in place for a later retry.
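The per-file error-handling behaviour described above can be sketched as a loop that isolates each upload. This is an illustrative sketch, not the notebook's code: `ingest_all` is a hypothetical name, and `upload` is assumed to be any callable taking `(local_path, blob_name)`; in the notebook it would wrap `bucket.blob(blob_name).upload_from_filename(local_path)`. Injecting it this way lets the delete-only-on-success logic be tested without GCS credentials.

```python
import logging
import os

logger = logging.getLogger(__name__)


def ingest_all(upload, plan):
    """Upload each (local_path, blob_name) pair; return (uploaded, failed) lists.

    A failed upload is logged and skipped (the local file is kept);
    a successful upload is followed by deletion of the local file.
    """
    uploaded, failed = [], []
    for local_path, blob_name in plan:
        try:
            upload(local_path, blob_name)
        except Exception:
            logger.error("Failed to upload '%s'", local_path, exc_info=True)
            failed.append(local_path)
            continue  # keep the local file and move on to the next one

        uploaded.append(blob_name)
        try:
            os.remove(local_path)
        except OSError:
            # The upload succeeded; only the local cleanup failed
            logger.warning("Uploaded '%s' but could not delete it", local_path, exc_info=True)
    return uploaded, failed
```

Keeping the upload and the deletion in separate `try` blocks means a cleanup error is never reported as an upload failure.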


#### Conclusion
This data ingestion process collects, validates, and uploads raw datasets from local storage to GCS. With its built-in logging, error handling, and local file cleanup, the pipeline provides the foundation for a well-organized, scalable data engineering environment.

In [13]:
!pip install google-cloud-storage




In [14]:
# healthcarechatbot-451921-d908cb05ee58.json

In [15]:
import os
import logging
from google.cloud import storage
from google.oauth2 import service_account


In [16]:
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

In [17]:
def upload_files_to_gcs_multiple_sources(bucket_name, local_directories, blob_prefixes):
    """
    Ingests data by uploading files from multiple local directories to respective locations in a GCS bucket.
    Includes error handling, logging, and file validations as part of a data engineering pipeline.

    Parameters:
    - bucket_name (str): The name of the GCS bucket.
    - local_directories (list): List of local directories containing files to upload.
    - blob_prefixes (list): List of corresponding GCS blob prefixes for each local directory.
    """

    # Check that every local directory has a matching blob prefix before touching GCS
    if len(local_directories) != len(blob_prefixes):
        logger.error("The number of local directories and blob prefixes must match.")
        return

    # Path to your service account key file
    service_account_path = 'healthcarechatbot-451921-d908cb05ee58.json'  # Replace with your service account JSON path

    # Create a credentials object and initialize the GCS client
    try:
        credentials = service_account.Credentials.from_service_account_file(service_account_path)
        storage_client = storage.Client(credentials=credentials)
        bucket = storage_client.bucket(bucket_name)
        logger.info("Google Cloud Storage client initialized.")
    except Exception:
        logger.error("Failed to initialize Google Cloud Storage client.", exc_info=True)
        return

    # Process each local directory and corresponding blob prefix
    for local_directory, blob_prefix in zip(local_directories, blob_prefixes):
        logger.info(f"Processing files from '{local_directory}' to GCS prefix '{blob_prefix}'...")

        # Skip directories that do not exist
        if not os.path.isdir(local_directory):
            logger.warning(f"Directory '{local_directory}' does not exist. Skipping.")
            continue

        # List regular files only (subdirectories are ignored)
        files = [f for f in os.listdir(local_directory) if os.path.isfile(os.path.join(local_directory, f))]
        if not files:
            logger.info(f"No files found in '{local_directory}'. Skipping to next directory.")
            continue

        # Process each file in the directory
        for file_name in files:
            local_file_path = os.path.join(local_directory, file_name)
            # GCS object names always use '/'; os.path.join would insert '\' on Windows
            blob_path = f"{blob_prefix.rstrip('/')}/{file_name}"
            blob = bucket.blob(blob_path)

            try:
                # Attempt to upload the file
                blob.upload_from_filename(local_file_path)
                logger.info(f"Uploaded '{file_name}' from '{local_directory}' to 'gs://{bucket_name}/{blob_path}'")
            except Exception:
                logger.error(f"Failed to upload '{file_name}' from '{local_directory}'", exc_info=True)
                continue  # keep the local file so it can be retried on the next run

            # Delete the local file only after a successful upload
            try:
                os.remove(local_file_path)
                logger.info(f"Deleted local file '{local_file_path}' after upload.")
            except OSError:
                logger.error(f"Uploaded '{file_name}' but failed to delete '{local_file_path}'", exc_info=True)

    logger.info("Ingestion process completed.")


In [18]:
# Use the function with necessary parameters
bucket_name = 'raw_dataset_genai'  # Replace with your GCS bucket name
local_directories = [
    'clinical_records',  # Replace with actual folder paths
    'Journals',
    'Research_Papers',
    'Textbooks'
]
blob_prefixes = [
    'rag-healthcare-chatbot/medicaldata/clinical_records',  # Replace with corresponding GCS folder paths (or prefixes)
    'rag-healthcare-chatbot/medicaldata/Journals',
    'rag-healthcare-chatbot/medicaldata/Research_Papers',
    'rag-healthcare-chatbot/medicaldata/Textbooks'
]
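Because the ingestion function pairs the two lists by position, they can drift apart when a folder is added to one list but not the other. One option, sketched here as an assumption rather than the notebook's own approach, is to derive each prefix from the directory's final path component under a shared base prefix (`base_prefix` is an illustrative name):

```python
import os

# Hypothetical shared base prefix; the directory names match the notebook's lists
base_prefix = 'rag-healthcare-chatbot/medicaldata'
local_directories = ['clinical_records', 'Journals', 'Research_Papers', 'Textbooks']

# Use the last path component of each directory as its GCS "folder" name;
# normpath strips any trailing separator so basename is never empty
blob_prefixes = [
    f"{base_prefix}/{os.path.basename(os.path.normpath(d))}" for d in local_directories
]
```

This produces the same four prefixes as the hand-written list, and it also works when the directories are given as longer paths such as `Desktop/rag_docs/clinical_records`.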



In [19]:
# Run the ingestion function
upload_files_to_gcs_multiple_sources(bucket_name, local_directories, blob_prefixes)


2025-03-01 15:40:07,218 - INFO - Google Cloud Storage client initialized.
2025-03-01 15:40:07,219 - INFO - Processing files from 'Desktop/rag_docs/clinical_records' to GCS prefix 'rag-healthcare-chatbot/medicaldata/clinical_records'...
2025-03-01 15:40:07,221 - INFO - Processing files from 'Desktop/rag_docs/Journals' to GCS prefix 'rag-healthcare-chatbot/medicaldata/Journals'...
2025-03-01 15:40:07,224 - INFO - Processing files from 'Desktop/rag_docs/Research_Papers' to GCS prefix 'rag-healthcare-chatbot/medicaldata/Research_Papers'...
2025-03-01 15:40:07,226 - INFO - Processing files from 'Desktop/rag_docs/Textbooks' to GCS prefix 'rag-healthcare-chatbot/medicaldata/Textbooks'...
2025-03-01 15:40:07,228 - INFO - Ingestion process completed.
