In [16]:
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
import pandas as pd
# Define schema for flattened research paper data with additional fields
# Schema for the flattened research-paper records. Every field is nullable; the
# JSON-shaped columns (references, citations, s2FieldsOfStudy, tldr) are kept as raw strings.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hash_id', LongType()),          # unique hash ID for the paper
        ('title', StringType()),          # title of the paper
        ('authors', StringType()),        # authors as a comma-separated string
        ('published', TimestampType()),   # publication date
        ('summary', StringType()),        # abstract / summary
        ('pdf_url', StringType()),        # URL of the paper's PDF
        ('entry_id', StringType()),       # source entry ID (e.g. arXiv ID)
        ('recommended', LongType()),      # 1 if recommended, 0 otherwise
        ('referenceCount', LongType()),   # number of references in the paper
        ('citationCount', LongType()),    # number of times the paper has been cited
        ('references', StringType()),     # JSON string of references
        ('citations', StringType()),      # JSON string of citations
        ('s2FieldsOfStudy', StringType()),  # JSON string of fields of study
        ('tldr', StringType()),           # JSON string of the TLDR summary
        ('query_id', StringType()),       # identifier of the query that produced the row
        ('Tag_1', StringType()),          # additional tag 1
        ('Tag_2', StringType()),          # additional tag 2
        ('Tag_3', StringType()),          # additional tag 3
        ('Tag_4', StringType()),          # additional tag 4
        ('Tag_5', StringType()),          # additional tag 5
        ('field', StringType()),          # field of the paper
    )
])

# Read the silver table and collect it to the driver as a pandas DataFrame.
df = spark.sql("SELECT * FROM silver.target")
df_pd = df.toPandas()

StatementMeta(, 48c629aa-a57b-401b-b430-be169eb99d26, 20, Finished, Available, Finished)

In [25]:
import os
from email.policy import default

from azure.cosmos import CosmosClient, PartitionKey, exceptions
from typing import List, Dict, Any, Optional
import requests
import json
import hashlib
import hmac
import base64
from datetime import datetime , timezone


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, TimestampType, LongType



class CosmosDBClient:
    """Convenience wrapper around a single Azure Cosmos DB container.

    The database and container are created on construction if they are missing.
    Data methods catch ``CosmosHttpResponseError``, print it, and return a sentinel
    (``None``, ``[]`` or ``False``) instead of raising, so callers should check results.
    """

    def __init__(self, url: str, key: str, database_name: str, container_name: str, partition_key: str,
                 offer_throughput: int = 400):
        """
        Initialize the CosmosDBClient with Cosmos DB URL, key, database, container, and partition key.

        Parameters:
            url (str): Cosmos DB endpoint URL.
            key (str): Cosmos DB primary key for authorization.
            database_name (str): Name of the Cosmos DB database.
            container_name (str): Name of the Cosmos DB container.
            partition_key (str): Partition key property name WITHOUT the leading "/";
                the container is created with path "/<partition_key>".
            offer_throughput (int): Provisioned throughput used when the container has
                to be created (defaults to 400, the previously hard-coded value).

        Raises:
            CosmosHttpResponseError: If the client, database, or container cannot be set up.
        """
        self.url = url
        self.key = key
        self.database_name = database_name
        self.container_name = container_name
        self.partition_key = partition_key
        self.offer_throughput = offer_throughput

        # Initialize Cosmos Client and connect to database and container
        try:
            self.client = CosmosClient(self.url, credential=self.key)
            self.database = self._create_database_if_not_exists()
            self.container = self._create_container_if_not_exists()
        except exceptions.CosmosHttpResponseError as e:
            print(f"Failed to connect to Cosmos DB: {e}")
            # Re-raise: swallowing here left an instance without `client`/`container`,
            # so every later call failed with a confusing AttributeError instead.
            raise

    def _create_database_if_not_exists(self):
        """
        Create the database if it does not exist.

        Returns:
            DatabaseProxy: A reference to the Cosmos DB database.
        """
        return self.client.create_database_if_not_exists(id=self.database_name)

    def _create_container_if_not_exists(self):
        """
        Create the container if it does not exist with a specified partition key.

        Returns:
            ContainerProxy: A reference to the Cosmos DB container.
        """
        return self.database.create_container_if_not_exists(
            id=self.container_name,
            partition_key=PartitionKey(path=f"/{self.partition_key}"),
            offer_throughput=self.offer_throughput,
        )

    def _with_partition_key(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``data`` that is guaranteed to carry the partition key field.

        The caller's dict is left untouched; a missing key gets the value "default_partition".
        """
        body = dict(data)
        body.setdefault(self.partition_key, "default_partition")
        return body

    def create_document(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Insert a document into the container. ID will be autogenerated if not provided.

        Parameters:
            data (Dict[str, Any]): Document data to insert (not modified).

        Returns:
            Optional[Dict[str, Any]]: The created document, or None if an error occurs.
        """
        try:
            body = self._with_partition_key(data)
            # The SDK only generates a missing "id" when explicitly asked to.
            document = self.container.create_item(body=body, enable_automatic_id_generation=True)
            print("Document created successfully.")
            return document
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error creating document: {e}")
            return None

    def upsert_document(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Upsert (update or insert) a document in the container.

        Parameters:
            data (Dict[str, Any]): Document data to upsert (not modified).

        Returns:
            Optional[Dict[str, Any]]: The upserted document, or None if an error occurs.
        """
        try:
            document = self.container.upsert_item(body=self._with_partition_key(data))
            return document
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error upserting document: {e}")
            return None

    def read_document(self, document_id: str, partition_key: str) -> Optional[Dict[str, Any]]:
        """
        Read a document by ID.

        Parameters:
            document_id (str): The ID of the document to read.
            partition_key (str): The partition key value of the document.

        Returns:
            Optional[Dict[str, Any]]: The retrieved document, or None if not found or on error.
        """
        try:
            return self.container.read_item(item=document_id, partition_key=partition_key)
        except exceptions.CosmosResourceNotFoundError:
            print("Document not found.")
            return None
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error reading document: {e}")
            return None

    def query_documents(self, query: str,
                        parameters: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
        """
        Query documents in the container (across all partitions).

        Parameters:
            query (str): SQL query string to execute.
            parameters (Optional[List[Dict[str, Any]]]): Optional query parameters, e.g.
                ``[{"name": "@id", "value": "42"}]``; prefer these over string-building
                untrusted values into ``query``.

        Returns:
            List[Dict[str, Any]]: A list of documents that match the query ([] on error).
        """
        try:
            return list(self.container.query_items(query=query, parameters=parameters,
                                                   enable_cross_partition_query=True))
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error querying documents: {e}")
            return []

    def delete_document(self, document_id: str, partition_key: str) -> bool:
        """
        Delete a document by ID.

        Parameters:
            document_id (str): The ID of the document to delete.
            partition_key (str): The partition key value of the document.

        Returns:
            bool: True if the document was deleted, False otherwise.
        """
        try:
            self.container.delete_item(item=document_id, partition_key=partition_key)
            print("Document deleted successfully.")
            return True
        except exceptions.CosmosResourceNotFoundError:
            print("Document not found.")
            return False
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error deleting document: {e}")
            return False

    def list_all_documents(self) -> List[Dict[str, Any]]:
        """
        List all documents in the container.

        Returns:
            List[Dict[str, Any]]: A list of all documents in the container ([] on error).
        """
        try:
            return list(self.container.read_all_items())
        except exceptions.CosmosHttpResponseError as e:
            print(f"Error listing all documents: {e}")
            return []


import pandas as pd

# Connection settings come from the environment so credentials are never stored in the notebook.
COSMOS_URL = os.environ.get("COSMOS_URL", "")
COSMOS_KEY = os.environ.get("COSMOS_KEY", "")
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME", "")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME", "")
# Partition key PROPERTY NAME without the leading "/": CosmosDBClient builds the container
# path as "/<PARTITION_KEY>" and uses this name as the document field it fills in.
PARTITION_KEY = os.environ.get("COSMOS_PARTITION_KEY", "")

USER_ID = ""

cosmos_client_papers = CosmosDBClient(
    url=COSMOS_URL,
    key=COSMOS_KEY,
    database_name=DATABASE_NAME,
    container_name=CONTAINER_NAME,
    partition_key=PARTITION_KEY
)
def appendTable(source, name):
    """Append ``source`` to the Delta table ``name``, applying the module-level ``schema``.

    ``source`` is whatever ``spark.createDataFrame`` accepts (e.g. a pandas DataFrame).
    NOTE(review): its columns must line up with ``schema``; not called in the visible cells.
    """
    writer = spark.createDataFrame(source, schema).write
    writer.format("delta").mode("append").saveAsTable(name)


def _to_cosmos_frame(papers: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``papers`` with JSON-friendly columns and a string Cosmos ``id``.

    - ``published`` becomes ``str(timestamp)`` (same text as before); missing timestamps
      become ``None`` (JSON null) instead of the literal string "NaT".
    - The count columns get nulls replaced by 0 and are cast to int64.
    - ``id`` (required by Cosmos DB) is ``str(hash_id)``.
      NOTE(review): if ``hash_id`` ever contains nulls pandas holds it as float, giving ids
      like "123.0"/"nan" — confirm hash_id is never null in silver.target.
    """
    prepared = papers.copy()
    prepared["published"] = prepared["published"].apply(lambda ts: None if pd.isna(ts) else str(ts))
    count_columns = ["recommended", "referenceCount", "citationCount"]
    prepared[count_columns] = prepared[count_columns].fillna(0).astype("int64")
    prepared["id"] = prepared["hash_id"].astype(str)
    return prepared


df_pd = _to_cosmos_frame(df_pd)

# to_dict(orient="records") yields one plain dict per row without building a Series per
# row like iterrows(). upsert_document returns None on failure, so collect those ids.
failed_ids = []
for document in df_pd.to_dict(orient="records"):
    if cosmos_client_papers.upsert_document(document) is None:
        failed_ids.append(document["id"])

print(f"Upserted {len(df_pd) - len(failed_ids)} of {len(df_pd)} documents to Cosmos DB.")
if failed_ids:
    print(f"{len(failed_ids)} upserts failed; first ids: {failed_ids[:10]}")


StatementMeta(, 48c629aa-a57b-401b-b430-be169eb99d26, 29, Finished, Available, Finished)

Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document upserted successfully.
Document

In [26]:
!pip install azure-search-documents azure-core azure-identity

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexerClient
import time
import logging

# Root logging at INFO with timestamped lines. logging.basicConfig does nothing if the
# root logger already has handlers configured.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
# Module logger shared by IndexerManager and main().
logger = logging.getLogger(__name__)

class IndexerManager:
    """Trigger an existing Azure AI Search indexer and poll it until that run finishes."""

    # IndexerExecutionResult.status values that end a run. The SDK reports "success",
    # "transientFailure", "inProgress" or "reset"; "warning"/"error" are kept from the
    # original checks for backward compatibility.
    _SUCCESS_STATES = ("success", "warning")
    _FAILURE_STATES = ("transientFailure", "error")

    def __init__(self, endpoint, key, indexer_name):
        """Initialize the indexer manager.

        Args:
            endpoint (str): Azure Cognitive Search service endpoint
            key (str): Azure Cognitive Search admin key
            indexer_name (str): Name of the existing indexer
        """
        self.endpoint = endpoint
        self.credential = AzureKeyCredential(key)
        self.indexer_name = indexer_name
        self.client = SearchIndexerClient(endpoint=endpoint, credential=self.credential)

    def run_indexer(self, check_interval=30, timeout=3600):
        """Run the indexer and wait for *this* run to complete.

        Args:
            check_interval (int): Seconds between status polls.
            timeout (int): Maximum seconds to wait for the run.

        Raises:
            TimeoutError: If the run does not finish within ``timeout``.
            Exception: If the run fails or the SDK call errors (logged, then re-raised).
        """
        try:
            # Remember the previous run: right after triggering, `last_result` may still
            # describe it, and its "success" must not be mistaken for ours.
            previous_start_time = self._last_start_time()

            self.client.run_indexer(self.indexer_name)
            logger.info(f"Started indexer: {self.indexer_name}")

            self._monitor_indexer_status(check_interval=check_interval, timeout=timeout,
                                         previous_start_time=previous_start_time)
        except Exception as e:
            logger.error(f"Error running indexer: {str(e)}")
            raise

    def _last_start_time(self):
        """Return the start time of the most recent indexer execution, or None if none exists."""
        last_result = self.client.get_indexer_status(self.indexer_name).last_result
        return getattr(last_result, "start_time", None)

    @staticmethod
    def _first_count(result, *names):
        """Return the first non-None attribute among ``names`` on ``result``, else 0."""
        for name in names:
            value = getattr(result, name, None)
            if value is not None:
                return value
        return 0

    def _monitor_indexer_status(self, check_interval=30, timeout=3600, previous_start_time=None):
        """Poll until the current run succeeds, fails, or ``timeout`` seconds elapse.

        Args:
            check_interval (int): Seconds to sleep between polls.
            timeout (int): Maximum seconds to poll before raising TimeoutError.
            previous_start_time: Start time of the run preceding the monitored one; a
                result carrying this start time is ignored as stale. None disables the check.
        """
        start_time = time.time()

        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError("Indexer monitoring timed out")

            status = self.client.get_indexer_status(self.indexer_name)
            result = status.last_result

            is_stale = (
                result is not None
                and previous_start_time is not None
                and getattr(result, "start_time", None) == previous_start_time
            )
            if result is not None and not is_stale:
                logger.info(f"Status: {result.status}")
                logger.info(f"Documents processed: {self._first_count(result, 'item_count', 'document_count')}")
                logger.info(f"Failed documents: {self._first_count(result, 'failed_item_count', 'failed_document_count')}")

                if result.status in self._SUCCESS_STATES:
                    logger.info("Indexing completed successfully")
                    break
                if result.status in self._FAILURE_STATES:
                    error_message = getattr(result, "error_message", None)
                    raise Exception(f"Indexing failed: {error_message}")

            # Overall indexer status "error" means it cannot run without intervention;
            # waiting for the timeout would only hide that.
            if getattr(status, "status", None) == "error":
                raise Exception("Indexing failed: indexer is in an error state")

            time.sleep(check_interval)

    def get_indexer_stats(self):
        """Summarize the most recent indexer execution.

        Returns:
            dict: keys status, document_count, failed_document_count, errors, warnings,
            start_time, end_time. Counts come from the SDK's ``item_count`` /
            ``failed_item_count`` fields.
        """
        try:
            result = self.client.get_indexer_status(self.indexer_name).last_result
            if result is None:
                return {
                    "status": "Unknown",
                    "document_count": 0,
                    "failed_document_count": 0,
                    "errors": [],
                    "warnings": [],
                    "start_time": None,
                    "end_time": None,
                }
            return {
                "status": result.status,
                "document_count": self._first_count(result, "item_count", "document_count"),
                "failed_document_count": self._first_count(result, "failed_item_count", "failed_document_count"),
                "errors": getattr(result, "errors", None) or [],
                "warnings": getattr(result, "warnings", None) or [],
                "start_time": getattr(result, "start_time", None),
                "end_time": getattr(result, "end_time", None),
            }
        except Exception as e:
            logger.error(f"Error getting indexer stats: {str(e)}")
            raise

def main():
    """Trigger the configured search indexer, wait for it, and log its final statistics.

    Failures are logged with a traceback and not re-raised.
    """
    import os

    # Configuration is read from the environment so the admin key is not stored in the notebook.
    SEARCH_ENDPOINT = os.environ.get("SEARCH_ENDPOINT", "")
    SEARCH_KEY = os.environ.get("SEARCH_KEY", "")
    INDEXER_NAME = os.environ.get("SEARCH_INDEXER_NAME", "")  # Your existing indexer name

    # Initialize indexer manager
    manager = IndexerManager(SEARCH_ENDPOINT, SEARCH_KEY, INDEXER_NAME)

    try:
        # Run the indexer
        logger.info("Starting indexer run...")
        manager.run_indexer()

        # Get final stats
        logger.info("\nFinal indexer statistics:")
        stats = manager.get_indexer_stats()
        logger.info(f"Total documents processed: {stats['document_count']}")
        logger.info(f"Failed documents: {stats['failed_document_count']}")

        if stats['errors']:
            logger.info("\nErrors encountered:")
            for error in stats['errors']:
                # SearchIndexerError exposes `error_message` (not `message`).
                detail = getattr(error, 'error_message', None) or getattr(error, 'message', error)
                logger.error(f"- {detail}")

        if stats['warnings']:
            logger.info("\nWarnings:")
            for warning in stats['warnings']:
                detail = getattr(warning, 'message', None) or warning
                logger.warning(f"- {detail}")

    except Exception as e:
        # Keep the traceback so failures are diagnosable from the cell output.
        logger.exception(f"Error in indexing process: {str(e)}")

# Trigger the indexer run as soon as this cell executes.
# NOTE(review): presumably so the search index picks up the documents upserted to
# Cosmos DB in the previous cell — confirm the indexer's data source.
main()

StatementMeta(, 48c629aa-a57b-401b-b430-be169eb99d26, 30, Finished, Available, Finished)

Collecting azure-search-documents
  Downloading azure_search_documents-11.5.2-py3-none-any.whl.metadata (23 kB)
Collecting azure-common>=1.1 (from azure-search-documents)
  Downloading azure_common-1.1.28-py2.py3-none-any.whl.metadata (5.0 kB)
Downloading azure_search_documents-11.5.2-py3-none-any.whl (298 kB)
Downloading azure_common-1.1.28-py2.py3-none-any.whl (14 kB)
Installing collected packages: azure-common, azure-search-documents
Successfully installed azure-common-1.1.28 azure-search-documents-11.5.2
