#Telecom Domain Read & Write Ops Assignment - Building Datalake & Lakehouse
This notebook contains assignments to practice Spark read options and Databricks volumes. <br>
Sections: Sample data creation, Catalog & Volume creation, Copying data into Volumes, Path glob/recursive reads, toDF() column renaming variants, inferSchema/header/separator experiments, and exercises.<br>

![](https://fplogoimages.withfloats.com/actual/68009c3a43430aff8a30419d.png)
![](https://theciotimes.com/wp-content/uploads/2021/03/TELECOM1.jpg)

##First, import all required libraries & create the Spark session object

##1. Write SQL statements to create:
1. A catalog named telecom_catalog_assign
2. A schema landing_zone
3. A volume landing_vol
4. Using dbutils.fs.mkdirs, create folders:<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/
5. Explain the following (research why the Volume concept is preferred for production-ready systems):<br>
a. Volume vs DBFS/FileStore<br>
b. Why production teams prefer Volumes for regulated data<br>

In [0]:
%sql
create catalog if not exists telecom_catalog_assign;
create schema if not exists landing_zone;
create volume if not exists landing_vol;

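# In the above cell, I didn't use the fully qualified schema name (telecom_catalog_assign.landing_zone) or volume name (telecom_catalog_assign.landing_zone.landing_vol), so both were created under the current catalog/schema instead of telecom_catalog_assign. That is why I drop the schema and volume in the next cell.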

In [0]:
%sql



drop schema if exists landing_zone;
drop volume if exists landing_vol;


In [0]:
%sql
create schema if not exists telecom_catalog_assign.landing_zone;
create volume if not exists telecom_catalog_assign.landing_zone.landing_vol;
    


In [0]:
# Created New directories
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/")

**_Volumes_** are the modern, governed way to store files in Databricks, using Unity Catalog paths like /Volumes/catalog/schema/volume/file.csv. Think of them as secure folders with permissions and tracking.

**_DBFS/FileStore_** is the older filesystem (paths like dbfs:/FileStore/file.csv) for quick workspace files without much governance.

_Volumes_ provide centralized governance, security, and auditing for production files through Unity Catalog, preventing unauthorized access and tracking usage across teams.

**Key Production Benefits**
- Permissions Control: Fine-grained access (e.g., only data engineers read raw files, analysts read processed data), and no one bypasses Unity Catalog rules.
- Audit & Lineage: Tracks who accessed or modified files and integrates with table lineage for full pipeline visibility (critical for compliance requirements such as GDPR).
- Sharing Across Workspaces: Securely share volumes between dev/test/prod environments or users without copying data.

**Why Not DBFS/FileStore?**
DBFS lacks these controls: anyone with workspace access can read and write, there is no audit trail, and it does not scale for teams or regulated industries.

Use Volumes in production to avoid "wild west" file access and keep your data pipelines compliant and secure.


##Data files to use in this usecase:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''

##2. Filesystem operations
1. Write dbutils.fs code to copy the above datasets into your created Volume folders:
Customer → /Volumes/.../customer/
Usage → /Volumes/.../usage/
Tower (region-based) → /Volumes/.../tower/region1/ and /Volumes/.../tower/region2/

2. Write a command to validate whether files were successfully copied

In [0]:
# Writing the files inside the Volumes
customer_csv = ''' 101,Arun,31,Chennai,PREPAID 102,Meera,45,Bangalore,POSTPAID 103,Irfan,29,Hyderabad,PREPAID 104,Raj,52,Mumbai,POSTPAID 105,,27,Delhi,PREPAID 106,Sneha,abc,Pune,PREPAID '''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv", customer_csv, True)
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count 101\t320\t1500\t20 102\t120\t4000\t5 103\t540\t600\t52 104\t45\t200\t2 105\t0\t0\t0 '''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv", usage_tsv, True)
tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp 5001|101|TWR01|-80|2025-01-10 10:21:54 5004|104|TWR05|-75|2025-01-10 11:01:12 '''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.txt", tower_logs_region1, True)



In [0]:
#Listing the files
display(dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/"))
print(dbutils.fs.head("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv"))
display(dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/"))
display(dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"))




##3. Spark Directory Read Use Cases
1. Read all tower logs using:
Path glob filter (example: *.csv)
Multiple paths input
Recursive lookup

2. Demonstrate these 3 reads separately:
Using pathGlobFilter
Using list of paths in spark.read.csv([path1, path2])
Using .option("recursiveFileLookup","true")

3. Compare the outputs and understand when each should be used.

In [0]:
#Reading the tower files using recursiveFileLookup and pathGlobFilter
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()
#pathGlobFilter="*.txt" keeps only .txt files; recursiveFileLookup=True also searches subfolders
df1=spark.read.text("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/",pathGlobFilter="*.txt",recursiveFileLookup=True)
#df2=spark.read.text("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/",pathGlobFilter="*")
print(dbutils.fs.head("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.txt"))
display(df1)
#display(df2)

In [0]:
#Reading several folders at once by passing a list of paths (combined with pathGlobFilter and recursiveFileLookup)
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Spark DataFrames").getOrCreate()
paths = [
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"
]
df2=spark.read.options(pathGlobFilter="*",recursiveFileLookup=True).text(paths)
display(df2)
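The three read styles differ in which files get picked up. Below is a rough plain-Python analogy (not how Spark lists files internally) on a temporary folder modelled on the assignment's `tower/region1/` and `tower/region2/` layout; the file names are hypothetical. As with `pathGlobFilter`, the glob is tested against the file *name* only; `recursive=True` stands in for `recursiveFileLookup`; and a list of paths is modelled as the union of the files found in each path.

```python
import fnmatch
import os
import tempfile

def list_files(root, base, pattern="*", recursive=False):
    """File paths (relative to base) under root whose file *name* matches pattern."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            # Analogy to pathGlobFilter: the glob is checked against the file name only
            if fnmatch.fnmatch(name, pattern):
                found.append(os.path.relpath(os.path.join(dirpath, name), base).replace(os.sep, "/"))
        if not recursive:
            break  # this model stops at the top-level folder unless recursion is requested
    return sorted(found)

def list_many(roots, base, pattern="*", recursive=False):
    """Analogy to spark.read.csv([path1, path2]): union of the files found in each path."""
    return sorted(f for root in roots for f in list_files(root, base, pattern, recursive))

with tempfile.TemporaryDirectory() as base:
    # Hypothetical layout based on the tower/region1 and tower/region2 folders in the task
    layout = ["tower/tower_logs_region1.txt", "tower/region1/logs_a.csv",
              "tower/region2/logs_b.csv", "tower/region2/notes.txt"]
    for rel in layout:
        path = os.path.join(base, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()

    tower = os.path.join(base, "tower")
    glob_only = list_files(tower, base, pattern="*.txt")
    recursive_csv = list_files(tower, base, pattern="*.csv", recursive=True)
    path_list = list_many([os.path.join(tower, "region1"), os.path.join(tower, "region2")], base)

print(glob_only)      # ['tower/tower_logs_region1.txt']
print(recursive_csv)  # ['tower/region1/logs_a.csv', 'tower/region2/logs_b.csv']
print(path_list)      # ['tower/region1/logs_a.csv', 'tower/region2/logs_b.csv', 'tower/region2/notes.txt']
```

In short: a glob filter narrows by file name, recursion widens the search into subfolders, and an explicit path list is useful when the folders to read are known in advance.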

##4. Schema Inference, Header, and Separator
1. Read the Customer and Usage files with option() and options(), using both read.csv and the format() function:<br>
header=false, inferSchema=false<br>
or<br>
header=true, inferSchema=true<br>
2. Write a note on what changes when header or inferSchema is set to true/false.<br>
3. How schema inference handled “abc” in age?<br>

In [0]:
#Reading the customer file using the option() and options() methods
#lineSep=" " because the records in customer.csv were written separated by spaces
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Spark DataFrames").getOrCreate()
df1=spark.read.option("header","false").option("inferSchema","false").option("lineSep"," ").csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")
display(df1)
df1.printSchema()
df2=spark.read.options(header="false",inferSchema="true",lineSep=" ").csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")
display(df2)
df2.printSchema()


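**header=true**
Treats the _first record_ as the header (column names). With header=false, Spark names the columns _c0, _c1, ... and keeps the first record as data.<br>
**inferSchema=true**
Spark scans the data to find an appropriate data type for each column. With inferSchema=false, CSV reads every column as a _String_.<br>
**printSchema()**
Prints the resulting schema: each column's name, data type and nullability.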
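The customer file has no header row, so the header setting decides whether the first customer becomes column names. A simplified plain-Python model using the standard `csv` module (the helper `read_csv_like_spark` is hypothetical and is not Spark's parser), fed with the first two customer records from the sample data:

```python
import csv
import io

def read_csv_like_spark(text, header=False, sep=","):
    """Simplified model of Spark's header option; returns (column_names, data_rows)."""
    rows = [row for row in csv.reader(io.StringIO(text), delimiter=sep) if row]
    if header:
        # header=true: the first record becomes the column names and is dropped from the data
        return rows[0], rows[1:]
    # header=false: columns get Spark's default names _c0, _c1, ... and every record stays data
    return [f"_c{i}" for i in range(len(rows[0]))], rows

# First two customer records from the sample data (the file itself has no header row)
sample = "101,Arun,31,Chennai,PREPAID\n102,Meera,45,Bangalore,POSTPAID\n"

cols_false, data_false = read_csv_like_spark(sample, header=False)
cols_true, data_true = read_csv_like_spark(sample, header=True)
print(cols_false, len(data_false))  # ['_c0', '_c1', '_c2', '_c3', '_c4'] 2
print(cols_true, len(data_true))    # ['101', 'Arun', '31', 'Chennai', 'PREPAID'] 1
```

For a header-less file like this one, header=true silently turns customer 101 into column names and drops it from the data.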


**How schema inference handled “abc” in age?**
It handles "abc" as a "String" datatype

##5. Column Renaming Usecases
1. Apply column names as strings using the toDF function for the customer data
2. Apply column names and datatypes using the schema function for the usage data
3. Apply column names and datatypes using StructType with IntegerType, StringType, TimestampType and other classes for the tower data

In [0]:
#Applying the toDF method and options such as sep, lineSep
#customer.csv has no header row, so header="false" keeps the first customer record as data
df1=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/",sep=",",inferSchema="true",header="false",lineSep=" ").toDF("customer_id","name","age","city","plan")
df1.show()
df1.printSchema()#The age column contains "abc", so it is inferred as String



In [0]:
schema_val="customer_id int,voice_mins int,data_mb int,sms_count int"
df1_sam=spark.read.schema(schema_val).options(sep="\t",lineSep=" ",header="true").csv(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv")
df1_sam.display()
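`schema_val` is a DDL-formatted string: comma-separated `name type` pairs, which Spark turns into a schema. A plain-Python sketch of what applying it means, with hypothetical helpers `parse_ddl` and `apply_schema` that handle only `int` and `string`. Values that cannot be cast become None (null), roughly how Spark's default PERMISSIVE mode treats malformed fields, though this is not Spark's implementation:

```python
# Same DDL-formatted schema string as in the cell above
schema_val = "customer_id int,voice_mins int,data_mb int,sms_count int"

def parse_ddl(ddl):
    """Split a simple 'name type,name type' string into (name, type) pairs."""
    return [tuple(field.split()) for field in ddl.split(",")]

CASTS = {"int": int, "string": str}  # only the types needed for this example

def apply_schema(rows, ddl):
    """Cast each value to its declared type; a value that cannot be cast becomes None (null)."""
    typed = []
    for row in rows:
        record = {}
        for (name, type_name), value in zip(parse_ddl(ddl), row):
            try:
                record[name] = CASTS[type_name](value)
            except ValueError:
                record[name] = None
        typed.append(record)
    return typed

# Two usage records plus a hypothetical malformed one ("n/a" is not an int)
usage_rows = [["101", "320", "1500", "20"], ["104", "45", "200", "2"], ["106", "n/a", "0", "0"]]
for record in apply_schema(usage_rows, schema_val):
    print(record)
```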


In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,TimestampType
custom_schema=StructType([StructField("event_id",IntegerType()),
                          StructField("customer_id",IntegerType()),
                          StructField("tower_id",StringType()),
                          StructField("signal_strength",IntegerType()),
                          StructField("timestamp",TimestampType())])
df1=spark.read.format("csv").options(sep="|",linesep=" ",header="true").schema(custom_schema).load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.txt")
df1.display()
df1.printSchema()


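In the above output, the time part of each timestamp is missing. Because lineSep=" " makes Spark treat every space as a record separator, the space between the date and the time splits each timestamp, and the time is pushed into a record of its own. Those fragments do not match the declared column data types, so Spark displays null for them.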
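The split can be reproduced in plain Python. This sketch compares splitting the flattened tower string on spaces, which is what lineSep=" " asks Spark to do, with splitting the multi-line string from the "Data files" section on newlines, the kind of record separator Spark expects by default:

```python
# Tower data as written to the volume earlier: one line, records separated by spaces
flattened = ("event_id|customer_id|tower_id|signal_strength|timestamp "
             "5001|101|TWR01|-80|2025-01-10 10:21:54 "
             "5004|104|TWR05|-75|2025-01-10 11:01:12 ")

# lineSep=" " splits on every space, including the one inside each timestamp
space_records = [r for r in flattened.split(" ") if r]

# The multi-line version from the "Data files" section, split on newlines instead
tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''
newline_records = tower_logs_region1.splitlines()

print([len(r.split("|")) for r in space_records])    # [5, 5, 1, 5, 1]
print([len(r.split("|")) for r in newline_records])  # [5, 5, 5]
print(space_records[2])                              # 10:21:54
```

With newline-separated records, every timestamp stays inside its own record, so the lineSep option would not be needed at all.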

In [0]:
#Let Spark infer the schema (inferSchema=true) to see which data types it picks and how the data is displayed
df1 = spark.read.format("csv").options(header="true",sep="|",inferSchema="true",linesep=" ").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.txt")
display(df1)

## Spark Write Operations using:
- csv, json, orc, parquet, delta, saveAsTable, insertInto and xml, with different write modes, header and sep options

##6. Write Operations (Data Conversion/Schema migration) – CSV Format Usecases
1. Write customer data into CSV format using overwrite mode
2. Write usage data into CSV format using append mode
3. Write tower data into CSV format with header enabled and custom separator (|)
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local machine and open any of the above files in Notepad++ to see the data.

In [0]:
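from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Spark Dataframes").getOrCreate()
customer_csv = ''' 101,Arun,31,Chennai,PREPAID 102,Meera,45,Bangalore,POSTPAID 103,Irfan,29,Hyderabad,PREPAID 104,Raj,52,Mumbai,POSTPAID 105,,27,Delhi,PREPAID 106,Sneha,abc,Pune,PREPAID '''
#createDataFrame needs rows, not one string: split the records on whitespace and the fields on commas
rows=[record.split(",") for record in customer_csv.split()]
#All columns are strings because the age column contains "abc"
dfnew=spark.createDataFrame(rows,"customer_id string, name string, age string, city string, plan string")
display(dfnew)
dfnew.write.format("csv").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custnew.csv",mode="overwrite",sep=",",lineSep=" ")
dfnew.printSchema()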

In [0]:
dfnew=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custnew.csv")
display(dfnew)
dfnew.printSchema()

In [0]:
df1.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv",header="false",sep=",",lineSep=" ",mode="overwrite")
df2=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv")
display(df2)
df2.printSchema()

In [0]:
df1.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usagenew.csv",header="false",sep=",",lineSep=" ",mode="append")

In [0]:
#df1.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="True",sep="|",lineSep=" ")
df2=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="true",sep="|",lineSep=" ")
df2.show(5)
df2.printSchema()

##7. Write Operations (Data Conversion/Schema migration)– JSON Format Usecases
1. Write customer data into JSON format using overwrite mode
2. Write usage data into JSON format using append mode and snappy compression format
3. Write tower data into JSON format using ignore mode and observe the behavior of this mode
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
df_cust=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv", lineSep=" ", sep=",")
display(df_cust)
df_cust.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_json.json", mode="overwrite")
df_cust_json=spark.read.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_json.json") 
display(df_cust_json.limit(3))

In [0]:
df_usage=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv",header="false",sep="\t",lineSep=" ")
display(df_usage)
df_usage.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_json.json", mode="ignore", compression="snappy")
df_usage_json=spark.read.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_json.json") 
display(df_usage_json)

In [0]:
df_tower=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="true",sep="|",lineSep=" ")
display(df_tower)
df_tower.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_json.json", mode="ignore")
df_tower_json=spark.read.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_json.json") 
display(df_tower_json)
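With mode="ignore", the first run writes tower_json.json, and every later run silently skips the write because the folder already exists. A toy plain-Python model of the four save modes (the `save` helper is hypothetical and writes simple numbered part files; real Spark output folders use generated part-file names and also contain marker files):

```python
import os
import shutil
import tempfile

def save(path, lines, mode="errorifexists"):
    """Toy model of Spark save modes for an output folder (hypothetical helper)."""
    if mode not in ("error", "errorifexists", "ignore", "overwrite", "append"):
        raise ValueError(f"unknown mode: {mode}")
    if os.path.exists(path):
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"{path} already exists")
        if mode == "ignore":
            return  # existing data is left untouched and nothing is written
        if mode == "overwrite":
            shutil.rmtree(path)  # old contents are replaced
        # append: keep the existing files and add a new one
    os.makedirs(path, exist_ok=True)
    part = os.path.join(path, f"part-{len(os.listdir(path)):05d}.txt")
    with open(part, "w") as fh:
        fh.write("\n".join(lines))

with tempfile.TemporaryDirectory() as base:
    out = os.path.join(base, "tower_json")
    save(out, ["first"], mode="ignore")   # folder missing, so ignore still writes
    save(out, ["second"], mode="ignore")  # folder exists, so nothing happens
    after_ignore = sorted(os.listdir(out))
    save(out, ["third"], mode="append")
    after_append = sorted(os.listdir(out))
    try:
        save(out, ["fourth"], mode="error")
        error_raised = False
    except FileExistsError:
        error_raised = True
    save(out, ["fifth"], mode="overwrite")
    after_overwrite = sorted(os.listdir(out))

print(after_ignore)     # ['part-00000.txt']
print(after_append)     # ['part-00000.txt', 'part-00001.txt']
print(error_raised)     # True
print(after_overwrite)  # ['part-00000.txt']
```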

##8. Write Operations (Data Conversion/Schema migration) – Parquet Format Usecases
1. Write customer data into Parquet format using overwrite mode and gzip compression
2. Write usage data into Parquet format using error mode
3. Write tower data into Parquet format with gzip compression option
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
df_cust=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv", lineSep=" ", sep=",")
display(df_cust)
df_cust.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_parquet", mode="overwrite", compression="gzip")
df_cust_parquet=spark.read.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_parquet")
display(df_cust_parquet)

In [0]:
df_usage=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv",header="true",sep="\t",lineSep=" ")
display(df_usage)
df_usage.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_parquet", mode="error")
df_usage_parquet=spark.read.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_parquet")
display(df_usage_parquet.limit(5))

##9. Write Operations (Data Conversion/Schema migration) – Orc Format Usecases
1. Write customer data into ORC format using overwrite mode
2. Write usage data into ORC format using append mode
3. Write tower data into ORC format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
df_tower=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="true",sep="|",lineSep=" ")
display(df_tower)
df_tower.write.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_orc", mode="append")
df_tower_orc=spark.read.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_orc")
display(df_tower_orc)

##10. Write Operations (Data Conversion/Schema migration) – Delta Format Usecases
1. Write customer data into Delta format using overwrite mode
2. Write usage data into Delta format using append mode
3. Write tower data into Delta format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.
6. Compare the parquet location and the delta location and work out what differentiates them, since both store their data as parquet files.

In [0]:
df_tower=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="true",sep="|",lineSep=" ")
display(df_tower)
df_tower.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_delta", mode="overwrite")
df_tower_delta=spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_delta")
display(df_tower_delta)
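Both locations contain Parquet data files; the difference is that a Delta location also has a `_delta_log/` folder of JSON commit files, the transaction log that records which data files belong to each table version. A plain-Python check of that distinction (hypothetical helper `table_format`, run here on empty stand-in files in a temporary folder); on a cluster it could be pointed at the `/Volumes/...` folders, assuming they are reachable through the local file API:

```python
import os
import tempfile

def table_format(folder):
    """Guess the layout of a Spark output folder (hypothetical helper)."""
    if os.path.isdir(os.path.join(folder, "_delta_log")):
        return "delta"  # Parquet data files plus a transaction log
    if any(name.endswith(".parquet") for name in os.listdir(folder)):
        return "parquet"  # Parquet data files only
    return "unknown"

def touch(path):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    open(path, "w").close()

with tempfile.TemporaryDirectory() as base:
    # Empty stand-in files with typical names; no real Parquet or Delta content
    parquet_dir = os.path.join(base, "usage_parquet")
    delta_dir = os.path.join(base, "tower_delta")
    touch(os.path.join(parquet_dir, "part-00000.snappy.parquet"))
    touch(os.path.join(delta_dir, "part-00000.snappy.parquet"))
    touch(os.path.join(delta_dir, "_delta_log", "00000000000000000000.json"))
    formats = {name: table_format(os.path.join(base, name)) for name in ("usage_parquet", "tower_delta")}

print(formats)  # {'usage_parquet': 'parquet', 'tower_delta': 'delta'}
```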

##11. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using saveAsTable() as a managed table
2. Write usage data using saveAsTable() with overwrite mode
3. Drop the managed table and verify data removal
4. Check the table overview in the Catalog and confirm that the table is stored in Delta format.
5. Use spark.sql to write some simple queries on the tables created above.


In [0]:
df_cust=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv", lineSep=" ", sep=",")
display(df_cust)
df_cust.write.saveAsTable("cust_table", mode="overwrite")
df_cust_table=spark.read.table("cust_table")
display(df_cust_table)
df1=spark.sql("""select * from cust_table where _c3='Chennai'""")
display(df1)

In [0]:
df_tower=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_new_region.csv",header="true",sep="|",lineSep=" ")
df_tower.write.insertInto("cust_table")
df_tower_delta=spark.read.table("cust_table")
display(df_tower_delta)
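The tower rows are accepted because insertInto() resolves columns by position and ignores the DataFrame's column names (unlike saveAsTable()), so as long as the column count matches and the values fit the table's types, tower data lands in the customer columns. A toy plain-Python contrast between positional and by-name matching (hypothetical helpers, not Spark code):

```python
# cust_table columns come from a header-less CSV read, so they are _c0 ... _c4
table_columns = ["_c0", "_c1", "_c2", "_c3", "_c4"]
tower_columns = ["event_id", "customer_id", "tower_id", "signal_strength", "timestamp"]
tower_row = ["5001", "101", "TWR01", "-80", "2025-01-10 10:21:54"]

def insert_by_position(table_cols, row):
    """Like insertInto: DataFrame column names are ignored, only the order counts."""
    if len(table_cols) != len(row):
        raise ValueError("column count does not match the table")
    return dict(zip(table_cols, row))

def insert_by_name(table_cols, df_cols, row):
    """For contrast: match columns by name, failing when a table column is missing."""
    values = dict(zip(df_cols, row))
    missing = [c for c in table_cols if c not in values]
    if missing:
        raise KeyError(f"no DataFrame column named {missing}")
    return {c: values[c] for c in table_cols}

print(insert_by_position(table_columns, tower_row))  # event_id lands in _c0, customer_id in _c1, ...
try:
    insert_by_name(table_columns, tower_columns, tower_row)
except KeyError as err:
    print("by name:", err)
```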

In [0]:
%sql
drop table if exists cust_table

##12. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using insertInto() into a new table and observe the behavior
2. Write usage data using insertInto() with overwrite mode

The code below tests the first use case. insertInto() cannot create a new table; it only inserts into an existing one, so this cell fails because cust_table_new does not exist.

In [0]:
df_cust=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/custwrt.csv", lineSep=" ", sep=",")
display(df_cust)
df_tower.write.insertInto("cust_table_new")
df_tower_delta=spark.read.table("cust_table_new")
display(df_tower_delta)

##13. Write Operations (Data Conversion/Schema migration) – XML Format Usecases
1. Write customer data into XML format using rowTag as cust
2. Write usage data into XML format using overwrite mode with the rowTag as usage
3. Download the XML data, open the file in Notepad++ and see what the XML file looks like.

In [0]:
df_cust_parquet=spark.read.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_parquet")
display(df_cust_parquet)
#rowTag="cust" as the task asks; overwrite keeps re-runs from mixing in files written with another rowTag
df_cust_parquet.write.xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_xml",mode="overwrite", rowTag="cust", rootTag="Cust_Check")
df_cust_xml=spark.read.format("xml").option("rowTag","cust").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/cust_xml")
display(df_cust_xml)
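In the written file, rootTag names the single outer element and rowTag names the element wrapped around each row; on read, rowTag tells Spark which elements are records. A plain-Python sketch of that layout with ElementTree (hypothetical `rows_to_xml`; Spark's own writer may format the file differently, for example with indentation):

```python
import xml.etree.ElementTree as ET

def rows_to_xml(rows, row_tag="cust", root_tag="Cust_Check"):
    """Wrap every row in <row_tag> and all rows in a single <root_tag> element."""
    root = ET.Element(root_tag)
    for row in rows:
        row_element = ET.SubElement(root, row_tag)
        for column, value in row.items():
            ET.SubElement(row_element, column).text = value  # one child element per column
    return ET.tostring(root, encoding="unicode")

# Two customer rows with the default _c0.._c4 column names from the header-less CSV read
rows = [
    {"_c0": "101", "_c1": "Arun", "_c2": "31", "_c3": "Chennai", "_c4": "PREPAID"},
    {"_c0": "102", "_c1": "Meera", "_c2": "45", "_c3": "Bangalore", "_c4": "POSTPAID"},
]
xml_text = rows_to_xml(rows)
print(xml_text)

# Reading back: rowTag tells the reader which element marks one record
names = [record.findtext("_c1") for record in ET.fromstring(xml_text).iter("cust")]
print(names)  # ['Arun', 'Meera']
```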

##14. Compare all the downloaded files (csv, json, orc, parquet, delta and xml) 
1. Capture the size occupied by each of these file formats and list the formats below in order of size, from smallest to largest.
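Because Spark writes each format as a folder (part files plus metadata or marker files), the size of a format is the total of every file in its folder. A plain-Python sketch with hypothetical helpers `folder_size` and `rank_by_size` that ranks folders from smallest to largest. The demo uses fake files with made-up sizes; on a cluster it could be given the `/Volumes/...` output folders, assuming they are reachable through the local file API:

```python
import os
import tempfile

def folder_size(path):
    """Total bytes of all files under path; Spark writes each format as a folder of files."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            total += os.path.getsize(os.path.join(dirpath, name))
    return total

def rank_by_size(folders):
    """Return (format, bytes) pairs ordered from smallest to largest."""
    return sorted(((fmt, folder_size(path)) for fmt, path in folders.items()), key=lambda item: item[1])

with tempfile.TemporaryDirectory() as base:
    # Fake output folders with made-up sizes, only to exercise the functions
    folders = {}
    for fmt, size in {"csv": 300, "json": 500, "parquet": 200}.items():
        folders[fmt] = os.path.join(base, fmt)
        os.makedirs(folders[fmt])
        with open(os.path.join(folders[fmt], "part-00000"), "wb") as fh:
            fh.write(b"x" * size)
    ranking = rank_by_size(folders)

print(ranking)  # [('parquet', 200), ('csv', 300), ('json', 500)]
```

With datasets as small as these, per-file metadata can outweigh the data itself, so the binary formats will not necessarily come out smallest.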

##15. Try different combinations of Schema Migration & Data Conversion operations, such as:
1. Read any one of the above orc data in a dataframe and write it to dbfs in a parquet format
2. Read any one of the above parquet data in a dataframe and write it to dbfs in a delta format
3. Read any one of the above delta data in a dataframe and write it to dbfs in a xml format
4. Read any one of the above delta table in a dataframe and write it to dbfs in a json format
5. Read any one of the above delta table in a dataframe and write it to another table

##16. As a final exercise, write a one- or two-line summary of...
1. When to use / benefits of CSV
2. When to use / benefits of JSON
3. When to use / benefits of ORC
4. When to use / benefits of Parquet
5. When to use / benefits of Delta
6. When to use / benefits of XML
7. When to use / benefits of Delta tables


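**When to use/benefits of CSV** - For simple structured (tabular) data. CSV is plain text, human-readable and supported almost everywhere, but it stores no data types, so a schema must be applied or inferred on read.

**When to use/benefits of JSON/XML** - For semi-structured data (dictionary-like key:value pairs, possibly nested). JSON is the common exchange format between APIs. XML is used less often today, mostly in legacy integrations.

**When to use/benefits of ORC**<br>
**When to use/benefits of Parquet**<br>
**When to use/benefits of Delta**

These are compressed, columnar binary formats for storing large volumes of data efficiently: they keep the schema with the data and let engines read only the columns a query needs, which speeds up downstream processing. Parquet and ORC are widely supported across platforms (e.g., BigQuery, Azure), which helps interoperability. Delta stores its data as Parquet files plus a transaction log (_delta_log), which adds ACID transactions and table versioning on top.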

