diff --git a/hasher-matcher-actioner/Makefile b/hasher-matcher-actioner/Makefile index d0aec025f..ca148ea92 100644 --- a/hasher-matcher-actioner/Makefile +++ b/hasher-matcher-actioner/Makefile @@ -23,7 +23,7 @@ upload_docker: DOCKER_TAG=${DOCKER_TAG} ./scripts/update_lambda_docker_image.sh terraform/terraform.tfvars: - echo 'hma_lambda_docker_uri = "${DOCKER_URI}"' > terraform/terraform.tfvars + echo 'hma_lambda_docker_uri = "${DOCKER_URI}:${DOCKER_TAG}"' > terraform/terraform.tfvars terraform/backend.tf: ./scripts/write_backend_config.sh > terraform/backend.tf diff --git a/hasher-matcher-actioner/hmalib/dto.py b/hasher-matcher-actioner/hmalib/dto.py index 1e8b6c1f2..bf57df576 100644 --- a/hasher-matcher-actioner/hmalib/dto.py +++ b/hasher-matcher-actioner/hmalib/dto.py @@ -1,39 +1,104 @@ +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved + import datetime import typing as t +from dataclasses import dataclass +from mypy_boto3_dynamodb.service_resource import Table """ -Not enforceable because named tuples can't have multiple inheritance, but all -DTO classes in this module should implement methods `to_dynamodb_item(self)` and +Data transfer object classes to be used with dynamodbstore +Classes in this module should implement methods `to_dynamodb_item(self)` and `to_sqs_message(self)` """ -class PDQHashRecord(t.NamedTuple): +class DynamoDBItem(): + + def write_to_table(self, table: Table): + table.put_item(Item=self.to_dynamodb_item()) + + def to_dynamodb_item(self) -> t.Dict: + raise NotImplementedError + + +@dataclass +class PDQRecordBase(DynamoDBItem): """ - Successful execution at the hasher produces this record. + Abstract Base Record for PDQ related items. 
""" + SIGNAL_TYPE = "pdq" + content_key: str content_hash: str - quality: int timestamp: datetime.datetime # ISO-8601 formatted @staticmethod - def get_dynamodb_pk(key: str): + def get_dynamodb_content_key(key: str): return f"c#{key}" + @staticmethod + def get_dynamodb_type_key(key: str): + return f"type#{key}" + + def to_dynamodb_item(self) -> dict: + raise NotImplementedError + + def to_sqs_message(self) -> dict: + raise NotImplementedError + + +@dataclass +class PipelinePDQHashRecord(PDQRecordBase): + """ + Successful execution at the hasher produces this record. + """ + + quality: int + def to_dynamodb_item(self) -> dict: return { - "PK": PDQHashRecord.get_dynamodb_pk(self.content_key), - "SK": "type#pdq", + "PK": self.get_dynamodb_content_key(self.content_key), + "SK": self.get_dynamodb_type_key(self.SIGNAL_TYPE), "ContentHash": self.content_hash, "Quality": self.quality, "Timestamp": self.timestamp.isoformat(), - "HashType": "pdq", + "HashType": self.SIGNAL_TYPE, } def to_sqs_message(self) -> dict: return { "hash": self.content_hash, - "type": "pdq", - "key": self.content_key + "type": self.SIGNAL_TYPE, + "key": self.content_key, } + + +@dataclass +class PDQMatchRecord(PDQRecordBase): + """ + Successful execution at the matcher produces this record. 
+ """ + + te_id: int + te_hash: str + + @staticmethod + def get_dynamodb_te_key(key: str): + return f"te#{key}" + + def to_dynamodb_item(self) -> dict: + return { + "PK": self.get_dynamodb_content_key(self.content_key), + "SK": self.get_dynamodb_te_key(self.te_id), + "ContentHash": self.content_hash, + "Timestamp": self.timestamp.isoformat(), + "TEHash": self.te_hash, + "GSI1-PK": self.get_dynamodb_te_key(self.te_id), + "GSI1-SK": self.get_dynamodb_content_key(self.content_key), + "HashType": self.SIGNAL_TYPE, + "GSI2-PK": self.get_dynamodb_type_key(self.SIGNAL_TYPE), + } + + def to_sqs_message(self) -> dict: + # TODO add method for when matches are added to a sqs + raise NotImplementedError diff --git a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_hasher.py b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_hasher.py index df4c3f543..dbdd728f4 100644 --- a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_hasher.py +++ b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_hasher.py @@ -12,9 +12,8 @@ from mypy_boto3_dynamodb import DynamoDBServiceResource from threatexchange.hashing import pdq_hasher -from hmalib.dto import PDQHashRecord -from hmalib.storage.hashstore import HashStore from hmalib import metrics +from hmalib.dto import PipelinePDQHashRecord logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -46,7 +45,6 @@ def lambda_handler(event, context): """ records_table = dynamodb.Table(DYNAMODB_TABLE) - store = HashStore(records_table) for sqs_record in event["Records"]: sns_notification = json.loads(sqs_record["body"]) @@ -68,18 +66,18 @@ def lambda_handler(event, context): logger.info("generating pdq hash for %s/%s", bucket_name, key) with tempfile.NamedTemporaryFile() as tmp_file: path = Path(tmp_file.name) + with metrics.timer(metrics.names.pdq_hasher_lambda.download_file): s3_client.download_fileobj(bucket_name, key, tmp_file) with metrics.timer(metrics.names.pdq_hasher_lambda.hash): pdq_hash, quality = pdq_hasher.pdq_from_file(path) - hash_record = 
PDQHashRecord( - key, pdq_hash, quality, datetime.datetime.now() + hash_record = PipelinePDQHashRecord( + key, pdq_hash, datetime.datetime.now(), quality ) - # Add to dynamodb hash store - store.add_hash(hash_record) + hash_record.write_to_table(records_table) # Publish to SQS queue sqs_client.send_message( diff --git a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_indexer.py b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_indexer.py index e84e66e2e..5a3f811c6 100644 --- a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_indexer.py +++ b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_indexer.py @@ -9,7 +9,7 @@ from urllib.parse import unquote_plus import boto3 -from threatexchange.hashing import PDQMultiHashIndex +from threatexchange.signal_type.pdq_index import PDQIndex from hmalib import metrics @@ -87,13 +87,23 @@ def lambda_handler(event, context): codecs.getreader("utf-8")(pdq_data_file["Body"]), fieldnames=PDQ_DATA_FILE_COLUMNS, ) - pdq_data = [(row["hash"], int(row["id"])) for row in pdq_data_reader] + pdq_data = [ + ( + row["hash"], + # Also add hash to metadata for easy look up on match + { + "id": int(row["id"]), + "hash": row["hash"], + }, + ) + for row in pdq_data_reader + ] + with metrics.timer(metrics.names.pdq_indexer_lambda.build_index): logger.info("Creating PDQ Hash Index") - hashes = [pdq[0] for pdq in pdq_data] - ids = [pdq[1] for pdq in pdq_data] - index = PDQMultiHashIndex.create(hashes, custom_ids=ids) + + index = PDQIndex.build(pdq_data) logger.info("Putting index in S3") index_bytes = pickle.dumps(index) diff --git a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_matcher.py b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_matcher.py index 7620ac8fa..f22440547 100644 --- a/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_matcher.py +++ b/hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_matcher.py @@ -7,7 +7,9 @@ import boto3 import datetime -from threatexchange.hashing.pdq_faiss_matcher import PDQFlatHashIndex, PDQMultiHashIndex +from 
threatexchange.signal_type.pdq_index import PDQIndex + +from hmalib.dto import PDQMatchRecord from hmalib import metrics @@ -28,23 +30,6 @@ DYNAMODB_TABLE = os.environ["DYNAMODB_TABLE"] -def save_match_to_datastore( - table, content_key, te_id, content_hash, current_datetime, te_hash -): - item = { - "PK": "c#{}".format(content_key), - "SK": "te#{}".format(te_id), - "ContentHash": content_hash, - "Timestamp": current_datetime.isoformat(), - "TEHash": te_hash, - "GSI1-PK": "te#{}".format(te_id), - "GSI1-SK": "c#{}".format(content_key), - "HashType": "pdq", - "GSI2-PK": "type#pdq", - } - table.put_item(Item=item) - - def get_index(bucket_name, key): """ Load the given index from the s3 bucket and deserialize it @@ -61,9 +46,9 @@ def get_index(bucket_name, key): def lambda_handler(event, context): - table = dynamodb.Table(DYNAMODB_TABLE) + records_table = dynamodb.Table(DYNAMODB_TABLE) - hash_index: PDQMultiHashIndex = get_index(INDEXES_BUCKET_NAME, PDQ_INDEX_KEY) + hash_index: PDQIndex = get_index(INDEXES_BUCKET_NAME, PDQ_INDEX_KEY) logger.info("loaded_hash_index") for sqs_record in event["Records"]: @@ -74,30 +59,29 @@ def lambda_handler(event, context): hash_str = message["hash"] key = message["key"] - query = [hash_str] current_datetime = datetime.datetime.now() with metrics.timer(metrics.names.pdq_matcher_lambda.search_index): - results = hash_index.search(query, THRESHOLD, return_as_ids=True) + results = hash_index.query(hash_str) - # Only checking one hash at a time for now - result = results[0] - if len(result) > 0: - message_str = "Matches found for key: {} hash: {}, for IDs: {}".format( - key, hash_str, result - ) - logger.info(message_str) - for te_id in result: - te_hash = hash_index.hash_at(te_id) - save_match_to_datastore( - table, key, te_id, hash_str, current_datetime, te_hash - ) + if results: + match_ids = [] + for match in results: + metadata = match.metadata + logger.info("Match found for key: %s, hash %s -> %s", key, hash_str, metadata) + te_id = 
metadata["id"] + + PDQMatchRecord( + key, hash_str, current_datetime, te_id, metadata["hash"] + ).write_to_table(records_table) + + match_ids.append(te_id) sns_client.publish( TopicArn=OUTPUT_TOPIC_ARN, Subject="Match found in pdq_matcher lambda", - Message=message_str, + Message=f"Match found for key: {key}, hash: {hash_str}, for IDs: {match_ids}", ) else: - logger.info("No matches found for key: {} hash: {}".format(key, hash_str)) + logger.info(f"No matches found for key: {key} hash: {hash_str}") metrics.flush() diff --git a/hasher-matcher-actioner/hmalib/storage/__init__.py b/hasher-matcher-actioner/hmalib/storage/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/hasher-matcher-actioner/hmalib/storage/hashstore.py b/hasher-matcher-actioner/hmalib/storage/hashstore.py deleted file mode 100644 index 619d343ff..000000000 --- a/hasher-matcher-actioner/hmalib/storage/hashstore.py +++ /dev/null @@ -1,19 +0,0 @@ -from mypy_boto3_dynamodb.service_resource import Table - -class _IDynamoDBItem(): - """ - Clowny type as an interfaces. True interfaces in python are nightmares. - """ - def to_dynamodb_item(self) -> dict: - pass - -class HashStore: - """ - Stores all kinds of hashes into a dynamodb. - """ - - def __init__(self, table: Table): - self._table = table - - def add_hash(self, record: _IDynamoDBItem): - self._table.put_item(Item=record.to_dynamodb_item())