In [None]:
import re
import numpy as np
import pandas as pd

# One seed shared by NumPy, the train/val/test splits (random_state) and
# TensorFlow (tf.random.set_seed) so that a fresh run is repeatable.
SEED = 42
np.random.seed(SEED)  # seeds NumPy's global random state

ModuleNotFoundError: No module named 'numpy'

In [None]:
# Read the raw dataset and print a quick structural overview of it.
df = pd.read_csv("stock_trend.csv")

overview = (
    ("Shape:", df.shape),
    ("\nColumns:", df.columns.tolist()),
    ("\nDtypes:\n", df.dtypes),
    ("\nMissing values:\n", df.isna().sum()),
)
for heading, value in overview:
    print(heading, value)

# Show 10 lines (useful for rubric and sanity check)
df.head(10)

In [None]:
# Cleaning pipeline, expressed as a single chain:
#   1. drop exact duplicate rows
#   2. coerce both price columns to numbers (unparseable values become NaN)
#   3. drop rows missing the headline or either price
#   4. keep only rows with a strictly positive "Before" price
df = (
    df.drop_duplicates()
    .assign(
        Before=lambda frame: pd.to_numeric(frame["Before"], errors="coerce"),
        After=lambda frame: pd.to_numeric(frame["After"], errors="coerce"),
    )
    .dropna(subset=["Title", "Before", "After"])
    .loc[lambda frame: frame["Before"] > 0]
    .copy()
)

print("After cleaning shape:", df.shape)

# Show 10 lines
df.head(10)

In [None]:
# Relative price move: (After - Before) / Before.
df["rel_change"] = df["After"].sub(df["Before"]).div(df["Before"])

def label_trend(x, threshold=0.10):
    """Map a relative price change to a trend label.

    Args:
        x: Relative change as a fraction (e.g. 0.25 for +25%).
        threshold: Non-negative half-width of the "flat" band. Changes
            strictly above ``threshold`` are "uptrend"; changes strictly
            below ``-threshold`` are "downtrend". Defaults to 0.10, the
            cut-off that was previously hard-coded.

    Returns:
        One of "uptrend", "downtrend" or "flat". Values exactly on a
        boundary are "flat"; so is NaN, because both comparisons are False.

    Raises:
        ValueError: If ``threshold`` is negative (the bands would overlap).
    """
    if threshold < 0:
        raise ValueError("threshold must be non-negative")
    if x > threshold:
        return "uptrend"
    if x < -threshold:
        return "downtrend"
    return "flat"

# Attach a categorical trend label to every row.
trend_labels = df["rel_change"].apply(label_trend)
df["trend"] = trend_labels

# Class balance of the resulting labels.
trend_counts = df["trend"].value_counts()
print(trend_counts)

# Show 10 lines
preview_cols = ["Title", "Before", "After", "rel_change", "trend"]
df[preview_cols].head(10)

In [None]:
def clean_text(s: str) -> str:
    """Normalise a headline into lower-case alphanumeric words.

    The value is stringified and lower-cased, then three substitutions run
    in order, each replacing its matches with a single space: URLs, any
    character that is not a-z / 0-9 / whitespace, and runs of whitespace.
    Leading and trailing whitespace is removed at the end.
    """
    cleaned = str(s).lower()
    for pattern in (r"http\S+|www\.\S+", r"[^a-z0-9\s]", r"\s+"):
        cleaned = re.sub(pattern, " ", cleaned)
    return cleaned.strip()

# Cleaned headline text used as model input.
cleaned_titles = df["Title"].apply(clean_text)
df["text"] = cleaned_titles

# Show 10 lines
text_preview_cols = ["Title", "text", "trend"]
df[text_preview_cols].head(10)

In [None]:
# Fixed class order: each trend name's position in this list is its integer id.
trend_names = ["downtrend", "flat", "uptrend"]
label2id = {name: idx for idx, name in enumerate(trend_names)}
id2label = dict(enumerate(trend_names))

# Integer-encode the trend column.
df["label"] = df["trend"].map(label2id)

label_counts = df["label"].value_counts()
print(label_counts)

# Show 10 lines
df[["text", "trend", "label"]].head(10)

In [None]:
from sklearn.model_selection import train_test_split

# Columns carried into the modelling frame.
keep_cols = ["text", "label", "trend", "Time", "Name", "Quote", "Before", "After", "rel_change"]
final_df = df[keep_cols].copy()

# Stage 1: hold out 30% of the rows, stratified on the class label.
train_df, temp_df = train_test_split(
    final_df, test_size=0.30, random_state=SEED, stratify=final_df["label"]
)

# Stage 2: split the hold-out again; 1/3 of the 30% becomes the test set
# (10% out of total), the rest is validation.
val_df, test_df = train_test_split(
    temp_df, test_size=(1/3), random_state=SEED, stratify=temp_df["label"]
)

# Give every split a fresh 0..n-1 index.
train_df, val_df, test_df = (
    part.reset_index(drop=True) for part in (train_df, val_df, test_df)
)

for tag, part in (("Train:", train_df), ("Val:  ", val_df), ("Test: ", test_df)):
    print(tag, part.shape)

for tag, part in (
    ("\nTrain dist:\n", train_df),
    ("\nVal dist:\n", val_df),
    ("\nTest dist:\n", test_df),
):
    print(tag, part["trend"].value_counts(normalize=True))

# Show 10 lines from each
train_df.head(10), val_df.head(10), test_df.head(10)

In [None]:
# Integer class targets for each split, taken from the "label" column.
y_train, y_val, y_test = (
    split["label"].values for split in (train_df, val_df, test_df)
)

first_targets = y_train[:10]
print(first_targets)

In [None]:
# Model / training hyperparameters.

# Number of token positions per headline: used as the padding/truncation
# length and as the first dimension of the model's input shape.
sentence_length = 15
# Size of one word vector: used for the zero vector given to out-of-vocabulary
# tokens and as the second dimension of the model's input shape.
# NOTE(review): must equal the dimensionality of the loaded embedding model — confirm.
n_embedding = 50
# Width of the final softmax layer (three trend classes: downtrend/flat/uptrend).
n_output = 3
# Mini-batch size passed to model.fit.
batch_size = 4
# Number of training epochs passed to model.fit.
epochs = 20

In [None]:
from gensim import downloader

# Load the "glove-wiki-gigaword-50" word vectors through gensim's downloader.
# NOTE(review): the name suggests 50-dimensional vectors, which is what
# n_embedding = 50 assumes — confirm. Loading presumably needs network access
# the first time it runs.
model_glove = downloader.load("glove-wiki-gigaword-50")

In [None]:
import nltk
# NOTE(review): only TweetTokenizer is used in the visible notebook; it is not
# clear that it needs the "punkt" data — confirm whether this download is required.
nltk.download("punkt", quiet=True)

from nltk.tokenize import TweetTokenizer

# Single tokenizer instance shared by tokenize_sentence.
tweet_tokenizer = TweetTokenizer()

def tokenize_sentence(s: str):
    """Lower-case ``s`` and split it into tokens with the shared ``tweet_tokenizer``."""
    lowered = s.lower()
    tokens = tweet_tokenizer.tokenize(lowered)
    return tokens

In [None]:
def tokens_to_vectors(tokens):
    """Look up an embedding for every token.

    Tokens present in ``model_glove`` get their stored vector; any other
    token gets a fresh all-zero float32 vector of length ``n_embedding``.
    Returns a list with one vector per input token, in order.
    """
    def _embed(token):
        if token in model_glove:
            return model_glove[token]
        return np.zeros(n_embedding, dtype=np.float32)  # OOV

    return [_embed(token) for token in tokens]

In [None]:
def normalize_vectors(vectors):
    """Scale each vector to unit L2 length and cast it to float32.

    Vectors whose norm is not strictly positive (e.g. all-zero vectors) are
    only cast, not scaled. Returns a new list; inputs are not modified.
    """
    def _unit(vec):
        length = np.linalg.norm(vec)
        scaled = vec / length if length > 0 else vec
        return scaled.astype(np.float32)

    return [_unit(vec) for vec in vectors]

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

def texts_to_padded_embeddings(text_series, do_normalize=True):
    """Turn texts into a fixed-size float32 tensor of word embeddings.

    Each text is tokenized, every token is mapped to its embedding vector,
    and the resulting sequence is truncated / zero-padded at the end to
    exactly ``sentence_length`` positions.

    The output array is allocated up front, so its shape is always
    ``(len(text_series), sentence_length, n_embedding)`` — including when
    some or all texts produce no tokens (those rows stay all-zero).

    Args:
        text_series: Iterable of strings (e.g. a pandas Series).
        do_normalize: When True, scale every token vector to unit L2 norm.

    Returns:
        numpy.ndarray of dtype float32 with the shape described above.
    """
    texts = list(text_series)
    X = np.zeros((len(texts), sentence_length, n_embedding), dtype=np.float32)

    for row, text in enumerate(texts):
        # Truncate first ("post" truncation): positions past sentence_length
        # are discarded anyway, so there is no point normalising them.
        vecs = tokens_to_vectors(tokenize_sentence(text))[:sentence_length]
        if not vecs:
            continue  # empty text -> row remains zero padding
        if do_normalize:
            vecs = normalize_vectors(vecs)
        # "post" padding: real tokens occupy the leading positions.
        X[row, :len(vecs)] = np.stack(vecs)

    return X

# Embed the cleaned text of every split.
X_train, X_val, X_test = (
    texts_to_padded_embeddings(split["text"]) for split in (train_df, val_df, test_df)
)

for tag, features in (
    ("X_train shape:", X_train),
    ("X_val shape:  ", X_val),
    ("X_test shape: ", X_test),
):
    print(tag, features.shape)

# Show 10 samples (short preview)
X_train[:10].shape

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, Flatten
from tensorflow.keras.models import Model

# Seed TensorFlow before any layer is created so weight initialisation is repeatable.
tf.random.set_seed(SEED)

# One sample is a (sentence_length, n_embedding) matrix of token embeddings.
inputs = Input(shape=(sentence_length, n_embedding))
# LSTM with 2 units. return_sequences=True yields an output for every timestep;
# return_state=True additionally returns the final hidden and cell states.
# state_h / state_c are not used anywhere else in the visible notebook.
# NOTE(review): padded positions are all-zero vectors and no masking layer is
# visible, so the LSTM appears to process padding as ordinary input — confirm
# this is intended.
lstm = LSTM(2, return_sequences=True, return_state=True)
outputs_seq, state_h, state_c = lstm(inputs)

# Flatten the per-timestep outputs into one feature vector per sample and
# classify it into n_output classes with a softmax layer.
flat = Flatten()(outputs_seq)
outputs = Dense(n_output, activation="softmax")(flat)

model = Model(inputs=inputs, outputs=outputs)

# Sparse categorical cross-entropy: the targets are integer class ids
# (taken from the "label" column), not one-hot vectors.
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

model.summary()

In [None]:
# Train on the training split, evaluating on the validation split during
# training; the returned object is kept in `history`.
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=epochs,
    batch_size=batch_size,
    verbose=1
)

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Class ids and display names are derived from id2label so the report cannot
# drift from the encoding used to build the targets.
class_ids = sorted(id2label)
class_names = [id2label[class_id] for class_id in class_ids]

# Predicted class = index of the highest softmax probability.
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)

print("Test Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n")
# labels= pins the report to all classes: without it, a class absent from both
# y_test and y_pred makes the number of target_names mismatch and raises.
# zero_division=0 reports 0 (without a warning) for classes never predicted.
print(classification_report(
    y_test, y_pred,
    labels=class_ids,
    target_names=class_names,
    zero_division=0,
))

In [None]:
from sklearn.metrics import confusion_matrix

# Pin the label order so the matrix is always (n_classes x n_classes) in
# id order, even if some class never occurs in y_test or y_pred.
class_ids = sorted(id2label)
class_names = [id2label[class_id] for class_id in class_ids]

cm = confusion_matrix(y_test, y_pred, labels=class_ids)

# Display with named axes: rows are the true classes, columns the predictions.
pd.DataFrame(
    cm,
    index=pd.Index(class_names, name="true"),
    columns=pd.Index(class_names, name="predicted"),
)

In [None]:
# Persist the trained model.
model.save("lstm_stock_trend_model.keras")

# Save splits if needed
for csv_path, split in (
    ("train_split.csv", train_df),
    ("val_split.csv", val_df),
    ("test_split.csv", test_df),
):
    split.to_csv(csv_path, index=False)