<a href="https://colab.research.google.com/github/KhurramRashid6893/1_Portfolio_MERN/blob/main/TF_IDF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("TFIDFExample")
    .getOrCreate()
)

# Toy corpus: one (id, sentence) row per document.
sentenceData = spark.createDataFrame(
    [
        (0, "I love AI and machine learning"),
        (1, "Deep learning and AI are fun"),
    ],
    ["id", "sentence"],
)

# Declare the three feature stages up front: sentence -> words -> raw term
# counts (hashed into 20 buckets) -> IDF-weighted features.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Run the stages in order; IDF is the only one that has to be fitted first.
wordsData = tokenizer.transform(sentenceData)
featurizedData = hashingTF.transform(wordsData)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# Display the tokens next to their TF-IDF vector without truncating the columns.
(
    rescaledData
    .select("words", "features")
    .show(truncate=False)
)


+-------------------------------------+---------------------------------------------------------------------------------------------+
|words                                |features                                                                                     |
+-------------------------------------+---------------------------------------------------------------------------------------------+
|[i, love, ai, and, machine, learning]|(20,[0,11,12,13,16],[0.8109302162163288,0.0,0.0,0.0,0.4054651081081644])                     |
|[deep, learning, and, ai, are, fun]  |(20,[2,3,11,12,13,19],[0.4054651081081644,0.4054651081081644,0.0,0.0,0.0,0.4054651081081644])|
+-------------------------------------+---------------------------------------------------------------------------------------------+



In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import explode, col

# --- Step 1: Spark Session ---
spark = (
    SparkSession.builder
    .appName("TFIDFExample")
    .getOrCreate()
)

# --- Step 2: Sample documents ---
# Toy corpus: one (id, sentence) row per document.
docs = [
    (0, "I love AI and machine learning"),
    (1, "Deep learning and AI are fun"),
]
df = spark.createDataFrame(docs, ["id", "sentence"])

# --- Steps 3-5: Declare the feature stages ---
# sentence -> words -> raw term counts (20 hash buckets) -> IDF-weighted vector
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="tfidf_features")

# --- Steps 3-5 (continued): Apply the stages in order ---
wordsData = tokenizer.transform(df)
featurizedData = hashingTF.transform(wordsData)
idfModel = idf.fit(featurizedData)  # IDF must be fitted on the corpus before use
rescaledData = idfModel.transform(featurizedData)

# --- Step 6: Raw term frequency per (document, word) ---
# One row per token occurrence, then count occurrences within each document.
wordsData_exploded = wordsData.select(
    col("id"),
    explode(col("words")).alias("word"),
)
tf_counts = (
    wordsData_exploded
    .groupBy("id", "word")
    .count()
)
tf_counts.show()

# --- Step 7: Show TF-IDF vectors ---
(
    rescaledData
    .select("id", "words", "tfidf_features")
    .show(truncate=False)
)


+---+--------+-----+
| id|    word|count|
+---+--------+-----+
|  0|     and|    1|
|  0| machine|    1|
|  0|       i|    1|
|  0|    love|    1|
|  0|learning|    1|
|  0|      ai|    1|
|  1|     fun|    1|
|  1|learning|    1|
|  1|     are|    1|
|  1|      ai|    1|
|  1|     and|    1|
|  1|    deep|    1|
+---+--------+-----+

+---+-------------------------------------+---------------------------------------------------------------------------------------------+
|id |words                                |tfidf_features                                                                               |
+---+-------------------------------------+---------------------------------------------------------------------------------------------+
|0  |[i, love, ai, and, machine, learning]|(20,[0,11,12,13,16],[0.8109302162163288,0.0,0.0,0.0,0.4054651081081644])                     |
|1  |[deep, learning, and, ai, are, fun]  |(20,[2,3,11,12,13,19],[0.4054651081081644,0.4054651081081644,0.0,0.0

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import explode, col
from pyspark.ml.linalg import DenseVector

# --- Step 1: Spark session ---
spark = (
    SparkSession.builder
    .appName("TFIDFWordImportance")
    .getOrCreate()
)

# --- Step 2: Sample documents ---
# Toy corpus: one (id, sentence) row per document.
docs = [
    (0, "I love AI and machine learning"),
    (1, "Deep learning and AI are fun"),
]
df = spark.createDataFrame(docs, ["id", "sentence"])

# --- Steps 3-5: Declare the feature stages ---
# sentence -> words -> raw term counts (20 hash buckets) -> IDF-weighted vector
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="tfidf_features")

# --- Steps 3-5 (continued): Apply the stages in order ---
wordsData = tokenizer.transform(df)
featurizedData = hashingTF.transform(wordsData)
idfModel = idf.fit(featurizedData)  # IDF must be fitted on the corpus before use
rescaledData = idfModel.transform(featurizedData)

# --- Step 6: Raw term frequency per (document, word) ---
# One row per token occurrence, then count occurrences within each document.
words_exploded = wordsData.select(
    col("id"),
    explode(col("words")).alias("word"),
)
tf_counts = (
    words_exploded
    .groupBy("id", "word")
    .count()
)

# --- Step 7: Per-word TF-IDF ---
# HashingTF is a thin wrapper around a JVM object: it cannot be pickled into an
# RDD closure and `indexOf` cannot be called from executor processes. Resolve
# every distinct token to its hash bucket once, here on the driver, and let the
# closure below capture only the resulting plain dict.
vocabulary = [
    r["word"]
    for r in rescaledData.select(explode("words").alias("word")).distinct().collect()
]
word_to_index = {word: hashingTF.indexOf(word) for word in vocabulary}


def tfidf_per_word(row):
    """Return one ``(id, word, tfidf)`` tuple per distinct word of a document row.

    Parameters
    ----------
    row : pyspark.sql.Row
        A row of ``rescaledData`` exposing ``id``, ``words`` and
        ``tfidf_features``.

    Returns
    -------
    list[tuple]
        ``(id, word, tfidf)`` with ``tfidf`` as a built-in ``float``.

    Notes
    -----
    The score is read from the hash bucket the word maps to. Words that share a
    bucket are indistinguishable to HashingTF and therefore report the same
    (combined) value; with only 20 buckets this does happen (the vectors printed
    earlier in this notebook have fewer non-zero slots than tokens).
    """
    tfidf_values = row['tfidf_features'].toArray()
    # dict.fromkeys de-duplicates while keeping first-seen order, so a word that
    # occurs several times yields a single row and the later join on
    # (id, word) with the per-word counts does not multiply rows.
    unique_words = dict.fromkeys(row['words'])
    # float(...) converts numpy.float64 into a type Spark's schema inference accepts.
    return [
        (row['id'], w, float(tfidf_values[word_to_index[w]]))
        for w in unique_words
    ]


tfidf_rdd = rescaledData.rdd.flatMap(tfidf_per_word)
tfidf_df = spark.createDataFrame(tfidf_rdd, ["id", "word", "tfidf"])

# --- Step 8: Join TF and TF-IDF ---
# Attach each word's TF-IDF score to its raw count, then sort with both keys
# descending (highest document id first, highest score first within a document).
tf_tfidf_df = (
    tf_counts
    .join(tfidf_df, on=['id','word'])
    .orderBy(col('id').desc(), col('tfidf').desc())
)
tf_tfidf_df.show(truncate=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import explode, col
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F

# --- Step 1: Spark session ---
spark = (
    SparkSession.builder
    .appName("TFIDFTopBottom")
    .getOrCreate()
)

# --- Step 2: Sample documents ---
# Toy corpus: one (id, sentence) row per document.
docs = [
    (0, "I love AI and machine learning"),
    (1, "Deep learning and AI are fun"),
]
df = spark.createDataFrame(docs, ["id", "sentence"])

# --- Steps 3-5: Declare the feature stages ---
# sentence -> words -> raw term counts (20 hash buckets) -> IDF-weighted vector
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="tfidf_features")

# --- Steps 3-5 (continued): Apply the stages in order ---
wordsData = tokenizer.transform(df)
featurizedData = hashingTF.transform(wordsData)
idfModel = idf.fit(featurizedData)  # IDF must be fitted on the corpus before use
rescaledData = idfModel.transform(featurizedData)

# --- Step 6: Raw term frequency per (document, word) ---
# One row per token occurrence, then count occurrences within each document.
words_exploded = wordsData.select(
    col("id"),
    explode(col("words")).alias("word"),
)
tf_counts = (
    words_exploded
    .groupBy("id", "word")
    .count()
)

# --- Step 7: Flatten TF-IDF vectors ---
# HashingTF is a thin wrapper around a JVM object: it cannot be pickled into an
# RDD closure and `indexOf` cannot be called from executor processes. Resolve
# every distinct token to its hash bucket once, here on the driver, and let the
# closure below capture only the resulting plain dict.
vocabulary = [
    r["word"]
    for r in rescaledData.select(explode("words").alias("word")).distinct().collect()
]
word_to_index = {word: hashingTF.indexOf(word) for word in vocabulary}


def tfidf_per_word(row):
    """Return one ``(id, word, tfidf)`` tuple per distinct word of a document row.

    Parameters
    ----------
    row : pyspark.sql.Row
        A row of ``rescaledData`` exposing ``id``, ``words`` and
        ``tfidf_features``.

    Returns
    -------
    list[tuple]
        ``(id, word, tfidf)`` with ``tfidf`` as a built-in ``float``.

    Notes
    -----
    The score is read from the hash bucket the word maps to. Words that share a
    bucket are indistinguishable to HashingTF and therefore report the same
    (combined) value; with only 20 buckets this does happen (the vectors printed
    earlier in this notebook have fewer non-zero slots than tokens).
    """
    tfidf_values = row['tfidf_features'].toArray()
    # dict.fromkeys de-duplicates while keeping first-seen order, so a word that
    # occurs several times yields a single row and the later join on
    # (id, word) with the per-word counts does not multiply rows.
    unique_words = dict.fromkeys(row['words'])
    # float(...) converts numpy.float64 into a type Spark's schema inference accepts.
    return [
        (row['id'], w, float(tfidf_values[word_to_index[w]]))
        for w in unique_words
    ]


tfidf_rdd = rescaledData.rdd.flatMap(tfidf_per_word)
tfidf_df = spark.createDataFrame(tfidf_rdd, ["id", "word", "tfidf"])

# --- Step 8: Join TF counts with TF-IDF ---
tf_tfidf_df = tf_counts.join(tfidf_df, on=['id','word'])

# --- Step 9: Find highest and lowest TF-IDF words per document ---
from pyspark.sql.window import Window

window = Window.partitionBy("id")

# Several words of a document can share the same score (the vectors printed
# earlier in this notebook contain multiple 0.0 entries per document). With
# "tfidf" as the only sort key, row_number() breaks such ties arbitrarily and
# the reported word may differ between runs, so "word" is used as a secondary,
# deterministic tie-breaker in both rankings.

# Highest TF-IDF: rank 1 of each document when sorted by descending score.
high_df = (
    tf_tfidf_df
    .withColumn(
        "rank_high",
        F.row_number().over(window.orderBy(F.col("tfidf").desc(), F.col("word").asc())),
    )
    .filter(col("rank_high") == 1)
    .select(
        "id",
        col("word").alias("high_word"),
        col("count").alias("high_count"),
        col("tfidf").alias("high_tfidf"),
    )
)

# Lowest TF-IDF: rank 1 of each document when sorted by ascending score.
low_df = (
    tf_tfidf_df
    .withColumn(
        "rank_low",
        F.row_number().over(window.orderBy(F.col("tfidf").asc(), F.col("word").asc())),
    )
    .filter(col("rank_low") == 1)
    .select(
        "id",
        col("word").alias("low_word"),
        col("count").alias("low_count"),
        col("tfidf").alias("low_tfidf"),
    )
)

# Join high and low
# The per-side aliases (high_count / low_count) keep the joined frame free of
# two identically named "count" columns, which would be ambiguous to reference.
result = high_df.join(low_df, on="id")
result.show(truncate=False)