In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import os

# Directory used by the local (in-notebook) runs of the pipeline steps.
# Set POLICE_SHOOTINGS_OUT_DIR to relocate it without editing the notebook.
output_dir = os.environ.get(
    "POLICE_SHOOTINGS_OUT_DIR",
    "/home/jovyan/stage-f-10-police-shootings/data/out",
)

In [3]:
def get_data(download_url, out_path):
    """Download the police-shootings CSV to ``<out_path>/shootings.csv``.

    This function is also packaged as a Kubeflow component with
    ``func_to_container_op``, which builds the component from this function's
    source alone, so all imports stay inside the body.

    Args:
        download_url: URL of the CSV. Any falsy value (e.g. ``""``) selects the
            default public bucket URL.
        out_path: Destination directory; created if it does not exist.

    Raises:
        OSError: (including ``urllib.error.URLError``/``HTTPError``) if the
            download or the write fails. The previous unchecked ``wget`` call
            reported success even on failure.
    """
    import os
    import shutil
    import urllib.request

    default_url = "https://storage.googleapis.com/police-shootings/data/shootings.csv"
    url = download_url if download_url else default_url

    os.makedirs(out_path, exist_ok=True)
    destination = os.path.join(out_path, "shootings.csv")

    # Pure-stdlib download: works on -slim images (no wget binary) and raises
    # on HTTP/IO errors instead of silently continuing.
    with urllib.request.urlopen(url) as response, open(destination, "wb") as out_file:
        shutil.copyfileobj(response, out_file)

    print("File Downloaded")

    

In [4]:
# An empty URL makes get_data fall back to its default download location.
get_data(download_url="", out_path=output_dir)

File Downloaded


In [5]:
def prepare_data(out_path):
    """Build pickled train/test sets from ``<out_path>/shootings.csv``.

    Steps:
      1. Derive a binary ``label`` column (1 = threat_level 'undetermined' or
         'other' AND flee == 'Not fleeing'; 0 otherwise).
      2. Split 80/20 with a fixed seed.
      3. One-hot encode a few categorical columns and label-encode
         ``state``/``arms_category`` (encoders fitted on the training split).
      4. Oversample the training split with SMOTE.

    Files written to ``out_path``:
        state_encoder.pkl, arms_category_encoder.pkl -- fitted LabelEncoders
        feature_columns.pkl -- training feature column order
        trainset.pkl, testset.pkl -- ``(features DataFrame, label Series)``

    This function is also packaged as a Kubeflow component with
    ``func_to_container_op``, which uses only this function's source, so
    every import and helper stays nested inside it.
    """
    import subprocess
    import sys

    # The component base image is bare Python; install dependencies at run time.
    # "imbalanced-learn" is the actual distribution name ("imblearn" is an alias).
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn', 'imbalanced-learn'])

    import logging
    import pickle

    import pandas as pd
    from imblearn.over_sampling import SMOTE
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder
    from sklearn.utils import shuffle

    # Columns replaced by integer codes; their encoders are pickled so the test
    # split is encoded with the mapping learned on the training split.
    cat_cols = ['state', 'arms_category']
    onehot_encoding_columns = ['gender', 'race', 'signs_of_mental_illness', 'manner_of_death', 'body_camera']
    # Identifiers/free text plus the label itself and the raw columns the label
    # is derived from (threat_level, flee), which would otherwise leak the target.
    non_feature_columns = ['name', 'date', 'label', 'id', 'armed', 'city', 'threat_level', 'flee']
    feature_columns_path = f"{out_path}/feature_columns.pkl"

    def make_label(frame):
        """Return the target column: 1 for the cases treated as unjustified, else 0."""
        unclear_threat = frame['threat_level'].isin(['undetermined', 'other'])
        not_fleeing = frame['flee'] == 'Not fleeing'
        return (unclear_threat & not_fleeing).astype('int64')

    def preprocess(data, is_train=False):
        """Encode one split and return ``(X, y)``; SMOTE is applied only when ``is_train``."""
        # get_dummies returns a new frame, so the caller's split is never mutated.
        encoded = pd.get_dummies(data, drop_first=True, columns=onehot_encoding_columns, prefix_sep='-')

        for column in cat_cols:
            encoder_path = f"{out_path}/{column}_encoder.pkl"
            if is_train:
                encoder = LabelEncoder()
                encoded[column] = encoder.fit_transform(encoded[column])
                with open(encoder_path, "wb") as fh:
                    pickle.dump(encoder, fh)
            else:
                with open(encoder_path, "rb") as fh:
                    encoder = pickle.load(fh)
                # NOTE(review): transform raises ValueError for a category that
                # never appeared in the training split (e.g. a rare arms_category).
                encoded[column] = encoder.transform(encoded[column])

        # Seeded so repeated runs (and hence SMOTE's output) are reproducible.
        shuffled = shuffle(encoded, random_state=0)
        features = shuffled.drop(columns=non_feature_columns)
        target = shuffled['label']

        if is_train:
            # Remember the training column order so the test split can match it.
            with open(feature_columns_path, "wb") as fh:
                pickle.dump(list(features.columns), fh)
            # Oversample the minority class. fit_resample replaces the removed
            # fit_sample method of older imbalanced-learn releases.
            X, y = SMOTE(random_state=0).fit_resample(features, target)
        else:
            with open(feature_columns_path, "rb") as fh:
                train_columns = pickle.load(fh)
            # One-hot encoding the test split on its own can produce a different
            # column set (a missing category, or drop_first dropping a different
            # level). Align to the training columns, filling absent dummies with 0.
            features = features.reindex(columns=train_columns, fill_value=0)
            X, y = features, target

        # SMOTE may return ndarrays (older imblearn); normalize to pandas objects.
        X = pd.DataFrame(X, columns=features.columns)
        y = pd.Series(y, name=target.name)
        return X, y

    data = pd.read_csv(f'{out_path}/shootings.csv')
    data['label'] = make_label(data)
    train_df, test_df = train_test_split(data, test_size=0.2, random_state=100)

    # Order matters: the training call writes the encoders and column list that
    # the test call reads.
    trainset = preprocess(train_df, is_train=True)
    testset = preprocess(test_df)

    logging.info(f"Training data count: {trainset[0].shape}")
    logging.info(f"Testing data count: {testset[0].shape}")

    with open(f"{out_path}/trainset.pkl", "wb") as fh:
        pickle.dump(trainset, fh)

    with open(f"{out_path}/testset.pkl", "wb") as fh:
        pickle.dump(testset, fh)

In [6]:
# Writes the encoders and the pickled train/test splits into output_dir.
prepare_data(out_path=output_dir)

In [7]:
def train(out_path, trainset):
    """Fit a logistic-regression classifier and export it with joblib.

    Args:
        out_path: Directory containing the pickled training set; the fitted
            model is written to ``<out_path>/export``.
        trainset: File name (relative to ``out_path``) of the pickled
            ``(features, targets)`` tuple written by ``prepare_data``.
            The previous version ignored this argument and always read
            ``trainset.pkl``.

    Imports stay inside the body because the function is also packaged as a
    Kubeflow component with ``func_to_container_op``.
    """
    import os
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn'])

    import joblib
    from sklearn.linear_model import LogisticRegression

    with open(os.path.join(out_path, trainset), 'rb') as fh:
        features, targets = pickle.load(fh)

    lrc = LogisticRegression()
    lrc.fit(features, targets)

    joblib.dump(lrc, os.path.join(out_path, 'export'))

    
    


In [8]:
# Fits the model on the pickled training split and exports it to output_dir.
train(out_path=output_dir, trainset="trainset.pkl")

In [9]:
def test(out_path, testset, model_uri):
    """Score the exported model on the held-out set and report ROC AUC.

    Args:
        out_path: Directory containing the pickled test set and the model file
            ``export`` written by ``train``.
        testset: File name (relative to ``out_path``) of the pickled
            ``(features, targets)`` tuple written by ``prepare_data``.
            The previous version ignored this argument and always read
            ``testset.pkl``.
        model_uri: NOTE(review): currently unused. The model is always loaded
            from ``<out_path>/export``, where ``train`` writes it. The notebook
            passes ``'/tmp/export'`` here, which ``train`` never writes, so
            honoring this argument would break that call. It is kept for
            interface compatibility.

    Imports stay inside the body because the function is also packaged as a
    Kubeflow component with ``func_to_container_op``.
    """
    import logging
    import os
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'scikit-learn'])

    import joblib
    from sklearn.metrics import roc_auc_score

    with open(os.path.join(out_path, testset), 'rb') as fh:
        features, targets = pickle.load(fh)

    lrc = joblib.load(os.path.join(out_path, 'export'))

    # Labels are 0/1, so predict_proba column 1 is the positive-class probability.
    auc_score = roc_auc_score(targets, lrc.predict_proba(features)[:, 1])

    print("AUC Score", auc_score)
    logging.info(f"ROC Score: {auc_score}")

    
    


In [10]:
# Prints the ROC AUC of the exported model on the held-out split.
test(out_path=output_dir, testset="testset.pkl", model_uri="/tmp/export")

AUC Score 0.6088995203922548


In [11]:
import kfp
from kfp import dsl
import kfp.components as comp

In [12]:
# Wrap each step function as a lightweight component. func_to_container_op
# builds the component from the function's own source, which is why every
# step function keeps its imports inside its body.
download_op = comp.func_to_container_op(
    get_data,
    base_image="python:3.7",
)
prepare_data_op = comp.func_to_container_op(
    prepare_data,
    base_image="python:3.7-slim",
)
train_op = comp.func_to_container_op(
    train,
    base_image="python:3.7-slim",
)
test_op = comp.func_to_container_op(
    test,
    base_image="python:3.7-slim",
)


In [13]:
@dsl.pipeline(
    name="Police Shootings Justification Pipeline",
    description="A Machine Learning Pipeline for determining police shootings justification"
)
def police_shootings_pipeline(
    out_path="/mnt",
    trainset="trainset.pkl",
    testset="testset.pkl",
    model_uri="export",
    download_url="",
    serving_name="ps-server",
    serving_namespace="kubeflow",
    serving_export_dir="gs://police-shootings/export",
    transform_image="gcr.io/kubeflow-292422/police-shootings-processing:latest",
):
    """Download -> prepare -> train -> test on a shared volume, then deploy a KFServing InferenceService.

    Each step mounts the volume returned by the previous step's ``pvolume``,
    which also orders the steps. The serve step runs after the test step.
    """
    volume_op = dsl.VolumeOp(
        name="volume",
        resource_name="data-volume",
        size="2Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    download_task = download_op(download_url, out_path).add_pvolumes({out_path: volume_op.volume})
    prepare_task = prepare_data_op(out_path).add_pvolumes({out_path: download_task.pvolume})
    train_task = train_op(out_path, trainset).add_pvolumes({out_path: prepare_task.pvolume})
    test_task = test_op(out_path, testset, model_uri).add_pvolumes({out_path: train_task.pvolume})

    # Pipeline parameters are placeholders at compile time; str() embeds them in
    # the resource so they are filled in at run time. A plain dict avoids the
    # string.Template / json round-trip and its placeholder-name mismatches.
    inference_service = {
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "labels": {"controller-tools.k8s.io": "1.0"},
            "name": str(serving_name),
            "namespace": str(serving_namespace),
        },
        "spec": {
            "default": {
                "predictor": {
                    "minReplicas": 1,
                    "serviceAccountName": "kf-user",
                    "sklearn": {"storageUri": str(serving_export_dir)},
                },
                "transformer": {
                    "serviceAccountName": "kf-user",
                    "minReplicas": 1,
                    "custom": {
                        "container": {
                            "image": str(transform_image),
                            "name": "user-container",
                            "imagePullPolicy": "Always",
                        }
                    },
                },
            }
        },
    }

    # NOTE(review): no step copies the trained model (written to
    # <out_path>/export on the PVC) to serving_export_dir, so the predictor only
    # works if that bucket is populated elsewhere. Also check the file name the
    # sklearn server expects there.
    serve = dsl.ResourceOp(
        name="serve",
        k8s_resource=inference_service,
        action="apply",
        # Argo resource condition, as a string: succeed once status.url is set.
        success_condition="status.url",
    )

    serve.after(test_task)
    



In [14]:
# Names used when compiling and submitting the pipeline run.
pipeline_func = police_shootings_pipeline
experiment_name = 'police-shootings-training'
run_name = pipeline_func.__name__ + ' run'

In [15]:
# Run parameters for the pipeline. These must be plain strings: a trailing
# comma (e.g. `OUT_PATH = '/mnt',`) silently turns a value into a 1-tuple.
OUT_PATH = '/mnt'
TRAINSET = 'trainset.pkl'
TESTSET = 'testset.pkl'
MODEL_URI = '/tmp/export'
DOWNLOAD_URL = ""

In [16]:

# Run arguments keyed by the pipeline's parameter names.
arguments = dict(
    out_path=OUT_PATH,
    trainset=TRAINSET,
    testset=TESTSET,
    model_uri=MODEL_URI,
    download_url=DOWNLOAD_URL,
)

# Compile the pipeline into a package named after the experiment.
kfp.compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path=f'{experiment_name}.zip',
)


In [17]:
client = kfp.Client()

# NOTE(review): arguments={} submits the run with the pipeline's default
# parameter values; the `arguments` dict built in the previous cell is not
# passed here.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments={},
    run_name=run_name,
    experiment_name=experiment_name,
)