In [3]:
!ls clean_data/

browsing_mobile_behaviour.csv    im_mobile_behaviour.csv
column_descriptions.json         other_mobile_behaviour.csv
fibre_behaviour.csv              streaming_mobile_behaviour.csv
fibre_connected_devices_info.csv subscribers_info.csv
file_access_mobile_behaviour.csv table_descriptions.json
gaming_mobile_behaviour.csv      voip_mobile_behaviour.csv


In [10]:
import os
import pandas as pd
import tempfile # Although not used for the final output, good for temporary operations

def transform_and_save_csv_dates(input_directory: str, output_directory: str):
    """
    Reads CSV files ending with '_mobile_behaviour.csv' from the input directory,
    transforms the 'DATE_ID' column from 'D/M/YYYY' or 'DD/MM/YYYY' to 'YYYY-MM-DD' format,
    converts all column names to uppercase, and saves the processed files
    to the specified output directory.

    Args:
        input_directory (str): The path to the directory containing the original CSV files.
        output_directory (str): The path to the directory where transformed CSV files will be saved.
    """
    if not os.path.exists(input_directory):
        print(f"Error: Input directory '{input_directory}' does not exist.")
        return

    os.makedirs(output_directory, exist_ok=True) # Create output directory if it doesn't exist

    print(f"Searching for '_mobile_behaviour.csv' files in: {input_directory}")

    csv_files = [f for f in os.listdir(input_directory) if f.endswith('_mobile_behaviour.csv')]

    if not csv_files:
        print(f"No '_mobile_behaviour.csv' files found in '{input_directory}'.")
        return

    print(f"Found {len(csv_files)} files to process: {', '.join(csv_files)}")

    for csv_file in csv_files:
        full_input_path = os.path.join(input_directory, csv_file)
        full_output_path = os.path.join(output_directory, csv_file)

        print(f"\n--- Processing file: '{csv_file}' ---")
        try:
            # Read CSV into a pandas DataFrame
            df = pd.read_csv(full_input_path)
            print(f"Successfully read '{csv_file}'. Original rows: {len(df)}")

            # Convert all column names to uppercase
            df.columns = df.columns.str.upper()
            print("Column names converted to uppercase.")

            # Attempt to transform 'DATE_ID' column
            if 'DATE_ID' in df.columns:
                print(f"Transforming 'DATE_ID' column...")
                print(f"Sample of 'DATE_ID' before transformation:\n{df['DATE_ID'].head()}")
                try:
                    # Try parsing with '%#d/%#m/%Y' (flexible D/M/YYYY) based on new sample data
                    df['DATE_ID'] = pd.to_datetime(df['DATE_ID'], format='%d/%m/%Y', errors='coerce')

                    # Fallback to other common formats if the primary parse results in all NaT
                    if df['DATE_ID'].isnull().all():
                        print(f"Warning: Primary parsing ('%#d/%#m/%Y') resulted in all empty values. Attempting other common formats for '{csv_file}'.")
                        # Try YYYY-DD-MM (original request)
                        df['DATE_ID'] = pd.to_datetime(df['DATE_ID'], format='%Y-%d-%m', errors='coerce')
                        if df['DATE_ID'].isnull().all():
                            # Try YYYY-MM-DD (another common format)
                            df['DATE_ID'] = pd.to_datetime(df['DATE_ID'], format='%Y-%m-%d', errors='coerce')

                    # Convert valid datetime objects back to 'YYYY-MM-DD' string format
                    # This will result in 'NaN' for any dates that couldn't be parsed by any format
                    df['DATE_ID'] = df['DATE_ID'].dt.strftime('%Y-%m-%d')
                    print(f"Successfully transformed 'DATE_ID' column in '{csv_file}'.")
                    print(f"Sample of 'DATE_ID' after transformation:\n{df['DATE_ID'].head()}")

                    # Log how many values became NaN
                    nan_count = df['DATE_ID'].isnull().sum()
                    if nan_count > 0:
                        print(f"Note: {nan_count} rows in 'DATE_ID' column became NaN after transformation, indicating unparsable original values.")

                except Exception as e:
                    print(f"Warning: Could not transform 'DATE_ID' column in '{csv_file}': {e}")
                    print("This might happen if some date values are not in a parsable format.")
            else:
                print(f"Warning: 'DATE_ID' column not found in '{csv_file}'. Skipping date transformation for this file.")

            # Save the transformed DataFrame to the output directory
            df.to_csv(full_output_path, index=False, encoding='utf-8')
            print(f"Transformed file saved to: '{full_output_path}'.")

        except pd.errors.EmptyDataError:
            print(f"Warning: '{csv_file}' is empty. Skipping.")
        except pd.errors.ParserError as e:
            print(f"Error parsing '{csv_file}': {e}. Skipping.")
        except Exception as e:
            print(f"An unexpected error occurred while processing '{csv_file}': {e}")

    print("\nAll targeted CSV file transformations completed.")


INPUT_CSV_DIRECTORY = "../data"
# Define a new directory to save the fixed CSVs
OUTPUT_CSV_DIRECTORY = "../experiments/clean_data/" # Or any other desired path

# Call the function to perform the transformation and saving
transform_and_save_csv_dates(INPUT_CSV_DIRECTORY, OUTPUT_CSV_DIRECTORY)

print("\nAfter this script runs, you can use the CSVs in:")
print(f"'{OUTPUT_CSV_DIRECTORY}' for re-uploading to BigQuery.")
print("These files will have DATE_ID in YYYY-MM-DD format and all column names in CAPS.")


Searching for '_mobile_behaviour.csv' files in: ../data
Found 7 files to process: gaming_mobile_behaviour.csv, streaming_mobile_behaviour.csv, im_mobile_behaviour.csv, voip_mobile_behaviour.csv, other_mobile_behaviour.csv, browsing_mobile_behaviour.csv, file_access_mobile_behaviour.csv

--- Processing file: 'gaming_mobile_behaviour.csv' ---
Successfully read 'gaming_mobile_behaviour.csv'. Original rows: 8671
Column names converted to uppercase.
Transforming 'DATE_ID' column...
Sample of 'DATE_ID' before transformation:
0    1/12/2024
1    1/12/2024
2    1/12/2024
3    1/12/2024
4    1/12/2024
Name: DATE_ID, dtype: object
Successfully transformed 'DATE_ID' column in 'gaming_mobile_behaviour.csv'.
Sample of 'DATE_ID' after transformation:
0    2024-12-01
1    2024-12-01
2    2024-12-01
3    2024-12-01
4    2024-12-01
Name: DATE_ID, dtype: object
Transformed file saved to: '../experiments/clean_data/gaming_mobile_behaviour.csv'.

--- Processing file: 'streaming_mobile_behaviour.csv' ---
S

In [14]:
import os
import json
import pandas as pd
from google.cloud import bigquery
from google.api_core import exceptions
import tempfile

# --- Configuration ---
# Replace with your Google Cloud Project ID and BigQuery Dataset ID
PROJECT_ID = "cymbal-telco-da"
DATASET_ID = "cymbal_telco_dataset"
CSV_DIRECTORY = "../experiments/clean_data/" # Path to your CSV files
TABLE_DESCRIPTIONS_FILE = "../experiments/clean_data/table_descriptions.json" # Path to your table descriptions JSON
COLUMN_DESCRIPTIONS_FILE = "../experiments/clean_data/column_descriptions.json" # Path to your column descriptions JSON

# Initialize BigQuery client
client = bigquery.Client(project=PROJECT_ID)
dataset_ref = client.dataset(DATASET_ID)

In [15]:




def create_dataset_if_not_exists(client, dataset_id, project_id):
    """
    Checks if a BigQuery dataset exists and creates it if not.

    Args:
        client (google.cloud.bigquery.Client): The BigQuery client.
        dataset_id (str): The ID of the BigQuery dataset.
        project_id (str): The Google Cloud Project ID.
    """
    dataset_ref = client.dataset(dataset_id)
    try:
        client.get_dataset(dataset_ref)
        print(f"Dataset '{dataset_id}' already exists in project '{project_id}'.")
    except exceptions.NotFound:
        print(f"Dataset '{dataset_id}' not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"  # Set the location for your dataset (e.g., "US", "EU", "ASIA-SOUTHEAST1")
        try:
            dataset = client.create_dataset(dataset, timeout=30)
            print(f"Dataset '{dataset_id}' created successfully in project '{project_id}'.")
        except exceptions.Conflict:
            print(f"Dataset '{dataset_id}' already exists (was created concurrently).")
        except Exception as e:
            print(f"Error creating dataset '{dataset_id}': {e}")

def upload_csv_to_bigquery_with_caps_columns(csv_file_path, table_id, client, dataset_ref):
    """
    Uploads a CSV file to a BigQuery table, ensuring all column names are in CAPS.
    A new table will be created if it doesn't exist. The schema will be auto-detected.

    Args:
        csv_file_path (str): The full path to the CSV file.
        table_id (str): The ID of the BigQuery table to load data into.
        client (google.cloud.bigquery.Client): The BigQuery client.
        dataset_ref (google.cloud.bigquery.DatasetReference): Reference to the BigQuery dataset.
    """
    table_ref = dataset_ref.table(table_id)
    temp_csv_path = None # Initialize to None

    try:
        # Read CSV into a pandas DataFrame
        df = pd.read_csv(csv_file_path)

        # Convert all column names to uppercase
        df.columns = df.columns.str.upper()
        print(f"Converted column names to uppercase for {os.path.basename(csv_file_path)}.")

        # Save the modified DataFrame to a temporary CSV file
        # Use tempfile for secure temporary file creation and automatic cleanup
        with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', encoding='utf-8') as tmp_csv:
            df.to_csv(tmp_csv.name, index=False, encoding='utf-8')
            temp_csv_path = tmp_csv.name
        print(f"Saved temporary CSV with uppercase columns: {temp_csv_path}")

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,  # Skip header row (the new uppercase one)
            autodetect=True,      # Auto-detect schema from CSV
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # Overwrite if table exists
        )

        with open(temp_csv_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

        print(f"Starting job {load_job.job_id} for table {table_id} from {csv_file_path}")
        load_job.result() # Waits for the job to complete
        print(f"Successfully loaded {load_job.output_rows} rows into {table_id}.")

    except exceptions.NotFound:
        print(f"Dataset {DATASET_ID} not found. Please ensure it exists in project {PROJECT_ID}.")
    except Exception as e:
        print(f"Error uploading {csv_file_path} to {table_id}: {e}")
    finally:
        # Clean up the temporary CSV file
        if temp_csv_path and os.path.exists(temp_csv_path):
            os.remove(temp_csv_path)
            print(f"Cleaned up temporary CSV: {temp_csv_path}")


def update_table_descriptions(bq_table_id, descriptions_file, client, dataset_ref):
    """
    Updates the descriptions for a specific BigQuery table based on a JSON file.

    Args:
        bq_table_id (str): The ID of the BigQuery table to update.
        descriptions_file (str): The path to the JSON file containing table descriptions.
        client (google.cloud.bigquery.Client): The BigQuery client.
        dataset_ref (google.cloud.bigquery.DatasetReference): Reference to the BigQuery dataset.
    """
    if not os.path.exists(descriptions_file):
        print(f"Table descriptions file not found: {descriptions_file}")
        return

    with open(descriptions_file, 'r') as f:
        table_descriptions_data = json.load(f)

    found_description = False
    for item in table_descriptions_data:
        # Try to match both possible keys from the JSON, normalizing to uppercase for robust matching
        json_table_name_1 = item.get("Data Source Name")
        json_table_name_2 = item.get("table_name")
        description = item.get("description")

        # Check if the current bq_table_id matches any of the names in the JSON entry (case-insensitive)
        if (json_table_name_1 and json_table_name_1.upper() == bq_table_id.upper()) or \
           (json_table_name_2 and json_table_name_2.upper() == bq_table_id.upper()):
            found_description = True
            if not description:
                print(f"No description found in JSON for table {bq_table_id}. Skipping description update for this table.")
                break # Found the entry, but no description to apply

            try:
                table = client.get_table(dataset_ref.table(bq_table_id)) # Fetch existing table
                table.description = description # Update description
                table = client.update_table(table, ["description"]) # Send API request
                print(f"Updated description for table: {bq_table_id}")
            except exceptions.NotFound:
                print(f"Table {bq_table_id} not found in dataset {DATASET_ID}. Skipping description update.")
            except Exception as e:
                print(f"Error updating description for table {bq_table_id}: {e}")
            break # Found and processed the description for this table, exit loop

    if not found_description:
        print(f"No description found in {descriptions_file} for table: {bq_table_id}. Skipping table description update.")


def update_column_descriptions(table_id, descriptions_file, client, dataset_ref):
    """
    Updates the descriptions for BigQuery table columns based on a JSON file,
    performing case-insensitive matching for column names and normalizing column names
    to uppercase during lookup.

    Args:
        table_id (str): The ID of the BigQuery table whose columns are to be updated.
        descriptions_file (str): The path to the JSON file containing column descriptions.
        client (google.cloud.bigquery.Client): The BigQuery client.
        dataset_ref (google.cloud.bigquery.DatasetReference): Reference to the BigQuery dataset.
    """
    if not os.path.exists(descriptions_file):
        print(f"Column descriptions file not found: {descriptions_file}")
        return

    with open(descriptions_file, 'r') as f:
        raw_all_column_descriptions = json.load(f)

    # Normalize all table and column description keys in the loaded JSON to uppercase
    normalized_all_column_descriptions = {}
    for json_table_name, cols_data in raw_all_column_descriptions.items():
        normalized_cols = {col_key.upper(): col_value for col_key, col_value in cols_data.items()}
        normalized_all_column_descriptions[json_table_name.upper()] = normalized_cols
    print(f"Loaded and normalized column descriptions from '{descriptions_file}'.")

    # Check if we have descriptions for the specific table being processed
    if table_id.upper() not in normalized_all_column_descriptions:
        print(f"No column descriptions found in JSON for table '{table_id}'. Skipping column description update for this table.")
        return

    columns_data_from_json = normalized_all_column_descriptions[table_id.upper()]

    try:
        table_ref = dataset_ref.table(table_id)
        table = client.get_table(table_ref)  # Fetch current table metadata

        original_schema = table.schema
        new_schema = []
        updated_count = 0

        for field in original_schema:
            # Match field.name (from BigQuery table) with keys in columns_data_from_json (from JSON)
            # by converting field.name to uppercase for lookup.
            column_description_data = columns_data_from_json.get(field.name.upper())

            if column_description_data:
                new_description = column_description_data.get("description")

                if new_description and new_description != field.description:
                    new_field = bigquery.SchemaField(
                        name=field.name,  # Keep original casing for the field name in the schema (which should be CAPS now)
                        field_type=field.field_type,
                        mode=field.mode,
                        description=new_description,
                        fields=field.fields  # Preserve nested fields if any
                    )
                    new_schema.append(new_field)
                    updated_count += 1
                    print(f"  - Updated description for column '{field.name}'.")
                else:
                    new_schema.append(field) # No change needed or description is already identical
            else:
                new_schema.append(field) # Keep original field if no new description found for it

        if updated_count > 0:
            # Update the table with the new schema (which includes updated descriptions)
            table.schema = new_schema
            table = client.update_table(table, ["schema"])
            print(f"Successfully updated {updated_count} column descriptions for table: {table_id}")
        else:
            print(f"No column descriptions needed updating for table: {table_id}.")

    except exceptions.NotFound:
        print(f"Table '{table_id}' not found in dataset '{DATASET_ID}'. Skipping column description update for this table.")
    except Exception as e:
        print(f"Error updating column descriptions for table '{table_id}': {e}")

In [16]:
# --- Main Execution Flow ---

# Cell 1: Setup and Configuration
print(f"Configured for Project: {PROJECT_ID}, Dataset: {DATASET_ID}")
create_dataset_if_not_exists(client, DATASET_ID, PROJECT_ID)
print("BigQuery client initialized.")

Configured for Project: cymbal-telco-da, Dataset: cymbal_telco_dataset
Dataset 'cymbal_telco_dataset' already exists in project 'cymbal-telco-da'.
BigQuery client initialized.


In [19]:
# Cell 2: List CSV files (for confirmation, not strictly necessary for the script but useful for debugging)
print("\n--- Available CSV files in clean_data/ ---")
csv_files_in_dir = [f for f in os.listdir('../experiments/clean_data/') if f.endswith('_mobile_behaviour.csv')]
for f in csv_files_in_dir:
    print(f)

# Define a mapping for file names to BigQuery table IDs if they differ from the filename base.
# These table IDs will be used as the BigQuery table names.
bq_table_name_map = {
    "subscribers_info.csv": "subscribers_info",
    "fibre_connected_devices_info.csv": "fibre_connected_devices_info",
    "fibre_behaviour.csv": "fibre_behaviour",
    "browsing_mobile_behaviour.csv": "browsing_mobile_behaviour", # Corrected original typo "broswing" to "browsing"
    "im_mobile_behaviour.csv": "im_mobile_behaviour",
    "other_mobile_behaviour.csv": "other_mobile_behaviour",
    "streaming_mobile_behaviour.csv": "streaming_mobile_behaviour",
    "file_access_mobile_behaviour.csv": "file_access_mobile_behaviour",
    "gaming_mobile_behaviour.csv": "gaming_mobile_behaviour",
    "voip_mobile_behaviour.csv": "voip_mobile_behaviour"
}

# Sort csv files for consistent processing order (optional but good for reproducibility)
csv_files_in_dir.sort()

for csv_file in csv_files_in_dir:
    # Get the base filename (e.g., "subscribers_info.csv")
    base_file_name = csv_file
    # Get the BigQuery table ID from the map, or derive from filename (without .csv extension)
    table_id = bq_table_name_map.get(base_file_name, os.path.splitext(base_file_name)[0])

    csv_full_path = os.path.join(CSV_DIRECTORY, csv_file)

    print(f"\n--- Processing {csv_file} (Intended BigQuery Table ID: {table_id}) ---")

    # --- Step 1: Upload CSV Data to BigQuery with CAPS columns ---
    print(f"\n--- Uploading CSV Data to BigQuery for table {table_id} ---")
    upload_csv_to_bigquery_with_caps_columns(csv_full_path, table_id, client, dataset_ref)

    # --- Step 2: Update BigQuery Table Descriptions ---
    print(f"\n--- Updating BigQuery Table Description for table {table_id} ---")
    update_table_descriptions(table_id, TABLE_DESCRIPTIONS_FILE, client, dataset_ref)

    # --- Step 3: Update BigQuery Column Descriptions ---
    print(f"\n--- Updating BigQuery Column Descriptions for table {table_id} ---")
    update_column_descriptions(table_id, COLUMN_DESCRIPTIONS_FILE, client, dataset_ref)

print("\nAll BigQuery upload and description update processes complete.")


--- Available CSV files in clean_data/ ---
gaming_mobile_behaviour.csv
streaming_mobile_behaviour.csv
im_mobile_behaviour.csv
voip_mobile_behaviour.csv
other_mobile_behaviour.csv
browsing_mobile_behaviour.csv
file_access_mobile_behaviour.csv

--- Processing browsing_mobile_behaviour.csv (Intended BigQuery Table ID: browsing_mobile_behaviour) ---

--- Uploading CSV Data to BigQuery for table browsing_mobile_behaviour ---
Converted column names to uppercase for browsing_mobile_behaviour.csv.
Saved temporary CSV with uppercase columns: /var/folders/88/pldbvcsj07lgrpgmryy9w43r01b183/T/tmpgig228_z.csv
Starting job 04091eb7-8018-4eb8-8b2d-53c97d501b63 for table browsing_mobile_behaviour from ../experiments/clean_data/browsing_mobile_behaviour.csv
Successfully loaded 652926 rows into browsing_mobile_behaviour.
Cleaned up temporary CSV: /var/folders/88/pldbvcsj07lgrpgmryy9w43r01b183/T/tmpgig228_z.csv

--- Updating BigQuery Table Description for table browsing_mobile_behaviour ---
No descripti