In [0]:
from pyspark.sql.functions import to_date,col
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DoubleType, FloatType, TimestampType
import pyspark.sql.functions as F
from pyspark.sql import Window

#Read data

In [0]:
FinalData= spark. \
  read. \
  parquet("wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/individualMilkingViews/")

In [0]:
# descriptive
FinalData.\
  agg(F.count('*'),
    F.countDistinct("HerdIdentifier"),
    F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
display()

count(1),count(HerdIdentifier),NumberOfCows
191462180,91,44540


In [0]:
#recalculate DIM and Age

FinalDataFormatted=FinalData.\
    drop('Age','Teams','TotalIntakeConsumedKg','TotalIntakeProgrammedKg','TotalTimeMilking','MilkSpeed','Failures','Milkings','Refusals','Parity','DaysInMilkCategory','AnimalEartag','AnimalFarmNumber','AnimalFarmName','IsActive','FirstMilkYield','HerdName','dataProviderGuid').\
    withColumn('AgeInDays',F.datediff('Date','BirthDate')+1).\
    withColumn('AgeInYears',F.col('AgeInDays')/365).\
    withColumn('DaysInMilk',F.datediff('Date','CalvingDate')+1).\
    withColumn('BirthDate',to_date('BirthDate','yyyy-MM-dd')).\
    withColumn('CalvingDate',to_date('CalvingDate','yyyy-MM-dd'))

In [0]:
FinalDataFormatted.\
  write.\
  parquet(path='wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/dfFinalDataFormatted0607/',mode="overwrite")

# Read formatted data

In [0]:
FinalDataFormatted= spark. \
  read. \
  parquet("wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/dfFinalDataFormatted0607/")

In [0]:
FinalDataFormatted.show(2)

+----------------+----------+-------------------+--------------+----------+---------------+-----------+------------------+-----------+-------------+----------+--------------+-----------------+---------+------------------+
|AnimalIdentifier| BirthDate|               Date|HerdIdentifier|DaysInMilk|LactationNumber|MilkYieldKg|     CalculatedSPP|CalvingDate|CurrentStatus|BodyWeight|BodyWeightMAvg|BodyWeightMAvgDev|AgeInDays|        AgeInYears|
+----------------+----------+-------------------+--------------+----------+---------------+-----------+------------------+-----------+-------------+----------+--------------+-----------------+---------+------------------+
|            1267|2008-01-16|2010-01-16 07:30:28|           720|         1|              1|        3.5|             6.195| 2010-01-16|            8|     469.0|         469.0|              0.0|      732|2.0054794520547947|
|            1267|2008-01-16|2010-01-16 18:24:27|           720|         1|              1|        3.0|5.3100000

In [0]:
FinalDataFormatted.\
    select('Date').\
    withColumn('Year',F.year('Date')).\
    describe().\
    display()

summary,Year
count,191462180.0
mean,2016.5890266735707
stddev,3.4820146248308794
min,2000.0
max,2022.0


In [0]:
FinalDataFormatted.\
    agg(F.count('*').alias('NumberOfRecords'),
    F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
    F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfRecords,NumberOfHerds,NumberOfCows
191462180,91,44540


# Filtering

## record level

### milk>0

In [0]:
# overview
FinalDataFormatted.\
    agg(F.count('*').alias('NumbersOfRecord'),
        F.countDistinct("HerdIdentifier").alias('NumbersOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumbersOfRecord,NumbersOfHerds,NumberOfCows
191462180,91,44540


In [0]:
FinalDataFiltered = FinalDataFormatted.\
    filter((F.col('MilkYieldKg')>0))

In [0]:
FinalDataFiltered.\
    agg(F.count('*').alias('NumbersOfRecord'),
        F.countDistinct("HerdIdentifier").alias('NumbersOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumbersOfRecord,NumbersOfHerds,NumberOfCows
95529301,91,44308


## lactation level

In [0]:
# lactation level data
w1=Window.partitionBy('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber').\
  orderBy(F.desc('Date'))

LactationLevel = FinalDataFiltered.\
    withColumn('LastRecordDate',to_date(F.first('Date').over(w1),'yyyy-MM-dd')).\
    withColumn('LastDIM',F.datediff('LastRecordDate','CalvingDate')+1).\
    withColumn('AgeAtCalvingDateInDays',F.datediff('CalvingDate','BirthDate')+1).\
    withColumn('AgeAtCalvingDateInYears',col('AgeAtCalvingDateInDays')/365).\
    select('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber','LastDIM','LastRecordDate','AgeAtCalvingDateInDays','AgeAtCalvingDateInYears').\
    distinct()

In [0]:
LactationLevel.show(2)

+--------------+----------------+-----------+---------------+-------+--------------+----------------------+-----------------------+
|HerdIdentifier|AnimalIdentifier|CalvingDate|LactationNumber|LastDIM|LastRecordDate|AgeAtCalvingDateInDays|AgeAtCalvingDateInYears|
+--------------+----------------+-----------+---------------+-------+--------------+----------------------+-----------------------+
|           503|             624| 2008-09-15|              4|    268|    2009-06-09|                  1987|      5.443835616438356|
|           503|             625| 2008-09-03|              4|    287|    2009-06-16|                  1973|      5.405479452054794|
+--------------+----------------+-----------+---------------+-------+--------------+----------------------+-----------------------+
only showing top 2 rows



In [0]:
# overview
LactationLevel.\
    agg(F.count('*').alias('NumberOfLactations'),
        F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfLactations,NumberOfHerds,NumberOfCows
117420,91,44308


### parity!=0

In [0]:
LactationLevelFiltered = LactationLevel.\
    filter(col('LactationNumber')>0)

In [0]:
# overview
LactationLevelFiltered.\
    agg(F.count('*').alias('NumberOfLactations'),
        F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfLactations,NumberOfHerds,NumberOfCows
117094,91,44283


### age>parity

In [0]:
LactationLevelFiltered = LactationLevelFiltered.\
    filter(col('AgeAtCalvingDateInYears')> col('LactationNumber'))

In [0]:
# overview
LactationLevelFiltered.\
    agg(F.count('*').alias('NumberOfLactations'),
        F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfLactations,NumberOfHerds,NumberOfCows
116492,91,44177


### age 1-99 per parity

In [0]:
# before 
LactationLevelFiltered.\
    groupBy('LactationNumber').\
    agg(F.min('AgeAtCalvingDateInYears'),
     F.expr('percentile(AgeAtCalvingDateInYears,array(0.01,0.05,0.95,0.99))').alias('1%,5%,95%,99%'),
       F.max('AgeAtCalvingDateInYears')).\
    orderBy('LactationNumber').\
    display()

LactationNumber,min(AgeAtCalvingDateInYears),"1%,5%,95%,99%",max(AgeAtCalvingDateInYears)
1,1.0136986301369864,"List(1.7452054794520548, 1.841095890410959, 2.5835616438356164, 3.1232876712328768)",12.126027397260271
2,2.010958904109589,"List(2.6876712328767125, 2.8164383561643835, 3.8575342465753426, 4.499232876712332)",13.216438356164383
3,3.0027397260273974,"List(3.5753424657534247, 3.7945205479452055, 5.0682191780821855, 5.722684931506846)",14.07123287671233
4,4.024657534246575,"List(4.465753424657534, 4.7753424657534245, 6.280547945205479, 6.912547945205479)",17.18904109589041
5,5.005479452054795,"List(5.378794520547944, 5.745205479452054, 7.4612328767123275, 8.139013698630142)",15.498630136986302
6,6.005479452054795,"List(6.2684931506849315, 6.7315068493150685, 8.638904109589038, 9.421315068493154)",16.671232876712327
7,7.013698630136986,"List(7.28295890410959, 7.704109589041096, 9.707397260273973, 10.575561643835613)",12.608219178082193
8,8.021917808219179,"List(8.295616438356165, 8.711780821917808, 10.887945205479452, 11.699232876712328)",12.490410958904109
9,9.024657534246575,"List(9.27041095890411, 9.656712328767124, 11.936027397260276, 12.797205479452053)",12.964383561643835
10,10.156164383561643,"List(10.24295890410959, 10.608356164383562, 13.153424657534247, 13.962767123287671)",14.375342465753423


In [0]:
QuantilesAgeInDays=[(i, LactationLevelFiltered.filter((col('LactationNumber')==i)).\
                     agg(F.expr('percentile(AgeAtCalvingDateInDays,array(0.01,0.99))').alias('Q1')).\
                     select('Q1').\
                     collect()[0][0]) for i in range(0,15)]

In [0]:
QuantilesAgeInDays

Out[24]: [(0, None),
 (1, [637.0, 1140.0]),
 (2, [981.0, 1642.2200000000012]),
 (3, [1305.0, 2088.779999999999]),
 (4, [1630.0, 2523.08]),
 (5, [1963.2600000000002, 2970.7400000000016]),
 (6, [2288.0, 3438.7800000000007]),
 (7, [2658.28, 3860.079999999999]),
 (8, [3027.9, 4270.22]),
 (9, [3383.7000000000003, 4670.98]),
 (10, [3738.6800000000003, 5096.41]),
 (11, [4228.76, 5470.0]),
 (12, [4552.2, 5749.04]),
 (13, [4923.26, 5789.38]),
 (14, [5406.24, 5759.04])]

In [0]:
QuantilesAgeInDays[1][1][1]

Out[34]: 1140.0

In [0]:
# keep 1-99% in parity 1-14, and remove the extreme parity 62-66

LactationLevelFiltered = LactationLevelFiltered.\
  filter(((col('LactationNumber')==1)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[1][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[1][1][1]))|\
         ((col('LactationNumber')==2)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[2][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[2][1][1]))|\
         ((col('LactationNumber')==3)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[3][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[3][1][1]))|\
         ((col('LactationNumber')==4)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[4][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[4][1][1]))|\
         ((col('LactationNumber')==5)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[5][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[5][1][1]))|\
         ((col('LactationNumber')==6)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[6][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[6][1][1]))|\
         ((col('LactationNumber')==7)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[7][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[7][1][1]))|\
         ((col('LactationNumber')==8)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[8][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[8][1][1]))|\
         ((col('LactationNumber')==9)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[9][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[9][1][1]))|\
         ((col('LactationNumber')==10)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[10][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[10][1][1]))|\
         ((col('LactationNumber')==11)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[11][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[11][1][1]))|\
         ((col('LactationNumber')==12)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[12][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[12][1][1]))|\
         ((col('LactationNumber')==13)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[13][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[13][1][1]))|\
         ((col('LactationNumber')==14)&(col('AgeAtCalvingDateInDays')>QuantilesAgeInDays[12][1][0])&(col('AgeAtCalvingDateInDays')<QuantilesAgeInDays[14][1][1]))
        )

In [0]:
QuantilesAgeInDaysAfter = [(i, LactationLevelFiltered.filter((col('LactationNumber')==i)).\
                     agg(F.expr('percentile(AgeAtCalvingDateInDays,array(0.01,0.99))').alias('Q1')).\
                     select('Q1').\
                     collect()[0][0]) for i in range(0,15)]

In [0]:
QuantilesAgeInDaysAfter

Out[27]: [(0, None),
 (1, [652.0, 1040.0]),
 (2, [1003.0, 1525.0]),
 (3, [1348.0, 1979.0]),
 (4, [1693.0, 2416.0]),
 (5, [2038.0, 2856.1800000000003]),
 (6, [2373.01, 3317.0]),
 (7, [2729.0, 3729.7799999999997]),
 (8, [3081.81, 4170.379999999999]),
 (9, [3454.16, 4500.120000000001]),
 (10, [3778.0699999999997, 4874.02]),
 (11, [4349.3, 5365.5599999999995]),
 (12, [4711.5, 5603.099999999999]),
 (13, [5050.0, 5658.32]),
 (14, [5403.12, 5708.88])]

In [0]:
# overview
LactationLevelFiltered.\
    agg(F.count('*').alias('NumberOfLactations'),
        F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfLactations,NumberOfHerds,NumberOfCows
114106,91,43422


### LastDIM

In [0]:
QuantilesLastDIM = LactationLevelFiltered.\
    agg(F.expr('percentile(LastDIM,array(0.01,0.99))').alias('Q1')).\
    select('Q1').\
    collect()[0][0]

In [0]:
QuantilesLastDIM

Out[31]: [5.0, 605.0]

In [0]:
LactationLevelFiltered = LactationLevelFiltered.\
    filter((col('LastDIM') < QuantilesLastDIM[1]))

In [0]:
# overview
LactationLevelFiltered.\
    agg(F.count('*').alias('NumberOfLactations'),
        F.countDistinct("HerdIdentifier").alias('NumberOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumberOfLactations,NumberOfHerds,NumberOfCows
112950,91,42990


In [0]:
#95529301
FinalDataFiltered.count()

Out[34]: 95529301

In [0]:
FinalDataFiltered = FinalDataFiltered.\
    join(LactationLevelFiltered, ['HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber'], 'inner')

In [0]:
FinalDataFiltered.\
  write.\
  parquet(path='wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FinalDataFiltered/',mode="overwrite")

In [0]:
FinalDataFiltered = spark. \
  read. \
  parquet('wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FinalDataFiltered/')

In [0]:
# 91296026
FinalDataFiltered.count()

Out[38]: 91295489

# day level

In [0]:
w1=Window.partitionBy('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber').\
  orderBy(F.desc('Date'))
w2=Window.partitionBy('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber').\
    orderBy(F.desc('Date')).\
    rowsBetween(0,12)
          
FinalDataFiltered  = FinalDataFiltered .\
    withColumn('DiffInSeconds',F.lag(col('Date')).over(w1).cast('long')-col('Date').cast('long')).\
    withColumn('DiffInHours',col('DiffInSeconds')/3600).\
    withColumn('SumMilkYield',F.sum('MilkYieldKg').over(w2)).\
    withColumn('SumHours',F.sum('DiffInHours').over(w2)).\
    withColumn('24hMilkYieldKg',(F.sum('MilkYieldKg').over(w2)/F.sum('DiffInHours').over(w2))*24)

In [0]:
FinalDataFiltered .\
    select('HerdIdentifier','AnimalIdentifier','Date','DaysInMilk','DiffInHours','MilkYieldKg','SumMilkYield','SumHours','24hMilkYieldKg').\
    filter((col('HerdIdentifier')==503)&\
        (col('AnimalIdentifier')==1992)).\
    orderBy(F.desc('Date')).\
    show(5)

+--------------+----------------+-------------------+----------+------------------+-----------+------------------+-----------------+------------------+
|HerdIdentifier|AnimalIdentifier|               Date|DaysInMilk|       DiffInHours|MilkYieldKg|      SumMilkYield|         SumHours|    24hMilkYieldKg|
+--------------+----------------+-------------------+----------+------------------+-----------+------------------+-----------------+------------------+
|           503|            1992|2022-01-11 00:24:01|       254|              null|        9.6|             116.1| 83.7938888888889|33.253021633770686|
|           503|            1992|2022-01-10 16:48:12|       253|7.5969444444444445|        9.5|115.80000000000001|91.19222222222223|30.476283308761715|
|           503|            1992|2022-01-10 09:22:37|       253| 7.426388888888889|       10.3|114.60000000000001|89.53666666666669|30.718141543501723|
|           503|            1992|2022-01-10 01:54:30|       253|7.4686111111111115|     

### last 24h MY per day

In [0]:
w3=Window.partitionBy('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber','DaysInMilk').\
  orderBy(F.desc('Date'))
          
FinalDataFiltered.\
    withColumn('TestDayMilkYield',F.first('24hMilkYieldKg').over(w3)).\
    select('HerdIdentifier','AnimalIdentifier','Date','DaysInMilk','DiffInHours','MilkYieldKg','SumMilkYield','SumHours','24hMilkYieldKg','TestDayMilkYield').\
    filter((col('HerdIdentifier')==503)&\
        (col('AnimalIdentifier')==1992)).\
    orderBy('Date').\
    show(5)

+--------------+----------------+-------------------+----------+------------------+-----------+------------------+------------------+------------------+------------------+
|HerdIdentifier|AnimalIdentifier|               Date|DaysInMilk|       DiffInHours|MilkYieldKg|      SumMilkYield|          SumHours|    24hMilkYieldKg|  TestDayMilkYield|
+--------------+----------------+-------------------+----------+------------------+-----------+------------------+------------------+------------------+------------------+
|           503|            1992|2019-05-29 23:28:48|         1|11.831944444444444|        6.0|               6.0|11.831944444444444|12.170442540204249|12.170442540204249|
|           503|            1992|2019-05-30 11:18:43|         2|  8.99388888888889|        5.8|              11.8|20.825833333333335| 13.59849545836501|14.272585728933276|
|           503|            1992|2019-05-30 20:18:21|         2| 8.433055555555555|        5.6|              17.4| 29.25888888888889|14.2725

In [0]:
w3=Window.partitionBy('HerdIdentifier','AnimalIdentifier','CalvingDate','LactationNumber','DaysInMilk').\
  orderBy(F.desc('Date'))
          
FinalDataFiltered = FinalDataFiltered.\
    withColumn('TestDayMilkYield',F.first('24hMilkYieldKg').over(w3))

In [0]:
FinalDataFiltered.\
  write.\
  parquet(path='wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FinalDataFilteredTestDayLevel/',mode="overwrite")

In [0]:
FinalDataFiltered = spark. \
  read. \
  parquet('wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FinalDataFilteredTestDayLevel/')

In [0]:
FittingData = FinalDataFiltered.\
    select('HerdIdentifier','AnimalIdentifier','DaysInMilk','TestDayMilkYield','LactationNumber','LastDIM').\
    distinct()

In [0]:
FittingData.count()

Out[51]: 31693777

In [0]:
FittingData.show(5)

+--------------+----------------+----------+------------------+---------------+-------+
|HerdIdentifier|AnimalIdentifier|DaysInMilk|  TestDayMilkYield|LactationNumber|LastDIM|
+--------------+----------------+----------+------------------+---------------+-------+
|           503|             602|        71| 35.61022779512705|              5|    344|
|           503|             751|       177|   33.581062181284|              3|    576|
|           503|             751|       333|23.319641856544063|              3|    576|
|           503|             751|       338| 22.89201500681387|              3|    576|
|           503|             755|        43| 52.45079215989614|              2|    471|
+--------------+----------------+----------+------------------+---------------+-------+
only showing top 5 rows



In [0]:
FittingData.\
  write.\
  parquet(path='wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FittingData/',mode="overwrite")

In [0]:
# whole data
FittingData= spark. \
  read. \
  parquet('wasbs://gpluse-cluster-2@bovianalytics.blob.core.windows.net/Projects/ChenYoungYan/11022022/Output/FittingData/')

In [0]:
# overview
FittingData.\
    agg(F.count('*').alias('NumbersOfRecord'),
        F.countDistinct("HerdIdentifier").alias('NumbersOfHerds'),
        F.countDistinct("HerdIdentifier","AnimalIdentifier").alias('NumberOfCows')).\
    display()

NumbersOfRecord,NumbersOfHerds,NumberOfCows
31693777,91,42990
