### 1. Load the IoT and retail datasets from ADLS Gen2 into dataframes
>The Synapse workspace is attached to an ADLS Gen2 storage account, and files placed on this default storage account can be read with an `abfss://` path as shown below. Replace `<ADLS Gen2 Account Name>` with the name of your storage account before running the cells.
&nbsp;



In [6]:
# Both IoT files live in the same folder of the attached ADLS Gen2 account.
# Replace the <ADLS Gen2 Account Name> placeholder with your storage account name.
iotFolder = "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoIoT/"

# The first CSV row holds the column names; no schema is inferred, so every column is read as a string.
dfDeviceInfo = spark.read.csv(iotFolder + "IoTDeviceInfo.csv", header=True)
dfSignals = spark.read.csv(iotFolder + "IoTSignals.csv", header=True)

StatementMeta(sparkpool, 1, 3, Finished, Available)



In [None]:
# All retail files share one folder in the attached ADLS Gen2 account.
# Replace the <ADLS Gen2 Account Name> placeholder with your storage account name.
retailFolder = "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoRetail/"

# Header row supplies column names; columns are read as strings (no schema inference).
dfProducts = spark.read.csv(retailFolder + "Products.csv", header=True)
dfRetailSales = spark.read.csv(retailFolder + "RetailSales.csv", header=True)
dfStoreDemographics = spark.read.csv(retailFolder + "StoreDemoGraphics.csv", header=True)


### 2. Write the dataframes to the Azure Cosmos DB containers


In [8]:
# Each IoT dataframe goes to its own Cosmos DB container through the "CosmosDemo"
# linked service, with upserts enabled and append save mode.
for sourceFrame, targetContainer in [(dfDeviceInfo, "IoTDeviceInfo"),
                                     (dfSignals, "IoTSignals")]:
    (sourceFrame.write
        .format("cosmos.oltp")
        .option("spark.synapse.linkedService", "CosmosDemo")
        .option("spark.cosmos.container", targetContainer)
        .option("spark.cosmos.write.upsertEnabled", "true")
        .mode("append")
        .save())

StatementMeta(sparkpool, 1, 5, Finished, Available)



In [None]:
# Each retail dataframe goes to its own Cosmos DB container through the "CosmosDemo"
# linked service, with upserts enabled and append save mode.
retailTargets = [
    (dfProducts, "RetailProducts"),
    (dfRetailSales, "RetailSales"),
    (dfStoreDemographics, "RetailStoreDemographics"),
]
for sourceFrame, targetContainer in retailTargets:
    (sourceFrame.write
        .format("cosmos.oltp")
        .option("spark.synapse.linkedService", "CosmosDemo")
        .option("spark.cosmos.container", targetContainer)
        .option("spark.cosmos.write.upsertEnabled", "true")
        .mode("append")
        .save())

### 3. Simulate streaming data generation using Rate streaming source
* The Rate streaming source is used here to keep the solution simple; it can be replaced with any supported streaming source, such as [Azure Event Hubs](https://azure.microsoft.com/en-us/services/event-hubs/) or [Apache Kafka](https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction).


>The Rate streaming source generates data at the specified number of rows per second; each output row contains a `timestamp` column and a `value` column.


In [9]:
# Synthetic unbounded source: the "rate" format emits 10 rows per second.
dfStream = spark.readStream.format("rate").options(rowsPerSecond=10).load()

StatementMeta(sparkpool, 1, 6, Finished, Available)



### 4. Format the stream dataframe as per the IoTSignals schema


In [10]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import uuid

numberOfDevices = 10

# Spark assumes UDFs are deterministic unless told otherwise, so the optimizer may
# deduplicate or re-evaluate calls. A random UUID generator must opt out of that.
generate_uuid = F.udf(lambda : str(uuid.uuid4()), StringType()).asNondeterministic()

# Reshape the rate-source rows (timestamp, value) into the IoTSignals layout.
# Random numbers are drawn ONCE per row into helper columns. Related fields are then
# derived from the same draw:
#   * measureType / unitSymbol / unit stay consistent (no "Rotation Speed" in "MW");
#   * measureValue is doubled ~10% of the time and halved ~10% of the time (two
#     independent rand() calls would make the halving branch fire only ~9%).
# Because rand() is non-deterministic, Spark keeps each helper column as a single
# evaluation rather than inlining it into every reference.
dfIoTSignals = (dfStream
                    .withColumn("id", generate_uuid())
                    .withColumn("dateTime", dfStream["timestamp"].cast(StringType()))
                    # Explicit cast: do not rely on implicit bigint -> string coercion in concat.
                    .withColumn("deviceId", F.concat(F.lit("dev-"),
                                                     ((F.col("value") % numberOfDevices) + 1).cast(StringType())))
                    .withColumn("_isRotation", F.rand() < 0.5)
                    .withColumn("measureType", F.when(F.col("_isRotation"), "Rotation Speed").otherwise("Output"))
                    .withColumn("unitSymbol", F.when(F.col("_isRotation"), "RPM").otherwise("MW"))
                    .withColumn("unit", F.when(F.col("_isRotation"), "Revolutions per Minute").otherwise("MegaWatts"))
                    .withColumn("_outlierDraw", F.rand())
                    .withColumn("measureValue", F.when(F.col("_outlierDraw") > 0.9, F.col("value") * 2)
                                                 .when(F.col("_outlierDraw") < 0.1, F.expr("value div 2"))
                                                 .otherwise(F.col("value")))
                    .drop("timestamp", "_isRotation", "_outlierDraw")
                )

StatementMeta(sparkpool, 1, 7, Finished, Available)



### 5. Stream the writes to the Azure Cosmos DB container


In [11]:
# Optional run time in seconds for the streaming demo. None keeps the original
# behaviour of streaming until the query fails or the cell is cancelled.
streamTimeoutSeconds = None

# Continuously append the generated signals to the IoTStreamingSignals container.
# Progress is tracked in the checkpoint directory, so a re-run resumes from there.
streamQuery = (dfIoTSignals
                .writeStream
                .format("cosmos.oltp")
                .outputMode("append")
                .option("checkpointLocation", "/writeCheckpointDir")
                .option("spark.synapse.linkedService", "CosmosDemo")
                .option("spark.cosmos.container", "IoTStreamingSignals")
                .option("spark.cosmos.connection.mode", "gateway")
                .start())

try:
    # Blocks this cell only; the query itself runs in the background.
    streamQuery.awaitTermination(streamTimeoutSeconds)
finally:
    # Interrupting the blocking wait does not by itself stop the streaming query.
    # Stop it explicitly so it does not keep writing to Cosmos DB after the
    # cell is cancelled or the timeout elapses.
    streamQuery.stop()

StatementMeta(sparkpool, 1, 8, Finished, Cancelled)