In [0]:
%scala
// Fill in the directory and key before running this cell.
// Hadoop's Configuration.set expects two string arguments: (name, value).
spark.sparkContext.hadoopConfiguration.set()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Row, Column
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, DoubleType, TimestampType, FloatType, IntegerType

In [0]:
# 'hubbard' is used for slow-growing hubbard breeds (hubbard ja87 and ja57). Other fast-growing hubbards are called 'hubbard conventional' or 'hubbard flex'.
# 'ross' is used for fast-growing ross breeds. 'ross ranger' is called 'rowan ranger' everywhere to separate them from other ross breeds.
# 'ross 308' -> 'ross', 'cobb 500' -> 'cobb'

import re

# Substring replacements applied to the lower-cased breed text: Dutch terms are
# translated to English and line/strain suffixes are removed.
# NOTE(review): "87 " only matches when followed by a space, so a value that
# ends in "ja87" is left unchanged (unlike "ja57") — confirm this is intended.
_BREED_REPLACEMENTS = {
    "bijproducten": "byproduct",
    "fokproduct": "breeding product",
    "conventioneel": "conventional",
    "87 ": "",
    "57": "",
    " ff": "",
    " 308": "",
    " 500": "",
}

# Built once at definition time instead of on every call (the function runs per row as a UDF).
_BREED_PATTERN = re.compile("(%s)" % "|".join(map(re.escape, _BREED_REPLACEMENTS)))


def breed_correction(text):
    """Normalise a raw breed description to a canonical lower-case breed name.

    Parameters
    ----------
    text : str or None
        Raw breed value.

    Returns
    -------
    str or None
        None when ``text`` is None. Otherwise the lower-cased text with the
        substrings in ``_BREED_REPLACEMENTS`` substituted, after which:
        any value containing "ranger" becomes "rowan ranger",
        "kippen diverse rassen" becomes "diverse breeds",
        "hubbard ja" becomes "hubbard",
        and everything else is returned as-is.
    """
    # Null values are passed through untouched.
    if text is None:
        return None

    # Lower-case first, then replace every matched key by its mapped value.
    lowered = _BREED_PATTERN.sub(
        lambda mo: _BREED_REPLACEMENTS[mo.group(0)],
        text.lower(),
    )

    # Finally map specific strings to the correct breed name.
    if "ranger" in lowered:
        return "rowan ranger"
    if lowered == "kippen diverse rassen":
        return "diverse breeds"
    if lowered == "hubbard ja":
        return "hubbard"
    return lowered

# Spark UDF wrapper around breed_correction (string in, string out).
# Uses the explicitly imported F.udf rather than a bare `udf`, which this
# notebook never imports.
breed_correction_udf = F.udf(breed_correction, StringType())

In [0]:
def pen_correction(text):
    """Normalise a pen/house identifier.

    Removes every space and hyphen and lower-cases the result, so that
    differently formatted spellings of the same pen compare equal.

    Parameters
    ----------
    text : str or None
        Raw pen value.

    Returns
    -------
    str or None
        The cleaned identifier, or None when ``text`` is None.
    """
    # Null values are passed through untouched.
    if text is None:
        return None
    return text.replace(" ", "").replace("-", "").lower()

# Spark UDF wrapper around pen_correction (string in, string out).
# Uses the explicitly imported F.udf rather than a bare `udf`, which this
# notebook never imports.
pen_correction_udf = F.udf(pen_correction, StringType())

# Data

Data cleaning/filter steps:
- Exclude entries with Roosters or Hens (instead of Unsexed)
- Only include Type 'Kip' (no ducks or Null)
- Exclude CompanyType 'Leghanen'
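- HatchDate between 2013 and 2021 (note: this filter also excludes rows with a null HatchDate)
- MoveDate between 2013 and 2021 (the code filters on MoveDate, not SlaughterDate)
- MoveDate more than 1 and fewer than 100 days after the hatch date (AgeAtSlaughter > 1 and < 100)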
- Remove entries with Unsexed (number slaughtered) == 1
- Remove duplicates


In [0]:
# Read the raw transport notifications: a semicolon-delimited CSV with a header
# row, possibly multi-line fields, and "-" standing for a missing value.
# NOTE(review): "decimal" does not appear to be a documented Spark CSV option —
# confirm it has any effect.
dfAfvoer = (
    spark.read
    .option("header", "true")
    .option("delimiter", ";")
    .option("decimal", ",")
    .option("multiLine", "true")
    .option("nullValue", "-")
    .csv("//afvoermeldingen.csv")
)

In [0]:
# Translate the Dutch source column names to English, derive cleaned breed and
# pen columns, and drop the columns that are not needed downstream.

dfSlaughterWithoutFarmId1 = (
    dfAfvoer
    # movement, dates and identification
    .withColumnRenamed('Datumverplaatsing', 'MoveDate')
    .withColumnRenamed('Opmerking', 'Note')
    .withColumnRenamed('Verplaatsingtype', 'MoveType')
    .withColumnRenamed('Geboorte-datum', 'HatchDate')
    .withColumnRenamed('Slachtdatum', 'SlaughterDate')
    .withColumnRenamed('Kipnrherk', 'KIPNumber')
    .withColumnRenamed('Kipnrbest', 'KIPNumberDestination')
    # classification of the animals and the company
    .withColumnRenamed('Cate-gorie', 'Category')
    .withColumnRenamed('Soort', 'Type')
    .withColumnRenamed('Houderij-vorm', 'CompanyType')
    .withColumnRenamed('Levering-type', 'DeliveryType')
    .withColumnRenamed('Bedrijfssoortbestemming', 'DestinationCompanyType')
    # counts and measurements
    .withColumnRenamed('Hanen', 'Roosters')
    .withColumnRenamed('Hennen', 'Hens')
    .withColumnRenamed('Ongesext', 'Unsexed')
    .withColumnRenamed('Gewicht(kg)', 'Weight')
    .withColumnRenamed('Mortaliteit (%)', 'Mortality')
    .withColumnRenamed('Uitvoerder-monitoring', 'MonitoringExecutor')
    .withColumnRenamed('Voetzool-laesies', 'FootpadLesionScores')
    # cleaned versions of the raw breed ('Ras') and pen ('Stal') columns
    .withColumn('Breed', breed_correction_udf(F.col('Ras')))
    .withColumn('PenNumber', pen_correction_udf(F.col('Stal')))
    # the raw breed/pen columns and the unused source columns are no longer needed
    .drop('Stal', 'Formulier', 'Versie', 'Import', 'Export', 'Pluim-veetype', 'Bedrijfssoortherkomst', 'Ras')
)

In [0]:
# Cast the string columns to their proper types, add the age in days at the
# move date, and rename the join keys so they cannot clash with the PMP columns.

dfSlaughterWithoutFarmId = (
    dfSlaughterWithoutFarmId1
    .withColumn('Unsexed', F.col('Unsexed').cast("integer"))
    # weights of 10 or less (or non-integer values) become null
    .withColumn(
        'Weight',
        F.when(F.col('Weight').cast('integer') > 10, F.col('Weight').cast('integer')),
    )
    # decimal comma -> decimal point before casting
    .withColumn('Mortality', F.regexp_replace(F.col('Mortality'), ',', '.').cast("float"))
    .withColumn('FootpadLesionScores', F.col('FootpadLesionScores').cast("integer"))
    .withColumn('MoveDate', F.to_timestamp(F.col('MoveDate'), "dd-MM-yyyy"))
    .withColumn('HatchDate', F.to_timestamp(F.col('HatchDate'), "dd-MM-yyyy"))
    .withColumn('SlaughterDate', F.to_timestamp(F.col('SlaughterDate'), "dd-MM-yyyy"))
    # age is measured from hatch date to move date
    .withColumn('AgeAtSlaughter', F.datediff(F.col('MoveDate'), F.col('HatchDate')))
    .withColumnRenamed('KIPNumber', 'PoultryFarmIdentification2')
    .withColumnRenamed('PenNumber', 'House2')
    .withColumnRenamed('HatchDate', 'HatchDateKIP')
    .distinct()
)

# Add PMP data with UBN

And VetId, thinning yes/no etc.

In [0]:
# Load the PMP flock data from parquet.
dfPMPFlocks1 = (
    spark.read
    .parquet("//dfPMPFlocks.parquet")
)

In [0]:
# Keep flocks hatched up to the end of 2021, reduce PoultryFarmIdentification to
# the 5 characters starting at position 3, and keep one row per
# (FlockIdentification, House, NumberPlaced).
dfPMPFlocks = (
    dfPMPFlocks1
    .filter(F.col('HatchDate') <= "2021-12-31 00:00:00")
    .withColumn(
        'PoultryFarmIdentification',
        F.substring(F.col('PoultryFarmIdentification'), 3, 5),
    )
    .dropDuplicates(['FlockIdentification', 'House', 'NumberPlaced'])
)

# there are some duplicate flockIDs. This is because of a double VetID: these are filtered because otherwise the flock and therefore the number placed is doubled. 50 flocks have two different houses: these are kept because I see them as separate flocks (see notebook 'merge PMP data').

In [0]:
# Read the farmer ("houder") table: a comma-delimited CSV with a header row.
dfHouder = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv("//vmp_houder.csv")
)

# Rename the key columns to English, derive PoultryFarmIdentification from the
# 5 characters of KIPNUMMER starting at position 3, and drop everything else
# that is not needed.
dfFarmer = (
    dfHouder
    .withColumnRenamed('HDRID', 'FarmerIdentification')
    .withColumn('PoultryFarmIdentification', F.substring(F.col('KIPNUMMER'), 3, 5))
    .withColumnRenamed('UBN', 'FarmIdentification')
    .withColumnRenamed('POSTCODE', 'PostalCode')
    .drop(
        'WOONPLAATS', 'ADRES', 'KIPBEGINDATUM', 'KIPNUMMER', 'NAAM',
        'DATUM_LAMU', 'USER_LAMU', 'KIPEINDDATUM', 'PMPKUBID',
        'STRAAT', 'HUISNR', 'HUISNRTOEV', 'REGISTRATIENUMMER',
        'POSTPLAATS', 'POSTADRES', 'POSTPOSTCODE', 'POSTSTRAAT',
        'POSTHUISNR', 'POSTHUISNRTOEV', 'SRTCODE', 'PMPKIPID',
        'POSTCODE', 'BEDRIJFSOORT',
    )
)

In [0]:
# A transport record matches a PMP flock when the farm number and house are
# equal and the two hatch dates differ by at most 3 days.
cond = [
    dfSlaughterWithoutFarmId.PoultryFarmIdentification2 == dfPMPFlocks.PoultryFarmIdentification,
    dfSlaughterWithoutFarmId.House2 == dfPMPFlocks.House,
    F.abs(F.datediff(dfSlaughterWithoutFarmId.HatchDateKIP, dfPMPFlocks.HatchDate)) <= 3,
]

dfHatch_PMP = (
    dfSlaughterWithoutFarmId
    .join(dfPMPFlocks, on=cond, how='inner')
    .join(
        dfFarmer,
        on=['FarmerIdentification', 'FarmIdentification', 'PoultryFarmIdentification'],
        how="inner",
    )
    # the renamed transport-side join keys are redundant after the join
    .drop('House2', 'PoultryFarmIdentification2')
    # unsexed chickens only, excluding company type 'Leghanen'
    .filter(
        (F.col('Roosters') == '0')
        & (F.col('Hens') == '0')
        & (F.col('Type') == 'Kip')
        & (F.col('CompanyType') != 'Leghanen')
    )
    # more than one animal in the transport
    .filter(F.col('Unsexed') > 1)
    # hatch date within 2013-2021
    .filter(
        (F.col('HatchDate') >= "2013-01-01 00:00:00")
        & (F.col('HatchDate') <= "2021-12-31 00:00:00")
    )
    # move date within 2013-2021 and an age strictly between 1 and 100 days
    .filter(
        (F.col('MoveDate') >= "2013-01-01 00:00:00")
        & (F.col('MoveDate') <= "2021-12-31 00:00:00")
        & (F.col('AgeAtSlaughter') > 1)
        & (F.col('AgeAtSlaughter') < 100)
    )
    .drop('Roosters', 'Hens')
)

# this is without dropDuplicates on flock information, so all slaughter and thinning transports are a separate record.
# dfFarmer join gives same number of rows (no duplicates)

# Transform to events
- Hatch
- Thinning
- Slaughter
- Death
- Relocation

Thanks to the filters, there are no more Null Dates.

In [0]:
# Split the matched transports into one DataFrame per event type, based on
# MoveType and (for 'Afvoer') DeliveryType.

# thinning: 'Afvoer' with delivery type 'Uitladen'
dfThinningTransport = dfHatch_PMP.filter(
    (F.col('DeliveryType') == "Uitladen") & (F.col('MoveType') == "Afvoer")
)

# slaughter: 'Afvoer' with delivery type 'Wegladen'
dfSlaughterTransport = dfHatch_PMP.filter(
    (F.col('DeliveryType') == "Wegladen") & (F.col('MoveType') == "Afvoer")
)

# death
# NOTE(review): the literal "Uitval " ends with a space — confirm that the
# source data really contains this trailing space.
dfDeathTransport = dfHatch_PMP.filter(F.col('MoveType') == "Uitval ")

# relocation
dfRelocationTransport = dfHatch_PMP.filter(F.col('MoveType') == "Overplaatsing")

# Note: thinning and slaughter are filtered on MoveDate (like the rest), not SlaughterDate.

In [0]:
# hatch records:
# not only from slaughter as sometimes slaughter is accidentally recorded as thinning, plus some flocks only have death/relocation

# distinct on: FarmerIdentification, FarmIdentification, PoultryFarmIdentification, FlockIdentification, NumberPlaced, HatchDateKIP or HatchDate (???)
# one hatch date per PMP registration, so all columns from PMP

dfHatch = dfHatch_PMP \
    .dropDuplicates(["House", "FlockIdentification", "NumberPlaced", "HatchDate"])

In [0]:
from itertools import chain

# fill in for the specific dataset:
Type = "Hatch"
Pen = "House"
EventDate = "HatchDate"
df = dfHatch
# Hatch date is approximately placement date

# Columns that become top-level event columns; every other column of df is
# packed (sorted by name) into the MetaData struct.
RightColumns = ["FarmIdentification", Pen, EventDate, "EventType"]
OtherColumns = sorted(set(df.columns).difference(RightColumns))

# Build the event layout in one projection:
# FarmIdentification, Pen, EventDate, EventType, MetaData
dfHatchEventsStruct = df.select(
    "FarmIdentification",
    F.col(Pen).alias("Pen"),
    F.col(EventDate).alias("EventDate"),
    F.lit(Type).alias("EventType"),
    F.struct(*OtherColumns).alias("MetaData"),
)

In [0]:
# fill in for the specific dataset:
Type = "Thinning"
Pen = "House"
EventDate = "MoveDate"
df = dfThinningTransport

# Columns that become top-level event columns; every other column of df is
# packed (sorted by name) into the MetaData struct.
RightColumns = ["FarmIdentification", Pen, EventDate, "EventType"]
OtherColumns = sorted(set(df.columns).difference(RightColumns))

# Build the event layout in one projection:
# FarmIdentification, Pen, EventDate, EventType, MetaData
dfThinningEventsStruct = df.select(
    "FarmIdentification",
    F.col(Pen).alias("Pen"),
    F.col(EventDate).alias("EventDate"),
    F.lit(Type).alias("EventType"),
    F.struct(*OtherColumns).alias("MetaData"),
)

In [0]:
# fill in for the specific dataset:
Type = "Slaughter"
Pen = "House"
EventDate = "MoveDate"
# --> MoveDate and SlaughterDate should be the same.*
df = dfSlaughterTransport

# Columns that become top-level event columns; every other column of df is
# packed (sorted by name) into the MetaData struct.
RightColumns = ["FarmIdentification", Pen, EventDate, "EventType"]
OtherColumns = sorted(set(df.columns).difference(RightColumns))

# Build the event layout in one projection:
# FarmIdentification, Pen, EventDate, EventType, MetaData.
# The struct is named "MetaData" directly (it used to be created as "Metadata"
# and selected as "MetaData", which only resolves when column names are
# matched case-insensitively), consistent with the hatch and thinning events.
dfSlaughterEventsStruct = df.select(
    "FarmIdentification",
    F.col(Pen).alias("Pen"),
    F.col(EventDate).alias("EventDate"),
    F.lit(Type).alias("EventType"),
    F.struct(*OtherColumns).alias("MetaData"),
)

# * movedate is not the same as slaughterdate in 10.103 cases of 136.586 slaughters. 8.512 of those have a difference of 1 day. (this is from unfiltered excel). In a lot of the others, either the movedate or the slaughterdate has a wrong year (before Hatch).

In [0]:
# fill in for the specific dataset:
Type = "Death"
Pen = "House"
EventDate = "MoveDate"
# Death events must be built from the death transports; this cell previously
# read dfRelocationTransport, so relocations were labelled as "Death".
df = dfDeathTransport

# Columns that become top-level event columns; every other column of df is
# packed (sorted by name) into the MetaData struct.
RightColumns = ["FarmIdentification", Pen, EventDate, "EventType"]
OtherColumns = sorted(set(df.columns).difference(RightColumns))

# Build the event layout in one projection:
# FarmIdentification, Pen, EventDate, EventType, MetaData.
# The struct is named "MetaData" directly (it used to be created as "Metadata"
# and selected as "MetaData", which only resolves when column names are
# matched case-insensitively), consistent with the hatch and thinning events.
dfDeathEventsStruct = df.select(
    "FarmIdentification",
    F.col(Pen).alias("Pen"),
    F.col(EventDate).alias("EventDate"),
    F.lit(Type).alias("EventType"),
    F.struct(*OtherColumns).alias("MetaData"),
)

In [0]:
# fill in for the specific dataset:
Type = "Relocation"
Pen = "House"
EventDate = "MoveDate"
# Relocation events must be built from the relocation transports; this cell
# previously read dfDeathTransport, so deaths were labelled as "Relocation".
df = dfRelocationTransport

# Columns that become top-level event columns; every other column of df is
# packed (sorted by name) into the MetaData struct.
RightColumns = ["FarmIdentification", Pen, EventDate, "EventType"]
OtherColumns = sorted(set(df.columns).difference(RightColumns))

# Build the event layout in one projection:
# FarmIdentification, Pen, EventDate, EventType, MetaData.
# The struct is named "MetaData" directly (it used to be created as "Metadata"
# and selected as "MetaData", which only resolves when column names are
# matched case-insensitively), consistent with the hatch and thinning events.
dfRelocationEventsStruct = df.select(
    "FarmIdentification",
    F.col(Pen).alias("Pen"),
    F.col(EventDate).alias("EventDate"),
    F.lit(Type).alias("EventType"),
    F.struct(*OtherColumns).alias("MetaData"),
)

In [0]:
# Save the hatch events as parquet, overwriting any existing output at this path.
dfHatchEventsStruct.write.save(
    "//dfHatchEventsStruct.parquet",
    format="parquet",
    mode="overwrite",
)

# first version: 24/11/22  (this was first not saved separately)
# edits:
# - 25/11/22 added pen number correction
# - 29/11/22 removed error: column 'Stal'
# - 12/12/22 hubbard ja -> hubbard
# - 13/01/22 filter on number slaughtered >1, removed weights <10
# - 30/08/23 changed 'birth' to 'hatch'
# - 20/09/23 complete change (PMP)
# - 06/02/24 added hatch dates from thinning/death/relocation, removed double PMP flocks with >1 VetID

In [0]:
# Save the slaughter events as parquet, overwriting any existing output at this path.
dfSlaughterEventsStruct.write.save(
    "//dfSlaughterEventsStruct.parquet",
    format="parquet",
    mode="overwrite",
)

# first version: 24/11/22  (this was first not saved separately)
# edits:
# - 25/11/22 added pen number correction
# - 29/11/22 removed error: column 'Stal'
# - 12/12/22 hubbard ja -> hubbard
# - 13/01/22 filter on number slaughtered >1, removed weights <10
# - 30/08/23 changed 'birth' to 'hatch' and 'FootPadLesions' to 'footpadlesions'
# - 20/09/23 complete change (PMP)
# - 06/02/24 removed double PMP flocks with >1 VetID

In [0]:
# Save the thinning events as parquet, overwriting any existing output at this path.
dfThinningEventsStruct.write.save(
    "//dfThinningEventsStruct.parquet",
    format="parquet",
    mode="overwrite",
)

# first version: 24/11/22  (this was first not saved separately)
# edits:
# - 25/11/22 added pen number correction
# - 29/11/22 removed error: column 'Stal'
# - 12/12/22 hubbard ja -> hubbard
# - 13/01/22 filter on number slaughtered >1, removed weights <10
# - 30/08/23 changed 'birth' to 'hatch'
# - 20/09/23 complete change (PMP)
# - 06/02/24 removed double PMP flocks with >1 VetID

In [0]:
# Save the death events as parquet, overwriting any existing output at this path.
dfDeathEventsStruct.write.save(
    "//dfDeathEventsStruct.parquet",
    format="parquet",
    mode="overwrite",
)

# first version: 24/11/22  (this was first not saved separately)
# edits:
# - 25/11/22 added pen number correction
# - 29/11/22 removed error: column 'Stal'
# - 12/12/22 hubbard ja -> hubbard
# - 13/01/22 filter on number slaughtered >1, removed weights <10
# - 30/08/23 changed 'birth' to 'hatch'
# - 20/09/23 complete change (PMP)
# - 06/02/24 removed double PMP flocks with >1 VetID

In [0]:
# Save the relocation events as parquet, overwriting any existing output at this path.
dfRelocationEventsStruct.write.save(
    "//dfRelocationEventsStruct.parquet",
    format="parquet",
    mode="overwrite",
)

# first version: 24/11/22  (this was first not saved separately)
# edits:
# - 25/11/22 added pen number correction
# - 29/11/22 removed error: column 'Stal'
# - 12/12/22 hubbard ja -> hubbard
# - 13/01/22 filter on number slaughtered >1, removed weights <10
# - 30/08/23 changed 'birth' to 'hatch'
# - 20/09/23 complete change (PMP)
# - 06/02/24 removed double PMP flocks with >1 VetID