In [96]:
import subprocess
import os
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd
import re
import datetime

In [97]:
# Repository checkout that holds both the input manifest and the run log.
_repo_root = "/home/jupyter/gitlab/2023-pols-na-dna-spring-cleaning"
# Manifest CSV read below (rows with project/schema/table/action columns).
file = f"{_repo_root}/files/archive.csv"
# Append-only log of archived/deleted tables.
logfile = f"{_repo_root}/logs/archive.log"

In [98]:
# Read every column as text. The values are concatenated into fully qualified
# table names ("project.dataset.table") and passed to re.sub, so numeric-looking
# names (e.g. a date-sharded "20230101" or "00123") must not be parsed as ints.
df = pd.read_csv(file, dtype=str)

In [99]:
# Both clients are created against the same GCP project.
_client_project = "polsbigquery"
storage_client, bigquery_client = (
    storage.Client(project=_client_project),
    bigquery.Client(project=_client_project),
)

In [117]:
def check_bucket_exists(bucket):
    """
    NAME: check_bucket_exists
    
    DESCRIPTION: Checks if the bucket exists
    
    PARAMETERS:
        bucket(str): name of the bucket
        
    RETURNS
        True/False: True if the bucket exists, False if the storage API reports
        it as not found
        
    RAISES
        Any other storage API error (e.g. permission denied, auth or network
        failures) is propagated rather than being reported as "bucket missing".
    """
    
    # storage_client.bucket() only builds a local reference (no API call);
    # Bucket.exists() performs the lookup and maps only "not found" to False.
    # The previous bare `except:` turned every failure (including a 403 on a
    # bucket owned by someone else, or a KeyboardInterrupt) into False, which
    # sent the caller into create_bucket() for a bucket it could not create.
    return storage_client.bucket(bucket).exists()

def upload_to_bucket(bucket, project, dataset, table, storage_class='STANDARD'):
    
    """
    NAME: upload_to_bucket
    
    DESCRIPTION: Exports the BigQuery table to a CSV object in the bucket and
                 sets that object's storage class
    
    PARAMETERS:
        bucket(str): name of the bucket
        project(str): name of the project
        dataset(str): name of the dataset
        table(str): name of the table ('.' is replaced by '_' in the object name)
        storage_class(str): storage class applied to the exported object,
            e.g. 'STANDARD', 'NEARLINE', 'COLDLINE' or 'ARCHIVE'
            (default 'STANDARD')
        
    RETURNS:
        msg(str): message to be sent to logfile with action time
    """
    
    # Compute the object name once and derive the gs:// URI from it, so the
    # exported object and the blob whose class we update can never diverge.
    blobname = f"{dataset}/{re.sub('[.]', '_', table)}.csv"
    filepath = f"gs://{bucket}/{blobname}"
    table_name = f"{project}.{dataset}.{table}"
    
    print(f"Uploading {table_name} to {filepath} with storage class: {storage_class}.")
    
    # Getting table reference
    table_ref = bigquery_client.get_table(table_name)
    
    # NOTE(review): BigQuery exports at most 1 GB to a single (non-wildcard)
    # URI; larger tables need a wildcard destination such as ".../table_*.csv".
    # Confirm the archived tables are below that size.
    extract_job = bigquery_client.extract_table(
        table_ref,
        filepath,
        job_config=bigquery.ExtractJobConfig(),
    )
    # Block until the export finishes; result() raises if the job failed, so
    # the storage-class change below only runs after a successful export.
    extract_job.result()
    
    # Getting the blob reference after extracting (kept distinct from the
    # `bucket` name parameter instead of shadowing it)
    bucket_ref = storage_client.get_bucket(bucket)
    blob = bucket_ref.blob(blobname)
    
    # Setting the storage class
    blob.update_storage_class(storage_class)
    
    # Creating message
    msg = f"""
New Location: {filepath} 
Storage Class: {storage_class}
Transfer Time: {datetime.datetime.now().isoformat()}\n"""
    
    return msg
    
def capture_log_data(table_name):
    
    """
    NAME: capture_log_data
    
    DESCRIPTION: Fetches the table's BigQuery metadata and formats it as a log
                 entry, so it is recorded before the table is deleted
    
    PARAMETERS:
        table_name(str): fully qualified table name ("project.dataset.table")
        
    RETURNS
        msg(str): log entry that starts with a newline and a dashed separator
                  line, followed by one "label: value" line per metadata field
    """
    
    meta = bigquery_client.get_table(table_name)
    
    separator = "-----------------------------------------------------------------------------------------------------------------------------------------------"
    
    # (label, value) pairs, in the order they appear in the log entry
    fields = (
        ("table_id", meta.table_id),
        ("path", meta.path),
        ("num_rows", meta.num_rows),
        ("num_bytes", meta.num_bytes),
        ("modified", meta.modified),
        ("created", meta.created),
        ("schema", meta.schema),
    )
    body = "".join(f"{label}: {value}\n" for label, value in fields)
    
    return f"\n{separator}\n{body}"
    
def delete_table(table_name):
    """
    NAME: delete_table
    
    DESCRIPTION: Looks up the BigQuery table and deletes it
    
    PARAMETERS:
        table_name(str): fully qualified table name ("project.dataset.table")
        
    RETURNS:
        msg(str): one-line message for the logfile, stamped with the time
                  taken after the delete call returns
    """
    
    # The lookup runs first, so a missing table raises from get_table before
    # delete is attempted.
    bigquery_client.delete_table(bigquery_client.get_table(table_name))
    
    return f"Table Action: Deleted {table_name} at {datetime.datetime.now().isoformat()}."

def write_to_log_file(file, msg):
    """
    NAME: write_to_log_file
    
    DESCRIPTION: Appends a message to the log file, creating the file's parent
                 directory if it does not exist yet
    
    PARAMETERS:
        file(str): path of the log file
        msg(str): message to be sent to logfile (written verbatim, no newline added)
    """
    
    # Create the log directory up front. Otherwise open() fails on a fresh
    # checkout, and in the archive loop that failure would occur after a table
    # was already deleted, losing its metadata.
    directory = os.path.dirname(file)
    if directory:
        os.makedirs(directory, exist_ok=True)
    
    # Explicit UTF-8 so schema/description text with non-ASCII characters does
    # not depend on the machine's locale encoding.
    with open(file, 'a', encoding='utf-8') as f:
        f.write(msg)

In [119]:
# Destination bucket for every archived table.
ARCHIVE_BUCKET = "edwa_global_archive"

# Only rows whose action is exactly 'archive' are exported and deleted; all
# other rows are ignored.
archive_rows = df.loc[df['action'] == 'archive', ['project', 'schema', 'table']]

# The bucket is the same for every row, so check/create it once, and only when
# there is actually something to archive.
if not archive_rows.empty and not check_bucket_exists(ARCHIVE_BUCKET):
    print(f"Bucket does not exist, creating...{ARCHIVE_BUCKET}")
    storage_client.create_bucket(ARCHIVE_BUCKET)

for project, dataset, table in archive_rows.itertuples(index=False, name=None):
    table_name = f"{project}.{dataset}.{table}"

    # Copy table to cloud storage bucket
    upload_msg = upload_to_bucket(ARCHIVE_BUCKET, project, dataset, table)

    # Capture log data while the table still exists
    metadata_msg = capture_log_data(table_name)

    # Persist the metadata and export location BEFORE deleting, so a failing
    # log write can never leave a deleted table with no record.
    write_to_log_file(logfile, metadata_msg + upload_msg)

    # Delete table, then record the deletion (the file ends up with the same
    # content as writing all three messages at once).
    delete_msg = delete_table(table_name)
    write_to_log_file(logfile, delete_msg)
        
        


Bucket does not exist, creating...edwa_global_archive
Uploading polsbigquery.skunkworks.test_table_1 to gs://edwa_global_archive/skunkworks/test_table_1.csv with storage class: STANDARD.
Uploading polsbigquery.skunkworks.test_table_2 to gs://edwa_global_archive/skunkworks/test_table_2.csv with storage class: STANDARD.
Uploading polsbigquery.skunkworks.test_table_3 to gs://edwa_global_archive/skunkworks/test_table_3.csv with storage class: STANDARD.
