In [1]:
!pip install praw mlflow kfp-kubernetes==2.14.0 -q


In [2]:
import kfp
import os
from kfp.v2.dsl import importer, Metrics
from typing import NamedTuple, List
from kfp.dsl import Input, component, pipeline
from kfp.dsl import OutputPath, Artifact
from kfp import dsl
from kfp.dsl import Dataset, Output, HTML

# Kubeflow Pipelines API client, created with no explicit arguments.
client = kfp.Client()

# MLflow naming constants.
MLFLOW_RUN_NAME = "reddit"
MLFLOW_MODEL_NAME = "reddit-transformer"

# Reddit API credentials, read from the notebook's environment.
# Each value is None when the corresponding variable is not set.
client_id = os.environ.get('REDDIT_CLIENT_ID')
client_secret = os.environ.get('REDDIT_CLIENT_PW')
# Optional: For authenticated requests
username = os.environ.get('REDDIT_USER')
password = os.environ.get('REDDIT_PW')
storage_uri = os.environ.get("REDDIT_DB_URI")

# MLflow tracking server and the S3-compatible artifact store credentials.
mlflow_tracking_uri = os.environ.get('MLFLOW_TRACKING_URI')
mlflow_s3_endpoint_url = os.environ.get('MLFLOW_S3_ENDPOINT_URL')
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

  from kfp.v2.dsl import importer, Metrics


# Downloading Data
----

This component downloads data from Reddit and saves it to the datastore for downstream tasks.

It is run daily to collect each day's top submissions, up to the per-subreddit limit.

In [3]:

@component(
    base_image="python:3.11",
    packages_to_install=["praw", "pandas"]
)
def download_dataset() -> None:
    """Download the day's top Reddit submissions and append them to ``/data/reddit.csv``.

    For each subreddit in a fixed list, the top 10 submissions of the last day
    are fetched together with their highest-scoring comment that still has an
    author. Rows whose ``id`` is already present in the CSV are skipped, so
    repeated runs only add unseen submissions.

    Reddit credentials are read from the ``REDDIT_CLIENT_ID``,
    ``REDDIT_CLIENT_PW``, ``REDDIT_USER`` and ``REDDIT_PW`` environment
    variables of the container.
    """
    import praw
    from pathlib import Path
    import os
    import pandas as pd
    from datetime import datetime

    reddit = praw.Reddit(
        client_id=os.getenv('REDDIT_CLIENT_ID'),
        client_secret=os.getenv('REDDIT_CLIENT_PW'),
        user_agent="YOUR_USER_AGENT",
        # Optional: For authenticated requests
        username=os.getenv('REDDIT_USER'),
        password=os.getenv('REDDIT_PW')
    )
    subreddit_names = [
        "funny",
        "AskReddit",
        "gaming",
        "worldnews",
        "todayilearned",
        "Music",
        "aww",
        "movies",
        "memes",
        "science"
    ]
    data_file = Path("/data") / "reddit.csv"
    # Fixed column order; also guarantees an "id" column when nothing was fetched.
    columns = [
        "id",
        "title",
        "url",
        "text",
        "top_comment",
        "comment_score",
        "date_added",
    ]
    submissions = []

    for subreddit_name in subreddit_names:
        print(subreddit_name)
        subreddit = reddit.subreddit(subreddit_name)
        for submission in subreddit.top(time_filter="day", limit=10):
            most_upvoted_comment = None
            highest_score = -1

            for comment in submission.comments.list():
                # Skip entries without an author (e.g. deleted comments).
                if not getattr(comment, "author", None):
                    continue
                if comment.score > highest_score:
                    highest_score = comment.score
                    most_upvoted_comment = comment

            if most_upvoted_comment is not None:
                comment_text = most_upvoted_comment.body.strip()
                comment_score = most_upvoted_comment.score
            else:
                comment_text = ""
                comment_score = 0

            submissions.append({
                "id": submission.id,
                "title": submission.title,
                "url": submission.url,
                "text": submission.selftext_html,
                "top_comment": comment_text,
                "comment_score": comment_score,
                "date_added": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })

    new_data = pd.DataFrame(submissions, columns=columns)

    if data_file.exists():
        # The file is written with its index as the first (unnamed) column, so
        # read that column back as the index instead of as data; otherwise every
        # run would add another "Unnamed: 0" column. Ids are kept as strings so
        # the membership test below compares like with like.
        existing_data = pd.read_csv(data_file, index_col=0, dtype={"id": str})
        # Drop index columns accumulated by files written before this fix.
        legacy_index_columns = existing_data.columns.str.startswith("Unnamed:")
        existing_data = existing_data.loc[:, ~legacy_index_columns]

        is_unseen = ~new_data["id"].isin(existing_data["id"])
        new_data = new_data[is_unseen]

        print(f"New Data: {len(new_data)}")
        print(f"Existing Data: {len(existing_data)}")
        pd.concat([existing_data, new_data], ignore_index=True).to_csv(data_file)
    else:
        new_data.to_csv(data_file)

# Test Model
----

Tests whether the model's performance has degraded on the last `test_window` days of data.

In [5]:

@dsl.component
def print_comp(text: str):
    """Print ``text`` to standard output."""
    print(text)
    
@component(
    base_image="wallies/python-cuda:3.11-cuda12.2-runtime",
    packages_to_install=["ajperry_pipeline>=0.1.15", "torch==2.3.0", "transformers","mlflow", "pyarrow", "boto3", "torchtext==0.18.0"]
)
def test_model(
    batch_size: int,
    num_epochs: int,
    lr: float,
    seq_len: int,
    d_model: int,
    data_folder: str,
    model_folder: str,
    model_basename: str,
    tokenizer_file: str,
    experiment_name: str,
    num_examples: int,
    verbose: bool,
    test_window: int
) -> bool:
    """Run ``ajperry_pipeline``'s Reddit ``test`` routine and return its result.

    Every argument is forwarded unchanged, keyed by its parameter name, in a
    single configuration dictionary. The meaning of the returned flag is
    defined by ``ajperry_pipeline.ml.utils.reddit.test``.
    """
    from ajperry_pipeline.ml.utils.reddit import test as run_test

    settings = dict(
        batch_size=batch_size,
        num_epochs=num_epochs,
        lr=lr,
        seq_len=seq_len,
        d_model=d_model,
        data_folder=data_folder,
        model_folder=model_folder,
        model_basename=model_basename,
        tokenizer_file=tokenizer_file,
        experiment_name=experiment_name,
        num_examples=num_examples,
        verbose=verbose,
        test_window=test_window,
    )
    outcome = run_test(settings)
    return outcome


# Train Model
----

Train a model on a GPU. The number of epochs, the learning rate, and the other hyperparameters are customizable.

In [6]:



@component(
    base_image="wallies/python-cuda:3.11-cuda12.2-runtime",
    packages_to_install=["ajperry_pipeline>=0.1.15", "torch==2.3.0", "transformers","mlflow", "pyarrow", "boto3", "torchtext==0.18.0"]
)
def train_reddit_model(
    batch_size: int,
    num_epochs: int,
    lr: float,
    seq_len: int,
    d_model: int,
    data_folder: str,
    model_folder: str,
    model_basename: str,
    tokenizer_file: str,
    experiment_name: str,
    num_examples: int,
    verbose: bool,
    finetune: str
):
    """Run ``ajperry_pipeline``'s Reddit ``train`` routine.

    Every argument is forwarded unchanged, keyed by its parameter name, in a
    single configuration dictionary. Nothing is returned.
    """
    from ajperry_pipeline.ml.utils.reddit import train as run_training

    settings = dict(
        batch_size=batch_size,
        num_epochs=num_epochs,
        lr=lr,
        seq_len=seq_len,
        d_model=d_model,
        data_folder=data_folder,
        model_folder=model_folder,
        model_basename=model_basename,
        tokenizer_file=tokenizer_file,
        experiment_name=experiment_name,
        num_examples=num_examples,
        verbose=verbose,
        finetune=finetune,
    )
    run_training(settings)


# Define Pipeline
----

A pipeline that downloads data, tests the current model, and then, when the test step returns `True`, trains a model based on the top performer.

In [7]:

@pipeline(name='download-reddit')
def download_preprocess_train_pipeline(
    batch_size: int=8,
    num_epochs: int=400,
    lr: float=0.0001,
    seq_len: int=560,
    d_model: int=512,
    data_folder: str="/data",
    model_folder: str=".",
    model_basename: str="tmodel",
    tokenizer_file: str="",
    experiment_name: str="reddit",
    finetune: str="",
    num_examples: int=5,
    verbose: bool=False,
    test_window: int=2
):
    """Download Reddit data, test the model, and train when the test returns True.

    Steps:
      1. Create the ``reddit-pvc`` volume claim, mounted at ``/data`` by every task.
      2. ``download_dataset`` writes the day's submissions to the volume.
      3. ``test_model`` runs after the download on one GPU.
      4. ``train_reddit_model`` runs on one GPU only if ``test_model`` returned True.

    Caching is disabled on all three tasks. Credentials and service endpoints
    are taken from the notebook's module-level variables (and ``HF_TOKEN2`` from
    the notebook environment) at compile time and set as container environment
    variables.
    """
    import os
    from kfp import kubernetes

    # One volume claim shared by all tasks. `pvc_name` pins the claim's name
    # (as opposed to `pvc_name_suffix`, which derives a name per run).
    data_pvc = kubernetes.CreatePVC(
        pvc_name='reddit-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='microk8s-hostpath',
    )
    data_pvc_name = data_pvc.outputs['name']
    hf_token = os.getenv('HF_TOKEN2')

    def _prepare_model_task(task):
        """Apply the settings shared by the test and train tasks; return ``task``."""
        task.set_env_variable(name='HF_TOKEN2', value=hf_token)
        task.set_env_variable(name='MLFLOW_TRACKING_URI', value=mlflow_tracking_uri)
        task.set_env_variable(name='MLFLOW_S3_ENDPOINT_URL', value=mlflow_s3_endpoint_url)
        task.set_env_variable(name='AWS_ACCESS_KEY_ID', value=aws_access_key_id)
        task.set_env_variable(name='AWS_SECRET_ACCESS_KEY', value=aws_secret_access_key)
        task.set_gpu_limit(1)
        task.set_caching_options(enable_caching=False)
        kubernetes.mount_pvc(
            task,
            pvc_name=data_pvc_name,
            mount_path='/data',
        )
        return task

    # DOWNLOAD
    download_task = download_dataset()
    for env_name, env_value in (
        ('REDDIT_CLIENT_ID', client_id),
        ('REDDIT_CLIENT_PW', client_secret),
        ('REDDIT_USER', username),
        ('REDDIT_PW', password),
        ('REDDIT_DB_URI', storage_uri),
    ):
        download_task.set_env_variable(name=env_name, value=env_value)
    download_task.set_caching_options(enable_caching=False)
    kubernetes.mount_pvc(
        download_task,
        pvc_name=data_pvc_name,
        mount_path='/data',
    )

    # TEST
    test_task = test_model(
        batch_size=batch_size,
        num_epochs=num_epochs,
        lr=lr,
        seq_len=seq_len,
        d_model=d_model,
        data_folder=data_folder,
        model_folder=model_folder,
        model_basename=model_basename,
        # Forward the pipeline parameter instead of a hard-coded "".
        tokenizer_file=tokenizer_file,
        experiment_name=experiment_name,
        num_examples=num_examples,
        verbose=verbose,
        test_window=test_window
    )
    _prepare_model_task(test_task).after(download_task)

    # TRAIN
    # `==` (not `is`) is required: the comparison is turned into a pipeline
    # condition that is evaluated at run time, not by Python.
    with dsl.If(test_task.output == True):  # noqa: E712
        training_task = train_reddit_model(
            batch_size=batch_size,
            num_epochs=num_epochs,
            lr=lr,
            seq_len=seq_len,
            d_model=d_model,
            data_folder=data_folder,
            model_folder=model_folder,
            model_basename=model_basename,
            tokenizer_file=tokenizer_file,
            experiment_name=experiment_name,
            num_examples=num_examples,
            verbose=verbose,
            finetune=finetune
        )
        # Mount the claim created above rather than creating a second claim
        # with the same fixed name inside the branch.
        _prepare_model_task(training_task)


## Manually run pipeline

---

Useful for testing the downloading of data

In [8]:
# Arguments for a one-off run; every key is a parameter of the pipeline function.
manual_run_arguments = dict(
    batch_size=8,
    num_epochs=500,
    lr=1e-4,
    seq_len=560,
    d_model=512,
    model_folder="weights",
    data_folder="/data",
    model_basename="tmodel_",
    experiment_name="tmodel",
    tokenizer_file="",
    num_examples=5,
    finetune="",
    verbose=False,
    test_window=2,
)
run = client.create_run_from_pipeline_func(
    download_preprocess_train_pipeline,
    arguments=manual_run_arguments,
)

## Register pipeline

---

Used to schedule the task to run daily

In [19]:
# Compile the pipeline to a local YAML package, then upload that package under
# the name 'download-train-reddit'.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    download_preprocess_train_pipeline,
    package_path='pipeline.yaml',
)
pipeline_info = client.upload_pipeline(
    "pipeline.yaml",
    'download-train-reddit',
)

In [16]:
# Pick the first version of the uploaded pipeline whose display name is
# 'download-train-reddit'; raises IndexError when no version matches.
version_listing = client.list_pipeline_versions(pipeline_info.pipeline_id)
matching_versions = [
    candidate
    for candidate in version_listing.pipeline_versions
    if candidate.display_name == 'download-train-reddit'
]
pipeline_version = matching_versions[0]
pipeline_version_id = pipeline_version.pipeline_version_id

In [17]:
# Pick the first experiment whose display name is "default"; raises IndexError
# when there is none.
experiment_listing = client.list_experiments()
default_experiments = [
    candidate
    for candidate in experiment_listing.experiments
    if candidate.display_name == "default"
]
experiment = default_experiments[0]
experiment_id = experiment.experiment_id

In [138]:
# Schedule the uploaded pipeline version to run once a day at midnight.
recurring_run = client.create_recurring_run(
    experiment_id=experiment_id,
    pipeline_id=pipeline_info.pipeline_id,
    version_id=pipeline_version_id,
    # KFP cron expressions have six fields with seconds first:
    # second minute hour day-of-month month day-of-week.
    # The five-field "0 0 * * *" would be read as "second 0, minute 0 of every
    # hour", i.e. hourly rather than daily.
    cron_expression="0 0 0 * * *",
    job_name="Daily Training",
    params={
        "num_epochs": 100,
        "batch_size": 4,
        # The pipeline parameter is named `lr`; `learning_rate` is not declared.
        "lr": 0.001,
        # NOTE(review): the pipeline declares `finetune` as a str, but a bool is
        # passed here. Kept as in the original; confirm the intended string value.
        "finetune": True
    }
)