- !pip install delta-spark
- from delta.tables import DeltaTable

In [0]:
import pandas as pd
import os

In [0]:
# Directory under `/Volumes/` that a later cell passes to `LakeReader.load`.
VOLUME_STORAGE = '/Volumes/geih/raw_data/parquet/'

In [0]:
class ErrorLake(Exception):
    """Error raised by the lake helpers, e.g. for an unsupported format or path."""

class LakeReader:
    """Fluent builder that collects the options of a lake read.

    Every configuration method stores its argument on the instance and
    returns ``self`` so calls can be chained. ``load`` currently only
    validates the path; the actual read is still pending (see the TODO).
    """

    def __init__(self):
        self._filter  = None
        self._format  = None
        self._colums  = None
        # Format names accepted by `format`, in lower case.
        self._formats = ['geih', 'parquet']

    def select(self, *cols):
        """Remember the columns to read; calling it with no columns keeps the previous selection."""
        if cols:
            self._colums = cols
        return self

    def format(self, name: str):
        """Set the source format.

        The name is matched case-insensitively against the supported formats
        and stored in lower case.

        Raises:
            ErrorLake: if ``name`` is not one of the supported formats.
        """
        normalized = name.lower()
        if normalized not in self._formats:
            raise ErrorLake(f'[ERROR.READ.FORMAT]: Formato `{name}` no soportado')
        # Keep the normalized spelling so that `_format` always equals one of
        # the entries in `_formats`, whatever casing the caller used.
        self._format = normalized
        return self

    def filter(self, **kvargs):
        """Remember the keyword filters; calling it with no keywords keeps the previous ones."""
        if kvargs:
            self._filter = kvargs
        return self

    def load(self, path: str):
        """Validate ``path`` and return the reader.

        Raises:
            ErrorLake: if ``path`` does not start with ``/Volumes/``.
        """
        if not path.startswith('/Volumes/'):
            raise ErrorLake('[ERROR.READ.PATH]: Lake solo soporta `VOLUMES` storage por ahora')
        # TODO(): perform the actual `load` depending on the selected format.

        return self

    def __repr__(self):
        return 'LakeReader()'

class LakeWriter:
    """Write-side counterpart of the reader; so far it only defines its repr."""

    def __repr__(self) -> str:
        return "LakeWriter()"
    
class DeltaLake:
    """Wrapper class for working with data lakes on Delta Lake."""

    @property
    def read(self):
        """Return a new `LakeReader` on every access."""
        return LakeReader()

    @property
    def write(self):
        """Return a new `LakeWriter` on every access."""
        return LakeWriter()

In [0]:
# Build the wrapper, take a fresh reader from it and point it at the volume path.
lake = DeltaLake()
geih = lake.read.load(VOLUME_STORAGE)

In [0]:
# Bare expression so the notebook displays the object returned by `load`.
geih

In [0]:
# NOTE(review): same literal path as `VOLUME_STORAGE` from an earlier cell — consider reusing it.
Volumes = '/Volumes/geih/raw_data/parquet/'
# Show at most the first 12 directory entries.
os.listdir(Volumes)[:12]
# df = pd.read_parquet(Volumes)

In [0]:
# NOTE(review): in the visible cells `df` is only assigned further down
# (`spark.read.table('Geih.Default.carac')`); the pandas read above is commented
# out, so this cell depends on execution order — confirm.
df.display()

In [0]:
%sql
-- ===================================================================
-- The source tables are found in the `GEIH.Default` schema.
-- To create each table we read the lake tables and filter them
-- by year and month.
-- ==================================================================

In [0]:
def get_periodo_from_parquets(volume: str) -> set:
    """Return the set of periods (e.g. ``202301``) of the parquet files in ``volume``.

    The period is the last six characters of the file name that precede the
    ``.parquet`` extension. Entries whose name does not end in ``.parquet``
    are ignored.
    """
    suffix = '.parquet'
    # `dbutils.fs.ls` returns file-info entries rather than strings, so the
    # extension check has to be done on the entry's `name`.
    names = (entry.name for entry in dbutils.fs.ls(volume))
    return {name.split(suffix)[0][-6:] for name in names if name.endswith(suffix)}  # e.g. 202301

def get_periodo_from_stream(schema: str) -> set:
    """Return the set of periods (e.g. ``202301``) in the ``time`` column of the table ``schema``."""
    time_rows = spark.read.table(schema).select('time').collect()
    # Each value is converted with `str`, so the result is a set of strings.
    return {str(record.time) for record in time_rows}

def get_dlt_table(dlt_source: str):
    """Read the table named ``dlt_source`` and return what ``spark.read.table`` gives back."""
    table = spark.read.table(dlt_source)
    return table
   

In [0]:
# Bind the notebook-level name `df` to the `Geih.Default.carac` table.
df = spark.read.table('Geih.Default.carac')

In [0]:
# Copy `Geih.Default.carac` into the Delta table `Geih.Default.carac_dt`,
# partitioned by `part_col`; overwrite mode with schema overwrite enabled.
part_col = 'MES'
(
    spark.table('Geih.Default.carac')
    .write
    .format('delta')
    .partitionBy(part_col)
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .saveAsTable('Geih.Default.carac_dt')
)

In [0]:
# Keep only the rows whose `part_col` value equals '01'.
# NOTE(review): `part_col` is reassigned to 'time' in a later cell, so the
# column filtered here depends on cell execution order.
dt = spark.read.table('Geih.Default.carac_dt').where(f"{part_col} = '01'")

In [0]:
# Print the number of rows per distinct value of `part_col` in `dt`.
dt.groupBy(part_col).count().show()

In [0]:
# NOTE(review): the Delta reader is pointed at a `_delta_log` directory rather
# than at a table root — confirm this is the intended path.
display(spark.read.format("delta").load("/Volumes/geih/default/_delta_log"))

In [0]:
%sql
-- Show the detail information of the `carac_dt` table.
Describe Detail GEIH.Default.carac_dt

In [0]:
# Periods listed in the `Geih.Default.periodos` table that have no parquet file
# in the bronze volume yet.
# NOTE(review): here `target` holds the table periods and `source` the parquet
# periods, while a later cell calls the table `source_path` and the parquet
# volume `target_path` — confirm the intended naming.
part_col = 'time'
target = get_periodo_from_stream('Geih.Default.periodos')
source = get_periodo_from_parquets('/Volumes/geih/bronze/parquet/')
part_per = list(target.difference(source))
print('Particiones:', part_per, '&', part_col)

In [0]:
from pyspark.sql.functions import expr, col

# Table to read from, and a parquet volume path (`target_path` is not used in the visible cells).
source_path = 'geih.default.carac'
target_path = '/Volumes/geih/bronze/parquet/'

source_table = spark.read.table(source_path)
# NOTE(review): the disabled filter below refers to `partitions`, which is not
# defined in the visible cells (an earlier cell names the list `part_per`).
# source_table = source_table.filter(col('time').isin(partitions[:2]))

In [0]:
# List the entries of the bronze parquet directory; the notebook displays the result.
dbutils.fs.ls('/Volumes/geih/bronze/parquet/')

In [0]:
%sql
-- List the contents of the bronze parquet directory from SQL.
List '/Volumes/geih/bronze/parquet'