In [0]:
%run ../imports/imports

In [0]:
%run ../config/paths

In [0]:
%run ../utilities/functions

In [0]:
# Echo the environment locations loaded by ../config/paths, one per line, so the
# run output records exactly which storage areas and Kafka configs are targeted.
print(
    landing_path,
    raw_path,
    bronze_path,
    kafka_config_path,
    kafka_schema_registry_config_path,
    sep="\n",
)

In [0]:
class LandingStreamReader:
    """Streaming reader for one landing-zone dataset.

    Instances are created through ``LandingStreamReader.Builder``; every name in
    ``ATTRIBUTES`` gets a generated ``set_<name>`` method on the builder (see
    ``Builder._create_setters``). ``read()`` dispatches on ``source_format`` to
    either Auto Loader (``"cloudFiles"``) or Kafka (``"kafka"``).
    """

    # Root paths of the landing, raw and bronze storage areas.
    ENVIRONMENT_ATTRIBUTES = ["landing_path", "raw_path", "bronze_path"]

    # Kafka client settings and schema-registry settings (already parsed dicts).
    KAFKA_CONFIG_ATTRIBUTES = ["kafka_config", "kafka_schema_registry_config"]

    # Per-dataset settings, normally loaded from a dataset JSON config.
    DATASET_INGESTION_ATTRIBUTES = ["datasource", "dataset", "source_format", "source_options", "metadata_columns", "partition_column", "formatted_date_column_params"]

    # Kafka-only per-dataset settings, forwarded to deserialize_df.
    KAFKA_INGESTION_ATTRIBUTES = ["kafka_key_schema", "kafka_value_schema", "kafka_flatten_value", "kafka_key_serializer", "kafka_value_serializer" ]

    ATTRIBUTES = [
        *ENVIRONMENT_ATTRIBUTES,
        *KAFKA_CONFIG_ATTRIBUTES,
        *DATASET_INGESTION_ATTRIBUTES,
        *KAFKA_INGESTION_ATTRIBUTES
    ]

    def __init__(self, builder):
        """Copy every attribute in ``ATTRIBUTES`` from ``builder`` and derive paths.

        Side effect: creates the Auto Loader schema-location directory with
        ``dbutils.fs.mkdirs``.
        """
        # Attributes set via the builder (unset ones are None).
        for attribute in LandingStreamReader.ATTRIBUTES:
            setattr(self, attribute, getattr(builder, attribute))

        # Calculated attributes
        self.dataset_landing_path = f'{self.landing_path}/{self.datasource}/{self.dataset}'
        self.dataset_bronze_schema_location = f'{self.bronze_path}/{self.datasource}/{self.dataset}_schema'

        # Create schema location directory
        dbutils.fs.mkdirs(self.dataset_bronze_schema_location)

    def __str__(self):
        return f"LandingStreamReader(datasource='{self.datasource}', dataset='{self.dataset}')"

    def read_cloudFiles(self):
        """Return an Auto Loader stream over the dataset's landing directory.

        ``source_options`` are passed to the reader, and the schema is tracked in
        ``dataset_bronze_schema_location``. Metadata columns are added with
        ``add_metadata_columns``. When both ``partition_column`` and
        ``formatted_date_column_params`` are set, a formatted date column is also
        added with ``add_formatted_date_column``.
        """
        df = (spark.readStream
              .format("cloudFiles")
              .options(**self.source_options)
              .option("cloudFiles.schemaLocation", self.dataset_bronze_schema_location)
              .load(self.dataset_landing_path)
        )

        df = add_metadata_columns(
            df,
            self.source_format,
            self.landing_path,
            self.raw_path,
            self.metadata_columns
        )

        if self.partition_column and self.formatted_date_column_params:
            df = add_formatted_date_column(df, self.partition_column, **self.formatted_date_column_params)

        return df

    def read_kafka(self):
        """Return a Kafka stream, deserialized and enriched with metadata columns.

        Options come from ``kafka_config`` and then ``source_options``. Every raw
        Kafka column is renamed with a leading underscore (e.g. ``value`` becomes
        ``_value``) before ``deserialize_df`` runs. The dataset name is used as
        the topic for schema lookup.
        """
        df = (spark.readStream
            .format("kafka")
            .options(**self.kafka_config)
            .options(**self.source_options)
            .load()
        )

        # Prefix raw Kafka columns so they cannot clash with deserialized payload fields.
        columns = [F.col(column).alias(f'_{column}') for column in df.columns]

        df = df.select(*columns)

        df = deserialize_df(
            df = df,
            value_serializer=self.kafka_value_serializer,
            key_serializer=self.kafka_key_serializer,
            topic = self.dataset,
            schema_registry_config = self.kafka_schema_registry_config,
            flatten_value = self.kafka_flatten_value,
            value_schema = self.kafka_value_schema,
            key_schema = self.kafka_key_schema
        )

        df = add_metadata_columns(
            df,
            format = "kafka",
        )

        return df

    def read(self):
        """Return the streaming DataFrame for ``source_format``.

        Raises:
            ValueError: if ``source_format`` is neither ``"cloudFiles"`` nor ``"kafka"``.
        """
        if self.source_format == "cloudFiles":
            return self.read_cloudFiles()
        elif self.source_format == "kafka":
            return self.read_kafka()
        else:
            # ValueError subclasses Exception, so existing `except Exception` handlers still match.
            raise ValueError(f"Format {self.source_format} not supported")

    class Builder:
        """Fluent builder; every attribute starts as None until its setter is called."""

        def __init__(self):
            for attribute in LandingStreamReader.ATTRIBUTES:
                setattr(self, attribute, None)

        @classmethod
        def _create_setters(cls):
            """Attach a chainable ``set_<attribute>`` method for every attribute."""
            for attribute in LandingStreamReader.ATTRIBUTES:
                # The default argument binds the current attribute name; a plain
                # closure would see only the loop's last value.
                def setter(self, value, attribute=attribute):
                    setattr(self, attribute, value)
                    return self
                setattr(cls, f"set_{attribute}", setter)

        def build(self):
            return LandingStreamReader(self)

LandingStreamReader.Builder._create_setters()

In [0]:
class BronzeStreamWriter:
    """Writes streaming micro-batches of one dataset to its bronze table.

    Instances are created through ``BronzeStreamWriter.Builder``, which gets a
    generated ``set_<name>`` method for every name in ``ATTRIBUTES``.
    ``append_2_bronze`` is the callback handed to ``writeStream.foreachBatch``.
    """

    # Root paths of the landing, raw and bronze storage areas.
    ENVIRONMENT_ATTRIBUTES = ["landing_path", "raw_path", "bronze_path"]

    # Table format and save mode used for every bronze write.
    BRONZE_CONFIG_ATTRIBUTES = ["bronze_table_format", "bronze_write_mode"]

    # Per-dataset settings, normally loaded from a dataset JSON config.
    DATASET_INGESTION_ATTRIBUTES = ["datasource", "dataset", "sink_options", "partition_column", "formatted_date_column_params"]

    # Keyword arguments splatted into DataStreamWriter.trigger() by the caller.
    TRIGGER_ATTRIBUTES = ["trigger_mode"]

    ATTRIBUTES = [
        *ENVIRONMENT_ATTRIBUTES,
        *BRONZE_CONFIG_ATTRIBUTES,
        *DATASET_INGESTION_ATTRIBUTES,
        *TRIGGER_ATTRIBUTES
    ]

    def __init__(self, builder):
        """Copy every attribute in ``ATTRIBUTES`` from ``builder`` and derive paths.

        Side effect: creates the dataset's raw, bronze and checkpoint directories
        with ``dbutils.fs.mkdirs``.
        """
        # Attributes set via the builder (unset ones are None).
        for attribute in BronzeStreamWriter.ATTRIBUTES:
            setattr(self, attribute, getattr(builder, attribute))

        # Calculated attributes
        self.dataset_landing_path = f"{self.landing_path}/{self.datasource}/{self.dataset}"
        self.dataset_raw_path = f"{self.raw_path}/{self.datasource}/{self.dataset}"
        self.dataset_bronze_path = f"{self.bronze_path}/{self.datasource}/{self.dataset}"
        self.dataset_checkpoint_location = f"{self.dataset_bronze_path}_checkpoint"
        self.table = f"hive_metastore.bronze.{self.datasource}__{self.dataset}"
        self.query_name = f"bronze-{self.datasource}-{self.dataset}"

        dbutils.fs.mkdirs(self.dataset_raw_path)
        dbutils.fs.mkdirs(self.dataset_bronze_path)
        dbutils.fs.mkdirs(self.dataset_checkpoint_location)

    def __str__(self):
        return f"BronzeStreamWriter(datasource='{self.datasource}', dataset='{self.dataset}')"

    def archive_raw_files(self, df):
        """Move this batch's source files from the landing area into the raw area.

        Each distinct ``_ingested_filename`` value is treated as the file's
        raw-area path. The landing path is obtained by swapping the raw dataset
        prefix for the landing one, the raw parent directory is created, and the
        file is moved. This is a no-op when the column is absent.
        """
        if "_ingested_filename" in df.columns:
            files = [row["_ingested_filename"] for row in df.select("_ingested_filename").distinct().collect()]
            for file in files:
                if file:
                    file_landing_path = file.replace(self.dataset_raw_path, self.dataset_landing_path)
                    dbutils.fs.mkdirs(file[0:file.rfind('/')+1])
                    dbutils.fs.mv(file_landing_path, file)

    def write_data(self, df):
        """Write ``df`` to the bronze table ``self.table`` stored at ``dataset_bronze_path``.

        Partitioning applies only when ``partition_column`` is set. The data is
        partitioned by ``formatted_date_column_params["output_col"]`` when those
        params exist, and by ``partition_column`` otherwise. An unset
        ``sink_options`` is treated as no extra options.
        """
        spark.sql("CREATE DATABASE IF NOT EXISTS hive_metastore.bronze")

        writer = (
            df.write
            .format(self.bronze_table_format)
            .mode(self.bronze_write_mode)
            # `**None` would raise TypeError when no sink options were configured.
            .options(**(self.sink_options or {}))
        )

        if self.partition_column:
            if self.formatted_date_column_params:
                writer = writer.partitionBy(self.formatted_date_column_params["output_col"])
            else:
                writer = writer.partitionBy(self.partition_column)

        (writer
            .option("path", self.dataset_bronze_path)
            .saveAsTable(self.table)
        )

    def append_2_bronze(self, batch_df, batch_id):
        """``foreachBatch`` callback: write the batch to bronze, then archive its files.

        The batch is persisted for the two passes (write and archive) and is
        always unpersisted, even when either step raises.
        """
        batch_df.persist()
        try:
            self.write_data(batch_df)
            self.archive_raw_files(batch_df)
        finally:
            # Release the cached batch even if the write or archive step failed.
            batch_df.unpersist()

    class Builder:
        """Fluent builder; every attribute starts as None until its setter is called."""

        def __init__(self):
            for attribute in BronzeStreamWriter.ATTRIBUTES:
                setattr(self, attribute, None)

        @classmethod
        def _create_setters(cls):
            """Attach a chainable ``set_<attribute>`` method for every attribute."""
            for attribute in BronzeStreamWriter.ATTRIBUTES:
                # The default argument binds the current attribute name; a plain
                # closure would see only the loop's last value.
                def setter(self, value, attribute=attribute):
                    setattr(self, attribute, value)
                    return self
                setattr(cls, f"set_{attribute}", setter)

        def build(self):
            return BronzeStreamWriter(self)

BronzeStreamWriter.Builder._create_setters()

In [0]:
def build_landing_stream_reader_from_json(
    json_config_path: str,
    landing_path: str = None,
    raw_path: str = None,
    bronze_path: str = None,
    kafka_config_path: str = None,
    kafka_schema_registry_config_path: str = None
    ) -> LandingStreamReader:
    """Build a LandingStreamReader from a dataset JSON config file.

    Args:
        json_config_path: dataset JSON; only keys that are dataset-ingestion or
            Kafka-ingestion attributes of LandingStreamReader are applied, and
            any other keys are ignored.
        landing_path, raw_path, bronze_path: environment storage roots.
        kafka_config_path: optional Kafka client properties file; when given it
            is parsed with read_config_file + get_kafka_config.
        kafka_schema_registry_config_path: optional schema-registry properties
            file, parsed with read_config_file + get_schema_registry_config.

    Returns:
        The built LandingStreamReader.
    """
    builder = (
        LandingStreamReader.Builder()
        .set_landing_path(landing_path)
        .set_raw_path(raw_path)
        .set_bronze_path(bronze_path)
    )

    if kafka_config_path:
        builder.set_kafka_config(
            get_kafka_config(read_config_file(kafka_config_path))
        )

    if kafka_schema_registry_config_path:
        builder.set_kafka_schema_registry_config(
            get_schema_registry_config(read_config_file(kafka_schema_registry_config_path))
        )

    with open(json_config_path) as config_file:
        dataset_config = json.load(config_file)

    accepted_keys = {
        *LandingStreamReader.DATASET_INGESTION_ATTRIBUTES,
        *LandingStreamReader.KAFKA_INGESTION_ATTRIBUTES,
    }
    for key, value in dataset_config.items():
        if key not in accepted_keys:
            continue
        getattr(builder, f"set_{key}")(value)

    return builder.build()

In [0]:
def build_bronze_stream_writer_from_json(
    bronze_config_path: str,
    json_config_path: str,
    landing_path: str = None,
    raw_path: str = None,
    bronze_path: str = None,
    ) -> BronzeStreamWriter:
    """Build a BronzeStreamWriter from the shared bronze config and a dataset config.

    Args:
        bronze_config_path: JSON whose bronze-config keys (table format and
            write mode) are applied; other keys are ignored.
        json_config_path: dataset JSON whose dataset-ingestion and trigger keys
            are applied; other keys are ignored.
        landing_path, raw_path, bronze_path: environment storage roots.

    Returns:
        The built BronzeStreamWriter.
    """
    builder = (
        BronzeStreamWriter.Builder()
        .set_landing_path(landing_path)
        .set_raw_path(raw_path)
        .set_bronze_path(bronze_path)
    )

    def apply_json_settings(config_path, accepted_keys):
        # Load one JSON file and forward each recognised key to its builder setter.
        with open(config_path) as config_file:
            settings = json.load(config_file)
        for key, value in settings.items():
            if key in accepted_keys:
                getattr(builder, f"set_{key}")(value)

    # The bronze config is applied first, then the dataset config, one file at a time.
    apply_json_settings(
        bronze_config_path,
        set(BronzeStreamWriter.BRONZE_CONFIG_ATTRIBUTES),
    )
    apply_json_settings(
        json_config_path,
        {*BronzeStreamWriter.DATASET_INGESTION_ATTRIBUTES, *BronzeStreamWriter.TRIGGER_ATTRIBUTES},
    )

    return builder.build()

In [0]:
def start_ingestion_given_json(
    bronze_config_path: str,
    json_config_path: str,
    landing_path: str = None,
    raw_path: str = None,
    bronze_path: str = None,
    kafka_config_path: str = None,
    kafka_schema_registry_config_path: str = None
    ) -> "pyspark.sql.streaming.StreamingQuery":
    """Start a landing -> bronze streaming ingestion for one dataset config.

    Builds a LandingStreamReader and a BronzeStreamWriter from the same dataset
    JSON. The reader's stream is then fed through ``foreachBatch`` into
    ``writer.append_2_bronze``, using the writer's trigger, checkpoint location
    and query name.

    Returns:
        The started streaming query, so callers can monitor, await or stop it.

    Raises:
        ValueError: if the dataset config sets no (or an empty) ``trigger_mode``.
    """
    reader = build_landing_stream_reader_from_json(
        json_config_path = json_config_path,
        landing_path = landing_path,
        raw_path = raw_path,
        bronze_path = bronze_path,
        kafka_config_path = kafka_config_path,
        kafka_schema_registry_config_path = kafka_schema_registry_config_path
    )

    writer = build_bronze_stream_writer_from_json(
        bronze_config_path = bronze_config_path,
        json_config_path = json_config_path,
        landing_path = landing_path,
        raw_path = raw_path,
        bronze_path = bronze_path,
    )

    # trigger_mode is splatted into .trigger() below. Fail early with an actionable
    # message instead of a TypeError from `**None` during stream setup.
    if not writer.trigger_mode:
        raise ValueError(
            f"{writer} has no trigger_mode; set it in {json_config_path}, "
            'e.g. "trigger_mode": {"availableNow": true}'
        )

    return (reader
        .read()
        .writeStream
        .foreachBatch(writer.append_2_bronze)
        .trigger(**writer.trigger_mode)
        .option("checkpointLocation", writer.dataset_checkpoint_location)
        .queryName(writer.query_name)
        .start()
    )

In [0]:
# Start the retail sales-orders ingestion; no Kafka settings are passed.
start_ingestion_given_json(
    json_config_path = "../config/datasets/batch/retail_sales_order_v2.json",
    bronze_config_path = bronze_config_path,
    bronze_path = bronze_path,
    landing_path = landing_path,
    raw_path = raw_path,
)

In [0]:
# Start the orders_v3 ingestion, supplying the Kafka client and schema-registry configs.
start_ingestion_given_json(
    json_config_path = "../config/datasets/batch/orders_v3.json",
    bronze_config_path = bronze_config_path,
    kafka_config_path = kafka_config_path,
    kafka_schema_registry_config_path = kafka_schema_registry_config_path,
    bronze_path = bronze_path,
    landing_path = landing_path,
    raw_path = raw_path,
)

In [0]:
# Build the bronze writer for the retail sales-orders config, without starting a
# stream, to inspect its resolved settings. The writer builder requires
# bronze_config_path and takes no Kafka arguments.
test_sales_orders_writer = build_bronze_stream_writer_from_json(
    bronze_config_path = bronze_config_path,
    json_config_path = "../config/datasets/batch/retail_sales_order_v2.json",
    landing_path = landing_path,
    raw_path = raw_path,
    bronze_path = bronze_path,
)

print(test_sales_orders_writer)

test_sales_orders_writer.sink_options

In [0]:
# Build a reader for the retail sales-orders config without starting a stream.
# No Kafka paths are passed, so kafka_config / kafka_schema_registry_config stay None.
test_sales_orders = build_landing_stream_reader_from_json(
    json_config_path = "../config/datasets/batch/retail_sales_order_v2.json",
    bronze_path = bronze_path,
    landing_path = landing_path,
    raw_path = raw_path,
)

print(test_sales_orders)

In [0]:
# Build a reader for the orders_v3 config, supplying the Kafka client and
# schema-registry config paths.
test_orders_v3 = build_landing_stream_reader_from_json(
    json_config_path = "../config/datasets/batch/orders_v3.json",
    kafka_config_path = kafka_config_path,
    kafka_schema_registry_config_path = kafka_schema_registry_config_path,
    landing_path = landing_path,
    raw_path = raw_path,
    bronze_path = bronze_path,
)

print(test_orders_v3)

In [0]:
# Preview the stream produced by the orders_v3 reader built above.
display(test_orders_v3.read())

In [0]:

  datasource = "retail"
  dataset =  "sales_orders"
  source_format  = "cloudFiles"
  source_options =  {
                      "cloudFiles.format": "json",
                      "cloudFiles.inferColumnTypes": "true",
                      "cloudFiles.schemaEvolutionMode": "addNewColumns"
                    }
  metadata_columns =  "default"
  partition_column = "order_datetime"
  formatted_date_column_params=  {
                                    "output_col": "order_yyyymm",
                                    "input_type": "unix",
                                    "input_format": None,
                                    "output_format": "yyyy-MM"
                                  }
  sink_layer =  "bronze"
  sink_options = {
                    "mergeSchema": "true"
                  }



In [0]:
# Hand-assembled settings for the pizzerie/orders_v2 Kafka topic, used by the
# builder cell below to exercise LandingStreamReader without a JSON config.
datasource, dataset, source_format = "pizzerie", "orders_v2", "kafka"
source_options = dict(
    subscribe="orders_v2",
    includeHeaders="true",
    startingOffsets="earliest",
)
metadata_columns = "default"
partition_column = "key"
# NOTE(review): this is the string "True", not a bool — confirm what deserialize_df expects.
kafka_flatten_value = "True"
kafka_key_serializer, kafka_value_serializer = "str", "avro"
sink_layer = "bronze"
sink_options = dict(mergeSchema="true")

kafka_config = get_kafka_config(read_config_file('../config/kafka/client.properties'))
kafka_schema_registry_config = get_schema_registry_config(
    read_config_file('../config/kafka/schema_registry.properties')
)

In [0]:
# Assemble the orders_v2 Kafka reader from the settings in the previous cell.
# Setter calls are independent, so they are grouped by concern.
reader_orders_v2 = (
    LandingStreamReader.Builder()
    # storage roots
    .set_bronze_path(bronze_path)
    .set_landing_path(landing_path)
    .set_raw_path(raw_path)
    # Kafka connection
    .set_kafka_config(kafka_config)
    .set_kafka_schema_registry_config(kafka_schema_registry_config)
    # dataset identity and source
    .set_datasource(datasource)
    .set_dataset(dataset)
    .set_source_format(source_format)
    .set_source_options(source_options)
    .set_metadata_columns(metadata_columns)
    .set_partition_column(partition_column)
    # Kafka deserialization
    .set_kafka_key_serializer(kafka_key_serializer)
    .set_kafka_value_serializer(kafka_value_serializer)
    .set_kafka_flatten_value(kafka_flatten_value)
    .build()
)

In [0]:
# Which branch LandingStreamReader.read() will dispatch to.
reader_orders_v2.source_format

In [0]:
# Options passed to spark.readStream after kafka_config in read_kafka().
reader_orders_v2.source_options

In [0]:
# source_format was set to "kafka" above, so read() goes through read_kafka().
df = reader_orders_v2.read()

In [0]:
# Show the orders_v2 streaming DataFrame in the notebook.
display(df)

In [0]:
json_path = '../config/datasets/batch/orders.json'

with open(json_path) as f:
    config = json.load(f)

# Print explicitly: a bare expression is only rendered when it is a cell's last line.
print(config.get('source').get('options').get('cloudFiles.format'))

# NOTE(review): this rebinds bronze_config_path, which is otherwise supplied by one
# of the %run notebooks above — confirm both point at the same file.
bronze_config_path = '../config/bronze.json'
with open(bronze_config_path) as f:
    bronze_config = json.load(f)

In [0]:
# Older nested-config walkthrough: pull the bronze write settings and the Kafka
# dataset settings out of `bronze_config` / `config` (loaded in the cell above),
# echoing each value once it is resolved.
bronze_format = bronze_config.get('format')
bronze_mode = bronze_config.get('mode')

print(bronze_format, bronze_mode, sep="\n")

format = config.get('source').get('format')  # NOTE(review): shadows the built-in format()
datasource = config.get('datasource')
dataset = config.get('dataset')
options = config.get('source').get('options')
metadata_columns = config.get('metadata')
kafka_config = get_kafka_config(read_config_file('../config/kafka/client.properties'))
schema_registry_config = get_schema_registry_config(read_config_file('../config/kafka/schema_registry.properties'))
flatten_value = config.get('flatten_value')
value_serializer = config.get("value_serializer")
key_serializer = config.get("key_serializer")
kakfa_value_schema = config.get("kafka_value_schema")  # (sic) name is reused by the reader cell below
partitionColumn = config.get("partition").get("column")

# NOTE(review): kafka_config / schema_registry_config may hold credentials; printing
# them writes those values into the saved notebook output.
print(
    format,
    datasource,
    dataset,
    options,
    metadata_columns,
    kafka_config,
    flatten_value,
    partitionColumn,
    kakfa_value_schema,
    schema_registry_config,
    sep="\n",
)

dataset_landing_path, dataset_raw_path, dataset_bronze_path = (
    f"{root}/{datasource}/{dataset}" for root in (landing_path, raw_path, bronze_path)
)

print(dataset_landing_path, dataset_raw_path, dataset_bronze_path, sep="\n")

In [0]:
# Build a reader from the nested orders.json values extracted above, using the
# builder's actual setter names (set_source_*, set_partition_column, set_kafka_*).
# metadata_columns and formatted_date_column_params are left unset: read_kafka()
# does not use them.
reader = (LandingStreamReader.Builder()
  .set_datasource(datasource)
  .set_dataset(dataset)
  .set_landing_path(landing_path)
  .set_raw_path(raw_path)
  .set_bronze_path(bronze_path)
  .set_source_format(format)
  .set_source_options(options)
  .set_partition_column(partitionColumn)
  .set_kafka_config(kafka_config)
  .set_kafka_schema_registry_config(schema_registry_config)
  .set_kafka_flatten_value(flatten_value)
  .set_kafka_value_serializer(value_serializer)
  .set_kafka_key_serializer(key_serializer)
  .set_kafka_value_schema(kakfa_value_schema)
  .build()
)

print(reader)

In [0]:
# LandingStreamReader stores the value serializer as kafka_value_serializer.
reader.kafka_value_serializer

In [0]:
# Call the Kafka path directly instead of dispatching through read().
df = reader.read_kafka()


In [0]:
 display(df)

In [0]:
# Not set by the reader's builder chain, so this shows the builder default (None).
reader.formatted_date_column_params

In [0]:
# LandingStreamReader stores reader options as source_options.
print(reader.source_options)

In [0]:
# Bronze writer for the same dataset. sink_options must be set: write_data()
# splats it into .options(). mergeSchema matches the sink_options used by the
# other dataset settings in this notebook.
writer = (BronzeStreamWriter.Builder()
  .set_datasource(datasource)
  .set_dataset(dataset)
  .set_landing_path(landing_path)
  .set_raw_path(raw_path)
  .set_bronze_path(bronze_path)
  .set_bronze_table_format(bronze_format)
  .set_bronze_write_mode(bronze_mode)
  .set_partition_column(partitionColumn)
  .set_sink_options({"mergeSchema": "true"})
  .build()
)

print(writer)

In [0]:
# Not set by the writer's builder chain, so None. write_data() then partitions by
# partition_column (when set) rather than by a formatted date column.
writer.formatted_date_column_params

In [0]:
# Start the Kafka -> bronze stream. Each micro-batch is handed to
# writer.append_2_bronze. The commented-out processingTime trigger is the
# continuous-mode alternative to availableNow.
(reader
  .read_kafka()
  .writeStream
  .foreachBatch(writer.append_2_bronze)
  .trigger(availableNow=True)
  #.trigger(processingTime="60 seconds") # continuous mode
  .option("checkpointLocation", writer.dataset_checkpoint_location)
  .queryName(writer.query_name)
  .start()
)

In [0]:
# Latest 100 rows in the bronze Delta location, most recent _ingested_at first.
query = f"""
select * 
from delta.`{writer.dataset_bronze_path}`
order by _ingested_at desc
limit 100
"""
display(spark.sql(query))

In [0]:
# Distinct source files recorded in _ingested_filename for this bronze location.
query = f"""
select distinct _ingested_filename 
from delta.`{writer.dataset_bronze_path}`
"""
display(spark.sql(query))