Receive and Ingest Streaming Data from the Greenhouse API

In [15]:
import requests
import boto3
import logging
import time

# Kinesis Data Firehose client (region eu-north-1) used to deliver datapoints
firehose = boto3.client("firehose", region_name="eu-north-1")

# Logging: INFO and above, rendered as "<time> [<LEVEL>] <message>"
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

def _sse_data_payload(line):
    """Return the payload of a ``data:`` stream line, or ``None`` if there is none.

    Only the leading field name (and at most one space after it) is removed,
    so a payload that itself contains the text "data: " is left intact.
    Pings, other non-data lines and blank payloads yield ``None``.
    """
    prefix = "data:"
    if not line.startswith(prefix):
        return None
    payload = line[len(prefix):]
    if payload.startswith(" "):
        payload = payload[1:]
    return payload if payload.strip() else None


def _forward_stream(api_url, timeout):
    """Open one streaming connection and forward each data line to Firehose until it ends."""
    with requests.get(api_url, stream=True, timeout=timeout) as response:
        # Surface HTTP errors instead of silently iterating over an error body.
        response.raise_for_status()

        # Without a declared encoding, iter_lines(decode_unicode=True) yields
        # bytes, which would break the str prefix check below.
        if response.encoding is None:
            response.encoding = "utf-8"

        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue

            payload = _sse_data_payload(line)
            if payload is None:
                continue  # ping or other non-data line

            logger.info("Datapoint received.")

            try:
                firehose.put_record(
                    DeliveryStreamName="greenhouse_stream",
                    # Trailing newline keeps records separated once Firehose
                    # concatenates them.
                    Record={"Data": (payload + "\n").encode("utf-8")},
                )
                logger.info("A datapoint has been sent to Firehose.")
            except Exception as e:
                # Best effort: a failed delivery is logged and the stream goes on.
                logger.error(f"Error sending data to Firehose: {e}")


def ingest_data(api_url="http://greenhouse.shef.ac.uk:7070/stream",
                reconnect_delay=10,
                timeout=(10, 300)):
    """Stream datapoints from ``api_url`` and forward them to Kinesis Data Firehose.

    Runs until interrupted with ``KeyboardInterrupt``. Whenever the connection
    fails, returns an HTTP error, or is closed by the server, the function
    waits ``reconnect_delay`` seconds and reconnects.

    Args:
        api_url: URL of the streaming endpoint.
        reconnect_delay: Seconds to wait before every reconnection attempt.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.get``. The read timeout assumes the server sends data
            or pings more often than this -- TODO confirm against the real
            ping interval; pass ``None`` to wait indefinitely.

    Returns:
        None.
    """
    try:
        while True:
            try:
                _forward_stream(api_url, timeout)
                # Reaching this point means the server ended the stream
                # without an error; back off instead of reconnecting in a
                # tight loop.
                logger.warning("Stream closed by the server.")
            except requests.RequestException as e:
                logger.error(f"Connection error: {e}")
            except Exception as e:
                logger.error(f"Unexpected error: {e}")

            logger.info(f"Reconnecting in {reconnect_delay} seconds...")
            time.sleep(reconnect_delay)

    except KeyboardInterrupt:
        # Caught around the whole loop so an interrupt during the back-off
        # sleep also shuts down cleanly.
        logger.info("Session interrupted. Shutting down...")

In [None]:
# Start streaming with the default endpoint; this call blocks until the kernel is interrupted.
ingest_data()

2026-01-11 12:34:44,684 [INFO] Received data: {"timestamp": "2026-01-11T12:29:57.120005", "outdoor_global_radiation": 78.0, "outdoor_air_temp": -0.4, "outdoor_rh": 87.2, "outdoor_wind_speed": 0.5, "indoor_temperature": 23.6, "indoor_vpd": 4.080273729, "energy_screen_closure": 0.0, "blackout_screen_closure": 0.0, "lee_vent_aperture": 0.0, "wind_vent_aperture": 0.0, "pipe_rail_inlet_temp": 55.0, "grow_pipes_inlet_temp": 0.0, "lights_on": 1.0, "14": 0.0, "15": 0.0, "16": 0.0, "17": 0.0, "18": 0.0, "19": 0.0, "20": 0.0, "21": 0.0, "co2_injection_status": 0.0, "indoor_co2": 1145.0}
2026-01-11 12:34:58,343 [INFO] Session interrupted. Shutting down...


In [16]:
import base64
import json
import logging
import time

print('Loading function')

# Set up logging.
# basicConfig() does nothing when the root logger already has handlers, e.g.
# after an earlier notebook cell has called it. The AWS Lambda Python runtime
# is also reported to pre-install a root handler -- confirm against the Lambda
# logging docs.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Set the level on this logger explicitly so INFO records are emitted even when
# basicConfig() above was a no-op and the root level is higher than INFO.
logger.setLevel(logging.INFO)

# Sensor readings that every cleaned record must contain, stored as floats
# (or None when the reading is missing).
_NUMERIC_FIELDS = (
    'outdoor_global_radiation', 'outdoor_air_temp',
    'outdoor_rh', 'outdoor_wind_speed', 'indoor_temperature', 'indoor_vpd',
    'energy_screen_closure', 'blackout_screen_closure',
    'lee_vent_aperture', 'wind_vent_aperture', 'pipe_rail_inlet_temp',
    'grow_pipes_inlet_temp', 'lights_on',
)

# Unnamed columns removed from every record.
# NOTE(review): the sample datapoint logged by the ingestion cell also carries
# keys "20" and "21"; confirm whether they should be dropped as well.
_UNWANTED_KEYS = ('14', '15', '16', '17', '18', '19')


def _failed_record(record):
    """Build the response entry that marks ``record`` as failed, echoing its original data."""
    return {
        'recordId': record['recordId'],
        'result': 'ProcessingFailed',
        'data': record['data']
    }


def _clean_datapoint(data):
    """Clean and transform one parsed datapoint (a dict) in place and return it.

    Raises whatever ``float()`` raises when a numeric field cannot be converted.
    """
    # Ensure all expected fields are present; fill missing ones with None.
    for field in _NUMERIC_FIELDS:
        if field not in data:
            logger.warning(f"Missing field: {field}")
            data[field] = None

    # Drop unwanted fields.
    for key in _UNWANTED_KEYS:
        data.pop(key, None)

    # Format the timestamp to fit Athena's requirements ("T" separator -> space).
    data['timestamp'] = str(data['timestamp']).replace("T", " ")

    # Add ingestion time (UTC).
    data['ingested_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())

    # Convert numeric fields to float.
    for field in _NUMERIC_FIELDS:
        if data[field] is not None:
            data[field] = float(data[field])

    return data


def lambda_handler(event, context):
    """Validate, clean and re-encode each record in ``event['records']``.

    Each record's base64 ``data`` is decoded as UTF-8 JSON. Records without a
    ``timestamp``, whose payload is not a JSON object, or whose processing
    raises are returned unchanged with result ``'ProcessingFailed'``. All
    other records are cleaned, serialised as newline-terminated JSON,
    base64-encoded and returned with result ``'Ok'``.

    Args:
        event: Dict with a ``'records'`` list; each entry has ``'recordId'``
            and base64-encoded ``'data'``.
        context: Unused.

    Returns:
        ``{'records': [...]}`` with one entry per input record, in input order.
    """
    output = []
    failed_count = 0

    for record in event['records']:
        try:
            logger.debug(f"Processing record {record['recordId']}")
            payload = base64.b64decode(record['data']).decode('utf-8')

            # -------- PARSING ----------
            data = json.loads(payload)
            if not isinstance(data, dict):
                # Valid JSON that is not an object cannot be cleaned as a datapoint.
                raise ValueError("Payload is not a JSON object")

            # -------- VALIDATION ----------
            if data.get('timestamp') is None:
                logger.error("Missing timestamp field!")
                output_record = _failed_record(record)
                failed_count += 1
            else:
                # -------- CLEANING / TRANSFORMATION ----------
                cleaned = _clean_datapoint(data)

                # -------- RE-ENCODE ----------
                # Newline-terminated so records stay separated downstream.
                processed_json = json.dumps(cleaned) + '\n'
                processed_payload = base64.b64encode(processed_json.encode('utf-8')).decode('utf-8')

                output_record = {
                    'recordId': record['recordId'],
                    'result': 'Ok',
                    'data': processed_payload
                }

        except Exception as e:
            logger.error(f"Error processing record {record['recordId']}: {e}")
            output_record = _failed_record(record)
            failed_count += 1

        output.append(output_record)

    total = len(event['records'])
    # Report failures separately instead of claiming every record succeeded.
    print('Processed {} records: {} succeeded, {} failed.'.format(
        total, total - failed_count, failed_count))
    return {'records': output}


Loading function


Test Lambda Function

In [23]:
import base64

def _b64_text(text):
    """Encode ``text`` as UTF-8, base64 it, and return the result as a str."""
    raw = text.encode('utf-8')
    return base64.b64encode(raw).decode('utf-8')


# Sample record that includes a timestamp plus a few sensor fields
payload_1 = '{"timestamp": "2025-01-04T12:00:00", "outdoor_global_radiation": 0.0, "outdoor_air_temp": -2.5}'

# Sample record that lacks the timestamp field
payload_2 = '{"lights_on": 0, "outdoor_global_radiation": 0.0, "outdoor_air_temp": -2.5}'

encoded_data_1 = _b64_text(payload_1)
encoded_data_2 = _b64_text(payload_2)

# One encoded payload per line, ready to paste into a test event's record data
print(encoded_data_1, encoded_data_2, sep='\n')

eyJ0aW1lc3RhbXAiOiAiMjAyNS0wMS0wNFQxMjowMDowMCIsICJvdXRkb29yX2dsb2JhbF9yYWRpYXRpb24iOiAwLjAsICJvdXRkb29yX2Fpcl90ZW1wIjogLTIuNX0=
eyJsaWdodHNfb24iOiAwLCAib3V0ZG9vcl9nbG9iYWxfcmFkaWF0aW9uIjogMC4wLCAib3V0ZG9vcl9haXJfdGVtcCI6IC0yLjV9


In [24]:
%pip install watchtower

Collecting watchtower
  Downloading watchtower-3.4.0-py3-none-any.whl.metadata (16 kB)
Downloading watchtower-3.4.0-py3-none-any.whl (18 kB)
Installing collected packages: watchtower
Successfully installed watchtower-3.4.0
Note: you may need to restart the kernel to use updated packages.
