# CFIA Importer
[![Static Badge](https://img.shields.io/badge/Jupyter_Notebook-F37726?style=for-the-badge)](https://jupyter.org/)

<br>

## Requirements
- Python (Version 3.6 or up)

<br>
<br>

### Install Required Dependencies
Run the code block below to install the required dependencies.

<br>

> ***📝 NOTE:*** <br>
>
> If you already have the required dependencies, you can *optionally* run the code block below.
> 
> (In this case, running the code only imports the required libraries and does not send any HTTP requests to PyPI.)

In [23]:
###################
# Note: The code below is adapted from AGRemap and installs packages dynamically
#   at runtime. That way, no extra HTTP requests are made to PyPI's server
#   if the user's Python environment already has the required libraries
#
# Reference:
#    https://github.com/nhok0169/Anime-Game-Remap/blob/nhok0169/Anime%20Game%20Remap%20(for%20all%20users)/api/src/FixRaidenBoss2/tools/PackageManager.py
#    https://github.com/nhok0169/Anime-Game-Remap/blob/nhok0169/Anime%20Game%20Remap%20(for%20all%20users)/api/src/FixRaidenBoss2/tools/PackageData.py


import pip._internal as pip
import importlib
from types import ModuleType
from typing import Optional, Dict


# PackageData: Class to hold the names needed for importing/installing a package
class PackageData():
    """
    Names that identify a single package.

    module: name used to import the package
    installName: name used to install the package; the import name is used when this is not given
    """

    def __init__(self, module: str, installName: Optional[str] = None):
        # fall back to the import name when no separate install name was given
        if (installName is None):
            installName = module

        self.installName = installName
        self.module = module


# PackageManager: Class to manage the packages
class PackageManager():
    """
    Imports packages at runtime and installs any package that cannot be imported yet.

    Modules loaded with saving enabled are kept in a cache keyed by their import name.
    """

    def __init__(self):
        # modules that were already loaded, keyed by their import name
        self._packages: Dict[str, ModuleType] = {}

    # load(module, installName, save): Tries to import a package and installs the package if the package
    #   is not installed yet. Can optionally save the imported module to the cache.
    #
    #   module: name used to import the package
    #   installName: name used to install the package (defaults to `module`)
    #   save: whether to store the imported module in the cache
    #
    #   Returns the imported module. The import error is raised again if the package
    #   still cannot be imported after the installation attempt.
    def load(self, module: str, installName: Optional[str] = None, save: bool = True) -> ModuleType:
        if (installName is None):
            installName = module

        try:
            result = importlib.import_module(module)
        except ModuleNotFoundError:
            # NOTE(review): pip documents `pip._internal` as unsupported for use from a program;
            #   consider running `python -m pip` in a subprocess instead
            pip.main(['install', '-U', installName])

            # let the import system notice the files that were just installed
            importlib.invalidate_caches()
            result = importlib.import_module(module)

        # save the module no matter whether an installation was needed or not
        if (save):
            self._packages[module] = result

        return result

    # get(packageData, cache): Retrieves a package and installs the package if the package is not installed yet.
    #   Has optional caching capability.
    #
    #   packageData: the import/install names of the package
    #   cache: whether to read the module from the cache and save it there once loaded
    def get(self, packageData: "PackageData", cache: bool = True) -> ModuleType:
        if (cache):
            try:
                return self._packages[packageData.module]
            except KeyError:
                pass

        return self.load(packageData.module, installName = packageData.installName, save = cache)


##############################
# The required installations #
##############################
Packages = [PackageData(moduleName) for moduleName in ("pandas", "openpyxl")]

Packager = PackageManager()

# import every required package (installing it first if it is missing)
#   without keeping the loaded module in the manager's cache
for package in Packages:
    Packager.get(package, cache = False)


<br>

## User Settings

The settings below may need to be changed depending on the user.

<br>

> ***❇️ Important***
>
> Please ensure the settings below are configured correctly.
>

In [1]:
import os

# Folder in which the raw data files are located
DataFolder = "data"

# Base name of the output files and the folder the output files are placed in
OutputFileName = "CFIA Data"
OutputFolder = os.path.join(os.pardir, os.pardir, "data")


<br>

## Running the Importer

The code block below cleans up the raw CFIA data files so that they look similar to the Health Canada data.

<br>

In [17]:
import warnings
warnings.simplefilter(action='ignore')

import pandas as pd
import numpy as np
import glob
import os
from threading import Lock, Thread
from enum import Enum
from typing import List, Dict, Callable


# Languages: Different languages available
#   The value of each member is the language code that is appended
#   to the name of the output file for that language
class Languages(Enum):
    English = "en"
    French = "fr"


# CFIADataCols: Different columns for the CFIA data
#   The value of each member is the column name used to index the data frames
#   (presumably the column headers of the raw excel files -- TODO confirm)
class CFIADataCols(Enum):
    FoodGroup = "Food Group"
    FoodName = "Food Name"

    # columns compared against to find the E. coli rows
    Agent = "Agent"
    Genus = "Genus"
    Species = "Species"
    SeroType = "Serotype"

    # column created by the importer
    EColiCategory = "Ecoli CFIA Category"

    # raw result column the qualitative/quantitative columns are derived from
    Result = "Result"

    # columns created by the importer
    QualitativeResult = "Qualitative Result"
    QuantitativeResult = "Quantitative Result"
    QuantitativeResultOperator = "Quantitative Result Operator"

    QuantitativeResultUnit = "Quantitative Result Unit"
    ProductDescription = "Product Description"
    CountryOfOrigin = "Country of Origin"
    ResultComments = "Result Comments"
    SamplingLocationCityName = "Sampling Location City Name"
    ProjectCode = "Project Code"
    MethodComments = "Method Comments"


# CFIAStrConsts: Some string keywords used in the raw CFIA data
class CFIAStrConsts(Enum):
    # text that the combined raw values are split on to get the part for each language
    LangSeperator = "//"


# Index position of each language's part within a combined value from the raw CFIA data
CFIALangPos = {Languages.English: 0, Languages.French: 1}

# Columns whose combined values are separated into their English and French parts
CFIALangSeperatedCols = [CFIADataCols.FoodName.value, CFIADataCols.QuantitativeResultUnit.value, 
                         CFIADataCols.ProductDescription.value, CFIADataCols.CountryOfOrigin.value, 
                         CFIADataCols.ResultComments.value, CFIADataCols.SamplingLocationCityName.value,
                         CFIADataCols.ProjectCode.value, CFIADataCols.MethodComments.value]

# Columns that need translations
CFIANeedTranslationCols = [CFIADataCols.FoodGroup.value]

# Translations for certain keywords
#   Organized as: language -> column name -> keyword -> translated keyword
#
# NOTE(review): the French entries currently hold exactly the same text as the
#   English entries -- confirm whether the French translations still need to be filled in
Translations = {
    Languages.English: {
        CFIADataCols.FoodGroup.value: {
            'Fish and fish products, including mollusks, crustaceans, and echinoderms': 'Fish and fish products, including mollusks, crustaceans, and echinoderms', 
            'Foodstuffs intended for particular nutritional uses': 'Foodstuffs intended for particular nutritional uses', 
            'Dairy products and analogues, excl butter ': 'Dairy products and analogues, excl butter ', 
            'Meat and meat products, including poultry and game': 'Meat and meat products, including poultry and game', 
            'Fruits and vegetables (incl fungi, legumes, aloe), seaweeds, nuts, seeds': 'Fruits and vegetables (incl fungi, legumes, aloe), seaweeds, nuts, seeds', 
            'Environmental Samples': 'Environmental Samples', 
            'Eggs and egg products': 'Eggs and egg products'
        },

        CFIADataCols.EColiCategory.value: {
            "O157": "O157", 
            "Verotoxigenic": "Verotoxigenic"
        },

        CFIADataCols.QualitativeResult.value: {
            "Detected": "Detected",
            "Not Detected": "Not Detected",
            "Not Tested": "Not Tested"
        }
    },
    Languages.French: {
        CFIADataCols.FoodGroup.value: {
            'Fish and fish products, including mollusks, crustaceans, and echinoderms': 'Fish and fish products, including mollusks, crustaceans, and echinoderms', 
            'Foodstuffs intended for particular nutritional uses': 'Foodstuffs intended for particular nutritional uses', 
            'Dairy products and analogues, excl butter ': 'Dairy products and analogues, excl butter ', 
            'Meat and meat products, including poultry and game': 'Meat and meat products, including poultry and game', 
            'Fruits and vegetables (incl fungi, legumes, aloe), seaweeds, nuts, seeds': 'Fruits and vegetables (incl fungi, legumes, aloe), seaweeds, nuts, seeds', 
            'Environmental Samples': 'Environmental Samples', 
            'Eggs and egg products': 'Eggs and egg products'
        },

        CFIADataCols.EColiCategory.value: {
            "O157": "O157", 
            "Verotoxigenic": "Verotoxigenic"
        },

        CFIADataCols.QualitativeResult.value: {
            "Detected": "Detected",
            "Not Detected": "Not Detected",
            "Not Tested": "Not Tested"
        }
    }
}


# ThreadManager: Class to manage running many threads
class ThreadManager():
    """Collects threads so that they can all be started and waited on together."""

    def __init__(self):
        # the threads being managed, in the order they were added
        self.threads = []

    # clear(): Removes all the threads being managed
    def clear(self):
        del self.threads[:]

    # add(*args, **kwargs): Creates a thread from the given `Thread` arguments
    #   and adds the thread to the manager (the thread is not started yet)
    def add(self, *args, **kwargs):
        newThread = Thread(*args, **kwargs)
        self.threads.append(newThread)

    # waitAll(): Starts every thread first, then blocks until every thread has finished
    def waitAll(self):
        for worker in self.threads:
            worker.start()

        for worker in self.threads:
            worker.join()


# Importer: Class to cleanup the CFIA data
class Importer():
    """
    Cleans up the raw CFIA excel files and merges them into one CSV file per language.

    dataFolder: folder that is searched for the raw ``*.xlsx`` files
    outputFolder: folder the CSV files are written to
    outputFileName: base name of the CSV files (``-<language code>.csv`` gets appended)
    """

    def __init__(self, dataFolder: str = DataFolder, outputFolder: str = OutputFolder, outputFileName: str = OutputFileName):
        self.dataFolder = dataFolder
        self.outputFolder = outputFolder
        self.outputFileName = outputFileName

        # merged result of all the processed files for each language,
        #   together with the lock guarding the result of that language
        self._outputDataLocks = {}
        self._outputData = {}

        for lang in Languages:
            self._outputDataLocks[lang] = Lock()
            self._outputData[lang] = None

    # _translateCol(col, rawData, data, dataLocks): Translates all the values in a single column
    #   and writes the translated column into the data of every language that has translations for the column
    def _translateCol(self, col: str, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        # skip columns missing from the raw data, same as for the language separated columns
        colVals = rawData.get(col)
        if (colVals is None):
            return

        for lang in Languages:
            translations = Translations[lang].get(col)
            if (translations is None):
                continue

            # keywords without a translation are kept as they are
            translatedVals = colVals.apply(lambda colVal: translations.get(colVal, colVal))

            # write into the data frame of the language (not into the dictionary holding the
            #   data frames), holding the lock since other threads add columns to the same data frame
            with dataLocks[lang]:
                data[lang][col] = translatedVals

    # _translate(rawData, data, dataLocks): Translates certain columns from the raw data
    def _translate(self, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        threads = ThreadManager()
        for col in CFIANeedTranslationCols:
            threads.add(target = self._translateCol, args = [col, rawData, data, dataLocks], daemon=True)

        threads.waitAll()

    # _seperateTranslationVal(colValue): Separates out a value consisting of the combined
    #   translations into individual translated parts
    #
    #   Returns a list with exactly 1 part for each language:
    #     - extra parts are dropped
    #     - missing parts are filled in with the first part
    #     - values that are not strings (eg. empty cells) are repeated as is for every language
    @classmethod
    def _seperateTranslationVal(cls, colValue: str) -> List[str]:
        langLen = len(Languages)

        # empty excel cells are read as NaN, which cannot be split like a string
        if (not isinstance(colValue, str)):
            return [colValue] * langLen

        result = [translatedPart.strip() for translatedPart in colValue.split(CFIAStrConsts.LangSeperator.value)]
        resultLen = len(result)

        if (resultLen >= langLen):
            return result[:langLen]

        # not enough parts for all the languages: reuse the first part
        result.extend([result[0]] * (langLen - resultLen))
        return result

    # _seperateTranslationCol(col, rawData, data, dataLocks): Separates out the combined translated values in a column
    #   and writes each language's part to the data of that language. Columns missing from the raw data are skipped
    def _seperateTranslationCol(self, col: str, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        colVals = rawData.get(col)
        if (colVals is None):
            return

        # every value becomes a list holding 1 part for each language
        colVals = colVals.apply(self._seperateTranslationVal)

        for lang in Languages:
            currentData = data[lang]
            langInd = CFIALangPos[lang]
            lock = dataLocks[lang]

            langVals = colVals.apply(lambda langParts: langParts[langInd])

            with lock:
                currentData[col] = langVals

    # _seperateTranslations(rawData, data, dataLocks): Separates out the combined translated values from the raw data
    #   for each separate language result data
    def _seperateTranslations(self, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        colThreads = ThreadManager()
        for col in CFIALangSeperatedCols:
            colThreads.add(target = self._seperateTranslationCol, args = [col, rawData, data, dataLocks], daemon=True)

        colThreads.waitAll()

    # _createLangEColiCategory(lang, eColiCategoryVals, eColiBitArr, data, dataLocks): Creates the E-Coli Category for the data of a particular language
    #
    #   eColiCategoryVals: not read, the category values are recomputed from 'eColiBitArr'
    #   eColiBitArr: the conditions for the rows being "O157" and "Verotoxigenic", in that order
    def _createLangEColiCateogory(self, lang: Languages, eColiCategoryVals: np.array, eColiBitArr: List[pd.Series], data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        currentData = data[lang]
        lock = dataLocks[lang]

        translations = Translations[lang][CFIADataCols.EColiCategory.value]

        # rows that match none of the conditions get an empty category
        eColiCategoryVals = np.select(eColiBitArr, [translations["O157"], translations["Verotoxigenic"]], default = "")

        with lock:
            currentData[CFIADataCols.EColiCategory.value] = eColiCategoryVals

    # _createEColiCategory(rawData, data, dataLocks): Creates the E-Coli category for the microorganism breadcrumb
    def _createEColiCategory(self, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        rowCount = len(rawData.index)
        eColiCategoryVals = np.zeros(rowCount)

        agentVals = rawData[CFIADataCols.Agent.value]
        genusVals = rawData[CFIADataCols.Genus.value]
        speciesVals = rawData[CFIADataCols.Species.value]
        seroTypeVals = rawData[CFIADataCols.SeroType.value].fillna("")

        isEColi = ((agentVals == "Bacteria") & (genusVals == "Escherichia") & (speciesVals == "coli"))

        # na = False: serotypes that are not strings count as not containing "O157",
        #   which keeps the result boolean so that it can be negated below
        seroTypeIsO157 = seroTypeVals.str.contains(pat = "O157", na = False, regex = False)
        seroTypeNotO157 = ~seroTypeIsO157

        # E. coli rows are either "O157" or, for any other serotype, "Verotoxigenic"
        eColiBitArr = [isEColi & seroTypeIsO157, isEColi & seroTypeNotO157]

        threads = ThreadManager()
        for lang in Languages:
            threads.add(target = self._createLangEColiCateogory, args = [lang, eColiCategoryVals, eColiBitArr, data, dataLocks], daemon=True)

        threads.waitAll()

    # _readIndividual(key, combinedResult, combinedResultLock, readFunc, *args, **kwargs): Calls 'readFunc' with
    #   the given arguments and stores the result at 'key' in 'combinedResult' while holding 'combinedResultLock'
    def _readIndividual(self, key: str, combinedResult: Dict[str, pd.Series], combinedResultLock: Lock, readFunc: Callable[[], pd.Series], *args, **kwargs):
        result = readFunc(*args, **kwargs)

        with combinedResultLock:
            combinedResult[key] = result

    # _createLangQualitative(lang, qualitativeVals, bitArr, data, dataLocks): Creates the qualitative results column for the data of a certain language
    #
    #   qualitativeVals: not read, the qualitative values are recomputed from 'bitArr'
    #   bitArr: the conditions for the rows being "Detected", "Not Detected" and "Not Tested", in that order
    def _createLangQualitative(self, lang: Languages, qualitativeVals: np.array, bitArr: List[pd.Series], data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        currentData = data[lang]
        lock = dataLocks[lang]

        translations = Translations[lang][CFIADataCols.QualitativeResult.value]

        # rows that match none of the conditions are treated as "Not Detected"
        qualitativeVals = np.select(bitArr,[translations["Detected"], translations["Not Detected"], translations["Not Tested"]], default = translations["Not Detected"])

        with lock:
            currentData[CFIADataCols.QualitativeResult.value] = qualitativeVals

    # _createLangQuantitativeResult(lang, bitArr, data, dataLocks): Creates the quantitative results column for the data of a certain language
    #
    #   bitArr: which rows have a quantitative result, all other rows get an empty value
    def _createLangQuantitativeResult(self, lang: Languages, bitArr: pd.Series, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        currentData = data[lang]
        lock = dataLocks[lang]

        # the data frame is also read while holding the lock since other threads add columns to it
        with lock:
            # remove the operators and the whitespace to only keep the number
            quantitativeNums = currentData[CFIADataCols.Result.value].str.replace(">|>=|=|~|<|<=| |\t", "", case = False, regex = True)
            currentData[CFIADataCols.QuantitativeResult.value] = np.where(bitArr, quantitativeNums, "")

    # _createLangQuantitativeOperator(lang, bitArr, data, dataLocks): Creates the quantitative operator column for the data of a certain language
    #
    #   bitArr: which rows have a quantitative result, all other rows get an empty value
    def _createLangQuantitativeOperator(self, lang: Languages, bitArr: pd.Series, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        currentData = data[lang]
        lock = dataLocks[lang]

        # the data frame is also read while holding the lock since other threads add columns to it
        with lock:
            # remove the digits, signs, decimal points and whitespace to only keep the operator
            quantitativeOps = currentData[CFIADataCols.Result.value].str.replace(r"[0-9]|-|\.| |\t", "", case = False, regex = True)
            currentData[CFIADataCols.QuantitativeResultOperator.value] = np.where(bitArr, quantitativeOps, "")

    # _createQualitativeAndQuantitative(rawData, data, dataLocks): Creates both the qualitative and quantitative results
    def _createQualitativeAndQuantitative(self, rawData: pd.DataFrame, data: Dict[Languages, pd.DataFrame], dataLocks: Dict[Languages, Lock]):
        rowCount = len(rawData.index)
        resultVals = rawData[CFIADataCols.Result.value]
        qualitative = np.zeros(rowCount)

        bitArrDict = {}
        bitArrsLock = Lock()

        # the different checks done on the raw result column, each check runs on its own thread
        resultChecks = {
            # NOTE(review): "isEmpty" is computed, but not used by any of the conditions below
            "isEmpty": lambda col: col.isna(),
            "foundNotDetected": lambda col: col.str.contains("Not Detected", case = False, na = False, regex = False),
            "foundNotAnalyzed": lambda col: col.str.contains("Not Analyzed", case = False, na = False, regex = False),

            # "Detected" without any "Not" in the value
            "foundDetected": lambda col: col.str.contains("^(?!.*Not).*Detected", case = False, na = False, regex = True),

            # values using the operators >, >=, = or ~, or plain numbers that start with a non-zero digit
            "foundPositive": lambda col: col.str.contains(r"^( |\t)*(((>|>=|=|~)( |\t)*-?[0-9]*(\.[0-9]+)?)|([1-9][0-9]*(\.[0-9]+)?))( |\t)*$", case = False, na = False, regex = True),

            # values using the operators < or <=, plain numbers that start with 0, or negative numbers
            "foundNonPositive": lambda col: col.str.contains(r"^( |\t)*(((<|<=)( |\t)*-?[0-9]*(\.[0-9]+)?)|0+(\.[0-9]+)?|-[0-9]+(\.[0-9]+)?)( |\t)*$", case = False, na = False, regex = True)
        }

        threads = ThreadManager()
        for key, readFunc in resultChecks.items():
            threads.add(target = self._readIndividual, args = [key, bitArrDict, bitArrsLock, readFunc, resultVals], daemon = True)

        threads.waitAll()

        # conditions for "Detected", "Not Detected" and "Not Tested", in that order
        qualitativeBitArr = [bitArrDict["foundDetected"] | bitArrDict["foundPositive"], bitArrDict["foundNotDetected"] | bitArrDict["foundNonPositive"], bitArrDict["foundNotAnalyzed"]]

        # only the rows with a numeric result get the quantitative columns filled in
        quantitativeBitArr = bitArrDict["foundPositive"] | bitArrDict["foundNonPositive"]

        threads.clear()
        for lang in Languages:
            threads.add(target = self._createLangQualitative, args = [lang, qualitative, qualitativeBitArr, data, dataLocks])
            threads.add(target = self._createLangQuantitativeResult, args = [lang, quantitativeBitArr, data, dataLocks])
            threads.add(target = self._createLangQuantitativeOperator, args = [lang, quantitativeBitArr, data, dataLocks])

        threads.waitAll()

    # _mergeLangProcessedData(lang, processedData): Writes back the processed data from a single excel file to the merged result
    def _mergeLangProcessedData(self, lang: Languages, processedData: Dict[Languages, pd.DataFrame]):
        lock = self._outputDataLocks[lang]

        # many excel files are processed at the same time and merge into the same result
        with lock:
            outputData = self._outputData[lang]
            self._outputData[lang] = processedData[lang].copy() if (outputData is None) else pd.concat([outputData, processedData[lang]])

    # processFile(excelFile): Cleanups a single excel file and merges the
    #   result into the output data
    def processFile(self, excelFile: str):
        rawData = pd.read_excel(excelFile)

        # every language starts off with its own copy of the raw data
        resultData = {}
        resultDataLocks = {}
        for lang in Languages:
            resultData[lang] = rawData.copy()
            resultDataLocks[lang] = Lock()

        # Since the procedure for cleaning/creating the columns do not have any data
        #   that depend on each other, we can run all these procedures in parallel
        threads = ThreadManager()
        threads.add(target = self._translate, args = [rawData, resultData, resultDataLocks], daemon=True)
        threads.add(target = self._seperateTranslations, args = [rawData, resultData, resultDataLocks], daemon=True)
        threads.add(target = self._createEColiCategory, args = [rawData, resultData, resultDataLocks], daemon=True)
        threads.add(target = self._createQualitativeAndQuantitative, args = [rawData, resultData, resultDataLocks], daemon = True)

        threads.waitAll()

        # write the processed data back to the combined output
        threads.clear()
        for lang in Languages:
            threads.add(target = self._mergeLangProcessedData, args = [lang, resultData])

        threads.waitAll()

    # _writeResult(lang): Writes back the processed result into the CSV file
    #   Nothing is written if there is no processed data for the language
    def _writeResult(self, lang: Languages):
        outputData = self._outputData[lang]
        if (outputData is None):
            return

        # make sure the output folder exists, otherwise the CSV file cannot be created
        if (self.outputFolder):
            os.makedirs(self.outputFolder, exist_ok = True)

        file = os.path.join(self.outputFolder, f"{self.outputFileName}-{lang.value}.csv")

        # the newline is part of the message (instead of being added by print)
        #   so that the whole line is passed to print as a single piece of text
        print(f"Writing CSV output for language: {lang.value} ...\n", end = "")
        outputData.to_csv(file, index = False, encoding = "utf-8")

    # run(): Runs the importer to create the cleaned-up data
    def run(self):
        excelFiles = glob.glob(os.path.join(f"{self.dataFolder}", "*.xlsx"))

        # simultaneously process multiple excel files at the same time
        threads = ThreadManager()
        for excelFile in excelFiles:
            threads.add(target = self.processFile, args=[excelFile], daemon=True)

        threads.waitAll()

        # write the CSV file of every language at the same time
        threads.clear()
        for lang in Languages:
            threads.add(target = self._writeResult, args = [lang])

        threads.waitAll()

########
# MAIN #
########
# create the importer using the default folders/file name and
#   clean up all the excel files found in the data folder
importer = Importer()
importer.run()

Writing CSV output for language: en ...
Writing CSV output for language: fr ...
