# Ingesting Data into Delta Lake

## Course: Getting Started with Databricks for Data Engineering
https://customer-academy.databricks.com/learn/courses/2469/get-started-with-databricks-for-data-engineering?hash=37c86bbf5e6d8b76aa4f54f1fa988c33404292ed&generated_by=1212990

## Connect to Catalog and Schema

In [0]:
%run ./workspaceSetup

In [0]:
spark.sql("LIST '/Volumes/fundamentals/default/myfiles'").display()

## Delta Lake Ingestion
- CTAS (CREATE TABLE AS SELECT)
- Upload UI (drag and drop files directly into the volume)
- COPY INTO (incremental loading; files already loaded are skipped)
- Auto Loader (*not covered here)

In [0]:
%sql

-- Drop table if exists
DROP TABLE IF EXISTS current_employees;

-- Create table using CTAS
CREATE TABLE current_employees
AS
SELECT ID, FirstName, Country, Role
FROM read_files(
  '/Volumes/fundamentals/default/myfiles',
  format => 'csv',
  header => true,
  inferSchema => true
);

-- List the tables in the current schema
SHOW TABLES;
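
The CTAS cell above can also be written with the DataFrame API. The following is a minimal sketch, not part of the course material: the function name `create_table_from_csv` and its parameters are hypothetical, and it uses the standard CSV reader rather than `read_files`, so it is only roughly equivalent.

```python
def create_table_from_csv(spark, source_path, table_name, columns):
    """Load CSV files from source_path and save the selected columns as a table.

    `spark` is the SparkSession (predefined in Databricks notebooks).
    """
    df = (
        spark.read.format("csv")
        .option("header", "true")       # first line of each file holds column names
        .option("inferSchema", "true")  # sample the data to infer column types
        .load(source_path)
    )
    # mode("overwrite") replaces an existing table, similar to DROP + CREATE above
    df.select(*columns).write.mode("overwrite").saveAsTable(table_name)


# In a Databricks notebook:
# create_table_from_csv(
#     spark,
#     "/Volumes/fundamentals/default/myfiles",
#     "current_employees",
#     ["ID", "FirstName", "Country", "Role"],
# )
```

The call is left commented out because it needs a live Spark session and the volume from this notebook.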

## Create Table using COPY INTO

In [0]:
%sql

-- Drop table if exists
DROP TABLE IF EXISTS current_employees_copyinto;

-- Create an empty table with an explicit schema as the COPY INTO target
CREATE TABLE current_employees_copyinto (
  ID INT,
  FirstName STRING,
  Country STRING,
  Role STRING
);

In [0]:
spark.sql('''
          COPY INTO current_employees_copyinto
          FROM '/Volumes/fundamentals/default/myfiles/'
          FILEFORMAT = CSV
          FORMAT_OPTIONS (
              'header' = 'true',
              'inferSchema' = 'true'
          )
          ''').display()
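
Because the same COPY INTO statement is run more than once in this notebook, it can help to build it from a small helper. This is a sketch under assumptions: `build_copy_into` is a hypothetical helper, not a Databricks API, and it does no escaping, so it should only be used with trusted table names and paths.

```python
def build_copy_into(table, source_path, file_format="CSV", format_options=None):
    """Return a COPY INTO statement as a string (no escaping is performed)."""
    options = format_options or {}
    lines = [
        f"COPY INTO {table}",
        f"FROM '{source_path}'",
        f"FILEFORMAT = {file_format}",
    ]
    if options:
        # Render each option as 'key' = 'value', matching the cell above
        rendered = ", ".join(f"'{key}' = '{value}'" for key, value in options.items())
        lines.append(f"FORMAT_OPTIONS ({rendered})")
    return "\n".join(lines)


statement = build_copy_into(
    "current_employees_copyinto",
    "/Volumes/fundamentals/default/myfiles/",
    format_options={"header": "true", "inferSchema": "true"},
)
print(statement)

# In a Databricks notebook: spark.sql(statement).display()
```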

In [0]:
%sql

SELECT *
FROM current_employees_copyinto;

Now let's add an additional CSV file, "employees2.csv", to the volume and run COPY INTO again. COPY INTO skips files it has already loaded, so only the new file is ingested.

In [0]:
spark.sql('''
          COPY INTO current_employees_copyinto
          FROM '/Volumes/fundamentals/default/myfiles/'
          FILEFORMAT = CSV
          FORMAT_OPTIONS (
              'header' = 'true',
              'inferSchema' = 'true'
          )
          ''').display()


In [0]:
%sql

SELECT *
FROM current_employees_copyinto;
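
The second COPY INTO run adds only the rows from the new file, because files loaded earlier are skipped. The toy model below illustrates that behavior in plain Python. It is not how Databricks implements COPY INTO: the class `CopyIntoSimulator`, the file name "employees.csv", and all row values are made up for illustration.

```python
class CopyIntoSimulator:
    """Toy model of a table that remembers which source files were loaded."""

    def __init__(self):
        self.loaded_files = set()
        self.rows = []

    def copy_into(self, files):
        """Load rows from files (name -> list of rows); return rows added."""
        added = 0
        for name, file_rows in files.items():
            if name in self.loaded_files:
                continue  # already ingested by an earlier run, so skip it
            self.rows.extend(file_rows)
            self.loaded_files.add(name)
            added += len(file_rows)
        return added


# Hypothetical volume contents (made-up rows)
volume = {"employees.csv": [(1, "Ana"), (2, "Ben")]}
table = CopyIntoSimulator()

first_run = table.copy_into(volume)    # loads the initial file
volume["employees2.csv"] = [(3, "Cy")]
second_run = table.copy_into(volume)   # loads only the new file
third_run = table.copy_into(volume)    # nothing new to load

print(first_run, second_run, third_run)
```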

In [0]:
%sql
DESCRIBE HISTORY current_employees_copyinto
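
To pick out the table versions written by a particular operation, the history rows can be filtered in Python. This is a sketch under assumptions: `versions_for` is a hypothetical helper, the sample rows are made up, and the operation names shown ("COPY INTO", "CREATE TABLE") are assumed to match what DESCRIBE HISTORY reports in its `operation` column.

```python
def versions_for(history_rows, operation):
    """Return the versions whose `operation` field equals the given name."""
    return [row["version"] for row in history_rows if row["operation"] == operation]


# Made-up sample shaped like DESCRIBE HISTORY output (newest version first)
sample_history = [
    {"version": 2, "operation": "COPY INTO"},
    {"version": 1, "operation": "COPY INTO"},
    {"version": 0, "operation": "CREATE TABLE"},
]

copy_versions = versions_for(sample_history, "COPY INTO")
print(copy_versions)

# In a Databricks notebook, real rows could be collected with:
# rows = [r.asDict() for r in
#         spark.sql("DESCRIBE HISTORY current_employees_copyinto").collect()]
```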

## Cleanup

In [0]:
%sql

DROP TABLE IF EXISTS current_employees_copyinto;
DROP TABLE IF EXISTS current_employees;

SHOW TABLES;

Delete the file added for the second COPY INTO run from the volume.

In [0]:
# Remove employees2.csv from the volume with dbutils.fs.rm

dbutils.fs.rm('/Volumes/fundamentals/default/myfiles/employees2.csv', True)