In [None]:
from   bisect import bisect_left
from   collections import defaultdict
import hashlib
import json
import re
import time

import boto3
import pandas

from   datalabs.access.aws import AWSClient

# Extractor

In [None]:
# Read the mapping workbook (path is relative to the working directory) into a DataFrame.
sheet = pandas.read_excel("snomed_cpt_map_fsn_descr_20230131.xlsx")

In [None]:
# Show the column names found in the workbook.
sheet.columns.values

# Transformer

In [None]:
# Keep only the columns used downstream. The explicit copy makes `snomed` an
# independent frame, so the assignments below can never touch `sheet`.
snomed = sheet.loc[:, ["Concept Id", "FSN", "Map Category", "CPT Code", "CPT Descriptor", "Map Id"]].copy()

# Blank "Concept Id" / "FSN" cells take the value of the nearest filled row above.
# `.ffill()` replaces the deprecated `fillna(method='ffill')`, and whole-column
# assignment is used because these statements change the column dtype.
# int64 is requested explicitly because plain 'int' is platform-dependent.
snomed["Concept Id"] = snomed["Concept Id"].ffill().astype('int64').astype('str')
snomed["FSN"] = snomed["FSN"].ffill().astype('str')
# Any value still missing was stringified as "nan"; store it as an empty string.
snomed.loc[snomed["FSN"] == "nan", "FSN"] = ""

snomed["CPT Descriptor"] = snomed["CPT Descriptor"].astype('str')
snomed.loc[snomed["CPT Descriptor"] == "nan", "CPT Descriptor"] = ""

snomed

In [None]:
# Partition key: one per SNOMED concept.
snomed["pk"] = "CONCEPT:" + snomed["Concept Id"]

# Sort key: "UNMAPPABLE:<Map Id>" for unmappable rows, "CPT:<CPT Code>" for all others.
# The same mask is used on both sides of each assignment; the original selected the
# right-hand rows with a different condition (`== "CPT"`) and relied on index
# alignment to discard the surplus rows.
unmappable = snomed["Map Category"] == "Unmappable"

snomed.loc[unmappable, "sk"] = "UNMAPPABLE:" + snomed.loc[unmappable, "Map Id"].astype('str')
snomed.loc[~unmappable, "sk"] = "CPT:" + snomed.loc[~unmappable, "CPT Code"].astype(str)
snomed = snomed.drop_duplicates(subset=["pk", "sk"])

# Project to the stored attributes, using the attribute names kept in the table.
mappings = snomed.loc[:, ["sk", "pk", "FSN", "Map Category", "CPT Descriptor"]].rename(
    columns={
        "FSN": "snomed_descriptor",
        "Map Category": "map_category",
        "CPT Descriptor": "cpt_descriptor",
    }
)

mappings

In [None]:
def generate_keywords(mappings):
    """Build the keyword search map from the SNOMED descriptors.

    Each descriptor is stripped of every character that is not a word character
    or a space, lower-cased and split on whitespace. Every resulting keyword is
    associated with the concept (``pk``) it came from.

    Args:
        mappings: DataFrame with at least the ``pk``, ``sk`` and
            ``snomed_descriptor`` columns. It is not modified.

    Returns:
        A Series indexed by ``(keyword, pk)`` whose values are lists of the
        ``sk`` values starting with ``"CPT:"`` for that pair (the list is empty
        when the concept only has non-CPT sort keys).
    """
    # Computed as a standalone Series so the caller's frame does not gain a
    # "keywords" column (the original added one and only dropped it from a local name).
    keywords = (
        mappings.snomed_descriptor
        .apply(lambda descriptor: re.sub(r'[^\w ]+', '', descriptor))
        .str.lower()
        .str.split()
    )

    # One row per (pk, sk, keyword); a keyword repeated inside a descriptor counts once.
    keyword_rows = (
        mappings.loc[:, ["pk", "sk"]]
        .assign(keywords=keywords)
        .explode("keywords")
        .reset_index(drop=True)
        .drop_duplicates()
        .rename(columns=dict(keywords="keyword"))
    )

    return keyword_rows.groupby(["keyword", "pk"]).sk.agg(
        lambda keys: [key for key in keys if key.startswith("CPT:")]
    )

In [None]:
# Build the (keyword, pk) -> CPT sort-key lists used for the KEYWORD items below.
keyword_map = generate_keywords(mappings)
keyword_map

In [None]:
# Spot-check: the CPT sort keys recorded for one (keyword, concept) pair.
keyword_map[("zygomatic", "CONCEPT:257904000")]

In [None]:
# keyword_map[keyword_map.pk == "CONCEPT:609038002"]

In [None]:
# One item per mapping row, keyed by column name. `to_dict("records")` builds the
# same list of dicts as the per-row `iloc` loop, in a single pass.
items = mappings.to_dict("records")

# One KEYWORD item per (keyword, concept) pair, carrying the matching CPT sort keys.
for (keyword, pk), codes in keyword_map.items():
    items.append(dict(pk=pk, sk=f"KEYWORD:{keyword}", codes=codes))

items

In [None]:
# Total item count, then the last ten items (the keyword items are appended last).
print(len(items))
items[-10:]

In [None]:
# mappings.loc[(mappings.pk == "CONCEPT:609038002")]
# Spot-check: the CPT descriptor value(s) stored for one concept / CPT-code pair.
descriptors = mappings.loc[(mappings.pk == "CONCEPT:609038002") & (mappings.sk == "CPT:88160"), ("cpt_descriptor")]
list(descriptors)

# Paginate

In [None]:
def paginate(db, statement, limit=600000):
    """Lazily yield every item returned by a PartiQL statement, across all pages.

    Args:
        db: Object exposing ``execute_statement(**kwargs)`` and returning a dict
            with an ``"Items"`` list and, when more pages remain, a ``"NextToken"``.
        statement: The PartiQL statement to execute.
        limit: Value passed as ``Limit`` on every request. Defaults to the
            value that was previously hard-coded.

    Yields:
        Each element of ``"Items"`` from every page, in the order received.
    """
    request = dict(Statement=statement, Limit=limit)

    while True:
        results = db.execute_statement(**request)

        yield from results["Items"]

        # The absence of a continuation token marks the last page.
        if "NextToken" not in results:
            break

        request["NextToken"] = results["NextToken"]

# Loader

In [None]:
# Persist the generated items so later cells can start from the file.
with open("snomed_cpt_mappings_v2.json", "w") as handle:
    json.dump(items, handle)

In [None]:
# Reload the items from the file written by the previous cell.
with open("snomed_cpt_mappings_v2.json", "r") as handle:
    items = json.load(handle)

In [None]:
# snomed_table = snomed
# snomed_table["md5"] = snomed[["pk", "sk", "snomed_descriptor", "map_category", "cpt_descriptor"]].apply(lambda row: hashlib.md5(str(row.values).encode('utf-8')).hexdigest(), axis=1)
# snomed_table

# for item in items:
#     item.pop("md5")
# Display the current `items` list (everything above is disabled code).
items

## Generate Hashes

In [None]:
def hash_item(item):
    """Refresh the ``md5`` attribute of an item, in place.

    Any existing ``md5`` entry is removed first. Items whose sort key starts
    with ``"UNMAPPABLE:"`` or ``"CPT:"`` then receive the MD5 hex digest of
    their key-sorted JSON form; all other items are left without an ``md5``.

    Args:
        item: Dict with at least an ``"sk"`` string. It is mutated.

    Returns:
        The same dict object that was passed in.
    """
    item.pop("md5", None)

    hashed_prefixes = ("UNMAPPABLE:", "CPT:")

    if item["sk"].startswith(hashed_prefixes):
        # sort_keys makes the digest independent of key insertion order
        canonical = json.dumps(item, sort_keys=True)
        item["md5"] = hashlib.md5(canonical.encode('utf-8')).hexdigest()

    return item

In [None]:
# hashed_items = items

# for item in items:
#     if item["sk"].startswith("UNMAPPABLE:") or item["sk"].startswith("CPT:"):
#         md5 = hashlib.md5(json.dumps(item, sort_keys=True).encode('utf-8')).hexdigest()
#         hashed_items.append(dict(pk=f'{item["pk"]}:{item["sk"]}', sk=f"MD5:{md5}"))
# hash_item() mutates and returns the same dict objects that are held in `items`.
# NOTE(review): `item["sk"][:4]` is truthy for every non-empty sort key, so this
# condition only drops items whose "sk" is the empty string. If a prefix filter
# was intended, the comparison is missing — confirm.
hashed_items = [hash_item(item) for item in items if item["sk"][:4]]

hashed_items

In [None]:
# Item count, then the last 120000 items.
# NOTE(review): this renders a very large output; a smaller slice may be enough.
print(len(hashed_items))
hashed_items[-120000:]

In [None]:
# Swap after reload
# with AWSClient("s3") as db:
#     table = db.resource.Table('CPT-API-snomed-sbx')
#     dir(table)
# `boto3` and `bisect_left` are imported in the notebook's first cell, so this
# cell no longer carries its own imports.
table = boto3.resource("dynamodb").Table('CPT-API-snomed-sbx')

In [None]:
# Serialise every item once in a canonical (key-sorted) form, then derive both
# the sorted list and the set of distinct strings from that single pass.
serialized_items = [json.dumps(item, sort_keys=True) for item in hashed_items]
json_items = sorted(serialized_items)
unique_items = set(serialized_items)

duplicate_json_items = []

for candidate in unique_items:
    # bisect_left gives the first position of `candidate` in the sorted list;
    # an equal neighbour right after it means the string occurs more than once.
    first = bisect_left(json_items, candidate)
    has_successor = first + 1 < len(json_items)

    if has_successor and json_items[first + 1] == candidate:
        duplicate_json_items.append(candidate)

duplicate_json_items

## Load Sandbox

In [None]:
start_time = time.perf_counter()

sandbox_table = boto3.resource("dynamodb").Table('CPT-API-snomed-sbx-v2')

# Write every hashed item through the table's batch writer.
with sandbox_table.batch_writer() as batch:
    for item in hashed_items:
        batch.put_item(Item=item)

# Elapsed wall-clock seconds (shown as the cell result).
time.perf_counter() - start_time

## Load Dev

In [None]:
start_time = time.perf_counter()

# NOTE(review): the other cells enter `AWSClient("dynamodb")` itself as the
# context manager; here its `.resource` attribute is entered instead. Confirm
# that `.resource` is usable before the client is entered and that it supports
# the `with` protocol.
with AWSClient("dynamodb").resource as dynamodb:
    table = dynamodb.Table('CPT-API-snomed-dev')

    # Write every hashed item through the table's batch writer.
    with table.batch_writer() as batch:
        for item in hashed_items:
            batch.put_item(Item=item)

# Elapsed wall-clock seconds (shown as the cell result).
time.perf_counter() - start_time

## Get Hashes

In [None]:
start_time = time.perf_counter()

with AWSClient("dynamodb") as dynamodb:
    # Select pk, sk and md5 for every item whose sort key starts with CPT: or UNMAPPABLE:.
    results = paginate(dynamodb, "SELECT pk, sk, md5 FROM \"CPT-API-snomed-sbx-v2\" WHERE begins_with(\"sk\", 'CPT:') OR begins_with(\"sk\", 'UNMAPPABLE:')")

    # paginate() is a generator, so it is drained here, inside the `with` block.
    results = list(results)

print(time.perf_counter() - start_time)
results

In [None]:
# Pivot the typed records ({"attr": {"S": "value"}}) into one list of plain
# strings per attribute name, then build the frame column-wise.
current_hashes_columns = defaultdict(list)

for record in results:
    for attribute, typed_value in record.items():
        current_hashes_columns[attribute].append(typed_value["S"])

current_hashes = pandas.DataFrame(current_hashes_columns)
current_hashes

In [None]:
# Number of md5 values collected from the table.
len(current_hashes_columns["md5"])

In [None]:
# Simulate an incoming data set by starting from the current hashes and then
# applying a few updates, additions and deletions.
incoming_hashes = current_hashes.copy()

# Updated Records
# Assign through a single positional indexer on the frame. The original
# `incoming_hashes.iloc[2].md5 = ...` set the attribute on a temporary row object,
# which is not guaranteed to write back to the frame.
md5_position = incoming_hashes.columns.get_loc("md5")
incoming_hashes.iloc[2, md5_position] = "e17f3c5679f89a98bfb0d607e1f9d30f"
incoming_hashes.iloc[3, md5_position] = "8ba441daa6315eb9de240125e2333b9d"

# New Records
# ignore_index keeps the row labels unique after appending the new rows.
incoming_hashes = pandas.concat(
    (
        incoming_hashes,
        pandas.DataFrame(
            dict(
                pk=['CONCEPT:123456789', 'CONCEPT:987654321'],
                sk=['CPT:12345', 'CPT:54321'],
                md5=["1234567890abcdefghijklmnopqrstuv", "abcdefghijklmnopqurstvwxyz123456"]
            )
        )
    ),
    ignore_index=True
)


# Deleted Records
incoming_hashes = incoming_hashes[~incoming_hashes.md5.isin(("133a480aaf6b8d4de199db48813dc0e4", "4e4842764faf66388936416f61c9255b"))]

incoming_hashes

In [None]:
# Spot-check: the incoming row for one concept / CPT-code pair.
incoming_hashes[(incoming_hashes.pk == 'CONCEPT:80285004') & (incoming_hashes.sk == 'CPT:39540')]

In [None]:
# Rows of the current set carrying the two hashes removed from the incoming set above.
current_hashes[current_hashes.md5.isin(("133a480aaf6b8d4de199db48813dc0e4", "4e4842764faf66388936416f61c9255b"))]

In [None]:
# Same two hashes looked up in the incoming set (they were filtered out of it above).
incoming_hashes[incoming_hashes.md5.isin(("133a480aaf6b8d4de199db48813dc0e4", "4e4842764faf66388936416f61c9255b"))]

In [None]:
start_time = time.perf_counter()

# A record is identified by its concatenated pk + sk.
current_keys = current_hashes.pk + current_hashes.sk
incoming_keys = incoming_hashes.pk + incoming_hashes.sk

# Current rows whose hash is absent from the incoming set, split by whether
# their key is still present in it.
current_hash_missing = ~current_hashes.md5.isin(incoming_hashes.md5)
current_key_present = current_keys.isin(incoming_keys)

deleted_hashes = current_hashes[current_hash_missing & ~current_key_present]

updated_hashes = current_hashes[current_hash_missing & current_key_present]

# Incoming rows for which neither the hash nor the key exists in the current set.
incoming_hash_missing = ~incoming_hashes.md5.isin(current_hashes.md5)
incoming_key_missing = ~incoming_keys.isin(current_keys)

new_hashes = incoming_hashes[incoming_hash_missing & incoming_key_missing]

print(time.perf_counter() - start_time)

In [None]:
# Current rows whose hash and key are both absent from the incoming set.
deleted_hashes

In [None]:
# Incoming rows whose hash and key are both absent from the current set.
new_hashes

In [None]:
# Current rows whose key is still incoming but whose hash is not (rows come from `current_hashes`).
updated_hashes

## Remove Keywords for Updated or Deleted Hashes

Example keyword item:

`{'pk': 'CONCEPT:29137007', 'sk': 'KEYWORD:procedure', 'codes': ['CPT:67107', 'CPT:67108', 'CPT:67299']}`

In [None]:
# Display the (keyword, pk) -> CPT sort-key lists built earlier.
keyword_map

In [None]:
# Keys (pk, sk) of every deleted or updated record, deleted rows first.
keyword_delete_keys = pandas.concat((deleted_hashes, updated_hashes))[["pk", "sk"]]
keyword_delete_keys

In [None]:
# Print each affected key pair as "pk, sk".
for keys in keyword_delete_keys.itertuples(index=False):
    print(f"{keys.pk}, {keys.sk}")

In [None]:
# TODO: implement `filter_updated`. This cell previously held only the unfinished
# header `def filter_updated`, which is a SyntaxError and stops "Run All".

In [None]:
start_time = time.perf_counter()
# One entry per row of keyword_delete_keys; each entry is the list of KEYWORD items found.
deleted_keywords = []

with AWSClient("dynamodb") as dynamodb:
    for index, keys in keyword_delete_keys.iterrows():
        # The query filters on pk only, so rows sharing a pk repeat the same query.
        results = paginate(dynamodb, f"SELECT * FROM \"CPT-API-snomed-sbx-v2\" WHERE pk = '{keys.pk}' AND begins_with(\"sk\", 'KEYWORD:')")

        # Drain the generator here, inside the `with` block.
        deleted_keywords.append(list(results))

print(time.perf_counter() - start_time)
deleted_keywords

## /snomed/map/cpt/{concept}

In [None]:
def generate_map(items):
    """Flatten typed mapping items into plain dicts for a single concept.

    NOTE: a later cell defines another function with this same name; once that
    cell has run, it replaces this one.

    Args:
        items: Iterable of dicts of the form ``{"attr": {"S": "value"}}``. Each
            must contain ``"pk"`` and ``"sk"``, and every attribute must carry
            an ``"S"`` value.

    Returns:
        A list with one dict per item: every attribute unwrapped to its string,
        ``"pk"`` removed and ``"sk"`` replaced by ``"cpt_code"`` (the sort key
        with ``"CPT:"`` removed).
    """
    def flatten(item):
        attributes = {name: typed['S'] for name, typed in item.items()}

        del attributes["pk"]
        sort_key = attributes.pop("sk")
        attributes["cpt_code"] = sort_key.replace("CPT:", "")

        return attributes

    return [flatten(item) for item in items]

In [None]:
# SNOMED concept id used for this example lookup.
concept = "306683007"

with AWSClient("dynamodb") as db:
    # Single execute_statement call: a "NextToken" in the response is not followed.
    results = db.execute_statement(
        Statement=f"SELECT * FROM \"CPT-API-snomed-sbx\" WHERE pk = 'CONCEPT:{concept}' AND begins_with(\"sk\", 'CPT:')"
    )

print(generate_map(results["Items"]))


## /snomed/maps/cpt

In [None]:
def get_mapping_references_for_keyword(keyword, db):
    """Return every search-index item stored under ``KEYWORD:<keyword>``.

    The keyword is sent as a statement parameter instead of being spliced into
    the PartiQL text, so quotes inside it cannot alter the query. All result
    pages are followed through ``NextToken``.

    Args:
        keyword: Keyword to look up (without the ``KEYWORD:`` prefix).
        db: Object exposing ``execute_statement(**kwargs)``.

    Returns:
        A list with the ``"Items"`` of every page, in the order received.
    """
    request = dict(
        Statement="SELECT * FROM \"CPT-API-snomed-sbx\".\"SearchIndex\" WHERE sk = ?",
        Parameters=[{"S": f"KEYWORD:{keyword}"}],
    )
    references = []

    while True:
        page = db.execute_statement(**request)

        references.extend(page["Items"])

        # The absence of a continuation token marks the last page.
        if "NextToken" not in page:
            return references

        request["NextToken"] = page["NextToken"]

In [None]:
def get_mappings_from_references(keyword_items, db):
    """Resolve search-index items to mapping items, grouped by concept.

    Args:
        keyword_items: Iterable of typed items whose ``pk`` string is the
            reference passed to ``get_mapping_from_reference``.
        db: Client handed through to ``get_mapping_from_reference``.

    Returns:
        A ``defaultdict(list)`` mapping each resolved item's ``pk`` string to
        the list of resolved items, in input order.
    """
    grouped = defaultdict(list)

    for reference in keyword_items:
        resolved = get_mapping_from_reference(reference['pk']['S'], db)
        concept_key = resolved["pk"]["S"]

        grouped[concept_key].append(resolved)

    return grouped

In [None]:
def get_all_mappings(db):
    """Fetch every CPT mapping item and group the items by concept.

    All result pages are followed through ``NextToken``, so the result is not
    cut off after the first page.

    Args:
        db: Object exposing ``execute_statement(**kwargs)``.

    Returns:
        A ``defaultdict(list)`` mapping each item's ``pk`` string to the list of
        typed items for that concept, in the order received.
    """
    request = dict(
        Statement="SELECT * FROM \"CPT-API-snomed-sbx\" WHERE begins_with(\"sk\", 'CPT:')"
    )
    mappings = defaultdict(list)

    while True:
        page = db.execute_statement(**request)

        for item in page["Items"]:
            mappings[item["pk"]["S"]].append(item)

        # The absence of a continuation token marks the last page.
        if "NextToken" not in page:
            break

        request["NextToken"] = page["NextToken"]

    return mappings

In [None]:
def generate_map(mapping_items):
    """Build the map document for one concept from its typed mapping items.

    NOTE: an earlier cell defines another function with this same name; this
    definition replaces it once this cell has run.

    Only the attributes that are used are read (``pk``, ``sk``,
    ``snomed_descriptor``, ``cpt_descriptor``, ``map_category``), so other
    attributes on an item, including non-string ones, are ignored.

    Args:
        mapping_items: Iterable of dicts of the form ``{"attr": {"S": "value"}}``.

    Returns:
        A dict with ``concept`` and ``descriptor`` taken from the first item
        (both ``None`` when there are no items) and ``mappings``, a list of
        ``{"code", "descriptor", "category"}`` dicts, one per item.
    """
    concept = None
    # Initialised up front so that empty input returns None instead of raising
    # UnboundLocalError at the return statement.
    snomed_descriptor = None
    mappings = []

    for item in mapping_items:
        if not concept:
            concept = item["pk"]["S"]
            snomed_descriptor = item["snomed_descriptor"]["S"]

        mappings.append(
            dict(
                code=item["sk"]["S"].replace("CPT:", ""),
                descriptor=item["cpt_descriptor"]["S"],
                category=item["map_category"]["S"]
            )
        )

    return dict(
        concept=concept,
        descriptor=snomed_descriptor,
        mappings=mappings
    )

In [None]:
def get_mapping_from_reference(pksk, db):
    """Fetch the mapping item that a search-index reference points to.

    Args:
        pksk: Reference of the form ``"CONCEPT:<id>:<sk>"``, for example
            ``"CONCEPT:123:CPT:456"``. The first two colon-separated fields form
            the partition key and everything after them is the sort key.
        db: Object exposing ``execute_statement(**kwargs)``.

    Returns:
        The first item returned for that partition key / sort key pair.

    Raises:
        IndexError: If ``pksk`` has fewer than three colon-separated fields, or
            if the query returns no items.
    """
    # A single split keeps pk and sk consistent with each other; the original
    # derived them from two different splits, which disagree whenever the
    # reference does not have exactly four fields.
    fields = pksk.split(':', 2)
    sk = fields[2]                # CPT:...
    pk = ':'.join(fields[:2])     # CONCEPT:...

    # Values travel as statement parameters rather than inside the PartiQL text.
    results = db.execute_statement(
        Statement="SELECT * FROM \"CPT-API-snomed-sbx\" WHERE pk = ? AND sk = ?",
        Parameters=[{"S": pk}, {"S": sk}]
    )

    return results["Items"][0]

## Use Case 1: Get Mappings by Keyword

In [None]:
# TODO: handle response pagination

keyword = "discharge"

with AWSClient("dynamodb") as db:
    # Step 1: look up the search-index entries stored for the keyword.
    mapping_references = get_mapping_references_for_keyword(keyword, db)
    
    # Step 2: resolve each entry to its mapping item, grouped by concept.
    mapping_set = get_mappings_from_references(mapping_references, db)

# One map document per concept.
maps = [generate_map(mappings) for mappings in mapping_set.values()]
maps

In [None]:
# Inspect the raw search-index items returned for the same keyword.
with AWSClient("dynamodb") as db:
    results = get_mapping_references_for_keyword("discharge", db)

results

## Use Case 2: Get All Mappings (Default)

In [None]:
# TODO: handle response pagination

with AWSClient("dynamodb") as db:
    # Fetch the CPT mapping items grouped by concept.
    mapping_set = get_all_mappings(db)

# One map document per concept.
maps = [generate_map(mappings) for mappings in mapping_set.values()]
maps

## Pagination