%md
# Write DynamoDB JSON data in Azure storage (ADLS) to Azure Cosmos DB 

This notebook demonstrates how to load DynamoDB JSON data from Azure Data Lake Storage Gen2 (ADLS Gen2) and write it to Azure Cosmos DB for NoSQL. The workflow has two stages:

**1. Load DynamoDB JSON data from ADLS:** Set up the Spark configuration and load the compressed DynamoDB JSON export from ADLS Gen2. Optionally, transform the DynamoDB data.

**2. Load data into Azure Cosmos DB for NoSQL using the Azure Cosmos DB Spark connector:** Create the Azure Cosmos DB database and container using the connector's Catalog API, then write the data to the container.


**Step 1: Install dependencies**

In [0]:
%pip install azure-cosmos azure-mgmt-cosmosdb azure-mgmt-authorization

**Step 2: Restart the Python process so that the newly installed packages are used.**

In [0]:
dbutils.library.restartPython()

**Step 3: Connect to the Azure Data Lake Storage Gen2 account, read the DynamoDB JSON files using Apache Spark, and display the loaded data in a Spark DataFrame.**

In [0]:
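storage_account_name = ""  # ADLS Gen2 storage account name
container_name = ""  # container that holds the DynamoDB export
# Path of the DynamoDB export files inside the container. It mirrors the export's folder
# layout in the source AWS S3 bucket; update it to match your export.
file_path = "AWSDynamoDB/01738047791106-7ba095a9/data/*"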

# Construct the ABFS path
abfs_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}"
print(abfs_path)

# Microsoft Entra ID application
client_id = ""
client_secret = ""
tenant_id = ""

# Set up Spark configuration for Entra ID authentication
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", 
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", 
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

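# Read the DynamoDB JSON export from Azure Storage. The export files are gzip-compressed
# JSON Lines; Spark decompresses .gz files automatically and infers the JSON schema,
# so the CSV-only "header" and "inferSchema" options are not needed.
df = spark.read.format("json") \
    .load(abfs_path)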

# Show the DataFrame
df.show()
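The five `spark.conf.set` calls above follow one pattern: every key ends with the storage account's DFS host. As a minimal sketch, a hypothetical helper `abfs_oauth_conf` (not part of Spark, Hadoop, or Databricks) can produce the same settings as a dictionary, together with a hypothetical `abfss_path` that builds the URI the same way `abfs_path` is built:

```python
def abfs_oauth_conf(storage_account_name: str, client_id: str,
                    client_secret: str, tenant_id: str) -> dict:
    """Hypothetical helper: the same ABFS OAuth (client credentials) settings
    that the cell above sets one by one with spark.conf.set."""
    # Every key is scoped to the account's DFS endpoint host
    host = f"{storage_account_name}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


def abfss_path(container_name: str, storage_account_name: str, file_path: str) -> str:
    """Hypothetical helper: same URI layout as abfs_path in the cell above."""
    return f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}"
```

In the notebook you would apply it with `for key, value in abfs_oauth_conf(storage_account_name, client_id, client_secret, tenant_id).items(): spark.conf.set(key, value)`. Rather than pasting `client_secret` into the cell, you could read it from a Databricks secret scope with `dbutils.secrets.get(scope=..., key=...)`; the scope and key names are yours to choose.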


**Step 4 (optional): Transform the DynamoDB data further before writing it to Azure Cosmos DB. For example, use this step to generate a unique `id` field if the DynamoDB items don't already have one; every Azure Cosmos DB item requires an `id`.**
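Each line of a DynamoDB JSON export is an object with an `Item` key, and every attribute value inside it is wrapped in a type descriptor such as `S`, `N`, or `M`. That is why the Spark code in this step addresses fields as `Item.<attribute>.<type>`. To make the format concrete, here is a pure-Python sketch (no Spark) that decompresses one export file and unwraps the typed values. The function names `unmarshal` and `read_export_file` are hypothetical, and mapping numbers to `Decimal` and base64-decoding binaries are assumptions about how you want those types represented:

```python
import base64
import gzip
import json
from decimal import Decimal


def unmarshal(value: dict):
    """Convert one DynamoDB-typed attribute value, e.g. {"S": "x"}, to plain Python."""
    # Each attribute value is a single-key object: {type_descriptor: payload}
    dtype, payload = next(iter(value.items()))
    if dtype == "S":
        return payload
    if dtype == "N":
        return Decimal(payload)  # numbers are exported as strings
    if dtype == "B":
        return base64.b64decode(payload)  # binary values are base64-encoded
    if dtype == "BOOL":
        return payload
    if dtype == "NULL":
        return None
    if dtype == "SS":
        return set(payload)
    if dtype == "NS":
        return {Decimal(n) for n in payload}
    if dtype == "BS":
        return {base64.b64decode(b) for b in payload}
    if dtype == "L":
        return [unmarshal(v) for v in payload]
    if dtype == "M":
        return {k: unmarshal(v) for k, v in payload.items()}
    raise ValueError(f"Unknown DynamoDB type descriptor: {dtype}")


def read_export_file(gz_bytes: bytes) -> list:
    """Decompress a gzipped DynamoDB JSON export file and unmarshal each line's Item."""
    items = []
    for line in gzip.decompress(gz_bytes).decode("utf-8").splitlines():
        if line.strip():  # skip blank lines
            record = json.loads(line)
            items.append({k: unmarshal(v) for k, v in record["Item"].items()})
    return items
```

The Spark cell keeps the type descriptor in the output column name instead (for example `customer_id_S`), which preserves the original type information at the cost of less natural column names.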

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

def extract_dynamodb_fields(df):
    # Attributes to extract from each DynamoDB item. Replace with the attributes you want to keep.
    columns_to_select = ["customer_id", "Address", "Phone", "Email", "Name"]

    # DynamoDB attribute type descriptors
    dynamodb_types = ["S", "N", "B", "BS", "BOOL", "NS", "SS", "L", "M", "NULL"]

    # Generate valid field selections dynamically from "Item"
    select_expressions = []

    # Check if "Item" column exists
    if "Item" in df.columns:
        item_schema = df.schema["Item"].dataType  # Get the StructType of the "Item" column

        for col_name in columns_to_select:
            # Check if the attribute exists inside "Item"
            if col_name in item_schema.fieldNames():  # Correct way to access struct field names
                subfields = item_schema[col_name].dataType  # Get subfields of the attribute

                for dtype in dynamodb_types:
                    if dtype in subfields.fieldNames():  # Check if specific DynamoDB type exists
                        field_expr = f"Item.{col_name}.{dtype}"
                        select_expressions.append(col(field_expr).alias(f"{col_name}_{dtype}"))

        # Select only the existing fields dynamically
        if select_expressions:
            transformed_df = df.select(*select_expressions)
            transformed_df.show(truncate=False)
            return transformed_df
        else:
            print("No valid fields found in 'Item'.")
            return df
    else:
        print("No 'Item' column found in the DataFrame.")
        return df

# Call the function to process the provided DataFrame (df)
result_df = extract_dynamodb_fields(df)
# Add an 'id' column because the source items don't have one; skip this if the source already has an 'id' attribute.
# Azure Cosmos DB requires 'id' to be a string, so cast the generated 64-bit value.
objecttransformed_df_new = result_df.withColumn("id", monotonically_increasing_id().cast("string"))
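`monotonically_increasing_id()` guarantees unique, increasing values but not consecutive ones. Per the PySpark documentation, the current implementation places the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below (a hypothetical `simulated_monotonic_ids` that only mimics that documented layout and does not call Spark) shows why the ids jump between partitions:

```python
def simulated_monotonic_ids(rows_per_partition: list) -> list:
    """Mimic the documented ID layout: partition index in the upper 31 bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_index, row_count in enumerate(rows_per_partition):
        for row_index in range(row_count):
            ids.append((partition_index << 33) + row_index)
    return ids


# Two partitions with two rows each: the second partition starts at 2**33
print(simulated_monotonic_ids([2, 2]))
```

Because the values depend on how the data is partitioned, re-running the notebook can assign different ids to the same DynamoDB item, and with `partitionKeyPath = "/id"` a re-run would then create duplicate items instead of overwriting them. If an attribute such as `customer_id` is already unique, using it as the `id` (for example `result_df.withColumn("id", col("customer_id_S"))`, assuming `customer_id` is a string attribute) gives stable ids.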


**Step 5: Create the Azure Cosmos DB database and container using the Catalog API of the Azure Cosmos DB Spark connector**

In [0]:
cosmosEndpoint = ""
resourceGroupName = ""
subscriptionId = ""
cosmosDatabaseName = "demodb"
cosmosContainerName = "customers"

# Microsoft Entra ID application
client_id = ""
client_secret = ""
tenant_id = ""

partitionKeyPath = "/id"
throughput = '400'

# Configure Catalog API to use Entra ID authentication
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", subscriptionId)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", resourceGroupName)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", tenant_id)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", client_id)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", client_secret)


spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
# Set manualThroughput (RU/s) based on the volume of data to load.
# TBLPROPERTIES values are SQL string literals, so interpolate the Python variables into the statement.
spark.sql(f"CREATE TABLE IF NOT EXISTS cosmosCatalog.{cosmosDatabaseName}.{cosmosContainerName} USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '{partitionKeyPath}', manualThroughput = '{throughput}')")
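The `TBLPROPERTIES` clause takes SQL string literals, so the partition key path and throughput values have to be interpolated into the statement text; writing the Python variable names inside the SQL string does not substitute their values. A minimal sketch with a hypothetical `create_container_sql` helper; the throughput check assumes the usual rule for manually provisioned container throughput (at least 400 RU/s, in increments of 100):

```python
def create_container_sql(catalog: str, database: str, container: str,
                         partition_key_path: str, manual_throughput: int) -> str:
    """Hypothetical helper: build a Catalog API CREATE TABLE statement
    with the property values inlined as SQL string literals."""
    # Assumed constraint: manual throughput >= 400 RU/s, in steps of 100 RU/s
    if manual_throughput < 400 or manual_throughput % 100 != 0:
        raise ValueError("manualThroughput must be >= 400 and a multiple of 100")
    # Partition key paths are JSON paths rooted at '/'
    if not partition_key_path.startswith("/"):
        raise ValueError("partitionKeyPath must start with '/'")
    return (
        f"CREATE TABLE IF NOT EXISTS {catalog}.{database}.{container} "
        f"USING cosmos.oltp "
        f"TBLPROPERTIES(partitionKeyPath = '{partition_key_path}', "
        f"manualThroughput = '{manual_throughput}')"
    )
```

In the notebook this would be used as `spark.sql(create_container_sql("cosmosCatalog", cosmosDatabaseName, cosmosContainerName, partitionKeyPath, int(throughput)))`. Validating in Python surfaces a bad value before any request reaches the account.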

**Step 6: Write the data to the Azure Cosmos DB container using the Azure Cosmos DB Spark connector**

In [0]:
cosmosEndpoint = ""
cosmosDatabaseName = "demodb"
cosmosContainerName = "customers"
subscriptionID = ""
resourceGroup = ""

# Microsoft Entra ID application
client_id = ""
client_secret = ""
tenant_id = ""


# Set configuration settings to include Entra ID authentication
config = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.auth.type": "ServicePrincipal",
  "spark.cosmos.account.tenantId": tenant_id,
  "spark.cosmos.auth.aad.clientId": client_id,
  "spark.cosmos.auth.aad.clientSecret": client_secret,
  "spark.cosmos.database": cosmosDatabaseName,
  "spark.cosmos.container": cosmosContainerName,
  "spark.cosmos.account.subscriptionId": subscriptionID,
  "spark.cosmos.account.resourceGroupName": resourceGroup
}

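# Write data to Azure Cosmos DB. Spark's "append" mode adds rows to the existing container;
# with the connector's default write strategy (ItemOverwrite), an item whose id already exists is replaced (upsert).
objecttransformed_df_new \
    .write \
    .format("cosmos.oltp") \
    .options(**config) \
    .mode("append") \
    .save()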
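Steps 5 and 6 spell out the same service-principal settings twice: once prefixed with `spark.sql.catalog.cosmosCatalog.` as Spark session settings for the Catalog API, and once unprefixed as DataFrame write options. A hypothetical helper `cosmos_sp_options` can build both from one set of inputs. It is only a sketch: the keys are the ones this notebook already uses, and `spark.sql.catalog.cosmosCatalog` itself must still be set to `com.azure.cosmos.spark.CosmosCatalog` separately.

```python
from typing import Optional


def cosmos_sp_options(endpoint: str, subscription_id: str, resource_group: str,
                      tenant_id: str, client_id: str, client_secret: str,
                      prefix: str = "", extra: Optional[dict] = None) -> dict:
    """Hypothetical helper: service-principal options for the Cosmos DB Spark connector.

    prefix="" returns DataFrame write options; prefix="spark.sql.catalog.cosmosCatalog."
    returns Spark session settings for the Catalog API.
    """
    options = {
        "spark.cosmos.accountEndpoint": endpoint,
        "spark.cosmos.auth.type": "ServicePrincipal",
        "spark.cosmos.account.subscriptionId": subscription_id,
        "spark.cosmos.account.resourceGroupName": resource_group,
        "spark.cosmos.account.tenantId": tenant_id,
        "spark.cosmos.auth.aad.clientId": client_id,
        "spark.cosmos.auth.aad.clientSecret": client_secret,
    }
    # Extra keys (e.g. database and container for writes) contain dots,
    # so they are passed as a dict rather than as keyword arguments
    if extra:
        options.update(extra)
    return {prefix + key: value for key, value in options.items()}
```

For example, call `spark.conf.set` on each entry of `cosmos_sp_options(cosmosEndpoint, subscriptionId, resourceGroupName, tenant_id, client_id, client_secret, prefix="spark.sql.catalog.cosmosCatalog.")` in Step 5, and in Step 6 pass `cosmos_sp_options(cosmosEndpoint, subscriptionID, resourceGroup, tenant_id, client_id, client_secret, extra={"spark.cosmos.database": cosmosDatabaseName, "spark.cosmos.container": cosmosContainerName})` to `.options(**...)`.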