# Obiceiurile studenților și performanța academică

## 1. Introducere
Setul de date [Student Habits vs Academic Performance](https://www.kaggle.com/datasets/jayaantanaath/student-habits-vs-academic-performance?resource=download) disponibil pe platforma Kaggle explorează relația dintre obiceiurile studenților și performanța lor academică.

Datele pe care le vom folosi:

| Coloana | Descriere |
| --- | --- |
| *student_id* | Identificatorul unic al studentului |
| *age (17-24)* | Vârstă |
| *gender (Female, Male, Other)* | Gen |
| *study_hours_per_day (0-8.3)* | Media orelor de studiu pe zi |
| *social_media_hours(0-7.2)* | Media orelor petrecute pe social media |
| *netflix_hours (0-5.4)* | Media orelor petrecute pe Netflix |
| *part_time_job (Yes/No)* | Existența unui part-time job |
| *attendance_percentage (56-100%)* | Procentajul de prezență la cursuri |
| *sleep_hours (3.2-10)* | Media orelor de somn pe zi |
| *exercise_frequency (0-6)* | Frecvența exercițiilor fizice pe săptămână |
| *exam_score (0-100)* | Scorul la examnul final |

În cadrul acestui proiect vom analiza cum stilul de viață al studenților (precum timpul petrecut în fața ecranului, prezența la cursuri, somnul și activitatea fizică) le afectează performanța academică.


---






Începem prin a încărca și vizualiza datele din Kaggle

In [None]:
import kagglehub
from pyspark.sql import SparkSession

# Descărcăm dataset-ul
path = kagglehub.dataset_download("jayaantanaath/student-habits-vs-academic-performance")

# Creăm sesiunea
spark = SparkSession.builder.appName('Habits').getOrCreate()

# Încarcăm fișierul CSV din Kaggle
df = spark.read.option("header", True).csv(path + "/student_habits_performance.csv")

# Afișăm primele rânduri
df.show(3)

# Statistici descriptive pentru a înțelege mai bine datele
df.describe().show()

+----------+---+------+-------------------+------------------+-------------+-------------+---------------------+-----------+------------+------------------+------------------------+----------------+--------------------+-----------------------------+----------+
|student_id|age|gender|study_hours_per_day|social_media_hours|netflix_hours|part_time_job|attendance_percentage|sleep_hours|diet_quality|exercise_frequency|parental_education_level|internet_quality|mental_health_rating|extracurricular_participation|exam_score|
+----------+---+------+-------------------+------------------+-------------+-------------+---------------------+-----------+------------+------------------+------------------------+----------------+--------------------+-----------------------------+----------+
|     S1000| 23|Female|                0.0|               1.2|          1.1|           No|                 85.0|        8.0|        Fair|                 6|                  Master|         Average|                   

## 2.1. Procesarea datelor

Utilizăm dataframe pentru a păstra doar coloanele care ne interesează.

In [None]:
# selectăm coloanele
columns = ["student_id", "age", "gender", "study_hours_per_day", "social_media_hours", "netflix_hours", "part_time_job", "attendance_percentage", "sleep_hours", "exercise_frequency", "exam_score"]
# le extragem
df_columns = df.select(columns)
# verificăm
df_columns.show(3)

+----------+---+------+-------------------+------------------+-------------+-------------+---------------------+-----------+------------------+----------+
|student_id|age|gender|study_hours_per_day|social_media_hours|netflix_hours|part_time_job|attendance_percentage|sleep_hours|exercise_frequency|exam_score|
+----------+---+------+-------------------+------------------+-------------+-------------+---------------------+-----------+------------------+----------+
|     S1000| 23|Female|                0.0|               1.2|          1.1|           No|                 85.0|        8.0|                 6|      56.2|
|     S1001| 20|Female|                6.9|               2.8|          2.3|           No|                 97.3|        4.6|                 6|     100.0|
|     S1002| 21|  Male|                1.4|               3.1|          1.3|           No|                 94.8|        8.0|                 1|      34.3|
+----------+---+------+-------------------+------------------+--------

###Curățarea datelor
Ștergem rândurile cu valori nule (NULL) sau goale ("", " "), deoarece acestea pot afecta analizele ulterioare.

In [None]:
initial_count = df_columns.count()
print(f"Numărul inițial de rânduri: {initial_count}")

# Ștergem rândurile ce conțin NULL
df_fara_null = df_columns.na.drop(how="any")
randuri_null = df_fara_null.count()
nr_randuri_null = initial_count - randuri_null
print(f"Au fost găsite {nr_randuri_null} rânduri NULL")

from pyspark.sql.functions import trim, col

# Ștergem rânduri goale
df_curat = df_fara_null
for col_name in columns:
  # Aplicăm ștergerea doar pe coloanele de tip String, altfel pentru cele de alt tip se va genera o eroare
  if str(df_fara_null.schema[col_name].dataType) == "StringType":
    # Eliminăm rândurile în care valoarea este un șir gol după eliminarea spațiilor
      df_curat = df_curat.filter(trim(col(col_name)) != "")

final_count = df_curat.count()
nr_coloane_goale = initial_count - final_count
print(f"Au fost găsite {nr_coloane_goale} rânduri goale")

nr_randuri_curatate = nr_randuri_null + nr_coloane_goale

if nr_randuri_curatate == 0:
  print("Datele sunt curate!")
  df = df_columns
else:
  print(f"Au fost curățate {nr_randuri_curatate} rânduri în total")
  df = df_curat
# !În continuare putem folosi "df" atunci când ne referim la dataframe-ul nostru

Numărul inițial de rânduri: 1000
Au fost găsite 0 rânduri NULL
Au fost găsite 0 rânduri goale
Datele sunt curate!


Adăugăm o nouă coloană *screen_time*, care reprezintă media artimetică dintre orele petrecute pe social media ("social_media_hours") și orele petrecute pe Netflix ("netflix_hours"). Acestă modificare este necesară, deoarece obiectivul nostru nu este să diferențiem platformele utilizate, ci de a analiza timpul petrecut online în raport cu notele obținute.

In [None]:
# adăugăm și calculăm noua coloană
df = df.withColumn(
    "screen_time",
    (col("social_media_hours") + col("netflix_hours")) / 2
)

# eliminăm coloanele care nu ne mai sunt necesare
df = df.drop("social_media_hours", "netflix_hours")

# afișăm noul dataframe
df.show(3)

+----------+---+------+-------------------+-------------+---------------------+-----------+------------------+----------+-----------+
|student_id|age|gender|study_hours_per_day|part_time_job|attendance_percentage|sleep_hours|exercise_frequency|exam_score|screen_time|
+----------+---+------+-------------------+-------------+---------------------+-----------+------------------+----------+-----------+
|     S1000| 23|Female|                0.0|           No|                 85.0|        8.0|                 6|      56.2|       1.15|
|     S1001| 20|Female|                6.9|           No|                 97.3|        4.6|                 6|     100.0|       2.55|
|     S1002| 21|  Male|                1.4|           No|                 94.8|        8.0|                 1|      34.3|        2.2|
+----------+---+------+-------------------+-------------+---------------------+-----------+------------------+----------+-----------+
only showing top 3 rows



## 2.2. Grupări și agregări de date

**Verificăm dacă există o legătură între statusul jobului part-time și notele finale**

Am grupat studenții după coloana *part_time_job* și am calculat media notelor la examen și număr studenților din fiecare categorie.

In [None]:
from pyspark.sql.functions import avg, count, round
# DataFrame
df_job = df.groupBy("part_time_job").agg(
    round(avg("exam_score"), 2).alias("avg_score"),
    count("student_id").alias("number_of_students")).orderBy("avg_score").show()

+-------------+---------+------------------+
|part_time_job|avg_score|number_of_students|
+-------------+---------+------------------+
|          Yes|    68.74|               215|
|           No|    69.84|               785|
+-------------+---------+------------------+



Deși putem observa o diferență de 1.1 (68.74 comparativ cu 69.84) nu este suficientă pentru a concluziona dacă existența unui job part-time are o influență negativă asupra notelor.
Astfel, în continuare, vom analiza dacă și genul studentului influențează relația.

In [None]:
# Spark SQL
# Creăm o vedere temporară pentru a putea folosi SQL
df.createOrReplaceTempView("students")

df_job_gender = spark.sql("""
          SELECT gender, part_time_job, ROUND(AVG(exam_score), 2) as avg_score
          FROM students
          GROUP BY gender, part_time_job
          ORDER BY part_time_job, gender
""").show()

+------+-------------+---------+
|gender|part_time_job|avg_score|
+------+-------------+---------+
|Female|           No|    69.63|
|  Male|           No|    69.82|
| Other|           No|    72.58|
|Female|          Yes|     70.2|
|  Male|          Yes|    67.85|
| Other|          Yes|    64.48|
+------+-------------+---------+



Observăm că efectul unui job part-time variază în funcție de gen. Femeile au tendința de a avea un scor mediu mai mare cu 0.57 puncte atunci când lucrează, în timp ce bărbații și persoanele cu alt tip de gen au scoruri mai mici cu 1.97, respectiv 8.1 puncte.

Prin urmare, job-ul part-time nu are un efect uniform, depinzând de gen.

## 3. Metode ML

#### 3.1 Regresia Liniară
Presupunem că dorim să prezicem nota la examen în funcție de 4 factori:
- timpul pe care studentul îl petrece învățând;
- orele de somn;
- timpul petrecut în fața ecranelor (cu scopul divertismentului);
- prezența la cursuri.

Motivele pentru care am ales *regresia liniară*:
- scopul este de a face o predicție numerică;
- este un model adecvat pentru a prezenta relațiile liniare dintre variabila dependentă (*exam_score*) și variabilele independente (*study_hours_per_day, screen_time, sleep_hours, attendance_percentage*);
- rezultatul este ușor de interpretat.


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Coloanele pe care dorim să le folosim sunt în format String, astfel va trebui să le convertim în Double
df = df.withColumn("study_hours_per_day", col("study_hours_per_day").cast(DoubleType()))
df = df.withColumn("sleep_hours", col("sleep_hours").cast(DoubleType()))
df = df.withColumn("screen_time", col("screen_time").cast(DoubleType()))
df = df.withColumn("attendance_percentage", col("attendance_percentage").cast(DoubleType()))
df = df.withColumn("exam_score", col("exam_score").cast(DoubleType()))

# Combinăm mai multe coloane într-una singură (features), deoarece modelele de ML nu acceptă mai multe
assembler = VectorAssembler(inputCols=["study_hours_per_day", "screen_time", "sleep_hours", "attendance_percentage"],
                            outputCol="features")

# Aplicăm assembler-ul pe df-ul nostru
output = assembler.transform(df)

# Selectăm coloanele necesare (x și y)
final_data = output.select("features", "exam_score")

# Împărțim datele în train și test
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

# Inițializăm modelul
lr = LinearRegression(featuresCol="features", labelCol="exam_score")
# Și îl antrenăm
lr_model = lr.fit(train_data)

print("Interpretarea datelor")
# Interpretăm coeficienții și intercept-ul
coeffs = [f"{coef:.2f}" for coef in lr_model.coefficients]
print("")
print(f"Dacă un student:")
print(f"- învață cu o oră mai mult pe zi, scorul crește cu {coeffs[0]}")
print(f"- petrece o oră în plus în fața ecranului, scorul scade cu {coeffs[1]}")
print(f"- doarme cu o oră în plus, scorul crește cu {coeffs[2]}")
print(f"- are un procent în plus la prezență, scorul crește cu {coeffs[3]}")
print(f"Intercept: {lr_model.intercept:.2f} (scorul studentului dacă nu ar face nimic din cele de mai sus)")

# Evaluăm modelul
test_results = lr_model.evaluate(test_data)
print(f"RMSE: modelul greșește în medie cu {test_results.rootMeanSquaredError:.2f} puncte față de valorile reale")
r2_percentage = test_results.r2 * 100
print(f"R2: {r2_percentage:.2f}% din variația notelor poate fi explicată de variabilele utilizate")

# Predicții
print("\nPredicții")
unlabeled_data = test_data.select("features")
predictions = lr_model.transform(test_data)
predictions.select("features", "prediction", "exam_score").show(10)
print("Diferența dintre valoarea reală și predicția modelului")
test_results.residuals.show(10)


Interpretarea datelor

Dacă un student:
- învață cu o oră mai mult pe zi, scorul crește cu 9.39
- petrece o oră în plus în fața ecranului, scorul scade cu -5.08
- doarme cu o oră în plus, scorul crește cu 1.79
- are un procent în plus la prezență, scorul crește cu 0.13
Intercept: 25.51 (scorul studentului dacă nu ar face nimic din cele de mai sus)
RMSE: modelul greșește în medie cu 8.57 puncte față de valorile reale
R2: 77.28% din variația notelor poate fi explicată de variabilele utilizate

Predicții
+-------------------+------------------+----------+
|           features|        prediction|exam_score|
+-------------------+------------------+----------+
|[0.0,1.15,8.0,85.0]| 44.60155289472823|      56.2|
| [0.0,2.2,5.8,93.4]| 36.39129185080092|      26.7|
|[0.0,2.55,5.7,89.5]|  33.9482085763249|      31.1|
|[0.0,2.65,3.8,85.6]|29.557211027967256|      30.5|
| [0.1,0.6,7.6,78.7]| 46.82966265520549|      53.4|
| [0.2,2.6,7.0,77.9]| 36.44549677459311|      31.5|
|[0.3,2.25,8.3,77.5]| 41.

### 3.2 Regresie Logistică
Presupunem că dorim să precizăm probabilitatea ca un student sa promoveze examenul (nota >= 5), pe bază a doi factori:
* timpul mediu petrecut învățând;
* procentul de prezență la cursuri.

Motivele pentru care am ales regresia logistică:
* este potrivit pentru clasificarea binară (1 - dacă studentul a promovat, 0 - dacă nu);
* ușor de implementat, interpretat și antrenat.



In [None]:
from pyspark.sql.functions import when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Creăm o nouă coloană passed_exam care va conține 1 daca nota este 5 sau mai mare sau 0 daca este mai mică de 5
df = df.withColumn("passed_exam", when(col("exam_score") >= 50, 1).otherwise(0))

# Combinăm mai multe coloane într-una singură (features), deoarece modelele de ML nu acceptă mai multe
assembler = VectorAssembler(inputCols=["study_hours_per_day", "attendance_percentage"],
                            outputCol="features")

# Aplicăm assembler-ul pe df-ul nostru
output = assembler.transform(df)

# Selectăm coloanele necesare (x și y)
final_data = output.select("features", "passed_exam")

# Împărțim datele în train și test
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

# Inițializăm modelul
lr = LogisticRegression(featuresCol="features", labelCol="passed_exam")
# Și îl antrenăm pe setul de date de antrenament
lr_model = lr.fit(train_data)

# Evaluăm modelul pe datele de test
results = lr_model.evaluate(test_data)

# Predicțiile pentru model
predictions = lr_model.transform(test_data)

# Evaluăm eficiența folosind AUC (Area under the ROC Curve)
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="passed_exam")
# AUC - cât de bine diferențiază modelul între studenții care au trecut și cei care nu
auc = evaluator_auc.evaluate(predictions)

multi_evaluator = MulticlassClassificationEvaluator(labelCol="passed_exam", predictionCol="prediction")

# Calculăm acuratețea - proporția de predicții corecte (adică cât de des are dreptate modelul)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
# Calculăm precizia - proporția de valori prezise ca fiind pozitive și care sunt într-adevăr pozitive (adică precizia penalizează predicțiile greșite ca pozitive)
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
# Calculăm sensibilitatea - proporția de valori pozitive corect identificate (adică sensibilitatea penalizează ratările pozitive)
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
# Calculăm F1 - media armonică dintre precizie și sensibilitate care echilibrează greșelile de tip fals pozitiv și fals negativ
f1_score = 2 * (precision * recall) / (precision + recall)

print(f"Modelul prezice corect rezultatul în {accuracy * 100:.2f}% din cazuri. (Acuratețe)")
print(f"Dintre toate cazurile pe care modelul le-a prezis ca fiind promovate au fost corecte {precision * 100:.2f}%. (Precizie)")
print(f"Indetifică {recall * 100:.2f}% din studenții care au trecut cu adevărat. (Sensibilitate)")
print(f"Capacitatea de a distinge între promovări și restanțe este de {auc * 100:.2f}% (ROC-AUC)")
print(f"Echilibrul dintre precizie și sensibilitate: {f1_score * 100:.2f}% (Scorul F1)")

#Calculăm coeficienții și intercept-ul
coeffs = lr_model.coefficients
intercept = lr_model.intercept
print("\nCoeficienții indică impactul fiecărei variabile asupra șansei de promovare.")
# log-odds - șansa de promovare, folosită de model pentru a calcula probabilitatea
print("Interpretarea coeficienților (log-odds):")
print(f"Fiecare oră suplimentară de studiu crește log-odds-ul promovării cu {coeffs[0]:.2f}")
print(f"Fiecare procent suplimentar de prezență crește log-odds-ul promovării cu {coeffs[1]:.2f}")
print(f"Intercept ({intercept:.2f}): șansa inițială de promovare când toate variabilele sunt zero")



Modelul prezice corect rezultatul în 89.45% din cazuri. (Acuratețe)
Dintre toate cazurile pe care modelul le-a prezis ca fiind promovate au fost corecte 89.86%. (Precizie)
Indetifică 89.45% din studenții care au trecut cu adevărat. (Sensibilitate)
Capacitatea de a distinge între promovări și restanțe este de 78.97% (ROC-AUC)
Echilibrul dintre precizie și sensibilitate: 89.66% (Scorul F1)

Coeficienții indică impactul fiecărei variabile asupra șansei de promovare.
Interpretarea coeficienților (log-odds):
Fiecare oră suplimentară de studiu crește log-odds-ul promovării cu 2.04
Fiecare procent suplimentar de prezență crește log-odds-ul promovării cu 0.04
Intercept (-6.51): șansa inițială de promovare când toate variabilele sunt zero


## 4. Data Pipeline
Am utilizat data pipeline-uri în mai multe locuri în codurile de mai sus.
Spre exemplu, atunci cand am transformat valorile din string in DoubleType sau când am combinat mai multe coloane într-una singură cu VectorAssembler.

## 5. UDF
Vom defini o funcție UDF care convertește genul din variabilele calitative în formă numerică. Astfel, "Male" va deveni 0, "Female" 1 și "Other" 2.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Funcția care face transformarea
def gender_to_int(g):
    if g == "Female":
        return 1
    elif g == "Male":
        return 0
    else:
        return 2
# Definim funcția UDF de tip Integer
gender_udf = udf(gender_to_int, IntegerType())
# Aplicăm UDF-ul pentru a crea o nouă coloană "gender_num"
df = df.withColumn("gender_num", gender_udf(col("gender")))
# Verificăm modificările
df.select("gender", "gender_num").show(5)

df.show(5)

+------+----------+
|gender|gender_num|
+------+----------+
|Female|         1|
|Female|         1|
|  Male|         0|
|Female|         1|
|Female|         1|
+------+----------+
only showing top 5 rows

+----------+---+------+-------------------+-------------+---------------------+-----------+------------------+----------+-----------+-----------+----------+
|student_id|age|gender|study_hours_per_day|part_time_job|attendance_percentage|sleep_hours|exercise_frequency|exam_score|screen_time|passed_exam|gender_num|
+----------+---+------+-------------------+-------------+---------------------+-----------+------------------+----------+-----------+-----------+----------+
|     S1000| 23|Female|                0.0|           No|                 85.0|        8.0|                 6|      56.2|       1.15|          1|         1|
|     S1001| 20|Female|                6.9|           No|                 97.3|        4.6|                 6|     100.0|       2.55|          1|         1|
|     S100

## 6. Metoda DL și optimizarea hiperparametrilor


Ne propunem să construim un model de machine learning care va învăța să prezică dacă un student va promova sau nu un examen, pe baza factorilor precum: vârsta, genul, timpul petrecut zilnic învățând, în fața ecranului sau dormind, dar și procentul de prezență la cursuri și frecvența activității fizice.

Am ales ca metodă Multi-Layer Perceptron (MLP) deoarece:
* este versatil, fiind perfect pentru date tabulare;
* este nonlinear, deci poate modela relațiile complexe între factorii care influențeză nota finală;
* permite optimizarea ușoară a hiperparametrilor prin GridSearchCV, testând diferite combinații de neuroni.

In [None]:
import numpy as np
import pandas as pd
import scikeras
import tensorflow
from pyspark.sql.functions import col
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from scikeras.wrappers import KerasClassifier

# Alegem coloanele relevante
cols = ["age", "gender_num", "study_hours_per_day", "screen_time", "attendance_percentage", "sleep_hours", "exercise_frequency", "passed_exam"]
df_cols = df.select(*cols)

# Convertim din spark dataframe în pandas, pentru ca tensorflow nu poate utiliza spark
pandas_df = df_cols.toPandas().astype(float)

# Pregătim datele, x - datele de intrare, y - dacă a trecut examenul (1) sau nu (0)
X = pandas_df.drop("passed_exam", axis=1)
y = pandas_df['passed_exam'].astype(int)

# Standardizăm datele și le împărțim în antrenare 80% și testare 20%
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# în timp ce random_state face împărțirea aleatorie să fie mereu aceeași, stratify păstrează proporția claselor
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42, stratify=y)

# Creăm o UDF care construiește rețeaua neuronală
# adam = algoritm care învață din greșeli
def create_model(optimizer='adam', dropout_rate=0.2, units1=64, units2=32):
  model = Sequential([ # straturile sunt aranjate unul după altul, secvențial
      Input(shape=(X_train.shape[1],)), # primul start de intrare care va primi numărul de coloane din X_train
      Dense(units1, activation='relu'), # relu = Rectified Linear Unit. ce face? dacă input-ul este un număr pozitiv îl va lăsa să treacă, dar dacă este negativ îl va transforma în 0
      Dropout(dropout_rate), # oprim aleatoriu 20% din neuroni. Dacă memorează prea bine datele de antrenare nu va fucționa și pe unele noi
      Dense(units2, activation='relu'),
      Dropout(dropout_rate),
      Dense(1, activation='sigmoid') # folosim doar un neuron, pentru ca vrem să știm dacă studentul trece sau nu (1 sau 0), iar sigmoid face această transformare
  ])
  # compilăm modelul și măsurăm acuratețea, adică procentul de răspunsuri corecte
  model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy']) # binary_crossentropy = măsoară distanța dintre etichetele claselor reale și probabilitățile prezise de model
  return model

# creăm obiectul clasei KerasClassifier, pentru a putea folosi sklearn
model = KerasClassifier(model=create_model, verbose=0)
# creăm dicționarul parametrilor pe care vrem să îi ajustăm și transmite ca argument în GridSearchCV
# OPTIMIZAREA HIPERPARAMETRILOR REȚELEI NEURONALE
dict_params = { # valorile sunt transmise ca argumente în funcția create_model(), pentru a încerca diverse variante
    'model__optimizer': ['adam', 'rmsprop'],
    'model__dropout_rate': [0.2, 0.3],
    'model__units1': [64, 32],
    'model__units2': [32, 16],
    'batch_size': [32], # câte exemple să proceseze odată
    'epochs': [20] # de câte ori să treacă prin toate datele de antrenare
} # Astfel, GridSearchCV poate testa toate combinațiile posibile
# Creăm obiectul GridSearchCV
grid = GridSearchCV(estimator=model, param_grid=dict_params, scoring='accuracy', cv=3, error_score='raise') # cv împarte datele de antrenare (X_train și y_train) în 3 părti egale
# Executăm căutarea
grid.fit(X_train, y_train)
# Afișăm rezultatele
print("Cele mai bune valori ale hiperparametrilor: ", grid.best_params_)
print("Cel mai bun scor: ", grid.best_score_)

# Evaluam modelul final pe datele de test care nu au fost folosite in GridSerchCV
# extragem modelul cu cei mai buni hiperparametri găsiți
best_model = grid.best_estimator_.model_
# calculăm acuratețea finală
accuracy = grid.best_estimator_.score(X_test, y_test) # folosim verbose=0 pentru a nu mai afișa progresul în timpul evaluării
print(f"Acuratețea setului de test: {accuracy:.2f}")
# transformăm probabilitățile, adică probabilitate mai mare de 0.5 va trece (deci 1), altfel 0
y_pred = (best_model.predict(X_test) > 0.5).astype(int)
print("Matricea de confuzie: \n", confusion_matrix(y_test, y_pred))
print("Raportul de clasificare: \n", classification_report(y_test, y_pred))

Cele mai bune valori ale hiperparametrilor:  {'batch_size': 32, 'epochs': 20, 'model__dropout_rate': 0.2, 'model__optimizer': 'adam', 'model__units1': 32, 'model__units2': 32}
Cel mai bun scor:  0.9075309997841045
Acuratețea setului de test: 0.92
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step 
Matricea de confuzie: 
 [[ 12  14]
 [  2 172]]
Raportul de clasificare: 
               precision    recall  f1-score   support

           0       0.86      0.46      0.60        26
           1       0.92      0.99      0.96       174

    accuracy                           0.92       200
   macro avg       0.89      0.73      0.78       200
weighted avg       0.92      0.92      0.91       200



GridSearchCV a găsit cea mai bună combinație de parametri ca fiind:
* optimizer: Adam,
* dropout rate: 20%,
* 32 de neuroni în primul strat ascuns,
* 32 în al doilea,
* batch size: 32,
* epochs: 20.

Acuratețea finală este de 92%, deci clasifică corect 92% din studenți, fiind o performanță foarte bună.

În matricea de confuzie a prezis:
* corect că nu vor trece (TN) 12 studenți,
* corect că vor trece (TP) 172 studenți,
* greșit că nu vor trece, dar au trecut 2 studenți (FN),
* greșit că vor trece, dar nu au trecut (FP) 14 studenți.

Astfel, putem observa o problemă în identificarea studenților care nu au promovat.

Același lucru putem observa și în raportul de clasificare: studenții care au promovat (1) sunt identificați corect în cele mai multe cazuri, dar cei care nu au promovat (0) sunt identificați în număr mult prea mic.

Un motiv posibil pentru inacuratețea în cazul nepromovării este dezechilibrul dintre clase, fiindcă în datele noastre avem 174 de studenți care promovează și doar 26 care nu. Prin urmare, modelul va învăța să favorizeze clasa majoritară pentru a obține o acuratețe mai mare, în timp ce va avea o performanță slabă în detectarea cazurilor din clasa minoritară.

În consecință, vom echilibra clasele prin undersampling, adică vom reduce numărul de observații din categoria majoritară pentru a obține un set balansat.

In [None]:
from sklearn.utils import resample
# Separăm clasele în 2 df-uri
passed = pandas_df[pandas_df['passed_exam'] == 1]
failed = pandas_df[pandas_df['passed_exam'] == 0]
print("Înainte de undersampling:")
print(pandas_df['passed_exam'].value_counts())
print(f"Clasa minoritară are {len(failed)} observații")
# Undersampling-ul
passed_undersampled = resample(passed, replace=False, n_samples=len(failed), random_state=42)
undersampled_df = pd.concat([passed_undersampled, failed])

# Verificăm
print("După undersampling: ")
print(undersampled_df['passed_exam'].value_counts())

# Pregătim datele
X_undersampled = undersampled_df.drop('passed_exam', axis=1)
y_undersampled = undersampled_df['passed_exam'].astype(int)

# Standardizarea și împărțirea datelor
X_undersampled_scaled = scaler.fit_transform(X_undersampled)
X_train_undersampled, X_test_undersampled, y_train_undersampled, y_test_undersampled = train_test_split(X_undersampled_scaled, y_undersampled, test_size=0.2, random_state=42, stratify=y_undersampled)

# creăm obiectul clasei KerasClassifier
model_undersampled = KerasClassifier(model=create_model, verbose=0)

# Creăm obiectul GridSearchCV
grid_undersampled = GridSearchCV(estimator=model_undersampled, param_grid=dict_params, scoring='accuracy', cv=3)
# Executăm căutarea
grid_undersampled.fit(X_train_undersampled, y_train_undersampled)

# Afișăm rezultatele
print("Cele mai bune valori ale hiperparametrilor: ", grid_undersampled.best_params_)
print("Cel mai bun scor: ", grid_undersampled.best_score_)

# Evaluam modelul final pe datele de test care nu au fost folosite in GridSearchCV
# extragem modelul cu cei mai buni hiperparametri găsiți
best_model_undersampled = grid_undersampled.best_estimator_.model_
# calculăm acuratețea finală
loss, accuracy = best_model_undersampled.evaluate(X_test_undersampled, y_test_undersampled, verbose=0) # folosim verbose=0 pentru a nu mai afișa progresul în timpul evaluării
print(f"Acuratețea setului de test: {accuracy:.2f}")
# transformăm probabilitățile, adică probabilitate mai mare de 0.5 va trece (deci 1), altfel 0
y_pred = (best_model_undersampled.predict(X_test_undersampled) > 0.5).astype(int)
print("Matricea de confuzie: \n", confusion_matrix(y_test_undersampled, y_pred))
print("Raportul de clasificare: \n", classification_report(y_test_undersampled, y_pred))

Înainte de undersampling:
passed_exam
1.0    869
0.0    131
Name: count, dtype: int64
Clasa minoritară are 131 observații
După undersampling: 
passed_exam
1.0    131
0.0    131
Name: count, dtype: int64
Cele mai bune valori ale hiperparametrilor:  {'batch_size': 32, 'epochs': 20, 'model__dropout_rate': 0.2, 'model__optimizer': 'rmsprop', 'model__units1': 64, 'model__units2': 32}
Cel mai bun scor:  0.8946169772256729
Acuratețea setului de test: 0.89
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 57ms/step
Matricea de confuzie: 
 [[26  1]
 [ 5 21]]
Raportul de clasificare: 
               precision    recall  f1-score   support

           0       0.84      0.96      0.90        27
           1       0.95      0.81      0.88        26

    accuracy                           0.89        53
   macro avg       0.90      0.89      0.89        53
weighted avg       0.90      0.89      0.89        53



**Interpretarea finală**

După aplicarea undersampling-ului, GridSearchCV a găsit o combinație optimă de hiperparametri similară cu modelul anterior, obținând o acuratețe de 89%, aparent mai scăzută comparativ cu modelul neechilibrat (92%), dar trebuie să avem în vedere și celelalte aspecte.

În matricea de confuzie observăm o îmbunătățire considerabilă. Acesta a prezis:
* corect că nu vor trece (TN) 26 studenți,
* corect că vor trece (TP) 21 studenți,
* greșit că nu va trece, dar a trecut 1 student (FP),
* greșit că vor trece, dar nu au trecut (FN) 5 studenți.

Modelul neechilibrat detecta doar 46% din studenții nepromovați, dar funcționa excelent pentru clasa majoritară (99% recall pentru promovați).

Modelul echilibrat oferă o performanță potrivită pentru ambele cazuri: detectează 96% din studenții nepromovați și 81% din cei promovați.

În concluzie, în ciuda faptului că acuratețea scade cu 3%, modelul echilibrat este semnificativ mai eficient în identificarea corectă a ambelor grupuri de studenți.



## 7. Proces de streaming
Vom crea un sistem care să primească noi date despre studenți în timp real și să prezică dacă aceștia vor promova examenul, folosind modelul antrenat anterior.

In [None]:
# Definim structura datelor care vor veni prin streaming
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf
import time
import shutil
import json
import os
import builtins

# Definim o schemă cu informațiile pe care le vom avea despre fiecare student
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("gender_num", IntegerType(), True),
    StructField("study_hours_per_day", DoubleType(), True),
    StructField("screen_time", DoubleType(), True),
    StructField("attendance_percentage", DoubleType(), True),
    StructField("sleep_hours", DoubleType(), True),
    StructField("exercise_frequency", DoubleType(), True)])

# Creăm un director temporar unde vor ajunge fișierele cu datele noi
dir = "/tmp/streaming"
# Dacă există deja îl ștergem
if os.path.exists(dir):
  shutil.rmtree(dir)
# Și îl creăm din nou
os.makedirs(dir)

# Creăm o funcție pentru a folosi modelul
def predict_student_result(age, gender_num, study_hours_per_day, screen_time, attendance_percentage, sleep_hours, exercise_frequency):
  # Creăm un array care conține datele formatate pentru model
  input_data = np.array([[float(age), float(gender_num), float(study_hours_per_day), float(screen_time), float(attendance_percentage), float(sleep_hours), float(exercise_frequency)]])
  # Normalizăm datele
  scaled_input = scaler.transform(input_data)
  # Utilizăm modelul de machine learning antrenat anterior pentru a face predicția
  prediction = best_model_undersampled.predict(scaled_input)[0][0]
  # Dacă probabilitatea este mai mare de 0.5 studentul va promova
  return "Promovează" if prediction > 0.5 else "Nu promovează", float(prediction)

# Creăm un UDF care face ca funcția de mai sus să fie compatibilă cu Spark
predict_udf = udf(predict_student_result, StructType([
    StructField("result", StringType()),
    StructField("probability", DoubleType())
]))
# Configurăm citirea în timp real a stream-ului
stream_df = spark.readStream.schema(schema).json(dir)
# Pentru fiecare rând aplicăm UDF-ul și selectăm coloanele relevante
result_df = stream_df.withColumn("prediction_result", predict_udf(col("age"), col("gender_num"), col("study_hours_per_day"), col("screen_time"), col("attendance_percentage"), col("sleep_hours"), col("exercise_frequency"))).select("*", "prediction_result.result", "prediction_result.probability")
# Afișăm rezultatul
query = result_df.writeStream.outputMode("append").format("console").start()

# Creăm o funcție care simulează primirea de studenți noi la fiecare 3 secunde
def generate_data():
  students_generated = []
  # Creăm 10 fișiere, adică 10 studenți
  for i in range(10):
    data = {
      "age": int(np.random.randint(17, 25)),
      "gender_num": int(np.random.randint(0, 3)),
      "study_hours_per_day": float(builtins.round(np.random.uniform(1.0, 8.3), 2)),
      "screen_time": float(builtins.round(np.random.uniform(0.0, 7.2), 2)),
      "attendance_percentage": float(builtins.round(np.random.uniform(56.0, 100.0), 2)),
      "sleep_hours": float(builtins.round(np.random.uniform(6.0, 10.0), 2)),
      "exercise_frequency": float(builtins.round(np.random.uniform(0.0, 6.0), 2))}
    # Adăugăm datele în fișier
    with open(f"{dir}/data_{i}.json", 'w') as f:
      json.dump(data, f)
    students_generated.append(data)
    time.sleep(3)
  return students_generated
students_data = generate_data()
# Așteptăm să se procedeze toate datele
time.sleep(20)
# Oprim stream-ul
query.stop()

# Citim datele
processed_data = spark.read.schema(schema).json(dir)
# Verificăm dacă există datele
if processed_data.count() > 0:
  print(f"Numărul total de studenți procesați {processed_data.count()}")
  # Aplicăm modelul de predicție pentru fiecare rând pentru a obține rezultatul și probabilitatea
  analysis_df = processed_data.withColumn("prediction_result", predict_udf(col("age"), col("gender_num"), col("study_hours_per_day"), col("screen_time"), col("attendance_percentage"), col("sleep_hours"), col("exercise_frequency")))
  # Numarăul total de înregistrări
  total = analysis_df.count()
  # Câți studenți au promovat
  promovat = analysis_df.filter(col("prediction_result.result") == "Promovează").count()
  # Câți studenți nu au promovat
  nu_au_promovat = analysis_df.filter(col("prediction_result.result") == "Nu promovează").count()
  print(f"Total studenți: {total}")
  print(f"Au promovat: {promovat}")
  print(f"Nu au promovat: {nu_au_promovat}")
  # Afișăm pentru fiecare student valorile de input, rezultatul și probabilitatea
  analysis_df.select("age", "gender_num", "study_hours_per_day", "sleep_hours", "attendance_percentage", "exercise_frequency", "prediction_result.result", "prediction_result.probability").show()
# Dacă nu există date afisăm un mesaj
else:
  print("Nu au fost găsite date pentru procesare.")


Numărul total de studenți procesați 10
Total studenți: 10
Au promovat: 7
Nu au promovat: 3
+---+----------+-------------------+-----------+---------------------+------------------+-------------+--------------------+
|age|gender_num|study_hours_per_day|sleep_hours|attendance_percentage|exercise_frequency|       result|         probability|
+---+----------+-------------------+-----------+---------------------+------------------+-------------+--------------------+
| 24|         0|               5.45|       6.32|                67.27|              2.22|   Promovează|  0.9994384050369263|
| 18|         1|               3.87|       9.94|                90.91|              2.56|   Promovează|  0.7588656544685364|
| 21|         0|               5.49|       9.97|                74.91|              5.83|   Promovează|  0.9709483981132507|
| 24|         0|               7.35|       6.69|                95.33|              3.26|   Promovează|  0.9999445676803589|
| 24|         0|               5.7

Referințe bibliografice:
* https://www.geeksforgeeks.org/logistic-regression-using-pyspark-python/
* https://h2o.ai/wiki/logistic-regression/
* https://www.geeksforgeeks.org/understanding-logistic-regression/
* https://www.machinelearningplus.com/pyspark/pyspark-logistic-regression/
* https://www.geeksforgeeks.org/f1-score-in-machine-learning/
* https://www.geeksforgeeks.org/deep-learning/implementing-neural-networks-using-tensorflow/
* https://www.geeksforgeeks.org/deep-learning/relu-activation-function-in-deep-learning/
* https://www.geeksforgeeks.org/binary-cross-entropy-log-loss-for-binary-classification/
* https://www.geeksforgeeks.org/machine-learning/hyperparameter-tuning-using-gridsearchcv-and-kerasclassifier/
* https://www.geeksforgeeks.org/neural-networks-a-beginners-guide/
* https://www.geeksforgeeks.org/machine-learning/how-to-handle-imbalanced-classes-in-machine-learning/
* https://medium.com/expedia-group-tech/apache-spark-structured-streaming-input-sources-2-of-6-6a72f798838c