In [13]:
import boto3
import pandas as pd
import io
from botocore.exceptions import NoCredentialsError

# MinIO Configuration
minio_endpoint = 'http://localhost:9000'
minio_access_key = 'minioadmin'
minio_secret_key = 'minioadmin'
processed_bucket = 'processed'
analytics_bucket = 'analytics'

# Connect to MinIO
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key
)

# Function to List Files in a Bucket
def list_files(bucket_name):
    try:
        print(f"Listing files in bucket: {bucket_name}")
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        if 'Contents' in response:
            files = [file['Key'] for file in response['Contents']]
            print(f"Files found: {files}")
            return files
        else:
            print(f"No files found in bucket: {bucket_name}")
            return []
    except Exception as e:
        print(f"Error listing files: {e}")
        return []

# Function to Read Parquet Files from MinIO
def read_parquet_from_minio(bucket_name, key):
    try:
        print(f"Reading {key} from bucket {bucket_name}")
        response = s3_client.get_object(Bucket=bucket_name, Key=key)
        return pd.read_parquet(io.BytesIO(response['Body'].read()))
    except Exception as e:
        print(f"Error reading {key}: {e}")
        return None

# Function to Write DataFrame to MinIO
def write_parquet_to_minio(df, bucket_name, key):
    try:
        print(f"Writing {key} to bucket {bucket_name}")
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False)
        buffer.seek(0)
        s3_client.put_object(Bucket=bucket_name, Key=key, Body=buffer.getvalue())
        print(f"Successfully wrote {key} to bucket {bucket_name}")
    except Exception as e:
        print(f"Error writing {key}: {e}")

# Function to Generate Aggregations
def generate_analytics():
    try:
        # List processed files
        processed_files = list_files(processed_bucket)
        if not processed_files:
            print("No processed files available for analytics.")
            return

        # Example 1: Customer Sales Summary
        if "Invoice_cleaned.parquet" in processed_files:
            invoice_data = read_parquet_from_minio(processed_bucket, "Invoice_cleaned.parquet")
            customer_summary = invoice_data.groupby("CustomerId").agg({
                "Total": "sum",  # Total sales
                "InvoiceId": "count"  # Number of invoices
            }).reset_index()
            customer_summary.columns = ["CustomerId", "TotalSales", "InvoiceCount"]
            write_parquet_to_minio(customer_summary, analytics_bucket, "customer_sales_summary.parquet")




        # Example 3: Monthly Sales Trends
        if "Invoice_cleaned.parquet" in processed_files:
            invoice_data = read_parquet_from_minio(processed_bucket, "Invoice_cleaned.parquet")
            invoice_data['InvoiceDate'] = pd.to_datetime(invoice_data['InvoiceDate'])
            invoice_data['YearMonth'] = invoice_data['InvoiceDate'].dt.to_period('M')
            monthly_sales = invoice_data.groupby("YearMonth").agg({"Total": "sum"}).reset_index()
            monthly_sales.columns = ["YearMonth", "TotalSales"]
            write_parquet_to_minio(monthly_sales, analytics_bucket, "monthly_sales_trends.parquet")

        print("Analytics generation completed successfully.")

    except Exception as e:
        print(f"Error generating analytics: {e}")

# Main Workflow
if __name__ == "__main__":
    print("Starting analytics generation...")
    generate_analytics()
    print("Analytics generation completed successfully!")


Starting analytics generation...
Listing files in bucket: processed
Files found: ['Album_cleaned.parquet', 'Artist_cleaned.parquet', 'Customer_cleaned.parquet', 'Employee_cleaned.parquet', 'Genre_cleaned.parquet', 'InvoiceLine_cleaned.parquet', 'Invoice_cleaned.parquet', 'MediaType_cleaned.parquet', 'PlaylistTrack_cleaned.parquet', 'Playlist_cleaned.parquet', 'Track_cleaned.parquet']
Reading Invoice_cleaned.parquet from bucket processed
Writing customer_sales_summary.parquet to bucket analytics
Successfully wrote customer_sales_summary.parquet to bucket analytics
Reading Invoice_cleaned.parquet from bucket processed
Writing monthly_sales_trends.parquet to bucket analytics
Successfully wrote monthly_sales_trends.parquet to bucket analytics
Analytics generation completed successfully.
Analytics generation completed successfully!


In [None]:
# Example: Read and display customer_sales_summary.parquet
customer_data = read_parquet_from_minio(analytics_bucket, "customer_sales_summary.parquet")
print(customer_data.head())

# Example: Read and display monthly_sales_trends.parquet
monthly_data = read_parquet_from_minio(analytics_bucket, "monthly_sales_trends.parquet")
print(monthly_data.head())
