## Sample notebook showing an end-to-end sales prediction use case using the FedML Databricks Library

### The FedML Databricks Library reads the training data from SAP Datasphere, trains and deploys the model in Databricks, and writes the inference results back to SAP Datasphere.

### Install the fedml_databricks library

In [None]:
%pip install fedml-databricks --no-cache-dir --upgrade --force-reinstall

In [None]:
import numpy as np
import pandas as pd
import json
from fedml_databricks import DbConnection,predict

### 1. Connect to SAP Datasphere, Explore & Acquire Data

#### 1.1 Create a Databricks Secret to store the SAP Datasphere connection credentials securely and connect to SAP Datasphere.

Create a Databricks secret scope as described in the [Databricks documentation on secret scopes](https://docs.databricks.com/security/secrets/secret-scopes.html#create-a-databricks-backed-secret-scope). Then create a Databricks secret that contains the SAP Datasphere credentials as a JSON string, as described in the [Databricks documentation on secrets](https://docs.databricks.com/security/secrets/secrets.html#create-a-secret-in-a-databricks-backed-scope). To obtain the SAP Datasphere connection credentials, complete the [prerequisite step](https://github.com/SAP-samples/data-warehouse-cloud-fedml/blob/main/Databricks/docs/dbconnection.md#pre-requisite#pre-requisite).

In [None]:
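# Replace the placeholders with the names of your secret scope and secret key
config_str=dbutils.secrets.get('<databricks-secret-scope>','<databricks-secret-key>')
# The secret holds the SAP Datasphere credentials as a JSON string
config=json.loads(config_str)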
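`json.loads` already rejects malformed JSON, but a secret that is valid JSON with a missing field would likely go unnoticed until the connection is attempted. The sketch below parses the secret and checks for required keys up front. The helper `parse_connection_config` and the key names in `EXPECTED_KEYS` are hypothetical placeholders; take the real keys from the fedml_databricks `DbConnection` documentation linked above.

```python
import json

# Hypothetical key names; replace them with the keys that DbConnection actually expects.
EXPECTED_KEYS = {"address", "port", "user", "password"}


def parse_connection_config(config_str, expected_keys=EXPECTED_KEYS):
    """Parse the JSON secret and fail early with a readable error.

    Error messages mention key names only, never secret values.
    """
    try:
        config = json.loads(config_str)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Secret is not valid JSON: {exc}") from exc
    if not isinstance(config, dict):
        raise ValueError("Secret must contain a JSON object")
    # Report every required key that is absent from the parsed object
    missing = sorted(set(expected_keys) - set(config))
    if missing:
        raise ValueError(f"Secret is missing keys: {missing}")
    return config
```

In the notebook, `config = parse_connection_config(config_str)` would then take the place of the plain `json.loads` call.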

In [None]:
db = DbConnection(dict_obj=config)

#### 1.2 List all business models available to read from

In [None]:
data= db.get_schema_views()
data

#### 1.3 Query the SAP Datasphere data using SQL and get the result as a PySpark DataFrame

In [None]:
spark_df=db.execute_query_pyspark('SELECT * FROM "DEMOSALESANALYSIS"."PP_Gross_Sales_S4"')
spark_df.show(truncate=False)
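SAP Datasphere runs on SAP HANA Cloud, where unquoted identifiers are converted to uppercase and double-quoted identifiers are case-sensitive, which is why the schema and view names in the query are wrapped in double quotes. A small sketch that builds such a query follows; `quote_identifier` and `select_all` are hypothetical helper names. Inside a quoted identifier, an embedded double quote is escaped by doubling it. The helpers quote identifiers only and do not make arbitrary input safe to embed as values.

```python
def quote_identifier(name):
    """Wrap an identifier in double quotes, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def select_all(schema, view):
    """Build a SELECT * query for a schema-qualified view with case-sensitive names."""
    return f"SELECT * FROM {quote_identifier(schema)}.{quote_identifier(view)}"


print(select_all("DEMOSALESANALYSIS", "PP_Gross_Sales_S4"))
# SELECT * FROM "DEMOSALESANALYSIS"."PP_Gross_Sales_S4"
```

The resulting string could be passed to `db.execute_query_pyspark` in place of the hand-written query.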

##### 1.3.1 Get insights from the data. The cell below computes the average projected sales volume for the year 2021

In [None]:
average_sales_for_2021_df=spark_df.filter(spark_df['YEAR_Label']=='2021').groupBy().avg('Projected Sales Volume')
average_sales_for_2021_df.show(truncate=False)
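The same aggregation can also be expressed in pandas, for example after the conversion in step 1.3.2. The sketch below uses toy data and a hypothetical helper name, `average_for_year`; like Spark's `avg`, pandas' `mean` ignores missing values.

```python
import pandas as pd


def average_for_year(df, year, year_column="YEAR_Label", value_column="Projected Sales Volume"):
    """Mean of value_column over the rows whose year_column equals year."""
    mask = df[year_column] == year
    return df.loc[mask, value_column].mean()


# Toy data for illustration only; not taken from SAP Datasphere.
sample = pd.DataFrame({
    "YEAR_Label": ["2020", "2021", "2021"],
    "Projected Sales Volume": [10.0, 20.0, 40.0],
})
print(average_for_year(sample, "2021"))  # 30.0
```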

##### 1.3.2 Convert the PySpark DataFrame to a pandas DataFrame

In [None]:
dataframe=spark_df.toPandas()

#### 1.4 Preprocess the data

##### 1.4.1 Replace zero values with the column mean in selected columns

In [None]:
# Replace 0 with the column mean (computed over all values, zeros included)
for column in ['GROSSAMOUNT_1', 'Gross amount', 'Projected Sales Volume']:
    dataframe = dataframe.replace({column: {0: dataframe[column].mean(skipna=True)}})
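The mean used as the replacement value is computed over all values of the column, including the zeros being replaced. The hypothetical helper below reproduces the cell's behavior on toy data, so the effect is easy to check. Using the mean of the non-zero values instead would be a different choice, computed as `s[s != 0].mean()`.

```python
import pandas as pd


def replace_zeros_with_mean(df, columns):
    """Return a copy of df where 0 in each listed column is replaced by that column's mean.

    The mean includes the zeros, matching the notebook cell.
    """
    out = df.copy()
    for column in columns:
        out[column] = out[column].replace(0, out[column].mean(skipna=True))
    return out


# Toy data for illustration only
sample = pd.DataFrame({"Gross amount": [0.0, 3.0, 6.0]})
print(replace_zeros_with_mean(sample, ["Gross amount"])["Gross amount"].tolist())  # [3.0, 3.0, 6.0]
```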

##### 1.4.2 Perform one-hot encoding on the categorical columns

If you use a scikit-learn version older than 1.0, replace `get_feature_names_out()` with `get_feature_names()` in the cell below.

In [None]:
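from sklearn.preprocessing import OneHotEncoder

def one_hot_encode(df, column):
    # Fit an encoder on one categorical column; categories unseen at fit time encode as all zeros
    encoder = OneHotEncoder(handle_unknown='ignore')
    encoded = encoder.fit_transform(df[[column]]).toarray()
    # Output column names follow the '<column>_<category>' pattern
    encoded_columns = encoder.get_feature_names_out([column])
    # Reuse df's index so that DataFrame.join aligns the rows correctly
    encoder_df = pd.DataFrame(encoded, columns=encoded_columns, index=df.index)
    return encoder_df, encoded_columns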

In [None]:
encoded_column_names=[]
for column in ['Country','YEAR']:
    encoded_df,encoded_columns=one_hot_encode(dataframe,column)
    dataframe = dataframe.join(encoded_df)
    encoded_column_names += encoded_columns.tolist()
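To see what the encoded columns contain, here is a dependency-free sketch of one-hot encoding for a single column; the `one_hot` helper is illustrative and not part of the notebook. Each category becomes its own column named `<column>_<category>`, mirroring the names returned by `get_feature_names_out`, and each row holds 1.0 in its category's column and 0.0 elsewhere. Like `OneHotEncoder` with its default `categories='auto'`, the sketch orders the categories by sorting the distinct values.

```python
def one_hot(values, column):
    """One-hot encode a list of categorical values into (column names, rows)."""
    categories = sorted(set(values))
    names = [f"{column}_{category}" for category in categories]
    # One row per input value, with 1.0 in the position of its category
    rows = [[1.0 if value == category else 0.0 for category in categories] for value in values]
    return names, rows


names, rows = one_hot(["DE", "US", "DE"], "Country")
print(names)  # ['Country_DE', 'Country_US']
print(rows)   # [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]]
```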

In [None]:
dataframe.head(10)

### 2. Train the model on the data

In [None]:
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Separate the label from the features
label_column = 'Projected Sales Volume'
y = dataframe[label_column]
dataframe.drop(label_column, axis=1, inplace=True)
# Hold out 30% of the rows for testing; a fixed random_state makes the split reproducible
X_train, X_test, y_train, y_test = train_test_split(dataframe, y, test_size=0.3, random_state=42)

#### 2.1 Select the columns required for training the model

In [None]:
train_columns=['GROSSAMOUNT_1','Gross amount']+encoded_column_names
X_train_dataframe,X_test_dataframe=X_train[train_columns],X_test[train_columns]

#### 2.2 Train the model and log the results with MLflow

In [None]:
def train_model(X_train, X_test, y_train, y_test, experiment_name, model_name):
    mlflow.set_experiment(experiment_name)
    print("Training model...")

    # Train the LinearRegression model on the features passed in and evaluate it on the test split
    with mlflow.start_run() as run:
        model = LinearRegression().fit(X_train, y_train)
        score = model.score(X_test, y_test)
        # The score is a result of the run, so log it as a metric rather than a parameter
        mlflow.log_metric("score", score)
        # Log the model as a run artifact; it is registered in step 3
        mlflow.sklearn.log_model(model, model_name)

    run_id = run.info.run_id
    return run_id


In the cell below, replace `<user>` with your Databricks user name.

In [None]:
experiment_name, model_name = '/Users/<user>/SalesPredictionExperiment', 'SalesPredictionModel'
# Pass only the selected training columns
run_id = train_model(X_train_dataframe, X_test_dataframe, y_train, y_test, experiment_name, model_name)

In [None]:
model_uri=f"runs:/{run_id}/{model_name}"
print("The MODEL_URI is '{}'".format(model_uri))
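The `score` recorded during training is what `LinearRegression.score` returns: the coefficient of determination R² = 1 - SS_res / SS_tot on the test split. A value of 1.0 means perfect predictions, 0.0 means the model does no better than always predicting the mean of the test labels, and the value can be negative for worse models. A plain-Python sketch of the formula follows; scikit-learn's own implementation is `sklearn.metrics.r2_score`.

```python
def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    # Residual sum of squares: error left after the model's predictions
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    # Total sum of squares: error of always predicting the mean
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R² is undefined when all true values are equal")
    return 1.0 - ss_res / ss_tot


print(r2_score([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))  # 1.0
print(r2_score([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))  # 0.0
```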

### 3. Register the model

In [None]:
import time
model_version = mlflow.register_model(model_uri=model_uri,name=model_name)
 
# Registering the model takes a few seconds, so add a small delay
time.sleep(15)
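A fixed `time.sleep(15)` may wait too long or not long enough. A more robust option is to poll the model version's status until the registry reports it as `READY`. The helper below is a hypothetical sketch that takes the status lookup as a callable, so it can be tried without a workspace. MLflow model versions report statuses such as `PENDING_REGISTRATION`, `READY`, and `FAILED_REGISTRATION`.

```python
import time


def wait_until_ready(get_status, timeout_s=120.0, poll_interval_s=2.0, sleep=time.sleep):
    """Poll get_status() until it returns 'READY'.

    Raises RuntimeError on 'FAILED_REGISTRATION' and TimeoutError if the
    deadline passes first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == "READY":
            return status
        if status == "FAILED_REGISTRATION":
            raise RuntimeError("Model version registration failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Model version still '{status}' after {timeout_s}s")
        sleep(poll_interval_s)
```

With MLflow, the call could look like `wait_until_ready(lambda: MlflowClient().get_model_version(model_name, model_version.version).status)`.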

### 4. Transition the model to production

In [None]:
from mlflow.tracking import MlflowClient
 
client = MlflowClient()
client.transition_model_version_stage(
  name=model_name,
  version=model_version.version,
  stage="Production",
)
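Recent MLflow releases deprecate model stages such as `Production` in favour of registered-model aliases, which are available since MLflow 2.3. If your workspace uses a newer MLflow version, the sketch below shows the alias-based alternative; the helper `promote_with_alias` and the alias name `champion` are illustrative assumptions. It calls `MlflowClient.set_registered_model_alias(name, alias, version)` and returns a `models:/<name>@<alias>` URI that `mlflow.pyfunc.load_model` can load.

```python
def promote_with_alias(client, model_name, version, alias="champion"):
    """Point a registered-model alias at a version and return the URI that loads it.

    `client` is expected to be an mlflow.tracking.MlflowClient (MLflow >= 2.3).
    The default alias name 'champion' is a hypothetical choice.
    """
    client.set_registered_model_alias(model_name, alias, version)
    return f"models:/{model_name}@{alias}"
```

In the notebook, this would be `model_uri = promote_with_alias(client, model_name, model_version.version)`, followed by `mlflow.pyfunc.load_model(model_uri)` in step 5.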

### 5. Run inference with the deployed model on the test data

In [None]:
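# Cast the numeric feature columns to float (values read from the source may arrive as object dtype).
# astype returns a new DataFrame, which avoids pandas' SettingWithCopyWarning on this slice of X_test
X_test_dataframe = X_test_dataframe.astype({'GROSSAMOUNT_1': float, 'Gross amount': float})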

In [None]:
import pandas as pd

# Load the model version that is currently in the Production stage
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
result = model.predict(X_test_dataframe)
inference_dataframe = pd.DataFrame(result, columns=['prediction_result'])
inference_dataframe

### 6. Store the inference results in SAP Datasphere

#### 6.1 Add the inference results to the test pandas DataFrame

In [None]:
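# Use .values so the predictions are assigned by position: X_test keeps the shuffled row labels from the split
X_test['PredictedSalesVolume']=inference_dataframe['prediction_result'].values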

#### 6.2 Select the required columns from the pandas DataFrame

In [None]:
X_test.columns

In [None]:
datasphere_write_dataframe=X_test[['Country_Label', 'Country', 'YEAR_Label', 'YEAR',
                                   'HarmonizedCountryDimension_COUNTRYCODE',
                                   'Gross amount', 'Value', 'Value_1', 'Gross amount_1',
                                   'GROSSAMOUNT_1', 'PredictedSalesVolume']]

#### 6.3 Rename the columns that contain spaces so they match the SAP Datasphere table columns

In [None]:
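# rename() returns a new DataFrame; assigning it avoids modifying a slice of X_test in place
datasphere_write_dataframe = datasphere_write_dataframe.rename(columns={'Gross amount': 'Gross_amount', 'Gross amount_1': 'Gross_amount_1'})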
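The rename above spells out each replacement by hand. If more columns contain spaces, a generic helper can derive SQL-friendly names instead. The sketch below, with the hypothetical name `to_sql_column_name`, replaces every run of characters other than letters, digits, and underscores with a single underscore; `DataFrame.rename(columns=to_sql_column_name)` would then apply it to all columns.

```python
import re


def to_sql_column_name(name):
    """Replace each run of characters other than letters, digits and '_' with one underscore."""
    return re.sub(r"[^0-9A-Za-z_]+", "_", name)


rename_map = {c: to_sql_column_name(c) for c in ["Gross amount", "Gross amount_1", "Country"]}
print(rename_map)
# {'Gross amount': 'Gross_amount', 'Gross amount_1': 'Gross_amount_1', 'Country': 'Country'}
```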

In [None]:
datasphere_write_dataframe

#### 6.4 Create a table in SAP Datasphere for storing the inference result

In [None]:
db.create_table("CREATE TABLE SALES_TABLE (Country_Label Varchar(20),Country Varchar(20),YEAR_Label Varchar(20),YEAR Varchar(20),HarmonizedCountryDimension_COUNTRYCODE Varchar(20), Gross_amount FLOAT,Value FLOAT,Value_1 FLOAT,Gross_amount_1 FLOAT,GROSSAMOUNT_1 FLOAT,PredictedSalesVolume FLOAT)")
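The column list in the `CREATE TABLE` statement has to line up with the columns of `datasphere_write_dataframe`. One way to keep the two in sync is to generate the statement from a list of (column name, SQL type) pairs, as in the hypothetical `create_table_sql` helper below; the SQL types in the example are the ones used in the cell above.

```python
def create_table_sql(table, columns):
    """Build a CREATE TABLE statement from (column name, SQL type) pairs."""
    if not columns:
        raise ValueError("At least one column is required")
    column_defs = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return f"CREATE TABLE {table} ({column_defs})"


print(create_table_sql("SALES_TABLE", [("Country", "VARCHAR(20)"), ("PredictedSalesVolume", "FLOAT")]))
# CREATE TABLE SALES_TABLE (Country VARCHAR(20), PredictedSalesVolume FLOAT)
```

Listing the pairs in the same order as the DataFrame columns makes it easy to check the table definition against the data that is written in the next step.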

#### 6.5 Write the prediction results to the SALES_TABLE table in SAP Datasphere

In [None]:
db.insert_into_table('SALES_TABLE',datasphere_write_dataframe)