In [1]:
import pandas as pd
from cassandra.cluster import Cluster, BatchStatement


In [2]:
# Load the four student CSV files from the relative ../data directory.
# The tuple order matches the order of the names being assigned.
student_info, student_vle, student_assessment, student_registration = map(
    pd.read_csv,
    (
        '../data/studentInfo.csv',
        '../data/studentVle.csv',
        '../data/studentAssessment.csv',
        '../data/studentRegistration.csv',
    ),
)

In [3]:
# Data cleaning: drop every row that has a missing value in any column.
student_info = student_info.dropna()
student_vle = student_vle.dropna()
student_assessment = student_assessment.dropna()
# Fixed: the cleaned frame used to be bound to the misspelled name
# `studen_registration`, so `student_registration` itself stayed uncleaned
# even though it is the frame that gets inserted into Cassandra later on.
student_registration = student_registration.dropna()

In [4]:
# Flag withdrawn students: True where final_result is exactly 'Withdrawn'.
student_info['has_withdrawn'] = student_info['final_result'].eq('Withdrawn')

In [5]:
# Connect to Cassandra. 'cassandra' is the contact point (host name) given to
# the driver; the session opened here is used by every cell below.
cluster = Cluster(contact_points=['cassandra'])
session = cluster.connect()

In [6]:
# Create the `oulad` keyspace if it is missing (SimpleStrategy, one replica),
# then select it so later statements can use unqualified table names.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS oulad 
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""
session.execute(create_keyspace_cql)
session.set_keyspace('oulad')

In [7]:
# Schema for student_info: one row per (id_student, code_module, code_presentation).
# id_student is the partition key; the other two key columns are clustering columns.
student_info_ddl = """
    CREATE TABLE IF NOT EXISTS student_info (
        id_student int,
        code_module text,
        code_presentation text,
        gender text,
        region text,
        highest_education text,
        imd_band text,
        age_band text,
        num_of_prev_attempts int,
        studied_credits int,
        disability text,
        final_result text,
        has_withdrawn boolean,
        PRIMARY KEY(id_student, code_module, code_presentation)
    )
"""
session.execute(student_info_ddl)

<cassandra.cluster.ResultSet at 0x7f1456daee60>

In [8]:
# Schema for student_vle: one row per (id_student, id_site, date).
# Inserts that share those three values overwrite each other (CQL upsert).
# NOTE(review): code_module / code_presentation are not part of the key here,
# unlike in student_vle_withdrawn -- confirm that is intended.
student_vle_ddl = """
    CREATE TABLE IF NOT EXISTS student_vle (
        code_module text,
        code_presentation text,
        id_student int,
        id_site int,
        date int,
        sum_click int,
        PRIMARY KEY (id_student, id_site, date)
    )
"""
session.execute(student_vle_ddl)

<cassandra.cluster.ResultSet at 0x7f1456be2e60>

In [9]:
# Schema for student_assessment: one row per (id_assessment, id_student),
# partitioned by id_assessment.
student_assessment_ddl = """
    CREATE TABLE IF NOT EXISTS student_assessment (
        id_assessment int,
        id_student int,
        date_submitted int,
        is_banked int,
        score int,
        PRIMARY KEY (id_assessment, id_student)
    )
"""
session.execute(student_assessment_ddl)

<cassandra.cluster.ResultSet at 0x7f1456be1e40>

In [10]:
# Schema for student_registration: one row per
# (code_module, code_presentation, id_student), partitioned by code_module.
student_registration_ddl = """
    CREATE TABLE IF NOT EXISTS student_registration (
        code_module text,
        code_presentation text,
        id_student int,
        date_registration int,
        date_unregistration int,
        PRIMARY KEY (code_module, code_presentation, id_student)
    )
"""
session.execute(student_registration_ddl)

<cassandra.cluster.ResultSet at 0x7f1456be2e30>

In [11]:
# Row-level insert helpers, meant to be used with DataFrame.apply(..., axis=1).
#
# Two things differ from a naive "prepare + execute" per row:
#   * each INSERT is prepared only once per session and then reused, instead of
#     sending a PREPARE request to the cluster for every single row;
#   * numeric values are converted to built-in int / bool before binding,
#     because pandas can hand back float64 / numpy scalars (an integer CSV
#     column containing blanks is loaded as float64) and the driver refuses to
#     bind a float to a CQL `int` column.

# Prepared statements already created, keyed by (session, CQL text) so that a
# statement prepared on an older session is never reused on a new one.
_PREPARED_STATEMENTS = {}


def _prepare_once(cql):
    """Return the prepared statement for ``cql``, preparing it on first use only."""
    key = (session, cql)
    if key not in _PREPARED_STATEMENTS:
        _PREPARED_STATEMENTS[key] = session.prepare(cql)
    return _PREPARED_STATEMENTS[key]


# Function to insert data into student_info
def insert_into_student_info(row):
    """Insert one student_info record.

    ``row`` is any mapping / Series indexable by the student_info column names.
    Integer and boolean columns are cast to built-in types; a missing (NaN)
    integer value raises ValueError. Returns None.
    """
    prepared = _prepare_once("INSERT INTO student_info (id_student, code_module, code_presentation, gender, region, highest_education, imd_band, age_band, num_of_prev_attempts, studied_credits, disability, final_result, has_withdrawn) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    session.execute(prepared, (
        int(row['id_student']),
        row['code_module'],
        row['code_presentation'],
        row['gender'],
        row['region'],
        row['highest_education'],
        row['imd_band'],
        row['age_band'],
        int(row['num_of_prev_attempts']),
        int(row['studied_credits']),
        row['disability'],
        row['final_result'],
        bool(row['has_withdrawn']),
    ))


# Function to insert data into student_vle
def insert_into_student_vle(row):
    """Insert one student_vle record.

    ``row`` is any mapping / Series indexable by the student_vle column names.
    Integer columns are cast to built-in int; a missing (NaN) value raises
    ValueError. Returns None.
    """
    prepared = _prepare_once("INSERT INTO student_vle (code_module, code_presentation, id_student, id_site, date, sum_click) VALUES (?, ?, ?, ?, ?, ?)")
    session.execute(prepared, (
        row['code_module'],
        row['code_presentation'],
        int(row['id_student']),
        int(row['id_site']),
        int(row['date']),
        int(row['sum_click']),
    ))


# Function to insert data into student_assessment
def insert_into_student_assessment(row):
    """Insert one student_assessment record.

    ``row`` is any mapping / Series indexable by the student_assessment column
    names. Every column is cast to built-in int; a missing (NaN) value raises
    ValueError. Returns None.
    """
    prepared = _prepare_once("INSERT INTO student_assessment (id_assessment, id_student, date_submitted, is_banked, score) VALUES (?, ?, ?, ?, ?)")
    session.execute(prepared, (
        int(row['id_assessment']),
        int(row['id_student']),
        int(row['date_submitted']),
        int(row['is_banked']),
        int(row['score']),
    ))


# Function to insert data into student_registration
def insert_into_student_registration(row):
    """Insert one student_registration record.

    ``row`` is any mapping / Series indexable by the student_registration
    column names. Integer columns are cast to built-in int; a missing (NaN)
    value raises ValueError. Returns None.
    """
    prepared = _prepare_once("INSERT INTO student_registration (code_module, code_presentation, id_student, date_registration, date_unregistration) VALUES (?, ?, ?, ?, ?)")
    session.execute(prepared, (
        row['code_module'],
        row['code_presentation'],
        int(row['id_student']),
        int(row['date_registration']),
        int(row['date_unregistration']),
    ))

In [12]:
# Write student_info to Cassandra: the insert helper is called once per row.
student_info.apply(insert_into_student_info, axis='columns')

0        None
1        None
2        None
3        None
4        None
         ... 
32588    None
32589    None
32590    None
32591    None
32592    None
Length: 31482, dtype: object

In [None]:
# Write student_vle to Cassandra: the insert helper is called once per row.
student_vle.apply(insert_into_student_vle, axis='columns')

In [None]:
# Write student_assessment to Cassandra: the insert helper is called once per row.
student_assessment.apply(insert_into_student_assessment, axis='columns')

In [None]:
# Write student_registration to Cassandra: the insert helper is called once per row.
student_registration.apply(insert_into_student_registration, axis='columns')

In [None]:
# # Create table for student_vle
# session.execute("""
#     CREATE TABLE IF NOT EXISTS student_vle (
#         code_module text,
#         code_presentation text,
#         id_student int,
#         id_site int,
#         date int,
#         sum_click int,
#         PRIMARY KEY (id_student, id_site, date)
#     )
# """)

# def insert_into_student_vle(row):
#     prepared = session.prepare("INSERT INTO student_vle (code_module, code_presentation, id_student, id_site, date, sum_click) VALUES (?, ?, ?, ?, ?, ?)")
#     session.execute(prepared, (row['code_module'], row['code_presentation'], row['id_student'], row['id_site'], row['date'], row['sum_click']))
    
    
# student_vle.apply(insert_into_student_vle, axis=1)

In [None]:
# # Create table for student_assessment
# session.execute("""
#     CREATE TABLE IF NOT EXISTS student_assessment (
#         id_assessment int,
#         id_student int,
#         date_submitted int,
#         is_banked int,
#         score int,
#         PRIMARY KEY (id_assessment, id_student)
#     )
# """)

# # Function to insert data into student_assessment
# def insert_into_student_assessment(row):
#     prepared = session.prepare("INSERT INTO student_assessment (id_assessment, id_student, date_submitted, is_banked, score) VALUES (?, ?, ?, ?, ?)")
#     session.execute(prepared, (row['id_assessment'], row['id_student'], row['date_submitted'], row['is_banked'], row['score']))
    
# student_assessment.apply(insert_into_student_assessment, axis=1)

In [None]:
# # Create table for student_registration
# session.execute("""
#     CREATE TABLE IF NOT EXISTS student_registration (
#         code_module text,
#         code_presentation text,
#         id_student int,
#         date_registration int,
#         date_unregistration int,
#         PRIMARY KEY (code_module, code_presentation, id_student)
#     )
# """)

# # Function to insert data into student_registration
# def insert_into_student_registration(row):
#     prepared = session.prepare("INSERT INTO student_registration (code_module, code_presentation, id_student, date_registration, date_unregistration) VALUES (?, ?, ?, ?, ?)")
#     session.execute(prepared, (row['code_module'], row['code_presentation'], row['id_student'], row['date_registration'], row['date_unregistration']))
    
# student_registration.apply(insert_into_student_registration, axis=1) 


In [None]:
# Schema for student_vle_withdrawn: one row per
# (id_student, code_module, code_presentation, id_site, date), partitioned by id_student.
student_vle_withdrawn_ddl = """
    CREATE TABLE IF NOT EXISTS student_vle_withdrawn (
        id_student int,
        code_module text,
        code_presentation text,
        id_site int,
        date int,
        sum_click int,
        has_withdrawn boolean,
        PRIMARY KEY (id_student, code_module, code_presentation, id_site, date)
    )
"""
session.execute(student_vle_withdrawn_ddl)

# Attach final_result to every VLE interaction by left-joining student_info on
# student, module and presentation. Interactions without a matching student_info
# row get a missing final_result and are therefore flagged as not withdrawn.
student_results = student_info[['id_student', 'code_module', 'code_presentation', 'final_result']]
merged_data = student_vle.merge(
    student_results,
    on=['id_student', 'code_module', 'code_presentation'],
    how='left',
)
merged_data['has_withdrawn'] = merged_data['final_result'].eq('Withdrawn')

# Keep only the interactions that belong to withdrawn students.
withdrawn_student_data = merged_data.loc[merged_data['has_withdrawn']]

# Load the withdrawn rows into student_vle_withdrawn through a reusable batch.
batch = BatchStatement()
query = session.prepare("""
    INSERT INTO student_vle_withdrawn (id_student, code_module, code_presentation, id_site, date, sum_click, has_withdrawn)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# Columns in the same order as the INSERT's bind markers.
insert_columns = ['id_student', 'code_module', 'code_presentation', 'id_site', 'date', 'sum_click', 'has_withdrawn']
withdrawn_records = withdrawn_student_data[insert_columns].itertuples(index=False, name=None)

for student, module, presentation, site, day, clicks, withdrawn in withdrawn_records:
    # Cast pandas/numpy scalars to built-in types before binding.
    bound_values = (int(student), module, presentation, int(site), int(day), int(clicks), bool(withdrawn))
    batch.add(query, bound_values)
    if len(batch) == 50:
        # Flush a full batch of 50 statements and start over with an empty one.
        session.execute(batch)
        batch.clear()

# Flush whatever is left over from the last, partially filled batch.
if len(batch):
    session.execute(batch)


In [None]:
# Close the connection: shut down the `cluster` object created at the top of
# the notebook. Cells that use `session` cannot be run after this one without
# reconnecting first.
cluster.shutdown()