# **Spark-Powered Personalised Finance Recommender with NLP**
This demo walks through an end-to-end pipeline that combines Spark's distributed data processing with a basic NLP pipeline (TF-IDF features and logistic regression) to analyse customer sentiment in financial product reviews. It also uses ALS collaborative filtering to produce personalised product recommendations, with the aim of improving customer engagement and business insight for financial services.

**1. Environment Setup (PySpark in Colab):**

In [1]:
# Install Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
!tar -xzf spark-3.4.3-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FinanceRecommender").getOrCreate()

**2. Prepare Dummy Data:**

In [2]:
from pyspark.sql import Row

data = [
    Row(userId=1, itemId=101, rating=5.0, text="Low fees and great rewards program."),
    Row(userId=1, itemId=102, rating=3.0, text="Good interest rates but limited customer service."),
    Row(userId=2, itemId=101, rating=1.0, text="Hidden charges and poor online interface."),
    Row(userId=2, itemId=103, rating=4.0, text="Reliable service and responsive customer support."),
    Row(userId=3, itemId=104, rating=5.0, text="Excellent mobile app experience and fast transfers."),
    Row(userId=3, itemId=105, rating=2.0, text="High minimum balance and confusing terms."),
    Row(userId=4, itemId=102, rating=4.0, text="Helpful staff and competitive loan rates."),
    Row(userId=4, itemId=106, rating=1.0, text="App crashes frequently and support is unhelpful."),
    Row(userId=5, itemId=101, rating=3.0, text="Decent account features, but not innovative."),
    Row(userId=5, itemId=107, rating=5.0, text="Outstanding investment tools and analytics."),
    Row(userId=6, itemId=104, rating=2.0, text="Slow customer service and outdated interface."),
    Row(userId=6, itemId=108, rating=4.0, text="Good savings plan and clear statements."),
]

df = spark.createDataFrame(data)
df.show()

+------+------+------+--------------------+
|userId|itemId|rating|                text|
+------+------+------+--------------------+
|     1|   101|   5.0|Low fees and grea...|
|     1|   102|   3.0|Good interest rat...|
|     2|   101|   1.0|Hidden charges an...|
|     2|   103|   4.0|Reliable service ...|
|     3|   104|   5.0|Excellent mobile ...|
|     3|   105|   2.0|High minimum bala...|
|     4|   102|   4.0|Helpful staff and...|
|     4|   106|   1.0|App crashes frequ...|
|     5|   101|   3.0|Decent account fe...|
|     5|   107|   5.0|Outstanding inves...|
|     6|   104|   2.0|Slow customer ser...|
|     6|   108|   4.0|Good savings plan...|
+------+------+------+--------------------+



**3. Basic NLP Pipeline (Spark only):**

This section builds a scalable pipeline that processes user reviews. It uses Spark's built-in `pyspark.ml` stages, not the separate Spark NLP library.

- Scalable: the stages run as distributed Spark jobs, so the same code applies to datasets that do not fit in memory.
- NLP: handles tokenization, stop word removal, and TF-IDF vectorization.
- Pipeline architecture: prepares data for downstream ML tasks such as classification or recommendation.

*The design goal is a scalable NLP pipeline built on a distributed framework, which matters for big data and AI work at enterprise scale.*

The Spark ML pipeline performs these stages in order:
- Tokenization
- Stop word removal
- TF-IDF vectorization
- Logistic regression classification

The classifier uses `rating` as its label, so each distinct rating value is treated as a separate class. The predictions shown below are made on the same 12 rows the model was trained on, so their exact match with the ratings reflects memorisation rather than generalisation.

This setup suits sentiment classification or categorising user feedback, especially on high-volume financial platforms.

*Potential next step: add Named Entity Recognition (NER), a dedicated sentiment model, or BERT embeddings using Spark NLP.*



In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
classifier = LogisticRegression(featuresCol="features", labelCol="rating")

nlp_pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, classifier])
nlp_model = nlp_pipeline.fit(df)

df_predicted = nlp_model.transform(df)
df_predicted.select("userId", "itemId", "text", "prediction").show()

+------+------+--------------------+----------+
|userId|itemId|                text|prediction|
+------+------+--------------------+----------+
|     1|   101|Low fees and grea...|       5.0|
|     1|   102|Good interest rat...|       3.0|
|     2|   101|Hidden charges an...|       1.0|
|     2|   103|Reliable service ...|       4.0|
|     3|   104|Excellent mobile ...|       5.0|
|     3|   105|High minimum bala...|       2.0|
|     4|   102|Helpful staff and...|       4.0|
|     4|   106|App crashes frequ...|       1.0|
|     5|   101|Decent account fe...|       3.0|
|     5|   107|Outstanding inves...|       5.0|
|     6|   104|Slow customer ser...|       2.0|
|     6|   108|Good savings plan...|       4.0|
+------+------+--------------------+----------+
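
To make the feature stages more concrete, here is a minimal pure-Python sketch of what tokenization, stop word removal, and TF-IDF weighting compute on three of the dummy reviews. It is an illustration under assumptions, not Spark's implementation: the stop word set is a tiny hypothetical list, and terms are kept in a dictionary instead of being hashed into a fixed-size vector as `HashingTF` does. The IDF formula is the smoothed form documented for `pyspark.ml.feature.IDF`.

```python
import math
from collections import Counter

# Hypothetical, tiny stop word list; StopWordsRemover ships a much longer English default.
STOP_WORDS = {"and", "but", "is", "not"}

def tokenize(text):
    # Like Spark's Tokenizer: lowercase, then split on whitespace (punctuation stays attached).
    return text.lower().split()

def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]

def tf_idf(docs):
    """Return one {term: weight} dict per document."""
    token_lists = [remove_stop_words(tokenize(d)) for d in docs]
    n_docs = len(token_lists)
    # Document frequency: number of documents that contain each term.
    doc_freq = Counter(term for tokens in token_lists for term in set(tokens))
    # Smoothed IDF: log((m + 1) / (df + 1)), with m the number of documents.
    idf = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}
    # Weight = raw term count in the document times the term's IDF.
    return [{t: c * idf[t] for t, c in Counter(tokens).items()} for tokens in token_lists]

reviews = [
    "Low fees and great rewards program.",
    "Hidden charges and poor online interface.",
    "Slow customer service and outdated interface.",
]
weights = tf_idf(reviews)
for review, w in zip(reviews, weights):
    print(review, {t: round(v, 3) for t, v in w.items()})
```

Terms that appear in several reviews (here `interface.`) receive a lower weight than terms unique to one review, which is the property the logistic regression stage relies on.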



**4. ALS Collaborative Filtering:**

This section uses Spark MLlib's Alternating Least Squares (ALS) algorithm to generate personalised product recommendations from the user-item ratings.

Typical uses of personalised product/content recommendation:
- Suggesting financial tools/products to users
- Offering tailored investment content

Note that `recommendForAllUsers` ranks all items, including ones the user has already rated (user 1 is recommended item 101 in the output below).

In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating", coldStartStrategy="drop")
als_model = als.fit(df)

userRecs = als_model.recommendForAllUsers(2)
userRecs.show(truncate=False)


+------+------------------------------------+
|userId|recommendations                     |
+------+------------------------------------+
|1     |[{107, 5.12471}, {101, 4.826678}]   |
|2     |[{103, 3.8860695}, {102, 1.4257795}]|
|3     |[{104, 4.852529}, {108, 2.6690392}] |
|4     |[{102, 3.8416784}, {101, 2.7848132}]|
|5     |[{107, 4.899176}, {101, 2.9993615}] |
|6     |[{108, 3.8856165}, {104, 2.0011468}]|
+------+------------------------------------+
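
As a rough illustration of the alternating idea behind ALS, the sketch below fits a rank-1 factorisation to the same 12 ratings in pure Python. It is a simplification under stated assumptions: one latent factor per user and item (Spark's default rank is 10), a plain L2 penalty with an illustrative value `REG = 0.1`, and a fixed 10 iterations. Spark's regularisation scaling and solver differ, so the numbers will not match the output above.

```python
# (userId, itemId, rating) triples copied from the dummy data in section 2.
ratings = [
    (1, 101, 5.0), (1, 102, 3.0), (2, 101, 1.0), (2, 103, 4.0),
    (3, 104, 5.0), (3, 105, 2.0), (4, 102, 4.0), (4, 106, 1.0),
    (5, 101, 3.0), (5, 107, 5.0), (6, 104, 2.0), (6, 108, 4.0),
]
REG = 0.1  # illustrative L2 penalty, an assumption of this sketch

def objective(u, v):
    """Squared error on observed ratings plus the L2 penalty on all factors."""
    err = sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings)
    penalty = REG * (sum(x * x for x in u.values()) + sum(x * x for x in v.values()))
    return err + penalty

def solve_side(fixed, own_index, other_index):
    """Closed-form ridge solution for one side while the other side is held fixed."""
    num, den = {}, {}
    for row in ratings:
        own, other, r = row[own_index], row[other_index], row[2]
        num[own] = num.get(own, 0.0) + r * fixed[other]
        den[own] = den.get(own, 0.0) + fixed[other] ** 2
    return {k: num[k] / (den[k] + REG) for k in num}

u = {i: 1.0 for i, _, _ in ratings}  # user factors
v = {j: 1.0 for _, j, _ in ratings}  # item factors
history = [objective(u, v)]
for _ in range(10):
    u = solve_side(v, 0, 1)  # update user factors with item factors fixed
    v = solve_side(u, 1, 0)  # update item factors with user factors fixed
    history.append(objective(u, v))

print([round(h, 3) for h in history])
print("predicted rating, user 1 on item 103:", round(u[1] * v[103], 3))
```

Each half-step solves its least-squares problem exactly, so the objective never increases from one iteration to the next.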



**5. Model Serialisation (Optional in Colab):**

- Big Data Readiness: uses Spark pipelines plus serialization.
- Both fitted models are saved to disk with the Spark ML writer so they can be reloaded later without retraining.

In [8]:
als_model.write().overwrite().save("/content/als_model")
nlp_model.write().overwrite().save("/content/nlp_pipeline_model")

**6. Wrap-up and Application Relevance:**

| Highlight                   | Example                                                     |
| --------------------------- | ----------------------------------------------------------- |
| **Scalable NLP**            | Spark ML pipeline that processes user reviews               |
| **Collaborative Filtering** | ALS-based personalized product recommendation               |
| **Big Data Readiness**      | Uses Spark pipelines + serialization                        |
| **Practical Application**   | Can recommend financial products, assess customer sentiment |

For larger datasets, several real-world and publicly available sources can be used, especially for finance/product reviews and user interactions:

**Financial product reviews:**

- Yelp/Factual (business reviews, including some financial products)
- Amazon Reviews (for financial books, software, devices)
- Kaggle datasets, e.g. Amazon product reviews, Yelp reviews

**Open financial datasets:**

- Quandl or Alpha Vantage: market and financial product data (may include user ratings/comments if integrated)
- Financial news/commentary APIs: could be mined for sentiment analysis

**Banking or financial app data (if you have access or through partnerships):**

- Real user feedback on loans, credit cards, investment products
- Transactional data with implicit ratings or feedback

**Synthetic or simulated data generation:**

- Generate large-scale simulated user-item interactions and review texts to test scalability

**Social media and forums:**

- Twitter, Reddit (finance-related subreddits), StockTwits: rich sources for sentiment and user opinion mining

**Zip the saved model folders into archives:**

In [None]:
!zip -r als_model.zip /content/als_model
!zip -r nlp_pipeline_model.zip /content/nlp_pipeline_model

  adding: content/als_model/ (stored 0%)
  adding: content/als_model/itemFactors/ (stored 0%)
  adding: content/als_model/itemFactors/part-00003-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet (deflated 30%)
  adding: content/als_model/itemFactors/.part-00002-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet.crc (stored 0%)
  adding: content/als_model/itemFactors/._SUCCESS.crc (stored 0%)
  adding: content/als_model/itemFactors/.part-00003-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet.crc (stored 0%)
  adding: content/als_model/itemFactors/part-00002-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet (deflated 30%)
  adding: content/als_model/itemFactors/part-00000-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet (deflated 31%)
  adding: content/als_model/itemFactors/.part-00001-a6519213-e28c-4d63-857d-db11adda1a64-c000.snappy.parquet.crc (stored 0%)
  adding: content/als_model/itemFactors/.part-00000-a6519213-e28c-4d63-857d-db11adda1a64-c000.sn

In [None]:
from google.colab import files
files.download('als_model.zip')
files.download('nlp_pipeline_model.zip')

**Load the model back to use it:**

In [9]:
from pyspark.ml.recommendation import ALSModel
from pyspark.ml import PipelineModel

# Load ALS model
als_model_loaded = ALSModel.load("/content/als_model")

# Load NLP pipeline model
nlp_model_loaded = PipelineModel.load("/content/nlp_pipeline_model")

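**Prepare some example input data:**

Create a small DataFrame with the same columns as the training data to run predictions on. The `rating` values are placeholders, since neither model reads the label at prediction time: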

In [26]:
from pyspark.sql import Row

test_data = [
    Row(userId=1, itemId=103, rating=0.0, text="Unreliable service and support is unhelpful."),
    Row(userId=2, itemId=101, rating=0.0, text="Sophisticated investment tools and competitive loan rates."),
]

test_df = spark.createDataFrame(test_data)

**Verify user/item IDs in test data vs training data:**

Note:
- Cold start problem: the ALS model has no factors for user or item IDs that were not present in the training data, so it cannot score them. With the default `coldStartStrategy="nan"` those pairs get a `NaN` prediction; with `coldStartStrategy="drop"`, as used here, the rows are dropped from the output.

- Input data: if `test_df` contains user or item IDs not in the training set, predictions for those pairs will be missing from the ALS output for that reason.

In [27]:
df.select("userId").distinct().show()
df.select("itemId").distinct().show()

test_df.select("userId").distinct().show()
test_df.select("itemId").distinct().show()

+------+
|userId|
+------+
|     1|
|     3|
|     2|
|     6|
|     5|
|     4|
+------+

+------+
|itemId|
+------+
|   103|
|   104|
|   105|
|   101|
|   102|
|   107|
|   106|
|   108|
+------+

+------+
|userId|
+------+
|     1|
|     2|
+------+

+------+
|itemId|
+------+
|   103|
|   101|
+------+
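
The same coverage check can be done programmatically instead of by eye. The sketch below is pure Python over the IDs printed above; the helper name `split_by_coverage` and the extra pair with user 7 are hypothetical and only there for illustration.

```python
# IDs copied from the distinct() outputs above.
train_users = {1, 2, 3, 4, 5, 6}
train_items = {101, 102, 103, 104, 105, 106, 107, 108}
test_pairs = [(1, 103), (2, 101)]

def split_by_coverage(pairs, known_users, known_items):
    """Separate pairs the model can score from pairs with an unseen user or item."""
    scorable, cold = [], []
    for user, item in pairs:
        if user in known_users and item in known_items:
            scorable.append((user, item))
        else:
            cold.append((user, item))
    return scorable, cold

scorable, cold = split_by_coverage(test_pairs, train_users, train_items)
print("scorable:", scorable)
print("cold start:", cold)

# A hypothetical pair with a user the model never saw lands in the cold-start list.
_, cold_extra = split_by_coverage(test_pairs + [(7, 101)], train_users, train_items)
print("cold start with user 7 added:", cold_extra)
```

Both test pairs are covered by the training data, so ALS can score them. Neither pair was rated in training, though: user 1 never rated item 103, and user 2's only rating of item 101 is a different row.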



**Run predictions:**

For the ALS recommendation model (predict ratings or get recommendations):
- Learns embeddings for users and items
- Makes predictions based on user–item interactions only

Output: each row reads as "user `userId` is likely to rate item `itemId` at about `prediction`".

In [28]:
als_predictions = als_model.transform(test_df)
#als_predictions = als_model_loaded.transform(test_df)
als_predictions.select("userId", "itemId", "prediction").show()

+------+------+----------+
|userId|itemId|prediction|
+------+------+----------+
|     2|   101|  0.999046|
|     1|   103| 1.4784565|
+------+------+----------+



For the NLP pipeline (predict sentiment or classification):
- Learns from language content
- Predicts sentiment, often used to augment or explain ratings

Output: the predicted class is a rating value, where 5.0 is the most positive sentiment and 1.0 the most negative (the training ratings range from 1.0 to 5.0).

In [29]:
nlp_predictions = nlp_model.transform(test_df)
# nlp_predictions = nlp_model_loaded.transform(test_df)
nlp_predictions.select("userId", "itemId", "text", "prediction").show()

+------+------+--------------------+----------+
|userId|itemId|                text|prediction|
+------+------+--------------------+----------+
|     1|   103|Unreliable servic...|       1.0|
|     2|   101|Sophisticated inv...|       4.0|
+------+------+--------------------+----------+



**Outputs Explained:**

ALS: prediction shows the estimated rating for user-item pairs, which you can interpret as a recommendation score.

NLP pipeline: prediction shows the predicted class label (e.g., sentiment) for the input text.
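
The notebook reports the two predictions separately. One possible way to use the sentiment class to augment the ALS score is a simple weighted average, sketched below in pure Python with the values from the two outputs above. The function name `blend`, the weight `alpha=0.7`, and the clipping to the 0 to 5 range are all assumptions made for illustration, not part of the original pipeline.

```python
def blend(als_score, sentiment_class, alpha=0.7, scale_max=5.0):
    """Weighted average of a predicted rating and a predicted sentiment class."""
    # ALS output is not bounded to the rating scale, so clip it first.
    clipped = min(max(als_score, 0.0), scale_max)
    return alpha * clipped + (1.0 - alpha) * sentiment_class

# (userId, itemId) -> prediction, copied from the ALS and NLP outputs above.
als_scores = {(2, 101): 0.999046, (1, 103): 1.4784565}
nlp_classes = {(1, 103): 1.0, (2, 101): 4.0}

combined = {pair: blend(als_scores[pair], nlp_classes[pair]) for pair in als_scores}
for pair, score in sorted(combined.items(), key=lambda kv: kv[1], reverse=True):
    print(pair, round(score, 3))
```

A blend like this only applies to pairs that have review text, since the sentiment class is predicted from the text; for pairs without text the ALS score would have to be used on its own.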