In [0]:
# Create the catalog that the rest of this notebook works in, if it does not already exist.
spark.sql("CREATE CATALOG IF NOT EXISTS telecom_catalog_assign")

In [0]:
# Provision the landing-zone schema and then the volume inside it.
# Both statements use IF NOT EXISTS, so re-running the cell is safe.
for ddl_statement in (
    "CREATE SCHEMA IF NOT EXISTS telecom_catalog_assign.landing_zone",
    "CREATE VOLUME IF NOT EXISTS telecom_catalog_assign.landing_zone.landing_vol",
):
    spark.sql(ddl_statement)

# One landing folder per source feed inside the volume.
customer_dir = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/"
usage_dir = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/"
tower_dir = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"

dbutils.fs.mkdirs(customer_dir)
dbutils.fs.mkdirs(usage_dir)
dbutils.fs.mkdirs(tower_dir)


In [0]:
import pandas as pd
from io import StringIO

# Customer feed: comma-separated, no header row.
# Record 105 has an empty name and record 106 has a non-numeric age ("abc").
customer_csv = '''101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID'''

# Usage feed: tab-separated, first line is the header.
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0'''

# Tower log feed (region 1): pipe-separated, first line is the header.
tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12'''

# Parse each feed with pandas, then hand the result to Spark.
# The customer feed carries no header, so its column names are supplied here.
customer_column_names = ["customer_id", "name", "age", "city", "plan"]
customer_df = pd.read_csv(StringIO(customer_csv), header=None, names=customer_column_names)
customer_spark_df = spark.createDataFrame(customer_df)

usage_df = pd.read_csv(StringIO(usage_tsv), sep="\t")
usage_spark_df = spark.createDataFrame(usage_df)

tower_df = pd.read_csv(StringIO(tower_logs_region1), sep="|")
tower_spark_df = spark.createDataFrame(tower_df)

# Show the three Spark DataFrames in feed order: customer, usage, tower.
for sample_frame in (customer_spark_df, usage_spark_df, tower_spark_df):
    display(sample_frame)

In [0]:
from pyspark.sql import Row

# Placeholder DataFrames.
# NOTE(review): these assignments rebind customer_spark_df / usage_spark_df /
# tower_spark_df, which an earlier cell of this notebook builds from the inline
# sample feeds. From this point on the names refer to the two-column frames
# below, so those are what get written to the volume. Confirm this is intended.
customer_rows = [
    Row(customer_id=1, name="Alice"),
    Row(customer_id=2, name="Bob"),
]
customer_spark_df = spark.createDataFrame(customer_rows)

usage_rows = [
    Row(customer_id=1, usage=100),
    Row(customer_id=2, usage=150),
]
usage_spark_df = spark.createDataFrame(usage_rows)

tower_rows = [
    Row(tower_id=101, region="region1"),
    Row(tower_id=102, region="region1"),
]
tower_spark_df = spark.createDataFrame(tower_rows)

# Write each DataFrame as CSV (with a header row) into its folder in the volume,
# replacing whatever an earlier run left there.
for frame, target_dir in (
    (customer_spark_df, "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/"),
    (usage_spark_df, "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/"),
    (tower_spark_df, "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/"),
):
    frame.write.mode("overwrite").option("header", True).csv(target_dir)

# The region2 tower folder is created but nothing is written into it.
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2/")

# List every target folder so the written files can be checked by eye.
for listed_dir in (
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/",
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2/",
):
    display(dbutils.fs.ls(listed_dir))

In [0]:
# Folder locations of the per-region tower logs.
region1_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/"
region2_path = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region2/"

# 1. pathGlobFilter: read region1 but only pick up files whose name matches *.csv
tower_logs_glob_df = spark.read.options(header=True, pathGlobFilter="*.csv").csv(region1_path)
display(tower_logs_glob_df)

# 2. List of paths: pass both region folders to a single read
tower_logs_multi_df = spark.read.options(header=True).csv([region1_path, region2_path])
display(tower_logs_multi_df)

# 3. recursiveFileLookup: point at the parent tower folder and let Spark descend into it
tower_logs_recursive_df = spark.read.options(header=True, recursiveFileLookup="true").csv(
    "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"
)
display(tower_logs_recursive_df)

In [0]:

def _read_landing_csv(folder, header, infer_schema):
    """Load `folder` through the CSV reader with the given header / inferSchema options."""
    return (
        spark.read.format("csv")
        .option("header", header)
        .option("inferSchema", infer_schema)
        .load(folder)
    )


customer_folder = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/"
usage_folder = "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/"

# Customer data: first without header handling or type inference, then with both.
customer_noheader_noschema_df = _read_landing_csv(customer_folder, header=False, infer_schema=False)
display(customer_noheader_noschema_df)

customer_header_schema_df = _read_landing_csv(customer_folder, header=True, infer_schema=True)
display(customer_header_schema_df)

# Usage data: the same two reader configurations.
usage_noheader_noschema_df = _read_landing_csv(usage_folder, header=False, infer_schema=False)
display(usage_noheader_noschema_df)

usage_header_schema_df = _read_landing_csv(usage_folder, header=True, infer_schema=True)
display(usage_header_schema_df)


In [0]:
# Capture the current column names, print them, and pass them straight back to
# toDF, so the resulting DataFrame keeps exactly the same names.
customer_columns = customer_spark_df.columns
print(customer_columns)
customer_named_df = customer_spark_df.toDF(*customer_columns)
display(customer_named_df)

In [0]:
def _conform_to_schema(df, schema):
    """Return `df` renamed positionally to `schema`'s field names and cast to its types.

    Args:
        df: Spark DataFrame with as many columns as `schema` has fields.
        schema: StructType supplying the target column names and data types.

    Returns:
        A new Spark DataFrame; `df` itself is left untouched.
    """
    renamed_df = df.toDF(*[field.name for field in schema.fields])
    return renamed_df.select(
        [renamed_df[field.name].cast(field.dataType).alias(field.name) for field in schema.fields]
    )


# 1. Apply column names using string with toDF for customer data
# toDF renames positionally, so it is only attempted when the column count matches.
if len(customer_spark_df.columns) == 5:
    customer_named_df = customer_spark_df.toDF(
        "customer_id",
        "name",
        "age",
        "city",
        "plan"
    )
    display(customer_named_df)
else:
    display(customer_spark_df)

# 2. Apply column names and datatype using schema function for usage data
# spark.createDataFrame() does not accept an existing Spark DataFrame as its data
# argument (it raises TypeError), so the schema is applied by renaming and casting.
if len(usage_spark_df.columns) == 4:
    usage_schema = "customer_id INT, voice_mins INT, data_mb FLOAT, sms_count INT"
    # An empty DataFrame is built from the DDL string only to obtain the parsed StructType.
    usage_struct = spark.createDataFrame([], schema=usage_schema).schema
    usage_schema_df = _conform_to_schema(usage_spark_df, usage_struct)
    display(usage_schema_df)
else:
    display(usage_spark_df)

# 3. Apply column names and datatype using StructType for towers data
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType

tower_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("tower_id", StringType(), True),
    StructField("signal_strength", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])
if len(tower_spark_df.columns) == 5:
    # Same rename-and-cast approach as for the usage data, driven by tower_schema.
    tower_struct_df = _conform_to_schema(tower_spark_df, tower_schema)
    display(tower_struct_df)
else:
    display(tower_spark_df)