In [None]:
import io
import os

import pandas as pd
from azure.storage.blob import BlobServiceClient

# Configuration
account_name = "retailstorage11"
# The storage key is read from the environment so the secret is never written
# into (or committed together with) the notebook.
account_key = os.environ.get("AZURE_STORAGE_KEY")
container_name = "retail"

if not account_key:
    raise RuntimeError(
        "Azure storage key not found: set the AZURE_STORAGE_KEY environment "
        "variable before running this notebook."
    )

print("🔄 Attempting to connect to Azure Blob Storage...")

try:
    # Create blob service client
    account_url = f"https://{account_name}.blob.core.windows.net"
    print(f"📍 Account URL: {account_url}")

    blob_service_client = BlobServiceClient(account_url=account_url, credential=account_key)

    # Container client reused by the later save/load cells
    container_client = blob_service_client.get_container_client(container_name)
    print(f"📂 Container: {container_name}")

    # Listing the blobs is the first call that needs the service to answer,
    # so it doubles as the connectivity check.
    print("🔍 Listing blobs...")
    blob_list = list(container_client.list_blobs())

    print(f"✅ Successfully connected! Found {len(blob_list)} files:")

    if not blob_list:
        print("📭 No files found in the container")
    else:
        # Show first 10 files only, to keep the cell output short
        for position, blob in enumerate(blob_list[:10], start=1):
            print(f"  {position}. {blob.name} ({blob.size} bytes)")

        if len(blob_list) > 10:
            print(f"  ... and {len(blob_list) - 10} more files")

except Exception as e:
    print(f"❌ Error: {e}")
    print(f"Error type: {type(e).__name__}")
    # Re-raise so a "Run All" stops here instead of continuing into cells
    # that need a working container_client.
    raise


In [None]:
# Convert types and clean data
#
# Each frame is narrowed to the columns that are kept and its key columns are
# cast to int, so that the later merges compare keys of the same dtype.
# astype() with a column -> dtype mapping returns a new frame, so the source
# frames are not modified through a view.
transaction_columns = ['transaction_id', 'customer_id', 'product_id', 'store_id', 'quantity', 'transaction_date']
df_transaction = df_transaction[transaction_columns].astype({
    'transaction_id': 'int',
    'customer_id': 'int',
    'product_id': 'int',
    'store_id': 'int',
    'quantity': 'int',
})
# Keep only the calendar date; any time-of-day component is discarded.
df_transaction['transaction_date'] = pd.to_datetime(df_transaction['transaction_date']).dt.date

products_df = products_df[['product_id', 'product_name', 'category', 'price']].astype({
    'product_id': 'int',
    'price': 'float',
})

df_store = df_store[['store_id', 'store_name', 'location']].astype({'store_id': 'int'})

# customer_id is cast like every other join key (it was the only one left in
# its source dtype) and the cast happens before de-duplication so duplicates
# are detected on the normalised key. Like the casts above, this raises if
# the column holds values that cannot be converted to int.
customers_df = (
    customers_df[['customer_id', 'first_name', 'last_name', 'email', 'city', 'registration_date']]
    .astype({'customer_id': 'int'})
    .drop_duplicates(subset=['customer_id'])
)

In [None]:
# Join all data
#
# Every transaction is enriched with its customer, product and store
# attributes. The joins are inner joins (the merge default, spelled out here),
# so a transaction without a match in any of the three lookup frames is
# dropped. total_amount is the line value: quantity times unit price.
df_silver = (
    df_transaction
    .merge(customers_df, how="inner", on="customer_id")
    .merge(products_df, how="inner", on="product_id")
    .merge(df_store, how="inner", on="store_id")
    .assign(total_amount=lambda joined: joined['quantity'] * joined['price'])
)

In [None]:
# Save to ADLS silver layer
silver_blob_name = "silver/cleaned_transactions/cleaned_transactions.parquet"

try:
    print("💾 Saving cleaned data to silver layer...")

    # Serialise the DataFrame to parquet entirely in memory.
    parquet_buffer = io.BytesIO()
    df_silver.to_parquet(parquet_buffer, index=False)

    # Upload to blob storage, replacing any previous version of the file.
    # getvalue() returns the whole buffer regardless of the current stream
    # position, so no seek is required first.
    blob_client = container_client.get_blob_client(silver_blob_name)
    blob_client.upload_blob(parquet_buffer.getvalue(), overwrite=True)

    print(f"✅ Successfully saved to {silver_blob_name}")
    print(f"   Records saved: {df_silver.shape[0]}")

except Exception as e:
    print(f"❌ Error saving to silver layer: {e}")
    # Re-raise so the notebook stops here; otherwise the following cells
    # would read a missing or outdated silver file.
    raise

In [None]:
# Read the silver dataset we just created
silver_blob_name = "silver/cleaned_transactions/cleaned_transactions.parquet"

try:
    print("📖 Reading silver dataset...")

    # Download the whole parquet file into memory
    blob_client = container_client.get_blob_client(silver_blob_name)
    blob_data = blob_client.download_blob().readall()

    # Create the silver dataset DataFrame from the downloaded bytes
    retail_silver_cleaned = pd.read_parquet(io.BytesIO(blob_data))

    print("✅ Successfully loaded silver dataset")
    print(f"   Shape: {retail_silver_cleaned.shape}")
    print(f"   Columns: {list(retail_silver_cleaned.columns)}")
    print("\nFirst 5 rows:")
    print(retail_silver_cleaned.head())

except Exception as e:
    print(f"❌ Error reading silver dataset: {e}")
    # Re-raise so retail_silver_cleaned is never left undefined (or stale
    # from an earlier run) for the cells that use it afterwards.
    raise

In [None]:
# Read all data from retail_silver_cleaned
print("📊 Displaying all data from retail_silver_cleaned:")
print("=" * 60)

# Show basic info
print(f"Total Records: {retail_silver_cleaned.shape[0]}")
print(f"Total Columns: {retail_silver_cleaned.shape[1]}")
print(f"Columns: {list(retail_silver_cleaned.columns)}")

# NOTE(review): with pandas' default display options a long frame is printed
# truncated (first and last rows only), so this may not literally show every
# row — confirm whether that is acceptable for this cell.
print("\n📋 All Data:")
print(retail_silver_cleaned)

# Optional: Show data types and info
print("\n📈 Data Types:")
print(retail_silver_cleaned.dtypes)

print("\n📊 Data Info:")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# must not be wrapped in print() (that would emit a stray "None" line).
retail_silver_cleaned.info()

In [None]:
# Load cleaned transactions from Silver layer
#
# silver_df is an independent copy of retail_silver_cleaned, so later changes
# to silver_df leave the loaded frame untouched.
silver_df = retail_silver_cleaned.copy()

# Summary of the copy: shape, column names and a five-row preview, emitted
# one item per line.
print(
    "📊 Loaded cleaned transactions from Silver layer:",
    "=" * 50,
    f"Shape: {silver_df.shape}",
    f"Columns: {list(silver_df.columns)}",
    "\nFirst 5 rows:",
    silver_df.head(),
    sep="\n",
)