#Telecom Domain Read & Write Ops Assignment - Building Datalake & Lakehouse
This notebook contains assignments to practice Spark read options and Databricks volumes. <br>
Sections: Sample data creation, Catalog & Volume creation, Copying data into Volumes, Path glob/recursive reads, toDF() column renaming variants, inferSchema/header/separator experiments, and exercises.<br>

![](https://fplogoimages.withfloats.com/actual/68009c3a43430aff8a30419d.png)
![](https://theciotimes.com/wp-content/uploads/2021/03/TELECOM1.jpg)

##First, import all required libraries and create the Spark session object

##1. Write SQL statements to create:
1. A catalog named telecom_catalog_assign
2. A schema landing_zone
3. A volume landing_vol
4. Using dbutils.fs.mkdirs, create folders:<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/
5. Explain the following (research why production-ready systems use the Volume concept):<br>
a. Volume vs DBFS/FileStore<br>
b. Why production teams prefer Volumes for regulated data<br>

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS telecom_catalog_assign;
CREATE SCHEMA IF NOT EXISTS telecom_catalog_assign.landing_zone;
CREATE VOLUME IF NOT EXISTS telecom_catalog_assign.landing_zone.landing_vol;

In [0]:
base_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol"

dbutils.fs.mkdirs(f"{base_path}/customer")
dbutils.fs.mkdirs(f"{base_path}/usage")
dbutils.fs.mkdirs(f"{base_path}/tower")

print("Landing zone folders created successfully")


##Data files to use in this usecase:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''

##2. Filesystem operations
1. Write dbutils.fs code to copy the above datasets into your created Volume folders:
Customer → /Volumes/.../customer/
Usage → /Volumes/.../usage/
Tower (region-based) → /Volumes/.../tower/region1/ and /Volumes/.../tower/region2/

2. Write a command to validate whether files were successfully copied

In [0]:
customer_csv = """101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
"""

usage_tsv = """customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
"""

tower_logs_region1 = """event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
"""
tower_logs_region2 = """event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
"""
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_region2.csv", tower_logs_region2, overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv", customer_csv, overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv", usage_tsv, overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_region1.csv", tower_logs_region1, overwrite=True)
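
The cell above copies the files but does not yet validate them (task 2). A minimal sketch, assuming the compute exposes Volumes as local paths under /Volumes so ordinary Python file APIs can read them: the hypothetical helper `validate_landing_files` reports the non-blank line count of each expected file, or `None` when the file is missing. The demo runs against a temporary folder so it works anywhere.

```python
import os
import tempfile


def validate_landing_files(base_path, expected_files):
    """Map each expected relative path to its non-blank line count, or None if it is missing."""
    report = {}
    for rel_path in expected_files:
        full_path = os.path.join(base_path, rel_path)
        if os.path.isfile(full_path):
            with open(full_path, encoding="utf-8") as fh:
                report[rel_path] = sum(1 for line in fh if line.strip())
        else:
            report[rel_path] = None
    return report


# Demo against a temporary folder standing in for the Volume (hypothetical layout:
# the customer file exists, the usage file was never copied)
demo_base = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_base, "customer"))
with open(os.path.join(demo_base, "customer", "customer.csv"), "w", encoding="utf-8") as fh:
    fh.write("101,Arun,31,Chennai,PREPAID\n102,Meera,45,Bangalore,POSTPAID\n")

demo_report = validate_landing_files(demo_base, ["customer/customer.csv", "usage/usage.csv"])
missing = [path for path, count in demo_report.items() if count is None]
print(demo_report)
print("Missing files:", missing)
```

On Databricks the same helper could be called with `base_path` and the four relative paths written above, for example `validate_landing_files(base_path, ["customer/customer.csv", "usage/usage.csv", "tower/tower_region1.csv", "tower/tower_region2.csv"])`; `dbutils.fs.ls(f"{base_path}/tower")` is the Databricks-native way to list the folder.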


##3. Spark Directory Read Use Cases
1. Read all tower logs using:
Path glob filter (example: *.csv)
Multiple paths input
Recursive lookup

2. Demonstrate these 3 reads separately:
Using pathGlobFilter
Using list of paths in spark.read.csv([path1, path2])
Using .option("recursiveFileLookup","true")

3. Compare the outputs and understand when each should be used.

In [0]:
tower_glob_df = spark.read \
    .option("header", True) \
    .option("sep", "|") \
    .option("pathGlobFilter", "*.csv") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower")

tower_glob_df.show()


##4. Schema Inference, Header, and Separator
1. Read the Customer and Usage files using both option() and options(), with read.csv and with the format function:<br>
header=false, inferSchema=false<br>
or<br>
header=true, inferSchema=true<br>
2. Write a note on what changes when header or inferSchema is set to true/false.<br>
3. How did schema inference handle “abc” in the age column?<br>

In [0]:
cust_df_opt1 = spark.read \
    .option("header", "false") \
    .option("inferSchema", "false") \
    .option("sep", ",") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv")

cust_df_opt1.printSchema()
cust_df_opt1.show()



In [0]:
usage_df_opt1 = spark.read \
    .options(
        header="false",
        inferSchema="false",
        sep="\t"
    ) \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv")

usage_df_opt1.printSchema()
usage_df_opt1.show()


In [0]:
cust_df_opt2 = spark.read \
    .format("csv") \
    .options(
        header= "false",
        inferSchema="true",
        sep= ","
    ) \
    .load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv")

cust_df_opt2.printSchema()
cust_df_opt2.show()


In [0]:
| Setting               | What happens                                                                                |
| --------------------- | ------------------------------------------------------------------------------------------- |
| `header = false`      | Spark treats the first row as **data** and assigns default column names (`_c0`, `_c1`, …). |
| `header = true`       | Spark treats the first row as **column names** and does not include it in the data.        |
| `inferSchema = false` | Spark reads **all columns as STRING** without validating data types.                        |
| `inferSchema = true`  | Spark scans the data and **infers the most suitable data type** for each column.            |
Spark first attempts to infer age as a numeric column.
While scanning the rows, it encounters the non-numeric value "abc".
Because the inferred type has to fit every value in the column, Spark falls back to STRING for the entire column.
No error is thrown, and "abc" is kept as-is rather than being converted to NULL.


##5. Column Renaming Usecases
1. Apply column names as strings using the toDF function for customer data
2. Apply column names and datatypes using the schema function for usage data
3. Apply column names and datatypes using StructType with IntegerType, StringType, TimestampType and other classes for tower data

In [0]:
cust_df_raw = spark.read.csv(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv").toDF(
    "custid",
    "fname",
    "age",
    "city",
    "plan_type"
)
cust_df_raw.printSchema()
cust_df_raw.show()


In [0]:
usage_schema = "customer_id INT, voice_mins INT, data_mb INT, sms_count INT"

usage_df = spark.read.option("header", "true").option("sep", "\t").schema(usage_schema).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv")

usage_df.printSchema()
usage_df.show()


In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, TimestampType
)
tower_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("tower_id", StringType(), True),
    StructField("signal_strength", IntegerType(), True),
    StructField("event_timestamp", TimestampType(), True)
])
tower_df = spark.read \
    .option("header", "true") \
    .option("sep", "|") \
    .schema(tower_schema) \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_region1.csv")

tower_df.printSchema()
tower_df.show(truncate=False)


## Spark Write Operations using:
- csv, json, orc, parquet, delta, saveAsTable, insertInto and xml, with different write modes, header and sep options

##6. Write Operations (Data Conversion/Schema migration) – CSV Format Usecases
1. Write customer data into CSV format using overwrite mode
2. Write usage data into CSV format using append mode
3. Write tower data into CSV format with header enabled and custom separator (|)
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local machine and open any of the above files in Notepad++ to inspect the data.

In [0]:
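# inferSchema makes custid numeric (age stays STRING because of "abc");
# toDF names the columns so later writes and tables do not carry _c0.._c4
cust_df = spark.read \
    .option("header", "false") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv") \
    .toDF("custid", "fname", "age", "city", "plan_type")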

cust_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_csv")



In [0]:
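# inferSchema turns the usage columns into integers, matching the INT columns of usage_ins_tbl
usage_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", "\t") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv")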
usage_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_csv")


In [0]:
tower_df = spark.read \
    .option("header", "true") \
    .option("sep", "|") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_region1.csv")
tower_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .option("sep", "|") \
    .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_csv")


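In [0]:
tower_csv_df = spark.read.option("header", "true").option("sep", "|").csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_csv")
tower_csv_df.show(5)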

In [0]:
dbutils.fs.ls(
  "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_csv"
)


##7. Write Operations (Data Conversion/Schema migration) – JSON Format Usecases
1. Write customer data into JSON format using overwrite mode
2. Write usage data into JSON format using append mode and snappy compression format
3. Write tower data into JSON format using ignore mode and observe the behavior of this mode
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to inspect the data.

In [0]:
cust_df.write \
    .mode("overwrite") \
    .json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_json")


In [0]:
usage_df.write \
    .mode("append") \
    .option("compression", "snappy") \
    .json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_json")


In [0]:
tower_df.write \
    .mode("ignore") \
    .json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_json")


In [0]:
tower_json_df = spark.read \
    .json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_json")

tower_json_df.show(5)
tower_json_df.printSchema()


##8. Write Operations (Data Conversion/Schema migration) – Parquet Format Usecases
1. Write customer data into Parquet format using overwrite mode with gzip compression
2. Write usage data into Parquet format using error (errorifexists) mode
3. Write tower data into Parquet format with gzip compression option
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to inspect the data.

In [0]:
cust_df.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_parquet")


In [0]:
usage_df.write \
    .mode("error") \
    .parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_parquet")


In [0]:
tower_df.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_parquet")


In [0]:
usage_parquet_df = spark.read \
    .parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_parquet")

usage_parquet_df.show(5)
usage_parquet_df.printSchema()


##9. Write Operations (Data Conversion/Schema migration) – ORC Format Usecases
1. Write customer data into ORC format using overwrite mode
2. Write usage data into ORC format using append mode
3. Write tower data into ORC format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to inspect the data.

In [0]:
cust_df.write \
    .mode("overwrite") \
    .orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_orc")


In [0]:
usage_df.write \
    .mode("append") \
    .orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_orc")


In [0]:
tower_df.write \
    .mode("overwrite") \
    .orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_orc")


In [0]:
usage_orc_df = spark.read \
    .orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_orc")

usage_orc_df.show(5)
usage_orc_df.printSchema()


##10. Write Operations (Data Conversion/Schema migration) – Delta Format Usecases
1. Write customer data into Delta format using overwrite mode
2. Write usage data into Delta format using append mode
3. Write tower data into Delta format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to inspect the data.
6. Compare the Parquet and Delta output locations and identify the differentiating factor, given that both store their data as Parquet files.

In [0]:
cust_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_delta")


In [0]:
usage_df.write \
    .format("delta") \
    .mode("append") \
    .save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_delta")


In [0]:
tower_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/tower_delta")


In [0]:
usage_delta_df = spark.read \
    .format("delta") \
    .load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_delta")

usage_delta_df.show(5)
usage_delta_df.printSchema()


| Feature            | Parquet   | Delta                 |
| ------------------ | --------- | --------------------- |
| File format        | Parquet   | Parquet               |
| Transaction log    | ❌ No     | ✅ Yes (`_delta_log`) |
| ACID transactions  | ❌ No     | ✅ Yes                |
| Schema enforcement | ❌ No     | ✅ Yes                |
| Time travel        | ❌ No     | ✅ Yes                |
| Updates / deletes  | ❌ No     | ✅ Yes                |
| Concurrent writes  | ❌ Unsafe | ✅ Safe               |
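
The differentiating factor from task 6 is visible in the folder listing: both locations hold Parquet data files, but only the Delta location has a `_delta_log` folder of JSON commit files. A sketch of a hypothetical helper that summarises a folder, assuming Volume paths are readable through Python's `os` module; the demo builds two tiny layouts of empty files purely to exercise the function.

```python
import os
import tempfile


def summarize_table_dir(path):
    """Count top-level Parquet data files and list Delta commit files (if any) under `path`."""
    parquet_files = [name for name in os.listdir(path) if name.endswith(".parquet")]
    log_dir = os.path.join(path, "_delta_log")
    has_log = os.path.isdir(log_dir)
    commits = sorted(n for n in os.listdir(log_dir) if n.endswith(".json")) if has_log else []
    return {"parquet_files": len(parquet_files), "has_delta_log": has_log, "commit_files": commits}


def touch(path):
    """Create an empty file, including any missing parent folders."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    open(path, "w").close()


# Hypothetical layouts made of empty files, only to exercise the helper
parquet_like = tempfile.mkdtemp()
touch(os.path.join(parquet_like, "part-00000-demo.snappy.parquet"))

delta_like = tempfile.mkdtemp()
touch(os.path.join(delta_like, "part-00000-demo.snappy.parquet"))
# Delta names each commit after its version number, zero-padded to 20 digits
touch(os.path.join(delta_like, "_delta_log", f"{0:020d}.json"))

plain_summary = summarize_table_dir(parquet_like)
demo_summary = summarize_table_dir(delta_like)
print(plain_summary)
print(demo_summary)
```

On Databricks the comparison would be `summarize_table_dir("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_parquet")` against the same call on `/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_delta`.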


##11. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using saveAsTable() as a managed table
2. Write usage data using saveAsTable() with overwrite mode
3. Drop the managed table and verify data removal
4. Open the table overview in the Catalog and confirm that the table is stored in Delta format.
5. Use spark.sql to write some simple queries on the tables created above.


In [0]:
cust_df.write \
    .format("delta") \
    .saveAsTable("telecom_catalog_assign.landing_zone.customer_tbl")


In [0]:
usage_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("telecom_catalog_assign.landing_zone.usage_tbl")


In [0]:
%sql
DROP TABLE telecom_catalog_assign.landing_zone.customer_tbl;


In [0]:
%sql
SHOW TABLES IN telecom_catalog_assign.landing_zone;



In [0]:
%sql
DESCRIBE EXTENDED telecom_catalog_assign.landing_zone.usage_tbl;



In [0]:
spark.sql("""
SELECT * 
FROM telecom_catalog_assign.landing_zone.usage_tbl
""").show()


##12. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using insertInto() into a new table and observe the behavior
2. Write usage data using insertInto() with overwrite mode

In [0]:
%sql
CREATE TABLE telecom_catalog_assign.landing_zone.customer_ins_tbl (
  custid INT,
  fname STRING,
  age STRING,
  city STRING,
  plan_type STRING
)
USING DELTA;


In [0]:
# insertInto() resolves columns by position, not by name, and the target table must already exist
cust_df.write.insertInto(
    "telecom_catalog_assign.landing_zone.customer_ins_tbl"
)


In [0]:
%sql
CREATE TABLE telecom_catalog_assign.landing_zone.usage_ins_tbl (
  customer_id INT,
  voice_mins INT,
  data_mb INT,
  sms_count INT
)
USING DELTA;


In [0]:
usage_df.write \
    .mode("overwrite") \
    .insertInto("telecom_catalog_assign.landing_zone.usage_ins_tbl")


##13. Write Operations (Data Conversion/Schema migration) – XML Format Usecases
1. Write customer data into XML format using rowTag as cust
2. Write usage data into XML format using overwrite mode with rowTag as usage
3. Download the XML data, open the file in Notepad++ and see what the XML file looks like.

In [0]:
cust_df.write \
    .mode("overwrite") \
    .option("rowTag", "cust") \
    .xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/customer_xml")


In [0]:
usage_df.write \
    .mode("overwrite") \
    .option("rowTag", "usage") \
    .xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output/usage_xml")
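
To preview the shape of the output before downloading, here is a plain-Python illustration (not Spark's writer) of the layout rowTag produces: each record becomes one `<usage>` element with a child element per column, all wrapped in a root element. The root name `ROWS` is assumed to match the writer's default rootTag; Spark's actual output also indents the elements, so expect cosmetic differences.

```python
import xml.etree.ElementTree as ET


def rows_to_xml(rows, row_tag, root_tag="ROWS"):
    """Illustrative only: one <row_tag> element per record, one child element per column."""
    root = ET.Element(root_tag)
    for record in rows:
        row_el = ET.SubElement(root, row_tag)
        for column, value in record.items():
            ET.SubElement(row_el, column).text = str(value)
    return ET.tostring(root, encoding="unicode")


usage_records = [
    {"customer_id": 101, "voice_mins": 320, "data_mb": 1500, "sms_count": 20},
    {"customer_id": 102, "voice_mins": 120, "data_mb": 4000, "sms_count": 5},
]
xml_text = rows_to_xml(usage_records, row_tag="usage")
print(xml_text)
```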


##14. Compare all the downloaded files (csv, json, orc, parquet, delta and xml) 
1. Capture the size occupied by each of these file formats and list the formats below in order of size, from smallest to largest.
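
A sketch for this task, assuming the output folders are reachable through Python's `os` module: the hypothetical helpers below total the bytes of every file under each folder (including `_delta_log` and other metadata files) and sort the formats from smallest to largest. The demo uses two temporary folders of known size.

```python
import os
import tempfile


def dir_size_bytes(path):
    """Total size of every file under `path`, metadata files (e.g. _delta_log) included."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            total += os.path.getsize(os.path.join(dirpath, name))
    return total


def rank_by_size(output_root, folder_names):
    """Return (folder, bytes) pairs sorted from smallest to largest."""
    sizes = {name: dir_size_bytes(os.path.join(output_root, name)) for name in folder_names}
    return sorted(sizes.items(), key=lambda item: item[1])


# Demo with two temporary folders of known size (hypothetical names)
demo_root = tempfile.mkdtemp()
for folder, n_bytes in [("big_fmt", 10), ("small_fmt", 5)]:
    os.makedirs(os.path.join(demo_root, folder))
    with open(os.path.join(demo_root, folder, "part-00000"), "wb") as fh:
        fh.write(b"x" * n_bytes)

ranking = rank_by_size(demo_root, ["big_fmt", "small_fmt"])
print(ranking)
```

On Databricks it could be called as `rank_by_size("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/output", ["customer_csv", "customer_json", "customer_parquet", "customer_orc", "customer_delta", "customer_xml"])`. With only a handful of rows, per-file metadata (Parquet/ORC footers, the Delta log) can outweigh the data itself, so the ranking may differ from what the same formats show on large datasets.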

##15. Final exercise: write a one/two-line summary of...
1. When to use/benefits of csv
2. When to use/benefits of json
3. When to use/benefits of orc
4. When to use/benefits of parquet
5. When to use/benefits of delta
6. When to use/benefits of xml
7. When to use/benefits of delta tables


📄 CSV – When to use / Benefits

Use CSV for simple, flat data exchange where human readability and wide tool compatibility are required.

Benefits:
Easy to create and read, universally supported, good for small to medium datasets and landing/raw layers.

🧾 JSON – When to use / Benefits

Use JSON for semi-structured or hierarchical data, especially from APIs or event streams.

Benefits:
Supports nested structures, flexible schema, easy parsing across applications, ideal for API and streaming data.

🧱 ORC – When to use / Benefits

Use ORC for high-performance analytics in Hadoop-centric ecosystems.

Benefits:
Columnar storage, strong compression, fast scans and aggregations, schema stored with data.

📦 Parquet – When to use / Benefits

Use Parquet for analytical workloads and data lakes in cloud environments.

Benefits:
Columnar format, excellent compression, fast query performance, widely supported across big-data tools.

🔺 Delta (Delta file format) – When to use / Benefits

Use Delta when you need reliability and ACID guarantees on top of Parquet files.

Benefits:
Transaction log, schema enforcement, time travel, safe concurrent reads/writes.

🧾 XML – When to use / Benefits

Use XML for legacy systems or enterprise integrations that require strict schema and hierarchical data.

Benefits:
Strong structure, supports validation (XSD), human-readable, common in legacy and SOAP-based systems.

🏗️ Delta Tables (Lakehouse) – When to use / Benefits

Use Delta tables for production Lakehouse architectures where data reliability, governance, and SQL analytics are required.

Benefits:
ACID transactions, schema evolution, time travel, scalable analytics, unified batch + streaming, full governance.

CSV      → simple exchange
JSON     → semi-structured / APIs
ORC      → Hadoop analytics
Parquet  → cloud analytics
Delta    → reliable Parquet
XML      → legacy systems
DeltaTbl → production Lakehouse
