#Telecom Domain Write Ops Assignment - Building Datalake & Lakehouse
This notebook contains assignments to practice Spark write operations on Databricks volumes and tables. <br>
Sections: writing CSV, JSON, Parquet, ORC, Delta, and XML files, saveAsTable() and insertInto() on Delta tables, file format comparison, and exercises.<br>

##First Import all required libraries & Create spark session object

## Spark Write Operations using 
- csv, json, orc, parquet, delta, saveAsTable, insertInto, xml with different write mode, header and sep options

##6. Write Operations (Data Conversion/Schema migration) – CSV Format Usecases
1. Write customer data into CSV format using overwrite mode
2. Write usage data into CSV format using append mode
3. Write tower data into CSV format with header enabled and custom separator (|)
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local machine and open any of the above files in Notepad++ to see the data.

In [0]:
#6.1. Write customer data into CSV format using overwrite mode
df_cust = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv",header=True,inferSchema=True,sep=",")

df_cust.write.csv(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_csv",header=True,mode="overwrite")
display(df_cust)

In [0]:
#6.2. Write usage data into CSV format using append mode
usage_df = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv", header=True, inferSchema=True, sep="\t")

usage_df.write.csv(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_csv",header=True, mode="append")
display(usage_df)

In [0]:
#6.3. Write tower data into CSV format with header enabled and custom separator (|)
tower_final_df = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower_all_data/tower_all_data_src/tower_all.csv",header = True, sep = '|')

# sep='|' on the write keeps the pipe separator that 6.4 expects when reading the data back
tower_final_df.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower_all_data/tower_all_data_tgt/",header=True,sep='|',mode='overwrite')

In [0]:
#6.4. Read the tower data in a dataframe and show only 5 rows.
df_tower_read = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower_all_data/tower_all_data_tgt/",sep='|',header=True)
df_tower_read.show(5)

#6.5. Download the file into local from the catalog volume location and see the data of any of the above files opening in a notepad++
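As an alternative to downloading, the raw output can be inspected from the notebook itself. The helper below is a minimal sketch, assuming the volume path is reachable through ordinary Python file APIs and that the output is an uncompressed text format (CSV or JSON). The name `peek_part_files` is hypothetical and not part of Spark or Databricks.

```python
import glob
import os
from itertools import islice


def peek_part_files(output_dir, max_lines=5):
    """Return {file name: first lines} for each part file in a Spark output folder."""
    preview = {}
    # Spark names its data files part-<number>-...; marker files such as
    # _SUCCESS start with an underscore and do not match this pattern.
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path, "r", encoding="utf-8") as handle:
            preview[os.path.basename(path)] = [
                line.rstrip("\n") for line in islice(handle, max_lines)
            ]
    return preview
```

Calling `peek_part_files("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower_all_data/tower_all_data_tgt/")` should show whether the header row and the separator look as intended.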


##7. Write Operations (Data Conversion/Schema migration)– JSON Format Usecases
1. Write customer data into JSON format using overwrite mode
2. Write usage data into JSON format using append mode and snappy compression format
3. Write tower data into JSON format using ignore mode and observe the behavior of this mode
4. Read the tower data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
#7.1. Write customer data into JSON format using overwrite mode
df_cust.write.json(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_json", mode="overwrite")
display(df_cust)

#7.2. Write usage data into JSON format using append mode and snappy compression format
usage_df.write.json(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_json", mode="append", compression="snappy")
display(usage_df)

#7.3. Write tower data into JSON format using ignore mode and observe the behavior of this mode
tower_final_df.write.json(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_json", mode="ignore")
display(tower_final_df)

#7.4. Read the tower data in a dataframe and show only 5 rows.
tower_df3 = spark.read.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_json")
display(tower_df3.limit(5))
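To make the behavior of the write modes easier to observe, here is a toy model in plain Python. It is a sketch of the documented semantics of `overwrite`, `append`, `ignore`, and `error`, not Spark's implementation; the function `write_with_mode` and the file names are hypothetical.

```python
import os
import shutil
import tempfile

VALID_MODES = ("error", "errorifexists", "ignore", "overwrite", "append")


def write_with_mode(target_dir, file_name, text, mode="error"):
    """Mimic Spark's save modes for one output folder.

    Returns True if data was written, False if the write was skipped.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    exists = os.path.isdir(target_dir)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"path {target_dir} already exists")
    if exists and mode == "ignore":
        return False  # existing data is left untouched and no error is raised
    if exists and mode == "overwrite":
        shutil.rmtree(target_dir)  # old files are removed before the new write
    os.makedirs(target_dir, exist_ok=True)
    with open(os.path.join(target_dir, file_name), "w", encoding="utf-8") as handle:
        handle.write(text)
    return True


base = tempfile.mkdtemp()
out = os.path.join(base, "tower_out_json")
write_with_mode(out, "part-0.json", '{"id": 1}\n', mode="overwrite")
skipped = not write_with_mode(out, "part-1.json", '{"id": 2}\n', mode="ignore")
write_with_mode(out, "part-2.json", '{"id": 3}\n', mode="append")
print(skipped, sorted(os.listdir(out)))  # True ['part-0.json', 'part-2.json']
```

This mirrors what 7.3 shows: the first run with `ignore` writes the data because the folder does not exist yet, while a rerun silently does nothing.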


##8. Write Operations (Data Conversion/Schema migration) – Parquet Format Usecases
1. Write customer data into Parquet format using overwrite mode with gzip compression
2. Write usage data into Parquet format using error mode
3. Write tower data into Parquet format with gzip compression option
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
# 8.1. Write customer data into Parquet format using overwrite mode and in a gzip format
# Write to a dedicated Parquet folder so the JSON output from 7.1 is not overwritten
df_cust.write.parquet(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_parquet", mode="overwrite", compression="gzip")

#8.2. Write usage data into Parquet format using error mode
usage_df.write.parquet(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_parquet", mode="error")

#8.3. Write tower data into Parquet format with gzip compression option
tower_final_df.write.parquet(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_parquet", mode="overwrite",compression="gzip")

#8.4. Read the usage data in a dataframe and show only 5 rows.
usage_df_parquet = spark.read.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_parquet")
display(usage_df_parquet.limit(5))

##9. Write Operations (Data Conversion/Schema migration) – Orc Format Usecases
1. Write customer data into ORC format using overwrite mode
2. Write usage data into ORC format using append mode
3. Write tower data into ORC format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.

In [0]:
# 9.1. Write customer data into ORC format using overwrite mode
df_cust.write.orc(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_orc", mode="overwrite")

#9.2. Write usage data into ORC format using append mode
usage_df.write.orc(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_orc", mode="append")

#9.3. Write tower data into ORC format and see the output file structure
tower_final_df.write.orc(path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_orc", mode="overwrite")
# List files in the output directory
dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_orc")

#9.4. Read the usage data in a dataframe and show only 5 rows.
usage_df_orc = spark.read.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_orc")
display(usage_df_orc.limit(5))

##10. Write Operations (Data Conversion/Schema migration) – Delta Format Usecases
1. Write customer data into Delta format using overwrite mode
2. Write usage data into Delta format using append mode
3. Write tower data into Delta format and see the output file structure
4. Read the usage data in a dataframe and show only 5 rows.
5. Download the file from the catalog volume location to your local hard disk and open any of the above files in Notepad++ to see the data.
6. Compare the parquet location and delta location and try to understand what is the differentiating factor, as both are parquet files only.

In [0]:

# 10.1. Write customer data into Delta format using overwrite mode
df_cust.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_delta/", mode="overwrite")

# df_cust.write.format("delta").option("mergeSchema", "true").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_delta/", mode="overwrite")
# In Databricks (and Delta Lake), mergeSchema = true is used when writing to a Delta table whose schema should be updated automatically because the incoming DataFrame has new columns that don't exist in the table.
# By default, writing a DataFrame with columns not present in the Delta table fails with a schema mismatch error.
# Setting mergeSchema = true enables automatic schema evolution, so the new columns are added to the table and the write succeeds.

# 10.2. Write usage data into Delta format using append mode
usage_df.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_delta", mode="append")

# 10.3. Write tower data into Delta format and see the output file structure
tower_final_df.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_delta", mode="overwrite")
dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_out_delta")

# 10.4. Read the usage data in a dataframe and show only 5 rows.
usage_df_delta = spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_delta")
display(usage_df_delta.limit(5))

# 10.6.Compare the parquet location and delta location and try to understand what is the differentiating factor, as both are parquet files only.
# both are parquet files, but delta has a _delta_log folder which contains the transaction logs and metadata for the table.
# The _delta_log folder contains information about the changes made to the table, including the version of the table, the timestamp of the change, and the details of the change itself.
# This information is used by Delta Lake to ensure data integrity and to recover from failures.
# The _delta_log folder also contains information about the schema of the table, including the names and data types of the columns.
# This information is used by Delta Lake to ensure that the schema of the table is consistent across all versions of the table
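The role of `_delta_log` described above can be checked by reading the commit files directly. The following is a minimal sketch, assuming the Delta folder is reachable through ordinary Python file APIs; `summarize_delta_log` is a hypothetical helper name, and it only looks at the JSON commit files.

```python
import json
import os
from collections import Counter


def summarize_delta_log(table_path):
    """Return {commit file name: Counter of action types} for a Delta folder."""
    log_dir = os.path.join(table_path, "_delta_log")
    summary = {}
    for name in sorted(os.listdir(log_dir)):
        # Commit files are named like 00000000000000000000.json; checkpoint
        # (.parquet) and checksum (.crc) files are skipped here.
        if not name.endswith(".json"):
            continue
        actions = Counter()
        with open(os.path.join(log_dir, name), "r", encoding="utf-8") as handle:
            for line in handle:
                if line.strip():
                    # Each line is one JSON object whose single top-level key
                    # names the action (commitInfo, metaData, protocol, add, remove).
                    actions.update(json.loads(line).keys())
        summary[name] = actions
    return summary
```

Running it on `.../tower/tower_out_delta` after a few overwrites should show one commit file per write, with `add` and `remove` actions recording which Parquet files belong to each version. A plain Parquet folder has no such log, which is the differentiating factor.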


##11. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using saveAsTable() as a managed table
2. Write usage data using saveAsTable() with overwrite mode
3. Drop the managed table and verify data removal
4. Check the table overview in the Catalog and confirm that the table is stored in delta format.
5. Use spark.sql to write some simple queries on the above tables created.


In [0]:
# 11.1. Write customer data using saveAsTable() as a managed table
df_cust.write.saveAsTable("telecom_catalog_assign.landing_zone.customer_managed", mode="overwrite")
# df_cust.write.option("mergeSchema", "true").saveAsTable("telecom_catalog_assign.landing_zone.customer_managed", mode="overwrite")
# or
# df_cust.write.format("delta").mode("overwrite").saveAsTable("telecom_catalog_assign.landing_zone.customer_managed")

# 11.2. Write usage data using saveAsTable() with overwrite mode
usage_df.write.saveAsTable("telecom_catalog_assign.landing_zone.usage_managed", mode="overwrite")

# 11.3. Drop the managed table and verify data removal - using python
display(spark.sql("SHOW TABLES IN telecom_catalog_assign.landing_zone")) # display before dropping

spark.sql("DROP TABLE IF EXISTS telecom_catalog_assign.landing_zone.cust_other_del")

# Verify data removal by listing tables
display(spark.sql("SHOW TABLES IN telecom_catalog_assign.landing_zone")) # display after dropping


In [0]:
%sql
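-- 11.3. Drop the managed table and verify data removal - using sql
-- IF EXISTS keeps this cell from failing when the Python cell above has already dropped the table
DROP TABLE IF EXISTS telecom_catalog_assign.landing_zone.cust_other_del;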

In [0]:
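# 11.5. Simple queries on the managed table; limit(2) keeps the result a DataFrame, which display() renders as a table
display(spark.sql("SELECT * FROM telecom_catalog_assign.landing_zone.customer_managed").limit(2))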
spark.sql("SELECT * FROM telecom_catalog_assign.landing_zone.customer_managed").show(2)

##12. Write Operations (Lakehouse Usecases) – Delta table Usecases
1. Write customer data using insertInto() in a new table and find the behavior
2. Write usage data using insertInto() with overwrite mode

In [0]:
# 12.1 Write customer data using insertInto() in a new table and find the behavior
df_cust = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv", header=True, inferSchema=True,sep=",").toDF("customer_id", "name", "age", "city", "plan")
df_cust.write.saveAsTable("telecom_catalog_assign.landing_zone.customer_managed_other", mode="overwrite")
display(df_cust)
# insertInto() needs the target table to exist already, which is why saveAsTable() creates customer_managed_other first.
# df_cust.write.insertInto("telecom_catalog_assign.landing_zone.customer_managed", overwrite=True)

delta_cust1 = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer1/customer1.csv", inferSchema=True,sep=",").toDF("customer_id", "name", "age", "city", "plan")

display(delta_cust1)
delta_cust1.write.insertInto("telecom_catalog_assign.landing_zone.customer_managed_other", overwrite=True)

# insertInto() appends by default; overwrite=True replaces the existing rows instead.
# insertInto() matches columns by position, not by name, so both datasets must have the same column order and compatible types.

display(spark.sql("SELECT * FROM telecom_catalog_assign.landing_zone.customer_managed_other"))

# 12.2. Write usage data using insertInto() with overwrite mode
usage_df = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv", header=True, inferSchema=True, sep="\t")
usage_df.write.insertInto("telecom_catalog_assign.landing_zone.usage_managed", overwrite=True)
display(spark.sql("SELECT * FROM telecom_catalog_assign.landing_zone.usage_managed"))
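Because insertInto() resolves columns by position rather than by name, it can help to compare the column order before inserting. The check below is a small sketch in plain Python; `column_position_mismatches` is a hypothetical helper, and the column lists in the example are illustrative.

```python
def column_position_mismatches(df_columns, table_columns):
    """List the positions where DataFrame and table columns differ (case-insensitive)."""
    mismatches = []
    if len(df_columns) != len(table_columns):
        mismatches.append(
            f"column count differs: {len(df_columns)} vs {len(table_columns)}"
        )
    for position, (source, target) in enumerate(zip(df_columns, table_columns)):
        if source.lower() != target.lower():
            mismatches.append(
                f"position {position}: {source!r} would land in {target!r}"
            )
    return mismatches


# Illustrative input: 'city' and 'age' are swapped relative to the table.
problems = column_position_mismatches(
    ["customer_id", "name", "city", "age", "plan"],
    ["customer_id", "name", "age", "city", "plan"],
)
print(problems)
```

In the notebook the two lists would come from `delta_cust1.columns` and `spark.table("telecom_catalog_assign.landing_zone.customer_managed_other").columns`. An empty result means the positional insert lines up with the table.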


In [0]:
#12.2. Write usage data using insertInto() with overwrite mode
# DataFrameWriter has no insertTable() method; insertInto(tableName, overwrite=True) is the overwrite form
usage_df.write.insertInto("telecom_catalog_assign.landing_zone.usage_managed", overwrite=True)




##13. Write Operations (Data Conversion/Schema migration) – XML Format Usecases
1. Write customer data into XML format using rowTag as cust
2. Write usage data into XML format using overwrite mode with the rowTag as usage
3. Download the xml data, open the file in Notepad++, and see what the xml file looks like.

In [0]:
# 13.1 Write customer data into XML format using rowTag as cust
df_cust = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv", header=True, inferSchema=True).toDF("customer_id", "name", "age", "city", "plan")
display(df_cust)

df_cust.write.format("xml").mode("overwrite").option("rowTag", "cust").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_out_xml")

# 13.2 Write usage data into XML format using overwrite mode with the rowTag as usage
usage_df = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.csv", header=True, inferSchema=True, sep="\t")
# The source file is read as tab-delimited here, so sep="\t" is needed to split each line into columns
display(usage_df)
usage_df.write.format("xml").mode("overwrite").option("rowTag", "usage").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_out_xml")
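To see what the rowTag option produces without opening the file in an editor, the XML can be parsed with Python's standard library. This is a sketch on a hand-written sample: the `<ROWS>` root element and the sample values are assumptions for illustration, and `count_rows` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET


def count_rows(xml_text, row_tag):
    """Count elements named row_tag and return (count, field names of the first row)."""
    root = ET.fromstring(xml_text)
    rows = list(root.iter(row_tag))
    first_fields = [child.tag for child in rows[0]] if rows else []
    return len(rows), first_fields


# Hypothetical sample shaped like a write with rowTag="cust":
# one <cust> element per DataFrame row, one child element per column.
sample = """<ROWS>
  <cust><customer_id>1</customer_id><name>Asha</name></cust>
  <cust><customer_id>2</customer_id><name>Ravi</name></cust>
</ROWS>"""
print(count_rows(sample, "cust"))  # (2, ['customer_id', 'name'])
```

The same function can be pointed at the text of a downloaded part file to confirm that each row is wrapped in the chosen tag.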


##14. Compare all the downloaded files (csv, json, orc, parquet, delta and xml) 
1. Capture the size occupied between all of these file formats and list the formats below based on the order of size from small to big.
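The size comparison can also be done from the notebook instead of by hand. The helpers below are a minimal sketch, assuming the output folders are reachable through ordinary Python file APIs; `folder_size_bytes` and `rank_by_size` are hypothetical names.

```python
import os


def folder_size_bytes(path):
    """Sum the sizes of all files under path, including subfolders such as _delta_log."""
    total = 0
    for current_dir, _subdirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(current_dir, name))
    return total


def rank_by_size(folders):
    """Take {label: folder path} and return (label, bytes) pairs, smallest first."""
    sizes = {label: folder_size_bytes(path) for label, path in folders.items()}
    return sorted(sizes.items(), key=lambda item: item[1])
```

For example, `rank_by_size({"csv": ".../customer/customer_out_csv", "orc": ".../customer/customer_out_orc"})` with the full volume paths filled in returns the formats ordered from small to big. Compare folders written from the same DataFrame, since append-mode folders grow with every rerun.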

##15. Try different combinations of Schema Migration & Data Conversion operations, such as...
1. Read any one of the above orc data in a dataframe and write it to dbfs in a parquet format
2. Read any one of the above parquet data in a dataframe and write it to dbfs in a delta format
3. Read any one of the above delta data in a dataframe and write it to dbfs in a xml format
4. Read any one of the above delta table in a dataframe and write it to dbfs in a json format
5. Read any one of the above delta table in a dataframe and write it to another table
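The file-to-file conversions in this list all follow the same read-then-write pattern, which can be sketched as one small function. `convert_format` is a hypothetical helper; it assumes an existing `spark` session is passed in and uses only the generic `format().load()` and `format().mode().save()` calls.

```python
def convert_format(spark, source_format, source_path, target_format, target_path,
                   mode="overwrite", options=None):
    """Read a folder in one format, write it out in another, and return the DataFrame."""
    df = spark.read.format(source_format).load(source_path)
    writer = df.write.format(target_format).mode(mode)
    if options:
        # Format-specific settings, for example {"rowTag": "cust"} for XML output
        writer = writer.options(**options)
    writer.save(target_path)
    return df
```

A call such as `convert_format(spark, "orc", "<orc folder>", "parquet", "<parquet folder>")` covers the first item, with the placeholder paths replaced by real volume locations. For the table-based items, the read side would be `spark.table("<table name>")` and the write side `saveAsTable()` instead.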

##16. Do a final exercise of writing a one- or two-line summary of...
1. When to use/benefits of csv
2. When to use/benefits of json
3. When to use/benefits of orc
4. When to use/benefits of parquet
5. When to use/benefits of delta
6. When to use/benefits of xml
7. When to use/benefits of delta tables
