This reference architecture shows an end-to-end stream processing pipeline.

![GitHub logo](../../_images/github.png) A reference implementation for this architecture is available on [GitHub][github].

## Architecture

![Diagram showing a reference architecture for stream processing with Azure Databricks.](./images/stream-processing-databricks-new.png)

The architecture consists of the following components:

- [Synapse Link](/azure/cosmos-db/synapse-link) is Microsoft's preferred solution for analytics on top of Azure Cosmos DB data.

## Scenario details

**Scenario**: A taxi company collects data about each taxi trip. For this scenario, we assume there are two separate devices sending data. The taxi has a meter that sends information about each ride — the duration, distance, and pickup and drop-off locations. A separate device accepts payments from customers and sends data about fares. To spot ridership trends, the taxi company wants to calculate the average tip per mile driven, in real time, for each neighborhood.

### Potential use cases

This solution is optimized for the retail industry.

### Data ingestion

To simulate a data source, this reference architecture uses the [New York City Taxi Data](https://uofi.app.box.com/v/NYCtaxidata/folder/2332218797) dataset<sup>[[1]](#note1)</sup>. This dataset contains data about taxi trips in New York City over a four-year period (2010&ndash;2013). It contains two types of records: ride data and fare data. Ride data includes trip duration, trip distance, and pickup and drop-off locations. Fare data includes fare, tax, and tip amounts. Common fields in both record types include medallion number, hack license, and vendor ID. Together, these three fields uniquely identify a taxi plus a driver. The data is stored in CSV format.

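As an illustration of the two record shapes described above (the field names here are simplified and do not match the dataset columns exactly), the records and their shared key fields can be modeled like this:

```scala
// Simplified, illustrative record shapes. The shared fields (medallion,
// hackLicense, vendorId) plus the pickup time are what later allow a ride
// record and a fare record for the same trip to be joined.
case class TaxiRide(
  medallion: String,
  hackLicense: String,
  vendorId: String,
  pickupTime: java.sql.Timestamp,
  tripDurationSeconds: Int,
  tripDistanceMiles: Double,
  pickupLon: Double, pickupLat: Double,
  dropoffLon: Double, dropoffLat: Double)

case class TaxiFare(
  medallion: String,
  hackLicense: String,
  vendorId: String,
  pickupTime: java.sql.Timestamp,
  fareAmount: Double,
  taxAmount: Double,
  tipAmount: Double)
```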
In the following excerpt, the sender takes an Event Hubs client from a connection pool for the duration of each send operation:

```csharp
using (var client = pool.GetObject())
{
    // ... send the serialized ride or fare record to the event hub ...
}
```

#### Event Hubs

The throughput capacity of Event Hubs is measured in [throughput units](/azure/event-hubs/event-hubs-scalability#throughput-units). You can autoscale an event hub by enabling [auto-inflate](/azure/event-hubs/event-hubs-auto-inflate), which automatically scales the throughput units based on traffic, up to a configured maximum.

### Stream processing

In Azure Databricks, data processing is performed by a job. The job is assigned to and runs on a cluster. The job can either be custom code written in Java, or a Spark [notebook](https://docs.databricks.com/user-guide/notebooks/index.html).

In this reference architecture, the job is a Java archive with classes written in both Java and Scala. When you specify the Java archive for a Databricks job, you also specify the class that the Databricks cluster should execute. Here, the **main** method of the **com.microsoft.pnp.TaxiCabReader** class contains the data processing logic.

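As a rough sketch (a simplified skeleton, not the reference implementation's actual code), the entry point of such a job is an ordinary Spark application whose session is provided by the Databricks cluster:

```scala
package com.microsoft.pnp

import org.apache.spark.sql.SparkSession

// Simplified skeleton of the job entry point. The real TaxiCabReader class
// contains the full processing logic described in the following sections.
object TaxiCabReader {
  def main(args: Array[String]): Unit = {
    // On Databricks, getOrCreate attaches to the session managed by the cluster.
    val spark = SparkSession.builder
      .appName("TaxiCabReader")
      .getOrCreate()

    // ... read the ride and fare streams, enrich, join, aggregate, and write to Cosmos DB ...
  }
}
```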
#### Reading the stream from the two event hub instances

The data processing logic uses [Spark structured streaming](https://spark.apache.org/docs/2.1.2/structured-streaming-programming-guide.html) to read from the two Azure event hub instances:
```scala
// Excerpt; the consumer group and starting-position options are elided, the
// fare stream is configured the same way, and the DataFrame name is illustrative.
val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load
```

#### Enriching the data with the neighborhood information

The ride data includes the latitude and longitude coordinates of the pickup and drop-off locations. While these coordinates are useful, they are not easily consumed for analysis. Therefore, this data is enriched with neighborhood data that is read from a [shapefile](https://en.wikipedia.org/wiki/Shapefile).
```scala
// Excerpt: given a longitude and latitude, look up the neighborhood that
// contains the point in the shapefile data (lookup logic elided).
val neighborhoodFinder = (lon: Double, lat: Double) => {
  // ...
}
```
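One way such a function might then be applied to the streaming ride data (a sketch only; the column names are illustrative and the finder is assumed to return a neighborhood name) is to wrap it in a Spark user-defined function:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Illustrative: expose the lookup as a UDF and add a neighborhood column for
// the pickup and drop-off coordinates of each ride.
val neighborhoodUdf = udf(neighborhoodFinder)

val ridesWithNeighborhoods = rides
  .withColumn("pickupNeighborhood", neighborhoodUdf(col("pickupLon"), col("pickupLat")))
  .withColumn("dropoffNeighborhood", neighborhoodUdf(col("dropoffLon"), col("dropoffLat")))
```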

#### Joining the ride and fare data

First the ride and fare data is transformed:

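A rough sketch of what this transformation involves (the schema, column names, watermark interval, and the assumption that records arrive as JSON are all illustrative, not the reference implementation's exact code): each stream's binary event body is parsed into typed columns and watermarked so that the stream-to-stream join can bound its state.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Illustrative schema for the ride records; the fare stream is handled the same way.
val rideSchema = new StructType()
  .add("medallion", StringType)
  .add("hackLicense", StringType)
  .add("vendorId", StringType)
  .add("pickupTime", TimestampType)
  .add("pickupLon", DoubleType)
  .add("pickupLat", DoubleType)

// Parse the raw Event Hubs body into typed columns and add a watermark so that
// the stream-to-stream join can discard old state.
val rides = rideEvents
  .select(from_json(col("body").cast("string"), rideSchema).as("ride"))
  .select("ride.*")
  .withWatermark("pickupTime", "15 minutes")
```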
And then the ride data is joined with the fare data:

```scala
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
```

#### Processing the data and inserting into Cosmos DB

The average fare amount for each neighborhood is calculated for a given time interval:

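A sketch of the kind of windowed aggregation involved (the window length and column names are illustrative; the variable name matches the excerpt that follows):

```scala
import org.apache.spark.sql.functions.{avg, col, window}

// Illustrative: average fare per pickup neighborhood over tumbling 5-minute windows.
val maxAvgFarePerNeighborhood = mergedTaxiTrip
  .groupBy(window(col("pickupTime"), "5 minutes"), col("pickupNeighborhood"))
  .agg(avg(col("fareAmount")).as("averageFareAmount"))
```

The aggregated results are then streamed into Cosmos DB: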
```scala
// Excerpt; the Cosmos DB sink format and connection options are elided.
maxAvgFarePerNeighborhood
  .writeStream
  .start()
  .awaitTermination()
```

## Considerations

These considerations implement the pillars of the Azure Well-Architected Framework, which is a set of guiding tenets that can be used to improve the quality of a workload. For more information, see [Microsoft Azure Well-Architected Framework](/azure/architecture/framework).

### Security

Security provides assurances against deliberate attacks and the abuse of your valuable data and systems. For more information, see [Overview of the security pillar](/azure/architecture/framework/security/overview).

Access to the Azure Databricks workspace is controlled using the [administrator console](https://docs.databricks.com/administration-guide/admin-settings/index.html). The administrator console includes functionality to add users, manage user permissions, and set up single sign-on. Access control for workspaces, clusters, jobs, and tables can also be set through the administrator console.

#### Managing secrets

Azure Databricks includes a [secret store](/azure/databricks/security/secrets/) that is used to store secrets, including connection strings, access keys, user names, and passwords. Secrets within the Azure Databricks secret store are partitioned by **scopes**:

Secrets are added at the scope level, for example:

`databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"`

In code, secrets are accessed via the Azure Databricks [secrets utilities](https://docs.databricks.com/user-guide/dev-tools/dbutils.html#secrets-utilities).
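As a minimal sketch (the scope and key names below match the CLI example above; the variable name is illustrative), a job or notebook reads a stored connection string like this:

```scala
// Read the ride event hub connection string from the Databricks secret store.
// In a notebook, dbutils is available implicitly; in JAR code it can be obtained
// through com.databricks.dbutils_v1.DBUtilsHolder.dbutils.
val rideEventHubConnectionString =
  dbutils.secrets.get(scope = "azure-databricks-job", key = "taxi-ride")
```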
### Monitoring

Azure Databricks is based on Apache Spark, and both use [log4j](https://logging.apache.org/log4j/2.x) as the standard library for logging. In addition to the default logging provided by Apache Spark, you can implement logging to Azure Log Analytics by following the article [Monitoring Azure Databricks](../../databricks-monitoring/index.md).
Apache Spark uses the Dropwizard library to send metrics, and some of the native Dropwizard metrics fields are incompatible with Azure Log Analytics.

The following are example queries that you can use in your Azure Log Analytics workspace to monitor the execution of the streaming job. The argument `ago(1d)` in each query will return all records that were generated in the last day, and can be adjusted to view a different time period.

#### Exceptions logged during stream query execution

```kusto
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
```

#### Accumulation of malformed fare and ride data

```kusto
SparkMetric_CL
| where TimeGenerated > ago(1d)
// ... (filters and summarization of the malformed ride and fare record counters elided) ...
| render timechart
```

#### Job execution over time

```kusto
SparkMetric_CL
| where TimeGenerated > ago(1d)
// ... (job-timing metrics and summarization elided) ...
```

For more information, see [Monitoring Azure Databricks](../../databricks-monitoring/index.md).

### DevOps

- Create separate resource groups for production, development, and test environments. Separate resource groups make it easier to manage deployments, delete test deployments, and assign access rights.


For more information, see the DevOps section in [Microsoft Azure Well-Architected Framework][AAF-devops].

### Cost optimization

Cost optimization is about looking at ways to reduce unnecessary expenses and improve operational efficiencies. For more information, see [Overview of the cost optimization pillar](/azure/architecture/framework/cost/overview).

Use the [Azure pricing calculator][azure-pricing-calculator] to estimate costs. Here are some considerations for services used in this reference architecture.

<!-- markdownlint-disable MD024 -->

#### Event Hubs

This reference architecture deploys Event Hubs in the **Standard** tier. The pricing model is based on throughput units, ingress events, and capture events. An ingress event is a unit of data 64 KB or less. Larger messages are billed in multiples of 64 KB. You specify throughput units either through the Azure portal or the Event Hubs management APIs.
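For example, a single 200-KB event is billed as four ingress events, because 200 KB spans four 64-KB units.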

Expand All @@ -327,7 +337,7 @@ The **Standard** tier is also billed based on ingress events and throughput unit

For information about Event Hubs pricing, see [Event Hubs pricing][event-hubs-pricing].

#### Azure Databricks

Azure Databricks offers two tiers, **Standard** and **Premium**, and each tier supports three workloads. This reference architecture deploys an Azure Databricks workspace in the **Premium** tier.

Azure Databricks offers many pricing models.

For more information, see [Azure Databricks Pricing][azure-databricks-pricing].

#### Azure Cosmos DB

In this architecture, a series of records is written to Azure Cosmos DB by the Azure Databricks job. You are charged for the capacity that you reserve, expressed in Request Units per second (RU/s), which is used to perform insert operations. The unit for billing is 100 RU/sec per hour. For example, writing a 100-KB item costs about 50 RUs, so sustaining one such write per second consumes 50 RU/s of provisioned throughput.

For write operations, provision enough capacity to support the number of writes needed per second. You can increase the provisioned throughput by using the portal or Azure CLI before performing write operations, and then reduce the throughput after those operations are complete. Your throughput for the write period is the minimum throughput needed for the given data plus the throughput required for the insert operation, assuming no other workload is running.
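For example (with illustrative numbers), if the job writes 20 aggregated documents per second and each write costs about 50 RUs, the write workload alone needs roughly 20 × 50 = 1,000 RU/s provisioned while the stream is running.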

##### Example cost analysis

Suppose you configure a throughput value of 1,000 RU/sec on a container. It's deployed for 24 hours for 30 days, a total of 720 hours.
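At an illustrative rate of $0.008 per 100 RU/sec per hour, 1,000 RU/sec is billed as 10 units of 100 RU/sec, or about $0.08 per hour, so the 720 hours come to roughly 720 × $0.08 = $57.60 for the month.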

Use the [Cosmos DB capacity calculator][Cosmos-Calculator] to get a quick estimate of the workload cost.

For more information, see the cost section in [Microsoft Azure Well-Architected Framework][aaf-cost].

## Deploy this scenario

To deploy and run the reference implementation, follow the steps in the [GitHub readme][github].
