This notebook contains code for creating and testing the pipeline step scripts.

In [35]:
from HavosAi.src.sagemaker_pipeline.constants import BASE_DIR, INPUTS_DIR, OUTPUTS_DIR, MODELS_DIR, OUTCOMES_MODEL

### Download artifacts for the pipeline to this notebook instance

This is done so that we can debug the scripts here before deploying them to the pipeline containers.

In [36]:
# Create the local directories named by the pipeline constants imported above.
# `mkdir -p` also creates missing parent directories and does not fail when the
# directory already exists, so this cell can be re-run safely.
# NOTE(review): `sudo` suggests these paths live outside the notebook user's
# home directory - confirm against the values in constants.py.
!sudo mkdir -p {BASE_DIR}
!sudo mkdir -p {INPUTS_DIR}
!sudo mkdir -p {OUTPUTS_DIR}
!sudo mkdir -p {MODELS_DIR}

In [37]:
import os

def download_s3_folder(bucket_name, s3_folder, local_dir=None):
    """
    Download every object stored under a key prefix ("folder") of an S3 bucket.

    The function relies on a module-level ``s3`` resource object (for example
    ``boto3.resource("s3")``) being defined before it is called.

    Args:
        bucket_name: the name of the s3 bucket
        s3_folder: the folder path (key prefix) in the s3 bucket
        local_dir: a relative or absolute directory path in the local file
            system. When None, every object is written to a path equal to its
            key, i.e. relative to the current working directory.
    """
    # NOTE(review): `Prefix` is a plain string prefix, so s3_folder="data" also
    # matches keys under "data2/"; with local_dir set those resolve to
    # "../data2/..." and land outside local_dir. Pass s3_folder with a trailing
    # "/" to avoid that.
    bucket = s3.Bucket(bucket_name)
    for obj in bucket.objects.filter(Prefix=s3_folder):
        if local_dir is None:
            target = obj.key
        else:
            target = os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))

        # A key without any "/" has an empty dirname when local_dir is None;
        # os.makedirs("") raises FileNotFoundError, so only create real parents.
        # exist_ok=True avoids the check-then-create race.
        target_dir = os.path.dirname(target)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Keys ending in "/" are folder placeholders: nothing to download.
        if obj.key.endswith('/'):
            continue
        bucket.download_file(obj.key, target)

In [38]:
# artifacts = [input_data_path_in_bucket , search_index_path_in_bucket, 
#                  population_tags_path_in_bucket, 
#                  outcomes_bert_model_path_in_bucket
#                 ]

In [39]:
# s3 = boto3.resource("s3")

# for artifact in artifacts:
#     if artifact.endswith('/'):
#         download_s3_folder(default_bucket, artifact, local_dir=f"{base_dir}/{artifact}")
#     else:
#         s3.Bucket(default_bucket).download_file(
#             artifact, f"{base_dir}/{artifact}"
#         )

### Prepare for creating PipelineStep scripts

### Abbreviations Resolver

In [40]:
%%writefile HavosAi/src/sagemaker_pipeline/AbbreviationsResolverStep.py

import json
import pandas as pd
import pickle
from src.text_processing.abbreviations_resolver import AbbreviationsResolver
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as config_file:
        pipeline_config = json.load(config_file)
    print("file config.json: ", pipeline_config)

    # The article table is re-saved without any transformation: this step only
    # produces a model artifact for the later steps.
    articles = pd.read_csv(f"{INPUTS_DIR}/subfolder-0-reduced.csv", sep=';', index_col=0)
    articles.to_csv(f"{OUTPUTS_DIR}/AbbreviationsResolverOutput.csv", sep=';')

    # The "Apply" flag is compared against the string "True".
    step_enabled = pipeline_config["Steps"]["AbbreviationsResolverStep"]["Apply"] == "True"
    if not step_enabled:
        print("AbbreviationsResolverStep", "false")
    else:
        print("AbbreviationsResolverStep", "true")

        resolver = AbbreviationsResolver([])
        resolver.load_model(f"{MODELS_INPUTS_DIR}")
        pickle_path = f"{OUTCOMES_MODEL}/abbreviation_resolver.pickle"

        print("Test before dump:")
        print(
            type(resolver.abbreviations_finder_dict),
            type(resolver.sorted_resolved_abbreviations),
        )

        with open(pickle_path, "wb") as pickle_out:
            pickle.dump(resolver, pickle_out)

        # Read the artifact back to confirm that it survives a pickle round trip.
        print("Test after dump:")
        with open(pickle_path, "rb") as pickle_in:
            reloaded_resolver = pickle.load(pickle_in)
        print(
            type(reloaded_resolver.abbreviations_finder_dict),
            type(reloaded_resolver.sorted_resolved_abbreviations),
        )


Overwriting HavosAi/src/sagemaker_pipeline/AbbreviationsResolverStep.py


### SearchIndex

In [41]:

%%writefile HavosAi/src/sagemaker_pipeline/SearchIndexStep.py

import json
import pandas as pd
import pickle
from src.text_processing.search_engine_insensitive_to_spelling import SearchEngineInsensitiveToSpelling
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as config_file:
        pipeline_config = json.load(config_file)
    print("file config.json: ", pipeline_config)

    # The article table is re-saved without any transformation; the step's real
    # product is the pickled search index written below.
    articles = pd.read_csv(f"{INPUTS_DIR}/AbbreviationsResolverOutput.csv", sep=';', index_col=0)
    articles.to_csv(f"{OUTPUTS_DIR}/SearchIndexOutput.csv", sep=';')

    # The "Apply" flag is compared against the string "True".
    step_enabled = pipeline_config["Steps"]["SearchIndexStep"]["Apply"] == "True"
    if not step_enabled:
        print("SearchIndexStep", "false")
    else:
        print("SearchIndexStep", "true")
        index_builder = SearchEngineInsensitiveToSpelling(
            abbreviation_folder=MODELS_INPUTS_DIR,
            load_abbreviations=True,
        )
        index_builder.create_inverted_index(articles)
        with open(f"{OUTCOMES_MODEL}/search_index.pickle", "wb") as pickle_out:
            pickle.dump(index_builder, pickle_out)


Overwriting HavosAi/src/sagemaker_pipeline/SearchIndexStep.py


### AdvancedTextNormalizer


In [42]:

%%writefile HavosAi/src/sagemaker_pipeline/AdvancedTextNormalizerStep.py

import json
import pandas as pd
import pickle
from src.text_processing.advanced_text_normalization import AdvancedTextNormalizer
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/SearchIndexOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["AdvancedTextNormalizerStep"]["Apply"] == "True":
        print("AdvancedTextNormalizerStep", "true")
        # Load the resolver only when the step actually runs, so a disabled
        # step neither needs the artifact nor pays for unpickling it.
        # NOTE: pickle.load can execute arbitrary code; the artifact must come
        # from a trusted source.
        with open(f"{MODELS_INPUTS_DIR}/abbreviation_resolver.pickle", "rb") as model_file:
            abbreviations_resolver = pickle.load(model_file)
        advanced_text_normalizer = AdvancedTextNormalizer(abbreviations_resolver)
        df = advanced_text_normalizer.normalize_text_for_df(df)
    else:
        print("AdvancedTextNormalizerStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/AdvancedTextNormalizerOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/AdvancedTextNormalizerStep.py


### KeywordsNormalizer


In [43]:
%%writefile HavosAi/src/sagemaker_pipeline/KeywordsNormalizerStep.py

import json
import pandas as pd
from src.text_processing.keywords_normalizer import KeywordsNormalizer
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as config_file:
        pipeline_config = json.load(config_file)
    print("file config.json: ", pipeline_config)

    articles = pd.read_csv(f"{INPUTS_DIR}/AdvancedTextNormalizerOutput.csv", sep=';', index_col=0)

    # The "Apply" flag is compared against the string "True".
    step_enabled = pipeline_config["Steps"]["KeywordsNormalizerStep"]["Apply"] == "True"
    if not step_enabled:
        print("KeywordsNormalizerStep", "false")
    else:
        print("KeywordsNormalizerStep", "true")
        normalizer = KeywordsNormalizer()
        # "identificators" is not listed in `key_words_column_names` because
        # that column could not be found in the data.
        articles = normalizer.normalize_key_words(
            articles,
            key_words_column_names=["keywords",],
        )

    # The table is written in both cases so that the next step always finds its input.
    articles.to_csv(f"{OUTPUTS_DIR}/KeywordsNormalizerOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/KeywordsNormalizerStep.py


### JournalNormalizer


In [44]:
%%writefile HavosAi/src/sagemaker_pipeline/JournalNormalizerStep.py

import json
import pandas as pd
from src.text_processing.journal_normalizer import JournalNormalizer
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, OUTPUTS_DIR

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as config_file:
        pipeline_config = json.load(config_file)
    print("file config.json: ", pipeline_config)

    articles = pd.read_csv(f"{INPUTS_DIR}/KeywordsNormalizerOutput.csv", sep=';', index_col=0)

    # The "Apply" flag is compared against the string "True".
    step_enabled = pipeline_config["Steps"]["JournalNormalizerStep"]["Apply"] == "True"
    if not step_enabled:
        print("JournalNormalizerStep", "false")
    else:
        print("JournalNormalizerStep", "true")
        normalizer = JournalNormalizer()
        articles = normalizer.correct_journal_names(articles, journal_column="journal_name")

    # The table is written in both cases so that the next step always finds its input.
    articles.to_csv(f"{OUTPUTS_DIR}/JournalNormalizerOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/JournalNormalizerStep.py


### AuthorAndAffiliationsProcessing


In [45]:
%%writefile HavosAi/src/sagemaker_pipeline/AuthorAndAffiliationsProcessingStep.py

import json
import pandas as pd
import os
from src.text_processing.author_and_affiliations_processing import AuthorAndAffiliationsProcessing
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, OUTPUTS_DIR

  
if __name__ == "__main__":
    # Diagnostics: list the data directory in the step log.
    print("/opt/ml/processing/data", os.listdir("/opt/ml/processing/data"))
    # tmp: the result is unused; the read only checks that the file can be opened.
    df_ex = pd.read_excel("/opt/ml/processing/data/districts.xlsx")

    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/JournalNormalizerOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["AuthorAndAffiliationsProcessingStep"]["Apply"] == "True":
        print("AuthorAndAffiliationsProcessingStep", "true")
        # NOTE(review): presumably the library opens its dictionaries through
        # relative "../data/..." paths, which is why the working directory is
        # switched - confirm.
        previous_cwd = os.getcwd()
        os.chdir("/opt/ml/processing/data")
        try:
            # tmp: checks that the relative path resolves from this directory.
            df_ex = pd.read_excel("../data/GeoRegions.xlsx")
            df = AuthorAndAffiliationsProcessing().process_authors_and_affiliations(df)
        finally:
            # Restore the caller's working directory even if processing fails.
            os.chdir(previous_cwd)
    else:
        print("AuthorAndAffiliationsProcessingStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/AuthorAndAffiliationsProcessingOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/AuthorAndAffiliationsProcessingStep.py


### GeoNameFinder


In [53]:
%%writefile HavosAi/src/sagemaker_pipeline/GeoNameFinderStep.py

import json
import pandas as pd
import os
import pickle
from src.text_processing.geo_names_finder import GeoNameFinder
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Diagnostics: list the data directory in the step log.
    print("/opt/ml/processing/data", os.listdir("/opt/ml/processing/data"))
    # tmp: the result is unused; the read only checks that the file can be opened.
    df_ex = pd.read_excel("/opt/ml/processing/data/districts.xlsx")

    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/AuthorAndAffiliationsProcessingOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["GeoNameFinderStep"]["Apply"] == "True":
        print("GeoNameFinderStep", "true")
        # Load the index only when the step actually runs, so a disabled step
        # neither needs the artifact nor pays for unpickling it.
        # NOTE: pickle.load can execute arbitrary code; the artifact must come
        # from a trusted source.
        with open(f"{MODELS_INPUTS_DIR}/search_index.pickle", "rb") as model_file:
            search_index = pickle.load(model_file)

        # NOTE(review): presumably the library opens its dictionaries through
        # relative paths, which is why the working directory is switched - confirm.
        previous_cwd = os.getcwd()
        os.chdir("/opt/ml/processing/data")
        try:
            df = GeoNameFinder().label_articles_with_geo_names(df, search_index)
        finally:
            # Restore the caller's working directory even if labeling fails.
            os.chdir(previous_cwd)
    else:
        print("GeoNameFinderStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    print(f"saving to {OUTPUTS_DIR}/GeoNameFinderOutput.csv")
    df.to_csv(f"{OUTPUTS_DIR}/GeoNameFinderOutput.csv", sep=';')
    print("saved")

Overwriting HavosAi/src/sagemaker_pipeline/GeoNameFinderStep.py


### CropsSearch


In [47]:
%%writefile HavosAi/src/sagemaker_pipeline/CropsSearchStep.py

import json
import pandas as pd
import os
import pickle
from src.text_processing.crops_finder import CropsSearch
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Diagnostics: list the data directory in the step log.
    print("/opt/ml/processing/data", os.listdir("/opt/ml/processing/data"))
    # tmp: the result is unused; the read only checks that the file can be opened.
    df_ex = pd.read_excel("/opt/ml/processing/data/districts.xlsx")

    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/GeoNameFinderOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["CropsSearchStep"]["Apply"] == "True":
        print("CropsSearchStep", "true")
        # Load the index only when the step actually runs, so a disabled step
        # neither needs the artifact nor pays for unpickling it.
        # NOTE: pickle.load can execute arbitrary code; the artifact must come
        # from a trusted source.
        with open(f"{MODELS_INPUTS_DIR}/search_index.pickle", "rb") as model_file:
            search_index = pickle.load(model_file)

        # (dictionary workbook, output column) pairs, applied in this order.
        crop_dictionaries = (
            ("../data/map_plant_products.xlsx", "plant_products_search"),
            ("../data/map_animal_products.xlsx", "animal_products_search"),
            ("../data/map_animals.xlsx", "animals_found"),
        )

        # The dictionary paths are relative, so they are resolved from the data directory.
        previous_cwd = os.getcwd()
        os.chdir("/opt/ml/processing/data")
        try:
            for dictionary_path, column_name in crop_dictionaries:
                df = CropsSearch(
                    search_index,
                    dictionary_path,
                ).find_crops(df, column_name=column_name)
        finally:
            # Restore the caller's working directory even if a search fails.
            os.chdir(previous_cwd)
    else:
        print("CropsSearchStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/CropsSearchOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/CropsSearchStep.py


### PopulationTagsFinder


In [48]:
%%writefile HavosAi/src/sagemaker_pipeline/PopulationTagsFinderStep.py

import json
import pandas as pd
import pickle
from src.text_processing.population_tags_finder import PopulationTagsFinder
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, MODELS_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/CropsSearchOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["PopulationTagsFinderStep"]["Apply"] == "True":
        print("PopulationTagsFinderStep", "true")
        # Load the index only when the step actually runs, so a disabled step
        # neither needs the artifact nor pays for unpickling it.
        # NOTE: pickle.load can execute arbitrary code; the artifact must come
        # from a trusted source.
        with open(f"{MODELS_INPUTS_DIR}/search_index.pickle", "rb") as model_file:
            search_index = pickle.load(model_file)
        population_tags_finder = PopulationTagsFinder()
        df = population_tags_finder.label_with_population_tags(df, search_index)
    else:
        print("PopulationTagsFinderStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/PopulationTagsFinderOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/PopulationTagsFinderStep.py


### ColumnFiller


In [49]:
%%writefile HavosAi/src/sagemaker_pipeline/ColumnFillerStep.py

import json
import pandas as pd
import os
import pickle
from src.text_processing.column_filler import ColumnFiller
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, SEARCH_INPUTS_DIR, ABBREV_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # tmp: the result is unused; the read only checks that the file can be opened.
    df_tmp = pd.read_excel("/opt/ml/processing/data/population_tags.xlsx")

    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/PopulationTagsFinderOutput.csv",
        sep=';',
        index_col=0,
    )

    # NOTE: pickle.load can execute arbitrary code; both artifacts must come
    # from a trusted source.
    with open(f"{SEARCH_INPUTS_DIR}/search_index.pickle", "rb") as model_file:
        search_index = pickle.load(model_file)
    with open(f"{ABBREV_INPUTS_DIR}/abbreviation_resolver.pickle", "rb") as model_file:
        abbreviation_resolver = pickle.load(model_file)

    # Diagnostics: report which attributes survived unpickling. The check is
    # best-effort, so a failing attribute is logged and does not stop the step.
    # `Exception` is caught instead of a bare `except:` so that
    # KeyboardInterrupt and SystemExit still propagate.
    print("Abbreviation_resolver check:")
    print(type(abbreviation_resolver))
    for attribute_name in (
        "abbreviations_finder_dict",
        "resolved_abbreviations",
        "words_to_abbreviations",
        "sorted_resolved_abbreviations",
        "sorted_words_to_abbreviations",
    ):
        try:
            print(type(getattr(abbreviation_resolver, attribute_name)))
        except Exception:
            print(f"error: {attribute_name}")

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["ColumnFillerStep"]["Apply"] == "True":
        print("ColumnFillerStep", "true")
        # The dictionary path is relative, so it is resolved from the data directory.
        previous_cwd = os.getcwd()
        os.chdir("/opt/ml/processing/data")
        try:
            df = ColumnFiller(
                dict_filename="../data/population_tags.xlsx",
            ).label_articles_with_outcomes(
                "gender_age_population_tags",
                df,
                search_index,
                abbreviation_resolver,
            )
        finally:
            # Restore the caller's working directory even if labeling fails.
            os.chdir(previous_cwd)
    else:
        print("ColumnFillerStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/ColumnFillerOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/ColumnFillerStep.py


### ProgramExtractor


In [50]:
%%writefile HavosAi/src/sagemaker_pipeline/ProgramExtractorStep.py

import json
import pandas as pd
import os
import pickle
from src.interventions_labeling_lib.programs_extractor import ProgramExtractor
from src.sagemaker_pipeline.constants import CONFIG_INPUTS_DIR, INPUTS_DIR, SEARCH_INPUTS_DIR, ABBREV_INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL

  
if __name__ == "__main__":
    # tmp: the results are unused; the reads only check that the data file and
    # the model metadata can be opened.
    df_tmp = pd.read_excel("/opt/ml/processing/data/extracted_programs.xlsx")
    with open("/opt/ml/processing/tmp/programs_extraction_model_2619/meta.json", "r") as file:
        json_tmp = json.load(file)

    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as file:
        config = json.load(file)
        print("file config.json: ", config)

    df = pd.read_csv(
        f"{INPUTS_DIR}/ColumnFillerOutput.csv",
        sep=';',
        index_col=0,
    )

    # The "Apply" flag is compared against the string "True".
    if config["Steps"]["ProgramExtractorStep"]["Apply"] == "True":
        print("ProgramExtractorStep", "true")
        # Load the artifacts only when the step actually runs, so a disabled
        # step neither needs them nor pays for unpickling them.
        # NOTE: pickle.load can execute arbitrary code; both artifacts must
        # come from a trusted source.
        with open(f"{SEARCH_INPUTS_DIR}/search_index.pickle", "rb") as model_file:
            search_index = pickle.load(model_file)
        with open(f"{ABBREV_INPUTS_DIR}/abbreviation_resolver.pickle", "rb") as model_file:
            abbreviation_resolver = pickle.load(model_file)

        # NOTE(review): presumably the library opens its resources through
        # relative paths, which is why the working directory is switched - confirm.
        previous_cwd = os.getcwd()
        os.chdir("/opt/ml/processing/data")
        try:
            df = ProgramExtractor([]).label_articles_with_programs(df, search_index, abbreviation_resolver)
        finally:
            # Restore the caller's working directory even if extraction fails.
            os.chdir(previous_cwd)
    else:
        print("ProgramExtractorStep", "false")

    # The table is written in both cases so that the next step always finds its input.
    df.to_csv(f"{OUTPUTS_DIR}/ProgramExtractorOutput.csv", sep=';')

Overwriting HavosAi/src/sagemaker_pipeline/ProgramExtractorStep.py


### Outcomes finder

In [51]:
%%writefile HavosAi/src/sagemaker_pipeline/OutcomesFinderStep.py

import json
import pandas as pd
from src.text_processing.all_column_filler import AllColumnFiller
from src.sagemaker_pipeline.constants import  CONFIG_INPUTS_DIR, INPUTS_DIR, OUTPUTS_DIR, OUTCOMES_MODEL


if __name__ == "__main__":
    # Read the pipeline configuration and echo it into the step log.
    with open(f"{CONFIG_INPUTS_DIR}/config.json", "r") as config_file:
        pipeline_config = json.load(config_file)
    print("file config.json: ", pipeline_config)
    # Disabled: forwarding the configuration to the step outputs.
    #     with open(f"{CONFIG_OUTPUTS_DIR}/config.json", "w") as file:
    #         json.dump(config, file)

    articles = pd.read_csv(f"{INPUTS_DIR}/ProgramExtractorOutput.csv", sep=';', index_col=0)

    # The "Apply" flag is compared against the string "True".
    step_enabled = pipeline_config["Steps"]["OutcomesFinderStep"]["Apply"] == "True"
    if not step_enabled:
        print("OutcomesFinderStep", "false")
    else:
        print("OutcomesFinderStep", "true")
        column_info = {"model_folder": OUTCOMES_MODEL}
        outcomes_filler = AllColumnFiller()
        articles = outcomes_filler.fill_outcomes(articles, None, None, column_info)

    # The table is written in both cases; progress is logged around the write.
    print(f'Outcomes finder. Saving df to {OUTPUTS_DIR}/OutcomesFinderOutput.csv')
    articles.to_csv(f"{OUTPUTS_DIR}/OutcomesFinderOutput.csv", sep=';')
    print(f'Outcomes finder. Saved df.')


Overwriting HavosAi/src/sagemaker_pipeline/OutcomesFinderStep.py
