# Consultar datos desde MySQL e cargalos en Cassandra (Notebook 2)

## Crea un segundo notebook

Este caderno (```2.CargarCassandra.ipynb```) xestiona este paso do exercicio.

## Consulta a táboa en MySQL e carga os datos nun DataFrame de pandas.

In [None]:
# Requerimentos da contorna

%pip install mysql-connector-python 
%pip install pandas
%pip install cassandra-driver

# !pip install asyncore
# !pip install --no-cache-dir --no-binary :all: cassandra-driver
# !pip install cassandra-driver --no-cache-dir --no-binary cassandra-driver
# !pip install cassandra-driver --no-binary :all:

In [None]:
# Conexión á BD MySQL

import mysql.connector


# Nomes e ferrollos da Conexión
HOST = "127.0.0.1"
USER = "root"
PASSWORD = "root_password" 
DATABASE = "data_pipeline_db"

# Conectamos coa nosa BD
try:
    mydb = mysql.connector.connect(
        host=HOST,
        user=USER,
        password=PASSWORD,
        database=DATABASE
    )
    cursor = mydb.cursor()
    print("Afeixado á BD MySQL '%s' como %s@%s" %(DATABASE, USER,HOST))

# Xestor groseiro de erros

except mysql.connector.Error as err:
    print(f"Erro de MySQL: {err}")
    if '1049' in str(err):
         print("Non atopo a base de datos 'data_pipeline_db'.")

In [None]:
# Copiamos o contido da táboa 'books' en cursor.

import pandas as pd

sql_query = "SELECT * FROM books"
cursor.execute(sql_query)
query_data = cursor.fetchall()
column_names = [i[0] for i in cursor.description]


# Creamos o Dataframe 'df_books' cos datos que vimos de gardar.

df_books = pd.DataFrame(query_data, columns=column_names)

# Amosámo-los resultados.
print("DataFrame 'df_books' arrebolado. Total rexistros:", len(df_books))
print("Amosando os primeiros 5 rexistros:")
print(df_books.head())


In [None]:
# Se xa temos o DataFrame listo, podemos liberar os seguintes recursos

cursor.close()
mydb.close()

print("Recursos de Cursor liberados.\nConexión a MySQL rematada.")

# Tamén podemos adescarregar o contedor de MySQL
#  Mais non podemos facelo automáticamente porque o comando !sudo docker stop mysql_db
#   solicítanos inserir a contrasinal do usuario actual membro do grupo 'sudoers'.
print("Se desexa remata-la execución de MySQL para aforrar recursos, abra unha terminal e execute:")
print("  sudo docker stop mysql_db")


## Conéctate á base de datos Cassandra

In [None]:
from cassandra.cluster import Cluster

# Supoñemos o Cluster Cassandra na computadora anfitrioa onde está este Notebook (localhost)
#   segundo o arquivo docker-compose.yml empregamos o porto predeterminado (default) 

cluster = Cluster(['127.0.0.1'], port=9042)

try:
    # 2. Conectar e crear a sesión
    session = cluster.connect()
    
    # 3. Proba de conexión: obter o nome do cluster
    row = session.execute("SELECT cluster_name FROM system.local").one()
    print(f"Conectado ó cluster: {row.cluster_name}")

except Exception as e:
    print(f"Non fun quen de conectar a Cassandra. Erro: {e}")


# Deixamos Aberta a conexion ó cluster Cassandra.


## Crea un keyspace e unha táboa cun esquema compatible cos teus datos

In [None]:
# Creamos o Chaveiro ("Keyspace")
session.execute("""
CREATE KEYSPACE IF NOT EXISTS chaveiro
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
""")

# Seleccionamos o chaveiro 
session.set_keyspace('chaveiro')

# Creamos a táboa co seguinte esquema ("Schema")
query_crear_taboa = """
    CREATE TABLE IF NOT EXISTS libros (
        isbn bigint PRIMARY KEY,
        title text,
        author text,
        year_of_publication int,
        publisher text
    )
"""

try:
    session.execute(query_crear_taboa)
    print("Creada Táboa 'libros' no keyspace 'chaveiro'.")
except Exception as e:
    print(f"Erro ao definir o esquema: {e}")


## Inserta os datos do DataFrame en Cassandra

In [None]:
# Libraría para arrebolamento concorrente
from cassandra.concurrent import execute_concurrent_with_args

# Comando de inserción
sentenza_insert = session.prepare("""
    INSERT INTO libros (isbn, title, author, year_of_publication, publisher)
    VALUES (?, ?, ?, ?, ?)
""")

# Mudamos os datos do DataFrame en tuplas empregando 'itertuples'
datos_libros = list(df_books.itertuples(index=False, name=None))

# Arrebolamos de xeito concorrente
#  gardamos as respostas da operación en 'rexistro'
# NB. "concurrency=20" pódese borrar se hai recursos de computación
rexistro = execute_concurrent_with_args(session, sentenza_insert, datos_libros, concurrency=20)

# print(f"Arrebolamento concorrente de {len(datos_libros)} libros rematado!")

# Percorrémo-lo rexistro para acumula-los acertos e erros das insercións
atari = 0
hazure = 0

for ben, mal in rexistro:
    if ben == True:
        atari +=1
    else:
        hazure +=1

# Amosámo-los resultados e revisamos integridade
print(f"Arrebolamento concorrente de {len(datos_libros)} libros rematado!")
print(f"Filas inseridas: {atari}. Erros: {hazure} ")

if((hazure == 0) and (atari == len(datos_libros))):
    print("Todas as filas foron inseridas sen erros.")



In [None]:
# Facemos comprobacións básicas contra a BD de Cassandra
# Como teño pouca memoria, só vou consultar o primeiro e derradeiro rexistro

session.default_timeout = 30

isbn_primeiro = "0195153448"  
isbn_derradeiro = "0767409752" 

def comprobar_libro(isbn):
    try:
        res = session.execute(f"SELECT title FROM libros WHERE isbn = '{isbn}'").one()
        if res:
            print(f"✅ Atopado: {res.title}")
        else:
            print(f"❌ Non atopado: {isbn}")
    except Exception as e:
        print(f"Erro na consulta: {e}")

print("Comprobando os rexistros primeiro e derradeiro:")
comprobar_libro(isbn_primeiro)
comprobar_libro(isbn_derradeiro)



In [None]:
# Se temos recursos, podemos conta-los rexistros gardados na BD de Casandra

# Damos máis tempo para completar (timeout), 60 segundos
session.default_timeout = 60 
resultado = session.execute("SELECT COUNT(*) FROM libros")
print(f"Total de rexistros na BD: {resultado.one()[0]}")