In [1]:
# Install the GCS and BigQuery client libraries plus pandas used by the cells below.
# NOTE(review): pd.read_excel on .xlsx files also needs an engine such as openpyxl,
# and bigquery's load_table_from_dataframe needs pyarrow; neither is listed here —
# confirm both are available in the runtime this notebook/function runs in.
!pip install google-cloud-storage google-cloud-bigquery pandas




In [1]:
# Authenticate this Colab session as the signed-in Google user; presumably this is
# what supplies credentials to the GCS/BigQuery clients created later — confirm.
from google.colab import auth
auth.authenticate_user()

In [2]:


# Point the gcloud CLI at the project for this notebook session. The pipeline
# cell below assigns PROJECT_ID again for its own clients, so keep the two in sync.
PROJECT_ID = 'maximal-chemist-355505'
!gcloud config set project $PROJECT_ID

Updated property [core/project].


In [4]:
import os
import re
from io import BytesIO

import pandas as pd
from google.cloud import bigquery, storage

# Deployment settings:
#   GCS_BUCKET_NAME  - bucket that holds the source Excel workbooks
#   PROJECT_ID       - GCP project used by both API clients
#   BIGQUERY_DATASET - dataset that receives one table per workbook
GCS_BUCKET_NAME = 'hdfc_segmentwise_data'
PROJECT_ID = 'maximal-chemist-355505'
BIGQUERY_DATASET = 'hdfc_dummy'

# API clients are constructed once at import time (Storage first, then
# BigQuery) and shared by every function that follows.
storage_client, bq_client = (
    storage.Client(project=PROJECT_ID),
    bigquery.Client(project=PROJECT_ID),
)

def sanitize_column_names(df):
    """
    Rename DataFrame columns to BigQuery-safe, snake_case identifiers.

    Each header is first looked up in a table of known segment headers
    (e.g. 'P.A', 'Growth %'). The lookup ignores case and collapses
    whitespace, and a hit maps to a friendly name. Any other header is
    lower-cased, every run of characters outside ``[a-z0-9]`` becomes a single
    underscore, and leading/trailing underscores are stripped.

    Invalid or duplicate names are repaired:
      * an empty result becomes ``column_<position>``;
      * a name starting with a digit gets a leading underscore;
      * repeated names get ``_2``, ``_3``, ... suffixes.
    BigQuery rejects duplicate column names.

    Args:
        df: DataFrame whose columns are renamed in place.

    Returns:
        The same DataFrame, for chaining.
    """
    column_mapping = {
        'Fire': 'fire',
        'Marine Total': 'marine_total',
        'Marine Cargo': 'marine_cargo',
        'Marine Hull': 'marine_hull',
        'Engineering': 'engineering',
        'Motor Total': 'motor_total',
        'Motor OD': 'motor_od',
        'Motor TP': 'motor_tp',
        'Health': 'health',
        'Aviation': 'aviation',
        'P.A': 'personal_accident',
        'All Other Misc (Crop Insurance+ Credit guarantee + All Other Misc)': 'all_other_misc',
        'Grand Total': 'grand_total',
        'Growth %': 'growth_percentage',
        'Market %': 'market_percentage',
        'Accretion': 'accretion'
    }
    # The mapping must be applied to the *raw* headers: once they are
    # sanitized, keys like 'P.A' or 'Growth %' can no longer match.
    known_headers = {
        " ".join(header.split()).lower(): target
        for header, target in column_mapping.items()
    }

    new_columns = []
    used = set()
    for position, raw in enumerate(df.columns):
        # str() first: Excel headers can be numbers or dates, not just text.
        header = " ".join(str(raw).split()).lower()
        name = known_headers.get(header)
        if name is None:
            name = re.sub(r"[^a-z0-9]+", "_", header).strip("_")

        if not name:
            name = f"column_{position}"
        elif name[0].isdigit():
            name = f"_{name}"

        unique_name = name
        suffix = 2
        while unique_name in used:
            unique_name = f"{name}_{suffix}"
            suffix += 1
        used.add(unique_name)
        new_columns.append(unique_name)

    df.columns = new_columns
    return df

def handle_previous_year_and_growth_rows(df):
    """
    Label 'Previous Year' / '% Growth' rows with the name they belong to, and
    drop everything from the 'Previous Year Market Share' marker onward.

    Assumed layout (TODO confirm against a real workbook): each name is
    followed by its comparison rows, with the labels in one column:

        <name>           <figures>
        Previous Year    <figures>
        % Growth         <figures>

    Relabelling rules:
      * A cell reading 'Previous Year' becomes '<cell one row up>_Previous Year'.
      * A cell reading '% Growth' becomes '<cell two rows up>_Growth'.
      * The source cell comes from the same column.
      * Only those label cells change; the other cells of the row (the
        figures) are kept.
      * Matching ignores case and surrounding whitespace.
      * Label cells whose source row would fall before the first row, or whose
        source cell is blank, are left untouched.

    The first row containing a 'Previous Year Market Share' cell, and every row
    after it, are dropped.

    Args:
        df: DataFrame as read from the segment-wise sheet.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    # normalized indicator text -> (rows back to the name row, suffix to append)
    indicators = {
        "previous year": (1, "_Previous Year"),
        "% growth": (2, "_Growth"),
    }
    stop_marker = "previous year market share"

    def _normalize(value):
        # Non-text cells (numbers, NaN, None) can never be indicators.
        return value.strip().casefold() if isinstance(value, str) else None

    out = df.copy()
    stop_at = len(df)
    for pos in range(len(df)):
        cells = [_normalize(value) for value in df.iloc[pos].tolist()]
        if stop_marker in cells:
            stop_at = pos
            break
        for col_pos, cell in enumerate(cells):
            if cell not in indicators:
                continue
            offset, suffix = indicators[cell]
            if pos - offset < 0:
                continue
            source = df.iat[pos - offset, col_pos]
            if pd.isna(source):
                continue
            name = str(source).strip()
            if name:
                out.iat[pos, col_pos] = name + suffix

    # copy() so later column assignments operate on an independent frame.
    return out.iloc[:stop_at].copy()

def convert_column_types(df):
    """
    Convert every column to Python strings before the BigQuery load.

    Every non-missing value is stringified, matching the original intent of
    ensuring BigQuery compatibility. This is presumably because the sheets mix
    numbers and text within one column.

    Missing values (NaN/None/NaT) are left as missing rather than becoming the
    literal text 'nan' or 'None', so they load as NULL.

    Args:
        df: DataFrame whose columns are converted in place.

    Returns:
        The same DataFrame, for chaining.
    """
    for column in df.columns:
        values = df[column]
        df[column] = values.astype(str).where(values.notna())
    return df

def find_segmentwise_sheet(excel_data):
    """
    Return the name of the worksheet that holds the segment-wise figures.

    Lookup order:
      1. The first sheet whose name contains 'segmentwise' (case-insensitive).
      2. Otherwise, the sheets at positions 2 and 3, if present. The first one
         is chosen whose header row (row index 1) contains all of
         ``known_columns`` once normalized: lower-cased, each run of
         non-alphanumeric characters collapsed to '_', and leading/trailing
         underscores stripped.

    Args:
        excel_data: Raw bytes of an Excel workbook.

    Returns:
        The matching sheet name, or None when no sheet qualifies.
    """
    known_columns = ['fire', 'marine_total', 'marine_cargo', 'health']

    # The context manager releases the workbook handle even if a sheet fails to parse.
    with pd.ExcelFile(BytesIO(excel_data)) as xls:
        sheet_names = xls.sheet_names

        for name in sheet_names:
            if "segmentwise" in name.lower():
                print(f"Found sheet with 'segmentwise' in name: {name}")
                return name

        for sheet_index in (2, 3):
            if sheet_index >= len(sheet_names):
                continue
            # Only the header row is inspected, so no data rows are read.
            # NOTE(review): detection uses header=1, while
            # load_excel_files_to_bigquery loads the chosen sheet with header=2;
            # confirm both match the workbook layout.
            header_df = pd.read_excel(xls, sheet_name=sheet_index, header=1, nrows=0)
            # map(str) first: numeric/date headers would otherwise make the
            # .str accessor raise or yield NaN.
            headers = set(
                header_df.columns.map(str)
                .str.lower()
                .str.replace(r"[^a-z0-9]+", "_", regex=True)
                .str.strip("_")
            )
            if all(col in headers for col in known_columns):
                print(f"Found sheet with known columns at index {sheet_index}: {sheet_names[sheet_index]}")
                return sheet_names[sheet_index]

    return None

def check_table_exists(table_id):
    """
    Return whether ``table_id`` exists in BigQuery.

    Only ``NotFound`` (HTTP 404) is interpreted as "the table does not exist".
    Any other failure (permissions, network, bad table id) propagates to the
    caller instead of being silently mistaken for a missing table.

    Args:
        table_id: Fully-qualified ``project.dataset.table`` identifier.

    Returns:
        True if the table exists, False if BigQuery reports it as not found.
    """
    from google.cloud.exceptions import NotFound

    try:
        bq_client.get_table(table_id)
    except NotFound:
        return False
    return True

def load_excel_files_to_bigquery(request):
    """
    HTTP entry point: load each ``.xlsx`` file under ``comp-segmentwise/`` in
    GCS_BUCKET_NAME into its own table in BIGQUERY_DATASET.

    Table names are the file's base name with '.xlsx' removed and '-' replaced
    by '_'. Files whose table already exists are skipped. A failure on one file
    is printed and the remaining files are still processed.

    Args:
        request: The incoming HTTP request. If its query string has a
            ``message`` parameter, that value is returned and nothing is loaded.

    Returns:
        The ``message`` value, 'No Excel files found.', or
        'Task successfully completed'.
    """
    # The body is deliberately not parsed: calling get_json() on a request
    # without a JSON content type can raise (HTTP 415) on recent Flask.
    if request.args and 'message' in request.args:
        return request.args.get('message')

    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    excel_files = [
        blob.name
        for blob in bucket.list_blobs(prefix='comp-segmentwise/')
        if blob.name.endswith('.xlsx')
    ]

    if not excel_files:
        print("No Excel files found in the specified GCS bucket.")
        return 'No Excel files found.'

    for file_path in excel_files:
        file_name = os.path.basename(file_path).replace('.xlsx', '').replace('-', '_')
        table_id = f'{PROJECT_ID}.{BIGQUERY_DATASET}.{file_name}'
        # Per-file boundary: one malformed workbook or failed load must not
        # stop the remaining files from being processed.
        try:
            _load_excel_file(bucket, file_path, file_name, table_id)
        except Exception as e:
            print(f'Error processing {file_path} into table {table_id}: {e}')

    return 'Task successfully completed'


def _load_excel_file(bucket, file_path, file_name, table_id):
    """Download one workbook, transform its segment-wise sheet, and load it into ``table_id``.

    Does nothing if the table already exists or no suitable sheet is found.
    Errors propagate to the caller.
    """
    if check_table_exists(table_id):
        print(f"Table {table_id} already exists. Skipping table creation.")
        return

    print(f'Processing {file_path} and creating table {file_name}')
    excel_data = bucket.blob(file_path).download_as_bytes()

    sheet_name = find_segmentwise_sheet(excel_data)
    if sheet_name is None:
        print(f'No relevant sheet found in {file_path}. Skipping this file.')
        return

    df = pd.read_excel(BytesIO(excel_data), header=2, sheet_name=sheet_name)
    df = sanitize_column_names(df)
    df = handle_previous_year_and_growth_rows(df)
    df = convert_column_types(df)

    job_config = bigquery.LoadJobConfig(
        # WRITE_EMPTY: fail instead of appending if the table already holds
        # data (e.g. it was created between the existence check and this load).
        write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
        autodetect=True
    )
    job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # block until the load job finishes; raises on job failure
    table = bq_client.get_table(table_id)
    print(f'Loaded {table.num_rows} rows into table {table_id}.')
