This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

Merge pull request #3 from Labelbox/export_to_table
Export to table
raphaeljafriLB committed Mar 13, 2024
2 parents 028de01 + 713a029 commit b202875
Showing 2 changed files with 119 additions and 10 deletions.
127 changes: 118 additions & 9 deletions labelboxbigquery/client.py
@@ -1,11 +1,17 @@
import labelbox
from labelbox import Client as labelboxClient
from labelbox.schema.data_row_metadata import DataRowMetadataKind
from labelbase.metadata import sync_metadata_fields, get_metadata_schema_to_name_key
from labelbase.downloader import export_and_flatten_labels
from google.cloud import bigquery
from google.oauth2 import service_account
from uuid import uuid4
import pandas as pd
from datetime import datetime


# BigQuery restricts the special characters allowed in column names; restricted characters must be replaced with their unicode escape sequences
DIVIDER_MAPPINGS = {'&' : '\u0026', '%' : '\u0025', '>' : '\u003E', '#' : '\u0023', '|' : '\u007c'}

class Client:
""" A LabelBigQuery Client, containing a Labelbox Client and BigQuery Client Object
Args:
@@ -36,9 +42,18 @@ def __init__(
lb_app_url="https://app.labelbox.com"):

self.lb_client = labelboxClient(lb_api_key, endpoint=lb_endpoint, enable_experimental=lb_enable_experimental, app_url=lb_app_url)
bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
self.bq_client = bigquery.Client(project=google_project_name, credentials=bq_creds)

self.bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
self.bq_client = bigquery.Client(project=google_project_name, credentials=self.bq_creds)
self.google_project_name = google_project_name

def _validate_divider(self, divider):
unicode_divider = ''
for char in divider:
if char not in DIVIDER_MAPPINGS:
raise ValueError(f"Restricted character(s) found in divider - {char}. The allowed characters are {[key for key in DIVIDER_MAPPINGS.keys()]}")
unicode_divider += DIVIDER_MAPPINGS[char]
return unicode_divider
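
A minimal usage sketch (not part of this commit) of the divider validation above; the Client construction arguments are placeholders:

    # Hypothetical usage of _validate_divider; every character of the divider
    # must be one of the keys in DIVIDER_MAPPINGS.
    client = Client(lb_api_key="LB_API_KEY", google_project_name="my-gcp-project")  # placeholder credentials
    safe_divider = client._validate_divider("|||")   # ok: '|' maps to '\u007c'
    client._validate_divider("***")                  # raises ValueError: '*' is not an allowed character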

def _sync_metadata_fields(self, bq_table_id, metadata_index={}):
""" Ensures Labelbox's Metadata Ontology has all necessary metadata fields given a metadata_index
Args:
@@ -174,8 +189,91 @@ def __check_global_keys(client, global_keys):
else:
upload_results.extend(task.result)
return upload_results

def export_to_BigQuery(self, project, bq_dataset_id:str, bq_table_name:str, create_table:bool=False,
include_metadata:bool=False, include_performance:bool=False, include_agreement:bool=False,
include_label_details:bool=False, verbose:bool=False, mask_method:str="png", divider="|||",
export_filters:dict=None):

divider = self._validate_divider(divider)
flattened_labels_dict = export_and_flatten_labels(
client=self.lb_client, project=project, include_metadata=include_metadata,
include_performance=include_performance, include_agreement=include_agreement,
include_label_details=include_label_details, mask_method=mask_method, verbose=verbose, divider=divider,
export_filters=export_filters
)
if len(flattened_labels_dict) == 0:
if verbose:
print("No labels were found in the project export")
return

# Make sure all values are strings, since every column in the BigQuery schema is STRING
flattened_labels_dict = [{key: str(val) for key, val in dict.items()} for dict in flattened_labels_dict]

table = pd.DataFrame.from_dict(flattened_labels_dict)
label_ids = table['label_id'].to_numpy()
labels_str = ""
for label_id in label_ids:
labels_str += "'" + label_id + "',"
labels_str = labels_str[:-1]
columns = table.columns.values.tolist()
table_schema = [bigquery.SchemaField(col, "STRING") for col in columns]
bq_table_name = bq_table_name.replace("-","_") # BigQuery tables shouldn't have "-" in them, as this causes errors when performing SQL updates

if create_table:
bq_table = self.bq_client.create_table(bigquery.Table(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", schema=table_schema))
if verbose:
print(f'Created BigQuery Table with ID {bq_table.table_id}')
labels_to_insert = flattened_labels_dict
else:
bq_table = self.bq_client.get_table(bigquery.Table(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}"))
query = """
SELECT updated_at, label_id
FROM {0}
WHERE label_id in ({1})
"""
query = query.format(f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", labels_str)
query_job = self.bq_client.query(query)
rows = list(query_job.result())
labels_to_update = []
labels_to_insert = []
for label in flattened_labels_dict:
label_in_table = False
for row in rows:
if label['label_id'] == row[1]:
label_in_table = True
row_time = datetime.strptime(row[0], "%Y-%m-%dT%H:%M:%S.%f%z")
label_time = datetime.strptime(label["updated_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
if label_time > row_time:
labels_to_update.append(label)
if not label_in_table:
labels_to_insert.append(label)
if len(labels_to_update) > 0:
job_config = bigquery.LoadJobConfig(
schema=table_schema,
write_disposition="WRITE_TRUNCATE",
)
job = self.bq_client.load_table_from_json(
flattened_labels_dict, f"{self.google_project_name}.{bq_dataset_id}.{bq_table_name}", job_config=job_config
)
errors = job.result().errors
if not errors and verbose:
print(f'Successfully updated table. {len(labels_to_update)} rows were updated and {len(labels_to_insert)} new rows were inserted')
elif verbose:
print(errors)
return errors
if verbose:
print(f"inserting {len(labels_to_insert)} data rows to table")
errors = self.bq_client.insert_rows_json(bq_table, labels_to_insert)
if not errors and verbose:
print(f'Insert job successful')
elif verbose:
print(f"There are errors present:\n {errors}")
return errors
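
A hedged usage sketch (not part of this commit) of the new export_to_BigQuery method; the project, dataset, table, and key file names below are placeholders:

    # Hypothetical first export: create the BigQuery table and insert every flattened label row.
    lbq_client = Client(lb_api_key="LB_API_KEY",
                        google_key="service_account.json",         # placeholder service-account file
                        google_project_name="my-gcp-project")      # placeholder GCP project
    lb_project = lbq_client.lb_client.get_project("LB_PROJECT_ID") # placeholder Labelbox project ID
    errors = lbq_client.export_to_BigQuery(project=lb_project,
                                           bq_dataset_id="labelbox_exports",
                                           bq_table_name="project_labels",
                                           create_table=True,
                                           verbose=True)

    # Hypothetical re-export: with create_table=False the method compares updated_at per label_id,
    # truncates and rewrites the table when newer labels exist, and otherwise inserts only new label rows.
    errors = lbq_client.export_to_BigQuery(project=lb_project,
                                           bq_dataset_id="labelbox_exports",
                                           bq_table_name="project_labels")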

def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, global_key_col=None, external_id_col=None, metadata_index={}, attachment_index={}, skip_duplicates=False):
def create_data_rows_from_table(
self, bq_table_id:str="", lb_dataset:labelbox.schema.dataset.Dataset=None, row_data_col:str="", global_key_col:str=None,
external_id_col:str=None, metadata_index:dict={}, attachment_index:dict={}, skip_duplicates:bool=False, divider:str="|||"):
""" Creates Labelbox data rows given a BigQuery table and a Labelbox Dataset
Args:
bq_table_id : Required (str) - BigQuery Table ID structured in the following format: "google_project_name.dataset_name.table_name"
@@ -186,9 +284,12 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
metadata_index : Optional (dict) - Dictionary where {key=column_name : value=metadata_type} - metadata_type must be one of "enum", "string", "datetime" or "number"
attachment_index : Optional (dict) - Dictionary where {key=column_name : value=attachment_type} - attachment_type must be one of "IMAGE", "VIDEO", "TEXT", "HTML"
skip_duplicates : Optional (bool) - If True, will skip duplicate global_keys, otherwise will generate a unique global_key with a suffix "_1", "_2" and so on
divider : Optional (str) - String delimiter for schema name keys and the suffix added to duplicate global keys
Returns:
List of errors from data row upload - if successful, is an empty list
"""

divider = self._validate_divider(divider)
# Sync metadata index keys with metadata ontology
check = self._sync_metadata_fields(bq_table_id, metadata_index)
if not check:
@@ -240,9 +341,9 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
query_lookup[mdf] = index_value
index_value += 1
if attachment_index:
attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
for attachment_field_name in attachment_index:
atf = attachment_field_name.replace(" ", "_")
attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
if attachment_index[attachment_field_name] not in attachment_whitelist:
print(f'Error: Invalid value for attachment_index key {attachment_field_name} : {attachment_index[attachment_field_name]}\n must be one of {attachment_whitelist}')
return None
@@ -259,10 +360,15 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
# Iterate over your query payload to construct a list of data row dictionaries in Labelbox format
global_key_to_upload_dict = {}
for row in query_job:
if len(row[query_lookup[row_data_col]]) <= 200:
global_key = row[query_lookup[row_data_col]]
else:
print("Global key too long (>200 characters). Replacing with randomly generated global key.")
global_key = str(uuid4())
data_row_upload_dict = {
"row_data" : row[query_lookup[row_data_col]],
"metadata_fields" : [{"schema_id":metadata_name_key_to_schema['lb_integration_source'],"value":"BigQuery"}],
"global_key" : str(row[query_lookup[global_key_col]])
"global_key" : str(global_key)
}
if external_id_col:
data_row_upload_dict['external_id'] = row[query_lookup[external_id_col]]
@@ -283,9 +389,11 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
data_row_upload_dict['attachments'] = [{"type" : attachment_index[attachment_field_name], "value" : row[query_lookup[attachment_field_name]]} for attachment_field_name in attachment_index]
global_key_to_upload_dict[row[query_lookup[global_key_col]]] = data_row_upload_dict
# Batch upload your list of data row dictionaries in Labelbox format
if type(lb_dataset) == str):
if type(lb_dataset) == str:
lb_dataset = self.lb_client.get_dataset(lb_dataset)
upload_results = self.__batch_create_data_rows(client=self.lb_client, dataset=lb_dataset, global_key_to_upload_dict=global_key_to_upload_dict)


print(f'Success')
return upload_results
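
A hedged usage sketch (not part of this commit) of create_data_rows_from_table; the table ID, dataset ID, and column names are placeholders:

    # Hypothetical call, assuming a BigQuery table with "uri", "gk" and "split" columns.
    errors = lbq_client.create_data_rows_from_table(
        bq_table_id="my-gcp-project.labelbox_exports.source_rows",     # placeholder "project.dataset.table"
        lb_dataset=lbq_client.lb_client.get_dataset("LB_DATASET_ID"),  # a Dataset object or a dataset ID string
        row_data_col="uri",
        global_key_col="gk",
        metadata_index={"split": "string"},   # metadata_type must be "enum", "string", "datetime" or "number"
        attachment_index={"uri": "IMAGE"},    # attachment_type must be in the whitelist above
        skip_duplicates=True,
    )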

@@ -340,6 +448,7 @@ def create_table_from_dataset(self, bq_dataset_id, bq_table_name, lb_dataset, me
if metadata_field_name in field_to_value.keys():
row_dict[mdf] = field_to_value[metadata_field_name]
rows_to_insert.append(row_dict)
print(len(rows_to_insert))
errors = self.bq_client.insert_rows_json(bq_table, rows_to_insert)
if not errors:
print(f'Success\nCreated BigQuery Table with ID {bq_table.table_id}')
@@ -442,4 +551,4 @@ def upsert_labelbox_metadata(self, bq_table_id, global_key_col, global_keys_list
field.value = metadata_name_key_to_schema[name_key] if name_key in metadata_name_key_to_schema.keys() else table_value
upload_metadata.append(labelbox.schema.data_row_metadata.DataRowMetadata(data_row_id=drid, fields=new_metadata))
results = lb_mdo.bulk_upsert(upload_metadata)
return results
return results
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@

setuptools.setup(
name="labelboxbigquery",
version="0.1.03",
version="0.1.04",
author="Labelbox",
author_email="raphael@labelbox.com",
description="Labelbox Connector for BigQuery",
