used signal_type index and generalize hashstore (#398)
* used signal_type index and generalize hashstore

* remove store and move it to base class of dto objects
BarrettOlson committed Mar 4, 2021
1 parent 749cacd commit 0d0154a
Showing 7 changed files with 121 additions and 79 deletions.
2 changes: 1 addition & 1 deletion hasher-matcher-actioner/Makefile
@@ -23,7 +23,7 @@ upload_docker:
	DOCKER_TAG=${DOCKER_TAG} ./scripts/update_lambda_docker_image.sh

terraform/terraform.tfvars:
-	echo 'hma_lambda_docker_uri = "${DOCKER_URI}"' > terraform/terraform.tfvars
+	echo 'hma_lambda_docker_uri = "${DOCKER_URI}:${DOCKER_TAG}"' > terraform/terraform.tfvars

terraform/backend.tf:
	./scripts/write_backend_config.sh > terraform/backend.tf
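For example, with DOCKER_URI=0123456789.dkr.ecr.us-east-1.amazonaws.com/hma-lambda and DOCKER_TAG=abc123 (both placeholder values), the generated terraform/terraform.tfvars now pins the lambda to a specific image tag instead of an unqualified URI:

hma_lambda_docker_uri = "0123456789.dkr.ecr.us-east-1.amazonaws.com/hma-lambda:abc123"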
91 changes: 80 additions & 11 deletions hasher-matcher-actioner/hmalib/dto.py
@@ -1,39 +1,108 @@
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

 import datetime
 import typing as t
+from dataclasses import dataclass
+from mypy_boto3_dynamodb.service_resource import Table

 """
-Not enforceable because named tuples can't have multiple inheritance, but all
-DTO classes in this module should implement methods `to_dynamodb_item(self)` and
+Data transfer object classes to be used with dynamodbstore
+Classes in this module should implement methods `to_dynamodb_item(self)` and
 `to_sqs_message(self)`
 """

-class PDQHashRecord(t.NamedTuple):
+class DynamoDBItem:
+    def write_to_table(self, table: Table):
+        table.put_item(Item=self.to_dynamodb_item())
+
+    def to_dynamodb_item(self) -> t.Dict:
+        raise NotImplementedError


+@dataclass
+class PDQRecordBase(DynamoDBItem):
     """
-    Successful execution at the hasher produces this record.
+    Abstract base record for PDQ related items.
     """

+    SIGNAL_TYPE = "pdq"
+
     content_key: str
     content_hash: str
-    quality: int
     timestamp: datetime.datetime  # ISO-8601 formatted

     @staticmethod
-    def get_dynamodb_pk(key: str):
+    def get_dynamodb_content_key(key: str):
         return f"c#{key}"

+    @staticmethod
+    def get_dynamodb_type_key(key: str):
+        return f"type#{key}"

+    def to_dynamodb_item(self) -> dict:
+        raise NotImplementedError
+
+    def to_sqs_message(self) -> dict:
+        raise NotImplementedError


+@dataclass
+class PipelinePDQHashRecord(PDQRecordBase):
+    """
+    Successful execution at the hasher produces this record.
+    """
+
+    quality: int
+
     def to_dynamodb_item(self) -> dict:
         return {
-            "PK": PDQHashRecord.get_dynamodb_pk(self.content_key),
-            "SK": "type#pdq",
+            "PK": self.get_dynamodb_content_key(self.content_key),
+            "SK": self.get_dynamodb_type_key(self.SIGNAL_TYPE),
             "ContentHash": self.content_hash,
             "Quality": self.quality,
             "Timestamp": self.timestamp.isoformat(),
-            "HashType": "pdq",
+            "HashType": self.SIGNAL_TYPE,
         }

     def to_sqs_message(self) -> dict:
         return {
             "hash": self.content_hash,
-            "type": "pdq",
-            "key": self.content_key
+            "type": self.SIGNAL_TYPE,
+            "key": self.content_key,
         }


+@dataclass
+class PDQMatchRecord(PDQRecordBase):
+    """
+    Successful execution at the matcher produces this record.
+    """
+
+    te_id: int
+    te_hash: str
+
+    @staticmethod
+    def get_dynamodb_te_key(key: str):
+        return f"te#{key}"
+
+    def to_dynamodb_item(self) -> dict:
+        return {
+            "PK": self.get_dynamodb_content_key(self.content_key),
+            "SK": self.get_dynamodb_te_key(self.te_id),
+            "ContentHash": self.content_hash,
+            "Timestamp": self.timestamp.isoformat(),
+            "TEHash": self.te_hash,
+            "GSI1-PK": self.get_dynamodb_te_key(self.te_id),
+            "GSI1-SK": self.get_dynamodb_content_key(self.content_key),
+            "HashType": self.SIGNAL_TYPE,
+            "GSI2-PK": self.get_dynamodb_type_key(self.SIGNAL_TYPE),
+        }
+
+    def to_sqs_message(self) -> dict:
+        # TODO: add method for when matches are added to an SQS queue
+        raise NotImplementedError
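With the store generalized onto the DTO base class, persisting any record is a single call. A minimal usage sketch, using only the classes above plus a boto3 Table resource; the table name, content key, and hash value are placeholders, not values from this commit:

import datetime

import boto3

from hmalib.dto import PipelinePDQHashRecord

dynamodb = boto3.resource("dynamodb")
records_table = dynamodb.Table("HMADataStore")  # placeholder table name

record = PipelinePDQHashRecord(
    content_key="images/cat.jpg",  # placeholder S3 key
    content_hash="f" * 64,  # placeholder PDQ hash (64 hex chars)
    timestamp=datetime.datetime.now(),
    quality=100,
)
record.write_to_table(records_table)  # put_item via DynamoDBItem.write_to_table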
12 changes: 5 additions & 7 deletions hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_hasher.py
@@ -12,9 +12,8 @@
 from mypy_boto3_dynamodb import DynamoDBServiceResource
 from threatexchange.hashing import pdq_hasher

-from hmalib.dto import PDQHashRecord
-from hmalib.storage.hashstore import HashStore
 from hmalib import metrics
+from hmalib.dto import PipelinePDQHashRecord

logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -46,7 +45,6 @@ def lambda_handler(event, context):
     """

     records_table = dynamodb.Table(DYNAMODB_TABLE)
-    store = HashStore(records_table)

     for sqs_record in event["Records"]:
         sns_notification = json.loads(sqs_record["body"])
@@ -68,18 +66,18 @@
             logger.info("generating pdq hash for %s/%s", bucket_name, key)
             with tempfile.NamedTemporaryFile() as tmp_file:
                 path = Path(tmp_file.name)

                 with metrics.timer(metrics.names.pdq_hasher_lambda.download_file):
                     s3_client.download_fileobj(bucket_name, key, tmp_file)

                 with metrics.timer(metrics.names.pdq_hasher_lambda.hash):
                     pdq_hash, quality = pdq_hasher.pdq_from_file(path)

-                hash_record = PDQHashRecord(
-                    key, pdq_hash, quality, datetime.datetime.now()
+                hash_record = PipelinePDQHashRecord(
+                    key, pdq_hash, datetime.datetime.now(), quality
                 )

                 # Add to dynamodb hash store
-                store.add_hash(hash_record)
+                hash_record.write_to_table(records_table)

                 # Publish to SQS queue
                 sqs_client.send_message(
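For reference, the queue payload the matcher later parses follows the shape defined by PipelinePDQHashRecord.to_sqs_message() above, presumably what sqs_client.send_message publishes here (values are placeholders):

{
    "hash": "f" * 64,         # PDQ hash of the content, hex string
    "type": "pdq",            # PDQRecordBase.SIGNAL_TYPE
    "key": "images/cat.jpg",  # content key of the hashed object
}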
20 changes: 15 additions & 5 deletions hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_indexer.py
@@ -9,7 +9,7 @@
 from urllib.parse import unquote_plus

 import boto3
-from threatexchange.hashing import PDQMultiHashIndex
+from threatexchange.signal_type.pdq_index import PDQIndex

from hmalib import metrics

@@ -87,13 +87,23 @@ def lambda_handler(event, context):
             codecs.getreader("utf-8")(pdq_data_file["Body"]),
             fieldnames=PDQ_DATA_FILE_COLUMNS,
         )
-        pdq_data = [(row["hash"], int(row["id"])) for row in pdq_data_reader]
+        pdq_data = [
+            (
+                row["hash"],
+                # Also add hash to metadata for easy look up on match
+                {
+                    "id": int(row["id"]),
+                    "hash": row["hash"],
+                },
+            )
+            for row in pdq_data_reader
+        ]

     with metrics.timer(metrics.names.pdq_indexer_lambda.build_index):
         logger.info("Creating PDQ Hash Index")
-        hashes = [pdq[0] for pdq in pdq_data]
-        ids = [pdq[1] for pdq in pdq_data]
-        index = PDQMultiHashIndex.create(hashes, custom_ids=ids)
+
+        index = PDQIndex.build(pdq_data)

     logger.info("Putting index in S3")
     index_bytes = pickle.dumps(index)
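The switch from parallel hash/ID lists to (hash, metadata) pairs means the index itself carries everything the matcher needs. A sketch of that round trip, using only the PDQIndex calls that appear in this commit; the hash values and IDs are placeholders:

from threatexchange.signal_type.pdq_index import PDQIndex

entries = [
    ("0" * 64, {"id": 1234, "hash": "0" * 64}),  # placeholder hash + metadata
    ("f" * 64, {"id": 5678, "hash": "f" * 64}),
]
index = PDQIndex.build(entries)

# Each match carries back the metadata dict it was built with
for match in index.query("0" * 64):
    print(match.metadata["id"], match.metadata["hash"])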
56 changes: 20 additions & 36 deletions hasher-matcher-actioner/hmalib/lambdas/pdq/pdq_matcher.py
@@ -7,7 +7,9 @@
 import boto3
 import datetime

-from threatexchange.hashing.pdq_faiss_matcher import PDQFlatHashIndex, PDQMultiHashIndex
+from threatexchange.signal_type.pdq_index import PDQIndex
+
+from hmalib.dto import PDQMatchRecord

 from hmalib import metrics

@@ -28,23 +30,6 @@
 DYNAMODB_TABLE = os.environ["DYNAMODB_TABLE"]


-def save_match_to_datastore(
-    table, content_key, te_id, content_hash, current_datetime, te_hash
-):
-    item = {
-        "PK": "c#{}".format(content_key),
-        "SK": "te#{}".format(te_id),
-        "ContentHash": content_hash,
-        "Timestamp": current_datetime.isoformat(),
-        "TEHash": te_hash,
-        "GSI1-PK": "te#{}".format(te_id),
-        "GSI1-SK": "c#{}".format(content_key),
-        "HashType": "pdq",
-        "GSI2-PK": "type#pdq",
-    }
-    table.put_item(Item=item)


def get_index(bucket_name, key):
    """
    Load the given index from the s3 bucket and deserialize it
@@ -61,9 +46,9 @@ def get_index(bucket_name, key):


def lambda_handler(event, context):
-    table = dynamodb.Table(DYNAMODB_TABLE)
+    records_table = dynamodb.Table(DYNAMODB_TABLE)

-    hash_index: PDQMultiHashIndex = get_index(INDEXES_BUCKET_NAME, PDQ_INDEX_KEY)
+    hash_index: PDQIndex = get_index(INDEXES_BUCKET_NAME, PDQ_INDEX_KEY)
     logger.info("loaded_hash_index")

     for sqs_record in event["Records"]:
@@ -74,30 +59,29 @@

         hash_str = message["hash"]
         key = message["key"]
-        query = [hash_str]
         current_datetime = datetime.datetime.now()

         with metrics.timer(metrics.names.pdq_matcher_lambda.search_index):
-            results = hash_index.search(query, THRESHOLD, return_as_ids=True)
+            results = hash_index.query(hash_str)

-        # Only checking one hash at a time for now
-        result = results[0]
-        if len(result) > 0:
-            message_str = "Matches found for key: {} hash: {}, for IDs: {}".format(
-                key, hash_str, result
-            )
-            logger.info(message_str)
-            for te_id in result:
-                te_hash = hash_index.hash_at(te_id)
-                save_match_to_datastore(
-                    table, key, te_id, hash_str, current_datetime, te_hash
-                )
+        if results:
+            match_ids = []
+            for match in results:
+                metadata = match.metadata
+                logger.info("Match found for key: %s, hash %s -> %s", key, hash_str, metadata)
+                te_id = metadata["id"]
+
+                PDQMatchRecord(
+                    key, hash_str, current_datetime, te_id, metadata["hash"]
+                ).write_to_table(records_table)
+
+                match_ids.append(te_id)
             sns_client.publish(
                 TopicArn=OUTPUT_TOPIC_ARN,
                 Subject="Match found in pdq_matcher lambda",
-                Message=message_str,
+                Message=f"Match found for key: {key}, hash: {hash_str}, for IDs: {match_ids}",
             )
         else:
-            logger.info("No matches found for key: {} hash: {}".format(key, hash_str))
+            logger.info(f"No matches found for key: {key} hash: {hash_str}")

     metrics.flush()
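PDQMatchRecord also writes inverted GSI1-PK/GSI1-SK keys, so matches can be looked up by ThreatExchange ID as well as by content key. A hedged sketch of that query; the index name "GSI1" is an assumption suggested by the attribute names, not confirmed by this diff, and the table name and ID are placeholders:

import boto3
from boto3.dynamodb.conditions import Key

records_table = boto3.resource("dynamodb").Table("HMADataStore")  # placeholder name

response = records_table.query(
    IndexName="GSI1",  # assumed GSI over GSI1-PK / GSI1-SK
    KeyConditionExpression=Key("GSI1-PK").eq("te#1234"),
)
for item in response["Items"]:
    # each item is one piece of content that matched ThreatExchange ID 1234
    print(item["GSI1-SK"], item["ContentHash"])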
Empty file.
19 changes: 0 additions & 19 deletions hasher-matcher-actioner/hmalib/storage/hashstore.py

This file was deleted.
