# Upload multiple CSV files from a Google Cloud Storage bucket to a BigQuery dataset

Package installation

In [None]:
!pip install gcsfs



In [None]:
from google.colab import auth
from google.cloud import bigquery
import pandas as pd

# Authenticate the current Colab user with Google Cloud (google.colab.auth)
auth.authenticate_user()

# Module-level BigQuery client bound to project 'ty-mini-project-41'.
# The helper functions in this notebook read it as the global `client`.
client = bigquery.Client(project='ty-mini-project-41')

# Function to create a new table in BigQuery
def create_table(dataset_id, table_id, schema):
    """Create a BigQuery table with the given schema and print its name.

    Uses the module-level ``client``; the table is created in that client's
    project.

    Args:
        dataset_id: ID of the dataset that will contain the table.
        table_id: ID of the table to create.
        schema: List of ``bigquery.SchemaField`` describing the columns.

    Note:
        ``client.create_table`` is called without ``exists_ok``, so creating
        a table that already exists is expected to raise (per the BigQuery
        client documentation) rather than silently succeed.
    """
    # Client.dataset() is deprecated; build the reference explicitly with the
    # client's own project, which is what Client.dataset() defaulted to.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_ref = dataset_ref.table(table_id)

    table = client.create_table(bigquery.Table(table_ref, schema=schema))
    print(f'Table {table.table_id} created successfully.')




In [None]:
# Function to load a local CSV file (opened from csv_path) into a table in BigQuery
def load_csv_to_table(dataset_id, table_id, csv_path):
    """Load a local CSV file into a BigQuery table and wait for the job.

    Uses the module-level ``client``. The file is opened from the local
    filesystem (not from Cloud Storage) and streamed to a load job; the first
    row is skipped as a header.

    Args:
        dataset_id: ID of the dataset containing the destination table.
        table_id: ID of the destination table.
        csv_path: Local path of the CSV file to upload.
    """
    # Client.dataset() is deprecated; build the reference explicitly with the
    # client's own project, which is what Client.dataset() defaulted to.
    table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,  # state the format explicitly
        skip_leading_rows=1,  # Skip the header row
    )

    with open(csv_path, 'rb') as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    job.result()  # Wait for the job to complete

    print(f'CSV file {csv_path} loaded into table {table_id} successfully.')

The following functions were included to standardize date formats within the files.

In [None]:
from datetime import datetime
#find object columns

def change_time(x):
    """Parse a 12-hour timestamp string into a ``datetime``.

    The expected layout is ``MM/DD/YYYY HH:MM:SS AM|PM``, for example
    ``'04/25/2024 01:30:00 PM'``. Parsing is delegated to
    ``datetime.strptime``, which raises ``ValueError`` on a mismatch.
    """
    return datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p')

def fix_the_csv(df):
    """Convert AM/PM timestamp columns of ``df`` to datetimes, in place.

    Every object-dtype column is inspected; if at least one of its values is
    a string containing ``' AM'`` or ``' PM'``, the whole column is parsed
    with ``change_time``. This is best effort: a column that cannot be parsed
    completely is left untouched.

    NOTE: a later cell defines another ``fix_the_csv`` with a different
    signature, which replaces this one once that cell has run.

    Args:
        df: DataFrame to fix; it is modified in place.

    Returns:
        The same DataFrame object.
    """
    text_columns = list(df.dtypes[df.dtypes == 'object'].index)
    for var in text_columns:
        column = df[var]

        # Only strings can carry an AM/PM marker; checking the type here
        # avoids relying on an AttributeError for NaN/None values.
        looks_like_time = column.apply(
            lambda x: isinstance(x, str) and (' AM' in x or ' PM' in x)
        ).any()
        if not looks_like_time:
            continue

        try:
            converted = column.apply(change_time)
        except (TypeError, ValueError):
            # A value was missing or did not match the expected format:
            # keep the original column rather than converting part of it.
            continue
        df[var] = converted
    return df

def change_types(x):
    """Map a pandas dtype name to the BigQuery column type used for it.

    ``x`` is the dtype rendered as a string (for example ``'int64'``). The
    first fragment found in it decides the result; anything that matches no
    fragment is stored as ``'STRING'``.
    """
    # Checked in this order; the first fragment contained in x wins.
    fragment_to_type = (
        ('int', "NUMERIC"),
        ('date', 'DATETIME'),
        ('float', 'BIGNUMERIC'),
    )
    for fragment, bq_type in fragment_to_type:
        if fragment in x:
            return bq_type
    return 'STRING'

The process needs to receive the name of each file that you would like to add, so I included this function to list all files in a Google Cloud Storage bucket.

In [None]:
from google.cloud import storage

def list_files_in_bucket(bucket_name):
    """Print the string form of every blob found in ``bucket_name``.

    A fresh ``storage.Client`` is created on each call; nothing is returned.
    """
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)

    for blob in blobs:
        print(str(blob))


# Print the blobs in the source bucket so their file names can be collected
list_files_in_bucket('final_predicted_files_when2heat')


<Blob: final_predicted_files_when2heat, AT_predicted_df_predicted_df.csv, 1714047960010594>
<Blob: final_predicted_files_when2heat, BE_predicted_df_predicted_df.csv, 1714048031119581>
<Blob: final_predicted_files_when2heat, BG_predicted_df_predicted_df.csv, 1714047959998075>
<Blob: final_predicted_files_when2heat, CH_predicted_df_predicted_df.csv, 1714047486808800>
<Blob: final_predicted_files_when2heat, CZ_predicted_df_predicted_df.csv, 1714048063168641>
<Blob: final_predicted_files_when2heat, DE_predicted_df_predicted_df.csv, 1714047692659772>
<Blob: final_predicted_files_when2heat, DK_predicted_df_predicted_df.csv, 1714048031243220>
<Blob: final_predicted_files_when2heat, EE_predicted_df_predicted_df.csv, 1714048074848408>
<Blob: final_predicted_files_when2heat, ES_predicted_df_predicted_df.csv, 1714047466165570>
<Blob: final_predicted_files_when2heat, FI_predicted_df_predicted_df.csv, 1714047722653462>
<Blob: final_predicted_files_when2heat, FR_predicted_df_predicted_df.csv, 171404

In [None]:
from google.colab import auth
from google.cloud import bigquery
import pandas as pd

# Authenticate the current Colab user with Google Cloud (google.colab.auth)
auth.authenticate_user()

# Module-level BigQuery client bound to project 'ty-mini-project-41'.
# The helper functions below read it as the global `client`.
client = bigquery.Client(project='ty-mini-project-41')

# Destination dataset passed to create_table / load_csv_to_table in the loop below
dataset_id = 'final_predicted_when2heat'  # Replace with your actual dataset ID

# Function to create a new table in BigQuery
def create_table(dataset_id, table_id, schema):
    """Create a BigQuery table with the given schema and print its name.

    Uses the module-level ``client``; the table is created in that client's
    project.

    Args:
        dataset_id: ID of the dataset that will contain the table.
        table_id: ID of the table to create.
        schema: List of ``bigquery.SchemaField`` describing the columns.

    Note:
        ``client.create_table`` is called without ``exists_ok``, so creating
        a table that already exists is expected to raise (per the BigQuery
        client documentation) rather than silently succeed.
    """
    # Client.dataset() is deprecated; build the reference explicitly with the
    # client's own project, which is what Client.dataset() defaulted to.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_ref = dataset_ref.table(table_id)

    new_table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(new_table)
    print(f'Table {table.table_id} created successfully.')

# Function to load a local CSV file (opened from csv_path) into a table in BigQuery
def load_csv_to_table(dataset_id, table_id, csv_path):
    """Load a local CSV file into a BigQuery table and wait for the job.

    Uses the module-level ``client``. The file is opened from the local
    filesystem (not from Cloud Storage) and streamed to a load job; the first
    row is skipped as a header.

    Args:
        dataset_id: ID of the dataset containing the destination table.
        table_id: ID of the destination table.
        csv_path: Local path of the CSV file to upload.
    """
    # Client.dataset() is deprecated; build the reference explicitly with the
    # client's own project, which is what Client.dataset() defaulted to.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,  # state the format explicitly
        skip_leading_rows=1,  # Skip the header row
    )

    with open(csv_path, 'rb') as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    job.result()  # Wait for the job to complete

    print(f'CSV file {csv_path} loaded into table {table_id} successfully.')

# Function to fix the CSV DataFrame

def fix_the_csv(df, country_prefix):
    """Merge the date-part columns into ``cet_timestamp`` and prefix the rest.

    The ``year``, ``month``, ``day`` and ``hour`` columns are cast to int,
    combined into a single datetime column named ``cet_timestamp`` (appended
    after the remaining columns) and then dropped. Every other column is
    renamed to ``<country_prefix>_<original name>``.

    The input DataFrame is not modified; a new DataFrame is returned.

    Args:
        df: DataFrame containing ``year``, ``month``, ``day`` and ``hour``
            columns.
        country_prefix: Text placed in front of each non-timestamp column.

    Returns:
        A new DataFrame with the prefixed columns and ``cet_timestamp``.

    Raises:
        KeyError: If one of the date-part columns is missing.
        ValueError: If a date part cannot be cast to int or the parts do not
            form a valid date.
    """
    date_parts = ['year', 'month', 'day', 'hour']

    # Cast on a copy of the four columns so the caller's frame is untouched.
    parts = df[date_parts].astype(int)

    # pd.to_datetime assembles datetimes from year/month/day/hour columns in
    # one vectorised call, and also works on an empty frame.
    timestamps = pd.to_datetime(parts)

    result = df.drop(columns=date_parts)
    result['cet_timestamp'] = timestamps

    # Add country prefix to each column except 'cet_timestamp'
    result.columns = [
        col if col == 'cet_timestamp' else f'{country_prefix}_{col}'
        for col in result.columns
    ]
    return result

# Names of the per-country CSV files in the bucket, one per country code
file_names = [
    f'{code}_predicted_df_predicted_df.csv'
    for code in (
        'AT', 'BE', 'BG', 'CH', 'CZ', 'DE', 'DK',
        'EE', 'ES', 'FI', 'FR', 'GB', 'GR', 'HR',
        'HU', 'IE', 'IT', 'LT', 'LU', 'LV', 'NL',
        'NO', 'PL', 'PT', 'RO', 'SE', 'SI', 'SK',
    )
]

# Full gs:// URI of every file (update with your CSV file names and path)
csv_files = ['gs://final_predicted_files_when2heat/' + name for name in file_names]

# For every gs:// CSV: read it, reshape it, create a matching BigQuery table
# and load the reshaped data into that table.
for csv_file in csv_files:
    # Read the CSV straight from its gs:// URI
    # (presumably served by the gcsfs package installed at the top - confirm)
    df = pd.read_csv(csv_file)

    # The country prefix is the first two characters of the file name
    country_prefix = csv_file.split('/')[-1][:2]

    # Build 'cet_timestamp' from the date-part columns and prefix the
    # remaining column names with the country code
    df = fix_the_csv(df, country_prefix)

    # Build the BigQuery schema: column name -> pandas dtype name -> BigQuery type
    dtypes_dic = df.dtypes.apply(lambda x: str(x)).to_dict()
    ready_dtypes_dic = {k: change_types(dtypes_dic[k]) for k in dtypes_dic}
    schema = [bigquery.SchemaField(k, ready_dtypes_dic[k]) for k in ready_dtypes_dic]

    # The table name is the file name without its ".csv" extension
    table_id = csv_file.split('/')[-1]
    table_id = table_id.replace(".csv", "")

    # Create a new table in BigQuery
    create_table(dataset_id, table_id, schema)

    # Save the reshaped data as a local CSV file; the local file is named
    # after the table (no extension) and is not removed afterwards
    df.to_csv(table_id, index=False)  # Save the CSV file with the modified column names

    # Load that local file into the newly created table
    # (the third argument is the local path, which equals table_id here)
    load_csv_to_table(dataset_id, table_id, table_id)




Table AT_predicted_df_predicted_df created successfully.
CSV file AT_predicted_df_predicted_df loaded into table AT_predicted_df_predicted_df successfully.
Table BE_predicted_df_predicted_df created successfully.
CSV file BE_predicted_df_predicted_df loaded into table BE_predicted_df_predicted_df successfully.
Table BG_predicted_df_predicted_df created successfully.
CSV file BG_predicted_df_predicted_df loaded into table BG_predicted_df_predicted_df successfully.
Table CH_predicted_df_predicted_df created successfully.
CSV file CH_predicted_df_predicted_df loaded into table CH_predicted_df_predicted_df successfully.
Table CZ_predicted_df_predicted_df created successfully.
CSV file CZ_predicted_df_predicted_df loaded into table CZ_predicted_df_predicted_df successfully.
Table DE_predicted_df_predicted_df created successfully.
CSV file DE_predicted_df_predicted_df loaded into table DE_predicted_df_predicted_df successfully.
Table DK_predicted_df_predicted_df created successfully.
CSV fil