In [None]:
from google.cloud import storage
from config_loader import ConfigLoader
import os

class CloudStorageValidator:
    def __init__(self, config_path="google_cloud.yml"):
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "google_cloud.json"

        self.config_loader = ConfigLoader(config_path)
        self.client = storage.Client()
        self.default_parameters = self.config_loader.get_default_parameters()
        self.buckets_config = self.config_loader.get_bucket_configs()

    def validate_or_create_buckets_and_tags(self):
        """Valida, cria e aplica configurações de buckets e tags."""
        for bucket_config in self.buckets_config:
            bucket_name = bucket_config["name"]
            bucket_options = bucket_config.get("options", {})
            folders = bucket_config.get("folders", [])
            bucket_tags = self._merge_tags(bucket_options.get("tags", {}), self.default_parameters.get("tags", {}))

            bucket = self._get_or_create_bucket(bucket_name, bucket_options)
            self._validate_and_apply_tags(bucket, bucket_tags)
            self._create_folders(bucket, folders)

    def _get_or_create_bucket(self, bucket_name, bucket_options):
        """Obtém o bucket se ele existir ou cria um novo com as configurações fornecidas."""
        try:
            bucket = self.client.get_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' encontrado.")
        except Exception:
            print(f"Bucket '{bucket_name}' não encontrado. Criando...")
            bucket = self.client.bucket(bucket_name)

            # Configuração do bucket
            bucket.location = bucket_options.get("region", self.default_parameters.get("region"))
            bucket.storage_class = bucket_options.get("storage_class", self.default_parameters.get("storage_class"))
            bucket.versioning_enabled = bucket_options.get("versioning", self.default_parameters.get("versioning"))

            # Criação do bucket
            bucket = self.client.create_bucket(bucket)
            print(f"Bucket '{bucket_name}' criado com sucesso.")

        return bucket

    def _merge_tags(self, bucket_tags, default_tags):
        """Mescla as tags do bucket com as tags padrão."""
        merged_tags = default_tags.copy()
        merged_tags.update(bucket_tags)
        return merged_tags

    def _validate_and_apply_tags(self, bucket, tags):
        """Valida e aplica as tags no bucket."""
        existing_tags = bucket.labels

        if existing_tags != tags:
            bucket.labels = tags
            bucket.patch()  # Aplica as mudanças no bucket
            print(f"Tags atualizadas no bucket '{bucket.name}': {tags}")
        else:
            print(f"As tags no bucket '{bucket.name}' já estão atualizadas.")

    def _create_folders(self, bucket, folders):
        """Cria folders simulados no bucket."""
        for folder_name in folders:
            blob = bucket.blob(f"{folder_name}/")  # Simula uma pasta
            if not blob.exists():
                blob.upload_from_string("")  # Cria um blob vazio como marcador
                print(f"Folder '{folder_name}' criado em '{bucket.name}'.")
            else:
                print(f"Folder '{folder_name}' já existe em '{bucket.name}'.")

# Uso do validador
validator = CloudStorageValidator()
validator.validate_or_create_buckets_and_tags()

In [None]:
from google.cloud import bigquery
from config_loader import ConfigLoader
import os

class BigQueryManager:
    def __init__(self, config_path="google_cloud.yml"):
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "google_cloud.json"

        self.config_loader = ConfigLoader(config_path)
        self.client = bigquery.Client(project='challenge-kaggle-brmarket')
        self.default_parameters = self.config_loader.get_default_parameters()
        # Acessa bigquery no nível raiz
        self.bigquery_config = self.config_loader.config.get("bigquery", {})

    def setup_datasets(self):
        """Configura datasets no BigQuery com base nas configurações do YAML."""
        datasets_config = self.bigquery_config.get("datasets", [])
        for dataset_config in datasets_config:
            dataset_name = dataset_config["name"]
            dataset_options = dataset_config.get("options", {})
            dataset_tags = self._merge_tags(
                dataset_options.get("tags", {}),
                self.default_parameters.get("tags", {})
            )

            self._create_or_update_dataset(dataset_name, dataset_options, dataset_tags)

    def _create_or_update_dataset(self, dataset_name, dataset_options, dataset_tags):
        """Cria ou atualiza o dataset com as configurações fornecidas."""
        dataset_id = f"{self.client.project}.{dataset_name}"
        dataset = bigquery.Dataset(dataset_id)

        # Configuração do dataset
        dataset.location = dataset_options.get("region", self.default_parameters.get("region", "US"))
        dataset.description = dataset_options.get("description", "Dataset criado automaticamente.")
        dataset.labels = dataset_tags

        try:
            # Tenta criar o dataset
            dataset = self.client.create_dataset(dataset, exists_ok=True)
            print(f"Dataset '{dataset_id}' criado ou atualizado com sucesso.")
        except Exception as e:
            print(f"Erro ao criar ou atualizar o dataset '{dataset_id}': {e}")

    def _merge_tags(self, dataset_tags, default_tags):
        """Mescla as tags do dataset com as tags padrão."""
        merged_tags = default_tags.copy()
        merged_tags.update(dataset_tags)
        return merged_tags

# Uso do gerenciador de datasets
if __name__ == "__main__":
    manager = BigQueryManager()
    manager.setup_datasets()
