# Dreamy
**Much like the Twitter DB notebook, but this one uses the databank and the [DReAMy repository](https://github.com/lorenzoscottb/DReAMy) to make predictions instead of training our own neural network. It relies on a pretrained BERT model that was trained using [labeled data](https://dreambank.net/) (available on request).**

cite as:
```
@article{BERTOLINI2024406,
title = {DReAMy: a library for the automatic analysis and annotation of dream reports with multilingual large language models},
journal = {Sleep Medicine},
volume = {115},
pages = {406-407},
year = {2024},
note = {Abstracts from the 17th World Sleep Congress},
issn = {1389-9457},
doi = {https://doi.org/10.1016/j.sleep.2023.11.1092},
url = {https://www.sciencedirect.com/science/article/pii/S1389945723015186},
author = {L. Bertolini and A. Michalak and J. Weeds}
}
```

In [None]:
#####
# Imports
#####
from pydantic import BaseModel
from typing import List, Optional
import pandas as pd
import pickle
import plotly.graph_objects as go
import dreamy
# Maps short emotion codes to human-readable emotion names.
# The insertion order matters: the plotting cell builds its radar axes from
# list(label_dict.values()).
# NOTE(review): the codes are presumably the labels emitted by the DReAMy
# "SA" task — confirm against the library output.
label_dict = {
    "AP": "apprehension",
    "CO": "confusion",
    "HA": "happiness",
    "AN": "anger",
    "SD": "sadness"
}
def dreamy_pred(report, task="SA", batch_size=16, device="cpu"):
    """Score a single report with DReAMy and return its per-label scores.

    Args:
        report: Text of one report. It is wrapped in a one-element list
            before being handed to ``dreamy.annotate_reports``.
        task: Task name forwarded to ``annotate_reports``.
        batch_size: Batch size forwarded to ``annotate_reports``.
        device: ``"cpu"``, ``"cuda"``, or a device number (e.g. ``0``) for GPU.

    Returns:
        A list with one score per prediction entry. When the entries carry
        labels that are exactly the keys of ``label_dict``, the scores are
        returned in ``label_dict`` order so they line up with the emotion
        names used for plotting; otherwise they are returned in the order
        DReAMy produced them. An empty prediction yields an empty list.
    """
    predictions = dreamy.annotate_reports(
        [report],
        task=task,
        device=device,
        batch_size=batch_size,
    )
    entries = predictions[0]  # only one report was submitted

    # NOTE(review): assumes each entry is a mapping with "label" and "score"
    # keys — confirm against the DReAMy output format. If the labels are
    # missing we simply keep the library's own ordering.
    try:
        score_by_label = {entry["label"]: entry["score"] for entry in entries}
    except (KeyError, TypeError):
        score_by_label = {}

    # Align by label rather than by position: the plotting code pairs the
    # scores with list(label_dict.values()), which is only correct when the
    # scores come back in label_dict order.
    if len(entries) == len(label_dict) and set(score_by_label) == set(label_dict):
        return [score_by_label[code] for code in label_dict]
    return [entry["score"] for entry in entries]
# Load your CSV file containing emotion labels and texts.
def prep_data(training_data, samples):
    """Load a labelled CSV and draw a class-balanced sample from it.

    Args:
        training_data: Path (or anything ``pandas.read_csv`` accepts) of a
            CSV file with at least a ``text`` and a ``label`` column.
        samples: Total number of rows wanted. It is split evenly across the
            labels, so the result holds ``samples // n_labels`` rows per label.

    Returns:
        A ``(texts, labels)`` tuple of equally long lists, grouped by label
        in sorted label order.

    Raises:
        ValueError: If the file contains no labels, or (raised by
            ``DataFrame.sample``) if a label has fewer rows than the
            per-label sample size.
    """
    df = pd.read_csv(training_data)

    # Count labels the way groupby does: rows with a missing label are
    # ignored, so they must not shrink the per-label quota either.
    num_labels = df['label'].nunique()
    if num_labels == 0:
        raise ValueError("training data contains no labels to sample from")

    samples_per_label = samples // num_labels

    # Sample group by group instead of groupby(...).apply(...): apply() on
    # the grouping column is deprecated, and newer pandas drops that column
    # from the groups, which would break the 'label' lookup below.
    sampled_groups = [
        group.sample(n=samples_per_label, random_state=42)
        for _, group in df.groupby('label')
    ]
    balanced_df = pd.concat(sampled_groups)

    # Extract texts and labels.
    return balanced_df['text'].tolist(), balanced_df['label'].tolist()
#####
# Datamodels
#####
class Run(BaseModel):
    """One run of a report: its number, its text, and an optional embedding."""
    # Number identifying the run.
    run_number: int
    # Free-text report for this run; the aggregation cell skips blank ones.
    report: str
    # Embedding vector for the report; defaults to None. The aggregation
    # cell only scores runs whose embeddings are not None.
    embeddings: Optional[List[float]] = None

class Report(BaseModel):
    """A named collection of runs together with its condition labels."""
    # Identifier of the report (presumably a participant or session name — confirm).
    name: str
    # Runs belonging to this report.
    run: List[Run]
    # Condition labels; the aggregation cell files all runs under the first entry.
    condition: List[str]

class Task(BaseModel):
    """A task with a name, a textual description and an optional embedding."""
    # Name of the task.
    task_name: str
    # Free-text description of the task.
    description: str
    # Embedding vector for the task; defaults to None.
    embeddings: Optional[List[float]] = None

#####
# Paths & Naming
#####
# Sample count; it is baked into the embeddings/labels file names below.
total_samples = 500
# Pickle files read by the loading cell to build the Report and Task models.
embedded_reports = "data/interim/reports.pkl"
embedded_tasks = "data/interim/tasks.pkl"
# CSV with emotion data (presumably the input for prep_data — confirm).
training_data = "data/raw/emotions.csv"
# Task names (German: "Gehen" = walking, "Schreibtisch" = desk, "Tisch" = table).
task_names = ["Gehen","Schreibtisch","Tisch"]
# Pickle locations for embeddings and labels, one file per sample count.
embeddings_path = f"data/model/input/emotions_embeddings_{total_samples}.pkl"
labels_path = f"data/model/labels/emotions_labels_{total_samples}.pkl"
# Numeric condition codes and their names.
conditions_map = {
    1: "complete",
    2: "incomplete",
    3: "interrupted"
}
# Semi-transparent RGBA colors (alpha 0.6).
# NOTE(review): the plotting cell later rebinds `colors` to a dict, so this
# list is not what that cell uses.
colors = [
    "rgba(255, 0, 0, 0.6)", # red
    "rgba(0, 255, 0, 0.6)", # green
    "rgba(0, 0, 255, 0.6)" # blue
    ]


In [None]:
# Read the pickled task records and validate each one as a Task model.
# NOTE: pickle can execute arbitrary code on load — only open trusted files.
with open(embedded_tasks, "rb") as f:
    tasks_data = pickle.load(f)
tasks = []
for task_fields in tasks_data:
    tasks.append(Task(**task_fields))

# Same procedure for the reports.
with open(embedded_reports, "rb") as f:
    report_data = pickle.load(f)
reports = []
for report_fields in report_data:
    reports.append(Report(**report_fields))

In [None]:
# Collect the DReAMy score vectors of every usable run, grouped by condition.
# Key: condition label (e.g. "Condition complete"); value: one score list per run.
condition_probabilities = {}

for report in reports:
    # All runs of a report are filed under the report's first condition.
    cond_key = f"Condition {report.condition[0]}"
    bucket = condition_probabilities.setdefault(cond_key, [])

    for run in report.run:
        # Skip runs without embeddings or with a blank report text.
        if run.embeddings is None or not run.report.strip():
            continue
        bucket.append(dreamy_pred(run.report))


In [None]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# One radar (polar) chart per condition: mean score per emotion with a
# shaded ±1 standard deviation band.

# Emotion names in label_dict order (must match the order of the score vectors).
emotion_names = list(label_dict.values())
# Repeat the first emotion so every polygon closes on itself.
angles = emotion_names + [emotion_names[0]]

# Only conditions with at least one scored run can be summarised; an empty
# list has no mean/std and would break the polygon construction below.
sorted_conditions = sorted(
    cond for cond, runs in condition_probabilities.items() if runs
)
if not sorted_conditions:
    raise ValueError("No scored runs available to plot.")

# Create a subplot with one polar chart per condition.
fig = make_subplots(
    rows=1, cols=len(sorted_conditions),
    specs=[[{'type': 'polar'}] * len(sorted_conditions)],
    subplot_titles=[f"{cond}" for cond in sorted_conditions]
)

# Pair conditions with colors. zip() copes with fewer than three conditions;
# any condition beyond the palette falls back to "black" at lookup time.
palette = ["red", "blue", "green"]
colors = dict(zip(sorted_conditions, palette))

for col, cond in enumerate(sorted_conditions, start=1):
    cond_color = colors.get(cond, "black")

    # Stack the per-run score lists into one array.
    data = np.array(condition_probabilities[cond])

    # If the data has an extra dimension (e.g., shape (n_runs, 2, n_emotions)),
    # keep only the first row of each run -> shape (n_runs, n_emotions).
    if data.ndim == 3 and data.shape[1] == 2:
        data = data[:, 0, :]

    # Per-emotion mean and standard deviation across runs.
    mean_values = data.mean(axis=0)
    std_values = data.std(axis=0)
    upper_bound = mean_values + std_values
    lower_bound = mean_values - std_values

    # Close each polygon by appending its first value at the end.
    mean_closed = np.concatenate([mean_values, [mean_values[0]]])
    upper_closed = np.concatenate([upper_bound, [upper_bound[0]]])
    lower_closed = np.concatenate([lower_bound, [lower_bound[0]]])

    # Shaded band outline: walk along the upper bound, then back along the
    # reversed lower bound.
    fill_r = np.concatenate([upper_closed, lower_closed[::-1]])
    fill_theta = np.concatenate([angles, angles[::-1]])

    # Add the shaded area for ±1 standard deviation.
    fig.add_trace(go.Scatterpolar(
        r=fill_r,
        theta=fill_theta,
        fill='toself',
        fillcolor=cond_color,
        opacity=0.2,
        line=dict(color='rgba(0,0,0,0)'),
        showlegend=False,
        name=f'{cond} Std'
    ), row=1, col=col)

    # Add the mean line.
    fig.add_trace(go.Scatterpolar(
        r=mean_closed,
        theta=angles,
        mode='lines+markers',
        name=f'{cond} Mean',
        line=dict(color=cond_color)
    ), row=1, col=col)

# Use the same fixed radial range on every polar subplot so the conditions
# are visually comparable. Adjust the range if needed.
fig.update_polars(radialaxis=dict(range=[0, 1], autorange=False))

fig.show()
