In [0]:
from pyspark.sql import SparkSession
import requests
import os
from pyspark.sql.functions import regexp_replace, col

# Get or create the Spark session used by every step below
spark = (
    SparkSession.builder
    .appName("CosmosDBWriteOptimization")
    .getOrCreate()
)

# Azure Cosmos DB connection settings.
# The endpoint and key are read from the environment so that secrets do not
# have to be pasted into the notebook; when a variable is not set the value
# falls back to an empty placeholder, which you can replace by hand.
COSMOS_DB_URI = os.environ.get("COSMOS_DB_URI", "")
COSMOS_DB_KEY = os.environ.get("COSMOS_DB_KEY", "")  # Ensure to keep your key secure
COSMOS_DB_DATABASE_NAME = "diskanndb"
COSMOS_DB_CONTAINER_NAME = "search"

# Azure Blob Storage URL for the JSON file.
# Also taken from the environment because such a URL may embed an access token.
blob_url = os.environ.get("BLOB_URL", "")

# This sample expects a file containing an array of JSON documents in the following format, where "embedding" is an array of floats that represents the "abstract" field:

# [
#   {
#     "abstract": "  $GL_h(n) \\\\times GL_h(m)$-covariant $h$-bosonic algebras are built by\\ncontracting the $GL_q(n) \\\\times GL_q(m)$-covariant $q$-bosonic algebras\\nconsidered by the present author some years ago. Their defining relations are\\nwritten in terms of the corresponding $R_h$-matrices. Whenever $n=2$, and $m=1$\\nor 2, it is proved by using U_h(sl(2)) Clebsch-Gordan coefficients that they\\ncan also be expressed in terms of coupled commutators in a way entirely similar\\nto the classical case. Some U_h(sl(2)) rank-1/2 irreducible tensor operators,\\nrecently contructed by Aizawa in terms of standard bosonic operators, are shown\\nto provide a realization of the $h$-bosonic algebra corresponding to $n=2$ and\\n$m=1$.\\n",
#     "authors": "C. Quesne",
#     "authors_parsed": "[[\"Quesne\", \"C.\", \"\"]]",
#     "categories": "math.QA hep-th math-ph math.MP",
#     "comments": "7 pages, LaTeX, no figure, presented at the 7th Colloquium ``Quantum\\n  Groups and Integrable Systems'', Prague, 18--20 June 1998, submitted to\\n  Czech. J. Phys",
#     "doi": "null",
#     "embedding": [0.005199998617172241, 0.002000004291534424, 0.0008000133037567139],
#     "id": "math_9810161",
#     "journal-ref": "Czech. J. Phys. 48 (1998) 1471-1476",
#     "license": "null",
#     "report-no": "ULB/229/CQ/98/4",
#     "submitter": "Christiane Quesne",
#     "title": "Nonstandard GL_h(n) quantum groups and contraction of covariant\\n  q-bosonic algebras",
#     "update_date": "2007-05-23",
#     "versions": "[{\"version\": \"v1\", \"created\": \"Wed, 28 Oct 1998 13:50:20 GMT\"}]"
#   },
#   {
#     ... next document
#   }
# ]

# Output file where the JSON file will be stored (mounted path)
output_file_path = "/dbfs/mnt/temp/vectors1-utf8.json"

# Ensure the directory exists
os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

# Step 1: Stream the file from the Blob URL and save it to disk.
# The body is copied as raw bytes in 1 MiB chunks. Decoding it on the fly is
# unsafe: when the response declares no charset, iter_content() yields bytes
# even with decode_unicode=True (which a text-mode file cannot accept), and a
# wrongly declared charset would silently re-encode the payload.
# NOTE(review): this assumes the blob is already UTF-8 encoded, as the output
# file name suggests - confirm.
# The timeout is (connect, read-between-chunks) in seconds, so a stalled
# connection cannot hang the job forever; the `with` block guarantees the
# streamed connection is released.
with requests.get(blob_url, stream=True, timeout=(30, 300)) as response:
    if response.status_code != 200:
        # Every later step needs the downloaded file, so stop here instead of
        # continuing with a missing or stale copy.
        raise RuntimeError(
            f"Failed to download the file. Status code: {response.status_code}"
        )
    with open(output_file_path, 'wb') as output_file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            output_file.write(chunk)

print(f"File successfully written to {output_file_path}")

# Step 2: Convert the local mount path (/dbfs/...) into the dbfs:/ form used by Spark.
# Only the leading "/dbfs" prefix is rewritten, so a "/dbfs" fragment that
# appears later in the path is left untouched.
if output_file_path.startswith("/dbfs/"):
    spark_file_path = "dbfs:" + output_file_path[len("/dbfs"):]
else:
    spark_file_path = output_file_path

# Step 3: Read the entire JSON file and let Spark infer the schema.
# multiLine is required because the file holds one JSON array spread over
# many lines rather than one document per line.
# NOTE(review): "inferSchema" looks like a CSV-reader option; the JSON reader
# appears to infer the schema regardless - confirm and drop it if unused.
df = (
    spark.read
    .option("multiLine", "true")
    .option("inferSchema", "true")
    .json(spark_file_path)
)

# Step 4: Show the schema to verify the data structure
df.printSchema()

# Clean the document IDs: replace every character that Cosmos DB restricts in
# an item id - '/', '\', '?' and '#' - with '_'.
# The pattern is a raw string, so the regex engine sees the character class
# [/\\?#], in which "\\" stands for one literal backslash.
df = df.withColumn('id', regexp_replace(col('id'), r'[/\\?#]', '_'))

# Repartition the DataFrame to optimize writes
num_partitions = 200  # Adjust this based on your Spark cluster resources
df = df.repartition(num_partitions)

# Step 5: Append the repartitioned DataFrame to the Cosmos DB container.
# All connector settings are passed in one call; the "ItemOverwrite"
# strategy is requested explicitly.
(
    df.write
    .format("cosmos.oltp")
    .mode("append")
    .options(**{
        "spark.cosmos.database": COSMOS_DB_DATABASE_NAME,
        "spark.cosmos.container": COSMOS_DB_CONTAINER_NAME,
        "spark.cosmos.accountEndpoint": COSMOS_DB_URI,
        "spark.cosmos.accountKey": COSMOS_DB_KEY,
        "spark.cosmos.write.strategy": "ItemOverwrite",
    })
    .save()
)

print("Data written to Azure Cosmos DB")
