# Create dataset

In [1]:
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen
import pandas as pd
import re
import os
import glob
from collections import defaultdict
import numpy as np
from workalendar.registry import registry
import datetime

## Read data

In [1]:
path = r"data\private-data\raw_stored\reports"

In [3]:
# list all .fiscal files
dir_list = [file for file in os.listdir(path) if file.endswith(".fiscal")]
dir_list[0:4]

['20191113.1.fiscal',
 '20191114.1.fiscal',
 '20191115.1.fiscal',
 '20191116.1.fiscal']

In [4]:
def read_flat(path):
    with open(path, mode="r") as f:
        _list = []
        # read all the lines of a file in a list
        for line in f.readlines():
            # removing white spaces
            _list.append(line.rstrip())
    # return list without empty items
    return list(filter(None, _list))


def filter_list(_list):
    """Return only items before string '###'.
    This assures those only items the first occurrence of all items is considered.
    """
    hashtags = [re.findall("\#{2,}", item) for item in _list]
    hashtags = list(filter(None, hashtags))
    # identify more than one line with a string that contains '###'
    if len(hashtags) > 1:
        hashtag_idx = [i for i, item in enumerate(_list) if re.search(r"\#{2,}", item)]
        # return sliced list
        return _list[: hashtag_idx[0]]
    else:
        # slice first item that is not present
        return _list


def clean_file(_list):

    # delete some strings string
    _list = [x for x in _list if x != "PORTERHOUSE"]
    # insert ID and Date
    id_str = "Id: " + _list[1].split()[4]

    date_str = "Date: " + _list[1].split()[-1]

    _list.insert(0, id_str)
    _list.insert(1, date_str)

    # delete more than one repeated character
    _list = [re.sub(r"\.{2,}|\={2,}|\#{2,}|\-{2,}", "", item) for item in _list]
    # Remove additional white spaces
    _list = [re.sub("[\s]+", "", item).strip().replace("EUR", "") for item in _list]
    # delete empty items
    _list = list(filter(None, _list))
    # split items with ':'
    _list = [item.split(":") for item in _list]

    return _list


def listToDict(lst):
    return {lst[i]: lst[i + 1] for i in range(0, len(lst), 2)}

In [5]:
# File sample
file_path = os.path.join(path, dir_list[0])
read_flat(file_path)

['Batch-Auswertung',
 'Fiskal & Z-Bericht Nr. 1 |Mittwoch 13.11.2019',
 'Gedruckt am Donnerstag 14.11.2019 um 05:00:06',
 'Abrechnungen aller Bediener',
 'Abgerechnete Tische......:         67',
 'Stornos..................:         10',
 'Stornobetrag.............:      31.90 EUR',
 'Im Umsatz enthalten',
 'BARZAHLUNG               :    2443.20 EUR',
 'EC-KARTE                 :    1760.90 EUR',
 '                           --------------',
 'Zahlg.im Umsatz..........:    4204.10 EUR',
 'Summe aller Zahlungen....:    4204.10 EUR',
 'Tip......................:     -77.80 EUR',
 '                           --------------',
 'Gesamtumsatz.............:    4126.30 EUR',
 'Bestellumsätze',
 'Summe Im Haus............:    4126.30 EUR',
 'Summe Außer Haus.........:       0.00 EUR',
 'Nicht im Umsatz..........:       0.00 EUR',
 '                           --------------',
 'Gesamt Bestellung Brutto.:    4126.30 EUR',
 'MWST  19.00 %............:     658.82 EUR',
 'Netto 19.00 %............:  

Variables selected:
- `Abgerechnete Tische`: Total number of tables
- `Gesamt Bestellung Brutto.`: Gross sales
- `Gesamt Bestellung Netto`: Net sales

In [6]:
# selected features
var_list = ["AbgerechneteTische", "GesamtBestellungBrutto.", "GesamtBestellungNetto"]


def return_df_dict(var_list):
    # Append generated values from the function clean_file()
    var_list.append("Id")
    var_list.append("Date")
    # create an empty list
    df_array = []

    # apply and append all files
    for file in dir_list:
        file_path = os.path.join(path, file)
        _temp = read_flat(file_path)
        _temp = filter_list(_temp)
        _temp = clean_file(_temp)
        _temp = [x for x in _temp if set(x).intersection(var_list)]
        _temp = [listToDict(list) for list in _temp]
        result = {}
        for d in _temp:
            result.update(d)
        for key, val in result.items():
            if key in d:
                d[key] = [d[key], val]
                df_array.append(result)

    # merge dicts
    df_all = defaultdict(list)

    for d in df_array:
        for k, v in d.items():
            df_all[k].append(v)
    return df_all


def generate_pandas_df(df_dict, initial_date, final_date):

    _dates = pd.DataFrame(pd.date_range(initial_date, final_date), columns=["Date"])
    _df = pd.DataFrame(df_dict)
    _df["Date"] = pd.to_datetime(_df["Date"], format="%d.%m.%Y")
    # rename columns
    _df = _df.rename(
        columns={
            "GesamtBestellungBrutto.": "gross_sales",
            "GesamtBestellungNetto": "net_sales",
            "AbgerechneteTische": "n_tables",
        }
    )
    ## from the all_dates DataFrame, left join onto the DataFrame with missing dates
    return _dates.merge(right=_df, how="left", on="Date")

In [7]:
df_dict = return_df_dict(var_list)

initial_date = df_dict["Date"][0].replace(".", "-")
final_date = df_dict["Date"][-1].replace(".", "-")

df_raw = generate_pandas_df(df_dict, initial_date, final_date)

  _dates = pd.DataFrame(pd.date_range(initial_date, final_date), columns=['Date'])


## Weather data

In [8]:
def get_weather_data(url):
    resp = urlopen(url)
    myzip = ZipFile(BytesIO(resp.read()))
    files = myzip.infolist()
    _list = []
    for file in files:
        if file.filename.startswith("produkt_"):
            for line in myzip.open(file).readlines():
                _list.append(line.decode("utf-8").replace(" ", "").strip().split(";"))

    _df = pd.DataFrame(_list[1:], columns=_list[0])
    _df = _df[["MESS_DATUM", "NM", "RSK", "PM", "UPM", "SDK", "TMK", "FM"]]
    _df = _df.rename(
        columns={
            "MESS_DATUM": "Date",
            "NM": "cloud_cover",
            "RSK": "precipitation",
            "PM": "pressure_msl",
            "UPM": "relative_humidity",
            "SDK": "sunshine",
            "TMK": "temperature",
            "FM": "wind_speed",
        }
    )
    _df["Date"] = pd.to_datetime(_df["Date"], format="%Y%m%d")
    return _df


weather_old = get_weather_data(
    "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/historical/tageswerte_KL_01975_19360101_20211231_hist.zip"
)
weather_recent = get_weather_data(
    "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_01975_akt.zip"
)

weather_data = pd.concat(
    [
        # to avoid duplicates: recent data begin on this date
        weather_old.query('Date < "2021-04-10"'),
        weather_recent,
    ],
    axis=0,
    ignore_index=True,
)

weather_data = weather_data.set_index("Date").loc[initial_date:final_date]

  weather_data = weather_data.set_index('Date').loc[initial_date:final_date]


| Name              | id  | description                     | unit      |
| ----------------- | --- | ------------------------------- | --------- |
| Cloud cover       | NM  | daily mean of cloud cover       | 1/8       |
| Precipitation     | RSK | daily precipitation height      | mm        |
| Pressure          | PM  | daily mean of pressure          | hPa       |
| Relative humidity | UPM | daily mean of relative humidity | %         |
| Sunshine          | SDK | daily sunshine duration         | h->min    |
| Temperature       | TMK | daily mean of temperature       | °C        |
| Wind speed        | FM  | daily mean of wind speed        | m/s->km/h |

[Data Source](https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/historical/DESCRIPTION_obsgermany_climate_daily_kl_historical_en.pdf)

[Forecast API](https://brightsky.dev/docs/#get-/current_weather)


## Holidays

In [9]:
CalendarClass = registry.get("DE-SH")
de_sh_holidays = CalendarClass()
de_sh_holidays.holidays()

[(datetime.date(2022, 1, 1), 'New year'),
 (datetime.date(2022, 4, 15), 'Good Friday'),
 (datetime.date(2022, 4, 18), 'Easter Monday'),
 (datetime.date(2022, 5, 1), 'Labour Day'),
 (datetime.date(2022, 5, 26), 'Ascension Thursday'),
 (datetime.date(2022, 6, 6), 'Whit Monday'),
 (datetime.date(2022, 10, 3), 'Day of German Unity'),
 (datetime.date(2022, 10, 31), 'Reformation Day'),
 (datetime.date(2022, 12, 25), 'Christmas Day'),
 (datetime.date(2022, 12, 26), 'Second Christmas Day')]

In [38]:
def get_ydt_range(df: pd.DataFrame, date_column: str):
    init_dt = df[date_column][0].date()
    last_dt = df[date_column].iloc[-1].date()
    return init_dt, last_dt


def get_de_holidays(date_init: datetime.date, date_end: datetime.date):
    CalendarClass = registry.get("DE-SH")
    de_sh_holidays = CalendarClass()
    _list = []
    year_init = date_init.year
    year_end = date_end.year
    for year in range(year_init, year_end + 1):
        temp = de_sh_holidays.holidays(year)
        _list.extend(temp)
        _dict = dict((x, y) for x, y in _list)
        _series = pd.Series(_dict, name="holiday").to_frame()
        mask = (_series.index < date_init) | (_series.index > date_end)
        _series = _series[~mask]
        # _series['Days since the last holiday'] = _series['Date'].sub(_series.pop('date1_y')).dt.days
        _series.index.names = ["Date"]
    return _series


init_dt, last_dt = get_ydt_range(df_raw, "Date")
df_holidays = get_de_holidays(init_dt, last_dt)
df_holidays

Unnamed: 0_level_0,holiday
Date,Unnamed: 1_level_1
2019-12-25,Christmas Day
2019-12-26,Second Christmas Day
2020-01-01,New year
2020-04-10,Good Friday
2020-04-13,Easter Monday
2020-05-01,Labour Day
2020-05-21,Ascension Thursday
2020-06-01,Whit Monday
2020-10-03,Day of German Unity
2020-10-31,Reformation Day


## Merge datasets

In [40]:
def combine_dfs(df_raw):
    # set date columns as index
    df_temp = df_raw.set_index("Date").copy()
    # all columns name to lower case
    df_temp.columns = df_temp.columns.str.lower()

    # merge all dfs
    df_all = df_temp.join(df_holidays, how="left").join(weather_data, how="left")

    df_all.holiday = df_all.holiday.fillna("None").astype("category")
    # select all object columns
    cols = df_all.columns[df_all.dtypes.eq("object")]

    # convert all to numeric
    df_all[cols] = df_all[cols].apply(pd.to_numeric, errors="coerce")

    return df_all


df_combined = combine_dfs(df_raw)
df_combined.holiday.value_counts()

None                    979
Ascension Thursday        3
Christmas Day             3
Easter Monday             3
Good Friday               3
Labour Day                3
New year                  3
Second Christmas Day      3
Whit Monday               3
Day of German Unity       2
Reformation Day           2
Name: holiday, dtype: int64

In [41]:
# df_combined['2db_holiday'] = df_combined.index.isin(df_holidays.index-pd.DateOffset(2)).astype('int8')
# df_combined['2db_holiday'].value_counts()

## Add temporal features

In [44]:
def get_dt(df_input):
    df = df_input.copy(deep=True)
    df["day"] = df.index.day.values
    df["month"] = df.index.month.values
    df["year"] = df.index.year.values
    df["week"] = df.index.isocalendar().week.astype("int16")
    df["dow"] = df.index.dayofweek.values
    df["month_end"] = df.index.is_month_end.astype("int8")
    df["weekend"] = (df.index.weekday.values >= 5).astype("int8")

    #
    days = {
        0: "monday",
        1: "tuesday",
        2: "wednesday",
        3: "thursday",
        4: "friday",
        5: "saturday",
        6: "sunday",
    }
    df["dow"] = df.dow.map(days).astype("category")
    # Seasons
    # dictionary for the future replacement of months with seasons
    ds = {
        12: "winter",
        1: "winter",
        2: "winter",
        3: "spring",
        4: "spring",
        5: "spring",
        6: "summer",
        7: "summer",
        8: "summer",
        9: "autumn",
        10: "autumn",
        11: "autumn",
    }

    df["season"] = df["month"].replace(ds).astype("category")

    return df


df_combined = get_dt(df_combined)

## Export data

In [43]:
def filter_df(df):
    df = df.replace(-999, 0)

    for col in ["gross_sales", "net_sales"]:
        df[col] = np.where(df[col] <= 10, np.nan, df[col])

    for col in ["id", "n_tables"]:
        df[col] = np.where(pd.isnull(df["net_sales"]), np.nan, df[col])

    df["missing"] = np.where(pd.isnull(df["net_sales"]), 1, 0)

    df["holiday"] = np.where(df.holiday == "None", 0, 1)
    return df


df = filter_df(df_combined)
df.to_parquet(
    r"C:\Users\Eric\Documents\restaurant-sales-prediction\data\sales_data.parquet",
    engine="pyarrow",
)