In [1]:
%load_ext autoreload
%autoreload 1  
# Automatically reload bioscout package
%aimport bioscout_tech_challenge

In [2]:
from bioscout_tech_challenge import *
# Now any changes to your package will be automatically reloaded
import pandas as pd
import numpy as np

## Pre-Process Weather Data

Things to look at 
- extra_information column
- storage 
- autodetect header from multiple files
- extract sensor readings out into other tables
- add new data to existing tables
- write to sql db


### Merge Weather and Devices Data

Hardcode some file paths for testing.

In [3]:
weather_folder = r"data/tables/weather_data/"
devices_fn = "weather_devices.csv"
weather_fn = "weather_data_1.csv"


devices_df = read_csv_file(weather_folder+devices_fn)
weather_df = read_csv_file(weather_folder+weather_fn)

print(devices_df.columns)
print(weather_df.columns)
print(weather_df.loc[0])

Index(['device_id', 'device_name', 'site_id', 'utc_offset_in_hours',
       'longitude', 'latitude'],
      dtype='object')
Index(['index', 'weather_reading_id', 'date_measured', 'device_id', 'voc',
       'pressure', 'extra_information'],
      dtype='object')
index                                                           7243411
weather_reading_id                                              9983574
date_measured                                 2024-10-19 11:56:31+00:00
device_id                                                           259
voc                                                              9400.0
pressure                                                        10246.0
extra_information     {'VOCs': [{'Value': 9.4, 'Sensor': 'BME680'}],...
Name: 0, dtype: object


Try merging the data and check that the resulting shape makes sense: the merged row count should equal the weather row count, and the merged column count should equal weather columns + device columns - 1 (device_id is shared, so it appears only once).
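The shape invariant can be written as assertions, so a bad merge fails loudly instead of relying on reading printed shapes. This is a sketch that uses a plain pandas left merge on `device_id` with small toy frames. The values are illustrative, and `merge_weather_data` in the package may be implemented differently.

```python
import pandas as pd

# Toy stand-ins for weather_df and devices_df (illustrative values only)
weather = pd.DataFrame({
    "weather_reading_id": [1, 2, 3],
    "device_id": [259, 259, 262],
    "pressure": [10246.0, 9851.1, 9900.0],
})
devices = pd.DataFrame({
    "device_id": [259, 262],
    "device_name": ["WH-0011", "WH-0010"],
    "site_id": [63, 57],
})

# Left merge keeps every weather row; validate checks each device_id is unique in devices
merged = weather.merge(devices, on="device_id", how="left", validate="many_to_one")

# Rows unchanged; columns = weather + devices - 1 (the shared device_id key)
expected_shape = (len(weather), weather.shape[1] + devices.shape[1] - 1)
assert merged.shape == expected_shape, (merged.shape, expected_shape)
print(merged.shape)
```

With `validate="many_to_one"`, pandas also raises a `MergeError` if a `device_id` appears more than once in the devices table. Without that check, a duplicate would silently duplicate weather rows and break the row-count invariant.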

In [4]:
merged_df = merge_weather_data(weather_df,devices_df)
#Check shape makes sense
print(devices_df.shape)
print(weather_df.shape)
print(merged_df.shape)

(16, 6)
(49999, 7)
(49999, 12)


Need to check if any rows are missing corresponding device information.
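Here is a sketch of one way such a check could work. The package's `get_na_rows` may differ. Passing `indicator=True` to pandas' `merge` adds a `_merge` column that records whether each row's key was found in both frames. Unmatched rows can then be selected directly instead of being inferred from nulls in a device column. The data is made up, and device `999` is deliberately absent from the devices table.

```python
import pandas as pd

# Toy data: device 999 has no entry in the devices table
weather = pd.DataFrame({"weather_reading_id": [1, 2, 3], "device_id": [259, 262, 999]})
devices = pd.DataFrame({"device_id": [259, 262], "device_name": ["WH-0011", "WH-0010"]})

# indicator=True adds "_merge" with values "both", "left_only" or "right_only"
merged = weather.merge(devices, on="device_id", how="left", indicator=True)

# Weather rows whose device_id had no match in the devices table
missing = merged.loc[merged["_merge"] == "left_only", ["weather_reading_id", "device_id"]]
print(missing)
```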

In [5]:
# Check if any rows are missing corresponding device information 
# with function get_na_rows pick a random column from devices_df.
print(get_na_rows(merged_df,"index"))

Empty DataFrame
Columns: [index, device_id]
Index: []


### Find and join multiple Weather Files

Let's find multiple files by pattern matching and determine whether each one contains a header row.
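`identify_header` returns `infer` or `None`, which are the values pandas' `read_csv` accepts for its `header` argument. The sketch below assumes one simple way to make that decision: compare each file's first line with the expected column names, taken from the output above. `detect_header` and `EXPECTED_COLUMNS` are hypothetical names and are not part of the package.

```python
import csv
import io

# Hypothetical constant: the column names seen in weather_data_1.csv above
EXPECTED_COLUMNS = ["index", "weather_reading_id", "date_measured", "device_id",
                    "voc", "pressure", "extra_information"]

def detect_header(first_line, expected=EXPECTED_COLUMNS):
    """Return "infer" if first_line is the expected header row, else None.

    The return values match what pandas.read_csv accepts for `header`.
    """
    # Parse with the csv module so quoted fields containing commas split correctly
    row = next(csv.reader(io.StringIO(first_line)))
    return "infer" if [field.strip() for field in row] == expected else None

header_line = "index,weather_reading_id,date_measured,device_id,voc,pressure,extra_information"
data_line = "7243411,9983574,2024-10-19 11:56:31+00:00,259,9400.0,10246.0,\"{'VOCs': [], 'IotID': 'x'}\""
print(detect_header(header_line))
print(detect_header(data_line))
```

With a real file, the first line would come from `open(path).readline()`. The standard library's `csv.Sniffer().has_header` is an alternative, but it relies on heuristics. An exact comparison is stricter when the schema is already known.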

In [6]:
# Find all csv files in the weather folder
weather_csvs = find_csv_files(weather_folder,prefix="weather_data")
print(weather_csvs)

# Identify the header for each file
for csv in weather_csvs:
    print(identify_header(csv))

[PosixPath('/home/zach/repo/data.data_engineer_technical_challenge/data/tables/weather_data/weather_data_1.csv'), PosixPath('/home/zach/repo/data.data_engineer_technical_challenge/data/tables/weather_data/weather_data_2.csv'), PosixPath('/home/zach/repo/data.data_engineer_technical_challenge/data/tables/weather_data/weather_data_3.csv')]
infer
None
None


It looks like only the first file has a header. Let's combine the files.
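Here is one way to combine files when only the first has a header. This is a sketch, and `combine_csv_files` may work differently. Read the first file with its header, then read the rest with `header=None` and `names=` set to the first file's columns. In-memory strings stand in for the CSV files. A `source_file` column mirrors the one in the output below. The sketch assumes every header-less file uses the same column order as the first file.

```python
import io
import pandas as pd

# In-memory stand-ins: the first "file" has a header row, the second does not
files = {
    "weather_data_1.csv": "weather_reading_id,device_id,voc\n1,259,9400.0\n2,262,12044.0\n",
    "weather_data_2.csv": "3,259,9100.0\n4,267,8800.0\n",
}

frames = []
columns = None
for name, text in files.items():
    if columns is None:
        # First file: let pandas read the header row and remember the column names
        df = pd.read_csv(io.StringIO(text), header="infer")
        columns = list(df.columns)
    else:
        # Later files: no header row, so reuse the first file's column names
        df = pd.read_csv(io.StringIO(text), header=None, names=columns)
    df["source_file"] = name  # keep track of where each row came from
    frames.append(df)

combined = pd.concat(frames, ignore_index=True)
print(combined.shape)
```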

In [7]:
combined_df = combine_csv_files(weather_csvs,detect_header=True)
print(combined_df.shape)
print(combined_df.columns)
print(combined_df.loc[0])

(144399, 8)
Index(['index', 'weather_reading_id', 'date_measured', 'device_id', 'voc',
       'pressure', 'extra_information', 'source_file'],
      dtype='object')
index                                                           7243411
weather_reading_id                                              9983574
date_measured                                 2024-10-19 11:56:31+00:00
device_id                                                           259
voc                                                              9400.0
pressure                                                        10246.0
extra_information     {'VOCs': [{'Value': 9.4, 'Sensor': 'BME680'}],...
source_file                                          weather_data_1.csv
Name: 0, dtype: object


Now let's merge the combined data with the devices data and check whether any rows are missing corresponding device information.

In [8]:
combined_merged_df = merge_weather_data(combined_df,devices_df)
#Check shape makes sense
print(combined_merged_df.shape)
print(get_na_rows(combined_merged_df,"index"))

(144399, 13)
Empty DataFrame
Columns: [index, device_id]
Index: []


The given data has no rows missing device information, but this check is worth keeping when integrating into the end-user application.

### Parse Extra Information (Flatten extra_information data)

Additional sensor readings are stored in the extra_information column as a string-encoded dictionary.
We need to parse the string and flatten the data into a table.
Start by looking at the data in a sample row.

In [9]:
index = 500
# First get a sample row's extra_information
extra_info = combined_merged_df.loc[index]['extra_information']
print(extra_info)

{'VOCs': [{'Value': 12.044, 'Sensor': 'BME680'}], 'IotID': '0a10aced202194944a051624', 'Humidity': [{'Value': 56.283, 'Sensor': 'BME680'}, {'Value': 100, 'Sensor': 'SHT30'}], 'Pressure': [{'Value': 985.11, 'Sensor': 'BME680'}], 'Rainfall': [{'Value': 0, 'Sensor': 'OpticalRainGauge', 'SampleTimeLength': 300}, {'Value': 0, 'Sensor': 'TippingRainGauge', 'SampleTimeLength': 300}], 'Timestamp': '2024-10-19T15:22:02Z', 'WindSpeed': [{'Value': 0.856, 'Sensor': '40ms_spin_wind'}, {'Value': 1.092, 'Sensor': '60ms_louvre_us'}], 'Temperature': [{'Value': 5.36, 'Sensor': 'BME680'}, {'Value': 4.27291, 'Sensor': 'SHT30'}], 'WindDirection': [{'Value': 298.039, 'Sensor': '40ms_spin_wind'}, {'Value': 112.795, 'Sensor': '60ms_louvre_us'}]}


This is a string dump of a Python dictionary (single-quoted, so not strictly valid JSON). Let's pretty-print it so we can see the structure.

In [10]:
# Convert the extra_information string to a dictionary
import ast
import pprint

# The string is a Python dict repr (single-quoted keys and values), not strict JSON.
# ast.literal_eval parses Python literals safely, whereas swapping ' for " before
# json.loads would break on values containing apostrophes or literals like None/True.
def parse_extra_info(info):
    # Values that are already dictionaries need no parsing
    if isinstance(info, dict):
        return info
    return ast.literal_eval(info)

sample_parsed = parse_extra_info(extra_info)
# Create a PrettyPrinter instance with desired formatting
pp = pprint.PrettyPrinter(indent=4, width=80)
print("\nParsed Extra Information:")
pp.pprint(sample_parsed)

# Also compare with the other columns in the same row
columns_to_print = combined_merged_df.columns.difference(['extra_information'])
print("\nRest of Table:")
print(combined_merged_df[columns_to_print].loc[index])



Parsed Extra Information:
{   'Humidity': [   {'Sensor': 'BME680', 'Value': 56.283},
                    {'Sensor': 'SHT30', 'Value': 100}],
    'IotID': '0a10aced202194944a051624',
    'Pressure': [{'Sensor': 'BME680', 'Value': 985.11}],
    'Rainfall': [   {   'SampleTimeLength': 300,
                        'Sensor': 'OpticalRainGauge',
                        'Value': 0},
                    {   'SampleTimeLength': 300,
                        'Sensor': 'TippingRainGauge',
                        'Value': 0}],
    'Temperature': [   {'Sensor': 'BME680', 'Value': 5.36},
                       {'Sensor': 'SHT30', 'Value': 4.27291}],
    'Timestamp': '2024-10-19T15:22:02Z',
    'VOCs': [{'Sensor': 'BME680', 'Value': 12.044}],
    'WindDirection': [   {'Sensor': '40ms_spin_wind', 'Value': 298.039},
                         {'Sensor': '60ms_louvre_us', 'Value': 112.795}],
    'WindSpeed': [   {'Sensor': '40ms_spin_wind', 'Value': 0.856},
                     {'Sensor': '60ms_louvre_us'

### Data Inconsistencies

The data mostly contains sensor readings, each with a measurement type, a sensor name and a reading value. However, two entries are not sensor readings: Timestamp and IotID. Pressure and VOC readings also appear again, even though they already have their own columns in the main table.

In [11]:
# Sanity check of parsed data with existing pressure and VOC readings.
print("Parsed Data:")
print(f"Pressure: {sample_parsed['Pressure'][0]['Value']}")
print(f"VOCs: {sample_parsed['VOCs'][0]['Value']}")

print("\nDataframe Data:")
print(f"Pressure: {combined_merged_df.loc[index]['pressure']}")
print(f"VOCs: {combined_merged_df.loc[index]['voc']}")


Parsed Data:
Pressure: 985.11
VOCs: 12.044

Dataframe Data:
Pressure: 9851.1
VOCs: 12044.0


Testing about 10 more rows gives a consistent pattern: the table's pressure is 10 times the JSON value, and the table's VOC is 1000 times the JSON value. We should automate this check when flattening the data.

It's unclear how the data was collected and what algorithm was used to calculate the sensor readings. However, atmospheric pressure at the Earth's surface is roughly 900-1100 hPa, so the JSON pressure values appear to be in the correct units (hPa).

The best solution is probably to add a new column recording the units of each sensor reading.

It's also unclear how the VOC data was transformed. The BME680 documentation suggests that the standard approach is to convert the measured gas resistance into an IAQ (Indoor Air Quality) reading. However, the word "indoor" suggests this might not apply here, since the data is collected outdoors.

Suppose the values are on the IAQ scale, where a reading of 500 is considered hazardous. Then a value of ~10000 is not plausible given the nature of the measurements. By contrast, a reading between 0 and 50 indicates excellent air quality, which seems likely outdoors on a farm. We therefore proceed on the assumption that the JSON data is in IAQ units. (Ideally this would be confirmed with the engineering team.)
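The pressure and VOC spot checks can be automated across every row. Compute the ratio between each table column and the corresponding value parsed from `extra_information`, then flag rows that deviate from the observed factors of 10 and 1000. This is a sketch on a small illustrative frame. The first row uses the index-500 sample shown earlier, the other two rows are made up, and the third row deliberately breaks the VOC factor.

```python
import ast
import numpy as np
import pandas as pd

# Illustrative rows: the first uses the index-500 sample, the others are made up
df = pd.DataFrame({
    "pressure": [9851.1, 10000.0, 9900.0],
    "voc": [12044.0, 20000.0, 500.0],
    "extra_information": [
        "{'VOCs': [{'Value': 12.044, 'Sensor': 'BME680'}], 'Pressure': [{'Value': 985.11, 'Sensor': 'BME680'}]}",
        "{'VOCs': [{'Value': 20.0, 'Sensor': 'BME680'}], 'Pressure': [{'Value': 1000.0, 'Sensor': 'BME680'}]}",
        "{'VOCs': [{'Value': 12.0, 'Sensor': 'BME680'}], 'Pressure': [{'Value': 990.0, 'Sensor': 'BME680'}]}",
    ],
})

parsed = df["extra_information"].map(ast.literal_eval)
# In the samples seen so far, each measurement has a single BME680 reading
json_pressure = parsed.map(lambda d: d["Pressure"][0]["Value"])
json_voc = parsed.map(lambda d: d["VOCs"][0]["Value"])

pressure_ratio = df["pressure"] / json_pressure
voc_ratio = df["voc"] / json_voc

# Flag rows that do not match the observed factors (x10 for pressure, x1000 for VOC)
bad = ~(np.isclose(pressure_ratio, 10) & np.isclose(voc_ratio, 1000))
print(f"{int(bad.sum())} of {len(df)} rows deviate from the expected factors")
print(df.loc[bad, ["pressure", "voc"]])
```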


#### Flatten Extra Information

Next, we need to flatten each sensor reading into its own row.

Therefore, we aim to add the following columns to the dataframe:
- sensor_type (name of the measurement, e.g. pressure, voc)
- sensor_device (name of the sensor that took the measurement, e.g. BME680)
- sensor_value (value of the measurement)
- sensor_units (units of the measurement)

whilst dropping the following columns:
- extra_information (all of its data has been flattened)
- pressure (replaced by sensor_type and sensor_value)
- voc (replaced by sensor_type and sensor_value)

Additionally, IotID is new information that is not stored elsewhere, so it will be copied onto each expanded row.

The JSON timestamps appear consistent with the main dataframe, but it would be good to run a sanity check and flag any inconsistencies. Based on the analysis above, a simple approach is to treat the JSON data as the source of truth. Any timestamp in the main dataframe that disagrees with the JSON timestamp is then flagged as an inconsistency.
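Here is a sketch of the timestamp check. It assumes the two formats seen in this notebook: `2024-10-19 11:56:31+00:00` in `date_measured` and `2024-10-19T11:56:31Z` in the JSON. Parsing both to timezone-aware UTC datetimes prevents formatting differences alone from being flagged as mismatches. The second row is illustrative and deliberately disagrees.

```python
import pandas as pd

# date_measured uses "YYYY-MM-DD HH:MM:SS+00:00"; the JSON Timestamp uses ISO 8601 with "Z"
df = pd.DataFrame({
    "date_measured": ["2024-10-19 11:56:31+00:00", "2024-10-19 15:22:02+00:00"],
    "timestamp": ["2024-10-19T11:56:31Z", "2024-10-19T15:25:00Z"],
})

# Parse both to timezone-aware UTC datetimes so only real differences remain
measured = pd.to_datetime(df["date_measured"], utc=True)
json_ts = pd.to_datetime(df["timestamp"], utc=True)

# Treat the JSON timestamp as the source of truth and flag disagreements
df["timestamp_mismatch"] = measured != json_ts
print(df[df["timestamp_mismatch"]])
```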


Finally, a design decision is needed for the sensors that have a SampleTimeLength. The other sensors are assumed to sample a single point in time, whereas the rain sensors collect data over a short period.

One solution is to add a sample_time_length column and fill it with -1 for sensors that have no sample time length. This is probably the simplest solution, but it may not be the best from a data storage perspective.

Without knowing how the data will be used in the future, this is probably the best option, because it avoids introducing my own assumptions unnecessarily.
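The plan above can be sketched for a single row. `explode_readings` is a hypothetical helper written for illustration and is not the package's `expand_extra_information`. It assumes the string format seen earlier, and it uses the same output column names and the same -1 sentinel for `sample_time_length`.

```python
import ast
import pandas as pd

# Hypothetical mapping: JSON key -> (sensor_type value, units), mirroring sensor_schema in the next cell
SENSOR_SCHEMA = {
    "Humidity": ("humidity", "%"),
    "Pressure": ("pressure", "hPa"),
    "Rainfall": ("rainfall", "mm"),
    "Temperature": ("temperature", "C"),
    "VOCs": ("voc", "IAQ"),
    "WindDirection": ("winddirection", "degrees"),
    "WindSpeed": ("windspeed", "m/s"),
}

def explode_readings(row: pd.Series) -> pd.DataFrame:
    """Expand one wide weather row into one row per sensor reading (sketch)."""
    info = ast.literal_eval(row["extra_information"])
    # Columns copied onto every expanded row; the three wide columns are dropped
    base = row.drop(["extra_information", "pressure", "voc"]).to_dict()
    base["iotid"] = info.get("IotID")
    base["timestamp"] = info.get("Timestamp")
    records = []
    for key, (sensor_type, units) in SENSOR_SCHEMA.items():
        for reading in info.get(key, []):
            records.append({
                **base,
                "sensor_type": sensor_type,
                "sensor_device": reading["Sensor"],
                "sensor_value": reading["Value"],
                "sensor_units": units,
                # -1 marks point-in-time sensors that report no sampling window
                "sample_time_length": reading.get("SampleTimeLength", -1),
            })
    return pd.DataFrame(records)

# Example row built from part of the index-500 sample shown earlier
row = pd.Series({
    "weather_reading_id": 9987227,
    "device_id": 262,
    "voc": 12044.0,
    "pressure": 9851.1,
    "extra_information": (
        "{'VOCs': [{'Value': 12.044, 'Sensor': 'BME680'}], "
        "'IotID': '0a10aced202194944a051624', "
        "'Rainfall': [{'Value': 0, 'Sensor': 'TippingRainGauge', 'SampleTimeLength': 300}], "
        "'Timestamp': '2024-10-19T15:22:02Z'}"
    ),
})
out = explode_readings(row)
print(out[["sensor_type", "sensor_device", "sensor_value", "sample_time_length"]])
```

Building one small DataFrame per row like this is easy to follow, but it becomes slow when applied to every row of a large table.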

In [12]:

# Define a sensor schema to help with the parsing.
# This is a placeholder for now and will be updated as we learn more about the data.
# Keys are the measurement names used in extra_information; values are [sensor_type, units].
sensor_schema = {
    'Humidity': ['humidity', '%'],
    'Pressure': ['pressure', 'hPa'],
    'Rainfall': ['rainfall', 'mm'],
    'Temperature': ['temperature', 'C'],
    'VOCs': ['voc', 'IAQ'],
    'WindDirection': ['winddirection', 'degrees'],
    'WindSpeed': ['windspeed', 'm/s'],
}

expanded_df = expand_extra_information(combined_merged_df.loc[index],sensor_schema)

In [13]:
print(expanded_df.columns)
print(expanded_df)

Index(['index', 'weather_reading_id', 'device_id', 'source_file',
       'device_name', 'site_id', 'utc_offset_in_hours', 'longitude',
       'latitude', 'timezone', 'timestamp', 'iotid', 'sensor_type',
       'sensor_device', 'sensor_value', 'sensor_units', 'sample_time_length'],
      dtype='object')
       index  weather_reading_id  device_id         source_file device_name  \
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv     WH-0010   
500  7365694             9987227        262  weather_data_1.csv 

Looks pretty good. Now we need to apply this to the whole dataframe and add the functionality to the package.

In [14]:
# expand_weather_dataframe is slow, so only compute flattened_df if it does not already exist
try:
    flattened_df
except NameError:
    flattened_df = expand_weather_dataframe(combined_merged_df[0:100],sensor_schema)
print(flattened_df.columns)
print(flattened_df)

Index(['index', 'weather_reading_id', 'device_id', 'source_file',
       'device_name', 'site_id', 'utc_offset_in_hours', 'longitude',
       'latitude', 'timezone', 'timestamp', 'iotid', 'sensor_type',
       'sensor_device', 'sensor_value', 'sensor_units', 'sample_time_length'],
      dtype='object')
        index  weather_reading_id  device_id         source_file device_name  \
0     7243411             9983574        259  weather_data_1.csv     WH-0011   
1     7243411             9983574        259  weather_data_1.csv     WH-0011   
2     7243411             9983574        259  weather_data_1.csv     WH-0011   
3     7243411             9983574        259  weather_data_1.csv     WH-0011   
4     7243411             9983574        259  weather_data_1.csv     WH-0011   
...       ...                 ...        ...                 ...         ...   
1195  7263615             9994016        267  weather_data_1.csv     WH-0014   
1196  7263615             9994016        267  weather_da

This approach works for a small dataframe but does not scale, so we need to vectorize the process. A quick search suggests that the native Python json library is particularly slow and not well suited to large dataframes.

Let's refactor the code to use pandas' json_normalize function. Because this task does not depend on the device information, we can write a function that deals only with the extra_information column.
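`pd.json_normalize` can do the list expansion. Set `record_path` to a measurement key and list the fields to repeat in `meta`, and each element of that list becomes a row. The sketch below is not the package's `flatten_weather_data`. It parses each record once, tags it with its `weather_reading_id`, normalizes each measurement type, and concatenates the results. The rows are illustrative. The strings are still parsed row by row in Python; json_normalize only takes over the expansion into rows.

```python
import ast
import pandas as pd

# Illustrative rows shaped like combined_df (IotIDs shortened for readability)
df = pd.DataFrame({
    "weather_reading_id": [9987227, 9983574],
    "extra_information": [
        "{'IotID': 'a', 'VOCs': [{'Value': 12.044, 'Sensor': 'BME680'}], "
        "'Temperature': [{'Value': 5.36, 'Sensor': 'BME680'}, {'Value': 4.27291, 'Sensor': 'SHT30'}]}",
        "{'IotID': 'b', 'VOCs': [{'Value': 9.4, 'Sensor': 'BME680'}], 'Temperature': []}",
    ],
})

# Parse each string once and attach the row key so it can be repeated onto every reading
records = [
    {**ast.literal_eval(info), "weather_reading_id": rid}
    for rid, info in zip(df["weather_reading_id"], df["extra_information"])
]

frames = []
for key in ["VOCs", "Temperature"]:
    # One output row per element of record[key]; meta fields are repeated on each row,
    # and errors="ignore" turns missing meta keys into NaN instead of raising
    part = pd.json_normalize(
        records, record_path=key, meta=["weather_reading_id", "IotID"], errors="ignore"
    )
    part["sensor_type"] = key
    frames.append(part)

flat = pd.concat(frames, ignore_index=True)
print(flat)
```

If a record lacks a measurement key entirely, `json_normalize` raises a `KeyError` for that `record_path`. Missing keys would therefore need to be defaulted to an empty list first.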

In [15]:
flattened_weather_df = flatten_weather_data(combined_df)
print(flattened_weather_df.columns)
print(flattened_weather_df)

Index(['Value', 'Sensor', 'IotID', 'Timestamp', 'sensor_type',
       'weather_reading_id', 'SampleTimeLength'],
      dtype='object')
Index(['index', 'weather_reading_id', 'date_measured', 'device_id',
       'source_file', 'sensor_value', 'sensor_device', 'iotid', 'timestamp',
       'sensor_type', 'sample_time_length'],
      dtype='object')
            index  weather_reading_id              date_measured  device_id  \
0         7243411             9983574  2024-10-19 11:56:31+00:00        259   
1         7243411             9983574  2024-10-19 11:56:31+00:00        259   
2         7243411             9983574  2024-10-19 11:56:31+00:00        259   
3         7243411             9983574  2024-10-19 11:56:31+00:00        259   
4         7243411             9983574  2024-10-19 11:56:31+00:00        259   
...           ...                 ...                        ...        ...   
1732783  10342010             9983536  2024-10-19 11:53:47+00:00        258   
1732784  10342010    

We also need to add the sensor units to the dataframe and check that the timestamps match.
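Adding units is a dictionary lookup, which `Series.map` can do for the whole column in one call. The sketch below shows what `add_sensor_units` might do, assuming it maps on `sensor_type`. It also lists sensor types that are missing from the mapping, so they are not silently stored with NaN units. The `co2` row is a made-up example of such a type.

```python
import pandas as pd

sensor_units = {"humidity": "%", "pressure": "hPa", "voc": "IAQ"}
df = pd.DataFrame({"sensor_type": ["voc", "pressure", "humidity", "co2"]})

# Dictionary lookup over the whole column; unmapped sensor types become NaN
df["sensor_units"] = df["sensor_type"].map(sensor_units)

# Report unknown sensor types instead of silently keeping NaN units
unknown = sorted(df.loc[df["sensor_units"].isna(), "sensor_type"].unique())
print(unknown)
```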

In [16]:
sensor_units = {
    'humidity': '%',
    'pressure': 'hPa',
    'rainfall': 'mm',
    'temperature': 'C',
    'voc': 'IAQ',
    'winddirection': 'degrees',
    'windspeed': 'm/s',
}

flattened_weather_df = add_sensor_units(flattened_weather_df,sensor_units)
# Compute the mismatching rows once and reuse the result
mismatched = check_timestamp_match(flattened_weather_df)
print(mismatched)
if len(mismatched) > 0:
    print("Warning: Timestamps do not match")
    print(flattened_weather_df.loc[mismatched])
else:
    # Timestamps agree, so date_measured is redundant with timestamp
    flattened_weather_df.drop(columns=['date_measured'],inplace=True)

[]


The changes to the device information table (adding a timezone and checking the timestamp) should be refactored into their own function, since they do not depend on the weather data.
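If `utc_offset_in_hours` is a fixed value per device, it can only be correct for part of the year. The outputs show 11.0 for Australia/Hobart and 13.0 for Pacific/Auckland, and both are daylight-saving offsets. Below is a sketch that checks a stored offset against the IANA zone at a given instant. It uses the standard-library `zoneinfo`, which requires Python 3.9+ and a system time zone database or the `tzdata` package. `utc_offset_hours` is a hypothetical helper.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def utc_offset_hours(tz_name: str, when_utc: datetime) -> float:
    """UTC offset, in hours, of an IANA time zone at a given UTC instant."""
    local = when_utc.astimezone(ZoneInfo(tz_name))
    return local.utcoffset().total_seconds() / 3600

# The offset depends on the date because of daylight saving time
measured = datetime(2024, 10, 19, 11, 56, 31, tzinfo=timezone.utc)
print(utc_offset_hours("Australia/Hobart", measured))
print(utc_offset_hours("Pacific/Auckland", measured))
print(utc_offset_hours("Australia/Hobart", datetime(2024, 7, 1, tzinfo=timezone.utc)))
```

Comparing this value with `utc_offset_in_hours` for each reading's timestamp would flag devices whose stored offset is stale for that date.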

In [17]:
devices_timezone_df = add_timezone_from_coordinates(devices_df)
print(devices_timezone_df.columns)
print(devices_timezone_df.loc[0])

Index(['device_id', 'device_name', 'site_id', 'utc_offset_in_hours',
       'longitude', 'latitude', 'timezone'],
      dtype='object')
device_id                           279
device_name                     WH-0016
site_id                              85
utc_offset_in_hours                11.0
longitude                    146.730825
latitude                     -41.022719
timezone               Australia/Hobart
Name: 0, dtype: object


Alright, now let's merge the weather and devices dataframes. First, print a row from the earlier single-row expansion for reference.


In [18]:
print(expanded_df.iloc[0])


index                                   7365694
weather_reading_id                      9987227
device_id                                   262
source_file                  weather_data_1.csv
device_name                             WH-0010
site_id                                      57
utc_offset_in_hours                        13.0
longitude                            171.388011
latitude                             -43.418845
timezone                       Pacific/Auckland
timestamp                  2024-10-19T15:22:02Z
iotid                  0a10aced202194944a051624
sensor_type                                 voc
sensor_device                            BME680
sensor_value                             12.044
sensor_units                                IAQ
sample_time_length                           -1
Name: 500, dtype: object


In [19]:
final_df = merge_weather_data(flattened_weather_df,devices_timezone_df)
print(final_df.columns)
final_row = final_df.loc[0]
print(final_row)
# Compare with the equivalent row from the earlier row-by-row result (flattened_df)
flattened_row = flattened_df.loc[0]
print(flattened_row)
# Find columns present in only one of the two dataframes
print(final_df.columns.difference(flattened_df.columns))
print(flattened_df.columns.difference(final_df.columns))
# Check that the columns shared by both dataframes hold the same values
common_values = set(final_df.columns) & set(flattened_df.columns)
print("\nCommon columns:", common_values)
print("Match:", (final_row[list(common_values)] == flattened_row[list(common_values)]).all())

Index(['index', 'weather_reading_id', 'device_id', 'source_file',
       'sensor_value', 'sensor_device', 'iotid', 'timestamp', 'sensor_type',
       'sample_time_length', 'sensor_units', 'device_name', 'site_id',
       'utc_offset_in_hours', 'longitude', 'latitude', 'timezone'],
      dtype='object')
index                                   7243411
weather_reading_id                      9983574
device_id                                   259
source_file                  weather_data_1.csv
sensor_value                                9.4
sensor_device                            BME680
iotid                  0a10aced202194944a0514dc
timestamp                  2024-10-19T11:56:31Z
sensor_type                                 voc
sample_time_length                         -1.0
sensor_units                                IAQ
device_name                             WH-0011
site_id                                      63
utc_offset_in_hours                        13.0
longitude               

Great, that works and is much faster. Next, we need to add this functionality to the package for the end user.

In [23]:
# Load the sensor schema from a JSON file.
sensor_schema = read_json_file(r"src/bioscout_tech_challenge/data/sensor_schema.json")
print(parse_sensor_schema(sensor_schema))


{'sensor_types': ['VOCs', 'Pressure', 'Humidity', 'Temperature', 'WindSpeed', 'WindDirection', 'Rainfall'], 'sensor_mappings': {'Humidity': {'column_name': 'humidity', 'unit': '%'}, 'Pressure': {'column_name': 'pressure', 'unit': 'hPa'}, 'Rainfall': {'column_name': 'rainfall', 'unit': 'mm'}, 'Temperature': {'column_name': 'temperature', 'unit': 'C'}, 'VOCs': {'column_name': 'voc', 'unit': 'IAQ'}, 'WindDirection': {'column_name': 'winddirection', 'unit': 'degrees'}, 'WindSpeed': {'column_name': 'windspeed', 'unit': 'm/s'}}, 'meta_columns': ['IotID', 'Timestamp'], 'column_mapping': {'Value': 'sensor_value', 'Sensor': 'sensor_device', 'IotID': 'iotid', 'SampleTimeLength': 'sample_time_length', 'Timestamp': 'timestamp'}, 'columns_to_drop': ['extra_information', 'pressure', 'voc']}
