4 changes: 4 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/.gitignore
@@ -0,0 +1,4 @@
dist/
build/
write-through-cache.egg-info
write-through-cache/__pycache__/
40 changes: 40 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/README.md
@@ -0,0 +1,40 @@
## Using Amazon ElastiCache with Redis OSS compatibility as a cache for Amazon Keyspaces

This sample project shows how to use Amazon ElastiCache (Redis OSS) as a write-through cache for book awards data stored in Amazon Keyspaces. The sample uses the DataStax Python Driver for Apache Cassandra to connect to Amazon Keyspaces and the redis-py-cluster client to connect to Amazon ElastiCache. SigV4 authentication with short-term credentials is used for the Amazon Keyspaces connection.

### Prerequisites
You should have Python and pip installed. This sample works with Python 3.x.

You should also set up Amazon Keyspaces with an IAM user. See [Accessing Amazon Keyspaces](https://docs.aws.amazon.com/keyspaces/latest/devguide/accessing.html) for more.

You need the Starfield digital certificate 'sf-class2-root.crt' downloaded locally, for example to your home directory; provide the certificate path on line 13 of write_through_caching_sample/ks_redis.py. A copy of the certificate is included at write_through_caching_sample/resources.

You should have the cluster configuration endpoint for your Amazon ElastiCache (Redis OSS) cluster, which you will provide on line 26 of write_through_caching_sample/ks_redis.py.

You should have the keyspace name and table name for your Amazon Keyspaces resource, which you will provide on lines 36 and 37 of write_through_caching_sample/ks_redis.py respectively.

Sample data can be found at write_through_caching_sample/resources/ and can be loaded using [CQLSH](https://docs.aws.amazon.com/keyspaces/latest/devguide/bulk-upload.html) or [DSBulk](https://docs.aws.amazon.com/keyspaces/latest/devguide/dsbulk-upload.html).
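The code in write_through_caching_sample/ks_redis.py expects a keyspace named catalog containing a book_awards table. The table DDL is not included in this sample, so the statement below is only a sketch of one possible schema: the column types are inferred from the sample CSV, and the primary key is inferred from the queries in ks_redis.py (equality on award, year and category, range on rank). Create the catalog keyspace first if it does not exist, adjust the schema as needed, and run it with CQLSH before loading the data.

```
CREATE TABLE catalog.book_awards (
    award text,
    year int,
    category text,
    rank int,
    author text,
    book_title text,
    publisher text,
    PRIMARY KEY ((award), year, category, rank)
);
```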


### Running the sample

This sample uses Boto3, which resolves credentials from environment variables or from the config or credentials file on your client. See the [Boto3 Credentials Guide](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for how to set this up.

You can quickly get the sample running by explicitly setting the following environment variables (example export commands are shown after the list):

- AWS_REGION (for example 'us-east-1')
- AWS_ACCESS_KEY_ID (for example 'AKIAIOSFODNN7EXAMPLE')
- AWS_SECRET_ACCESS_KEY (for example 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')
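For example, in a Linux or macOS shell you can export them before running the sample. The values below are the placeholder examples from the list above, not real credentials:

```
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```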

### Install the dependencies

From the project directory...
```
pip install -r requirements.txt
```

### Testing
From this project directory...
```
python3 write_through_caching_sample/test_functions.py
```
Note that each test case sleeps for 300 seconds to let its cached entries expire, so a full run takes over ten minutes.
4 changes: 4 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/requirements.txt
@@ -0,0 +1,4 @@
cassandra-driver
cassandra-sigv4
redis-py-cluster
boto3
15 changes: 15 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/setup.py
@@ -0,0 +1,15 @@
from setuptools import setup, find_packages

setup(
name='write_through_caching_sample',
version='1.0',
include_package_data=True,
    packages=['write_through_caching_sample'],
install_requires=[
'cassandra-driver',
'cassandra-sigv4',
'redis-py-cluster',
'boto3'
],

)
134 changes: 134 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/write_through_caching_sample/ks_redis.py
@@ -0,0 +1,134 @@
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
from cassandra_sigv4.auth import SigV4AuthProvider
from cassandra.query import SimpleStatement
from rediscluster import RedisCluster
import logging
import time
import boto3

#Keyspaces connection
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('/home/ec2-user/sf-class2-root.crt')
ssl_context.verify_mode = CERT_REQUIRED
boto_session = boto3.Session()
auth_provider = SigV4AuthProvider(boto_session)
cluster = Cluster(['cassandra.us-east-1.amazonaws.com'],
ssl_context=ssl_context,
auth_provider=auth_provider,
port=9142)
session = cluster.connect()


#Amazon Elasticache connection
logging.basicConfig(level=logging.ERROR)
redis = RedisCluster(startup_nodes=[{"host": "YOUR_CLUSTER.clustercfg.use1.cache.amazonaws.com",
"port": "6379"}],
decode_responses=True,
skip_full_coverage_check=True)

if redis.ping():
logging.info("Connected to Redis")


#Global variables
keyspace_name="catalog"
table_name="book_awards"

#Write a row
def write_award(book_award):
write_award_to_keyspaces(book_award)
write_award_to_cache(book_award)

#write row to the Keyspaces table
def write_award_to_keyspaces(book_award):
stmt=SimpleStatement(f"INSERT INTO {keyspace_name}.{table_name} (award, year, category, rank, author, book_title, publisher) VALUES (%s, %s, %s, %s, %s, %s, %s)",
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
session.execute(stmt,(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"],
book_award["author"],
book_award["book_title"],
book_award["publisher"]))

#write row to the cache
def write_award_to_cache(book_award):
#construct Redis key name
key_name=book_award["award"]+str(book_award["year"])+book_award["category"]+str(book_award["rank"])

#write to cache using Redis set, ex=300 sets TTL for this row to 300 seconds
redis.set(key_name, str(book_award), ex=300)


#Delete a row
def delete_award(award, year, category, rank):
delete_award_from_keyspaces(award, year, category, rank)
delete_award_from_cache(award, year, category, rank)

#delete row from Keyspaces table
def delete_award_from_keyspaces(award, year, category, rank):
stmt = SimpleStatement(f"DELETE FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;",
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
session.execute(stmt, (award, int(year), category, int(rank)))

#delete row from cache
def delete_award_from_cache(award, year, category, rank):
#construct Redis key name
key_name=award+str(year)+category+str(rank)

#delete the row from cache if it exists
if redis.get(key_name) is not None:
book_award=redis.delete(key_name)

#Read a row
def get_award(award, year, category, rank):
#construct Redis key name from parameters
key_name=award+str(year)+category+str(rank)
book_award=redis.get(key_name)

#if row not in cache, fetch it from Keyspaces table
if not book_award:
print("Fetched from Cache: ", book_award)
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;")
res=session.execute(stmt, (award, int(year), category, int(rank)))
if not res.current_rows:
print("Fetched from Database: None")
return None
else:
#lazy-load into cache
book_award=redis.set(key_name, str(res.current_rows), ex=300)
print("Fetched from Database: ")
return res.current_rows
else:
print("Fetched from Cache: ")
return book_award


#Read one or more rows based on parameters
def get_awards(parameters):
#construct key name from parameters
key_name=""
for p in parameters:
key_name=key_name+str(p)
book_awards=redis.lrange(key_name, 0, -1)

#if result set not in cache, fetch it from Keyspaces table
if not book_awards:
print("Fetched from Cache: ", book_awards)
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank<=%s;")
res=session.execute(stmt, parameters)
if not res.current_rows:
print("Fetched from Database: None")
return None
else:
#lazy-load into cache
redis.rpush(key_name, str(res.current_rows))
redis.expire(key_name, 300)
print("Fetched from Database: ")
return res.current_rows
else:
print("Fetched from Cache: ")
return book_awards

@@ -0,0 +1,17 @@
"award","year","book_title","author","rank","publisher","category"
"Must Read",2021,"The mystery of the 7th dimension","Polly Gon",1,"PublishGo","Sci-Fi"
"Must Read",2021,"Time travellers guide","Kai K",2,"Publish123","Sci-Fi"
"Must Read",2021,"Key to the teleporter","Mick Key",3,"Penguins","Sci-Fi"
"Must Read",2021,"Victors Mars","Anonymous",4,"PinkPublish","Sci-Fi"
"Must Read1",2021,"Tomorrow is here","John Doe",2,"Ostrich books1","Sci-Fi"
"Must Read1",2021,"Tomorrow is here","John Doe",3,"Ostrich books1","Sci-Fi"
"International Best Seller",2023,"Adventures of Ji","Ji Jill",1,"PublishGo","Young Adult"
"International Best Seller",2023,"The Chronicles of Myrtlini","Jane Doe",2,"Penguins","Young Adult"
"International Best Seller",2023,"Dreams","Gigi",3,"WellPublishers","Young Adult"
"International Best Seller",2023,"Vision of Utopia","Anonymous",4,"ABCPublish","Young Adult"
"International Best Seller",2023,"The sailor said..","Pyter P",5,"PublishGo","Young Adult"
"Must read",2056,"Postcard from Andromeda","James Doe",1,"Publish123","sci-fi"
"Must read",2056,"Postcard from Almeda","James Doug",2,"Publish123","sci-fi"
"Must read",2056,"Postcard from Mars","James Doug",5,"Publish123","sci-fi"
"Golden Read",2056,"Postcard from Andromeda","James Doe",1,"Publish123","sci-fi"
"Golden Read",2056,"Tomorrow is here","John Doe",2,"Ostrich books","sci-fi"
24 changes: 24 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/write_through_caching_sample/resources/sf-class2-root.crt
@@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIIEDzCCAvegAwIBAgIBADANBgkqhkiG9w0BAQUFADBoMQswCQYDVQQGEwJVUzEl
MCMGA1UEChMcU3RhcmZpZWxkIFRlY2hub2xvZ2llcywgSW5jLjEyMDAGA1UECxMp
U3RhcmZpZWxkIENsYXNzIDIgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMDQw
NjI5MTczOTE2WhcNMzQwNjI5MTczOTE2WjBoMQswCQYDVQQGEwJVUzElMCMGA1UE
ChMcU3RhcmZpZWxkIFRlY2hub2xvZ2llcywgSW5jLjEyMDAGA1UECxMpU3RhcmZp
ZWxkIENsYXNzIDIgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwggEgMA0GCSqGSIb3
DQEBAQUAA4IBDQAwggEIAoIBAQC3Msj+6XGmBIWtDBFk385N78gDGIc/oav7PKaf
8MOh2tTYbitTkPskpD6E8J7oX+zlJ0T1KKY/e97gKvDIr1MvnsoFAZMej2YcOadN
+lq2cwQlZut3f+dZxkqZJRRU6ybH838Z1TBwj6+wRir/resp7defqgSHo9T5iaU0
X9tDkYI22WY8sbi5gv2cOj4QyDvvBmVmepsZGD3/cVE8MC5fvj13c7JdBmzDI1aa
K4UmkhynArPkPw2vCHmCuDY96pzTNbO8acr1zJ3o/WSNF4Azbl5KXZnJHoe0nRrA
1W4TNSNe35tfPe/W93bC6j67eA0cQmdrBNj41tpvi/JEoAGrAgEDo4HFMIHCMB0G
A1UdDgQWBBS/X7fRzt0fhvRbVazc1xDCDqmI5zCBkgYDVR0jBIGKMIGHgBS/X7fR
zt0fhvRbVazc1xDCDqmI56FspGowaDELMAkGA1UEBhMCVVMxJTAjBgNVBAoTHFN0
YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xMjAwBgNVBAsTKVN0YXJmaWVsZCBD
bGFzcyAyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5ggEAMAwGA1UdEwQFMAMBAf8w
DQYJKoZIhvcNAQEFBQADggEBAAWdP4id0ckaVaGsafPzWdqbAYcaT1epoXkJKtv3
L7IezMdeatiDh6GX70k1PncGQVhiv45YuApnP+yz3SFmH8lU+nLMPUxA2IGvd56D
eruix/U0F47ZEUD0/CwqTRV/p2JdLiXTAAsgGh1o+Re49L2L7ShZ3U0WixeDyLJl
xy16paq8U4Zt3VekyvggQQto8PT7dL5WXXp59fkdheMtlb71cZBDzI0fmgAKhynp
VSJYACPq4xJDKVtHCN2MQWplBqjlIapBtJUhlbl90TSrE9atvNziPTnNvT51cKEY
WQPJIrSPnNVeKtelttQKbfi3QBFGmh95DmK/D5fs4C8fF5Q=
-----END CERTIFICATE-----
101 changes: 101 additions & 0 deletions python/datastax-v3/elasticache/write-through-cache/write_through_caching_sample/test_functions.py
@@ -0,0 +1,101 @@
from ks_redis import *

def test_case_1():
book_award={"award": "Golden Read",
"year": 2021,
"category": "sci-fi",
"rank": 2,
"author": "John Doe",
"book_title": "Tomorrow is here",
"publisher": "Ostrich books"}

#insert an award to the DB and cache
write_award(book_award)
print("Test Case 1:")
print("New book award inserted.")

#cache hit - get award from cache
print("\n")
print("Verify cache hit:")
res=get_award(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"])
print(res)

#let the cache entry expire
print("\n")
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
time.sleep(300)

#cache miss - get award from DB and lazy load to cache
print("\n")
print("Entry expired in cache, award expected to be fetched from DB:")
res=get_award(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"])
print(res)

#cache hit - get award from cache
print("\n")
print("Verify that award is lazy loaded into cache:")
res=get_award(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"])
print(res)

#delete the award from cache and DB
print("\n")
print("Deleting book award.")
delete_award(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"])

#confirm the award was deleted from cache and DB
print("\n")
print("Verify that the award was deleted from cache and DB:")
res=get_award(book_award["award"],
book_award["year"],
book_award["category"],
book_award["rank"])
if res:
print(res)


def test_case_2():
print("Test Case 2:")
print("Get top 3 Must Read book awards for year 2021 in the Sci-Fi category")
print("\n")
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
print(res)

#cache-hit - get awards from cache
print("\n")
print("Verify cache hit on subsequent query with same parameters:")
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
print(res)

#let the cache entry expire
print("\n")
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
time.sleep(300)

#cache miss - get award from DB and lazy load to cache
print("\n")
print("Entry expired in cache, awards expected to be fetched from DB.")
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
print(res)

#cache hit - get award from cache
print("\n")
print("Verify that awards are lazy loaded into cache:")
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
print(res)

if __name__ == "__main__":
    test_case_1()
    print(" ")
    test_case_2()