# Milk Data Preprocessing (merge Raw Data from Swiss Herdbook and Holstein Switzerland to parquet format)
HS = Holstein Switzerland

BS = Braunvieh Schweiz

SHB = Swiss Herdbook

In [None]:
from pathlib import Path
from typing import Any, Callable, Dict, List
from datetime import datetime
import pandas as pd
from joblib import Parallel, delayed

In [None]:
# Root directories: raw exports are read from below `workspace`,
# preprocessed parquet output is written below `workspace2`.
workspace = Path("/home/aschneuwl/workspace/")
workspace2 = Path("/mnt/wks3/aschneuwl/workspace")

# Raw export folders (Swiss Herdbook / Braunvieh and Holstein Switzerland).
shb_bs = workspace.joinpath("data/dairy/shb_bs/Daten_ETH_Masterarbeit_202404")
holstein_dir = workspace.joinpath("data/dairy/holstein")

In [None]:
def float_and_none(input_data: str) -> float:
    """Convert a raw field to ``float``, mapping placeholders to ``None``.

    The empty string and the filler values ``"####"`` / ``"#####"`` are
    treated as missing and yield ``None``. Every other value is handed to
    ``float``, which raises ``ValueError`` for non-numeric text.
    """
    if input_data in ("", "####", "#####"):
        return None
    return float(input_data)

def str_and_none(input_data: str) -> str:
    """Return the raw field unchanged, or ``None`` for a placeholder.

    The empty string and the filler values ``"####"`` / ``"#####"`` are
    treated as missing.
    """
    is_missing = input_data in ("", "####", "#####")
    return None if is_missing else input_data

In [None]:
def parse_raw_file_parallel(fpath: Path, source: str) -> List[Dict[str, Any]]:
    """Parse every line of a B01 export in parallel.

    Args:
        fpath: Path of the raw export file (decoded as cp437).
        source: Label stored in each record's ``"source"`` field.

    Returns:
        One entry per input line, in file order. An entry is ``None`` where
        ``b01_record`` returns ``None``.

    Note:
        Lines are always parsed with ``b01_record``, regardless of the file
        extension. An exception raised for any line propagates to the caller.
    """
    with open(fpath, encoding="cp437") as fp:
        lines = fp.readlines()

    # Pass the caller's label through; it was previously hard-coded to "bv".
    records = Parallel(n_jobs=-1, verbose=10)(delayed(b01_record)(line, source) for line in lines)

    return records

In [None]:
class QualitasDataReader():
    """Reads fixed-width raw export files line by line into records.

    The line parser is chosen from the file extension (``.B01``, ``.K33``,
    ``.K10``, ``.K11``, ``.K03``).
    """

    def __init__(self):
        # Parser for the file currently being read; set by `_select_reader`.
        self._line_reader: Callable[[str, str], Dict[str,Any]] = None

    def _select_reader(self, fpath: Path) -> Callable[[str, str], Dict[str,Any]]:
        """Pick the line parser matching the extension of ``fpath``.

        The parser functions are looked up only when their branch is taken,
        so they may be defined after this class.

        Raises:
            ValueError: If the extension is not one of the supported types.
        """
        ext = fpath.suffix

        if ext == ".B01":
            self._line_reader = b01_record
        elif ext == ".K33":
            self._line_reader = k33_record
        elif ext == ".K10":
            self._line_reader = k10_record
        elif ext == ".K11":
            self._line_reader = k11_record
        elif ext == ".K03":
            self._line_reader = k03_record_ho
        else:
            # `ArgumentError` was never defined and surfaced as a NameError.
            raise ValueError(f"Unknown file type: {ext}.")

        return self._line_reader

    def parse_raw_file(self, fpath: Path, source: str) -> List[Dict[str,Any]]:
        """Parse all lines of ``fpath`` into a list of record dicts.

        Lines whose parser raises are reported on stdout (zero-based line
        index, raw line, error) and skipped, so one malformed line does not
        abort the file. Lines for which the parser returns ``None`` are
        dropped as well.

        Args:
            fpath: Path of the raw export file (decoded as cp437).
            source: Label passed through to the line parser.

        Returns:
            The successfully parsed records, in file order.
        """
        reader = self._select_reader(fpath)
        records = []
        with open(fpath,  encoding="cp437") as fp:
            for line_count, line in enumerate(fp):
                try:
                    record = reader(line, source)
                except Exception as e:
                    # Best effort: report the offending line and keep going.
                    print(line_count, "-", line)
                    print(e)
                    continue

                if record is not None:
                    records.append(record)

        return records

    def read_into_df(self, fpath: Path, source: str) -> pd.DataFrame:
        """Parse ``fpath`` and return its records as a DataFrame (one row per record)."""
        records = self.parse_raw_file(fpath, source)

        return pd.DataFrame(records)

In [None]:
def b01_record(line: str, source: str):
    """
    Extracts a B01 record (farm master data) from a Datenschnittstelle
    Rindvieh-Schweiz data export line.

    Returns ``None`` for a line consisting only of ``'\\x1a'``, otherwise a
    dict of the extracted fields. Raises ``ValueError`` when the line is
    neither 222 nor 272 characters long, or when the altitude field is not
    numeric. An unparsable test-farm start date is stored as ``None``.
    """
    # Presumably a DOS end-of-file marker (Ctrl-Z) - not a record.
    if line == '\x1a':
        return None

    n_line = len(line)
    if n_line not in (222, 272):
        raise ValueError(f"Expected B01 record length is 222 or 272 (Holstein Switzerland). Received {n_line} char(s)")

    def field(start, stop):
        # Fixed-width column with the padding removed.
        return line[start:stop].strip()

    record = {
        "source": source,
        # Record type (kept unstripped) and record format version
        "recordType": line[0:3],
        "recordVersion": field(3, 5),
        # Farm identification and farm identification according to TVD
        "farmId": field(5, 15),
        "farmIdTvd": field(15, 22),
        # Postal code, city, country
        "zip": field(144, 149),
        "city": field(149, 179),
        "country": field(181, 184),
        # Cadastral zone
        "cadasterZone": field(184, 185),
        # Altitude of the location
        "altitude": float_and_none(field(185, 187)),
        # Region and municipality number
        "region": field(187, 189),
        "municipalityId": field(189, 193),
        # Location type
        "locationType": field(208, 212),
        # Test farm status
        "testFarmStatus": field(212, 213),
    }

    # Test farm start date; blank or invalid dates become None.
    start_raw = field(213, 221)
    try:
        record["testFarmStart"] = datetime.strptime(start_raw, '%Y%m%d').date() if start_raw else None
    except ValueError:
        record["testFarmStart"] = None

    return record

In [None]:
def k10_record(line: str, source: str = None):
    """
    Extracts a K10 record (insemination / mating data of cows) from a
    Datenschnittstelle Rindvieh-Schweiz data export line.

    Lines must be 208 (BS, SHB) or 147 (HS) characters long, otherwise a
    ``ValueError`` is raised. The longer layout carries additional fields
    from column 146 onwards. Blank dates become ``None``; a non-blank date
    that is not ``YYYYMMDD`` raises ``ValueError``.
    """
    n_line = len(line)
    if n_line not in (208, 147):
        raise ValueError(f"Expected K10 record length is 208 (BS, SHB) or 147 (HS). Received {n_line} char(s)")

    def text(start, stop):
        # Fixed-width column with the padding removed.
        return line[start:stop].strip()

    def day(start, stop):
        # YYYYMMDD column as a date, None when blank.
        raw = text(start, stop)
        return datetime.strptime(raw, '%Y%m%d').date() if raw else None

    record = {
        "source": source,
        # Record type (kept unstripped) and record format version
        "recordType": line[0:3],
        "recordVersion": text(3, 5),
        # Farm identification of the current location (+ TVD variant)
        "currentFarmId": text(5, 15),
        "currentFarmIdTvd": text(15, 22),
        # Animal identification, breed code and name
        "animalId": text(22, 36),
        "animalBreedCode": text(36, 39),
        "animalName": text(39, 51),
        # Farm identification of the location (+ TVD variant)
        "farmId": text(51, 61),
        "farmIdTvd": text(61, 68),
        # Cow / heifer
        "cowHeifer": text(70, 71),
        # Date of last calving (stored as raw text, not parsed)
        "lastCalving": text(71, 79),
        # Insemination / mating date, code and running number
        "inseminationDate": day(79, 87),
        "inseminationCode": text(87, 88),
        "inseminationCount": text(88, 90),
        # Bull identification, breed code and name
        "bullId": text(90, 104),
        "bullBreedId": text(104, 107),
        "bullName": text(107, 119),
        # Date of the preceding insemination / mating
        "previousInseminationDate": day(119, 127),
        # Data supplier code and identification
        "dataSourceCode": text(127, 129),
        "dataSourceId": text(129, 136),
        # Inseminator code
        "inseminatorCode": text(136, 138),
        # Test bull code, bull change code, bull category
        "testBullCode": text(138, 139),
        "bullChange": text(139, 140),
        "bullCategory": text(140, 142),
        # Farm container, reservation, special code
        "farmContainer": text(142, 143),
        "reservation": text(143, 144),
        "specialCode": text(144, 146),
    }

    if n_line > 147:
        record.update({
            # Ejaculation date
            "ejaculationDate": day(146, 154),
            # Semen treatment
            "semenTreatment": text(154, 155),
            # Insemination ID
            "inseminationId": text(155, 170),
            # Mutation code
            "mutationCode": text(170, 171),
            # NOTE(review): reuses the "dataSourceCode" key and therefore
            # overwrites the value read from columns 127-129 - confirm the
            # intended field name for this 8-character column.
            "dataSourceCode": text(171, 179),
            # For ET: identification and breed code of the genetic mother
            "geneticalMotherId": text(179, 193),
            "geneticalMotherBreed": text(193, 196),
            # Sex of sexed semen
            "sexedSemenGender": text(196, 197),
            # Mating date "to" (for mating periods); stored as raw text
            "inseminationDateTo": text(197, 205),
            # Data origin
            "dataSource": text(205, 207),
        })

    return record

In [None]:
def k11_record(line: str, source: str = None):
    """
    Extracts a K11 record (calving data of cows) from a Datenschnittstelle
    Rindvieh-Schweiz data export line.

    Accepted line lengths are 246, 250 and 253 characters; any other length
    raises ``ValueError``. Only lines longer than 250 characters carry the
    ``yearRoundHusbandryFarmId`` field. Blank dates become ``None``; a
    non-blank date that is not ``YYYYMMDD`` raises ``ValueError``.
    """
    n_line = len(line)
    if n_line not in (253, 246, 250):
        # The message now lists every accepted length, not only 253.
        raise ValueError(f"Expected K11 record length is 246, 250 or 253. Received {n_line} char(s)")

    def text(start, stop):
        # Fixed-width column with the padding removed.
        return line[start:stop].strip()

    def day(start, stop):
        # YYYYMMDD column as a date, None when blank.
        raw = text(start, stop)
        return datetime.strptime(raw, '%Y%m%d').date() if raw else None

    record = {
        "source": source,
        # Record type (kept unstripped) and record format version
        "recordType": line[0:3],
        "recordVersion": text(3, 5),
        # Farm identification of the current location (+ TVD variant)
        "currentFarmId": text(5, 15),
        "currentFarmIdTvd": text(15, 22),
        # Dam identification and breed code
        "damId": text(22, 36),
        "dambreedCode": text(36, 39),
        # Animal name
        "animalName": text(39, 51),
        # Farm identification of the location at calving (+ TVD variant)
        "calvingFarmId": text(51, 61),
        "calvingFarmIdTvd": text(61, 68),
        # Lactation number.
        # NOTE(review): stored unstripped, unlike the other text fields.
        "lactationNumber": line[68:70],
        # Calving date
        "calvingDate": day(70, 78),
        # Calf identification, breed code and sex
        "calfId": text(78, 92),
        "calfBreedCode": text(92, 95),
        "gender": text(95, 96),
        # Twin / triplet
        "twinTriplet": text(96, 97),
        # Sire identification and breed code
        "fatherId": text(97, 111),
        "fatherBreedCode": text(111, 114),
        # Abortion
        "abort": text(114, 115),
        # Calving interval in days
        "intercalvingPeriod": text(115, 118),
        # Course of birth
        "birthProcedure": text(118, 119),
        # Calf died within 24 hours
        "calfDeath24": text(119, 120),
        # Birth weight
        "birthWeight": text(120, 122),
        # Colour
        "color": text(122, 124),
        # Certificate requested
        "certificateDesired": text(124, 125),
        # For ET (embryo transfer): identification and breed code of the genetic mother
        "etGeneticMotherId": text(125, 139),
        "etGeneticMotherBreedCode": text(139, 142),
        # Insemination / mating date
        "inseminationDate": day(142, 150),
        # Castrated
        "castrated": text(150, 151),
        # Stillbirth
        "stillBirth": text(151, 152),
        # Time of death
        "deathTime": text(152, 155),
        # Genetic defect codes
        "geneticDefectCode": text(155, 158),
        "geneticDefectCode2": text(158, 161),
        # Genetic defect / malformation (column 161 is not read)
        "geneticDefect": text(162, 197),
        # Original and current movement ID (TVD)
        "originalMovementIdTvd": text(197, 212),
        "currentMovementIdTvd": text(212, 227),
        # Mutation code
        "mutationCode": text(227, 228),
        # Malformation
        "malformation": text(228, 231),
        # Tissue sample number
        "sampleId": text(231, 237),
        # Date of birth of the dam
        "dateOfBirthMother": day(237, 245),
    }

    # Farm identification for year-round husbandry according to TVD; only
    # present in the longest layout.
    # NOTE(review): stored unstripped, unlike the other text fields.
    if n_line > 250:
        record["yearRoundHusbandryFarmId"] = line[245:252]

    return record

In [None]:
def k33_record(line: str, source: str):
    """
    Extracts a K33 record (results of the individual milk recordings of
    cows) from a Datenschnittstelle Rindvieh-Schweiz data export line.

    Lines must be exactly 207 characters long, otherwise ``ValueError`` is
    raised. Text and numeric fields that are blank or filled with ``#``
    placeholders become ``None`` (via ``str_and_none`` / ``float_and_none``).
    The ``milk`` field is mandatory: a blank or non-numeric value raises
    ``ValueError``. Blank dates and times become ``None``; malformed ones
    raise ``ValueError``.
    """
    n_line = len(line)
    if n_line != 207:
        # The message previously claimed 222 although 207 is what is checked.
        raise ValueError(f"Expected K33 record length is 207. Received {n_line} char(s)")

    def text(start, stop):
        return str_and_none(line[start:stop].strip())

    def number(start, stop):
        return float_and_none(line[start:stop].strip())

    def day(start, stop):
        # YYYYMMDD column as a date, None when blank.
        raw = line[start:stop].strip()
        return datetime.strptime(raw, '%Y%m%d').date() if raw else None

    def clock(start, stop):
        # HH:MM column as a time, None when blank.
        raw = line[start:stop].strip()
        return datetime.strptime(raw, "%H:%M").time() if raw else None

    record = {
        "source": source,
        # Record type (kept unstripped) and record format version
        "recordType": line[0:3],
        "recordVersion": line[3:5].strip(),
        # Farm identification (+ TVD variant)
        "farmId": text(5, 15),
        "farmIdTvd": text(15, 22),
        # Animal identification, breed code and name
        "animalId": text(22, 36),
        "animalBreedCode": text(36, 39),
        "animalName": text(39, 51),
        # Farm identification of the location where the milk sample was taken (+ TVD variant)
        "farmIdLocationSample": text(51, 61),
        "farmIdTvdSample": text(61, 68),
        # Calving date
        "calvingDate": day(68, 76),
        # Lactation number
        "lactationNumber": number(76, 78),
        # Sample number
        "sampleNumber": number(78, 81),
        # Date of the test weighing
        "sampleWeighingDate": day(81, 89),
        # Milk kg (mandatory, see docstring)
        "milk": float(line[89:93].strip()),
        # Fat %, protein %, lactose %
        "fat": number(93, 97),
        "protein": number(97, 101),
        "lactose": number(101, 105),
        # Sample persistence %
        "samplePersistence": number(105, 108),
        # Somatic cell count
        "somaticCellCount": number(108, 112),
        # Milk urea
        "milkUreaNitrogen": number(112, 115),
        # Remark
        "comment": text(115, 117),
        # Alp altitude x100m
        "alpAltitude": number(117, 119),
        # Citrate
        "citrate": number(119, 122),
        # Milking method.
        # NOTE(review): reads the same column (123) as the test method below
        # while column 122 is never read - confirm the offset against the
        # interface specification.
        "milkingMethod": number(123, 124),
        # Test method
        "sampleMethod": text(123, 124),
        # Acetone mg/l
        "aceton": number(126, 129),
        # Milk kg weighed in the morning / evening
        "milkMorning": number(129, 133),
        "milkEvening": number(133, 137),
        # Fat % measured, protein % measured
        "fatMeasured": number(137, 141),
        "proteinMeasured": number(141, 145),
        # Weighing code, laboratory code
        "weighingCode": text(145, 147),
        "labCode": text(147, 149),
        # Milking time morning / evening
        "milkingTimeMorning": clock(149, 154),
        "milkingTimeEvening": clock(154, 159),
        # Registration MBK (milkability test)
        "registrationMbk": text(159, 161),
        # Registration LBE (linear description and classification)
        "registrationLbe": text(161, 163),
        # Casein %
        "caseinMeasured": number(163, 167),
        # Running number within the herd
        "herdIdentification": text(167, 171),
        # Weighing type
        "weighingType": text(171, 172),
        # Data origin
        "dataOrigin": text(172, 174),
        # Acetone mmol/l
        "acetonMmol": number(174, 178),
        # Fat % (second, wider field)
        "fat2": number(178, 183),
        # Acetone IR (presumably infrared spectroscopy)
        "acetonIr": number(183, 187),
        # BHB (beta-hydroxybutyrate)
        "bhbConcentration": number(187, 191),
        # Protein (second, wider field)
        "protein2": number(191, 196),
        # Fat % measured / protein % measured (second, wider fields)
        "fatMeasured2": number(196, 201),
        "proteinMeasured2": number(201, 206),
    }

    return record

In [None]:
def k03_record_ho(line: str, source: str):
    """
    Extracts a K03 record from a Datenschnittstelle Rindvieh-Schweiz data
    export line as delivered by Holstein Switzerland.

    Lines must be exactly 172 characters long, otherwise ``ValueError`` is
    raised. Text and numeric fields that are blank or filled with ``#``
    placeholders become ``None`` (via ``str_and_none`` / ``float_and_none``).
    Blank dates and times become ``None``; malformed ones raise
    ``ValueError``.
    """
    n_line = len(line)
    if n_line != 172:
        raise ValueError(f"Expected K03 v4 record length is 172. Received {n_line} char(s)")

    def text(start, stop):
        return str_and_none(line[start:stop].strip())

    def number(start, stop):
        return float_and_none(line[start:stop].strip())

    def day(start, stop):
        # YYYYMMDD column as a date, None when blank.
        raw = line[start:stop].strip()
        return datetime.strptime(raw, '%Y%m%d').date() if raw else None

    def clock(start, stop):
        # HH:MM column as a time, None when blank.
        raw = line[start:stop].strip()
        return datetime.strptime(raw, "%H:%M").time() if raw else None

    return {
        "source": source,
        # Record type (kept unstripped) and record format version
        "recordType": line[0:3],
        "recordVersion": line[3:5].strip(),
        # Farm identification (+ TVD variant)
        "farmId": text(5, 15),
        "farmIdTvd": text(15, 22),
        # Animal identification, breed code and name
        "animalId": text(22, 36),
        "animalBreedCode": text(36, 39),
        "animalName": text(39, 51),
        # Farm identification of the location where the milk sample was taken (+ TVD variant)
        "farmIdLocationSample": text(51, 61),
        "farmIdTvdSample": text(61, 68),
        # Calving date
        "calvingDate": day(68, 76),
        # Lactation number
        "lactationNumber": number(76, 78),
        # Sample number
        "sampleNumber": number(78, 81),
        # Date of the test weighing
        "sampleWeighingDate": day(81, 89),
        # Milk kg
        "milk": number(89, 93),
        # Fat %, protein %, lactose %
        "fat": number(93, 97),
        "protein": number(97, 101),
        "lactose": number(101, 105),
        # Sample persistence %
        "samplePersistence": number(105, 108),
        # Somatic cell count
        "somaticCellCount": number(108, 112),
        # Milk urea
        "milkUreaNitrogen": number(112, 115),
        # Remark
        "comment": text(115, 117),
        # Alp altitude x100m
        "alpAltitude": number(117, 119),
        # Citrate
        "citrate": number(119, 122),
        # Milking method.
        # NOTE(review): reads the same column (123) as the test method below
        # while column 122 is never read - confirm the offset.
        "milkingMethod": number(123, 124),
        # Test method
        "sampleMethod": text(123, 124),
        # Acetone mg/l
        "aceton": number(126, 129),
        # Milk kg weighed in the morning / evening
        "milkMorning": number(129, 133),
        "milkEvening": number(133, 137),
        # Fat % measured, protein % measured
        "fatMeasured": number(137, 141),
        "proteinMeasured": number(141, 145),
        # Weighing code, laboratory code
        "weighingCode": text(145, 147),
        "labCode": text(147, 149),
        # Milking time morning / evening
        "milkingTimeMorning": clock(149, 154),
        "milkingTimeEvening": clock(154, 159),
        # Registration MBK (milkability test)
        "registrationMbk": text(159, 161),
        # Registration LBE (linear description and classification)
        "registrationLbe": text(161, 163),
        # Casein %
        "caseinMeasured": number(163, 167),
        # Running number within the herd
        "herdIdentification": text(167, 171),
        # Weighing type
        "weighingType": text(171, 172),
    }

In [None]:
from typing import Iterable

def read_parallel(file_list: Iterable[Path]):
    """Read several raw export files in parallel and stack them into one DataFrame.

    Each file is parsed with ``QualitasDataReader.read_into_df``, using the
    file's name as its ``source`` label. The per-file frames are concatenated
    with ``pd.concat``, which raises ``ValueError`` if ``file_list`` is empty.
    """
    reader = QualitasDataReader()
    load = delayed(reader.read_into_df)
    pool = Parallel(n_jobs=-1, verbose=10)

    return pd.concat(pool(load(path, path.name) for path in file_list))

## Farm Data (B01)

Holstein Switzerland Data

In [None]:
# Parse every Holstein Switzerland B01 (farm master data) export into one DataFrame.
df_b01_hs = read_parallel(holstein_dir.glob("*.B01"))

Swiss Herdbook Data

In [None]:
# Parse every B01 export found in the `shb_bs` folder into one DataFrame.
df_b01_shb = read_parallel(shb_bs.glob("*.B01"))

Merge

In [None]:
# Stack both farm tables; the original row indices are kept (not reset).
df_b01 = pd.concat([df_b01_hs, df_b01_shb])

Save

In [None]:
# Persist the merged farm data below the preprocessed-data folder on `workspace2`.
df_b01.to_parquet(workspace2 / Path("data/preprocessed/dairy") / Path("b01.parquet"))

## Insemination Data (K10)

In [None]:
# Parse every K10 (insemination) export found in the `shb_bs` folder.
df_k10_shb = read_parallel(shb_bs.glob("*.K10"))

In [None]:
# Parse every Holstein Switzerland K10 (insemination) export.
df_k10_hs = read_parallel(holstein_dir.glob("*.K10"))

In [None]:
# Stack both insemination tables. Columns that only exist in the longer
# (208-character) K10 layout are filled with NaN for the shorter records.
df_k10 = pd.concat([df_k10_shb, df_k10_hs])

In [None]:
# Persist the merged insemination data.
df_k10.to_parquet(workspace2 / Path("data/preprocessed/dairy") / Path("k10.parquet"))

## Calving Data

In [None]:
# Parse every K11 (calving) export found in the `shb_bs` folder.
df_k11_shb = read_parallel(shb_bs.glob("*.K11"))

In [None]:
# Parse every Holstein Switzerland K11 (calving) export.
df_k11_hs = read_parallel(holstein_dir.glob("*.K11"))

In [None]:
# Inspect which columns the K11 parser produced.
df_k11_shb.columns

In [None]:
# Stack both calving tables; the original row indices are kept (not reset).
df_k11 = pd.concat([df_k11_shb, df_k11_hs])

In [None]:
# Convert the parsed date objects to pandas datetimes; values that cannot be
# converted become NaT instead of raising.
df_k11["calvingDate"] = pd.to_datetime(df_k11.calvingDate, errors='coerce')
df_k11["dateOfBirthMother"] = pd.to_datetime(df_k11.dateOfBirthMother, errors='coerce')

In [None]:
# Persist the merged calving data.
df_k11.to_parquet(workspace2 / Path("data/preprocessed/dairy") / Path("k11.parquet"))

## Milk Data

In [None]:
# Parse every K33 (individual milk recording) export found in the `shb_bs` folder.
df_shb_bs_k33 = read_parallel(shb_bs.glob("*.K33"))

Holstein Switzerland provides its milk records as K03 data points. A K03 file normally holds only the most recent milk samples, whereas K33 holds all milk samples; Holstein Switzerland delivers the complete history as a merge of K03 records. Hence there is nothing to worry about (double-checked with Mr Neuenschwander).

In [None]:
# Holstein Switzerland milk records come as K03 files (parsed by `k03_record_ho`),
# but are treated as K33-equivalent from here on.
df_hs_k33 = read_parallel(holstein_dir.glob("*.K03"))

In [None]:
# Stack both milk tables row-wise. Columns that only the K33 parser produces
# (e.g. dataOrigin, acetonMmol, fat2) are NaN for the K03 rows.
df_k33 = pd.concat([df_shb_bs_k33, df_hs_k33], axis=0)

In [None]:
# Convert the parsed date objects to pandas datetimes; values that cannot be
# converted become NaT instead of raising.
df_k33["sampleWeighingDate"] = pd.to_datetime(df_k33.sampleWeighingDate, errors='coerce')
df_k33["calvingDate"] = pd.to_datetime(df_k33.calvingDate, errors='coerce')

In [None]:
# Calendar year of the test weighing; used as a partition column when writing.
# Rows whose sampleWeighingDate is NaT get a missing year.
df_k33.loc[:,"year"] = df_k33.sampleWeighingDate.dt.year

In [None]:
# Write a partitioned dataset (one sub-folder per year and breed code).
# NOTE(review): unlike the other outputs this goes to "k33.parquet" relative to
# the current working directory, not below `workspace2`; the later
# `pd.read_parquet("k33.parquet")` cell relies on the same relative path.
df_k33.to_parquet("k33.parquet", partition_cols=["year", "animalBreedCode"])

In [None]:
# List the distinct breed codes present in the milk data.
df_k33.animalBreedCode.unique()

je = JE (Jersey)

ob = OB (Original Braunvieh)

rob

bs = BS (Brown Swiss)

bv =

ho = HO, RH, RF (Holstein)

sf = SF (Swiss Fleckvieh)

si = SI, 60, 70 (Simmental)

mo = ? (Montbéliarde)

In [None]:
# Number of milk records per breed code.
df_k33.animalBreedCode.value_counts()

In [None]:
# Inspect the records carrying the numeric breed code "60".
df_k33[df_k33["animalBreedCode"] == "60"]

In [None]:
# Inspect the records carrying the numeric breed code "70".
df_k33[df_k33["animalBreedCode"] == "70"]

# Farms

In [None]:
# Number of distinct farm identifiers (missing values are not counted).
df_k33.farmId.nunique()

In [None]:
# Number of distinct TVD farm identifiers (missing values are not counted).
df_k33.farmIdTvd.nunique()

In [None]:
# Number of distinct sampling-location farm identifiers. The parsers name this
# column `farmIdLocationSample`; `farmIdLocation` does not exist.
df_k33.farmIdLocationSample.nunique()

# Animals

In [None]:
# Earliest calving date in the milk data.
df_k33.calvingDate.min()

In [None]:
# Latest calving date in the milk data.
df_k33.calvingDate.max()

In [None]:
# Use matplotlib for the pandas `.plot` / `.hist` calls below.
pd.options.plotting.backend = "matplotlib"

In [None]:
# Count the milk records per (weighing date, breed code) combination and turn
# the result back into a flat table.
sample_per_breed_per_day = df_k33[["sampleWeighingDate", "animalBreedCode"]].value_counts().reset_index()

In [None]:
# Display the per-day, per-breed sample counts.
sample_per_breed_per_day

In [None]:
# TODO(review): this cell was left unfinished (`df_b01 = ` with no right-hand
# side), which is a syntax error. It is disabled until the intended expression
# is known, so that `df_b01` from the farm-data section stays intact.
# df_b01 = ...

In [None]:
# Column groups of the milk data, used to select subsets of `df_k33`.
# Columns describing the sampling event itself.
sample_meta = ["sampleNumber", "sampleWeighingDate", "comment", "milkingMethod", "sampleMethod", "milkEvening", "milkMorning", "weighingCode", "labCode", "milkingTimeMorning", "milkingTimeEvening"]
# Columns describing the animal and its lactation.
animal_meta = ["animalId", "animalBreedCode", "calvingDate", "lactationNumber"]
# Columns describing the farm. The parsers name the sampling-location column
# `farmIdLocationSample`; the previous entry `farmIdLocation` does not exist.
farm_meta = ["farmId", "farmIdTvd" , "farmIdLocationSample", "alpAltitude"]
# Measured yield and milk-composition columns.
yield_cols = ['milk', 'fat', 'protein', 'lactose', 'samplePersistence', 'somaticCellCount', "citrate", "milkUreaNitrogen", "aceton", "fatMeasured", "proteinMeasured", "acetonMmol", "acetonIr", "bhbConcentration"]

In [None]:
# Show only the yield / composition columns.
df_k33[yield_cols]

In [None]:
# TODO(review): the original selection `[[, "milk"]]` was a syntax error (the
# first column name was never filled in); only the "milk" column is selected
# until the intended second column is known.
df_k33[["milk"]]

In [None]:
# Display the merged milk data.
df_k33

In [None]:
# Number of distinct animals in the merged milk data.
df_k33.animalId.nunique()

In [None]:
import pandas as pd
# Reload the partitioned milk dataset written earlier (path is relative to
# the current working directory).
df = pd.read_parquet("k33.parquet")

In [None]:
# TODO(review): this cell contained only `df.` (an unfinished attribute
# access), which is a syntax error; disabled until the intended call is known.
# df.

In [None]:
# Alias, not a copy: in-place edits made through `milk` (e.g. the breed-code
# recoding below) also change `df`.
milk = df

In [None]:
# Number of distinct animals in the reloaded milk data.
df.animalId.nunique()

In [None]:
# Histogram of the observation span per farm (last minus first weighing date)
# in years. The timedelta accessor offers `.days` but no `.years`, so the span
# is converted from days.
((milk.groupby("farmId").sampleWeighingDate.max()- milk.groupby("farmId").sampleWeighingDate.min()).dt.days / 365.25).hist()

In [None]:
# Histogram of the observation span per animal (last minus first weighing
# date) in days.
(milk.groupby("animalId").sampleWeighingDate.max()- milk.groupby("animalId").sampleWeighingDate.min()).dt.days.hist()

In [None]:
# Harmonise breed codes in place: "70" and "60" are folded into "SI",
# "RH" and "RF" into "HO".
milk.loc[milk.animalBreedCode.isin(["70", "60"]), "animalBreedCode"] = "SI"
milk.loc[milk.animalBreedCode.isin(["RH", "RF"]), "animalBreedCode"] = "HO"

In [None]:
import numpy as np

In [None]:
# Per-breed mean of the log10-transformed somatic cell count for samples
# weighed in 2023.
# NOTE(review): a somatic cell count of 0 would become -inf under log10 and
# drag the mean to -inf - confirm zeros cannot occur.
milk[milk.sampleWeighingDate.dt.year == 2023].groupby("animalBreedCode").somaticCellCount.apply(np.log10).reset_index().groupby("animalBreedCode").mean()

In [None]:
# Per-breed standard deviation of the log10-transformed somatic cell count
# for samples weighed in 2023.
milk[milk.sampleWeighingDate.dt.year == 2023].groupby("animalBreedCode").somaticCellCount.apply(np.log10).reset_index().groupby("animalBreedCode").std()

In [None]:
# Per-breed standard deviation of fat % for samples weighed in 2023.
df[df.sampleWeighingDate.dt.year == 2023].groupby("animalBreedCode").fat.std()

In [None]:
# Per-breed maximum milk yield for samples weighed in 2023.
df[df.sampleWeighingDate.dt.year == 2023].groupby("animalBreedCode").milk.max()

In [None]:
# Per-breed standard deviation of milk yield for samples weighed in 2023.
df[df.sampleWeighingDate.dt.year == 2023].groupby("animalBreedCode").milk.std()

In [None]:
# NOTE(review): looks like an unfinished tab-completion. None of the parsers
# in this file creates a column named `m`, so this presumably raises
# AttributeError - confirm the intended attribute (e.g. `df.milk`).
df.m

In [None]:
import numpy as np