In [119]:
from pyathena import connect
import timeit
import numpy as np
import datetime as dt 

In [117]:
print('mem')
exec_time = (f"Time to run (s): {np.round(timeit.timeit(),2)}")
print(exec_time)

mem
Time to run (s): 0.01


```
s3://openaq-data-archive/records/csv.gz/
├─ year=2025/
│  ├─ month=10/
│  │  ├─ locationid=2178/
│  │  ├─ locationid=827/
│  │  └─ ...
│  └─ month=11/...
└─ year=2024/...
```

Example file path:

` /records/csv.gz/locationid=2178/year=2022/month=05/location-2178-20220503.csv.gz `

In [90]:
cursor.execute("CREATE DATABASE IF NOT EXISTS openaq_db;")

<pyathena.cursor.Cursor at 0x7f923ec41590>

Following the S3 schema, I will create partitons down to the month level. From there, I can manually create partitions for days within only the current month.

### First approach:

In [194]:
create_table_query = """
CREATE EXTERNAL TABLE IF NOT EXISTS openaq_db.openaqMeasurements (
  location_id INT,
  sensors_id INT,
  location STRING,
  datetime STRING,
  lat DOUBLE,
  lon DOUBLE,
  parameter STRING,
  units STRING,
  value DOUBLE
)
PARTITIONED BY (locationid STRING, year STRING, month STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.format'=',','field.delim'=',')
LOCATION 's3://openaq-data-archive/records/csv.gz/'
TBLPROPERTIES ('skip.header.line.count'='1');
"""

In [148]:
cursor.execute(create_table_query)

<pyathena.cursor.Cursor at 0x7f923ec41590>

Registering partitions:

https://stackoverflow.com/questions/7370801/how-do-i-measure-elapsed-time-in-python

In [118]:
cursor.execute("MSCK REPAIR TABLE openaq_db.openaqMeasurements;")

exec_time = (f"Time to run (s): {np.round(timeit.timeit(),2)}")
print(exec_time)

Query canceled by user.


OperationalError: Query cancelled by user

Showing that our table is present:

In [None]:
cursor.execute("SHOW TABLES IN openaq_db;")
tables = cursor.fetchall()
print("Tables in openaqMeasurements:", tables)

Taking a sample row:

In [None]:
cursor.execute("SELECT * FROM openaq_db.openaqMeasurements LIMIT 1;")
print(cursor.fetchall())

Displaying partitions:

In [140]:
cursor.execute("SHOW PARTITIONS openaq_db.openaqMeasurements;")
print(cursor.fetchall())

[('locationid=3273175/year=2025/month=10/day=22',)]


So far, this approach is very slow.

I am creating partitions for locationid STRING, year STRING, month STRING across all data and possibly going back to 2015.

I will try approach 2:

## Approach 2

In this approach, I will initially establish no partitions.

Example file path:

`/records/csv.gz/locationid=2178/year=2022/month=05/location-2178-20220503.csv.gz`

The first partition is for `locationid`, so we will have to establish partitions for each item in our list of stations. 

I can then programatically insert the current year, and month. Depending on performance, I can even include the exact day

In [128]:
station_IDs = [3273175,
 3400961,
 3400987,
 3400983,
 3400988,
 3400965,
 3400964,
 3400982,
 3400948,
 5354765,
 5477232,
 5734934,
 4869634,
 5217366,
 5276665,
 8875,
 26780,
 5468690,
 617545,
 3400946]

cursor.execute("""
ALTER TABLE openaq_db.openaqMeasurements ADD
PARTITION (year='2025', month='10', day='22', locationid='3273175')
LOCATION 's3://openaq-data-archive/records/csv.gz/locationid=3273175/year=2025/month=10/day=22/';
""")

In [141]:
cursor.execute("SHOW PARTITIONS openaq_db.openaqMeasurements;")
print(cursor.fetchall())

[('locationid=3273175/year=2025/month=10/day=22',)]


As expected, this is creating partitions for month and date *within* the location partition, meaning I will need to run this programatically for each location.

In [129]:
year = dt.datetime.now().year
month = dt.datetime.now().month
day = dt.datetime.now().day

In [192]:
cursor.execute("DROP TABLE openaq_db.openaqMeasurements")

<pyathena.cursor.Cursor at 0x7f923ec41590>

In [195]:
cursor.execute(create_table_query)

<pyathena.cursor.Cursor at 0x7f923ec41590>

` /records/csv.gz/locationid=2178/year=2022/month=05/location-2178-20220503.csv.gz`

In [181]:
f"s3://openaq-data-archive/records/csv.gz/locationid={location}/year={year}/month={month}/location-{location}-{year}{month}{day}.csv.gz"

's3://openaq-data-archive/records/csv.gz/locationid=3400946/year=2025/month=10/location-3400946-20251022.csv.gz'

In [196]:
for location in station_IDs:
    cursor.execute(f"""
    ALTER TABLE openaq_db.openaqMeasurements ADD
    PARTITION (year='{year}', month='{month}', locationid='{location}')
    LOCATION 's3://openaq-data-archive/records/csv.gz/locationid={location}/year={year}/month={month}/';
    """)

exec_time = (f"Time to run (s): {np.round(timeit.timeit(),2)}")
print(exec_time)

Time to run (s): 0.01


That took longer than .01 seconds - more like 5 seconds.

In [189]:
cursor.execute("SHOW PARTITIONS openaq_db.openaqMeasurements;")
print(cursor.fetchall())

[('locationid=3400982/year=2025/month=10',), ('locationid=5734934/year=2025/month=10',), ('locationid=3400948/year=2025/month=10',), ('locationid=617545/year=2025/month=10',), ('locationid=3273175/year=2025/month=10',), ('locationid=5468690/year=2025/month=10',), ('locationid=5354765/year=2025/month=10',), ('locationid=5276665/year=2025/month=10',), ('locationid=3400983/year=2025/month=10',), ('locationid=3400964/year=2025/month=10',), ('locationid=3400988/year=2025/month=10',), ('locationid=5477232/year=2025/month=10',), ('locationid=4869634/year=2025/month=10',), ('locationid=26780/year=2025/month=10',), ('locationid=3400961/year=2025/month=10',), ('locationid=8875/year=2025/month=10',), ('locationid=3400946/year=2025/month=10',), ('locationid=3400965/year=2025/month=10',), ('locationid=3400987/year=2025/month=10',), ('locationid=5217366/year=2025/month=10',)]


We have the partitions we need now

In [197]:
cursor.execute("SELECT * FROM openaq_db.openaqMeasurements LIMIT 1;")
print(cursor.fetchall())

[(5354765, 13805636, '"Castaic-5324698"', '"2025-10-13T01:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '5354765', '2025', '10')]


This conforms to the expected units:

    location_id INT,
    sensors_id INT,
    location STRING,
    datetime STRING,
    lat DOUBLE,
    lon DOUBLE,
    parameter STRING,
    units STRING,
    value DOUBLE

And also appends the `locationid`, `year` and `month`

In [201]:
cursor.execute("SELECT location_id, parameter, value, units FROM openaq_db.openaqMeasurements WHERE value IS NOT NULL LIMIT 10;")
print(cursor.fetchall())

[]


The entire month's data may be null across all 20 stations

In [206]:
cursor.execute("""
SELECT location_id, location, parameter, value, units 
FROM openaq_db.openaqMeasurements 
WHERE value IS NOT NULL
LIMIT 1;
""")
print(cursor.fetchall())

[]


Adding September to the partition:

In [207]:
for location in station_IDs:
    cursor.execute(f"""
    ALTER TABLE openaq_db.openaqMeasurements ADD
    PARTITION (year='2025', month='09', locationid='{location}')
    LOCATION 's3://openaq-data-archive/records/csv.gz/locationid={location}/year=2025/month=09/';
    """)

Time to run (s): 0.02


In [208]:
cursor.execute("""
SELECT location_id, location, parameter, value, units 
FROM openaq_db.openaqMeasurements 
WHERE value IS NOT NULL
LIMIT 5;
""")
print(cursor.fetchall())

[]


In [210]:
cursor.execute("""
SELECT location_id, location, parameter, value, units 
FROM openaq_db.openaqMeasurements 
WHERE month = '09'
LIMIT 5;
""")
print(cursor.fetchall())

[(3400982, '"Sunkist Park-5805986"', '"pm25"', None, '"µg/m³"'), (3400982, '"Sunkist Park-5805986"', '"pm25"', None, '"µg/m³"'), (3400982, '"Sunkist Park-5805986"', '"pm25"', None, '"µg/m³"'), (3400982, '"Sunkist Park-5805986"', '"pm25"', None, '"µg/m³"'), (3400982, '"Sunkist Park-5805986"', '"pm25"', None, '"µg/m³"')]


In [213]:
cursor.execute("""
SELECT *
FROM openaq_db.openaqMeasurements
LIMIT 10;
"""
)
print(cursor.fetchall())

[(3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T01:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T02:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T03:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T04:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T05:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T06:00:00-07:00"', None, None, '"pm25"', '"µg/m³"', None, '3400948', '2025', '10'), (3400948, 12178308, '"Malibu - Cavalleri-3371036"', '"2025-10-13T07:00:00-07:00"', None, None, '"pm25"', '"µg/m

In [78]:
cursor.execute("SHOW TABLES IN openaq_db;")
tables = cursor.fetchall()
print("Tables in openaqMeasurements:", tables)

Tables in openaqMeasurements: [('openaqmeasurements',)]


Pulling a sample row:

In [61]:
cursor.execute("SELECT * FROM openaq_db.openaq_measurements LIMIT 1;")
print(cursor.fetchall())

[(100, 4231, 'Badhoevedorp-Sloterweg-100', '2018-09-28T03:00:00+02:00', 52.334, 4.77401, 'pm10', 'µg/m³', 24.3)]


This conforms to the expected units:

    location_id INT,
    sensors_id INT,
    location STRING,
    datetime STRING,
    lat DOUBLE,
    lon DOUBLE,
    parameter STRING,
    units STRING,
    value DOUBLE

For the sake of testing, I manually copied a list of 20 station IDs related to PM2.5 sensors in LA

In [None]:
cursor.execute("SELECT * FROM openaq_db.openaq_measurements WHERE sensors_id = 3273175 LIMIT 1;")
print(cursor.fetchall())

In [66]:
cursor.execute("MSCK REPAIR TABLE openaq_db.openaq_measurements;")
print(cursor.fetchall())

[]


In [67]:
cursor.execute("SHOW PARTITIONS openaq_db.openaq_measurements;")
print(cursor.fetchall())

OperationalError: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table openaq_db.openaq_measurements is not a partitioned table