## Testing the use of Athena for bulk querying of the cloud prediction store

We want to enable bulk queries against the data lake, the analytical cloud store of precalculated predictions. Since bulk requests have no strict latency requirements, we don't need to run complex scans over DynamoDB. Instead, we can use Athena to run SQL queries directly over the structured precalculation data in S3.

### Querying Athena

We use `awswrangler` as the Python interface for querying Athena, and for reading from and writing to S3. It is an open-source package developed by AWS to make data lake interactions easier for data scientists and engineers.

```python
import awswrangler as wr
import pandas as pd
```

For now, I've created a table called `eos3b5e` in the `precalcs_test` database to demonstrate how Athena can be used on top of data in S3. This table was created manually (click-ops only); going forward, the inference pipeline will take care of creating tables and inserting new data.

```python
df = wr.athena.read_sql_query("SELECT * FROM eos3b5e limit 20", database="precalcs_test")
df.head()
```

Next, we read in the test input and build a list of SMILES strings. We read from a CSV because this is how we expect users to interact with the Ersilia Model Hub.

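```python
model_id = "eos3b5e"
df_input = pd.read_csv("../test_input.csv", header=None)
# Wrap each SMILES in single quotes so it can be used as a SQL string literal
df_input["input"] = df_input[0].apply(lambda x: f"'{x}'")
# Comma-separated list of quoted SMILES for the IN (...) clause
smiles = ",".join(df_input.input.values)
```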

We join all the SMILES strings into a comma-separated list and use it to form a simple query with a `WHERE ... IN` clause.

```python
query = f"select * from {model_id} where input in ({smiles})"
```
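The cell above wraps each SMILES in single quotes but does not escape quotes inside the value. SMILES strings don't normally contain `'`, but if inputs come from users, a slightly more defensive version doubles any embedded single quote, which is the standard SQL escape for string literals. This is a sketch: `sql_string_literal` and `build_in_query` are hypothetical helpers, not part of `awswrangler`, and the table name is assumed to come from a trusted model ID.

```python
def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_in_query(table: str, smiles: list[str]) -> str:
    """Build a `select ... where input in (...)` query for a list of SMILES.

    The table name is interpolated as-is, so it must come from a trusted source
    (here, assumed to be a known model ID).
    """
    if not smiles:
        raise ValueError("need at least one SMILES string")
    in_list = ",".join(sql_string_literal(s) for s in smiles)
    return f"select * from {table} where input in ({in_list})"


query = build_in_query("eos3b5e", ["CCO", "c1ccccc1"])
print(query)  # select * from eos3b5e where input in ('CCO','c1ccccc1')
```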

```python
df_out = wr.athena.read_sql_query(query, database="precalcs_test")
df_out.head()
```

Testing the speed with an input size of 1,000, we see that a run takes around 16 s on average (this includes reading the CSV). This seems reasonable for our purposes.

(`test_input_large.csv` is just a copy of the reference library.)

```python
%%timeit
model_id = "eos3b5e"
dfi = pd.read_csv("../test_input_large.csv", nrows=1000)
# Select the first column by position so this works whether or not the file has a header row
dfi["input"] = dfi.iloc[:, 0].apply(lambda x: f"'{x}'")
smiles = ",".join(dfi.input.values)
query = f"select * from {model_id} where input in ({smiles})"
df_out_large = wr.athena.read_sql_query(query, database="precalcs_test")
```

```
16.4 s ± 857 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```


```python
model_id = "eos3b5e"
dfi = pd.read_csv("../test_input_large.csv", nrows=1000)
# Select the first column by position, as in the timed cell above
dfi["input"] = dfi.iloc[:, 0].apply(lambda x: f"'{x}'")
smiles = ",".join(dfi.input.values)
query = f"select * from {model_id} where input in ({smiles})"
df_out_large = wr.athena.read_sql_query(query, database="precalcs_test")
df_out_large.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 999 entries, 0 to 998
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   key     999 non-null    string 
 1   input   999 non-null    string 
 2   mw      999 non-null    float32
dtypes: float32(1), string(2)
memory usage: 19.6 KB
```


When we tried 10,000 inputs, we got ThrottlingErrors. Building a query around a very long `IN (...)` list doesn't scale, and such conditions are not best practice for most SQL engines. Instead, we can upload the input file to the data lake and then use an `INNER JOIN` within Athena to return the desired predictions.
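Separately from the throttling we hit, the `IN` approach has a size problem: the query text grows linearly with the number of inputs. The sketch below measures that growth for a hypothetical 40-character SMILES. `ATHENA_MAX_QUERY_BYTES` assumes Athena's documented 262,144-byte limit on query strings, which is worth re-checking against the current service quotas; `in_query_size` is a hypothetical helper.

```python
# Assumed limit: Athena's documented maximum query-string length (UTF-8 bytes).
ATHENA_MAX_QUERY_BYTES = 262_144


def in_query_size(smiles: list[str], table: str = "eos3b5e") -> int:
    """Return the UTF-8 size in bytes of the IN-list query built as in the cells above."""
    in_list = ",".join(f"'{s}'" for s in smiles)
    query = f"select * from {table} where input in ({in_list})"
    return len(query.encode("utf-8"))


# Hypothetical inputs: n copies of a 40-character SMILES-like string.
fake_smiles = "C" * 40
for n in (1_000, 10_000):
    size = in_query_size([fake_smiles] * n)
    print(n, size, size <= ATHENA_MAX_QUERY_BYTES)
```

Under these assumptions, 1,000 inputs give a query of about 43 KB, while 10,000 inputs give about 430 KB, which would exceed that limit however the requests are throttled.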

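```python
model_id = "eos3b5e"
input_size = 10_000  # number of molecules in this request
# The join below matches on r.smiles, so this DataFrame needs a `smiles` column
dfi = pd.read_csv("../test_input_large.csv", nrows=input_size)
```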

```python
request_id = "test_request"
dfi["request"] = request_id
dfi["model"] = model_id
dfi.head()
```

We add some supplementary information, so that for every request to the API we record:
- a unique identifier for that request
- the model requested
- the molecules requested

We can play around with the partition scheme and add more information later (like a unique _user_ ID).
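The notebook uses a fixed `request_id = "test_request"`. In the API, each request would need its own unique identifier; one simple option, assumed here rather than decided, is a random UUID. The helpers `new_request_id` and `request_partition_prefix` are hypothetical and only reproduce the `model=.../request=...` layout that `wr.s3.to_parquet` creates with `partition_cols=["model", "request"]`.

```python
import uuid


def new_request_id() -> str:
    """Generate a random, effectively unique identifier for an API request (assumed scheme)."""
    return uuid.uuid4().hex


def request_partition_prefix(base_path: str, model_id: str, request_id: str) -> str:
    """Build the Hive-style key=value prefix for one model/request partition."""
    return f"{base_path.rstrip('/')}/model={model_id}/request={request_id}/"


request_id = new_request_id()
prefix = request_partition_prefix("s3://precalculations-bucket/in/test", "eos3b5e", request_id)
print(prefix)
```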

```python
wr.s3.to_parquet(
    df=dfi,
    path="s3://precalculations-bucket/in/test",
    dataset=True,                        # write as a (partitioned) dataset rather than a single file
    database="precalcs_test",            # register the table in this database so Athena can query it
    table="requests",
    partition_cols=["model", "request"]  # creates model=.../request=.../ prefixes, as shown below
)
```

```
{'paths': ['s3://precalculations-bucket/in/test/model=eos3b5e/request=test_request/fed3ae8f053349939b4bd878df8b8727.snappy.parquet'],
 'partitions_values': {'s3://precalculations-bucket/in/test/model=eos3b5e/request=test_request/': ['eos3b5e',
   'test_request']}}
```

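The query is a simple inner join between the model's prediction table and the `requests` table, restricted to this model and request: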

```python
# Filtering on the partition columns (model, request) limits which request data Athena reads
query = f"""
select
    p.key,
    p.input,
    p.mw
from
    {model_id} p
    inner join requests r
        on p.input = r.smiles
where
    r.model = '{model_id}'
    and r.request = '{request_id}'
"""
```

```python
df_out_large = wr.athena.read_sql_query(query, database="precalcs_test")
df_out_large.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   key     10000 non-null  string 
 1   input   10000 non-null  string 
 2   mw      10000 non-null  float32
dtypes: float32(1), string(2)
memory usage: 195.4 KB
```
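To sanity-check the join logic locally without touching AWS, the same query shape can be run against an in-memory SQLite database from the Python standard library. The tables mirror the column names used in this notebook, but the rows are toy values rather than real precalculations. SQLite is only a stand-in for Athena here, and it uses `?` placeholders instead of interpolating values into the query string.

```python
import sqlite3

# In-memory stand-ins for the Athena tables; rows are toy values, not real predictions.
conn = sqlite3.connect(":memory:")
conn.execute("create table eos3b5e (key text, input text, mw real)")
conn.execute("create table requests (smiles text, model text, request text)")
conn.executemany(
    "insert into eos3b5e values (?, ?, ?)",
    [("k1", "CCO", 46.07), ("k2", "c1ccccc1", 78.11), ("k3", "CC(=O)O", 60.05)],
)
conn.executemany(
    "insert into requests values (?, ?, ?)",
    [
        ("CCO", "eos3b5e", "test_request"),
        ("c1ccccc1", "eos3b5e", "test_request"),
        ("CCN", "eos3b5e", "test_request"),       # no precalculation: dropped by the inner join
        ("CC(=O)O", "eos3b5e", "other_request"),  # different request: removed by the WHERE filter
    ],
)

query = """
select p.key, p.input, p.mw
from eos3b5e p
inner join requests r on p.input = r.smiles
where r.model = ? and r.request = ?
order by p.key
"""
rows = conn.execute(query, ("eos3b5e", "test_request")).fetchall()
print(rows)  # [('k1', 'CCO', 46.07), ('k2', 'c1ccccc1', 78.11)]
conn.close()
```

Because it is an inner join, molecules without a precalculated row simply do not appear in the output, so the result can have fewer rows than the request.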


We can now scale to 10,000 inputs and beyond (should the need arise), and we gain auditability within our data lake. Recording every request may seem redundant for now, but storage is cheap. Because each query filters on the `model` and `request` partition columns, Athena only reads the relevant partition of the `requests` table, so the cost of an individual request should not grow as more requests accumulate. Keeping all this information also enables analytics in the future.
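The point about per-request cost can be illustrated with a simplified model of partition pruning, where reading a partition means looking only at the objects under its prefix. This is a sketch under assumptions: `keys_scanned` and `fake_request_keys` are hypothetical helpers, the key layout mimics the one `wr.s3.to_parquet` produced for the `requests` table, and real Athena pruning works from partition metadata in the Glue Data Catalog rather than from a Python filter.

```python
def keys_scanned(all_keys: list[str], model_id: str, request_id: str) -> list[str]:
    """Return the object keys under one model/request partition (simplified pruning)."""
    prefix = f"in/test/model={model_id}/request={request_id}/"
    return [k for k in all_keys if k.startswith(prefix)]


def fake_request_keys(n_requests: int) -> list[str]:
    """Hypothetical layout: one Parquet file per past request, all for model eos3b5e."""
    return [
        f"in/test/model=eos3b5e/request=req{i}/part-0.snappy.parquet"
        for i in range(n_requests)
    ]


# However many requests accumulate, a query for one request reads one partition.
for n in (10, 1_000, 100_000):
    keys = fake_request_keys(n)
    print(n, len(keys_scanned(keys, "eos3b5e", "req3")))
```

This only covers the `requests` side of the join; the cost of reading the prediction table depends on that table's own size and layout.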