<a href="https://colab.research.google.com/github/ChenYongyan-uu/1/blob/master/Merge_herd_lactation_curve_with_economic_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation

First mount your gdrive to connect to the data

In [None]:
# Mount Google Drive at /content/gdrive so the shared-drive paths used below resolve.
from google.colab import drive 
drive.mount('/content/gdrive') 

Mounted at /content/gdrive


Install PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Refresh the apt package index of the Colab VM.
!apt-get update

In [None]:
import os

# Tell findspark/PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.8-bin-hadoop2.7",
})

In [None]:
import findspark
# Must run before the pyspark import below: only findspark was pip-installed, so
# pyspark is presumably made importable from the Spark directory in SPARK_HOME.
findspark.init()
from pyspark.sql import SparkSession

Initialize the Spark session

In [None]:
# Local Spark session; "local[*]" uses every core available on the machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DoubleType, FloatType
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import year, col, array, monotonically_increasing_id, to_json, struct, round, datediff,unix_timestamp,udf,to_date ,avg
import json
from urllib.request import  urlopen
from pyspark.sql import Window

# Economic Data

## Raw data with selected columns

In [None]:
# Raw accountancy export: the first row is the header and Spark infers column types.
SelectedAccountancyData = spark.read.csv(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Data/EconomicRawData/SelectedRawFlynthData.csv',
    header=True,
    inferSchema=True,
)

In [None]:
SelectedAccountancyData.count()

30341

In [None]:
len(SelectedAccountancyData.columns)

124

In [None]:
SelectedAccountancyData.select('KVK','FiscalYear').describe().show()

+-------+--------------------+------------------+
|summary|                 KVK|        FiscalYear|
+-------+--------------------+------------------+
|  count|               30341|             30341|
|   mean|3.0773749032484733E7|2009.8291349022209|
| stddev| 2.776762810949111E7|3.6563408775502557|
|    min|             1000421|              2004|
|    max|                  NA|                NA|
+-------+--------------------+------------------+



In [None]:
SelectedAccountancyData.\
  agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK')).collect()

[Row(count(1)=30341, count(KVK)=30341, count(DISTINCT KVK)=2809)]

In [None]:
SelectedAccountancyData.printSchema()

root
 |-- KVK: string (nullable = true)
 |-- FiscalYear: string (nullable = true)
 |-- ProceOwnProduct: string (nullable = true)
 |-- Organic: string (nullable = true)
 |-- ProductType: string (nullable = true)
 |-- Robot: string (nullable = true)
 |-- GrassLandAreaInHa: string (nullable = true)
 |-- FodderCropLandAreaInHa: string (nullable = true)
 |-- TotalLandAreaInHa: string (nullable = true)
 |-- TotalOwnedLandAreaInHa: string (nullable = true)
 |-- TotalLeasedLandAreaInHa: string (nullable = true)
 |-- BossFTE: string (nullable = true)
 |-- PartnerFTE: string (nullable = true)
 |-- ChildrenFTE: string (nullable = true)
 |-- ForeignerFTE: string (nullable = true)
 |-- TotalFTE: string (nullable = true)
 |-- NumberOfHour: string (nullable = true)
 |-- MilkQuotaIn1000KgMilk : string (nullable = true)
 |-- TotalMilkInFarmKg: string (nullable = true)
 |-- TotalMilkForFactoryKg: string (nullable = true)
 |-- MilkRevenue: string (nullable = true)
 |-- MilkRevenueOfOwnProduct: string (nu

In [None]:
SelectedAccountancyData.show(2)

+-------+----------+---------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+----------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+------------------+--------------------------+--------------------+-------------+--------------------+--------------+----------------+-------------

### Drop the variables whose names contain 'Per100Kg'
 These are dropped because I will calculate them myself.

In [None]:
# Collect every column whose name contains 'Per100Kg' and drop them in one call.
selected = [
    column_name
    for column_name in SelectedAccountancyData.columns
    if 'Per100Kg' in column_name
]
SelectedAccountancyData = SelectedAccountancyData.drop(*selected)

In [None]:
len(SelectedAccountancyData.columns)

77

47 columns were excluded

### Filter out rows where KVK or FiscalYear is NA

In [None]:
SelectedAccountancyData.where(F.col('KVK')=='NA').count()

3159

In [None]:
SelectedAccountancyData.where(F.col('FiscalYear')=='NA').count()

171

In [None]:
# Keep only rows in which neither identifying field is the literal string 'NA'.
SelectedAccountancyData = SelectedAccountancyData.filter(
    (F.col('KVK') != 'NA') & (F.col('FiscalYear') != 'NA')
)

In [None]:
SelectedAccountancyData.\
  agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK')).collect()

[Row(count(1)=27182, count(KVK)=27182, count(DISTINCT KVK)=2808)]

### Calculate Feed cost, revenue, other variable cost

In [None]:
# Rename the provider's pre-computed totals with a 'Flynth' suffix so the plain
# names are free for the totals recomputed later; also fix two column names.
SelectedAccountancyData = (
    SelectedAccountancyData
    .withColumnRenamed('OtherRevenues28', 'NonDairyRevenue')
    .withColumnRenamed('ProceOwnProduct', 'ProduceOwnProduct')
    .withColumnRenamed('TotalRevenues', 'TotalRevenuesFlynth')
    .withColumnRenamed('TotalFeedCosts', 'TotalFeedCostsFlynth')
    .withColumnRenamed('TotalVariableCosts', 'TotalOtherVariableCostsFlynth')
)

In [None]:
# Recompute the three totals from their component columns.
# NOTE(review): a missing component presumably makes the whole total null,
# because the terms are added directly — confirm this is intended.
CalculatedSelectedAccountancyData = (
    SelectedAccountancyData
    .withColumn(
        'TotalFeedCosts',
        col("Concentrates")
        + col("VitaminsAndMinerals")
        + col("WetByProducts")
        + col("PurchasedRoughage")
        + col("ChangeOwnRoughageStockIncreaseDecrease")
        + col("PastureMoney"),
    )
    .withColumn(
        'TotalRevenues',
        col("MilkRevenue")
        + col("MilkRevenueOfOwnProduct")
        + col("SellLiveStock")
        + col("IncreaseDairyCattle")
        + col("SalesRoughage")
        + col("FeedMoney")
        + col("NonDairyRevenue"),
    )
    .withColumn(
        'TotalOtherVariableCosts',
        col("Fertilization")
        + col("PurchaseLivestock")
        + col("SeedsAndCropProtection")
        + col("HealthCosts")
        + col("AiBreedingAndMilkControlTogether")
        + col("EmbryoTransplantation")
        + col("RearingCostsOfYoungStockForThirdParties")
        + col("DirectCostsOfCheesePreparation")
        + col("Litter")
        + col("OtherDirectCosts"),
    )
)

In the dataset, the money spent on 'purchase livestock' was deducted in the revenue part, but it should belong to variable costs. So both the revenue and the other variable costs I calculated should be higher than those in the dataset (the excess amount should be 'LivestockPurchasesPer100Kg').

Feed cost excludes dairy products, since those are for heifers.

Another thing to keep in mind: variable cost = feed cost + other variable cost \
In the dataset, variable cost = other variable cost

In [None]:
CalculatedSelectedAccountancyData.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+----------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+-----

In [None]:
# Variable costs = feed costs + all other variable costs.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.withColumn(
    'TotalVariableCosts',
    col('TotalFeedCosts') + col('TotalOtherVariableCosts'),
)

In [None]:
# Gross margin = total revenues minus total variable costs.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.withColumn(
    'GrossMargin',
    col('TotalRevenues') - col('TotalVariableCosts'),
)

In [None]:
CalculatedSelectedAccountancyData.select('TotalFeedCosts','TotalOtherVariableCosts','TotalVariableCosts','TotalRevenues','GrossMargin').show(2)

+--------------+-----------------------+------------------+-------------+-----------+
|TotalFeedCosts|TotalOtherVariableCosts|TotalVariableCosts|TotalRevenues|GrossMargin|
+--------------+-----------------------+------------------+-------------+-----------+
|       18454.0|                11318.0|           29772.0|     120395.0|    90623.0|
|       20639.0|                25994.0|           46633.0|     140873.0|    94240.0|
+--------------+-----------------------+------------------+-------------+-----------+
only showing top 2 rows



### Filter out KVKs with more than one record in a year

In [None]:
from pyspark.sql import Window

# Window grouping all rows that share the same KVK and fiscal year.
w = Window.partitionBy('KVK', 'FiscalYear')

In [None]:
CalculatedSelectedAccountancyData.withColumn('AccountingRecordPerYear',F.count('*').over(w)).orderBy(F.desc('AccountingRecordPerYear')).show(3)

+-------+----------+---------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+------------------+--------------------------+--------------------+-------------+--------------------+--------------+----------------+--------------

In [None]:
# Attach to each row the number of records its (KVK, FiscalYear) pair has.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.withColumn(
    'AccountingRecordPerYear',
    F.count('*').over(w),
)

In [None]:
CalculatedSelectedAccountancyData.\
  groupBy('AccountingRecordPerYear').\
  agg(F.count('*')).show()

+-----------------------+--------+
|AccountingRecordPerYear|count(1)|
+-----------------------+--------+
|                      1|   27156|
|                      2|      26|
+-----------------------+--------+



In [None]:
# Keep only (KVK, FiscalYear) pairs that occur once; duplicated pairs are dropped entirely.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.filter(
    col('AccountingRecordPerYear') < 2
)

In [None]:
CalculatedSelectedAccountancyData.\
  agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK')).collect()

[Row(count(1)=27156, count(KVK)=27156, count(DISTINCT KVK)=2808)]

### save

In [None]:
# The source column name carries a trailing space; strip it before saving.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.withColumnRenamed(
    'MilkQuotaIn1000KgMilk ',
    'MilkQuotaIn1000KgMilk',
)

In [None]:
# Persist the filtered data from a single partition, replacing any previous output.
(
    CalculatedSelectedAccountancyData
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/FilteredCalculatedSelectedAccountancyData/',
        mode="overwrite",
    )
)

### Add variables from raw data
equity, herdsize, Successor

In [None]:
# Reload the filtered accountancy data written by the save step.
CalculatedSelectedAccountancyData = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/FilteredCalculatedSelectedAccountancyData/'
)

In [None]:
# Extra per-farm, per-year variables to merge into the accountancy data.
AddVariable = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/AddVariable/'
)

In [None]:
AddVariable.count()

30341

In [None]:
AddVariable.describe().show()

+-------+--------------------+------------------+-------------------+-------------------+------------------------+
|summary|                 KVK|        FiscalYear|             Equity|          Successor|FlynthNumberOfMilkingCow|
+-------+--------------------+------------------+-------------------+-------------------+------------------------+
|  count|               27182|             30170|              22406|              30170|                   30170|
|   mean|3.0773749032484733E7|2009.8291349022209| 0.4261737927340839| 0.3472986410341399|       83.48348027842212|
| stddev|2.7767628109491173E7| 3.656340877550479|0.36745284994341454|0.47611953196547685|      46.445192374227474|
|    min|             0001064|              2004|              -5.38|                  0|                     4.7|
|    max|             9992594|              2016|                2.0|                  1|                  1074.5|
+-------+--------------------+------------------+-------------------+-----------

In [None]:
# Left join keeps every accountancy row, whether or not extra variables exist for it.
CalculatedSelectedAccountancyData = CalculatedSelectedAccountancyData.join(
    AddVariable,
    on=['KVK', 'FiscalYear'],
    how='left',
)

In [None]:
len(CalculatedSelectedAccountancyData.columns)

86

In [None]:
CalculatedSelectedAccountancyData.\
  agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK')).collect()

[Row(count(1)=27156, count(KVK)=27156, count(DISTINCT KVK)=2808)]

In [None]:
CalculatedSelectedAccountancyData.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+------

In [None]:
# Persist the enriched accountancy data from a single partition, replacing any previous output.
(
    CalculatedSelectedAccountancyData
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/SelectedAccountancyData/',
        mode="overwrite",
    )
)

## UBN KVK key

In [None]:
# Semicolon-delimited KVK-to-UBN lookup; no schema inference, so columns are read as strings.
UBNKVK = (
    spark.read
    .option('delimiter', ';')
    .csv(
        '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Data/EconomicRawData/tr_trim-kvk_ubn.csv',
        header=True,
    )
)

In [None]:
UBNKVK.show(3)

+----------+-------+
|Kvk-nummer|    UBN|
+----------+-------+
|   1967427|0483411|
|   9057218|6104835|
|   9104218|4837102|
+----------+-------+
only showing top 3 rows



In [None]:
# Align the lookup's KVK column name with the accountancy data.
key = UBNKVK.withColumnRenamed('Kvk-nummer', 'KVK')

In [None]:
key.printSchema()

root
 |-- KVK: string (nullable = true)
 |-- UBN: string (nullable = true)



In [None]:
key.agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=38463, count(KVK)=31732, count(DISTINCT KVK)=28173, count(UBN)=38463, count(DISTINCT UBN)=38463)]

### filter KVK null

In [None]:
key.filter(col('KVK').isNull()).count()

6731

In [None]:
# Drop lookup rows that have no KVK.
key = key.filter(col('KVK').isNotNull())

In [None]:
key.agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=31732, count(KVK)=31732, count(DISTINCT KVK)=28173, count(UBN)=31732, count(DISTINCT UBN)=31732)]

### filter KVK with multiple UBN

check whether one kvk has 1+ UBN:

In [None]:
# Number of distinct UBNs registered under each KVK.
a = (
    key
    .groupBy('KVK')
    .agg(F.countDistinct("UBN").alias('NumberOfUBNPerKVK'))
)

In [None]:
a.groupBy('NumberOfUBNPerKVK').agg(F.countDistinct("KVK").alias('NumberOfKVK')).orderBy(F.desc('NumberOfKVK')).show()

+-----------------+-----------+
|NumberOfUBNPerKVK|NumberOfKVK|
+-----------------+-----------+
|                1|      25314|
|                2|       2446|
|                3|        295|
|                4|         64|
|                5|         24|
|                6|         10|
|                7|          6|
|                8|          5|
|                9|          3|
|               12|          2|
|               11|          1|
|               15|          1|
|               30|          1|
|               16|          1|
|             6731|          0|
+-----------------+-----------+



In [None]:
a.filter('NumberOfUBNPerKVK=16').show()

+-------+-----------------+
|    KVK|NumberOfUBNPerKVK|
+-------+-----------------+
|6011095|               16|
+-------+-----------------+



In [None]:
key.filter('KVK=6011095').show()

+-------+-------+
|    KVK|    UBN|
+-------+-------+
|6011095|9083009|
|6011095|3317166|
|6011095|2641338|
|6011095|3303031|
|6011095|3345365|
|6011095|5364342|
|6011095|2348835|
|6011095|8166378|
|6011095|5617149|
|6011095|8367159|
|6011095|1193728|
|6011095|2287071|
|6011095|7738255|
|6011095|1739755|
|6011095|7073044|
|6011095|1149738|
+-------+-------+



We therefore decided to exclude KVKs with more than one UBN

In [None]:
# Count the lookup rows per KVK and attach the count to every row.
w2 = Window.partitionBy('KVK')
key = key.withColumn(
    'UBNPerKVK',
    F.count('*').over(w2),
)

In [None]:
key.show(2)

+-------+-------+---------+
|    KVK|    UBN|UBNPerKVK|
+-------+-------+---------+
|0092684|1611606|        1|
|0132396|7134505|        1|
+-------+-------+---------+
only showing top 2 rows



In [None]:
# Keep only KVKs that map to a single UBN.
key = key.filter(col('UBNPerKVK') < 2)

In [None]:
key.agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=25314, count(KVK)=25314, count(DISTINCT KVK)=25314, count(UBN)=25314, count(DISTINCT UBN)=25314)]

Now each KVK has exactly one UBN

In [None]:
# Persist the one-to-one KVK-to-UBN lookup, replacing any previous output.
(
    key
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/KvkToUbnFilteredKey/',
        mode="overwrite",
    )
)

## join ubn to economic data


In [None]:
# Reload the filtered KVK-to-UBN lookup.
key = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/KvkToUbnFilteredKey/'
)

In [None]:
key.printSchema()

root
 |-- KVK: string (nullable = true)
 |-- UBN: string (nullable = true)
 |-- UBNPerKVK: long (nullable = true)



In [None]:
key.agg(F.count('KVK'),F.countDistinct('KVK'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(KVK)=25314, count(DISTINCT KVK)=25314, count(UBN)=25314, count(DISTINCT UBN)=25314)]

In [None]:
key.show(2)

+-------+-------+---------+
|    KVK|    UBN|UBNPerKVK|
+-------+-------+---------+
|0092684|1611606|        1|
|0132396|7134505|        1|
+-------+-------+---------+
only showing top 2 rows



In [None]:
# Reload the enriched accountancy data.
CalculatedSelectedAccountancyData = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/SelectedAccountancyData/'
)

In [None]:
len(CalculatedSelectedAccountancyData.columns)

86

In [None]:
CalculatedSelectedAccountancyData.agg(F.count('KVK'),F.countDistinct('KVK')).collect()

[Row(count(KVK)=27156, count(DISTINCT KVK)=2808)]

In [None]:
CalculatedSelectedAccountancyData.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+------

In [None]:
# Inner join: only farms whose KVK appears in the lookup keep their records.
# The helper count column is not needed afterwards.
AccountingWithUBN = (
    CalculatedSelectedAccountancyData
    .join(key, on=['KVK'], how='inner')
    .drop('UBNPerKVK')
)

In [None]:
AccountingWithUBN.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+------

In [None]:
AccountingWithUBN.agg(F.count('*'),F.count('KVK'),F.countDistinct('KVK'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(KVK)=21148, count(DISTINCT KVK)=2106, count(UBN)=21148, count(DISTINCT UBN)=2106)]

In [None]:
AccountingWithUBN.printSchema()

In [None]:
# Cast both join keys from string to integer.
AccountingWithUBN = (
    AccountingWithUBN
    .withColumn('UBN', AccountingWithUBN['UBN'].cast(IntegerType()))
    .withColumn('FiscalYear', AccountingWithUBN['FiscalYear'].cast(IntegerType()))
)

In [None]:
AccountingWithUBN.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+------

## join crv herd data



### DIM, MilkKg, M305

In [None]:
# Herd production data, one row per UBN and test year.
HerdInformation = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/CrvHerdDataPerTestYear/'
)

In [None]:
HerdInformation.printSchema()

root
 |-- UBN: integer (nullable = true)
 |-- TestYear: integer (nullable = true)
 |-- AverageDIM: double (nullable = true)
 |-- AverageMilkKg: double (nullable = true)
 |-- 305MilkKg: double (nullable = true)



In [None]:
HerdInformation.show(2)

+-------+--------+------------------+------------------+-----------------+
|    UBN|TestYear|        AverageDIM|     AverageMilkKg|        305MilkKg|
+-------+--------+------------------+------------------+-----------------+
|4930102|    2008| 165.9090909090909|21.436363567005504|6667.454545454545|
|1049546|    2015|205.55555555555554|23.200000127156574|7714.222222222223|
+-------+--------+------------------+------------------+-----------------+
only showing top 2 rows



In [None]:
HerdInformation.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=185509, count(UBN)=185509, count(DISTINCT UBN)=20553)]

In [None]:
# Match on herd number and on fiscal year == test year.
condition = [
    AccountingWithUBN.UBN == HerdInformation.UBN,
    AccountingWithUBN.FiscalYear == HerdInformation.TestYear,
]

In [None]:
# Left join keeps every accountancy row; the herd table's own UBN column is then dropped.
# NOTE(review): TestYear presumably stays null for rows without a herd match — confirm
# this is acceptable for the later joins on ['UBN', 'TestYear'].
AccountingWithUBNWithProduction = (
    AccountingWithUBN
    .join(HerdInformation, on=condition, how='left')
    .drop(HerdInformation.UBN)
)

In [None]:
AccountingWithUBNWithProduction.show(2)

+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+--------------+------

In [None]:
AccountingWithUBNWithProduction.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(UBN)=21148, count(DISTINCT UBN)=2106)]

### SCC

In [None]:
# Yearly herd-level somatic cell count summary.
HerdSCCPerYear = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdSCCPerYear/'
)

In [None]:
HerdSCCPerYear.printSchema()

root
 |-- HerdIdentifier: integer (nullable = true)
 |-- TestYear: integer (nullable = true)
 |-- AverageSCCPerYear: double (nullable = true)
 |-- AverageSCCTestedCowPerYear: double (nullable = true)
 |-- Average%HighCellCountPerYear: double (nullable = true)
 |-- Average%NewInfectionsPerYear: double (nullable = true)



In [None]:
# Use the same herd key name (UBN) as the accountancy data.
HerdSCCPerYear = HerdSCCPerYear.withColumnRenamed(
    'HerdIdentifier',
    'UBN',
)

In [None]:
HerdSCCPerYear.show(2)

+-------+--------+-----------------+--------------------------+----------------------------+----------------------------+
|    UBN|TestYear|AverageSCCPerYear|AverageSCCTestedCowPerYear|Average%HighCellCountPerYear|Average%NewInfectionsPerYear|
+-------+--------+-----------------+--------------------------+----------------------------+----------------------------+
|7410854|    2013|           221.13|                     102.5|                       18.08|                        8.17|
|2483417|    2012|           163.78|                     92.44|                       16.54|                        9.17|
+-------+--------+-----------------+--------------------------+----------------------------+----------------------------+
only showing top 2 rows



In [None]:
HerdSCCPerYear.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=183183, count(HerdIdentifier)=183183, count(DISTINCT HerdIdentifier)=20393)]

In [None]:
AccountingWithUBNWithProduction.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(UBN)=21148, count(DISTINCT UBN)=2106)]

In [None]:
# Left join so that accountancy rows without an SCC record are kept.
# Joining on a list of column names yields a single UBN/TestYear pair in the
# result, so no drop() is needed afterwards. The former
# drop(HerdSCCPerYear.TestYear, HerdSCCPerYear.HerdIdentifier) failed on a fresh
# run because HerdIdentifier has already been renamed to UBN, and the join was
# then repeated in a second cell; both are replaced by this single join.
AccountingWithUBNWithProductionWithSCC = AccountingWithUBNWithProduction.join(
    HerdSCCPerYear,
    on=['UBN', 'TestYear'],
    how='left',
)

Why a left join? Because some of the farms don't have SCC records, and we don't know whether SCC affects our dependent variable.

In [None]:
AccountingWithUBNWithProductionWithSCC.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(UBN)=21148, count(DISTINCT UBN)=2106)]

In [None]:
AccountingWithUBNWithProductionWithSCC.show(2)

+-------+--------+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+----

### Calving Interval, AgeInDays

In [None]:
# Average cow age (in days) and calving interval per herd and year.
CalvingIntervalFromHerdData = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/AgeInDaysAndCalvingInterval/'
)

In [None]:
CalvingIntervalFromHerdData.show(2)

+--------------+----+----------------------------+---------------+
|HerdIdentifier|Year|AverageAgeInDaysOfLivingCows|CalvingInterval|
+--------------+----+----------------------------+---------------+
|        400517|2007|                      1546.0|          396.0|
|       1040480|2007|                      2050.0|          513.0|
+--------------+----+----------------------------+---------------+
only showing top 2 rows



In [None]:
# Rename the key columns so they match the join keys ('UBN', 'TestYear').
CalvingIntervalFromHerdData = (
    CalvingIntervalFromHerdData
    .withColumnRenamed('Year', 'TestYear')
    .withColumnRenamed('HerdIdentifier', 'UBN')
)

In [None]:
CalvingIntervalFromHerdData.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=185731, count(UBN)=185731, count(DISTINCT UBN)=20796)]

In [None]:
# Left join keeps every accountancy row, with or without calving-interval data.
AccountingWithUBNWithProductionWithSCCWithCI = (
    AccountingWithUBNWithProductionWithSCC
    .join(CalvingIntervalFromHerdData, on=['UBN', 'TestYear'], how='left')
)

In [None]:
AccountingWithUBNWithProductionWithSCCWithCI.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(UBN)=21148, count(DISTINCT UBN)=2106)]

In [None]:
AccountingWithUBNWithProductionWithSCCWithCI.show(2)

+-------+--------+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-------------+-----------------+--------------------------------------+------------+--------------------+-------------------+-------------+----------------------+-----------+------+-----------------------+--------------------------------+---------------------+---------------------------------------+------------------------------+------+----------------+-----------------------------+--------------------------+--------------------+-------------+--------------------+----

### Herd lactation curve characteristics

#### combine 2 parity columns to 1 column

In [None]:
# Herd-level lactation-curve characteristics per test year and parity group.
HerdWeighted = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeighted'
)

In [None]:
HerdWeighted.show(2)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|        762411|    2017|         2+|  44.1255412371134|23.196907216494854| -0.3257731958762887|0.002591030927835...|286.92783505154637|8799.090206185567|            75|        388|        44.26|      22.5|     

In [None]:
# Spot-check a single herd-year; the table holds one row per parity group for it.
(
    HerdWeighted
    .filter((col('HerdIdentifier') == '4170115') & (col('TestYear') == 2013))
    .show(3)
)

+--------------+--------+-----------+-----------------+------------------+--------------------+--------------------+-----------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|HerdIdentifier|TestYear|ParityGroup|      MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|  MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|
+--------------+--------+-----------+-----------------+------------------+--------------------+--------------------+-----------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|       4170115|    2013|          1|33.60602094240838|29.476439790575917|                -0.5|0.001566175652091...|575.1570680628272|7690.596282336968|            54|        191|        32.58|      29.8|        -0.5|

In [None]:
# Sanity check: total rows, non-null herd identifiers and distinct herds before reshaping.
HerdWeighted.agg(F.count('*'),F.count('HerdIdentifier'),F.countDistinct('HerdIdentifier')).collect()

[Row(count(1)=372752, count(HerdIdentifier)=372752, count(DISTINCT HerdIdentifier)=20597)]

Reshape the two parity groups' results ('1' and '2+') into a single row per herd and test year.

In [None]:
# Reshape to one row per herd and test year: every statistic becomes a pair of
# columns prefixed with its parity group ('1_...' and '2+_...').
#
# The pivot values are listed explicitly so Spark does not need an extra job to
# discover the distinct ParityGroup values; '1' and '2+' are the only labels
# assigned when ParityGroup is created.
#
# F.first is safe here because HerdWeighted holds a single row per
# (HerdIdentifier, TestYear, ParityGroup).
# ScaleKg and Ramp are published under the names Magnitude and TimeToPeak.
HerdWeightedPerYear = (
    HerdWeighted
    .groupBy('HerdIdentifier', 'TestYear')
    .pivot('ParityGroup', ['1', '2+'])
    .agg(
        F.first('MeanScaleKg').alias('MeanMagnitude'),
        F.first('MeanRamp').alias('MeanTimeToPeak'),
        F.first('MeanOffset').alias('MeanOffset'),
        F.first('MeanDecay').alias('MeanDecay'),
        F.first('MeanPersistence').alias('MeanPersistence'),
        F.first('MeanM305Kg').alias('MeanM305Kg'),
        F.first('MedianScaleKg').alias('MedianMagnitude'),
        F.first('MedianRamp').alias('MedianTimeToPeak'),
        F.first('MedianOffset').alias('MedianOffset'),
        F.first('MedianDecay').alias('MedianDecay'),
        F.first('MedianPersistence').alias('MedianPersistence'),
        F.first('MedianM305Kg').alias('MedianM305Kg'),
    )
)

In [None]:
# Sanity check after the pivot: the distinct herd count should match HerdWeighted,
# while the row count drops because the parity groups now share one row.
HerdWeightedPerYear.agg(F.count('*'),F.count('HerdIdentifier'),F.countDistinct('HerdIdentifier')).collect()

[Row(count(1)=187068, count(HerdIdentifier)=187068, count(DISTINCT HerdIdentifier)=20597)]

In [None]:
HerdWeightedPerYear.show(3)

+--------------+--------+-----------------+------------------+------------+--------------------+-----------------+-----------------+-----------------+------------------+--------------+-------------+-------------------+--------------+------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+-------------------+---------------+--------------+--------------------+---------------+
|HerdIdentifier|TestYear|  1_MeanMagnitude|  1_MeanTimeToPeak|1_MeanOffset|         1_MeanDecay|1_MeanPersistence|     1_MeanM305Kg|1_MedianMagnitude|1_MedianTimeToPeak|1_MedianOffset|1_MedianDecay|1_MedianPersistence|1_MedianM305Kg|  2+_MeanMagnitude| 2+_MeanTimeToPeak|       2+_MeanOffset|        2+_MeanDecay|2+_MeanPersistence|     2+_MeanM305Kg|2+_MedianMagnitude|2+_MedianTimeToPeak|2+_MedianOffset|2+_MedianDecay|2+_MedianPersistence|2+_MedianM305Kg|
+--------------+--------+-----------------+------------------+------------+-------

In [None]:
# Persist the pivoted table as a single parquet part file, replacing any previous run.
(
    HerdWeightedPerYear
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeightedPerYear/',
        mode="overwrite",
    )
)

#### join

In [None]:
# Reload the pivoted herd-year table from disk so the join below can start from here.
HerdWeightedPerYear = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeightedPerYear/'
)

In [None]:
HerdWeightedPerYear.show(2)

+--------------+--------+-----------------+------------------+------------+--------------------+-----------------+-----------------+-----------------+------------------+--------------+-------------+-------------------+--------------+------------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+-------------------+---------------+--------------+--------------------+---------------+
|HerdIdentifier|TestYear|  1_MeanMagnitude|  1_MeanTimeToPeak|1_MeanOffset|         1_MeanDecay|1_MeanPersistence|     1_MeanM305Kg|1_MedianMagnitude|1_MedianTimeToPeak|1_MedianOffset|1_MedianDecay|1_MedianPersistence|1_MedianM305Kg|  2+_MeanMagnitude| 2+_MeanTimeToPeak|       2+_MeanOffset|        2+_MeanDecay|2+_MeanPersistence|    2+_MeanM305Kg|2+_MedianMagnitude|2+_MedianTimeToPeak|2+_MedianOffset|2+_MedianDecay|2+_MedianPersistence|2+_MedianM305Kg|
+--------------+--------+-----------------+------------------+------------+---------

In [None]:
# Per-herd window ordered chronologically; used by lag/lead to find neighbouring test years.
w = (
    Window
    .partitionBy('HerdIdentifier')
    .orderBy(F.col('TestYear').asc())
)

In [None]:
# For every herd-year, record the previous and the next test year present for that herd
# (null at the start / end of the herd's series).
HerdWeightedPerYear = (
    HerdWeightedPerYear
    .withColumn('LastyearInCRV', F.lag('TestYear', 1).over(w))
    .withColumn('NextyearInCRV', F.lead('TestYear', 1).over(w))
)

In [None]:
# Use the accounting table's key name so the join can be expressed on ['UBN', 'TestYear'].
HerdWeightedPerYear = HerdWeightedPerYear.withColumnRenamed('HerdIdentifier', 'UBN')

In [None]:
HerdWeightedPerYear.show(5)

+------+--------+------------------+------------------+------------+--------------------+------------------+------------------+-----------------+------------------+--------------+-------------+-------------------+--------------+------------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+-------------------+---------------+--------------+--------------------+---------------+-------------+-------------+
|   UBN|TestYear|   1_MeanMagnitude|  1_MeanTimeToPeak|1_MeanOffset|         1_MeanDecay| 1_MeanPersistence|      1_MeanM305Kg|1_MedianMagnitude|1_MedianTimeToPeak|1_MedianOffset|1_MedianDecay|1_MedianPersistence|1_MedianM305Kg|  2+_MeanMagnitude| 2+_MeanTimeToPeak|       2+_MeanOffset|        2+_MeanDecay|2+_MeanPersistence|    2+_MeanM305Kg|2+_MedianMagnitude|2+_MedianTimeToPeak|2+_MedianOffset|2+_MedianDecay|2+_MedianPersistence|2+_MedianM305Kg|LastyearInCRV|NextyearInCRV|
+------+--------+------------------+--

In [None]:
# Left side size before the join: total rows, non-null UBNs, distinct UBNs.
AccountingWithUBNWithProductionWithSCCWithCI.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=21148, count(UBN)=21148, count(DISTINCT UBN)=2106)]

In [None]:
# Right side size before the join: total rows, non-null UBNs, distinct UBNs.
HerdWeightedPerYear.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=187068, count(UBN)=187068, count(DISTINCT UBN)=20597)]

In [None]:
# Inner join: keep only herd-years that have both lactation curve characteristics
# and accounting data.
AccountingWithUBNWithProductionWithSCCWithCIWithLCC = HerdWeightedPerYear.join(
    AccountingWithUBNWithProductionWithSCCWithCI,
    on=['UBN', 'TestYear'],
    how='inner',
)

In [None]:
# Sanity check after the inner join: rows and distinct herds that survived the match.
AccountingWithUBNWithProductionWithSCCWithCIWithLCC.agg(F.count('*'),F.count('UBN'),F.countDistinct('UBN')).collect()

[Row(count(1)=15710, count(UBN)=15710, count(DISTINCT UBN)=1957)]

In [None]:
AccountingWithUBNWithProductionWithSCCWithCIWithLCC.show(2)

+------+--------+------------------+------------------+------------+--------------------+------------------+-----------------+-----------------+------------------+--------------+-------------+-------------------+--------------+-----------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+-------------------+---------------+--------------+--------------------+---------------+-------------+-------------+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+------------

## Expansion rate

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import Window

In [None]:
# Per-herd window ordered by fiscal year, so lag() returns the herd's preceding record.
w = Window.partitionBy('UBN').orderBy(F.asc('FiscalYear'))

# Herd size in the preceding record of the same UBN (null for a herd's first record).
# Columns are referenced by name on the frame being transformed; the previous
# version read them from a separate `ExpansionRate` DataFrame that is not defined
# anywhere in the notebook, which fails on a fresh kernel.
previous_herd_size = F.lag(F.col('FlynthNumberOfMilkingCow')).over(w)

# Relative change in the number of milking cows versus the preceding record:
# (current - previous) / previous.
# NOTE(review): lag() looks at the previous *row*, not the previous calendar year,
# so a gap in FiscalYear yields a multi-year rate — confirm this is acceptable.
AccountingWithUBNWithProductionWithSCCWithCIWithLCC = AccountingWithUBNWithProductionWithSCCWithCIWithLCC.withColumn(
    'ExpansionRate',
    (F.col('FlynthNumberOfMilkingCow') - previous_herd_size) / previous_herd_size,
)

In [None]:
# Inspect the new column next to its inputs; a herd's first record has no predecessor,
# so its ExpansionRate is null.
AccountingWithUBNWithProductionWithSCCWithCIWithLCC.select('UBN','FiscalYear','FlynthNumberOfMilkingCow','ExpansionRate').show(5)

+-------+----------+------------------------+--------------------+
|    UBN|FiscalYear|FlynthNumberOfMilkingCow|       ExpansionRate|
+-------+----------+------------------------+--------------------+
|1454517|      2009|                    75.5|                null|
|1454517|      2010|                    70.4|-0.06754966887417212|
|1454517|      2011|                    80.7| 0.14630681818181812|
|1454517|      2012|                    70.1|-0.13135068153655524|
|1454517|      2013|                    71.8|0.024251069900142697|
+-------+----------+------------------------+--------------------+
only showing top 5 rows



In [None]:
AccountingWithUBNWithProductionWithSCCWithCIWithLCC.show(3)

+-------+--------+------------------+------------------+------------+--------------------+-----------------+-----------------+-----------------+------------------+--------------+-------------+-------------------+--------------+------------------+------------------+--------------------+--------------------+------------------+-----------------+------------------+-------------------+---------------+--------------+--------------------+---------------+-------------+-------------+-------+----------+-----------------+-------+-----------+-----+-----------------+----------------------+-----------------+----------------------+-----------------------+-------+----------+-----------+------------+--------+------------+---------------------+-----------------+---------------------+-----------+-----------------------+-------------+-----------------+-------------------+-----------------+-------------+---------+---------------+-------------------+------------+-------------------+-------------+-----------

## Save

In [None]:
# Export the merged table as a single CSV part file with a header row, replacing any previous export.
(
    AccountingWithUBNWithProductionWithSCCWithCIWithLCC
    .repartition(1)
    .write
    .csv(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/CSV/AccountingWithUBNWithProductionWithSCCWithCIWithLCC/',
        mode="overwrite",
        header=True,
    )
)

In [None]:
# Same table as parquet (keeps column types), written as a single part file.
(
    AccountingWithUBNWithProductionWithSCCWithCIWithLCC
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/AccountingWithUBNWithProductionWithSCCWithCIWithLCC/',
        mode="overwrite",
    )
)

# Calculate herd lactation curve characteristics

## calculate weight

In [None]:
# Load the formatted milk records (one row per animal and test date).
MilkRecordFormatted = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/MilkRecordFormatted/'
)

In [None]:
MilkRecordFormatted.show(2)

+----------------+----------+------+-----------+--------------+----------+-----------+-------------+-----------------+--------+----------------+----------------+-------------+---------+----------+--------+---------+
|AnimalIdentifier| BirthDate|Parity|CalvingDate|HerdIdentifier|      Date|MilkYieldKg|FatPercentage|ProteinPercentage|     SCC|NumberOfMilkings|StatusProduction|StatusFictive|StatusCow|DaysInMilk|TestYear|AgeInDays|
+----------------+----------+------+-----------+--------------+----------+-----------+-------------+-----------------+--------+----------------+----------------+-------------+---------+----------+--------+---------+
|    NL 103926450|1994-08-22|   9.0| 2005-09-11|       5341329|2007-01-17|        5.5|         4.99|             4.05|445000.0|             2.0|             0.0|          0.0|      0.0|     493.0|    2007|   4531.0|
|    NL 107789383|1993-10-29|  11.0| 2006-09-03|       4109485|2007-01-17|       21.1|         3.72|              2.8| 23000.0|         

In [None]:
MilkRecordFormatted.count()

134520115

In [None]:
# Number of distinct herds and distinct animals in the raw milk records.
MilkRecordFormatted.agg(F.countDistinct("HerdIdentifier"),F.countDistinct('AnimalIdentifier')).collect()

[Row(count(DISTINCT HerdIdentifier)=20603, count(DISTINCT AnimalIdentifier)=5901586)]

In [None]:
# Weight = number of milk-record rows a lactation (herd, animal, calving date, parity)
# contributes within a given test year.
weight = (
    MilkRecordFormatted
    .groupBy('HerdIdentifier', 'AnimalIdentifier', 'CalvingDate', 'Parity', 'TestYear')
    .agg(F.count('*').alias('Weight'))
)

In [None]:
weight.show(3)

+--------------+----------------+-----------+------+--------+------+
|HerdIdentifier|AnimalIdentifier|CalvingDate|Parity|TestYear|Weight|
+--------------+----------------+-----------+------+--------+------+
|       4109485|    NL 107789383| 2006-09-03|  11.0|    2007|     8|
|       5561233|    NL 137072554| 2006-11-14|  11.0|    2007|     7|
|       3955511|    NL 174293008| 2007-06-06|   9.0|    2007|     6|
+--------------+----------------+-----------+------+--------+------+
only showing top 3 rows



In [None]:
# The aggregation must not lose herds or animals: counts should equal those of MilkRecordFormatted.
weight.agg(F.countDistinct("HerdIdentifier"),F.countDistinct('AnimalIdentifier')).collect()

[Row(count(DISTINCT HerdIdentifier)=20603, count(DISTINCT AnimalIdentifier)=5901586)]


## combine weight with FittingResult

In [None]:
# Load the fitted lactation curve parameters (one row per lactation).
FittingPerLactation = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/FittingPerLactationRecalculated/'
)

In [None]:
FittingPerLactation.show(2)

+--------------+----------------+-----------+------+----------+------+-------+----+------+-------+-----------+--------+------+
|HerdIdentifier|AnimalIdentifier|CalvingDate|Parity|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|
+--------------+----------------+-----------+------+----------+------+-------+----+------+-------+-----------+--------+------+
|           111|    NL 377418789| 2006-06-23|   1.0|     249.0|   2.0|  33.79|30.0|  -0.5|0.00263|      264.0|     0.4|6613.0|
|           111|    NL 481411289| 2010-11-11|   2.0|       5.0|   1.0|  43.29|22.5|  -0.8|0.00276|      251.0|     0.0|8463.0|
+--------------+----------------+-----------+------+----------+------+-------+----+------+-------+-----------+--------+------+
only showing top 2 rows



In [None]:
FittingPerLactation.count()

16160238

In [None]:
# Number of distinct herds and animals that have a fitted lactation curve.
FittingPerLactation.agg(F.countDistinct("HerdIdentifier"),F.countDistinct('AnimalIdentifier')).collect()

[Row(count(DISTINCT HerdIdentifier)=20597, count(DISTINCT AnimalIdentifier)=5843321)]

In [None]:
# Attach the fitted curve parameters to every (lactation, test year) weight row.
# Inner join: lactations without a fitted curve are dropped.
# Note the capitalisation: `weight` is the per-year count table, `Weight` the joined result.
Weight = weight.join(
    FittingPerLactation,
    on=['HerdIdentifier', 'AnimalIdentifier', 'Parity', 'CalvingDate'],
    how='inner',
)

In [None]:
# After the inner join the herd/animal counts should match those of FittingPerLactation.
Weight.agg(F.countDistinct("HerdIdentifier"),F.countDistinct('AnimalIdentifier')).collect()

[Row(count(DISTINCT HerdIdentifier)=20597, count(DISTINCT AnimalIdentifier)=5843321)]

In [None]:
Weight.count()

26895467

In [None]:
Weight.orderBy('HerdIdentifier','AnimalIdentifier','Parity').show(10)

+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+
|HerdIdentifier|AnimalIdentifier|Parity|CalvingDate|TestYear|Weight|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+
|           111|    NL 262732219|   5.0| 2006-07-02|    2007|     4|     334.0|   4.0|  39.03|22.5|   0.0|0.00136|      510.0|     0.9|9293.0|
|           111|    NL 262732219|   6.0| 2007-08-06|    2007|     3|     158.0|   4.0|  46.89|25.3|   0.0|0.00381|      182.0|     1.4|7901.0|
|           111|    NL 262732219|   6.0| 2007-08-06|    2008|     1|     158.0|   4.0|  46.89|25.3|   0.0|0.00381|      182.0|     1.4|7901.0|
|           111|    NL 267356449|   5.0| 2006-09-11|    2007|     2|     402.0|   2.0|  49.56|22.5|   0.0|0.00293|      237.0|     1.3|9450.0|

In [None]:
from pyspark.sql import Window
# Unordered window covering all rows of one lactation (herd, animal, parity).
w = Window.partitionBy(
    'HerdIdentifier',
    'AnimalIdentifier',
    'Parity',
)

In [None]:
# NYear: number of rows (test years) the lactation spans.
# ParityGroup: '1' for first-parity lactations, '2+' for everything else.
Weight = (
    Weight
    .withColumn('NYear', F.count('*').over(w))
    .withColumn(
        'ParityGroup',
        F.when(F.col('Parity') == 1, '1').otherwise('2+'),
    )
)

In [None]:
Weight.show(5)

+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+
|HerdIdentifier|AnimalIdentifier|Parity|CalvingDate|TestYear|Weight|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|NYear|ParityGroup|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|
|           111|    NL 377418758|   3.0| 2008-07-06|    2009|     5|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|
|           111|    NL 539813836|   3.0| 2016-03-23|    2017|     2|     338.0|   8.0|   40.7|27.2|   0.0|0.00376|      184.0|     1.3|6872.0|    2|         2+|
|           111|    NL 539813836| 

In [None]:
# Persist the weighted lactation table as a single parquet part file.
(
    Weight
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/FittingPerLactationWithWeight/',
        mode="overwrite",
    )
)

## weight explore

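If Weight = 2, the row is repeated 2 times in the dataset, so that plain averages and medians over the repeated rows are weighted by the number of test-day records.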

In [None]:
# Reload the weighted lactation table written above.
Weight = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/FittingPerLactationWithWeight/'
)

In [None]:
Weight.show(2)

+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+
|HerdIdentifier|AnimalIdentifier|Parity|CalvingDate|TestYear|Weight|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|NYear|ParityGroup|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|
|           111|    NL 377418758|   3.0| 2008-07-06|    2009|     5|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+
only showing top 2 rows



In [None]:
def _repeat_weight(n):
    """Return a list holding the value n repeated n times (e.g. 3 -> [3, 3, 3])."""
    return [n] * n


# Spark UDF that turns an integer weight into an array of that length, ready for F.explode.
n_to_array = udf(_repeat_weight, ArrayType(IntegerType()))

In [None]:
# Add an array column whose length equals the row's Weight.
df = Weight.withColumn(
    'WeightArray',
    n_to_array(F.col('Weight')),
)

In [None]:
df.show(2)

+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+---------------+
|HerdIdentifier|AnimalIdentifier|Parity|CalvingDate|TestYear|Weight|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|NYear|ParityGroup|    WeightArray|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+---------------+
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|   [4, 4, 4, 4]|
|           111|    NL 377418758|   3.0| 2008-07-06|    2009|     5|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|[5, 5, 5, 5, 5]|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-

In [None]:
# Explode the array so every row is repeated Weight times. The exploded value is
# written to 'weight', which replaces the existing Weight column because Spark
# resolves column names case-insensitively by default.
WeightedExplore = df.withColumn(
    'weight',
    F.explode(F.col('WeightArray')),
)

This lactation had 4 test-day records in 2008 and 5 in 2009, so in WeightedExplore its lactation curve characteristics appear 4 times for 2008 and 5 times for 2009.

In [None]:
WeightedExplore.show(9)

+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+---------------+
|HerdIdentifier|AnimalIdentifier|Parity|CalvingDate|TestYear|weight|DaysInMilk|Points|ScaleKg|Ramp|Offset|  Decay|Persistence|StdErrKg|M305Kg|NYear|ParityGroup|    WeightArray|
+--------------+----------------+------+-----------+--------+------+----------+------+-------+----+------+-------+-----------+--------+------+-----+-----------+---------------+
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|   [4, 4, 4, 4]|
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257|      269.0|     3.5|9598.0|    2|         2+|   [4, 4, 4, 4]|
|           111|    NL 377418758|   3.0| 2008-07-06|    2008|     4|     360.0|   8.0|  48.41|26.9|   0.0|0.00257| 

In [None]:
# Persist the exploded table as a single parquet part file.
(
    WeightedExplore
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/WeightedExplore/',
        mode="overwrite",
    )
)

In [None]:
# Reload the exploded table written above.
WeightedExplore = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/WeightedExplore/'
)

## weighted mean

In [None]:
# Because every lactation row is repeated Weight times in WeightedExplore, a plain
# average per herd, test year and parity group is a weighted mean.
# NumberOfAnimal counts distinct animals; TotalWeight is the number of exploded rows.
WeightedMean = (
    WeightedExplore
    .groupBy('HerdIdentifier', 'TestYear', 'ParityGroup')
    .agg(
        F.avg('ScaleKg').alias('MeanScaleKg'),
        F.avg('Ramp').alias('MeanRamp'),
        F.avg('Offset').alias('MeanOffset'),
        F.avg('Decay').alias('MeanDecay'),
        F.avg('Persistence').alias('MeanPersistence'),
        F.avg('M305Kg').alias('MeanM305Kg'),
        F.countDistinct('AnimalIdentifier').alias('NumberOfAnimal'),
        F.count(F.lit(1)).alias('TotalWeight'),
    )
)

In [None]:
WeightedMean.show(10)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+
|        762411|    2017|         2+|  44.1255412371134|23.196907216494854| -0.3257731958762887|0.002591030927835...|286.92783505154637|8799.090206185567|            75|        388|
|         54011|    2017|         2+|45.814849785407745|21.687553648068672|-0.23004291845493563|0.002946459227467811|332.52789699570815|8810.010729613734|            87|        466|
|        397152|    2016|          1| 36.81097315436241| 29.26812080536912|               

In [None]:
# Persist the weighted means as a single parquet part file.
(
    WeightedMean
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/WeightedMean',
        mode="overwrite",
    )
)

## weighted median

In [None]:
# Weighted median per herd, test year and parity group: the 0.5 percentile over the
# exploded rows. percentile(..., array(0.5)) returns a one-element array, hence the [0].
WeightedMedian = (
    WeightedExplore
    .groupBy('HerdIdentifier', 'TestYear', 'ParityGroup')
    .agg(
        F.expr('percentile(ScaleKg, array(0.5))')[0].alias('MedianScaleKg'),
        F.expr('percentile(Ramp,array(0.5))')[0].alias('MedianRamp'),
        F.expr('percentile(Offset,array(0.5))')[0].alias('MedianOffset'),
        F.expr('percentile(Decay,array(0.5))')[0].alias('MedianDecay'),
        F.expr('percentile(Persistence,array(0.5))')[0].alias('MedianPersistence'),
        F.expr('percentile(M305Kg,array(0.5))')[0].alias('MedianM305Kg')
    )
)

In [None]:
# Persist the weighted medians as a single parquet part file.
(
    WeightedMedian
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/WeightedMedian',
        mode="overwrite",
    )
)

In [None]:
# Reload the weighted medians written above.
WeightedMedian = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet//WeightedMedian'
)

In [None]:
WeightedMedian.show(3)

+--------------+--------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|HerdIdentifier|TestYear|ParityGroup|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|
+--------------+--------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|          2089|    2012|          1|        35.27|      31.1|        -0.5|    0.00341|            203.0|      6267.0|
|          6517|    2009|          1|        25.18|      29.8|        -0.5|    0.00132|            525.0|      5859.0|
|         14574|    2009|          1|        41.71|      29.7|        -0.5|    0.00211|            328.0|      8886.0|
+--------------+--------+-----------+-------------+----------+------------+-----------+-----------------+------------+
only showing top 3 rows



In [None]:
WeightedMedian.count()

372752

In [None]:
# Reload the weighted means written earlier.
WeightedMean = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/WeightedMean'
)

In [None]:
WeightedMean.show(3)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+
|        762411|    2017|         2+|  44.1255412371134|23.196907216494854| -0.3257731958762887|0.002591030927835...|286.92783505154637|8799.090206185567|            75|        388|
|         54011|    2017|         2+|45.814849785407745|21.687553648068672|-0.23004291845493563|0.002946459227467811|332.52789699570815|8810.010729613734|            87|        466|
|        397152|    2016|          1| 36.81097315436241| 29.26812080536912|               

In [None]:
WeightedMean.count()

372752

In [None]:
# Put means and medians side by side; both tables share the same grouping keys.
Weighted = WeightedMean.join(
    WeightedMedian,
    on=['HerdIdentifier', 'TestYear', 'ParityGroup'],
    how='inner',
)

In [None]:
Weighted.count()

372752

In [None]:
# Persist the combined mean/median table; this is the HerdWeighted parquet that
# other sections of the notebook read.
(
    Weighted
    .repartition(1)
    .write
    .parquet(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeighted',
        mode="overwrite",
    )
)

In [None]:
# Reload the table just written to verify it round-trips.
HerdWeighted = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeighted'
)

In [None]:
HerdWeighted.orderBy('HerdIdentifier','TestYear').show(3)

+--------------+--------+-----------+------------------+------------------+-------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|         MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|
+--------------+--------+-----------+------------------+------------------+-------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|           111|    2007|          1|30.183749999999993|29.911111111111115|               -0.5|0.001846249999999...|477.22222222222223|6651.569444444444|            22|         72|        30.63|      30.0|        -

In [None]:
HerdWeighted.count()

372752

In [None]:
# Number of distinct herds that ended up with herd-level lactation curve characteristics.
HerdWeighted.agg(F.countDistinct("HerdIdentifier")).collect()

[Row(count(DISTINCT HerdIdentifier)=20597)]

Now every herd has its own yearly herd lactation curve characteristics.

# Combine economic data with herd lactation curve characteristics

In [None]:
# Load the gross-margin table that already carries the UBN herd identifier.
GrossMarginWithUBN = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/GrossMarginWithUBN/'
)

In [None]:
# Inspect one herd's gross-margin history, most recent fiscal year first.
# NOTE(review): the output lists several rows for the same FiscalYear; confirm whether
# these duplicates are expected, since they multiply rows in the join further down.
GrossMarginWithUBN.filter(col('UBN')=='4136665').orderBy(F.desc('FiscalYear')).show(50)

+--------+----------+-------------------+-------+
|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|
+--------+----------+-------------------+-------+
|55169835|      2016|  8.550129097104552|4136665|
|55169835|      2015|  12.23992645932281|4136665|
|55169835|      2014|  23.93985856590347|4136665|
|55169835|      2013| 26.600058840835544|4136665|
|55169835|      2013|  32.46010013515556|4136665|
|55169835|      2012| 27.140040974944178|4136665|
|55169835|      2012| 22.780165582645523|4136665|
|55169835|      2011| 25.630316036919076|4136665|
|55169835|      2011|  29.32004605712306|4136665|
|55169835|      2010| 23.589976549158767|4136665|
|55169835|      2010| 27.170150685479037|4136665|
|55169835|      2009| 15.620345296476382|4136665|
|55169835|      2009| 21.120019291112413|4136665|
|55169835|      2008|  28.67996737017495|4136665|
|55169835|      2008|  25.00988798661668|4136665|
|55169835|      2007| 26.789677675820716|4136665|
|55169835|      2007| 27.349893864348527|4136665|


In [None]:
GrossMarginWithUBN.show(2)

+-------+----------+-------------------+----+
|    KVK|FiscalYear|GrossMarginPer100Kg| UBN|
+-------+----------+-------------------+----+
|4186815|      2004|  24.26000337305146|null|
|4186815|      2005| 24.109701187065085|null|
+-------+----------+-------------------+----+
only showing top 2 rows



In [None]:
# Load the herd-level lactation curve characteristics for the join with gross margin.
HerdWeighted = spark.read.parquet(
    '/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/Parquet/HerdWeighted'
)

In [None]:
HerdWeighted.orderBy('HerdIdentifier','TestYear').show(3)

+--------------+--------+-----------+------------------+------------------+-------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|         MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|
+--------------+--------+-----------+------------------+------------------+-------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+
|           111|    2007|          1|30.183749999999993|29.911111111111115|               -0.5|0.001846249999999...|477.22222222222223|6651.569444444444|            22|         72|        30.63|      30.0|        -

Join the herd-level data (`HerdWeighted`) with the gross margin data (`GrossMarginWithUBN`).

In [None]:
# Print the column names and types of the gross-margin table to check the join keys.
GrossMarginWithUBN.printSchema()

root
 |-- KVK: string (nullable = true)
 |-- FiscalYear: string (nullable = true)
 |-- GrossMarginPer100Kg: double (nullable = true)
 |-- ProceOwnProduct: string (nullable = true)
 |-- Organic: string (nullable = true)
 |-- ProductType: string (nullable = true)
 |-- Robot: string (nullable = true)
 |-- UBN: string (nullable = true)



In [None]:
# Print the column names and types of the herd table to check the join keys.
HerdWeighted.printSchema()

root
 |-- HerdIdentifier: integer (nullable = true)
 |-- TestYear: integer (nullable = true)
 |-- ParityGroup: string (nullable = true)
 |-- MeanScaleKg: double (nullable = true)
 |-- MeanRamp: double (nullable = true)
 |-- MeanOffset: double (nullable = true)
 |-- MeanDecay: double (nullable = true)
 |-- MeanPersistence: double (nullable = true)
 |-- MeanM305Kg: double (nullable = true)
 |-- NumberOfAnimal: long (nullable = true)
 |-- TotalWeight: long (nullable = true)
 |-- MedianScaleKg: double (nullable = true)
 |-- MedianRamp: double (nullable = true)
 |-- MedianOffset: double (nullable = true)
 |-- MedianDecay: double (nullable = true)
 |-- MedianPersistence: double (nullable = true)
 |-- MedianM305Kg: double (nullable = true)



`UBN` in `GrossMarginWithUBN` is a string, \
whereas `HerdIdentifier` in `HerdWeighted` is an integer. \
So I cast `UBN` to integer so that both columns have the same type before the join.

In [None]:
# Replace the UBN column with its integer cast.
GrossMarginWithUBN = GrossMarginWithUBN.withColumn(
    'UBN',
    GrossMarginWithUBN.UBN.cast(IntegerType()),
)

In [None]:
# Join keys: herd identifier <-> UBN and test year <-> fiscal year.
# FiscalYear is a string column while TestYear is an integer column, so cast
# FiscalYear explicitly here instead of relying on Spark's implicit
# string-to-integer coercion in the comparison. The cast only affects the
# comparison; the FiscalYear column itself is left unchanged.
condition = [
    HerdWeighted.HerdIdentifier == GrossMarginWithUBN.UBN,
    HerdWeighted.TestYear == GrossMarginWithUBN.FiscalYear.cast(IntegerType()),
]

In [None]:
# Inner join: keep only the row pairs from HerdWeighted and GrossMarginWithUBN
# that satisfy every expression in `condition`.
HerdWeightedWithGrossMargin = HerdWeighted.join(
    GrossMarginWithUBN,
    on=condition,
    how='inner',
)

In [None]:
# Preview the first two rows of the joined table.
HerdWeightedWithGrossMargin.show(n=2)

+--------------+--------+-----------+-----------------+------------------+------------------+--------------------+-----------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+-------+----------+-------------------+---------------+-------+-----------+-----+---+
|HerdIdentifier|TestYear|ParityGroup|      MeanScaleKg|          MeanRamp|        MeanOffset|           MeanDecay|  MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|    KVK|FiscalYear|GrossMarginPer100Kg|ProceOwnProduct|Organic|ProductType|Robot|UBN|
+--------------+--------+-----------+-----------------+------------------+------------------+--------------------+-----------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+-------+----------+-------------------+---------------

In [None]:
# Preview the first two rows of the joined table.
HerdWeightedWithGrossMargin.show(n=2)

+--------------+--------+-----------+------------------+------------------+------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|        MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|
+--------------+--------+-----------+------------------+------------------+------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+
|       8414354|    2011|          1|28.069508196721316|29.76475409836065

In [None]:
# Total number of rows in the joined table.
HerdWeightedWithGrossMargin.count()

34910

In [None]:
# Number of distinct herd identifiers present in the joined table.
HerdWeightedWithGrossMargin.agg(
    F.countDistinct("HerdIdentifier")
).collect()

[Row(count(DISTINCT HerdIdentifier)=2208)]

In [None]:
# Show up to 50 joined rows for herd 4136665, latest test year first.
(
    HerdWeightedWithGrossMargin
    .filter(col('HerdIdentifier') == '4136665')
    .orderBy(F.desc('TestYear'))
    .show(50)
)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+------------------+--------------+-----------+------------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|        MeanM305Kg|NumberOfAnimal|TotalWeight|     MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+------------------+--------------+-----------+------------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+
|       4136665|    2016|         2+| 49.76336708

In [None]:
# Count the joined rows for every (HerdIdentifier, TestYear) combination.
a = (
    HerdWeightedWithGrossMargin
    .groupBy('HerdIdentifier', 'TestYear')
    .agg(F.count('*').alias('AccountingRecordPerYear'))
)

In [None]:
# For each per-year row count, report how many distinct herds show that count
# (largest count first).
(
    a.groupBy('AccountingRecordPerYear')
    .agg(F.countDistinct("HerdIdentifier").alias('NumberOfHerd'))
    .orderBy(F.desc('AccountingRecordPerYear'))
    .show()
)

+-----------------------+------------+
|AccountingRecordPerYear|NumberOfHerd|
+-----------------------+------------+
|                      4|           1|
|                      2|        2205|
|                      1|          27|
+-----------------------+------------+



In [None]:
# List the herd/year combinations that have four joined rows.
a.filter('AccountingRecordPerYear=4').show(n=20)

+--------------+--------+-----------------------+
|HerdIdentifier|TestYear|AccountingRecordPerYear|
+--------------+--------+-----------------------+
|       4136665|    2011|                      4|
|       4136665|    2008|                      4|
|       4136665|    2013|                      4|
|       4136665|    2009|                      4|
|       4136665|    2012|                      4|
|       4136665|    2007|                      4|
|       4136665|    2010|                      4|
+--------------+--------+-----------------------+



In [None]:
from pyspark.sql import Window

# Window covering all rows that share the same herd identifier and test year.
w = Window.partitionBy('HerdIdentifier', 'TestYear')

In [None]:
# Attach the number of joined rows per herd and test year to every row and
# display the rows with the highest counts first.
(
    HerdWeightedWithGrossMargin
    .withColumn('AccountingRecordPerYear', F.count('*').over(w))
    .orderBy(
        F.desc('AccountingRecordPerYear'),
        'HerdIdentifier',
        'TestYear',
        'ParityGroup',
    )
    .show()
)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+------------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+-----------------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|        MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|MedianM305Kg|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|AccountingRecordPerYear|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+------------------+--------------+-----------+-------------+----------+------------+-----------+-----------------+------------+--------+----------+-------------------+-------+-----------------

In [None]:
# Same per-herd/per-year row count as a window column, but displayed with the
# lowest counts first.
(
    HerdWeightedWithGrossMargin
    .withColumn('AccountingRecordPerYear', F.count('*').over(w))
    .orderBy(
        'AccountingRecordPerYear',
        'HerdIdentifier',
        'TestYear',
        'ParityGroup',
    )
    .show()
)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+--------------------+-----------------+------------+--------+----------+-------------------+-------+-----------------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|MedianScaleKg|MedianRamp|MedianOffset|         MedianDecay|MedianPersistence|MedianM305Kg|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|AccountingRecordPerYear|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+-------------+----------+------------+--------------------+-----------------+------------+--------+----------+-------------------+-

In [None]:
# Non-null and distinct counts of KVK and UBN in the gross-margin table.
GrossMarginWithUBN.agg(
    F.count('KVK'),
    F.countDistinct('KVK'),
    F.count('UBN'),
    F.countDistinct('UBN'),
).collect()

[Row(count(KVK)=32511, count(DISTINCT KVK)=2809, count(UBN)=25299, count(DISTINCT UBN)=2548)]

In [None]:
# Number of distinct herd identifiers in the herd table before the join.
HerdWeighted.agg(
    F.countDistinct("HerdIdentifier")
).collect()

[Row(count(DISTINCT HerdIdentifier)=20597)]

## Does one KVK have several UBNs per year?

In [None]:
# Window covering all joined rows that share the same KVK and test year.
w1 = Window.partitionBy('KVK', 'TestYear')

# Attach the number of joined rows per KVK and test year to every row and
# display the rows with the highest counts first.
(
    HerdWeightedWithGrossMargin
    .withColumn('LactationCurveRecordPerYear', F.count('*').over(w1))
    .orderBy(
        F.desc('LactationCurveRecordPerYear'),
        'KVK',
        'TestYear',
        'ParityGroup',
    )
    .show()
)

+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+------------------+----------+------------+-----------+-----------------+-----------------+--------+----------+-------------------+-------+---------------------------+
|HerdIdentifier|TestYear|ParityGroup|       MeanScaleKg|          MeanRamp|          MeanOffset|           MeanDecay|   MeanPersistence|       MeanM305Kg|NumberOfAnimal|TotalWeight|     MedianScaleKg|MedianRamp|MedianOffset|MedianDecay|MedianPersistence|     MedianM305Kg|     KVK|FiscalYear|GrossMarginPer100Kg|    UBN|LactationCurveRecordPerYear|
+--------------+--------+-----------+------------------+------------------+--------------------+--------------------+------------------+-----------------+--------------+-----------+------------------+----------+------------+-----------+-----------------+-----------------+--------+----------+----------

In [None]:
# Store both row counts as columns of the joined table:
#   - AccountingRecordPerYearPerUBN: rows per (HerdIdentifier, TestYear), window `w`
#   - LactationCurveRecordPerYearPerKVK: rows per (KVK, TestYear), window `w1`
# NOTE(review): both are counts of joined rows, so every ParityGroup row of a
# herd/year contributes to them - confirm this matches the intended meaning.
HerdWeightedWithGrossMargin = (
    HerdWeightedWithGrossMargin
    .withColumn('AccountingRecordPerYearPerUBN', F.count('*').over(w))
    .withColumn('LactationCurveRecordPerYearPerKVK', F.count('*').over(w1))
)


In [None]:
# Write the joined table as CSV with a header row, replacing any existing
# output at the target path. repartition(1) puts all rows in one partition
# before writing.
(
    HerdWeightedWithGrossMargin
    .repartition(1)
    .write
    .csv(
        path='/content/gdrive/Shared drives/Bovi-Analytics/Projects/ChenYongYan/Chapter2/Output/CSV/HerdWeightedWithGrossMargin/',
        mode="overwrite",
        header=True,
    )
)