# Stream customer data from cloud files to Delta Lake

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType
 
# Explicit schema applied to the incoming customer JSON records.
# Each (name, type) pair below becomes one StructField, in this order.
customers_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("customer_id", IntegerType()),
            ("customer_name", StringType()),
            ("date_of_birth", DateType()),
            ("telephone", StringType()),
            ("email", StringType()),
            ("member_since", DateType()),
            ("created_timestamp", TimestampType()),
        )
    ]
)
 

In [0]:
# Streaming reader over the JSON files in the volume directory, parsed with
# the explicit customers_schema instead of schema inference.
customer_df = spark.readStream.load(
    path='/Volumes/serio_cat/default/order_vol/data/',
    format="json",
    schema=customers_schema,
)

In [0]:
from pyspark.sql.functions import current_timestamp, col
# Enrich each record with lineage columns:
#   file_path      - taken from the _metadata.file_path column of the source
#   ingestion_date - current_timestamp() evaluated when the record is processed
customer_transformed_df = customer_df.withColumn(
    'file_path', col('_metadata.file_path')
).withColumn(
    "ingestion_date", current_timestamp()
)

Write the transformed data stream to a Delta table

In [0]:
# Start the streaming write into the Delta table.
# availableNow=True processes all data available at start and then stops on
# its own; it replaces the deprecated once=True trigger.
# Progress is tracked in the checkpoint directory, so re-running the cell
# only picks up files that have not been processed yet.
streaming_query = (
  customer_transformed_df.writeStream
  .format('delta')
  .option("checkpointLocation", "/Volumes/serio_cat/default/order_vol/data/_checkpoint_new")
  .trigger(availableNow=True)
  .toTable('serio_cat.serio_schema.data')
)

In [0]:
# toTable() returns as soon as the query has been started, so under "Run All"
# an immediate stop() could cancel the run before any data is written.
# Wait for the triggered run to finish first; stop() afterwards is harmless cleanup.
streaming_query.awaitTermination()
streaming_query.stop()

In [0]:
%sql
-- Inspect the rows written to the target Delta table.
SELECT *
FROM serio_cat.serio_schema.data

Read files using Auto Loader

In [0]:
# Auto Loader ("cloudFiles") streaming source over the JSON files in the
# customer_autoloader directory; no explicit schema is supplied, and the
# schema location points at the _schema sub-directory.
customers_df = (
  spark.readStream
  .format('cloudFiles')
  .options(**{
      "cloudFiles.format": "json",
      "cloudFiles.schemaLocation": "/Volumes/serio_cat/default/order_vol/data/customer_autoloader/_schema",
  })
  .load('/Volumes/serio_cat/default/order_vol/data/customer_autoloader')
)

In [0]:
# Auto Loader ("cloudFiles") source as above, additionally setting
# cloudFiles.inferColumnTypes to 'true'.
# NOTE(review): this reuses the schemaLocation of the other Auto Loader cells -
# confirm that a schema already stored there does not override this setting.
customers_df = (
  spark.readStream
  .format('cloudFiles')
  .options(**{
      "cloudFiles.format": "json",
      "cloudFiles.schemaLocation": "/Volumes/serio_cat/default/order_vol/data/customer_autoloader/_schema",
      "cloudFiles.inferColumnTypes": 'true',
  })
  .load('/Volumes/serio_cat/default/order_vol/data/customer_autoloader')
)

In [0]:
# Auto Loader ("cloudFiles") source with column-type inference enabled plus
# schema hints for the two date columns and the timestamp column.
# NOTE(review): this reuses the schemaLocation of the other Auto Loader cells -
# confirm that a schema already stored there does not mask these settings.
customers_df = (
  spark.readStream
  .format('cloudFiles')
  .options(**{
      "cloudFiles.format": "json",
      "cloudFiles.schemaLocation": "/Volumes/serio_cat/default/order_vol/data/customer_autoloader/_schema",
      "cloudFiles.inferColumnTypes": 'true',
      'cloudFiles.schemaHints': 'date_of_birth DATE, member_since DATE, created_timestamp TIMESTAMP',
  })
  .load('/Volumes/serio_cat/default/order_vol/data/customer_autoloader')
)

Transform data

In [0]:
from pyspark.sql.functions import current_timestamp, col

# Enrich each Auto Loader record with lineage columns:
#   file_path      - taken from the _metadata.file_path column of the source
#   ingestion_date - current_timestamp() evaluated when the record is processed
# The second column was previously misspelled 'ingestion_data'; it is now named
# 'ingestion_date', matching the earlier JSON pipeline in this notebook.
# NOTE: if the target table was already created with the misspelled column,
# it has to be recreated or migrated before writing with the corrected name.
customer_transformed_df = (
    customers_df.withColumn('file_path', col('_metadata.file_path'))
                .withColumn('ingestion_date', current_timestamp())
)

write stream to delta table

In [0]:
# Start the streaming write of the Auto Loader data into the Delta table.
# availableNow=True processes all data available at start and then stops on
# its own; it replaces the deprecated once=True trigger.
# Progress is tracked in the checkpoint directory, so re-running the cell
# only picks up files that have not been processed yet.
streaming_query = (
  customer_transformed_df.writeStream
  .format('delta')
  .option('checkpointLocation', '/Volumes/serio_cat/default/order_vol/data/customer_autoloader/_checkpoint')
  .trigger(availableNow=True)
  .toTable('serio_cat.serio_schema.customer_autoloader')
)

In [0]:
# toTable() returns as soon as the query has been started, so under "Run All"
# an immediate stop() could cancel the run before any data is written.
# Wait for the triggered run to finish first; stop() afterwards is harmless cleanup.
streaming_query.awaitTermination()
streaming_query.stop()

In [0]:
%sql
-- Inspect the rows written to the Auto Loader target table.
SELECT *
FROM serio_cat.serio_schema.customer_autoloader