You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
class BigQueryClient:
    """
    Convenience wrapper around ``bigquery.Client`` for a single project.

    The instance keeps the service-account credentials in ``self.credentials``
    and the underlying BigQuery client in ``self.client``; every helper below
    delegates to that client.
    """

    def __init__(self, credentials_file, project_id):
        """
        Initialize the BigQueryClient class.

        :param credentials_file: Path to the Google Cloud service account key JSON file.
        :param project_id: Google Cloud project ID.
        """
        self.credentials = service_account.Credentials.from_service_account_file(credentials_file)
        self.client = bigquery.Client(credentials=self.credentials, project=project_id)

    def query(self, query):
        """
        Execute a SQL query and return the results as a DataFrame.

        :param query: SQL query string.
        :return: DataFrame containing the query results.
        """
        query_job = self.client.query(query)
        # result() blocks until the job has finished.
        result = query_job.result()
        return result.to_dataframe()

    def insert_rows(self, table_id, rows_to_insert):
        """
        Insert rows into a BigQuery table.

        Errors are reported on stdout and also handed back to the caller;
        this method does not raise on a per-row insert failure.

        :param table_id: Table ID in the format `project.dataset.table`.
        :param rows_to_insert: List of rows to insert, where each row is a dictionary.
        :return: List of errors encountered during the insert operation.
        """
        errors = self.client.insert_rows_json(table_id, rows_to_insert)
        if errors:
            print(f"Encountered errors while inserting rows: {errors}")
        return errors

    def load_dataframe(self, dataframe, table_id, if_exists='replace'):
        """
        Load a DataFrame into a BigQuery table.

        :param dataframe: Pandas DataFrame to load.
        :param table_id: Table ID in the format `project.dataset.table`.
        :param if_exists: Behavior when the table exists. Options: 'replace', 'append'.
        :raises ValueError: If ``if_exists`` is neither 'replace' nor 'append'.
        """
        # Reject a bad option before any BigQuery object is built.
        if if_exists not in ('replace', 'append'):
            raise ValueError("Invalid value for if_exists: 'replace' or 'append' expected.")

        job_config = bigquery.LoadJobConfig()
        if if_exists == 'replace':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        else:
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

        job = self.client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
        job.result()  # Wait for the job to complete
        print(f"Loaded {job.output_rows} rows into {table_id}.")

    def create_table(self, table_id, schema):
        """
        Create a new BigQuery table with the specified schema.

        :param table_id: Table ID in the format `project.dataset.table`.
        :param schema: List of bigquery.SchemaField objects defining the table schema.
        """
        table = bigquery.Table(table_id, schema=schema)
        self.client.create_table(table)
        print(f"Created table {table_id}")

    def delete_table(self, table_id):
        """
        Delete a BigQuery table.

        The call passes ``not_found_ok=True``, and the confirmation message is
        printed whether or not the table existed beforehand.

        :param table_id: Table ID in the format `project.dataset.table`.
        """
        self.client.delete_table(table_id, not_found_ok=True)
        print(f"Deleted table {table_id}")

    def list_datasets(self):
        """
        List all datasets in the project.

        :return: List of dataset IDs.
        """
        return [dataset.dataset_id for dataset in self.client.list_datasets()]

    def list_tables(self, dataset_id):
        """
        List all tables in a dataset.

        :param dataset_id: Dataset ID.
        :return: List of table IDs.
        """
        return [table.table_id for table in self.client.list_tables(dataset_id)]
The text was updated successfully, but these errors were encountered:
Here is some starter code:
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
class BigQueryClient:
def init(self, credentials_file, project_id):
"""
Initialize the BigQueryClient class.
The text was updated successfully, but these errors were encountered: