In [46]:
%%pyspark
blob_account_name = "pandemicdatalake"
blob_container_name = "public"
blob_relative_path = "curated/covid-19/bing_covid-19_data/latest/bing_covid-19_data.parquet"
blob_sas_token = r""
# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
df = spark.read.parquet(wasbs_path)
display(df.limit(10))

StatementMeta(SparkPool24, 3, 47, Finished, Available)

SynapseWidget(Synapse.DataFrame, e7786b44-3859-4a38-917a-d633350c3f1a)

In [47]:
#filtered_df = df[df.country_region == 'France']
df.describe().show()

StatementMeta(SparkPool24, 3, 48, Finished, Available)

+-------+--------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------+-------+--------------+--------------+---------------+--------------+
|summary|                  id|         confirmed|  confirmed_change|            deaths|     deaths_change|        recovered|  recovered_change|          latitude|         longitude|   iso2|   iso3|country_region|admin_region_1|iso_subdivision|admin_region_2|
+-------+--------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------+-------+--------------+--------------+---------------+--------------+
|  count|             3051712|           3051712|           3046194|           2966180|           2955138|          1007464|            998455|           3051023|           3051023|3047595|3047595|       3051712|       2904

In [48]:
from datetime import datetime
from pyspark.sql.functions import *

# To make development easier, faster, and less expensive, downsample for now
#sampled_df = df#.sample(True, 0.001, seed=1234)

all_df = df.select('confirmed', 'deaths').filter(df.country_region.isNotNull())
all_df.show(10)
#all_df.withColumn("confirmed",col("confirmed").cast("numeric"))

all_df = all_df \
    .withColumn('confirmed', col('confirmed').cast('double')) \
    .withColumn('deaths', col('deaths').cast('double'))

#all_df = all_df.to_numeric(all_df, errors='ignore')
all_df.dropna(how='any')
all_df.show(10)

StatementMeta(SparkPool24, 3, 49, Finished, Available)

+---------+------+
|confirmed|deaths|
+---------+------+
|      262|     0|
|      313|     0|
|      578|     0|
|      841|     0|
|     1320|     0|
|     2014|     0|
|     2798|     0|
|     4593|     0|
|     6065|     0|
|     7818|     0|
+---------+------+
only showing top 10 rows

+---------+------+
|confirmed|deaths|
+---------+------+
|    262.0|   0.0|
|    313.0|   0.0|
|    578.0|   0.0|
|    841.0|   0.0|
|   1320.0|   0.0|
|   2014.0|   0.0|
|   2798.0|   0.0|
|   4593.0|   0.0|
|   6065.0|   0.0|
|   7818.0|   0.0|
+---------+------+
only showing top 10 rows

In [49]:
# Random split dataset using Spark; convert Spark to pandas
training_data, validation_data = all_df.randomSplit([0.8,0.2], 223)
training_data.count()

StatementMeta(SparkPool24, 3, 50, Finished, Available)

2440220

In [50]:
from azureml.core import Workspace

# Enter your subscription id, resource group, and workspace name.
subscription_id = "f46d5b09-3ce0-45c9-bd29-d2f0b2691914" #you should be owner or contributor
resource_group = "BigData" #you should be owner or contributor
workspace_name = "BigDataMachineLearning" #your workspace name

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

StatementMeta(SparkPool24, 3, 51, Finished, Available)

In [51]:
import pandas 
from azureml.core import Dataset

# Get the Azure Machine Learning default datastore
datastore = ws.get_default_datastore()
training_pd = training_data.toPandas().to_csv('training_pd.csv', index=False)

# Convert into an Azure Machine Learning tabular dataset
datastore.upload_files(files = ['training_pd.csv'],
                       target_path = 'train-dataset/tabular/',
                       overwrite = True,
                       show_progress = True)
dataset_training = Dataset.Tabular.from_delimited_files(path = [(datastore, 'train-dataset/tabular/training_pd.csv')])

StatementMeta(SparkPool24, 3, 52, Finished, Available)

Uploading an estimated of 1 files
Uploading training_pd.csv
Uploaded training_pd.csv, 1 files out of an estimated total of 1
Uploaded 1 files

In [52]:
import logging

automl_settings = {
    "iteration_timeout_minutes": 10,
    "experiment_timeout_minutes": 30,
    "enable_early_stopping": True,
    "primary_metric": 'r2_score',
    "featurization": 'auto',
    "verbosity": logging.INFO,
    "n_cross_validations": 2}

dataset_training.printSchema()

StatementMeta(SparkPool24, 3, 53, Finished, Available)

AttributeError: 'TabularDataset' object has no attribute 'printSchema'

In [None]:
from azureml.train.automl import AutoMLConfig

automl_config = AutoMLConfig(task='regression',
                             debug_log='automated_ml_errors.log',
                             training_data = dataset_training,
                             spark_context = sc,
                             model_explainability = False, 
                             label_column_name ="deaths",**automl_settings)

StatementMeta(, , , Cancelled, )

In [None]:
from azureml.core.experiment import Experiment

# Start an experiment in Azure Machine Learning
experiment = Experiment(ws, "aml-synapse-regression")
tags = {"Synapse": "regression"}
local_run = experiment.submit(automl_config, show_output=True, tags = tags)

# Use the get_details function to retrieve the detailed output for the run.
run_details = local_run.get_details()

StatementMeta(, , , Cancelled, )

In [None]:
# Get best model
best_run, fitted_model = local_run.get_output()

StatementMeta(, , , Cancelled, )

In [None]:
# Test best model accuracy
validation_data_pd = validation_data.toPandas()
y_test = validation_data_pd.pop("deaths").to_frame()
y_predict = fitted_model.predict(validation_data_pd)

StatementMeta(, , , Cancelled, )

In [None]:
from sklearn.metrics import mean_squared_error
from math import sqrt

# Calculate root-mean-square error
y_actual = y_test.values.flatten().tolist()
rmse = sqrt(mean_squared_error(y_actual, y_predict))

print("Root Mean Square Error:")
print(rmse)

StatementMeta(, , , Cancelled, )

In [None]:
# Calculate mean-absolute-percent error and model accuracy 
sum_actuals = sum_errors = 0

for actual_val, predict_val in zip(y_actual, y_predict):
    abs_error = actual_val - predict_val
    if abs_error < 0:
        abs_error = abs_error * -1

    sum_errors = sum_errors + abs_error
    sum_actuals = sum_actuals + actual_val

mean_abs_percent_error = sum_errors / sum_actuals

print("Model MAPE:")
print(mean_abs_percent_error)
print()
print("Model Accuracy:")
print(1 - mean_abs_percent_error)

StatementMeta(, , , Cancelled, )

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score

# Calculate the R2 score by using the predicted and actual fare prices
y_test_actual = y_test["deaths"]
r2 = r2_score(y_test_actual, y_predict)

# Plot the actual versus predicted fare amount values
plt.style.use('ggplot')
plt.figure(figsize=(10, 7))
plt.scatter(y_test_actual,y_predict)
plt.plot([np.min(y_test_actual), np.max(y_test_actual)], [np.min(y_test_actual), np.max(y_test_actual)], color='lightblue')
plt.xlabel("confirmed")
plt.ylabel("deaths")
plt.title("Actual vs Predicted R^2={}".format(r2))
plt.show()

StatementMeta(, , , Cancelled, )

In [None]:
description = 'Covid 19 ML model'
model_path='outputs/model.pkl'
model = best_run.register_model(model_name = 'Covid19', model_path = model_path, description = description)
print(model.name, model.version)

StatementMeta(, , , Cancelled, )