
# **Sarvam Parse PDF Batch API Tutorial**

This notebook demonstrates how to use the **Sarvam Parse PDF Batch API** to process multiple PDF files in a single job and extract structured data from them. The Batch API lets you upload a batch of PDFs, parse them, and retrieve their content in HTML format for further analysis.

### **Instructions:**

1. **Get Your Subscription Key:**
   - Go to [dashboard.sarvam.ai](https://dashboard.sarvam.ai) and sign in to your account.
   - Copy your **API Subscription Key** for authentication.

2. **Set Up Google Colab Folders:**
   - Create two folders in your Google Colab notebook:
     - **`input/`**: This is where you'll upload your PDF files for batch processing.
     - **`output/`**: The parsed files will be saved here, and you can download them after processing.

3. **Upload Multiple PDF Files for Batch Processing:**
   - Use the **Upload** button in Google Colab to upload all the PDF files you want to process into the **input/** folder.
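   - Upload every PDF you want parsed before running the script. The script asks for each file's path and page range, then submits all of them together as one job.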

4. **Run the Script to Parse the PDFs:**
   - The script will send the entire batch of uploaded PDF files to the **Sarvam Parse PDF Batch API**, process them, and save the parsed content in **HTML format** in the **output/** folder.

5. **Download Processed Files:**
   - Once the batch processing is completed, the parsed files will be saved in the **output/** folder. You can then download the processed HTML files from there.
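
The folder setup in steps 2 and 3 can also be done from a code cell. The following is a minimal sketch, not part of the original notebook: `prepare_folders` is a hypothetical helper name, and it assumes the folders live under a base directory you choose (for example `/content` in Colab).

```python
from pathlib import Path


def prepare_folders(base_dir="."):
    """Create input/ and output/ under base_dir and list the PDFs found in input/.

    Hypothetical helper for illustration; the folder names follow the instructions above.
    """
    base = Path(base_dir)
    input_dir = base / "input"
    output_dir = base / "output"
    # exist_ok=True makes the call safe to repeat when the cell is re-run
    input_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Only files with a .pdf extension (any letter case) are reported
    pdfs = sorted(p for p in input_dir.iterdir() if p.suffix.lower() == ".pdf")
    return input_dir, output_dir, pdfs
```

Calling `prepare_folders("/content")` after uploading would return the two folder paths and the list of PDFs that are ready to be submitted.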



In [None]:
!pip install azure-storage-file-datalake aiofiles requests



In [None]:
import os

# Set your subscription key here (replace the placeholder with your own key)
os.environ['SUBSCRIPTION_KEY'] = 'YOUR_SUBSCRIPTION_KEY'

# Verify the environment variable is set without printing the secret itself
print("SUBSCRIPTION_KEY is set:", bool(os.getenv('SUBSCRIPTION_KEY')))

SUBSCRIPTION_KEY is set: True
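
If you would rather not type the key into a cell that may be shared, one option is to prompt for it at run time. The sketch below is an assumption about how you might do this, not part of the Sarvam tooling: `load_subscription_key` and `mask` are hypothetical helper names, and `getpass` is the standard-library prompt that hides what you type.

```python
import os
from getpass import getpass


def load_subscription_key(env_var="SUBSCRIPTION_KEY"):
    """Return the key from the environment, prompting (without echo) only if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        key = getpass(f"Enter {env_var}: ").strip()
        os.environ[env_var] = key
    return key


def mask(key, visible=4):
    """Hide all but the last few characters so the key can be checked without exposing it."""
    if len(key) <= visible:
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]
```

For example, `print(mask(load_subscription_key()))` confirms which key is loaded while showing only its last four characters.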


### **Import Libraries**

In [None]:
import asyncio
from urllib.parse import urlparse
import aiofiles
from azure.storage.filedatalake.aio import DataLakeDirectoryClient, FileSystemClient
from azure.storage.filedatalake import ContentSettings
import mimetypes
import requests
import time
import json
import os

## **Step 1: Initialize SarvamJobHandler**

The `SarvamJobHandler` class handles job initialization, starting, and status monitoring.

In [None]:
class SarvamJobHandler:
    BASE_URL = "https://api.sarvam.ai/parse"

    def __init__(self):
        """
        Initializes the SarvamJobHandler with the given subscription key.
        """
        self.subscription_key = os.getenv('SUBSCRIPTION_KEY')
        if not self.subscription_key:
            raise EnvironmentError("Environment variable 'SUBSCRIPTION_KEY' is not set.")

    def initialise_job(self):
        """
        Initializes a new job by sending a POST request.
        Returns:
            dict: Response data containing job details.
        """
        INIT_URL = f"{self.BASE_URL}/job/init"
        headers = {
            'API-Subscription-Key': self.subscription_key
        }
        try:
            response = requests.request("POST", INIT_URL, headers=headers)
            response_data = response.json()
        except Exception as e:
            # Raise instead of calling exit(), which would shut down the notebook kernel
            raise RuntimeError(f"Error occurred while trying to initialize job: {e}") from e

        return response_data

    def start_job(self, job_id, file_details):
        """
        Starts a job with the given parameters by sending a POST request.
        Args:
            job_id (str): ID of the job.
            file_details (list[dict]): One dictionary per file, holding the file name and its page intervals (start_page and end_page).

        Returns:
            dict: Response data from the server.
        """
        START_JOB_URL = f"{self.BASE_URL}/job"
        headers = {
            'API-Subscription-Key': self.subscription_key,
            'Content-Type': 'application/json'
        }

        payload = {
            "job_id": job_id,
            "job_parameters": {
                "file_intervals": file_details,
                "receiver_email": "",
                "sarvam_mode": "large"
            }
        }

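        try:
            response = requests.request("POST", START_JOB_URL, headers=headers, json=payload)
            response_data = response.json()
            # Only report success when the server answered with a 2xx status
            if response.ok:
                print("Job started successfully:", json.dumps(response_data, indent=2))
            else:
                print(f"Job start request failed with HTTP {response.status_code}:", json.dumps(response_data, indent=2))
        except Exception as e:
            print(f"Error occurred while trying to start job: {e}")
            response_data = {}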

        return response_data

    def get_job_status(self, job_id, polling_interval=45):
        """
        Periodically checks the status of a job until it is completed.
        Args:
            job_id (str): ID of the job to check.
            polling_interval (int): Interval between status checks (default is 45 seconds).

        Returns:
            dict: Final response data when the job is completed.
        """
        headers = {
            'API-Subscription-Key': self.subscription_key
        }

        while True:
            url = f"{self.BASE_URL}/job/{job_id}/status"
            try:
                response = requests.request("GET", url, headers=headers)
                response_data = response.json()

                job_state = response_data.get('job_state')
                print(f"Current Job State: {job_state}")

                if job_state == 'Completed':
                    print("Job has been completed.")
                    return response_data
                elif job_state == 'Failed':
                    print("Job failed.")
                    return response_data

            except Exception as e:
                print(f"Error occurred while checking job status: {e}")

            time.sleep(polling_interval)
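
`get_job_status` keeps polling until the job reports `Completed` or `Failed`, so a job that never reaches either state would block the cell indefinitely. The following is a minimal sketch of the same loop with an upper bound on waiting time. It is an illustration under assumptions, not part of the Sarvam API: `poll_until_done`, `fetch_status`, and `max_wait` are hypothetical names, and `fetch_status` stands for any zero-argument callable that performs one status request and returns the parsed JSON.

```python
import time


def poll_until_done(fetch_status, polling_interval=45, max_wait=1800, sleep=time.sleep):
    """Poll fetch_status() until the job is 'Completed' or 'Failed', or max_wait seconds pass.

    fetch_status: hypothetical callable returning a dict with a 'job_state' key.
    sleep: injectable so the loop can be exercised without real delays.
    """
    deadline = time.monotonic() + max_wait
    while True:
        status = fetch_status()
        job_state = status.get("job_state")
        if job_state in ("Completed", "Failed"):
            return status
        # Stop if the next wait would go past the deadline
        if time.monotonic() + polling_interval > deadline:
            raise TimeoutError(f"Job still in state {job_state!r} after {max_wait} seconds")
        sleep(polling_interval)
```

Passing the status call in as a function keeps the timeout logic separate from the HTTP request, which also makes it easy to try out with a stub.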

## **Step 2: Initialize SarvamClient**

The `SarvamClient` class handles file uploads, downloads, and deletions from Azure Data Lake Storage.


In [None]:
class SarvamClient:
    def __init__(self, url: str):
        # Extract components from the provided URL
        self.account_url, self.file_system_name, self.directory_name, self.sas_token = (
            self._extract_url_components(url)
        )
        self.lock = asyncio.Lock()

    def update_url(self, url: str):
        self.account_url, self.file_system_name, self.directory_name, self.sas_token = (
            self._extract_url_components(url)
        )

    def _extract_url_components(self, url: str):
        """
        Extracts the components from the Azure Data Lake URL.
        """
        # Parse the URL
        parsed_url = urlparse(url)

        # Construct the account URL and replace blob with dfs for the Data Lake URL
        account_url = f"{parsed_url.scheme}://{parsed_url.netloc}".replace(
            ".blob.", ".dfs."
        )

        # Split the path to get the file system and directory
        path_components = parsed_url.path.strip("/").split("/")

        # First part is the file system (e.g., 'bulk-upload-storage')
        file_system_name = path_components[0]

        # The rest forms the directory path (e.g., 'jobs/swiggy-call-analytics/.../outputs')
        directory_name = "/".join(path_components[1:])

        # Extract the SAS token from the URL query
        sas_token = parsed_url.query

        return account_url, file_system_name, directory_name, sas_token

    async def upload_files(self, local_file_paths, overwrite=True):
        """
        Upload multiple files to the directory extracted from the URL.
        """
        async with DataLakeDirectoryClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            directory_name=self.directory_name,
            credential=None,
        ) as directory_client:
            tasks = []
            for local_file_path in local_file_paths:
                # Use the file name from the local path
                file_name = os.path.basename(local_file_path)
                tasks.append(
                    self._upload_file(
                        directory_client, local_file_path, file_name, overwrite
                    )
                )

            await asyncio.gather(*tasks, return_exceptions=True)

    async def _upload_file(
        self, directory_client, local_file_path, file_name, overwrite=True
    ):
        """
        Helper method to upload a single file.
        """
        try:
            async with aiofiles.open(local_file_path, mode="rb") as file_data:
                mime_type, _ = mimetypes.guess_type(local_file_path)
                if mime_type is None:
                    # This notebook uploads PDFs, so fall back to the PDF MIME type
                    mime_type = "application/pdf"
                file_client = directory_client.get_file_client(file_name)
                await file_client.upload_data(
                    file_data,
                    overwrite=overwrite,
                    content_settings=ContentSettings(content_type=mime_type),
                )
                print(f"File '{file_name}' uploaded successfully!")
        except Exception as e:
            print(f"Failed to upload '{file_name}': {e}")

    async def list_files(self):
        """
        Return a list of file names (not full paths) in the directory extracted from the URL, protected with a lock.
        """
        file_names = []
        async with FileSystemClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            credential=None,
        ) as file_system_client:
            async for path in file_system_client.get_paths(self.directory_name):
                file_name = path.name.split("/")[
                    -1
                ]  # Extract the last part of the path (file name)
                async with self.lock:  # Acquire lock before modifying file_names
                    file_names.append(file_name)
        return file_names

    async def download_files(self, file_names, destination_dir):
        """
        Download files from the directory extracted from the URL to a local directory.
        """
        os.makedirs(destination_dir, exist_ok=True)
        async with DataLakeDirectoryClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            directory_name=self.directory_name,
            credential=None,
        ) as directory_client:
            tasks = []
            for file_name in file_names:
                tasks.append(
                    self._download_file(directory_client, file_name, destination_dir)
                )

            await asyncio.gather(*tasks, return_exceptions=True)

    async def _download_file(self, directory_client, file_name, destination_dir):
        """
        Helper method to download a single file.
        """
        try:
            file_client = directory_client.get_file_client(file_name)
            download_path = f"{destination_dir}/{file_name}"
            async with aiofiles.open(download_path, mode="wb") as file_data:
                stream = await file_client.download_file()
                data = await stream.readall()
                await file_data.write(data)
            print(f"File '{file_name}' downloaded successfully to '{download_path}'!")
        except Exception as e:
            print(f"Failed to download '{file_name}': {e}")

    async def delete_files(self, file_names):
        """
        Delete files from the directory extracted from the URL.
        """
        async with DataLakeDirectoryClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            directory_name=self.directory_name,
            credential=None,
        ) as directory_client:
            tasks = []
            for file_name in file_names:
                tasks.append(self._delete_file(directory_client, file_name))

            await asyncio.gather(*tasks, return_exceptions=True)

    async def _delete_file(self, directory_client, file_name):
        """
        Helper method to delete a single file.
        """
        try:
            file_client = directory_client.get_file_client(file_name)
            await file_client.delete_file()
            print(f"File '{file_name}' deleted successfully!")
        except Exception as e:
            print(f"Failed to delete '{file_name}': {e}")
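
To see what `_extract_url_components` produces, the sketch below applies the same parsing steps to a made-up storage URL. The account name, path, and SAS query string are placeholders chosen for illustration, not real values returned by the API, and `split_storage_url` is a hypothetical standalone version of the method.

```python
from urllib.parse import urlparse


def split_storage_url(url):
    """Split a storage URL into the four pieces SarvamClient keeps (same steps as the class)."""
    parsed = urlparse(url)
    # Swap the blob endpoint for the Data Lake (dfs) endpoint
    account_url = f"{parsed.scheme}://{parsed.netloc}".replace(".blob.", ".dfs.")
    parts = parsed.path.strip("/").split("/")
    return {
        "account_url": account_url,
        "file_system_name": parts[0],           # first path segment
        "directory_name": "/".join(parts[1:]),  # remaining path segments
        "sas_token": parsed.query,              # everything after '?'
    }


# Placeholder URL for illustration only
example = split_storage_url(
    "https://exampleaccount.blob.core.windows.net/bulk-upload-storage/jobs/job-123/inputs?sv=2024-01-01&sig=abc"
)
print(example["account_url"])       # https://exampleaccount.dfs.core.windows.net
print(example["file_system_name"])  # bulk-upload-storage
print(example["directory_name"])    # jobs/job-123/inputs
print(example["sas_token"])         # sv=2024-01-01&sig=abc
```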





## **Step 3: Define Helper Functions**

### **Get File Details**

This function collects file details (file name, start page, and end page) from the user.


In [None]:
def get_file_details():
    """
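    Collects file details (file path, start page, and end page) from the user.
    Returns:
        tuple: (file_paths, file_details), where file_paths is a list of local file paths and
            file_details is a list of dictionaries holding each file's name and page intervals.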
    """
    file_paths = []
    file_details = []
    print("Enter file details. Type 'done' when you are finished.\n")
    while True:
        file_name = input("Enter file path (or type 'done' to finish): ").strip()
        if file_name.lower() == 'done':
            break
        try:
            start_page = int(input(f"Enter start page for '{file_name}': ").strip())
            end_page = int(input(f"Enter end page for '{file_name}': ").strip())

            if start_page > end_page:
                print("Error: Start page cannot be greater than end page. Please try again.")
                continue
            file_paths.append(file_name)
            file_details.append({
                "file_name": file_name.split("/")[-1],
                "page_intervals": [
                    {
                        "start_page": start_page,
                        "end_page": end_page
                    }
                ]
            })
        except ValueError:
            print("Invalid input! Please enter numeric values for start and end pages.")

    return file_paths, file_details
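
When the list of files is already known, the same structure can be built without prompts. This is a minimal sketch, assuming you supply `(path, start_page, end_page)` tuples yourself; `build_file_details` is a hypothetical helper that mirrors the dictionaries produced by `get_file_details`.

```python
import os


def build_file_details(entries):
    """Build (file_paths, file_details) from an iterable of (path, start_page, end_page) tuples."""
    file_paths, file_details = [], []
    for path, start_page, end_page in entries:
        if start_page > end_page:
            raise ValueError(f"Start page cannot be greater than end page for '{path}'")
        file_paths.append(path)
        file_details.append({
            "file_name": os.path.basename(path),  # the job refers to files by name only
            "page_intervals": [
                {"start_page": start_page, "end_page": end_page}
            ],
        })
    return file_paths, file_details
```

For example, `build_file_details([("/content/input/temp.pdf", 1, 1)])` yields the same pair of lists that the interactive prompts would for that file.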

### **Convert JSON to HTML**

This function converts JSON output files to HTML.

In [None]:
def convert_to_html(input_dir, output_dir):
    import base64

    os.makedirs(output_dir, exist_ok=True)

    converted = 0
    for filename in sorted(os.listdir(input_dir)):
        if filename.endswith('.json'):
            file_number = filename.split('.')[0]
            with open(os.path.join(input_dir, filename), 'r') as json_file:
                data = json.load(json_file)
            # The 'output' field holds the parsed HTML as a base64-encoded string
            html_content = base64.b64decode(data['output']).decode('utf-8')
            output_filename = f'{file_number}.html'
            with open(os.path.join(output_dir, output_filename), 'w', encoding='utf-8') as html_file:
                html_file.write(html_content)
            converted += 1

    # Report only the files that were actually converted
    print(f"Converted {converted} JSON files to HTML in {output_dir}")
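
The conversion relies on each downloaded JSON file carrying the HTML as a base64-encoded string under the `output` key. The sketch below shows that encoding and decoding round trip on a small made-up payload. `encode_output` and `decode_output` are hypothetical helpers for illustration, and the sample HTML is not real API output.

```python
import base64
import json


def encode_output(html):
    """Wrap HTML in the same shape convert_to_html expects: {'output': <base64 string>}."""
    return {"output": base64.b64encode(html.encode("utf-8")).decode("ascii")}


def decode_output(data):
    """Recover the HTML text from a parsed JSON object, as convert_to_html does."""
    return base64.b64decode(data["output"]).decode("utf-8")


# Made-up sample, including non-ASCII text to show that UTF-8 survives the round trip
sample_html = "<p>नमस्ते, PDF!</p>"
serialized = json.dumps(encode_output(sample_html))
restored = decode_output(json.loads(serialized))
print(restored == sample_html)  # True
```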



## **Step 4: Run the Main Function**

This function orchestrates the entire process:
1. Initializes the job.
2. Uploads files.
3. Starts the job.
4. Monitors the job status.
5. Downloads and converts output files.

In [None]:
async def main():
    # Initialise job
    sarvam_handler = SarvamJobHandler()
    job_data = sarvam_handler.initialise_job()

    job_id = job_data.get('job_id')
    print(f"Extracted job id: {job_id}")
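    if job_id is None:
        # Raise instead of calling exit(), which would shut down the notebook kernel
        raise RuntimeError("Job initialization did not return a job_id.")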

    # Get file paths and page intervals
    local_file_paths, file_details = get_file_details()
    # Upload files to storage
    client = SarvamClient(job_data.get('input_storage_path'))
    await client.upload_files(local_file_paths, overwrite=True)
    print(await client.list_files())

    # Start the job
    sarvam_handler.start_job(job_id, file_details)

    # Check job status
    final_status = sarvam_handler.get_job_status(job_id)
    print("Final Job Status:", json.dumps(final_status, indent=2))

    if final_status.get('job_state') == 'Completed':
        # Download the jsons
        client.update_url(url=job_data.get('output_storage_path'))
        files = await client.list_files()
        await client.download_files(file_names=files, destination_dir="./data")

        # Convert downloaded files to HTML
        input_dir = './data'
        output_dir = './output'
        convert_to_html(input_dir, output_dir)

# Run the example (top-level await is supported in Jupyter and Colab cells)
await main()

Extracted job id: 20250225_6f46fe5d-24f5-4668-becf-1c1c8affafd7
Enter file details. Type 'done' when you are finished.

Enter file path (or type 'done' to finish): /content/input/temp.pdf
Enter start page for '/content/input/temp.pdf': 1
Enter end page for '/content/input/temp.pdf': 1
Enter file path (or type 'done' to finish): /content/input/temp1.pdf
Enter start page for '/content/input/temp1.pdf': 1
Enter end page for '/content/input/temp1.pdf': 1
Enter file path (or type 'done' to finish): done
File 'temp.pdf' uploaded successfully!
File 'temp1.pdf' uploaded successfully!
['temp.pdf', 'temp1.pdf']
Job started successfully: {
  "job_status": {
    "job_state": "Pending",
    "created_at": "2025-02-25T14:55:11.581107+00:00",
    "updated_at": "2025-02-25T14:55:35.746629+00:00",
    "job_id": "20250225_6f46fe5d-24f5-4668-becf-1c1c8affafd7",
    "total_files": 0,
    "successful_files_count": 0,
    "failed_files_count": 0,
    "owner_id": "ba3a72b2144ea99c97e390e5f0f43780c693171b6fd7f

## **5. Conclusion**

This notebook demonstrated how to use the **Sarvam Parse PDF Batch API** to extract structured data from a batch of PDF files. By following the steps, you can:

1. Upload a batch of PDF files.
2. Parse the files using the Sarvam API.
3. Download the parsed output and convert it to HTML for further analysis.

---

## **6. Additional Resources**

For more details, refer to the official **Sarvam API documentation** and join the community for support:

- **Documentation**: [docs.sarvam.ai](https://docs.sarvam.ai)  
- **Community**: [Join the Discord Community](https://discord.gg/hTuVuPNF)

---

## **7. Final Notes**

- Keep your API key secure.
- Explore advanced features like multi-page parsing and custom output formats.

Happy parsing! 🚀
