#Databricks notebook source

In [None]:
spark.builder.getOrCreate()

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import count, lit, lpad, col, hour, minute, to_timestamp, to_date, when, isnull, lag, lead, unix_timestamp, min as min_, max as max_
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import *
from datetime import date, datetime, timedelta
import os

# Parameter settings<br>
 - milking system<br>
 - parity <br>
 - breed<br>
 - breed percentage<br>
 - window size rolling average<br>
 - calved before the date `x`, now `June 1, 2017`<br>
 - ...

In [None]:
milkingsystem = 'AMS'
parity = '4' # every parity under this number will be taken into accoount in the analysis
breed = 'HF'
breedpercentage = '6'
windowsizerol_days = '10'

# ETL with AMS dataset

In [None]:
df = spark.read.text(ams_data)

In [None]:
data = df.select(
      F.trim(df.value.substr( 1,8)).cast("Int").alias('animal'),
      F.trim(df.value.substr(10,8)).cast("Int").alias('herd'),
      to_date(df.value.substr(19,8),"yyyyMMdd").alias('milkdate'),
      df.value.substr(28,4).alias('milkstarttime'),
      df.value.substr(33,4).alias('milkendtime'),
      F.trim(df.value.substr(38,5)).cast("Int").alias('kgmilk'),
      F.trim(df.value.substr(43,4)).alias('parlour_unit'),
      df.value.substr(48,1).alias('success'),
      df.value.substr(19,13).alias('ts'),
)
print('# records: ', data.count())
print('# cows: ', data.select('animal').distinct().count())

Loading calves-dataframe

In [None]:
cdf = calfdates.txtrenumd.gz
calving = cdf.select(
      F.trim(cdf.value.substr( 1,16)).cast("Int").alias('animal'),
      F.trim(cdf.value.substr(17,3)).cast("Int").alias('parity'),
      to_date(cdf.value.substr(20,8),"yyyyMMdd").alias('calfdate')
)

In [None]:
def days(i):
    return 60 * 60 * 24 * i

In [None]:
calving = calving.withColumn("calfdate_minus7", F.to_date(F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") - days(7)))))\
                 .withColumn("calfdate_plus7", F.to_date(F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") + days(7)))))\
                 .withColumn("calfdate_plus7ts", F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") + days(7))))\
.withColumn('nextParity',F.lead(calving['calfdate'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("parity")))\
.withColumn('calfdate_lagged',F.lag(calving['calfdate'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("parity")))\
.withColumn('parity_lagged',F.lag(calving['parity'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("parity")))

In [None]:
calving = calving.filter(calving["parity"] == 1)

In [None]:
data1 = data.join(calving, ((data.animal == calving.animal) &
                             (data.milkdate >= calving.calfdate) &
                             (data.milkdate <= calving.nextParity)),how='inner')\
             .select(data["*"],calving["parity"], calving["calfdate"])

In [None]:
print('# records: ', data1.count())
print('# cows: ', data1.select('animal').distinct().count())

#Loading pm005 

In [None]:
pm = pm005geb.txtrenumd.gz
pm005 = pm.select(
    F.trim(pm.value.substr( 1,16)).cast("Int").alias('animal'),
    to_date(pm.value.substr(17,8),"yyyyMMdd").alias('birthdate'),
    F.trim(pm.value.substr(27,1)).cast("Int").alias('parity'),
    to_date(pm.value.substr(29,8),"yyyyMMdd").alias('calfdate'),
    F.trim(pm.value.substr(37,9)).cast("Int").alias('herd'),
    to_date(pm.value.substr(47,8),"yyyyMMdd").alias('testmilkdate'),
    F.trim(pm.value.substr(56,3)).cast("Int").alias('kgmilk'),
    F.trim(pm.value.substr(60,3)).cast("Int").alias('%fat'),
    F.trim(pm.value.substr(64,3)).cast("Int").cast("Int").alias('%protein'),
    F.trim(pm.value.substr(69,3)).cast("Int").alias('cellcount'),
    F.trim(pm.value.substr(75,1)).cast("Int").alias('ureum'),
    F.trim(pm.value.substr(79,1)).cast("Int").alias('%lactose'),
    F.trim(pm.value.substr(83,1)).cast("Int").alias('ketosisindicator'),
    F.trim(pm.value.substr(86,1)).cast("Int").alias('nrmilkingsonwhichdayproductionisbased'),
    F.trim(pm.value.substr(89,1)).cast("Int").alias('statusdayproduction'),
    F.trim(pm.value.substr(92,1)).cast("Int").alias('statusfictivity'),
    F.trim(pm.value.substr(95,1)).cast("Int").alias('statuscow'),    
)

In [None]:
pm005v2 = pm005.select("animal", "parity", "birthdate", "herd", "nrmilkingsonwhichdayproductionisbased")\
               .filter((pm005["nrmilkingsonwhichdayproductionisbased"] != '4') & (pm005["nrmilkingsonwhichdayproductionisbased"] != '0'))\
.withColumn("milk_system", F.when(F.col("nrmilkingsonwhichdayproductionisbased") == '9', 'AMS').otherwise('CMS'))\
                 .drop("nrmilkingsonwhichdayproductionisbased")\

In [None]:
pm005v4 = pm005v2.groupBy("animal")\
                  .agg(F.sum(F.when(F.col('milk_system')=="AMS",1).otherwise(0)).alias("AMS"),\
                       F.sum(F.when(F.col('milk_system')=="CMS",1).otherwise(0)).alias("CMS"),\
                       F.first("birthdate").alias("birthdate"),\
                       F.first("herd").alias("herd"))\
                  .na.fill({ 'AMS':0, 'CMS':0 })\
                  .withColumn('year', year("birthdate")).drop("birthdate")

In [None]:
pm005v8 = pm005v4.withColumn("milk_system2", when((F.col("AMS") >= "1") & (F.col("CMS") <= "1"), "AMS")\
                                             .otherwise(when((F.col("CMS") >= "1") & (F.col("AMS") <= "1"), "CMS")\
                                             .otherwise("switch")))\
.select("animal", "year", "herd", "milk_system2")

In [None]:
pm005v8.cache()

# Pedigree file<br>
 Load pedigree file.

In [None]:
ped = pedigree.txtrenumd.gz

Again, we load every line as text, and transform them into the appropriate datatypes.
We have consulted the documentation of the dataset, and the following structure was indicated.

In [None]:
pedigree = ped.select(
      F.trim(ped.value.substr( 1,16)).cast("Int").alias('animal'),
      F.trim(ped.value.substr(17,15)).cast("Int").alias('sire'),
      F.trim(ped.value.substr(33,16)).cast("Int").alias('dam'),
      to_date(ped.value.substr(49,8),"yyyyMMdd").alias('birthdate'),
      F.trim(ped.value.substr(58,3)).alias('first_breed'),
      ped.value.substr(61,1).cast("Int").alias('breed_part1'),
    F.trim(ped.value.substr(62,3)).alias('second_breed'),
    ped.value.substr(65,1).cast("Int").alias('breed_part2'),
    ped.value.substr(67,1).alias('sexe'),
    ped.value.substr(69,1).alias('herdbooked')
)

In [None]:
pedigree0 = pedigree.join(pm005v8, (pedigree.animal == pm005v8.animal)).drop(pm005v8.animal)

Multiple filter statements:<br>
  - on sexe, only selecting the females<br>
  - only herdbooked 'S'<br>
  - selecting Holstein Friesian (HF), having 7/8 or 8/8 parts HF

In [None]:
pedigree1 = pedigree0.filter(pedigree["sexe"] == 'F')\
                     .filter(pedigree["herdbooked"] == 'S')\
                     .where(pedigree["first_breed"] == breed)\
                     .where(pedigree["breed_part1"] > breedpercentage)\

In [None]:
pedigree1.cache()

Again, we load every line as text, and transform them into the appropriate datatypes.<br>
We have consulted the documentation of the dataset, and the following structure was indicated.

In [None]:
cdf = calvingdataframe
calving = cdf.select(
      F.trim(cdf.value.substr( 1,16)).cast("Int").alias('animal'),
      F.trim(cdf.value.substr(17,3)).cast("Int").alias('parity'),
      to_date(cdf.value.substr(20,8),"yyyyMMdd").alias('calfdate')

In [None]:
def days(i):
    return 60 * 60 * 24 * i

In [None]:
calving = calving.withColumn("calfdate_minus7", F.to_date(F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") - days(7)))))\
                 .withColumn("calfdate_plus7", F.to_date(F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") + days(7)))))\
                 .withColumn("calfdate_plus7ts", F.to_timestamp((F.to_timestamp("calfdate","yyyyMMdd HHmm").cast("long") + days(7))))

In [None]:
calving = calving.filter(calving["calfdate"] < '2017-06-01')

In [None]:
calving = calving.withColumn('nextParity', to_date(when(calving.nextParity.isNull(), 
                                                  datetime.strptime("01/06/2017", '%m/%d/%Y')).otherwise(calving.nextParity),"yyyyMMdd"))

Make season for (herd-year-season) of calving variable (four seasons 1-2-3, 4-5-6, 7-8-9, 10-11-12)

In [None]:
calving0 = calving.withColumn("season", when(month(col("calfdate")) < '4', '1')\
                   .when(month(col("calfdate")) == '4', '2')\
                   .when(month(col("calfdate")) == '5', '2')\
                   .when(month(col("calfdate")) == '6', '2')\
                   .when(month(col("calfdate")) == '7', '3')\
                   .when(month(col("calfdate")) == '8', '3')\
                   .when(month(col("calfdate")) == '9', '3')\
                   .when(month(col("calfdate")) > '9', '4'))

In [None]:
calving0 = calving0.filter(calving0["parity"] < parity)

Join calving and pedrigree datasets

In [None]:
calving1 = calving0.join(pedigree1, (calving.animal == pedigree1.animal)).drop(pedigree1.animal)

Make age at calving variable and remove animals calved earlier than at 640 days of age

In [None]:
calving1 = calving1.withColumn("ageAtCalving",datediff(col("calfdate"),col("birthdate")))\
.withColumn("calvingAge",(F.round(col("ageAtCalving")/30.5,0)).cast("Int"))

In [None]:
calving2 = calving1.filter(calving1["ageAtCalving"] > '640')\

In [None]:
calving2 = calving2.withColumn("hys", concat(col("herd"), lit(""), col("Year"), lit(""), col("season")).cast("Int"))

In [None]:
calving2.cache()

In [None]:
data = data.withColumn("timestamp", F.to_timestamp("ts","yyyyMMdd HHmm"))

In [None]:
data1 = data.join(calving2, ((data.animal == calving2.animal) &
                             (data.milkdate >= calving2.calfdate_minus7) &
                             (data.milkdate <= calving2.nextParity)),how='inner')\
             .select(data["*"],calving2["calvingAge"], calving2["calfdate"],calving2["hys"],calving2["parity"],calving2["nextParity"])

In [None]:
data11 = data1.withColumn('firstMilkdate',F.first(data1['milkdate'])\
                                          .over(Window.partitionBy("animal", "parity")\
                                          .orderBy("milkdate")))

In [None]:
data111 = data11.withColumn("firstMilkdate14", F.to_date(F.to_timestamp((F.to_timestamp("firstMilkdate","yyyyMMdd").cast("long") + days(7)))))

Remove animals that start milking later than at day 14 after calving.

In [None]:
data1111 = data111.filter(F.col("calfdate") <= F.col("firstMilkdate14"))


Getting a subset of the data, needed for further analysis.

In [None]:
data2 = data1111.select("animal", "herd", "hys", "milkdate", "parity", "milkstarttime", "timestamp", "success", "kgmilk", "parlour_unit", "calfdate","calvingAge", "nextParity")

Generating new columns for further calculations, inlcuding the "milkstarttime" in hours and minutes, as well as the date.

In [None]:
data = data2.withColumn('milkstarttime_h', when(isnull(hour('timestamp')), 0).otherwise(hour('timestamp')))\
            .withColumn('milkstarttime_m', col('milkstarttime').substr(3, 2))\
            .withColumn('milkstarttime_hm', F.concat(F.col('milkstarttime_h'), F.lit(':'), F.col('milkstarttime_m')))\
            .withColumn('temp', F.concat(F.col('milkdate'), F.lit(' '), F.col('milkstarttime_hm')))\
            .withColumn('date', to_date(F.col('milkdate'),"yyyyMMdd"))\
            .withColumn("timestamp2", F.to_timestamp("temp","yyyy-MM-dd HH:mm"))

Filtering the data for only the successful milkings.

In [None]:
data = data.filter(data["success"] == 'T')\
.filter(data["kgmilk"] != '0')

In order to create the previous and next milking periods per calving, we utilize the `lag` and `lead` function

In [None]:
data = data.drop("milkstarttime", "timestamp", "milkstarttime_hm", "temp")\
           .withColumn('prevMilking',F.lag(data['timestamp2'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("timestamp2","animal")))\
           .withColumn('nextMilking',F.lead(data['timestamp2'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("timestamp2","animal")))\
           .withColumn('nextMilkingValue',F.lead(data['kgmilk'])
                                          .over(Window.partitionBy("animal")\
                                          .orderBy("timestamp2","animal")))

Here, for each milking the current date, previous date, and next date are calculated.

In [None]:
data = data.withColumn('prevDate', to_date(F.col('prevMilking')))\
           .withColumn('nextDate', to_date(F.col('nextMilking')))\
           .withColumn('currentDate', to_date(F.col('timestamp2')))

Below the calculation of the average milk in kg per day. This calculation is based upon Marieke's PhD, where we part of the first milking of a day is actually a part of the previous day. This part is thus discarded for the current day average, whereas the first milking of the next day is included in the average of the current day.

In [None]:
data = data.withColumn("time_diff_portion1", (F.col("timestamp2").cast("long") - F.col("prevMilking").cast("long"))/60.)\
           .withColumn("time_diff_min_cleaned_portion1", when(F.col("time_diff_portion1") < 86400,F.col("time_diff_portion1")).otherwise("0"))\
           .withColumn("time_diff_sec_portion1", F.col("time_diff_min_cleaned_portion1")*60.)\
           .withColumn("timesincemid_portion1", (((F.col("milkstarttime_h")*60.*60.)+(F.col("milkstarttime_m")*60.))))\
           .withColumn("milk_pps_portion1", ((F.col("kgmilk") / F.col('time_diff_sec_portion1')) * F.col('timesincemid_portion1')))\
           .withColumn("portion1", when(((F.col('currentDate') != F.col("prevDate")) & (F.col('currentDate') == F.col("nextDate"))), F.col("milk_pps_portion1")).otherwise("0"))\
           .withColumn("portion2", when(((F.col('currentDate') == F.col("prevDate")) & (F.col('currentDate') == F.col("nextDate"))), F.col("kgmilk")).otherwise("0"))\
           .withColumn("time_diff_portion3", (F.col("nextMilking").cast("long") - F.col("timestamp2").cast("long"))/60.)\
           .withColumn("time_diff_min_cleaned_portion3", when(F.col("time_diff_portion3") < 86400,F.col("time_diff_portion3")).otherwise("0"))\
           .withColumn("time_diff_sec_portion3", F.col("time_diff_min_cleaned_portion3")*60.)\
           .withColumn("timetomid_portion3", (86400 - ((F.col("milkstarttime_h")*60.*60.)+(F.col("milkstarttime_m")*60.))))\
           .withColumn("milk_pps_portion3", ((F.col("nextMilkingValue") / F.col('time_diff_sec_portion3')) * F.col('timetomid_portion3')))\
           .withColumn("portion3", when(((F.col('currentDate') == F.col("prevDate")) & (F.col('currentDate') != F.col("nextDate"))), F.col("kgmilk")+F.col('milk_pps_portion3')).otherwise("0"))\
           .withColumn("dayAvgMilk", round(F.expr("portion1 + portion2 + portion3"),2))

In [None]:
joined_df99 = data.select("animal","herd","hys","milkdate","parity","kgmilk","calvingAge","calfdate","nextParity","date","timestamp2","currentDate", "dayAvgMilk")\
.withColumn("mmy", F.round(F.mean("dayAvgMilk").over(Window.partitionBy("animal", "currentDate"))/10, 2))

Beginning of period and end of period still not right yet `Nov 5 2019`

Use `window` function for calculating the moving average.

Function to calculate number of seconds from number of days

In [None]:
days = lambda i: i * 86400
joined_df1a = joined_df99.withColumn('timestampGMT', joined_df99.currentDate.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.partitionBy(F.col("animal")).orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(10), days(10)))
joined_df2 = joined_df1a.withColumn('rollingMean', F.mean("mmy").over(w))\
                      .withColumn('residualRollingMean', F.col("mmy")-F.col('rollingMean'))

In [None]:
joined_df2 = joined_df2.dropDuplicates()

# Further subsetting/filtering

In [None]:
joined_df2a = joined_df2.withColumn('DIM',F.row_number().over(Window.partitionBy("animal", "parity").orderBy("currentDate")))\
.withColumn("llengthMax", (F.max("DIM").over(Window.partitionBy("animal", "parity"))-20))

In [None]:
joined_df3a = joined_df2a.filter((joined_df2a["DIM"] > 10) & (joined_df2a["DIM"] < 340))
joined_df3b = joined_df3a.drop("DIM").distinct()

In [None]:
w =  Window.partitionBy("animal", "parity")
joined_df3b2 = joined_df3b.withColumn("NaNcount", F.sum(F.when(F.isnan("residualRollingMean"), 1).otherwise(0)).over(w))\
  .filter("NaNcount<=50")

In [None]:
from pyspark.sql.types import FloatType
def autocorr(ret):
  import pandas as pd
  s = pd.Series(ret)
  return float(s.autocorr(lag=1))
auto=F.udf(autocorr, FloatType())

In [None]:
joined_df3c = joined_df3b2.withColumn("residualRollingMeanArray", F.collect_list(F.round(F.col("residualRollingMean"), 2)).over(Window.partitionBy("animal", "parity")))\
    .withColumn("autocor", F.round(auto("residualRollingMeanArray"),2))\
.withColumn("var_residualRollingMean", F.var_samp("residualRollingMean").over(Window.partitionBy("animal", "parity")))\
.withColumn("skewness_residualRollingMean", F.skewness("residualRollingMean").over(Window.partitionBy("animal", "parity")))

Making 7 classes: (1)50-90 (2)91-130 (3)131-170 (4)171-210 (5)211-250 (6)251-290 (7)291-330

In [None]:
joined_df3c = joined_df3c.withColumn("llengthCat", 
                 when((F.col("llengthMax") >= "50") & (F.col("llengthMax") <= "90"), "1")\
                .when((F.col("llengthMax") >= "91") & (F.col("llengthMax") <= "130"), "2")\
                .when((F.col("llengthMax") >= "131") & (F.col("llengthMax") <= "170"), "3")\
                .when((F.col("llengthMax") >= "171") & (F.col("llengthMax") <= "210"), "4")\
                .when((F.col("llengthMax") >= "211") & (F.col("llengthMax") <= "250"), "5")\
                .when((F.col("llengthMax") >= "251") & (F.col("llengthMax") <= "290"), "6")\
                .when((F.col("llengthMax") >= "291") & (F.col("llengthMax") <= "330"), "7")\
                                    .cast("Int"))

In [None]:
joined_df3d = joined_df3c.filter(joined_df3c["llengthCat"] > '0')

In [None]:
joined_df7 = joined_df3d.select("animal","herd","calvingAge", "calfdate","hys","mmy","parity","llengthMax","llengthCat","autocor","var_residualRollingMean","skewness_residualRollingMean")\
.withColumn("lnvar", F.round(log(F.col("var_residualRollingMean")),2))\
.distinct()
print("nr of records: ", joined_df7.count())
print("nr of cows: ", joined_df7.select("animal").distinct().count())

In [None]:
joined_df99 = joined_df7.groupby("animal", "herd").pivot("parity").agg(first(F.round("mmy", 2)).alias("mmy"), first("var_residualRollingMean").alias("var_residualRollingMean"), first("calvingAge").alias("calvingAge"), first("hys").alias("hys"), first(F.round("autocor", 2)).alias("autocor"),first(F.round("skewness_residualRollingMean",2)).alias("skewness"),first(F.round("lnvar",2)).alias("lnvar"), first("llengthCat").alias("llengthCat"))\
.withColumn("intHerdBin", (F.col("herd")%10))
print("nr of records: ", joined_df99.count())
print("nr of cows: ", joined_df99.select("animal").distinct().count())

In [None]:
joined_df99.repartition('intHerdBin').write.partitionBy('intHerdBin').format('csv').save("/mnt/files/intHerdBinFull_09032021_")