In [0]:
!pip install clarifai==9.9.3
!pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [0]:
# Pin protobuf to 4.24.2 and print the installed package details as confirmation.
!pip install protobuf==4.24.2
!pip show protobuf
# Restart the Python process through the Databricks library utility.
# NOTE(review): presumably required so the pinned protobuf is the one that gets
# imported by the cells below — confirm.
dbutils.library.restartPython() 
     


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Name: protobuf
Version: 4.24.2
Summary: 
Home-page: https://developers.google.com/protocol-buffers/
Author: protobuf@googlegroups.com
Author-email: protobuf@googlegroups.com
License: 3-Clause BSD License
Location: /local_disk0/.ephemeral_nfs/envs/pythonEnv-9aa8405a-8613-4655-977f-9f791b447e0a/lib/python3.10/site-packages
Requires: 
Required-by: clarifai-grpc, facets-overview, googleapis-common-protos, grpcio-status


# Writing ClarifaiPySpark SDK functions

### ClarifaiPySpark Dataset class

In [0]:
import json
import time
import uuid
from typing import List

import requests
from clarifai.client.app import App
from clarifai.client.dataset import Dataset
from clarifai.client.input import Inputs
from clarifai.client.user import User
from clarifai.errors import UserError
from clarifai_grpc.grpc.api.resources_pb2 import Input
from google.protobuf.json_format import MessageToJson
from google.protobuf.struct_pb2 import Struct
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession


class Dataset(Dataset):
  """
  Dataset class provides information about dataset of the app
  and it inherits from the clarifai SDK Dataset class.
  """

  def __init__(self, user_id: str = "", app_id: str = "", dataset_id: str = ""):
    """Initializes the Dataset object.

    Creates `User` and `App` client handles, stores the three identifiers on the
    instance, and then delegates to the clarifai SDK `Dataset` initializer with
    the same identifiers.

    Args:
        user_id (str): The clarifai user ID of the user.
        app_id (str): Clarifai App ID.
        dataset_id (str): Dataset ID of the dataset inside the clarifai App.

    Example: TODO
    """
    self.user = User(user_id=user_id)
    # NOTE(review): the App handle is built from app_id only (no user_id) — confirm
    # that this is intended.
    self.app = App(app_id=app_id)
    #Inputs object - for listannotations
    #input_obj = User(user_id="user_id").app(app_id="app_id").inputs()
    # The identifiers are assigned BEFORE the parent initializer runs.
    # NOTE(review): presumably the parent __init__ reads some of these attributes;
    # verify before reordering these statements.
    self.user_id = user_id
    self.app_id = app_id
    self.dataset_id = dataset_id
    super().__init__(user_id=user_id, app_id=app_id, dataset_id=dataset_id)

  def upload_dataset_from_csv(self,
                              csv_path: str = "",
                              input_type: str = 'text',
                              csv_type: str = None,
                              labels: bool = True,
                              chunk_size: int = 128,
                              source: str = 'local') -> None:
    """Uploads dataset to clarifai app from the csv file path.

    Args:
        csv_path (str): CSV file path of the dataset to be uploaded into clarifai App.
        input_type (str): Input type of the dataset whether (Image, text).
        csv_type (str): Type of the csv file contents(url, raw, filepath).
        labels (bool): Give True if labels column present in dataset else False.
        chunk_size (int): chunk size of parallel uploads of inputs and annotations.
        source (str): Where the CSV is read from. 'local' (default) hands the path
            straight to the clarifai SDK `upload_from_csv`. 's3' reads the CSV
            through Spark (header row expected) and uploads the resulting
            DataFrame with `upload_dataset_from_dataframe`.

    Raises:
        UserError: If `source` is neither 'local' nor 's3'.

    Example: TODO

    Note:
        CSV file supports 'inputid', 'input', 'concepts', 'metadata', 'geopoints' columns.
        All the data in the CSV should be in double quotes.
        metadata should be in single quotes format. Example: "{'key': 'value'}"
        geopoints should be in "long,lat" format.

    """
    ### TODO: Can input column names & extract them to convert to our csv format
    if source == 'local':
      self.upload_from_csv(
          csv_path=csv_path,
          input_type=input_type,
          csv_type=csv_type,
          labels=labels,
          chunk_size=chunk_size)
    elif source == 's3':
      # The SDK CSV reader is handed a plain path, so an object-store URI is
      # loaded through Spark instead and uploaded as a DataFrame.
      spark = SparkSession.builder.appName('Clarifai-spark').getOrCreate()
      csv_df = spark.read.format("csv").option('header', 'true').load(csv_path)
      self.upload_dataset_from_dataframe(
          dataframe=csv_df,
          input_type=input_type,
          df_type=csv_type,
          labels=labels,
          chunk_size=chunk_size)
    else:
      raise UserError("Invalid source, it should be 'local' or 's3'")

  def upload_dataset_from_folder(self,
                                 folder_path: str,
                                 input_type: str,
                                 labels: bool = False,
                                 chunk_size: int = 128) -> None:
    """Uploads the contents of a folder into the clarifai app.

    Thin pass-through: every argument is forwarded unchanged to the clarifai SDK
    `upload_from_folder` method.

    Args:
        folder_path (str): folder path of the dataset to be uploaded into clarifai App.
        input_type (str): Input type of the dataset whether (Image, text).
        labels (bool): Give True if folder name is a label name else False.
        chunk_size (int): chunk size of parallel uploads of inputs and annotations.

    Example: TODO

    Note:
        Can provide a volume or S3 path to the folder
        If label is true, then folder name is class name (label)
    """
    folder_upload_args = {
        'folder_path': folder_path,
        'input_type': input_type,
        'labels': labels,
        'chunk_size': chunk_size,
    }
    self.upload_from_folder(**folder_upload_args)

  def _get_inputs_from_dataframe(self,
                                 dataframe,
                                 input_type: str,
                                 df_type: str,
                                 dataset_id: str = None,
                                 labels: bool = True) -> List[Input]:
    """Builds one clarifai Input proto per row of a Spark DataFrame.

    The rows are collected to the driver and converted one by one. The 'input'
    column is always read; 'concepts' is read when `labels` is true; 'inputid',
    'metadata' and 'geopoints' are read only when the column exists.

    Args:
        dataframe: Spark DataFrame holding the rows to convert.
        input_type (str): 'text', 'image', 'video' or 'audio'; selects which
            argument of the input builder receives the 'input' value.
        df_type (str): 'raw' builds text inputs from the raw value, 'url' builds
            inputs from URLs, anything else builds inputs from file paths.
        dataset_id (str): Dataset ID attached to every generated input.
        labels (bool): True to read comma-separated labels from 'concepts'.

    Returns:
        List[Input]: the generated input protos, in row order.

    Raises:
        UserError: If a metadata value is not a JSON object, or a geopoints value
            does not contain exactly two comma-separated numbers.
    """
    input_obj = Inputs(user_id=self.user_id, app_id=self.app_id)

    # Column presence does not change from row to row, so look it up once.
    columns = dataframe.columns
    has_metadata = 'metadata' in columns
    has_geopoints = 'geopoints' in columns
    has_input_id = 'inputid' in columns

    input_protos = []
    for row in dataframe.collect():
      # Per-row values get their own names: the `labels` flag must keep its
      # meaning for every row instead of being overwritten by the first one.
      row_labels = self._row_labels(row["concepts"]) if labels else None
      metadata = self._row_metadata(row['metadata']) if has_metadata else None
      geo_info = self._row_geo_info(row['geopoints']) if has_geopoints else None

      input_id = row['inputid'] if has_input_id else uuid.uuid4().hex
      text = row["input"] if input_type == 'text' else None
      image = row['input'] if input_type == 'image' else None
      video = row['input'] if input_type == 'video' else None
      audio = row['input'] if input_type == 'audio' else None

      if df_type == 'raw':
        input_protos.append(
            input_obj.get_text_input(
                input_id=input_id,
                raw_text=text,
                dataset_id=dataset_id,
                labels=row_labels,
                metadata=metadata,
                geo_info=geo_info))
      elif df_type == 'url':
        input_protos.append(
            input_obj.get_input_from_url(
                input_id=input_id,
                image_url=image,
                text_url=text,
                audio_url=audio,
                video_url=video,
                dataset_id=dataset_id,
                labels=row_labels,
                metadata=metadata,
                geo_info=geo_info))
      else:
        input_protos.append(
            input_obj.get_input_from_file(
                input_id=input_id,
                image_file=image,
                text_file=text,
                audio_file=audio,
                video_file=video,
                dataset_id=dataset_id,
                labels=row_labels,
                metadata=metadata,
                geo_info=geo_info))

    return input_protos

  @staticmethod
  def _row_labels(concepts):
    """Splits a comma-separated 'concepts' value; None when it is null or empty."""
    if concepts is None or len(concepts) == 0:
      return None
    return concepts.split(',')

  @staticmethod
  def _row_metadata(raw_metadata):
    """Parses a 'metadata' value into a protobuf Struct; None when null or empty."""
    if raw_metadata is None or len(raw_metadata) == 0:
      return None
    try:
      # Accept real JSON first so values containing apostrophes survive.
      metadata_dict = json.loads(raw_metadata)
    except json.decoder.JSONDecodeError:
      try:
        # Documented CSV format uses single quotes: "{'key': 'value'}".
        metadata_dict = json.loads(raw_metadata.replace("'", '"'))
      except json.decoder.JSONDecodeError:
        raise UserError("metadata column in CSV file should be a valid json")
    if not isinstance(metadata_dict, dict):
      raise UserError("metadata column in CSV file should be a valid json")
    metadata = Struct()
    metadata.update(metadata_dict)
    return metadata

  @staticmethod
  def _row_geo_info(raw_geopoints):
    """Parses a "long,lat" value into two floats; None when null or empty."""
    if raw_geopoints is None or len(raw_geopoints) == 0:
      return None
    geo_points = [float(geo_point) for geo_point in raw_geopoints.split(',')]
    if len(geo_points) != 2:
      # Raise the error; it must not be stored as the geo value.
      raise UserError("geopoints column in CSV file should have longitude,latitude")
    return geo_points

  def upload_dataset_from_dataframe(self,
                                    dataframe,
                                    input_type: str,
                                    df_type: str = None,
                                    labels: bool = True,
                                    chunk_size: int = 128) -> None:
    """Uploads dataset from a Spark DataFrame.

    Expected columns in the dataframe are inputid, input, concepts (optional),
    metadata (optional), geopoints (optional).

    Args:
        dataframe: Spark DataFrame holding the inputs to upload.
        input_type (str): one of 'image', 'text', 'video', 'audio'.
        df_type (str): one of 'raw', 'url', 'file_path'; 'raw' is only accepted
            together with the 'text' input type.
        labels (bool): forwarded to the row-to-input conversion; True to read
            labels from the concepts column.
        chunk_size (int): chunk size for the bulk upload, capped at 128.

    Returns:
        Whatever the clarifai `Inputs._bulk_upload` call returns.

    Raises:
        UserError: If any of the arguments fails the validation below.

    Example: TODO
    """
    allowed_input_types = ('image', 'text', 'video', 'audio')
    allowed_df_types = ('raw', 'url', 'file_path')

    # Argument validation; the order of the checks decides which error wins.
    if input_type not in allowed_input_types:
      raise UserError('Invalid input type, it should be image,text,audio or video')
    if df_type not in allowed_df_types:
      raise UserError('Invalid csv type, it should be raw, url or file_path')
    if df_type == 'raw' and input_type != 'text':
      raise UserError('Only text input type is supported for raw csv type')
    if not isinstance(dataframe, SparkDataFrame):
      raise UserError('dataframe should be a Spark DataFrame')

    capped_chunk_size = min(128, chunk_size)
    uploader = Inputs(user_id=self.user_id, app_id=self.app_id)
    protos = self._get_inputs_from_dataframe(
        dataframe=dataframe,
        df_type=df_type,
        input_type=input_type,
        dataset_id=self.dataset_id,
        labels=labels)
    return uploader._bulk_upload(inputs=protos, chunk_size=capped_chunk_size)

  def upload_dataset_from_dataloader(self,
                                     task: str,
                                     split: str,
                                     module_dir: str = None,
                                     chunk_size: int = 128) -> None:
    """Uploads a dataset in a custom format through a dataloader.

    Thin pass-through: every argument is forwarded unchanged to the clarifai SDK
    `upload_dataset` method.

    Args:
        task (str): task type(text_clf, visual-classification, visual_detection, visual_segmentation, visual-captioning).
        split (str): split type(train, test, val).
        module_dir (str): path to the module directory.
        chunk_size (int): chunk size for concurrent upload of inputs and annotations.

    Example: TODO
    """
    loader_upload_args = {
        'task': task,
        'split': split,
        'module_dir': module_dir,
        'chunk_size': chunk_size,
    }
    self.upload_dataset(**loader_upload_args)

  def upload_dataset_from_table(self,
                                table_path: str,
                                input_type: str,
                                table_type: str,
                                labels: bool,
                                chunk_size: int = 128) -> None:
    """Uploads dataset to clarifai app from a Delta table.

    The table is loaded into a Spark DataFrame and handed to
    `upload_dataset_from_dataframe`, which validates the arguments and performs
    the upload.

    Args:
        table_path (str): path of the Delta table to be uploaded.
        input_type (str): Input type of the dataset whether (Image, text).
        table_type (str): Type of the table contents (url, raw, file_path).
        labels (bool): Give True if labels column present in dataset else False.
        chunk_size (int): chunk size for concurrent upload of inputs and annotations.

    Note:
        The table uses the same columns as the DataFrame upload: inputid, input,
        concepts (optional), metadata (optional), geopoints (optional).
        TODO: dataframe dataloader template
        TODO: Can input column names & extract them to convert to our csv format
    """
    spark = SparkSession.builder.appName('Clarifai-spark').getOrCreate()
    table_df = spark.read.format("delta").load(table_path)
    # The DataFrame uploader defined on this class is `upload_dataset_from_dataframe`.
    self.upload_dataset_from_dataframe(
        dataframe=table_df,
        input_type=input_type,
        df_type=table_type,
        labels=labels,
        chunk_size=chunk_size)

  def list_inputs(self, per_page: int = None, input_type: str = None):
    """Lists the inputs of this dataset.

    Args:
        per_page (int): Number of inputs per response page.
        input_type (str): Input type that needs to be displayed (text,image)
        TODO: Do we need input_type ?, since in our case it is image, so probably we can go with default value of "image".

    Examples:
        TODO

    Returns:
        The result of the clarifai `Inputs.list_inputs` call for this dataset.
    """
    inputs_client = Inputs(user_id=self.user_id, app_id=self.app_id)
    listing = inputs_client.list_inputs(
        dataset_id=self.dataset_id,
        input_type=input_type,
        per_page=per_page,
    )
    return listing

  def list_annotations(self, input_ids: list = None, per_page: int = None, input_type: str = None):
    """Lists the annotations of inputs in this dataset.

    With `input_ids`, one input proto is built per ID and only those inputs are
    queried. Without it, all inputs of the dataset are listed first (honouring
    `per_page` and `input_type`) and used as the batch.

    Args:
        input_ids (list): list of input_ids for which user wants annotations
        per_page (int): Number of inputs per response page when listing inputs.
        input_type (str): Input type that needs to be displayed (text,image)
        TODO: Do we need input_type ?, since in our case it is image, so probably we can go with default value of "image".

    Examples:
        TODO

    Returns:
        The result of the clarifai `Inputs.list_annotations` call for the batch.
    """
    inputs_client = Inputs(user_id=self.user_id, app_id=self.app_id)
    if input_ids:
      # Caller named specific inputs: build a minimal proto for each ID.
      batch = [
          inputs_client._get_proto(input_id=one_id, dataset_id=self.dataset_id)
          for one_id in input_ids
      ]
    else:
      # No IDs given: fall back to every input of the dataset.
      batch = list(
          inputs_client.list_inputs(
              dataset_id=self.dataset_id, input_type=input_type, per_page=per_page))
    return inputs_client.list_annotations(batch_input=batch)

  def export_annotations_to_dataframe(self, input_ids: list = None):
    """Export the annotations from clarifai App's dataset to a spark dataframe.

    Annotations whose data is empty are skipped. Each remaining annotation
    becomes one row with the keys 'annotation' (the annotation data as a dict),
    'id', 'user_id', 'input_id', 'created_at' and 'modified_at'. Both timestamps
    are UTC strings formatted as 'MM/DD/YYYY HH:MM:SS'.

    Args:
        input_ids (list): list of input_ids for which user wants annotations

    Examples:
        TODO

    Returns:
        spark dataframe with annotations

    Note:
        NOTE(review): when no annotation survives the filter, the list given to
        `createDataFrame` is empty and Spark cannot infer a schema — confirm how
        callers want that case handled.
    """
    timestamp_format = '%m/%d/%Y %H:%M:%S'

    def format_timestamp(proto_timestamp) -> str:
      # The output has whole-second resolution, so only `.seconds` is needed.
      # Gluing seconds and nanos into "<seconds>.<nanos>" is wrong because nanos
      # is not zero-padded.
      return time.strftime(timestamp_format, time.gmtime(proto_timestamp.seconds))

    spark = SparkSession.builder.appName('Clarifai-spark').getOrCreate()
    annotation_list = []
    for an in self.list_annotations(input_ids=input_ids):
      annotation = json.loads(MessageToJson(an.data))
      if not annotation:
        continue
      annotation_list.append({
          'annotation': annotation,
          'id': an.id,
          'user_id': an.user_id,
          'input_id': an.input_id,
          'created_at': format_timestamp(an.created_at),
          'modified_at': format_timestamp(an.modified_at),
      })
    return spark.createDataFrame(annotation_list)

  def export_images_to_volume(self, path, input_response):
    """Downloads the image of every input in `input_response` into `path`.

    Each image is fetched from the input's image URL and written to
    `<path>/<input id>.<lower-cased image format>`.

    Args:
        path (str): Destination folder; it must already exist.
        input_response: Iterable of clarifai inputs exposing `id`,
            `data.image.url` and `data.image.image_info.format`.

    Raises:
        requests.HTTPError: If a download answers with an HTTP error status.
    """
    # The same credential is sent with every download, so build the header once.
    # NOTE(review): assumes self.metadata[0][1] holds the authorization value set
    # up by the clarifai base client — confirm.
    headers = {"Authorization": self.metadata[0][1]}
    for resp in input_response:
      image_format = resp.data.image.image_info.format
      img_name = path + '/' + resp.id + '.' + image_format.lower()
      response = requests.get(resp.data.image.url, headers=headers)
      # Fail loudly instead of saving an HTTP error body as if it were an image.
      response.raise_for_status()
      with open(img_name, "wb") as f:
        f.write(response.content)

  def export_text_to_volume(self, path, input_response):
    """Downloads the text of every input in `input_response` into `path`.

    Each text is fetched from the input's text URL and written to
    `<path>/<input id>.txt`, replacing any file already there.

    Args:
        path (str): Destination folder; it must already exist.
        input_response: Iterable of clarifai inputs exposing `id`,
            `data.text.url` and `data.text.text_info.encoding`.

    Raises:
        requests.HTTPError: If a download answers with an HTTP error status.
    """
    # The same credential is sent with every download, so build the header once.
    # NOTE(review): assumes self.metadata[0][1] holds the authorization value set
    # up by the clarifai base client — confirm.
    headers = {"Authorization": self.metadata[0][1]}
    for resp in input_response:
      file_name = path + '/' + resp.id + '.txt'
      # An empty encoding name would make open() fail, so fall back to UTF-8.
      enc = resp.data.text.text_info.encoding or 'utf-8'
      response = requests.get(resp.data.text.url, headers=headers)
      # Fail loudly instead of saving an HTTP error body as the text.
      response.raise_for_status()
      # "w" rather than "a": exporting again must replace the file, not append a
      # second copy of the text to it.
      with open(file_name, "w", encoding=enc) as f:
        f.write(response.content.decode())


### ClarifaiPySpark Client class

In [0]:
from clarifai.client.app import App
from clarifai.client.base import BaseClient
from clarifai.client.user import User

# from clarifaipyspark.dataset import Dataset


class ClarifaiPySpark(BaseClient):
  """
  ClarifaiPySpark inherits the BaseClient class from the clarifai SDK and it initializes the client.
  """

  def __init__(self, user_id: str = "", app_id: str = ""):
    """Initializes clarifai client object.

    Creates `User` and `App` handles, stores both identifiers on the instance,
    and then runs the `BaseClient` initializer with them.

    Args:
      - user_id (str): A user ID for authentication.
      - app_id (str): An app ID for the application to interact with.
    """

    self.user = User(user_id=user_id)
    self.app = App(app_id=app_id)
    self.user_id = user_id
    self.app_id = app_id
    super().__init__(user_id=user_id, app_id=app_id)

  def dataset(self, dataset_id):
    """Returns a Dataset object for `dataset_id`, creating the dataset if needed.

    The dataset is first looked up in the app. If the lookup raises, a dataset
    with that ID is created.

    Args:
      dataset_id: The dataset_id within the user app.

    Returns:
      Dataset object for the dataset_id.
    """

    try:
      self.app.dataset(dataset_id=dataset_id)
    except Exception:
      # `Exception` rather than a bare `except`, so KeyboardInterrupt and
      # SystemExit are not mistaken for "dataset does not exist".
      # NOTE(review): any lookup failure still triggers creation; narrow this
      # further once the SDK's "not found" error type is confirmed.
      print("Creating a new dataset")
      self.app.create_dataset(dataset_id=dataset_id)

    return Dataset(dataset_id=dataset_id, user_id=self.user_id, app_id=self.app_id)


# Testing ClarifaiPySpark SDK with S3

### Setting environment variables

In [0]:
import os

# SECURITY: a Personal Access Token must never be committed in a notebook. The token
# that used to be hard-coded here is exposed and should be revoked.
# The PAT is read from a Databricks secret instead; the scope and key names below
# are placeholders — replace them with the ones configured in your workspace.
os.environ['CLARIFAI_PAT'] = dbutils.secrets.get(scope='clarifai', key='CLARIFAI_PAT')

### Creating ClarifaiPySpark object & creating/fetching image dataset from app

In [0]:
# Client bound to one user/app pair; reused by the upload cells below.
cspark_obj = ClarifaiPySpark(user_id='mansi_k', app_id='databricks_tester_img')

# Looks the dataset up in the app and creates it when the lookup fails.
dataset_obj = cspark_obj.dataset(dataset_id='dataset1')

### Uploading dataset from CSV stored in S3

In [0]:
# Preview the CSV that is about to be uploaded; the first line is a header row.
df_csv = spark.read.csv(
    "s3://new-bucket-for-databricks-integration-23102023/image_urls3.csv", header=True)
df_csv.show(3)

+-------+--------------------+--------+--------------------+
|inputid|               input|concepts|            metadata|
+-------+--------------------+--------+--------------------+
|image01|https://samples.c...|   image|{'filename': 'can...|
|image02|https://samples.c...|   image|                NULL|
|image03|https://samples.c...|   image|{'filename': 'doo...|
+-------+--------------------+--------+--------------------+



In [0]:
# Upload the image URLs listed in the S3-hosted CSV, with labels from its concepts column.
# NOTE(review): this call passes `source='s3'`; make sure the `Dataset` class
# definition run earlier in this notebook accepts that keyword.
dataset_obj.upload_dataset_from_csv(csv_path='s3://new-bucket-for-databricks-integration-23102023/image_urls3.csv', source='s3', input_type='image', csv_type='url', labels=True)

Uploading inputs:   0%|          | 0/1 [00:00<?, ?it/s]Uploading inputs: 100%|██████████| 1/1 [00:02<00:00,  2.42s/it]Uploading inputs: 100%|██████████| 1/1 [00:02<00:00,  2.42s/it]


### Uploading dataset from a delta table stored in S3

In [0]:
# Preview the Delta table that is about to be uploaded.
df_delta = spark.read.load(
    "s3://new-bucket-for-databricks-integration-23102023/img3_deltatable/", format="delta")
df_delta.show()

+-------+--------------------+--------+
|inputid|               input|concepts|
+-------+--------------------+--------+
| img111|https://cdni.auto...|     car|
| img211|https://cdni.auto...|     car|
+-------+--------------------+--------+



In [0]:
# Upload the image URLs stored in the Delta table, with labels from its concepts column.
dataset_obj.upload_dataset_from_table(table_path="s3://new-bucket-for-databricks-integration-23102023/img3_deltatable/", input_type='image', table_type='url', labels=True)

Uploading inputs:   0%|          | 0/1 [00:00<?, ?it/s]Uploading inputs: 100%|██████████| 1/1 [00:02<00:00,  2.42s/it]Uploading inputs: 100%|██████████| 1/1 [00:02<00:00,  2.42s/it]
