!!!**GO THROUGH TODO COMMENTS**!!

In [1]:
# Import to set env variable
import os
import sys
import re
from pathlib import Path
from shutil import rmtree
import json
import datetime

import pandas as pd

# Imports the Google Cloud client library
from google.cloud import storage
from google.cloud import bigquery

In [2]:
# Set up Google library authentication (see the "User credentials" notes below).
# This is the default ADC location on Linux/macOS; on Windows the file lives under %APPDATA%\gcloud.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(
    Path.home() / ".config" / "gcloud" / "application_default_credentials.json"
)

User credentials
When your code is running in a local development environment, such as a development workstation, the best option is to use credentials associated with your Google Account, also called user credentials.

To provide your user credentials to ADC (Application Default Credentials), use the Google Cloud CLI:

Install and initialize the gcloud CLI: https://cloud.google.com/sdk/docs/install

Create your credential file:

gcloud auth application-default login

A login screen is displayed. After you log in, the command places a JSON file containing your credentials (usually from your own Google Account) in a well-known location on your file system, where ADC picks it up. The location depends on your operating system:

Linux, macOS: $HOME/.config/gcloud/application_default_credentials.json
Windows: %APPDATA%\gcloud\application_default_credentials.json
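
As a rough sketch of the two locations listed above, the default credential file path can be derived per platform instead of being hard-coded. The helper name `default_adc_path` and its parameters are hypothetical, introduced only for illustration, and the sketch assumes `HOME` (Linux, macOS) or `APPDATA` (Windows) is set, as on a typical workstation.

```python
import os
import sys
from pathlib import Path


def default_adc_path(platform=sys.platform, env=None):
    """Return the well-known ADC credential file location for a platform.

    Hypothetical helper: it only mirrors the two locations listed above.
    """
    env = os.environ if env is None else env
    if platform.startswith("win"):
        # Windows: %APPDATA%/gcloud/application_default_credentials.json
        base = Path(env["APPDATA"])
    else:
        # Linux, macOS: $HOME/.config/gcloud/application_default_credentials.json
        base = Path(env["HOME"]) / ".config"
    return base / "gcloud" / "application_default_credentials.json"
```

Calling `default_adc_path()` with no arguments uses the current platform and environment; passing an explicit `env` mapping makes the helper easy to check without touching real credentials.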


In [3]:
# Set variables

PROJECT_ID = 'assetinsure-surety-data-models'
BUCKET = 'surety-data-models'
# TABLE_SPEC = f'{PROJECT_ID}:ls_panthers_test.panters-test-table-1'
FILES_FOLDER = 'input'
OUTPUT_FOLDER = 'output' # GCS_OUTPUT_FOLDER
TABLE_NAME = "book"
DATASET = "ls_panthers_test"

In [4]:
# Ensure we can see all packages in root of repo
repo_root_dir = str(Path(".").resolve().parent)
if repo_root_dir not in sys.path:
    sys.path.append(repo_root_dir)

sys.path[0]

'/Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api'

In [5]:
# Directories for local
base_dir = Path(sys.path[0])

temp_e_dir = base_dir / "temp_e"
temp_e_dir.mkdir(exist_ok=True, parents=True)

temp_l_dir = base_dir / "temp_l"
temp_l_dir.mkdir(exist_ok=True, parents=True)

logs_dir = base_dir / "logs"
logs_dir.mkdir(exist_ok=True, parents=True)

In [28]:
# Logging function
def pipeline_log(logs_dir, message, table_name):
    """Append one JSON object per line (JSON Lines) to <logs_dir>/<table_name>_logs.json."""
    with open(f"{logs_dir}/{table_name}_logs.json", "a") as f:
        data = {
            "timestamp": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "logs": message
        }
        
        json.dump(data, f)
        f.write("\n")
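
Because the logging function above appends one JSON object per line, the log file is effectively in JSON Lines format rather than a single JSON document, so `json.load` on the whole file would fail once it holds more than one entry. A minimal sketch of reading it back, assuming the same `timestamp` / `logs` keys; `read_pipeline_log` is a hypothetical helper name, not part of the notebook:

```python
import json


def read_pipeline_log(log_path):
    """Parse a JSON Lines log file into a list of dicts, skipping blank lines."""
    entries = []
    with open(log_path) as f:
        for line in f:
            line = line.strip()
            if line:
                # Each non-empty line is one complete JSON object
                entries.append(json.loads(line))
    return entries
```

For example, `read_pipeline_log(logs_dir / "book_logs.json")` would return the entries in the order they were written.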

In [8]:
# Instantiates clients
# TODO: where to put in function or above
storage_client = storage.Client()
bigquery_client = bigquery.Client()



In [9]:
def download_all_blobs(bucket_name, destination_directory, source_blob_prefix="", file_pattern=r".*\.xlsm$"):
    """Downloads all blobs from the bucket and saves them with their original file names."""
    # Initialise the Google Cloud Storage client
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # List all blobs in the bucket with the specified prefix
    blobs = list(bucket.list_blobs(prefix=source_blob_prefix))

    local_file_paths = []
    
    for blob in blobs:
        # Don't download subfolder items
        if blob.name.endswith("/"):
            # Skip directories
            continue
        
        # Extract the file name (last path component) and match it against the pattern
        file_name = blob.name.split("/")[-1]
        if re.search(file_pattern, file_name):
            
            # Construct the local file path by joining the destination directory and relative path
            local_file_path = os.path.join(destination_directory, file_name)
            
            # Download the blob to the destination with its original name
            blob.download_to_filename(local_file_path)
            
            print("Downloaded storage object {} from bucket {} to local file {}.".format(
                blob.name, bucket_name, local_file_path))

            local_file_paths.append(local_file_path)

    return local_file_paths
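
The name filter used in the download function can be tried out without touching the bucket. The sketch below reproduces only the filtering step on a list of blob names; `filter_file_names` is a hypothetical helper, and `Book.xlsm.bak` and `notes.txt` are made-up names for illustration. It shows that `re.search` with an unanchored pattern also accepts names that merely contain `.xlsm`, whereas a pattern ending in `$` does not.

```python
import re


def filter_file_names(blob_names, file_pattern):
    """Return the file names (last path component) that match file_pattern."""
    matched = []
    for name in blob_names:
        if name.endswith("/"):
            # Folder placeholder object: nothing to download
            continue
        file_name = name.split("/")[-1]
        if re.search(file_pattern, file_name):
            matched.append(file_name)
    return matched


names = ["input/", "input/Book.xlsm", "input/Book.xlsm.bak", "input/notes.txt"]

loose = filter_file_names(names, r".*\.xlsm")    # unanchored pattern
strict = filter_file_names(names, r".*\.xlsm$")  # anchored at the end of the name

print(loose)   # ['Book.xlsm', 'Book.xlsm.bak']
print(strict)  # ['Book.xlsm']
```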
            

In [10]:
# Download the files from GS to local temp_e and store file paths
raw_file_paths = download_all_blobs(BUCKET, temp_e_dir, FILES_FOLDER)



Downloaded storage object input/Book.xlsm from bucket surety-data-models to local file /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Book.xlsm.
Downloaded storage object input/Panthers Financial Model Oct-22.xlsm from bucket surety-data-models to local file /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Panthers Financial Model Oct-22.xlsm.
Downloaded storage object input/not_book.xlsm from bucket surety-data-models to local file /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/not_book.xlsm.


In [29]:
pipeline_log(logs_dir, f"Loading these files locally: {raw_file_paths}", TABLE_NAME)

In [13]:
def read_excel_files(file_path):
    try:
        # Attempt to read the Excel file
        df = pd.read_excel(file_path)
        return df
    except FileNotFoundError as e:
        # Log the filename and the error message
        print(f"Error: {e}. File not found: {file_path}")
        return None

In [14]:
def transformation(df):
    # Get company name
    # first_non_null_index = df[df.iloc[:, 0].notnull()].index[0]
    # company_name = df.iloc[first_non_null_index, 0]
    # # Print company name
    # print(company_name)

    # # Get the current date and time
    # current_time = datetime.datetime.now()
    # # Print the current time
    # print(current_time)

    # # Create a dictionary to represent the row data
    # row_data = {
    #     'ID': [10,11,12],  # Replace with the actual ID if available
    #     'CompanyName': [company_name,company_name,company_name],
    #     'Date': [current_time,current_time,current_time]
    #     }

    # # Create dataframe of dictionary
    # df = pd.DataFrame.from_dict(row_data, orient='index').T

    # new test
    df["timestamp"] = datetime.datetime.now()

    return df

In [15]:
def apply_transforms(paths):
    dfs_transformed = []
    
    for file_path in paths:
        
        df_r = read_excel_files(file_path)
        
        # Check if the file was successfully loaded
        if df_r is not None:
            # Process the DataFrame here
            print(f"File '{file_path}' loaded successfully.")
            
            df_t = transformation(df_r)
            dfs_transformed.append(df_t)
        else:
            # Continue processing or take other actions
            print(f"Skipping '{file_path}' due to the file not found error.")
        
    df_comb_transf = pd.concat(dfs_transformed)
    
    return df_comb_transf

In [23]:
df_bq = apply_transforms(raw_file_paths)

File '/Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Book.xlsm' loaded successfully.
Error: [Errno 2] No such file or directory: '/Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Panthers Financial Model Oct-22.xlsm'. File not found: /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Panthers Financial Model Oct-22.xlsm
Skipping '/Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Panthers Financial Model Oct-22.xlsm' due to the file not found error.
File '/Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/not_book.xlsm' loaded successfully.


In [24]:
df_bq

Unnamed: 0,number,name,date,value,boolean,timestamp
0,101,test,2023-08-02,200.45,False,2023-09-27 12:59:58.594947
1,102,Real Test,2023-01-05,3095452.85,False,2023-09-27 12:59:58.594947
2,103,Final,2021-06-09,287.11,True,2023-09-27 12:59:58.594947
0,303,prod,2020-04-22,82301.32,True,2023-09-27 12:59:58.605081
1,304,quick prod,2016-01-09,399075.0,True,2023-09-27 12:59:58.605081
2,305,yesterday,2018-11-29,223.6,True,2023-09-27 12:59:58.605081


In [30]:
pipeline_log(logs_dir, f"Transformed data to df with shape: {df_bq.shape}", TABLE_NAME)

In [25]:
def parquet_local(df, path):
    df.to_parquet(f"{path}/{TABLE_NAME}.parquet")

In [26]:
parquet_local(df_bq, temp_l_dir)

In [32]:
pipeline_log(logs_dir, f"DF to local: {temp_l_dir}/{TABLE_NAME}.parquet", TABLE_NAME)

In [33]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    
    # TODO: set the version in json config - to overwrite pass None
    generation_match_precondition = None

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )
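
The comments in the upload function describe three cases for the generation-match precondition: unconditional overwrite (`None`), "object must not exist yet" (`0`), and "object must still be at a known generation". A minimal sketch of that decision as a pure function follows; the name `generation_precondition` and its parameters are hypothetical, and where the generation number comes from (for example a JSON config, as the TODO suggests) is left open.

```python
def generation_precondition(existing_generation=None, overwrite=False):
    """Pick a value to pass as if_generation_match.

    overwrite=True            -> None (no precondition, always overwrite)
    existing_generation=None  -> 0    (upload only if the object does not exist yet)
    otherwise                 -> the known generation number of the existing object
    """
    if overwrite:
        return None
    if existing_generation is None:
        return 0
    return existing_generation
```

The result would then be passed through unchanged, e.g. `blob.upload_from_filename(source_file_name, if_generation_match=generation_precondition(overwrite=True))`, which matches the current behaviour of always overwriting.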


In [35]:
upload_blob(BUCKET, f"{temp_l_dir}/{TABLE_NAME}.parquet", f"{OUTPUT_FOLDER}/{TABLE_NAME}.parquet")



File /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_l/book.parquet uploaded to output/book.parquet.


In [36]:
pipeline_log(logs_dir, f"DF to GS: gs://{BUCKET}/{OUTPUT_FOLDER}/{TABLE_NAME}.parquet", TABLE_NAME)

In [37]:
def load_parquet_to_bigquery(project_id, bucket_name, output_folder, dataset_id, table_id):
    # Initialise the BigQuery client (the load job reads the parquet file directly from GCS)
    bigquery_client = bigquery.Client()
    
    # Construct transformed parquet file source URI of Google Storage object 
    source_uri = f"gs://{bucket_name}/{output_folder}/{table_id}.parquet"
    
    # Construct BigQuery table reference
    table_id = f"{project_id}.{dataset_id}.{table_id}"
    
    # Load job config details
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # Options: WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY
        source_format=bigquery.SourceFormat.PARQUET        
        
    )
    
    # Start the job to load from GS to BQ
    load_job = bigquery_client.load_table_from_uri(
        source_uri, table_id, job_config=job_config    
    )
    
    print(f"Load data from GS location: {source_uri} to BQ Table ID: {load_job.destination}")
    
    # Wait for job to complete
    load_job.result()
    
    # Print job status
    # TODO: set logging
    print(f"Job ID: {load_job.job_id}")
    print(f"Job State: {load_job.state}")
    print(f"Loaded {load_job.output_rows} rows into {table_id} from {source_uri}")

    # Return the finished job so callers can inspect its state
    return load_job

In [38]:
pipeline_job = load_parquet_to_bigquery(PROJECT_ID, BUCKET, OUTPUT_FOLDER, DATASET, TABLE_NAME)



Load data from GS location: gs://surety-data-models/output/book.parquet to BQ Table ID: assetinsure-surety-data-models.ls_panthers_test.book
Job ID: 4ad1ab7f-8ac7-4563-b7db-a8f5a2c68874
Job State: DONE
Loaded 6 rows into assetinsure-surety-data-models.ls_panthers_test.book from gs://surety-data-models/output/book.parquet


In [39]:
pipeline_log(logs_dir, f"Wrote to BQ successfully: {PROJECT_ID}.{DATASET}.{TABLE_NAME}", TABLE_NAME)

In [None]:
# TODO: cleanup state include in script to check to continue to cleanup
pipeline_job.state

In [40]:
def cleanup(clean_directory):
    # List all files to delete
    files = [f for f in clean_directory.glob("*")]
    for f in files:
        print(f"Deleting: {f}")
    
    try:
        rmtree(clean_directory)
    
    except Exception as e:
        print(f"An error occurred: {str(e)}")
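
One possible way to address the TODO about checking the job state before cleaning up is to gate the deletion on that state. The sketch below is only an illustration: `cleanup_if_done` is a hypothetical helper, and it assumes the caller passes in the load job's state string (the output above shows `DONE`) and, optionally, any error the job reported.

```python
import shutil
from pathlib import Path


def cleanup_if_done(job_state, directories, error_result=None):
    """Delete the directories only when the job finished without a reported error.

    Returns the list of directories that were actually removed.
    """
    if job_state != "DONE" or error_result is not None:
        # Keep the temp files so a failed or unfinished run can be inspected
        return []

    removed = []
    for directory in directories:
        directory = Path(directory)
        if directory.exists():
            shutil.rmtree(directory)
            removed.append(directory)
    return removed
```

In this notebook the call might look like `cleanup_if_done(pipeline_job.state, [temp_e_dir, temp_l_dir])`, provided the load function returns the job object.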
        

In [41]:
pipeline_log(logs_dir, f"PIPELINE RAN SUCCESSFULLY - DELETE TEMP FILES IN DIRS {temp_e_dir} and {temp_l_dir}: ", TABLE_NAME)

In [42]:
# List directories to clean
for path in [temp_e_dir, temp_l_dir]:
    cleanup(path)

Deleting: /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/not_book.xlsm
Deleting: /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/.DS_Store
Deleting: /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_e/Book.xlsm
Deleting: /Users/leonsmith/explore/fs/assetinsure/surity_pipeline/surity-pipeline-etl-api/temp_l/book.parquet


In [43]:
pipeline_log(logs_dir, "PIPELINE END", TABLE_NAME)