# Observed Air Quality (PurpleAir)

This notebook retrieves readings from PurpleAir Sensors in Minneapolis and cleans the entries and saves the results as a csv file.

Documentation is available here: https://api.purpleair.com.
You can read this article for help getting started: https://community.purpleair.com/t/making-api-calls-with-the-purpleair-api/180.

From PurpleAir: 

"The data from individual sensors will update no less than every 30 seconds. As a courtesy, we ask that you limit the number of requests to no more than once every 1 to 10 minutes, assuming you are only using the API to obtain data from sensors. If retrieving data from multiple sensors at once, please send a single request rather than individual requests in succession.

The PurpleAir historical API is released as of July 18, 2022. For more information, view this post: https://community.purpleair.com/t/new-version-of-the-purpleair-api-on-july-18th/1251.

Please let us know if you have any questions or concerns, and have a great day!"

A paper on this process: https://doi.org/10.5194/amt-14-4617-2021 (Link for [Download](https://www.researchgate.net/publication/352663348_Development_and_application_of_a_United_States-wide_correction_for_PM25_data_collected_with_the_PurpleAir_sensor) )

Chat on which PM Estimate to use: https://community.purpleair.com/t/pm2-5-algorithms/3972/6

In [1]:
### Import Packages

# File manipulation

import os # For working with Operating System
import requests # Accessing the Web
import datetime as dt # Working with dates/times
import io # Input/Output Bytes objects

# Database 

import psycopg2
from psycopg2 import sql

# Analysis

import numpy as np
import arcpy
import pandas as pd

## Set Working Environment

In [2]:
# Get CWD

cwd = os.getcwd() # This is a global variable for where the notebook is (must change if running in arcpro)

# Create GeoDataBase
# This is the communal GeoDataBase

if not os.path.exists(os.path.join(cwd, '..', '..', 'data', 'QAQC.gdb')): # If it doesn't exist, create it

    arcpy.management.CreateFileGDB(os.path.join(cwd, '..', '..', 'data'), 'QAQC')

# Make it workspace

arcpy.env.workspace = os.path.join(cwd, '..', '..', 'data', 'QAQC.gdb')

arcpy.env.overwriteOutput = True # Overwrite layers is okay

## Set Bounds for PurpleAir Parameters

In [3]:
#Setting lat/long for PurpleAir API Parameters
nwlng = arcpy.Describe("mpls_8km_wgs").extent.XMin
nwlat = arcpy.Describe("mpls_8km_wgs").extent.YMax
selng = arcpy.Describe("mpls_8km_wgs").extent.XMax
selat = arcpy.Describe("mpls_8km_wgs").extent.YMin

## Get Station IDs

In [4]:
# This function will be used to collect data for multiple public PurpleAir sensors.
def getSensorsData(query='', api_read_key=''):

    # my_url is assigned the URL we are going to send our request to.
    url = 'https://api.purpleair.com/v1/sensors?' + query
    
    print('Here is the full url for the API call:\n\n', url)

    # my_headers is assigned the context of our request we want to make. In this case
    # we will pass through our API read key using the variable created above.
    my_headers = {'X-API-Key':api_read_key}

    # This line creates and sends the request and then assigns its response to the
    # variable, r.
    response = requests.get(url, headers=my_headers)

    # We then return the response we received.
    return response

In [5]:
# This is my personal API key... Please use responsibly!

api = input('Please enter your Purple Air api key')

Please enter your Purple Air api key 51592903-B445-11ED-B6F4-42010A800007


In [6]:
#Set bounding strings for API parameters
bounds_strings = [f'nwlng={nwlng}',
                  f'nwlat={nwlat}',
                  f'selng={selng}',
                  f'selat={selat}']

bounds_string = '&'.join(bounds_strings)

print(bounds_string)

nwlng=-93.43047670599998&nwlat=45.12326797900003&selng=-93.09299994199998&selat=44.81858013100003


In [7]:
# Designating and formatting the fields to request

fields = ['location_type']

fields_string = 'fields=' + '%2C'.join(fields)

print(fields_string)

fields=location_type


In [8]:
# Put it all together

query_string = '&'.join([fields_string, bounds_string])

print(query_string)

fields=location_type&nwlng=-93.43047670599998&nwlat=45.12326797900003&selng=-93.09299994199998&selat=44.81858013100003


In [9]:
# Make the request

response = getSensorsData(query_string, api)

Here is the full url for the API call:

 https://api.purpleair.com/v1/sensors?fields=location_type&nwlng=-93.43047670599998&nwlat=45.12326797900003&selng=-93.09299994199998&selat=44.81858013100003


In [10]:
# Get response into Pandas DataFrame

response_dict = response.json() # Read response as a json (dictionary)

col_names = response_dict['fields']
data = np.array(response_dict['data'])

df = pd.DataFrame(data, columns = col_names)

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 88 entries, 0 to 87
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype
---  ------         --------------  -----
 0   sensor_index   88 non-null     int32
 1   location_type  88 non-null     int32
dtypes: int32(2)
memory usage: 832.0 bytes


In [11]:
# Only want outside sensors

outside_sensors = df[df['location_type']==0] # 0 = outside

len(outside_sensors)

80

In [12]:
#drop the location_type now that we have filtered for outdoor sensors only
df_historic = outside_sensors.drop('location_type', axis=1)

## Pulling Historic Sensor CSVs

### Setting time period

In [13]:
#pulling from 9/1/22 - 4/2/23 

# Start time

end_datetime = dt.datetime(2023,4,30) # April 2, 2023
end_timestamp = int(dt.datetime.timestamp(end_datetime))

# End time

start_datetime = dt.datetime(2022,6,15) # September 1, 2022
start_timestamp = int(dt.datetime.timestamp(start_datetime))

# Sensors

sensor_ids = df_historic.sensor_index.apply(lambda x: int(x))

In [14]:
sensor_ids

0       3088
2      11134
3     142718
4     142720
5     142726
       ...  
83    110979
84    113486
85    116413
86    126487
87    128195
Name: sensor_index, Length: 80, dtype: int64

### Creating the Query for the API

### Calling a SINGLE sensor

In [32]:
#DO NOT EDIT - KEEP AS IS FOR NOW
# Sensor id

sensor_id = sensor_ids[0]

# Timestamp String

time_string = 'start_timestamp=' + str(start_timestamp) + '&end_timestamp=' + str(end_timestamp)

# Average string (in minutes) 1440 is 1 day average

avg_string = 'average=1440'

# Environmental fields

env_fields = ['humidity', 'temperature', 'pressure', 'pm2.5_cf_1']

env_fields_string = 'fields=' + '%2C%20'.join(env_fields)

# Base URL

base_url = f'https://api.purpleair.com/v1/sensors/{sensor_id}/history/csv?'

# Put it all together

query_url = base_url + '&'.join([time_string, avg_string, env_fields_string])

my_headers = {'X-API-Key':api}

# This line creates and sends the request and then assigns its response to the
# variable, r.
r = requests.get(query_url, headers=my_headers)

# Read response as CSV data
csv_data = r.content.decode('utf-8')

# Parse CSV data into pandas DataFrame
df_historic = pd.read_csv(io.StringIO(csv_data), header=None)
df_historic = df_historic.iloc[1:]  # exclude the header row
df_historic.columns = ['timestamp'] + ['sensor_index'] + env_fields
df_historic

Unnamed: 0,humidity,temperature,pressure,pm2.5_cf_1,timestamp,pm2.5_cf_1.1
0,1674864000,3088,40.201,17.581,991.743,0.6330
1,1671753600,3088,42.605,0.743,987.762,0.0785
2,1668729600,3088,48.795,29.080,990.701,54.4190
3,1672531200,3088,53.942,42.170,980.359,8.8820
4,1676764800,3088,50.660,40.259,978.572,7.8060
...,...,...,...,...,...,...
208,1677628800,3088,52.806,44.600,977.058,3.9945
209,1679356800,3088,41.454,42.326,987.113,2.3800
210,1676592000,3088,38.144,22.964,995.503,0.5600
211,1679529600,3088,41.673,41.035,988.664,3.2915


### Creating a 'for' loop to parse through ALL sensor_ids

### Setting Parameters for API

In [15]:
 # Timestamp String

time_string = 'start_timestamp=' + str(start_timestamp) + '&end_timestamp=' + str(end_timestamp)

# Average string (in minutes) 1440 is 1 day average

avg_string = 'average=1440'

# Environmental fields

env_fields = ['humidity', 'temperature', 'pressure', 'pm2.5_cf_1']

env_fields_string = 'fields=' + '%2C%20'.join(env_fields)

# My Header

my_headers = {'X-API-Key': api}

#Creating an empty dataframe to hold all sensor data

cols = ['timestamp'] + ['sensor_index'] + env_fields

datatypes = [str, int, float, float, float, float]

dtypes = np.dtype(list(zip(cols, datatypes)))

df_all_sensors = pd.DataFrame(np.empty(0, dtype = dtypes))

### The For Loop

In [16]:
for sensor_id in sensor_ids:
    # Base URL
    base_url = f'https://api.purpleair.com/v1/sensors/{sensor_id}/history/csv?'

    # Put it all together
    query_url = base_url + '&'.join([time_string, avg_string, env_fields_string])

    r = requests.get(query_url, headers=my_headers)

    if r.status_code == 200:
        
        # Read response as CSV data
        csv_data = r.content.decode('utf-8')
        
        if csv_data.count('\n') == 1: # There is only one line (empty data)
            print(f"No data for sensor {sensor_id}")
        else:

            # Parse CSV data into pandas DataFrame
            df_individual_sensor = pd.read_csv(io.StringIO(csv_data), header=0, dtype = dtypes)
    #         df_individual_sensor = df_individual_sensor.iloc[1:]  # exclude the header row
            df_individual_sensor.columns = ['timestamp'] + ['sensor_index'] + env_fields

            # Concatenate the individual sensor data with the main DataFrame
            df_all_sensors = pd.concat([df_all_sensors, df_individual_sensor], ignore_index=True)
    else:
        print(f"Error fetching data for sensor {sensor_id}: {r.status_code}")

# Display the concatenated DataFrame for all sensors
df_all_sensors

Unnamed: 0,timestamp,sensor_index,humidity,temperature,pressure,pm2.5_cf_1
0,1662249600,3088,43.297,74.176,993.320,0.4255
1,1680220800,3088,61.289,43.874,972.402,6.7030
2,1681257600,3088,30.396,81.503,976.471,5.7295
3,1670976000,3088,62.533,43.450,976.015,2.9040
4,1674950400,3088,34.392,14.193,999.315,0.6595
...,...,...,...,...,...,...
16708,1665446400,128195,36.957,70.990,976.219,7.3895
16709,1667260800,128195,32.001,61.188,981.327,10.2005
16710,1657411200,128195,47.633,84.253,983.682,4.9420
16711,1679875200,128195,33.946,40.133,990.732,2.1750


## Cleaning Historic Data for Analysis

In [26]:
#rename pm2.5 column to pm2_5 for SQL
df_historic = df_all_sensors.rename(columns={'pm2.5_cf_1' : 'pm2_5'})

#changing UNIX date to pd date
df_historic['timestamp'] = pd.to_datetime(df_historic['timestamp'], unit='s')
df_historic.to_csv('historicAirQuality_4-30.csv', index=False)

## Finding Most Recent Use Date

In [23]:
most_recent_timestamps = df_historic.groupby('sensor_index')['timestamp'].max()
most_recent_use = pd.DataFrame({'sensor_index' : most_recent_timestamps.index,
                                'most_recent_timestamp' : most_recent_timestamps.values})
most_recent_use.to_csv('most_recent_sensor_history.csv', index=False)

## QAQC

In [20]:
#create a blank dataframe to hold the errors

purpleair_historic_errors = pd.DataFrame(columns = ['humidity_error', 'temperature_error', 'pressure_error', 'pm2_5_error'])
purpleair_historic_errors['sensor_index'] = df_historic['sensor_index']
purpleair_historic_errors['timestamp'] = df_historic['timestamp']

### Humidity Check

In [21]:
#ranges pulled from https://www.currentresults.com/Weather/Minnesota/humidity-annual.php
#range is actually 40-90 but I was getting tons of errors so I reduced it to 10-90

def check_range(value):
    if pd.isna(value):
        return 'no value given'
    elif value >= 10 and value <= 90:
        pass
    else:
        return 'out of range (10%-90%)'

# Assuming 'df_historic' is your original dataframe

# # Convert 'humidity' column to float
# df_historic['humidity'] = pd.to_numeric(df_historic['humidity'], errors='coerce')

# Apply 'check_range' function to 'humidity' column and store the result in a new column 'humidity_error'
purpleair_historic_errors['humidity_error'] = df_historic['humidity'].apply(check_range)

### Temperature Check

In [22]:
#winter -4 - 28
#spring 22 - 57
#summer 48 - 81
#fall 29 - 59
#ref from https://www.dnr.state.mn.us/climate/summaries_and_publications/normalsportal.html


def check_range(value):
    if pd.isna(value):
        return 'no value given'
    elif float(value) >= -20 and float(value) <= 100:
        pass
    else:
        return 'out of range (-20-100F)'

'''
#if we can get time stamp we should use this with a date check too
#this is not correct - we can do seasonal if we can relate it to date range
def check_range(value):
    if value is None:
        return -1
    if value >= -20 and value <=35:
        return 'winter (-20-35F)'
    if value >10 and value <=70:
        return 'spring (10-70F)'
    if value >30 and value <=100:
        return 'summer (30-100F)'
    if value >15 and value <=70:
        return 'fall (15-70F)'
    else:
        return 'out of range'
'''

# Assuming 'df_historic' is your original dataframe

# Convert 'temperature' column to float
# df_historic['temperature'] = pd.to_numeric(df_historic['temperature'], errors='coerce')

# Apply 'check_range' function to 'temperature' column and store the result in a new column 'temperature_error'
purpleair_historic_errors['temperature_error'] = df_historic['temperature'].apply(check_range)

### Pressure Check

In [23]:
# range is 25 - 35 Hg according to https://barometricpressure.app/minneapolis
# PurpleAir uses Millibars so I used https://www.weather.gov/epz/wxcalc_pressureconvert to convert
# rage is 846.6 - 1185.24

def check_range(value):
    if pd.isna(value):
        return 'no value given'
    elif float(value) >= 830 and float(value) <= 1200:
        pass
    else:
        return 'out of range (830 - 1200 Millibars)'

# Assuming 'df_historic' is your original dataframe

# Convert 'pressure' column to float
# df_historic['pressure'] = pd.to_numeric(df_historic['pressure'], errors='coerce')

# Apply 'check_range' function to 'pressure' column and store the result in a new column 'pressure_error'
purpleair_historic_errors['pressure_error'] = df_historic['pressure'].apply(check_range)

### PM Check

In [24]:
#Average reading in MPLS is 30 ug/m3 per https://www.epa.gov/air-trends/air-quality-cities-and-counties

def check_range(value):
    if value is None:
        return 'no value given'
    try:
        value = float(value)  # Convert value to float
    except ValueError:
        return 'invalid value'
    
    if 0.1 < value < 1000:  # Use numerical comparison on float value
        pass
    else:
        return 'above 1000'


#    if value == 0:
 #       return '0'
#    if value >0.1 and value <=10:
#        return 'PM2.5 0.1-10'
#    if value >10 and value <=20:
#        return 'PM2.5 10-20'
#    if value >20 and value <=30:
#        return 'PM2.5 20-30'
#    if value >30 and value <=40:
#        return 'PM2.5 30-40'
#    if value >40 and value <=50:
#        return 'PM2.5 40-50'
#    if value >50 and value <=60:
#        return 'PM2.5 50-60'
#    if value >60 and value <=70:
#        return 'PM2.5 60-70'

# Convert 'pm2_5' column to float
# df_historic['pm2_5'] = pd.to_numeric(df_historic['pm2_5'], errors='coerce')

    
purpleair_historic_errors['pm2_5_error'] = df_historic['pm2_5'].apply(check_range)

In [25]:
# Removing rows from the error table that don't have any errors

#This may not be doing what we think it's doing... 

purpleair_historic_errors = purpleair_historic_errors.dropna(subset=purpleair_historic_errors.columns.difference(['sensor_index', 'timestamp']), how='all')
purpleair_historic_errors

Unnamed: 0,humidity_error,temperature_error,pressure_error,pm2_5_error,sensor_index,timestamp
4,,,,above 1000,3088,2022-10-17
46,,,,above 1000,3088,2022-12-23
61,,,,above 1000,3088,2022-12-24
177,,,,above 1000,3088,2022-10-18
207,,,,above 1000,3088,2022-11-12
...,...,...,...,...,...,...
12498,,,,above 1000,128195,2022-09-22
12500,,,,above 1000,128195,2022-11-13
12503,,,,above 1000,128195,2023-02-10
12508,,,,above 1000,128195,2023-02-03


## Save to Local and Remote Databases

In [26]:
### Local (Values)

table_name = 'purpleair_historic'

if table_name in arcpy.ListTables(): # Delete table if it exists
    arcpy.management.Delete(table_name)

cols_for_gdb = ['sensor_index', 'timestamp', 'humidity', 'temperature', 'pressure', 'pm2_5']

vals = df_historic[cols_for_gdb].values

dtypes = [('sensor_index', '<U6'), ('timestamp', 'M8[us]'),
          ('humidity', '<f8'), ('temperature', '<f8'),
          ('pressure', '<f8'), ('pm2_5', '<f8')]

array_for_gdb = numpy.rec.fromrecords(vals, names = cols_for_gdb, dtype=dtypes)

arcpy.da.NumPyArrayToTable(array_for_gdb, os.path.join(arcpy.env.workspace,table_name))

# df_historic.to_csv(os.path.join(save_path, "purpleair_historic.csv"), index=False)

  array_for_gdb = numpy.rec.fromrecords(vals, names = cols_for_gdb, dtype=dtypes)


In [32]:
### Local (Errors)

table_name = 'purpleair_historic_errors'

if table_name in arcpy.ListTables(): # Delete table if it exists
    arcpy.management.Delete(table_name)

cols_for_gdb = ['sensor_index', 'timestamp', 'humidity_error', 'temperature_error', 'pressure_error', 'pm2_5_error']

vals = purpleair_historic_errors[cols_for_gdb].values

dtypes = [('sensor_index', '<U6'), ('timestamp', 'M8[us]'),
          ('humidity_error', str), ('temperature_error', str),
          ('pressure_error', str), ('pm2_5_error', str)]

array_for_gdb = numpy.rec.fromrecords(vals, names = cols_for_gdb, dtype=dtypes)

arcpy.da.NumPyArrayToTable(array_for_gdb, os.path.join(arcpy.env.workspace,table_name))

  array_for_gdb = numpy.rec.fromrecords(vals, names = cols_for_gdb, dtype=dtypes)


## Connecting to the Server

In [11]:
# Get credentials

cred_pth = os.path.join(os.getcwd(), '..', '..', 'database', 'db_credentials.txt')

with open(cred_pth, 'r') as f:
    
    creds = f.readlines()[0].rstrip('\n').split(', ')
    
# Connect to PostGIS Database

pg_connection_dict = dict(zip(['dbname', 'user', 'password', 'port', 'host'], creds))

connection = psycopg2.connect(**pg_connection_dict)

0

## Insert Data into SQL Table

In [70]:
# connect to the cursor
cur = connection.cursor()

# iterate over the dataframe and insert each row into the database using a SQL INSERT statement
for index, row in df_historic.iterrows():
    cur.execute('''
    INSERT INTO PURPLEAIR_HISTORIC (sensor_index, timestamp, humidity, temperature, pressure, pm2_5) 
    VALUES (%s, %s, %s, %s, %s, %s) 
    ''', (row['sensor_index'], row['timestamp'], row['humidity'], row['temperature'], row['pressure'], row['pm2_5']))
    connection.commit()

for i, r in purpleair_historic_errors.iterrows():
    cur.execute('''
    INSERT INTO PURPLEAIR_HISTORIC_ERRORS (sensor_index, timestamp, humidity_error, temperature_error, pressure_error, pm2_5_error) 
    VALUES (%s, %s, %s, %s, %s, %s) 
    ''', (r['sensor_index'], r['timestamp'], r['humidity_error'], r['temperature_error'], r['pressure_error'], r['pm2_5_error']))
    connection.commit()

# commit the changes to the database and close the cursor and connection
cur.close()
connection.close()