# Cloud Storage Integration Example

This notebook demonstrates how to use Google Cloud Storage with the `colab_env` environment for data storage and retrieval.

## Prerequisites
1. Environment activated: `mamba activate colab_env`
2. Google Cloud authentication completed
3. PROJECT_ID environment variable set
4. At least one Cloud Storage bucket created

## Setup Instructions
```bash
# In terminal:
export PROJECT_ID="your-project-id"
gcloud auth application-default login
gsutil mb gs://your-unique-bucket-name/
```

## Setup and Authentication

In [None]:
import os
import pandas as pd
import numpy as np
from google.cloud import storage
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json
import io

# Set up project ID
PROJECT_ID = os.environ.get('PROJECT_ID')
if not PROJECT_ID:
    PROJECT_ID = input("Enter your Google Cloud Project ID: ")

print(f"Using project: {PROJECT_ID}")

# Initialize Cloud Storage client
client = storage.Client(project=PROJECT_ID)
print("Cloud Storage client initialized successfully")

## Bucket Operations

In [None]:
# List all buckets in your project
print("Available buckets:")
buckets = []
try:
    for bucket in client.list_buckets():
        buckets.append(bucket.name)
        print(f"  - {bucket.name} (created: {bucket.time_created.strftime('%Y-%m-%d')})")
    
    if not buckets:
        print("  No buckets found. You may need to create one first.")
        print("  Use: gsutil mb gs://your-unique-bucket-name/")
except Exception as e:
    print(f"Error listing buckets: {e}")
    buckets = []

In [None]:
# Select or create a bucket for this example
if buckets:
    BUCKET_NAME = buckets[0]  # Use first available bucket
    print(f"Using existing bucket: {BUCKET_NAME}")
else:
    # No bucket found: propose a name (the creation call below is commented out)
    BUCKET_NAME = f"{PROJECT_ID}-data-analysis-{datetime.now().strftime('%Y%m%d')}"
    print(f"No bucket found. Proposed bucket name: {BUCKET_NAME}")
    
    # Uncomment to create bucket:
    # try:
    #     bucket = client.create_bucket(BUCKET_NAME, location="US")
    #     print(f"Bucket {bucket.name} created successfully")
    # except Exception as e:
    #     print(f"Error creating bucket: {e}")
    #     BUCKET_NAME = input("Enter an existing bucket name: ")

bucket = client.bucket(BUCKET_NAME)
print(f"Working with bucket: {BUCKET_NAME}")

## Creating Sample Data

In [None]:
# Create sample financial data for demonstration
np.random.seed(42)

# Sample stock data
dates = pd.date_range(start='2023-01-01', end='2024-01-01', freq='D')
symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN']

stock_data = []
for symbol in symbols:
    base_price = np.random.uniform(100, 300)
    prices = []
    current_price = base_price
    
    for date in dates:
        # Simulate price movement
        daily_change = np.random.normal(0, 0.02)  # 2% daily volatility
        current_price *= (1 + daily_change)
        prices.append(current_price)
        
        stock_data.append({
            'date': date,
            'symbol': symbol,
            'price': current_price,
            'volume': np.random.randint(1000000, 50000000),
            'market_cap': current_price * np.random.uniform(1e9, 3e12)
        })

df_stocks = pd.DataFrame(stock_data)
print(f"Created sample dataset with {len(df_stocks)} rows")
print("\nSample data:")
df_stocks.head(10)

## Uploading Data to Cloud Storage

In [None]:
# Method 1: Upload CSV file
csv_data = df_stocks.to_csv(index=False)
blob_csv = bucket.blob('data/stock_data.csv')

try:
    blob_csv.upload_from_string(csv_data, content_type='text/csv')
    print(f"✅ CSV uploaded to gs://{BUCKET_NAME}/data/stock_data.csv")
    print(f"   Size: {len(csv_data):,} bytes")
except Exception as e:
    print(f"❌ Error uploading CSV: {e}")

In [None]:
# Method 2: Upload Parquet file (more compact for large datasets; needs pyarrow or fastparquet)
parquet_buffer = io.BytesIO()
df_stocks.to_parquet(parquet_buffer, index=False)
parquet_data = parquet_buffer.getvalue()

blob_parquet = bucket.blob('data/stock_data.parquet')

try:
    blob_parquet.upload_from_string(parquet_data, content_type='application/octet-stream')
    print(f"✅ Parquet uploaded to gs://{BUCKET_NAME}/data/stock_data.parquet")
    print(f"   Size: {len(parquet_data):,} bytes")
    print(f"   Parquet is {len(csv_data) / len(parquet_data):.1f}x smaller than the CSV")
except Exception as e:
    print(f"❌ Error uploading Parquet: {e}")

In [None]:
# Method 3: Upload JSON metadata
metadata = {
    'dataset_name': 'sample_stock_data',
    'created_date': datetime.now().isoformat(),
    'symbols': symbols,
    'date_range': {
        'start': dates[0].isoformat(),
        'end': dates[-1].isoformat()
    },
    'record_count': len(df_stocks),
    'columns': list(df_stocks.columns)
}

blob_metadata = bucket.blob('metadata/stock_data_metadata.json')

try:
    blob_metadata.upload_from_string(
        json.dumps(metadata, indent=2), 
        content_type='application/json'
    )
    print(f"✅ Metadata uploaded to gs://{BUCKET_NAME}/metadata/stock_data_metadata.json")
except Exception as e:
    print(f"❌ Error uploading metadata: {e}")

## Listing and Managing Files

In [None]:
# List all files in the bucket
print(f"Files in bucket '{BUCKET_NAME}':")
print("-" * 60)

try:
    blobs = list(bucket.list_blobs())
    
    if blobs:
        for blob in blobs:
            size_mb = blob.size / (1024 * 1024) if blob.size else 0
            print(f"📄 {blob.name}")
            print(f"   Size: {size_mb:.2f} MB")
            print(f"   Created: {blob.time_created.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"   Content-Type: {blob.content_type}")
            print()
    else:
        print("No files found in bucket")
        
except Exception as e:
    print(f"Error listing files: {e}")

## Downloading and Reading Data

In [None]:
# Method 1: Read CSV directly from Cloud Storage (pandas needs the gcsfs package for gs:// paths)
print("Reading CSV from Cloud Storage...")
try:
    df_from_csv = pd.read_csv(f'gs://{BUCKET_NAME}/data/stock_data.csv')
    print(f"✅ Successfully loaded {len(df_from_csv)} rows from CSV")
    print("First few rows:")
    display(df_from_csv.head())
except Exception as e:
    print(f"❌ Error reading CSV: {e}")

In [None]:
# Method 2: Read Parquet (faster for large files)
print("Reading Parquet from Cloud Storage...")
try:
    df_from_parquet = pd.read_parquet(f'gs://{BUCKET_NAME}/data/stock_data.parquet')
    print(f"✅ Successfully loaded {len(df_from_parquet)} rows from Parquet")
    
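    # Verify data integrity. CSV stores dates as text, so parse them before comparing;
    # assert_frame_equal tolerates float round-off and dtype differences.
    csv_check = df_from_csv.assign(date=pd.to_datetime(df_from_csv['date']))
    try:
        pd.testing.assert_frame_equal(df_from_parquet, csv_check, check_dtype=False)
        print("✅ Data integrity verified - CSV and Parquet match")
    except AssertionError:
        print("⚠️ Warning: CSV and Parquet data don't match")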
        
except Exception as e:
    print(f"❌ Error reading Parquet: {e}")

In [None]:
# Method 3: Download and read metadata
print("Reading metadata from Cloud Storage...")
try:
    blob_metadata = bucket.blob('metadata/stock_data_metadata.json')
    metadata_content = blob_metadata.download_as_text()
    metadata_loaded = json.loads(metadata_content)
    
    print("✅ Metadata loaded successfully:")
    for key, value in metadata_loaded.items():
        print(f"  {key}: {value}")
        
except Exception as e:
    print(f"❌ Error reading metadata: {e}")

## Data Analysis with Cloud Storage Data

In [None]:
# Perform analysis on the data loaded from Cloud Storage
df = df_from_parquet.copy()
df['date'] = pd.to_datetime(df['date'])

# Calculate returns
df = df.sort_values(['symbol', 'date'])
df['daily_return'] = df.groupby('symbol')['price'].pct_change()

# Summary statistics
print("Stock Performance Summary:")
print("=" * 50)

summary = df.groupby('symbol').agg({
    'price': ['first', 'last', 'min', 'max'],
    'daily_return': ['mean', 'std'],
    'volume': 'mean'
}).round(4)

summary.columns = ['Start_Price', 'End_Price', 'Min_Price', 'Max_Price', 
                  'Avg_Return', 'Volatility', 'Avg_Volume']

# Calculate total return
summary['Total_Return'] = ((summary['End_Price'] / summary['Start_Price']) - 1) * 100

display(summary)

In [None]:
# Visualize the data
plt.figure(figsize=(15, 10))

# Price trends
plt.subplot(2, 2, 1)
for symbol in symbols:
    symbol_data = df[df['symbol'] == symbol]
    plt.plot(symbol_data['date'], symbol_data['price'], label=symbol, linewidth=2)

plt.title('Stock Price Trends')
plt.xlabel('Date')
plt.ylabel('Price ($)')
plt.legend()
plt.grid(True, alpha=0.3)

# Returns distribution
plt.subplot(2, 2, 2)
for symbol in symbols:
    symbol_returns = df[df['symbol'] == symbol]['daily_return'].dropna()
    plt.hist(symbol_returns, alpha=0.6, bins=30, label=symbol)

plt.title('Daily Returns Distribution')
plt.xlabel('Daily Return')
plt.ylabel('Frequency')
plt.legend()
plt.grid(True, alpha=0.3)

# Volatility comparison
plt.subplot(2, 2, 3)
volatility_data = summary['Volatility'].sort_values(ascending=True)
colors = plt.cm.viridis(np.linspace(0, 1, len(volatility_data)))
bars = plt.barh(volatility_data.index, volatility_data.values, color=colors)
plt.title('Volatility Comparison')
plt.xlabel('Daily Return Standard Deviation')

# Risk vs Return scatter
plt.subplot(2, 2, 4)
plt.scatter(summary['Volatility'], summary['Total_Return'], 
           s=100, alpha=0.7, c=range(len(summary)), cmap='viridis')

for i, symbol in enumerate(summary.index):
    plt.annotate(symbol, 
                (summary.loc[symbol, 'Volatility'], summary.loc[symbol, 'Total_Return']),
                xytext=(5, 5), textcoords='offset points')

plt.title('Risk vs Return')
plt.xlabel('Volatility (Risk)')
plt.ylabel('Total Return (%)')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Saving Analysis Results

In [None]:
# Save analysis results back to Cloud Storage
results = {
    'analysis_date': datetime.now().isoformat(),
    'summary_statistics': summary.to_dict(),
    'best_performer': summary['Total_Return'].idxmax(),
    'worst_performer': summary['Total_Return'].idxmin(),
    'highest_volatility': summary['Volatility'].idxmax(),
    'lowest_volatility': summary['Volatility'].idxmin()
}

# Upload results
try:
    blob_results = bucket.blob(f'results/analysis_results_{datetime.now().strftime("%Y%m%d_%H%M")}.json')
    blob_results.upload_from_string(
        json.dumps(results, indent=2, default=str), 
        content_type='application/json'
    )
    print(f"✅ Analysis results saved to gs://{BUCKET_NAME}/{blob_results.name}")
except Exception as e:
    print(f"❌ Error saving results: {e}")

# Save the plot
plt.figure(figsize=(10, 6))
summary['Total_Return'].plot(kind='bar', color='skyblue')
plt.title('Total Returns by Stock')
plt.ylabel('Return (%)')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Save plot to buffer and upload
try:
    img_buffer = io.BytesIO()
    plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight')
    img_data = img_buffer.getvalue()
    
    blob_plot = bucket.blob(f'plots/returns_chart_{datetime.now().strftime("%Y%m%d_%H%M")}.png')
    blob_plot.upload_from_string(img_data, content_type='image/png')
    print(f"✅ Plot saved to gs://{BUCKET_NAME}/{blob_plot.name}")
    
    plt.show()
except Exception as e:
    print(f"❌ Error saving plot: {e}")

## Advanced Cloud Storage Features

In [None]:
# Inspect bucket settings (reload first: client.bucket() does not fetch metadata)
bucket.reload()
print("Bucket information and settings:")
print(f"Bucket name: {bucket.name}")
print(f"Location: {bucket.location}")
print(f"Storage class: {bucket.storage_class}")
print(f"Creation time: {bucket.time_created}")

# Check bucket size
total_size = sum(blob.size for blob in bucket.list_blobs() if blob.size)
print(f"Total bucket size: {total_size / (1024*1024):.2f} MB")

In [None]:
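# Generate signed URLs for sharing (optional)
# Note: signing needs credentials that carry a private key (e.g. a service account key);
# user credentials from `gcloud auth application-default login` usually cannot sign.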

print("Generating signed URLs for file sharing:")
try:
    # Generate URL valid for 1 hour
    blob_csv = bucket.blob('data/stock_data.csv')
    url = blob_csv.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=1),
        method="GET"
    )
    print(f"\n📎 Signed URL for CSV file (valid for 1 hour):")
    print(f"   {url[:100]}...")
    print("   ⚠️ This URL allows temporary access without authentication")
except Exception as e:
    print(f"❌ Error generating signed URL: {e}")

## Cleanup (Optional)

In [None]:
# Uncomment to delete test files (be careful!)
# cleanup = input("Delete test files? (yes/no): ")
# if cleanup.lower() == 'yes':
#     try:
#         blobs_to_delete = ['data/stock_data.csv', 'data/stock_data.parquet', 
#                           'metadata/stock_data_metadata.json']
#         
#         for blob_name in blobs_to_delete:
#             blob = bucket.blob(blob_name)
#             if blob.exists():
#                 blob.delete()
#                 print(f"🗑️ Deleted {blob_name}")
#                 
#         # Delete results and plots folders
#         for blob in bucket.list_blobs(prefix='results/'):
#             blob.delete()
#             print(f"🗑️ Deleted {blob.name}")
#             
#         for blob in bucket.list_blobs(prefix='plots/'):
#             blob.delete()
#             print(f"🗑️ Deleted {blob.name}")
#             
#         print("✅ Cleanup completed")
#     except Exception as e:
#         print(f"❌ Error during cleanup: {e}")
# else:
#     print("Files preserved")

print("\n📁 Current bucket contents:")
for blob in bucket.list_blobs():
    print(f"  📄 {blob.name}")

## Best Practices Summary

### 1. **File Organization**
- Use logical folder structure (`data/`, `results/`, `metadata/`, `plots/`)
- Include timestamps in filenames for versioning
- Use appropriate file formats (Parquet for large datasets, JSON for metadata)
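
As a minimal sketch of the timestamp convention above, the helper below builds object names in the same `%Y%m%d_%H%M` style used for the results and plots earlier in this notebook. The function name `timestamped_blob_name` and its parameters are hypothetical and not part of the Cloud Storage client.

```python
from datetime import datetime, timezone


def timestamped_blob_name(prefix, stem, ext, when=None):
    """Return an object name such as 'results/analysis_results_20240102_0304.json'."""
    # Default to the current UTC time so names sort consistently across machines
    when = when or datetime.now(timezone.utc)
    stamp = when.strftime("%Y%m%d_%H%M")
    # Normalize separators so callers may pass 'results/' or '.json'
    return f"{prefix.strip('/')}/{stem}_{stamp}.{ext.lstrip('.')}"


example = timestamped_blob_name("results/", "analysis_results", ".json",
                                when=datetime(2024, 1, 2, 3, 4))
print(example)
```

The returned string can be passed straight to `bucket.blob(...)`.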

### 2. **Performance**
- Use Parquet format for large datasets (better compression and faster reads)
- Read data directly from `gs://` paths when possible (pandas does this through `gcsfs`)
- Consider data partitioning for very large datasets
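
One possible way to partition the sample data is by symbol and month, using Hive-style `key=value` prefixes. The sketch below only computes the prefixes and groups rows; the layout, the root `data/stock_data`, and the helper names are illustrative assumptions.

```python
from collections import defaultdict
from datetime import date


def partition_prefix(symbol, day, root="data/stock_data"):
    """Return a Hive-style prefix, e.g. 'data/stock_data/symbol=AAPL/year=2023/month=01'."""
    return f"{root}/symbol={symbol}/year={day.year}/month={day.month:02d}"


def group_by_partition(records, root="data/stock_data"):
    """Group row dicts (with 'symbol' and 'date' keys) by their partition prefix."""
    groups = defaultdict(list)
    for row in records:
        groups[partition_prefix(row["symbol"], row["date"], root)].append(row)
    return dict(groups)


rows = [
    {"symbol": "AAPL", "date": date(2023, 1, 5), "price": 150.0},
    {"symbol": "AAPL", "date": date(2023, 1, 6), "price": 151.0},
    {"symbol": "MSFT", "date": date(2023, 2, 1), "price": 250.0},
]
partitions = group_by_partition(rows)
for prefix, chunk in partitions.items():
    print(prefix, len(chunk))
```

Each group could then be uploaded as its own Parquet object under its prefix. For DataFrames, `DataFrame.to_parquet` also accepts a `partition_cols` argument that produces a similar layout.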

### 3. **Cost Management**
- Set up lifecycle policies to automatically delete or archive old files
- Use appropriate storage classes (Standard, Nearline, Coldline, Archive)
- Monitor storage costs regularly
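
A lifecycle policy is expressed as a JSON document of rules, each pairing an action with a condition. The sketch below builds one such document; the age thresholds, the prefixes, and the helper name `build_lifecycle_config` are assumptions chosen for illustration and should be adapted to your retention needs.

```python
import json


def build_lifecycle_config(archive_after_days=30, delete_after_days=365,
                           delete_prefixes=("results/", "plots/")):
    """Build a lifecycle configuration in the JSON layout Cloud Storage expects."""
    return {
        "rule": [
            {
                # Move Standard objects to a colder storage class once they age
                "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
                "condition": {"age": archive_after_days,
                              "matchesStorageClass": ["STANDARD"]},
            },
            {
                # Delete old generated artifacts under the given prefixes
                "action": {"type": "Delete"},
                "condition": {"age": delete_after_days,
                              "matchesPrefix": list(delete_prefixes)},
            },
        ]
    }


config = build_lifecycle_config()
print(json.dumps(config, indent=2))
```

Saved to a file such as `lifecycle.json`, the output can be applied with `gsutil lifecycle set lifecycle.json gs://your-unique-bucket-name/`.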

### 4. **Security**
- Use IAM roles and permissions appropriately
- Keep signed URL lifetimes short: anyone holding the URL can read the object until it expires
- Never commit credentials to version control
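
To make the signed URL advice concrete, a small guard can reject lifetimes above a limit you choose before they reach `generate_signed_url`. This is a sketch: the helper `checked_expiration` and the one-hour default ceiling are assumptions, while the 7-day maximum is the documented limit for V4 signed URLs.

```python
from datetime import timedelta

# V4 signed URLs cannot be valid for longer than 7 days
MAX_V4_EXPIRATION = timedelta(days=7)


def checked_expiration(requested, ceiling=timedelta(hours=1)):
    """Validate a requested signed-URL lifetime against a project-chosen ceiling."""
    if requested <= timedelta(0):
        raise ValueError("expiration must be positive")
    if ceiling > MAX_V4_EXPIRATION:
        raise ValueError("ceiling exceeds the 7-day V4 limit")
    if requested > ceiling:
        raise ValueError(f"expiration {requested} exceeds the allowed ceiling {ceiling}")
    return requested


print(checked_expiration(timedelta(minutes=15)))
```

The returned value can be passed as the `expiration` argument in the signed URL cell above.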

### 5. **Integration**
- Combine with BigQuery for analysis of large datasets
- Use with Cloud Functions for automated processing
- Integrate with other GCP services as needed
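
For the BigQuery route, a load job can read the Parquet file uploaded earlier directly from its `gs://` URI. The sketch below assumes the `google-cloud-bigquery` package is installed and that the target dataset already exists; the helper names and any table ID you pass (for example `your-project-id.your_dataset.stock_data`) are hypothetical.

```python
def gcs_uri(bucket_name, blob_name):
    """Return the gs:// URI for an object."""
    return f"gs://{bucket_name}/{blob_name.lstrip('/')}"


def load_parquet_into_bigquery(project_id, bucket_name, blob_name, table_id):
    """Load one Parquet object from GCS into a BigQuery table and return its row count."""
    # Imported here so gcs_uri works without google-cloud-bigquery installed
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
    load_job = client.load_table_from_uri(
        gcs_uri(bucket_name, blob_name), table_id, job_config=job_config
    )
    load_job.result()  # Block until the load job finishes
    return client.get_table(table_id).num_rows


print(gcs_uri("your-unique-bucket-name", "data/stock_data.parquet"))
```

Calling `load_parquet_into_bigquery(PROJECT_ID, BUCKET_NAME, 'data/stock_data.parquet', table_id)` would start a load job in your project, so run it only when you intend to create or append to that table.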

## Next Steps

1. **Explore bucket policies**: Set up lifecycle management and access controls
2. **Automate workflows**: Use Cloud Functions or Cloud Run for scheduled data processing
3. **Scale up**: Work with larger datasets and implement data partitioning
4. **Integrate with BigQuery**: Load data from GCS to BigQuery for advanced analytics
5. **Set up monitoring**: Implement logging and monitoring for your data pipelines

## Useful Resources

- [Cloud Storage Documentation](https://cloud.google.com/storage/docs)
- [Cloud Storage Python Client](https://cloud.google.com/storage/docs/reference/libraries#client-libraries-install-python)
- [gsutil Command Reference](https://cloud.google.com/storage/docs/gsutil)
- [Cloud Storage Pricing](https://cloud.google.com/storage/pricing)