In [9]:
!pip install --quiet pandas numpy scikit-learn shap nltk scipy
import nltk
nltk.download("punkt", quiet=True)
print("Packages installed.")


Packages installed.


In [10]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
print("Drive mounted.")


Mounted at /content/drive
Drive mounted.


In [11]:
ID = "STU149"   # <<< CHANGE THIS ONLY

BOOKS_PATH = "/content/drive/MyDrive/FLAG_CHALLENGE/books.csv"
REVIEWS_PATH = "/content/drive/MyDrive/FLAG_CHALLENGE/reviews.csv"
OUTDIR = f"/content/drive/MyDrive/FLAG_CHALLENGE/CTF_{ID}"

print("Using ID:", ID)
print("Output folder:", OUTDIR)


Using ID: STU149
Output folder: /content/drive/MyDrive/FLAG_CHALLENGE/CTF_STU149


In [12]:
import pandas as pd
import numpy as np
import os

if not os.path.exists(BOOKS_PATH):
    raise SystemExit("books.csv NOT FOUND")

if not os.path.exists(REVIEWS_PATH):
    raise SystemExit("reviews.csv NOT FOUND")

books = pd.read_csv(BOOKS_PATH, dtype=str, keep_default_na=False)
reviews = pd.read_csv(REVIEWS_PATH, dtype=str, keep_default_na=False)

print("Books:", books.shape)
print("Reviews:", reviews.shape)

# Convert numeric fields
books["rating_number"] = pd.to_numeric(books["rating_number"], errors="coerce")
books["average_rating"] = pd.to_numeric(books["average_rating"], errors="coerce")
reviews["rating"] = pd.to_numeric(reviews["rating"], errors="coerce")


Books: (20000, 28)
Reviews: (728026, 12)


In [13]:
# Merge
merged = reviews.merge(
    books,
    how="left",
    left_on="parent_asin",
    right_on="parent_asin",
    suffixes=("_rev","_book")
)
print("Merged:", merged.shape)

# Title candidate columns
title_candidates = [
    "title","subtitle","description",
    "features_text","category_level_2_sub","category_level_1_main"
]

# Ensure columns exist
for c in title_candidates:
    if c not in merged.columns:
        merged[c] = ""

# Replace empty Strings with NaN safely
for c in title_candidates:
    merged.loc[:, c] = merged[c].replace("", np.nan)

# Coalesce (priority order)
coalesced = merged[title_candidates[0]].copy()
for c in title_candidates[1:]:
    coalesced = coalesced.fillna(merged[c])

# Fallback to parent_asin
coalesced = coalesced.fillna(merged["parent_asin"])

# Clean to single-line title
merged["book_title_for_flag"] = (
    coalesced.astype(str)
    .str.split("\n").str[0].str.strip()
)

# Replace empty with parent_asin
merged["book_title_for_flag"] = merged["book_title_for_flag"].mask(
    merged["book_title_for_flag"] == "",
    merged["parent_asin"]
)

print("Sample titles:", merged["book_title_for_flag"].unique()[:5])


Merged: (728026, 39)


  merged.loc[:, c] = merged[c].replace("", np.nan)


Sample titles: ['Paperback – July 25, 2017' 'Paperback – Box set, September 16, 2008'
 'Paperback – April 29, 2003' 'Kindle Edition' 'Hardcover – June 1, 1993']


In [14]:
import hashlib

full_hash = hashlib.sha256(ID.encode()).hexdigest()
EXPECTED_TOKEN = full_hash[:8].upper()

print("SHA256:", full_hash)
print("Expected TOKEN:", EXPECTED_TOKEN)


SHA256: b347559e849e66f3b2103023f44ba878a1c35972173f269d0f7abbf2bb8abc0c
Expected TOKEN: B347559E


In [15]:
import re
from collections import Counter

candidates = merged[
    (merged["rating_number"] == 1234) &
    (merged["average_rating"] == 5.0)
]
print("Candidate rows:", len(candidates))

def find_token(df, token):
    if "text" not in df.columns:
        return df.iloc[0:0]
    return df[df["text"].fillna("").str.contains(re.escape(token), case=False, na=False)]

found = find_token(candidates, EXPECTED_TOKEN)
if found.empty:
    found = find_token(merged, EXPECTED_TOKEN)

chosen_token = None

# If found, use expected token
if not found.empty:
    chosen_token = EXPECTED_TOKEN
    print("Found expected token:", chosen_token)

else:
    # Fallback: scan all 8-hex tokens in text
    hex_pat = re.compile(r'\b[0-9a-fA-F]{8}\b')
    cnt = Counter()

    for t in merged["text"].fillna("").astype(str):
        for m in hex_pat.findall(t):
            cnt[m.upper()] += 1

    if not cnt:
        raise SystemExit("No 8-hex pattern found.")

    chosen_token = cnt.most_common(1)[0][0]
    print("Chosen fallback token:", chosen_token)

    found = merged[ merged["text"].fillna("").str.contains(chosen_token, case=False, na=False) ]

if found.empty:
    raise SystemExit("Token not found in dataset.")

print("Matched rows:", len(found))
found.head()


Candidate rows: 2216
Found expected token: B347559E
Matched rows: 1


Unnamed: 0,rating,title_rev,text,images_rev,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase,...,description,features_text,dimensions,item_weight,images_book,videos,store,bought_together,title,book_title_for_flag
716922,5.0,Top Recommendation,perfect B347559E amazing read,,60530472,,,0,0,True,...,,,,,,,,,,


In [16]:
row = found.iloc[0]
asin = row["parent_asin"]
title = row["book_title_for_flag"]

FLAG2 = f"FLAG2{{{chosen_token}}}"

first8 = "".join(title.replace(" ",""))[:8] or asin[:8]
FLAG1 = hashlib.sha256(first8.encode()).hexdigest()

print("ASIN:", asin)
print("Title:", title)
print("first8:", first8)
print("FLAG1:", FLAG1)
print("FLAG2:", FLAG2)


ASIN: 
Title: 
first8: 
FLAG1: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
FLAG2: FLAG2{B347559E}


In [17]:
book_reviews = merged[merged["parent_asin"] == asin].copy()
if book_reviews.empty:
    book_reviews = merged[merged["book_title_for_flag"] == title].copy()

book_reviews["text"] = book_reviews["text"].fillna("").astype(str)
book_reviews["length"] = book_reviews["text"].str.len()

# FAST regex superlative counter
def super_count_fast(text):
    if not isinstance(text, str): return 0
    t = text.lower()
    base = ["best","perfect","greatest","amazing","incredible","excellent","fantastic","outstanding"]
    c = sum(t.count(w) for w in base)
    for token in re.findall(r"[0-9A-Za-z]+", text):
        if token.lower().endswith("est") and len(token) > 3:
            c += 1
    return c

book_reviews["superc"] = book_reviews["text"].apply(super_count_fast)

print("Total Reviews:", len(book_reviews))


Total Reviews: 150


In [18]:
book_reviews["suspicious"] = (
    (book_reviews["rating"] == 5) &
    (book_reviews["length"] < 70) &
    (book_reviews["superc"] >= 1)
).astype(int)

if book_reviews["suspicious"].nunique() == 1:
    q = book_reviews["length"].quantile(0.25)
    book_reviews["suspicious"] = (book_reviews["length"] <= q).astype(int)
    print("Fallback labeling used.")

book_reviews["suspicious"].value_counts()


Fallback labeling used.


Unnamed: 0_level_0,count
suspicious,Unnamed: 1_level_1
0,104
1,46


In [19]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from scipy.sparse import hstack

tfidf = TfidfVectorizer(max_features=1500, stop_words="english")
X_text = tfidf.fit_transform(book_reviews["text"])

X = hstack([X_text, book_reviews[["length","superc"]].values])
y = book_reviews["suspicious"].values

clf = RandomForestClassifier(n_estimators=200, random_state=42)
clf.fit(X, y)

book_reviews["susp_prob"] = clf.predict_proba(X)[:,1]
book_reviews.sort_values("susp_prob", ascending=False).head()


Unnamed: 0,rating,title_rev,text,images_rev,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase,...,images_book,videos,store,bought_together,title,book_title_for_flag,length,superc,suspicious,susp_prob
15896,5.0,Top Recommendation,Best read ever B58AA083,,006020687X,,,0,0,True,...,,,,,,,23,2,1,1.0
27288,5.0,Top Recommendation,Best read ever F7F561B4,,0007183038,,,0,0,True,...,,,,,,,23,2,1,1.0
151782,5.0,Top Recommendation,Best book ever D8B05ED1,,0060194839,,,0,0,True,...,,,,,,,23,2,1,1.0
167066,5.0,Top Recommendation,Best read ever 34285B65,,003057059X,,,0,0,True,...,,,,,,,23,2,1,1.0
696609,5.0,Top Recommendation,Best book ever BD9DB0DB,,0008275505,,,0,0,True,...,,,,,,,23,2,1,1.0


In [20]:
import shap
from scipy.sparse import issparse
import math
import numpy as np

thr = book_reviews["susp_prob"].quantile(0.30)
genuine_idx = np.where(book_reviews["susp_prob"] <= thr)[0]

MAX_SHAP = 150
sample_idx = genuine_idx[:MAX_SHAP]

if issparse(X):
    X = X.tocsr()

X_sample = X[sample_idx]

# Convert to dense
X_shap = X_sample.toarray()

explainer = shap.TreeExplainer(clf)
shap_vals = explainer.shap_values(X_shap)[1]  # suspicious class

mean_shap = shap_vals.mean(axis=0)
feature_names = list(tfidf.get_feature_names_out()) + ["_length","_superc"]
idx_sorted = np.argsort(mean_shap)


In [21]:
top3 = []
for i in idx_sorted:
    fn = feature_names[i]
    if fn not in ["_length","_superc"]:
        top3.append(fn)
    if len(top3) == 3:
        break

print("Top 3 SHAP-negative words:", top3)


Top 3 SHAP-negative words: ['059dbb84', '00c992b8']


In [22]:
import re

num = "".join(re.findall(r"\d+", ID)) or "1"
concat = "".join(top3) + num

FLAG3 = "FLAG3{" + hashlib.sha256(concat.encode()).hexdigest()[:10] + "}"

print("Concat used:", concat)
print("FLAG3:", FLAG3)


Concat used: 059dbb8400c992b8149
FLAG3: FLAG3{1df6d499b1}


In [23]:
os.makedirs(OUTDIR, exist_ok=True)

with open(f"{OUTDIR}/flags.txt","w") as f:
    f.write(f"FLAG1 = {FLAG1}\n")
    f.write(f"FLAG2 = {FLAG2}\n")
    f.write(f"FLAG3 = {FLAG3}\n")

with open(f"{OUTDIR}/README.md","w") as f:
    f.write("Short summary of approach...\n")

with open(f"{OUTDIR}/reflection.md","w") as f:
    f.write("Reflection 150-250 words...\n")

print("Saved files to:", OUTDIR)
os.listdir(OUTDIR)


Saved files to: /content/drive/MyDrive/FLAG_CHALLENGE/CTF_STU149


['flags.txt', 'README.md', 'reflection.md']