# Data Engineering Pipeline Notebook

Last week we introduced the concept of a three-layer data pipeline: Bronze, Silver, and Gold. That notebook focused on the Bronze layer, where we scraped raw data from a website and stored it, unprocessed, in our Minio object storage.

Today, we'll continue to build on that foundation and move to the next stages of the pipeline. Specifically, we will:

1. Clean and process the data from the Bronze layer to create the Silver layer. This layer holds data that has been transformed and standardized but is not yet enriched or ready for final analysis.

2. Enrich the data for the Gold layer, where we will apply additional transformations such as sentiment analysis to extract more value and insight from the data. This enriched data will be stored and ready for reporting and analysis.

In this notebook, we will:

- Set up our Minio client and ensure that the appropriate buckets exist for each layer.
- Transform the raw data from the Bronze layer to clean and standardized data for the Silver layer.
- Apply enrichment techniques, like sentiment analysis, for the Gold layer.
- Store the Silver data as Parquet files (for optimized storage and processing) and the Gold data as CSV files (for easy use in reporting or other tools).

By the end of this notebook, we will have taken raw, unprocessed data from the Bronze layer and transformed it into clean, enriched data that is stored and ready for further use in downstream analysis or business applications.

### Imports:


In [4]:
import csv
import re
from datetime import datetime
from io import BytesIO, StringIO

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests
from bs4 import BeautifulSoup
from minio import Minio
from textblob import TextBlob

### 1. Setup Minio Client
In this step, we will set up the Minio client and check if the required buckets for the Bronze, Silver, and Gold layers exist. If they don't exist, we will create them.

In [5]:

def setup_minio_client():
    minio_client = Minio('localhost:9000',
                         access_key='ROOTUSER',
                         secret_key='DATAINCUBATOR',
                         secure=False)
    # Create buckets if they don't exist
    for bucket in ['bronze', 'silver', 'gold']:
        if not minio_client.bucket_exists(bucket):
            minio_client.make_bucket(bucket)
            print(f"Bucket '{bucket}' created successfully")
    
    return minio_client


minio_client = setup_minio_client()
print("Minio client setup completed. Buckets checked/created.")

Minio client setup completed. Buckets checked/created.


#### Expected output

The bucket messages appear only on a first run, when the buckets do not exist yet:
- Bucket 'bronze' created successfully
- Bucket 'silver' created successfully
- Bucket 'gold' created successfully
- Minio client setup completed. Buckets checked/created.
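
The setup above hard-codes the endpoint and credentials, which is fine for a local exercise. As a minimal sketch of one alternative, the settings could be read from environment variables with the notebook's values as fallbacks. The variable names (`MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`, `MINIO_SECURE`) and the helper name are assumptions made for this illustration, not something the notebook or Minio defines.

```python
import os


def minio_settings_from_env(env=os.environ):
    """Collect connection settings, falling back to the notebook's local defaults.

    The environment variable names are hypothetical, chosen for this sketch.
    """
    return {
        "endpoint": env.get("MINIO_ENDPOINT", "localhost:9000"),
        "access_key": env.get("MINIO_ACCESS_KEY", "ROOTUSER"),
        "secret_key": env.get("MINIO_SECRET_KEY", "DATAINCUBATOR"),
        # Environment values are strings, so compare against "true" explicitly.
        "secure": env.get("MINIO_SECURE", "false").lower() == "true",
    }


settings = minio_settings_from_env()
print(settings["endpoint"], settings["secure"])
```

The resulting values can then be passed to `Minio(...)` in place of the literals used in `setup_minio_client`.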


### 2. Bronze Layer: Scrape Raw Data
In this section, we scrape raw data from two sources, books and quotes, as the input for the Bronze layer. This data is unprocessed and in its raw form.

In [6]:

def scrape_books_data():
    url = "https://books.toscrape.com/catalogue/page-1.html"
    response = requests.get(url)

    if response.status_code == 200:
        html_content = response.text
        soup = BeautifulSoup(html_content, 'html.parser')
        book_rows = soup.find_all('article', class_='product_pod')  
        
        books_data = []
        for book in book_rows:
            title = book.find('h3').find('a')['title']
            price = book.find('p', class_='price_color').text.strip()
            availability = book.find('p', class_='instock availability').text.strip()
            rating = book.find('p', class_='star-rating')['class'][1]

            books_data.append({
                'title': title,
                'price': price,
                'availability': availability,
                'rating': rating
            })

        return books_data
    else:
        print(f"Failed to fetch page, status code: {response.status_code}")
        return None
    
    
book_data = scrape_books_data()
print("Raw book data scraped for Bronze layer.")
print(book_data[:3])  # Displaying first 3 items to check the data

Raw book data scraped for Bronze layer.
[{'title': 'A Light in the Attic', 'price': 'Â£51.77', 'availability': 'In stock', 'rating': 'Three'}, {'title': 'Tipping the Velvet', 'price': 'Â£53.74', 'availability': 'In stock', 'rating': 'One'}, {'title': 'Soumission', 'price': 'Â£50.10', 'availability': 'In stock', 'rating': 'One'}]


#### Expected output

- Raw book data scraped for Bronze layer.
- [{'title': 'A Light in the Attic', 'price': 'Â£51.77', 'availability': 'In stock', 'rating': 'Three'}, {'title': 'Tipping the Velvet', 'price': 'Â£53.74', 'availability': 'In stock', 'rating': 'One'}, {'title': 'Soumission', 'price': 'Â£50.10', 'availability': 'In stock', 'rating': 'One'}]
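
The `Â£` in the prices is most likely an encoding artifact rather than part of the page: the pattern matches UTF-8 bytes being decoded as ISO-8859-1 (Latin-1). One plausible cause, stated here as an assumption, is that the response does not declare a charset and `requests` falls back to Latin-1 for `response.text`. The Silver step strips the stray characters anyway, but the sketch below shows how the garbling arises and how it could be reversed. The helper `repair_mojibake` is hypothetical.

```python
# UTF-8 encodes '£' as two bytes (0xC2 0xA3); read as Latin-1 they become 'Â' and '£'.
raw_bytes = "£51.77".encode("utf-8")
garbled = raw_bytes.decode("latin-1")
print(garbled)  # Â£51.77


def repair_mojibake(text):
    """Undo a UTF-8-read-as-Latin-1 mix-up; return the text unchanged if it does not apply."""
    try:
        return text.encode("latin-1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


print(repair_mojibake(garbled))  # £51.77
```

If the page is indeed UTF-8, decoding `response.content` as UTF-8 in the scraper would avoid the problem at the source.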

In [7]:

def scrape_quotes_data():
    url = "http://quotes.toscrape.com/page/1/"
    response = requests.get(url)

    if response.status_code == 200:
        html_content = response.text
        soup = BeautifulSoup(html_content, 'html.parser')
        quote_blocks = soup.find_all('div', class_='quote')  
        
        quotes_data = []
        for quote in quote_blocks:
            text = quote.find('span', class_='text').text.strip()
            author = quote.find('small', class_='author').text.strip()
            quotes_data.append({
                'text': text,
                'author': author,
            })

        return quotes_data
    else:
        print(f"Failed to fetch page, status code: {response.status_code}")
        return None
    

quote_data = scrape_quotes_data()
print("Raw quote data scraped for Bronze layer.")
print(quote_data[:3])  # Displaying first 3 items to check the data

Raw quote data scraped for Bronze layer.
[{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein'}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling'}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein'}]


#### Expected output

- Raw quote data scraped for Bronze layer.
- [{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein'}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling'}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein'}]
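
The cells above keep the scraped records in memory only. To persist them in the `bronze` bucket in raw form, one option is to serialize the list of dictionaries to JSON and upload it unchanged. The following is a minimal sketch under that assumption; the helper name `save_raw_json` and the object name in the usage note are hypothetical. It only relies on the client exposing `put_object`, which the Minio client does.

```python
import json
from io import BytesIO


def save_raw_json(data, client, bucket_name, object_name):
    """Serialize scraped records to JSON and upload them without any cleaning."""
    # ensure_ascii=False keeps characters such as curly quotes readable in the file.
    payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
    client.put_object(
        bucket_name,
        object_name,
        BytesIO(payload),
        len(payload),  # length in bytes, not characters
        content_type="application/json",
    )
    return len(payload)
```

A call might look like `save_raw_json(quote_data, minio_client, 'bronze', 'quotes_data_bronze.json')`.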


### 3. Silver Layer: Clean and Process Data

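In this step, we clean and standardize the scraped data to prepare it for more advanced processing. For books, we strip the currency symbol from each price and convert it to a float, and we normalize the availability text. For quotes, we keep only entries that have both a text and an author. Every record also gets a timestamp recording when it was processed.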

In [8]:

def clean_books_data(books_data):
    cleaned_data = []
    timestamp = datetime.now().isoformat()
    for book in books_data:
        # Remove non-numeric characters except the decimal point
        price_str = re.sub(r'[^\d.]', '', book['price'])
        try:
            price = float(price_str)
        except ValueError:
            print(f"Could not convert price to float for book: {book['title']}")
            price = None  # Set price to None if conversion fails

        # Standardize availability field
        availability = book['availability'].replace('\n', '').strip()
        
        # Add timestamp metadata
        enriched_book = {
            'title': book['title'],
            'price': price,
            'availability': availability,
            'rating': book['rating'],
            'scrape_timestamp': timestamp
        }
        cleaned_data.append(enriched_book)
    return cleaned_data

cleaned_books_data = clean_books_data(book_data)
print("Silver layer data cleaned and standardized.")
print(cleaned_books_data[:3])  # Displaying first 3 cleaned items

Silver layer data cleaned and standardized.
[{'title': 'A Light in the Attic', 'price': 51.77, 'availability': 'In stock', 'rating': 'Three', 'scrape_timestamp': '2024-11-06T22:37:18.432391'}, {'title': 'Tipping the Velvet', 'price': 53.74, 'availability': 'In stock', 'rating': 'One', 'scrape_timestamp': '2024-11-06T22:37:18.432391'}, {'title': 'Soumission', 'price': 50.1, 'availability': 'In stock', 'rating': 'One', 'scrape_timestamp': '2024-11-06T22:37:18.432391'}]


#### Expected output

- Silver layer data cleaned and standardized.
- [{'title': 'A Light in the Attic', 'price': 51.77, 'availability': 'In stock', 'rating': 'Three', 'scrape_timestamp': '2024-11-06T22:09:19.228605'}, {'title': 'Tipping the Velvet', 'price': 53.74, 'availability': 'In stock', 'rating': 'One', 'scrape_timestamp': '2024-11-06T22:09:19.228605'}, {'title': 'Soumission', 'price': 50.1, 'availability': 'In stock', 'rating': 'One', 'scrape_timestamp': '2024-11-06T22:09:19.228605'}]
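
The `rating` field is still a word such as `'Three'` after cleaning. If numeric ratings are wanted downstream, one possible further standardization is to map the words to integers. This is a sketch, not part of the notebook's pipeline: it assumes the site only uses the words One through Five, and the names `RATING_WORDS` and `rating_to_int` are hypothetical.

```python
# Assumed vocabulary of rating words; anything else maps to None.
RATING_WORDS = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}


def rating_to_int(rating):
    """Map a rating word such as 'Three' to 3; return None for unknown values."""
    return RATING_WORDS.get(rating)


sample = [
    {"title": "A Light in the Attic", "rating": "Three"},
    {"title": "Tipping the Velvet", "rating": "One"},
]
for book in sample:
    print(book["title"], rating_to_int(book["rating"]))
```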


In [9]:

def clean_quotes_data(quotes_data):
    cleaned_data = []
    timestamp = datetime.now().isoformat()
    for quote in quotes_data:
        # Only keep entries with 'text' and 'author' fields
        if 'text' in quote and 'author' in quote:
            cleaned_quote = {
                'text': quote['text'],
                'author': quote['author'],
                'scrape_timestamp': timestamp
            }
            cleaned_data.append(cleaned_quote)
    return cleaned_data

cleaned_quotes_data = clean_quotes_data(quote_data)
print("Silver layer data cleaned and standardized.")
print(cleaned_quotes_data[:3])  # Displaying first 3 cleaned items

Silver layer data cleaned and standardized.
[{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:37:24.370208'}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling', 'scrape_timestamp': '2024-11-06T22:37:24.370208'}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:37:24.370208'}]


#### Expected output

- Silver layer data cleaned and standardized.
- [{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:11:29.852715'}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling', 'scrape_timestamp': '2024-11-06T22:11:29.852715'}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:11:29.852715'}]


### 4. Silver Layer: Save Cleaned Data to Parquet

Now that the data is cleaned, we'll save it to the Silver layer as Parquet files. Parquet is a columnar format that is efficient for both storage and processing.

In [10]:
def save_data_to_minio_parquet(data, minio_client, bucket_name, object_name):
    # Convert data to a pandas DataFrame
    df = pd.DataFrame(data)
    
    # Save DataFrame as Parquet to BytesIO
    parquet_data = BytesIO()
    df.to_parquet(parquet_data, engine='pyarrow', index=False)
    parquet_data.seek(0)  # Reset pointer to the start of the file

    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)
    
    minio_client.put_object(
        bucket_name, object_name, parquet_data, len(parquet_data.getvalue())
    )
    print(f"Data saved successfully as {object_name} in bucket '{bucket_name}'.")

if book_data:
    cleaned_books_data = clean_books_data(book_data)
    save_data_to_minio_parquet(cleaned_books_data, minio_client, 'silver', f'books_data_silver_{datetime.now().strftime("%Y%m%d")}.parquet')

if quote_data:
    cleaned_quotes_data = clean_quotes_data(quote_data)
    save_data_to_minio_parquet(cleaned_quotes_data, minio_client, 'silver', f'quotes_data_silver_{datetime.now().strftime("%Y%m%d")}.parquet')

Data saved successfully as books_data_silver_20241106.parquet in bucket 'silver'.
Data saved successfully as quotes_data_silver_20241106.parquet in bucket 'silver'.
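
To confirm that an upload worked, the object can be downloaded again and inspected. The sketch below fetches an object's bytes through the client's `get_object` call and releases the connection afterwards. The helper name `read_object_bytes` is hypothetical.

```python
def read_object_bytes(client, bucket_name, object_name):
    """Download one object and return its raw bytes."""
    response = client.get_object(bucket_name, object_name)
    try:
        return response.read()
    finally:
        # Close the response and return the HTTP connection to the pool.
        response.close()
        response.release_conn()
```

With the notebook's imports, `pd.read_parquet(BytesIO(read_object_bytes(minio_client, 'silver', object_name)))` should load the saved Silver data back into a DataFrame.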


### 5. Gold Layer: Enrich Data with Sentiment Analysis and Price Category

In this step, we enrich the Silver layer data in two ways: each book gets a price category based on its price, and each quote gets a sentiment score. This adds value to the data and allows us to perform more advanced analytics on it.

In [12]:

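def add_price_category(books_data):
    enriched_data = []
    for book in books_data:
        enriched_book = book.copy()  # Leave the Silver records unmodified
        price = book['price']
        if price is None:
            # The Silver step sets price to None when parsing fails
            enriched_book['price_category'] = 'unknown'
        elif price < 10:
            enriched_book['price_category'] = 'cheap'
        elif price < 20:
            enriched_book['price_category'] = 'moderate'
        else:
            enriched_book['price_category'] = 'expensive'
        enriched_data.append(enriched_book)
    return enriched_data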

def add_sentiment_analysis(quotes_data):
    enriched_data = []
    for quote in quotes_data:
        sentiment = TextBlob(quote['text']).sentiment.polarity  # Sentiment value between -1 and 1
        enriched_quote = quote.copy()
        enriched_quote['sentiment'] = sentiment
        enriched_data.append(enriched_quote)
    return enriched_data


gold_books_data = add_price_category(cleaned_books_data)
gold_quotes_data = add_sentiment_analysis(cleaned_quotes_data)
print("Gold layer data enriched with sentiment analysis.")
print(gold_quotes_data[:3])  # Displaying first 3 enriched items

Gold layer data enriched with sentiment analysis.
[{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:37:31.415431', 'sentiment': 0.0}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling', 'scrape_timestamp': '2024-11-06T22:37:31.415431', 'sentiment': 0.3}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:37:31.415431', 'sentiment': 0.0037878787878787845}]


#### Expected output:
- Gold layer data enriched with sentiment analysis.
- [{'text': '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:28:36.654356', 'sentiment': 0.0}, {'text': '“It is our choices, Harry, that show what we truly are, far more than our abilities.”', 'author': 'J.K. Rowling', 'scrape_timestamp': '2024-11-06T22:28:36.654356', 'sentiment': 0.3}, {'text': '“There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle.”', 'author': 'Albert Einstein', 'scrape_timestamp': '2024-11-06T22:28:36.654356', 'sentiment': 0.0037878787878787845}]
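
The polarity score is a float between -1 and 1, which can be hard to use directly in a report. One option is to bucket it into labels. The sketch below is illustrative only: the function name `sentiment_label` and the 0.05 threshold are assumptions, not values taken from TextBlob or from this notebook.

```python
def sentiment_label(polarity, threshold=0.05):
    """Bucket a polarity score in [-1, 1] into a coarse label.

    The threshold is an arbitrary choice for this sketch.
    """
    if polarity > threshold:
        return "positive"
    if polarity < -threshold:
        return "negative"
    return "neutral"


# Polarity values taken from the three quotes shown above.
for polarity in (0.0, 0.3, 0.0037878787878787845):
    print(polarity, sentiment_label(polarity))
```

With this threshold, the three quotes shown above come out as neutral, positive, and neutral.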


### 6. Gold Layer: Save Enriched Data to CSV
Finally, we will save the enriched data to the Gold layer as a CSV file for easy reporting and downstream analysis.

In [13]:

def save_data_to_minio_csv(data, minio_client, bucket_name, object_name):
    # Convert data to a pandas DataFrame
    df = pd.DataFrame(data)
    
    # Serialize the DataFrame to CSV and encode it as UTF-8 bytes
    csv_bytes = df.to_csv(index=False).encode('utf-8')

    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)
    
    # put_object expects the length in bytes, not characters; the two differ
    # for non-ASCII text such as the curly quotes in the quotes data
    minio_client.put_object(
        bucket_name, object_name, BytesIO(csv_bytes), len(csv_bytes)
    )
    print(f"Data saved successfully as {object_name} in bucket '{bucket_name}'.")


# Gold level data with added analysis (CSV)
if cleaned_books_data:
    gold_books_data = add_price_category(cleaned_books_data)
    save_data_to_minio_csv(gold_books_data, minio_client, 'gold', f'books_data_gold_{datetime.now().strftime("%Y%m%d")}.csv')

if cleaned_quotes_data:
    gold_quotes_data = add_sentiment_analysis(cleaned_quotes_data)
    save_data_to_minio_csv(gold_quotes_data, minio_client, 'gold', f'quotes_data_gold_{datetime.now().strftime("%Y%m%d")}.csv')


Data saved successfully as books_data_gold_20241106.csv in bucket 'gold'.
Data saved successfully as quotes_data_gold_20241106.csv in bucket 'gold'.
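
One detail to watch when uploading text: `put_object` takes the length of the data in bytes, and for text with non-ASCII characters the UTF-8 byte count is larger than the character count. The quotes data contains curly quotation marks, which take three bytes each in UTF-8. A minimal sketch of the difference, with a hypothetical helper `to_upload_payload` that encodes once and reports the byte length:

```python
from io import BytesIO


def to_upload_payload(text):
    """Encode text once and return (stream, length_in_bytes) for an upload call."""
    encoded = text.encode("utf-8")
    return BytesIO(encoded), len(encoded)


text = "“It is our choices, Harry”"
stream, byte_length = to_upload_payload(text)
# Each curly quote is one character but three bytes in UTF-8.
print(len(text), byte_length)  # 26 30
```

Passing the byte length keeps the declared size consistent with the stream that is actually sent.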
