# Welcome to the AIShield Integration Notebook!

**We are delighted to have you here!**

## What to Expect !!
This notebook serves as a comprehensive demonstration of the seamless integration capabilities of AIShield. As a powerful tool for securing your AI/ML assets against supply-chain threats, AIShield ensures the integrity and robustness of your models, preventing financial loss, protecting brand reputation, and mitigating the risk of intellectual property theft.

## What to Do !!
- **Explore the notebook** at your own pace to gain deep insights into the functionalities and features offered by AIShield.
- **Run the first code cell/block**, which includes **Support functionalities** crucial for the smooth execution of the notebook. This setup will enhance your experience and enable the seamless integration of AIShield.
- Even without running the entire notebook, you can get a glimpse of AIShield's capabilities and its approach to vulnerability assessments through sample scenarios.
- **Run the code cells** to witness firsthand how AIShield seamlessly integrates with your existing codebase, empowering you to protect your AI models against adversarial attacks.
- **Experiment and adapt the code** to suit your specific use cases, leveraging the flexibility and customization options provided by AIShield.
- If you have any questions or need assistance, contact us at [AIShield.Contact@bosch.com](mailto:AIShield.Contact@bosch.com).

## What You'll Get !!
Throughout this notebook, you'll find:
- Code snippets demonstrating step-by-step procedures for incorporating AIShield into your existing workflows, ensuring comprehensive protection against supply-chain threats.
- Detailed guidance on the integration process, including any additional inputs required for thorough vulnerability analysis.
- Report artifacts providing valuable insights into potential attack scenarios.

## <span style="color:teal">AIShield Website and LinkedIn</span>
To learn more about AIShield and its cutting-edge features, visit the official [AIShield website](https://www.boschaishield.com/). Connect with AIShield on [LinkedIn](https://www.linkedin.com/company/bosch-aishield/about/) to stay updated on the latest advancements in AI security.

We hope this notebook empowers you to seamlessly integrate AIShield into your projects, ensuring the utmost protection for your AI/ML assets. If you have any questions or need further assistance, please don't hesitate to reach out. **Happy exploring!**


<a target="_blank" href="https://colab.research.google.com/github/bosch-aisecurity-aishield/Reference-Implementations/blob/main/Product_Taskpair_wise/Supply_Chain_Attack_Analysis/Supplychain_Analysis_Reference_Implementation.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>


## 1.0 Support Functionalities:

In [None]:
"""
Description : Choose between the aishield PyPI package and the AIShield API for vulnerability analysis
              pypi = True  : this notebook uses the aishield PyPI package
              pypi = False : this notebook calls the AIShield API directly
"""
pypi = False

In [None]:
def make_code_input_scrollable():
    """Description : Make the input code cells scrollable in Jupyter Notebook"""
    from IPython.display import display, HTML
    display(HTML('''
        <style>
        .scrollable-code-input {
            max-height: 500px;
            overflow-y: scroll;
        }
        </style>
        <script>
        var codeInputCells = document.querySelectorAll('div.code_cell .input');
        codeInputCells.forEach(function(cell) {
            cell.classList.add('scrollable-code-input');
        });
        </script>
    '''))

make_code_input_scrollable()

def display_pdf(runtime_environment, output_filename, pdf_path=None, file_id=None):

    """Description : Embed a PDF file in the notebook"""

    from IPython.display import display, HTML
    import os
    output_file = output_filename+'.html'

    # Check if the output file exists
    if os.path.exists(output_file):
        with open(output_file, 'r') as f:
            saved_output = f.read()
            display(HTML(saved_output))
    else:
        if 'colab' in runtime_environment.lower():
            # Generate the Google Drive Viewer URL
            if file_id is None:
                print(pdf_path)
                # Generate the Google Drive Viewer URL
                drive_viewer_url = f'https://drive.google.com/viewerng/viewer?embedded=true&url={pdf_path}'
                html_code = f'<iframe src="{drive_viewer_url}" width="100%" height="600px"></iframe>'
            else:
                viewer_url = f"https://drive.google.com/file/d/{file_id}/preview"
                html_code = f'<iframe src="{viewer_url}" width="100%" height="600px"></iframe>'
        else:
            html_code = f'<embed src="{pdf_path}" width="100%" height="600px" type="application/pdf">'
        # Save the output to a file
        with open(output_file, 'w') as f:
            f.write(html_code)

        # Display the output
        display(HTML(html_code))


def check_runtime_environment():
    """
    Description : Check whether the notebook is running on Colab or Jupyter Notebook
    """
    import os
    colab = False
    if 'google.colab' in str(get_ipython()):
        colab = True
        return 'Colab' , colab
    else:
        return 'Jupyter Notebook' , colab

# Check the runtime environment
runtime_environment,colab_running  = check_runtime_environment()

# print("Notebook is running in", runtime_environment)






<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/wk1.png"/>

## 2.0 The code cell below performs Steps 1 to 4 in the following sequence:
**STEP 1: Installing libraries:** The required libraries are installed to provide the functionality needed for the task at hand.

In [None]:
# Install the aishield library and the pinned prerequisite packages

print("Starting installing aishield library")
!pip install aishield
!pip install tqdm==4.61.1
!pip install requests==2.28.0
print("Installed all the prerequisite packages")

import os
import requests
import zipfile
import tqdm
import shutil
import time
from time import sleep
import json


import aishield as ais


def make_directory(directory):
    """
    Create directories

    Parameters
    ----------
    directory : list containing the paths of the directories to create

    Returns
    -------
    None.

    """
    for d in directory:
        if os.path.isdir(d):
            print("directory {} already exists".format(d))
        else:
            os.mkdir(path=d)
            print("directory {} created successfully".format(d))

def delete_directory(directorys):
    """
    Delete directories

    Parameters
    ----------
    directorys : list containing the paths of the directories to delete along with all their files

    Returns
    -------
    None.

    """
    for d in directorys:
        try:
            if os.path.isfile(d):
                os.remove(path=d)
                print("Removed: {}".format(d))
            elif os.path.isdir(d):
                shutil.rmtree(path=d)
                print("Removed: {}".format(d))
            else:
                # Nothing to delete, e.g. on the first run of the notebook
                print("Not found, skipping: {}".format(d))
        except OSError as e:
            print("Failed to remove: {} ({})".format(d, e))


def create_folders():
    '''
        Description: removes the reports folder (if present from a previous run) and
                     recreates it to store the generated reports

        input_parameters: None
        return_parameters: returns the path of the report directory
    '''

    report_path=os.path.join(os.getcwd(),"reports")

    #deleting previously generated folders
    delete_directory(directorys=[report_path])

    #creating folders
    make_directory([report_path])

    return report_path


if __name__ == "__main__":

    report_path = create_folders()


<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/wk2.png"/>
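
The folder setup above removes and recreates the `reports` directory on every run. As a minimal sketch of the same idea using `pathlib`, the helper below does both steps in one call. The name `reset_directory` is hypothetical and not part of AIShield or of this notebook.

```python
import shutil
from pathlib import Path


def reset_directory(path):
    """Remove `path` if it is an existing directory, then create it empty.

    Hypothetical helper for illustration; returns the directory as a Path.
    """
    target = Path(path)
    if target.is_dir():
        # Delete the directory together with everything inside it
        shutil.rmtree(target)
    # parents=True also creates any missing parent directories
    target.mkdir(parents=True, exist_ok=True)
    return target
```

Calling `reset_directory("reports")` would then play the role of `create_folders()` in the cell above.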

## 3.0 Vulnerability Analysis by AIShield

**STEP 2: You can trigger the AIShield analysis by following the steps given below. If you would first like to preview the artifacts generated by a vulnerability analysis, such as the report and dashboard, a few samples are shown below, without having to trigger an actual analysis.**

Examining these reports gives a clear picture of what the upcoming vulnerability analysis by AIShield will produce, so that we can prepare more effectively.


<img src="https://aisdocs.blob.core.windows.net/reference/dashboard_images/Dashboard.png"/>

<img src="https://aisdocs.blob.core.windows.net/reference/dashboard_images/supply_chain_dashboard.png"/>

In [None]:
hosted_pdf_url = 'https://aisdocs.blob.core.windows.net/reference/Reports/supply-chain/VulnerabilityReport.pdf'
display_pdf( output_filename = "vul_sample",runtime_environment = runtime_environment,pdf_path =hosted_pdf_url)


#### Let's proceed with the step-by-step process of calling and integrating AIShield for Vulnerability Analysis:

1. **Initialize AIShield:** Ensure that AIShield is properly installed and its dependencies are set up. Also, initialize AIShield to prepare for the analysis.

2. **Model Registration:** Register the specific task pair (e.g. image_classification) and analysis type (e.g. supply-chain) to perform the vulnerability analysis accurately.

3. **Upload Artifacts:** Select the necessary file for scanning.

4. **Model Analysis:** Trigger the model analysis API to initiate the vulnerability analysis assessment. AIShield will perform the analysis using the uploaded artifacts.

5. **Monitor Analysis Status:** Keep track of the analysis progress and patiently wait for AIShield to generate the vulnerability assessment results. This may take some time depending on the complexity of the analysis.

6. **Download Reports & Artifacts:** Once the analysis is complete, access and download the vulnerability analysis reports and any additional artifacts generated by AIShield. Analyze these reports to gain insights into potential vulnerabilities in your system or application.

By following these instructions, you can effectively call and integrate AIShield for Vulnerability Analysis.

<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw.png"/>
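
The steps listed above map onto a handful of REST endpoints that the code cells in this notebook call: `get_aws_api_key`, `generate_model_id`, `supplychain`, `job_status_detailed` and `get_report`. The sketch below shows one way to build those URLs in a single place. The helper name `endpoint` is hypothetical, and the base URL is the one used in section 3.1.

```python
from urllib.parse import urlencode

# Base URL as composed in section 3.1 of this notebook
BASE_URL = "https://api.aws.boschaishield.com/prod/api/ais/v1.5"


def endpoint(name, **params):
    """Build a full API URL, appending URL-encoded query parameters if given.

    Hypothetical helper for illustration only.
    """
    full_url = f"{BASE_URL}/{name}"
    if params:
        full_url += "?" + urlencode(params)
    return full_url


# Example: URLs for registration and for polling a job
print(endpoint("generate_model_id"))
print(endpoint("job_status_detailed", job_id="abc123"))
```

Using `urlencode` rather than string concatenation keeps the query string valid if a parameter ever contains characters that need escaping.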

### 3.1 Initialize AIShield

In [None]:
"""
Description : AIShield URL, subscription key and org_id
"""
base_url = "https://api.aws.boschaishield.com/prod"
url=base_url+"/api/ais/v1.5"
org_id = 'xxxx' #<<Replace xxxx with Org_id mentioned in welcome mail after AIShield Subscription>>

"""
Description: Initialize the AIShield API
"""
pypi = False
if pypi:
    # With the PyPI package, x-api-key generation is handled on the package side using the given org_id
    aishield_client = ais.AIShieldApi(api_url=url,org_id=org_id)

else:

    # When using the AIShield API directly, an x-api-key is needed to access the AIShield registration / analysis APIs
    # The function below shows how to generate the x-api-key
    def get_aws_api_key(url, org_id):

        """
        Description: Get the x_api_key for the given org_id
        """
        url = url+"/get_aws_api_key"
        headers = {'Org-Id': org_id}
        payload = {}

        x_api_key_request = requests.request("GET", url, headers=headers, data=payload)
        status_code = x_api_key_request.status_code

        if status_code == 200:
            x_api_key = x_api_key_request.json()['x_api_key']
            return x_api_key

        else:
            # Print the raw body, since an error response may not be valid JSON
            print(status_code, x_api_key_request.text)
            return None

    #generate the x-api-key
    x_api_key = get_aws_api_key(url, org_id)
    print("x_api_key is :",x_api_key)

    headers={'Cache-Control': 'no-cache',
    'x-api-key': x_api_key,
    'Org-Id' : org_id
    }

<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw1.png"/>
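
The initialization cell above hard-codes `org_id` and prints the full `x_api_key`. As a hedged alternative, the sketch below reads the Org-Id from an environment variable and masks a secret before it is printed. The variable name `AISHIELD_ORG_ID` and both helper names are assumptions made for this example; they are not defined by AIShield.

```python
import os


def load_org_id(env_var="AISHIELD_ORG_ID"):
    """Return the Org-Id from the environment.

    The environment variable name is an assumption for this sketch.
    """
    value = os.environ.get(env_var, "").strip()
    if not value or value == "xxxx":
        raise ValueError(f"Set {env_var} to the Org-Id from your welcome mail")
    return value


def mask_secret(secret, visible=4):
    """Hide all but the last `visible` characters of a secret for logging."""
    if not secret:
        return "<missing>"
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]
```

With these helpers, the cell could set `org_id = load_org_id()` and print `mask_secret(x_api_key)`, so that shared notebook output does not expose the key.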



### 3.2 Supply chain Upload files (model and notebook) directly in .zip format
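
Before uploading, it can help to confirm that the file really is a readable ZIP archive and to see what it contains. The sketch below uses only the standard `zipfile` module. The function name `summarize_zip` is hypothetical, and this check is a local convenience rather than something the AIShield API is known to require.

```python
import zipfile


def summarize_zip(zip_source):
    """Return (is_valid, member_names) for a path or a binary file-like object.

    Hypothetical helper for illustration only.
    """
    if not zipfile.is_zipfile(zip_source):
        return False, []
    with zipfile.ZipFile(zip_source) as archive:
        # testzip() returns the name of the first corrupt member, or None
        if archive.testzip() is not None:
            return False, []
        return True, archive.namelist()
```

For example, `summarize_zip(artifact_path)` could be called before the upload cell to list the model and notebook files that will be scanned.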

In [None]:
"""
  Description: Call the model-generate API to get a unique model id and the URL for uploading data

"""
model_generate_url = url + "/generate_model_id"

# Initialize so the except block can print it even if the request itself fails
new_request = None
try:
    response = requests.request(method="POST", url=model_generate_url, headers=headers)
    new_request = json.loads(response.text)
    model_id = new_request['model_id']
    upload_obj = new_request
    print('model_id: ', model_id)

except Exception as e:
    print(new_request, str(e))

In [None]:
"""
Description: Define to_clone so the user can either clone a repository from Hugging Face or GitHub,
            or directly give the path of a model or notebook in zip format for supply chain scanning

"""
to_clone=True # True if giving a link to Hugging Face, GitHub or S3; False if uploading a zip directly

In [None]:
def download_zip(artifact_url, dest_dir):
    """
    Downloads a ZIP file from the specified URL and saves it to the destination directory.

    :param artifact_url: str: URL of the ZIP file to download
    :param dest_dir: str: Path to the directory where the ZIP file should be downloaded
    :return: str: Path to the downloaded ZIP file
    """
    try:
        # Create the directory if it doesn't exist
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
            print(f"Created directory: {dest_dir}")

        # Download the ZIP file
        response = requests.get(artifact_url, stream=True)
        response.raise_for_status()  # Raise an error for HTTP errors
        zip_path = os.path.join(dest_dir, "mnist_model.zip")

        with open(zip_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        print(f"Downloaded ZIP file to: {zip_path}")

        return zip_path

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

In [None]:
"""
Description: Upload file or notebook in zip format

"""
if not to_clone:  # Handle direct upload scenario (to_clone = False)

  # Specify the full path of the ZIP file to upload for scanning
  artifact_path = "xxxx"  # Replace xxxx with Full path of the ZIP file uploaded manually

  # Upload only if the ZIP file actually exists at artifact_path
  file_upload = os.path.isfile(artifact_path)

  def upload_file(upload_obj, file_path):
    """Description: POST the ZIP file to the upload URL returned by the model-generate API"""
    upload_url = upload_obj['artifact_path']['url']
    request_payload = upload_obj['artifact_path']['fields']

    # Open the file in a context manager so the handle is closed after the upload
    with open(file_path, 'rb') as zip_file:
      files = [('file', (os.path.basename(file_path), zip_file, 'application/zip'))]
      new_request = requests.request("POST", upload_url, headers={}, data=request_payload, files=files)

    if new_request.status_code == 204:
      status = 'upload successful'
    else:
      status = 'upload failed'
    return status

  """
  Description: Hit AIShield File Upload API
  """
  if pypi:
    print("Currently pypi is not available for direct upload")
  elif file_upload:
    file_upload_status = upload_file(upload_obj, artifact_path)
    print('files_upload_status: ', file_upload_status)
    supply_chain_url = url + "/supplychain"
    payload={
        "model_id":model_id,
        "repo_type":"upload"
             }

    new_request = requests.request(method="POST", url=supply_chain_url, headers=headers, json=payload)
    new_request = json.loads(new_request.text)
    for k, v in new_request.items():
      print("* {} : {}".format(k,v))

    job_id=new_request['job_id']
  else:
    print("file is not present at file path")

<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw2.png"/>

### 3.3 Clone Repository for scanning

In [None]:
if to_clone:  # Runs when cloning from GitHub, Hugging Face or S3; set to_clone = False for direct uploads

  repo_type = "xxxx"  # Replace xxxx with either keywords github or huggingface or s3_bucket
  repo_url = "xxxx"  # Enter your repository URL to scan
  branch_name = "main"  # Default branch (main or master or custom)
  depth = 1  # Default depth

  # Construct payload
  payload = {
  "repo_type": repo_type,
  "repo_url": repo_url,
  "branch_name": branch_name,
  "depth": depth
  }

  # Case: Using S3 # Uncomment below lines
  # bucket_name = "<your_bucket_name>"
  # region = "<your_region>"
  # aws_access_key_id = "<your_aws_access_key>"
  # aws_secret_access_key = "<your_aws_secret_key>"

  # Construct payload for S3 # Uncomment below lines
  # payload = {
  # "repo_type": repo_type,
  # "bucket_name": bucket_name,
  # "region": region,
  # "aws_access_key_id": aws_access_key_id,
  #  "aws_secret_access_key": aws_secret_access_key
  # }

  supply_chain_url = url + "/supplychain"
  new_request = requests.request(method="POST", url=supply_chain_url, headers=headers, json=payload)
  new_request = json.loads(new_request.text)
  for k, v in new_request.items():
    print("* {} : {}".format(k,v))
  job_id=new_request['job_id']



<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw3.png"/>
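
The clone cell above builds one of two payload shapes, depending on whether the source is a Git-style repository or an S3 bucket. The sketch below wraps that choice in a function that rejects placeholder or missing values before any request is sent. The field names are taken from the cell above; the function name `build_clone_payload` and the validation rules are assumptions for illustration.

```python
ALLOWED_REPO_TYPES = {"github", "huggingface", "s3_bucket"}
S3_FIELDS = ("bucket_name", "region", "aws_access_key_id", "aws_secret_access_key")


def build_clone_payload(repo_type, repo_url=None, branch_name="main", depth=1, s3=None):
    """Return the request payload for the given repo_type.

    Hypothetical helper; raises ValueError on placeholder or missing input.
    """
    if repo_type not in ALLOWED_REPO_TYPES:
        raise ValueError(f"repo_type must be one of {sorted(ALLOWED_REPO_TYPES)}")

    if repo_type == "s3_bucket":
        s3 = s3 or {}
        missing = [key for key in S3_FIELDS if not s3.get(key)]
        if missing:
            raise ValueError(f"Missing S3 settings: {missing}")
        return {"repo_type": repo_type, **{key: s3[key] for key in S3_FIELDS}}

    if not repo_url or repo_url == "xxxx":
        raise ValueError("repo_url must be set to the repository to scan")
    return {
        "repo_type": repo_type,
        "repo_url": repo_url,
        "branch_name": branch_name,
        "depth": depth,
    }
```

Failing early on a leftover `xxxx` placeholder gives a clearer message than an error response from the API.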

### 3.6 Monitor Analysis Status

In [None]:
"""
Description: Fetch Job status using Job ID
"""
# To get the status of the job, rerun this cell until the job is successful
if pypi:
    status = aishield_client.job_status(job_id=job_id)
    print('job status ', status)

else:
    def monitor_api_progress(new_job_id):
        job_status_url = url + "/job_status_detailed?job_id=" + new_job_id

        # status dictionary
        status_dictionary = {
            "SupplyChainStatus":'na'
        }
        counts = [0] * len(status_dictionary)
        failed_api_hit_count = 0
        while True:
            time.sleep(15)
            try:
                job_status_response = requests.request("GET", job_status_url, params={},
                                                       headers=headers)

                job_status_payload = json.loads(job_status_response.text)
                failing_key = 'SupplyChainStatus'
                for i, key in enumerate(status_dictionary.keys()):
                    if status_dictionary[key] == 'na':
                        if job_status_payload[key] == 'inprogress' and status_dictionary[key] == 'na':
                            status_dictionary[key] = job_status_payload[key]
                            print(str(key), ":", status_dictionary[key])

                        elif job_status_payload[key] == 'completed' or job_status_payload[key] == 'passed':
                            status_dictionary[key] = job_status_payload[key]
                            counts[i] += 1
                            print(str(key), ":", status_dictionary[key])

                        if job_status_payload[key] == 'failed':
                            failing_key = key
                            status_dictionary[key] = job_status_payload[key]
                            print(str(key), ":", status_dictionary[key])

                    elif job_status_payload[key] == 'completed' or job_status_payload[key] == 'passed':
                        status_dictionary[key] = job_status_payload[key]
                        if counts[i] < 1:
                            print(str(key), ":", status_dictionary[key])
                        counts[i] += 1

                    else:
                        if job_status_payload[key] == 'failed':
                            failing_key = key
                            status_dictionary[key] = job_status_payload[key]
                            print(str(key), ":", status_dictionary[key])

                if job_status_payload[failing_key] == 'failed':
                    break

                if status_dictionary['SupplyChainStatus'] == 'passed' or status_dictionary[
                    'SupplyChainStatus'] == 'completed':
                    print("\n job success")
                    break
            except Exception as e:
                failed_api_hit_count += 1
                print("Error {}. trying {} ...".format(str(e), failed_api_hit_count))
                if failed_api_hit_count >= 3:
                    break
        return status_dictionary

    status_dictionary = monitor_api_progress(new_job_id=job_id)

<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw5.png"/>
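
The monitoring loop above polls every 15 seconds until the job passes, completes or fails, and has no overall time limit. The sketch below shows the same polling idea with a timeout, independent of any HTTP code. The name `wait_for_status` and its parameters are hypothetical; the state names are the ones the loop above checks for.

```python
import time

TERMINAL_STATES = {"completed", "passed", "failed"}


def wait_for_status(fetch_status, interval=15, timeout=1800, sleep=time.sleep):
    """Poll `fetch_status()` until it returns a terminal state or time runs out.

    Hypothetical helper: `fetch_status` is any zero-argument callable that
    returns the current state string. Returns that state, or "timeout".
    """
    waited = 0
    while waited <= timeout:
        state = fetch_status()
        if state in TERMINAL_STATES:
            return state
        sleep(interval)
        waited += interval
    return "timeout"
```

In this notebook, `fetch_status` could be a small function that calls the `job_status_detailed` endpoint and returns the `SupplyChainStatus` field of the response. Passing `sleep` as a parameter makes the helper easy to exercise without real waiting.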

### 3.7 Download Reports and artifacts

In [None]:
report_path = "reports/"
status ="success"

In [None]:
def download_artifact(job_id, report_path, report_type='Vulnerability', file_format=0):
    """
    job_id: job_id received after a successful API call
    report_path: folder in which the downloaded report is saved
    report_type: report to be downloaded
    file_format: change file_format to : 0- all reports in zip
                        1- report in .txt
                        2- report in .pdf
                        3- report in .json
                        4- report in .xml
    """
    print("received report_type : {} and file format is: {}".format(report_type, file_format))
    report_url = url + "/" + "get_report?job_id=" + str(
        job_id) + "&report_type=" + report_type + "&file_format=" + str(file_format)

    # Copy the shared headers so the global dictionary is not modified
    headers1 = dict(headers)
    headers1["content-type"] = "application/zip"

    response = requests.request("GET", report_url, params={}, headers=headers1)

    # Map each supported file_format to the extension of the saved file
    extensions = {0: ".zip", "Attack_samples": ".zip", 1: ".txt", 2: ".pdf", 3: ".json", 4: ".xml"}

    file_path = None
    if file_format in extensions:
        file_path = os.path.join(report_path, report_type + extensions[file_format])
        with open(file_path, 'wb') as f:
            f.write(response.content)

    return file_path

In [None]:
"""
Description: Download the Vulnerability Reports
"""
import os
if pypi:
    if status == "success":
        output_conf = ais.OutputConf(report_type=ais.get_type("report", "vulnerability"),
                                     file_format=ais.get_type("file_format", "pdf"),
                                     save_folder_path=report_path)

        vul_report = aishield_client.save_job_report(job_id=job_id, output_config=output_conf)

else:

    vul_report = download_artifact(job_id=job_id, report_path= report_path, report_type='Vulnerability', file_format=2)

In [None]:
def get_file_id(report_path):

    file_id = None
    !pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

    # from google.colab import drive
    # drive.mount('/content/drive')
    import os
    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload

    # Authenticate and authorize access to the Google Drive API
    from google.colab import auth
    auth.authenticate_user()

    # Build the Drive API service
    drive_service = build('drive', 'v3')

    # Path to the PDF file in your local drive
#         local_pdf_path = '/content/drive/MyDrive/Vulnerability.pdf'

    # Upload the PDF file to Google Drive
    pdf_file_name = os.path.basename(report_path)
    file_metadata = {'name': pdf_file_name}
    media = MediaFileUpload(report_path, mimetype='application/pdf')
    file = drive_service.files().create(body=file_metadata, media_body=media, fields='id').execute()

    # Set sharing permissions to "Anyone with the link"
    file_id = file.get('id')
    drive_service.permissions().create(
        fileId=file_id,
        body={'role': 'reader', 'type': 'anyone'}
    ).execute()

    return file_id

In [None]:
"""
Description: Displaying the current Vulnerability Assessment report
"""

if ("colab" in runtime_environment.lower()):
    file_id = get_file_id(report_path = vul_report)
    display_pdf(output_filename = "vul_"+job_id[-10:],runtime_environment=runtime_environment,file_id=file_id)
else:
    display_pdf(output_filename = "vul_"+job_id[-10:],runtime_environment=runtime_environment, pdf_path =os.path.join(report_path, os.path.basename(vul_report)))

<img src="https://aisdocs.blob.core.windows.net/reference/Workflow Images/aw6.png"/>
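
The download step above writes whatever bytes the API returns into a file with the requested extension. If the service were to return an error body instead of a report, the saved file would not open. The sketch below checks the leading bytes of the content against the expected format before display. The function name `report_looks_valid` is hypothetical, and the `file_format` codes follow the docstring of `download_artifact`.

```python
import json


def report_looks_valid(content, file_format):
    """Return True if `content` (bytes) plausibly matches the requested format.

    Hypothetical helper: 0 = zip, 2 = pdf, 3 = json; other formats are only
    checked for being non-empty.
    """
    if file_format == 0:
        # Non-empty ZIP archives start with the local file header signature
        return content[:4] == b"PK\x03\x04"
    if file_format == 2:
        # PDF files start with the "%PDF-" marker
        return content[:5] == b"%PDF-"
    if file_format == 3:
        try:
            json.loads(content.decode("utf-8"))
            return True
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON
            return False
    return len(content) > 0
```

A check such as `report_looks_valid(open(vul_report, "rb").read(), 2)` before calling `display_pdf` would catch a failed download early.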