In [1]:
import datetime
import os

import pandas as pd 

import constants

from azure.ai.ml import command, Input, Output
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential



In [2]:
def _pick_credential():
    """Return a credential for Azure Resource Manager.

    Tries DefaultAzureCredential first and verifies it can actually obtain a
    management token; on any failure, falls back to an interactive browser login.
    """
    try:
        candidate = DefaultAzureCredential()
        # Probe the credential so a misconfigured environment is detected here
        # rather than on the first workspace call.
        candidate.get_token("https://management.azure.com/.default")
        return candidate
    except Exception:
        return InteractiveBrowserCredential()


credential = _pick_credential()

# Handle to the Azure ML workspace identified by the values in `constants`.
ml_client = MLClient(
    credential=credential,
    workspace_name=constants.WORKSPACE_NAME,
    resource_group_name=constants.RESOURCE_GROUP_NAME,
    subscription_id=constants.SUBSCRIPTION_ID,
)

In [3]:
%%writefile components/prep.py 

from azure.storage.blob import BlobServiceClient

import argparse

def main():
    """List the blobs matching the current timestamp prefix and write their names to a text file.

    Command-line arguments:
        --input_data:  parsed but not used by this step.
        --output_data: where the newline-separated list of blob names is written.
            If it is an existing directory (or ends with a path separator) the
            list is written to ``blobs_to_use.txt`` inside it; otherwise the
            value is used as the file path itself.

    The storage account key is read from the ``BLOB_KEY`` environment variable,
    falling back to ``constants.BLOB_KEY`` when the variable is not set.
    """
    # Imported here because the script's import header does not provide them.
    import datetime
    import os

    parser = argparse.ArgumentParser("prep")
    parser.add_argument("--input_data", type=str, help="Path of prepped data")
    parser.add_argument("--output_data", type=str, help="Path of prepped data")
    args = parser.parse_args()

    # Resolve the storage key: environment first, local constants module second.
    blob_key = os.environ.get("BLOB_KEY")
    if not blob_key:
        import constants
        blob_key = constants.BLOB_KEY

    # log in to the Blob Service Client (the account key is passed as `credential`)
    account_url = "https://mlstorageleo.blob.core.windows.net"
    blob_service_client = BlobServiceClient(account_url, credential=blob_key)

    # connect to the container
    container_client = blob_service_client.get_container_client(container="stock-news-json")

    # list all currently available blobs
    blob_list = container_client.list_blobs()

    # Prefix of the current Unix timestamp used to select blobs by name.
    # NOTE(review): the first 8 characters of a 10-digit epoch value span roughly
    # 100 seconds, not a whole day -- confirm against the blob naming scheme.
    current_day_timestamp = str(datetime.datetime.today().timestamp())[:8]

    blobs_to_use = [blob.name for blob in blob_list if current_day_timestamp in blob.name]

    # The files are not downloaded in this step; only their names are passed on
    # to the next component.
    output_path = str(args.output_data)
    if os.path.isdir(output_path) or output_path.endswith(("/", os.sep)):
        output_path = os.path.join(output_path, "blobs_to_use.txt")

    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, "w") as f:
        f.write("\n".join(blobs_to_use))


if __name__ == "__main__":
    main()

Overwriting components/prep.py


In [4]:
%%writefile components/classify.py

import argparse
import json
import os

from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Hugging Face identifier of the sentiment classifier used by this script.
_SENTIMENT_MODEL_ID = "KernAI/stock-news-destilbert"

# Command-line interface: both options are plain strings.
parser = argparse.ArgumentParser()
for _flag, _help in (
    ("--input_data", "path or URL to input data"),
    ("--folder_path", "path or URL to output data"),
):
    parser.add_argument(_flag, type=str, help=_help)
args = parser.parse_args()

# Fetch the tokenizer and classification model from HuggingFace.
tokenizer = AutoTokenizer.from_pretrained(_SENTIMENT_MODEL_ID)
model = AutoModelForSequenceClassification.from_pretrained(_SENTIMENT_MODEL_ID)

def main():
    """Add a ``"sentiments"`` list to every selected JSON file in the data folder.

    ``args.input_data`` is a text file with one blob name per line; only files
    in ``args.folder_path`` whose name equals the basename of one of those
    entries are processed. Each file must contain a ``"texts"`` list; one label
    per text is stored under ``"sentiments"`` and the file is rewritten in place.
    """
    # retrieve the list of blobs from the current day - input is a .txt file
    with open(args.input_data, "r") as f:
        # Exact-name lookup instead of a substring test against the whole file,
        # so "a.json" no longer matches an entry such as "xa.json".
        blobs_to_use = {
            os.path.basename(line.strip()) for line in f if line.strip()
        }

    folder = args.folder_path

    # maps the predicted class index to its label
    mapping = {0: 'neutral', 1: 'negative', 2: 'positive'}

    for file_name in [name for name in os.listdir(folder) if name in blobs_to_use]:
        # os.path.join works whether or not the folder path ends with a separator
        file_path = os.path.join(folder, file_name)

        with open(file_path) as json_file:
            data = json.load(json_file)
        texts = data["texts"]

        sentiments = []
        for text in texts:
            tokenized_text = tokenizer(
                text,
                truncation=True,
                is_split_into_words=False,
                return_tensors="pt"
            )

            outputs = model(tokenized_text["input_ids"])
            # index of the highest-scoring class for the single input sequence
            predicted_class = outputs.logits.argmax(1)

            sentiments.append(mapping[int(predicted_class[0])])

        # add the sentiments to the data
        data["sentiments"] = sentiments

        # overwrite the old file with the version containing the sentiments
        with open(file_path, "w") as f:
            json.dump(data, f)

        # Note: no dedicated output needed here: we'll take the output from the first component again for the next step


if __name__ == "__main__":
    main()


Overwriting components/classify.py


In [5]:
%%writefile components/summarize.py

import argparse
import json
import os

from transformers import PegasusTokenizer, PegasusForConditionalGeneration, TFPegasusForConditionalGeneration

# Hugging Face identifier of the summarization model used by this script.
_SUMMARY_MODEL_ID = "human-centered-summarization/financial-summarization-pegasus"

# Command-line interface: both options are plain strings.
parser = argparse.ArgumentParser()
for _flag, _help in (
    ("--input_data", "path or URL to input data"),
    ("--folder_path", "path or URL to output data"),
):
    parser.add_argument(_flag, type=str, help=_help)
args = parser.parse_args()

# Load the tokenizer and the model.
tokenizer = PegasusTokenizer.from_pretrained(_SUMMARY_MODEL_ID)
model = PegasusForConditionalGeneration.from_pretrained(_SUMMARY_MODEL_ID)

def main():
    """Add a ``"summaries"`` list to every ``.json`` file in ``args.input_data``.

    Each file must contain a ``"texts"`` list; one generated summary per text
    is stored under ``"summaries"`` and the file is rewritten in place.

    NOTE(review): ``--folder_path`` is parsed but never read here, while
    classify.py treats ``--input_data`` as a text file and ``--folder_path``
    as the directory -- confirm which convention this script should follow.
    """
    folder = args.input_data
    for file_name in [name for name in os.listdir(folder) if name.endswith('.json')]:
        # os.path.join works whether or not the folder path ends with a separator
        file_path = os.path.join(folder, file_name)

        with open(file_path) as json_file:
            data = json.load(json_file)
        texts = data["texts"]

        summaries = []
        for text in texts:
            # Tokenize the text. Truncation keeps long articles within the
            # tokenizer's configured maximum length, matching classify.py.
            # For TensorFlow, request return_tensors="tf" instead.
            input_ids = tokenizer(text, truncation=True, return_tensors="pt").input_ids

            # Generate the output with beam search.
            output = model.generate(
                input_ids,
                max_length=32,
                num_beams=5,
                early_stopping=True
            )

            # Decode the first generated sequence into plain text.
            summaries.append(tokenizer.decode(output[0], skip_special_tokens=True))

        # add the summaries to the data
        data["summaries"] = summaries

        # overwrite the old file with the version containing the summaries
        with open(file_path, "w") as f:
            json.dump(data, f)


if __name__ == "__main__":
    main()


Overwriting components/summarize.py


In [None]:
%%writefile components/store.py

In [6]:
%%writefile dependencies/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.9
  - pip
  - pip:
    - inference-schema[numpy-support]
    # Azure ML SDK v2 (package name is azure-ai-ml)
    - azure-ai-ml
    # imported by components/prep.py
    - azure-storage-blob
    - transformers
    # backend for the return_tensors="pt" calls in classify.py / summarize.py
    - torch
    - sentencepiece
    - pandas
    - numpy

Overwriting dependencies/conda.yml


In [7]:
custom_env_name = "stock-analysis-env"
custom_env_version = "1.0"

try:
    # Reuse the environment if this name/version is already registered.
    pipeline_job_env = ml_client.environments.get(custom_env_name, version=custom_env_version)

except Exception:
    # Lookup failed: register the environment from the local conda file.
    # `except Exception` (not a bare `except`) so that KeyboardInterrupt and
    # SystemExit are not swallowed.
    pipeline_job_env = Environment(
        name=custom_env_name,
        description="Custom environment for stock analysis pipeline",
        conda_file=os.path.join("dependencies", "conda.yml"),
        image="mcr.microsoft.com/azureml/curated/python-sdk-v2:4",
        version=custom_env_version,
    )
    pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

    print(
        f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
    )

Environment with name stock-analysis-env is registered to workspace, the environment version is 1.0


In [None]:
# Shared settings reused by the component definitions below.
# Asset type used for the file-typed inputs/outputs of the components.
data_type = AssetTypes.URI_FILE
# Access mode used for those inputs/outputs (read-only mount).
mode = InputOutputModes.RO_MOUNT
# Datastore location of the stock-news JSON data.
# NOTE(review): Azure ML datastore URIs normally have the form
# azureml://datastores/<datastore>/paths/<path>; this value has no "/paths/"
# segment -- confirm it resolves as intended.
path = "azureml://datastores/stocknewsjson/stock-news-json"

In [None]:
# Step 1: runs components/prep.py, which writes the names of today's blobs to
# the `output_data` file.
data_prep_component = command(
    name="data_prep",
    display_name="Data preparation for training",
    description="Lists today's files in Azure Blob Storage and writes their names to a text file",
    inputs={
        "input_data": Input(type=data_type, mode=mode),
        # Declared with a default path but not passed to prep.py by the command below.
        "folder_path": Input(type=AssetTypes.URI_FOLDER, path=path)
    },
    # Outputs must be writable: a read-only mount is not a valid output mode.
    outputs={"output_data": Output(type=data_type, mode=InputOutputModes.RW_MOUNT, path=path)},
    code="./components/prep.py",
    command="python prep.py --input_data ${{inputs.input_data}} --output_data ${{outputs.output_data}}",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="ava"
)

In [None]:
# Step 2: runs components/classify.py, which adds sentiment labels to the JSON
# files named in `input_data` and found in `folder_path`.
classify_component = command(
    name="classify",
    display_name="Sentiment classification of stock news",
    description="Classifies the sentiment of each news text and stores the labels in the JSON files",
    inputs={
        "input_data": Input(type=data_type, mode=mode, path=path),
        # NOTE(review): classify.py rewrites the files inside this folder, but the
        # input carries no writable mode -- confirm the mount allows writes.
        "folder_path": Input(type=AssetTypes.URI_FOLDER, path=path)
    },
    code="./components/classify.py",
    # Run the script that `code` actually points to, with the two arguments it parses.
    command="python classify.py --input_data ${{inputs.input_data}} --folder_path ${{inputs.folder_path}}",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="ava"
)

In [None]:
# Step 3: runs components/summarize.py, which adds generated summaries to the
# JSON files.
summarize_component = command(
    name="summarize",
    display_name="Summarization of stock news",
    description="Generates a short summary for each news text and stores it in the JSON files",
    inputs={
        "input_data": Input(type=data_type, mode=mode),
        # NOTE(review): summarize.py lists and rewrites files under its
        # --input_data argument and ignores --folder_path -- confirm which
        # input should carry the folder.
        "folder_path": Input(type=AssetTypes.URI_FOLDER, path=path)
    },
    code="./components/summarize.py",
    # Run the script that `code` actually points to, with the two arguments it parses.
    command="python summarize.py --input_data ${{inputs.input_data}} --folder_path ${{inputs.folder_path}}",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="ava"
)

In [None]:
from azure.ai.ml.dsl import pipeline

@pipeline(compute="ava")
def pipeline_with_non_python_components(input_data):
    """Chain the prep, classify and summarize components.

    Args:
        input_data: pipeline input forwarded to the data preparation step.

    Returns:
        A dict exposing the preparation step's ``output_data`` as ``"out"``.
    """
    data_prep_job = data_prep_component(input_data=input_data)

    # Both downstream steps consume the blob list written by the prep step.
    # `output_data` is the only output the prep component declares, and it must
    # be read from the job node rather than from the component factory.
    classify_job = classify_component(input_data=data_prep_job.outputs.output_data)
    summarize_job = summarize_component(input_data=data_prep_job.outputs.output_data)

    return {"out": data_prep_job.outputs.output_data}


# The AlphaVantage API key is read from the environment so it is never stored
# in the notebook; set ALPHAVANTAGE_API_KEY before running this cell.
alphavantage_api_key = os.environ["ALPHAVANTAGE_API_KEY"]

pipeline_job = pipeline_with_non_python_components(
    input_data=Input(
        # stock data via AlphaVantage; query parameters are separated by "&"
        path=(
            "https://www.alphavantage.co/query"
            "?function=TIME_SERIES_DAILY_ADJUSTED&symbol=IBM&datatype=csv"
            f"&outputsize=full&apikey={alphavantage_api_key}"
        )
    )
)

# set pipeline level compute
pipeline_job.settings.default_compute = "ava"