# CosmosDB Configuration

### 1. Load the IoT and Retail datasets from ADLS Gen2 into dataframes
>The Synapse workspace is attached to an ADLS Gen2 storage account and the files placed on the default storage account can be accessed using the relative path as below.
&nbsp;

In [None]:
# Read the two IoT CSV files into dataframes, treating the first row as the header.
# Replace <ADLS Gen2 Account Name> with the storage account name before running.
dfDeviceInfo = spark.read.csv(
    "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoIoT/IoTDeviceInfo.csv",
    header=True,
)

dfSignals = spark.read.csv(
    "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoIoT/IoTSignals.csv",
    header=True,
)

In [None]:
# Read the three retail CSV files into dataframes, treating the first row as the header.
# Replace <ADLS Gen2 Account Name> with the storage account name before running.
dfProducts = spark.read.csv(
    "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoRetail/Products.csv",
    header=True,
)

dfRetailSales = spark.read.csv(
    "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoRetail/RetailSales.csv",
    header=True,
)

dfStoreDemographics = spark.read.csv(
    "abfss://cosmosdemo@<ADLS Gen2 Account Name>.dfs.core.windows.net/SynapseDemoRetail/StoreDemoGraphics.csv",
    header=True,
)


### 2. Write the dataframes to the Azure Cosmos DB collections


In [None]:
# Append each IoT dataframe to its target Cosmos DB container through the
# "CosmosDemo" linked service, with upsert enabled on the writer.
for frame, container in ((dfDeviceInfo, "IoTDeviceInfo"),
                         (dfSignals, "IoTSignals")):
    (frame.write
          .format("cosmos.oltp")
          .option("spark.synapse.linkedService", "CosmosDemo")
          .option("spark.cosmos.container", container)
          .option("spark.cosmos.write.upsertEnabled", "true")
          .mode('append')
          .save())

In [None]:
# Append each retail dataframe to its target Cosmos DB container through the
# "CosmosDemo" linked service, with upsert enabled on the writer.
for frame, container in ((dfProducts, "RetailProducts"),
                         (dfRetailSales, "RetailSales"),
                         (dfStoreDemographics, "RetailStoreDemographics")):
    (frame.write
          .format("cosmos.oltp")
          .option("spark.synapse.linkedService", "CosmosDemo")
          .option("spark.cosmos.container", container)
          .option("spark.cosmos.write.upsertEnabled", "true")
          .mode('append')
          .save())

### 3. Simulate streaming data generation using Rate streaming source
* The Rate streaming source is used to simplify the solution here and can be replaced with any supported streaming sources such as [Azure Event Hubs](https://azure.microsoft.com/en-us/services/event-hubs/) and [Apache Kafka](https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction).


>The Rate streaming source generates data at the specified number of rows per second and each output row contains a timestamp and value.


In [None]:
# Configure the built-in "rate" streaming source to emit 10 rows per second,
# then load it as the streaming dataframe used by the following cells.
rate_reader = spark.readStream.format("rate").option("rowsPerSecond", 10)
dfStream = rate_reader.load()

### 4. Format the stream dataframe as per the IoTSignals schema


In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import uuid

numberOfDevices = 10
# UDF producing a fresh random UUID string, used as the document id.
generate_uuid = F.udf(lambda : str(uuid.uuid4()), StringType())

# Shape the rate stream into IoT signal rows.
# measureType, unitSymbol and unit describe the same measurement, so all three
# are derived from ONE random draw (isRotation). Drawing rand() separately for
# each column would produce mismatched rows such as 'Rotation Speed' with 'MW'.
dfIoTSignals = (dfStream
                    .withColumn("id", generate_uuid())
                    .withColumn("dateTime", dfStream["timestamp"].cast(StringType()))
                    # value mod numberOfDevices, plus 1, gives ids dev-1 .. dev-<numberOfDevices>
                    .withColumn("deviceId", F.concat(F.lit("dev-"), F.expr("mod(value, %d)" % numberOfDevices)+1))
                    .withColumn("isRotation", F.expr("rand() < 0.5"))
                    .withColumn("measureType", F.expr("CASE WHEN isRotation THEN 'Rotation Speed' ELSE 'Output' END"))
                    .withColumn("unitSymbol", F.expr("CASE WHEN isRotation THEN 'RPM' ELSE 'MW' END"))
                    .withColumn("unit", F.expr("CASE WHEN isRotation THEN 'Revolutions per Minute' ELSE 'MegaWatts' END"))
                    # Occasionally double or halve the value so the series contains outliers.
                    .withColumn("measureValue", F.expr("CASE WHEN rand() > 0.9 THEN value * 2 WHEN rand() < 0.1 THEN value div 2 ELSE value END"))
                    # Drop the source timestamp and the helper column; neither belongs in the output rows.
                    .drop("timestamp", "isRotation")
                )

### 5. Stream writes to the Azure Cosmos DB Collection


In [None]:
import time

# Configure the streaming sink: append rows to the IoTStreamingSignals container
# via the "CosmosDemo" linked service, checkpointing under /writeCheckpointDir.
stream_writer = (
    dfIoTSignals.writeStream
    .format("cosmos.oltp")
    .outputMode("append")
    .option("checkpointLocation", "/writeCheckpointDir")
    .option("spark.synapse.linkedService", "CosmosDemo")
    .option("spark.cosmos.container", "IoTStreamingSignals")
    .option("spark.cosmos.connection.mode", "gateway")
)
streamQuery = stream_writer.start()

# Let the stream run for two minutes, then stop it.
time.sleep(120)
streamQuery.stop()

# IoT Investigation Scenario

### 1. Create Spark tables pointing to the Azure Cosmos DB Analytical Store collections using Azure Synapse Link 



In [None]:
%%sql
-- Create the IoT database; "if not exists" lets the cell be re-run safely.
create database if not exists CosmosDemoIoT

In [None]:
%%sql
-- Create the retail database; "if not exists" lets the cell be re-run safely.
create database if not exists CosmosDemoRetail

In [None]:
%%sql

-- Spark table over the IoTSignals container, read through the cosmos.olap
-- source and the CosmosDemo linked service.
CREATE TABLE IF NOT EXISTS CosmosDemoIoT.IoTSignals
USING cosmos.olap
OPTIONS (
    spark.synapse.linkedService 'CosmosDemo',
    spark.cosmos.container 'IoTSignals'
)

In [None]:
%%sql

-- Spark table over the IoTDeviceInfo container, read through the cosmos.olap
-- source and the CosmosDemo linked service.
CREATE TABLE IF NOT EXISTS CosmosDemoIoT.IoTDeviceInfo
USING cosmos.olap
OPTIONS (
    spark.synapse.linkedService 'CosmosDemo',
    spark.cosmos.container 'IoTDeviceInfo'
)

In [None]:
%%sql

-- Spark table over the RetailProducts container, read through the cosmos.olap
-- source and the CosmosDemo linked service.
CREATE TABLE IF NOT EXISTS CosmosDemoRetail.RetailProducts
USING cosmos.olap
OPTIONS (
    spark.synapse.linkedService 'CosmosDemo',
    spark.cosmos.container 'RetailProducts'
)

In [None]:
%%sql

-- Spark table over the RetailSales container, read through the cosmos.olap
-- source and the CosmosDemo linked service.
CREATE TABLE IF NOT EXISTS CosmosDemoRetail.RetailSales
USING cosmos.olap
OPTIONS (
    spark.synapse.linkedService 'CosmosDemo',
    spark.cosmos.container 'RetailSales'
)

In [None]:
%%sql

-- Spark table over the RetailStoreDemographics container, read through the
-- cosmos.olap source and the CosmosDemo linked service.
CREATE TABLE IF NOT EXISTS CosmosDemoRetail.RetailStoreDemographics
USING cosmos.olap
OPTIONS (
    spark.synapse.linkedService 'CosmosDemo',
    spark.cosmos.container 'RetailStoreDemographics'
)

### 2. Perform Joins across collections, apply filters and aggregations using Spark SQL 


In [None]:
# Per-device totals for RPM signals, enriched with device type and location.
# The left join keeps signals whose device has no IoTDeviceInfo row; their
# device columns come back as null.
# The query is a triple-quoted string: backslash continuations inside a string
# literal break silently on trailing whitespace and embed the source
# indentation in the SQL text.
df_RPM_details = spark.sql("""
    select a.deviceid
         , b.devicetype
         , cast(b.location as string) as location
         , cast(b.latitude as float) as latitude
         , cast(b.longitude as float) as longitude
         , a.measuretype
         , a.unitSymbol
         , cast(sum(a.measureValue) as float) as measureValueSum
         , count(*) as count
    from CosmosDemoIoT.IoTSignals a
    left join CosmosDemoIoT.IoTDeviceInfo b
      on a.deviceid = b.deviceid
    where a.unitSymbol = 'RPM'
    group by a.deviceid, b.devicetype, b.location, b.latitude, b.longitude, a.measuretype, a.unitSymbol
""")

display(df_RPM_details)

### 3. Visualizations using plotly and displayHTML()
The map below shows a heatmap of IoT signals across different locations.


In [None]:
from plotly.offline import plot
import plotly.express as px

# Bring the aggregated result to the driver as a pandas frame for plotting.
df_RPM_details_pd = df_RPM_details.toPandas()

# One marker per row, sized and coloured by the summed measure value.
map_options = dict(
    lat='latitude',
    lon='longitude',
    size='measureValueSum',
    color='measureValueSum',
    hover_name='location',
    hover_data=['measureValueSum','location'],
    size_max=30,
    color_continuous_scale=px.colors.carto.Temps,
    zoom=3,
    height=600,
    width=900,
)
fig = px.scatter_mapbox(df_RPM_details_pd, **map_options)

# Use the open-street-map style and remove the figure margins.
fig.update_layout(mapbox_style='open-street-map',
                  margin={"r":0,"t":0,"l":0,"b":0})

# Render the figure to an HTML <div> and show it in the notebook.
p = plot(fig, output_type='div')
displayHTML(p)

### 4. Load the data from the Cosmos DB Analytical Store collection

In [None]:
# Read the IoTSignals container through the cosmos.olap source, using the
# "CosmosDemo" linked service.
olap_reader = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "CosmosDemo")
    .option("spark.cosmos.container", "IoTSignals")
)
df_IoTSignals = olap_reader.load()

### 5. Data exploration using pyplot

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Collect all signals to pandas, then keep the complete rows of device "dev-1".
df_IoTSignals_pd = df_IoTSignals.toPandas()
is_dev_1 = df_IoTSignals_pd.deviceId == "dev-1"

# One row per dateTime and one column per unit symbol, holding integer measure values.
df_dev = (
    df_IoTSignals_pd[is_dev_1]
    .dropna()
    .astype({"measureValue": int})
    .pivot(index='dateTime', columns='unitSymbol', values='measureValue')
)

# Keep the dateTime values as a regular column and switch to a 0..n-1 index.
df_dev['timestamp'] = df_dev.index
df_dev['index'] = list(range(len(df_dev)))
df_dev.set_index('index', inplace=True)

# One figure per unit symbol.
for unit_column, line_color, line_label, chart_title in (
        ('MW', 'green', 'Output MW', 'MW TimeSeries'),
        ('RPM', 'black', 'RPM', 'RPM TimeSeries')):
    df_dev.plot(y=unit_column, x='timestamp', color=line_color, figsize=(20,5), label=line_label)
    plt.title(chart_title)

plt.legend(loc='best')
plt.show()

### 6. Perform anomaly detection using Microsoft Machine Learning for Spark (MMLSpark)

In [None]:
from pyspark.sql.functions import col
from mmlspark.cognitive import SimpleDetectAnomalies
from mmlspark.core.spark import FluentAPI

# Anomaly Detector endpoint for whole-series ("entire") detection.
detector_url = "<Anomaly Detector Endpoint>/anomalydetector/v1.0/timeseries/entire/detect"

# NOTE(review): setLocation is called after setUrl; it may rebuild the URL from
# the region and so override the endpoint set above - confirm against the
# MMLSpark version in use.
anomaly_detector = (
    SimpleDetectAnomalies()
    .setSubscriptionKey("<Anomaly Detector Access Key>")
    .setUrl(detector_url)
    .setLocation('koreacentral')
    .setOutputCol("anomalies")
    .setGroupbyCol("grouping")
    .setSensitivity(95)
    .setGranularity("secondly")
)

# Keep only RPM signals and expose timestamp / value / grouping columns
# (presumably the detector's expected input column names - verify), with one
# group per device.
rpm_signals = df_IoTSignals.where(col("unitSymbol") == 'RPM')
detector_input = (
    rpm_signals
    .withColumnRenamed("dateTime", "timestamp")
    .withColumn("value", col("measureValue").cast("double"))
    .withColumn("grouping", col("deviceId"))
)
df_anomaly = detector_input.mlTransform(anomaly_detector)

# Register the result so later cells can query it with Spark SQL.
df_anomaly.createOrReplaceTempView('df_anomaly')

display(df_anomaly)

### 7. Format the dataframe for visualization

In [None]:
# Flatten the detector output for device 'dev-1': expected value, upper and
# lower bounds, and a 0/1 anomaly flag, for the first 200 rows by timestamp.
# The query is a triple-quoted string: backslash continuations inside a string
# literal break silently on trailing whitespace and embed the source
# indentation in the SQL text.
df_anomaly_single_device = spark.sql("""
    select timestamp
         , measureValue
         , anomalies.expectedValue
         , anomalies.expectedValue + anomalies.upperMargin as expectedUpperValue
         , anomalies.expectedValue - anomalies.lowerMargin as expectedLowerValue
         , case when anomalies.isAnomaly = true then 1 else 0 end as isAnomaly
    from df_anomaly
    where deviceid = 'dev-1' and timestamp < '2020-12-29'
    order by timestamp
    limit 200
""")

display(df_anomaly_single_device)

### 8. Visualize the anomalies using plotly
* Plot Expected value, Upper Value, Lower Value and Actual Value along with Anomaly flag

In [None]:
#import chart_studio.plotly as py
import plotly.graph_objs as go
from plotly.offline import plot
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
from matplotlib.pyplot import figure
 
# Collect the query result to pandas once.
adf = df_anomaly_single_device.toPandas()
# Select the anomalous rows from the collected frame. Calling toPandas() on a
# filtered Spark dataframe would launch a second job and re-evaluate the whole
# upstream query just to obtain a subset of rows already held in adf.
adf_subset = adf[adf['isAnomaly'] == 1]

plt.figure(figsize=(23,8))
plt.plot(adf['timestamp'],adf['expectedUpperValue'], color='darkred', linestyle='solid', linewidth=0.25)
plt.plot(adf['timestamp'],adf['expectedValue'], color='darkgreen', linestyle='solid', linewidth=2)
# Only color= is given: a 'b' format string alongside it specifies the colour twice.
plt.plot(adf['timestamp'],adf['measureValue'], color='royalblue', linestyle='dotted', linewidth=2)
plt.plot(adf['timestamp'],adf['expectedLowerValue'],  color='black', linestyle='solid', linewidth=0.25)
# Anomalies are drawn as red circles on top of the actual-value line.
plt.plot(adf_subset['timestamp'],adf_subset['measureValue'], 'ro')
# Legend labels follow the order of the plot calls above.
plt.legend(['RPM-UpperMargin', 'RPM-ExpectedValue', 'RPM-ActualValue', 'RPM-LowerMargin', 'RPM-Anomaly'])
plt.title('RPM Anomalies with Expected, Actual, Upper and Lower Values')
plt.show()

# Surface Notebook Sales Prediction Model via Auto ML


### 9. Perform Joins across collections, apply filters and aggregations using Spark SQL from Azure Cosmos DB


In [None]:
# Build the modelling dataset: one row per sale, enriched with product pricing
# and store demographics, ordered by week, store and product.
# Left joins keep sales whose product or store has no matching row.
# The query is a triple-quoted string: backslash continuations inside a string
# literal break silently on trailing whitespace and embed the source
# indentation in the SQL text.
data = spark.sql("""
    select a.storeId
         , b.productCode
         , b.wholeSaleCost
         , b.basePrice
         , c.ratioAge60
         , c.collegeRatio
         , c.income
         , c.highIncome150Ratio
         , c.largeHH
         , c.minoritiesRatio
         , c.more1FullTimeEmployeeRatio
         , c.distanceNearestWarehouse
         , c.salesNearestWarehousesRatio
         , c.avgDistanceNearest5Supermarkets
         , c.salesNearest5StoresRatio
         , a.quantity
         , a.logQuantity
         , a.advertising
         , a.price
         , a.weekStarting
    from CosmosDemoRetail.RetailSales a
    left join CosmosDemoRetail.RetailProducts b
      on a.productcode = b.productcode
    left join CosmosDemoRetail.RetailStoreDemographics c
      on a.storeId = c.storeId
    order by a.weekStarting, a.storeId, b.productCode
""")

display(data)

### 10. Azure Machine Learning's environment setup for AutoML to build a Forecasting Model


In [None]:
import azureml.core
import pandas as pd
import numpy as np
import logging
from azureml.core.workspace import Workspace
from azureml.core import Workspace
from azureml.core.experiment import Experiment
from azureml.train.automl import AutoMLConfig
import os
# Workspace coordinates come from environment variables when set, otherwise
# from the placeholder / demo defaults below.
subscription_id = os.getenv("SUBSCRIPTION_ID", default="<Your Subscription ID>")
resource_group = os.getenv("RESOURCE_GROUP", default="mzc-rg")
workspace_name = os.getenv("WORKSPACE_NAME", default="mzcmlworkspace")
workspace_region = os.getenv("WORKSPACE_REGION", default="Korea Central")

# Connect to the existing Azure ML workspace and write its config file.
ws = Workspace(subscription_id = subscription_id, resource_group = resource_group, workspace_name = workspace_name)
ws.write_config()

experiment_name = 'automl-surfaceforecasting'
experiment = Experiment(ws, experiment_name)

# Summarise the workspace details in a one-column table.
output = {}
output['Subscription ID'] = ws.subscription_id
output['Workspace'] = ws.name
output['SKU'] = ws.sku
output['Resource Group'] = ws.resource_group
output['Location'] = ws.location
output['Run History Name'] = experiment_name

# Show cell contents without truncation. None is the current spelling; -1 is
# deprecated and rejected by newer pandas releases. Fall back to -1 for older
# pandas versions that do not accept None.
try:
    pd.set_option('display.max_colwidth', None)
except ValueError:
    pd.set_option('display.max_colwidth', -1)
outputDf = pd.DataFrame(data = output, index = [''])
outputDf.T

### 11. Data Preparation - Feature engineering, Splitting train & test datasets



In [None]:
# Initial variables
# Column roles and split settings
time_column_name = 'weekStarting'
grain_column_names = ['storeId', 'productCode']
target_column_name = 'quantity'
use_stores = [2, 5, 8,71,102]
n_test_periods = 20


# Collect the joined data to pandas and give each column a usable dtype.
df = data.toPandas()
df[time_column_name] = pd.to_datetime(df[time_column_name])

for numeric_column in ('storeId', 'quantity', 'advertising'):
    df[numeric_column] = pd.to_numeric(df[numeric_column])

# NOTE(review): wholeSaleCost is selected upstream but never cast here -
# confirm whether that is intentional.
float_columns = (
    'price',
    'basePrice',
    'ratioAge60',
    'collegeRatio',
    'highIncome150Ratio',
    'income',
    'largeHH',
    'minoritiesRatio',
    'logQuantity',
    'more1FullTimeEmployeeRatio',
    'distanceNearestWarehouse',
    'salesNearestWarehousesRatio',
    'avgDistanceNearest5Supermarkets',
    'salesNearest5StoresRatio',
)
for float_column in float_columns:
    df[float_column] = df[float_column].astype(float)


# Restrict to the selected stores and count the (store, product) series.
data_subset = df[df.storeId.isin(use_stores)]
nseries = data_subset.groupby(grain_column_names).ngroups
print('Data subset contains {0} individual time-series.'.format(nseries))

# Group by date
def split_last_n_by_grain(df, n, time_column=None, grain_columns=None):
    """Split each grain's series into all-but-the-last-n rows and the last n rows.

    Rows are sorted by ascending time and grouped by grain; within every group
    the final ``n`` rows form the tail and the remaining rows form the head.

    Args:
        df: pandas DataFrame containing the time and grain columns.
        n: Non-negative number of trailing rows per group to put in the tail.
            ``n == 0`` yields an empty tail; ``n`` larger than a group puts
            the whole group in the tail.
        time_column: Name of the column to sort by. Defaults to the
            module-level ``time_column_name``.
        grain_columns: List of column names identifying a series. Defaults to
            the module-level ``grain_column_names``.

    Returns:
        Tuple ``(df_head, df_tail)`` of DataFrames.
    """
    if time_column is None:
        time_column = time_column_name
    if grain_columns is None:
        grain_columns = grain_column_names

    df_grouped = (df.sort_values(time_column)  # Sort by ascending time
                  .groupby(grain_columns, group_keys=False))

    def _cut(dfg):
        # An explicit cut index avoids the iloc[:-0] / iloc[-0:] trap, where
        # n == 0 would send every row to the tail; clamp so n > len(dfg) is safe.
        return max(len(dfg) - n, 0)

    df_head = df_grouped.apply(lambda dfg: dfg.iloc[:_cut(dfg)])
    df_tail = df_grouped.apply(lambda dfg: dfg.iloc[_cut(dfg):])
    return df_head, df_tail

# splitting
# Hold out the last n_test_periods rows of every series as the test set.
train, test = split_last_n_by_grain(data_subset, n_test_periods)
print(len(train), len(test))

# Write both splits to local CSV files (header row, no index column).
train_csv_path = './SurfaceSales_train.csv'
test_csv_path = './SurfaceSales_test.csv'
train.to_csv(train_csv_path, index=None, header=True)
test.to_csv(test_csv_path, index=None, header=True)

# Upload the CSVs to the workspace's default datastore under dataset/.
datastore = ws.get_default_datastore()
datastore.upload_files(files=[train_csv_path, test_csv_path],
                       target_path='dataset/',
                       overwrite=True,
                       show_progress=True)

# Build the training dataset from the uploaded train CSV.
from azureml.core.dataset import Dataset
train_csv_on_datastore = datastore.path('dataset/SurfaceSales_train.csv')
train_dataset = Dataset.Tabular.from_delimited_files(path=train_csv_on_datastore)

### 12. Training the Models using AutoML Forecasting

Please note that **compute_target** is commented out, meaning that the model training will run locally in Synapse Spark.


In [None]:
# Parameters
# Forecasting-specific settings: time axis, series identifiers and horizon.
time_series_settings = dict(
    time_column_name=time_column_name,
    grain_column_names=grain_column_names,
    max_horizon=n_test_periods,
)

# General AutoML settings. compute_target is deliberately omitted.
automl_settings = dict(
    task='forecasting',
    debug_log='automl_ss_sales_errors.log',
    primary_metric='normalized_mean_absolute_error',
    experiment_timeout_hours=0.25,
    training_data=train_dataset,
    label_column_name=target_column_name,
    enable_early_stopping=True,
    n_cross_validations=3,
    verbosity=logging.INFO,
)
automl_config = AutoMLConfig(**automl_settings, **time_series_settings)

# Submit the experiment and stream its output into the notebook.
remote_run = experiment.submit(automl_config, show_output=True)

### 13. Retrieving the Best Model and Forecasting


In [None]:
# Retrieving the best model
# Retrieve the best run and its fitted model
best_run, fitted_model = remote_run.get_output()
print(fitted_model.steps)
model_name = best_run.properties['model_name']
print(model_name)

# Forecast on the test dataset.
# Work on a copy: pop() below removes the target column in place, and aliasing
# `test` directly would strip it from `test` too, so re-running this cell
# would fail with a KeyError.
X_test = test.copy()
y_test = X_test.pop(target_column_name).values
X_test[time_column_name] = pd.to_datetime(X_test[time_column_name])
y_predictions, X_trans = fitted_model.forecast(X_test)

### 14. Plotting the Results

At this point you should have a chart that was created with AutoML and Matplotlib.

The results are this good because of the **logQuantity** column, which leaks the target: it is calculated from the **quantity** column. You can try running the same experiment without it.


In [None]:
import pandas as pd
import numpy as np
from pandas.tseries.frequencies import to_offset


def align_outputs(y_predicted, X_trans, X_test, y_test, target_column_name,
                  predicted_column_name='predicted',
                  horizon_colname='horizon_origin'):
    """
    Align forecasts with the original test inputs and actuals.

    Demonstrates how to get the output aligned to the inputs
    using pandas indexes. Helps understand what happened if
    the output's shape differs from the input shape, or if
    the data got re-sorted by time and grain during forecasting.

    Typical causes of misalignment are:
    * we predicted some periods that were missing in actuals -> drop from eval
    * model was asked to predict past max_horizon -> increase max horizon
    * data at start of X_test was needed for lags -> provide previous periods

    Args:
        y_predicted: Predicted values, one per row of ``X_trans``.
        X_trans: Transformed features returned with the forecast; its index
            levels become the merge keys.
        X_test: Original test features (not modified).
        y_test: Actual target values, one per row of ``X_test``.
        target_column_name: Column name under which ``y_test`` is stored.
        predicted_column_name: Column name under which ``y_predicted`` is stored.
        horizon_colname: Column of ``X_trans`` carried over when present.

    Returns:
        DataFrame of the merged rows in which both the actual and the
        predicted value are non-null.
    """
    forecast_columns = {predicted_column_name: y_predicted}
    if horizon_colname in X_trans:
        forecast_columns[horizon_colname] = X_trans[horizon_colname]
    df_fcst = pd.DataFrame(forecast_columns)

    # y and X outputs are aligned by forecast() function contract
    df_fcst.index = X_trans.index

    # align original X_test to y_test
    X_test_full = X_test.copy()
    X_test_full[target_column_name] = y_test

    # Turn the forecast index levels into columns so they can act as merge keys.
    df_fcst = df_fcst.reset_index()
    # X_test_full's index does not include origin, so discard it for the merge.
    # drop=True removes it whatever it is called; reset_index() followed by
    # drop(columns='index') raises KeyError when the index has a name.
    X_test_full = X_test_full.reset_index(drop=True)
    together = df_fcst.merge(X_test_full, how='right')

    # drop rows where prediction or actuals are nan
    # happens because of missing actuals
    # or at edges of time due to lags/rolling windows
    has_both = together[[target_column_name,
                         predicted_column_name]].notnull().all(axis=1)
    return together[has_both]


# Merge predictions, test inputs and actuals into a single frame.
df_all = align_outputs(y_predictions, X_trans, X_test, y_test, target_column_name)

#from azureml.automl.core._vendor.automl.client.core.common import metrics
from matplotlib import pyplot as plt
from automl.client.core.common import constants

# use automl metrics module
#scores = metrics.compute_metrics_regression(
#    df_all['predicted'],
#    df_all[target_column_name],
#    list(constants.Metric.SCALAR_REGRESSION_SET),
#    None, None, None)

#print("[Test data scores]\n")
#for key, value in scores.items():    
#    print('{}:   {:.3f}'.format(key, value))

# Plot outputs: predictions (blue) against actuals, with the actual-vs-actual
# diagonal (green) as the reference line.
#%matplotlib inline
actual_values = df_all[target_column_name]
predicted_values = df_all['predicted']
test_pred = plt.scatter(actual_values, predicted_values, color='b')
test_test = plt.scatter(actual_values, actual_values, color='g')
plt.legend((test_pred, test_test), ('prediction', 'truth'), loc='upper left', fontsize=8)
plt.show()

### 15. Register Model

In [None]:
# Name under which the best run's model is registered.
model_name = best_run.properties['model_name']

# Local paths for the scoring script and conda environment file.
script_file_name = 'inference/score.py'
conda_env_file_name = 'inference/env.yml'

# Download both files from the best run's outputs.
for run_output_path, local_path in (
        ('outputs/scoring_file_v_1_0_0.py', script_file_name),
        ('outputs/conda_env_v_1_0_0.yml', conda_env_file_name)):
    best_run.download_file(run_output_path, local_path)

# Register the model in the workspace and print its id.
registered_model = remote_run.register_model(model_name=model_name)

print(remote_run.model_id)

### 16. Deploy Web Service as Azure Container Instance

In [None]:
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig
from azureml.core.model import Model

# Container sizing and metadata for the ACI deployment.
aci_config = AciWebservice.deploy_configuration(
   cpu_cores = 1, 
   memory_gb = 2, 
   tags = {'name':'scoring'}, 
   description = 'Scoring web service')

# Entry script and conda environment for the scoring container.
inference_config = InferenceConfig(runtime="python", 
                                       entry_script=script_file_name,
                                       conda_file=conda_env_file_name)

# Start the deployment of the registered model as the "scoringservice" web service.
webservice = Model.deploy(workspace=ws,
                              name="scoringservice",
                              models=[registered_model],
                              inference_config=inference_config,
                              deployment_config=aci_config)

# Model.deploy only starts the deployment. Block until it finishes so that a
# failure surfaces in this cell, then report the final service state.
webservice.wait_for_deployment(show_output=True)
print(webservice.state)

### 17. Clean Up Resources

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the table is already gone.
drop table if exists CosmosDemoIoT.IoTSignals

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the table is already gone.
drop table if exists CosmosDemoIoT.IoTDeviceInfo

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the table is already gone.
drop table if exists CosmosDemoRetail.RetailProducts

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the table is already gone.
drop table if exists CosmosDemoRetail.RetailSales

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the table is already gone.
drop table if exists CosmosDemoRetail.RetailStoreDemographics

In [None]:
%%sql
-- "if exists" lets cleanup be re-run after the database is already gone.
drop database if exists CosmosDemoIoT

In [None]:
%%sql

-- "if exists" lets cleanup be re-run after the database is already gone.
drop database if exists CosmosDemoRetail