# Telecom Domain Read & Write Ops - Building Datalake & Lakehouse
This notebook contains assignments to practice Spark read options and Databricks volumes. <br>
Sections: Sample data creation, Catalog & Volume creation, Copying data into Volumes, Path glob/recursive reads, toDF() column renaming variants, inferSchema/header/separator experiments, and exercises.<br>

In [0]:
%python
from pyspark.sql.session import SparkSession
print(spark)#already instantiated by databricks
spark1=SparkSession.builder.getOrCreate()
print(spark1)#we instantiated

In [0]:
%sql
create catalog if not exists telecom_catalog_assign;
create schema if not exists telecom_catalog_assign.landing_zone;
create volume if not exists telecom_catalog_assign.landing_zone.landing_vol;

In [0]:
for folder in [
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/",
]:
    dbutils.fs.mkdirs(folder)


In [0]:
for subfolder in [
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2",
]:
    dbutils.fs.mkdirs(subfolder)

#### DBFS
1. DBFS / FileStore (Old approach)

**What it is**
- DBFS (Databricks File System) is a workspace-level file abstraction
- FileStore is a publicly accessible subfolder of DBFS
- Mainly designed for experimentation, demos, notebooks

**Key characteristics**
- Not governed by Unity Catalog
- No fine-grained access control (only workspace-level permissions)
- No table-level or column-level lineage
- FileStore files can be exposed via public URLs
- Weak auditability

**Typical usage**
- Temporary files
- Sample datasets
- Notebook outputs
- Quick testing

**Why it's not prod-ready**
- ❌ No centralized governance
- ❌ No row/column/file-level security
- ❌ Hard to audit "who accessed what"
- ❌ Not compliant for sensitive data

#### VOLUMES
**What it is**
- Volumes are governed storage objects under Unity Catalog
- They provide secure file storage similar to tables
- Backed by cloud storage (ADLS / S3 / GCS)

**Key characteristics**
- Fully integrated with Unity Catalog
- Supports fine-grained access control
- Audited and tracked
- Secure, no public URLs
- Clear ownership and lifecycle management

**Typical usage**
- Ingestion landing zones
- Raw / bronze data
- ML artifacts
- Regulated datasets
- Production pipelines

**Why it is prod-ready**
- ✅ Central governance
- ✅ Strong security & compliance
- ✅ Auditing & lineage
- ✅ Works across all Databricks workspaces

#### Why Volumes instead of DBFS?
DBFS/FileStore is meant for development and experimentation, while Volumes are Unity Catalog–governed, secure, auditable storage objects designed for production and regulated data. Production teams prefer Volumes because they provide fine-grained access control, auditability, and compliance that DBFS cannot offer.
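
Every Volume path in this notebook follows the pattern `/Volumes/<catalog>/<schema>/<volume>/<path>`. The sketch below builds such paths with plain Python; the helper name `volume_path` is hypothetical and used only for illustration, it is not part of any Databricks API.

```python
import posixpath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a /Volumes/<catalog>/<schema>/<volume>/<path> string (hypothetical helper)."""
    return posixpath.join("/Volumes", catalog, schema, volume, *parts)


# Names taken from this notebook's catalog, schema and volume
landing = volume_path("telecom_catalog_assign", "landing_zone", "landing_vol")
customer_file = volume_path(
    "telecom_catalog_assign", "landing_zone", "landing_vol", "customer", "customer.csv"
)
print(landing)
print(customer_file)
```

Keeping the three-level name in one place makes it harder to mistype the long path strings repeated in the cells that follow.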

##### b. Why production teams prefer Volumes for regulated data?
**1. Regulated data needs governance**

Regulated data includes:
- PII (Aadhaar, PAN, phone, email)
- Financial data
- Healthcare data
- Customer records

Production teams must answer:
- Who accessed this data?
- When was it accessed?
- Was access authorized?

➡️ DBFS cannot answer these questions reliably  
➡️ Volumes can

**2. Fine-grained access control (critical)**

With Volumes, teams can run, for example:
`GRANT READ VOLUME ON VOLUME main.sales.raw_data TO analyst_role;`

This means:
- Only authorized roles can read/write
- Access can be revoked instantly
- No accidental exposure

DBFS:
- Either you have workspace access or you don't
- No file-level control

**3. Audit & compliance (non-negotiable)**

Regulators require:
- Audit logs
- Access history
- Ownership tracking

Volumes provide:
- ✅ Who accessed which file
- ✅ Which pipeline wrote the data
- ✅ When access happened

DBFS:
- ❌ Weak or no audit trail

**4. Separation of concerns (clean architecture)**

Raw data → Processed data → Curated data

Volumes help enforce this:
- Raw volumes (restricted)
- Processed volumes (controlled)
- Curated tables (consumer-facing)

DBFS mixes everything → chaos in prod.

**5. Future-proof & multi-workspace support**

Volumes:
- Work across multiple Databricks workspaces attached to the same Unity Catalog metastore
- Central governance via Unity Catalog
- Scale for enterprise growth

DBFS:
- Tied to a single workspace
- Legacy approach



---------------------------------------------------------------------------------

In [0]:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''
# dbutils.fs.put() creates or overwrites a small text file in Databricks storage
# (DBFS or Volumes) from a string, so it suits tiny sample datasets like these.
# overwrite is a safety flag: with overwrite=False, put() fails if the file already
# exists, which protects against accidental data loss; with overwrite=True it
# replaces the existing file.
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv",customer_csv,overwrite=True)

# TSV: tab-separated values
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv",usage_tsv,overwrite=True)

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/tower_logs_region1.csv",tower_logs_region1,overwrite=True)

tower_logs_region2 = '''event_id|customer_id|tower_id|signal_strength|timestamp
6001|102|TWR01|-90|2025-02-15 10:21:54
6004|106|TWR05|-55|2025-02-15 11:01:12
'''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2/tower_logs_region2.csv",tower_logs_region2,overwrite=True)
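
The `overwrite` flag described in the cell above can be mimicked on a local filesystem. This is only a plain-Python analogue, assuming a hypothetical helper `put_text`; it does not call `dbutils` and is meant to show the fail-if-exists behaviour, nothing more.

```python
import os
import tempfile


def put_text(path: str, contents: str, overwrite: bool = False) -> None:
    """Write a small text file; refuse to replace an existing one unless overwrite=True."""
    mode = "w" if overwrite else "x"  # "x" raises FileExistsError if the file exists
    with open(path, mode, encoding="utf-8") as handle:
        handle.write(contents)


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "customer.csv")

put_text(target, "101,Arun,31,Chennai,PREPAID\n")  # first write succeeds
try:
    put_text(target, "replacement\n")  # overwrite=False and the file exists
    second_write_failed = False
except FileExistsError:
    second_write_failed = True

put_text(target, "102,Meera,45,Bangalore,POSTPAID\n", overwrite=True)  # replaces the file
with open(target, encoding="utf-8") as handle:
    final_contents = handle.read()
print(second_write_failed, final_contents.strip())
```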





In [0]:
paths = [
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower"
]

for path in paths:
    files = dbutils.fs.ls(path)
    if files:
        print(f"{path} → {len(files)} files found")
    else:
        print(f"{path} → No files found!")

In [0]:
# Read all tower logs three ways: path glob filter, a list of paths, and recursive lookup.
# By default Spark reads only the files directly inside the given folder;
# .option("recursiveFileLookup", "true") also includes files in nested subfolders.
tower_root = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower"

# 1. Glob in the path (region*) plus pathGlobFilter, which filters on the file name
df = spark.read.csv(f"{tower_root}/region*", pathGlobFilter="*.csv", header=True, sep="|")
print(f"Total rows in all tower logs: {df.count()}")

# 2. Explicit list of paths
df = spark.read.csv(
    [f"{tower_root}/region1/tower_logs_region1.csv", f"{tower_root}/region2/tower_logs_region2.csv"],
    header=True, sep="|",
)
print(f"Total rows in all tower logs: {df.count()}")

# 3. Recursive lookup from the tower folder, combined with options() and format().load()
df = spark.read.options(header=True, inferSchema=True, recursiveFileLookup=True, pathGlobFilter="*.csv", sep="|").format("csv").load(tower_root)
df.show(2)
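
To see which files each selection mechanism picks up, the sketch below rebuilds a small folder tree on the local filesystem and applies the same three ideas with the standard library: a glob on the folder name, a recursive walk, and a filter on the file name. It is a local analogy under assumed file names (`nested/extra.csv`, `readme.txt` and `archive/old.csv` are invented for the demonstration), not Spark's own implementation.

```python
import fnmatch
import os
import tempfile


def select_files(root, dir_glob, file_glob):
    """Walk every top-level folder matching dir_glob and keep files matching file_glob."""
    selected = []
    for entry in os.listdir(root):
        top = os.path.join(root, entry)
        if not (os.path.isdir(top) and fnmatch.fnmatch(entry, dir_glob)):
            continue  # the path glob did not match this folder
        for current, _dirs, files in os.walk(top):  # recursive lookup
            for name in files:
                if fnmatch.fnmatch(name, file_glob):  # filter on the file name only
                    full = os.path.join(current, name)
                    selected.append(os.path.relpath(full, root).replace(os.sep, "/"))
    return sorted(selected)


tower_root = tempfile.mkdtemp()
for relative in [
    "region1/tower_logs_region1.csv",
    "region2/tower_logs_region2.csv",
    "region2/nested/extra.csv",  # only reachable through the recursive walk
    "region2/readme.txt",        # rejected by the *.csv filter
    "archive/old.csv",           # rejected by the region* glob
]:
    target = os.path.join(tower_root, *relative.split("/"))
    os.makedirs(os.path.dirname(target), exist_ok=True)
    open(target, "w").close()

matched = select_files(tower_root, "region*", "*.csv")
print(matched)
```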

In [0]:
# Try the customer and usage files with option()/options(), using read.csv and format():
#   header=False, inferSchema=False
#   or
#   header=True, inferSchema=True
df=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv",header=False,inferSchema=False)
df.printSchema()
df.show()
df=spark.read.options(header=True,inferSchema=True).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv")
df.printSchema()
df.show()

# With inferSchema=False every column is read as a string.
# With header=True the first line of the file is used as column names; customer.csv has no
# header line, so its first record (101,Arun,...) becomes the column names. Check df.columns.
# How did schema inference handle "abc" in age? The whole age column is inferred as string instead of int.
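
The effect of the single `abc` value on the age column can be illustrated with a deliberately simplified model of type inference. This is a sketch, not Spark's actual inference code (which also considers doubles, timestamps and other types); `infer_column_type` is a hypothetical name.

```python
def infer_column_type(values):
    """Return "int" only if every non-empty value parses as an integer, else "string"."""
    non_empty = [value for value in values if value != ""]
    for value in non_empty:
        try:
            int(value)
        except ValueError:
            return "string"  # one non-numeric value forces the whole column to string
    return "int" if non_empty else "string"


ages = ["31", "45", "29", "52", "27", "abc"]
type_with_abc = infer_column_type(ages)
type_without_abc = infer_column_type(ages[:-1])
print(type_with_abc, type_without_abc)
```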

In [0]:
#Apply column names using string using toDF function for customer data
df=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv",header=False,inferSchema=True).toDF("customer_id","name","age","city","plan")
df.printSchema()
df.show()
#Apply column names and datatype using the schema function for usage data
schema_data="""
customer_id int,
voice_mins int,
data_mb int,
sms_count int
"""
df=spark.read.schema(schema_data).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv",header=True,sep="\t")
df.printSchema()
df.show()
# With header=False the header line in the file is read as a data row; values that do not
# fit the declared type (for example the text "customer_id" in an int column) become null.
# The DDL string declares no nullability, so every column is nullable and can hold those nulls.
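
The header-as-data-row behaviour noted in the comment above can be reproduced without Spark. The sketch below parses a shortened copy of the usage data with the standard `csv` module and converts every field to an integer, turning unparsable text into `None`, which roughly mirrors a permissive read with a declared integer schema. The helper names are hypothetical.

```python
import csv
import io

usage_tsv = (
    "customer_id\tvoice_mins\tdata_mb\tsms_count\n"
    "101\t320\t1500\t20\n"
    "105\t0\t0\t0\n"
)
columns = ["customer_id", "voice_mins", "data_mb", "sms_count"]


def to_int_or_none(text):
    """Convert text to int, returning None when it does not parse."""
    try:
        return int(text)
    except ValueError:
        return None


def read_usage(text, header):
    """Parse tab-separated text; skip the first line only when header=True."""
    rows = list(csv.reader(io.StringIO(text), delimiter="\t"))
    if header:
        rows = rows[1:]
    return [dict(zip(columns, (to_int_or_none(cell) for cell in row))) for row in rows]


with_header = read_usage(usage_tsv, header=True)
without_header = read_usage(usage_tsv, header=False)
print(len(with_header), len(without_header))
print(without_header[0])
```

With `header=False` the result has one extra row, and that row contains only `None` because the column names are not integers.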

In [0]:
#Apply column names and datatype using the StructType with IntegerType, StringType, TimestampType and other classes for towers data
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema_data=StructType([
    StructField("event_id",IntegerType(),True),
    StructField("customer_id",IntegerType(),True),
    StructField("tower_id",StringType(),True),
    StructField("signal_strength",IntegerType(),True),  # same name as the file header
    StructField("timestamp",TimestampType(),True)]
)

df1=spark.read.schema(schema_data).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region*",header=True,sep="|")
df1.printSchema()
df1.show()


#### Write operations
##### Spark write operations using
- csv, json, orc, parquet, delta, saveAsTable, insertInto, xml with different write modes, header and sep options


In [0]:
%sql

CREATE TABLE IF NOT EXISTS telecom_catalog_assign.landing_zone.ingestion_volume;

In [0]:
%sql
create volume if not exists telecom_catalog_assign.landing_zone.ingestion_volume;

In [0]:
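# customer.csv has no header line, so read it with header=False and name the columns explicitly
inc_df=spark.read.csv('/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv',header=False,inferSchema=True).toDF("id","name","age","city","plan")
# usage.tsv is tab-separated and the tower logs are pipe-separated, so pass sep explicitly
inu_df=spark.read.csv('/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv',header=True,inferSchema=True,sep="\t")
int_df=spark.read.csv('/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2/tower_logs_region2.csv',header=True,inferSchema=True,sep="|")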

In [0]:
#csv write operations in different modes


inc_df.write.csv("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_csv_out",header=True,sep="|",mode="overwrite")
inu_df.write.csv("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/usage_csv_out",header=True,sep="|",mode="append")
int_df.write.csv("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/tower_csv_out",header=True,sep="|",mode="overwrite")
# Four save modes: append, overwrite, ignore, error (alias errorifexists, the default)
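
The four save modes differ only in what happens when the target already exists. The sketch below imitates those rules on a single local text file; it is an analogy with a hypothetical `write_lines` helper, whereas Spark writes a directory of part files.

```python
import os
import tempfile

VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_lines(path, lines, mode="error"):
    """Write lines to path following Spark-like save-mode rules (one file, not a directory)."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    exists = os.path.exists(path)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"{path} already exists")
    if exists and mode == "ignore":
        return  # keep the existing data and silently skip the write
    file_mode = "a" if mode == "append" else "w"
    with open(path, file_mode, encoding="utf-8") as handle:
        handle.writelines(line + "\n" for line in lines)


def read_lines(path):
    with open(path, encoding="utf-8") as handle:
        return handle.read().splitlines()


target = os.path.join(tempfile.mkdtemp(), "usage_out.txt")
write_lines(target, ["101"], mode="error")    # target is new, so the write succeeds
write_lines(target, ["102"], mode="append")   # adds to the existing data
write_lines(target, ["103"], mode="ignore")   # target exists, nothing is written
after_ignore = read_lines(target)
try:
    write_lines(target, ["104"], mode="error")  # target exists, so this fails
    error_raised = False
except FileExistsError:
    error_raised = True
write_lines(target, ["105"], mode="overwrite")  # replaces everything
after_overwrite = read_lines(target)
print(after_ignore, error_raised, after_overwrite)
```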

In [0]:
#orc operations
inc_df.write.orc("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_orc_out",mode="overwrite",compression="zlib")
inu_df.write.mode("append").orc("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/usageorc_out")
# The target is a Unity Catalog Volume path rather than a DBFS path.
# The default mode, errorIfExists, fails as soon as the output directory already exists
# (for example when this cell is rerun), so append or overwrite is used here instead.
int_df.write.orc("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/tower_orc_out",mode="overwrite",compression="zlib")

In [0]:
#parquet operations
inc_df.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_p_out",mode="overwrite",compression="gzip")
inu_df.write.mode("append").parquet("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/usagep_out")
# As with the ORC writes, the target is a Unity Catalog Volume path.
# errorIfExists would fail on a rerun because the output directory already exists,
# so append or overwrite is used here instead.
int_df.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/tower_p_out",mode="overwrite",compression="gzip")



In [0]:
#json operations
inc_df.write.option("pretty", "true").json("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_json_out",mode="overwrite")
inu_df.write.option("pretty", "true").json("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/usage_json_out",mode="append",compression="snappy")
int_df.write.option("pretty", "true").json("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/tower_json_out",mode="ignore")
int_df.show(5)

In [0]:

#lakehouse operations
inc_df.write.format("delta").saveAsTable("telecom_catalog_assign.landing_zone.ingest_c_table",mode="overwrite")
#inu_df.write.format("delta").saveAsTable("telecom_catalog_assign.landing_zone.ingest_u_table",mode="append")
int_df.write.format("delta").saveAsTable("telecom_catalog_assign.landing_zone.ingest_t_table",mode="append")
'''Notes on Delta writes:
saveAsTable accepts all four save modes. With the default errorIfExists it fails when the table
already exists, and with ignore it leaves an existing table untouched, so reruns here use overwrite or append.
Writing Delta directly to a Volume (NOT allowed)
Unity Catalog Volumes are storage-only, so Delta tables must be written as managed or external tables, not directly to Volumes.'''
#load() → READ data (Input)
#save() → WRITE data (Output)

#### LAKEHOUSE OPERATION

In [0]:
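from pyspark.sql.functions import col, expr

# Name the columns positionally so the select below works whatever names the read produced
inc_df_named = inc_df.toDF("id", "name", "age", "city", "plan")

inc_df_fixed = inc_df_named.select(
    col("id").cast("STRING"),
    col("name").cast("STRING"),
    expr("try_cast(age as INT)").alias("age"),  # non-numeric ages such as "abc" become NULL
    col("city").cast("STRING"),
    col("plan").cast("STRING")
)

# insertInto needs the target table to exist already and matches columns by position, not by name
inc_df_fixed.write.mode("overwrite").insertInto(
    "telecom_catalog_assign.landing_zone.bronze_tablenew"
)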

In [0]:
# Remove invalid characters (e.g., tabs, spaces) from column names
inu_df_clean = inu_df.toDF(
    *[c.replace('\t', '_').replace(' ', '_') for c in inu_df.columns]
)

inu_df_clean.write.mode("overwrite").saveAsTable(
    "telecom_catalog_assign.landing_zone.usage_table"
)
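
The renaming step above can be generalised to every character that Delta is commonly reported to reject in column names. The character set below (space, comma, semicolon, braces, parentheses, newline, tab, equals sign) is an assumption taken from the usual Delta error message, so verify it against your runtime; `clean_column_name` is a hypothetical helper.

```python
import re

# Assumed set of characters rejected in Delta column names: " ,;{}()\n\t="
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def clean_column_name(name: str) -> str:
    """Replace each character from the assumed invalid set with an underscore."""
    return INVALID_CHARS.sub("_", name)


tab_header = clean_column_name("customer_id\tvoice_mins")
spaced_header = clean_column_name("signal strength (dBm)")
print(tab_header, spaced_header)
```

In the notebook the same function could be applied as `df.toDF(*[clean_column_name(c) for c in df.columns])`.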

In [0]:
inc_df.write.xml("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cust_xml_out",mode="overwrite",rowTag="customer")
# Write the usage data (not the customer data) to the usage output, using the cleaned column names
inu_df_clean.write.xml("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/usage_xml_out",mode="overwrite",rowTag="usage")

#### csv-167 b
#### json-494 b
#### orc-836 b
#### parquet-1.84b
#### xml-837 b
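
The ordering of the text formats in the sizes recorded above (CSV smallest, then JSON, then XML) can be reproduced in miniature with the standard library. This is a rough sketch on two customer rows; the root tag `ROWS` and the serialization details are assumptions and will not match Spark's writers byte for byte, and ORC and Parquet are left out because they need extra libraries.

```python
import csv
import io
import json
import xml.etree.ElementTree as ET

columns = ["id", "name", "age", "city", "plan"]
rows = [
    ["101", "Arun", "31", "Chennai", "PREPAID"],
    ["102", "Meera", "45", "Bangalore", "POSTPAID"],
]


def as_csv() -> bytes:
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


def as_json_lines() -> bytes:
    # One JSON object per line, repeating every column name in every record
    lines = [json.dumps(dict(zip(columns, row))) for row in rows]
    return ("\n".join(lines) + "\n").encode("utf-8")


def as_xml() -> bytes:
    root = ET.Element("ROWS")
    for row in rows:
        record = ET.SubElement(root, "customer")
        for column, value in zip(columns, row):
            ET.SubElement(record, column).text = value  # opening and closing tag per field
    return ET.tostring(root)


sizes = {"csv": len(as_csv()), "json": len(as_json_lines()), "xml": len(as_xml())}
print(sizes)
```

CSV states the column names once, JSON repeats them per record, and XML repeats them twice per field, which is why the sizes grow in that order.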


# 📊 Data File Formats in Spark – When to Use & Benefits

---

## 1️⃣ CSV (Comma Separated Values)

### ✅ When to Use
- Simple data exchange between systems
- Small datasets
- When human readability is required
- Legacy system compatibility

### ⭐ Benefits
- Easy to read and write
- Supported by almost all tools
- Lightweight and simple

### ❌ Limitations
- No schema enforcement
- No compression by default
- Poor performance on large datasets
- Does not support nested structures

---

## 2️⃣ JSON

### ✅ When to Use
- Semi-structured data
- API responses, logs, streaming data
- Frequently changing schema

### ⭐ Benefits
- Supports nested and hierarchical data
- Flexible schema
- Human readable

### ❌ Limitations
- Large file size
- Slower parsing
- Not efficient for analytics

---

## 3️⃣ ORC (Optimized Row Columnar)

### ✅ When to Use
- Hive-based data warehouses
- Large analytical workloads
- Hadoop ecosystem

### ⭐ Benefits
- Columnar storage
- High compression
- Predicate pushdown
- Faster query performance

### ❌ Limitations
- Limited support outside Hive ecosystem
- Not suitable for frequent updates

---

## 4️⃣ Parquet

### ✅ When to Use
- Analytics and reporting workloads
- Spark, Databricks, cloud platforms
- Read-heavy processing

### ⭐ Benefits
- Columnar format
- Efficient compression
- Faster reads
- Schema stored with data

### ❌ Limitations
- No ACID transactions
- Poor support for updates/deletes
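
The "columnar" benefit listed for ORC and Parquet comes down to how values are laid out. The sketch below contrasts a row-oriented and a column-oriented layout in plain Python; it is a conceptual illustration only and says nothing about the actual on-disk encoding of either format.

```python
# Row-oriented layout: each record keeps all of its fields together
rows = [
    {"id": 101, "name": "Arun", "age": 31},
    {"id": 102, "name": "Meera", "age": 45},
    {"id": 103, "name": "Irfan", "age": 29},
]

# Column-oriented layout: each column keeps all of its values together
columnar = {key: [row[key] for row in rows] for key in rows[0]}

# Averaging age from rows touches every record, including the unused id and name fields
average_from_rows = sum(row["age"] for row in rows) / len(rows)

# Averaging age from the columnar layout reads only the "age" list
average_from_columns = sum(columnar["age"]) / len(columnar["age"])

print(columnar["age"], average_from_rows, average_from_columns)
```

Because an analytical query usually needs a few columns out of many, reading only those columns is what makes the columnar formats faster for read-heavy work.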

---

## 5️⃣ Delta (Delta Lake Format)

### ✅ When to Use
- Reliable data lakes
- Data with frequent changes
- Production pipelines

### ⭐ Benefits
- Built on Parquet
- ACID transactions
- Schema enforcement and evolution
- Time travel support

### ❌ Limitations
- Slight write overhead
- Requires Delta-compatible engines

---

## 6️⃣ XML

### ✅ When to Use
- Legacy enterprise systems
- Configuration files
- Data exchange standards (SOAP)

### ⭐ Benefits
- Strong schema validation (XSD)
- Self-describing structure

### ❌ Limitations
- Very verbose
- Large file size
- Slow processing

---

## 7️⃣ Delta Tables (Managed Delta Lake Tables)

### ✅ When to Use
- Enterprise-grade data platforms
- Unity Catalog enabled environments
- Multi-user analytics

### ⭐ Benefits
- All Delta Lake advantages
- Table-level security
- Data lineage and governance
- SQL and Spark interoperability

### ❌ Limitations
- Platform dependent (Databricks preferred)

---

## 🔁 Summary Comparison

| Format | Storage | ACID | Compression | Schema | Updates |
|------|--------|------|------------|--------|---------|
| CSV | Row | ❌ | ❌ | ❌ | ❌ |
| JSON | Row | ❌ | ❌ | Flexible | ❌ |
| XML | Row | ❌ | ❌ | Strong | ❌ |
| ORC | Column | ❌ | ✅ | ✅ | ❌ |
| Parquet | Column | ❌ | ✅ | ✅ | ❌ |
| Delta | Column | ✅ | ✅ | ✅ | ✅ |
| Delta Table | Column | ✅ | ✅ | ✅ | ✅ |

---

## 🧠 Quick Tip
- **CSV / JSON / XML** → Data ingestion & exchange  
- **Parquet / ORC** → Analytics & performance  
- **Delta / Delta Tables** → Production-ready lakehouse  


##### Reading and writing the files in different combinations

In [0]:
spark.read.format("orc").load("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_orc_out").write.parquet("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_orc_parquet_out",mode="overwrite")
spark.read.format("parquet").load("/Volumes/telecom_catalog_assign/landing_zone/ingestion_volume/cus_p_out").write.mode("overwrite").format("delta").saveAsTable("telecom_catalog_assign.landing_zone.p_to_d_table")

In [0]:
df = spark.table("telecom_catalog_assign.landing_zone.p_to_d_table")
df.write.mode("overwrite").format("delta").saveAsTable(
    "telecom_catalog_assign.landing_zone.d_to_d_table"
)
# spark.table() (or spark.read.table()) reads a Delta table that is already registered in the catalog