In [None]:
%config InlineBackend.figure_formats = ['svg']

import re
import itertools
import typing
from pathlib import Path
import shutil
import csv
import warnings
warnings.filterwarnings('ignore')

import IPython.display as ipd                               
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import openpyxl


# Configuration
sns.set_style("whitegrid")

# Set up paths
output = Path("output")
output.mkdir(exist_ok=True)

input = Path('input')
# The input directory has to exist and contain at least one entry.
input_has_content = input.exists() and next(input.iterdir(), None) is not None
if not input_has_content:
    raise Exception(f"Expected input data in {input.name} not found")

# The working directory is wiped and recreated on every run.
workdir = Path("workdir")
if workdir.is_dir():
    shutil.rmtree(workdir)
workdir.mkdir(exist_ok=True)

# Locations, ordered by priority:
# originals are always taken from the location
# that is listed first here.
locations = ['AMS', 'MZ', 'STG', 'BAD', 'KA', 'TUE', 'MA', 'FR']



In [None]:
# Configuration: reading the CSV files

# Cell contents that are treated as missing values when the CSVs are parsed.
na_values = [
    '0\'00000"',
    '',
    'UNBEKANNT',
    'Unbekannt',
    '-',
    'o. A',
    'o.A.',
    # BNR: placeholder values
    '[]',
    '[-]',
    '[unbekannt]',
    '[PROMO]',
    '[- PROMO]',
    '[[ PROMO]]',
    '[ PROMO]',
    '[Promo]',
    '[Promo-CD]',
    '[o. A.]',
    '[ohne Nummer]',
    '[ohne Nr.]',
    # The trailing comma below matters: without it Python joins this literal
    # with the next one into the single string '[o. Nr.] "', and neither
    # placeholder is recognised on its own.
    '[o. Nr.]',
    # RHTI: empty values
    ' "',
    ' " ',
    # MIT: empty values
    'Diverse',
    'Nicht genannt',
    'nicht genannt',
]

# Other values that occur more often than average; possibly also to be
# interpreted as empty?
#     RHTI: generic titles
#     'Live',
#     'Streichquartette',
#     'Lieder',
#     'Kammermusik',
#     'Live USA',
#     'Greatest hits',
#     'Greatest Hits',
#     'Die großen Erfolge',
#     MIT: group designations
#     'Diverse',
#     'Ensemble',
#     'Orchester',
#     'Original Cast'


# Register every placeholder in its all-lowercase and all-uppercase spelling
# as well. Iterate over a snapshot, because the list grows while looping.
for value in list(na_values):
    na_values.append(value.lower())
    na_values.append(value.upper())

# Explicit column dtypes handed to pandas when the CSVs are parsed.
dtypes = {
    'BEST': 'string',
    'ANR': 'string',  # despite the name, archive numbers sometimes contain characters
    'EAN': 'string',
    'PEAN': 'string',
    'LC': 'string',
    'LN': 'string',
    'BNR': 'string',
    'TTS': 'string',
    'MIT': 'string',
    'MIT_TYP': 'string',
    'ABS_DAUER': 'string',
    'T-ISRC': 'string',
    'T-RHTI': 'string',
    'RHTI': 'string',
    'TRÄGER': 'Int64',
    'SEITE': 'Int64',
    'TAKE': 'Int64',
    'AMO_ID': 'string',
}

# Every column also gets a "Q_"-prefixed twin with the same dtype.
# The items are snapshotted first, so only the original entries are mirrored.
dtypes.update({f"Q_{column}": kind for column, kind in list(dtypes.items())})

# Keyword arguments shared by every pd.read_csv call in this notebook.
pd_args = {
    'encoding': 'latin1',
    'sep': ';',
    'na_values': na_values,
    'low_memory': False,
    'dtype': dtypes,
    # Skip columns whose name contains 'l_' or (case-insensitively) 'unnamed'.
    'usecols': lambda x: not ('l_' in x or 'unnamed' in x.lower()),
}

In [None]:
# Read the input data and bring it into an easier-to-handle shape:
# one merged CSV per location inside `workdir`, which is what the
# loading cell further down reads.

input_dirs = [entry for entry in input.iterdir() if entry.is_dir()]
found_locations = {entry.name for entry in input_dirs}

# Directories and configured locations have to match in both directions:
# an unknown directory has no priority, and a location without a directory
# would have no "<location>.csv" in the working directory afterwards.
unknown_dirs = sorted(found_locations - set(locations))
missing_locations = [loc for loc in locations if loc not in found_locations]
if unknown_dirs or missing_locations:
    raise Exception(
        f"Data not as expected in {input}"
        f" (unknown directories: {unknown_dirs}, locations without directory: {missing_locations})"
    )

for location_dir in input_dirs:
    location = location_dir.name
    dst = f"{workdir/location}.csv"

    # Sorted so the row order of the merged file does not depend on the
    # order in which the file system happens to list the files.
    csv_files = sorted(
        f for f in location_dir.glob('**/*') if f.is_file() and f.suffix == '.csv'
    )

    frames = []
    for csv_file in csv_files:
        tmp_df = pd.read_csv(csv_file, index_col=1, **pd_args)
        # Cover flags are derived from the file name and only set for 'AMS' files.
        is_ams_file = 'AMS' in csv_file.name
        tmp_df['COVER_PDF'] = is_ams_file and 'Mit_PDF' in csv_file.name
        tmp_df['COVER_KEIN_PDF'] = is_ams_file and 'Ohne_PDF' in csv_file.name
        frames.append(tmp_df)

    # pd.concat([]) raises a generic ValueError; fail with a message that
    # names the affected location instead.
    if not frames:
        raise Exception(f"Couldn't collect data for {location}")

    pd.concat(frames).to_csv(dst, encoding='latin1', sep=';')
    print(f"Collected {location} -> {dst}")





In [None]:
# Load the data: one CSV per location from the working directory,
# concatenated in priority order.

dfs = []
for location in locations:
    src = f"{workdir/location}.csv"
    tmp_df = pd.read_csv(src, index_col=0, **pd_args)
    tmp_df.index = tmp_df.index.astype(str)
    dfs.append(tmp_df)
data = pd.concat(dfs)

# Prepare the data

# Turn the holding location into an ordered category (order = priority)
data['BEST'] = data['BEST'].astype('category')
data['BEST'] = data['BEST'].cat.set_categories(locations, ordered=True)

# Merge EAN/PEAN: EAN where present, otherwise PEAN.
# Assigning the result back (instead of `data[col].fillna(..., inplace=True)`)
# keeps this working under pandas copy-on-write, where the in-place call only
# modifies a temporary column object.
data['EAN/PEAN'] = data['EAN'].fillna(data['PEAN'])


# Label code: fall back to LN where LC is missing
data['LC'] = data['LC'].fillna(data['LN'])


# Determine the titles
data['RHTI'] = ""
data['T-RHTI'] = data['T-TITEL']
## always keep the RHTI with the highest PRIO
# NOTE(review): every row matches at most one of these masks, so the loop order
# has no effect here; which TITEL survives per ANR is decided by the
# de-duplication below (first row wins) -- confirm this is intended.
# NOTE(review): TPRIO has no explicit dtype in the read_csv arguments; if pandas
# infers a numeric dtype, comparing against the strings below never matches and
# RHTI stays "" -- confirm against the data.
for prio in ["3", "2", "1"]:
    mask = data['TPRIO'] == prio
    data.loc[mask, 'RHTI'] = data.loc[mask, 'TITEL']

# MIT and MIT-TYP are swapped in the input data
data['MIT'] = data['MIT_TYP']


print(f"Total before deduplication: {len(data)}")
# Remove duplicate ANRs (caused by multiple titles, MIT etc.).
# .copy() makes the result an independent frame, so later column
# assignments do not operate on a slice of the concatenated frame.
data = data[~data.index.duplicated(keep='first')].copy()
print(f"Total after deduplication: {len(data)} \n")

# Remove irrelevant columns (and their "Q_" twins where present)
cols_to_delete = ['TTS', 'DAUER', 'ABS_DAUER', 'TPRIO', 'TITEL',
                  'T-TITEL', 'MIT_TYP', 'LN', 'PEAN', 'EAN',
                  'TTYP', 'KAT_GATT']
deleted_cols = []
for col_to_delete in cols_to_delete:
    del data[col_to_delete]
    deleted_cols.append(col_to_delete)
    q_col = f"Q_{col_to_delete}"
    if q_col in data:
        del data[q_col]
        deleted_cols.append(q_col)

print(f"Deleted cols: {deleted_cols} \n")
cols = list(data.columns)
print(f"Kept cols: {cols} \n")


In [None]:
# Visualise the value distribution: per location, one stacked bar chart
# showing for every column how many values are present and how many are missing.

missing_values_dir = output / "fehlende_werte"
missing_values_dir.mkdir(exist_ok=True)
for location in locations:
    cols = data.columns
    at_location = data.loc[data['BEST'] == location, cols]
    compare_cols = pd.DataFrame(
        {
            'fehlt': at_location.isnull().sum().tolist(),
            'vorhanden': at_location.count().tolist(),
        },
        index=cols,
    )
    ax = compare_cols.plot(kind='barh', stacked=True, title=f'{location}: Fehlende Daten')
    ax.set(xlabel='Einheiten', ylabel='Spalten')
    # This title replaces the one passed to plot() above.
    plt.title(f'{location}: Fehlende Werte')
    plt.savefig(f"{missing_values_dir / location}.svg")
    plt.show()

In [None]:
# Helper functions for registering comparisons


ALLOWED_CONDITIONS = [
    'ignore_minimal_distance',  # ndw: include a duplicate even if the minimal distance is not met
    'no_tst',  # tmf: TRÄGER, SEITE, TAKE have no value
    'minimal_bnr',  # BNR has to satisfy a minimal length
    'shorten_z_bnr',  # drop the first digit of Z-BNR if Z-BNR consists of 6 or more digits
    'no_bnr',  # bnf: BNR is missing
    'check_q_locators'  # accepted as a condition name; no handling is visible in these helpers
]

LOCALISATORS = ['TRÄGER', 'SEITE', 'TAKE']

# Columns that are only added to the frame later on
ADDITIONAL_COLS = ['Z-BNR']


def compare(df: pd.DataFrame, to_compare: dict, use_cols: list, name_prefix = None, check_q_compare = True, use_conditions: typing.Optional[typing.Dict[str, bool]] = None, name_suffix: typing.Optional[str] = None) -> None:
    """Register a comparison (a set of columns plus conditions) in ``to_compare``.

    Parameters
    ----------
    df : frame whose columns are used to validate ``use_cols``.
    to_compare : registry that is extended in place; keyed by the generated name,
        each value is ``{'cols': [...], 'conditions': {...}}``.
    use_cols : columns that make up the comparison.
    name_prefix : optional text put in front of the generated name.
    check_q_compare : if true and ``use_cols`` contains a column from
        ``LOCALISATORS``, registration is delegated to ``add_q_compare``.
    use_conditions : condition names (see ``ALLOWED_CONDITIONS``) mapped to flags.
    name_suffix : optional text appended to the generated name.

    Raises
    ------
    Exception
        If a comparison over the same set of columns is already registered, or
        if a column or a condition name is unknown.
    """
    # A fresh dict per call: a mutable default would be one shared object
    # stored in every registry entry that did not pass its own conditions.
    if use_conditions is None:
        use_conditions = dict()

    if check_q_compare and any(col in LOCALISATORS for col in use_cols):
        return add_q_compare(df, to_compare, use_cols, name_prefix, check_q_compare=False, use_conditions=use_conditions)

    name = ", ".join(use_cols)
    if name_prefix is not None:
        name = f'{name_prefix}: {name}'
    if name_suffix is not None:
        name = f'{name} {name_suffix}'

    # Two comparisons count as identical when they use the same set of columns.
    duplicated_comparision = any(set(use_cols) == set(comparision['cols']) for comparision in to_compare.values())
    if duplicated_comparision:
        raise Exception(f'Comparision already included: {name}')

    known_cols = {*df.columns, *ADDITIONAL_COLS}
    unknown_cols = [col for col in use_cols if col not in known_cols]
    if len(unknown_cols) > 0:
        raise Exception(f'Column(s) ({unknown_cols}) unknown.')

    unknown_conds = [cond for cond in use_conditions if cond not in ALLOWED_CONDITIONS]
    if len(unknown_conds) > 0:
        raise Exception(f'Condition(s) ({unknown_conds}) unknown.')

    # Store copies so that registry entries never share (or alias) caller objects.
    to_compare[name] = dict()
    to_compare[name]['cols'] = list(use_cols)
    to_compare[name]['conditions'] = dict(use_conditions)
    print(f"Added: {name}")


def add_q_compare(df: pd.DataFrame, to_compare: dict, use_cols: list, name_prefix = None, check_q_compare = True, use_conditions: typing.Optional[typing.Dict[str, bool]] = None) -> None:
    """Register one comparison per combination of locator columns and their "Q_" twins.

    Every column of ``use_cols`` that is listed in ``LOCALISATORS`` is replaced
    by either itself or its ``Q_``-prefixed counterpart; all combinations are
    registered through ``compare`` with the suffix ``Kombination (<i>)``. In
    the registered comparisons the non-locator columns come first, followed
    by the chosen locator columns.

    With all three locators this yields eight comparisons. With fewer locators
    one comparison per possible combination is registered; without any locator
    the comparison is registered unchanged.

    ``check_q_compare`` is accepted for signature compatibility and not used.
    """
    if use_conditions is None:
        use_conditions = dict()

    plain_cols = [col for col in use_cols if col not in LOCALISATORS]
    # One (column, Q_column) pair of alternatives per locator present in use_cols.
    locator_choices = [(col, f'Q_{col}') for col in use_cols if col in LOCALISATORS]

    if not locator_choices:
        compare(df, to_compare, use_cols=plain_cols, name_prefix=name_prefix, check_q_compare=False, use_conditions=use_conditions)
        return

    # product() picks exactly one alternative per locator, in the same order in
    # which "one of each" 3-combinations of the flattened list would appear.
    for i, combination in enumerate(itertools.product(*locator_choices), 1):
        compare(df, to_compare, use_cols=[*plain_cols, *combination], name_prefix=name_prefix, check_q_compare=False, use_conditions=use_conditions, name_suffix=f'Kombination ({i})')


In [None]:
# Register the comparisons. The order of this table is the order in which the
# comparisons are registered in `to_compare`.


to_compare = dict()

tst = ['TRÄGER', 'SEITE', 'TAKE']

# (columns, extra keyword arguments for compare())
comparison_specs = [
    (['AMO_ID'],
     dict(use_conditions=dict(ignore_minimal_distance=True))),

    ([*tst, 'EAN/PEAN', 'LC', 'Z-BNR', 'MIT', 'RHTI', 'T-ISRC', 'T-RHTI'],
     dict(name_prefix="alles", use_conditions=dict(ignore_minimal_distance=True))),

    (['LC', 'Z-BNR', 'T-ISRC'],
     dict(use_conditions=dict(ignore_minimal_distance=True))),

    (['EAN/PEAN', 'T-ISRC'],
     dict(use_conditions=dict(ignore_minimal_distance=True))),

    (['RHTI', 'T-ISRC'],
     dict(use_conditions=dict(ignore_minimal_distance=True))),

    ([*tst, 'EAN/PEAN', 'LC', 'Z-BNR', 'MIT', 'RHTI'],
     dict()),

    ([*tst, 'EAN/PEAN', 'T-RHTI', 'RHTI'],
     dict()),

    ([*tst, 'EAN/PEAN', 'LC', 'Z-BNR', 'RHTI'],
     dict()),

    ([*tst, 'EAN/PEAN', 'RHTI'],
     dict()),

    (['EAN/PEAN', 'LC', 'Z-BNR', 'RHTI'],
     dict(use_conditions=dict(no_tst=True))),

    (['EAN/PEAN', 'LC', 'Z-BNR'],
     dict(use_conditions=dict(no_tst=True))),

    # Tübingen method
    (['MIT', 'LC', *tst, 'Z-BNR'],
     dict(name_prefix="Tübinger Methode", use_conditions=dict(shorten_z_bnr=True))),

    ([*tst, 'EAN/PEAN', 'LC'],
     dict()),

    ([*tst, 'LC', 'Z-BNR', 'T-RHTI'],
     dict(use_conditions=dict(ignore_minimal_distance=True))),

    (['LC', 'Z-BNR', 'T-RHTI'],
     dict(use_conditions=dict(no_tst=True))),

    (['LC', 'Z-BNR', 'RHTI'],
     dict(use_conditions=dict(no_tst=True))),

    ([*tst, 'LC', 'Z-BNR', 'RHTI'],
     dict(use_conditions=dict(minimal_bnr=True))),

    ([*tst, 'MIT', 'RHTI'],
     dict()),

    (['RHTI', 'T-RHTI'],
     dict(use_conditions=dict(no_tst=True, no_bnr=True))),

    (['LC', 'MIT', 'RHTI'],
     dict(use_conditions=dict(no_tst=True, no_bnr=True))),
]

for spec_cols, spec_kwargs in comparison_specs:
    compare(data, to_compare, spec_cols, **spec_kwargs)



In [None]:
# Prepare the comparison

PATTERN_NON_ALPHANUMERIC = re.compile(r'\W+')
# Matches every NON-digit character; removing the matches leaves digits only.
PATTERN_ONLY_NUMBERS = re.compile(r'\D')
MINIMAL_DISTANCE = 80
MINIMAL_BNR_LEN = 4
minimal_distance_col = f'distance >= {MINIMAL_DISTANCE}?'

# Result columns filled by the comparison loop
data['hat Dubletten?'] = False
data['ist Dublette?'] = False
data[minimal_distance_col] = False
# These columns later receive strings / category values, so they start out as
# object columns instead of float NaN columns that would have to be upcast.
for result_col in ['Original ANR', 'Original BEST', 'Original RHTI', 'Fundmethode']:
    data[result_col] = pd.Series(np.nan, index=data.index, dtype='object')
data['AMO geteilt'] = False
data['Abstand'] = np.nan
data['Abstand'] = data['Abstand'].astype('Int64')

# Normalise the text columns: strip non-word characters, lower-case.
# regex=True is required explicitly: pandas >= 2.0 defaults to regex=False and
# rejects a compiled pattern in that mode.
# NOTE(review): values that consist only of stripped characters become '' (not NA)
# and therefore compare as equal to each other later on -- confirm this is intended.
data['MIT'] = data['MIT'].str.replace(PATTERN_NON_ALPHANUMERIC, '', regex=True).str.lower()
data['RHTI'] = data['RHTI'].str.replace(PATTERN_NON_ALPHANUMERIC, '', regex=True).str.lower()
data['T-RHTI'] = data['T-RHTI'].str.replace(PATTERN_NON_ALPHANUMERIC, '', regex=True).str.lower()

print('Generate Z-BNR (numbers only)')
data['Z-BNR'] = data['BNR'].str.replace(PATTERN_ONLY_NUMBERS, '', regex=True).str.lower()

# EAN/PEAN: ignore the first character if the value is longer than 12 characters
print('Shorten PEAN if necessary')
data['O-PEAN'] = data['EAN/PEAN']
# Missing values yield NA in the comparison; treat them as "not too long".
mask = (data['EAN/PEAN'].str.len() > 12).fillna(False)
column_name = 'EAN/PEAN'
data.loc[mask, column_name] = data.loc[mask, column_name].str[1:13]
# Helper column holding the index (ANR) as a regular column for the loop below
data['t_ANR'] = data.index

# Run the comparison
#
# Every registered comparison is applied in order to all rows that are not yet
# marked as duplicates. Within a comparison, rows are ordered by location
# priority; the first row of each group of identical values is the original.


print("Comparing...")
print("Priority")
for i, location in enumerate(locations):
    print(f"{i}. {location}")
print('\n')


found = dict()
found['total'] = dict()
ignored = 'ignorierte Dubletten (mit weniger Kriterien anderes Original gefunden, zur Sicherheit verworfen (Ergebnisse ohne Abstandsprüfung!)'
found[ignored] = 0
found['total']['total'] = 0
found['total'][minimal_distance_col] = 0
found['total']['marked'] = 0

for position, method in enumerate(to_compare, 1):
    fundmethod = f'{position}: {method}'
    cols = to_compare[method]['cols']
    conditions = to_compare[method]['conditions']
    print(f'\n Comparing: : {fundmethod} ({position}/{len(to_compare)})')
    print(f'-> Columns: {cols}')
    print(f"-> Conditions: {conditions}")

    # Candidates: everything that has not been marked as a duplicate so far
    vgl = data[(data['ist Dublette?'] == False)].copy()

    # Conditions are evaluated by value, so an explicit `False` disables them.
    if conditions.get('minimal_bnr'):
        # Keep only rows whose BNR reaches the minimal length (missing BNR: dropped)
        vgl = vgl[(vgl['BNR'].str.len() >= MINIMAL_BNR_LEN).fillna(False)]

    if conditions.get('no_bnr'):
        # Keep only rows without a BNR
        vgl = vgl[vgl['BNR'].isnull()]

    if conditions.get('shorten_z_bnr'):
        long_z_bnr = (vgl['Z-BNR'].str.len() >= 6).fillna(False)
        vgl.loc[long_z_bnr, 'Z-BNR'] = vgl.loc[long_z_bnr, 'Z-BNR'].str[1:]

    # NOTE(review): the original code had a block for 'no_tst' at this stage that
    # re-applied a filter already applied to every compared column and therefore
    # had no effect. The condition ("TRÄGER, SEITE, TAKE have no value") is thus
    # NOT enforced; decide how it should be enforced before relying on it.

    # A row with a missing value in any compared column can never be an original
    # or a duplicate. Dropping these rows up front replaces filling them with
    # sentinel values (which could collide with real data) and removing them again.
    vgl = vgl.dropna(subset=cols)

    # Stable sort: within one location the existing row order decides
    # which row comes first, so the chosen original is reproducible.
    vgl = vgl.sort_values(['BEST'], kind='stable')

    vgl['ist Dublette?'] = vgl.duplicated(subset=cols)

    if vgl['ist Dublette?'].any():
        # First row (highest-priority location) of each group of identical values
        vgl['Original ANR'] = vgl.groupby(cols)['t_ANR'].transform('first').values

        # Rows already registered as originals are not counted as duplicates.
        # The flags are looked up for exactly the rows of `vgl`, so both sides
        # of the `&` share one index.
        already_original = data.loc[vgl.index, 'hat Dubletten?']
        demote = vgl['ist Dublette?'] & already_original
        found[ignored] += int(demote.sum())
        vgl.loc[demote, 'ist Dublette?'] = False

        vgl['Original BEST'] = data.loc[vgl['Original ANR'], 'BEST'].values
        vgl['Original RHTI'] = data.loc[vgl['Original ANR'], 'RHTI'].values

        # From here on `vgl` only contains the duplicates
        vgl = vgl[vgl['ist Dublette?']].copy()

        # Distance between the numeric parts of own and original ANR
        vgl['eigene ANR (numerisch)'] = pd.to_numeric(vgl.index.str.replace(r'[^\d]', '', regex=True))
        vgl['originale ANR (numerisch)'] = pd.to_numeric(vgl['Original ANR'].str.replace(r'[^\d]', '', regex=True))
        vgl['Abstand'] = abs(vgl['eigene ANR (numerisch)'] - vgl['originale ANR (numerisch)'])
        far_enough = vgl['Abstand'] >= MINIMAL_DISTANCE

        # Transfer the results
        if conditions.get('ignore_minimal_distance'):
            data.loc[vgl.index, 'ist Dublette?'] = True
        else:
            data.loc[vgl.index, 'ist Dublette?'] = far_enough

        data.loc[vgl.index, 'Original ANR'] = vgl['Original ANR']
        data.loc[vgl.index, 'Original BEST'] = vgl['Original BEST']
        data.loc[vgl.index, 'Original RHTI'] = vgl['Original RHTI']
        data.loc[data.index.isin(vgl['Original ANR']), 'hat Dubletten?'] = True
        data.loc[vgl.index, 'Fundmethode'] = fundmethod
        # Previously `position == 0`, which never holds because positions start
        # at 1. Matches found via the AMO_ID column share their AMO id.
        if 'AMO_ID' in cols:
            data.loc[vgl.index, 'AMO geteilt'] = True
        data.loc[vgl.index, 'Abstand'] = vgl['Abstand']
        data.loc[vgl.index, minimal_distance_col] = far_enough

    # Statistics for this comparison
    found_total = 0
    found_marked = 0
    found_without_too_near = 0
    if vgl['ist Dublette?'].any():
        found_total = len(vgl)
        found_marked = int(data.loc[vgl.index, 'ist Dublette?'].sum())
        found_without_too_near = int((vgl['Abstand'] >= MINIMAL_DISTANCE).sum())

    print(f'found (total): {found_total}')
    print(f'found (excluding too near): {found_without_too_near}')
    print(f'found (marked as dublette): {found_marked}\n')

    found[fundmethod] = dict()
    found[fundmethod]['total'] = found_total
    found[fundmethod]['marked'] = found_marked
    found[fundmethod][minimal_distance_col] = found_without_too_near
    found['total']['total'] += found[fundmethod]['total']
    found['total']['marked'] += found[fundmethod]['marked']
    found['total'][minimal_distance_col] += found[fundmethod][minimal_distance_col]

    del vgl

del data['t_ANR']

# Sanity checks: no row is both a marked duplicate and an original
assert len(data[ (data['ist Dublette?'] == True) & (data.index.isin(data['Original ANR'])) ]) == 0
assert len(data[ (data['ist Dublette?'] == True) & (data['hat Dubletten?'] == True) ]) == 0

ipd.display(found['total'])
print('Fertig!')


In [None]:
# Result matrix: per comparison method, the total number of matches and the
# number of matches that satisfy the minimal distance.

# Only the per-method entries; the aggregate entries are left out.
per_method = {key: stats for key, stats in found.items() if key not in ('total', ignored)}

# First pass: bare numbers only
for stats in per_method.values():
    print(f"{stats['total']},{stats[minimal_distance_col]}")

print('\n')
# Second pass: the same numbers, labelled with the method
for method_name, stats in per_method.items():
    print(f"{method_name}: {stats['total']},{stats[minimal_distance_col]}")

In [None]:
# Visualisations: distribution of the results

result_dir = output / "vergleich_ergebnis"
result_dir.mkdir(exist_ok=True)


def _save_and_show(filename):
    """Write the current figure to `result_dir` under `filename`, then display it."""
    plt.savefig(result_dir / filename, bbox_inches='tight')
    plt.show()


is_duplicate = data['ist Dublette?'] == True

# 1) Number of duplicates per comparison method
method_counts = data.loc[is_duplicate, 'Fundmethode'].value_counts()
ax = sns.barplot(x=method_counts, y=method_counts.index)
ax.set(ylabel='Reihenfolge: Vergleichsmethode', xlabel='Dubletten (abs.)')
plt.title('Fundmethoden nach Fundmenge')
_save_and_show('fundmethoden.svg')


# 2) Location of the duplicate vs. location of its original
origin_counts = data.loc[is_duplicate, ['BEST', 'Original BEST']].value_counts()
origin_matrix = origin_counts.unstack().sort_values('BEST')
ax = origin_matrix.plot(kind='bar')
plt.title('Wo stehen die Originale der Dubletten?')
ax.legend(title='Standort des Originals')
ax.set(ylabel='Originale', xlabel='Standort der Dublette')
_save_and_show('dubletten_originale.svg')


# 3) Duplicates per location, absolute numbers
status_counts = data.loc[:, ['BEST', 'ist Dublette?']].value_counts()
ax = status_counts.unstack(level=1).plot(kind='bar', stacked=True)
ax.legend(['Original', 'Dublette'])
plt.title('Dubletten nach Standort (bezogen auf die jeweilige Gesamtmenge)')
ax.set(ylabel='Einheiten', xlabel='Standort')
_save_and_show('dubletten_nach_standort.svg')

# 4) One chart per location: share of duplicates in percent
for location in locations:
    at_location = data.loc[data['BEST'] == location, ['BEST', 'ist Dublette?']]
    share = at_location.value_counts(normalize=True) * 100
    ax = share.unstack().plot(kind='bar', stacked=True)
    ax.legend(['Original', 'Dublette'])
    plt.title(f'{location}: Dubletten im Bestand (prozentual)')
    ax.set(ylabel='Einheiten (%)', xlabel=None)
    _save_and_show(f'{location}_dubletten_nach_standort.svg')


In [None]:
# Save the result as an Excel workbook: one sheet with everything, plus per
# location a full sheet, a random sample and a sheet with the duplicates only.

results_file = output / 'ergebnis.xlsx'
if results_file.exists():
    results_file.unlink()

SAMPLE_SIZE = 10
SAMPLE_SEED = 0  # fixed, so a re-run on the same data produces the same sample sheets

out = data.copy()
# Render boolean columns as 'x' (True) / empty cell (False)
bool_cols = [col for col, col_type in out.dtypes.items() if col_type == 'bool']
for col in bool_cols:
    out[col] = out[col].map({True: 'x', False: np.nan}).astype('object')


with pd.ExcelWriter(results_file) as writer:
    # The complete table is written once; it does not depend on the location.
    out.to_excel(writer, sheet_name='Alles')
    for location in locations:
        at_location = out.loc[out['BEST'] == location, :]
        at_location.to_excel(writer, sheet_name=f'{location} Alles')
        # sample() raises if asked for more rows than exist, so cap the size
        # for locations with fewer rows than SAMPLE_SIZE.
        sample_rows = at_location.sample(min(SAMPLE_SIZE, len(at_location)), random_state=SAMPLE_SEED)
        sample_rows.to_excel(writer, sheet_name=f'{location} Random sample (10)')
        at_location.loc[at_location['ist Dublette?'] == 'x', :].to_excel(writer, sheet_name=f'{location} Dubletten')

del out