# Setup

In [1]:
%%sh
# Print the version of the `python` executable found on the shell PATH.
python --version

Python 3.11.6


In [2]:
import os
from enum import StrEnum, auto
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

In [3]:
# Session builder with Delta Lake enabled: the SQL extension and the
# catalog implementation both have to be registered for Delta tables to work.
builder = (
    SparkSession.builder
    .appName("colibri_de_test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta-spark helper finish configuring the builder, then create the
# session (or reuse one that is already running in this kernel).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Show the Spark version of the running session for reproducibility.
spark.version

'3.5.0'

In [5]:
class Layer(StrEnum):
    RAW = auto()
    BRONZE = auto()
    SILVER = auto()
    GOLD = auto()

### Get paths

In [6]:
def get_dir(data_dir: str, layer: str) -> str:
    """Return the directory of ``layer`` underneath ``data_dir``.

    The two parts are joined with a single ``/``; no normalisation is done,
    so a trailing slash on ``data_dir`` is kept as-is.

    Args:
        data_dir: Root data directory.
        layer: Layer name (for example a ``Layer`` member) used as the
            sub-directory name.

    Returns:
        The string ``"<data_dir>/<layer>"``.
    """
    return '{}/{}'.format(data_dir, layer)

In [7]:
# Root of the data area; every layer lives in its own sub-directory below it.
DATA_DIR = '/home/jovyan/data'

# One directory constant per layer, derived from DATA_DIR via get_dir.
RAW_DIR, BRONZE_DIR, SILVER_DIR, GOLD_DIR = (
    get_dir(DATA_DIR, layer)
    for layer in (Layer.RAW, Layer.BRONZE, Layer.SILVER, Layer.GOLD)
)

In [8]:
# Collect the raw CSV inputs. os.listdir returns entries in arbitrary order and
# lists everything in the directory (e.g. Jupyter's .ipynb_checkpoints folder),
# so keep only *.csv names and sort them to get a reproducible file list that
# is safe to hand to the CSV reader.
raw_files = sorted(
    f'{RAW_DIR}/{name}'
    for name in os.listdir(RAW_DIR)
    if name.lower().endswith('.csv')
)
raw_files

['/home/jovyan/data/raw/data_group_1.csv',
 '/home/jovyan/data/raw/data_group_2.csv',
 '/home/jovyan/data/raw/data_group_3.csv']

In [9]:
# Table name; used as the last path component when saving the Delta table.
TABLE_NAME = 'turbine'

# Bronze

In [10]:
# Load every raw CSV file into one DataFrame, taking column names from the
# header row. No schema is supplied, so the values are read as-is.
raw_data = (
    spark.read
    .option("header", True)
    .csv(raw_files)
)

# Row count across all input files, shown as a quick sanity check.
raw_data.count()

11160

In [11]:
# Inspect the column names and types of the data as read from the CSV files.
raw_data.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- turbine_id: string (nullable = true)
 |-- wind_speed: string (nullable = true)
 |-- wind_direction: string (nullable = true)
 |-- power_output: string (nullable = true)



In [25]:
# Persist the raw rows unchanged as a Delta table in the bronze layer.
# "overwrite" mode replaces whatever a previous run saved at this location.
(
    raw_data.write
    .format("delta")
    .mode("overwrite")
    .save(f'{BRONZE_DIR}/{TABLE_NAME}')
)

# Silver