Phase 1: Bronze Layer – Data Ingestion & Storage (1.5 Hours)
- Load raw text files (TXT, JSON, CSV, or scraped data) into Databricks Delta Lake.
- Store data in the Bronze Table without modifications.
- Log metadata and file schema, and perform basic validation.
- Apply basic data quality checks (e.g., missing values, schema validation).
- Create a data ingestion pipeline using Databricks Auto Loader or PySpark.
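
The schema validation step listed above could be approached with a small helper like the one below. This is only a sketch: `EXPECTED_SCHEMA` and `validate_schema` are hypothetical names, and the `title` and `description` columns are assumed from their use in later cells. The helper takes a list of `(column, type)` pairs, which is the shape of a Spark DataFrame's `dtypes` attribute, so it can be exercised without a cluster.

```python
# Hypothetical expected schema; the real column list depends on the source CSV.
EXPECTED_SCHEMA = {"title": "string", "description": "string"}


def validate_schema(dtypes, expected=EXPECTED_SCHEMA):
    """Return a list of human-readable schema problems (empty if none).

    `dtypes` is a list of (column_name, type_name) pairs, e.g. `news_df.dtypes`.
    Columns that are present but not listed in `expected` are ignored.
    """
    actual = dict(dtypes)
    problems = []
    for name, expected_type in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != expected_type:
            problems.append(
                f"column {name}: expected {expected_type}, found {actual[name]}"
            )
    return problems


# Hand-written pairs standing in for news_df.dtypes
sample_dtypes = [("title", "string"), ("description", "int")]
issues = validate_schema(sample_dtypes)
print(issues)
```

In the notebook this would presumably be called as `validate_schema(news_df.dtypes)` before writing the Bronze table, with the returned list logged alongside the other ingestion metadata.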

In [0]:
from pyspark.sql.functions import col

# The source CSV file was uploaded to Delta Lake as a table,
# so it can be read directly as a Spark table
news_df = spark.read.table("bbc_news")

# Write the unmodified data to the Bronze location and preview it
bronze_table_path = "/mnt/delta/bronze/bbc_news"
news_df.write.format("delta").mode("overwrite").save(bronze_table_path)
spark.read.format("delta").load(bronze_table_path).limit(10).display()

In [0]:
news_df = spark.read.format("delta").load("/mnt/delta/bronze/bbc_news")
news_df.printSchema()

In [0]:
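# Report the percentage of null values in each column
record_count = news_df.count()
for column_name in news_df.columns:
    missing_values = news_df.filter(col(column_name).isNull()).count()
    # Guard against an empty table to avoid division by zero
    missing_pct = round(missing_values / record_count * 100, 2) if record_count else 0.0
    print(f"Missing values in {column_name}: {missing_pct}% of total records")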

Phase 2: Silver Layer – Data Cleansing & AI Enrichment (2 Hours)

• Perform text preprocessing such as lowercasing, punctuation removal, tokenization.
• Handle stopwords, special characters, and formatting inconsistencies.
• Apply GPT-2 for text augmentation including missing text generation, readability improvement, and text expansion.
• Store cleaned and AI-enhanced data in the Silver Table.
• Track data lineage to show transformations.
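
As a reference for what the preprocessing steps above are expected to produce, here is a plain-Python sketch of the same sequence (lowercase, strip punctuation, normalize whitespace, tokenize, drop stopwords). The `clean_description` helper is hypothetical and is not part of the notebook; it reuses the stopword list from the Spark cell in this phase. Python's `re` module and Spark's Java regex engine may treat non-ASCII characters differently, so it should be read as an approximation rather than an exact mirror.

```python
import re

# Same stopword list as the Spark cell in this phase
STOPWORDS = {"a", "an", "the", "is", "in", "on", "at", "and", "to", "of",
             "for", "with", "this", "that", "it", "as", "are", "was", "by"}


def clean_description(text):
    """Lowercase, strip punctuation, normalize whitespace, drop stopwords."""
    if text is None:
        return None
    text = text.lower()
    text = re.sub(r"[^\w\s]", "", text)       # remove punctuation and special characters
    text = re.sub(r"\s+", " ", text).strip()  # collapse whitespace runs and trim
    tokens = [t for t in text.split(" ") if t and t not in STOPWORDS]
    return " ".join(tokens)


print(clean_description("The  Prime Minister's speech, in London!"))
# prints: prime ministers speech london
```

A function like this can be handy for spot-checking a few rows of the Silver table against the expected cleaned text.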

In [0]:
%python
from pyspark.sql.functions import array_remove, col, concat_ws, lower, regexp_replace, split, trim

# Lowercasing
news_df = news_df.withColumn("description", lower(col("description")))

# Remove punctuation & special characters (keeps word characters and whitespace)
news_df = news_df.withColumn("description", regexp_replace(col("description"), r"[^\w\s]", ""))

# Collapse runs of whitespace into a single space and trim the ends
news_df = news_df.withColumn("description", regexp_replace(col("description"), r"\s+", " "))
news_df = news_df.withColumn("description", trim(col("description")))

# Tokenization (split on single spaces)
news_df = news_df.withColumn("tokens", split(col("description"), " "))

# Stopword removal
stopwords = ["a", "an", "the", "is", "in", "on", "at", "and", "to", "of", "for", "with", "this", "that", "it", "as", "are", "was", "by"]

for stopword in stopwords:
    news_df = news_df.withColumn("tokens", array_remove(col("tokens"), stopword))

# Reconstruct text after stopword removal by joining the remaining tokens with spaces
news_df = news_df.withColumn("description", concat_ws(" ", col("tokens")))

# Drop the intermediate tokens column
news_df = news_df.drop("tokens")

# Display result
display(news_df)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from transformers import pipeline

# Load GPT-2 model once
generator = pipeline('text-generation', model='gpt2')

# Function to augment text using GPT-2 with size restrictions
def augment_text(text, max_length=50):  # max_length is counted in tokens and includes the prompt
    if not text:  # Skip null or empty descriptions
        return text
    input_text = text[:100]  # Limit input text to the first 100 characters
    augmented = generator(input_text, max_length=max_length, num_return_sequences=1)
    generated_text = augmented[0]['generated_text']
    # Note: generated_text starts with the prompt, so for inputs of 50+ characters
    # this slice returns only the prompt prefix; raise the limit to keep generated text
    return generated_text[:50]  # Limit output to the first 50 characters

# Register the function as a UDF
augment_text_udf = udf(augment_text, StringType())

# Limit the number of rows to process and repartition into smaller batches
limited_df = news_df.limit(100).repartition(10)  # Split into smaller partitions

# Apply text augmentation to the 'description' column
augmented_df = limited_df.withColumn('augmented_description', augment_text_udf(limited_df['description']))


In [0]:
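# display() needs a DataFrame, so select the column instead of indexing it
display(augmented_df.select('augmented_description'))
silver_table_path = "/mnt/delta/silver/bbc_news"
# Note: this writes the cleaned news_df; augmented_df (100 sampled rows) is not persisted here
news_df.write.format("delta").mode("overwrite").save(silver_table_path)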

Phase 3: Gold Layer – Summarization & AI-Driven Insights (2.5 Hours)
• Apply GPT-2-based summarization to generate key insights.
• Perform topic modeling (LDA, BERT embeddings) for text classification.
• Conduct sentiment analysis.
• Generate text embeddings for AI-powered search.
• Store final structured dataset in the Gold Table.
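
The embedding-based search item above can be illustrated with a toy example. The sketch below does not use BERT or any learned embedding model: it swaps in bag-of-words count vectors purely to show the ranking-by-cosine-similarity step that a real embedding search would also perform. The `embed`, `cosine`, and `search` helpers and the sample texts are all hypothetical.

```python
import math
from collections import Counter


def embed(text):
    """Toy 'embedding': a sparse bag-of-words count vector (stand-in for a learned model)."""
    return Counter(text.lower().split())


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[token] for token, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def search(query, documents, top_k=2):
    """Return the top_k (score, document) pairs, best match first."""
    query_vec = embed(query)
    scored = [(cosine(query_vec, embed(doc)), doc) for doc in documents]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:top_k]


# Made-up example texts, not taken from the dataset
docs = [
    "election results announced in london",
    "football club wins championship final",
    "new election poll shows tight race",
]
results = search("election poll", docs)
print(results[0][1])
```

With a real model, only `embed` would change (returning a dense vector per text); the similarity ranking would stay essentially the same.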

In [0]:
from transformers import pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import StringType  # StringType lives in pyspark.sql.types, not in functions

summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

def summary_text(text):
    if not text:  # Skip null or empty descriptions
        return None
    summary = summarizer(text, max_length=50, min_length=5, do_sample=False)
    return summary[0]['summary_text']

# Wrap the function in a UDF that summarizes each description
summary_udf = F.udf(summary_text, StringType())

news_df = news_df.withColumn("summary", summary_udf(F.col("description")))

In [0]:
spark.conf.set("spark.databricks.driver.maxResultSize", "2g")

In [0]:
# Phase 4: Model Training & Fine-Tuning (1.5 Hours)
# • Fine-tune GPT-2 on custom data.
# • Train a simple text classification model (e.g., classifying reviews, topics).
# • Compare pre-trained vs fine-tuned model outputs.
# • Save trained models in MLflow for tracking.

In [0]:
!pip install torch spacy transformers datasets
!python -m spacy download en_core_web_sm
!pip install 'accelerate>=0.26.0'

In [0]:
import torch
import re
import spacy
import mlflow
import mlflow.pytorch
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification, Trainer, TrainingArguments, pipeline
from datasets import Dataset
from sklearn.metrics import accuracy_score

# Load spaCy model
nlp = spacy.load("en_core_web_sm")

# Load GPT-2 tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

def clean_text(text):
    doc = nlp(text.lower())
    words = [token.lemma_ for token in doc if not token.is_stop and token.is_alpha]
    return " ".join(words)

# Create small dataset
data = [
    {"text": "I love this movie, it was fantastic!", "label": 1},
    {"text": "The plot was boring and predictable.", "label": 0},
    {"text": "Amazing cinematography and great acting!", "label": 1},
    {"text": "Terrible script and poor character development.", "label": 0},
    {"text": "A masterpiece! Highly recommended.", "label": 1},
    {"text": "Worst movie I have ever seen.", "label": 0}
]

# Clean dataset
for item in data:
    item["text"] = clean_text(item["text"])

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)

# Convert dataset to Hugging Face format
dataset = Dataset.from_list(data)
dataset = dataset.map(tokenize_function, batched=True)

# Split dataset
train_test = dataset.train_test_split(test_size=0.3)
train_dataset, test_dataset = train_test["train"], train_test["test"]

# Load pre-trained GPT-2 with a classification head
# (the head is newly initialized, so this baseline has not been trained for the task)
pretrained_model = GPT2ForSequenceClassification.from_pretrained("gpt2", num_labels=2)
pretrained_model.config.pad_token_id = pretrained_model.config.eos_token_id

# Load a second copy of the same checkpoint to fine-tune
finetuned_model = GPT2ForSequenceClassification.from_pretrained("gpt2", num_labels=2)
finetuned_model.config.pad_token_id = finetuned_model.config.eos_token_id

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    num_train_epochs=3,
    logging_dir="./logs",
)

# Trainer
trainer = Trainer(
    model=finetuned_model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

with mlflow.start_run():
    trainer.train()
    
    # Save fine-tuned model
    torch.save(finetuned_model.state_dict(), "gpt2_finetuned.pth")
    mlflow.pytorch.log_model(finetuned_model, "gpt2_finetuned")
    
    # Load pipelines
    pretrained_classifier = pipeline("text-classification", model=pretrained_model, tokenizer=tokenizer)
    finetuned_classifier = pipeline("text-classification", model=finetuned_model, tokenizer=tokenizer)
    
    # Compare outputs on the full toy dataset (note: this includes the training examples)
    test_texts = [item["text"] for item in data]
    test_labels = [item["label"] for item in data]
    
    pretrained_predictions = [int(p["label"].split("_")[-1]) for p in pretrained_classifier(test_texts)]
    finetuned_predictions = [int(p["label"].split("_")[-1]) for p in finetuned_classifier(test_texts)]
    
    pretrained_accuracy = accuracy_score(test_labels, pretrained_predictions)
    finetuned_accuracy = accuracy_score(test_labels, finetuned_predictions)
    
    print("Pretrained Model Accuracy:", pretrained_accuracy)
    print("Fine-tuned Model Accuracy:", finetuned_accuracy)
    
    mlflow.log_metric("pretrained_accuracy", pretrained_accuracy)
    mlflow.log_metric("finetuned_accuracy", finetuned_accuracy)
    
    print("Training complete. Model comparison done.")


In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
from collections import Counter
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def generate_business_insights_plots(df):
    # 1. Word Cloud of Titles
    plt.figure(figsize=(10, 6))
    title_wordcloud = WordCloud(width=800, height=400, background_color='white').generate(' '.join(df['cleaned_title']))
    plt.imshow(title_wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title('Word Cloud of Titles')
    plt.show()

    # 2. Word Cloud of Descriptions
    plt.figure(figsize=(10, 6))
    description_wordcloud = WordCloud(width=800, height=400, background_color='white').generate(' '.join(df['cleaned_description']))
    plt.imshow(description_wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title('Word Cloud of Descriptions')
    plt.show()

    # 3. Top 10 Most Frequent Words in Titles
    title_words = ' '.join(df['cleaned_title']).split()
    top_title_words = Counter(title_words).most_common(10)
    top_title_words_df = pd.DataFrame(top_title_words, columns=['Word', 'Frequency'])

    plt.figure(figsize=(10, 6))
    sns.barplot(x='Frequency', y='Word', data=top_title_words_df, palette='viridis')
    plt.title('Top 10 Most Frequent Words in Titles')
    plt.show()

    # 4. Top 10 Most Frequent Words in Descriptions
    description_words = ' '.join(df['cleaned_description']).split()
    top_description_words = Counter(description_words).most_common(10)
    top_description_words_df = pd.DataFrame(top_description_words, columns=['Word', 'Frequency'])

    plt.figure(figsize=(10, 6))
    sns.barplot(x='Frequency', y='Word', data=top_description_words_df, palette='viridis')
    plt.title('Top 10 Most Frequent Words in Descriptions')
    plt.show()

    # 5. Length Distribution of Titles
    df['title_length'] = df['title'].apply(len)
    plt.figure(figsize=(10, 6))
    sns.histplot(df['title_length'], kde=True, color='blue', bins=30)
    plt.title('Length Distribution of Titles')
    plt.show()

    # 6. Length Distribution of Descriptions
    df['description_length'] = df['description'].apply(len)
    plt.figure(figsize=(10, 6))
    sns.histplot(df['description_length'], kde=True, color='green', bins=30)
    plt.title('Length Distribution of Descriptions')
    plt.show()

    # 7. Cosine similarity between the title and description for each article
    vectorizer = TfidfVectorizer(stop_words='english')

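    # Fit one vocabulary on titles and descriptions together, then vectorize each column
    vectorizer.fit(pd.concat([df['title'], df['description']]))
    title_tfidf = vectorizer.transform(df['title'])
    description_tfidf = vectorizer.transform(df['description'])

    # TfidfVectorizer L2-normalizes rows by default, so the row-wise dot product
    # is the cosine similarity of each title with its own description
    # (this avoids building the full N x N similarity matrix)
    similarity_scores = np.asarray(title_tfidf.multiply(description_tfidf).sum(axis=1)).ravel()

    # Add the cosine similarity scores to the DataFrame
    df['similarity_score'] = similarity_scores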

    # Plotting the distribution of similarity scores
    plt.figure(figsize=(10, 6))
    sns.histplot(df['similarity_score'], kde=True, color='blue', bins=10)
    plt.title('Distribution of Cosine Similarity Scores between Titles and Descriptions')
    plt.xlabel('Similarity Score')
    plt.ylabel('Frequency')
    plt.show()

    # Plotting the similarity scores in a box plot
    plt.figure(figsize=(10, 6))
    sns.boxplot(x=df['similarity_score'], color='green')
    plt.title('Box Plot of Cosine Similarity Scores between Titles and Descriptions')
    plt.xlabel('Similarity Score')
    plt.show()



Phase 5: Visualization, Queries & Reporting (1.5 Hours)
• Query structured insights using Databricks SQL & PySpark.
• Build a basic visualization dashboard using Databricks Notebooks & Plotly.
• Validate data lineage from Bronze → Silver → Gold.
• Discuss real-world applications of this pipeline (e.g., news summarization, chatbot training).
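
One lightweight way to validate lineage from Bronze to Silver to Gold is to record, for each layer, where it was read from and where it was written, and then check that the chain is unbroken. The sketch below assumes a hypothetical `LineageStep` record and `validate_lineage` helper. The Bronze and Silver paths are the ones used in the notebook; the Gold path is an assumption, since no Gold table is written in the cells above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LineageStep:
    layer: str               # "bronze", "silver", or "gold"
    path: str                # where this layer's Delta table is written
    source: Optional[str]    # path the layer was read from (None for raw input)
    transformation: str      # short description of what was applied


def validate_lineage(steps: List[LineageStep]) -> List[str]:
    """Check that each layer reads from the path the previous layer wrote."""
    problems = []
    for previous, current in zip(steps, steps[1:]):
        if current.source != previous.path:
            problems.append(
                f"{current.layer} reads {current.source}, "
                f"but {previous.layer} wrote {previous.path}"
            )
    return problems


lineage = [
    LineageStep("bronze", "/mnt/delta/bronze/bbc_news", None, "raw load"),
    LineageStep("silver", "/mnt/delta/silver/bbc_news",
                "/mnt/delta/bronze/bbc_news", "text cleaning"),
    # The gold path below is an assumption; the notebook never writes a Gold table.
    LineageStep("gold", "/mnt/delta/gold/bbc_news",
                "/mnt/delta/silver/bbc_news", "summarization"),
]
print(validate_lineage(lineage))  # an empty list means the chain is consistent
```

On Databricks, the per-table write history that Delta Lake exposes through `DESCRIBE HISTORY` could complement a record like this.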

All of the below should be uploaded to the group’s GitHub repository:
1. Databricks Notebooks for each transformation phase.
2. Bronze, Silver, and Gold Delta Tables with respective data stages (screenshots).
3. AI-enriched, structured insights stored in the Gold Layer (screenshots).
4. Basic ML model & embeddings (print the model and embeddings to a file).
5. Final queries, dashboards, and visualizations.