From 99448825a4b42c66f2b8e98bd2b41e4a19001ede Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Fri, 17 Apr 2020 10:36:26 -0700 Subject: [PATCH] move stuff in order --- dlp/risk.py | 332 ++++++++++++++++++++++++++-------------------------- 1 file changed, 166 insertions(+), 166 deletions(-) diff --git a/dlp/risk.py b/dlp/risk.py index 386f05c0d73d..a31dfb12c6ef 100644 --- a/dlp/risk.py +++ b/dlp/risk.py @@ -56,31 +56,6 @@ def numerical_risk_analysis( # potentially long-running operations. import google.cloud.pubsub - def callback(message): - if message.attributes["DlpJobName"] == operation.name: - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - results = job.risk_details.numerical_stats_result - print( - "Value Range: [{}, {}]".format( - results.min_value.integer_value, - results.max_value.integer_value, - ) - ) - prev_value = None - for percent, result in enumerate(results.quantile_values): - value = result.integer_value - if prev_value != value: - print("Value at {}% quantile: {}".format(percent, value)) - prev_value = value - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - # Instantiate a client. dlp = google.cloud.dlp_v2.DlpServiceClient() @@ -107,15 +82,40 @@ def callback(message): "actions": actions, } + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + + def callback(message): + if message.attributes["DlpJobName"] == operation.name: + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + results = job.risk_details.numerical_stats_result + print( + "Value Range: [{}, {}]".format( + results.min_value.integer_value, + results.max_value.integer_value, + ) + ) + prev_value = None + for percent, result in enumerate(results.quantile_values): + value = result.integer_value + if prev_value != value: + print("Value at {}% quantile: {}".format(percent, value)) + prev_value = value + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() + # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -166,6 +166,35 @@ def categorical_risk_analysis( # potentially long-running operations. import google.cloud.pubsub + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Tell the API where to send a notification when the job is complete. 
+ actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": { + "categorical_stats_config": {"field": {"name": column_name}} + }, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -201,41 +230,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. - parent = dlp.project_path(project) - - # Location info of the BigQuery table. - source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": { - "categorical_stats_config": {"field": {"name": column_name}} - }, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -290,6 +290,39 @@ def k_anonymity_analysis( def get_values(obj): return int(obj.integer_value) + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Convert quasi id list to Protobuf type + def map_fields(field): + return {"name": field} + + quasi_ids = map(map_fields, quasi_ids) + + # Tell the API where to send a notification when the job is complete. + actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": {"k_anonymity_config": {"quasi_ids": quasi_ids}}, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -326,45 +359,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. - parent = dlp.project_path(project) - - # Location info of the BigQuery table. 
- source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Convert quasi id list to Protobuf type - def map_fields(field): - return {"name": field} - - quasi_ids = map(map_fields, quasi_ids) - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": {"k_anonymity_config": {"quasi_ids": quasi_ids}}, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -421,6 +421,44 @@ def l_diversity_analysis( def get_values(obj): return int(obj.integer_value) + # Instantiate a client. + dlp = google.cloud.dlp_v2.DlpServiceClient() + + # Convert the project id into a full resource id. + parent = dlp.project_path(project) + + # Location info of the BigQuery table. + source_table = { + "project_id": table_project_id, + "dataset_id": dataset_id, + "table_id": table_id, + } + + # Convert quasi id list to Protobuf type + def map_fields(field): + return {"name": field} + + quasi_ids = map(map_fields, quasi_ids) + + # Tell the API where to send a notification when the job is complete. + actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] + + # Configure risk analysis job + # Give the name of the numeric column to compute risk metrics for + risk_job = { + "privacy_metric": { + "l_diversity_config": { + "quasi_ids": quasi_ids, + "sensitive_attribute": {"name": sensitive_attribute}, + } + }, + "source_table": source_table, + "actions": actions, + } + + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + def callback(message): if message.attributes["DlpJobName"] == operation.name: # This is the message we're looking for, so acknowledge it. @@ -464,50 +502,12 @@ def callback(message): # This is not the message we're looking for. message.drop() - # Instantiate a client. - dlp = google.cloud.dlp_v2.DlpServiceClient() - - # Convert the project id into a full resource id. - parent = dlp.project_path(project) - - # Location info of the BigQuery table. - source_table = { - "project_id": table_project_id, - "dataset_id": dataset_id, - "table_id": table_id, - } - - # Convert quasi id list to Protobuf type - def map_fields(field): - return {"name": field} - - quasi_ids = map(map_fields, quasi_ids) - - # Tell the API where to send a notification when the job is complete. - actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}] - - # Configure risk analysis job - # Give the name of the numeric column to compute risk metrics for - risk_job = { - "privacy_metric": { - "l_diversity_config": { - "quasi_ids": quasi_ids, - "sensitive_attribute": {"name": sensitive_attribute}, - } - }, - "source_table": source_table, - "actions": actions, - } - # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. 
subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError: @@ -571,41 +571,6 @@ def k_map_estimate_analysis( def get_values(obj): return int(obj.integer_value) - def callback(message): - if message.attributes["DlpJobName"] == operation.name: - # This is the message we're looking for, so acknowledge it. - message.ack() - - # Now that the job is done, fetch the results and print them. - job = dlp.get_dlp_job(operation.name) - histogram_buckets = ( - job.risk_details.k_map_estimation_result.k_map_estimation_histogram - ) - # Print bucket stats - for i, bucket in enumerate(histogram_buckets): - print("Bucket {}:".format(i)) - print( - " Anonymity range: [{}, {}]".format( - bucket.min_anonymity, bucket.max_anonymity - ) - ) - print(" Size: {}".format(bucket.bucket_size)) - for value_bucket in bucket.bucket_values: - print( - " Values: {}".format( - map(get_values, value_bucket.quasi_ids_values) - ) - ) - print( - " Estimated k-map anonymity: {}".format( - value_bucket.estimated_anonymity - ) - ) - subscription.set_result(None) - else: - # This is not the message we're looking for. - message.drop() - # Instantiate a client. dlp = google.cloud.dlp_v2.DlpServiceClient() @@ -648,15 +613,50 @@ def map_fields(quasi_id, info_type): "actions": actions, } + # Call API to start risk analysis job + operation = dlp.create_dlp_job(parent, risk_job=risk_job) + + def callback(message): + if message.attributes["DlpJobName"] == operation.name: + # This is the message we're looking for, so acknowledge it. + message.ack() + + # Now that the job is done, fetch the results and print them. + job = dlp.get_dlp_job(operation.name) + histogram_buckets = ( + job.risk_details.k_map_estimation_result.k_map_estimation_histogram + ) + # Print bucket stats + for i, bucket in enumerate(histogram_buckets): + print("Bucket {}:".format(i)) + print( + " Anonymity range: [{}, {}]".format( + bucket.min_anonymity, bucket.max_anonymity + ) + ) + print(" Size: {}".format(bucket.bucket_size)) + for value_bucket in bucket.bucket_values: + print( + " Values: {}".format( + map(get_values, value_bucket.quasi_ids_values) + ) + ) + print( + " Estimated k-map anonymity: {}".format( + value_bucket.estimated_anonymity + ) + ) + subscription.set_result(None) + else: + # This is not the message we're looking for. + message.drop() + # Create a Pub/Sub client and find the subscription. The subscription is # expected to already be listening to the topic. subscriber = google.cloud.pubsub.SubscriberClient() subscription_path = subscriber.subscription_path(project, subscription_id) subscription = subscriber.subscribe(subscription_path, callback) - # Call API to start risk analysis job - operation = dlp.create_dlp_job(parent, risk_job=risk_job) - try: subscription.result(timeout=timeout) except TimeoutError:
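
For reference, the flow that each sample in this patch converges on is: build the job config, call create_dlp_job first, then define the Pub/Sub callback that closes over the returned operation, and only then subscribe and wait. Below is a minimal sketch of that ordering for the numerical-stats case, reusing only the google.cloud.dlp_v2 and google.cloud.pubsub calls that already appear in the diff; the function name and all argument values are placeholders, not values taken from this patch.

import google.cloud.dlp_v2
import google.cloud.pubsub


def numerical_risk_sketch(project, table_project_id, dataset_id, table_id,
                          column_name, topic_id, subscription_id, timeout):
    # Instantiate a DLP client and build the parent resource id.
    dlp = google.cloud.dlp_v2.DlpServiceClient()
    parent = dlp.project_path(project)

    # Location info of the BigQuery table and the column to analyze.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": "{}/topics/{}".format(parent, topic_id)}}]

    # Configure the risk analysis job for the numeric column.
    risk_job = {
        "privacy_metric": {
            "numerical_stats_config": {"field": {"name": column_name}}
        },
        "source_table": source_table,
        "actions": actions,
    }

    # 1. Start the risk analysis job first, so `operation` is bound before
    #    any Pub/Sub message can be handled.
    operation = dlp.create_dlp_job(parent, risk_job=risk_job)

    # 2. Define the callback after the job is created; it closes over
    #    `operation` and `subscription`.
    def callback(message):
        if message.attributes["DlpJobName"] == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Fetch the finished job and print the results.
            job = dlp.get_dlp_job(operation.name)
            results = job.risk_details.numerical_stats_result
            print(
                "Value Range: [{}, {}]".format(
                    results.min_value.integer_value,
                    results.max_value.integer_value,
                )
            )
            subscription.set_result(None)
        else:
            # Not the job we started; drop the message.
            message.drop()

    # 3. Subscribe last and block until the completion notification arrives
    #    or the timeout expires.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)
    try:
        subscription.result(timeout=timeout)
    except TimeoutError:
        print("No event received before the timeout.")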