In [1]:
import csv
import uuid
import time 
from cassandra.cluster import Cluster
from sqlalchemy import create_engine, text
import kagglehub

### Importer les données

In [2]:
# Download latest version
path = kagglehub.dataset_download("shivamb/netflix-shows")

print("Path to dataset files:", path)

Path to dataset files: /home/charlotte/.cache/kagglehub/datasets/shivamb/netflix-shows/versions/5


In [3]:
# get le nom du fichier 
import os
files = os.listdir(path)
print(files)

['netflix_titles.csv']


In [4]:
import pandas as pd

filename = f"{path}/{files[0]}"
df = pd.read_csv(filename)  # Adapte le nom du fichier si nécessaire

In [5]:
from tabulate import tabulate

# Afficher le DataFrame en utilisant tabulate
print(tabulate(df.head(10), headers='keys', tablefmt='psql'))


+----+-----------+---------+----------------------------------+-------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+--------------------+----------------+----------+------------+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|    | show_id   | type    | title                            | director                      | cast                                                                                                                                                

In [6]:
# types des colonnes
print(df.dtypes)

show_id         object
type            object
title           object
director        object
cast            object
country         object
date_added      object
release_year     int64
rating          object
duration        object
listed_in       object
description     object
dtype: object


In [7]:
# Convertir la colonne show_id en int en enlevant le préfixe 's' (plus simple pour gérer exactement le meme type de données en Cassandra et MySQL)
df['show_id'] = df['show_id'].str.replace('s', '').astype(int).astype(int)

In [8]:
print(df.dtypes)

show_id          int64
type            object
title           object
director        object
cast            object
country         object
date_added      object
release_year     int64
rating          object
duration        object
listed_in       object
description     object
dtype: object


In [9]:
# est ce qu'il y a des NaN ?
print(df.isna().sum())

show_id            0
type               0
title              0
director        2634
cast             825
country          831
date_added        10
release_year       0
rating             4
duration           3
listed_in          0
description        0
dtype: int64


In [10]:
# remplir les NaN
df['director'] = df['director'].fillna('')
df['cast'] = df['cast'].fillna('')
df['country'] = df['country'].fillna('')
df['date_added'] = df['date_added'].fillna('')
df['rating'] = df['rating'].fillna('')
df['duration'] = df['duration'].fillna('')

print(df.isna().sum())

show_id         0
type            0
title           0
director        0
cast            0
country         0
date_added      0
release_year    0
rating          0
duration        0
listed_in       0
description     0
dtype: int64


In [11]:
shows_table = """
CREATE TABLE shows (
    show_id INT PRIMARY KEY,
    title TEXT,
    director TEXT,
    cast TEXT,
    country TEXT,
    date_added TEXT,
    release_year INT,
    rating TEXT,
    duration TEXT,
    listed_in TEXT,
    description TEXT
)
"""

### Cassandra

In [12]:
# Connecte-toi au cluster Cassandra (adresse locale)
cluster = Cluster(['127.0.0.1'])  # Remplace par l'adresse IP si c'est un serveur distant
cassandra_session = cluster.connect()

# Vérifie la connexion en exécutant une commande simple
row = cassandra_session.execute("SELECT release_version FROM system.local").one()
print(f"Cassandra version: {row.release_version}")

Cassandra version: 4.0.15


In [13]:
# Lister les keyspaces existants 
keyspaces = cassandra_session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
for ks in keyspaces:
    print(ks.keyspace_name)

system_auth
system_schema
netflix_rf_1
netflix
system_distributed
system
system_traces


In [14]:
def create_table_cassandra():
    cassandra_session.execute("DROP KEYSPACE IF EXISTS netflix;")

    cassandra_session.execute("""
    CREATE KEYSPACE IF NOT EXISTS netflix
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    }
    """)

    cassandra_session.set_keyspace('netflix')

    cassandra_session.execute("DROP TABLE IF EXISTS shows")

    cassandra_session.execute(shows_table)

create_table_cassandra()

### MySQL

In [15]:
# Configurer la connexion à la base de données
def create_table_mysql():
    username = 'user'
    password = 'password'
    database = 'TDLE'

    # Créer un moteur SQLAlchemy pour la connexion
    mysql_engine = create_engine(f'mysql+mysqlconnector://{username}:{password}@localhost:3306/{database}')

    with mysql_engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS shows;"))
        conn.execute(text(shows_table))

    return mysql_engine

mysql_engine = create_table_mysql()

## CRUD : Create, Read, Update, Delete
- Create (*Insert*) : Insérer des données dans la base de données.
- Read (*Select*) : Récupérer des données.
- Update (*Update*) : Modifier des données existantes.
- Delete (*Delete*) : Effacer des données.

In [16]:
df['show_id'] = df['show_id'].astype(int)
df['release_year'] = df['release_year'].astype(int)

### 1. Create 

In [17]:
# 1. Test d'insertion
def test_insert():
    # Cassandra
    insert_query = cassandra_session.prepare("INSERT INTO shows (show_id, title, director, cast, country, date_added, release_year, rating, duration, listed_in, description) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    start_time = time.time()
    for _, row in df.iterrows():
        cassandra_session.execute(insert_query, (row['show_id'], row['title'], row['director'], row['cast'], row['country'], row['date_added'], row['release_year'], row['rating'], row['duration'], row['listed_in'], row['description']))
    cassandra_time = time.time() - start_time

    # MySQL
    insert_query_mysql = """
    INSERT INTO shows (show_id, title, director, cast, country, date_added, release_year, rating, duration, listed_in, description) 
    VALUES (:show_id, :title, :director, :cast, :country, :date_added, :release_year, :rating, :duration, :listed_in, :description)
    ON DUPLICATE KEY UPDATE
    title = VALUES(title),
    director = VALUES(director),
    cast = VALUES(cast),
    country = VALUES(country),
    date_added = VALUES(date_added),
    release_year = VALUES(release_year),
    rating = VALUES(rating),
    duration = VALUES(duration),
    listed_in = VALUES(listed_in),
    description = VALUES(description);
    """
    with mysql_engine.connect() as conn:
        conn.execute(text("USE TDLE;"))
        start_time = time.time()
        for _, row in df.iterrows():
            conn.execute(text(insert_query_mysql), row.to_dict())

        conn.commit()  # Effectuer le commit : : Le commit fait partie intégrante du processus d'insertion dans une base de données relationnelle. Si les insertions ne sont pas validées par un commit, les données ne seront pas enregistrées. Par conséquent, la durée de cette opération est significative pour évaluer la performance globale de la transaction.
        mysql_time = time.time() - start_time


    return cassandra_time, mysql_time

In [18]:
cassandra_time_create, mysql_time_create = test_insert()
print(f"Insertion time: Cassandra {cassandra_time_create:.5f}s, MySQL {mysql_time_create:.5f}s")

Insertion time: Cassandra 2.83067s, MySQL 2.38815s


# STOP ICI : la suite est à faire 

### 2. Read

In [19]:
# Récupération des shows réalisés après 2015 
# TODO : Peut être changer la requête
def cassandra_read():
    select_query = "SELECT * FROM shows WHERE release_year > 2015 ALLOW FILTERING;"
    start_time = time.time()
    rows = cassandra_session.execute(select_query)
    cassandra_time = time.time() - start_time

    #cassandra_results = [row for row in rows]
    return cassandra_time #, len(cassandra_results)

def mysql_read():
    select_query_mysql = "SELECT * FROM shows WHERE release_year > 2015;"

    start_time = time.time()
    with mysql_engine.connect() as conn:
        conn.execute(text("USE TDLE;"))
        rows = conn.execute(text(select_query_mysql)).mappings().all()
    mysql_time = time.time() - start_time

    #mysql_results = [row for row in rows]
    return mysql_time #, len(mysql_results)

In [20]:
cassandra_time_read = cassandra_read()
mysql_time_read = mysql_read()

print(f"Read time: Cassandra {cassandra_time_read:.5f}s, MySQL {mysql_time_read:.5f}s")

Read time: Cassandra 0.04656s, MySQL 0.01648s


### 3. Update

In [21]:
# Changement du rating des shows produient en 2020 en "UpdatedRating"
# TODO : Peut être changer la requête
def cassandra_update():
    update_query = "UPDATE shows SET rating = ? WHERE show_id = ?"
    prepared_update = cassandra_session.prepare(update_query)

    select_query = "SELECT show_id FROM shows WHERE release_year = 2020 ALLOW FILTERING;"
    rows = cassandra_session.execute(select_query)

    start_time = time.time()
    for row in rows:
        cassandra_session.execute(prepared_update, ("UpdatedRating", row.show_id))
    cassandra_time = time.time() - start_time

    return cassandra_time

def mysql_update():
    update_query_mysql = "UPDATE shows SET rating = :rating WHERE release_year = :release_year;"

    start_time = time.time()
    with mysql_engine.connect() as conn:
        conn.execute(text("USE TDLE;"))
        conn.execute(text(update_query_mysql), {"rating": "UpdatedRating", "release_year": 2020})
    mysql_time = time.time() - start_time

    return mysql_time

In [22]:
cassandra_time_update = cassandra_update()
mysql_time_update = mysql_update()

print(f"Update time: Cassandra {cassandra_time_update:.5f}s, MySQL {mysql_time_update:.5f}s")

Update time: Cassandra 0.20917s, MySQL 0.02296s


### 4. Delete

In [23]:
# Suppression des shows de 2015
def cassandra_delete():
    select_query = "SELECT show_id FROM shows WHERE release_year = 2015 ALLOW FILTERING;"
    rows = cassandra_session.execute(select_query)

    delete_query = cassandra_session.prepare("DELETE FROM shows WHERE show_id = ?")

    start_time = time.time()
    for row in rows:
        cassandra_session.execute(delete_query, [row.show_id])
    cassandra_time = time.time() - start_time

    return cassandra_time

def mysql_delete():
    delete_query_mysql = "DELETE FROM shows WHERE release_year = :release_year;"

    start_time = time.time()
    with mysql_engine.connect() as conn:
        conn.execute(text("USE TDLE;"))
        conn.execute(text(delete_query_mysql), {"release_year": 2015})
    mysql_time = time.time() - start_time

    return mysql_time

In [24]:
cassandra_time_delete = cassandra_delete()
mysql_time_delete = mysql_delete()
print(f"Delete time: Cassandra {cassandra_time_delete:.5f}s, MySQL {mysql_time_delete:.5f}s")

Delete time: Cassandra 0.15774s, MySQL 0.01377s


### Résultats CRUD

In [25]:
import plotly.graph_objects as go

In [26]:
crud_operations = ['Create', 'Read', 'Update', 'Delete']

cassandra_times = [cassandra_time_create, cassandra_time_read, cassandra_time_update, cassandra_time_delete]
mysql_times = [mysql_time_create, mysql_time_read, mysql_time_update, mysql_time_delete]

fig = go.Figure()
fig.add_trace(go.Bar(x=crud_operations, y=cassandra_times, name='Cassandra', text=[f"{t:.2f}s" for t in cassandra_times], textposition='auto'))
fig.add_trace(go.Bar(x=crud_operations, y=mysql_times, name='MySQL', text=[f"{t:.2f}s" for t in mysql_times], textposition='auto'))

fig.update_layout(title="Comparaison des performances CRUD entre Cassandra et MySQL", xaxis_title="Opérations CRUD", yaxis_title="Temps d'exécution (secondes)", barmode='group', template='plotly_white', legend=dict(title="Système de base de données"))

fig.show()

Les opérations CRUD sur mySQL sont toujours plus rapide que sur Cassandra.

In [27]:
# TODO : Mieux expliquer les résultats

## Optimisation par indexation

### 1. Ajout d'index

In [28]:
def cassandra_create_indexes(columns):
    start_time = time.time()
    for column in columns:
        index_query = f"CREATE INDEX {column}_idx ON shows ({column});"
        cassandra_session.execute(index_query)
    cassandra_time = time.time() - start_time
    return cassandra_time

def mysql_create_indexes(columns):
    start_time = time.time()
    with mysql_engine.connect() as conn:
        conn.execute(text("USE TDLE;"))
        for column in columns:
            index_query = f"CREATE INDEX {column}_idx ON shows ({column});"
            conn.execute(text(index_query))
    mysql_time = time.time() - start_time
    return mysql_time

Dans notre cas, nous avons surtout utilisé la colonnes release_year pour faire les sélections, donc c'est la seule colonne que nous indexons.  
La colonne show_id est déjà indexé implicitement comme c'est la clé primaire.

In [29]:
# TODO : Revoir les colonnes à indexer si on change les fonctions du CURL
# Toutes les colonnes : ["show_id", "title", "director", "cast", "country", "date_added", "release_year", "rating", "duration", "listed_in", "description"]
columns_to_index = ["release_year"]
cassandra_time_index = cassandra_create_indexes(columns_to_index)
mysql_time_index = mysql_create_indexes(columns_to_index)

print(f"Indexation time: Cassandra {cassandra_time_index:.5f}s, MySQL {mysql_time_index:.5f}s")

Indexation time: Cassandra 0.11664s, MySQL 0.08643s


Indexation des colonnes utilisées dans les filtres, jointures et ordonnancement (ORDER BY).  
Seulement ces colonnes sinon le coût de stockage est trop élevé pour rien.

### 2. Opérations CRUD après indexation

In [30]:
# Recréation des tables pour avoir toutes les données
# TODO : Peut être indexer après l'ajout
def recreate_tables_with_indexes():
    create_table_cassandra()
    cassandra_create_indexes(["release_year"])

    global mysql_engine
    mysql_engine = create_table_mysql()
    mysql_create_indexes(["release_year"])

    cassandra_time_create, mysql_time_create = test_insert()

    return cassandra_time_create, mysql_time_create

In [31]:
cassandra_time_create_idx, mysql_time_create_idx = recreate_tables_with_indexes()
print(f"Create time: Cassandra {cassandra_time_create_idx:.5f}s, MySQL {mysql_time_create_idx:.5f}s")

cassandra_time_read_idx = cassandra_read()
mysql_time_read_idx = mysql_read()
print(f"Read time after indexing: Cassandra {cassandra_time_read:.5f}s, MySQL {mysql_time_read:.5f}s")

cassandra_time_update_idx = cassandra_update()
mysql_time_update_idx = mysql_update()
print(f"Update time after indexing: Cassandra {cassandra_time_update:.5f}s, MySQL {mysql_time_update:.5f}s")

cassandra_time_delete_idx = cassandra_delete()
mysql_time_delete_idx = mysql_delete()
print(f"Delete time after indexing: Cassandra {cassandra_time_delete:.5f}s, MySQL {mysql_time_delete:.5f}s")


Create time: Cassandra 3.22652s, MySQL 2.55125s
Read time after indexing: Cassandra 0.04656s, MySQL 0.01648s
Update time after indexing: Cassandra 0.20917s, MySQL 0.02296s
Delete time after indexing: Cassandra 0.15774s, MySQL 0.01377s


### Résultats CRUD après indexation

In [32]:
crud_operations = ['Create', 'Read', 'Update', 'Delete']
cassandra_times_after = [cassandra_time_create_idx, cassandra_time_read, cassandra_time_update, cassandra_time_delete]
mysql_times_after = [mysql_time_create_idx, mysql_time_read, mysql_time_update, mysql_time_delete]

fig = go.Figure()
fig.add_trace(go.Bar(x=crud_operations, y=cassandra_times_after, name='Cassandra (Après Indexation)', text=[f"{t:.2f}s" for t in cassandra_times_after], textposition='auto'))
fig.add_trace(go.Bar(x=crud_operations, y=mysql_times_after, name='MySQL (Après Indexation)', text=[f"{t:.2f}s" for t in mysql_times_after], textposition='auto'))

fig.update_layout(title="Comparaison des performances CRUD après indexation entre Cassandra et MySQL", xaxis_title="Opérations CRUD", yaxis_title="Temps d'exécution (secondes)", barmode='group', template='plotly_white', legend=dict(title="Système de base de données"))

fig.show()


In [33]:
crud_operations = ['Create', 'Read', 'Update', 'Delete']

cassandra_times_after = [cassandra_time_create_idx, cassandra_time_read, cassandra_time_update, cassandra_time_delete]
mysql_times_after = [mysql_time_create_idx, mysql_time_read, mysql_time_update, mysql_time_delete]
cassandra_times = [cassandra_time_create, cassandra_time_read, cassandra_time_update, cassandra_time_delete]
mysql_times = [mysql_time_create, mysql_time_read, mysql_time_update, mysql_time_delete]


# Cassandra
fig_cassandra = go.Figure()

fig_cassandra.add_trace(go.Bar(x=crud_operations, y=cassandra_times, name='Cassandra (Avant Indexation)', text=[f"{t:.2f}s" for t in cassandra_times], textposition='auto'))
fig_cassandra.add_trace(go.Bar(x=crud_operations, y=cassandra_times_after, name='Cassandra (Après Indexation)', text=[f"{t:.2f}s" for t in cassandra_times_after], textposition='auto'))

fig_cassandra.update_layout(title="Comparaison des performances CRUD de Cassandra (Avant vs Après Indexation)", xaxis_title="Opérations CRUD", yaxis_title="Temps d'exécution (secondes)", barmode='group', template='plotly_white')
fig_cassandra.show()

# mySQL
fig_mysql = go.Figure()

fig_mysql.add_trace(go.Bar(x=crud_operations, y=mysql_times, name='MySQL (Avant Indexation)', text=[f"{t:.2f}s" for t in mysql_times], textposition='auto'))
fig_mysql.add_trace(go.Bar(x=crud_operations, y=mysql_times_after, name='MySQL (Après Indexation)', text=[f"{t:.2f}s" for t in mysql_times_after], textposition='auto'))

fig_mysql.update_layout(title="Comparaison des performances CRUD de MySQL (Avant vs Après Indexation)", xaxis_title="Opérations CRUD", yaxis_title="Temps d'exécution (secondes)", barmode='group', template='plotly_white')
fig_mysql.show()

Création plus longue après indexation.  
Pas de différence entre les requêtes.

In [34]:
# TODO : Tester avec des requêtes plus complexes
# TODO : Utiliser un dataset plus grand
# TODO : Ajouter des colonnes à indexer

## Optimisation par réplication/partitionnement

In [35]:
# Recréation des tables pour avoir toutes les données
def recreate_tables():
    create_table_cassandra()

    global mysql_engine
    mysql_engine = create_table_mysql()

    cassandra_time_create, mysql_time_create = test_insert()

    return cassandra_time_create, mysql_time_create

cassandra_time_create, mysql_time_create = recreate_tables()

### Cassandra - Réplication

In [36]:
def create_keyspace_with_replication(replication_factor):
    cassandra_session.execute(f"DROP KEYSPACE IF EXISTS netflix;")

    # TODO : Tester d'autres stratégies de réplication
    cassandra_session.execute(f"""
    CREATE KEYSPACE netflix
    WITH REPLICATION = {{
        'class': 'SimpleStrategy',
        'replication_factor': {replication_factor}
    }};
    """)
    cassandra_session.set_keyspace('netflix')
    print(f"Keyspace créé avec replication_factor = {replication_factor}")

### Cassandra - Test du CRUD

In [37]:
replication_factors = [1, 2, 3] #,4, 5, 6, 7, 8, 9, 10]
results = []

for rf in replication_factors:
    print(f"Test avec replication_factor = {rf} :")
    create_keyspace_with_replication(rf)
    create_table_cassandra()
    cassandra_time_create, _ = test_insert()
    print(f" - Insertion time: {cassandra_time_create:.5f}s")
    cassandra_time_read = cassandra_read()
    print(f" - Read time: {cassandra_time_read:.5f}s")
    cassandra_time_update = cassandra_update()
    print(f" - Update time: {cassandra_time_update:.5f}s")
    cassandra_time_delete = cassandra_delete()
    print(f" - Delete time: {cassandra_time_delete:.5f}s")

    results.append({
        'Replication Factor': rf,
        'Insertion Time': cassandra_time_create,
        'Read Time': cassandra_time_read,
        'Update Time': cassandra_time_update,
        'Delete Time': cassandra_time_delete
    })

results_df = pd.DataFrame(results)

Test avec replication_factor = 1 :
Keyspace créé avec replication_factor = 1
 - Insertion time: 3.06901s
 - Read time: 0.05813s
 - Update time: 0.22232s
 - Delete time: 0.13981s
Test avec replication_factor = 2 :
Keyspace créé avec replication_factor = 2
 - Insertion time: 2.96056s
 - Read time: 0.05359s
 - Update time: 0.26179s
 - Delete time: 0.12426s
Test avec replication_factor = 3 :
Keyspace créé avec replication_factor = 3
 - Insertion time: 3.78335s
 - Read time: 0.06329s
 - Update time: 0.28020s
 - Delete time: 0.17769s


### Cassandra - Résultats

In [38]:
fig = go.Figure()

for rf in results_df['Replication Factor']:
    fig.add_trace(go.Bar(x=['Insertion Time', 'Read Time', 'Update Time', 'Delete Time'], y=results_df[results_df['Replication Factor'] == rf].iloc[0, 1:].values, name=f'Replication Factor {rf}', text=[f"{t:.2f}s" for t in results_df[results_df['Replication Factor'] == rf].iloc[0, 1:].values], textposition='auto'))

fig.update_layout(title="Temps d'exécution des opérations CRUD par facteur de réplication", xaxis_title="Opérations CRUD", yaxis_title="Temps d'exécution (secondes)", barmode='group', template='plotly_white', legend=dict(title="Replication Factor"))

fig.show()

## Fin du notebook

In [40]:
# Tout fermer
cassandra_session.shutdown() # Fermer la connexion Cassandra
cluster.shutdown() # Fermer la connexion Cassandra
mysql_engine.dispose() # Fermer la connexion MySQL