In [None]:
# Specifying appid, appkey and tenantid is optional in spark-cdm-connector-assembly-0.16.jar with Premium Databricks Cluster and Synapse
# Replace the placeholders below with your own values; never commit real credentials with the notebook.
appid = "<appId>"
appkey = "<appKey>"
tenantid = "<tenantId>"

storageAccountName = "<storageAccount>.dfs.core.windows.net"

# Containers referenced by the cells below. Paths are built as
# container + "/<folder>/...", so do not add a trailing slash here.
container = "<container>"              # used for the sample manifests and the "/Models" definitions
outputContainer = "<outputContainer>"  # used by the "Overriding from configPath" example

In [None]:
# Implicit write case
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime

# Write a CDM entity with Parquet data files; the entity definition is derived from the dataframe schema
d = datetime.strptime("2015-03-31", '%Y-%m-%d')
ts = datetime.now()

# Decimals are built from strings: Decimal(1.4337879) would capture the float's
# binary approximation rather than the literal value written here.
data = [
  ["a", 1, True, 12.34, 6, d, ts, Decimal("1.4337879"), Decimal("999.00"), Decimal("18.8")],
  ["b", 1, True, 12.34, 6, d, ts, Decimal("1.4337879"), Decimal("999.00"), Decimal("18.8")]
]

schema = (StructType()
  .add(StructField("name", StringType(), True))
  .add(StructField("id", IntegerType(), True))
  .add(StructField("flag", BooleanType(), True))
  .add(StructField("salary", DoubleType(), True))
  .add(StructField("phone", LongType(), True))
  .add(StructField("dob", DateType(), True))
  .add(StructField("time", TimestampType(), True))
  .add(StructField("decimal1", DecimalType(15, 3), True))
  .add(StructField("decimal2", DecimalType(38, 7), True))
  .add(StructField("decimal3", DecimalType(5, 2), True))
)

df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

# Creates the CDM manifest and adds the entity to it with gzip'd parquet partitions
# with both physical and logical entity definitions
(df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/implicitTest/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("format", "parquet")
  .option("compression", "gzip")
  .save())

# Append the same dataframe content to the entity in the default CSV format
(df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/implicitTest/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .mode("append")
  .save())

# Read the entity back through the same manifest and display it
readDf = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/implicitTest/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .load())

readDf.select("*").show()

In [None]:
# Explicit write: create an entity in a CDM folder from a pre-defined model

# Case 1: the entity definition comes from the CDM Github repo

# Manifest shared by the write and the read-back below
manifestPath = container + "/explicitTest/root.manifest.cdm.json"

data = [
  ["1", "2", "3", 4],
  ["4", "5", "6", 8],
  ["7", "8", "9", 4],
  ["10", "11", "12", 8],
  ["13", "14", "15", 4]
]

# Columns of the dataframe written as the TeamMembership entity
schema = StructType([
  StructField("teamMembershipId", StringType(), True),
  StructField("systemUserId", StringType(), True),
  StructField("teamId", StringType(), True),
  StructField("versionNumber", LongType(), True),
])

rows = spark.sparkContext.parallelize(data, 1)
df = spark.createDataFrame(rows, schema)

(df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", manifestPath)
  .option("entity", "TeamMembership")
  .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
  .option("useCdmStandardModelRoot", True)  # sets the model root to the CDM CDN schema documents folder
  .option("useSubManifest", True)
  .save()) # If table already exists, add .mode("overwrite")

# Read the entity back and display it
readDf = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", manifestPath)
  .option("entity", "TeamMembership")
  .load())

readDf.select("*").show()

In [None]:
# Explicit write: create an entity in a CDM folder from a pre-defined model

# Case 2: the entity definition comes from a CDM model stored in ADLS

# UPLOAD CDM FILES FIRST
# To run this example, first create a /Models/Contacts folder in your demo container in ADLS gen2,
# then upload the provided Contacts.manifest.cdm.json, Person.cdm.json, Entity.cdm.json files

# Manifest shared by the write and the read-back below
manifestPath = container + "/Data/Contacts/root.manifest.cdm.json"

birthdate = datetime.strptime("1991-03-31", '%Y-%m-%d')
now = datetime.now()
data2 = [
  [1, now, "Donna", "Carreras", birthdate],
  [2, now, "Keith", "Harris", birthdate],
  [2, now, "Carla", "McGee", birthdate],
]

schema2 = StructType([
  StructField("identifier", IntegerType()),
  StructField("createdTime", TimestampType()),
  StructField("firstName", StringType()),
  StructField("lastName", StringType()),
  StructField("birthDate", DateType()),
])

# Create the dataframe that matches the CDM definition of the entity, Person
rows2 = spark.sparkContext.parallelize(data2, 1)
df2 = spark.createDataFrame(rows2, schema2)

(df2.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", manifestPath)
  .option("entity", "Person")
  .option("entityDefinitionModelRoot", container + "/Models")
  .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
  .save()) # If table already exists, add .mode("overwrite")

# Read the entity back and display it
readDf2 = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", manifestPath)
  .option("entity", "Person")
  .load())

readDf2.select("*").show()

In [None]:
# Overriding from configPath

from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime

timestamp1 = datetime.now()
timestamp2 = datetime.now()

# Decimals are built from strings so the values are exactly the literals shown
# (Decimal(42.1) would carry the float's binary approximation).
cdata = [
  [timestamp1, timestamp2, 1, "A", Decimal("33.5")],
  [timestamp1, timestamp2, 2, "B", Decimal("42.1")],
  [timestamp1, timestamp2, 3, "C", Decimal("7.90")]
]

cschema = (StructType()
  .add(StructField("ValidFrom", TimestampType()))
  .add(StructField("ValidTo", TimestampType()))
  .add(StructField("CustomerId", IntegerType()))
  .add(StructField("CustomerName", StringType()))
  .add(StructField("CreditLimit", DecimalType(18, 2))))

# Create the dataframe
customerdf = spark.createDataFrame(spark.sparkContext.parallelize(cdata), cschema)

(customerdf.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", outputContainer + "/customer/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("entityDefinitionPath", "Customer.cdm.json/Customer")  # Customer.cdm.json uses an alias - "core"
  # Model root is "<container>/Models", matching the other cells (the "/" separator was missing).
  # If the configPath option is not present, config.json is fetched from this location to resolve the "core" alias.
  .option("entityDefinitionModelRoot", container + "/Models")
  .option("configPath", "/config")  # Add your config.json to override the above definition. This will find config.json in container - "config"
  .option("entityDefinitionStorage", "<storage1>.dfs.core.windows.net") # storage account that contains entityDefinitionModelRoot
  .option("format", "parquet")
  .save())

# Read the entity back and display it
readDf2 = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", outputContainer + "/customer/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .load())

readDf2.select("*").show()


In [None]:
# Nested data (structs and arrays of structs): implicit and explicit write
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime

birthdate = datetime.strptime("1991-03-31", '%Y-%m-%d')
now = datetime.now()

# Each row is [id, details]; details nests an address, which nests a street struct and a list of songs.
# - bodyMassIndex is built from a string so the Decimal is exactly 22.7 (not the float approximation).
# - zipcode is given as a string to match its StringType declaration in addressSchema.
data2 = [
  [13, ["Donna Carreras", True, 12.34, 63232, birthdate, Decimal("22.7"), now, ["95110", ["Bose street", 321], [['bieber1', 1], ['bieber2', 2]] ]]],
  [24, ["Keith Harris", True, 12.34, 63234, birthdate, Decimal("22.7"), now, ["95110", ["Estancia Dr", 185], [['baby1', 3], ['baby2', 34], ['baby3', 5], ['baby4', 6]] ]]]
]

streetSchema = [StructField("streetName", StringType(), True),
               StructField("streetNumber", IntegerType(), True)]

songSchema = [StructField("name", StringType(), True),
               StructField("number", IntegerType(), True)]

addressSchema = [StructField("zipcode", StringType(), True),
                StructField("street", StructType(streetSchema), True),
                StructField("songs", ArrayType(StructType(songSchema)), True)]

detailSchema = [StructField("name", StringType(), True),
                StructField("USCitizen", BooleanType(), True),
                StructField("salary", DoubleType(), True),
                StructField("phone", LongType(), True),
                StructField("birthDate", DateType(), True),
                StructField("bodyMassIndex", DecimalType(5, 2), True),
                StructField("createdTime", TimestampType(), True),
                StructField("address", StructType(addressSchema), True)]

schema = [StructField("id", IntegerType(), True),
          StructField("details", StructType(detailSchema), True)]

schema2 = StructType(schema)

# Create the dataframe
df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)

# Implicit write: the entity definition is derived from the dataframe schema
(df2.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/nestedImplicit/default.manifest.cdm.json")
  .option("entity", "NestedExampleImplicit")
  .option("format", "parquet")
  .save())

# Explicit write

# To run this example, first create a /Models/Contacts folder in your demo container in ADLS gen2,
# then upload the provided NestedExample.cdm.json file
(df2.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/nestedExplicit/default.manifest.cdm.json")
  .option("entity", "NestedExampleExplicit")
  .option("entityDefinitionPath", "/Contacts/NestedExample.cdm.json/NestedExample")
  .option("entityDefinitionModelRoot", container + "/Models")
  .option("format", "parquet")
  .save())

# Read both entities back
readImplicit = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/nestedImplicit/default.manifest.cdm.json")
  .option("entity", "NestedExampleImplicit")
  .load())

readExplicit = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/nestedExplicit/default.manifest.cdm.json")
  .option("entity", "NestedExampleExplicit")
  .load())

# Show the source dataframe next to both round-tripped results for comparison
df2.select("*").show(truncate = False)
readImplicit.select("*").show(truncate = False)
readExplicit.select("*").show(truncate = False)
