![Wise Regulatory Data Processor](data/wise_header.png)

# Wise Regulatory Data Processor

## Overview:
This notebook is designed to automate the extraction, transformation, and loading (ETL) of regulatory data for Wise, using input from an XLS file. It processes transaction data to meet regulatory reporting requirements for different entities (UK and US), including cross-currency and same-currency volumes. The processed data is uploaded to Google Cloud Storage and subsequently loaded into BigQuery for further analysis and reporting.

## Objectives:
- Parse the provided XLS file containing transactional data.
- Aggregate and transform the data to meet regulatory requirements for UK and US entities.
- Upload the transformed data to Google Cloud Storage.
- Load the data into BigQuery for further analysis and regulatory reporting.

## Key Steps:
1. Read and process the XLS file with multiple sheets.
2. Transform the data to meet the reporting requirements (R1 & R2).
3. Upload the data to Google Cloud Storage.
4. Load the transformed data into BigQuery.



## Table of Contents <a id='section_0'></a>

* [1. Prepare work environment](#section_1)
    * [1.1. Library loading](#section_1.1)  
    * [1.2. Configuration and Storage Setup](#section_1.2)
    * [1.2. load the data](#section_1.2)

* [2. Load the data](#section_2)

* [3. Data Cleaning](#section_3)

* [4. Basic QA Checklist](#section_4)

* [5. Save DataFrames](#section_5)



## 1. Prepare work environment <a id='section_1'></a>

* [Table of Contents](#section_0)

The work environment is set up by importing the libraries, creating the folders that will be used,
setting the permissions and referencing those folders so that it is easy to access.

### 1.1. Library loading <a id='section_1.1'></a>
* [Table of Contents](#section_0)

In [1]:
pip install openpyxl

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Python standard libraries
import os
from datetime import datetime

# Third-party libraries
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery

import logging
import datetime
from typing import Tuple, Optional
from dataclasses import dataclass
import openpyxl
from openpyxl.styles import Font, Border, Side, Alignment

### 1.2. Configuration and Storage Setup <a id='section_1.2'></a>
* [Table of Contents](#section_0)

In [3]:
# Configure Google Cloud Storage client
storage_client = storage.Client()

# Define Google Cloud Variables
bucket_name = "wise_csb" 
project_id = "wiseentitydataflow"
dataset_id = "wise_dataset" 
table_id_customer = "customers" 
table_id_transactions = "transactions" 

destination_blob_name_customer = "customer_data.csv"
destination_blob_name_transactions = "transactions_data.csv"

## 2. Load the data <a id='section_2'></a>

* [Table of Contents](#section_0)

This code loads data from an Excel file ('Dummy Data.xlsx') containing Customer and Transaction sheets. It processes each sheet into separate DataFrames, stores them in a dictionary with cleaned sheet names as keys, and then creates specific DataFrames for customer and transaction data for further analysis.

In [4]:
import pandas as pd

# Excel file path (using a variable for easier modification)
excel_file_path = "data/Dummy Data.xlsx"

def load_data(excel_file_path):
    """
    Loads data from an Excel file containing Customer and Transaction sheets.

    Args:
      excel_file_path: Path to the Excel file.

    Returns:
      A tuple containing two pandas DataFrames: customer_df and transactions_df.
    """
    try:
        # Read the Excel file and load the sheets into DataFrames
        excel_file = pd.ExcelFile(excel_file_path)

        # Create a dictionary to store the DataFrames
        sheets = {}

        for sheet_name in excel_file.sheet_names:
            df = excel_file.parse(sheet_name, header=1)  # Read with header
            # Clean the sheet name of whitespace
            cleaned_sheet_name = sheet_name.strip()
            sheets[cleaned_sheet_name] = df

        # Access the DataFrame of the sheet 'Customer'
        customer_df = sheets['Customer']

        # Access the DataFrame of the sheet 'Transactions'
        transactions_df = sheets['Transactions']

        print("DataFrames 'customer_df' and 'transactions_df' created successfully.")

        print("\nFirst 2 rows of 'customer_df':\n", customer_df.head(2))
        print("\nFirst 2 rows of 'transactions_df':\n", transactions_df.head(2))

        return customer_df, transactions_df

    except FileNotFoundError:
        print(f"Error: Excel file not found at {excel_file_path}")
        return None, None
    except Exception as e:
        print(f"An error occurred while reading the Excel file: {e}")
        return None, None

# Run the fucntion to updload data
customer_df, transactions_df = load_data(excel_file_path)

DataFrames 'customer_df' and 'transactions_df' created successfully.

First 2 rows of 'customer_df':
    Unnamed: 0  Customer_Id  Customer_Type Current_Address_Country  \
0         NaN             1      Personal                     GBR   
1         NaN             2      Business                     ESP   

  Customer_Since_Date  Unnamed: 5  Unnamed: 6          Field  \
0          2023-02-23         NaN         NaN    Customer_Id   
1          2023-10-18         NaN         NaN  Customer_Type   

                                          Definition  
0            Unique Identifier for a given customer   
1  Flag that identifies whether the customer is a...  

First 2 rows of 'transactions_df':
    Unnamed: 0  Transaction Id  Customer_Id   Amount_GBP Currency_Route   \
0         NaN              73             1         809     GBP --> AUD   
1         NaN             102             1         184     EUR --> EUR   

  Transaction Date  Unnamed: 6           Field  \
0       2022-08-23 

## 3. Data Cleaning <a id='section_3'></a>

* [Table of Contents](#section_0)


This code processes customer_df and transactions_df, by changing their headers to use the first row as column names. It filters both DataFrames to retain only specific columns. Finally, it prints the cleaned and filtered DataFrames for review.

In [5]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@dataclass
class DataFrameColumns:
    """Define required columns and their mappings"""
    customer_columns = {
        # Original column name -> Standardized name
        'Customer_Id': 'Customer_Id',  # Exact match from original
        'Customer_Type': 'Customer_Type',
        'Current_Address_Country': 'Current_Address_Country',
        'Customer_Since_Date': 'Customer_Since_Date'
    }
    
    transaction_columns = {
        # Original column name -> Standardized name
        'Transaction Id': 'Transaction_Id',  # Note the space
        'Customer_Id': 'Customer_Id',
        'Amount_GBP': 'Amount_GBP',
        'Currency_Route': 'Currency_Route',
        'Transaction Date': 'Transaction_Date'  # Note the space
    }

def preprocess_dataframe(df: pd.DataFrame, required_columns: dict, df_name: str) -> Optional[pd.DataFrame]:
    """
    Preprocesses DataFrame by removing unnecessary columns and standardizing column names
    """
    try:
        logger.info(f"Preprocessing {df_name}...")
        logger.info(f"Original columns in {df_name}: {df.columns.tolist()}")
        
        # Remove rows that are actually metadata (where Field and Definition columns exist)
        if 'Field' in df.columns and 'Definition' in df.columns:
            df = df[df['Field'].isna()].copy()
        
        # Remove unnecessary columns
        columns_to_drop = ['Unnamed: 0', 'Unnamed: 5', 'Unnamed: 6', 'Field', 'Definition']
        df = df.drop(columns=[col for col in columns_to_drop if col in df.columns])
        
        # Log available columns after initial cleaning
        logger.info(f"Columns after initial cleaning in {df_name}: {df.columns.tolist()}")
        
        # Before checking required columns, let's clean up column names
        df.columns = df.columns.str.strip()
        
        # Verify required columns exist
        present_columns = set(df.columns)
        required_column_names = set(required_columns.keys())
        
        if not required_column_names.issubset(present_columns):
            missing = required_column_names - present_columns
            logger.error(f"Missing required columns in {df_name}: {missing}")
            logger.error(f"Available columns: {present_columns}")
            return None
        
        # Select and rename columns
        df = df[list(required_columns.keys())].copy()
        df = df.rename(columns=required_columns)
        
        return df
        
    except Exception as e:
        logger.error(f"Error preprocessing {df_name}: {str(e)}")
        return None

def clean_data(
    customer_df: pd.DataFrame, 
    transactions_df: pd.DataFrame
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """
    Cleans and preprocesses customer and transaction data for regulatory reporting
    """
    try:
        # Log initial DataFrame shapes
        logger.info(f"Initial customer_df shape: {customer_df.shape}")
        logger.info(f"Initial transactions_df shape: {transactions_df.shape}")
        
        columns = DataFrameColumns()
        
        # Preprocess DataFrames
        customer_df = preprocess_dataframe(customer_df, columns.customer_columns, "customer_df")
        transactions_df = preprocess_dataframe(transactions_df, columns.transaction_columns, "transactions_df")
        
        if customer_df is None or transactions_df is None:
            raise ValueError("Preprocessing failed")

        logger.info("Starting data cleaning process...")
        
        # Clean up dates
        customer_df['Customer_Since_Date'] = pd.to_datetime(
            customer_df['Customer_Since_Date'],
            errors='coerce'
        )
        
        transactions_df['Transaction_Date'] = pd.to_datetime(
            transactions_df['Transaction_Date'],
            errors='coerce'
        )
        
        # Clean up Currency_Route (remove arrows and spaces)
        transactions_df['Currency_Route'] = transactions_df['Currency_Route'].str.replace(' --> ', '-')
        
        # Clean up customer type and country
        customer_df['Customer_Type'] = customer_df['Customer_Type'].str.strip()
        customer_df['Current_Address_Country'] = customer_df['Current_Address_Country'].str.strip()
        
        # Convert Amount_GBP to numeric, removing any currency symbols if present
        transactions_df['Amount_GBP'] = pd.to_numeric(
            transactions_df['Amount_GBP'].astype(str).str.replace('[£,]', '', regex=True),
            errors='coerce'
        )
        
        logger.info("Data cleaning completed successfully")
        logger.info(f"Final customer_df shape: {customer_df.shape}")
        logger.info(f"Final transactions_df shape: {transactions_df.shape}")
        
        return customer_df, transactions_df
        
    except Exception as e:
        logger.error(f"Error during data cleaning: {str(e)}", exc_info=True)
        return None, None

def display_dataframe_info(customer_df: Optional[pd.DataFrame], 
                         transactions_df: Optional[pd.DataFrame]) -> None:
    """
    Safely displays information about the cleaned DataFrames
    """
    if customer_df is not None and transactions_df is not None:
        print("\nDataFrames cleaned successfully.")
        print("\nFirst 2 rows of 'customer_df':")
        print(customer_df.head(2))
        print("\nCustomer DataFrame Info:")
        print(f"Total customers: {len(customer_df)}")
        print(f"Customer types distribution:\n{customer_df['Customer_Type'].value_counts()}")
        
        print("\nFirst 2 rows of 'transactions_df':")
        print(transactions_df.head(2))
        print("\nTransaction DataFrame Info:")
        print(f"Total transactions: {len(transactions_df)}")
        print(f"Average transaction amount: £{transactions_df['Amount_GBP'].mean():.2f}")
    else:
        print("Data cleaning failed. Please check the logs for details.")

if __name__ == "__main__":
    # Antes de la limpieza, veamos la estructura actual
    print("\nColumns originals in customer_df:", customer_df.columns.tolist())
    print("\nColumns originals in transactions_df:", transactions_df.columns.tolist())
    
    # Clean data
    cleaned_customer_df, cleaned_transactions_df = clean_data(customer_df, transactions_df)
    
    # Print results
    display_dataframe_info(cleaned_customer_df, cleaned_transactions_df)

2024-10-06 20:24:05,771 - INFO - Initial customer_df shape: (100, 9)
2024-10-06 20:24:05,772 - INFO - Initial transactions_df shape: (90, 9)
2024-10-06 20:24:05,773 - INFO - Preprocessing customer_df...
2024-10-06 20:24:05,774 - INFO - Original columns in customer_df: ['Unnamed: 0', 'Customer_Id ', 'Customer_Type', 'Current_Address_Country', 'Customer_Since_Date', 'Unnamed: 5', 'Unnamed: 6', 'Field', 'Definition']
2024-10-06 20:24:05,778 - INFO - Columns after initial cleaning in customer_df: ['Customer_Id ', 'Customer_Type', 'Current_Address_Country', 'Customer_Since_Date']
2024-10-06 20:24:05,780 - INFO - Preprocessing transactions_df...
2024-10-06 20:24:05,781 - INFO - Original columns in transactions_df: ['Unnamed: 0', 'Transaction Id', 'Customer_Id ', 'Amount_GBP', 'Currency_Route ', 'Transaction Date', 'Unnamed: 6', 'Field', 'Definition']
2024-10-06 20:24:05,783 - INFO - Columns after initial cleaning in transactions_df: ['Transaction Id', 'Customer_Id ', 'Amount_GBP', 'Currency_


Columns originals in customer_df: ['Unnamed: 0', 'Customer_Id ', 'Customer_Type', 'Current_Address_Country', 'Customer_Since_Date', 'Unnamed: 5', 'Unnamed: 6', 'Field', 'Definition']

Columns originals in transactions_df: ['Unnamed: 0', 'Transaction Id', 'Customer_Id ', 'Amount_GBP', 'Currency_Route ', 'Transaction Date', 'Unnamed: 6', 'Field', 'Definition']

DataFrames cleaned successfully.

First 2 rows of 'customer_df':
   Customer_Id Customer_Type Current_Address_Country Customer_Since_Date
4            5      Business                     FRA          2023-10-23
5            6      Business                     ISL          2022-12-26

Customer DataFrame Info:
Total customers: 96
Customer types distribution:
Customer_Type
Personal    65
Business    27
Name: count, dtype: int64

First 2 rows of 'transactions_df':
   Transaction_Id  Customer_Id  Amount_GBP Currency_Route Transaction_Date
5              85            3         460        USD-USD       2022-12-30
6             117     

In [6]:
# Update the original DataFrames with their cleaned versions
customer_df = cleaned_customer_df
transactions_df = cleaned_transactions_df

# Print results
display_dataframe_info(customer_df, transactions_df)


DataFrames cleaned successfully.

First 2 rows of 'customer_df':
   Customer_Id Customer_Type Current_Address_Country Customer_Since_Date
4            5      Business                     FRA          2023-10-23
5            6      Business                     ISL          2022-12-26

Customer DataFrame Info:
Total customers: 96
Customer types distribution:
Customer_Type
Personal    65
Business    27
Name: count, dtype: int64

First 2 rows of 'transactions_df':
   Transaction_Id  Customer_Id  Amount_GBP Currency_Route Transaction_Date
5              85            3         460        USD-USD       2022-12-30
6             117            4        1000        USD-AUD       2022-08-21

Transaction DataFrame Info:
Total transactions: 85
Average transaction amount: £484.89


## 4. Basic QA Checklist <a id='section_4'></a>

* [Table of Contents](#section_0)

Before uploading your DataFrames customer_df and transactions_df to Google Cloud Storage (GCS), it's important to perform a basic quality assurance (QA) check to ensure that the data is correct and in the appropriate format. 

In [7]:
# --- Create the output directory if it doesn't exist ---
if not os.path.exists('output'):
    os.makedirs('output')

def perform_qa_checks(customer_df, transactions_df):
    """
    Performs basic quality assurance checks on the data.

    Args:
      customer_df: DataFrame containing customer data.
      transactions_df: DataFrame containing transaction data.
    """
    # --- Check the Structure of the DataFrame ---
    print("Structure of customer_df:")
    print(customer_df.info())
    print("\nStructure of transactions_df:")
    print(transactions_df.info())

    # --- Check for Null Values ---
    print("\nNull Values in customer_df:")
    print(customer_df.isnull().sum())
    print("\nNull Values in transactions_df:")
    print(transactions_df.isnull().sum())

    # --- Check for Duplicates ---
    print("\nDuplicates in customer_df:", customer_df.duplicated().sum())
    print("Duplicates in transactions_df:", transactions_df.duplicated().sum())

    # --- Check Ranges and Expected Values ---
    print("\nUnique Values in Customer_Type:")
    print(customer_df['Customer_Type'].unique())
    print("\nUnique Values in Current_Address_Country:")
    print(customer_df['Current_Address_Country'].unique())
    print("\nUnique Dates in transactions_df:")
    print(transactions_df['Currency_Route'].describe())

    # --- Validate Date Formats ---
    try:
        pd.to_datetime(customer_df['Customer_Since_Date'])
        print("Date format in Customer_Since_Date is valid.")
    except Exception as e:
        print("Error in date format:", e)
    try:
        pd.to_datetime(transactions_df['Transaction Date'])
        print("Date format in Date is valid.")
    except Exception as e:
        print("Error in date format:", e)

    # --- Random Sample of Data ---
    print("\nRandom Sample from customer_df:")
    print(customer_df.sample(5))
    print("\nRandom Sample from transactions_df:")
    print(transactions_df.sample(5))

    # --- Check Referential Integrity ---
    missing_customers = transactions_df[~transactions_df['Customer_Id'].isin(customer_df['Customer_Id'])]
    if missing_customers.empty:
        print("No broken references between transactions_df and customer_df.")
    else:
        print("Missing references in transactions_df:", missing_customers)

def generate_report(customer_df, transactions_df):
    """
    Generates a basic data quality report.

    Args:
      customer_df: DataFrame containing customer data.
      transactions_df: DataFrame containing transaction data.

    Returns:
      A pandas DataFrame containing the data quality report.
    """

    # --- Customer Statistics ---
    customer_stats = {
        ('Customer Statistics', 'Number of Unique Customers'): customer_df['Customer_Id'].nunique(),
        ('Customer Statistics', 'Customer Type Distribution'): customer_df['Customer_Type'].value_counts().to_dict(),
        ('Customer Statistics', 'Customer Country Distribution'): customer_df['Current_Address_Country'].value_counts().to_dict(),
        ('Customer Statistics', 'Customer Since Date Statistics'): customer_df['Customer_Since_Date'].describe().to_dict()
    }

    # --- Transaction Statistics ---
    transaction_stats = {
        ('Transaction Statistics', 'Total Transactions'): len(transactions_df),
        ('Transaction Statistics', 'Total Amount (GBP)'): transactions_df['Amount_GBP'].sum(),
        ('Transaction Statistics', 'Average Transaction Amount (GBP)'): transactions_df['Amount_GBP'].mean(),
        ('Transaction Statistics', 'Maximum Transaction Amount (GBP)'): transactions_df['Amount_GBP'].max(),
        ('Transaction Statistics', 'Minimum Transaction Amount (GBP)'): transactions_df['Amount_GBP'].min(),
        ('Transaction Statistics', 'Transactions by Currency Route'): transactions_df['Currency_Route'].value_counts().to_dict()
    }

    # --- Combine all stats into a report ---
    all_stats = {**customer_stats, **transaction_stats}
    report = pd.DataFrame.from_dict(all_stats, orient='index')

    return report

# --- Run QA checks ---
print("Running data quality checks...")
perform_qa_checks(customer_df, transactions_df)

# --- Generate and display the report ---
print("\nGenerating report...")
report = generate_report(customer_df, transactions_df)

# --- Save the report to an Excel file ---
report.to_excel('output/data_quality_report.xlsx', sheet_name='Data Quality Report', index=True)

# --- Load the workbook and select the worksheet ---
wb = openpyxl.load_workbook('output/data_quality_report.xlsx')
ws = wb['Data Quality Report']

# --- Formatting ---
# Set the title
ws['A1'].value = 'Wise Data Quality Report'
ws['A1'].font = Font(bold=True, size=14)

# Adjust column widths
ws.column_dimensions['A'].width = 30
ws.column_dimensions['B'].width = 50

# Apply formatting to headers
header_font = Font(bold=True)
header_border = Border(top=Side(style='thin'), bottom=Side(style='thin'), left=Side(style='thin'), right=Side(style='thin'))
header_alignment = Alignment(wrap_text=True, vertical='top')
for cell in ws[3]:  # Headers are in row 3
    cell.font = header_font
    cell.border = header_border
    cell.alignment = header_alignment

# Apply formatting to the index
index_font = Font(bold=True)
index_border = Border(top=Side(style='thin'), bottom=Side(style='thin'), left=Side(style='thin'), right=Side(style='thin'))
index_alignment = Alignment(wrap_text=True, vertical='top')
for cell in ws['A']:
    cell.font = index_font
    cell.border = index_border
    cell.alignment = index_alignment

wb.save('output/data_quality_report.xlsx')
print("Report saved to Wise_CS/output/data_quality_report.xlsx")

Running data quality checks...
Structure of customer_df:
<class 'pandas.core.frame.DataFrame'>
Index: 96 entries, 4 to 99
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   Customer_Id              96 non-null     int64         
 1   Customer_Type            92 non-null     object        
 2   Current_Address_Country  96 non-null     object        
 3   Customer_Since_Date      96 non-null     datetime64[ns]
dtypes: datetime64[ns](1), int64(1), object(2)
memory usage: 3.8+ KB
None

Structure of transactions_df:
<class 'pandas.core.frame.DataFrame'>
Index: 85 entries, 5 to 89
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   Transaction_Id    85 non-null     int64         
 1   Customer_Id       85 non-null     int64         
 2   Amount_GBP        85 non-null     int64         
 3   Cur

## 5. Save DataFrames <a id='section_5'></a>

* [Table of Contents](#section_0)

This code saves the customer_df and transactions_df DataFrames as CSV files in the output directory. It then generates dynamic filenames for these CSVs by appending the current month and year, and uploads them to a Google Cloud Storage bucket.

In [8]:
def create_and_load_table(customer_df, transactions_df, project_id, dataset_id, table_id_customer, table_id_transactions):
    """
    Creates and loads tables in BigQuery.

    Args:
        customer_df: DataFrame containing customer data.
        transactions_df: DataFrame containing transaction data.
        project_id: Google Cloud project ID.
        dataset_id: BigQuery dataset ID.
        table_id_customer: BigQuery table ID for customer data.
        table_id_transactions: BigQuery table ID for transaction data.
    """
    try:
        print("Starting BigQuery table creation and loading process...")

        client = bigquery.Client(project=project_id)  # Use project_id variable
        dataset_ref = client.dataset(dataset_id)    # Use dataset_id variable

        # Create the customer table
        table_ref_customer = dataset_ref.table(table_id_customer)
        job_config_customer = bigquery.LoadJobConfig(
            autodetect=True, 
            write_disposition="WRITE_APPEND"
        )
        job_customer = client.load_table_from_dataframe(
            customer_df, table_ref_customer, job_config=job_config_customer
        )
        job_customer.result()  # Wait for the job to complete
        print(f"Loaded {job_customer.output_rows} rows into BigQuery table {project_id}.{dataset_id}.{table_id_customer}")

        # Create the transactions table
        table_ref_transactions = dataset_ref.table(table_id_transactions)
        job_config_transactions = bigquery.LoadJobConfig(
            autodetect=True,
            write_disposition="WRITE_APPEND" 
        )
        job_transactions = client.load_table_from_dataframe(
            transactions_df, table_ref_transactions, job_config=job_config_transactions
        )
        job_transactions.result()  # Wait for the job to complete
        print(f"Loaded {job_transactions.output_rows} rows into BigQuery table {project_id}.{dataset_id}.{table_id_transactions}")

        print("BigQuery table creation and loading process completed.")

    except Exception as e:
        print(f"An error occurred while creating or loading tables in BigQuery: {e}")


def save_data(customer_df, transactions_df, bucket_name, destination_blob_name_customer, 
              destination_blob_name_transactions, project_id, dataset_id, 
              table_id_customer, table_id_transactions):
    """
    Saves the DataFrames as CSV files, uploads them to Google Cloud Storage, 
    and creates and loads tables in BigQuery. Also uploads a data quality report.

    Args:
        customer_df: DataFrame containing customer data.
        transactions_df: DataFrame containing transaction data.
        bucket_name: Name of the GCS bucket.
        destination_blob_name_customer: Filename for the customer data on GCS.
        destination_blob_name_transactions: Filename for the transaction data on GCS.
        project_id: Google Cloud project ID.
        dataset_id: BigQuery dataset ID.
        table_id_customer: BigQuery table ID for customer data.
        table_id_transactions: BigQuery table ID for transaction data.
    """
    try:
        # Save DataFrames to CSV files 
        customer_df.to_csv('output/customer_data.csv', index=False)
        transactions_df.to_csv('output/transactions_data.csv', index=False)

        # Access the bucket
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        # Upload the files with the dynamic filenames
        blob_customer = bucket.blob(destination_blob_name_customer)
        blob_customer.upload_from_filename('output/customer_data.csv')
        print(f"Uploaded {blob_customer.name} to GCS") 

        blob_transactions = bucket.blob(destination_blob_name_transactions)
        blob_transactions.upload_from_filename('output/transactions_data.csv')
        print(f"Uploaded {blob_transactions.name} to GCS") 

        # Get today's date for the report filename
        today = datetime.date.today().strftime("%Y-%m-%d")  # Corrected line

        # Upload the data quality report with today's date in the filename
        report_blob_name = f"data_quality_report_{today}.xlsx"
        report_blob = bucket.blob(report_blob_name)
        report_blob.upload_from_filename('output/data_quality_report.xlsx')
        print(f"Uploaded {report_blob.name} to GCS") 

        print("Files uploaded successfully to GCS.")

        # Create and load tables in BigQuery
        create_and_load_table(customer_df, transactions_df, project_id, dataset_id, table_id_customer, table_id_transactions)

    except Exception as e:
        print(f"An error occurred while saving or uploading the data: {e}")


if __name__ == "__main__":

    # Call the save_data function to execute the entire process.
    save_data(customer_df, transactions_df, bucket_name, 
              destination_blob_name_customer, destination_blob_name_transactions, 
              project_id, dataset_id, table_id_customer, table_id_transactions)

Uploaded customer_data.csv to GCS
Uploaded transactions_data.csv to GCS
Uploaded data_quality_report_2024-10-06.xlsx to GCS
Files uploaded successfully to GCS.
Starting BigQuery table creation and loading process...
Loaded 96 rows into BigQuery table wiseentitydataflow.wise_dataset.customers
Loaded 85 rows into BigQuery table wiseentitydataflow.wise_dataset.transactions
BigQuery table creation and loading process completed.
