> ### Loading Complete Dataset

In [0]:
%scala
// Register the storage-account access key that the wasbs:// paths in this notebook
// rely on. The key is read from a Databricks secret scope at run time so that it
// never appears in the notebook source, its outputs, or its revision history.
//
// NOTE(review): the scope and key names below are placeholders -- point them at the
// secret that actually holds this account's key. The key that was previously
// hardcoded in this cell must be treated as leaked and rotated in Azure.
val storageAccountKey: String =
  dbutils.secrets.get(scope = "bovianalytics", key = "storage-account-key")

spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.bovianalytics.blob.core.windows.net",
  storageAccountKey
)

In [0]:
# Location of the source dataset in Azure Blob Storage.
# These three values are combined into wasbs:// URIs by the cells below.
storage_account_name = "bovianalytics"
container_name = "gpluse-cluster-2"

# Folder (relative to the container root) that holds the Parquet dataset
file_path_complete = "Projects/SenseOfSensors/ParquetData/SensorAndCalvingDataRepartitioned02042021/"


In [0]:
# Show what is stored under the dataset folder of the blob container
dataset_uri = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_path_complete}"
display(dbutils.fs.ls(dataset_uri))

In [0]:
# Read the Parquet data under the dataset folder into a Spark DataFrame and render it
source_uri = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_path_complete}"
df = spark.read.parquet(source_uri)
display(df)

> Loading finished

> ### Preprocessing Dataset and removing unnecessary Columns & Rows

In [0]:
# Columns needed by the rest of the notebook; every other column is dropped here
required_columns = [
    "AnimalIdentifier",
    "TransitionAnimalEartag",
    "HerdIdentifier",
    "EventDate",
    "ObservationValue",
    "ObservationType",
    "TransitionDaysInMilk",
    "TransitionCalvingDate",
    "TransitionLactationNumber",
    "TransitionParity",
    "TransitionSeason",
]

# Project the raw DataFrame down to those columns and render the result
df_processed = df.select(*required_columns)
display(df_processed)

> Keep only rows whose "ObservationType" ends in "PerDay" or "Per2Hours"

In [0]:
from pyspark.sql.functions import col

# Retain only the rows whose ObservationType ends in "PerDay" or "Per2Hours".
# The regex is anchored at the end of the string only, so any prefix is accepted
# (not just the "Minutes..." types).
has_wanted_suffix = col("ObservationType").rlike("PerDay$|Per2Hours$")
df_processed_observations = df_processed.filter(has_wanted_suffix)

display(df_processed_observations)

> ### Checking unique values for "ObservationType" Column

In [0]:
# List every ObservationType that survived the suffix filter, once each
distinct_observation_types = df_processed_observations.select("ObservationType").distinct()
display(distinct_observation_types)

In [0]:
# Distinct observation types ending in "Per2Hours", sorted by name and printed untruncated
(
    df_processed_observations
    .where(col("ObservationType").endswith("Per2Hours"))
    .select("ObservationType")
    .distinct()
    .orderBy("ObservationType")
    .show(truncate=False)
)

In [0]:
# Distinct observation types ending in "PerDay", sorted by name and printed untruncated
(
    df_processed_observations
    .where(col("ObservationType").endswith("PerDay"))
    .select("ObservationType")
    .distinct()
    .orderBy("ObservationType")
    .show(truncate=False)
)

> Removing undesired Rows from "ObservationType"

In [0]:
# Observation types that are excluded from the dataset by exact name
types_to_remove = [
    "EatingNumberOfBoutsPerDay",
    "InactiveBoutLengthMinutesPerDay",
    "InactiveBoutsPerDay",
    "InactiveInterboutLengthMinutesPerDay",
    "LyingBoutLengthLowerQntMinutesPerDay",
    "LyingBoutLengthMaxMinutesPerDay",
    "LyingBoutLengthMedMinutesPerDay",
    "LyingBoutLengthMinMinutesPerDay",
    "LyingBoutLengthMinutesPerDay",
    "LyingBoutLengthUpperQntMinutesPerDay",
    "LyingBoutsPerDay",
    "RuminationNumberOfBoutsPerDay",
    "StandupsPerDay",
]

# Keep every row whose ObservationType is NOT in the exclusion list
is_unwanted_type = col("ObservationType").isin(types_to_remove)
df_cleaned = df_processed_observations.filter(~is_unwanted_type)

In [0]:
# Re-list the "PerDay" observation types, this time from the cleaned DataFrame,
# to confirm that the excluded types are gone
(
    df_cleaned
    .where(col("ObservationType").endswith("PerDay"))
    .select("ObservationType")
    .distinct()
    .orderBy("ObservationType")
    .show(truncate=False)
)

> ### Checking for Null Values of each column Separately

In [0]:
from pyspark.sql.functions import col, sum as spark_sum, when

# Count the nulls of every column in ONE aggregation. Building one summed 0/1
# indicator per column and selecting them together makes Spark scan df_cleaned
# once, instead of launching a separate full job for each column.
null_counts = df_cleaned.select(
    [
        spark_sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
        for column_name in df_cleaned.columns
    ]
).collect()[0]

# Report the counts in column order
for column_name in df_cleaned.columns:
    # sum() over zero rows yields NULL; report that as 0 nulls
    null_count = null_counts[column_name] or 0
    print(f"Column '{column_name}' has {null_count} null values.")


> Checking total number of Rows

In [0]:
# Count total number of rows in the cleaned dataset.
# count() is a Spark action, so this cell runs a job over df_cleaned.
row_count = df_cleaned.count()
print(f"Total number of rows: {row_count}")


> Frequency Check - Checking if each "AnimalIdentifier" maps to one and only one "TransitionAnimalEartag"

In [0]:
from pyspark.sql.functions import col, countDistinct

# For each AnimalIdentifier, count how many different TransitionAnimalEartag values it is paired with
eartags_per_animal = df_cleaned.groupBy("AnimalIdentifier").agg(
    countDistinct("TransitionAnimalEartag").alias("distinct_eartag_count")
)

# An identifier is ambiguous when it is paired with more than one ear tag
ambiguous_mapping = eartags_per_animal.filter(col("distinct_eartag_count") > 1)

# An empty result means every AnimalIdentifier maps to at most one ear tag
display(ambiguous_mapping)


> ### Removing Rows with "TransitionAnimalEartag" = NULL

> Only 0.001% of the values are null, so removing them will not affect the dataset

In [0]:
# Drop rows with null TransitionAnimalEartag.
# df_final is the DataFrame every later cell starts from.
df_final = df_cleaned.filter(col("TransitionAnimalEartag").isNotNull())


> Dataset is ready for the 3D tensor - df_final is the finalized dataset

> ### Creating a 3D tensor

> Filtering a Single Cow

In [0]:
# Pick a single ear tag to prototype the tensor construction with.
# NOTE(review): no orderBy precedes limit(1), so which cow is chosen is presumably
# arbitrary and may differ between runs -- confirm whether a fixed cow is wanted.
unique_cows = (
    df_final
    .select("TransitionAnimalEartag")
    .distinct()
    .limit(1)
)

# The inner join keeps only the rows of df_final that belong to the chosen cow
df_sampled = df_final.join(unique_cows, on="TransitionAnimalEartag", how="inner")

# Sanity check: list the ear tags present in the sample
df_sampled.select("TransitionAnimalEartag").distinct().show()


> Filtering "TransitionDaysInMilk" days from -21 to 305

In [0]:
from pyspark.sql.functions import col

# Keep only rows whose TransitionDaysInMilk lies in the closed range [-21, 305]
in_valid_day_range = col("TransitionDaysInMilk").between(-21, 305)
df_sampled_filtered = df_sampled.filter(in_valid_day_range)



> Splitting the Dataset into %PerDay and %Per2Hours

In [0]:
# Split the sampled rows into two subsets according to the ObservationType suffix
is_per_day = col("ObservationType").like("%PerDay")
is_per_2hours = col("ObservationType").like("%Per2Hours")

# Rows of the daily observation types
per_day_df = df_sampled_filtered.filter(is_per_day)

# Rows of the two-hourly observation types
per_2hr_df = df_sampled_filtered.filter(is_per_2hours)


> Pivoting Dataset

In [0]:
from pyspark.sql.functions import col, first

# ObservationValue is cast to float in both subsets before pivoting
value_as_float = col("ObservationValue").cast("float")
per_day_df = per_day_df.withColumn("ObservationValue", value_as_float)
per_2hr_df = per_2hr_df.withColumn("ObservationValue", value_as_float)

# Both pivots produce one row per (cow, day in milk) and one column per ObservationType.
# NOTE(review): first() keeps a single value per (cow, day, type) group; if a group
# holds several rows (possibly the case for the Per2Hours types -- verify), the rest
# are discarded and which one is kept is presumably arbitrary.
pivot_keys = ["TransitionAnimalEartag", "TransitionDaysInMilk"]

# Wide table of the PerDay observation types
pivot_per_day = (
    per_day_df
    .groupBy(*pivot_keys)
    .pivot("ObservationType")
    .agg(first("ObservationValue"))
)

# Wide table of the Per2Hours observation types
pivot_per_2hours = (
    per_2hr_df
    .groupBy(*pivot_keys)
    .pivot("ObservationType")
    .agg(first("ObservationValue"))
)


> Joining the Pivoted Dataset

In [0]:
# Combine the two wide tables on (cow, day in milk). The inner join keeps only
# the days that are present in BOTH the PerDay and the Per2Hours pivot.
join_keys = ["TransitionAnimalEartag", "TransitionDaysInMilk"]
joined_pivot = pivot_per_day.alias("day").join(
    pivot_per_2hours.alias("hr"), on=join_keys, how="inner"
)

display(joined_pivot)

In [0]:
# Get columns lists
per_day_cols = [col for col in pivot_per_day.columns if col not in ["TransitionAnimalEartag", "TransitionDaysInMilk"]]
per_2hr_cols = [col for col in pivot_per_2hours.columns if col not in ["TransitionAnimalEartag", "TransitionDaysInMilk"]]


In [0]:
# Choose the cow to build the tensor for and order its rows by day in milk.
# Taking the ear tag from the first row and filtering on it guards against more
# than one cow being present in joined_pivot.
first_row = joined_pivot.select("TransitionAnimalEartag").first()

# first() returns None for an empty DataFrame; fail with a readable message
# instead of "'NoneType' object is not subscriptable"
if first_row is None:
    raise ValueError(
        "joined_pivot is empty: the sampled cow has no day with both PerDay and Per2Hours observations"
    )
cow_id = first_row["TransitionAnimalEartag"]

# Filter and sort by day
cow_df_sorted = joined_pivot.filter(col("TransitionAnimalEartag") == cow_id)\
                            .orderBy("TransitionDaysInMilk")


In [0]:
# Build a 3D tensor: one matrix per day, stacked in TransitionDaysInMilk order
import numpy as np

# Bring the (already sorted) rows of the selected cow to the driver
rows = cow_df_sorted.collect()

# Matrix dimensions follow the number of pivoted feature columns, so the zero
# placeholder below always has the same shape as the outer product.
n_day_features = len(per_day_cols)
n_2hr_features = len(per_2hr_cols)

tensor = []
for row in rows:
    per_day_vec = [row[name] for name in per_day_cols]
    per_2hr_vec = [row[name] for name in per_2hr_cols]

    if None in per_day_vec or None in per_2hr_vec:
        # At least one feature is missing for this day: use an all-zero placeholder
        matrix = np.zeros((n_day_features, n_2hr_features)).tolist()
    else:
        # Outer product: entry (i, j) = per-day feature i * per-2-hours feature j
        matrix = np.outer(per_day_vec, per_2hr_vec).tolist()

    tensor.append(matrix)

# Final tensor shape: [T, number of PerDay features, number of Per2Hours features]
print(f" Built tensor with shape: [{len(tensor)}, {n_day_features}, {n_2hr_features}]")


> Displaying Tensor Shape

In [0]:
# Report the tensor's dimensions as measured from the data (not assumed),
# and handle the case where no day was collected.
if tensor:
    sample_shape = np.array(tensor[0]).shape
    matrix_dims = " × ".join(str(dim) for dim in sample_shape)

    # Check basic shape
    print(f"Tensor shape: {len(tensor)} days × {matrix_dims}")

    # Check type and dimensions of one sample
    print("Sample matrix shape:", sample_shape)
else:
    print("Tensor is empty: no day matrices were built.")


In [0]:
# Print the first three day-matrices, one matrix row per output line.
# The number after "Day" is the 1-based position in the tensor, not the
# TransitionDaysInMilk value of that row.
for i, day_matrix in enumerate(tensor[:3]):
    print(f"\nDay {i+1} (TransitionDaysInMilk):")
    for matrix_row in day_matrix:
        print(matrix_row)


In [0]:
# Count the matrices in which every entry is zero.
# np.any() is False only when all entries are 0, whereas testing sum == 0 would
# also count a matrix whose positive and negative entries cancel out.
zero_matrices = sum(1 for m in tensor if not np.any(m))
print(f"⚠️ Zero-filled matrices: {zero_matrices} / {len(tensor)}")


> Normalizing 

In [0]:
def _scale_to_unit_range(matrix):
    """Min-max scale one day-matrix to [0, 1] and return it as nested lists.

    A matrix whose entries are all equal (max == min) cannot be scaled and is
    returned as all zeros.
    """
    values = np.array(matrix)
    lowest = values.min()
    highest = values.max()

    if highest > lowest:
        return ((values - lowest) / (highest - lowest)).tolist()
    return np.zeros_like(values).tolist()


# Each day-matrix is normalised on its own, using its own minimum and maximum
normalized_tensor = [_scale_to_unit_range(matrix) for matrix in tensor]


In [0]:
# Print the first two normalised day-matrices with values rounded to 3 decimals.
# The number after "Day" is the 1-based position in the tensor.
for i, day_matrix in enumerate(normalized_tensor[:2]):
    print(f"\nDay {i+1} (normalized):")
    for matrix_row in day_matrix:
        rounded_row = [round(val, 3) for val in matrix_row]
        print(rounded_row)


> ### Saving .tfrecords

In [0]:
import tensorflow as tf

# Flatten normalized tensor (day-major, then matrix row, then matrix column)
flat_tensor = [value for matrix in normalized_tensor for row in matrix for value in row]

# Derive the stored shape from the data instead of assuming 7 x 7, so that the
# "tensor_shape" feature always describes the values that are actually written.
n_days = len(normalized_tensor)
n_rows = len(normalized_tensor[0]) if n_days else 0
n_cols = len(normalized_tensor[0][0]) if n_rows else 0
tensor_dims = [n_days, n_rows, n_cols]

# A mismatch means the day-matrices are not all the same size; reshaping on
# reload would then fail or silently scramble the values.
if len(flat_tensor) != n_days * n_rows * n_cols:
    raise ValueError(
        f"Flattened length {len(flat_tensor)} does not match tensor shape {tensor_dims}"
    )

# Create tf.train.Example holding the flat values and the shape needed to restore them
example = tf.train.Example(features=tf.train.Features(feature={
    "tensor": tf.train.Feature(float_list=tf.train.FloatList(value=flat_tensor)),
    "tensor_shape": tf.train.Feature(int64_list=tf.train.Int64List(value=tensor_dims))
}))


In [0]:
import os

# Target folder and file, addressed through the /dbfs local-file mount
output_dir = "/dbfs/tmp/BehaviorTFRecords_CowOne/"
output_path = "/dbfs/tmp/BehaviorTFRecords_CowOne/cow_1.tfrecord"

# Create the folder if it is missing (no error when it already exists)
os.makedirs(output_dir, exist_ok=True)

# Serialise the Example and write it as the only record of the file
serialized_example = example.SerializeToString()
with tf.io.TFRecordWriter(output_path) as writer:
    writer.write(serialized_example)


In [0]:
# List the DBFS output folder to confirm that the TFRecord file was written
display(dbutils.fs.ls("dbfs:/tmp/BehaviorTFRecords_CowOne/"))


In [0]:
# Copy the TFRecord from DBFS to the project folder in blob storage (wasbs)
dbfs_source = "dbfs:/tmp/BehaviorTFRecords_CowOne/cow_1.tfrecord"
blob_destination = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/Projects/MuhammadAzam/BehaviorTFRecords_CowOne/cow_1.tfrecord"

dbutils.fs.cp(dbfs_source, blob_destination)


In [0]:
# List the destination folder in blob storage to confirm that the copy arrived
exported_folder = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/Projects/MuhammadAzam/BehaviorTFRecords_CowOne/"
display(dbutils.fs.ls(exported_folder))


> Loading tfrecords

In [0]:
import tensorflow as tf

# Load the TFRecord dataset from the /dbfs local-file mount
tfrecord_path = "/dbfs/tmp/BehaviorTFRecords_CowOne/cow_1.tfrecord"
raw_dataset = tf.data.TFRecordDataset(tfrecord_path)

# Check if any records exist: print the first raw (still serialized) record.
# The loop body does not run, and nothing is printed, when the file holds no record.
for raw_record in raw_dataset.take(1):
    print(" Raw record found:", raw_record)


In [0]:
# Number of float values that were written into the "tensor" feature.
# Relies on flat_tensor still being defined by the cell that built the tf.train.Example.
print("Total flattened values:", len(flat_tensor))  # flat_tensor from earlier

In [0]:
import tensorflow as tf

# Feature spec of the stored Example.
# "tensor" is declared variable-length so that the number of values is read from
# the record itself instead of a hard-coded count (which only fits one particular
# number of days and matrix size); "tensor_shape" carries the 3 dimensions
# needed to restore the tensor.
feature_description = {
    "tensor": tf.io.VarLenFeature(tf.float32),
    "tensor_shape": tf.io.FixedLenFeature([3], tf.int64)
}

# Reload the TFRecord file
tfrecord_path = "/dbfs/tmp/BehaviorTFRecords_CowOne/cow_1.tfrecord"
raw_dataset = tf.data.TFRecordDataset(tfrecord_path)

# Parse and reshape the tensor.
# Note: this rebinds the notebook-level name `tensor` (previously the Python list
# of day-matrices) to a tf.Tensor.
for raw_record in raw_dataset.take(1):
    parsed = tf.io.parse_single_example(raw_record, feature_description)
    shape = parsed["tensor_shape"]

    # VarLenFeature parses to a SparseTensor; densify it into a flat 1-D tensor
    flat_values = tf.sparse.to_dense(parsed["tensor"])
    tensor = tf.reshape(flat_values, shape)

    print(" Parsed tensor shape:", tensor.shape)


> Displaying Reloaded tensor

In [0]:
import tensorflow as tf

# TFRecord path
tfrecord_path = "/dbfs/tmp/BehaviorTFRecords_CowOne/cow_1.tfrecord"

# Define parser
def parse_fn(example_proto):
    """Decode one serialized tf.train.Example into the tensor it stores.

    Args:
        example_proto: scalar string tensor holding a serialized Example with a
            float "tensor" feature (flattened values) and an int64
            "tensor_shape" feature of length 3.

    Returns:
        The "tensor" values reshaped to the dimensions given by "tensor_shape".
    """
    # "tensor" is variable-length so that records of any size parse, rather than
    # only those holding one hard-coded number of values.
    feature_description = {
        "tensor": tf.io.VarLenFeature(tf.float32),
        "tensor_shape": tf.io.FixedLenFeature([3], tf.int64)
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)

    # VarLenFeature parses to a SparseTensor; densify it before reshaping
    flat_values = tf.sparse.to_dense(parsed["tensor"])
    tensor = tf.reshape(flat_values, parsed["tensor_shape"])
    return tensor

# Load and parse dataset
parsed_ds = tf.data.TFRecordDataset(tfrecord_path).map(parse_fn)

# Display up to the first 3 day matrices of the first tensor.
# The count is capped by the number of days actually stored, so a tensor with
# fewer than three days is shown in full instead of raising an indexing error.
for tensor in parsed_ds.take(1):
    tf.print(" Full tensor shape:", tf.shape(tensor))

    days_to_show = min(3, int(tensor.shape[0]))
    for day_index in range(days_to_show):
        tf.print(f"\nDay {day_index + 1} matrix:\n", tensor[day_index])


> Removing Extra Files

In [0]:
# List all files and subfolders under 'Projects/MuhammadAzam/'
base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/Projects/MuhammadAzam/"
all_paths = dbutils.fs.ls(base_path)

# Safety switch for the recursive delete below.
# Everything under base_path is removed, including the BehaviorTFRecords_CowOne
# export written earlier in this notebook, so the deletion must be opted into
# explicitly. With the default (False) the cell only lists what would be removed,
# which keeps "Run All" from destroying the freshly exported TFRecord.
CONFIRM_CLEANUP = False

if CONFIRM_CLEANUP:
    # Delete each file/subfolder inside the directory
    for path in all_paths:
        dbutils.fs.rm(path.path, recurse=True)
else:
    print(f" Dry run - set CONFIRM_CLEANUP = True to delete these {len(all_paths)} item(s):")
    for path in all_paths:
        print("  ", path.path)


In [0]:
# Look at what is currently stored in the project folder
base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/Projects/MuhammadAzam"
contents = dbutils.fs.ls(base_path)

# Report an empty folder in words; otherwise render the listing
if not contents:
    print(" Folder is empty — ready for TFRecord export.")
else:
    display(contents)
