In [5]:
# Synthetic input stream from Spark's built-in "rate" source, emitting 10 rows
# per second. Downstream cells consume its `timestamp` and `value` columns.
dfStream = spark.readStream.format("rate").options(rowsPerSecond=10).load()

StatementMeta(sparkling, 6, 1, Finished, Available)



In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import uuid

# Number of simulated devices; deviceId is "dev-" + (value mod numberOfDevices),
# i.e. dev-0 .. dev-9 for non-negative `value` (assumed from the rate source).
numberOfDevices = 10

# Python UDF producing a random UUID string per row. Flagged nondeterministic:
# Spark assumes UDFs are deterministic by default, in which case the optimizer
# may eliminate duplicate invocations or call it more often than it appears in
# the query.
generate_uuid = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

# Shape the raw rate rows into synthetic IoT signal records.
# - `_unitDraw`: ONE random draw per row decides unit, unitSymbol and measureType
#   together, so the three always agree. Independent rand() calls per column
#   could pair e.g. 'MegaWatts' with 'RPM'.
# - `_valueDraw`: ONE random draw per row picks the measureValue branch:
#   ~5% of rows are scaled x10, ~5% are integer-divided by 10, the rest pass
#   through unchanged. Two separate rand() calls would skew the second branch
#   to ~4.75%.
# Helper columns are dropped at the end, leaving (in order):
# id, deviceId, dateTime, unit, unitSymbol, measureType, measureValue.
dfIoTSignals = (dfStream
                    .withColumn("id", generate_uuid())
                    .withColumn("deviceId", F.concat(F.lit("dev-"), F.expr("mod(value, %d)" % numberOfDevices)))
                    .withColumn("dateTime", F.col("timestamp").cast(StringType()))
                    .withColumn("_unitDraw", F.rand())
                    .withColumn("unit", F.expr("CASE WHEN _unitDraw < 0.5 THEN 'Revolutions per Minute' ELSE 'MegaWatts' END"))
                    .withColumn("unitSymbol", F.expr("CASE WHEN _unitDraw < 0.5 THEN 'RPM' ELSE 'MW' END"))
                    .withColumn("measureType", F.expr("CASE WHEN _unitDraw < 0.5 THEN 'Rotation Speed' ELSE 'Output' END"))
                    .withColumn("_valueDraw", F.rand())
                    .withColumn("measureValue", F.expr("CASE WHEN _valueDraw > 0.95 THEN value * 10 WHEN _valueDraw < 0.05 THEN value div 10 ELSE value END"))
                    .drop("timestamp", "value", "_unitDraw", "_valueDraw")
                )

StatementMeta(sparkling, 6, 2, Finished, Available)



In [8]:
# Stream the synthetic signals into the Kusto table `DeviceData` in database
# `adxdb`, reached through the Synapse linked service "adx". The connector is
# asked to create the table if it does not exist. A micro-batch is triggered
# every 10 seconds; checkpointLocation is where Structured Streaming records the
# query's progress.
#
# Keep the StreamingQuery handle in `kustoQ`. Chaining .awaitTermination() onto
# .start() would bind kustoQ to its bool/None result and lose the only way to
# stop or inspect the running query.
kustoQ = (dfIoTSignals
            .writeStream
            .format("com.microsoft.kusto.spark.synapse.datasink.KustoSynapseSinkProvider")
            .option("spark.synapse.linkedService", "adx")
            .option("kustoDatabase", "adxdb")
            .option("kustoTable", "DeviceData")
            .option("tableCreateOptions", "CreateIfNotExist")
            .option("checkpointLocation", "/kusto_checkpoint_path")
            .trigger(processingTime='10 seconds')
            .start()
         )

# Block this cell for at most 3600 s (one hour). The result says whether the
# query terminated within that time. If it did not, the query keeps running in
# the background until kustoQ.stop() is called.
kustoQ.awaitTermination(3600)

StatementMeta(sparkling, 6, 4, Submitted, Running)