In [2]:
# Cell 1: Install package from GitHub
%pip install "git+https://github.com/54F-A/library-pipeline.git@fabric"

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 15, Finished, Available, Finished)

Collecting git+https://github.com/54F-A/library-pipeline.git@fabric
  Cloning https://github.com/54F-A/library-pipeline.git (to revision fabric) to /tmp/pip-req-build-9tzh5i3m
  Running command git clone --filter=blob:none --quiet https://github.com/54F-A/library-pipeline.git /tmp/pip-req-build-9tzh5i3m
  Running command git checkout -b fabric --track origin/fabric
  Switched to a new branch 'fabric'
  branch 'fabric' set up to track 'origin/fabric'.
  Resolved https://github.com/54F-A/library-pipeline.git to commit ac2babd3df2f4a0b75239c0ac86b2cd45704fe89
  Installing build dependencies ... [?25l- \ | / - \ done
[?25h  Getting requirements to build wheel ... [?25l- \ done
[?25h  Preparing metadata (pyproject.toml) ... [?25l- \ done
[?25hBuilding wheels for collected packages: library-pipeline
  Building wheel for library-pipeline (pyproject.toml) ... [?25l- \ | done
[?25h  Created wheel for library-pipeline: filename=library_pipeline-0.1.0-py3-none-an

In [3]:
# Cell 2: Import your functions
from data_processing.ingestion import load_csv, load_json
from data_processing.cleaning import (
    remove_duplicates, 
    handle_missing_values, 
    standardize_dates
)

print("✅ Package installed and imported successfully!")

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 17, Finished, Available, Finished)

✅ Package installed and imported successfully!


In [4]:
# Cell 3: Load data from Lakehouse Files
import pandas as pd

# Read CSV from Files
file_path = "/lakehouse/default/Files/bronze/circulation_data.csv"
df_raw = pd.read_csv(file_path)

print(f"Loaded {len(df_raw)} rows")
print(df_raw.head())

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 18, Finished, Available, Finished)

Loaded 5100 rows
  transaction_id member_id               isbn checkout_date return_date  \
0      TXN000000    M93810  978-0-433-21819-7    2024-08-17  2024-08-25   
1      TXN000001    M28289  978-0-338-90838-4    2024-08-15  2024-09-02   
2      TXN000002    M21395  978-1-02-654235-4    2023-12-27         NaN   
3      TXN000003    M38657  978-0-559-40781-9    2025-09-24         NaN   
4      TXN000004    M36062  978-0-8495-9310-9    2025-02-08  2025-02-16   

  branch_id  
0     BR012  
1     BR011  
2     BR001  
3     BR010  
4     BR012  


In [5]:
# Cell 4: Apply your cleaning functions (BRONZE → SILVER)
print("Applying data cleaning pipeline...")

# Remove duplicates
df_clean = remove_duplicates(df_raw, subset=['transaction_id'])
print(f"After removing duplicates: {len(df_clean)} rows")

# Handle missing values
df_clean = handle_missing_values(df_clean, strategy='drop')
print(f"After handling missing values: {len(df_clean)} rows")

# Standardize dates
df_clean = standardize_dates(df_clean, ['checkout_date', 'return_date'])
print("Dates standardized")

print(f"\n✅ Cleaning complete! {len(df_raw)} → {len(df_clean)} rows")

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 19, Finished, Available, Finished)

Applying data cleaning pipeline...
After removing duplicates: 5000 rows
After handling missing values: 4227 rows
Dates standardized

✅ Cleaning complete! 5100 → 4227 rows


In [6]:
# Cell 5: Save as Delta table (SILVER layer)
# Convert pandas to Spark DataFrame
df_spark = spark.createDataFrame(df_clean)

# Write as Delta table
table_name = "silver_circulation"
df_spark.write.format("delta").mode("overwrite").saveAsTable(table_name)

print(f"✅ Created Delta table: {table_name}")

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 20, Finished, Available, Finished)

✅ Created Delta table: silver_circulation


In [7]:
# Cell 6: Query the Delta table
query = f"""
SELECT 
    COUNT(*) as total_transactions,
    COUNT(DISTINCT member_id) as unique_members,
    COUNT(DISTINCT isbn) as unique_books,
    COUNT(DISTINCT branch_id) as branches
FROM {table_name}
"""

result = spark.sql(query)
result.show()

print("✅ Silver layer ready for analysis!")

StatementMeta(, 8088fb67-3921-46fa-9fc9-723175d5ed4c, 21, Finished, Available, Finished)

+------------------+--------------+------------+--------+
|total_transactions|unique_members|unique_books|branches|
+------------------+--------------+------------+--------+
|              4227|          4127|        4227|      30|
+------------------+--------------+------------+--------+

✅ Silver layer ready for analysis!
