# Module 02 - Data Ingestion and Storage

## Overview

This module covers data ingestion and storage in Databricks. You'll learn how to work with DBFS, integrate with Azure Data Lake Storage Gen2, and read/write various file formats.

## Learning Objectives

By the end of this module, you will understand:
- Working with Databricks File System (DBFS)
- Mounting external storage (ADLS Gen2)
- Reading and writing different file formats (CSV, JSON, Parquet, etc.)
- Best practices for data storage in Databricks
- Working with large datasets efficiently


## Understanding DBFS (Databricks File System)

DBFS is a distributed file system mounted into a Databricks workspace and available on Databricks clusters. It's an abstraction layer over cloud storage that makes it easy to work with files.

### DBFS Structure

- `/` - Root directory
- `/FileStore` - Files uploaded through UI or generated outputs
- `/databricks` - System files and libraries
- `/tmp` - Temporary files
- `/mnt` - Mount points for external storage (ADLS, S3, etc.)
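
The same DBFS location can be written in two ways: Spark and `dbutils` take a `dbfs:/...` URI (or a bare `/...` path), while local file APIs such as pandas go through the `/dbfs/...` mount on the driver. Below is a minimal sketch of converting between the two forms. The helper names `to_local_path` and `to_dbfs_uri` are hypothetical and not part of any Databricks API.

```python
def to_local_path(path: str) -> str:
    """Convert 'dbfs:/x' or '/x' into the driver-local '/dbfs/x' form."""
    if path.startswith("/dbfs/"):
        return path  # already a local path
    if path.startswith("dbfs:"):
        path = path[len("dbfs:"):]
    return "/dbfs/" + path.lstrip("/")


def to_dbfs_uri(path: str) -> str:
    """Convert '/dbfs/x' or '/x' into the 'dbfs:/x' form."""
    if path.startswith("dbfs:"):
        return path  # already a URI
    if path.startswith("/dbfs/"):
        path = path[len("/dbfs"):]
    return "dbfs:/" + path.lstrip("/")


print(to_local_path("dbfs:/FileStore/tables/data.xlsx"))
print(to_dbfs_uri("/dbfs/mnt/adls_demo/sales"))
```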

### Key Benefits

1. **Unified Interface**: Same API for local and cloud storage
2. **Performance**: Optimized for Spark workloads
3. **Persistence**: Files persist across cluster restarts
4. **Access Control**: Integrated with Databricks security


In [0]:
# Explore DBFS structure
print("DBFS Root Contents:")
for item in dbutils.fs.ls("/"):
    print(f"  {item.name:20s} - Size: {item.size} bytes, Modified: {item.modificationTime}")


In [0]:
%sql
show volumes

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS databricks_demo

In [0]:
# Inspect the demo volume created above
demo_path = "/Volumes/workspace/default/databricks_demo"

# List contents
print(f"\nContents of {demo_path}:")
for item in dbutils.fs.ls(demo_path):
    print(f"  {item.name}")


In [0]:
# Use your existing Volume path
# Format: /Volumes/<catalog>/<schema>/<volume_name>/<folder>
demo_path = "/Volumes/workspace/default/databricks_demo"

# Create the directory inside the Volume
dbutils.fs.mkdirs(demo_path)
print(f"Created directory in Volume: {demo_path}")

# List contents
print(f"\nContents of {demo_path}:")
for item in dbutils.fs.ls(demo_path):
    print(f"  {item.name}")
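
Volume paths follow the `/Volumes/<catalog>/<schema>/<volume_name>/<folder>` pattern noted in the cell above. Here is a small sketch of a path builder. `volume_path` is a hypothetical helper, and the catalog, schema, and volume names are the ones used in this module.

```python
import posixpath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a /Volumes/<catalog>/<schema>/<volume>/... path from its components."""
    for name in (catalog, schema, volume):
        if not name or "/" in name:
            raise ValueError(f"invalid path component: {name!r}")
    return posixpath.join("/Volumes", catalog, schema, volume, *parts)


demo_path = volume_path("workspace", "default", "databricks_demo")
print(demo_path)
print(volume_path("workspace", "default", "databricks_demo", "raw", "sales.csv"))
```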

## Working with FileStore

FileStore is the default location for files uploaded through the Databricks UI. It's accessible via `/FileStore` in DBFS. The example below writes to the demo volume created earlier rather than to `/FileStore`.


In [0]:
# Create sample data and save it to the demo volume
import pandas as pd
from pyspark.sql import SparkSession

# Create sample DataFrame
sample_data = {
    'id': range(1, 101),
    'name': [f'User_{i}' for i in range(1, 101)],
    'age': [20 + (i % 40) for i in range(1, 101)],
    'city': ['New York', 'London', 'Tokyo', 'Paris', 'Sydney'] * 20,
    'salary': [50000 + (i * 1000) for i in range(1, 101)]
}

pandas_df = pd.DataFrame(sample_data)
spark_df = spark.createDataFrame(pandas_df)

# Save to the demo volume as Parquet
output_path = "/Volumes/workspace/default/databricks_demo/sample_users.parquet"
spark_df.write.mode("overwrite").parquet(output_path)
print(f"Data saved to: {output_path}")

# Verify the file exists
files = dbutils.fs.ls("/Volumes/workspace/default/databricks_demo")
print(f"\n/Volumes/workspace/default/databricks_demo:")
for file in files:
    if "sample_users" in file.name:
        print(f"  {file.name} - {file.size} bytes")


## Mounting Azure Data Lake Storage Gen2

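Mounting ADLS Gen2 lets you access data through a `/mnt/...` path as if it were on the local file system. Mounts are convenient for shared, workspace-wide access, though Databricks now treats them as a legacy pattern and recommends Unity Catalog external locations and volumes for new work.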

### Prerequisites for Mounting

1. **Storage Account**: Your ADLS Gen2 storage account name
2. **Container**: The container name in your storage account
3. **Authentication**: Service Principal or Access Key

### Mount Process

1. Configure authentication (Service Principal recommended)
2. Create mount point using dbutils.fs.mount()
3. Access data via `/mnt/mount_name/...`
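
Step 1 amounts to assembling a small dictionary of ABFS OAuth settings, which step 2 passes to `dbutils.fs.mount()` as `extra_configs`. The sketch below uses the same keys as the mount cell later in this module. `build_oauth_configs` is a hypothetical helper, and the credential values are placeholders that would normally come from `dbutils.secrets.get`.

```python
def build_oauth_configs(client_id: str, client_secret: str, tenant_id: str) -> dict:
    """Return the extra_configs dictionary for a Service Principal mount."""
    return {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values for illustration only; never hard-code real credentials.
configs = build_oauth_configs("<client-id>", "<client-secret>", "<tenant-id>")
for key in sorted(configs):
    shown = "***" if "secret" in key else configs[key]  # mask the secret when printing
    print(f"{key} = {shown}")
```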



Create a secret scope at the URL below, replacing `<<youraccount>>` with your workspace host:

https://<<youraccount>>.cloud.databricks.com/#secrets/createScope

In [0]:
# 1. Configuration (Replace with your actual values)
storage_account_name = "cetpastorage"
container_name = "practice-dataset"
# Read the Service Principal credentials from the "adls-scope" secret scope
client_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-id")
client_secret = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-secret")
tenant_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-tenant-id")



# 2. Set Spark Configuration directly for this session
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

# 3. Access the data directly using the ABFSS path
path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

print("Listing files in ADLS Gen2:")
display(dbutils.fs.ls(path))

In [0]:
storage_account_name = "cetpastorage"
container_name = "practice-dataset"



# Define the connection options as a dictionary
adls_options = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": client_secret,
  "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}

# The Direct Path
path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# Alternative: pass the options to the reader instead of setting them on the Spark session
# (client_id, client_secret, and tenant_id come from the previous cell)
# Replace 'csv' with 'parquet' or 'json' as needed
try:
    df = (spark.read
          .format("csv") 
          .options(**adls_options) # This injects your credentials for THIS read only
          .load(path + "your_file.csv"))
    
    display(df)
except Exception as e:
    print(f"Connection failed: {e}")

In [0]:
# Method 1: Mounting ADLS Gen2 using Service Principal (Recommended)
# This is a template - replace with your actual values

# Configuration
storage_account_name = "cetpastorage"
container_name = "practice-dataset"
mount_point = "/mnt/adls_demo"

# Get secrets from Databricks Secret Scope
# Make sure to create a secret scope with these keys:
# - adls-sp-client-id
# - adls-sp-client-secret
# - adls-sp-tenant-id

try:
    # Check if already mounted
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        print(f"{mount_point} is already mounted")
    else:
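        # Get credentials from secret scope
        client_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-id")
        client_secret = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-secret")
        tenant_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-tenant-id")
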
        # Configure Spark
        configs = {
            "fs.azure.account.auth.type": "OAuth",
            "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
            "fs.azure.account.oauth2.client.id": client_id,
            "fs.azure.account.oauth2.client.secret": client_secret,
            "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
        }
        
        # Mount the storage
        source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
        dbutils.fs.mount(
            source=source,
            mount_point=mount_point,
            extra_configs=configs
        )
        print(f"Successfully mounted {source} to {mount_point}")
except Exception as e:
    print(f"Mounting failed (this is expected if credentials are not configured): {e}")
    print("\nTo mount ADLS Gen2, you need to:")
    print("1. Create a Databricks Secret Scope")
    print("2. Add Service Principal credentials to the scope")
    print("3. Update the storage_account_name and container_name variables")
    print("4. Run this cell again")


In [0]:
# Method 2: Mounting using Access Key (Alternative method)
# Note: Access keys are less secure than Service Principal

storage_account_name = "YOUR_STORAGE_ACCOUNT_NAME"
container_name = "YOUR_CONTAINER_NAME"
mount_point = "/mnt/adls_access_key"

try:
    # Get access key from secret scope
    access_key = dbutils.secrets.get(scope="adls-scope", key="adls-access-key")
    
    configs = {
        "fs.azure.account.key." + storage_account_name + ".dfs.core.windows.net": access_key
    }
    
    source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
    
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        print(f"{mount_point} is already mounted")
    else:
        dbutils.fs.mount(
            source=source,
            mount_point=mount_point,
            extra_configs=configs
        )
        print(f"Successfully mounted using access key")
except Exception as e:
    print(f"Mounting with access key failed: {e}")
    print("This is expected if credentials are not configured")


In [0]:
# List all current mounts
print("Current Mount Points:")
mounts = dbutils.fs.mounts()
for mount in mounts:
    print(f"  {mount.mountPoint:30s} -> {mount.source}")


## Direct Access to ADLS Gen2 (Without Mounting)

You can also access ADLS Gen2 directly without mounting. This is useful for one-time access or when you don't want to maintain mount points.

### Direct Access Methods

1. **Using abfss:// protocol** - Azure Blob File System Secure
2. **Using spark.conf** - Configure authentication at Spark level
3. **Using dbutils.secrets** - Secure credential management
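
For direct access, the same settings are scoped to one storage account by appending `<account>.dfs.core.windows.net` to each key, and data is addressed with an `abfss://` URI. The sketch below shows both string patterns as they appear in the cells of this module. `abfss_uri` and `account_scoped` are hypothetical helper names.

```python
def abfss_uri(container: str, account: str, path: str = "") -> str:
    """Build abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def account_scoped(configs: dict, account: str) -> dict:
    """Suffix each config key with the storage account's DFS host name."""
    host = f"{account}.dfs.core.windows.net"
    return {f"{key}.{host}": value for key, value in configs.items()}


scoped = account_scoped({"fs.azure.account.auth.type": "OAuth"}, "cetpastorage")
print(scoped)
print(abfss_uri("practice-dataset", "cetpastorage", "path/to/your/file.parquet"))

# On a cluster, each pair would then be applied to the session:
#   for key, value in scoped.items():
#       spark.conf.set(key, value)
```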


In [0]:
# Direct access to ADLS Gen2 using Service Principal
# Configure at SparkSession level

storage_account_name = "YOUR_STORAGE_ACCOUNT_NAME"
container_name = "YOUR_CONTAINER_NAME"

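# Get credentials from secret scope
try:
    client_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-id")
    client_secret = dbutils.secrets.get(scope="adls-scope", key="adls-sp-client-secret")
    tenant_id = dbutils.secrets.get(scope="adls-scope", key="adls-sp-tenant-id")
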
    # Configure Spark for direct access
    spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", 
                  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
    spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
    spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net",
                  f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
    
    # Now you can read directly
    direct_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/path/to/your/file.parquet"
    print(f"Configured for direct access. Use path: {direct_path}")
    print("Example: df = spark.read.parquet(direct_path)")
except Exception as e:
    print(f"Configuration failed: {e}")
    print("This is expected if credentials are not configured")


## Reading Different File Formats

Databricks supports reading various file formats. Let's explore the most common ones.


In [0]:
# Create sample data for demonstrations
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from datetime import datetime

# Sample data
data = [
    ("2024-01-01", "Product A", 100.0, 10),
    ("2024-01-02", "Product B", 150.0, 15),
    ("2024-01-03", "Product A", 120.0, 12),
    ("2024-01-04", "Product C", 200.0, 20),
    ("2024-01-05", "Product B", 180.0, 18),
]

schema = StructType([
    StructField("date", StringType(), True),
    StructField("product", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("quantity", IntegerType(), True)
])

df = spark.createDataFrame(data, schema)
print("Sample DataFrame created:")
df.show()


### 1. Reading and Writing CSV Files


In [0]:
# Write CSV
csv_path = "/Volumes/workspace/default/databricks_demo/sales_data.csv"
df.write.mode("overwrite").option("header", "true").csv(csv_path)
print(f"CSV written to: {csv_path}")

# Read CSV
df_csv = spark.read.option("header", "true").csv(csv_path)
print("\nReading CSV:")
df_csv.show()

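# CSV with custom write options
df.write.mode("overwrite") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv(f"{csv_path}_custom")

# inferSchema is a read option: without it, every CSV column is read back as a string
df_csv_typed = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"{csv_path}_custom")
df_csv_typed.printSchema()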


### 2. Reading and Writing JSON Files


In [0]:
# Write JSON
json_path = "/Volumes/workspace/default/databricks_demo/sales_data.json"
df.write.mode("overwrite").json(json_path)
print(f"JSON written to: {json_path}")

# Read JSON
df_json = spark.read.json(json_path)
print("\nReading JSON:")
df_json.show()

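# multiLine is a read option and has no effect on write; Spark writes JSON Lines.
# Use it to read pretty-printed JSON, where one document spans several lines:
# df_pretty = spark.read.option("multiLine", "true").json("<path-to-pretty-printed-json>")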
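
Spark writes, and by default reads, JSON Lines: one complete JSON object per line. The `multiLine` read option exists for files where a single document spans several lines. The difference can be seen with plain Python and the standard `json` module. This only illustrates the two layouts and is not Spark's parser.

```python
import json

records = [
    {"date": "2024-01-01", "product": "Product A", "amount": 100.0, "quantity": 10},
    {"date": "2024-01-02", "product": "Product B", "amount": 150.0, "quantity": 15},
]

# JSON Lines: each line is a standalone document, so a file can be split by line.
json_lines = "\n".join(json.dumps(r) for r in records)
parsed_lines = [json.loads(line) for line in json_lines.splitlines()]

# Pretty-printed JSON: one document spread over many lines.
pretty = json.dumps(records, indent=2)
parsed_pretty = json.loads(pretty)

print(len(json_lines.splitlines()), "lines in the JSON Lines text")
print(len(pretty.splitlines()), "lines in the pretty-printed text")
print("same records:", parsed_lines == parsed_pretty)

# Parsing the pretty-printed text one line at a time fails,
# which is why such files need the multiLine option.
try:
    json.loads(pretty.splitlines()[0])
except json.JSONDecodeError as exc:
    print("line-by-line parse failed:", exc.msg)
```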


### 3. Reading and Writing Parquet Files (Recommended)

Parquet is the recommended format for data engineering in Databricks because:
- **Columnar storage**: Efficient for analytics queries
- **Compression**: Reduces storage costs
- **Schema evolution**: Supports schema changes
- **Performance**: Optimized for Spark


In [0]:
# Write Parquet
parquet_path = "/Volumes/workspace/default/databricks_demo/sales_data.parquet"
df.write.mode("overwrite").parquet(parquet_path)
print(f"Parquet written to: {parquet_path}")

# Read Parquet
df_parquet = spark.read.parquet(parquet_path)
print("\nReading Parquet:")
df_parquet.show()

# Parquet with partitioning
partitioned_path = "/Volumes/workspace/default/databricks_demo/sales_data_partitioned"
df.write.mode("overwrite").partitionBy("product").parquet(partitioned_path)
print(f"\nPartitioned Parquet written to: {partitioned_path}")

# Read partitioned data
df_partitioned = spark.read.parquet(partitioned_path)
print("\nReading partitioned Parquet:")
df_partitioned.show()


### 4. Reading and Writing Delta Tables

Delta Lake is Databricks' optimized storage layer that provides ACID transactions, time travel, and schema enforcement. We'll cover this in detail in Day 4, but here's a quick example.


In [0]:
# Write Delta table
delta_path = "/Volumes/workspace/default/databricks_demo/sales_data_delta"
df.write.format("delta").mode("overwrite").save(delta_path)
print(f"Delta table written to: {delta_path}")

# Read Delta table
df_delta = spark.read.format("delta").load(delta_path)
print("\nReading Delta table:")
df_delta.show()

# Delta table with partitioning
df.write.format("delta").mode("overwrite").partitionBy("product").save(f"{delta_path}_partitioned")
print(f"\nPartitioned Delta table written")


### 5. Reading Excel Files

For Excel files, you'll need to use pandas first, then convert to Spark DataFrame.


In [0]:
# Create sample data with pandas (stands in for the contents of an Excel file)
import pandas as pd

# Create pandas DataFrame
pandas_df = pd.DataFrame({
    'employee_id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'department': ['Engineering', 'Sales', 'Engineering', 'Marketing', 'Sales'],
    'salary': [75000, 65000, 80000, 60000, 70000]
})

# A real Excel file would typically be uploaded to /FileStore and read with pd.read_excel
# For the demo, we convert the pandas DataFrame directly to Spark
print("Pandas DataFrame:")
print(pandas_df)

# Convert pandas to Spark
df_excel = spark.createDataFrame(pandas_df)
print("\nConverted to Spark DataFrame:")
df_excel.show()

# If you have an Excel file in FileStore (pandas needs the /dbfs local path, and openpyxl for .xlsx):
# pandas_df = pd.read_excel("/dbfs/FileStore/tables/data.xlsx")
# df_excel = spark.createDataFrame(pandas_df)


## Reading from Multiple Files and Directories

Spark can read from multiple files and directories efficiently.


In [0]:
# Create multiple files for demonstration
base_path = "/Volumes/workspace/default/databricks_demo/multi_file"

# Create data for different dates
for i in range(1, 4):
    date_data = [
        (f"2024-01-0{i}", f"Product A", 100.0 + i * 10, 10 + i),
        (f"2024-01-0{i}", f"Product B", 150.0 + i * 10, 15 + i),
    ]
    date_df = spark.createDataFrame(date_data, ["date", "product", "amount", "quantity"])
    date_df.write.mode("overwrite").parquet(f"{base_path}/date=2024-01-0{i}")

print("Created multiple partitioned files")

# Read all files at once
df_all = spark.read.parquet(f"{base_path}/*")
print("\nReading all files:")
df_all.show()

# Read with partition discovery
df_partitioned = spark.read.option("basePath", base_path).parquet(f"{base_path}/date=2024-01-01", 
                                                                   f"{base_path}/date=2024-01-02")
print("\nReading specific partitions:")
df_partitioned.show()
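
Partition discovery works from directory names of the form `column=value`. As a rough illustration (not Spark's actual implementation), the partition values for a file can be recovered by parsing the path segments below the base path. `partition_values` is a hypothetical helper, and the file name `part-00000.parquet` is made up for the example.

```python
import posixpath


def partition_values(file_path: str, base_path: str) -> dict:
    """Extract column=value pairs from the directories between base_path and the file."""
    relative = posixpath.relpath(posixpath.dirname(file_path), base_path)
    values = {}
    for segment in relative.split("/"):
        if "=" in segment:
            column, value = segment.split("=", 1)
            values[column] = value
    return values


base_path = "/Volumes/workspace/default/databricks_demo/multi_file"
example_file = f"{base_path}/date=2024-01-02/part-00000.parquet"  # hypothetical file name
print(partition_values(example_file, base_path))
```

This is why `basePath` matters when only some subdirectories are read: it tells Spark where the partition directories start, so `date` is still treated as a partition column.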


## Working with Large Datasets

When working with large datasets, consider these best practices:

1. **Use Parquet or Delta**: Better compression and performance
2. **Partition your data**: Improves query performance
3. **Use appropriate file sizes**: 128MB - 1GB per file is optimal
4. **Lazy evaluation**: Spark only executes when an action is called
5. **Caching**: Cache frequently used DataFrames
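
Point 3 can be turned into simple arithmetic: divide the expected output size by a target file size to get a partition count to pass to `repartition()` before writing. This is a back-of-the-envelope sketch. `target_file_count` is a hypothetical helper, and the 256 MB default is an assumed value inside the 128MB - 1GB range above.

```python
import math

MB = 1024 * 1024


def target_file_count(total_bytes: int, target_file_bytes: int = 256 * MB) -> int:
    """Number of output files needed so that each is roughly target_file_bytes."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / target_file_bytes))


n = target_file_count(10 * 1024 * MB)  # about 10 GB of output
print(n)

# On a cluster this could feed the writer, for example:
#   large_df.repartition(n).write.mode("overwrite").parquet(output_path)
```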


In [0]:
# Create a larger dataset for demonstration
from pyspark.sql.functions import col, rand, when

# Generate 500,000 rows
large_df = spark.range(0, 500000).withColumn(
    "value", rand() * 100
).withColumn(
    "category", 
    when(rand() < 0.33, "A")
    .when(rand() < 0.66, "B")
    .otherwise("C")
)

print(f"Created DataFrame with {large_df.count()} rows")

# Write with optimal partitioning
output_path = "/Volumes/workspace/default/databricks_demo/large_dataset"
large_df.write.mode("overwrite").partitionBy("category").parquet(output_path)

# Check file sizes
print("\nFile sizes in output directory:")
files = dbutils.fs.ls(f"{output_path}/category=A")
for file in files[:5]:  # Show first 5 files
    print(f"  {file.name}: {file.size / 1024 / 1024:.2f} MB")


## Best Practices for Data Storage

1. **Choose the right format**:
   - **Parquet**: For analytics workloads (recommended)
   - **Delta**: For ACID transactions and time travel
   - **CSV**: Only for small datasets or compatibility
   - **JSON**: For semi-structured data

2. **Partition your data**:
   - Partition by date, region, or other frequently filtered columns
   - Avoid over-partitioning (too many small files)
   - Aim for 128MB - 1GB per partition

3. **File naming**:
   - Use descriptive names
   - Include date/version in path structure
   - Use partitioning instead of file naming for organization

4. **Storage location**:
   - Use ADLS Gen2 for production data
   - Use DBFS /tmp for temporary files
   - Use /FileStore for small uploads

5. **Security**:
   - Use Service Principal for authentication
   - Store credentials in Databricks Secret Scopes
   - Use mount points for frequently accessed storage


## Summary

In this module, you learned:

✅ **DBFS** - Databricks File System structure and usage

✅ **Mounting ADLS Gen2** - How to mount external storage using Service Principal or Access Key

✅ **Direct access** - Accessing ADLS Gen2 without mounting

✅ **File formats** - Reading and writing CSV, JSON, Parquet, Delta, and Excel files

✅ **Multiple files** - Reading from directories and multiple files

✅ **Large datasets** - Best practices for working with big data

✅ **Storage best practices** - Guidelines for efficient data storage

### Next Steps

In the next module, we'll explore:
- Spark SQL in Databricks
- Advanced DataFrame operations
- Working with temporary views and global temporary views
- Performance optimization techniques


## Exercise

Try these exercises to practice:

1. Create a mount point to your ADLS Gen2 storage (if you have access)
2. Upload a CSV file to FileStore and read it into a Spark DataFrame
3. Create a partitioned Parquet dataset with at least 1000 rows
4. Read data from multiple files in a directory
5. Compare the file sizes of CSV vs Parquet for the same dataset
6. Write a DataFrame to Delta format and read it back
