<td>
   <a target="_blank" href="https://labelbox.com" ><img src="https://labelbox.com/blog/content/images/2021/02/logo-v4.svg" width=256/></a>
</td>

<td>
<a href="https://colab.research.google.com/github/Labelbox/labelspark/blob/master/notebooks/delta_table_upload_demo.ipynb" target="_blank"><img
src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"></a>
</td>




<td>
<a href="https://github.com/Labelbox/labelspark/blob/master/notebooks/delta_table_upload_demo.ipynb" target="_blank"><img
src="https://img.shields.io/badge/GitHub-100000?logo=github&logoColor=white" alt="GitHub"></a>
</td>

# __*Labelbox Connector for Databricks Tutorial Notebook*__

# _**Ingesting Data into Labelbox from Delta Tables using Labelspark**_

Thanks for trying out the Databricks and Labelbox Connector! This notebook illustrates how Labelbox and Databricks can be used together to power unstructured data workflows.

Labelbox can be used to rapidly annotate a variety of unstructured data from your Data Lake ([images](https://labelbox.com/product/image), [video](https://labelbox.com/product/video), [text](https://labelbox.com/product/text), and [geospatial tiled imagery](https://docs.labelbox.com/docs/tiled-imagery-editor)) and the Labelbox Connector for Databricks makes it easy to bring the annotations back into your Lakehouse environment for AI/ML and analytical workflows.

If you would like to watch a video of the workflow, check out our [Data & AI Summit Demo](https://databricks.com/session_na21/productionizing-unstructured-data-for-ai-and-analytics).


<img src="https://labelbox.com/static/images/partnerships/collab-chart.svg" alt="example-workflow" width="800"/>

<h5>Questions or comments? Reach out to us at ecosystem+databricks@labelbox.com</h5>

## _**Documentation**_

### **Data Rows**
_____________________

**Requirements:**

- A `row_data` column - This column must contain URLs that point to the assets to be uploaded

- Either a `dataset_id` column or an input argument for `dataset_id`
  - If uploading to multiple datasets, provide a `dataset_id` column
  - If uploading to one dataset, provide a `dataset_id` input argument
    - _This can still be a column if it's already in your Delta Table_

- If uploading data from a cloud storage bucket, make sure you set up the proper [Labelbox integration](https://docs.labelbox.com/docs/iam-delegated-access) for that cloud storage service
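
The requirements above can be checked before an upload is attempted. The following is a minimal sketch and not part of Labelspark: `validate_upload_columns` is a hypothetical helper that takes a list of column names (for example `df.columns` from a Spark DataFrame) and the optional `dataset_id` argument, and reports what is missing.

```python
def validate_upload_columns(columns, dataset_id=None):
    """Return a list of problems; an empty list means the inputs look usable.

    Hypothetical helper for illustration only. It checks the two documented
    requirements: a row_data column, and a dataset_id column or argument.
    """
    problems = []
    available = set(columns)

    if "row_data" not in available:
        problems.append("missing required column: row_data")

    # dataset_id may come from the table or from the function argument.
    if "dataset_id" not in available and not dataset_id:
        problems.append("provide a dataset_id column or a dataset_id argument")

    return problems


# One dataset for the whole table: pass the ID as an argument.
print(validate_upload_columns(["row_data", "global_key"], dataset_id="my-dataset-id"))

# Neither requirement is met here, so two problems are reported.
print(validate_upload_columns(["global_key"]))
```

Running a check like this on the table's column names first can surface a missing column before any Spark session or API call is involved.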

**Recommended:**
- A `global_key` column
  - This column contains unique identifiers for your data rows
  - If none is provided, the global key defaults to the value in your `row_data` column
  - The Labelspark client will validate the uniqueness of the global keys provided in this column. If one or more of the global keys already exist in your organization, the client will use the `skip_duplicates` argument of `create_data_rows_from_delta_table()` to determine how to proceed.

    - `skip_duplicates = True`: Rows with existing global keys will not be uploaded
    - `skip_duplicates = False`: Rows with existing global keys will have a suffix appended to the global key to ensure uniqueness.
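
To make the two `skip_duplicates` modes concrete, here is a small pure-Python sketch of the documented behavior. It is not Labelspark's internal implementation: `resolve_global_keys` is a hypothetical function, and the `_1`, `_2` suffix scheme is an assumption made for illustration, since the actual suffix format is not specified here. The sketch also treats repeated keys within the same batch as duplicates, which is a further assumption.

```python
def resolve_global_keys(rows, existing_keys, skip_duplicates=True):
    """Illustrative only: mimics the documented skip_duplicates behavior."""
    taken = set(existing_keys)
    resolved = []
    for row in rows:
        # Fall back to row_data when no global_key is supplied.
        key = row.get("global_key") or row["row_data"]
        if key in taken:
            if skip_duplicates:
                continue  # the row is not uploaded
            # Assumed suffix scheme: append _1, _2, ... until the key is unique.
            n = 1
            while f"{key}_{n}" in taken:
                n += 1
            key = f"{key}_{n}"
        taken.add(key)
        resolved.append({**row, "global_key": key})
    return resolved


rows = [
    {"row_data": "https://example.com/a.jpg", "global_key": "a.jpg"},
    {"row_data": "https://example.com/b.jpg", "global_key": "b.jpg"},
]
already_in_org = {"a.jpg"}

print(resolve_global_keys(rows, already_in_org, skip_duplicates=True))
print(resolve_global_keys(rows, already_in_org, skip_duplicates=False))
```

With `skip_duplicates=True` only the `b.jpg` row survives; with `skip_duplicates=False` both rows survive and the first one receives a modified key.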


**Optional:**
- A `project_id` column or an input argument for `project_id`
  - If batching to multiple projects, provide a `project_id` column
  - If batching to one project, provide a `project_id` input argument
    - _This can still be a column if it's already in your Delta Table_
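
The column-versus-argument choice for `project_id` can be pictured as a grouping step. The sketch below is an illustration under stated assumptions, not the connector's code: `group_by_project` is a hypothetical helper, and it assumes that a per-row `project_id` value takes precedence over the argument, which the documentation above does not spell out.

```python
from collections import defaultdict


def group_by_project(rows, project_id=None):
    """Map each project ID to the global keys that would be batched to it."""
    batches = defaultdict(list)
    for row in rows:
        # Assumption: a per-row project_id wins; otherwise use the argument.
        target = row.get("project_id") or project_id
        if target:
            batches[target].append(row["global_key"])
        # Rows without any project are still uploaded, just not batched.
    return dict(batches)


rows = [
    {"global_key": "a.jpg", "project_id": "project-1"},
    {"global_key": "b.jpg", "project_id": "project-2"},
    {"global_key": "c.jpg"},
]

# Column values route rows a and b; the argument catches row c.
print(group_by_project(rows, project_id="default-project"))
```

When every row goes to the same project, the column can be omitted and the single argument covers the whole table.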

## _**Code**_

Install required packages

In [3]:
%pip install labelspark  -q
%pip install delta-spark -q
%pip install pyspark -q

Instantiate the Labelspark and Labelbox clients and create a new Labelbox dataset. This is the dataset that we will use for ingesting data from a Delta table into Labelbox.

We will also be creating a new project and attaching the uploaded data for labeling.

In [None]:
import labelspark as ls
import labelbox as lb

API_KEY = ""
client = ls.Client(API_KEY)

lb_client = lb.Client(API_KEY)
dataset = lb_client.create_dataset(name="Databricks Integration Demo Dataset", iam_integration=None)
project = lb_client.create_project(name="Databricks Integration Demo Project", media_type=lb.MediaType.Image)

Install the required dependencies if you are uploading data from a Delta table stored in a third-party cloud service.

AWS S3 dependencies:

In [None]:
!wget -P /usr/local/lib/python3.10/dist-packages/pyspark/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
!wget -P /usr/local/lib/python3.10/dist-packages/pyspark/jars https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar

GCS dependencies:

In [3]:
!wget -P /usr/local/lib/python3.10/dist-packages/pyspark/jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar

The `spark_config` argument is only required when connecting to a third-party storage service such as AWS S3 or GCS. It needs the following fields:


*   jars - A string containing the path to the .jar files needed for the connector you are using, separated by a comma. The .jar file versions will need to be compatible with the version of Spark you have installed.

  *   AWS S3 - hadoop-aws-3.3.4.jar, aws-java-sdk-bundle-1.12.262.jar (the files downloaded above)
  *   GCS - gcs-connector-hadoop3-latest.jar
*   credentials - Only required if connecting to GCS. The value should be the path to your [GCS service account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials JSON file. To run this notebook with a GCS credentials file, click on the folder icon on the left sidebar and then click on the upload file icon. This will upload the file to the Notebook's `/content/` folder.


*   AWS_ACCESS_KEY - Only required if connecting to AWS S3. The value should be your [AWS service account](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) access key.
*   AWS_SECRET_KEY - Only required if connecting to AWS S3. The value should be your [AWS service account](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) secret key.
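
The fields listed above can be assembled with a small helper. This is a sketch rather than a Labelspark API: `build_spark_config` is a hypothetical function, and it only reuses the key names that appear in this notebook's config dictionaries (`jars`, `credentials`, `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`). The jar paths and key values in the usage lines are placeholders.

```python
def build_spark_config(jar_paths, credentials=None,
                       aws_access_key=None, aws_secret_key=None):
    """Assemble a spark_config dictionary from its documented fields."""
    # The jars field is a single comma-separated string of .jar paths.
    config = {"jars": ",".join(jar_paths)}

    # GCS: path to the service account credentials JSON file.
    if credentials:
        config["credentials"] = credentials

    # AWS S3: both keys are needed, so reject a half-configured pair.
    if aws_access_key or aws_secret_key:
        if not (aws_access_key and aws_secret_key):
            raise ValueError("AWS access key and secret key must be provided together")
        config["AWS_ACCESS_KEY"] = aws_access_key
        config["AWS_SECRET_KEY"] = aws_secret_key

    return config


# Placeholder paths and credentials, for illustration only.
print(build_spark_config(["/path/to/connector-a.jar", "/path/to/connector-b.jar"],
                         aws_access_key="my-access-key",
                         aws_secret_key="my-secret-key"))
print(build_spark_config(["/path/to/gcs-connector.jar"],
                         credentials="/content/credentials.json"))
```

Joining the jar paths in code avoids stray whitespace around the comma, which could otherwise end up inside a path.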



In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

SPARK_HOME = '/usr/local/lib/python3.10/dist-packages/pyspark/'

AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''

aws_spark_config = {
                # The jar file names must match the files downloaded in the AWS S3 dependencies cell
                "jars": f"{SPARK_HOME}/jars/hadoop-aws-3.3.4.jar,{SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar",
                "AWS_ACCESS_KEY": AWS_ACCESS_KEY,
                "AWS_SECRET_KEY": AWS_SECRET_KEY
}

gcs_credentials_file = '/content/{name of credentials .json file}'

gcs_spark_config = {
                "jars": f"{SPARK_HOME}/jars/gcs-connector-hadoop3-latest.jar",
                "credentials": gcs_credentials_file
}

# table_path can be either a local path or a cloud storage URI (e.g. gs://{bucket_name}/{folder}/{table} or s3a://{bucket_name}/{folder}/{table})
table_path = ""

If you don't have a Delta Table ready for this demo, run the following cell to create one and save it to this notebook's local storage:

In [None]:
data = [{'row_data': 'https://lb-test-data.s3.us-west-1.amazonaws.com/image-samples/sample-image-1.jpg', 'global_key': 'sample-image-1.jpg'}]
table_path = '/content/demo-table'

# Enable the Delta Lake SQL extension and catalog on the Spark session
builder = (
    SparkSession.builder.appName("labelspark_export")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
df = spark.createDataFrame(data)

df.write.format('delta').save(table_path)


In [8]:
# Make sure the Delta table saved properly
df = spark.read.format('delta').load(table_path)
df.show()

+------------------+--------------------+
|        global_key|            row_data|
+------------------+--------------------+
|sample-image-1.jpg|https://lb-test-d...|
+------------------+--------------------+



In [None]:
# When uploading data from a Delta table saved locally, only the dataset_id and table_path arguments are required.
# The project_id argument is only necessary if you want the uploaded data rows to be batched to a project.
results = client.create_data_rows_from_delta_table(dataset_id=dataset.uid, project_id=project.uid, verbose=True, table_path=table_path)

We can either create a Spark Session before running `create_data_rows_from_delta_table()`, or we can pass in the `spark_config` argument that the Labelspark Client will use to programmatically create the Spark Session.
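
When using the `spark_config` route, which dictionary to pass depends on where the table lives. The sketch below shows one way to decide from the `table_path` scheme. It is not part of Labelspark: `storage_backend` and `pick_spark_config` are hypothetical helpers, and treating `s3://` and `s3n://` the same as `s3a://` is an assumption.

```python
from urllib.parse import urlparse


def storage_backend(table_path):
    """Classify a table path as 'aws', 'gcs', or 'local' from its URI scheme."""
    scheme = urlparse(table_path).scheme.lower()
    if scheme in ("s3", "s3a", "s3n"):
        return "aws"
    if scheme == "gs":
        return "gcs"
    # No recognized cloud scheme: treat the path as local storage.
    return "local"


def pick_spark_config(table_path, aws_config=None, gcs_config=None):
    """Return the config matching the table's backend, or None for local tables."""
    backend = storage_backend(table_path)
    if backend == "aws":
        return aws_config
    if backend == "gcs":
        return gcs_config
    return None


for path in ("s3a://my-bucket/folder/table",
             "gs://my-bucket/folder/table",
             "/content/demo-table"):
    print(path, "->", storage_backend(path))
```

A local table needs neither a pre-built session nor a `spark_config`, which is why the local case returns `None`.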

# AWS:

Build Spark Session before running `create_data_rows_from_delta_table()`:

In [13]:
builder = SparkSession.\
        builder.\
        appName("pyspark-test").\
        config("spark.driver.extraClassPath", f"{SPARK_HOME}/jars/hadoop-aws-3.3.4.jar:{SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar").\
        config("spark.executor.extraClassPath", f"{SPARK_HOME}/jars/hadoop-aws-3.3.4.jar:{SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.262.jar")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY)

results = client.create_data_rows_from_delta_table(dataset_id=dataset.uid, project_id=project.uid, verbose=True, table_path=table_path, spark=spark)

Run `create_data_rows_from_delta_table()` with the `spark_config` argument:

In [None]:
results = client.create_data_rows_from_delta_table(dataset_id=dataset.uid, project_id=project.uid, verbose=True, table_path=table_path, spark_config=aws_spark_config)

In [None]:
#Check the status of the upload results to make sure the upload was successful
results

# GCS:


Build Spark Session before running `create_data_rows_from_delta_table()`:

In [2]:
builder = SparkSession.\
        builder.\
        appName("pyspark-demo").\
        config("spark.driver.extraClassPath", f"{SPARK_HOME}/jars/gcs-connector-hadoop3-latest.jar").\
        config("spark.executor.extraClassPath", f"{SPARK_HOME}/jars/gcs-connector-hadoop3-latest.jar")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE")
spark.conf.set('google.cloud.auth.service.account.json.keyfile', gcs_credentials_file)
spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")

results = client.create_data_rows_from_delta_table(dataset_id=dataset.uid, verbose=True, table_path=table_path, spark=spark)

Run `create_data_rows_from_delta_table()` with the `spark_config` argument:

In [None]:
results = client.create_data_rows_from_delta_table(dataset_id=dataset.uid, verbose=True, table_path=table_path, spark_config=gcs_spark_config)

In [None]:
#Check the status of the upload results to make sure the upload was successful
results