<a href="https://colab.research.google.com/github/eiwahab00works/BigDataAnalytics_Project/blob/main/BigData_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Understanding**

The first part of the assignment focuses on **data ingestion**, which means bringing raw data into our pipeline so it can later be processed and analyzed.

There are two common ingestion approaches:

**Batch ingestion**: Data is collected and loaded in bulk at scheduled intervals (e.g., daily, weekly).

**Stream ingestion**: Data flows in continuously in real time (e.g., live sensors, API streams).

Since our dataset is **static** (not continuously updating), we are using batch ingestion.
This allows us to process large volumes of data at once and store it in an optimized format for future use.
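The difference between the two approaches can be sketched in a few lines of standard-library Python. This is only an illustration: `SAMPLE_CSV`, `ingest_batch`, and `ingest_stream` are hypothetical names, and the three-row CSV stands in for the real dataset.

```python
import csv
import io
from typing import Iterator

# Tiny in-memory CSV used as a stand-in for a real source (assumption).
SAMPLE_CSV = "Title,score\nBook A,4.0\nBook B,5.0\nBook C,3.0\n"

def ingest_batch(source: io.StringIO) -> list[dict]:
    """Batch: read the whole source in one pass and return every record."""
    return list(csv.DictReader(source))

def ingest_stream(source: io.StringIO) -> Iterator[dict]:
    """Stream: hand records to the caller one at a time as they are read."""
    for record in csv.DictReader(source):
        yield record

# Batch ingestion: all records are available together after one load.
batch = ingest_batch(io.StringIO(SAMPLE_CSV))
print(len(batch), "records loaded in one batch")

# Stream ingestion: each record is processed as soon as it arrives.
for record in ingest_stream(io.StringIO(SAMPLE_CSV)):
    print("streamed:", record["Title"])
```

With a static file both functions see the same records; the difference is whether downstream code waits for the full load or reacts to each record.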



---



**Step 1: Data Ingestion**
---


1️⃣ Data Source Selection

**Dataset**: Amazon Books Reviews

**Source**: Kaggle

The dataset contains two CSV files:

1.   books_data.csv: book metadata (titles, authors, categories, etc.)
2.   Books_rating.csv: user ratings and review details.

The dataset is larger than 1 GB (the Kaggle archive alone is 1.06 GB).


---

**Pipeline Flow**
---


CSV file (from Kaggle/Drive)
    
     ⬇

Read in Colab using Pandas
    
     ⬇

Convert to Parquet format
     
     ⬇

Store back to Drive (for next steps like Kafka or Lakehouse)

---

**Why Parquet?**

The Parquet format was chosen because it is:

*   Columnar: stores data column-wise, so analytical queries read only the columns they need.

*   Compressed: typically takes much less storage than the equivalent CSV.

*   Schema-aware: stores column names and data types in the file, so they survive a round trip.

*   Widely supported: optimized for tools like Spark, Hadoop, and BigQuery.

Converting CSV files to Parquet is a common step in data engineering pipelines because it improves performance and scalability for later transformations or machine learning tasks.
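Two of these properties can be illustrated without any Parquet library. The sketch below is conceptual only: it does not reproduce Parquet's on-disk format, and `rows` and `to_columns` are hypothetical names. It shows that CSV returns every value as text, and that a column-wise layout lets an aggregate touch a single column.

```python
import csv
import io

# Toy records standing in for book ratings (assumption).
rows = [
    {"Title": "Book A", "score": 4.0},
    {"Title": "Book B", "score": 5.0},
    {"Title": "Book C", "score": 3.0},
]

# CSV stores only text: write the rows out and read them back.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["Title", "score"])
writer.writeheader()
writer.writerows(rows)
buffer.seek(0)
csv_rows = list(csv.DictReader(buffer))
print(type(csv_rows[0]["score"]).__name__)  # the float came back as a string

def to_columns(records):
    """Pivot a list of row dicts into a dict of column lists."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns

# Column-wise layout: the mean score needs only the "score" column.
columns = to_columns(rows)
mean_score = sum(columns["score"]) / len(columns["score"])
print(mean_score)
```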

---






In [None]:
# Kaggle credentials (username and API key) are entered at the prompt in the download cell; do not store them in the notebook.

In [1]:
!pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl.metadata (9.2 kB)
Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [2]:
import opendatasets as od

od.download("https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews")

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: eimanwahab
Your Kaggle Key: ··········
Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
Downloading amazon-books-reviews.zip to ./amazon-books-reviews


100%|██████████| 1.06G/1.06G [00:08<00:00, 140MB/s]





In [3]:
import pandas as pd

# Step 1: Define paths
data_Books_info = "/content/amazon-books-reviews/books_data.csv"
data_Books_reviews = "/content/amazon-books-reviews/Books_rating.csv"

# Step 2: Read both CSV files
books_df = pd.read_csv(data_Books_info)
ratings_df = pd.read_csv(data_Books_reviews)


In [4]:
# Step 3: Preview
print("Books Information Data:")
books_df.head()

Books Information Data:


Unnamed: 0,Title,description,authors,image,previewLink,publisher,publishedDate,infoLink,categories,ratingsCount
0,Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPA...,http://books.google.nl/books?id=DykPAAAACAAJ&d...,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&d...,['Comics & Graphic Novels'],
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],http://books.google.com/books/content?id=IjvHQ...,http://books.google.nl/books?id=IjvHQsCn_pgC&p...,A&C Black,2005-01-01,http://books.google.nl/books?id=IjvHQsCn_pgC&d...,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],http://books.google.com/books/content?id=2tsDA...,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,,2000,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],http://books.google.com/books/content?id=aRSIg...,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,iUniverse,2005-02,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,http://books.google.nl/books?id=399SPgAACAAJ&d...,,2003-03-01,http://books.google.nl/books?id=399SPgAACAAJ&d...,,


In [5]:
print("\nBooks Rating Data:")
ratings_df.head()



Books Rating Data:


Unnamed: 0,Id,Title,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,1882931173,Its Only Art If Its Well Hung!,,AVCGYZL8FQQTD,"Jim of Oz ""jim-of-oz""",7/7,4.0,940636800,Nice collection of Julie Strain images,This is only for Julie Strain fans. It's a col...
1,826414346,Dr. Seuss: American Icon,,A30TK6U7DNS82R,Kevin Killian,10/10,5.0,1095724800,Really Enjoyed It,I don't care much for Dr. Seuss but after read...
2,826414346,Dr. Seuss: American Icon,,A3UH4UZ4RSVO82,John Granger,10/11,5.0,1078790400,Essential for every personal and Public Library,"If people become the books they read and if ""t..."
3,826414346,Dr. Seuss: American Icon,,A2MVUWT453QH61,"Roy E. Perry ""amateur philosopher""",7/7,4.0,1090713600,Phlip Nel gives silly Seuss a serious treatment,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,826414346,Dr. Seuss: American Icon,,A22X4XUPKF66MR,"D. H. Richards ""ninthwavestore""",3/3,4.0,1107993600,Good academic overview,Philip Nel - Dr. Seuss: American IconThis is b...


## Converting to Parquet and saving to Google Drive

In [6]:
!pip install pyarrow



In [7]:
books_df.to_parquet("/content/drive/MyDrive/amazon-books-reviews/books_data.parquet", index=False)
ratings_df.to_parquet("/content/drive/MyDrive/amazon-books-reviews/books_rating.parquet", index=False)

print("✅ Both CSVs converted to Parquet format successfully!")

✅ Both CSVs converted to Parquet format successfully!


In [8]:
df_parquet = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_data.parquet")
df_parquet.head()


Unnamed: 0,Title,description,authors,image,previewLink,publisher,publishedDate,infoLink,categories,ratingsCount
0,Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPA...,http://books.google.nl/books?id=DykPAAAACAAJ&d...,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&d...,['Comics & Graphic Novels'],
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],http://books.google.com/books/content?id=IjvHQ...,http://books.google.nl/books?id=IjvHQsCn_pgC&p...,A&C Black,2005-01-01,http://books.google.nl/books?id=IjvHQsCn_pgC&d...,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],http://books.google.com/books/content?id=2tsDA...,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,,2000,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],http://books.google.com/books/content?id=aRSIg...,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,iUniverse,2005-02,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,http://books.google.nl/books?id=399SPgAACAAJ&d...,,2003-03-01,http://books.google.nl/books?id=399SPgAACAAJ&d...,,


In [9]:
df_parquet = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_rating.parquet")
df_parquet.head()


Unnamed: 0,Id,Title,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,1882931173,Its Only Art If Its Well Hung!,,AVCGYZL8FQQTD,"Jim of Oz ""jim-of-oz""",7/7,4.0,940636800,Nice collection of Julie Strain images,This is only for Julie Strain fans. It's a col...
1,826414346,Dr. Seuss: American Icon,,A30TK6U7DNS82R,Kevin Killian,10/10,5.0,1095724800,Really Enjoyed It,I don't care much for Dr. Seuss but after read...
2,826414346,Dr. Seuss: American Icon,,A3UH4UZ4RSVO82,John Granger,10/11,5.0,1078790400,Essential for every personal and Public Library,"If people become the books they read and if ""t..."
3,826414346,Dr. Seuss: American Icon,,A2MVUWT453QH61,"Roy E. Perry ""amateur philosopher""",7/7,4.0,1090713600,Phlip Nel gives silly Seuss a serious treatment,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,826414346,Dr. Seuss: American Icon,,A22X4XUPKF66MR,"D. H. Richards ""ninthwavestore""",3/3,4.0,1107993600,Good academic overview,Philip Nel - Dr. Seuss: American IconThis is b...




---


**Step 2: Streaming Data to Kafka**
---


The goal of this step is to stream the combined dataset (stored in Parquet format) from Google Colab to Kafka, where it can be consumed for real-time processing or stored for further analytics.

1. Data Preparation

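Two Parquet files were stored in Google Drive.

These files are loaded into Colab and stacked into a single DataFrame with pd.concat. Because the two files share only the Title column, each combined row carries either book metadata or review fields, and the remaining columns are null.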

2. Kafka Setup

Kafka acts as a real-time message broker, allowing data to be published by a producer and consumed by other systems (like Spark, Flink, or databases).

---

3. Kafka Topic Creation

A Kafka topic is a named channel (an append-only log) where messages are sent and stored.

A topic can be created in the Aiven Cloud Console:

*   Navigate to your Kafka service.

*   Go to Topics → Create New Topic.

*   Enter a name (e.g., amazon-books, the topic name used by the producer code).

*   Click Create.

---

4. Key Concepts


---

| Term | Description |
|---|---|
| **Producer** | Application that sends data (messages) to Kafka. |
| **Consumer** | Application that reads data from Kafka. |
| **Topic** | Logical data stream in Kafka that stores messages. |
| **Partition** | Subdivision of a topic for parallel processing. |
| **Broker** | Kafka server that stores and manages topics. |
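To make the partition concept concrete, here is a minimal sketch of how a message key can be mapped to a partition. It is an illustration under stated assumptions: the notebook's producer sends messages without a key, `choose_partition` is a hypothetical helper, and CRC32 is used only as a stand-in, since real Kafka clients apply their own hash functions.

```python
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; equal keys always land together."""
    return zlib.crc32(key) % num_partitions

# Hypothetical keys: using the book title keeps all reviews of a book
# in the same partition, which preserves their relative order.
titles = [
    "Dr. Seuss: American Icon",
    "Whispers of the Wicked Saints",
    "Dr. Seuss: American Icon",
]
assignments = [choose_partition(t.encode("utf-8"), num_partitions=3) for t in titles]
print(assignments)
```

Because the mapping depends only on the key, the first and third messages are always assigned to the same partition, whatever the other keys are.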

---



**Sending Data to Kafka: Using Aiven Stream Processing**
---


The producer serializes each row of the DataFrame to JSON and sends it as one message to the Kafka topic.

---


In [None]:
# now trying kafka from

In [10]:
!pip install confluent-kafka


Collecting confluent-kafka
  Downloading confluent_kafka-2.12.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (31 kB)
Downloading confluent_kafka-2.12.1-cp312-cp312-manylinux_2_28_x86_64.whl (3.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.9/3.9 MB 37.9 MB/s eta 0:00:00
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-2.12.1


In [None]:
# ✅ Producer Code
from confluent_kafka import Producer
import pandas as pd
import json
import time

# Step 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Step 2: Load both Parquet files (update paths if your folder name differs)
df1 = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_data.parquet")
df2 = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_rating.parquet")

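# Combine both: pd.concat stacks the rows of the two files, so columns that
# exist in only one file are NaN for the other file's rows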
df = pd.concat([df1, df2], ignore_index=True)
print("✅ Combined data shape:", df.shape)

# Step 3: Kafka Config
conf = {
    'bootstrap.servers': 'kafka-ca309a9-cloud-8d32.f.aivencloud.com:19253',  # Aiven Kafka server
    'security.protocol': 'SSL',
    'ssl.ca.location': '/content/drive/MyDrive/ca.pem',
    'ssl.certificate.location': '/content/drive/MyDrive/service.cert',
    'ssl.key.location': '/content/drive/MyDrive/service.key'

}

producer = Producer(conf)
topic_name = "amazon-books"  # Aiven topic name

# Step 4: Send first few rows to Kafka
for i, row in df.head(10).iterrows():  # only sending 10 for test
    message = row.to_json()
    producer.produce(topic_name, value=message.encode('utf-8'))
    print(f"✅ Sent record {i+1} to Kafka")
    time.sleep(1)

producer.flush()
print("🎉 All messages sent successfully to Kafka!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Combined data shape: (3212404, 19)
✅ Sent record 1 to Kafka
✅ Sent record 2 to Kafka
✅ Sent record 3 to Kafka
✅ Sent record 4 to Kafka
✅ Sent record 5 to Kafka
✅ Sent record 6 to Kafka
✅ Sent record 7 to Kafka
✅ Sent record 8 to Kafka
✅ Sent record 9 to Kafka
✅ Sent record 10 to Kafka
🎉 All messages sent successfully to Kafka!
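The producer cell prints "Sent" as soon as a message is queued, which is not yet a confirmation from the broker. A minimal sketch of per-message confirmation with a delivery callback is shown below. `delivery_report` and `send_records` are hypothetical helpers, not part of the notebook; `producer` is assumed to be a `confluent_kafka.Producer` like the one created above, and the 10-second flush timeout is an arbitrary choice.

```python
def delivery_report(err, msg):
    """Called once per message when the broker acknowledges it or delivery fails."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

def send_records(producer, topic, messages):
    """Queue each JSON string as one message; return how many remain undelivered."""
    for message in messages:
        producer.produce(topic, value=message.encode("utf-8"), on_delivery=delivery_report)
        producer.poll(0)  # serve delivery callbacks for messages sent so far
    return producer.flush(10)  # wait up to 10 s; 0 means nothing is left in the queue
```

It could be called as `send_records(producer, topic_name, (row.to_json() for _, row in df.head(10).iterrows()))`, which keeps the same serialization as the cell above.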


In [None]:
# ✅ Consumer Code
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'kafka-ca309a9-cloud-8d32.f.aivencloud.com:19253',  # same Aiven Kafka server as the producer
    'security.protocol': 'SSL',
    'ssl.ca.location': '/content/drive/MyDrive/ca.pem',
    'ssl.certificate.location': '/content/drive/MyDrive/service.cert',
    'ssl.key.location': '/content/drive/MyDrive/service.key',
    'group.id': 'colab-consumer-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
topic_name = "amazon-books"
consumer.subscribe([topic_name])

print("🟢 Listening for messages from Kafka... (Press stop when done)")

try:
    while True:
        msg = consumer.poll(2.0)  # wait up to 2 seconds for a message
        if msg is None:
            continue  # nothing arrived yet; keep listening until the cell is stopped
        if msg.error():
            print(f"⚠️ Error: {msg.error()}")
        else:
            print("📦 Received message:", msg.value().decode('utf-8'))
finally:
    consumer.close()


🟢 Listening for messages from Kafka... (Press stop when done)


---

This message is printed before the first poll, so it shows only that the consumer was created and subscribed to the "amazon-books" topic, not that any data has arrived.

From here the consumer polls the topic for messages.

If no messages are printed, the consumer found nothing to read within the poll timeout, for example because it had not finished joining its consumer group yet, or because the group had already read and committed everything in the topic.

Kafka retains messages after they are consumed. With 'auto.offset.reset' set to 'earliest', a consumer group with no committed offsets starts from the oldest retained message; a group that already has committed offsets resumes after them and sees only messages produced since.
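Where a consumer starts reading depends on the committed offsets of its group and on the 'auto.offset.reset' setting. The function below is a simplified model of that rule for a single partition, not Kafka's actual implementation; `starting_offset` and its parameters are hypothetical, and it ignores edge cases such as committed offsets that have fallen outside the retained log.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="earliest"):
    """Return the offset a consumer group starts reading from on one partition."""
    if committed is not None:
        return committed      # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start      # no committed offset: replay retained messages
    return log_end            # "latest": only messages produced from now on

# A partition holding 10 messages (offsets 0..9, next offset to write is 10).
print(starting_offset(None, 0, 10))              # new group, earliest
print(starting_offset(None, 0, 10, "latest"))    # new group, latest
print(starting_offset(10, 0, 10))                # group that already read everything
```

In the last case the consumer starts at the end of the log, so it prints nothing until the producer sends new messages.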

---



In [None]:
# Let’s test the full producer → topic → consumer flow: Sending Data from one side and receiving here

Next, we consume the produced messages from Kafka and store them in a PostgreSQL database hosted on Aiven.

---
Connecting to Our PostgreSQL Server
---

---

First, we create a table in the PostgreSQL server to store the data.



In [None]:
import psycopg2
from getpass import getpass

# Aiven PostgreSQL connection details; the password is requested at run time
# instead of being stored in the notebook
conn = psycopg2.connect(
    host="pg-14218fd-cloud-8d32.b.aivencloud.com",
    port=19251,
    database="defaultdb",
    user="avnadmin",
    password=getpass("PostgreSQL password: "),
    sslmode='verify-ca',
    sslrootcert="/content/drive/MyDrive/_postgressca.pem"
)

cursor = conn.cursor()

# Create the table where our data is to be stored
cursor.execute("""
CREATE TABLE IF NOT EXISTS amazon_books (
    id SERIAL PRIMARY KEY,
    data JSONB,
    inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")

conn.commit()
print("✅ Table 'amazon_books' created successfully in PostgreSQL!")

cursor.close()
conn.close()


✅ Table 'amazon_books' created successfully in PostgreSQL!


In [None]:
from confluent_kafka import Consumer
from getpass import getpass
import psycopg2
import json

# --- PostgreSQL connection (password requested at run time, not stored in the notebook) ---
conn = psycopg2.connect(
    host="pg-14218fd-cloud-8d32.b.aivencloud.com",
    port=19251,
    database="defaultdb",
    user="avnadmin",
    password=getpass("PostgreSQL password: "),
    sslmode='verify-ca',
    sslrootcert="/content/drive/MyDrive/_postgressca.pem"
)
cursor = conn.cursor()

# --- Kafka consumer config ---
conf = {
    'bootstrap.servers': 'kafka-ca309a9-cloud-8d32.f.aivencloud.com:19253',
    'security.protocol': 'SSL',
    'ssl.ca.location': '/content/drive/MyDrive/ca.pem',
    'ssl.certificate.location': '/content/drive/MyDrive/service.cert',
    'ssl.key.location': '/content/drive/MyDrive/service.key',
    'group.id': 'colab-consumer-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
topic_name = "amazon-books"
consumer.subscribe([topic_name])

print("🟢 Listening for Kafka messages and inserting into PostgreSQL...")

try:
    while True:
        msg = consumer.poll(2.0)
        if msg is None:
            continue
        if msg.error():
            print(f"⚠️ Error: {msg.error()}")
        else:
            # Decode message
            data = msg.value().decode('utf-8')
            print("📦 Received:", data)

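            # Insert into PostgreSQL. `data` is already a JSON string, so json.dumps
            # stores it as a JSON string value; the read-back cell parses it with json.loads.
            cursor.execute("INSERT INTO amazon_books (data) VALUES (%s)", [json.dumps(data)])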
            conn.commit()
            print("✅ Inserted into PostgreSQL!")

except KeyboardInterrupt:
    print("🛑 Stopped by user")

finally:
    consumer.close()
    cursor.close()
    conn.close()
    print("🔒 Connections closed.")


🟢 Listening for Kafka messages and inserting into PostgreSQL...
📦 Received: {"Title":"Its Only Art If Its Well Hung!","description":null,"authors":"['Julie Strain']","image":"http:\/\/books.google.com\/books\/content?id=DykPAAAACAAJ&printsec=frontcover&img=1&zoom=1&source=gbs_api","previewLink":"http:\/\/books.google.nl\/books?id=DykPAAAACAAJ&dq=Its+Only+Art+If+Its+Well+Hung!&hl=&cd=1&source=gbs_api","publisher":null,"publishedDate":"1996","infoLink":"http:\/\/books.google.nl\/books?id=DykPAAAACAAJ&dq=Its+Only+Art+If+Its+Well+Hung!&hl=&source=gbs_api","categories":"['Comics & Graphic Novels']","ratingsCount":null,"Id":null,"Price":null,"User_id":null,"profileName":null,"review\/helpfulness":null,"review\/score":null,"review\/time":null,"review\/summary":null,"review\/text":null}
✅ Inserted into PostgreSQL!
📦 Received: {"Title":"Dr. Seuss: American Icon","description":"Philip Nel takes a fascinating look into the key aspects of Seuss's career - his poetry, politics, art, marketing, and 
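One detail of the insert above is worth spelling out: the Kafka message is already JSON text, so passing it through `json.dumps` again wraps it in a JSON string. The standard-library sketch below shows the effect; the `message` value is a shortened, made-up stand-in for a real record.

```python
import json

# Shortened stand-in for a message received from Kafka (assumption).
message = '{"Title": "Dr. Seuss: American Icon", "ratingsCount": null}'

# Encoding the text again produces a JSON *string*, not a JSON object.
double_encoded = json.dumps(message)
once_decoded = json.loads(double_encoded)
print(type(once_decoded).__name__)

# A second json.loads is needed to get the record back as a dict.
record = json.loads(once_decoded)
print(record["Title"])
```

This is why the read-back cell has to call `json.loads` on each value. Passing the decoded message text itself as the query parameter would likely store a JSON object instead, which PostgreSQL could then query by field; the read-back code would need to be adjusted accordingly.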

In [None]:
# Read the data back to check that it is in the PostgreSQL server.
import psycopg2
import pandas as pd
import json
from getpass import getpass

# Connect to PostgreSQL; the password is requested at run time instead of being stored in the notebook
conn = psycopg2.connect(
    host="pg-14218fd-cloud-8d32.b.aivencloud.com",
    port=19251,
    database="defaultdb",
    user="avnadmin",
    password=getpass("PostgreSQL password: "),
    sslmode='verify-ca',
    sslrootcert="/content/drive/MyDrive/_postgressca.pem"
)

query = "SELECT data FROM amazon_books;"
df = pd.read_sql(query, conn)
conn.close()

# Each value comes back as a JSON string, so parse it and expand the keys into columns
df = pd.json_normalize(df['data'].apply(json.loads))
print("✅ Retrieved and expanded data:", df.shape)
df.head()




✅ Retrieved and expanded data: (60, 19)


Unnamed: 0,Title,description,authors,image,previewLink,publisher,publishedDate,infoLink,categories,ratingsCount,Id,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPA...,http://books.google.nl/books?id=DykPAAAACAAJ&d...,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&d...,['Comics & Graphic Novels'],,,,,,,,,,
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],http://books.google.com/books/content?id=IjvHQ...,http://books.google.nl/books?id=IjvHQsCn_pgC&p...,A&C Black,2005-01-01,http://books.google.nl/books?id=IjvHQsCn_pgC&d...,['Biography & Autobiography'],,,,,,,,,,
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],http://books.google.com/books/content?id=2tsDA...,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,,2000,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,['Religion'],,,,,,,,,,
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],http://books.google.com/books/content?id=aRSIg...,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,iUniverse,2005-02,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,['Fiction'],,,,,,,,,,
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,http://books.google.nl/books?id=399SPgAACAAJ&d...,,2003-03-01,http://books.google.nl/books?id=399SPgAACAAJ&d...,,,,,,,,,,,



---


**Step 3: Data Processing and Transformation**
---
---


1.   Load the Parquet Data Back into Colab.




---



In [1]:
import pandas as pd

# Load the two Parquet files (adjust paths if needed)
books_df = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_data.parquet")
reviews_df = pd.read_parquet("/content/drive/MyDrive/amazon-books-reviews/books_rating.parquet")

print("✅ Files Loaded Successfully!")
print("Books Data Shape:", books_df.shape)
print("Reviews Data Shape:", reviews_df.shape)



✅ Files Loaded Successfully!
Books Data Shape: (212404, 10)
Reviews Data Shape: (3000000, 10)


In [2]:
print("\n📗 Books Info Columns:")
print(books_df.columns.tolist())

print("\n📘 Reviews Columns:")
print(reviews_df.columns.tolist())


📗 Books Info Columns:
['Title', 'description', 'authors', 'image', 'previewLink', 'publisher', 'publishedDate', 'infoLink', 'categories', 'ratingsCount']

📘 Reviews Columns:
['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness', 'review/score', 'review/time', 'review/summary', 'review/text']


In [3]:
books = pd.merge(books_df, reviews_df, on='Title')  # inner join on Title (the pandas default)
books.shape

(3000000, 19)

In [4]:
df = books[['Title','review/score','review/text','categories']]
df.head()

Unnamed: 0,Title,review/score,review/text,categories
0,Its Only Art If Its Well Hung!,4.0,This is only for Julie Strain fans. It's a col...,['Comics & Graphic Novels']
1,Dr. Seuss: American Icon,5.0,I don't care much for Dr. Seuss but after read...,['Biography & Autobiography']
2,Dr. Seuss: American Icon,5.0,"If people become the books they read and if ""t...",['Biography & Autobiography']
3,Dr. Seuss: American Icon,4.0,"Theodore Seuss Geisel (1904-1991), aka &quot;D...",['Biography & Autobiography']
4,Dr. Seuss: American Icon,4.0,Philip Nel - Dr. Seuss: American IconThis is b...,['Biography & Autobiography']


In [5]:
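# Reassign instead of using inplace=True: `df` is a slice of `books`, and the
# original in-place call raised the SettingWithCopyWarning recorded below.
df = df.drop_duplicates()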
df.shape

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.drop_duplicates(inplace = True)


(2617289, 4)

In [6]:
# df.dropna(inplace=True)
df.isna().sum()

Unnamed: 0,0
Title,207
review/score,0
review/text,8
categories,465848


Only a few rows have a null title (207) or null review text (8), so we can drop them safely.

In [7]:
# Drop rows with missing title or review text
df = df.dropna(subset=['Title', 'review/text'])

The categories column, however, has far too many null values (over 465,000) to drop, so we keep those rows.

In [8]:
df.isna().sum()

Unnamed: 0,0
Title,0
review/score,0
review/text,0
categories,465639


---

Our plan: **Semantic Category Imputation**

We’ll use text embeddings and similarity search to fill in the missing categories.
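The idea can be sketched with toy vectors before any real embeddings are computed. This is a minimal illustration, assuming a one-nearest-neighbour rule based on cosine similarity; the notebook does not state its exact assignment rule at this point, and `cosine_similarity`, `impute_category`, and the 3-dimensional vectors are all hypothetical.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def impute_category(missing_vec, known_vecs, known_categories):
    """Return the category of the known row most similar to missing_vec."""
    scores = [cosine_similarity(missing_vec, v) for v in known_vecs]
    best = max(range(len(scores)), key=scores.__getitem__)
    return known_categories[best]

# Toy 3-dimensional vectors standing in for real text embeddings (assumption).
known_vecs = [[1.0, 0.1, 0.0], [0.0, 1.0, 0.2]]
known_categories = ["['Fiction']", "['Religion']"]

print(impute_category([0.9, 0.2, 0.0], known_vecs, known_categories))
```

At the scale of this dataset a brute-force loop like this would be far too slow; it only shows the rule that a vectorized or indexed similarity search would apply.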

---

Step 1: Separate the Data

Split the DataFrame into:

*   Rows with a known category (df_known)

*   Rows with a missing category (df_missing)

---


In [9]:
df_known = df.dropna(subset=['categories']).reset_index(drop=True)
df_missing = df[df['categories'].isna()].reset_index(drop=True)
print(f"Known categories: {len(df_known)}, Missing categories: {len(df_missing)}")


Known categories: 2151435, Missing categories: 465639


---
Step 2: Choose the Text for Embeddings

Combine the title and the review text so that the embedding captures more context than either field alone.

---


In [10]:
df_known['combined_text'] = df_known['Title'].astype(str) + " " + df_known['review/text'].astype(str)
df_missing['combined_text'] = df_missing['Title'].astype(str) + " " + df_missing['review/text'].astype(str)


---
Step 3: Generate Embeddings (Hugging Face)

We’ll use the sentence-transformers model all-MiniLM-L6-v2, a compact model that maps each text to a 384-dimensional vector.
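Embedding millions of texts in a single call can exhaust memory, so the embedding cell in this step feeds the model fixed-size batches. The sketch below shows only the batching arithmetic with plain Python lists. `iter_batches` is a hypothetical helper; the sizes (20,000 rows per chunk, 256 texts per batch) are the ones used in the notebook.

```python
import math

def iter_batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# One chunk of 20,000 placeholder texts, split into batches of 256.
texts = [f"review {i}" for i in range(20000)]
batches = list(iter_batches(texts, 256))

print(len(batches))                 # 79, i.e. ceil(20000 / 256)
print(len(batches[-1]))             # 32 texts in the final, partial batch
print(math.ceil(len(texts) / 256))  # 79
```

The count of 79 matches the total shown by the progress bar for each 20,000-row chunk.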

---


In [13]:
!pip install -q sentence-transformers


In [None]:
from sentence_transformers import SentenceTransformer
import torch
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# ✅ Check GPU availability
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\n🔥 Using device: {device}")

# ✅ Load embedding model on GPU
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2', device=device)

# ✅ Function to embed text safely in batches
def batched_encode(texts, batch_size=256):
    embeddings = []
    for i in tqdm(range(0, len(texts), batch_size), desc="Embedding batch progress"):
        batch = texts[i:i+batch_size]
        emb = model.encode(batch, convert_to_tensor=True, show_progress_bar=False)
        embeddings.append(emb.cpu())  # move from GPU to CPU
    return torch.cat(embeddings)

# ✅ Process full dataset in manageable chunks
def process_full_dataset(df, chunk_size=20000, batch_size=256, label="known"):
    print(f"\n🚀 Starting embedding for '{label}' dataset with {len(df):,} rows...")
    all_embeddings = []

    for start in range(0, len(df), chunk_size):
        end = min(start + chunk_size, len(df))
        print(f"➡️  Processing rows {start} to {end} ({end - start} rows)...")

        df_chunk = df.iloc[start:end].copy()
        df_chunk["combined_text"] = (
            df_chunk["Title"].astype(str) + " " + df_chunk["review/text"].astype(str)
        )

        emb = batched_encode(df_chunk["combined_text"].tolist(), batch_size=batch_size)
        all_embeddings.append(emb)

    final_emb = torch.cat(all_embeddings)
    print(f"✅ Done! Shape for '{label}' embeddings: {final_emb.shape}")
    return final_emb


# 🧩 Run on both datasets
known_embeddings = process_full_dataset(df_known, chunk_size=20000, batch_size=256, label="known")
missing_embeddings = process_full_dataset(df_missing, chunk_size=20000, batch_size=256, label="missing")



🔥 Using device: cuda

🚀 Starting embedding for 'known' dataset with 2,151,435 rows...
➡️  Processing rows 0 to 20000 (20000 rows)...
➡️  Processing rows 20000 to 40000 (20000 rows)...
➡️  Processing rows 40000 to 60000 (20000 rows)...
➡️  Processing rows 60000 to 80000 (20000 rows)...
➡️  Processing rows 80000 to 100000 (20000 rows)...
➡️  Processing rows 100000 to 120000 (20000 rows)...
➡️  Processing rows 120000 to 140000 (20000 rows)...
➡️  Processing rows 140000 to 160000 (20000 rows)...
➡️  Processing rows 160000 to 180000 (20000 rows)...
➡️  Processing rows 180000 to 200000 (20000 rows)...
➡️  Processing rows 200000 to 220000 (20000 rows)...
➡️  Processing rows 220000 to 240000 (20000 rows)...
➡️  Processing rows 240000 to 260000 (20000 rows)...
➡️  Processing rows 260000 to 280000 (20000 rows)...
➡️  Processing rows 280000 to 300000 (20000 rows)...
➡️  Processing rows 300000 to 320000 (20000 rows)...
➡️  Processing rows 320000 to 340000 (20000 rows)...
➡️  Processing rows 340000 to 360000 (20000 rows)...
➡️  Processing rows 360000 to 380000 (20000 rows)...
➡️  Processing rows 380000 to 400000 (20000 rows)...
➡️  Processing rows 400000 to 420000 (20000 rows)...
➡️  Processing rows 420000 to 440000 (20000 rows)...
➡️  Processing rows 440000 to 460000 (20000 rows)...
➡️  Processing rows 460000 to 480000 (20000 rows)...
➡️  Processing rows 480000 to 500000 (20000 rows)...
➡️  Processing rows 500000 to 520000 (20000 rows)...
➡️  Processing rows 520000 to 540000 (20000 rows)...
➡️  Processing rows 540000 to 560000 (20000 rows)...
➡️  Processing rows 560000 to 580000 (20000 rows)...
➡️  Processing rows 580000 to 600000 (20000 rows)...
➡️  Processing rows 600000 to 620000 (20000 rows)...
➡️  Processing rows 620000 to 640000 (20000 rows)...
➡️  Processing rows 640000 to 660000 (20000 rows)...
➡️  Processing rows 660000 to 680000 (20000 rows)...
➡️  Processing rows 680000 to 700000 (20000 rows)...
➡️  Processing rows 700000 to 720000 (20000 rows)...
➡️  Processing rows 720000 to 740000 (20000 rows)...
➡️  Processing rows 740000 to 760000 (20000 rows)...
➡️  Processing rows 760000 to 780000 (20000 rows)...
➡️  Processing rows 780000 to 800000 (20000 rows)...
➡️  Processing rows 800000 to 820000 (20000 rows)...
➡️  Processing rows 820000 to 840000 (20000 rows)...
➡️  Processing rows 840000 to 860000 (20000 rows)...
➡️  Processing rows 860000 to 880000 (20000 rows)...
➡️  Processing rows 880000 to 900000 (20000 rows)...
➡️  Processing rows 900000 to 920000 (20000 rows)...
➡️  Processing rows 920000 to 940000 (20000 rows)...
➡️  Processing rows 940000 to 960000 (20000 rows)...
➡️  Processing rows 960000 to 980000 (20000 rows)...
➡️  Processing rows 980000 to 1000000 (20000 rows)...
➡️  Processing rows 1000000 to 1020000 (20000 rows)...
➡️  Processing rows 1020000 to 1040000 (20000 rows)...
➡️  Processing rows 1040000 to 1060000 (20000 rows)...
➡️  Processing rows 1060000 to 1080000 (20000 rows)...

➡️  Processing rows 1080000 to 1100000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1100000 to 1120000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1120000 to 1140000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1140000 to 1160000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1160000 to 1180000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1180000 to 1200000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1200000 to 1220000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1220000 to 1240000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1240000 to 1260000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1260000 to 1280000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1280000 to 1300000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1300000 to 1320000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1320000 to 1340000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1340000 to 1360000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1360000 to 1380000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1380000 to 1400000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1400000 to 1420000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1420000 to 1440000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1440000 to 1460000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1460000 to 1480000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1480000 to 1500000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1500000 to 1520000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1520000 to 1540000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1540000 to 1560000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1560000 to 1580000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1580000 to 1600000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1600000 to 1620000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1620000 to 1640000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1640000 to 1660000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1660000 to 1680000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1680000 to 1700000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1700000 to 1720000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1720000 to 1740000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1740000 to 1760000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1760000 to 1780000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1780000 to 1800000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1800000 to 1820000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1820000 to 1840000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1840000 to 1860000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1860000 to 1880000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1880000 to 1900000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1900000 to 1920000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1920000 to 1940000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1940000 to 1960000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1960000 to 1980000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 1980000 to 2000000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2000000 to 2020000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2020000 to 2040000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2040000 to 2060000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2060000 to 2080000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2080000 to 2100000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2100000 to 2120000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2120000 to 2140000 (20000 rows)...


Embedding batch progress:   0%|          | 0/79 [00:00<?, ?it/s]

➡️  Processing rows 2140000 to 2151435 (11435 rows)...


Embedding batch progress:   0%|          | 0/45 [00:00<?, ?it/s]

In [2]:
print("known_embeddings:", known_embeddings.shape)
print("missing_embeddings:", missing_embeddings.shape)


---
Step 4️⃣: Find Similar Books

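For each book with a missing category, find the most semantically similar book among those whose category is known, and reuse that book's category.

Similarity is measured as the cosine similarity between the two books' embedding vectors.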
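
As a small illustration of the measure itself, here is a minimal sketch using NumPy. The three vectors are made-up 3-dimensional stand-ins for real embeddings, and `cosine_similarity` is a helper defined only for this example. Cosine similarity is $\frac{a \cdot b}{\lVert a \rVert \, \lVert b \rVert}$, so it depends on the direction of the vectors and not on their length.

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors: (a . b) / (|a| * |b|)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Hypothetical toy "embeddings", for illustration only
book_a = np.array([1.0, 2.0, 0.0])
book_b = np.array([2.0, 4.0, 0.0])   # same direction as book_a, twice the length
book_c = np.array([0.0, 0.0, 3.0])   # orthogonal to book_a

print(cosine_similarity(book_a, book_b))  # very close to 1: same direction
print(cosine_similarity(book_a, book_c))  # 0: nothing in common
```

A score near 1 means two books point in nearly the same direction in embedding space, which is why the highest-scoring known book is used as the source of the category.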

---


In [None]:
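import torch
from sentence_transformers import util

filled_categories = []

# For each book with a missing category, copy the category of its nearest known book
for emb in missing_embeddings:
    similarities = util.cos_sim(emb, known_embeddings)[0]  # similarity to every known book
    top_idx = torch.argmax(similarities).item()            # position of the best match
    filled_categories.append(df_known.iloc[top_idx]['categories'])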
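
The loop above scores one book at a time, which can be slow when there are many books with missing categories. One possible alternative, sketched here with NumPy on toy data, is to normalise the embeddings once and process the missing books in chunks with a matrix product. The function name `nearest_known_indices` and the `chunk_size` parameter are assumptions made for this sketch, not part of the notebook.

```python
import numpy as np

def nearest_known_indices(missing, known, chunk_size=1024):
    """For each row of `missing`, return the index of the most cosine-similar row of `known`."""
    # L2-normalise so that a plain dot product equals cosine similarity
    known_unit = known / np.linalg.norm(known, axis=1, keepdims=True)
    missing_unit = missing / np.linalg.norm(missing, axis=1, keepdims=True)

    best = np.empty(len(missing_unit), dtype=np.int64)
    for start in range(0, len(missing_unit), chunk_size):
        chunk = missing_unit[start:start + chunk_size]
        sims = chunk @ known_unit.T              # shape: (rows in chunk, rows in known)
        best[start:start + chunk_size] = sims.argmax(axis=1)
    return best

# Toy data standing in for known_embeddings / missing_embeddings
known = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
missing = np.array([[0.9, 0.1], [0.1, 0.8], [2.0, 2.1]])

idx = nearest_known_indices(missing, known, chunk_size=2)
print(idx)  # [0 1 2]
```

With real data, and assuming the embeddings are available as NumPy arrays (torch tensors would first need converting, for example with `.cpu().numpy()`), the categories could then be looked up in one step with `df_known['categories'].to_numpy()[idx]`. Chunking keeps the similarity matrix for each step small enough to fit in memory.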


---

Step 5️⃣: Assign the Inferred Categories

---


In [None]:
df_missing['categories'] = filled_categories

Then merge the two parts back into a single DataFrame:

In [None]:
df_final = pd.concat([df_known, df_missing]).reset_index(drop=True)
print("✅ Missing categories filled using semantic similarity!")


---

Step 6️⃣: Check the Fill Quality

---




In [None]:
print(df_final['categories'].value_counts().head(10))
print("Total rows:", len(df_final))
print("Missing categories after fill:", df_final['categories'].isna().sum())
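
The checks above confirm that no gaps remain, but they do not show whether the inferred categories are plausible. One way to estimate that, sketched below on toy data, is a hold-out test: hide the category of some books whose category is known, infer it with the same nearest-neighbour rule, and count how often the inferred value matches the real one. The function `holdout_accuracy`, its parameters, and the category names are assumptions made for this illustration.

```python
import numpy as np

def holdout_accuracy(embeddings, labels, holdout_fraction=0.2, seed=0):
    """Hide a fraction of known labels, re-infer them by nearest neighbour, return the hit rate."""
    rng = np.random.default_rng(seed)
    n = len(labels)
    order = rng.permutation(n)
    n_hold = max(1, int(n * holdout_fraction))
    hold, keep = order[:n_hold], order[n_hold:]

    # Normalise so that the dot product is the cosine similarity
    unit = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sims = unit[hold] @ unit[keep].T             # held-out books vs. remaining books
    predicted = labels[keep][sims.argmax(axis=1)]
    return float((predicted == labels[hold]).mean())

# Toy data: two well-separated clusters with hypothetical category names
emb = np.array([[1.0, 0.0], [0.9, 0.1], [0.95, 0.05], [1.0, 0.2], [0.8, 0.1],
                [0.0, 1.0], [0.1, 0.9], [0.05, 0.95], [0.2, 1.0], [0.1, 0.8]])
cats = np.array(["Fiction"] * 5 + ["History"] * 5)

print(holdout_accuracy(emb, cats))  # 1.0 on this cleanly separated toy data
```

On the real dataset the score would likely be lower, and it would be sensible to run the check on a random sample of the known books so the similarity matrix stays small.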