# Installation Tutorial & Lab
## Dr. Aurelle TCHAGNA
# Course: Advanced AI / Big Data Lab (Landmark University, M.Tech)  
**Goal:** Install and validate **Apache Spark**, **PySpark**, **MongoDB**, **PyMongo**, **MongoDB Spark Connector**, and **MongoDB Compass**, then practice PySpark with a **WordCount** pipeline and DataFrame analytics.

This notebook contains:  
1. Installation steps (Windows/Linux/macOS)  
2. Verification commands  
3. PySpark lab (RDD + DataFrame)  
4. MongoDB operations (PyMongo + Spark Connector config)  
5. Exercises


## 0) What you will install
### Required
- **Java (JDK 11 or 17)**: Spark runs on the JVM.
- **Apache Spark**: distributed compute engine.
- **Python 3.9 to 3.11** (this notebook was run with 3.11.14; newer versions have reported bugs on Windows): for PySpark scripts and notebooks.
- **PySpark**: Python API for Spark.
- **MongoDB Community Server**: NoSQL database.
- **PyMongo**: Python driver for MongoDB.
- **MongoDB Compass**: GUI to view/edit MongoDB data.

### Optional (Recommended)
- **MongoDB Spark Connector**: enables Spark to read/write MongoDB efficiently.


## 1) Install Java (JDK)
### Windows
1. Install **JDK 17** (or 11).
2. Set environment variables:
   - `JAVA_HOME=C:\Program Files\Java\jdk-17`
   - Add `%JAVA_HOME%\bin` to `PATH`

### Ubuntu/Debian
```bash
sudo apt update
sudo apt install -y openjdk-17-jdk
java -version
```

### macOS (Homebrew)
```bash
brew install openjdk@17
java -version
```

✅ **Check**
```bash
java -version
# Windows (Command Prompt):
echo %JAVA_HOME%
# Linux/macOS:
echo $JAVA_HOME
```


## 2) Install Apache Spark
Download Spark (prebuilt for Hadoop) from the official Apache Spark site.

**Recommendations for Windows Users**
- Choose a Spark release: 3.5.8 (Jan 15 2026)
- Choose a package type: Pre-built for Apache Hadoop 3.3 and later (Scala 2.13)

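After extracting, set:
- `SPARK_HOME` to the Spark folder
- Add the Spark `bin` folder to `PATH` (`%SPARK_HOME%\bin` on Windows, `$SPARK_HOME/bin` on Linux/macOS)

**Windows example** (use the name of the folder you actually extracted)
- `SPARK_HOME=C:\spark\spark-3.5.8-bin-hadoop3`
- Add `C:\spark\spark-3.5.8-bin-hadoop3\bin` to PATH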

**Linux/macOS example**
```bash
export SPARK_HOME=$HOME/spark/spark-3.5.8-bin-hadoop3   # use the name of the folder you extracted
export PATH=$SPARK_HOME/bin:$PATH
```

✅ **Check**
```bash
spark-submit --version
pyspark --version
```
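
The Java and Spark checks above can also be run from Python, which is convenient inside a notebook. The following is a minimal sketch that uses only the standard library; `check_tool` is a hypothetical helper name introduced here for illustration, not part of Spark.

```python
import os
import shutil


def check_tool(env_var, executable):
    """Report whether an environment variable is set and an executable is on PATH."""
    home = os.environ.get(env_var)        # None when the variable is not set
    location = shutil.which(executable)   # None when the executable is not on PATH
    return {
        "env_var": env_var,
        "value": home,
        "executable": executable,
        "found_at": location,
    }


for env_var, executable in [("JAVA_HOME", "java"), ("SPARK_HOME", "spark-submit")]:
    print(check_tool(env_var, executable))
```

A `None` in `value` or `found_at` suggests that the corresponding environment variable or `PATH` entry still needs to be set.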


## 3) Install Python packages (PySpark + PyMongo)
Create a virtual environment (recommended), then install the packages.

### Windows (PowerShell)
```powershell
python -m venv .venv
.venv\Scripts\activate
pip install --upgrade pip
pip install pyspark==3.5.8 pymongo
```

### Linux/macOS
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install pyspark==3.5.8 pymongo
```


## 4) Install MongoDB Community Server + Compass
### MongoDB Server
Install MongoDB Community Server (Windows/macOS/Linux) from the official MongoDB download page.

✅ After installation, ensure MongoDB is running.

**Windows**
- Open **Services** → start **MongoDB Server**  
- Default URI: `mongodb://localhost:27017`

**Linux (systemd)**
```bash
sudo systemctl start mongod
sudo systemctl status mongod
```

### MongoDB Compass
Install **MongoDB Compass** and connect using:
- `mongodb://localhost:27017`


## 5) MongoDB Spark Connector (important notes)
Spark needs the MongoDB connector **JAR** to read from or write to MongoDB.

### Option A: Use Maven coordinate (easy)
In the SparkSession config, set `spark.jars.packages` to the connector coordinate, for example:
- `org.mongodb.spark:mongo-spark-connector_2.13:10.4.0`

The `_2.13` suffix is the Scala version. It must match the Scala version of your Spark build (use `_2.12` for a Scala 2.12 build).

### Option B: Download connector JAR manually
Provide it to Spark using `--jars` or `spark.jars`.


## 6) Quick verification inside Jupyter
Run the next cells. They check PySpark/PyMongo and start a local Spark session.

In [40]:
# !pip install pyspark==3.5.8 findspark


In [42]:
!java --version


openjdk 17.0.17 2025-10-21
OpenJDK Runtime Environment Temurin-17.0.17+10 (build 17.0.17+10)
OpenJDK 64-Bit Server VM Temurin-17.0.17+10 (build 17.0.17+10, mixed mode, sharing)


In [38]:
import pyspark
print(pyspark.__version__)


3.5.8


In [39]:
import pymongo
print(pymongo.__version__)


4.16.0


In [41]:
!python --version

Python 3.11.14


In [74]:
import os
import sys

# Make the Spark driver and workers use the same Python interpreter as this notebook
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [75]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MongoSparkExample") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.4.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/testdb.testcollection") \
    .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/testdb.testcollection") \
    .getOrCreate()

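# Note: this only confirms that the session was created with the connector configured.
# Spark does not contact MongoDB until a read or write is executed.
print("Spark connected to MongoDB successfully!")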


Spark connected to MongoDB successfully!


In [77]:
import sys
print("Python:", sys.version)

try:
    import pyspark
    print("PySpark version:", pyspark.__version__)
except Exception as e:
    print("PySpark error:", e)

try:
    import pymongo
    print("PyMongo version:", pymongo.__version__)
except Exception as e:
    print("PyMongo error:", e)


Python: 3.11.14 | packaged by Anaconda, Inc. | (main, Oct 21 2025, 18:30:03) [MSC v.1929 64 bit (AMD64)]
PySpark version: 3.5.8
PyMongo version: 4.16.0


In [78]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Landmark-PySpark-Lab")
         .master("local[*]")
         .getOrCreate())

spark


In [79]:
spark.version


'3.5.8'

# Part A — PySpark Lab (RDD): WordCount 


In [48]:
text = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB."
]

rdd = spark.sparkContext.parallelize(text)
rdd.take(3)


['Spark is fast. Spark is general-purpose.',
 'PySpark lets you use Spark with Python.',
 'Big data processing with Spark is scalable and efficient.']

In [50]:
text = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB."
]

try:
    rdd = spark.sparkContext.parallelize(text)
    rdd.take(3)
except Exception as err:
    print(f"Error parallelize: {err}")


In [51]:
import re

def tokenize(line: str):
    return re.findall(r"[a-z0-9]+", line.lower())

word_counts = (
    rdd.flatMap(tokenize)
       .map(lambda w: (w, 1))
       .reduceByKey(lambda a, b: a + b)
       .sortBy(lambda x: x[1], ascending=False)
)

word_counts.take(20)


[('is', 4),
 ('spark', 4),
 ('python', 2),
 ('mongodb', 2),
 ('with', 2),
 ('and', 1),
 ('pymongo', 1),
 ('pyspark', 1),
 ('general', 1),
 ('you', 1),
 ('connects', 1),
 ('processing', 1),
 ('a', 1),
 ('database', 1),
 ('fast', 1),
 ('lets', 1),
 ('use', 1),
 ('big', 1),
 ('to', 1),
 ('purpose', 1)]
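
To see what each RDD step contributes, the same computation can be written in plain Python. This is a sketch for understanding only, not a replacement for the Spark pipeline: the list comprehension plays the role of `flatMap`, and `Counter` plays the role of `map` followed by `reduceByKey`. The name `corpus` is introduced here for illustration.

```python
import re
from collections import Counter

corpus = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB.",
]


def tokenize(line):
    """Lowercase a line and return its runs of letters and digits."""
    return re.findall(r"[a-z0-9]+", line.lower())


# flatMap(tokenize): one flat list of tokens across all lines
tokens = [word for line in corpus for word in tokenize(line)]

# map(w -> (w, 1)) + reduceByKey(+): Counter adds one per occurrence of each token
counts = Counter(tokens)

# sortBy(count, descending): most_common orders by count, highest first
for word, count in counts.most_common(5):
    print(word, count)

print("total tokens:", sum(counts.values()))
print("distinct words:", len(counts))
```

The totals (33 tokens, 24 distinct words) are useful reference values for the DataFrame and MongoDB parts that follow.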

In [55]:
# Save results (local folder)
out_dir = "wordcount_out"

import shutil, os
if os.path.exists(out_dir):
    shutil.rmtree(out_dir)

word_counts.coalesce(1).saveAsTextFile(out_dir)
print("Saved to:", out_dir)


Saved to: wordcount_out


# Part B — PySpark Lab (DataFrames): Cleaning + analytics


In [56]:
from pyspark.sql import functions as F

df_wc = word_counts.toDF(["word", "count"])
df_wc.show(10)


+-------+-----+
|   word|count|
+-------+-----+
|     is|    4|
|  spark|    4|
| python|    2|
|mongodb|    2|
|   with|    2|
|    and|    1|
|pymongo|    1|
|pyspark|    1|
|general|    1|
|    you|    1|
+-------+-----+
only showing top 10 rows



In [57]:
# Top 10 words
df_wc.orderBy(F.desc("count")).limit(10).show()


+-------+-----+
|   word|count|
+-------+-----+
|     is|    4|
|  spark|    4|
| python|    2|
|mongodb|    2|
|   with|    2|
|    and|    1|
|pymongo|    1|
|pyspark|    1|
|general|    1|
|    you|    1|
+-------+-----+



In [58]:
# Add derived features
total_words = df_wc.agg(F.sum("count").alias("total")).collect()[0]["total"]
df_features = (df_wc
               .withColumn("length", F.length("word"))
               .withColumn("freq", F.col("count") / F.lit(total_words)))

df_features.orderBy(F.desc("count")).show(10)


+-------+-----+------+--------------------+
|   word|count|length|                freq|
+-------+-----+------+--------------------+
|     is|    4|     2| 0.12121212121212122|
|  spark|    4|     5| 0.12121212121212122|
| python|    2|     6| 0.06060606060606061|
|mongodb|    2|     7| 0.06060606060606061|
|   with|    2|     4| 0.06060606060606061|
|    and|    1|     3|0.030303030303030304|
|pymongo|    1|     7|0.030303030303030304|
|pyspark|    1|     7|0.030303030303030304|
|general|    1|     7|0.030303030303030304|
|    you|    1|     3|0.030303030303030304|
+-------+-----+------+--------------------+
only showing top 10 rows



In [59]:
# Spark SQL
df_features.createOrReplaceTempView("wc")

query = '''
SELECT word, count, length, freq
FROM wc
WHERE length >= 6
ORDER BY count DESC
LIMIT 10
'''
spark.sql(query).show()


+----------+-----+------+--------------------+
|      word|count|length|                freq|
+----------+-----+------+--------------------+
|    python|    2|     6| 0.06060606060606061|
|   mongodb|    2|     7| 0.06060606060606061|
|   pymongo|    1|     7|0.030303030303030304|
|   pyspark|    1|     7|0.030303030303030304|
|   general|    1|     7|0.030303030303030304|
|  connects|    1|     8|0.030303030303030304|
|processing|    1|    10|0.030303030303030304|
|  database|    1|     8|0.030303030303030304|
|   purpose|    1|     7|0.030303030303030304|
| efficient|    1|     9|0.030303030303030304|
+----------+-----+------+--------------------+



# Part C — MongoDB with PyMongo 
Make sure MongoDB is running and Compass can connect to `mongodb://localhost:27017`.


In [60]:
from pymongo import MongoClient

MONGO_URI = "mongodb://localhost:27017"
client = MongoClient(MONGO_URI)

db = client["landmark_ai_lab"]
col = db["wordcount"]

# reset collection
col.delete_many({})

top50 = df_features.orderBy(F.desc("count")).limit(50).toPandas()
records = top50.to_dict(orient="records")
col.insert_many(records)

print("Inserted documents:", col.count_documents({}))
records[:2]


Inserted documents: 24


[{'word': 'is',
  'count': 4,
  'length': 2,
  'freq': 0.12121212121212122,
  '_id': ObjectId('697c4f7089189cc886d65374')},
 {'word': 'spark',
  'count': 4,
  'length': 5,
  'freq': 0.12121212121212122,
  '_id': ObjectId('697c4f7089189cc886d65375')}]

In [61]:
# Read back (PyMongo)
list(col.find({}, {"_id": 0}).sort("count", -1).limit(10))


[{'word': 'is', 'count': 4, 'length': 2, 'freq': 0.12121212121212122},
 {'word': 'spark', 'count': 4, 'length': 5, 'freq': 0.12121212121212122},
 {'word': 'mongodb', 'count': 2, 'length': 7, 'freq': 0.06060606060606061},
 {'word': 'with', 'count': 2, 'length': 4, 'freq': 0.06060606060606061},
 {'word': 'python', 'count': 2, 'length': 6, 'freq': 0.06060606060606061},
 {'word': 'pyspark', 'count': 1, 'length': 7, 'freq': 0.030303030303030304},
 {'word': 'general', 'count': 1, 'length': 7, 'freq': 0.030303030303030304},
 {'word': 'you', 'count': 1, 'length': 3, 'freq': 0.030303030303030304},
 {'word': 'and', 'count': 1, 'length': 3, 'freq': 0.030303030303030304},
 {'word': 'pymongo', 'count': 1, 'length': 7, 'freq': 0.030303030303030304}]

# Part D — MongoDB Spark Connector 
This requires the connector JAR (via Maven package or local file).
If it fails, students still get full marks using Part C with PyMongo.
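
A common cause of failure is a connector coordinate whose Scala suffix does not match the Spark build. The sketch below builds the coordinate string from its two variable parts; `connector_coordinate` is a hypothetical helper name, and the versions shown are simply the ones used in this notebook.

```python
def connector_coordinate(scala_version, connector_version):
    """Build the Maven coordinate string for the MongoDB Spark Connector."""
    return f"org.mongodb.spark:mongo-spark-connector_{scala_version}:{connector_version}"


# Values used in this notebook; change them to match your own Spark build
coordinate = connector_coordinate("2.13", "10.4.0")
print(coordinate)
```

The resulting string is what gets passed to `spark.jars.packages` in the cell below.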


In [62]:
# OPTIONAL: SparkSession configured for Mongo Spark Connector
spark.stop()

from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("Landmark-PySpark-Mongo-Connector")
         .master("local[*]")
         .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.4.0")
         .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/landmark_ai_lab.wordcount")
         .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/landmark_ai_lab.wordcount")
         .getOrCreate())

df_mongo = spark.read.format("mongodb").load()
df_mongo.show(10)


+--------------------+-----+--------------------+------+-------+
|                 _id|count|                freq|length|   word|
+--------------------+-----+--------------------+------+-------+
|697c4f7089189cc88...|    4| 0.12121212121212122|     2|     is|
|697c4f7089189cc88...|    4| 0.12121212121212122|     5|  spark|
|697c4f7089189cc88...|    2| 0.06060606060606061|     6| python|
|697c4f7089189cc88...|    2| 0.06060606060606061|     7|mongodb|
|697c4f7089189cc88...|    2| 0.06060606060606061|     4|   with|
|697c4f7089189cc88...|    1|0.030303030303030304|     3|    and|
|697c4f7089189cc88...|    1|0.030303030303030304|     7|pymongo|
|697c4f7089189cc88...|    1|0.030303030303030304|     7|pyspark|
|697c4f7089189cc88...|    1|0.030303030303030304|     7|general|
|697c4f7089189cc88...|    1|0.030303030303030304|     3|    you|
+--------------------+-----+--------------------+------+-------+
only showing top 10 rows



# Exercises (lab) — to be completed 
## Exercise 1 (RDD)
Using the same corpus:
1. Remove stopwords: `{"is","a","the","to","with","and"}`  
2. Recompute word counts  
3. Compare top 10 before vs after stopword removal

## Exercise 2 (DataFrames) 
Create a DataFrame with columns: `word, count, length, freq` and:
1. Compute average word length weighted by frequency  
2. Return the 10 longest words and their counts  
3. Filter words with `count >= 2` and show their share of total frequency

## Exercise 3 (MongoDB + PyMongo) 
1. Store all words (not only top 50) into MongoDB  
2. Create an index on `count` descending  
3. Query: return words with `length >= 7` sorted by count

## Exercise 4
Download any public text dataset (or scrape a few articles), and build a Spark pipeline:
- tokenization + cleaning  
- word count  
- top bigrams (pairs of consecutive words)  
- store results in MongoDB, visualize in Compass

## Exercise 5 
Use Spark ML to compute TF‑IDF:
- `pyspark.ml.feature.Tokenizer`, `HashingTF`, `IDF`


In [65]:
# --- EXERCISE 1: COMPLETE FIX ---
import re
from pyspark.sql import SparkSession

# 1. Get a Spark session: getOrCreate() reuses the active session or starts a new one
# This fixes the "NoneType" error by reconnecting the engine
spark = SparkSession.builder.appName("Exercise1").master("local[*]").getOrCreate()

# 2. Re-create the input data
text = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB."
]
rdd = spark.sparkContext.parallelize(text)

# 3. Define the helper function and stopwords
def tokenize(line):
    # Same pattern as in Part A; the parameter no longer shadows the global `text` list
    return re.findall(r'[a-z0-9]+', line.lower())

stopwords = {"is", "a", "the", "to", "with", "and"}

# 4. Run the Pipeline: Tokenize -> Filter -> Count
word_counts_filtered = (
    rdd.flatMap(tokenize)
       .filter(lambda word: word not in stopwords) # <--- The Filter Step
       .map(lambda word: (word, 1))
       .reduceByKey(lambda a, b: a + b)
       .sortBy(lambda x: x[1], ascending=False)
)

# 5. Print Result
print("TOP 10 AFTER STOPWORD REMOVAL:")
for word, count in word_counts_filtered.take(10):
    print(f"{word}: {count}")

TOP 10 AFTER STOPWORD REMOVAL:
spark: 4
python: 2
mongodb: 2
pymongo: 1
pyspark: 1
general: 1
you: 1
connects: 1
processing: 1
database: 1
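
Step 3 of the exercise asks for a before/after comparison. One way to quantify the effect of the filter is to count how many tokens it removes. This is a plain-Python sketch on the same corpus; the names `before` and `after` are introduced here for illustration.

```python
import re
from collections import Counter

corpus = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB.",
]
stopwords = {"is", "a", "the", "to", "with", "and"}

tokens = [w for line in corpus for w in re.findall(r"[a-z]+", line.lower())]

before = Counter(tokens)                                   # counts with stopwords
after = Counter(w for w in tokens if w not in stopwords)   # counts without stopwords

removed = sum(before.values()) - sum(after.values())
print("tokens before:", sum(before.values()))
print("tokens after:", sum(after.values()))
print("tokens removed:", removed)
print("stopwords present in corpus:", sorted(w for w in stopwords if w in before))
```

On this corpus the filter removes 9 of the 33 tokens, and "the" never occurs, so it has no effect.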


# Solution To Exercise 2

In [66]:
# --- EXERCISE 2: COMPLETE SOLUTION ---
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import re

# 1. SETUP: Get a Spark session and re-create the data (so this cell runs on its own)
spark = SparkSession.builder.appName("Exercise2").master("local[*]").getOrCreate()

text = [
    "Spark is fast. Spark is general-purpose.",
    "PySpark lets you use Spark with Python.",
    "Big data processing with Spark is scalable and efficient.",
    "MongoDB is a NoSQL database. PyMongo connects Python to MongoDB."
]

# Quick RDD pipeline to get word counts (from Exercise 1)
rdd = spark.sparkContext.parallelize(text)
def tokenize(t):
    return re.findall(r'[a-z]+', t.lower())

word_counts_rdd = (
    rdd.flatMap(tokenize)
       .map(lambda w: (w, 1))
       .reduceByKey(lambda a, b: a + b)
)

# ---------------------------------------------------------
# STEP 0: Create the DataFrame with columns: word, count, length, freq
# ---------------------------------------------------------
# Convert RDD to DataFrame
df = word_counts_rdd.toDF(["word", "count"])

# Calculate Total Words (needed for frequency)
total_words = df.agg(F.sum("count")).collect()[0][0]

# Add 'length' and 'freq' columns
df_features = (
    df.withColumn("length", F.length("word"))
      .withColumn("freq", F.col("count") / F.lit(total_words))
)

print("--- The DataFrame Structure ---")
df_features.show(5)

# ---------------------------------------------------------
# QUESTION 1: Compute average word length weighted by frequency
# ---------------------------------------------------------
# Logic: If 'is' (len 2) appears 4 times and 'general' (len 7) appears 1 time,
# we shouldn't just average 2 and 7. We must weight them by how often they appear.
# Formula: Sum(length * freq). No division is needed because the freq column already sums to 1.
avg_weighted_length = df_features.agg(F.sum(F.col("length") * F.col("freq"))).collect()[0][0]

print(f"1. Average Word Length (Weighted): {avg_weighted_length:.4f} characters")

# ---------------------------------------------------------
# QUESTION 2: Return the 10 longest words and their counts
# ---------------------------------------------------------
print("\n--- 2. Top 10 Longest Words ---")
(df_features
    .orderBy(F.desc("length")) # Sort by length descending
    .select("word", "count", "length")
    .show(10)
)

# ---------------------------------------------------------
# QUESTION 3: Filter words with count >= 2 and show share of total frequency
# ---------------------------------------------------------
# Logic: We find all words that appear at least twice, and sum up their frequencies.
freq_share = (
    df_features
    .filter(F.col("count") >= 2)
    .agg(F.sum("freq"))
    .collect()[0][0]
)

print(f"3. Share of total frequency for words with count >= 2: {freq_share:.2%}")

--- The DataFrame Structure ---
+-------+-----+------+--------------------+
|   word|count|length|                freq|
+-------+-----+------+--------------------+
| python|    2|     6| 0.06060606060606061|
|    and|    1|     3|0.030303030303030304|
|pymongo|    1|     7|0.030303030303030304|
|     is|    4|     2| 0.12121212121212122|
|pyspark|    1|     7|0.030303030303030304|
+-------+-----+------+--------------------+
only showing top 5 rows

1. Average Word Length (Weighted): 5.0000 characters

--- 2. Top 10 Longest Words ---
+----------+-----+------+
|      word|count|length|
+----------+-----+------+
|processing|    1|    10|
| efficient|    1|     9|
|  scalable|    1|     8|
|  database|    1|     8|
|  connects|    1|     8|
|   pymongo|    1|     7|
|   mongodb|    2|     7|
|   purpose|    1|     7|
|   general|    1|     7|
|   pyspark|    1|     7|
+----------+-----+------+
only showing top 10 rows

3. Share of total frequency for words with count >= 2: 42.42%
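
The two numeric results can be checked by hand. The notation below is introduced here for illustration: $c_w$ is the count of word $w$, $\ell_w$ its length, $N$ the total number of tokens, and $f_w = c_w / N$ its frequency.

$$
\bar{\ell} \;=\; \sum_{w} \ell_w f_w \;=\; \frac{\sum_{w} \ell_w c_w}{N} \;=\; \frac{165}{33} \;=\; 5,
\qquad N = \sum_{w} c_w = 33
$$

$$
\sum_{w:\, c_w \ge 2} f_w \;=\; \frac{4 + 4 + 2 + 2 + 2}{33} \;=\; \frac{14}{33} \;\approx\; 42.42\%
$$

The five words with $c_w \ge 2$ are "is", "spark", "python", "mongodb", and "with".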


# Solution To Exercise 3

In [67]:
# --- EXERCISE 3: MONGODB + PYMONGO ---
from pymongo import MongoClient, DESCENDING

# 1. Connect to MongoDB
# Ensure your MongoDB Server is running as per Part C instructions
client = MongoClient("mongodb://localhost:27017")
db = client["landmark_ai_lab"]
col = db["full_wordcount"]

# Clear existing data so we start fresh
col.delete_many({})

# ---------------------------------------------------------
# STEP 1: Store ALL words (not only top 50)
# ---------------------------------------------------------
# Convert the entire Spark DataFrame to a list of dictionaries for MongoDB
all_records = df_features.toPandas().to_dict(orient="records")
col.insert_many(all_records)

print(f"1. Successfully stored {col.count_documents({})} words in MongoDB.")

# ---------------------------------------------------------
# STEP 2: Create an index on 'count' descending
# ---------------------------------------------------------
# This makes searching and sorting by count much faster
col.create_index([("count", DESCENDING)])
print("2. Created descending index on the 'count' field.")

# ---------------------------------------------------------
# STEP 3: Query words with length >= 7 sorted by count
# ---------------------------------------------------------
# We use a filter: {"length": {"$gte": 7}} means length >= 7
query_filter = {"length": {"$gte": 7}}
results = col.find(query_filter, {"_id": 0}).sort("count", DESCENDING)

print("\n3. Words with length >= 7 (Sorted by count):")
for doc in results:
    print(doc)

1. Successfully stored 24 words in MongoDB.
2. Created descending index on the 'count' field.

3. Words with length >= 7 (Sorted by count):
{'word': 'mongodb', 'count': 2, 'length': 7, 'freq': 0.06060606060606061}
{'word': 'pymongo', 'count': 1, 'length': 7, 'freq': 0.030303030303030304}
{'word': 'pyspark', 'count': 1, 'length': 7, 'freq': 0.030303030303030304}
{'word': 'general', 'count': 1, 'length': 7, 'freq': 0.030303030303030304}
{'word': 'connects', 'count': 1, 'length': 8, 'freq': 0.030303030303030304}
{'word': 'processing', 'count': 1, 'length': 10, 'freq': 0.030303030303030304}
{'word': 'database', 'count': 1, 'length': 8, 'freq': 0.030303030303030304}
{'word': 'purpose', 'count': 1, 'length': 7, 'freq': 0.030303030303030304}
{'word': 'efficient', 'count': 1, 'length': 9, 'freq': 0.030303030303030304}
{'word': 'scalable', 'count': 1, 'length': 8, 'freq': 0.030303030303030304}


# Solution To Exercise 4

In [68]:
# --- EXERCISE 4: COMPLETE PIPELINE ---
import urllib.request
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram
from pymongo import MongoClient

# 1. SETUP: Start Spark and MongoDB connection
spark = SparkSession.builder.appName("Exercise4_Pipeline").master("local[*]").getOrCreate()
client = MongoClient("mongodb://localhost:27017")
db = client["landmark_ai_lab"]

# 2. DOWNLOAD DATA: Fetching a public text dataset (Sherlock Holmes)
url = "https://www.gutenberg.org/files/1661/1661-0.txt"
response = urllib.request.urlopen(url)
raw_text = response.read().decode('utf-8')[1000:10000] # Taking a slice for speed

# 3. CLEANING PIPELINE: Create a DataFrame and clean the text
# Create a single-row DataFrame with the text
data = spark.createDataFrame([(raw_text,)], ["raw_content"])

# Step A: Tokenization (Break text into words)
tokenizer = Tokenizer(inputCol="raw_content", outputCol="words")
words_df = tokenizer.transform(data)

# Step B: Cleaning (remove the lab's specific stopwords only;
# passing stopWords replaces StopWordsRemover's default English list)
lab_stopwords = ["is", "a", "the", "to", "with", "and"]
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=lab_stopwords)
clean_df = remover.transform(words_df)

# 4. BIGRAM GENERATION: Create pairs of words
ngram = NGram(n=2, inputCol="filtered_words", outputCol="bigrams")
bigram_df = ngram.transform(clean_df)

# 5. ANALYSIS: Count the top 10 bigrams
# Explode the list of bigrams into individual rows to count them
top_bigrams = (
    bigram_df.select(F.explode("bigrams").alias("bigram"))
    .groupBy("bigram")
    .count()
    .orderBy(F.desc("count"))
    .limit(10)
)

print("--- TOP 10 BIGRAMS ---")
top_bigrams.show()

# 6. STORAGE: Save results in MongoDB
# Convert to list and insert
bigram_records = top_bigrams.toPandas().to_dict(orient="records")
db["top_bigrams"].delete_many({}) # Clear old results
db["top_bigrams"].insert_many(bigram_records)

print(f"Pipeline Complete! Saved {len(bigram_records)} bigrams to 'top_bigrams' collection.")

--- TOP 10 BIGRAMS ---
+------------+-----+
|      bigram|count|
+------------+-----+
|            |  134|
|      of his|    8|
|    that you|    7|
|adventure of|    6|
|    you have|    6|
|   adventure|    6|
|         of |    6|
|          i |    5|
|      he was|    5|
|      do you|    4|
+------------+-----+

Pipeline Complete! Saved 10 bigrams to 'top_bigrams' collection.
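
The blank and half-empty bigrams in the table above (the top entry and `of `, for example) are most likely caused by empty tokens: `Tokenizer` lowercases the text and splits on whitespace, so consecutive whitespace characters such as `\r\n` can yield empty strings. The plain-Python sketch below reproduces the effect on a made-up `sample` string and compares it with regex-based tokenization; it only approximates Spark's behavior.

```python
import re

# Made-up sample with a double space and a Windows line ending
sample = "The  adventure of\r\nSherlock Holmes."


def bigrams(tokens):
    """Join each pair of consecutive tokens with a single space."""
    return [f"{a} {b}" for a, b in zip(tokens, tokens[1:])]


# Splitting on every single whitespace character keeps empty tokens
whitespace_tokens = re.split(r"\s", sample.lower())

# Keeping only runs of letters and digits drops empty tokens and punctuation
regex_tokens = re.findall(r"[a-z0-9]+", sample.lower())

print(bigrams(whitespace_tokens))
print(bigrams(regex_tokens))
```

In Spark, one option with similar behavior is `RegexTokenizer` from `pyspark.ml.feature` with a pattern such as `\\W+`. Treat that as a suggestion to test, since it was not run as part of this notebook.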


# Solution To Exercise 5

In [71]:
# --- EXERCISE 5: TF-IDF WITH SPARK ML ---
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# 1. Setup Spark Session
spark = SparkSession.builder.appName("Exercise5_TFIDF").getOrCreate()

# 2. Create the data (using the same corpus from previous exercises)
sentence_data = spark.createDataFrame([
    (0, "Spark is fast Spark is general-purpose"),
    (1, "PySpark lets you use Spark with Python"),
    (2, "Big data processing with Spark is scalable and efficient"),
    (3, "MongoDB is a NoSQL database PyMongo connects Python to MongoDB")
], ["id", "sentence"])

# STEP A: Tokenization
# Converts the sentence into a list of words
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
words_data = tokenizer.transform(sentence_data)

# STEP B: HashingTF (Term Frequency)
# numFeatures=20 means words are hashed into 20 numeric "buckets"; with so few buckets, different words can collide
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(words_data)

# STEP C: IDF (Inverse Document Frequency)
# This calculates the rarity of words across all 4 sentences
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurized_data)
rescaled_data = idfModel.transform(featurized_data)

# 3. Show the Results
print("TF-IDF Features (Word Importance):")
rescaled_data.select("sentence", "features").show(truncate=False)

TF-IDF Features (Word Importance):
+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
|sentence                                                      |features                                                                                                                                         |
+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
|Spark is fast Spark is general-purpose                        |(20,[0,3,6,9],[0.9162907318741551,0.9162907318741551,0.44628710262841953,0.0])                                                                   |
|PySpark lets you use Spark with Python                        |(20,[1,6,9,10,13,17,18],[0.5108256237659907,0.22314355131
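
The weights visible in the output are consistent with the smoothed IDF formula documented for Spark ML, `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents that contain a given hash bucket. The sketch below evaluates it for `m = 4`; `smoothed_idf` is a hypothetical helper name, not a Spark function.

```python
import math


def smoothed_idf(num_docs, doc_freq):
    """IDF with +1 smoothing: log((num_docs + 1) / (doc_freq + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))


num_docs = 4
for doc_freq in range(1, num_docs + 1):
    print(doc_freq, round(smoothed_idf(num_docs, doc_freq), 4))
```

A bucket present in all four sentences gets weight 0, and a bucket present in only one gets about 0.9163. The value 0.4463 in the first row is a term frequency of 2 multiplied by the IDF for `df = 3`.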

# Exercise 6
1) Build a Spark pipeline that reads **multiple text files** from a folder and produces:
- per-file word count  
- global word count  
- top 20 keywords per file

2) Extend the pipeline to store both results in MongoDB:
- Collection 1: `global_wordcount`  
- Collection 2: `per_file_wordcount`


In [72]:
# --- EXERCISE 6: MULTI-FILE PIPELINE ---
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pymongo import MongoClient

# 1. PREPARATION: Create a folder with 3 sample files
os.makedirs("my_corpus", exist_ok=True)
with open("my_corpus/file1.txt", "w") as f: f.write("apple orange apple banana")
with open("my_corpus/file2.txt", "w") as f: f.write("banana cherry banana apple")
with open("my_corpus/file3.txt", "w") as f: f.write("cherry cherry apple mango")

# 2. SETUP: Initialize Spark
spark = SparkSession.builder.appName("Exercise6").getOrCreate()

# 3. READ MULTIPLE FILES: Use wholeTextFiles to get (FilePath, Content)
raw_rdd = spark.sparkContext.wholeTextFiles("my_corpus")

# 4. PROCESSING: Create the "Per-File Word Count"
def process_files(path_content):
    path, content = path_content
    filename = os.path.basename(path) # Get just 'file1.txt' instead of full path
    words = content.lower().split()
    return [(filename, word) for word in words]

# This creates an RDD of (filename, word)
per_file_words = raw_rdd.flatMap(process_files)

# Convert to DataFrame for easier math
# Schema: [file, word]
df_all = per_file_words.toDF(["file", "word"])

# ---------------------------------------------------------
# GOAL A: Per-File Word Count
# ---------------------------------------------------------
per_file_count_df = df_all.groupBy("file", "word").count().orderBy("file", F.desc("count"))
print("--- Per-File Word Count ---")
per_file_count_df.show()

# ---------------------------------------------------------
# GOAL B: Global Word Count
# ---------------------------------------------------------
global_wordcount_df = df_all.groupBy("word").count().orderBy(F.desc("count"))
print("--- Global Word Count ---")
global_wordcount_df.show()

# ---------------------------------------------------------
# GOAL C: Top 20 Keywords Per File
# ---------------------------------------------------------
# We use a Window function to "rank" words inside each file
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("file").orderBy(F.desc("count"))

top_20_per_file = (
    per_file_count_df
    .withColumn("rank", F.row_number().over(windowSpec))
    .filter(F.col("rank") <= 20)
    .drop("rank")
)
print("--- Top 20 Keywords Per File ---")
top_20_per_file.show()

# ---------------------------------------------------------
# STORAGE: Store in MongoDB Collections
# ---------------------------------------------------------
client = MongoClient("mongodb://localhost:27017")
db = client["landmark_ai_lab"]

# Collection 1: Global Wordcount
db["global_wordcount"].delete_many({})
db["global_wordcount"].insert_many(global_wordcount_df.toPandas().to_dict(orient="records"))

# Collection 2: Per-File Wordcount (the full per-file counts, not only the top 20)
db["per_file_wordcount"].delete_many({})
db["per_file_wordcount"].insert_many(per_file_count_df.toPandas().to_dict(orient="records"))

print("\nMISSION COMPLETE: Data stored in MongoDB!")

--- Per-File Word Count ---
+---------+------+-----+
|     file|  word|count|
+---------+------+-----+
|file1.txt| apple|    2|
|file1.txt|orange|    1|
|file1.txt|banana|    1|
|file2.txt|banana|    2|
|file2.txt|cherry|    1|
|file2.txt| apple|    1|
|file3.txt|cherry|    2|
|file3.txt| mango|    1|
|file3.txt| apple|    1|
+---------+------+-----+

--- Global Word Count ---
+------+-----+
|  word|count|
+------+-----+
| apple|    4|
|cherry|    3|
|banana|    3|
|orange|    1|
| mango|    1|
+------+-----+

--- Top 20 Keywords Per File ---
+---------+------+-----+
|     file|  word|count|
+---------+------+-----+
|file1.txt| apple|    2|
|file1.txt|orange|    1|
|file1.txt|banana|    1|
|file2.txt|banana|    2|
|file2.txt|cherry|    1|
|file2.txt| apple|    1|
|file3.txt|cherry|    2|
|file3.txt| mango|    1|
|file3.txt| apple|    1|
+---------+------+-----+


MISSION COMPLETE: Data stored in MongoDB!
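
On a corpus this small, the Spark results can be cross-checked in plain Python. The sketch below recomputes the per-file counts, the global counts, and a top-N list per file; `top_n = 2` is an arbitrary value chosen for illustration, and ties between words with equal counts are not broken in any particular order.

```python
from collections import Counter

files = {
    "file1.txt": "apple orange apple banana",
    "file2.txt": "banana cherry banana apple",
    "file3.txt": "cherry cherry apple mango",
}

# Per-file word count: one Counter per file
per_file = {name: Counter(text.lower().split()) for name, text in files.items()}

# Global word count: add up the per-file counters
global_counts = Counter()
for counts in per_file.values():
    global_counts.update(counts)

# Top N words per file (same role as row_number() over a window partitioned by file)
top_n = 2
top_per_file = {name: counts.most_common(top_n) for name, counts in per_file.items()}

print(global_counts.most_common())
print(top_per_file)
```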
