### [User Input Required] Read CSV Source with Autoloader.

__Common data loading patterns__: https://docs.databricks.com/en/ingestion/auto-loader/patterns.html

__Autoloader Options for CSV__: https://docs.databricks.com/en/ingestion/auto-loader/options.html#csv-options

__Common Autoloader Options__: https://docs.databricks.com/en/ingestion/auto-loader/options.html#common-auto-loader-options

__Schema Evolution Modes__: https://docs.databricks.com/en/ingestion/auto-loader/schema.html#how-does-auto-loader-schema-evolution-work

In [0]:

from pipelines.shared_utils.autoloader_helper import generated_autoloader_schema_path

# [User Input Required] Set the ingest location.
ingest_location = "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow"

# [User Input Required] Configure schema evolution and rescue data.
# Both values are consumed by the reader below, so editing them here is
# sufficient; do not hard-code them in the option chain.
schema_evolution_mode = "rescue"
rescue_data_column_name = "_rescued_data"


# Auto-generate the schema location from the ingest location.
autoloader_schema_location = generated_autoloader_schema_path(ingest_location)
print(f"Autoloader Schema Location: {autoloader_schema_location}")

# [User Input Required] Configure Autoloader settings
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.schemaLocation", autoloader_schema_location)  # Required
    # [Input Required] Common Autoloader Settings
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.schemaEvolutionMode", schema_evolution_mode)
    .option("cloudFiles.maxFilesPerTrigger", "1000")
    # Name of the column that collects data not matching the schema.
    .option("rescuedDataColumn", rescue_data_column_name)
    # [Input Required] Error Handling Settings
    .option("ignoreCorruptFiles", "false")
    # [Input Required] CSV Autoloader Settings
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    # NOTE(review): the Databricks CSV option docs point Auto Loader users to
    # "cloudFiles.inferColumnTypes" for typed inference - confirm that
    # "inferSchema" has the intended effect here.
    .option("inferSchema", "true")
    .load(ingest_location)
)

%md
### [User Input Required] Optional Transformations (SQL or Python)

In [0]:

# Register the streaming DataFrame as the temp view "tmp_view" so that it can
# be queried by name from SQL.
df.createOrReplaceTempView("tmp_view")

In [0]:
%sql
-- Pass-through projection of the temp view; put any optional transformations here.
SELECT * FROM tmp_view

### [User Input Required] Write Data to the Lake

In [0]:
from pipelines.shared_utils.autoloader_delta_writer import AutoloaderDeltaWriter
from pipelines.base_class.data_class import WriteMode, TriggerMode


# [User Input Required] How and where the data is written.
write_mode, trigger_mode = WriteMode.UC_EXTERNAL_TABLE, TriggerMode.BATCH
uc_namespace: str = ""
datalake_location: str = ""

# [User Input Required] DataFrame to persist.
# NOTE(review): `_sqldf` is presumably the result of the preceding %sql cell;
# point this at a different DataFrame if the transformation was done in Python.
output_df = _sqldf

# [User Input Required] Create the writer instance.
csv_writer: AutoloaderDeltaWriter = AutoloaderDeltaWriter(
    df=output_df,
    write_mode=write_mode,
    trigger_mode=trigger_mode,
    data_lake_path=datalake_location,
    uc_namespace=uc_namespace,
)

# [User Input Required] Write the data using the append strategy.
csv_writer.write_append()

# Alternative write strategies offered by the writer:
# csv_writer.write_overwrite()
# csv_writer.write_scd_1()
# csv_writer.write_scd_2()