In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
from functools import reduce
import re
import json
import numpy as np
import faiss
from textblob import TextBlob

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, count, avg, sum as spark_sum, when
from pyspark.sql.functions import lit, to_date, col, concat_ws

from sentence_transformers import SentenceTransformer
import nltk
nltk.download('vader_lexicon') # Download vader_lexicon
from nltk.sentiment.vader import SentimentIntensityAnalyzer

  from .autonotebook import tqdm as notebook_tqdm
[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /Users/sherlockpi/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


# Data Transformation

## 1. Load into Spark

Start a Spark session for data transformation

In [2]:
# Reuse the active Spark session if one exists, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("UbisoftDataTransform")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/07 14:49:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Load the various parquet files into Spark:

In [3]:
# Read every raw source from its parquet file; the targets on the left are
# matched positionally with the paths listed below.
df_stock, df_news, df_steam_reviews, df_reddit_posts, df_reddit_comments = (
    spark.read.parquet(source_path)
    for source_path in (
        "data/ubisoft_stock.parquet",
        "data/ubisoft_news.parquet",
        "data/steam_reviews.parquet",
        "data/acshadows_reddit_posts.parquet",
        "data/acshadows_reddit_comments.parquet",
    )
)

                                                                                

## 2. Fact Table: daily_summary

Standardise each source's datetime column into a common `date` column so that every source can be aggregated and joined by date later.

In [5]:
# Stock prices: derive `date` from the `Date` column.
df_stock = df_stock.withColumn("date", to_date("Date"))

# News: `date` is currently a string, so parse it with an explicit pattern.
df_news = df_news.withColumn("date", to_date("date", "yyyy-MM-dd"))

# Steam reviews: derive `date` from `review_date`.
df_steam_reviews = df_steam_reviews.withColumn("date", to_date("review_date"))

# Reddit posts: derive `date` from `created_date`.
df_reddit_posts = df_reddit_posts.withColumn("date", to_date("created_date"))

# Reddit comments: derive `date` from `comment_created_date`.
df_reddit_comments = df_reddit_comments.withColumn("date", to_date("comment_created_date"))

Aggregate each data source by date.

In [6]:
# Stock: keep only the price/volume columns next to the join key
df_stock_daily = df_stock.select("date", "Open", "Close", "Volume")

# Steam reviews per day: review count, mean playtime and share of positive votes.
# `percent_positive` is a fraction between 0 and 1 (positive votes / all reviews),
# not a 0-100 percentage.
df_reviews_daily = (
    df_steam_reviews
    .groupBy("date")
    .agg(
        count("*").alias("num_reviews"),
        avg("playtime_hours").alias("avg_playtime_hours"),
        (
            spark_sum(when(col("voted_up") == True, 1).otherwise(0)) / count("*")
        ).alias("percent_positive"),
    )
)

# Reddit posts per day: post count and mean score
df_reddit_posts_daily = (
    df_reddit_posts
    .groupBy("date")
    .agg(
        count("*").alias("num_reddit_posts"),
        avg("score").alias("avg_reddit_score"),
    )
)

# Reddit comments per day: comment count
df_reddit_comments_daily = (
    df_reddit_comments
    .groupBy("date")
    .agg(count("*").alias("num_reddit_comments"))
)

# News per day: article count
df_news_daily = (
    df_news
    .groupBy("date")
    .agg(count("*").alias("num_news_articles"))
)

Join DataFrames: join all these daily aggregates on the common key (date) to produce a unified view (the fact table).

In [7]:
# Daily aggregates to combine, stock first; full outer joins keep every date
# that appears in at least one source
dfs = [df_stock_daily, df_reviews_daily, df_reddit_posts_daily, df_reddit_comments_daily, df_news_daily]

# Fold the list left to right into a single frame keyed on "date"
df_unified = dfs[0]
for daily_df in dfs[1:]:
    df_unified = df_unified.join(daily_df, on="date", how="full")

# Fix the column order and give the stock columns their fact-table names
df_unified = df_unified.select(
    "date",
    *(
        col(raw_name).alias(fact_name)
        for raw_name, fact_name in (
            ("Open", "stock_open"),
            ("Close", "stock_close"),
            ("Volume", "stock_volume"),
        )
    ),
    "num_reviews",
    "avg_playtime_hours",
    "percent_positive",
    "num_reddit_posts",
    "avg_reddit_score",
    "num_reddit_comments",
    "num_news_articles",
)

In [8]:
# Preview the 10 most recent dates (newest first), then print the schema
df_unified.orderBy("date", ascending=False).show(n=10, truncate=False)
df_unified.printSchema()

# Persist the fact table as parquet, replacing any earlier export
df_unified.write.mode("overwrite").parquet("unified_dataset.parquet")

                                                                                

+----------+------------------+------------------+------------+-----------+------------------+------------------+----------------+------------------+-------------------+-----------------+
|date      |stock_open        |stock_close       |stock_volume|num_reviews|avg_playtime_hours|percent_positive  |num_reddit_posts|avg_reddit_score  |num_reddit_comments|num_news_articles|
+----------+------------------+------------------+------------+-----------+------------------+------------------+----------------+------------------+-------------------+-----------------+
|2025-04-05|NULL              |NULL              |NULL        |34         |38.470588235294116|0.7647058823529411|NULL            |NULL              |103                |NULL             |
|2025-04-04|10.194999694824219|9.54800033569336  |1626113     |120        |38.11424999999999 |0.825             |NULL            |NULL              |601                |NULL             |
|2025-04-03|10.399999618530273|10.345000267028809|765150    

## 3. Text Table: textual_context (for NLP / RAG)

Create a unified text table.

In [9]:
# Each source is reduced to the same five columns (date, source, content, id, url)
# so the frames can later be stacked by column name. Rows without text are dropped
# before projecting, while the raw column names are still available.

# Steam reviews: the review body is the content; no URL is available
df_steam_text = (
    df_steam_reviews
    .where(col("review").isNotNull())
    .select(
        to_date(col("review_date")).alias("date"),
        lit("steam_review").alias("source"),
        col("review").alias("content"),
        col("review_id").alias("id"),
        lit(None).cast("string").alias("url"),
    )
)

# Reddit posts: title and body joined by a newline; keep posts that have either one
df_reddit_posts_text = (
    df_reddit_posts
    .where(col("title").isNotNull() | col("selftext").isNotNull())
    .select(
        to_date(col("created_date")).alias("date"),
        lit("reddit_post").alias("source"),
        concat_ws("\n", col("title"), col("selftext")).alias("content"),
        col("id").alias("id"),
        col("url"),
    )
)

# Reddit comments: the comment body is the content; no URL is available
df_reddit_comments_text = (
    df_reddit_comments
    .where(col("comment_body").isNotNull())
    .select(
        to_date(col("comment_created_date")).alias("date"),
        lit("reddit_comment").alias("source"),
        col("comment_body").alias("content"),
        col("comment_id").alias("id"),
        lit(None).cast("string").alias("url"),
    )
)

# Ubisoft news: the headline is both the content and the ID, and every row
# gets the same news landing-page URL
df_news_text = (
    df_news
    .where(col("headline").isNotNull())
    .select(
        to_date(col("date")).alias("date"),
        lit("ubisoft_news").alias("source"),
        col("headline").alias("content"),
        col("headline").alias("id"),
        lit("https://news.ubisoft.com/en-gb/").alias("url"),
    )
)

In [38]:
# Stack the four per-source text frames into one table, matching columns by name
df_textual_context = (
    df_steam_text
    .unionByName(df_reddit_posts_text)
    .unionByName(df_reddit_comments_text)
    .unionByName(df_news_text)
)

# Preview a few rows (content truncated to 80 characters) and the schema.
# The stray trailing backslash after printSchema() was removed: it only parsed
# because the following line happened to be blank.
df_textual_context.show(5, truncate=80)
df_textual_context.printSchema()

# Save to parquet, replacing any earlier export
df_textual_context.write.mode("overwrite").parquet("textual_context.parquet")

+----------+------------+--------------------------------------------------------------------------------+---------+----+
|      date|      source|                                                                         content|       id| url|
+----------+------------+--------------------------------------------------------------------------------+---------+----+
|2025-04-05|steam_review|                                                                        Its fun.|192017461|NULL|
|2025-04-05|steam_review|                                                    "same shit different toilet"|192017163|NULL|
|2025-04-05|steam_review|I was a huge ac fan all the way back from AC 2. But some how starting from th...|192017026|NULL|
|2025-04-05|steam_review|very good, Assassins creed in Japan has been a long awaited installment in th...|192016829|NULL|
|2025-04-05|steam_review|                                                                 a piece of shit|192016602|NULL|
+----------+------------

                                                                                

Preprocess the text (clean it, split it into chunks, and generate embeddings) for the RAG workflow.

In [None]:
# Collect the Spark text table into a pandas DataFrame for the row-wise NLP steps
# below. toPandas() pulls every row into driver memory, so this assumes the
# table fits in local RAM.
df = df_textual_context.toPandas()

# Define helper functions for text cleaning and chunking
def clean_text(text):
    """Return `text` lowercased, with each whitespace run collapsed to one space and the ends trimmed."""
    lowered = text.lower()
    collapsed = re.sub(r'\s+', ' ', lowered)
    return collapsed.strip()

def chunk_text(text, chunk_size=100, overlap=20):
    """
    Split text into chunks of up to `chunk_size` words with an overlap.
    Uses simple whitespace tokenization.

    Args:
        text: String to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of trailing words of one chunk repeated at the start of
            the next; must satisfy 0 <= overlap < chunk_size.

    Returns:
        A list of chunk strings. Text with at most `chunk_size` words is returned
        unchanged as a single-element list (original whitespace preserved);
        longer text yields chunks whose words are joined by single spaces.

    Raises:
        ValueError: If `chunk_size` is not positive or `overlap` is outside
            [0, chunk_size). Previously such values made the window stop
            advancing, so the loop never terminated.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive, got {!r}".format(chunk_size))
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            "overlap must satisfy 0 <= overlap < chunk_size, got overlap={!r}, "
            "chunk_size={!r}".format(overlap, chunk_size)
        )

    words = text.split()
    if len(words) <= chunk_size:
        return [text]

    # Each window starts `stride` words after the previous one, so consecutive
    # chunks share exactly `overlap` words.
    stride = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), stride):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            # The last word has been emitted; a further window would only
            # repeat the overlap tail.
            break
    return chunks

                                                                                

In [None]:
# Normalise the raw content (non-string values become an empty string), then
# split each cleaned text into overlapping word chunks
df['clean_content'] = df['content'].apply(
    lambda raw: clean_text(raw) if isinstance(raw, str) else ""
)
df['chunks'] = df['clean_content'].apply(chunk_text, chunk_size=100, overlap=20)

# One row per chunk: explode the chunk lists (metadata columns are repeated),
# renumber the rows, and give the exploded column its final name
df_chunks = (
    df.explode('chunks')
      .reset_index(drop=True)
      .rename(columns={'chunks': 'text_chunk'})
)

In [None]:
# Generate embeddings for each text chunk using a pre-trained SentenceTransformer model
model = SentenceTransformer('all-MiniLM-L6-v2')  # Choose a model that suits your needs

# Encode all chunks in one batched call instead of one model.encode() call per
# row: the model then processes the texts in batches, which is much faster
# than invoking it once per chunk.
chunk_embeddings = model.encode(df_chunks['text_chunk'].tolist(), show_progress_bar=True)

# Store one plain Python list of floats per row, as before; building the Series
# on df_chunks' index keeps each vector aligned with its chunk.
df_chunks['embedding'] = pd.Series(chunk_embeddings.tolist(), index=df_chunks.index)

Additional NLP feature engineering.

In [27]:
# TextBlob helper: polarity and subjectivity plus a derived objectivity score
def compute_textblob_sentiment(text):
    """
    Score `text` with TextBlob.

    Returns a 3-tuple:
      - polarity: sentiment score between -1 (negative) and 1 (positive)
      - subjectivity: score between 0 (objective) and 1 (subjective)
      - objectivity: computed as 1 - subjectivity
    """
    sentiment = TextBlob(text).sentiment
    subjectivity = sentiment.subjectivity
    return sentiment.polarity, subjectivity, 1 - subjectivity

# Score every chunk with TextBlob and attach polarity, subjectivity and objectivity.
# The (polarity, subjectivity, objectivity) tuples are collected first and turned
# into a single DataFrame, which avoids constructing a pandas Series per row.
textblob_rows = df_chunks['text_chunk'].map(compute_textblob_sentiment).tolist()
df_chunks[['polarity', 'subjectivity', 'objectivity']] = pd.DataFrame(
    textblob_rows,
    index=df_chunks.index,  # keep scores aligned with their chunks
    columns=['polarity', 'subjectivity', 'objectivity'],
)

In [30]:
# One shared VADER analyzer instance, reused for every chunk
sia = SentimentIntensityAnalyzer()

# VADER helper: wrap the analyzer's score dict in a pandas Series
def compute_vader_scores(text):
    """
    Score `text` with VADER and return the scores as a pandas Series:
      - neg: Negative sentiment score
      - neu: Neutral sentiment score
      - pos: Positive sentiment score
      - compound: Normalized compound score (overall sentiment)
    """
    scores = sia.polarity_scores(text)
    return pd.Series(scores)

# Score every chunk with VADER and attach the four scores as columns.
# The per-chunk score dicts are collected into a single DataFrame (no pandas
# Series per row), and each score is selected by its key name so the mapping to
# the vader_* columns does not depend on the dict's key order.
vader_rows = df_chunks['text_chunk'].map(sia.polarity_scores).tolist()
vader_scores = pd.DataFrame(
    vader_rows,
    index=df_chunks.index,  # keep scores aligned with their chunks
    columns=['neg', 'neu', 'pos', 'compound'],
)
df_chunks[['vader_neg', 'vader_neu', 'vader_pos', 'vader_compound']] = vader_scores

In [40]:
# Inspect the chunk-level DataFrame with the new sentiment features: info()
# prints the column dtypes and non-null counts, and the trailing head() is
# rendered as the cell output.
df_chunks.info()
df_chunks.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20607 entries, 0 to 20606
Data columns (total 15 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   date            20607 non-null  object 
 1   source          20607 non-null  object 
 2   content         20607 non-null  object 
 3   id              20607 non-null  object 
 4   url             942 non-null    object 
 5   clean_content   20607 non-null  object 
 6   text_chunk      20607 non-null  object 
 7   embedding       20607 non-null  object 
 8   polarity        20607 non-null  float64
 9   subjectivity    20607 non-null  float64
 10  objectivity     20607 non-null  float64
 11  vader_neg       20607 non-null  float64
 12  vader_neu       20607 non-null  float64
 13  vader_pos       20607 non-null  float64
 14  vader_compound  20607 non-null  float64
dtypes: float64(7), object(8)
memory usage: 2.4+ MB


Unnamed: 0,date,source,content,id,url,clean_content,text_chunk,embedding,polarity,subjectivity,objectivity,vader_neg,vader_neu,vader_pos,vader_compound
0,2025-04-05,steam_review,Its fun.,192017461,,its fun.,its fun.,"[0.023728979751467705, -0.003998948726803064, ...",0.3,0.2,0.8,0.0,0.233,0.767,0.5106
1,2025-04-05,steam_review,"""same shit different toilet""",192017163,,"""same shit different toilet""","""same shit different toilet""","[0.007170666474848986, -0.012299861758947372, ...",-0.058333,0.38125,0.61875,0.545,0.455,0.0,-0.5574
2,2025-04-05,steam_review,I was a huge ac fan all the way back from AC 2...,192017026,,i was a huge ac fan all the way back from ac 2...,i was a huge ac fan all the way back from ac 2...,"[-0.029725132510066032, -0.01643313467502594, ...",-0.062155,0.400125,0.599875,0.031,0.875,0.094,0.6829
3,2025-04-05,steam_review,I was a huge ac fan all the way back from AC 2...,192017026,,i was a huge ac fan all the way back from ac 2...,"story line and the acting are generic, even un...","[-0.0009919829899445176, -0.017386212944984436...",0.054167,0.220833,0.779167,0.061,0.763,0.176,0.6478
4,2025-04-05,steam_review,"very good, Assassins creed in Japan has been a...",192016829,,"very good, assassins creed in japan has been a...","very good, assassins creed in japan has been a...","[-0.05293498933315277, 0.0007362031028605998, ...",0.43,0.59,0.41,0.042,0.837,0.121,0.4754


FAISS Index

In [None]:
# Stack the per-chunk embedding lists into one float32 matrix (one row per chunk)
embeddings = np.array(df_chunks['embedding'].tolist(), dtype="float32")
embedding_dim = embeddings.shape[1]

# Flat L2-distance index over all chunk vectors; vectors are added in
# df_chunks row order
index = faiss.IndexFlatL2(embedding_dim)
index.add(embeddings)

# Persist the index to disk for later use
faiss.write_index(index, "faiss_index.index")

# Persist one metadata record per chunk, in the same row order as the index.
# default=str stringifies any value json cannot encode natively.
# NOTE(review): the records include the `embedding` column, so the JSON file
# duplicates every vector already stored in the index.
metadata = df_chunks.to_dict(orient='records')
with open("faiss_metadata.json", "w") as metadata_file:
    json.dump(metadata, metadata_file, default=str)

25/04/06 19:38:41 WARN TransportChannelHandler: Exception in connection from eduroam-int-dhcp-97-158-87.ucl.ac.uk/10.97.158.87:58916
java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:330)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:284)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:259)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.Ni