# How to upload data to cassandra

In [155]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel
import pandas as pd

In [9]:
cluster = Cluster(['172.18.0.2'])
session = cluster.connect('ecommerce')

In [128]:
res = session.execute("""
DROP TABLE orders_by_year_and_month;
""")

In [129]:
CREATE_QUERY = """CREATE TABLE ecommerce.orders_by_year_and_month (
    id text,
    year int,
    month tinyint,
    day tinyint,
    hour tinyint,
    time timestamp,
    PRIMARY KEY ((year, month), day, hour));"""

In [130]:
res = session.execute(CREATE_QUERY)

In [131]:
print(session.execute("desc orders_by_year_and_month;").one().create_statement)

CREATE TABLE ecommerce.orders_by_year_and_month (
    year int,
    month tinyint,
    day tinyint,
    hour tinyint,
    id text,
    time timestamp,
    PRIMARY KEY ((year, month), day, hour)
) WITH CLUSTERING ORDER BY (day ASC, hour ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';


In [162]:
df = pd.read_csv("e-commerce.csv", usecols=["order_id", "order_purchase_timestamp"])
df["time"] = pd.to_datetime(df["order_purchase_timestamp"])
df["year"] = df.time.apply(lambda x: x.year)
df["month"] = df.time.apply(lambda x: x.month)

In [163]:
INSERT_QUERY = "INSERT INTO orders_by_year_and_month(id, year, month, day, hour, time) VALUES (?,?,?,?,?,?)"

In [164]:
prepare = session.prepare(INSERT_QUERY)

## Batch upload

In [165]:
for index, data in df.groupby(["year", "month"]):
    batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
    for item in data.itertuples():
        batch.add(prepare,(item.order_id, item.time.year, item.time.month, item.time.day, item.time.hour, item.time.to_pydatetime()))
    session.execute(batch)

## Upload one-by-one

In [166]:
#for item in data.itertuples():
#    session.execute(prepared, (item.order_id, item.time.year, item.time.month, item.time.day, item.time.hour, item.time.to_pydatetime()))

In [167]:
res = session.execute("SELECT * FROM ecommerce.orders_by_year_and_month WHERE YEAR=2017 AND MONTH=12;")

## Download dataset

In [171]:
months = tuple(range(1,13))
str(months)

'(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)'

In [181]:
years = list(range(2017,2020))

whole_df = pd.DataFrame(columns=["year", "month", "day", "hour", "id", "time"])

for year in years:
    res = session.execute(f"select * from orders_by_year_and_month WHERE YEAR={year} AND MONTH IN {months}")
    whole_df = whole_df.append(pd.DataFrame(res.all()))

In [182]:
whole_df

Unnamed: 0,year,month,day,hour,id,time
0,2017,1,5,11,ec7a019261fce44180373d45b442d78f,2017-01-05 11:56:06
1,2017,1,5,12,b95a0a8bd30aece4e94e81f0591249d8,2017-01-05 12:14:58
2,2017,1,5,13,f175d67589e059cbbda956f10f0702e6,2017-01-05 13:59:30
3,2017,1,5,14,f92641ff0446a0e1c57195ebfe76e16a,2017-01-05 14:50:54
4,2017,1,5,15,e1fe072ef14b519af1f0a8ed997c1301,2017-01-05 15:37:58
...,...,...,...,...,...,...
5051,2018,8,29,11,d03ca98f59480e7e76c71fa83ecd8fb6,2018-08-29 11:06:11
5052,2018,8,29,12,52018484704db3661b98ce838612b507,2018-08-29 12:25:59
5053,2018,8,29,14,168626408cb32af0ffaf76711caae1dc,2018-08-29 14:52:00
5054,2018,8,29,15,35a972d7f8436f405b56e36add1a7140,2018-08-29 15:00:37
