The goal of this project is to play with Spark, AWS EMR, Spark Structured Streaming, Terraform, and Ansible.
The business use case could be an investment firm: it may want to understand the public's opinion about certain companies to see if there is a correlation between that sentiment and future stock prices. Unfortunately, live stock price APIs are very costly, so stock prices are not incorporated in this project. I may add an extra pipeline that gets the daily stock prices and puts them in S3.
- Create a connection with Twitter's Filter Stream API. A producer written in Java connects to the API and sends the tweets to a topic in Kafka. This is in the folder `./twitterProducer`.
- A PySpark Structured Streaming Kafka consumer consumes the tweets from Kafka and parses the JSON the tweets arrive in into structured data for databases. See `./twitterConsumer`.
- The consumer saves the tweets in Parquet format in an AWS S3 bucket for future use (a minimal sketch of the consumer follows below).
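The real consumer lives in `./twitterConsumer`; the following is only a minimal sketch of the pattern, where the broker addresses, topic name, tweet schema, and S3 paths are placeholders rather than the project's actual values:

```python
# Minimal PySpark Structured Streaming sketch: JSON tweets from Kafka -> Parquet on S3.
# Brokers, topic, schema, and S3 paths below are illustrative placeholders.
# Requires the spark-sql-kafka package on the cluster.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("twitterConsumer").getOrCreate()

# Assumed tweet schema; the real one depends on the fields requested from the API.
tweet_schema = StructType([
    StructField("id", StringType()),
    StructField("text", StringType()),
    StructField("created_at", StringType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")  # placeholder brokers
       .option("subscribe", "tweets")                                   # placeholder topic
       .load())

# Kafka values arrive as bytes; cast to string and parse the JSON payload.
tweets = (raw.selectExpr("CAST(value AS STRING) AS json")
          .select(from_json(col("json"), tweet_schema).alias("tweet"))
          .select("tweet.*"))

query = (tweets.writeStream
         .format("parquet")
         .option("path", "s3://my-bucket/tweets/")                       # placeholder output path
         .option("checkpointLocation", "s3://my-bucket/checkpoints/")    # placeholder checkpoint
         .start())
query.awaitTermination()
```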
I chose to manage my own Kafka cluster and use AWS EMR with Spark Structured Streaming just to get a chance to play around with these technologies. Ansible was helpful for installing Kafka on all the hosts programmatically and for restarting the Kafka cluster. Both Terraform and Ansible were helpful for documenting the setup steps programmatically, for transparency and reproducibility.
In a production environment, depending on how important this data is and what goals the team has, it may make more sense not to use a self-managed Kafka cluster, an extra producer run manually on an EC2 instance, or even Spark Structured Streaming. These are hard to maintain, and the large EC2 instances involved are costly per hour. For this simple, low-throughput application with only one consumer, I would probably choose AWS Kinesis, and would probably use Lambda for processing the data. If I were using a CPU-intensive ML model to classify the tweets, or needed more throughput, an occasional batch processing job on EMR might be better.
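For illustration only, the processing side of a Kinesis-plus-Lambda version could be as small as a single handler. This sketch is not part of this repo, and the record contents and bucket name are assumptions:

```python
# Illustrative-only sketch of a Lambda handler for a Kinesis event source
# (not part of this project). Record contents and bucket name are assumptions.
import base64
import json
import boto3

s3 = boto3.client("s3")

def handler(event, context):
    rows = []
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        rows.append(json.loads(payload))
    # Write the micro-batch to S3 as JSON lines; a real handler might convert to Parquet.
    body = "\n".join(json.dumps(r) for r in rows)
    key = f"tweets/{context.aws_request_id}.json"
    s3.put_object(Bucket="my-tweet-bucket", Key=key, Body=body.encode("utf-8"))
    return {"records_processed": len(rows)}
```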
Other design choices: I used both Java and Python since I wanted to work with both, but it may make more sense to stick to one language in a production environment.
Pre-set up:
- Twitter API:
  - Get approval for setting up a Twitter API account
  - Create API keys as per the instructions here
- Install Terraform, Ansible
- If you are running the Producer from your computer, install Java. If not, then you could save the producer as a .jar file and then run it as a daemon process on a server.
- Make sure `terraform/variables.tf` and `ansible/hosts.yml` use the profile that you have the private key for. This will be the key pair you use to connect to the EC2 instances.
This guide is a work in progress
- Set up Terraform: go to `./terraform`
- Run `terraform plan` to ensure that there are no issues with the Terraform plan
- Run `terraform apply` to create the AWS resources
- Go to `./ansible`: `cd ../ansible/`
- Update `./updateServers.sh` with the newly created servers, and run the file: `./updateServers.sh`
- To install Kafka and start ZooKeeper and the brokers: `ansible-playbook -i hosts.yml kafka.yml`
- See the readme to create a Kafka topic on a broker
Add `BEARER_TOKEN` as an environment variable on the system. One may initially run the unit tests in `./twitterProducer/src/tests/` to ensure that it's working.
- Currently I run the Java project from an IDE, though at some point I may package it as a jar and run it as a daemon process
In `FilteredStreamDemo.java`, make sure to replace `bootstrapServers` with the server and port of one of the Kafka cluster's brokers.
- In `consumer.py`, replace the variable `BOOTSTRAP_SERVERS` with the list of Kafka brokers in the Kafka cluster being used. Upload this consumer to a bucket in S3. You can also change where the Spark Structured Streaming checkpoints and the actual output Parquet data will be saved.
- Submit the Spark application to the AWS EMR cluster created with Terraform. Look at `./twitterConsumer/readme.md` for the AWS CLI command.
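`./twitterConsumer/readme.md` uses the AWS CLI; the same step submission could also be done with boto3, roughly as below. The cluster ID, script location, and Kafka package version are placeholders:

```python
# Hedged sketch: submitting the consumer as an EMR step with boto3 instead of the AWS CLI.
# Cluster ID, script location, and package version are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder EMR cluster ID
    Steps=[{
        "Name": "twitter-consumer",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",  # placeholder version
                "s3://my-bucket/consumer.py",  # placeholder script location
            ],
        },
    }],
)
print(response["StepIds"])
```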
Once the data starts coming into the Athena folder, you can query it with AWS Athena.
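For example, a quick sanity-check query could be kicked off with boto3; the database, table, and result-location names here are hypothetical:

```python
# Hedged sketch: querying the tweet data with Athena via boto3.
# Database, table, and result-location names are hypothetical.
import boto3

athena = boto3.client("athena", region_name="us-east-1")

athena.start_query_execution(
    QueryString="SELECT text, created_at FROM tweets LIMIT 10",          # hypothetical table
    QueryExecutionContext={"Database": "twitter_db"},                     # hypothetical database
    ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},  # hypothetical bucket
)
```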

- Continuous data quality checks on:
  - data that comes out of the Twitter API before it's sent to Kafka by the TwitterProducer
  - data from Kafka in Spark streaming, before it's saved as Parquet (tradeoff: more latency); see the sketch after this list
- Connect the EC2 instances to a domain name, so that when I have to shut down/restart Kafka, I don't have to change the Ansible Kafka installer, the PySpark consumer, and the Java producer inputs
- Are there tests I can add to the consumer? At least integration tests
- Save the producer as a .jar file, add parameters so one can specify the Kafka broker to send data to, and run it as a daemon process on an EC2 instance
- If the EMR cluster is restarted while Kafka is still running, is there a way to automate picking up from the missed offsets?
- Adjust the NLP script to classify tweets using John Snow Labs' NLP library
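For the streaming-side data quality idea in the list above, one hedged sketch (field names are assumptions about the tweet schema) of a check applied before the Parquet sink:

```python
# Hedged sketch of the in-stream data quality idea above: drop records that fail
# basic checks before they reach the Parquet sink. Field names are assumptions.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, length

def apply_quality_checks(tweets_df: DataFrame) -> DataFrame:
    """Keep only tweets with a non-null id and non-empty text."""
    return (tweets_df
            .filter(col("id").isNotNull())
            .filter(col("text").isNotNull() & (length(col("text")) > 0)))

# Usage: insert between the JSON parsing step and writeStream, e.g.
# clean_tweets = apply_quality_checks(tweets)
```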
