# DELTA LAKE INGESTION TECHNIQUES

**OBJECTIVE:** Create a Delta table from the bmw_stock_data_1996_2024.csv file using various methods.

- CREATE TABLE AS (CTAS)
- UPLOAD UI (User Interface)
- COPY INTO
- AUTOLOADER (Overview only, outside the scope of this study)


## CREATE TABLE AS (CTAS)
1. Create a table from the bmw_stock_data_1996_2024.csv file using the CREATE TABLE AS statement. Run the query and confirm that the table **bmw.bmw_stock_ctas** was successfully created.

In [0]:
%sql
-- Start from a clean slate so this cell can be re-run safely.
drop table if exists bmw.bmw_stock_ctas;

-- CTAS: build the table straight from the CSV file. The first row is treated
-- as the header and column types are inferred from the data.
-- NOTE(review): the 'bwm_sales' path segment looks like a transposition of
-- 'bmw_sales' -- confirm it matches the actual volume name.
create table bmw.bmw_stock_ctas as
select
  Date,
  Adj_Close,
  Close,
  High,
  Low,
  Open,
  Volume
from read_files(
  '/Volumes/workspace/bmw/bwm_sales/bmw_stock_data_1996_2024.csv',
  format => 'csv',
  header => true,
  inferSchema => true
);

-- List the tables in the bmw schema to check that the new table is there.
show tables in bmw;

-- Preview the loaded rows.
select * from bmw.bmw_stock_ctas;


Date,Adj_Close,Close,High,Low,Open,Volume
1996-11-08,8.100290298,18.17099953,18.20999908,18.17099953,18.20999908,767000
1996-11-11,8.078445435,18.12199974,18.20000076,18.08200073,18.19000053,260000
1996-11-12,8.139519691,18.25900078,18.32799911,18.09199905,18.1609993,1066000
1996-11-13,8.126591682,18.22999954,18.34399986,18.19000053,18.34399986,793000
1996-11-14,8.152893066,18.28899956,18.28899956,18.13199997,18.20499992,351000
1996-11-15,8.219314575,18.43799973,18.44000053,18.2689991,18.30800056,624000
1996-11-18,8.231796265,18.4659996,18.58399963,18.36599922,18.37299919,624000
1996-11-19,8.26656723,18.54400063,18.54400063,18.31399918,18.48500061,442000
1996-11-20,8.13595295,18.25099945,18.60300064,18.24300003,18.48500061,546000
1996-11-21,8.143977165,18.2689991,18.30800056,18.19400024,18.30800056,286000


In [0]:
%sql
-- List every table in the bmw schema.
show tables in bmw;

database,tableName,isTemporary
bmw,bmw_sales,False
bmw,bmw_sales_py,False
bmw,bmw_stock_ctas,False


# UPLOAD UI METHOD
> This will not be part of the scope of the study because it requires a compute cluster to be running. For this study, only the serverless option has been used.

# COPY INTO METHOD
> This method consists of creating the table first, using a SQL DDL statement, and then using COPY INTO to load the data into the table.

In [0]:
%sql
-- Remove any previous version of the table so this cell is re-runnable.
drop table if exists bmw.bmw_stock_copyinto;

-- Declare the empty target table with explicit column types; the data is
-- loaded into it afterwards.
-- NOTE(review): int tops out at 2,147,483,647 -- consider bigint if Volume
-- could ever exceed that.
create table bmw.bmw_stock_copyinto (
  Date      date,
  Adj_Close double,
  Close     double,
  High      double,
  Low       double,
  Open      double,
  Volume    int
);

In [0]:
# Load the CSV data found under the volume path into bmw.bmw_stock_copyinto
# with COPY INTO, submitted through spark.sql. display() renders the load
# metrics returned by the command.
# A plain string is used: the statement has no placeholders, so no f-string
# is needed.

spark.sql('''
          copy into bmw.bmw_stock_copyinto
          from '/Volumes/workspace/bmw/bmw'
          fileformat = csv
          format_options (
              'header' = 'true',
              'inferSchema' = 'true'
          )
          ''').display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
7215,7215,0


In [0]:
%sql
-- Preview the contents of the COPY INTO target table.
select * from bmw.bmw_stock_copyinto;

Date,Adj_Close,Close,High,Low,Open,Volume
1996-11-08,8.100290298,18.17099953,18.20999908,18.17099953,18.20999908,767000
1996-11-11,8.078445435,18.12199974,18.20000076,18.08200073,18.19000053,260000
1996-11-12,8.139519691,18.25900078,18.32799911,18.09199905,18.1609993,1066000
1996-11-13,8.126591682,18.22999954,18.34399986,18.19000053,18.34399986,793000
1996-11-14,8.152893066,18.28899956,18.28899956,18.13199997,18.20499992,351000
1996-11-15,8.219314575,18.43799973,18.44000053,18.2689991,18.30800056,624000
1996-11-18,8.231796265,18.4659996,18.58399963,18.36599922,18.37299919,624000
1996-11-19,8.26656723,18.54400063,18.54400063,18.31399918,18.48500061,442000
1996-11-20,8.13595295,18.25099945,18.60300064,18.24300003,18.48500061,546000
1996-11-21,8.143977165,18.2689991,18.30800056,18.19400024,18.30800056,286000


In [0]:
# COPY INTO is idempotent: files that were already loaded are skipped, so
# re-running the exact same command does not insert the same file twice.
# A plain string is used: the statement has no placeholders, so no f-string
# is needed.

spark.sql('''
          copy into bmw.bmw_stock_copyinto
          from '/Volumes/workspace/bmw/bmw'
          fileformat = csv
          format_options (
              'header' = 'true',
              'inferSchema' = 'true'
          )
          ''').display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
0,0,0


In [0]:
# If you load a folder full of files, only the non-processed rows will be inserted, despite the amount of files in the folder.

spark.sql(f'''
          copy into bmw.bmw_stock_copyinto
          from '/Volumes/workspace/bmw/bmw'
          fileformat = csv
          format_options (
              'header' = 'true',
              'inferSchema' = 'true'
          )
          ''').display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
1,1,0


In [0]:
%sql
-- Count the rows currently in the COPY INTO target table.
select count(1) from bmw.bmw_stock_copyinto;

count(1)
7216


In [0]:
%sql
-- Show the table's change history (one row per operation recorded on it).
describe history bmw.bmw_stock_copyinto;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-10-25T18:55:15.000Z,4355037857660891,etunon@gmail.com,COPY INTO,Map(statsOnLoad -> true),,List(4130134441156579),1025-182503-nerrsnfu-v2n,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1864, numSkippedCorruptFiles -> 0)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
1,2025-10-25T18:53:40.000Z,4355037857660891,etunon@gmail.com,COPY INTO,Map(statsOnLoad -> true),,List(4130134441156579),1025-182503-nerrsnfu-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 7215, numOutputBytes -> 186037, numSkippedCorruptFiles -> 0)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
0,2025-10-25T18:53:19.000Z,4355037857660891,etunon@gmail.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true"",""delta.enableRowTracking"":""true"",""delta.rowTracking.materializedRowCommitVersionColumnName"":""_row-commit-version-col-bd850ff7-8956-4d57-9ac3-52f12b5c4d22"",""delta.rowTracking.materializedRowIdColumnName"":""_row-id-col-2fcbe489-2f75-4be8-90c1-4b5f1bc58d91""}, statsOnLoad -> false)",,List(4130134441156579),1025-182503-nerrsnfu-v2n,,WriteSerializable,True,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
