Permalink
Browse files

Add publish functionality to billing pipeline

This closes the end-to-end billing pipeline, allowing us to share generated detail reports with registrars via Drive and e-mail the invoicing team a link to the generated invoice.

This also factors out the email configs from ICANN reporting into the common 'misc' config, since we'll likely need alert e-mails for future periodic tasks.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=180805972
  • Loading branch information...
Exprosed authored and jianglai committed Jan 4, 2018
1 parent 27f12b9 commit ab5e16ab67e7f68b3ebafe85a3415a620ab2cc19
Showing with 719 additions and 93 deletions.
  1. +1 −0 java/google/registry/beam/BUILD
  2. +3 −1 java/google/registry/beam/BillingEvent.java
  3. +7 −1 java/google/registry/beam/InvoicingPipeline.java
  4. +1 −1 java/google/registry/beam/InvoicingUtils.java
  5. +4 −0 java/google/registry/billing/BUILD
  6. +85 −0 java/google/registry/billing/BillingEmailUtils.java
  7. +17 −2 java/google/registry/billing/BillingModule.java
  8. +129 −0 java/google/registry/billing/CopyDetailReportsAction.java
  9. +29 −20 java/google/registry/billing/GenerateInvoicesAction.java
  10. +35 −11 java/google/registry/billing/PublishInvoicesAction.java
  11. +35 −18 java/google/registry/config/RegistryConfig.java
  12. +8 −2 java/google/registry/config/RegistryConfigSettings.java
  13. +8 −5 java/google/registry/config/files/default-config.yaml
  14. +8 −0 java/google/registry/env/common/backend/WEB-INF/web.xml
  15. +8 −1 java/google/registry/export/PublishDetailReportAction.java
  16. +27 −0 java/google/registry/gcs/GcsUtils.java
  17. +2 −0 java/google/registry/module/backend/BackendRequestComponent.java
  18. +2 −2 java/google/registry/reporting/ReportingEmailUtils.java
  19. +1 −1 javatests/google/registry/beam/InvoicingUtilsTest.java
  20. +5 −0 javatests/google/registry/billing/BUILD
  21. +76 −0 javatests/google/registry/billing/BillingEmailUtilsTest.java
  22. +180 −0 javatests/google/registry/billing/CopyDetailReportsActionTest.java
  23. +17 −15 javatests/google/registry/billing/GenerateInvoicesActionTest.java
  24. +30 −13 javatests/google/registry/billing/PublishInvoicesActionTest.java
  25. +1 −0 javatests/google/registry/module/backend/testdata/backend_routing.txt
@@ -9,6 +9,7 @@ java_library(
srcs = glob(["*.java"]),
resources = glob(["sql/*"]),
deps = [
"//java/google/registry/billing",
"//java/google/registry/config",
"//java/google/registry/util",
"@com_google_apis_google_api_services_bigquery",
@@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import google.registry.billing.BillingModule;
import google.registry.util.FormattingLogger;
import java.io.IOException;
import java.io.InputStream;
@@ -173,7 +174,8 @@ static String getHeader() {
* filepath with the arguments, such as "../sensitive_info".
*/
String toFilename(String yearMonth) {
return String.format("invoice_details_%s_%s_%s", yearMonth, registrarId(), tld());
return String.format(
"%s_%s_%s_%s", BillingModule.DETAIL_REPORT_PREFIX, yearMonth, registrarId(), tld());
}
/** Generates a CSV representation of this {@code BillingEvent}. */
@@ -16,6 +16,7 @@
import google.registry.beam.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.billing.BillingModule;
import google.registry.config.RegistryConfig.Config;
import java.io.Serializable;
import javax.inject.Inject;
@@ -130,7 +131,11 @@ void applyTerminalTransforms(
.to(
NestedValueProvider.of(
yearMonthProvider,
yearMonth -> String.format("%s/results/CRR-INV-%s", beamBucket, yearMonth)))
// TODO(larryruili): Replace with billing bucket after verifying 2017-12 output.
yearMonth ->
String.format(
"%s/results/%s-%s",
beamBucket, BillingModule.OVERALL_INVOICE_PREFIX, yearMonth)))
.withHeader(InvoiceGroupingKey.invoiceHeader())
.withoutSharding()
.withSuffix(".csv");
@@ -140,6 +145,7 @@ void applyTerminalTransforms(
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports(
ValueProvider<String> yearMonthProvider) {
return TextIO.<BillingEvent>writeCustomType()
// TODO(larryruili): Replace with billing bucket/yyyy-MM after verifying 2017-12 output.
.to(
InvoicingUtils.makeDestinationFunction(beamBucket + "/results", yearMonthProvider),
InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
@@ -68,7 +68,7 @@ static Params makeEmptyDestinationParams(String outputBucket) {
return new Params()
.withBaseFilename(
FileBasedSink.convertToFileResourceIfPossible(
String.format("%s/%s", outputBucket, "failed")));
String.format("%s/%s", outputBucket, "FAILURES")));
}
/**
@@ -12,12 +12,16 @@ java_library(
],
deps = [
"//java/google/registry/config",
"//java/google/registry/gcs",
"//java/google/registry/model",
"//java/google/registry/request",
"//java/google/registry/request/auth",
"//java/google/registry/storage/drive",
"//java/google/registry/util",
"@com_google_api_client_appengine",
"@com_google_apis_google_api_services_dataflow",
"@com_google_appengine_api_1_0_sdk",
"@com_google_appengine_tools_appengine_gcs_client",
"@com_google_dagger",
"@com_google_guava",
"@com_google_http_client",
@@ -0,0 +1,85 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import com.google.common.collect.ImmutableList;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.FormattingLogger;
import google.registry.util.SendEmailService;
import javax.inject.Inject;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import org.joda.time.YearMonth;
/** Utility functions for sending emails involving monthly invoices. */
class BillingEmailUtils {
private final SendEmailService emailService;
private final YearMonth yearMonth;
private final String alertSenderAddress;
private final ImmutableList<String> invoiceEmailRecipients;
// TODO(larryruili): Replace this bucket after verifying 2017-12 output.
private final String beamBucketUrl;
@Inject
BillingEmailUtils(
SendEmailService emailService,
YearMonth yearMonth,
@Config("alertSenderEmailAddress") String alertSenderAddress,
@Config("invoiceEmailRecipients") ImmutableList<String> invoiceEmailRecipients,
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
this.emailService = emailService;
this.yearMonth = yearMonth;
this.alertSenderAddress = alertSenderAddress;
this.invoiceEmailRecipients = invoiceEmailRecipients;
this.beamBucketUrl = beamBucketUrl;
}
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
/**
* Sends a link to the generated overall invoice in GCS.
*
* <p>Note the users receiving the e-mail should have access to the object or bucket, via an
* authorization mechanism such as IAM.
*/
void emailInvoiceLink() {
// TODO(larryruili): Add read permissions for appropriate buckets.
try {
String beamBucket = beamBucketUrl.replaceFirst("gs://", "");
Message msg = emailService.createMessage();
msg.setFrom(new InternetAddress(alertSenderAddress));
for (String recipient : invoiceEmailRecipients) {
msg.addRecipient(RecipientType.TO, new InternetAddress(recipient));
}
msg.setSubject(String.format("Domain Registry invoice data %s", yearMonth.toString()));
msg.setText(
String.format(
"Link to invoice on GCS:\nhttps://storage.cloud.google.com/%s/%s",
beamBucket,
String.format(
"%s%s-%s.csv",
BillingModule.RESULTS_DIRECTORY_PREFIX,
BillingModule.OVERALL_INVOICE_PREFIX,
yearMonth.toString())));
emailService.sendMessage(msg);
} catch (MessagingException e) {
// TODO(larryruili): Replace with retrier with final failure email settings.
logger.warning(e, "E-mail service failed due to %s");
}
}
}
@@ -33,10 +33,18 @@
@Module
public final class BillingModule {
public static final String DETAIL_REPORT_PREFIX = "invoice_details";
public static final String OVERALL_INVOICE_PREFIX = "CRR-INV";
static final String PARAM_JOB_ID = "jobId";
static final String PARAM_DIRECTORY_PREFIX = "directoryPrefix";
static final String BILLING_QUEUE = "billing";
static final String CRON_QUEUE = "retryable-cron-tasks";
// TODO(larryruili): Replace with invoices/yyyy-MM after verifying 2017-12 invoice.
static final String RESULTS_DIRECTORY_PREFIX = "results/";
private static final String CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
static final String BILLING_QUEUE = "billing";
static final String PARAM_JOB_ID = "jobId";
/** Provides the invoicing Dataflow jobId enqueued by {@link GenerateInvoicesAction}. */
@Provides
@@ -45,6 +53,13 @@ static String provideJobId(HttpServletRequest req) {
return extractRequiredParameter(req, PARAM_JOB_ID);
}
/** Provides the subdirectory under a GCS bucket that we copy detail reports from. */
@Provides
@Parameter(PARAM_DIRECTORY_PREFIX)
static String provideDirectoryPrefix(HttpServletRequest req) {
return extractRequiredParameter(req, PARAM_DIRECTORY_PREFIX);
}
/** Constructs a {@link Dataflow} API client with default settings. */
@Provides
static Dataflow provideDataflow(
@@ -0,0 +1,129 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import static google.registry.request.Action.Method.POST;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
import google.registry.gcs.GcsUtils;
import google.registry.model.registrar.Registrar;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.storage.drive.DriveConnection;
import google.registry.util.FormattingLogger;
import google.registry.util.Retrier;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import javax.inject.Inject;
/** Copy all registrar detail reports in a given bucket's subdirectory from GCS to Drive. */
@Action(path = CopyDetailReportsAction.PATH, method = POST, auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public final class CopyDetailReportsAction implements Runnable {
public static final String PATH = "/_dr/task/copyDetailReports";
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
// TODO(larryruili): Replace this bucket with the billing bucket after verifying 2017-12 output.
private final String beamBucketUrl;
private final String folderPrefix;
private final DriveConnection driveConnection;
private final GcsUtils gcsUtils;
private final Retrier retrier;
private final Response response;
@Inject
CopyDetailReportsAction(
@Config("apacheBeamBucketUrl") String beamBucketUrl,
@Parameter(BillingModule.PARAM_DIRECTORY_PREFIX) String folderPrefix,
DriveConnection driveConnection,
GcsUtils gcsUtils,
Retrier retrier,
Response response) {
this.beamBucketUrl = beamBucketUrl;
this.folderPrefix = folderPrefix;
this.driveConnection = driveConnection;
this.gcsUtils = gcsUtils;
this.retrier = retrier;
this.response = response;
}
@Override
public void run() {
// Strip the URL prefix from the beam bucket
String beamBucket = beamBucketUrl.replace("gs://", "");
ImmutableList<String> detailReportObjectNames;
try {
detailReportObjectNames =
gcsUtils
.listFolderObjects(beamBucket, folderPrefix)
.stream()
.filter(objectName -> objectName.startsWith(BillingModule.DETAIL_REPORT_PREFIX))
.collect(ImmutableList.toImmutableList());
} catch (IOException e) {
logger.severefmt("Copy failed due to %s", e.getMessage());
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload(String.format("Failure, encountered %s", e.getMessage()));
return;
}
for (String detailReportName : detailReportObjectNames) {
// The standard report format is "invoice_details_yyyy-MM_registrarId_tld.csv
// TODO(larryruili): Determine a safer way of enforcing this.
String registrarId = detailReportName.split("_")[3];
Optional<Registrar> registrar = Registrar.loadByClientId(registrarId);
// TODO(larryruili): Send an email alert if any report fails to be copied for any reason.
if (!registrar.isPresent()) {
logger.warningfmt(
"Registrar %s not found in database for file %s", registrar, detailReportName);
continue;
}
String driveFolderId = registrar.get().getDriveFolderId();
if (driveFolderId == null) {
logger.warningfmt("Drive folder id not found for registrar %s", registrarId);
continue;
}
// Attempt to copy each detail report to its associated registrar's drive folder.
retrier.callWithRetry(
() -> {
try (InputStream input =
gcsUtils.openInputStream(
new GcsFilename(beamBucket, folderPrefix + detailReportName))) {
driveConnection.createFile(
detailReportName,
MediaType.CSV_UTF_8,
driveFolderId,
ByteStreams.toByteArray(input));
logger.infofmt(
"Published detail report for %s to folder %s using GCS file gs://%s/%s.",
registrarId, driveFolderId, beamBucket, detailReportName);
}
},
IOException.class);
}
response.setStatus(SC_OK);
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload("Copied detail reports.");
}
}
Oops, something went wrong.

0 comments on commit ab5e16a

Please sign in to comment.