# The Data Collector

This notebook collects the data used by the rest of the dashboard. Each cell is a
self-contained collector for a single data point, and corresponds to a similar cell within
The Dashboard UI notebook. Data for these notebooks is persisted in locally-stored CSV files
(this can be changed easily by updating a shared data persistence function), and the general
execution flow of each collector is as follows:

## Check last import date
Each data type and source will have a hard-coded Data Import Frequency variable, set by
looking at historical update frequency for said data. Checking this against the last import
date of the data in storage protects against unnecessary data pulls, network IO, and IP
blocking from data sources.

## Import data
If our cell passes the last import date gate, then we append new data to our existing source
in persistent storage.

## Check for consistency
Once data is imported, the new data is checked against the existing data for consistency,
outliers, and missing values. If inconsistencies or missing values are found, the import is
flagged for human review.

# Shared functions
This cell contains functions to be used among all collectors, to minimize duplicated code.
*Note*: this cell _must_ be run before running any collector cell.

In [28]:
import io
import os
import pandas as pd
import datetime
import requests
import zipfile

# function that checks if file in persistent storage exists
def file_exists(collector_type: str, collector_subtype: str, file_name: str) -> bool:
    """
    Reports whether a collector's output file is present in persistent storage.

    The file is looked up at data/output/<collector_type>/<collector_subtype>/<file_name>,
    relative to the current working directory.

    :param collector_type: first directory level under data/output
    :param collector_subtype: second directory level under data/output
    :param file_name: name of the file to look for
    :return: boolean True if the path exists and is a file, False otherwise
    """
    candidate_path = os.path.join('data', 'output', collector_type, collector_subtype, file_name)
    return os.path.isfile(candidate_path)


# function that opens an existing file in persistent storage, or prepares its directory for a later write
def file_opener(collector_type: str, collector_subtype: str, file_name: str) -> 'io.TextIOWrapper | None':
    """
    Takes in a collector type, collector subtype, and file name and either opens the file for
    appending if it exists, or makes sure its directory exists so the file can be written later.

    This function never creates the file itself; when the file is missing it only prepares
    data/output/<collector_type>/<collector_subtype> and returns None.

    :param collector_type: first directory level under data/output
    :param collector_subtype: second directory level under data/output
    :param file_name: name of the file to open
    :return: a text file object opened in append mode if the file exists, otherwise None
    """
    target_directory = os.path.join('data', 'output', collector_type, collector_subtype)
    target_file = os.path.join(target_directory, file_name)

    if os.path.isfile(target_file):
        f = open(target_file, 'a')
        print('directory and file exist: file open for append')
        return f

    if os.path.isdir(target_directory):
        print('directory exists: ready for file write')
        return None

    # exist_ok covers the case where something else creates the directory between
    # the isdir check above and this call
    os.makedirs(target_directory, exist_ok=True)
    print('neither directory nor file exist: ready for file write')
    return None


def file_closer(file) -> bool:
    """
    Closes the given file object and announces that file IO has finished.

    :param file: an open file object (anything exposing a close() method)
    :return: always True once close() has returned
    """
    file.close()
    print("file IO has terminated.")
    return True


# function that checks CSV date last updated and compares it to today
def file_last_updated(file_name, update_frequency: int) -> bool:
    """
    Decides whether a stored file is due for a refresh.

    The file's modification time on disk is read, the update frequency is added to it to get
    the date the next update is required, and that date is compared to the current datetime.
    The intermediate dates are printed along the way.

    :param file_name: an open file object (not a path string); its ``name`` attribute is used
        to locate the file on disk
    :param update_frequency: how often the data needs to be refreshed, in days
    :return: boolean True if the next required update date has been reached, False otherwise
    """
    modified_at = datetime.datetime.fromtimestamp(os.stat(file_name.name).st_mtime)
    now = datetime.datetime.today()
    due_at = modified_at + datetime.timedelta(days=update_frequency)

    print(f'file last updated:\t\t{modified_at}')
    print(f'current datetime:\t\t{now}')
    print(f'next update required on:\t{due_at}')
    print(f'\tin {due_at - now} and counting...')

    return due_at <= now


# stub function that connects to an API to collect data based on CSV last updated date
def API_downloader(url: str, params: dict) -> None:
    """
    Placeholder for a collector that connects to an API to collect data.

    Not implemented yet: both arguments are accepted but ignored and nothing is downloaded.

    :param url: (currently unused)
    :param params: (currently unused)
    :return: None
    """
    return None


# stub function that scrapes website (if no API available) based on CSV last updated date
def scraper_downloader(url: str, params: dict) -> None:
    """
    Placeholder for a collector that scrapes a website when no API is available.

    Not implemented yet: both arguments are accepted but ignored and nothing is downloaded.

    :param url: (currently unused)
    :param params: (currently unused)
    :return: None
    """
    return None


def file_downloader(url: str, collector_type: str, collector_subtype: str, file_name: str) -> int:
    """
    Downloads a single file from any given URL into persistent storage at
    data/output/<collector_type>/<collector_subtype>/<file_name>.

    If the file name ends in '.zip', the archive is extracted into a directory named after the
    file without its extension. The archive is then deleted and replaced by an empty placeholder
    file of the same name, so that file_exists and file_last_updated still have a file to inspect.

    :param url: (string) a URL pointing to a file to be downloaded
    :param collector_type: first directory level under data/output
    :param collector_subtype: second directory level under data/output
    :param file_name: name to store the downloaded file under

    :return: return code (0 for success, 1 for failure)
    """
    target_directory = os.path.join('data', 'output', collector_type, collector_subtype)
    target_file = os.path.join(target_directory, file_name)

    # Fetch before touching the disk, so a failed request never overwrites stored data.
    # raise_for_status() keeps an HTTP error page from being saved as if it were the dataset,
    # and the timeout keeps a stalled connection from hanging the collector indefinitely.
    try:
        data = requests.get(url, timeout=60)
        data.raise_for_status()
    except requests.RequestException as error:
        print('error in downloading file')
        print('\t' + str(error))
        return 1

    # the directory does not exist yet on a first run for this collector
    os.makedirs(target_directory, exist_ok=True)

    with open(target_file, 'wb') as f:
        f.write(data.content)
        print('data written to file\n')

    if not file_name.lower().endswith('.zip'):
        return 0

    # Opening the archive is inside the try block too: a corrupt download raises
    # BadZipFile from the ZipFile constructor, before extractall is ever reached.
    try:
        with zipfile.ZipFile(target_file, 'r') as zip_ref:
            zip_ref.extractall(os.path.splitext(target_file)[0])
    except (zipfile.BadZipFile, zipfile.LargeZipFile, OSError, RuntimeError) as error:
        # NOTE(review): the downloaded archive is left in place on failure (as before),
        # which also refreshes its modification time - confirm that is the desired behaviour.
        print('error in unzipping file')
        print('\t' + str(error))
        return 1
    print('file unzipped\n')

    # removes the original zip file to save space
    os.remove(target_file)
    # creates a placeholder file with the same name in order to use the file_exists and
    # file_last_updated functions
    with open(target_file, 'w'):
        pass

    return 0


def file_extractor(file_path: str) -> int:
    """
    Placeholder for extracting files from a compressed file format and then deleting the
    original compressed file.

    Not implemented yet: the argument is ignored and, despite the annotation, None is returned.

    :param file_path: (currently unused)
    :return: intended return code (0 for success, 1 for failure); currently always None
    """
    return None


def file_writer(f: io, dataframe: pd.DataFrame) -> int:
    """
    Writes a pandas DataFrame to the given file object as tab-separated text.

    Note: no processing is done on the DataFrame here, so it MUST already be in the exact
    shape that should end up in the file.

    :param f: a file object to write to
    :param dataframe: a pandas DataFrame
    :return: status code - always 0; no failure code is produced here, so any error raised by
        the write propagates to the caller
    """
    dataframe.to_csv(f, sep='\t')
    return 0

# 1.1 Geographic Data: Basemaps
This consists of international, supranational, national, and province-level boundaries, as
well as major cities. Data is meant to be used as base layers for other geospatial products.

Data Source: The World Bank
Dataset Name: World Bank Official Boundaries
Update Frequency: yearly (365 days)
Collector Type: Geographic Data
Collector Sub-Type: Basemaps

In [29]:
def geographic_data__basemaps(update_required: bool = False) -> int:
    """
    Collector for the World Bank Official Boundaries basemap archive.

    Downloads the archive when no stored copy exists, when the stored copy is older than the
    update frequency, or when a refresh is forced; otherwise the update is deferred.

    :param update_required: set to True to force a download even if the stored copy is not
        yet due for a refresh
    :return: 0 if the update was deferred or every download succeeded, 1 if any download failed
    """
    update_frequency = 365  # days

    # named collector_type rather than `type` so the builtin is not shadowed
    collector_type = 'geographic_data'
    collector_subtype = 'basemaps'
    file_name = 'wb_boundaries_geojson_highres.zip'

    source_urls = {'World Boundaries GeoJSON - Very High Resolution':
                       'https://development-data-hub-s3-public.s3.amazonaws.com/ddhfiles/779551/wb_boundaries_geojson_highres.zip'}

    # calls the file opener from the core functions: it returns an append handle when the
    # stored file exists, otherwise it creates the target directory if needed and returns None
    global_basemap_file = file_opener(collector_type, collector_subtype, file_name)

    # checks to see if data needs to be updated in this run
    if global_basemap_file is None:
        print('no file exists')
    else:
        refresh_due = file_last_updated(global_basemap_file, update_frequency) or update_required
        if not refresh_due:
            print('update deferred')
            # calls the file closer from the core functions
            file_closer(global_basemap_file)
            return 0
        print('file exists, updating...')
        # the handle was only needed for the staleness check; release it before the
        # download replaces the file on disk
        file_closer(global_basemap_file)

    status_code = 0
    # only the URLs are needed; the dictionary keys are human-readable descriptions
    for url in source_urls.values():
        if file_downloader(url, collector_type, collector_subtype, file_name) != 0:
            status_code = 1
    return status_code

# runs the basemaps collector for this cell; the value it returns is echoed as the cell output
geographic_data__basemaps()

directory and file exist: file open for append
file last updated:		2020-09-02 17:37:59.743737
current datetime:		2020-09-02 17:38:10.050210
next update required on:	2021-09-02 17:37:59.743737
	in 364 days, 23:59:49.693527 and counting...
update deferred
file IO has terminated.


0