## Basic Cassandra Operations using Python

In [111]:
from cassandra.cluster import Cluster
import time

In [112]:
# Driver-side handle for the local Cassandra node; no connection is opened until connect().
cluster = Cluster(contact_points=['localhost'])

In [113]:
# Open a session against the cluster; every later cell depends on `session`.
try:
    session = cluster.connect()
    print("Connection to Cassandra Cluster Successfull!")
except Exception as exc:
    # A bare `except:` would also swallow KeyboardInterrupt/SystemExit and hide
    # the cause. Report the error and re-raise so "Run All" stops here instead
    # of failing later with a confusing NameError on `session`.
    print("Something went wrong!", exc)
    raise

Connection to Cassandra Cluster Successfull!


### Creating Keyspace

In [114]:
keyspace = "empdb"  # keyspace name used by every CQL statement and Spark read/write below

In [115]:
# IF NOT EXISTS makes this cell safe to re-run; a single replica suits a local node.
create_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'SimpleStrategy','replication_factor':1}" % keyspace
session.execute(create_keyspace)
print("%s Keyspace Created Successfully!" % keyspace)

empdb Keyspace Created Successfully!


In [116]:
# Make `keyspace` the session's default, so later statements can use bare table names.
# (`keyspace` is already a str, so wrapping it in '{}'.format() was a no-op.)
session.set_keyspace(keyspace)
print("Using {} Keyspace".format(keyspace))

Using empdb Keyspace


### Creating Table

In [117]:
table = "emp"  # table name; also used as the prefix of every column name

In [118]:
# Every column is prefixed with the table name (emp_id, emp_name, ...).
# IF NOT EXISTS mirrors the keyspace cell, so the notebook can be re-run,
# e.g. after an interrupted run that never reached the DROP TABLE cell.
create_table = (
    "CREATE TABLE IF NOT EXISTS {t}("
    "{t}_id int PRIMARY KEY, {t}_name text, {t}_city text, {t}_phone varint)"
).format(t=table)
session.execute(create_table)
print("{} table created Successfully!".format(table))

emp table created Successfully!


### Inserting data into table

In [119]:
# Identifiers (table/column names) cannot be bound, so they are formatted in.
# Row values are passed separately and quoted by the driver, so names such as
# "D'Souza" cannot break the statement.
insert_into_table = (
    "INSERT INTO {t}({t}_id, {t}_name, {t}_city, {t}_phone) "
    "VALUES (%s, %s, %s, %s)"
).format(t=table)
session.execute(insert_into_table, (1, 'Varun', 'Nagpur', 7875121904))
print("Data inserted into {} table Successfully!".format(table))

Data inserted into emp table Successfully!


In [120]:
# Same template as the first insert: identifiers formatted in, values bound
# as driver parameters (quoted/escaped by the driver).
insert_into_table = (
    "INSERT INTO {t}({t}_id, {t}_name, {t}_city, {t}_phone) "
    "VALUES (%s, %s, %s, %s)"
).format(t=table)
session.execute(insert_into_table, (2, 'Sandesh', 'Aurangabad', 8793753269))
print("Data inserted into {} table Successfully!".format(table))

Data inserted into emp table Successfully!


### Reading data from table

In [121]:
# Fetch all rows of the table and print each one as the driver's Row object.
read_from_table = "SELECT * FROM %s" % table
rows = session.execute(read_from_table)
for row in rows:
    print(row)

Row(emp_id=1, emp_city='Nagpur', emp_name='Varun', emp_phone=7875121904)
Row(emp_id=2, emp_city='Aurangabad', emp_name='Sandesh', emp_phone=8793753269)


### Updating data in table

In [122]:
# Change the city of the row with id 1; only that one column is set.
update_table = "UPDATE {0} SET {0}_city='Pune' WHERE {0}_id=1".format(table)
session.execute(update_table)
print("Table %s updated Successfully!" % table)

Table emp updated Successfully!


### Deleting data from table

This deletes only the `emp_phone` value of the row with `emp_id=2`, not the whole row.

In [123]:
# Delete only the phone column value of the row with id 2 (not the whole row).
delete_from_table = "DELETE {0}_phone FROM {0} WHERE {0}_id=2".format(table)
session.execute(delete_from_table)
print("Delete operation on %s table done Successfully!" % table)

Delete operation on emp table done Successfully!


### Truncating data from table

In [124]:
# Remove every row; the table itself remains and is written to again by Spark below.
truncate_table = "TRUNCATE %s" % table
session.execute(truncate_table)
print("All data deleted from %s table Successfully!" % table)

All data deleted from emp table Successfully!


## Basic Cassandra Operations using PySpark

In [125]:
# findspark.init() adds the Spark installation's pyspark to sys.path, so it must
# run BEFORE pyspark is imported. Otherwise this import only succeeds if pyspark
# happens to be pip-installed or was already imported earlier in the kernel.
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession

In [126]:
findspark.init(spark_home="/usr/local/spark")  # point findspark at the local Spark install

### Building SparkSession

In [127]:
# Reuse an active SparkSession if there is one, otherwise create a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

### Reading CSV file into PySpark Dataframe

In [128]:
# Read the sample CSV; the first line holds the column names.
# header takes a real bool (not the string "True"). inferSchema types emp_id and
# emp_phone as numbers instead of strings, matching the Cassandra int/varint
# columns rather than relying on the connector to convert strings on write.
df = spark.read.csv("/home/ubuntu/empSample.csv", header=True, inferSchema=True)
df.show()

+------+--------+--------+----------+
|emp_id|emp_name|emp_city| emp_phone|
+------+--------+--------+----------+
|     1|   Varun|  Nagpur|7875121904|
+------+--------+--------+----------+



### Writing PySpark Dataframe into Cassandra

In [129]:
# Append the DataFrame's rows to the Cassandra table via the Spark Cassandra data source.
(
    df.write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(table=table, keyspace=keyspace)
    .save()
)
print("Successfully inserted PySpark Dataframe into Cassandra!")

Successfully inserted PySpark Dataframe into Cassandra!


### Reading data from Cassandra using PySpark

In [130]:
# Load the Cassandra table back into a Spark DataFrame and print it.
cas_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table=table, keyspace=keyspace)
    .load()
)
cas_df.show()

+------+--------+--------+----------+
|emp_id|emp_city|emp_name| emp_phone|
+------+--------+--------+----------+
|     1|  Nagpur|   Varun|7875121904|
+------+--------+--------+----------+



### Deleting table

In [131]:
# Drop the table. IF EXISTS matches the keyspace cleanup cell, so re-running
# this cell (or running it after a partial run) does not raise.
delete_table = "DROP TABLE IF EXISTS {}".format(table)
session.execute(delete_table)
print("{} table deleted Successfully!".format(table))

emp table deleted Successfully!


### Deleting Keyspace

In [132]:
# Remove the keyspace; IF EXISTS keeps the cell safe to re-run.
delete_keyspace = "DROP KEYSPACE IF EXISTS %s" % keyspace
session.execute(delete_keyspace)
print("%s Keyspace deleted Successfully!" % keyspace)

empdb Keyspace deleted Successfully!


### Closing the Cassandra driver connection

In [133]:
# Close this client's sessions and connections to Cassandra (the Cassandra
# server itself keeps running). The preceding session.execute() calls all
# block until they complete, so the former 5-second sleep served no purpose.
print("Shutting down Cluster...")
cluster.shutdown()
print("Cluster Shutdown!")

Shutting down Cluster...
Cluster Shutdown!


### Stopping the SparkSession

In [134]:
# Stop the SparkSession (and its underlying SparkContext) now that all Spark work is done.
spark.stop()
print("SparkSession Stopped Successfully!")

SparkSession Stopped Successfully!
