In [None]:
"""
Notebook to purse Testaankoop promotions.
"""
import os, re, enum, logging, decimal
import pandas as pd
import psycopg2 as pg
import numpy as np

logger = logging.getLogger('parser')

# TODO fill in your connection info.
username = 'postgres'
password = 'postgres'
# TODO you should set this value to 5432!
port = '55432'

uri = 'postgres://{user}:{pw}@localhost:{port}/daltix'.format(user=username, pw=password, port=port)

# Helper functions

Don't worry too much about the helper functions below, they might look scary but you don't need to understand them at all.

In [None]:
# Helper functions
def count_trues(row):
    count = 0
    for t in row:
        if t:
            count +=1
    return count

#For igonoring several white spaces 
def multiply_whitespace(regex: str) -> str:
    """
    Multiply whitespace in the given regex; replace any whitespace character with any sequence of whitespace characters.
    :param regex:
    :return:
    """
    return re.sub(r'\s+', '\s*', regex)

def merge_into_array(data: pd.DataFrame, cols: list=None, drop_duplicates: bool=False) -> pd.Series:
    """
    Merge the given columns into a single column with as value an array containing all not-null elements of the
    other columns. Does not alter data, but returns a new Series object.
    Original items that were lists, will just be concatenated into longer lists.
    :param data:
    :param cols: The columns which to merge or concatenate into arrays. If None: concatenates all columns.
    :param drop_duplicates: Whether to drop duplicate values from the resulting arrays.
    :return: A new pd.Series object.
    """
    # TODO: check status of this in Pandas 0.20.2
    """
    Yet another case of Pandas' unfathomable inventiveness.
    The easy version was:
    data.apply(lambda x: [x[col] for col in cols if x[col] is not None], axis=1)
    Or, in words, for each row return a list of all not-null elements and put that list in a single cell.
    Easy, right? Well, turns out you can't do that if you have a timestamp as an index somewhere? What? Yeah.
    Was on pandas 0.19.2.
    """
    # Some preliminary checks: don't do anything if there is nothing to do...
    if data.empty:
        return pd.Series()
    if cols is None:
        cols = data.columns

    original_index = data.index.names
    indexless = pd.DataFrame(data.reset_index()[cols].copy())

    def merge_or_concat(x):
        try:
            ret_list = []
            for col in cols:
                if isinstance(x[col], list):
                    if drop_duplicates:
                        ret_list += [y for y in x[col] if y not in ret_list and not pd.isnull(y)]
                    else:
                        ret_list += [y for y in x[col] if not pd.isnull(y)]
                elif pd.isnull(x[col]):
                    continue
                else:
                    if drop_duplicates:
                        if x[col] not in ret_list:
                            ret_list.append(x[col])
                    else:
                        ret_list.append(x[col])
            return tuple(ret_list)
        except Exception as e:
            raise e

    listed = indexless.apply(merge_or_concat, axis=1)
    tmp = data.reset_index().assign(listed=listed).set_index(original_index)
    return tmp.listed.apply(lambda x: list(x) if len(x) > 0 else None)

def concat_df_duplicate_cols(df1: pd.DataFrame, df2: pd.DataFrame, strategy: str='array', copy: bool=False) -> pd.DataFrame:
    """
    Concatenate two dataframes columnwise, taking care of duplicate columns.
    :param df1: First dataframe.
    :param df2: Second dataframe.
    :param strategy: Indicates the strategy. Allowed values are: 'array', 'exclusive', 'json_array'.
    If set to 'array', will merge all not-null values into a list. If set to 'exclusive', will fail if there are
    multiple not-null values per row. If set to 'json_array', will merge duplicate values into a JSON array string, but
    *only if there is more than 1 result*, otherwise the single result will be returned. Duplicates will be dropped.
    :return: The first dataframe, with the second concatenated to it.
    """
    @enum.unique
    class HelpEnum(enum.Enum):
        ARRAY = 0
        EXCLUSIVE = 1
        JSON_ARRAY = 2

    # Sanity check
    if strategy.lower() == 'array':
        strategy = HelpEnum.ARRAY
    elif strategy.lower() == 'exclusive':
        strategy = HelpEnum.EXCLUSIVE
    elif strategy.lower() == 'json_array':
        strategy = HelpEnum.JSON_ARRAY
    else:
        raise ValueError('Strategy must be either "array",  "exclusive" or "json_array".')

    # Be safe!
    if copy:
        df1 = df1.copy()
        df2 = df2.copy()
    # Check if there's work to be done
    overlap_cols = df1.columns.intersection(df2.columns)
    if overlap_cols.empty:
        # Easy peasy!
        df1[df2.columns] = df2
    else:
        # There are overlapping columns.
        # First take care of the ones without overlap
        no_overlap = df2.columns.difference(overlap_cols)
        df1[no_overlap] = df2[no_overlap]
        # Then with overlap
        for col in overlap_cols:
            if strategy is HelpEnum.ARRAY:
                df1[col] = merge_into_array(
                    pd.DataFrame({'old': df1[col], 'new': df2[col]}),
                    drop_duplicates=True)
            elif strategy is HelpEnum.JSON_ARRAY:
                df1[col] = merge_into_json_array(pd.DataFrame({'old': df1[col], 'new': df2[col]}), unwrap=True,
                                                 drop_duplicates=True)
            else:
                # We only accept merging if there are no values together
                valid = (df2[col].isnull() | df1[col].isnull()) | (df2[col] == df1[col])
                if not valid.all():
                    raise ValueError(
                        "Merging columns failed for column {col}. Appears twice, with conflicting data.\n"
                        "Data:{df1}\n"
                        "{df2}"
                            .format(col=col,
                                    df1=df1[~valid].reset_index()[["shop", "location", "downloaded_on", "product_id", col]],
                                    df2=df2[~valid].reset_index()[["shop", "location", "downloaded_on", "product_id", col]]),
                        col
                    )
                else:
                    """
                    Fuck pandas.
                    If you don't do this mask inplace, it fails for timezoned timestamps. Somehow...
                    Don't even ask about what's going on here. I wouldn't know.
                    """
                    try:
                        cop = df2[col].copy()
                        cop.mask(cop.isnull(), df1[col], inplace=True)
                        df1[col] = cop
                    except ValueError:
                        df1[col] = df2[col].mask(df2[col].isnull(), df1[col])
    return df1

def apply_regexes(data: pd.DataFrame, regexes: list, fail_limit: int=10, collate: bool=True):
    """
    Apply regexes to the data, combining the extractions into a single DataFrame.
    Use capturing groups with predefined names (see docs of create_promo).
    Performs automatic price collation: The capturing groups 'xxx_int' and 'xxx_frac' will be combined into 'xxx' and
    the original columns will be dropped, unless collate is set to False.
    :param data: The data.
    :param regexes: The regexes to apply.
    :param fail_limit: The number of failed strings to print out. 0 means all failed lines will get printed.
    Default is 10 because when Sentry is enabled we cannot log too many lines.
    :param collate: Whether or not to automatically collate prices.
    :return:
    """
    if len(regexes) == 0:
        raise ValueError('Pass a list of regexes please!')
        
    extraction = pd.DataFrame()
    if isinstance(data, pd.Series):
        data = pd.DataFrame({'promos': data})
    all_matches = data.isnull()
    for regex in regexes:
        regex = multiply_whitespace(regex=regex)
        captures = (re.compile(regex).groups > 0)
        for c in data.columns:
            matches = data[c].str.match(regex).mask(data[c].isnull(), True).astype(bool)
            all_matches[c] = all_matches[c] | matches
            if captures:
                e = data[c].str.extract(regex, expand=True)
                try:
                    extraction = concat_df_duplicate_cols(extraction, e, 'exclusive')
                except ValueError as e:
                    logger.exception(e.args[0])
                    raise e
    if not all_matches.all().all():
        failed = data.stack()[~(all_matches.stack())].drop_duplicates().tolist()
        if fail_limit != 0:
            lines = failed[0:min(fail_limit, len(failed))]
        else:
            lines = failed

        # Print as one line so that we have the faulty strings as is (easy to copy in regex 101).
        logger.error('Not everything matches, listing ({}) unique failed lines:\n{}'
                     .format(len(lines), '\n'.join(lines)))

        unmatched = data.mask(all_matches, None)

        def format_failures(b: pd.Series):
            l = [c for c in b if c is not None]
            if l:
                return "Could not match all strings. Unmatched:\n{}".format('\n'.join(l))
            else:
                return None
        extraction["errors"] = unmatched.apply(format_failures, axis=1)

    # Add the original promo strings
    extraction['strings'] = data.promos
    return extraction

# Load the data
Load the promotion strings into a Dataframe. We only consider the first entry in the promo strings list as that is enough for this exercise.

In [None]:
query = """
SELECT shop, promo_string 
FROM promo_strings
"""

with pg.connect(uri) as connection:
    promotions = pd.read_sql(query, connection)

promotions.head()

In [None]:
# TODO add fitting regex for all the promo strings.
regexes = [
]
extraction = apply_regexes(promotions['promo_string'], regexes)

# Have a look at your output.
extraction.head()
