# Test-Driven Data Pre-processing

Before you start, make sure that you are familiar with the basic usage of Jupyter Notebook. If not, please finish the Jupyter Notebook primer first.

We will demonstrate how to solve a complicated problem by splitting it into several small steps, each verified by unit tests, while using the interactive programming that Jupyter Notebook provides.

After you finish the development and pass the tests, you can convert the notebook into an executable Python script `data_filter.py`. You can then invoke the Python script in `runner.sh` to make submissions.

`jupyter nbconvert data_filter.ipynb --to python --TagRemovePreprocessor.remove_input_tags='{"excluded_from_script"}'`

`--TagRemovePreprocessor.remove_input_tags='{"excluded_from_script"}'`: excludes the input (code) of cells tagged with `excluded_from_script` during the conversion. Cells with unit test cases carry this tag. Select `View - Cell Toolbar - Tags` to view and edit tags.

Further reading:

https://www.blog.pythonlibrary.org/2016/07/07/python-3-testing-an-intro-to-unittest/

http://www.diveintopython3.net/unit-testing.html

In [None]:
import unittest
import sys

"""
Implement data filter with test-driven development.
A valid line in the pageview files has 4 space-separated fields: 
domain_code page_title count_views total_response_size 

Transform and filter the dataset by the following rules:
* Exclude lines that don't have four columns
* Exclude lines if the domain code is not exactly "en" or "en.m" (case sensitive). "en" indicates an English desktop page, and "en.m" an English mobile page.
* The title might be percent-encoded by Wikipedia. Use the provided `decode(str)` method to decode the title of each record, e.g. "Special%3ASearch" is decoded into "Special:Search".
* Exclude lines if the title starts with any prefix defined in PREFIX_BLACKLIST (case insensitive)
* Exclude lines if the title ends with any suffix defined in SUFFIX_BLACKLIST (case insensitive)
* Exclude lines if the title starts with a lowercase English letter
* Exclude lines if the title is exactly one of the special pages defined in the provided list SPECIAL_PAGES (case sensitive)

We provide you with the starting template, as well as the code of constants, Standard I/O, summing the desktop and mobile site pageviews, and sorting the output.

Your task is to implement the methods with "To be implemented".

Execute the cells with `unittest.main()` to run the unit tests
"""

DOMAIN = 0
TITLE = 1
ACCESS = 2
CLEAN_DATA_LENGTH = 4
PREFIX_BLACKLIST = ["media:",
            "special:",
            "talk:",
            "user:",
            "user_talk:",
            "wikipedia:",
            "wikipedia_talk:",
            "file:",
            "file_talk:",
            "mediawiki:",
            "mediawiki_talk:",
            "template:",
            "template_talk:",
            "help:",
            "help_talk:",
            "category:",
            "category_talk:",
            "portal:",
            "portal_talk:",
            "book:",
            "book_talk:",
            "draft:",
            "draft_talk:",
            "education_program:",
            "education_program_talk:",
            "timedtext:",
            "timedtext_talk:",
            "module:",
            "module_talk:",
            "gadget:",
            "gadget_talk:",
            "gadget_definition:",
            "gadget_definition_talk:"]
SUFFIX_BLACKLIST = [".png", ".gif",
            ".jpg", ".jpeg",
            ".tiff", ".tif",
            ".xcf", ".mid",
            ".ogg", ".ogv",
            ".svg", ".djvu",
            ".oga", ".flac",
            ".opus", ".wav",
            ".webm", ".ico", ".txt",
            "_(disambiguation)"]
SPECIAL_PAGES = ["Main_Page", "404.php", "-"]


'''
Decoder for percent encoded strings

In contrast to URLDecoder, this decoder keeps percent signs that are not
followed by hexadecimal digits, and does not convert plus-signs to spaces.

You can put this snippet of code into your filter script.
'''

def decode(encoded):
    def getHexValue(b):
        if '0' <= b <= '9':
            return chr(ord(b) - 0x30)
        elif 'A' <= b <= 'F':
            return chr(ord(b) - 0x37)
        elif 'a' <= b <= 'f':
            return chr(ord(b) - 0x57)
        return None

    if encoded is None:
        return None
    encodedChars = encoded
    encodedLength = len(encodedChars)
    decodedChars = ''
    encodedIdx = 0
    while encodedIdx < encodedLength:
        if encodedChars[encodedIdx] == '%' and encodedIdx + 2 < encodedLength and getHexValue(encodedChars[encodedIdx + 1]) and getHexValue(encodedChars[encodedIdx + 2]):
            #  current character is % char
            value1 = getHexValue(encodedChars[encodedIdx + 1])
            value2 = getHexValue(encodedChars[encodedIdx + 2])
            decodedChars += chr((ord(value1) << 4) + ord(value2))
            encodedIdx += 2
        else:
            decodedChars += encodedChars[encodedIdx]
        encodedIdx += 1
    return str(decodedChars)
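
The behaviour described in the decoder's docstring can be checked on a few inputs. The sketch below does not reuse the notebook's `decode`; to stay self-contained it defines a hypothetical regex-based stand-in, `decode_sketch`, which is assumed to behave the same way on these inputs. It is an illustration, not a replacement for the provided method.

```python
import re

# Hypothetical stand-in for the notebook's decode(); not part of the template.
_PERCENT_ESCAPE = re.compile(r"%([0-9A-Fa-f]{2})")


def decode_sketch(encoded):
    """Replace each %XX escape (XX = two hex digits) with the character 0xXX."""
    if encoded is None:
        return None
    return _PERCENT_ESCAPE.sub(lambda m: chr(int(m.group(1), 16)), encoded)


examples = [
    "Special%3ASearch",  # upper-case hex digits
    "User%3aK6ka",       # lower-case hex digits
    "100%_pure",         # '%' not followed by two hex digits is kept
    "C+%2B",             # a literal plus sign is not turned into a space
    "%2D",               # decodes to "-", one of the special pages
]
decoded = [decode_sketch(e) for e in examples]
print(decoded)
```

The last input shows why decoding has to happen before the special-page check: an encoded `%2D` only matches `"-"` after it has been decoded.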


## Divide and Conquer

In [None]:
def get_columns(record):
    """
    Perform percent-decoding and split the record into columns, separated by one or more whitespace characters.
    
    We pre-implemented this method for you to help you follow and learn how to perform test-driven development.
    
    :param record: the pageview record
    :return: cols as a str list
    """
    return decode(record).split()

In [None]:
class TestGetColumns(unittest.TestCase):
    """
    Run this cell to test the method in the previous cell.
    This cell is tagged with "excluded_from_script", and will be excluded when you convert the notebook to a Python script.
    
    To pass the test cases, running the cell must print "OK" rather than "FAILED" at the end.
    """
    def test_equal(self):
        self.assertEqual(get_columns("en User:K6ka 34 0"), 
                          ["en", "User:K6ka", "34", "0"])
        self.assertEqual(get_columns("en User%3AK6ka 34 0"), 
                          ["en", "User:K6ka", "34", "0"])
        self.assertEqual(get_columns("en User%3AK6ka 34 0"), 
                         get_columns("en User%3aK6ka 34 0"))

    def test_not_equal(self):
        self.assertNotEqual(get_columns("en User%3AK6ka 34 0"), 
                          ["en", "User%3AK6ka", "34", "0"])
        
unittest.main(argv=['ignored', '-v', 'TestGetColumns'], exit=False)
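
The "OK" or "FAILED" line printed by a test cell can also be inspected programmatically. The sketch below is one way to do that, assuming a hypothetical helper `split_record` and test class `TestSplitRecord` that are not part of the template. It loads the test class explicitly and captures the runner's report instead of calling `unittest.main()`.

```python
import io
import unittest


def split_record(record):
    # Hypothetical helper used only for this illustration.
    return record.split()


class TestSplitRecord(unittest.TestCase):
    def test_four_columns(self):
        self.assertEqual(split_record("en Main_Page 34 0"),
                         ["en", "Main_Page", "34", "0"])


# Load the test class and run it with a runner that writes to a buffer.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestSplitRecord)
report = io.StringIO()
result = unittest.TextTestRunner(stream=report, verbosity=2).run(suite)

# The final line of the report is the verdict shown at the end of a cell.
summary = report.getvalue().strip().splitlines()[-1]
print(summary, result.wasSuccessful())
```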

In [None]:
def check_data_length(cols):
    """
    Check if length == 4
    
    :param cols: record as columns
    :return: True if length == 4
    """
    raise NotImplementedError("To be implemented")

In [None]:
class TestCheckDataLength(unittest.TestCase):
    def test_return_true(self):
        self.assertTrue(check_data_length(get_columns("en Carnegie_Mellon_University 34 0")))

    def test_return_false(self):
        self.assertFalse(check_data_length(get_columns("en 34 0")))
        self.assertFalse(check_data_length(get_columns("en Carnegie_Mellon_University 34 34 0")))
        self.assertFalse(check_data_length(get_columns("en Carnegie_Mellon_University%2034 34 0")))
        
unittest.main(argv=['ignored', '-v', 'TestCheckDataLength'], exit=False)

In [None]:
def check_domain(cols):
    """
    Check if the domain code is en or en.m (case sensitive).
    
    :param cols: record as columns
    :return: True if the domain code is en or en.m
    """
    raise NotImplementedError("To be implemented")

In [None]:
class TestCheckDomain(unittest.TestCase):
    def test_return_true(self):
        self.assertTrue(check_domain(get_columns("en Carnegie_Mellon_University 34 0")))
        self.assertTrue(check_domain(get_columns("en.m Carnegie_Mellon_University 34 0")))

    def test_return_false(self):
        self.assertFalse(check_domain(get_columns("fr Carnegie_Mellon_University 34 0")))
        self.assertFalse(check_domain(get_columns("EN.M Carnegie_Mellon_University 34 0")))
        
unittest.main(argv=['ignored', '-v', 'TestCheckDomain'], exit=False)

In [None]:
def check_special_page(cols):
    """
    Check if it is a special page, case sensitive.
    
    :param cols: record as columns
    :return: False if it is a special page
    """
    raise NotImplementedError("To be implemented")

In [None]:
class TestSpecialPage(unittest.TestCase):
    def test_return_true(self):
        self.assertTrue(check_special_page(get_columns("en Carnegie_Mellon_University 34 0")))

    def test_return_false(self):
        self.assertFalse(check_special_page(get_columns("en Main_Page 34 0")))
        self.assertFalse(check_special_page(get_columns("en - 34 0")))
        self.assertFalse(check_special_page(get_columns("en %2D 34 0")))
        
unittest.main(argv=['ignored', '-v', 'TestSpecialPage'], exit=False)

In [None]:
def check_prefix(cols):
    """
    Check if the title starts with any blacklisted prefix, case insensitive.
    
    Any occurrences of `%3a` should have been decoded into `:`
    
    :param cols: record as columns
    :return: False if the title starts with any blacklisted prefix
    """
    raise NotImplementedError("To be implemented")
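
A case-insensitive prefix match can be sketched as follows. This is only one possible approach, shown on a shortened, hypothetical blacklist `SAMPLE_PREFIXES` and a hypothetical helper name. Note that it returns True when a blacklisted prefix is found, which is the opposite of the return convention documented for `check_prefix`.

```python
# Hypothetical, shortened blacklist for illustration only.
SAMPLE_PREFIXES = ("special:", "user:", "talk:")


def starts_with_blacklisted_prefix(title, prefixes=SAMPLE_PREFIXES):
    """Return True if the title starts with any prefix, ignoring case."""
    # The prefixes are lowercase, so lowercase the title once before matching.
    # str.startswith accepts a tuple and tries every candidate.
    return title.lower().startswith(tuple(prefixes))


print(starts_with_blacklisted_prefix("User:K6ka"))
print(starts_with_blacklisted_prefix("Carnegie_Mellon_University"))
```

The same pattern works for suffixes with `str.endswith`, which also accepts a tuple.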

In [None]:
class TestCheckPrefix(unittest.TestCase):
    def test_return_true(self):
        self.assertTrue(check_prefix(get_columns("en Carnegie_Mellon_University 34 0")))

    def test_return_false(self):
        self.assertFalse(check_prefix(get_columns("en User:K6ka 34 0")))
        self.assertFalse(check_prefix(get_columns("en User%3AK6ka 34 0")))
        self.assertFalse(check_prefix(get_columns("en User%3aK6ka 34 0")))
        
unittest.main(argv=['ignored', '-v', 'TestCheckPrefix'], exit=False)

In [None]:
def check_suffix(cols):
    """
    Check if the title ends with any blacklisted suffix, case insensitive. 
    
    Any occurrences of `%3a` should have been decoded into `:`
    
    :param cols: record as columns
    :return: False if the title ends with any blacklisted suffix
    """
    raise NotImplementedError("To be implemented")

In [None]:
class TestCheckSuffix(unittest.TestCase):
    def test_return_true(self):
        raise NotImplementedError("To be implemented")

    def test_return_false(self):
        raise NotImplementedError("To be implemented")
        
unittest.main(argv=['ignored', '-v', 'TestCheckSuffix'], exit=False)

In [None]:
def check_first_letter(cols):
    """
    Check if the first letter is an English lowercase letter. 
    
    Many other Unicode characters are lowercase too. Only [a-z] should count. 
   
    Hint: Be careful and read the doc if you want to use str.islower()
    
    :param cols: record as columns
    :return: False if the title starts with [a-z]
    """
    raise NotImplementedError("To be implemented")
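
The hint about `str.islower()` can be made concrete with a small comparison. The sketch below uses a hypothetical helper, `is_ascii_lowercase`, to show how a range comparison restricted to `a` through `z` differs from `str.islower()` on non-English letters. It is an illustration of the pitfall, not the required implementation.

```python
def is_ascii_lowercase(ch):
    """Return True only for the 26 English lowercase letters."""
    # Hypothetical helper: compares against the ASCII range a-z.
    return "a" <= ch <= "z"


# "\u00e9" is the letter e with an acute accent: lowercase, but not English.
for ch in ["a", "A", "\u00e9", "1"]:
    print(repr(ch), ch.islower(), is_ascii_lowercase(ch))
```

`str.islower()` reports the accented letter as lowercase, while the range comparison does not, which is the distinction the docstring asks for.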

In [None]:
class TestFirstLetter(unittest.TestCase):
    def test_return_true(self):
        raise NotImplementedError("To be implemented")

    def test_return_false(self):
        raise NotImplementedError("To be implemented")
        
unittest.main(argv=['ignored', '-v', 'TestFirstLetter'], exit=False)

## Put them together

In [None]:
def check_all_rules(cols):
    """
    Check if the record passes all the rules.
    
    You do not need to modify this method. 
    Instead, divide and conquer the complicated filtering task by implementing the methods above.
    
    :param cols: record as columns
    :return: True if the record passes all the rules
    """
    return check_data_length(cols) and check_domain(cols) and check_special_page(cols) \
        and check_prefix(cols) and check_suffix(cols) and check_first_letter(cols)
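
The order of the checks in `check_all_rules` matters because `and` short-circuits: once the length check fails, the later checks never run, so they can index into `cols` safely. The sketch below illustrates this with hypothetical stand-in checks that record when they are called. These are not the functions you are asked to implement.

```python
calls = []  # records which checks actually ran


def has_four_columns(cols):
    calls.append("length")
    return len(cols) == 4


def domain_is_english(cols):
    calls.append("domain")
    return cols[0] in ("en", "en.m")


def title_is_not_dash(cols):
    calls.append("title")
    return cols[1] != "-"  # would raise IndexError on a one-column record


def passes(cols):
    # Evaluation stops at the first check that returns False.
    return has_four_columns(cols) and domain_is_english(cols) and title_is_not_dash(cols)


outcome = passes(["en"])
print(outcome, calls)
```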

In [None]:
class TestAllRules(unittest.TestCase):
    def test_return_true(self):
        raise NotImplementedError("To be implemented")

    def test_return_false(self):
        raise NotImplementedError("To be implemented")
        
unittest.main(argv=['ignored', '-v', 'TestAllRules'], exit=False)

## Filter the hourly dataset

In [None]:
def main():
    """
    Data pre-processing

    The main method reads from `sys.stdin` and writes to the file `output`. 
    
    I/O must be encoding-aware instead of relying on the system default encoding. 
    The code below is encoding-naive.
    To make the program encoding-aware, you should explicitly set the encoding when running the Python program.
    Explore the usage of PYTHONIOENCODING, e.g. `PYTHONIOENCODING=<charset> python3 data_filter.py`.
    """
    records = {}
    for line in sys.stdin:
        columns = get_columns(line)
        if check_all_rules(columns):
            # If there are records from both desktop and mobile sites for the same page title,
            # sum the accesses into one record.
            if columns[TITLE] in records:
                records[columns[TITLE]] += int(columns[ACCESS])
            else:
                records[columns[TITLE]] = int(columns[ACCESS])
    # Sort the map in descending numerical order of the values, 
    # break ties by ascending lexicographical order of keys
    sorted_records = sorted(records.items(), key=lambda r: (-r[1], r[0]))
    with open('output', 'wt') as f:
        for (key, value) in sorted_records:
            f.write(key + '\t' + str(value) + '\n')

if __name__ == "__main__":
    main()
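
The sort key used in `main()` can be tried on a small dictionary. The sample titles and counts below are made up for illustration. Negating the count gives descending order by views, and the title as the second tuple element breaks ties in ascending order.

```python
# Made-up sample data: two titles share the same count.
records = {"Zebra": 5, "Apple": 5, "Mango": 9, "Berry": 1}

# Same key as in main(): descending count, then ascending title.
sorted_records = sorted(records.items(), key=lambda r: (-r[1], r[0]))

lines = [title + "\t" + str(count) for title, count in sorted_records]
print("\n".join(lines))
```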

## Submit

Now you can convert the notebook into an executable Python script `data_filter.py`. You can then invoke the Python script in `runner.sh` to make submissions.

`jupyter nbconvert data_filter.ipynb --to python --TagRemovePreprocessor.remove_input_tags='{"excluded_from_script"}'`
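
When the converted script runs, the charset used for standard input decides how the incoming bytes are turned into text. The sketch below illustrates the difference between an explicit and a wrongly assumed encoding, using an in-memory byte stream in place of real standard input. The sample record is made up.

```python
import io

# Bytes as they might arrive on standard input; "\u00e9" is an accented e.
raw = "en Caf\u00e9 3 0\n".encode("utf-8")

# Encoding-aware: the charset of the input is stated explicitly.
aware = io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8").read()

# Wrong charset assumed: the same bytes are misread as two characters.
naive = io.TextIOWrapper(io.BytesIO(raw), encoding="latin-1").read()

print(repr(aware))
print(repr(naive))
```

Setting `PYTHONIOENCODING` when launching the script has a comparable effect on `sys.stdin` and `sys.stdout`. Files opened with `open()` are not covered by that variable, so passing `encoding=` there is worth considering as well.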