In [None]:
# Stage the shared `dbtools` helper module from DBFS onto the driver's local disk;
# the import cell adds /tmp/datalake/deezer/ingestion/ to sys.path so it can be imported.
dbfs_path = 'dbfs:/datalake/deezer/dbtools.py'
local_path = 'file:/tmp/datalake/deezer/ingestion/' + dbfs_path.rsplit('/', 1)[-1]
dbutils.fs.cp(dbfs_path, local_path)

True

In [None]:
import sys
sys.path.append('/tmp/datalake/deezer/ingestion/')

import dbtools
import os
import json
import delta
from pyspark.sql import types
from pyspark.sql.dataframe import DataFrame as pyspark_df
from pyspark.sql.session import SparkSession
from pyspark.sql.streaming.query import StreamingQuery
from typing import Literal
import delta

In [None]:
class BronzeIngestion:
    """Load raw Deezer files from DBFS into a bronze Delta table.

    Two entry points:
      * ``process_full``   - batch-read every source file, transform, overwrite the table.
      * ``process_stream`` - Auto Loader (``cloudFiles``) stream that MERGEs each
        micro-batch into the existing table.

    The table schema and the transformation query may be provided as files on DBFS
    (``dbfs:/datalake/deezer/schema/<table_name>.json`` and
    ``dbfs:/datalake/deezer/etl/<table_name>.sql``). When absent, the schema is
    inferred from the source files and ``default_query`` is used.

    Relies on the Databricks notebook globals ``spark`` and ``dbutils``.
    """

    def __init__(
        self,
        table_name: str,
        database_name: str,
        id_fields: list,
        timestamp_field: str,
        endpoint: Literal['history', 'charts'] = 'history',
        file_format: str = 'parquet',
        schema: types.StructType | None = None,
        read_options: dict | None = None,
        partition_fields: list | None = None,
        spark: SparkSession = spark,
    ):
        """
        Args:
            table_name: Target table name; also names the schema/SQL files on DBFS
                and is part of the checkpoint path.
            database_name: Database that holds the target table.
            id_fields: Columns identifying a row; used to deduplicate in
                ``default_query`` and as the MERGE key in ``upsert``.
            timestamp_field: Column ordering duplicates in ``default_query``; the
                row with the greatest value is kept.
            endpoint: Source sub-folder under ``dbfs:/datalake/deezer/user/``.
            file_format: Source file format, also the last source sub-folder.
            schema: Explicit source schema. When given it is used as-is and takes
                precedence over the DBFS schema file and over inference.
            read_options: Extra reader options for both batch and stream reads.
            partition_fields: Partition columns used by ``save_full``.
            spark: Session to use. The default is the notebook's global ``spark``,
                bound when this class is defined.
        """
        self.table_name = table_name
        self.database_name = database_name
        self.id_fields = id_fields
        self.timestamp_field = timestamp_field
        self.endpoint = endpoint
        self.file_format = file_format
        self.schema = schema
        # Fresh containers: avoids shared mutable defaults and isolates the
        # instance from later mutation of the caller's objects.
        self.read_options = dict(read_options) if read_options else {}
        self.partition_fields = list(partition_fields) if partition_fields else []
        self.spark = spark

        source_path = f'dbfs:/datalake/deezer/user/{self.endpoint}/{self.file_format}'
        self.path_full_load = source_path
        self.path_incremental = source_path
        self.table_full_name = f'{self.database_name}.{self.table_name}'
        # Lives next to (not inside) the source folder, so it is outside what the stream reads.
        self.checkpoint_path = f'{self.path_incremental.rstrip("/")}_{self.table_name}_checkpoint'

        # An explicitly supplied schema wins; otherwise load it from DBFS or infer it.
        if self.schema is None:
            self.set_schema()
        self.set_query()

    @staticmethod
    def _make_dir(path: str) -> None:
        """Best-effort ``dbutils.fs.mkdirs``; a failure is reported, not raised."""
        try:
            dbutils.fs.mkdirs(path)
        except Exception as err:
            print(f'Could not create {path}: {err}')

    @staticmethod
    def _copy_to_local(dbfs_path: str) -> str | None:
        """Copy ``dbfs_path`` to the same path under the driver's ``/tmp``.

        Returns:
            The local path, or None if the copy failed (e.g. the file does not
            exist). On failure any older file already at the local path is
            ignored, so a file deleted from DBFS is not silently reused.
        """
        local_path = dbfs_path.replace('dbfs:', '/tmp', 1)
        try:
            dbutils.fs.cp(dbfs_path, f'file:{local_path}')
        except Exception:
            return None
        return local_path

    def infer_schema(self) -> types.StructType:
        """Schema Spark derives when batch-reading the source files."""
        return self.load_full().schema

    def load_schema(self) -> types.StructType | None:
        """Read the schema from ``dbfs:/datalake/deezer/schema/<table_name>.json``.

        Returns:
            The parsed ``StructType``, or None when the file cannot be fetched.
        """
        local_path = self._copy_to_local(f'dbfs:/datalake/deezer/schema/{self.table_name}.json')
        if local_path is None:
            return None
        with open(local_path, encoding='utf-8') as f:
            return types.StructType.fromJson(json.load(f))

    def set_schema(self) -> None:
        """Set ``self.schema`` from the DBFS schema file, falling back to inference."""
        schema = self.load_schema()

        if schema is None:
            print('Inferindo schema...')
            schema = self.infer_schema()
            print('Ok!')

        self.schema = schema

    def save_schema(self) -> None:
        """Write ``self.schema`` as JSON to the DBFS path that ``load_schema`` reads."""
        schema_dir_local = '/tmp/datalake/deezer/schema'
        self._make_dir(f'file:{schema_dir_local}')
        schema_path_local = f'{schema_dir_local}/{self.table_name}.json'

        with open(schema_path_local, 'w', encoding='utf-8') as f:
            json.dump(self.schema.jsonValue(), f, indent=2)

        # The explicit ``file:`` scheme is required: dbutils resolves an
        # unprefixed path against DBFS, not the driver's local disk.
        dbutils.fs.cp(
            f'file:{schema_path_local}',
            f'dbfs:/datalake/deezer/schema/{self.table_name}.json',
        )

    def load_full(self) -> pyspark_df:
        """Batch-read every file under ``path_full_load`` (recursively).

        Uses ``self.schema`` when set, otherwise lets Spark derive the schema.
        """
        reader = (
            self.spark.read
            .format(self.file_format)
            .option('recursiveFileLookup', 'true')
            .options(**self.read_options)
        )

        if self.schema is not None:
            reader = reader.schema(self.schema)

        return reader.load(self.path_full_load)

    def save_full(self, sdf: pyspark_df) -> None:
        """Overwrite the target Delta table (data and schema) with ``sdf``."""
        writer = (
            # A single write task keeps the file count minimal at the cost of parallelism.
            sdf.coalesce(1)
            .write.format('delta')
            .mode('overwrite')
            .option('overwriteSchema', 'true')
        )

        if self.partition_fields:
            writer = writer.partitionBy(*self.partition_fields)

        writer.saveAsTable(self.table_full_name)

    def read_query(self) -> str | None:
        """Read the SQL template from ``dbfs:/datalake/deezer/etl/<table_name>.sql``.

        Returns:
            The query text, or None when the file cannot be fetched.
        """
        local_path = self._copy_to_local(f'dbfs:/datalake/deezer/etl/{self.table_name}.sql')
        if local_path is None:
            return None
        with open(local_path, encoding='utf-8') as f:
            return f.read()

    def default_query(self) -> str:
        """SQL template keeping the latest row per ``id_fields``.

        Rows are ordered by ``timestamp_field`` descending. An ``ingestion_at``
        column is added. ``{table}`` is filled in by ``transform``.
        """
        base_query = '''SELECT *,
                        NOW() as ingestion_at
                        FROM {table} '''
        ids = ','.join(self.id_fields)
        window = f'''QUALIFY row_number() OVER (PARTITION BY {ids} ORDER BY {self.timestamp_field} DESC) = 1'''
        return base_query + window

    def set_query(self) -> None:
        """Set ``self.query`` from the DBFS SQL file, falling back to ``default_query``."""
        query = self.read_query()
        if query is None:
            print('Loading default query...')
            query = self.default_query()
            print('Ok!')
        self.query = query

    def transform(self, table_name: str) -> pyspark_df:
        """Run ``self.query`` with its ``{table}`` placeholder set to ``table_name``."""
        query = self.query.format(table=table_name)
        return self.spark.sql(query)

    def process_full(self) -> None:
        """Batch-load all source files, transform them and overwrite the table."""
        sdf = self.load_full()
        view_name_tmp = f'tb_full_{self.table_name}'
        sdf.createOrReplaceTempView(view_name_tmp)
        sdf_transform = self.transform(view_name_tmp)
        self.save_full(sdf_transform)

    def load_stream(self) -> pyspark_df:
        """Auto Loader stream over ``path_incremental`` using ``self.schema``.

        Reads at most 10000 files per micro-batch.
        """
        return (
            self.spark
            .readStream
            .format('cloudFiles')
            .option('cloudFiles.format', self.file_format)
            .option('cloudFiles.maxFilesPerTrigger', 10000)
            .option('recursiveFileLookup', 'true')
            .options(**self.read_options)
            .schema(self.schema)
            .load(self.path_incremental)
        )

    def upsert(self, df: pyspark_df, delta_table: delta.tables.DeltaTable) -> None:
        """``foreachBatch`` handler: transform one micro-batch and MERGE it on ``id_fields``.

        Matched rows are fully updated, unmatched rows inserted.
        """
        # A global temp view is visible across sessions. Presumably it is used because
        # the micro-batch ``df`` and ``self.spark`` may be different sessions — TODO confirm.
        df.createOrReplaceGlobalTempView(f'tb_stream_{self.table_name}')

        df_new = self.transform(table_name=f'global_temp.tb_stream_{self.table_name}')

        join = ' AND '.join(f'd.{id_field} = n.{id_field}' for id_field in self.id_fields)
        (
            delta_table.alias('d')
            .merge(df_new.alias('n'), join)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    def save_stream(self, df_stream: pyspark_df) -> 'pyspark.sql.streaming.DataStreamWriter':
        """Configure, without starting, the writer that upserts each micro-batch.

        Uses an ``availableNow`` trigger and ``self.checkpoint_path``. The target
        table must already exist (``DeltaTable.forName``).
        """
        table_delta = delta.DeltaTable.forName(self.spark, self.table_full_name)

        return (
            df_stream.writeStream.trigger(availableNow=True)
            .option('checkpointLocation', self.checkpoint_path)
            .foreachBatch(lambda df, batchID: self.upsert(df, table_delta))
        )

    def process_stream(self) -> StreamingQuery:
        """Start the incremental stream and return the running query."""
        df_stream = self.load_stream()
        stream = self.save_stream(df_stream)

        return stream.start()


In [None]:
# Per-datasource settings:
#   * id_fields       - deduplication / MERGE key columns
#   * timestamp_field - orders duplicates; the latest row is kept by the default query
settings = {
    'history': {
        'id_fields': ['timestamp'],
        'timestamp_field': 'timestamp',
    },
    'charts': {
        'id_fields': ['id', 'chart', '`timestamp`'],
        'timestamp_field': 'timestamp',
    },
}

# To drive this from a job parameter instead: datasource = dbutils.widgets.get('datasource')
datasource = 'history'
database_name = 'bronze'
table_name = f'deezer_{datasource}'

config = settings[datasource]

ingestao = BronzeIngestion(
    **config,
    table_name=table_name,
    database_name=database_name,
    endpoint=datasource,
    spark=spark,
)

Inferindo schema...
Ok!
Loading default query...
Ok!


In [None]:
# First run only: create the bronze table empty, using the resolved schema.
# Also remove any leftover checkpoint, so the stream does not resume from
# progress recorded against a previous table.
if not dbtools.table_exists(spark, database_name, table_name):
    df_null = spark.createDataFrame([], schema=ingestao.schema)
    ingestao.save_full(df_null)
    dbutils.fs.rm(ingestao.checkpoint_path, True)

In [None]:
# Process every currently available file (availableNow trigger) and block until done,
# then vacuum the target Delta table.
stream = ingestao.process_stream()
stream.awaitTermination()

table = delta.DeltaTable.forName(spark, ingestao.table_full_name)
table.vacuum()

DataFrame[]

In [None]:
# Charts endpoint, end to end: configure the ingestion, create the table empty on
# first run (resetting the stream checkpoint), process available files, then vacuum.
datasource = 'charts'
database_name = 'bronze'
table_name = f'deezer_{datasource}'

config = settings[datasource]

ingestao_charts = BronzeIngestion(
    **config,
    table_name=table_name,
    database_name=database_name,
    endpoint=datasource,
    spark=spark,
)

if not dbtools.table_exists(spark, database_name, table_name):
    df_null = spark.createDataFrame([], schema=ingestao_charts.schema)
    ingestao_charts.save_full(df_null)
    dbutils.fs.rm(ingestao_charts.checkpoint_path, True)

stream_charts = ingestao_charts.process_stream()
stream_charts.awaitTermination()

table = delta.DeltaTable.forName(spark, ingestao_charts.table_full_name)
table.vacuum()

Inferindo schema...
Ok!
Loading default query...
Ok!


DataFrame[]

In [None]:
# spark.sql('''
#           select
#           id, title, artist.name, album.title, `timestamp`, `rank`
#           from bronze.deezer_history
#           order by `timestamp` desc
#           ''').display()

id,title,name,title.1,timestamp,rank
2470364805,The Day I Was Born,Ed Sheeran,Autumn Variations,1696022685,100000
2470364795,When Will I Be Alright,Ed Sheeran,Autumn Variations,1696022509,100000
2470364785,Punchline,Ed Sheeran,Autumn Variations,1696022301,100000
2470364775,Spring,Ed Sheeran,Autumn Variations,1696022123,100000
2470364765,Midnight,Ed Sheeran,Autumn Variations,1696021927,100000
1370378452,Time Traveling,Sarah Kang,Time Traveling,1696021806,237695
2470364755,Page,Ed Sheeran,Autumn Variations,1696021696,100000
425175112,Flume (Bon Iver Cover),Hundreds,Flume (Bon Iver Cover),1696021613,160673
2470364735,That’s On Me,Ed Sheeran,Autumn Variations,1696021468,100000
1452441092,Nothing,Sarah Kang,Nothing,1696021409,78648


In [None]:
# (spark.sql('''
#           SELECT
#             TITLE, 
#             ARTIST.NAME as ARTIST_NAME, 
#             ALBUM.TITLE AS ALBUM_TITLE, 
#             CHART,
#             COALESCE(
#                 LAG(CHART) OVER (PARTITION BY id ORDER BY `TIMESTAMP` ASC), 'Fora do Top 100') AS LAST_ONE,
#             LAG(TITLE) OVER (PARTITION BY CHART ORDER BY `TIMESTAMP` ASC) AS LAST_SONG_IN_THIS_CHART,
#             LAG(ARTIST.NAME) OVER (PARTITION BY CHART ORDER BY `TIMESTAMP` ASC) AS AN_FROM_LS_IN_THIS_CHART
#           FROM
#             BRONZE.DEEZER_CHARTS 
#           QUALIFY 
#             ROW_NUMBER() OVER (PARTITION BY id ORDER BY `TIMESTAMP` DESC) = 1
#           ORDER BY 
#             CHART ASC, `TIMESTAMP` DESC
#           ''')
#           .display()
# )

id,TITLE,ARTIST_NAME,ALBUM_TITLE,CHART,LAST_ONE,LAST_SONG_IN_THIS_CHART,AN_FROM_LS_IN_THIS_CHART
2373296555,Home,HAIM,Barbie The Album,1,Fora do Top 100,,
1509486932,22 Break,Oh Wonder,22 Break,2,8,Seasons,Thirty Seconds to Mars
1509486942,Free,Oh Wonder,22 Break,3,9,Ahay,Of Monsters And Men
2444545705,Seasons,Thirty Seconds to Mars,It’s The End Of The World But It’s A Beautiful Day,4,2,Get Up Kid,Thirty Seconds to Mars
716516292,Ahay,Of Monsters And Men,FEVER DREAM,5,3,Stuck,Thirty Seconds to Mars
2422608825,Used To Be Young,Miley Cyrus,Used To Be Young,6,69,Life Is Beautiful,Thirty Seconds to Mars
2444545715,Get Up Kid,Thirty Seconds to Mars,It’s The End Of The World But It’s A Beautiful Day,7,4,Down,Oh Wonder
2444545695,Life Is Beautiful,Thirty Seconds to Mars,It’s The End Of The World But It’s A Beautiful Day,8,6,22 Break,Oh Wonder
2444545685,Stuck,Thirty Seconds to Mars,It’s The End Of The World But It’s A Beautiful Day,9,5,Free,Oh Wonder
2444545725,Love These Days,Thirty Seconds to Mars,It’s The End Of The World But It’s A Beautiful Day,10,29,Baby,Oh Wonder


In [None]:
# spark.sql('''
#           with fav_artists as (
#            select
#             artist_name,
#             count(artist_name) as num_aparicoes
#           from
#             bronze.deezer_charts
#           group by
#             artist_name
#           order by
#             num_aparicoes desc
#           )
#            select distinct
#             t1.artist_name as fav_artist,
#             t1.num_aparicoes,
#             first_value(t2.title) over (partition by t2.artist_name order by t2.chart asc) as fav_song
#           from 
#             fav_artists as t1
#           inner join
#             bronze.deezer_charts as t2
#           using (artist_name)   
#           order by
#             num_aparicoes desc
#           ''').display()

fav_artist,num_aparicoes,fav_song
Demi Lovato,15,Tell Me You Love Me (Rock Version)
Oh Wonder,11,I Wish I Never Met You
Ed Sheeran,9,Curtains
Thirty Seconds to Mars,9,Seasons
Post Malone,8,Chemical
Olivia Rodrigo,7,vampire
Miley Cyrus,7,Jaded
Maroon 5,6,What Lovers Do
Jack Johnson,5,The Sharing Song
Harry Styles,4,As It Was
