# 03 – Raw Data Storage

This notebook implements data storage strategies including:

1. **Data Compression**
   - Efficient storage formats (Parquet, compressed CSV)
   - Compression ratio analysis
   - Storage cost optimization

2. **Data Partitioning**
   - Time-based partitioning
   - Feature-based partitioning
   - Optimization for query patterns

3. **Data Catalog**
   - Automated metadata collection
   - Schema versioning
   - Data lineage tracking

4. **Storage Analytics**
   - Space usage monitoring
   - Access patterns analysis
   - Storage optimization recommendations

In [15]:
import pandas as pd
import numpy as np
import os
import sys
import json
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
import gzip
import shutil
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns

# Add project root to path
sys.path.insert(0, os.path.abspath('..'))

# Set up basic logging
import logging
logging.basicConfig(level=logging.INFO)

# Raw Data Storage Design

The raw layer stores ingested data in a structured but minimally transformed form.  To
support efficient ingestion and downstream processing the following
layout conventions are used:

* **Source partitioning** – Data is organised first by its origin.  There
  are separate top‑level folders for each source type, e.g. `source_csv` and
  `source_api`.  Additional sources (e.g. database dumps, SFTP imports)
  would appear as additional top‑level partitions.
* **Date partitioning** – Within each source folder data is stored in
  subdirectories labelled with the extraction date in `YYYYMMDD` format.
  This makes it easy to discover and process data from a particular
  ingestion run.
* **File naming** – Files under `source_csv/<date>/` retain their original
  names (`customers.csv`, `transactions.csv`, …).  Files under
  `source_api/<date>/` similarly retain their source names (e.g.
  `web_logs.jsonl`).  When ingested into the raw layer a timestamped
  prefix `ingested_<timestamp>_` is prepended to prevent name
  collisions across runs.

Example directory tree:

```
data/
  raw/
    source_csv/
      20250821/
        customers.csv
        transactions.csv
    source_api/
      20250821/
        web_logs.jsonl
    ingested_20250821_153000_customers.csv
    ingested_20250821_153000_transactions.csv
    ingested_20250821_153000_web_logs.jsonl
  logs/
    ingestion/
  validated/
    20250821/
  prepared/
  transformed/
```

Although this assignment uses a local filesystem, the same structure
would apply to cloud object stores (e.g., AWS S3 or GCS) with
appropriate bucket prefixes (e.g., `s3://company-data/raw/source_csv/…`).


In [16]:
import os
from pprint import pprint
raw_root = os.path.join('..', 'data', 'raw')
print('Raw directory contents:')
pprint(os.listdir(raw_root))

Raw directory contents:
['compressed',
 'ingested_20250821_185705_customers.csv',
 'ingested_20250821_185705_transactions.csv',
 'ingested_20250821_185705_web_logs.jsonl',
 'ingested_20250821_185841_customers.csv',
 'ingested_20250821_185841_transactions.csv',
 'ingested_20250821_185841_web_logs.jsonl',
 'ingested_20250824_142601_telco_train.csv',
 'ingested_20250824_142601_transactions.csv',
 'ingested_20250824_142601_web_logs.jsonl',
 'ingested_20250824_142608_telco_train.csv',
 'ingested_20250824_142608_transactions.csv',
 'ingested_20250824_142608_web_logs.jsonl',
 'ingested_20250824_142727_customers.csv',
 'ingested_20250824_142727_telco_train.csv',
 'ingested_20250824_142727_transactions.csv',
 'ingested_20250824_142727_web_logs.jsonl',
 'ingested_20250824_143055_customers.csv',
 'ingested_20250824_143055_transactions.csv',
 'ingested_20250824_143055_web_logs.jsonl',
 'partitioned',
 'source_api',
 'source_csv']


In [17]:
def compress_csv(input_path, output_path=None, compression_type='gzip'):
    """Compress CSV file and analyze compression ratio."""
    if output_path is None:
        output_path = input_path + '.gz'
    
    original_size = os.path.getsize(input_path)
    
    if compression_type == 'gzip':
        with open(input_path, 'rb') as f_in:
            with gzip.open(output_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
    
    compressed_size = os.path.getsize(output_path)
    compression_ratio = original_size / compressed_size
    
    return {
        'original_size': original_size,
        'compressed_size': compressed_size,
        'compression_ratio': compression_ratio,
        'output_path': output_path
    }

def convert_to_parquet(csv_path, output_path=None):
    """Convert CSV to Parquet format with optimization."""
    if output_path is None:
        output_path = csv_path.replace('.csv', '.parquet')
    
    # Read CSV
    df = pd.read_csv(csv_path)
    
    # Optimize categorical columns
    categorical_columns = df.select_dtypes(include=['object']).columns
    for col in categorical_columns:
        df[col] = pd.Categorical(df[col])
    
    # Convert to parquet with compression
    table = pa.Table.from_pandas(df)
    pq.write_table(table, output_path, compression='snappy')
    
    result = {
        'original_size': os.path.getsize(csv_path),
        'parquet_size': os.path.getsize(output_path),
        'compression_ratio': os.path.getsize(csv_path) / os.path.getsize(output_path)
    }
    
    return result

In [18]:
def create_partitioned_structure(df, partition_cols, base_path):
    """Create a partitioned data structure based on specified columns."""
    
    def get_partition_path(partition_values):
        """Generate partition path for given partition values."""
        parts = []
        if not isinstance(partition_values, tuple):
            partition_values = (partition_values,)
        for col, val in zip(partition_cols, partition_values):
            # Clean partition value
            val = str(val).replace('/', '_').replace('\\', '_')
            parts.append(f"{col}={val}")
        return os.path.join(*parts)
    
    # Ensure the base path exists
    os.makedirs(base_path, exist_ok=True)
    
    # Group and save data
    for partition_values, partition_df in df.groupby(partition_cols):
        # Create partition path
        partition_path = get_partition_path(partition_values)
        full_path = os.path.join(base_path, partition_path)
        os.makedirs(full_path, exist_ok=True)
        
        # Convert categorical columns to strings to avoid pyarrow extension type issues
        for col in partition_df.select_dtypes(include=['category']):
            partition_df[col] = partition_df[col].astype(str)
        
        # Save using pyarrow directly
        table = pa.Table.from_pandas(partition_df)
        pq.write_table(table, os.path.join(full_path, 'data.parquet'))
    
    return base_path

def analyze_partition_distribution(base_path):
    """Analyze the distribution of data across partitions."""
    partition_sizes = []
    
    for root, dirs, files in os.walk(base_path):
        for file in files:
            if file == 'data.parquet':
                file_path = os.path.join(root, file)
                size = os.path.getsize(file_path)
                relative_path = os.path.relpath(root, base_path)
                
                # Read parquet file using pyarrow directly
                parquet_file = pq.ParquetFile(file_path)
                num_rows = parquet_file.metadata.num_rows
                
                partition_sizes.append({
                    'partition': relative_path,
                    'size': size,
                    'records': num_rows
                })
    
    return pd.DataFrame(partition_sizes)

In [19]:
class DataCatalog:
    def __init__(self, catalog_path):
        self.catalog_path = catalog_path
        self.catalog = self._load_catalog()
    
    def _load_catalog(self):
        """Load existing catalog or create new one."""
        if os.path.exists(self.catalog_path):
            with open(self.catalog_path, 'r') as f:
                return json.load(f)
        return {'datasets': {}}
    
    def _save_catalog(self):
        """Save catalog to disk."""
        os.makedirs(os.path.dirname(self.catalog_path), exist_ok=True)
        with open(self.catalog_path, 'w') as f:
            json.dump(self.catalog, f, indent=2)
    
    def register_dataset(self, dataset_name, file_path, metadata=None):
        """Register a new dataset with metadata."""
        # Get file stats
        stats = os.stat(file_path)
        
        # Read sample of data for schema inference
        df_sample = None
        if file_path.endswith('.csv'):
            df_sample = pd.read_csv(file_path, nrows=1000)
        elif file_path.endswith('.parquet'):
            df_sample = pd.read_parquet(file_path)
        
        # Build metadata
        dataset_metadata = {
            'file_path': file_path,
            'size_bytes': stats.st_size,
            'created_at': datetime.fromtimestamp(stats.st_ctime).isoformat(),
            'modified_at': datetime.fromtimestamp(stats.st_mtime).isoformat(),
            'schema': {
                'columns': list(df_sample.columns),
                'dtypes': df_sample.dtypes.astype(str).to_dict()
            },
            'format': os.path.splitext(file_path)[1][1:],
            'row_count': len(df_sample) if df_sample is not None else None,
        }
        
        if metadata:
            dataset_metadata.update(metadata)
        
        self.catalog['datasets'][dataset_name] = dataset_metadata
        self._save_catalog()
    
    def get_dataset_info(self, dataset_name):
        """Get information about a registered dataset."""
        return self.catalog['datasets'].get(dataset_name)
    
    def list_datasets(self):
        """List all registered datasets."""
        return list(self.catalog['datasets'].keys())
    
    def generate_report(self):
        """Generate a summary report of the data catalog."""
        report = {
            'total_datasets': len(self.catalog['datasets']),
            'total_size_bytes': sum(d['size_bytes'] for d in self.catalog['datasets'].values()),
            'formats': {},
            'created_dates': [],
        }
        
        for dataset in self.catalog['datasets'].values():
            fmt = dataset['format']
            report['formats'][fmt] = report['formats'].get(fmt, 0) + 1
            report['created_dates'].append(dataset['created_at'][:10])  # Just the date part
        
        return report

In [20]:
# Initialize paths
raw_root = os.path.join('..', 'data', 'raw')
catalog_path = os.path.join(raw_root, 'catalog.json')
compressed_root = os.path.join(raw_root, 'compressed')
partitioned_root = os.path.join(raw_root, 'partitioned')

# Create directories
os.makedirs(compressed_root, exist_ok=True)
os.makedirs(partitioned_root, exist_ok=True)

# Initialize data catalog
catalog = DataCatalog(catalog_path)

# Process existing raw data
print("Processing raw data files...")
for filename in os.listdir(raw_root):
    if not filename.endswith('.csv'):
        continue
    
    input_path = os.path.join(raw_root, filename)
    print(f"\nProcessing {filename}:")
    
    # 1. Compress data
    print("Compressing data...")
    compressed_results = compress_csv(input_path, 
                                   os.path.join(compressed_root, f"{filename}.gz"))
    print(f"Compression ratio: {compressed_results['compression_ratio']:.2f}x")
    
    # 2. Convert to Parquet
    print("Converting to Parquet...")
    parquet_results = convert_to_parquet(input_path, 
                                       os.path.join(compressed_root, filename.replace('.csv', '.parquet')))
    print(f"Parquet compression ratio: {parquet_results['compression_ratio']:.2f}x")
    
    # 3. Create partitioned structure for customer data
    if 'customer' in filename.lower():
        print("Creating partitioned structure...")
        df = pd.read_csv(input_path)
        partition_path = os.path.join(partitioned_root, 'customers')
        create_partitioned_structure(df, ['contract', 'internet_service'], partition_path)
        
        # Analyze partition distribution
        partition_stats = analyze_partition_distribution(partition_path)
        print("\nPartition Distribution:")
        print(partition_stats)
    
    # 4. Register in catalog
    print("Registering in catalog...")
    catalog.register_dataset(
        dataset_name=filename,
        file_path=input_path,
        metadata={
            'compressed_path': compressed_results['output_path'],
            'parquet_path': parquet_results['output_path'] if 'parquet_path' in locals() else None,
            'compression_stats': {
                'original_size': compressed_results['original_size'],
                'compressed_size': compressed_results['compressed_size'],
                'compression_ratio': compressed_results['compression_ratio']
            }
        }
    )

# Generate and display catalog report
print("\nData Catalog Report:")
print("-" * 50)
report = catalog.generate_report()
print(f"Total Datasets: {report['total_datasets']}")
print(f"Total Size: {report['total_size_bytes'] / 1024 / 1024:.2f} MB")
print("\nFile Formats:")
for fmt, count in report['formats'].items():
    print(f"- {fmt}: {count} files")

Processing raw data files...

Processing ingested_20250821_185705_customers.csv:
Compressing data...
Compression ratio: 4.52x
Converting to Parquet...
Parquet compression ratio: 1.74x
Creating partitioned structure...

Partition Distribution:
                                           partition   size  records
0       contract=Month-to-month\internet_service=DSL  11786      129
1  contract=Month-to-month\internet_service=Fiber...  12174      144
2        contract=Month-to-month\internet_service=No   9680       52
3             contract=One year\internet_service=DSL   9757       56
4     contract=One year\internet_service=Fiber optic   9701       50
5              contract=One year\internet_service=No   8603       15
6             contract=Two year\internet_service=DSL   8882       24
7     contract=Two year\internet_service=Fiber optic   8835       21
8              contract=Two year\internet_service=No   8436        9
Registering in catalog...

Processing ingested_20250821_185705_tran