Skip to content

aws-samples/msk-powered-financial-data-feed

Publish a real-time financial data feed to a Kafka client using Amazon MSK

This application demonstrates how to publish a real-time financial data feed as a service on AWS. It contains the code for a data provider to send streaming data to its clients via an Amazon MSK cluster. Clients can consume the data using a Kafka client SDK. If the client application is in another AWS account, it can connect to the provider's feed directly through AWS PrivateLink. The client can subscribe to a Kafka topic (e.g., "stock-quotes") to consume the data that is of interest. The client and provider authenticate each other using mutual TLS.

Best Practices Folder

In the Best Practices Folder you will find recommendations on:

  • MSK Official Best Practices
  • Right Sizing your MSK Cluster
  • What metrics should you monitor
  • Other supporting resources

Pre-requisites

You will need an existing Amazon Linux EC2 instance to deploy the cluster. This deployment instance should have git, jq, Python 3.7, Kafka Tools 2.6.2 or higher and the AWS CLI v2 installed. To install AWS CLI v2, see Installing the latest version of the AWS CLI You should run aws configure to specify the AWS access key and secret access key of an IAM user who has sufficient privileges (e.g., an admin) to create a new VPC, launch an MSK cluster and launch EC2 instances. The cluster will be deployed to your default region using AWS CDK. To install CDK on the deployment instance, see Getting started with the AWS CDK.

Deployment steps

  1. Creating a Private Certificate Authority
  2. Deploying the MSK Cluster
  3. Setting up the provider instance
  4. Deploying the Kafka client instance
  5. Configuring the client instance setup
  6. Running the provider and consumer applications

1. Creating a Private Certificate Authority

The Kafka provider and client will authenticate each other using mutual TLS (mTLS), so you need to use AWS Certificate Manager to create a Private Certificate Authority and root certificate as follows.

  1. Log in to your AWS Certificate Manager console and click on AWS Private CA.
  2. Click Create a Private CA , select CA type Root and fill in your organization details. Leave the other options as default and click Create CA.
  3. Once the CA becomes active, select Actions -> Install CA certificate on the CA's details page to install the root certificate.

2. Deploying the MSK Cluster

These steps will create a new Kafka provider VPC, and launch the Amazon MSK cluster there, along with a new EC2 instance to run the provider app.

  1. Log in to your deployment EC2 instance using ssh, and clone this repo.

    git clone https://github.com/aws-samples/msk-powered-financial-data-feed.git msk-feed
    cd msk-feed
    python3 -m pip install -r requirements.txt    
    export PATH=$PATH:$HOME/msk-feed/bin
    
  2. Add the following shell environment variables to your .bashrc file. Update the above variables with your AWS account number, region you are deploying to, and EC2 keypair name for that region. For the ACM_PCA_ARN variable, you can paste in the ARN of your Private CA from the CA details page.

    echo "export CDK_DEFAULT_ACCOUNT=123456789012" >> ~/.bashrc
    echo "export CDK_DEFAULT_REGION="us-east-1" >> ~/.bashrc
    echo "export EC2_KEY_PAIR='Your EC2 keypair'" >> ~/.bashrc
    echo "export ACM_PCA_ARN='ARN of your ACM Private Hosted CA'" >> ~/.bashrc
    echo "export MSK_PUBLIC='FALSE'" >> ~/.bashrc
    
    source ~/.bashrc
    
  3. Deploy the MSK cluster and other required infrastructure using the following cdk commands.

    cd cluster-setup
    cdk bootstrap
    cdk synth
    cdk deploy
    

NOTE: This step can take up to 45 minutes.

  1. After the app is deployed you will notice that your MSK Cluster does not have Public connectivity. For security reasons MSK does not allow to create a cluster with public access enabled. To enable public access set the MSK_PUBLIC environment variable to TRUE after the cluster is deployed. And redeploy CDK Stack.

    echo "export MSK_PUBLIC='TRUE'" >> ~/.bashrc
    source ~/.bashrc
    
    cdk deploy
    

NOTE: This step can take up to another 45 minutes.

  1. For the setup of client application stack you will need the MskVPCEndpoint and CLUSTERARN in your environment variables.These two variables can be found on the cdk deployment output (step above).

    echo "export MSK_VPC_ENDPOINT_SERVICE='DataFeedUsingMskStack.MskVPCEndpoint value'" >> ~/.bashrc
    echo "export CLUSTERARN='ARN of your MSK Cluster'" >> ~/.bashrc
    
    source ~/.bashrc
    

3. Setting up the provider instance

  1. After the above command finishes, ssh into the newly created provider EC2 instance as ec2-user. The name of the instance will end in msk-provider. In your home directory there, run the following commands.

    echo "export ACM_PCA_ARN='ARN of your ACM Private Hosted CA'" >> ~/.bashrc
    echo "export CLUSTERARN='ARN of your MSK Cluster'" >> ~/.bashrc
    source ~/.bashrc
    
    export PATH=$PATH:$HOME/msk-feed/bin
    
  2. Run aws configure and enter the AWS credentials of a user with admin privileges. Make sure to specify the same region that your MSK cluster got deployed.

  3. Run get_nodes.py python script to capture Zookeeper and Bootstrap nodes and export then to environment variables. First you will need export a few variables.

    alias python3=python3.8
    python3 -m pip install -r ~/msk-feed/requirements.txt
    python3 ~/msk-feed/bin/get_nodes.py
    
    source ~/.bashrc 
    

NOTE: You can find the values for your Bootstrap servers string and Zookeeper connections string by clicking on View client information on your MSK cluster details page. ZKNODES is the Plaintext Zookeeper connection string and TLSBROKERS is the Private endpoint.

  1. In your certs directory, create a private key and certificate signing request (CSR) file for the MSK broker's certificate.

    cd ~/certs
    makecsr
    

Enter your organization's domain name when asked for first and last name and enter additional organization details when prompted. Then make up a password for the your keystore when prompted. You will now have a CSR file called client_cert.csr.

  1. Sign the CSR and issue the certificate by running

    issuecert client_cert.csr
    

This uses your ACM Private Certificate Authority to sign the CSR and generate the certificate file, called client_cert.pem. Make sure you have ACM_PCA_ARN environment variable set.

  1. Import the certificate into your keystore.

    importcert client_cert.pem
    
    source ~/.bashrc 
    
  2. You should have in your certs directory the following files.

    • client_cert.csr - Certificate signing request file
    • client_cert.pem - Client certificate file
    • client.properties - Properties file that contains Kafka tools client configuration for TLS connection
    • kafka.client.keystore.jks - Java Key Store file that contains Client certificate, private key and trust chain
    • kafka.client.truststore.jks - Java Key Store file that contains trusted public CAs
    • private_key.pem - Private key for mutual TLS
    • truststore.pem - Store of external certificates that are trusted
  3. Update the advertised listener ports on the MSK cluster

    kfeed -u
    

NOTE: The above command updates the advertised listeners on the MSK cluster to allow the private NLB to send a message to a specific broker at a specific port (e.g., port 8441 for broker b-1). If prompted to confirm removing the temporary ACL, type yes.

4. Deploying the Kafka client instance

The steps below will create a client EC2 instance in a new VPC to run the Kafka consumer application. These steps will also create a VPC endpoint that connects to the MSK cluster via PrivateLink, and a Route 53 Private Hosted Zone that aliases the broker names to the VPC endpoint's DNS name.

  1. Go to your deployment instance (used in section 2) and make sure you have CLUSTERARN, MSK_VPC_ENDPOINT_SERVICE and EC2_KEY_PAIR on your environment variables. If you don't have these variables, follow the steps below.

    echo "export MSK_VPC_ENDPOINT_SERVICE='value of DataFeedUsingMskStack.MskVPCEndpoint'" >> ~/.bashrc
    echo "export CLUSTERARN='ARN of your MSK Cluster'" >> ~/.bashrc
    echo "export EC2_KEY_PAIR='Your EC2 keypair'" >> ~/.bashrc
    
    source ~/.bashrc
    

You can find the name of your VPC endpoint service by clicking on Endpoint services in your AWS VPC console, and selecting the service, and looking in the service details section. The name begins with com.amazonaws.

  1. Then create the client infrastructure in a new client VPC by typing the following.

    cd ../client-setup
    cdk synth
    cdk deploy
    

5. Configuring client instance

The steps below will finish setting up the client instance for private access to the cluster via PrivateLink. The client will need to obtain a signed certificate from the provider.

  1. In a separate terminal window, ssh to your client instance and enter the following.

    alias python3=python3.8
    export PATH=$PATH:$HOME/msk-feed/bin
    
    source ~/.bashrc
    
    cd certs
    makecsr
    

Enter the organization details for the client when prompted.

  1. Copy the client_cert.csr file from the client instance to the provider instance as consumer_cert.csr, and run the issuecert command on it to generate the SSL cert for the client application.

    cd ~/certs
    issuecert consumer_cert.csr
    

NOTE: In a real-world scenario, the client would upload the CSR file to the provider's Website for signing.

  1. Copy the generated consumer_cert.pem file back to the client instance as , and put it in the certs folder. Then issue the following command on the client instance.

    importcert consumer_cert.pem
    
    source ~/.bashrc
    
  2. On the provider instance, create a test Kafka topic named topic1 using the kfeed command.

    kfeed --create-topic topic1
    

The above can be shortened to kfeed -c topic1

  1. On the provider instance, add an ACL to allow the producer to write to the topic.

    cd ~/certs/
    kfeed --allow client_cert.pem producer topic1
    

The above can be abbreviated as kfeed -a client_cert.pem p topic1

Note: **client_cert.pem` is the certificate you generated earlier for the producer.

  1. On the provider instance add an ACL for the consumer application to consume from the topic, as follows.

    cd ~/certs/
    kfeed --allow consumer_cert.pem consumer topic1
    

The above can be abbreviated as kfeed -a consumer_cert.pem c topic1

6. Running the provider and consumer applications

Testing sample producer and consumer python clients

  1. In your client instance, run the test consumer application.

    # For Public Connectivity over Internet
    cd ~/msk-feed/data-feed-examples/
    python3 consumer_internet.py
    
    # For Private Connectivity over PrivateLink
    cd ~/msk-feed/data-feed-examples/
    python3 consumer_internet.py
    
  2. In your provider instance, run the test producer application.

    cd ~/msk-feed/data-feed-examples/
    python3 producer.py 
    

Testing Alpaca producer and consumer python clients

  1. alpaca-producer.py is an example of a Kafka producer that ingests data from a market data provider called Alpaca Markets and feeds the data to your MSK Cluster. Alpaca offers a free tier API that is a good example of real world data, since it is live market data. There are a few steps that you need to perform to make it work correctly.

  2. Sign up for the Alpaca free tier API.

  3. Generate an API KEY ID and a Secret Key

  4. Log in using ssh to the provider instance and export Alpaca credentials to the following environment variables.

    export APCA_API_KEY_ID="<API KEY ID>"
    export APCA_API_SECRET_KEY="<Secret Key>"
    
  5. On the provider instance, create the following topics.

    kfeed -c trade 
    kfeed -c quote
    kfeed -c crypto_trade
    kfeed -l 
    
  6. On the provider instance, add the necessary ACLs to give the producer and consumer access to the topics.

    kfeed -a client_cert.pem p trade
    kfeed -a client_cert.pem p quote
    kfeed -a client_cert.pem p crypto_trade
    kfeed -a consumer_cert.pem c trade
    kfeed -a consumer_cert.pem c quote
    kfeed -a consumer_cert.pem c crypto_trade
    
  7. On the provider instance, run the producer in the ~/msk-feed/data-feed-examples folder.

    python3 alpaca-producer.py
    
  8. In a separate terminal window, ssh to the client instance and run the consumer in the data-feed-examples folder

    python3 alpaca-consumer.py
    

You should see the messages in the screen.

Contributors

Diego Soares

Rana Dutt

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.