
Cassandra


Introduction

Cassandra is a decentralized NoSQL database that remains available in the event of network partitions (AP in Brewer’s CAP theorem - a brief explanation of CAP theorem) by avoiding the use of a “master” node. It uses a “wide-row” columnar data model, which can be thought of as key-key-value (or a map of maps), enabling a high volume of writes and efficient range queries. Cassandra is one of the most popular AP databases and excels at time-series workloads and cross-datacenter replication.

Cassandra delivers continuous availability, linear scalability as nodes are added, and operational simplicity across many commodity servers with no single point of failure, along with a flexible, dynamic data model designed for fast response times.

Requirements

An AWS cluster of 3 nodes or more

Dependencies

Java 8 is required for Cassandra 3.0. If Java is not already installed, install it on all nodes:

node:~$ sudo add-apt-repository ppa:webupd8team/java
node:~$ sudo apt-get update
node:~$ sudo apt-get install oracle-java8-installer
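
You can confirm that Java 8 is active on each node with:

node:~$ java -version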

Install Cassandra

This installation process must be executed on each machine.

We will download Cassandra 3.0.8 and save it to a Downloads folder. Next, we will install it into the /usr/local directory and rename the folder to simply ‘cassandra’:

any-node:~$ mkdir -p ~/Downloads
any-node:~$ curl -L http://downloads.datastax.com/community/dsc.tar.gz | tar xz -C ~/Downloads
any-node:~$ sudo mv ~/Downloads/dsc-cassandra-* /usr/local/cassandra
any-node:~$ nano ~/.profile

Add the following Cassandra environment variables to your .profile.

export CASSANDRA_HOME=/usr/local/cassandra
export PATH=$PATH:$CASSANDRA_HOME/bin

Then source .profile:

any-node:~$ . ~/.profile
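
To confirm that the Cassandra binaries are now on your PATH, the following is a quick sanity check; the version flag should report something like 3.0.8:

any-node:~$ which cassandra
any-node:~$ cassandra -v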

Configure Cassandra

If the Cassandra cluster is within the same AWS region, the configuration file will be essentially the same across all nodes.

For the purposes of this setup, designate one of your nodes as the "seed." The configuration file on each node will then have the same <seed-private-ip>, but <local-private-ip> should be set to that node's own private IP address. The following shows the fields to change in each node's Cassandra configuration file:

any-node:~$ sudo nano /usr/local/cassandra/conf/cassandra.yaml
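
If you are unsure of a node's private IP for the listen_address and broadcast_rpc_address fields below, one way to look it up from the node itself (assuming the standard EC2 instance metadata endpoint is available) is:

any-node:~$ curl http://169.254.169.254/latest/meta-data/local-ipv4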

See below for which sections of /usr/local/cassandra/conf/cassandra.yaml you should customize to your settings (i.e., change the placeholder values in angle brackets):

# The name of the cluster. This is mainly used to prevent machines in
# one logical cluster from joining another.
cluster_name: '<your-cluster-name>'

…
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map of parameters will do.
seed_provider:
    # Addresses of hosts that are deemed contact points.
    # Cassandra nodes use this list of hosts to find each other and learn
    # the topology of the ring.  You must change this if you are running
    # multiple nodes!
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # seeds is actually a comma-delimited list of addresses.
          # Ex: "<ip1>,<ip2>,<ip3>"
          - seeds: "<seed-private-ip>"  # Will be the same across all nodes, e.g. "1.2.3.4"
…
# You _must_ change this if you want multiple nodes to be able to communicate!
#
# Set listen_address OR listen_interface, not both. Interfaces must correspond
# to a single address, IP aliasing is not supported.
#
# Leaving it blank leaves it up to InetAddress.getLocalHost(). This
# will always do the Right Thing _if_ the node is properly configured
# (hostname, name resolution, etc), and the Right Thing is to use the
# address associated with the hostname (it might not be).
#
# Setting listen_address to 0.0.0.0 is always wrong.
#
# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address
# you can specify which should be chosen using listen_interface_prefer_ipv6. If false the first ipv4
# address will be used. If true the first ipv6 address will be used. Defaults to false preferring
# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6.

listen_address: <local-private-ip> # E.g. format: 1.2.3.4
# No quotation marks around the DNS name, and you don’t need the trailing .us-west-2.compute.internal

…
…

# The address or interface to bind the Thrift RPC service and native transport
# server to.
#
# Set rpc_address OR rpc_interface, not both. Interfaces must correspond
# to a single address, IP aliasing is not supported.
#
# Leaving rpc_address blank has the same effect as on listen_address
# (i.e. it will be based on the configured hostname of the node).
#
# Note that unlike listen_address, you can specify 0.0.0.0, but you must also
# set broadcast_rpc_address to a value other than 0.0.0.0.
#
# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
#
# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address
# you can specify which should be chosen using rpc_interface_prefer_ipv6. If false the first ipv4
# address will be used. If true the first ipv6 address will be used. Defaults to false preferring
# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6.

rpc_address: 0.0.0.0  

# rpc_interface: eth1
# rpc_interface_prefer_ipv6: false
    
# port for Thrift to listen for clients on
rpc_port: 9160
    
# RPC address to broadcast to drivers and other Cassandra nodes. This cannot
# be set to 0.0.0.0. If left blank, this will be set to the value of
# rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must
# be set.
    
broadcast_rpc_address: <local-private-ip> # E.g. format: 1.2.3.4
# No quotation marks around the DNS name, and you don’t need the trailing .us-west-2.compute.internal

…
…
# endpoint_snitch -- Set this to a class that implements
# IEndpointSnitch.  The snitch has two functions:
# - it teaches Cassandra enough about your network topology to route
#   requests efficiently
# - it allows Cassandra to spread replicas around your cluster to avoid
#   correlated failures. It does this by grouping machines into
#   "datacenters" and "racks."  Cassandra will do its best not to have
#   more than one replica on the same "rack" (which may not actually
#   be a physical location)
#
# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER,
# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS
# ARE PLACED.
#
# Out of the box, Cassandra provides
#  - SimpleSnitch:
#    Treats Strategy order as proximity. This can improve cache
#    locality when disabling read repair.  Only appropriate for
#    single-datacenter deployments.
#  - GossipingPropertyFileSnitch
#    This should be your go-to snitch for production use.  The rack
#    and datacenter for the local node are defined in
#    cassandra-rackdc.properties and propagated to other nodes via
#    gossip.  If cassandra-topology.properties exists, it is used as a
#    fallback, allowing migration from the PropertyFileSnitch.
#  - PropertyFileSnitch:
#    Proximity is determined by rack and data center, which are
#    explicitly configured in cassandra-topology.properties.
#  - Ec2Snitch:
#    Appropriate for EC2 deployments in a single Region. Loads Region
#    and Availability Zone information from the EC2 API. The Region is
#    treated as the datacenter, and the Availability Zone as the rack.
#    Only private IPs are used, so this will not work across multiple
#    Regions.
#  - Ec2MultiRegionSnitch:
#    Uses public IPs as broadcast_address to allow cross-region
#    connectivity.  (Thus, you should set seed addresses to the public
#    IP as well.) You will need to open the storage_port or
#    ssl_storage_port on the public IP firewall.  (For intra-Region
#    traffic, Cassandra will switch to the private IP after
#    establishing a connection.)
#  - RackInferringSnitch:
#    Proximity is determined by rack and data center, which are
#    assumed to correspond to the 3rd and 2nd octet of each node's IP
#    address, respectively.  Unless this happens to match your
#    deployment conventions, this is best used as an example of
#    writing a custom Snitch class and is provided in that spirit.
#
# You can use a custom Snitch by setting this to the full class name
# of the snitch, which will be assumed to be on your classpath.

endpoint_snitch: Ec2Snitch
…
…
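
A quick way to double-check the values you just set on each node (this only matches the active, uncommented settings):

any-node:~$ grep -E "^(cluster_name|listen_address|rpc_address|broadcast_rpc_address|endpoint_snitch):|- seeds:" /usr/local/cassandra/conf/cassandra.yaml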

We can now start the Cassandra cluster, beginning with the seed node you chose. The other Cassandra nodes will contact the seed to discover the cluster and add themselves to the ring. Execute this on each node; with each addition, you should see the previously started nodes being discovered in that node's terminal output.

any-node:~$ cassandra
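
If you would rather keep the process attached to your terminal so you can watch the startup and gossip logs (handy when a node fails to join), start it in the foreground instead:

any-node:~$ cassandra -f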

After all nodes have started the Cassandra service, you can check the Cassandra ring with the following command:

any-node:~$ nodetool status

Removing nodes from the cluster

Removing a Cassandra node is quite simple using nodetool. First, we need to identify which node to remove.

any-node:~$ nodetool status

The status of each node is shown in the leftmost column. A node whose status reads DN is down (as opposed to UN, up and normal). We will use that node's Host ID to remove it from the cluster:

any-node:~$ nodetool removenode 6688865b-be26-41b1-8e70-08f1ab7ff0ff

Afterwards you can check the nodes in the ring again. The data should now be redistributed amongst the remaining nodes.
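
Note that nodetool removenode is intended for nodes that are already down. If the node you want to remove is still up, run nodetool decommission on that node instead, so it streams its data to the remaining nodes before leaving the ring. You can also check on a long-running removal with:

any-node:~$ nodetool removenode status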

Getting started with Cassandra

Refer to the CQL tutorial to get started with Cassandra. Also, read about Cassandra Data Modeling - it does a great job of explaining the concepts through examples.

There are Java and Python Cassandra clients which you can use: the Python Cassandra Driver and the Java Cassandra Driver.

Cassandra Example

Start the CQL shell (cqlsh) on any of the Cassandra nodes (make sure Cassandra is running on all the nodes). The first step is to create a keyspace. Creating a keyspace is the CQL counterpart to creating an SQL database, but a little different: a keyspace is a namespace that defines how data is replicated across nodes. Typically, a cluster has one keyspace per application. Replication is controlled on a per-keyspace basis, so data with different replication requirements typically resides in different keyspaces. Keyspaces are not meant to serve as a significant mapping layer within the data model; they exist to control data replication for a set of tables.

any-node:~$ cqlsh
cqlsh> CREATE KEYSPACE playground WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};
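
You can confirm the keyspace was created and inspect its replication settings from cqlsh:

cqlsh> DESCRIBE KEYSPACES;
cqlsh> DESCRIBE KEYSPACE playground;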

There are two options for replication strategy:

  • SimpleStrategy: Use only for a single data center. SimpleStrategy places the first replica on a node determined by the partitioner. Additional replicas are placed on the next nodes clockwise in the ring without considering topology (rack or data center location).

  • NetworkTopologyStrategy: Use NetworkTopologyStrategy when you have (or plan to have) your cluster deployed across multiple data centers. This strategy specifies how many replicas you want in each data center (see the sketch after this list). You can read more here.
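
For reference, a multi-data-center keyspace definition looks roughly like the sketch below. The data center names must match what your snitch reports (check the Datacenter field in the nodetool status output); <dc1-name> and <dc2-name> are placeholders, not values from this guide.

cqlsh> CREATE KEYSPACE multi_dc_example WITH replication = {'class': 'NetworkTopologyStrategy', '<dc1-name>': 3, '<dc2-name>': 2};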

As we are not deploying our cluster across multiple data centers, we use SimpleStrategy with a replication factor of 3. Now, let’s use this keyspace and create a table.

cqlsh> USE playground;
cqlsh:playground> CREATE TABLE email (id text, date text, time timestamp, fname text, lname text, message text, PRIMARY KEY ((id, date), time)) WITH CLUSTERING ORDER BY (time DESC);
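
To confirm the schema, including the clustering order, you can describe the table from cqlsh:

cqlsh:playground> DESCRIBE TABLE email;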

Every table in Cassandra has a primary key. A primary key consists of a compulsory partition key and optional clustering columns. The partition key can be a compound key, as it is in this case. The partition key determines on which node the data is stored (all rows for a partition key live on one node), and within a partition the data is ordered according to the clustering column. Ideally, you have two goals for your data model:

  • Spread the load across all nodes in a cluster equally
  • Minimize the number of partitions read in a query to improve latency

Rows are spread around the cluster based on a hash of the partition key, so the key to spreading data evenly is to pick a good partition key. We also want to minimize the number of partitions read in a query: the more partitions we read, the more physical nodes we may have to contact, so read latency depends on how many partitions a query touches.

In this example, all the emails from a given email address for a given day will be on one node because of our compound partition key. So, if our query is to get all the emails from a particular email id for a given date, we read only one partition. But if our query is to select all the emails from a given email id, then we are reading multiple partitions and violating the second goal.
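
You can see Cassandra push back on that second kind of query directly: because date is part of the partition key, a query that restricts only id is rejected (the exact error message varies by version, but Cassandra will not silently scan every partition for you):

cqlsh:playground> SELECT * FROM email WHERE id = 'david@insightdataengineering.com';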

If we did not have a compound partition key and the partition key were only “id”, the load would not be spread evenly across all nodes, because one email id may send far more emails than the others. On the other hand, if our query is to select all the emails from a given email id, we would then be reading just one partition, which aligns with our second goal.

Hence, the design of the table depends on the queries we want to run, and there is always a tradeoff in balancing the two goals.

cqlsh> USE playground;
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('ronak@insightdataengineering.com', '2015-09-01', '2015-09-01 10:03:00', 'Ronak', 'Nathani', 'Welcome to Insight!');
cqlsh:playground> SELECT * FROM email;
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('ronak@insightdataengineering.com', '2015-09-01', '2015-09-01 12:10:00', 'Ronak', 'Nathani', 'We are sure you will build great things at Insight!');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('ronak@insightdataengineering.com', '2015-09-03', '2015-09-03 14:23:00', 'Ronak', 'Nathani', 'Very excited to work with you all');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('austin@insightdataengineering.com', '2015-09-01', '2015-09-01 10:13:00', 'Austin', 'Ouyang', 'Your goal is to become an awesome data engineer at a top tier company');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('austin@insightdataengineering.com', '2015-09-02', '2015-09-02 09:46:00', 'Austin', 'Ouyang', 'We have happy hours on every Friday!');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('austin@insightdataengineering.com', '2015-09-01', '2015-09-01 10:13:00', 'Austin', 'Ouyang', 'If you are stuck, ask us or your fellow Fellows.');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('david@insightdataengineering.com', '2015-09-01', '2015-09-01 11:00:00', 'David', 'Drummond', 'In the next 4 weeks, you will meet data teams from great companies.');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('david@insightdataengineering.com', '2015-09-01', '2015-09-01 15:33:00', 'David', 'Drummond', 'Bounce off project ideas off of us or fellows. Brainstorming is really helpful.');
cqlsh:playground> INSERT INTO email (id, date, time, fname, lname, message) VALUES ('david@insightdataengineering.com', '2015-09-01', '2015-09-01 10:15:00', 'David', 'Drummond', 'Alumni is a great resource for you to get help from.');

Now, let’s write a query to fetch all the emails by David on 2015-09-01 before 14:00.

cqlsh:playground> SELECT * FROM email WHERE id = 'david@insightdataengineering.com' and date='2015-09-01' and time < '2015-09-01 14:00:00';

As you can see, Cassandra is really good at time-series queries. You can also try the following query to see that the rows are ordered by time in descending order.

cqlsh:playground> SELECT * FROM email WHERE id = 'david@insightdataengineering.com' and date='2015-09-01' LIMIT 2;
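
If a particular query needs the opposite order, you can override the table's clustering order at query time (allowed here because the full partition key is specified):

cqlsh:playground> SELECT * FROM email WHERE id = 'david@insightdataengineering.com' and date='2015-09-01' ORDER BY time ASC;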

Read more about time series data modeling in Cassandra here.