In [2]:
import json
import datetime
import folium
import pandas as pd
import sqlite3

In [3]:
with open('sqs_messages.json', 'r') as infile:
    msgs = json.load(infile)
print("Num messages:", len(msgs))

Num messages: 26810


In [4]:
bodies = []
for msg in msgs:
    bodies.append(json.loads(msg["Body"]))

measurements = []
for body in bodies:
    measurements.append(json.loads(body["Message"]))
    
print("Num measurements:", len(measurements))

Num measurements: 26810


# Example SQS Message

In [5]:
print(json.dumps(msgs[0], indent=2))

{
  "MessageId": "f5a789d2-b54e-4e73-99b8-9c74a97a45a3",
  "ReceiptHandle": "AQEBVhqbpA9nlS8Xo5KrzxWO293Yetz5pFaDuf1Pd7Mw75seY4vJiAU/gMzKVs923L+gVM/vCPdvbrO45GpIVfs5stVYJXW0ng0hKCnAPxe5o8ovuBmITYmcubeOz5Pj+Yacs6scCcz4K9LiubZgh3Cj/EiafZORPY4v3kHHJ3KdZlAnPv2/ldTRYvCde1AgB2V3V2JSeBNrdpwKlVMXdB8mEEtCW+oOtkQQTo4VrZL3ZdaFtX0TBbUtL7RLYiBKy5n3x22LsL1EK01HDoJgNXmv12XpFQsprw52oN9233CKDl0q3ItmmFDGEM0MSodNRcAORzJHusQZMtqilB1aT7fM9EKFxBlhJO1HA3sWPFLTQTXtxQ5pKs/Heo6pOvPIt0TPhBL3Lrjp0CUD7j7GU9qw4w==",
  "MD5OfBody": "16b5ba30c1c5e92dfe254b77ed67719c",
  "Body": "{\n  \"Type\" : \"Notification\",\n  \"MessageId\" : \"1be90193-377f-59b4-8a81-5afe548b87b5\",\n  \"TopicArn\" : \"arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT\",\n  \"Message\" : \"{\\\"date\\\":{\\\"utc\\\":\\\"2022-08-09T18:00:00.000Z\\\",\\\"local\\\":\\\"2022-08-09T14:00:00-04:00\\\"},\\\"parameter\\\":\\\"so2\\\",\\\"value\\\":38,\\\"unit\\\":\\\"\u00b5g/m\u00b3\\\",\\\"location\\\":\\\"Romeral\\\",\\\"city\\\":\\\"Catemu\\

# Example SQS Message Body

In [6]:
print(json.dumps(bodies[0], indent=2))

{
  "Type": "Notification",
  "MessageId": "1be90193-377f-59b4-8a81-5afe548b87b5",
  "TopicArn": "arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT",
  "Message": "{\"date\":{\"utc\":\"2022-08-09T18:00:00.000Z\",\"local\":\"2022-08-09T14:00:00-04:00\"},\"parameter\":\"so2\",\"value\":38,\"unit\":\"\u00b5g/m\u00b3\",\"location\":\"Romeral\",\"city\":\"Catemu\",\"country\":\"CL\",\"coordinates\":{\"latitude\":-32.823956,\"longitude\":-71.006441},\"attribution\":[{\"name\":\"SINCA\",\"url\":\"http://sinca.mma.gob.cl/\"},{\"name\":\"Fundicion Chagres de Anglo American Sur S.A.\"}],\"sourceName\":\"Chile - SINCA\",\"sourceType\":\"government\",\"mobile\":false}",
  "Timestamp": "2022-08-10T07:43:48.707Z",
  "SignatureVersion": "1",
  "Signature": "2hpVmWT6zgiT/EGuwAkCiBKZ198WKB5hSyPY/rQwoNutiXmLroRDoPgGttvkfFvE7PI4oUhPW7d/b/xZGRRYePFW0HV5EEf3k8NM3tbIx451wUCNSYeyjw6JwSIX2Y3+gzpPS/HiS4WRKHrjW6yW7AStxdKZ8076h1EU+1dKIs0RYXbSkkYNa1Rz1gBIUCvsFgQgOZueX2ZcKP2zZ9FYRxOcBCyRhbSGbeTJfB2bHksaGBn

# Example Measurement (from JSON body)

In [7]:
print(json.dumps(measurements[0], indent=2))

{
  "date": {
    "utc": "2022-08-09T18:00:00.000Z",
    "local": "2022-08-09T14:00:00-04:00"
  },
  "parameter": "so2",
  "value": 38,
  "unit": "\u00b5g/m\u00b3",
  "location": "Romeral",
  "city": "Catemu",
  "country": "CL",
  "coordinates": {
    "latitude": -32.823956,
    "longitude": -71.006441
  },
  "attribution": [
    {
      "name": "SINCA",
      "url": "http://sinca.mma.gob.cl/"
    },
    {
      "name": "Fundicion Chagres de Anglo American Sur S.A."
    }
  ],
  "sourceName": "Chile - SINCA",
  "sourceType": "government",
  "mobile": false
}


# Reading SQS attributes instead of JSON body

In [8]:
measurements_from_attributes = []
for body in bodies:
    obj = {}
    for key, attr in body["MessageAttributes"].items():
        typ = attr["Type"]
        val = attr["Value"]
        if typ == "String":
            obj[key] = val
        elif typ == "Number":
            obj[key] = float(val)
        else:
            assert False, "unknown type!"
    measurements_from_attributes.append(obj)
print(json.dumps(measurements_from_attributes[0], indent=2))

{
  "country": "CL",
  "unit": "\u00b5g/m\u00b3",
  "date_utc": "2022-08-09T18:00:00.000Z",
  "date_local": "2022-08-09T14:00:00-04:00",
  "city": "Catemu",
  "sourceType": "government",
  "parameter": "so2",
  "latitude": -32.823956,
  "location": "Romeral",
  "sourceName": "Chile - SINCA",
  "value": 38.0,
  "longitude": -71.006441
}


# Number of times each key was found in the JSON body

In [9]:
keys = {}
for measurement in measurements:
    for key in measurement.keys():
        keys[key] = keys.get(key, 0) + 1
print(json.dumps(keys, indent=2, default=str))

{
  "date": 26810,
  "parameter": 26810,
  "value": 26810,
  "unit": 26810,
  "location": 26810,
  "city": 26810,
  "country": 26810,
  "coordinates": 26701,
  "attribution": 25446,
  "sourceName": 26810,
  "sourceType": 26810,
  "mobile": 26810,
  "averagingPeriod": 20018
}


# Number of times each country was seen

In [10]:
countries = {}
for measurement in measurements:
    country = measurement["country"]
    countries[country] = countries.get(country, 0) + 1
print(json.dumps(countries, indent=2, default=str))

{
  "CL": 5428,
  "BR": 2136,
  "CI": 24,
  "CN": 120,
  "BE": 584,
  "CO": 879,
  "CR": 24,
  "CY": 24,
  "CW": 24,
  "CZ": 515,
  "DZ": 24,
  "FI": 93,
  "ET": 49,
  "GB": 796,
  "FR": 1995,
  "GR": 94,
  "GH": 24,
  "GN": 24,
  "HK": 91,
  "HU": 217,
  "GT": 45,
  "HR": 26,
  "ID": 48,
  "IN": 123,
  "IE": 14,
  "CA": 383,
  "US": 2672,
  "CD": 1,
  "CF": 1,
  "AJ": 1,
  "LA": 25,
  "BK": 2,
  "MX": 139,
  "BM": 1,
  "EG": 1,
  "PK": 100,
  "IT": 5523,
  "IQ": 24,
  "BG": 1,
  "UG": 9,
  "KG": 25,
  "IZ": 1,
  "SU": 1,
  "TX": 1,
  "CH": 3,
  "KV": 1,
  "VM": 2,
  "MD": 1,
  "TI": 1,
  "AE": 3,
  "NI": 1,
  "KU": 1,
  "AG": 1,
  "JO": 24,
  "KZ": 24,
  "KW": 24,
  "LU": 19,
  "LK": 24,
  "MK": 65,
  "LV": 20,
  "MM": 48,
  "ME": 30,
  "ML": 24,
  "MN": 620,
  "MT": 18,
  "NO": 170,
  "NL": 281,
  "NP": 72,
  "PE": 160,
  "PL": 1364,
  "PT": 339,
  "RO": 242,
  "RS": 96,
  "SA": 19,
  "SE": 83,
  "SD": 37,
  "SK": 74,
  "SI": 13,
  "TH": 222,
  "TJ": 10,
  "TD": 12,
  "TM": 12,
  "VN

# Last measurement seen of each country

country_latest = {}
for measurement in measurements:
    country = measurement["country"]
    date = datetime.datetime.strptime(measurement["date"]["utc"], "%Y-%m-%dT%H:%M:%S.%fZ")
    country_latest.setdefault(country, date)
    if date < country_latest[country]:
        country_latest[country] = date
print(json.dumps(country_latest, indent=2, default=str))

# Attributions (for Belgium)

In [11]:
attrlens = {}
attrs = {}
for measurement in measurements:
    country = measurement["country"]
    if country != "BE":
        continue
    if "attribution" in measurement:
        attribution = measurement["attribution"]
        attrlens[len(attribution)] = attrlens.get(len(attribution), 0) + 1
        for attr in attribution:
            attr = attr["name"]
            attrs[attr] = attrs.get(attr, 0) + 1
print("Number of attributions:")
print(json.dumps(attrlens, indent=2, default=str))
print("Attributions:")
print(json.dumps(attrs, indent=2, default=str))

Number of attributions:
{
  "1": 584
}
Attributions:
{
  "EEA": 584
}


# SourceName (for Belgium)

In [12]:
sourcenames = {}
for measurement in measurements:
    country = measurement["country"]
    if country != "BE":
        continue
    sourcename = measurement["sourceName"]
    sourcenames[sourcename] = sourcenames.get(sourcename, 0) + 1
print(json.dumps(sourcenames, indent=2, default=str))

{
  "EEA Belgium": 584
}


# Averaging period (for Belgium)

In [19]:
averaging_periods = {}
for measurement in measurements:
    country = measurement["country"]
    if country != "BE":
        continue
    avg_period = str(measurement["averagingPeriod"]["value"]) + " " + str(measurement["averagingPeriod"]["unit"])
    averaging_periods[avg_period] = averaging_periods.get(avg_period, 0) + 1
print(json.dumps(averaging_periods, indent=2, default=str))

{
  "1 hours": 584
}


# Is Mobile?

In [12]:
mobile = {}
for measurement in measurements:
    country = measurement["country"]
    if country != "BE":
        continue
    if "mobile" in measurement:
        mobile[measurement["mobile"]] = mobile.get(measurement["mobile"], 0) + 1
print(json.dumps(mobile, indent=2, default=str))

{
  "false": 584
}


# Pandas Dataframe

In [13]:
df = pd.DataFrame(measurements_from_attributes)
df_be = df[df["country"] == "BE"]
df_be

Unnamed: 0,country,unit,date_utc,date_local,city,sourceType,parameter,latitude,location,sourceName,value,longitude
847,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Limburg,government,no2,50.882299,BETN046,EEA Belgium,15.0,5.618874
848,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Liege,government,so2,50.613413,BETR222,EEA Belgium,2.0,5.570223
849,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Hainaut,government,co,50.407810,BETR512,EEA Belgium,180.0,4.395897
920,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Liege,government,so2,50.620671,BETR223,EEA Belgium,0.0,5.516405
921,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Oost-Vlaanderen,government,so2,51.263037,BETR892,EEA Belgium,3.0,4.278890
...,...,...,...,...,...,...,...,...,...,...,...,...
5631,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Namur,government,no2,50.502716,BETN073,EEA Belgium,11.0,4.988423
5632,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Antwerpen,government,so2,51.250106,BETR897,EEA Belgium,1.0,4.342105
5634,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Brussels-Capital Region,government,o3,50.858031,BETB011,EEA Belgium,87.0,4.288336
5678,BE,µg/m³,2022-08-11T01:00:00.000Z,2022-08-11T03:00:00+02:00,Liege,government,o3,50.583614,BETR240,EEA Belgium,73.5,5.397446


In [14]:
df_be.groupby("city").agg(count = ("city", "count"))

Unnamed: 0_level_0,count
city,Unnamed: 1_level_1
Antwerpen,106
Brussels-Capital Region,72
Hainaut,58
Liege,120
Limburg,18
Luxembourg,40
Namur,28
Oost-Vlaanderen,72
Vlaams-Brabant,28
West-Vlaanderen,42


In [18]:
df_be.groupby("location").agg(count = ("location", "count"))

Unnamed: 0_level_0,count
location,Unnamed: 1_level_1
BELAL01,4
BELAL03,5
BELAL04,5
BELAL05,2
BELHB23,6
...,...
BETR831,4
BETR834,4
BETR891,4
BETR892,4


In [17]:
df_be.groupby("city").agg(count = ("city", "count"))

Unnamed: 0_level_0,count
city,Unnamed: 1_level_1
Antwerpen,106
Brussels-Capital Region,72
Hainaut,58
Liege,120
Limburg,18
Luxembourg,40
Namur,28
Oost-Vlaanderen,72
Vlaams-Brabant,28
West-Vlaanderen,42


In [16]:
df_be.groupby("location").agg(
    location = ("location", "min"),
    count = ("value", "count"),
    avg = ("value", "mean"),
    longitude = ("longitude", "mean"),
    latitude = ("latitude", "mean"),
    unit = ("unit", "min")
)

Unnamed: 0_level_0,location,count,avg,longitude,latitude,unit
location,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
BELAL01,BELAL01,4,12.725000,4.385224,51.236194,µg/m³
BELAL03,BELAL03,5,18.824000,4.201460,51.253965,µg/m³
BELAL04,BELAL04,5,18.136000,4.293329,51.290668,µg/m³
BELAL05,BELAL05,2,8.460000,4.278890,51.263118,µg/m³
BELHB23,BELHB23,6,10.326667,4.341005,51.170298,µg/m³
...,...,...,...,...,...,...
BETR831,BETR831,4,43.000000,4.339710,51.348795,µg/m³
BETR834,BETR834,4,15.900000,4.380104,51.092000,µg/m³
BETR891,BETR891,4,6.750000,4.385360,51.255807,µg/m³
BETR892,BETR892,4,18.250000,4.278890,51.263037,µg/m³


In [58]:
for measurement in measurements:
    if measurement["country"] == "BE":
        print(json.dumps(measurement, indent=2))
        break

{
  "date": {
    "utc": "2022-08-11T01:00:00.000Z",
    "local": "2022-08-11T03:00:00+02:00"
  },
  "parameter": "no2",
  "value": 15,
  "unit": "\u00b5g/m\u00b3",
  "averagingPeriod": {
    "unit": "hours",
    "value": 1
  },
  "location": "BETN046",
  "city": "Limburg",
  "country": "BE",
  "coordinates": {
    "latitude": 50.8822988262256,
    "longitude": 5.61887398523951
  },
  "attribution": [
    {
      "name": "EEA",
      "url": "http://www.eea.europa.eu/themes/air/air-quality"
    }
  ],
  "sourceName": "EEA Belgium",
  "sourceType": "government",
  "mobile": false
}
