In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Explicit schema for the raw CDR CSV files. Every column is declared
# nullable; the (name, type) pairs below are listed in file column order.
cdr_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("call_id", StringType()),
        ("caller_number", StringType()),
        ("receiver_number", StringType()),
        ("call_start_time", TimestampType()),
        ("call_end_time", TimestampType()),
        ("call_duration_sec", IntegerType()),
        ("call_type", StringType()),
        ("cell_tower_id", StringType()),
        ("location", StringType()),
        ("call_status", StringType()),
        ("data_usage_mb", DoubleType()),
        ("call_date", DateType()),
        ("hour", IntegerType()),
    )
])

# ADLS Gen2 folder that holds the incoming CDR files.
raw_path = "abfss://hourlycdr@covid19storagedb.dfs.core.windows.net/cdr_data"

# Shared location: used here as the Auto Loader schema location and reused
# by the downstream write as its streaming checkpoint.
checkpoint_path = "/mnt/checkpoints/bronze_cdr"

# Incrementally read the CSV files with Auto Loader ("cloudFiles") using the
# explicit cdr_schema instead of schema inference.
# NOTE(review): no "header" option is set, so if the source files carry a
# header line it would presumably be ingested as a data row — confirm
# against the source files.
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .schema(cdr_schema)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.format", "csv")
    .load(raw_path)
)

# Write the Auto Loader stream into the bronze Delta table as a one-shot,
# batch-style run. After this statement `df_stream` no longer refers to the
# streaming DataFrame: it is rebound to the StreamingQuery handle returned
# by the writer.
df_stream = (
    df_stream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", True)
    # availableNow processes everything currently available and then stops;
    # it replaces the deprecated once=True trigger.
    .trigger(availableNow=True)
    # toTable() is the documented DataStreamWriter API for starting a query
    # that writes to a named table.
    .toTable("telecom_ete.bronze.cdr_raw")
)

# The query runs asynchronously. Block until it finishes so later cells
# (e.g. a row count) see the ingested data, and so a failed ingest raises
# here instead of going unnoticed.
df_stream.awaitTermination()


In [0]:
# Sanity check: number of rows currently in the bronze CDR table.
(
    spark
    .table("telecom_ete.bronze.cdr_raw")
    .count()
)

30003