In [0]:
%run ../utils/_DeltaLakeInit

In [0]:
# Storage paths used throughout this notebook, both rooted at /<session>/:
#   bronze_location - where the badges data is written as plain Parquet
#   delta_location  - where a second Parquet copy is written and then converted to Delta
# NOTE(review): `session` is not defined in this notebook; presumably it is set by
# the %run'd ../utils/_DeltaLakeInit notebook - confirm.
bronze_location = f"/{session}/parquet/badges1gb/"
delta_location = f"/{session}/delta/badges/"

# Clean

In [0]:
%sql
-- Drop the table left over from a previous run, if any.
-- NOTE(review): this runs before the later `USE CATALOG hive_metastore`, so the
-- name resolves against whatever catalog is current at this point - confirm that
-- is the intended one.
DROP TABLE IF EXISTS Bronze.badges1gb

In [0]:
# Clear out anything previously written to delta_location.
# NOTE(review): remove_files is not defined in this notebook (presumably provided by
# ../utils/_DeltaLakeInit); the True argument presumably means "recursive" - confirm.
remove_files(delta_location, True);

In [0]:
# Clear out anything previously written to bronze_location.
# NOTE(review): remove_files is not defined in this notebook (presumably provided by
# ../utils/_DeltaLakeInit); the True argument presumably means "recursive" - confirm.
remove_files(bronze_location, True);

# Convert: CSV to Parquet

In [0]:
# Source CSV: Badges.csv in the "stackoverflow" container of the ADLS Gen2 account
# named by `storage_account` (defined outside this notebook).
file_location = f'abfss://stackoverflow@{storage_account}.dfs.core.windows.net/Badges.csv'

# Debug aid - uncomment to peek at the start of the raw file:
# dbutils.fs.head(file_location)

In [0]:
# Explicit schema for the header-less Badges CSV; every column is nullable.
_badge_columns = [
    ("Id", IntegerType()),
    ("Name", StringType()),
    ("Points", IntegerType()),
    ("CreatedOn", TimestampType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _badge_columns]
)

In [0]:
# Read the pipe-delimited, header-less CSV using the explicit schema defined above,
# then preview the first 10 rows.
badges_df = (
    spark.read
    .format('csv')
    .schema(schema)
    .option('header', False)
    .option('sep', '|')
    .load(file_location)
)
display(badges_df.limit(10))

## Save as Parquet (Python)

In [0]:
# Show the resolved target path before writing to it.
print(bronze_location)

In [0]:
# Persist the badges DataFrame as Parquet at bronze_location;
# mode("overwrite") replaces any data already at that path.
badges_df.write.mode("overwrite").parquet(bronze_location)

In [0]:
# List what the write produced under bronze_location.
display(dbutils.fs.ls(bronze_location))

## Create table (SQL)

In [0]:
%sql
-- Optional Unity Catalog setup, intentionally left disabled.
-- NOTE(review): the URI below has no `<container>@` part and uses the blob endpoint
-- with the abfss scheme - verify it before ever enabling this statement.
--CREATE CATALOG IF NOT EXISTS DataScotland
--MANAGED LOCATION 'abfss://sqlplayer2020.blob.core.windows.net/unitycatalog';

-- Make sure the Bronze schema exists in the hive_metastore catalog.
CREATE SCHEMA IF NOT EXISTS hive_metastore.Bronze;

-- Switch the current catalog so unqualified names such as Bronze.badges1gb
-- resolve against hive_metastore.
USE CATALOG hive_metastore;


In [0]:
# (Re)create Bronze.badges1gb as an external Parquet table over the files written
# to bronze_location. Dropping first keeps the cell safe to re-run.
spark.sql("DROP TABLE IF EXISTS Bronze.badges1gb");

# LOCATION is derived from bronze_location - the exact path the Parquet files were
# written to - instead of repeating the path with a `${v.session}` substitution
# variable. This keeps a single source of truth for the path and removes the hidden
# dependency on a Spark conf key (`v.session`) that is set outside this notebook.
# bronze_location starts with "/", so this yields 'dbfs:/<session>/parquet/badges1gb/'.
spark.sql(f"""
CREATE TABLE Bronze.badges1gb
USING PARQUET
LOCATION 'dbfs:{bronze_location}'
"""
)

In [0]:
%sql
-- Sanity check: preview up to 100 rows from the Parquet-backed table.
SELECT * FROM bronze.badges1gb LIMIT 100

# Convert: Parquet to Delta

In [0]:
# Show the resolved path that will hold the Parquet files to be converted to Delta.
print(delta_location)

In [0]:
# Write a second Parquet copy of the badges data, this time to delta_location.
# It is still plain Parquet at this point; the conversion to Delta happens in the
# following cell.
badges_df.write.mode("overwrite").parquet(delta_location)

In [0]:
%sql
-- Convert the Parquet directory to a Delta table in place.
-- NOTE(review): `${v.session}` is a substitution variable, not the Python `session`
-- variable; presumably the init notebook sets it to the same value so this path
-- matches delta_location - confirm.
CONVERT TO DELTA parquet.`/${v.session}/delta/badges/`

Now we can see that a new folder has been created: `_delta_log`

In [0]:
# List the contents of delta_location after the conversion.
display(dbutils.fs.ls(delta_location))

## With Python

In [0]:
# Python API equivalent of the SQL `CONVERT TO DELTA` statement.
# Import only the name that is used rather than `from delta.tables import *`, so it
# is clear where DeltaTable comes from and the notebook namespace is not polluted.
from delta.tables import DeltaTable

# Reuse delta_location (defined as f"/{session}/delta/badges/") instead of rebuilding
# the same path, so this call always targets the directory that was written.
# NOTE(review): if the SQL conversion cell has already run, the directory already
# contains a _delta_log; presumably this call is then a no-op - confirm.
DeltaTable.convertToDelta(spark, f"parquet.`{delta_location}`")