In [1]:
import os
import argparse
from google.cloud import bigquery
from pathlib import Path
from google.cloud import storage
from jinja2 import Template
import urllib.parse

ModuleNotFoundError: No module named 'google'

In [None]:
#Read .sql file and generate query
def generate_query(input_file: Path, **replacements) -> str:
    """Render the SQL template stored in ``input_file``.

    The file's text is loaded as a Jinja2 ``Template`` and rendered with the
    supplied keyword arguments as template variables.

    Args:
        input_file: Path of the template file to read.
        **replacements: Variables passed to ``Template.render``.

    Returns:
        The rendered query text.
    """
    template_text = Path(input_file).read_text()
    sql_template = Template(template_text)
    return sql_template.render(**replacements)

In [None]:
#Execute sql query
def execute_sql_query(sql_query, name, project_id):
    """Run ``sql_query`` on BigQuery using standard SQL and report the outcome.

    A failure is reported on stdout and swallowed so that a caller looping
    over several queries can continue with the next one.

    Args:
        sql_query: Query text to submit.
        name: Label used in the printed status messages.
        project_id: Project passed to ``bigquery.Client``.
    """
    client = bigquery.Client(project=project_id)
    job_config = bigquery.QueryJobConfig()
    job_config.use_legacy_sql = False

    try:
        query_job = client.query(sql_query, job_config=job_config)
        # result() is called so that errors raised by the job surface here.
        query_job.result()
        print(f"Query executed successfully for {name}")
    # Bind the exception so it can be reported; catching Exception (not a bare
    # except) lets KeyboardInterrupt / SystemExit propagate.
    except Exception as e:
        print(f"Error executing the query on {name}: {e}")

In [None]:
#Copy files from GCS bucket
def copy_files_from_gcs_bucket(bucket_name, source_directory, destination_directory):
    """Download every ``.sql`` blob under a bucket prefix into a local folder.

    Args:
        bucket_name: Name of the bucket to read from.
        source_directory: Prefix handed to ``list_blobs`` to select blobs.
        destination_directory: Local folder to write into; created if missing.

    Each blob is saved under its base name only, so any directory structure
    below the prefix is flattened.
    """
    gcs_client = storage.Client()
    source_bucket = gcs_client.get_bucket(bucket_name)
    candidates = source_bucket.list_blobs(prefix=source_directory)
    os.makedirs(destination_directory, exist_ok=True)
    for candidate in candidates:
        # Only SQL files are of interest; skip everything else.
        if not candidate.name.endswith(".sql"):
            continue
        local_name = os.path.basename(candidate.name)
        candidate.download_to_filename(os.path.join(destination_directory, local_name))

In [None]:
def list_sql_files_in_directory(directory_path):
    """Return the names of entries in ``directory_path`` that end in ``.sql``.

    Only bare names are returned (not joined with ``directory_path``), in the
    order produced by ``os.listdir``.
    """
    return [entry for entry in os.listdir(directory_path) if entry.endswith(".sql")]

In [None]:
if __name__ == "__main__":
    # Entry point: download the .sql templates found under a gs:// folder,
    # render each one with the source/destination dataset names, and run it.
    parser = argparse.ArgumentParser(description='Process Args')
    parser.add_argument('--arg_Input_GCP_Project', type=str, help='Input GCP Project')
    parser.add_argument('--arg_Input_BQ_Dataset', type=str, help='Input BQ Dataset')
    parser.add_argument('--arg_Output_BQ_Dataset', type=str, help='Output BQ Dataset')
    # Required: without it the path parsing below cannot proceed.
    parser.add_argument('--arg_querypath', type=str, required=True, help='Query Path')

    args = parser.parse_args()

    GCP_PROJECT_ID = args.arg_Input_GCP_Project
    OUTPUT_DATASET_ID = args.arg_Output_BQ_Dataset
    INPUT_DATASET_ID = args.arg_Input_BQ_Dataset

    # The path may arrive URL-encoded; decode it before splitting.
    gcs_query_folder = urllib.parse.unquote(args.arg_querypath)
    # "gs://bucket/some/prefix" splits into ['gs:', '', 'bucket', 'some', 'prefix'].
    path_parts = gcs_query_folder.split('/')
    bucket_name = ''
    object_name = ''
    if len(path_parts) >= 4 and path_parts[0]=='gs:':
        bucket_name = path_parts[2]
        object_name = '/'.join(path_parts[3:])

    if not bucket_name:
        # Fail with a clear message instead of an error from the storage client.
        parser.error(f"--arg_querypath must look like gs://<bucket>/<path>, got: {gcs_query_folder}")

    dest_dir = os.getcwd()+"/queries"
    copy_files_from_gcs_bucket(bucket_name, object_name, dest_dir)

    # os.path.exists accepts a single path. The copy step creates dest_dir, so
    # checking what was actually downloaded is the meaningful guard here.
    sql_files = list_sql_files_in_directory(dest_dir) if os.path.isdir(dest_dir) else []

    if not sql_files:
        print("SQL file does not exist")
    else:
        for sql in sql_files:
            sql_name = dest_dir + "/" + sql
            sql_query = generate_query(sql_name,
                                      destination_dataset=f"{GCP_PROJECT_ID}.{OUTPUT_DATASET_ID}",
                                      source_dataset=f"{GCP_PROJECT_ID}.{INPUT_DATASET_ID}")
            execute_sql_query(sql_query=sql_query,
                             name=Path(sql_name).stem,
                             project_id = GCP_PROJECT_ID)