In [1]:
from dotenv import load_dotenv
import boto3
import os

load_dotenv()
access_key_id = os.environ.get("OLIVER_ACCESS_KEY")
access_key_secret = os.environ.get("OLIVER_SECRET_ACCESS_KEY")


## Getting data from bucket as csv:

In [8]:

session = boto3.Session(
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret,
)

s3_client = boto3.client(
    's3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

bucket = session.resource('s3').Bucket(name='emu-air-quality-bucket')

latest_object = list(bucket.objects.all())[-1]

with open("./download.csv", 'wb') as file:
    s3_client.download_fileobj(
        bucket.name,
        latest_object.key,
        file
    )





## Grabbing data from csv as DataFrame:

In [88]:
import pandas as pd
import re
import numpy as np

raw_data = pd.read_csv("./download.csv", dtype=str)

def get_application_id(val: str) -> str:
    my_regex = re.compile(r'.*\{application_id=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return result

def get_eui(val: str) -> str:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*eui=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return result

def get_co2(val: str) -> int:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*co2=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return int(result)

def get_hcho(val: str) -> float:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*hcho=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return float(result)

def get_humidity(val: str) -> float:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*humidity=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return float(result)

def get_light_level(val: str) -> int:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*light_level=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return int(result)

def get_pir(val: str) -> str:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*pir=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return result

def get_pm10(val: str) -> int:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*pm10=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return int(result)

def get_pm2_5(val: str) -> int:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*pm2_5=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return int(result)

def get_pressure(val: str) -> float:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*pressure=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return float(result)

def get_temperature(val: str) -> float:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*temperature=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return float(result)

def get_tvoc(val: str) -> int:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*tvoc=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return int(result)

def get_measured_at(val: str) -> str:
    if "decoded_payload=null" in val:
        return np.nan
    my_regex = re.compile(r'.*received_at=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return np.nan
    return result



conversion_dict = {
    "received_at": {
        "column": "received_at",
        "function": lambda val: val
    },
    "application_id": {
        "column": "end_device_ids",
        "function": get_application_id
    },
    "eui": {
        "column": "uplink_message",
        "function": get_eui
    },
    "co2": {
        "column": "uplink_message",
        "function": get_co2
    },
    "hcho": {
        "column": "uplink_message",
        "function": get_hcho
    },
    "humidity": {
        "column": "uplink_message",
        "function": get_humidity
    },
    "light_level": {
        "column": "uplink_message",
        "function": get_light_level
    },
    "pir": {
        "column": "uplink_message",
        "function": get_pir
    },
    "pm10": {
        "column": "uplink_message",
        "function": get_pm10
    },
    "pm2_5": {
        "column": "uplink_message",
        "function": get_pm2_5
    },
    "pressure": {
        "column": "uplink_message",
        "function": get_pressure
    },
    "temperature": {
        "column": "uplink_message",
        "function": get_temperature
    },
    "tvoc": {
        "column": "uplink_message",
        "function": get_tvoc
    },
    "measured_at": {
        "column": "uplink_message",
        "function": get_measured_at
    },
}

data = pd.DataFrame({ key: raw_data[val["column"]].apply(val["function"]) for key, val in conversion_dict.items() })


## Getting data fro a datastore

We don't need to use the S3 Bucket, since I can just set a lambda on a chron job to access the datastore and push to the database. S3 bucket remains as backup.

In [249]:

analytics_client = boto3.client(
    'iotanalytics',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

response = analytics_client.get_dataset_content(
    datasetName="emu_air_quality_data",
    versionId="$LATEST_SUCCEEDED"
)
print(response)

{'ResponseMetadata': {'RequestId': '97b0248e-8121-4760-b92b-a243acc3edb6', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Tue, 28 Feb 2023 07:27:48 GMT', 'content-type': 'application/json', 'content-length': '747', 'connection': 'keep-alive', 'x-amzn-requestid': '97b0248e-8121-4760-b92b-a243acc3edb6', 'cache-control': 'no-cache, no-store, must-revalidate, private', 'expires': '0', 'pragma': 'no-cache'}, 'RetryAttempts': 0}, 'pipeline': {'name': 'emu_air_quality_pipe', 'arn': 'arn:aws:iotanalytics:us-east-1:638201302569:pipeline/emu_air_quality_pipe', 'activities': [{'channel': {'name': 'MyInput', 'channelName': 'emu_air_quality', 'next': 'MyOutput'}}, {'datastore': {'name': 'MyOutput', 'datastoreName': 'emu_air_quality_store'}}], 'reprocessingSummaries': [], 'creationTime': datetime.datetime(2023, 2, 27, 14, 7, 29, 788000, tzinfo=tzlocal()), 'lastUpdateTime': datetime.datetime(2023, 2, 28, 16, 49, 25, 763000, tzinfo=tzlocal())}}
{'ResponseMetadata': {'RequestId': '2f4e138f-e0f1-4d9a-b

## Creating a dynamodb table

In [248]:
db_client = boto3.client(
    'dynamodb',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

response = db_client.create_table(
    AttributeDefinitions=[
        {
            'AttributeName': 'DeviceEUI',
            'AttributeType': 'S',
        },
        {
            'AttributeName': 'Timestamp',
            'AttributeType': 'S',
        },
    ],
    KeySchema=[
        {
            'AttributeName': 'DeviceEUI',
            'KeyType': 'HASH',
        },
        {
            'AttributeName': 'Timestamp',
            'KeyType': 'RANGE',
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5,
    },
    TableName='DeviceMeasurements',
)

print(response)

{'TableDescription': {'AttributeDefinitions': [{'AttributeName': 'DeviceEUI', 'AttributeType': 'S'}, {'AttributeName': 'Timestamp', 'AttributeType': 'S'}], 'TableName': 'DeviceMeasurements', 'KeySchema': [{'AttributeName': 'DeviceEUI', 'KeyType': 'HASH'}, {'AttributeName': 'Timestamp', 'KeyType': 'RANGE'}], 'TableStatus': 'CREATING', 'CreationDateTime': datetime.datetime(2023, 2, 28, 17, 26, 9, 903000, tzinfo=tzlocal()), 'ProvisionedThroughput': {'NumberOfDecreasesToday': 0, 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}, 'TableSizeBytes': 0, 'ItemCount': 0, 'TableArn': 'arn:aws:dynamodb:us-east-1:638201302569:table/DeviceMeasurements', 'TableId': 'ccbc5f52-4087-417b-9418-8edecb4ecea7'}, 'ResponseMetadata': {'RequestId': '38ROP6LQN5D76O2KHP4RHC852JVV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Tue, 28 Feb 2023 07:26:10 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '608', 'connection': 'keep-alive', 'x-amzn-requ

## PUTting to dynamodb table

In [76]:
def create_request_item_from_row(row):
    the_item = dict()
    if not isinstance(row["measured_at"], str):
        return None
    if not isinstance(row["eui"], str):
        return None
    
    the_item["Timestamp"] = { "S": str(row["measured_at"]) }
    the_item["DeviceEUI"] = { "S": str(row["eui"]) }

    if isinstance(row["received_at"], str):
        the_item["ReceivedAt"] = { "S": str(row["received_at"]) }

    if isinstance(row["application_id"], str):
        the_item["ApplicationID"] = { "S": str(row["application_id"]) }

    if not np.isnan(row["co2"]):
        the_item["CO2"] = { "N": str(row["co2"]) }

    if not np.isnan(row["hcho"]):
        the_item["HCHO"] = { "N": str(row["hcho"]) }

    if not np.isnan(row["humidity"]):
        the_item["Humidity"] = { "N": str(row["humidity"]) }

    if not np.isnan(row["light_level"]):
        the_item["LightLevel"] = { "N": str(row["light_level"]) }

    if isinstance(row["pir"], str):
        the_item["PIR"] = { "S": str(row["pir"]) }

    if not np.isnan(row["pm10"]):
        the_item["PM10"] = { "N": str(row["pm10"]) }

    if not np.isnan(row["pm2_5"]):
        the_item["PM2_5"] = { "N": str(row["pm2_5"]) }

    if not np.isnan(row["pressure"]):
        the_item["Pressure"] = { "N": str(row["pressure"]) }

    if not np.isnan(row["temperature"]):
        the_item["Temperature"] = { "N": str(row["temperature"]) }

    if not np.isnan(row["tvoc"]):
        the_item["TVOC"] = { "N": str(row["tvoc"]) }

    return the_item

In [108]:


db_client = boto3.client(
    'dynamodb',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

chunks = np.array_split(data, int(len(data)/25)+1)

for chunk in chunks:

    request_items = [
        {
            'PutRequest': {
                'Item': create_request_item_from_row(row) 
            },
        } for index, row in chunk.iterrows() if not create_request_item_from_row(row) is None
    ]

    print(f"PUTting {len(request_items)} items.")

    response = db_client.batch_write_item(
        RequestItems={
            'DeviceMeasurements': request_items
        },
    )

    print(response)

PUTting 21 items.
{'UnprocessedItems': {}, 'ResponseMetadata': {'RequestId': '3NQ860EHLIK4880HSJJ0MEJBHVVV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 01 Mar 2023 01:54:31 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '23', 'connection': 'keep-alive', 'x-amzn-requestid': '3NQ860EHLIK4880HSJJ0MEJBHVVV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '4185382651'}, 'RetryAttempts': 0}}
PUTting 22 items.
{'UnprocessedItems': {}, 'ResponseMetadata': {'RequestId': 'FPFCPG7EQ8CN4O6E1SKBK2PS3JVV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 01 Mar 2023 01:54:31 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '23', 'connection': 'keep-alive', 'x-amzn-requestid': 'FPFCPG7EQ8CN4O6E1SKBK2PS3JVV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '4185382651'}, 'RetryAttempts': 0}}
PUTting 20 items.
{'UnprocessedItems': {}, 'ResponseMetadata': {'RequestId': 'KA3EO5

## Querying dynamodb table

In [116]:
from boto3.dynamodb.conditions import Key

def get_measurements(session: boto3.Session, tablename: str, eui:str, starttime:str, endtime:str):
    db_resource = session.resource('dynamodb')

    table = db_resource.Table(tablename)

    response = table.query(
        ExpressionAttributeNames={
            '#nsk': 'Timestamp'
        },
        KeyConditionExpression=(Key('DeviceEUI').eq(eui) & Key("Timestamp").between(starttime, endtime)),
        ProjectionExpression='DeviceEUI, #nsk, ReceivedAt, ApplicationID, CO2, HCHO, Humidity, LightLevel, PIR, PM10, PM2_5, Pressure, Temperature, TVOC',
    )

    return response["Items"]

print(
    get_measurements(
        session=session, 
        tablename="DeviceMeasurements", 
        eui="AC1F09FFFE053AD4", 
        starttime='2023-02-28T20:59:59.408246963Z', 
        endtime='2023-03-01T06:21:21.876972278Z'
    )
)

[{'TVOC': Decimal('235'), 'CO2': Decimal('453'), 'Pressure': Decimal('1009.7'), 'HCHO': Decimal('0.02'), 'PM10': Decimal('2'), 'Timestamp': '2023-02-28T21:01:22.366963483Z', 'PM2_5': Decimal('2'), 'DeviceEUI': 'AC1F09FFFE053AD4', 'PIR': 'idle', 'LightLevel': Decimal('0'), 'ReceivedAt': '2023-02-28T21:01:22.573216648Z', 'Temperature': Decimal('25'), 'Humidity': Decimal('61.5'), 'ApplicationID': 'office-air-quality'}, {'TVOC': Decimal('236'), 'CO2': Decimal('457'), 'Pressure': Decimal('1009.7'), 'HCHO': Decimal('0.02'), 'PM10': Decimal('1'), 'Timestamp': '2023-02-28T21:06:22.414879847Z', 'PM2_5': Decimal('1'), 'DeviceEUI': 'AC1F09FFFE053AD4', 'PIR': 'idle', 'LightLevel': Decimal('0'), 'ReceivedAt': '2023-02-28T21:06:22.622845918Z', 'Temperature': Decimal('25.1'), 'Humidity': Decimal('61.5'), 'ApplicationID': 'office-air-quality'}, {'TVOC': Decimal('237'), 'CO2': Decimal('451'), 'Pressure': Decimal('1009.8'), 'HCHO': Decimal('0.02'), 'PM10': Decimal('2'), 'Timestamp': '2023-02-28T21:11:22

## Download latest dataset

In [127]:
import requests
import boto3

analytics_client = boto3.client(
    'iotanalytics',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

response = analytics_client.get_dataset_content(
    datasetName="emu_air_quality_data",
    versionId="$LATEST_SUCCEEDED"
)
raw_file = requests.get(response["entries"][0]["dataURI"])

with open("./download.csv", "wb") as file:
    file.write(raw_file.content)



Unnamed: 0,received_at,application_id,eui,co2,hcho,humidity,light_level,pir,pm10,pm2_5,pressure,temperature,tvoc,measured_at
0,2023-02-28T04:11:21.960167651Z,office-air-quality,AC1F09FFFE053AD4,408.0,0.01,54.0,1.0,idle,5.0,5.0,1010.4,29.7,80.0,2023-02-28T04:11:21.755884916Z
1,2023-02-28T11:01:22.262012028Z,office-air-quality,AC1F09FFFE053AD4,430.0,0.01,59.0,0.0,idle,1.0,1.0,1011.1,27.6,249.0,2023-02-28T11:01:22.057066251Z
2,2023-02-28T03:06:22.136282886Z,office-air-quality,AC1F09FFFE053AD4,403.0,0.01,54.5,1.0,idle,5.0,5.0,1010.9,29.4,25.0,2023-02-28T03:06:21.931255755Z
3,2023-02-27T09:26:21.394296728Z,office-air-quality,AC1F09FFFE053AD4,558.0,0.02,59.5,0.0,idle,3.0,3.0,1012.5,28.6,86.0,2023-02-27T09:26:21.189972802Z
4,2023-02-28T01:51:21.932181027Z,office-air-quality,AC1F09FFFE053AD4,398.0,0.01,58.5,1.0,trigger,7.0,6.0,1011.8,28.1,57.0,2023-02-28T01:51:21.726918373Z


## Creating a user

In [134]:
iam_client = boto3.client(
    'iam',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

response = iam_client.create_user(
    UserName='db_user'
)

print(response)

{'User': {'Path': '/', 'UserName': 'db_user', 'UserId': 'AIDAZJF6DBYURMHLPRKCW', 'Arn': 'arn:aws:iam::638201302569:user/db_user', 'CreateDate': datetime.datetime(2023, 3, 1, 2, 41, 26, tzinfo=tzutc())}, 'ResponseMetadata': {'RequestId': '258e0730-8218-428f-b3bc-3b31898e543f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '258e0730-8218-428f-b3bc-3b31898e543f', 'content-type': 'text/xml', 'content-length': '475', 'date': 'Wed, 01 Mar 2023 02:41:26 GMT'}, 'RetryAttempts': 0}}


## Creating a policy

In [135]:
import json

policydoc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ListAndDescribe",
            "Effect": "Allow",
            "Action": [
                "dynamodb:List*",
                "dynamodb:DescribeReservedCapacity*",
                "dynamodb:DescribeLimits",
                "dynamodb:DescribeTimeToLive"
            ],
            "Resource": "*"
        },
        {
            "Sid": "SpecificTable",
            "Effect": "Allow",
            "Action": [
                "dynamodb:BatchGet*",
                "dynamodb:DescribeStream",
                "dynamodb:DescribeTable",
                "dynamodb:Get*",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:BatchWrite*",
                "dynamodb:CreateTable",
                "dynamodb:Delete*",
                "dynamodb:Update*",
                "dynamodb:PutItem"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/DeviceMeasurements"
        }
    ]
}

response = iam_client.create_policy(
    PolicyName='db_user_policy',
    PolicyDocument=json.dumps(policydoc)
)

print(response)

{'Policy': {'PolicyName': 'db_user_policy', 'PolicyId': 'ANPAZJF6DBYU65TDDAC4I', 'Arn': 'arn:aws:iam::638201302569:policy/db_user_policy', 'Path': '/', 'DefaultVersionId': 'v1', 'AttachmentCount': 0, 'PermissionsBoundaryUsageCount': 0, 'IsAttachable': True, 'CreateDate': datetime.datetime(2023, 3, 1, 2, 46, 50, tzinfo=tzutc()), 'UpdateDate': datetime.datetime(2023, 3, 1, 2, 46, 50, tzinfo=tzutc())}, 'ResponseMetadata': {'RequestId': 'fb52c650-9bb3-487c-92d4-5a22b295896e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fb52c650-9bb3-487c-92d4-5a22b295896e', 'content-type': 'text/xml', 'content-length': '763', 'date': 'Wed, 01 Mar 2023 02:46:50 GMT'}, 'RetryAttempts': 0}}


## Attaching a policy to a user

In [136]:
response = iam_client.attach_user_policy(
    UserName='db_user',
    PolicyArn='arn:aws:iam::638201302569:policy/db_user_policy'
)

print(response)

{'ResponseMetadata': {'RequestId': 'e5e08271-4cf6-44d5-9f01-5222a9685073', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e5e08271-4cf6-44d5-9f01-5222a9685073', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Wed, 01 Mar 2023 02:47:45 GMT'}, 'RetryAttempts': 0}}


## Creating a Secret

In [None]:
"""secrets_client = boto3.client(
    'secretsmanager',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)"""



## Getting an example json event for lambda

In [138]:
raw_data = pd.read_csv("./download.csv", dtype=str)

raw_data.to_json("./json_data.json", orient="index")



In [140]:
example_json_string = '''[{
        "end_device_ids": "{device_id=eui-24e124710b421822, application_ids={application_id=office-air-quality}, dev_eui=24E124710B421822, join_eui=88C146E20AA3631E, dev_addr=260D09CA}",
        "correlation_ids": "[as:up:01GTB3FGF8AS1WY5S50YZCM0S4, gs:conn:01GT1GCSRE1A4WJHYT8VZ15NNV, gs:up:host:01GT1GCT0RPBSM0XYKZK9KJC0R, gs:uplink:01GTB3FG8T7KRS6BSVE9WC93JN, ns:uplink:01GTB3FG8VS1SS1WT106QF3908, rpc:\/ttn.lorawan.v3.GsNs\/HandleUplink:01GTB3FG8VRB42849MTR4VW4ZR, rpc:\/ttn.lorawan.v3.NsAs\/HandleUplink:01GTB3FGF7C172DQM2C7B0M6PP]",
        "received_at": "2023-02-28T04:11:21.960167651Z",
        "uplink_message": "{session_key_id=AYaDInuedXPNkGqnYWx0GA==, f_cnt=1259, rx_metadata=[{gateway_ids={gateway_id=eui-ac1f09fffe053ad4, eui=AC1F09FFFE053AD4}, timestamp=4187439780, snr=10.3, uplink_token=CiIKIAoUZXVpLWFjMWYwOWZmZmUwNTNhZDQSCKwfCf\/+BTrUEKSF3cwPGgwI6f31nwYQuOvl5wIgoKHxtu+bSQ==, channel_index=6, received_at=2023-02-28T04:11:21.754546104Z}], settings={data_rate={lora={bandwidth=125000, spreading_factor=7, coding_rate=4\/5}}, frequency=916400000, timestamp=4187439780}, received_at=2023-02-28T04:11:21.755884916Z, consumed_airtime=0.097536s, network_ids={net_id=000013, tenant_id=ttn, cluster_id=au1, cluster_address=au1.cloud.thethings.network}, f_port=86, frm_payload=A2cpAQRobAUAAAbLAQd9mAEIfVAACXN4Jwp9AQALfQUADH0FAA==, decoded_payload={co2=408, hcho=0.01, humidity=54.0, light_level=1, pir=idle, pm10=5, pm2_5=5, pressure=1010.4, temperature=29.7, tvoc=80, beep=null}}",
        "__dt": "2023-02-28 00:00:00.000",
        "__year": null,
        "__month": null,
        "__day": null,
        "__hour": null
    }]'''

## Lambda function code

In [150]:
import re
import json

def get_application_id(val: str) -> str:
    my_regex = re.compile(r'.*\{application_id=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return result

def get_eui(val: str) -> str:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*eui=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return result

def get_co2(val: str) -> int:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*co2=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return int(result)

def get_hcho(val: str) -> float:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*hcho=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return float(result)

def get_humidity(val: str) -> float:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*humidity=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return float(result)

def get_light_level(val: str) -> int:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*light_level=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return int(result)

def get_pir(val: str) -> str:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*pir=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return result

def get_pm10(val: str) -> int:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*pm10=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return int(result)

def get_pm2_5(val: str) -> int:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*pm2_5=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return int(result)

def get_pressure(val: str) -> float:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*pressure=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return float(result)

def get_temperature(val: str) -> float:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*temperature=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return float(result)

def get_tvoc(val: str) -> int:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*tvoc=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return int(result)

def get_measured_at(val: str) -> str:
    if "decoded_payload=null" in val:
        return None
    my_regex = re.compile(r'.*received_at=(.*?)[\}\,]')
    result = my_regex.search(val).group(1)
    if result == "null":
        return None
    return result

conversion_dict = {
    "received_at": {
        "column": "received_at",
        "function": lambda val: val
    },
    "application_id": {
        "column": "end_device_ids",
        "function": get_application_id
    },
    "eui": {
        "column": "uplink_message",
        "function": get_eui
    },
    "co2": {
        "column": "uplink_message",
        "function": get_co2
    },
    "hcho": {
        "column": "uplink_message",
        "function": get_hcho
    },
    "humidity": {
        "column": "uplink_message",
        "function": get_humidity
    },
    "light_level": {
        "column": "uplink_message",
        "function": get_light_level
    },
    "pir": {
        "column": "uplink_message",
        "function": get_pir
    },
    "pm10": {
        "column": "uplink_message",
        "function": get_pm10
    },
    "pm2_5": {
        "column": "uplink_message",
        "function": get_pm2_5
    },
    "pressure": {
        "column": "uplink_message",
        "function": get_pressure
    },
    "temperature": {
        "column": "uplink_message",
        "function": get_temperature
    },
    "tvoc": {
        "column": "uplink_message",
        "function": get_tvoc
    },
    "measured_at": {
        "column": "uplink_message",
        "function": get_measured_at
    },
}

def lambda_handler(event, context):

    output = []
    
    for e in event:
        output.append({ key: val["function"](e[val["column"]]) for key, val in conversion_dict.items() })

    return json.dumps(output)

print(lambda_handler(json.loads(example_json_string), None))

[{"received_at": "2023-02-28T04:11:21.960167651Z", "application_id": "office-air-quality", "eui": "AC1F09FFFE053AD4", "co2": 408, "hcho": 0.01, "humidity": 54.0, "light_level": 1, "pir": "idle", "pm10": 5, "pm2_5": 5, "pressure": 1010.4, "temperature": 29.7, "tvoc": 80, "measured_at": "2023-02-28T04:11:21.755884916Z"}]


## Give pipeline permission to run lambda   

In [151]:
lambda_client = boto3.client(
    'lambda',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

# aws lambda add-permission --function-name exampleFunctionName --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account 123456789012 --source-arn arn:aws:iotanalytics:us-east-1:123456789012:pipeline/examplePipeline

response = lambda_client.add_permission(
    FunctionName='parse_measurements',
    StatementId='iotanalytics',
    Action='lambda:InvokeFunction',
    Principal='iotanalytics.amazonaws.com',
    SourceArn='arn:aws:iotanalytics:us-east-1:638201302569:pipeline/emu_air_quality_pipe',
    SourceAccount='638201302569'
)
print(response)

## Add Lambda to pipeline

In [157]:
analytics_client = boto3.client(
    'iotanalytics',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_key_secret
)

response = analytics_client.update_pipeline(
    pipelineName='emu_air_quality_pipe',
    pipelineActivities=[
        {
            'channel': {
                'name': 'MyInput',
                'channelName': 'emu_air_quality',
                'next': 'ParsingLambda'
            }
        },
        {
            'lambda': {
                'name': 'ParsingLambda',
                'lambdaName': 'parse_measurements',
                'batchSize': 25,
                'next': 'MyOutput'
            },
        },
        {
            'datastore': {
                'name': 'MyOutput',
                'datastoreName': 'emu_air_quality_store'
            }
        }
    ]
)

print(response)

print("\n\n\n\n")

response = analytics_client.describe_pipeline(
    pipelineName='emu_air_quality_pipe'
)

print(response)

{'ResponseMetadata': {'RequestId': '7deb959d-c118-415f-a342-8d171395806a', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 01 Mar 2023 04:54:34 GMT', 'content-type': 'application/json', 'content-length': '0', 'connection': 'keep-alive', 'x-amzn-requestid': '7deb959d-c118-415f-a342-8d171395806a', 'cache-control': 'no-cache, no-store, must-revalidate, private', 'expires': '0', 'pragma': 'no-cache'}, 'RetryAttempts': 0}}





{'ResponseMetadata': {'RequestId': '28b7d511-52df-40d2-a73d-65ab439e76e1', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 01 Mar 2023 04:54:34 GMT', 'content-type': 'application/json', 'content-length': '1036', 'connection': 'keep-alive', 'x-amzn-requestid': '28b7d511-52df-40d2-a73d-65ab439e76e1', 'cache-control': 'no-cache, no-store, must-revalidate, private', 'expires': '0', 'pragma': 'no-cache'}, 'RetryAttempts': 0}, 'pipeline': {'name': 'emu_air_quality_pipe', 'arn': 'arn:aws:iotanalytics:us-east-1:638201302569:pipeline/emu_air_quality_pipe', 'activiti

## dynamodb lambda

In [183]:
import json
import boto3

db_client = boto3.client(
    'dynamodb',
)

def create_request_item_from_row(row):
    the_item = dict()
    if not "measured_at" in row or row["measured_at"] is None:
        return None
    if not "measured_at" in row or row["eui"] is None:
        return None
    
    the_item["Timestamp"] = { "S": str(row["measured_at"]) }
    the_item["DeviceEUI"] = { "S": str(row["eui"]) }

    if "measured_at" in row and not row["received_at"] is None:
        the_item["ReceivedAt"] = { "S": str(row["received_at"]) }

    if "measured_at" in row and not row["application_id"] is None:
        the_item["ApplicationID"] = { "S": str(row["application_id"]) }

    if "measured_at" in row and not row["co2"] is None:
        the_item["CO2"] = { "N": str(row["co2"]) }

    if "measured_at" in row and not row["hcho"] is None:
        the_item["HCHO"] = { "N": str(row["hcho"]) }

    if "measured_at" in row and not row["humidity"] is None:
        the_item["Humidity"] = { "N": str(row["humidity"]) }

    if "measured_at" in row and not row["light_level"] is None:
        the_item["LightLevel"] = { "N": str(row["light_level"]) }

    if "measured_at" in row and not row["pir"] is None:
        the_item["PIR"] = { "S": str(row["pir"]) }

    if "measured_at" in row and not row["pm10"] is None:
        the_item["PM10"] = { "N": str(row["pm10"]) }

    if "measured_at" in row and not row["pm2_5"] is None:
        the_item["PM2_5"] = { "N": str(row["pm2_5"]) }

    if "measured_at" in row and not row["pressure"] is None:
        the_item["Pressure"] = { "N": str(row["pressure"]) }

    if "measured_at" in row and not row["temperature"] is None:
        the_item["Temperature"] = { "N": str(row["temperature"]) }

    if "measured_at" in row and not row["tvoc"] is None:
        the_item["TVOC"] = { "N": str(row["tvoc"]) }

    return the_item

def lambda_handler(event, context):
    
    event_items = json.loads(event)
    
    steps = range(0, len(event_items), 24)
    
    for step in steps:
        chunk = event_items[step: step+24]

        request_items = [
            {
                'PutRequest': {
                    'Item': create_request_item_from_row(measurement) 
                },
            } for measurement in chunk if not create_request_item_from_row(measurement) is None
        ]
        
        response = db_client.batch_write_item(
            RequestItems={
                'DeviceMeasurements': request_items
            },
        )
    
    return event

lambda_handler("""[{"received_at": "2023-02-28T04:11:21.960167651Z", "application_id": "office-air-quality", "eui": "AC1F09FFFE053AD4", "co2": 408, "hcho": 0.01, "humidity": 54.0, "light_level": 1, "pir": "idle", "pm10": 5, "pm2_5": 5, "pressure": 1010.4, "temperature": 29.7, "tvoc": 80, "measured_at": "2023-02-28T04:11:21.755884916Z"}]""", None)



'[{"received_at": "2023-02-28T04:11:21.960167651Z", "application_id": "office-air-quality", "eui": "AC1F09FFFE053AD4", "co2": 408, "hcho": 0.01, "humidity": 54.0, "light_level": 1, "pir": "idle", "pm10": 5, "pm2_5": 5, "pressure": 1010.4, "temperature": 29.7, "tvoc": 80, "measured_at": "2023-02-28T04:11:21.755884916Z"}]'