In [5]:
import requests
import json
import hashlib
import hmac
import base64
from datetime import datetime , timezone
import azure.cosmos


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, TimestampType, LongType

# Spark schema used by appendTable when building DataFrames.
# Columns are listed as (name, Spark type) in table order; every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", StringType()),
        ("user_id", StringType()),
        ("query_text", StringType()),
        ("date_from", StringType()),
        ("date_to", StringType()),
        ("priority", LongType()),
        ("email", StringType()),
        ("status", LongType()),
        ("metadata.created_at", TimestampType()),
        ("metadata.updated_at", TimestampType()),
        ("metadata.source", StringType()),
        ("partitionKey", StringType()),
        ("updated_at", StringType()),
        # Cosmos DB system properties carried along with each document.
        ("_rid", StringType()),
        ("_self", StringType()),
        ("_etag", StringType()),
        ("_attachments", StringType()),
        ("_ts", LongType()),
    )
])


StatementMeta(, 9383db11-2fc4-4b33-8c3b-cbc6f91ab628, 9, Finished, Available, Finished)

In [2]:
import os
from email.policy import default

from azure.cosmos import CosmosClient, PartitionKey, exceptions
from typing import List, Dict, Any, Optional

class CosmosDBClient:
    """
    Convenience wrapper around a single Azure Cosmos DB container.

    Every operation catches ``CosmosHttpResponseError``, prints a message and
    returns a neutral value (``None``, ``[]`` or ``False``) instead of raising.
    """

    def __init__(self, url: str, key: str, database_name: str, container_name: str, partition_key: str):
        """
        Initialize the CosmosDBClient with Cosmos DB URL, key, database, container, and partition key.

        If connecting fails with ``CosmosHttpResponseError`` the error is printed and
        whichever of ``client`` / ``database`` / ``container`` was not reached stays ``None``.

        Parameters:
            url (str): Cosmos DB endpoint URL.
            key (str): Cosmos DB primary key for authorization.
            database_name (str): Name of the Cosmos DB database.
            container_name (str): Name of the Cosmos DB container.
            partition_key (str): Partition key property name (without the leading "/").
        """
        self.url = url
        self.key = key
        self.database_name = database_name
        self.container_name = container_name
        self.partition_key = partition_key

        # Pre-set the handles so the attributes always exist, even when the
        # connection attempt below fails part-way through.
        self.client = None
        self.database = None
        self.container = None

        # Initialize Cosmos Client and connect to database and container
        try:
            self.client = CosmosClient(self.url, credential=self.key)
            self.database = self._create_database_if_not_exists()
            self.container = self._create_container_if_not_exists()
        except exceptions.CosmosHttpResponseError as e:
            print(f"Failed to connect to Cosmos DB: {e}")

    def _create_database_if_not_exists(self):
        """
        Create the database if it does not exist.

        Returns:
            DatabaseProxy: A reference to the Cosmos DB database.
        """
        return self.client.create_database_if_not_exists(id=self.database_name)

    def _create_container_if_not_exists(self):
        """
        Create the container if it does not exist with a specified partition key.

        Returns:
            ContainerProxy: A reference to the Cosmos DB container.
        """
        return self.database.create_container_if_not_exists(
            id=self.container_name,
            partition_key=PartitionKey(path=f"/{self.partition_key}"),
            offer_throughput=400  # Set the desired throughput
        )

    def _with_partition_key(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Return a shallow copy of ``data`` that is guaranteed to carry the partition key.

        Working on a copy keeps the caller's dictionary untouched.
        """
        body = dict(data)
        body.setdefault(self.partition_key, "default_partition")
        return body

    def create_document(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Insert a document into the container. ID will be autogenerated if not provided.

        The input dictionary is not modified; a copy with the partition key
        (and, if missing, a random UUID ``id``) is sent to Cosmos DB.

        Parameters:
            data (Dict[str, Any]): Document data to insert.

        Returns:
            Optional[Dict[str, Any]]: The created document, or None if an error occurs.
        """
        import uuid  # local import: only needed when an id has to be generated

        try:
            body = self._with_partition_key(data)
            if "id" not in body:
                body["id"] = str(uuid.uuid4())
            document = self.container.create_item(body=body)
            print("Document created successfully.")
            return document
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error creating document: {e}")
            return None

    def upsert_document(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Upsert (update or insert) a document in the container.

        The input dictionary is not modified; a copy with the partition key
        filled in is sent to Cosmos DB.

        Parameters:
            data (Dict[str, Any]): Document data to upsert.

        Returns:
            Optional[Dict[str, Any]]: The upserted document, or None if an error occurs.
        """
        try:
            document = self.container.upsert_item(body=self._with_partition_key(data))
            print("Document upserted successfully.")
            return document
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error upserting document: {e}")
            return None

    def read_document(self, document_id: str, partition_key: str) -> Optional[Dict[str, Any]]:
        """
        Read a document by ID.

        Parameters:
            document_id (str): The ID of the document to read.
            partition_key (str): The partition key value of the document.

        Returns:
            Optional[Dict[str, Any]]: The retrieved document, or None if not found or on error.
        """
        try:
            return self.container.read_item(item=document_id, partition_key=partition_key)
        except exceptions.CosmosResourceNotFoundError:
            print("Document not found.")
            return None
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error reading document: {e}")
            return None

    def query_documents(
        self,
        query: str,
        parameters: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Query documents in the container (cross-partition).

        Parameters:
            query (str): SQL query string to execute. Prefer ``@name`` placeholders
                over interpolating values into the string.
            parameters (Optional[List[Dict[str, Any]]]): Values for the placeholders,
                e.g. ``[{"name": "@id", "value": "42"}]``. Defaults to None (no parameters).

        Returns:
            List[Dict[str, Any]]: A list of documents that match the query, or [] on error.
        """
        try:
            return list(
                self.container.query_items(
                    query=query,
                    parameters=parameters,
                    enable_cross_partition_query=True,
                )
            )
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error querying documents: {e}")
            return []

    def delete_document(self, document_id: str, partition_key: str) -> bool:
        """
        Delete a document by ID.

        Parameters:
            document_id (str): The ID of the document to delete.
            partition_key (str): The partition key value of the document.

        Returns:
            bool: True if the document was deleted, False otherwise.
        """
        try:
            self.container.delete_item(item=document_id, partition_key=partition_key)
            print("Document deleted successfully.")
            return True
        except exceptions.CosmosResourceNotFoundError:
            print("Document not found.")
            return False
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error deleting document: {e}")
            return False

    def list_all_documents(self) -> List[Dict[str, Any]]:
        """
        List all documents in the container.

        Returns:
            List[Dict[str, Any]]: A list of all documents in the container, or [] on error.
        """
        try:
            return list(self.container.read_all_items())
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error listing all documents: {e}")
            return []


StatementMeta(, 9383db11-2fc4-4b33-8c3b-cbc6f91ab628, 6, Finished, Available, Finished)

In [10]:
import pandas as pd

# Cosmos DB connection settings.
COSMOS_URL = ""
COSMOS_KEY = ""
DATABASE_NAME = ""
CONTAINER_NAME = ""
# Must match the partition key path configured on the Cosmos DB container.
PARTITION_KEY = ""

USER_ID = ""

# Arguments are passed in the constructor's declared order:
# url, key, database_name, container_name, partition_key.
cosmos_client_query = CosmosDBClient(
    COSMOS_URL,
    COSMOS_KEY,
    DATABASE_NAME,
    CONTAINER_NAME,
    PARTITION_KEY,
)
def appendTable(source, name):
    """Append `source` to the Delta table `name`, typed with the module-level `schema`.

    `source` is handed to `spark.createDataFrame` together with `schema`;
    the resulting DataFrame is written in append mode via `saveAsTable`.
    """
    delta_writer = spark.createDataFrame(source, schema).write
    delta_writer.mode("append").format("delta").saveAsTable(name)

# Fetch the next batch of pending queries (status = 0), ordered by priority.
_PENDING_BATCH_SQL = "SELECT * FROM query.query_pending  WHERE status=0 ORDER BY priority LIMIT 1000"

try:
    df = spark.sql(_PENDING_BATCH_SQL)
except Exception:
    # The SELECT failed (presumably because the table does not exist yet): create it
    # empty and retry. `except Exception` rather than a bare `except` so that
    # KeyboardInterrupt / SystemExit are not mistaken for a missing table.
    # The table name is qualified so it is created where the SELECT reads from.
    appendTable([], "query.query_pending")
    df = spark.sql(_PENDING_BATCH_SQL)


# Bring the batch to the driver as a pandas DataFrame for row-wise processing.
df_pd = df.toPandas()


StatementMeta(, 9383db11-2fc4-4b33-8c3b-cbc6f91ab628, 14, Finished, Available, Finished)

In [9]:


# Mark every pending query as "processing" (status = 1) in Cosmos DB, then move the
# batch from the pending Delta table to the processing Delta table.

# Parameterized lookup: values are bound by the SDK instead of being spliced into
# the SQL text, so ids containing quotes cannot break or alter the query.
_DOC_LOOKUP_SQL = "SELECT * FROM c WHERE c.id = @id AND c.user_id = @user_id"

for _id, user_id in zip(df_pd["id"], df_pd["user_id"]):
    # Query to get the specific document by its ID and user_id
    try:
        documents = list(
            cosmos_client_query.container.query_items(
                query=_DOC_LOOKUP_SQL,
                parameters=[
                    {"name": "@id", "value": _id},
                    {"name": "@user_id", "value": user_id},
                ],
                enable_cross_partition_query=True,
            )
        )
    except exceptions.CosmosHttpResponseError as e:
        # Same best-effort behavior as CosmosDBClient.query_documents: report and skip.
        print(f"Error querying documents: {e}")
        documents = []

    if not documents:
        continue

    document = documents[0]  # Assuming each ID corresponds to a single document

    # Update the status and add a timestamp
    document['status'] = 1
    document['updated_at'] = datetime.now(timezone.utc).isoformat()  # Optional timestamp

    # Upsert the updated document back into Cosmos DB
    cosmos_client_query.upsert_document(document)


df_pd["status"] = 1

appendTable(df_pd, "query.query_processing")


# NOTE(review): df_pd was selected with LIMIT 1000 and status=0, but the whole pending
# table is dropped here. Rows outside that batch would be lost; confirm this is intended.
# Table names are qualified to match the `query.query_pending` table the batch was read from.
spark.sql("DROP TABLE IF EXISTS query.query_pending")
# NOTE(review): not referenced again in this cell; kept in case later cells use it.
empty_pending = spark.createDataFrame([], schema)
appendTable([], "query.query_pending")

StatementMeta(, 9383db11-2fc4-4b33-8c3b-cbc6f91ab628, 13, Finished, Available, Finished)

Document upserted successfully.


AnalysisException: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 515fd3b5-d45e-4ab8-b6f5-2475daf90db5).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- hash_id: long (nullable = true)
-- title: string (nullable = true)
-- authors: string (nullable = true)
-- published: timestamp (nullable = true)
-- summary: string (nullable = true)
-- pdf_url: string (nullable = true)
-- entry_id: string (nullable = true)
-- recommended: long (nullable = true)
-- referenceCount: long (nullable = true)
-- citationCount: long (nullable = true)
-- references: string (nullable = true)
-- citations: string (nullable = true)
-- s2FieldsOfStudy: string (nullable = true)
-- tldr: string (nullable = true)
-- query_id: string (nullable = true)
-- Tag_1: string (nullable = true)
-- Tag_2: string (nullable = true)
-- Tag_3: string (nullable = true)
-- Tag_4: string (nullable = true)
-- Tag_5: string (nullable = true)
-- field: string (nullable = true)


Data schema:
root
-- id: string (nullable = true)
-- user_id: string (nullable = true)
-- query_text: string (nullable = true)
-- date_from: string (nullable = true)
-- date_to: string (nullable = true)
-- priority: long (nullable = true)
-- email: string (nullable = true)
-- status: long (nullable = true)
-- metadata.created_at: timestamp (nullable = true)
-- metadata.updated_at: timestamp (nullable = true)
-- metadata.source: string (nullable = true)
-- partitionKey: string (nullable = true)
-- _rid: string (nullable = true)
-- _self: string (nullable = true)
-- _etag: string (nullable = true)
-- _attachments: string (nullable = true)
-- _ts: long (nullable = true)

         

In [None]:
import pandas as pd

# Cosmos DB connection settings.
COSMOS_URL = ""
COSMOS_KEY = ""
DATABASE_NAME = ""
CONTAINER_NAME = ""
# Must match the partition key path configured on the Cosmos DB container.
PARTITION_KEY = ""

USER_ID = ""

# Arguments are passed in the constructor's declared order:
# url, key, database_name, container_name, partition_key.
cosmos_client_query = CosmosDBClient(
    COSMOS_URL,
    COSMOS_KEY,
    DATABASE_NAME,
    CONTAINER_NAME,
    PARTITION_KEY,
)
def appendTable(source, name):
    """Append `source` to the Delta table `name`, typed with the module-level `schema`.

    `source` is handed to `spark.createDataFrame` together with `schema`;
    the resulting DataFrame is written in append mode via `saveAsTable`.
    """
    delta_writer = spark.createDataFrame(source, schema).write
    delta_writer.mode("append").format("delta").saveAsTable(name)

# Fetch the next batch of pending queries (status = 0), ordered by priority.
_PENDING_BATCH_SQL = "SELECT * FROM query.query_pending  WHERE status=0 ORDER BY priority LIMIT 1000"

try:
    df = spark.sql(_PENDING_BATCH_SQL)
except Exception:
    # The SELECT failed (presumably because the table does not exist yet): create it
    # empty and retry. `except Exception` rather than a bare `except` so that
    # KeyboardInterrupt / SystemExit are not mistaken for a missing table.
    # The table name is qualified so it is created where the SELECT reads from.
    appendTable([], "query.query_pending")
    df = spark.sql(_PENDING_BATCH_SQL)


# Bring the batch to the driver as a pandas DataFrame for row-wise processing.
df_pd = df.toPandas()
