# Telecom Domain ReadOps Assignment
This notebook contains assignments to practice Spark read options and Databricks volumes. <br>
Sections: Sample data creation, Catalog & Volume creation, Copying data into Volumes, Path glob/recursive reads, toDF() column renaming variants, inferSchema/header/separator experiments, and exercises.<br>

![](https://fplogoimages.withfloats.com/actual/68009c3a43430aff8a30419d.png)
![](https://theciotimes.com/wp-content/uploads/2021/03/TELECOM1.jpg)

## First, import all required libraries and create the Spark session object

In [0]:
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Spark").getOrCreate()

## 1. Write SQL statements to create:
1. A catalog named telecom_catalog_assign
2. A schema landing_zone
3. A volume landing_vol
4. Using dbutils.fs.mkdirs, create the folders:<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/
5. Explain the following (research why Volumes are preferred for production-ready systems):<br>
a. Volume vs DBFS/FileStore<br>
b. Why production teams prefer Volumes for regulated data<br>

In [0]:
%sql
create catalog if not exists telecom_catalog_assign;
create schema if not exists telecom_catalog_assign.landing_zone;
create volume if not exists telecom_catalog_assign.landing_zone.landing_vol;




In [0]:
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/")
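The three mkdirs calls above repeat the same /Volumes/<catalog>/<schema>/<volume>/ prefix. A small helper can compose these paths from their parts instead. This is a plain-Python sketch: `volume_path` and its validation rules (no empty segments, no "/" inside a segment) are hypothetical illustrations, not a Databricks API.

```python
def volume_path(catalog: str, schema: str, volume: str, *parts: str, is_dir: bool = True) -> str:
    """Compose a volume path: /Volumes/<catalog>/<schema>/<volume>/<parts...>.

    Hypothetical helper: rejects empty segments and segments containing '/'.
    """
    segments = [catalog, schema, volume, *parts]
    for seg in segments:
        if not seg or "/" in seg:
            raise ValueError(f"invalid path segment: {seg!r}")
    path = "/Volumes/" + "/".join(segments)
    # Directories get a trailing slash, matching the mkdirs calls above
    return path + "/" if is_dir else path


CATALOG, SCHEMA, VOLUME = "telecom_catalog_assign", "landing_zone", "landing_vol"
folders = [volume_path(CATALOG, SCHEMA, VOLUME, name) for name in ("customer", "usage", "tower")]

# On Databricks, the folders could then be created with:
# for folder in folders:
#     dbutils.fs.mkdirs(folder)
print(folders)
```

File paths use the same helper with is_dir=False, e.g. volume_path(CATALOG, SCHEMA, VOLUME, "customer", "customer.csv", is_dir=False).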

## Data files to use in this use case:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''
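Before copying, it can help to sanity-check each sample string: which delimiter it uses, whether every record has the same number of fields, and whether the first line looks like a header. This plain-Python sketch uses the standard csv module; `profile` and its "a header row has no purely numeric fields" rule are a hypothetical heuristic, not something Spark does (Spark only follows the header option you pass).

```python
import csv
import io

customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''
tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''


def profile(text: str, delimiter: str) -> dict:
    """Split text into records and report row count, field counts, and a header guess."""
    rows = list(csv.reader(io.StringIO(text.strip("\n")), delimiter=delimiter))
    first = rows[0]
    # Heuristic only: a header row usually contains no purely numeric fields
    looks_like_header = not any(field.strip().lstrip("-").isdigit() for field in first)
    return {
        "rows": len(rows),
        "field_counts": sorted({len(r) for r in rows}),
        "first_row_is_header": looks_like_header,
    }


for name, text, sep in [("customer", customer_csv, ","),
                        ("usage", usage_tsv, "\t"),
                        ("tower", tower_logs_region1, "|")]:
    print(name, profile(text, sep))
```

For these samples the check reports that customer_csv has no header line, which matters when the file is read with header=true in Section 4.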

## 2. Filesystem operations
1. Write code to copy the above datasets into your Volume folders:<br>
Customer → /Volumes/.../customer/<br>
Usage → /Volumes/.../usage/<br>
Tower (region-based) → /Volumes/.../tower/region1/ and /Volumes/.../tower/region2/

2. Write a command to validate whether the files were copied successfully

In [0]:
# Each record must sit on its own line, otherwise the whole dataset becomes a single row
customer_csv = '''101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''
tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''

customer_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv"
usage_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv"
tower_logs_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.csv"

dbutils.fs.put(customer_path, customer_csv, overwrite=True)
dbutils.fs.put(usage_path, usage_tsv, overwrite=True)
dbutils.fs.put(tower_logs_path, tower_logs_region1, overwrite=True)

In [0]:
dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer")
dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage")
dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower")
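dbutils.fs.ls only shows that files are present. A stricter check also confirms that each file is non-empty and reads back identical to what was written. Because dbutils exists only inside Databricks, this sketch uses pathlib against a temporary local directory as a stand-in for the volume; on Databricks compute with Unity Catalog, /Volumes/ paths can generally also be used with standard Python file APIs, but treat that as an assumption to verify for your cluster type. The tower/region1/ subfolder follows the layout from task 2.1, and `write_and_verify` is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def write_and_verify(root: Path, relative_path: str, content: str) -> bool:
    """Write content under root (creating parent folders) and confirm the copy.

    Validation: the file exists, is non-empty, and reads back identical to content.
    """
    target = root / relative_path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content)
    return target.is_file() and target.stat().st_size > 0 and target.read_text() == content


sample = (
    "event_id|customer_id|tower_id|signal_strength|timestamp\n"
    "5001|101|TWR01|-80|2025-01-10 10:21:54\n"
)

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)  # stand-in for /Volumes/telecom_catalog_assign/landing_zone/landing_vol
    ok = write_and_verify(root, "tower/region1/tower_logs_region1.csv", sample)
    # List every CSV below the root, similar to inspecting dbutils.fs.ls output folder by folder
    listed = sorted(p.relative_to(root).as_posix() for p in root.rglob("*.csv"))
    print(ok, listed)
```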

## 3. Directory Read Use Cases
1. Read all tower logs using:<br>
Path glob filter (example: *.csv)<br>
Multiple paths input<br>
Recursive lookup

2. Demonstrate these 3 reads separately:<br>
Using pathGlobFilter<br>
Using a list of paths in spark.read.csv([path1, path2])<br>
Using .option("recursiveFileLookup","true")

3. Compare the outputs and understand when each should be used.
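pathGlobFilter keeps only files whose names match a glob pattern; Spark documents its syntax as that of Hadoop's GlobFilter. Python's fnmatch uses similar shell-style wildcards (*, ?, [...]), so this sketch approximates which file names a pattern like *.csv would keep. It is an approximation, not Spark's matcher (Hadoop globs also support {a,b} alternation, which fnmatch does not), and the file names below are hypothetical.

```python
from fnmatch import fnmatchcase


def glob_filter(file_names, pattern):
    """Keep only names matching a shell-style glob (case-sensitive)."""
    return [name for name in file_names if fnmatchcase(name, pattern)]


# Hypothetical folder contents
files = ["tower_logs_region1.csv", "tower_logs_region2.csv", "tower_notes.txt", "usage.tsv"]
print(glob_filter(files, "*.csv"))
print(glob_filter(files, "tower_logs_region[12].csv"))
```

As a rule of thumb: pathGlobFilter picks files by name within a location, a list of paths reads exactly the files you name, and recursiveFileLookup scans nested subfolders (and, per the Spark docs, disables partition discovery).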

In [0]:
tower_logs_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"
df_glob = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .option("pathGlobFilter", "*.csv")
    .csv(tower_logs_path)
)
display(df_glob)

In [0]:
tower_logs_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"
df_recursive = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .option("recursiveFileLookup", "true")
    .csv(tower_logs_path)
)

display(df_recursive)

In [0]:
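# Every listed path must exist, otherwise the read fails with a path-not-found error.
# Only tower_logs_region1.csv is written in the copy step above, so
# tower_logs_region2.csv has to be created before this cell can run.
paths = [
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.csv",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region2.csv"
]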

df_multiple_paths = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .csv(paths)
)

display(df_multiple_paths)
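When a list of paths is passed to spark.read.csv, a path that does not exist makes the whole read fail rather than being skipped. A pre-check can report missing files up front. This plain-Python sketch uses a hypothetical helper, `existing_and_missing`, and a temporary local directory as a stand-in for the volume; on Databricks one would pass the /Volumes/... paths instead, assuming the cluster exposes volumes as local paths.

```python
import os
import tempfile


def existing_and_missing(paths):
    """Split paths into (existing, missing) using the local filesystem."""
    existing = [p for p in paths if os.path.exists(p)]
    missing = [p for p in paths if not os.path.exists(p)]
    return existing, missing


with tempfile.TemporaryDirectory() as tmp:
    region1 = os.path.join(tmp, "tower_logs_region1.csv")
    region2 = os.path.join(tmp, "tower_logs_region2.csv")  # deliberately not created
    with open(region1, "w") as f:
        f.write("event_id|customer_id\n5001|101\n")
    existing, missing = existing_and_missing([region1, region2])
    print([os.path.basename(p) for p in existing], [os.path.basename(p) for p in missing])
```

Only the paths in `existing` would then be handed to spark.read.csv, or the cell could stop with a clear message listing `missing`.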

## 4. Schema Inference, Header, and Separator
1. Read the Customer and Usage files using .option() and .options(), with both spark.read.csv and spark.read.format("csv"), using:<br>
header=false, inferSchema=false<br>
or<br>
header=true, inferSchema=true<br>
2. Write a note on what changes when header or inferSchema is set to true or false.<br>
3. How did schema inference handle “abc” in the age column?<br>

In [0]:
# Read the Customer file with .csv() keyword options; try both
# header=false, inferSchema=false and header=true, inferSchema=true
customer_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv"

df_customer = spark.read.csv(customer_path, header=True, sep=",", inferSchema=True)
df_customer.printSchema()  # call the method: it prints the schema itself and returns None

display(df_customer)
#df_customer=spark.read.format("csv")
#df_usage=spark.read.csv(usage_path,header=True,inferSchema=True)

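**Note:** the sample customer_csv above has no header row, so with header=true Spark takes the first record (101,Arun,31,Chennai,PREPAID) as the column names. The schemas below assume the file starts with a header line id,name,age,city,type.<br>

**With inferSchema=false, Spark does not try to detect data types, so ALL columns are read as StringType:**<br>
df_customer: pyspark.sql.connect.dataframe.DataFrame<br>
id: string<br>
name: string<br>
age: string<br>
city: string<br>
type: string<br>

**With inferSchema=true, Spark makes an extra pass over the data and assigns each column a type inferred from its values:**<br>
df_customer: pyspark.sql.connect.dataframe.DataFrame<br>
id: integer<br>
name: string<br>
age: string<br>
city: string<br>
type: string<br>

**In the case of age:**<br>
Because the age column contains the non-numeric value “abc”, schema inference cannot type it as integer, so the column falls back to string and “abc” is displayed unchanged.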
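The observations above can be reproduced with a simplified model. This plain-Python sketch treats empty strings as nulls, types a column as integer only if every non-null value parses as an integer, and otherwise falls back to string, which is why “abc” turns age into a string column. It also models default column naming: with header=false, Spark names CSV columns _c0, _c1, and so on. Spark's real inference considers more types (for example double, boolean and timestamp), so `infer_type` and `column_names` are hypothetical illustrations only.

```python
def _is_int(text):
    """True if text parses as a Python int."""
    try:
        int(text)
        return True
    except ValueError:
        return False


def infer_type(values):
    """Simplified inference: integer if all non-empty values are ints, else string."""
    non_null = [v for v in values if v != ""]  # empty strings are treated as nulls
    if non_null and all(_is_int(v) for v in non_null):
        return "integer"
    return "string"


def column_names(first_row, header):
    """header=true: first record supplies names; header=false: _c0, _c1, ..."""
    return list(first_row) if header else [f"_c{i}" for i in range(len(first_row))]


customer_csv = """101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID"""

rows = [line.split(",") for line in customer_csv.splitlines()]
columns = list(zip(*rows))  # transpose records into columns
inferred = [infer_type(col) for col in columns]
print(inferred)
print(column_names(rows[0], header=False))
print(column_names(rows[0], header=True))  # the first data record becomes the "header"
```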


In [0]:
usage_path="/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv"
df_usage = (
    spark.read
    .options(
        header="true",
        sep="\t",
        inferSchema="true"
    )
    .csv(usage_path)
)

df_usage.printSchema()
display(df_usage)


df_customer = (
    spark.read.format("csv")
    .option("header", "true")
    .option("sep", ",")
    .option("inferSchema", "true")
    .load(customer_path)
)
df_customer.printSchema()  # call the method; print(df.printSchema) only shows the method object
display(df_customer)

## 5. Column Renaming Use Cases
1. Apply column names from a string using the toDF function for the customer data
2. Apply column names and data types using the schema function for the usage data
3. Apply column names and data types using StructType with IntegerType, StringType, TimestampType and other classes for the towers data
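Besides a StructType, DataFrameReader.schema() in PySpark also accepts a DDL-formatted string such as "customer_id INT, voice_mins INT". This plain-Python sketch builds such a string from (name, type) pairs; `to_ddl` is a hypothetical helper, and restricting names to Python identifiers is a simplifying assumption (Spark accepts other names when they are quoted with backticks).

```python
def to_ddl(fields):
    """Build a DDL-style schema string such as "customer_id INT, voice_mins INT"."""
    for name, _dtype in fields:
        if not name.isidentifier():
            raise ValueError(f"unsupported column name: {name!r}")
    return ", ".join(f"{name} {dtype}" for name, dtype in fields)


usage_fields = [
    ("customer_id", "INT"),
    ("voice_mins", "INT"),
    ("data_mb", "INT"),
    ("sms_count", "INT"),
]
usage_ddl = to_ddl(usage_fields)
print(usage_ddl)

# In PySpark the string could then be passed to the reader, e.g.:
# spark.read.schema(usage_ddl).options(header="true", sep="\t").csv(usage_path)
```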

In [0]:
cols = "customer_id,customer_name,customer_age,customer_city,customer_type"

df_customer_named = df_customer.toDF(*cols.split(","))

df_customer_named.printSchema()
display(df_customer_named)
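toDF(*names) needs exactly one new name per existing column; a count mismatch raises an error. This plain-Python sketch validates the comma-separated string before it is unpacked. `parse_column_names` is a hypothetical helper, and rejecting duplicate or empty names is an extra safeguard it adds, since duplicate names lead to ambiguous-column errors later.

```python
def parse_column_names(spec, expected_count):
    """Split a comma-separated spec into names suitable for DataFrame.toDF(*names)."""
    names = [n.strip() for n in spec.split(",")]
    if len(names) != expected_count:
        raise ValueError(f"expected {expected_count} names, got {len(names)}")
    if "" in names or len(set(names)) != len(names):
        raise ValueError("column names must be non-empty and unique")
    return names


cols = "customer_id,customer_name,customer_age,customer_city,customer_type"
names = parse_column_names(cols, expected_count=5)
print(names)
# On Databricks: df_customer.toDF(*parse_column_names(cols, len(df_customer.columns)))
```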

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType
usage_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("voice_mins", IntegerType(), True),
    StructField("data_mb", IntegerType(), True),
    StructField("sms_count", IntegerType(), True)
])
usage_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv"

df_raw = (
    spark.read
    .options(
        header="true",
        sep="\t"
    )
    .csv(usage_path)
)

df_raw.printSchema()
display(df_raw)

df_usage = (
    spark.read
    .schema(usage_schema)
    .options(
        header="true",
        sep="\t"
    )
    .csv(usage_path)
)

df_usage.printSchema()
display(df_usage)
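With an explicit schema, the declared types win over the data. Under the CSV reader's default mode, PERMISSIVE, a field that cannot be parsed into its declared type is set to null instead of failing the read (FAILFAST and DROPMALFORMED behave differently). The usage file parses cleanly, but declaring the customer age column as IntegerType would turn “abc” into null. This plain-Python sketch models that behavior; `permissive_cast` is a hypothetical helper.

```python
def permissive_cast(values, cast):
    """Cast each string; empty or unparseable values become None (null)."""
    result = []
    for v in values:
        if v == "":
            result.append(None)  # empty field is treated as null
            continue
        try:
            result.append(cast(v))
        except ValueError:
            result.append(None)  # malformed value becomes null, the row is kept
    return result


ages = ["31", "45", "29", "52", "27", "abc"]
print(permissive_cast(ages, int))  # [31, 45, 29, 52, 27, None]
```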

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
tower_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("tower_id", StringType(), True),
    StructField("signal_strength", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])
tower_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.csv"

df_tower = (
    spark.read
    .schema(tower_schema)
    .options(
        header="true",
        sep="|"
    )
    .csv(tower_path)
)

df_tower.printSchema()
display(df_tower)

## 6. More to come (stay motivated)....