In [1]:
import pandas as pd
import os

In [2]:
s3_prefix = "s3://aegovan-data/processed_dataset/train_multiclass.json"
s3_output_prefix = "{}_trainingdata/".format("/".join(s3_prefix.split("/")[0:-1]))


In [3]:
s3_output_prefix

's3://aegovan-data/processed_dataset_trainingdata/'

In [4]:
local_temp = "temp"
local_temp_pred_dir = os.path.join( local_temp, "pred_results")
local_temp_wk_dir = os.path.join( local_temp, "wk")

In [5]:
!rm -rf $local_temp
!mkdir -p $local_temp_pred_dir
!mkdir -p $local_temp_wk_dir

In [6]:
import boto3
import glob
from multiprocessing.dummy import Pool as ThreadPool
import argparse
import datetime 
import os


def upload_file(localpath, s3path):
        """
Uploads a file to s3
        :param localpath: The local path
        :param s3path: The s3 path in format s3://mybucket/mydir/mysample.txt
        """

        bucket, key = get_bucketname_key(s3path)

        if key.endswith("/"):
            key = "{}{}".format(key, os.path.basename(localpath))
        
        s3 = boto3.client('s3')
        
        s3.upload_file(localpath, bucket, key)

def get_bucketname_key(uripath):
    assert uripath.startswith("s3://")

    path_without_scheme = uripath[5:]
    bucket_end_index = path_without_scheme.find("/")

    bucket_name = path_without_scheme
    key = "/"
    if bucket_end_index > -1:
        bucket_name = path_without_scheme[0:bucket_end_index]
        key = path_without_scheme[bucket_end_index + 1:]

    return bucket_name, key


def download_file(s3path, local_dir):
    bucket, key = get_bucketname_key(s3path)
    
    s3 = boto3.client('s3')
    
    local_file = os.path.join(local_dir, s3path.split("/")[-1])
    

    s3.download_file(bucket, key, local_file)
    
def download_object(s3path):
    bucket, key = get_bucketname_key(s3path)
    
    s3 = boto3.client('s3')    

    s3_response_object = s3.get_object(Bucket=bucket, Key=key)
    object_content = s3_response_object['Body'].read()
    
    return len(object_content)



def list_files(s3path_prefix):
    assert s3path_prefix.startswith("s3://")
    
    bucket, key = get_bucketname_key(s3path_prefix)
    
   
   
    s3 = boto3.resource('s3')
    
    bucket = s3.Bucket(name=bucket)

    return ( (o.bucket_name, o.key) for o in bucket.objects.filter(Prefix=key))





def upload_files(local_dir, s3_prefix, num_threads=20):    
    input_tuples = ( (f,  s3_prefix) for f in glob.glob("{}/*".format(local_dir)))
    
    with ThreadPool(num_threads) as pool:
        pool.starmap(uploadfile, input_tuples)
    


def download_files(s3_prefix, local_dir, num_threads=20):    
    input_tuples = ( ("s3://{}/{}".format(s3_bucket,s3_key),  local_dir) for s3_bucket, s3_key in list_files(s3_prefix))
    
    with ThreadPool(num_threads) as pool:
        results = pool.starmap(download_file, input_tuples)
        
        

def download_objects(s3_prefix, num_threads=20):    
    s3_files = ( "s3://{}/{}".format(s3_bucket,s3_key) for s3_bucket, s3_key in list_files(s3_prefix))
    
    with ThreadPool(num_threads) as pool:
        results = pool.map(download_object, s3_files)
        
    return sum(results)/1024
        

def get_directory_size(start_path):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # skip if it is symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)
    return total_size

def get_s3file_size(bucket, key):
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket=bucket, Key=key)
    size = response['ContentLength']
    return size
    
def download_files_min_files(s3_prefix, local_dir, min_file_size=310, num_threads=20):    
    input_tuples = ( ("s3://{}/{}".format(s3_bucket,s3_key),  local_dir) for s3_bucket, s3_key in list_files(s3_prefix) if get_s3file_size(s3_bucket, s3_key) > min_file_size )
    
    with ThreadPool(num_threads) as pool:
        results = pool.starmap(download_file, input_tuples)
        


In [7]:
%%time

download_file(s3_prefix, local_temp_pred_dir)

CPU times: user 82.2 ms, sys: 41.2 ms, total: 123 ms
Wall time: 220 ms


In [8]:
!ls -l $local_temp_pred_dir | grep "train_multiclass.json"

-rw-rw-r-- 1 ec2-user ec2-user 7099862 Feb 21 02:58 train_multiclass.json


In [9]:
data_training_full_df = pd.read_json(os.path.join(local_temp_pred_dir, "train_multiclass.json"))

In [10]:
data_training_full_df.head(n=3)

Unnamed: 0,pubmedId,pubmedabstract,annotations,num_unique_gene_normalised_id,num_gene_normalised_id,normalised_abstract,participant1Id,participant2Id,gene_to_uniprot_map,class
0,17197020,"In HEK293 cells, transfected with the Ca2+ cha...","[{'start': '3', 'end': '9', 'name': 'HEK293', ...",4,10,"In HEK293 cells, transfected with the Ca2+ cha...",Q9H1D0,P18031,"{'6714': ['P12931'], '55503': ['Q9H1D0'], '577...",dephosphorylation
1,17053781,"Receptor interacting protein 140 (RIP140), a l...","[{'start': '0', 'end': '32', 'name': 'Receptor...",3,12,"P48552 (P48552), a ligand-dependent corepresso...",Q99873,P48552,"{'8204': ['P48552', 'A8K171'], '7514': ['O1498...",methylation
2,17079228,TAK1 (transforming growth factor beta-activate...,"[{'start': '0', 'end': '4', 'name': 'TAK1', 't...",5,24,O43318 (O43318) is a serine/threonine kinase t...,O00743,O43318,"{'4296': ['Q16584', 'A0A024R5E6'], '5524': ['Q...",dephosphorylation


#### Rename column so they match predictions, so ground truth can be verified

In [11]:
data_training_full_df = data_training_full_df.rename(
    columns={"pubmedabstract" : "abstract"
                                      ,"annotations":"annotations"
                                      ,"class" : "predicted"}
                             
                             ) 


In [12]:
def missing_uniprot(x):
    uniprots =[]
    for i in list(x["gene_id_map"].values()):
        uniprots.extend(i)
        
    
    return x["participant1Id"] not in uniprots or x["participant2Id"] not in uniprots

def gene_id_map(x):
    gene_to_uniprot_map = x["gene_to_uniprot_map"]
    for n, uniprots in x["gene_to_uniprot_map"].items():
        if x["participant1Id"]  in uniprots :
            gene_to_uniprot_map[n]= x["participant1Id"]
        if x["participant2Id"]  in uniprots:
            gene_to_uniprot_map[n]= x["participant2Id"]
        
        
    return gene_to_uniprot_map
                                    


In [13]:
data_training_full_df["gene_id_map"] = data_training_full_df.apply(gene_id_map, axis=1)


In [14]:

                                    
data_training_full_df["missing_uniprot"] = data_training_full_df.apply(missing_uniprot, axis=1)


In [15]:
data_training_full_df.query( "missing_uniprot == True").groupby("predicted")["predicted"].count()

predicted
acetylation             6
dephosphorylation      40
deubiquitination        2
methylation            11
other                1246
phosphorylation       151
ubiquitination          5
Name: predicted, dtype: int64

In [16]:
data_training_full_df.query( "missing_uniprot == False").groupby("predicted")["predicted"].count()

Series([], Name: predicted, dtype: int64)

In [17]:
data_training_full_df.query("participant1Id == participant2Id").groupby("predicted")["predicted"].count()

Series([], Name: predicted, dtype: int64)

In [18]:
samples_subset = data_training_full_df.groupby('predicted', group_keys=False)\
                .apply(lambda x: x.sample(min(len(x), 10),random_state=45))

samples_subset.groupby(["predicted"])["predicted"].count()

predicted
acetylation           6
dephosphorylation    10
deubiquitination      2
methylation          10
other                10
phosphorylation      10
ubiquitination        5
Name: predicted, dtype: int64

In [19]:
data_training_full_df.query("predicted == 'acetylation' and pubmedId=='19407811' ").count()

pubmedId                         1
abstract                         1
annotations                      1
num_unique_gene_normalised_id    1
num_gene_normalised_id           1
normalised_abstract              1
participant1Id                   1
participant2Id                   1
gene_to_uniprot_map              1
predicted                        1
gene_id_map                      1
missing_uniprot                  1
dtype: int64

In [20]:
samples_subset.query("predicted == 'acetylation'")

Unnamed: 0,pubmedId,abstract,annotations,num_unique_gene_normalised_id,num_gene_normalised_id,normalised_abstract,participant1Id,participant2Id,gene_to_uniprot_map,predicted,gene_id_map,missing_uniprot
108,12471024,ClC chloride channels are widely distributed i...,"[{'start': '111', 'end': '120', 'name': 'mamma...",5,13,ClC chloride channels are widely distributed i...,P13569,Q5T2W1,"{'1182': ['P51790', 'B3KXK0'], '57120': ['Q9HD...",acetylation,"{'1182': ['P51790', 'B3KXK0'], '57120': ['Q9HD...",True
103,22771473,Eukaryotic translation initiation factor 5A (e...,"[{'start': '0', 'end': '43', 'name': 'Eukaryot...",4,11,P63241 (P63241) is a protein subject to hypusi...,Q92831,P63241,"{'8850': 'Q92831', '10013': ['Q9UBN7', 'Q9BRX7...",acetylation,"{'8850': 'Q92831', '10013': ['Q9UBN7', 'Q9BRX7...",True
157,21157427,SRSF2 is a serine/arginine-rich protein belong...,"[{'start': '0', 'end': '5', 'name': 'SRSF2', '...",6,13,Q01130 is a serine/arginine-rich protein belon...,Q01130,Q92993,"{'6732': ['Q96SB4'], '10524': 'Q92993', '6733'...",acetylation,"{'6732': ['Q96SB4'], '10524': 'Q92993', '6733'...",True
44,26829474,Faithful segregation of chromosomes in mammali...,"[{'start': '39', 'end': '48', 'name': 'mammali...",4,14,Faithful segregation of chromosomes in mammali...,Q96GD4,Q92993,"{'9212': 'Q96GD4', '10524': 'Q92993', '983': [...",acetylation,"{'9212': 'Q96GD4', '10524': 'Q92993', '983': [...",True
159,23441852,The regulation of gene repression by corepress...,"[{'start': '214', 'end': '219', 'name': 'Alien...",4,11,The regulation of gene repression by corepress...,Q09472,P61201,"{'9318': 'P61201', '2033': 'Q09472', '1387;203...",acetylation,"{'9318': 'P61201', '2033': 'Q09472', '1387;203...",True
128,19407811,Regulation of BubR1 is central to the control ...,"[{'start': '14', 'end': '19', 'name': 'BubR1',...",3,17,Regulation of O60566 is central to the control...,O60566,Q92831,"{'701': 'O60566', '991': ['Q12834'], '8850': '...",acetylation,"{'701': 'O60566', '991': ['Q12834'], '8850': '...",True


In [21]:
import json
import json
def create_manifest_file(df, outfile):
    items = df.to_dict(orient='records' )
    with open(outfile , "w") as f:
        for item in items:
            # Write without new lines
            item_m  = {}
            item_m["source"] = json.dumps(item)
            f.write(json.dumps(item_m).replace("\n", "\t"))
            f.write("\n")

In [22]:
 "{}/".format(s3_output_prefix.rstrip("/"))

's3://aegovan-data/processed_dataset_trainingdata/'

In [23]:
samples_subset_file = "predictions_sample_subset.json"
samples_subset.to_json(samples_subset_file, orient='records')
upload_file(samples_subset_file, "{}/".format(s3_output_prefix.rstrip("/")))


manifest_file = "predictions_sample_subset.mainfest"
create_manifest_file(samples_subset, manifest_file)
upload_file(manifest_file, "{}/".format(s3_output_prefix.rstrip("/")))

# Create one manifest file per interaction type
s3_manifests = []
for i in filter(lambda x: x.lower() != 'other', list(samples_subset["predicted"].unique())):
    manifest_file = "training_sample_subset_{}.mainfest".format(i)
    create_manifest_file( samples_subset.query("predicted == '{}'".format(i)), manifest_file)
    s3_manifest_file = "{}/{}".format(s3_output_prefix.rstrip("/"), manifest_file.split(os.path.sep)[-1])
    upload_file(manifest_file, s3_manifest_file)
    s3_manifests.append(s3_manifest_file)


### Create sagemaker ground truth labelling job

In [24]:
import boto3
import sagemaker

In [25]:
from datetime import datetime

def create_groundtruth_labelling_job(s3_manifest, s3_gt_output, s3_template, pre_lambda, post_lambda, role, workforce_name, job_name, label_attribute_name="class", workforce_type= "private-crowd" ):
    client = boto3.client('sagemaker')
    
    sagemaker_session = sagemaker.Session()
    account_id =  boto3.client('sts').get_caller_identity().get('Account')
    region = boto3.session.Session().region_name
    
    workforce_arn = "arn:aws:sagemaker:{}:{}:workteam/{}/{}".format(region, account_id, workforce_type, workforce_name)
    role_arn = "arn:aws:iam::{}:role/{}".format( account_id,  role)
    pre_lambda_arn = "arn:aws:lambda:{}:{}:function:{}".format(region, account_id,  pre_lambda)
    post_lambda_arn = "arn:aws:lambda:{}:{}:function:{}".format(region, account_id,  post_lambda)
    
    num_workers_per_object = 1
    task_time_limit_sec = 60  * 60 * 5
    task_availablity_sec =60  * 60 * 24 * 10
    
    job = client.create_labeling_job(LabelingJobName=job_name
                                    ,LabelAttributeName = label_attribute_name
                                    ,InputConfig = {
                                        "DataSource": {
                                            'S3DataSource': {
                                            'ManifestS3Uri': s3_manifest
                                            }
                                        }
                                        
                                    }
                                  ,OutputConfig={
                                        'S3OutputPath': s3_gt_output
                                    }

                                  , RoleArn = role_arn
                                  , HumanTaskConfig={
                                    'WorkteamArn': workforce_arn,
                                    'UiConfig': {
                                        'UiTemplateS3Uri': s3_template
                                    },
                                    'PreHumanTaskLambdaArn': pre_lambda_arn,
                                    'TaskKeywords': [
                                        'PPI',
                                    ],
                                    'TaskTitle': 'Verify PPI extraction for protein {}'.format(s3_manifest.split("/")[-1]),
                                    'TaskDescription': 'Verifies PPi extraction',
                                    'NumberOfHumanWorkersPerDataObject': num_workers_per_object,
                                    'TaskTimeLimitInSeconds': task_time_limit_sec,
                                    'TaskAvailabilityLifetimeInSeconds': task_availablity_sec,
                                    'MaxConcurrentTaskCount': 10,
                                    'AnnotationConsolidationConfig': {
                                        'AnnotationConsolidationLambdaArn': post_lambda_arn
                                    }
                                }
                            )
    
    return job
    
    

def create_groundtruth_labelling_multiple_jobs(lst_s3_manifests, s3_gt_output, s3_template, pre_lambda, post_lambda, role, workforce_name, job_prefix ="ppi", label_attribute_name="class"):
    job_prefix = "{}-{}".format(job_prefix , datetime.now().strftime("%Y%m%d%H%M%S"))
    for s3_manifest in lst_s3_manifests:
        job_name = "{}-{}".format( job_prefix, s3_manifest.split("/")[-1].split("_")[-1].split(".")[0])
        create_groundtruth_labelling_job(s3_manifest, s3_gt_output, s3_template, pre_lambda, post_lambda, role, workforce_name, job_name)

In [26]:
!wget https://raw.githubusercontent.com/elangovana/ppi-sagemaker-groundtruth-verification/main/src/template/template.html

--2021-02-21 02:58:17--  https://raw.githubusercontent.com/elangovana/ppi-sagemaker-groundtruth-verification/main/src/template/template.html
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3360 (3.3K) [text/plain]
Saving to: ‘template.html.4’


2021-02-21 02:58:17 (48.6 MB/s) - ‘template.html.4’ saved [3360/3360]



In [27]:
role_name = "service-role/AmazonSageMaker-ExecutionRole-20210104T161547"
pre_lambda="Sagemaker-ppipreprocessing"
post_lambda="sagemaker-ppipostprocessing"
s3_gt_output = "{}_trainingdata_groundtruth/".format("/".join(s3_prefix.split("/")[0:-1]))
workforce_name = "ppi-team"
s3_template_file = "{}_gt_templatet/template.html".format("/".join(s3_prefix.split("/")[0:-1]))

upload_file("template.html", s3_template_file )
create_groundtruth_labelling_multiple_jobs (s3_manifests,s3_gt_output, s3_template_file,pre_lambda, post_lambda, role_name, workforce_name)