# HW 1 - Data Platform IDC
ID1: 308564293

ID2: 311898746

## Step 1: Set up a Cassandra cluster locally

> The code in this step is in markdown because we don't want to run it from the notebook.
> The setup is oriented toward a macOS environment.

Make sure Docker Desktop is installed. If not, download and install Docker from the official website [Get Docker | Docker Docs](https://docs.docker.com/get-docker/).

Download the Docker image for Cassandra:
```bash
docker pull cassandra:latest
```

Run the container locally, with the CQL port (9042) open for connections from the notebook:
```bash
docker run --volume=/var/lib/cassandra --restart=no -p 127.0.0.1:9042:9042 -p 127.0.0.1:9160:9160 --name hw-cass -d cassandra:latest
```

Connect to the container and open `cqlsh`:
```bash
docker exec -it hw-cass cqlsh
```

## Step 2: Set up the Python environment

Install Python:
```bash
brew install python
```

Install the required libraries:
```bash
pip3 install cassandra-driver
pip3 install pandas
```

## Step 3: Choosing the dataset

##### Data set content - [Dataset link.](https://www.kaggle.com/datasets/arashnic/book-recommendation-dataset/data)
The Book-Crossing dataset has 3 files and is given in .csv form.


`Users` has 3 columns:
* User-ID (int): Anonymized user identifier mapped to integers.
* Location (text): Demographic information about the user's location.
* Age (float): Age of the user. May contain NULL-values if not available.

Rows: 279K, each identifying a user.

`Books` has 8 columns:
* ISBN (text): Unique identifier for each book.
* Book-Title (text): Title of the book.
* Book-Author (text): Author of the book. In case of multiple authors, only the first one is provided.
* Year-Of-Publication (int): The year when the book was published.
* Publisher (text): The publisher of the book.
* Image-URL-S, Image-URL-M, Image-URL-L (text): URLs linking to cover images in small, medium, and large sizes.

Rows: 271K, each identifying a book.

`Ratings` has 3 columns:
* User-ID (int): Anonymized user identifier corresponding to Users table.
* ISBN (text): Unique identifier corresponding to Books table.
* Book-Rating (int): User's rating for a particular book. Explicit ratings are on a scale from 1-10, while implicit ratings are expressed as 0.

Rows: 1.14M, each representing an interaction where a user rated a specific (single) book.


#### Dataset Selection
* The dataset was selected for its large size (14 columns and over 1.5M rows across the three files), which allows us to explore Cassandra's strengths with large datasets.
* The dataset is also realistic, reflecting a real-life scenario of different data sources and systems that are interconnected.
* The data is relatively well organized and clean.

**In this project:** The dataset is stored in the `Data` folder, where the file names correspond to the explanation above.

## Step 4: Cassandra database design

##### Database Design
The DB is designed according to the queries we want to run:
* Queries related to book ratings: number of ratings and average rating.
* Queries related to authors, most rated authors, highest rated authors.
* Most active users (users with highest rating counts) and their locations.

From these queries we derive the structure of the DB.
`books` and `users` are tables that hold the full information for each book and each user.

`books_rated_by_users` contains each transaction (rating event) keyed by the book ID. Similarly, `users_by_rated_books` contains the same transaction information, keyed by the user ID.

This allows us to access data by book ID or by user ID, and to perform aggregations within each partition key.
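To make the per-key aggregation concrete, here is a minimal pure-Python sketch of what a query like `SELECT isbn, COUNT(book_rating), AVG(book_rating) ... GROUP BY isbn` computes. The helper `aggregate_by_key` and the sample rows are hypothetical and only for illustration; in Cassandra, grouping by the partition key means each group is read from a single partition.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def aggregate_by_key(rows: Iterable[Tuple[str, int]]) -> Dict[str, Tuple[int, float]]:
    """Group (key, rating) pairs by key and return {key: (count, average)}.

    Hypothetical helper mirroring GROUP BY on a partition key.
    """
    totals: Dict[str, List[int]] = defaultdict(lambda: [0, 0])  # key -> [count, sum]
    for key, rating in rows:
        totals[key][0] += 1
        totals[key][1] += rating
    return {key: (count, total / count) for key, (count, total) in totals.items()}


# Hypothetical rating events keyed by ISBN
ratings = [("isbn-a", 8), ("isbn-a", 5), ("isbn-b", 10)]
print(aggregate_by_key(ratings))  # {'isbn-a': (2, 6.5), 'isbn-b': (1, 10.0)}
```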

##### `books`
* **Structure**: ISBN: text, Book-Title: text, Book-Author: text, Year-Of-Publication: int, Publisher: text, Image-URL-S: text, Image-URL-M: text, Image-URL-L: text
* **Primary key**: (ISBN, Book-Author, Publisher)
* **Partition Key**: ISBN
* **Clustering Column**: Book-Author, Publisher

ISBN is a unique identifier per book so it will be the partition key.
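This choice relies on ISBN really being unique. In Cassandra an `INSERT` with an existing primary key silently overwrites the earlier row (an upsert), so duplicate keys would only show up later as a lower row count. A minimal sketch of checking for duplicates before loading; `find_duplicate_keys` is a hypothetical helper, and with pandas `df_books['ISBN'].duplicated().any()` gives the same yes/no answer.

```python
from collections import Counter
from typing import Hashable, Iterable, List


def find_duplicate_keys(keys: Iterable[Hashable]) -> List[Hashable]:
    """Return the keys that appear more than once, in first-seen order."""
    counts = Counter(keys)  # Counter keeps insertion order (Python 3.7+)
    return [key for key, count in counts.items() if count > 1]


isbns = ["isbn-1", "isbn-2", "isbn-1"]  # hypothetical values
print(find_duplicate_keys(isbns))  # ['isbn-1']
```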


##### `users`
* **Structure**: User-ID: int, Location: text, Age: float (age is missing for some users; we store it as 0 because age is part of the clustering key, and Cassandra does not allow null values in primary-key columns)
* **Primary key**: (User-ID, Location, Age)
* **Partition Key**: User-ID
* **Clustering Column**: Location, Age

User-ID is a unique identifier per user, so it's a good partition key. The clustering columns are Location and Age, which are the attributes we expect to aggregate on.
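The "missing age becomes 0" rule can be sketched as a small conversion step applied before insertion. This assumes missing ages arrive as `None` or NaN (as pandas produces); the helper name `age_for_key` is hypothetical.

```python
import math
from typing import Optional


def age_for_key(age: Optional[float]) -> float:
    """Return a non-null age for use in a primary-key (clustering) column.

    Missing ages (None or NaN) are stored as 0, following the design above.
    """
    if age is None or math.isnan(age):
        return 0.0
    return float(age)


print(age_for_key(float("nan")))  # 0.0
print(age_for_key(34))            # 34.0
```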



## Step 5: Setup keyspace and tables

First, we load all the libraries needed for ingestion and for working with the Cassandra DB.

In [2]:
# Cassandra-driver
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, BatchStatement

# Data
import csv
import pandas as pd
import concurrent.futures
from collections import namedtuple

Connect to our Cassandra instance:

In [3]:
# Connect Cassandra-Driver to the Cluster running on the Docker:
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

Create our keyspace `books`.
We use `SimpleStrategy` with `replication_factor` = `1`, which is enough for this exercise since we are not looking for high availability (HA) or significant scale.
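For comparison, the sketch below builds the `CREATE KEYSPACE` statement both for the single-node setup used here and for a `NetworkTopologyStrategy` keyspace with a replication factor per datacenter, which is what a production cluster would typically use. The helper `create_keyspace_cql` is hypothetical, and the datacenter name `datacenter1` is an assumption (the default name of a single-node Docker container); the resulting string could be passed to `session.execute`.

```python
from typing import Dict, Optional


def create_keyspace_cql(name: str, replication_factor: int = 1,
                        datacenters: Optional[Dict[str, int]] = None) -> str:
    """Build a CREATE KEYSPACE statement (hypothetical helper).

    Without datacenters, use SimpleStrategy (fine for one local node);
    otherwise use NetworkTopologyStrategy with a replication factor per datacenter.
    """
    if datacenters:
        options = ", ".join(f"'{dc}': {rf}" for dc, rf in datacenters.items())
        replication = f"{{'class': 'NetworkTopologyStrategy', {options}}}"
    else:
        replication = f"{{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}}"
    return f"CREATE KEYSPACE IF NOT EXISTS {name} WITH REPLICATION = {replication};"


print(create_keyspace_cql("books"))
print(create_keyspace_cql("books", datacenters={"datacenter1": 3}))
```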

In [4]:
session.execute("CREATE KEYSPACE IF NOT EXISTS books WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

<cassandra.cluster.ResultSet at 0x110108040>

Make sure the keyspace was created:

In [5]:
session.execute("DESCRIBE books;").one()[3]

"CREATE KEYSPACE books WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;"

Use the keyspace

In [6]:
session.execute("USE books");

##### Table creation: 
Next, we will create the tables according to the DB schema.

In [11]:
session.execute("""
    CREATE TABLE IF NOT EXISTS books (
        ISBN text,
        Book_Title text,
        Book_Author text,
        Publisher text,
        YOP int,       
        Image_URL_S text,
        Image_URL_M text,
        Image_URL_L text,
        PRIMARY KEY ((isbn))
    )
""").one()

In [12]:
session.execute("""
    CREATE TABLE IF NOT EXISTS users (
        age float,
        location text,
        user_id int,
        PRIMARY KEY(user_id)
    )
""").one()

In [13]:
session.execute("""
    CREATE TABLE IF NOT EXISTS books_rated_by_users (
        user_id   int,
        age int,
        location text,
        isbn   text,
        book_author text,
        yop int,
        publisher text,
        book_rating int,
        primary key ((isbn), age, location, user_id)
    )
""").one()

In [14]:
session.execute("""
    CREATE TABLE IF NOT EXISTS users_by_rated_books (
        user_id   int,
        age int,
        location text,
        isbn   text,
        book_author text,
        yop int,
        publisher text,
        book_rating int,
        primary key ((user_id), publisher, isbn)
    )
""").one()

##### Validation:

In [22]:
session.execute("""DESCRIBE tables;""").all()


[Row(keyspace_name='books', type='table', name='book_ratings'),
 Row(keyspace_name='books', type='table', name='books'),
 Row(keyspace_name='books', type='table', name='books_rated_by_users'),
 Row(keyspace_name='books', type='table', name='users'),
 Row(keyspace_name='books', type='table', name='users_by_rated_books')]

## Step 6: Data ingestion

*Data ingestion function*
The `df_load_data` function takes a pandas DataFrame (read from the CSV files), splits it into batches, and inserts each batch into a Cassandra table. The batches are sent one after another.

Batches and concurrency are two common ways to speed up inserting large amounts of data into a table.

Rows can be inserted one at a time, or several rows can be grouped into a single batch. A batch is sent as one request, which reduces the number of round trips to the database, and a logged batch (the driver's default) guarantees that either all of its statements are eventually applied or none are. Note that in Cassandra a batch whose rows span many partitions adds work on the coordinator node, so batches help most when their rows share a partition key.

Concurrency allows several insert requests to be in flight at the same time, instead of waiting for each request to complete before sending the next one. With many independent rows, this can greatly increase throughput.

In summary, combining batches with concurrent execution can greatly speed up inserting large amounts of data and make better use of the cluster's resources.
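`df_load_data`, defined further down, sends its batches sequentially. A minimal sketch of running batches concurrently with a thread pool (the notebook already imports `concurrent.futures`) is shown here. It assumes an `execute` callable that sends one batch, for example a function that builds a `BatchStatement` and calls `session.execute`; the names `chunk`, `load_concurrently` and `fake_execute` are hypothetical. The Cassandra Python driver also offers `session.execute_async` and `cassandra.concurrent.execute_concurrent_with_args` for the same purpose.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk(rows: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def load_concurrently(rows: Sequence[T], execute: Callable[[Sequence[T]], None],
                      batch_size: int = 100, max_workers: int = 8) -> int:
    """Submit one `execute` call per batch to a thread pool; return the batch count.

    `execute` must be safe to call from several threads at once.
    An exception raised by any batch is re-raised by future.result().
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(execute, batch) for batch in chunk(rows, batch_size)]
        for future in futures:
            future.result()  # wait for the batch and propagate its error, if any
    return len(futures)


# Usage with a stand-in for "build a BatchStatement and call session.execute"
sent: List[Sequence[int]] = []
lock = threading.Lock()


def fake_execute(batch: Sequence[int]) -> None:
    with lock:  # guard the shared list across worker threads
        sent.append(batch)


print(load_concurrently(list(range(250)), fake_execute, batch_size=100))  # 3
```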



In [24]:
Files = namedtuple("Files", "users ratings books")

files = Files(
    users='Data/Users.csv',
    ratings='Data/Ratings.csv',
    books='Data/Books.csv'
)

# Specify data types for each column
books_data_types = {
    'ISBN': str,
    'Book-Title': str,
    'Book-Author': str,
    'Year-Of-Publication': int,  
    'Publisher': str,
    'Image-URL-S': str,
    'Image-URL-M': str,
    'Image-URL-L': str,
}

users_data_types = {
    'User-ID': int,
    'Location': str,
    'Age': float,  
}

ratings_data_types = {
    'User-ID': int,
    'ISBN': str,
    'Book-Rating': int,
}


df_books = pd.read_csv(files.books, dtype=books_data_types)
df_users = pd.read_csv(files.users, dtype=users_data_types)
df_ratings = pd.read_csv(files.ratings, dtype=ratings_data_types)

# clean whitespaces
df_books.columns = df_books.columns.str.strip()


> Note: if the CSV files were bigger, we would have to load the DataFrames in chunks.
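Chunked loading is built into pandas through `pd.read_csv(path, chunksize=...)`, which returns an iterator of DataFrames. The same idea with only the standard library `csv` module is sketched below, so each chunk could be inserted and then discarded. `iter_csv_chunks` and the in-memory sample rows are hypothetical; the header matches the `Ratings.csv` columns described above.

```python
import csv
import io
from typing import Dict, Iterator, List, TextIO


def iter_csv_chunks(file: TextIO, chunk_size: int) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of at most `chunk_size` rows, each row a dict keyed by the header."""
    reader = csv.DictReader(file)
    chunk: List[Dict[str, str]] = []
    for row in reader:
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # last, possibly shorter, chunk
        yield chunk


# Tiny in-memory example with hypothetical rows
sample = io.StringIO("User-ID,ISBN,Book-Rating\n1,a,5\n2,b,0\n3,c,8\n")
for part in iter_csv_chunks(sample, chunk_size=2):
    print(len(part), part[0]["ISBN"])  # "2 a", then "1 c"
```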

In [25]:
# Merge df_ratings with df_users on 'User-ID'
df_merged = pd.merge(df_ratings, df_users, on='User-ID', how='inner')

# Merge the result with df_books on 'ISBN'
df_merged = pd.merge(df_merged, df_books, on='ISBN', how='inner')

# Select the desired columns for the new DataFrame
df_merged = df_merged[['User-ID', 'Age', 'Location', 'ISBN', 'Book-Author', 'Year-Of-Publication', 'Publisher', 'Book-Rating']]

# The inner merges already drop ratings whose user or book does not exist.
# Also drop rows with NaN in any column other than 'Age' (broken book records);
# missing ages are kept and converted to 0 at insertion time.
# Reassigning (instead of inplace=True) avoids modifying a slice of the previous DataFrame.
df_merged = df_merged.dropna(subset=df_merged.columns.difference(['Age']))

#### Validate that only the `Age` column has NaN values
NaN values in any other column would indicate broken data.

In [27]:
df_merged.isna().sum()

User-ID                     0
Age                    277835
Location                    0
ISBN                        0
Book-Author                 0
Year-Of-Publication         0
Publisher                   0
Book-Rating                 0
dtype: int64

In [36]:
# TODO: improve the data casting
# Helper function that builds batch statements and executes them one after another
def df_load_data(df, insert_query, data_types, batch_size=100, max_batches=None):
    '''Load data from a DataFrame into a Cassandra table using batch inserts.
        Inputs:
        1. df: pandas DataFrame
            DataFrame containing the data to be inserted.
        2. insert_query: str
            CQL INSERT query with one %s placeholder per column.
        3. data_types: list
            Python types (e.g. int, str, float) used to convert each column, in DataFrame column order.
        4. batch_size: int, optional (default=100)
            Number of rows in each batch.
        5. max_batches: int, optional (default=None)
            Maximum number of batches to insert; None inserts all rows.

        Returns: None
    '''

    # Helper functions
    def convert_data(row):
        # Convert a single row to the types in data_types;
        # missing values (NaN) become the type's default: 0, 0.0 or ''
        converted_data = [data_type(value) if pd.notna(value) else data_type()
                          for value, data_type in zip(row, data_types)]
        return tuple(converted_data)

    def import_batch(rows):
        # Build a batch statement for the current set of rows (single batch)
        batch = BatchStatement()
        for row in rows.itertuples(index=False):
            try:
                # Convert data types and add the insert to the batch
                batch.add(insert_query, convert_data(row))
            except Exception as e:
                print(f"error on row: {row} ({e}), skipping")
        # No query tracing here: tracing every batch adds server-side overhead
        session.execute(batch)

    # Function body
    # Split the DataFrame into batches and insert them one at a time
    df_split = [df.iloc[i:i + batch_size] for i in range(0, len(df), batch_size)][:max_batches]

    for batch in df_split:
        import_batch(batch)

# Example usage:
# df_load_data(new_df, "INSERT INTO your_table_name (col1, col2, col3) VALUES (%s, %s, %s)", [int, str, float])

In [19]:
df_books.columns

Index(['ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication', 'Publisher',
       'Image-URL-S', 'Image-URL-M', 'Image-URL-L'],
      dtype='object')


##### Investigate the content

In [None]:
def count_lines(file_name):
    # Count lines without loading the whole file; subtract 1 for the header row
    with open(file_name, "r") as file:
        return sum(1 for _ in file) - 1

In [None]:
users_file_size = count_lines(files.users)
ratings_file_size = count_lines(files.ratings)
books_file_size = count_lines(files.books)

print(f"file sizes are - users: {users_file_size}, ratings: {ratings_file_size}, books: {books_file_size}")

file sizes are - users: 278859, ratings: 1149780, books: 271360
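`count_lines` counts physical lines, which equals the number of records only if no quoted field contains a line break. A sketch of a record-based count with the standard `csv` module, which handles quoted newlines, follows; `count_records` is a hypothetical helper. For real files, the `csv` documentation recommends opening them with `newline=""`.

```python
import csv
import io
from typing import TextIO


def count_records(file: TextIO) -> int:
    """Count CSV data records, excluding the header row."""
    reader = csv.reader(file)
    next(reader, None)  # skip the header, if present
    return sum(1 for _ in reader)


# A quoted field spanning two physical lines is still a single record
sample = 'ISBN,Book-Title\n"1","A title\nwith a newline"\n"2","Another"\n'
print(count_records(io.StringIO(sample)))  # 2
print(len(sample.splitlines()) - 1)        # 3 (physical lines minus header)
```

Usage in the notebook would look like `with open(files.users, newline="") as f: count_records(f)`.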


In [29]:
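def num_of_records(table):
    # COUNT over a whole table scans every partition: fine on this local
    # single-node setup, but it can time out on large tables
    count_query = f"SELECT COUNT(1) FROM {table}"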
    result = session.execute(count_query)
    return result.one().count

#### Data Ingestion:
Now we ingest the data using the `df_load_data` function and validate, for each table, that the number of rows in Cassandra matches the number of rows in the source DataFrame.

##### `books`:

In [23]:
## books table
# Insertion Query
books_query = """
            INSERT INTO books (ISBN, Book_Title, Book_Author, yop, Publisher, Image_URL_S, Image_URL_M, Image_URL_L)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """

# A list of data types for this table
books_data_types = [str, str, str, int, str, str, str, str] 

df_load_data(df_books, books_query, books_data_types)

##### Validation - all books inserted

In [31]:
# Row count validation
cass_len = num_of_records("books")

assert cass_len == df_books.shape[0]

##### `users`:

In [28]:
# Insertion Query
users_query = """
                INSERT INTO users (User_ID, Location, Age)
                VALUES (%s, %s, %s)
                """

# A list of data types for this table
users_data_types = [int, str, float]  # User_ID, Location, Age

# load data
df_load_data(df_users, users_query, users_data_types)

Validation:

In [32]:
# Row count validation
cass_len = num_of_records("users")

assert cass_len == df_users.shape[0]

In [33]:
df_merged.columns

Index(['User-ID', 'Age', 'Location', 'ISBN', 'Book-Author',
       'Year-Of-Publication', 'Publisher', 'Book-Rating'],
      dtype='object')

##### `books_rated_by_users`:

In [44]:
## books_rated_by_users table
# Insertion Query (table name matches the CREATE TABLE statement above)
books_rated_by_users_query = """
            INSERT INTO books_rated_by_users (User_ID, Age, Location, ISBN, Book_Author,
            yop, Publisher, Book_Rating)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """

# A list of data types for this table
brbu_data_types = [int, int, str, str, str, int, str, int]

# load data
df_load_data(df_merged, books_rated_by_users_query, brbu_data_types)

##### Validation - all ratings inserted into `books_rated_by_users`

In [45]:
# Row count validation
cass_len = num_of_records("books_rated_by_users")

assert cass_len == df_merged.shape[0]

##### `users_by_rated_books`:

In [46]:
## users_by_rated_books table
# Insertion Query
users_by_rated_books_query = """
            INSERT INTO users_by_rated_books (User_ID, Age, Location, ISBN, Book_Author,
            yop, Publisher, Book_Rating)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """

# A list of data types for this table
brbu_data_types = [int, int, str, str, str, int, str, int]  

# load data
df_load_data(df_merged,users_by_rated_books_query, brbu_data_types)

Validation:

In [47]:
# Row count validation
cass_len = num_of_records("users_by_rated_books")

assert cass_len == df_merged.shape[0]

## Step 7: Data investigation

- Book distribution by publisher - J
- Book distribution by year - J
- Book distribution by author - J
- Average rating per user - J
- Average rating per publisher - J
- Popular authors by number of ratings
- Books with highest ratings and most ratings - ?

- Top-rated book each year - B
- Top-rated books by age - B
- Top-rated authors by age - B

In [50]:
# count ratings per book

result = session.execute("""
        SELECT isbn, COUNT(book_rating)
        FROM books_rated_by_users
        GROUP BY isbn;
                """)

In [None]:
result.all()

In [None]:
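# avg. rating per book
# AVG over an int column returns an int in Cassandra, so cast to double to keep the fraction
result = session.execute("""
        SELECT isbn, AVG(CAST(book_rating AS double)) as avg
        FROM books_rated_by_users
        GROUP BY isbn;
                """)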
result.all()
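In Cassandra, `AVG` over an `int` column such as `book_rating` returns an `int`, so the fractional part of each average is truncated unless the column is cast, e.g. `AVG(CAST(book_rating AS double))`. The pure-Python sketch below only illustrates the size of that difference; the helper names and the ratings are hypothetical.

```python
from typing import List


def int_column_avg(values: List[int]) -> int:
    """Average kept in the column's integer type (truncated; values are non-negative)."""
    return sum(values) // len(values)


def double_avg(values: List[int]) -> float:
    """Average computed in floating point, as after casting the column to double."""
    return sum(values) / len(values)


ratings = [8, 9, 0]  # hypothetical ratings of one book (0 = implicit rating)
print(int_column_avg(ratings))  # 5
print(double_avg(ratings))      # 5.666666666666667
```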

In [None]:
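# avg book rating per age
# age is the first clustering column, so it can follow isbn in GROUP BY;
# grouping by isbn alone would return an arbitrary (first) age per book
result = session.execute("""
        SELECT isbn, age, AVG(CAST(book_rating AS double)), COUNT(book_rating)
        FROM books.books_rated_by_users
        GROUP BY isbn, age;
                """)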
result.all()

In [67]:
# find locations of users with most ratings
result = session.execute("""
        SELECT user_id, location, COUNT(book_rating) AS rating_count
        FROM users_by_rated_books
        GROUP BY user_id;
                """)

In [69]:
result.all()

[Row(user_id=4317, location='slfjk, alberta, brazil', rating_count=1),
 Row(user_id=121478, location='laporte, indiana, usa', rating_count=2),
 Row(user_id=176996, location='dunedin, otago, new zealand', rating_count=1),
 Row(user_id=143843, location='hamburg, norddeutschland, germany', rating_count=1),
 Row(user_id=123410, location='washington, dc, usa', rating_count=20),
 Row(user_id=180284, location='nashua, new hampshire, usa', rating_count=5),
 Row(user_id=230875, location='healdsburg, california, usa', rating_count=1),
 Row(user_id=62602, location='grimes, iowa, usa', rating_count=1),
 Row(user_id=118683, location='plaistow, new hampshire, usa', rating_count=8),
 Row(user_id=199679, location='normal, illinois, usa', rating_count=1),
 Row(user_id=228419, location='karlsruhe, baden-wuerttemberg, germany', rating_count=1),
 Row(user_id=64791, location='los angeles, california, usa', rating_count=1),
 Row(user_id=254343, location='new orleans, louisiana, usa', rating_count=1),
 Row(u
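Cassandra cannot sort by an aggregate such as `rating_count`, so this query returns one row per user in partition (token) order rather than the most active users first. A minimal sketch of picking the top users on the client side, assuming the rows are the driver's default named tuples with a `rating_count` field; `top_users` is a hypothetical helper and the sample rows are taken from the output above.

```python
import heapq
from collections import namedtuple
from typing import Iterable, List

Row = namedtuple("Row", "user_id location rating_count")


def top_users(rows: Iterable[Row], n: int = 10) -> List[Row]:
    """Return the n rows with the highest rating_count, largest first."""
    return heapq.nlargest(n, rows, key=lambda row: row.rating_count)


sample_rows = [
    Row(4317, "slfjk, alberta, brazil", 1),
    Row(123410, "washington, dc, usa", 20),
    Row(118683, "plaistow, new hampshire, usa", 8),
]
for row in top_users(sample_rows, n=2):
    print(row.user_id, row.location, row.rating_count)
```

In the notebook this would be called once on the materialized rows, e.g. `top_users(result.all(), n=10)`.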

In [74]:
df_result = pd.DataFrame(result.all())

In [73]:
print(df_result)

Empty DataFrame
Columns: []
Index: []
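The DataFrame above is most likely empty because the `ResultSet` had already been consumed by `result.all()` in the earlier cell: the driver's `ResultSet.all()` returns the *remaining* rows, and after one full pass none remain. Materializing the rows once into a list and reusing that list avoids this. The generator below is a hypothetical stand-in for a `ResultSet`, used only to show the behaviour.

```python
from collections import namedtuple
from typing import Iterable, Iterator

Row = namedtuple("Row", "user_id location rating_count")


def one_shot(rows: Iterable[Row]) -> Iterator[Row]:
    """Hypothetical stand-in for a ResultSet: rows can be iterated only once."""
    yield from rows


result = one_shot([Row(4317, "slfjk, alberta, brazil", 1),
                   Row(121478, "laporte, indiana, usa", 2)])
first_pass = list(result)   # like the first result.all()
second_pass = list(result)  # like a later result.all(): nothing is left
print(len(first_pass), len(second_pass))  # 2 0

# Fix: keep the materialized rows and build every later view from them
rows = first_pass
print(len(rows))  # 2
```

In the notebook, that means running the query, storing `rows = result.all()` once, and then building `pd.DataFrame(rows)` from the stored list.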
