In [1]:
# Destination path of the `%%writefile` cell below: the Airflow DAG source is written here.
_pipeline_module_file = '/home/mlops/airflow/dags/deltalake_pipeline.py'

In [3]:
%%writefile {_pipeline_module_file}
import datetime
import logging
import os
import sys

from airflow import DAG
from airflow.decorators import task

# import airflow.example_dags.example_complex.example_python_operator
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark
from delta import *
from delta.tables import *

import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore

import pyspark.pandas as ps
from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as F
from pyspark.sql.types import Row
from pyspark.sql.functions import explode, collect_list, mean
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import regexp_replace

from collections import Counter
import re

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from airflow.operators.python import Context




log = logging.getLogger(__name__)

# Spark session and Firestore client handles; populated when the DAG definition
# below is executed.
spark = None
db = None

# Pipeline-wide metadata used as the base of the Delta "userMetadata" attached to
# task table writes. Copy it before adding task-specific keys so this shared
# baseline is never mutated.
user_metadata_baseline = {
    'pipeline_type': 'Apache Airflow Python DAG',
    'dag_file_location': '/home/mlops/airflow/dags/deltalake_pipeline.py',
    'pipeline_name': 'Deltalake Pipeline',
    'python_version': sys.version,
    'env_yaml': '/home/mlops/env.yaml',
}





with DAG(
    dag_id="DeltalakePipeline",
    description='Collects, processes, and saves Ticketmaster, Spotify and Spotify Userdata to the Deltalake. For more info visit https://concert.raphaelmitas.com/blog',
    schedule_interval=None,
    start_date= datetime.datetime(year=2022, month=2, day=1)
) as dag:

    # NOTE(review): this runs at DAG-parse time, i.e. every time Airflow imports
    # this file, not only when a task executes. Consider moving it into the tasks.
    if spark is None:
        # Initialize a Delta-enabled Spark session (getOrCreate reuses an existing one).
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

        spark = configure_spark_with_delta_pip(builder).getOrCreate()

    if db is None:
        # Initialize the Firestore client. firebase_admin.initialize_app raises
        # ValueError when the default app already exists in this process (e.g. the
        # DAG file is imported a second time), so reuse the existing app instead.
        try:
            firebase_app = firebase_admin.get_app()
        except ValueError:
            cred = credentials.Certificate("/home/mlops/project/DeltaLake/firebaseServiceAccountKey.json")
            firebase_app = firebase_admin.initialize_app(cred)
        db = firestore.client(app=firebase_app)

    #[START GENRE FLOW]
    @task(task_id='BRONZE_genre_map_table')
    def import_genre_map():
        """Load the semicolon-separated genre map CSV into the bronze Delta table.

        Reads ``bronze_data/genre_map.csv`` with pandas-on-Spark, converts it to a
        Spark DataFrame and overwrites ``bronze_data/genre_map_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline: assigning it directly would mutate the
        # module-level dict, leaking these keys into any other task run in the
        # same process.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'BRONZE_genre_map_table'
        user_metadata['data_origin'] = 'file:///home/mlops/project/DeltaLake/bronze_data/genre_map.csv'
        user_metadata['data_origin_task'] = 'BRONZE_genre_map_table'

        pandas_df = ps.read_csv('file:///home/mlops/project/DeltaLake/bronze_data/genre_map.csv', sep=';')
        spark_df = pandas_df.to_spark()
        spark_df.write.format("delta").mode("overwrite").option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/bronze_data/genre_map_table")

        spark_df.show(5)
        return "SUCCESS"


    @task(task_id='PLATINUM_genre_map_indexed')
    def genre_map_with_index():
        """Add integer StringIndexer codes for the genre map's categorical columns.

        Reads the bronze genre map, adds an ``<column>_index`` integer column for
        each of ``main_genre_9``, ``main_genre_18`` and ``genres`` and overwrites
        ``platinum_data/genre_map_indexed``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'PLATINUM_genre_map_indexed'
        user_metadata['data_origin'] = 'file:///home/mlops/project/DeltaLake/bronze_data/genre_map.csv'
        user_metadata['data_origin_task'] = 'BRONZE_genre_map_table'

        indexed = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/bronze_data/genre_map_table')
        for column in ("main_genre_9", "main_genre_18", "genres"):
            index_column = f"{column}_index"
            model = StringIndexer(inputCol=column, outputCol=index_column).fit(indexed)
            # StringIndexer emits double-typed indices; store them as int.
            indexed = model.transform(indexed).withColumn(index_column, F.col(index_column).cast('int'))

        indexed.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/platinum_data/genre_map_indexed")

        indexed.show(5)
        return "SUCCESS"


    #[END GENRE FLOW]

    #[START SPOTIFY FLOW]
    @task(task_id='BRONZE_spotify_user_data_table')
    def fetch_spotify_user_data():
        """Copy every user's Spotify artist list from Firestore into the bronze table.

        Each document of the ``spotify-user-data`` collection with a non-empty
        ``items`` list becomes one row per artist, tagged with the document id and
        its Firestore timestamps. Nested artist fields are flattened and all rows
        overwrite ``bronze_data/spotify_user_data_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.

        Raises:
            ValueError: if no document contains any items (there is nothing to
                write, and no schema to write it with).
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'BRONZE_spotify_user_data_table'
        user_metadata['data_origin'] = 'firebase_firestore: spotify-user-data'
        user_metadata['data_origin_task'] = 'BRONZE_spotify_user_data_table'

        def flatten(df):
            """Lift the nested url / follower count / first image url to top-level columns."""
            df = df.withColumn("url", df.external_urls.spotify)
            df = df.withColumn("followers", df.followers.total)
            df = df.withColumn("image_url", df.images[0].url)
            return df.drop('external_urls', 'images')

        spark_df = None
        for doc in db.collection('spotify-user-data').stream():
            items = (doc.to_dict() or {}).get('items')
            if not items:
                continue

            pandas_df = ps.DataFrame(items)
            pandas_df['uid'] = doc.id
            pandas_df['create_time'] = doc.create_time
            pandas_df['update_time'] = doc.update_time
            pandas_df['read_time'] = doc.read_time

            doc_df = flatten(pandas_df.to_spark())
            # No per-iteration count(): it re-evaluated the whole growing union each time.
            spark_df = doc_df if spark_df is None else spark_df.unionByName(doc_df)

        if spark_df is None:
            raise ValueError("No documents with 'items' found in Firestore collection 'spotify-user-data'; nothing to write.")

        spark_df.show(5)

        spark_df.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/bronze_data/spotify_user_data_table")

        return 'SUCCESS'


    @task(task_id='GOLD_spotify_cleansed_table')
    def user_data_cleanse():
        """Drop Spotify artists without genres and write the gold cleansed table.

        Reads ``bronze_data/spotify_user_data_table``, keeps rows whose ``genres``
        array is non-empty and overwrites ``gold_data/spotify_cleansed_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'GOLD_spotify_cleansed_table'
        user_metadata['data_origin'] = 'firebase_firestore: spotify-user-data'
        user_metadata['data_origin_task'] = 'BRONZE_spotify_user_data_table'

        spotify = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/bronze_data/spotify_user_data_table')

        # F.size(null) is -1 under Spark's legacy setting, so null arrays are dropped too.
        spotify_cleansed = spotify.filter(F.size("genres") > 0)

        spotify_cleansed.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/gold_data/spotify_cleansed_table")

        spotify_cleansed.show(5)
        return 'SUCCESS'

    @task(task_id='PLATINUM_spotify_genre_agg_table')
    def user_data_aggregation():
        """Aggregate each Spotify user's artists into per-user genre statistics.

        Steps:
          1. Map every artist's Spotify genres onto the ``main_genre_9`` /
             ``main_genre_18`` categories of the bronze genre map.
          2. Group by user (uid + Firestore timestamps): mean followers and
             popularity, plus ``{genre: count}`` maps for all three genre levels.
          3. Pick the most frequent 9-category genre as ``genres_1``
             (``'undefined'`` when there is none).
          4. Prefix the columns with ``spotify_`` and overwrite
             ``platinum_data/spotify_genre_agg_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'PLATINUM_spotify_genre_agg_table'
        user_metadata['data_origin'] = 'firebase_firestore: spotify-user-data'
        user_metadata['data_origin_task'] = 'BRONZE_spotify_user_data_table'

        spotify = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/gold_data/spotify_cleansed_table')
        genre_map = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/bronze_data/genre_map_table')

        # genre -> (main_genre_9, main_genre_18), built once on the driver so the
        # per-row lookup is a dict hit instead of a list.index() scan. The first
        # occurrence wins, matching list.index() semantics.
        genre_columns = genre_map.to_pandas_on_spark().to_dict('list')
        genre_lookup = {}
        for genre, main_9, main_18 in zip(genre_columns['genres'], genre_columns['main_genre_9'], genre_columns['main_genre_18']):
            genre_lookup.setdefault(genre, (main_9, main_18))

        def add_main_genres(row):
            """Attach de-duplicated main genres (first-seen order) to one artist row."""
            genres_9 = {}
            genres_18 = {}
            for genre in row.genres:
                if genre in genre_lookup:
                    main_9, main_18 = genre_lookup[genre]
                    genres_9.setdefault(main_9, None)
                    genres_18.setdefault(main_18, None)
            record = row.asDict()
            record['genres_9'] = list(genres_9)
            record['genres_18'] = list(genres_18)
            return Row(**record)

        artists = spotify.rdd.map(add_main_genres).toDF()

        # User aggregation
        keys = ['uid', 'create_time', 'update_time', 'read_time']

        def collect_per_user(column):
            """Explode an array column and collect all its values into one list per user."""
            return artists.select('*', explode(artists[column]).alias("genre"))\
                .groupby(*keys).agg(collect_list('genre').alias(column))

        user_stats = artists.groupby(*keys).agg(mean('followers').alias("followers"), mean('popularity').alias("popularity"))

        # Inner joins: users whose artists map to no 9/18-category genre are dropped.
        per_user = collect_per_user('genres')\
            .join(collect_per_user('genres_9'), keys)\
            .join(collect_per_user('genres_18'), keys)\
            .join(user_stats, keys)

        def count_and_pick_top(row):
            """Turn genre lists into {genre: count} maps and add the top 9-category genre."""
            record = row.asDict()
            for column in ('genres', 'genres_9', 'genres_18'):
                record[column] = dict(Counter(record[column]))
            counts_9 = record['genres_9']
            # max() keeps the first maximal entry, the same tie-break as a strict '>' scan.
            record['genres_1'] = max(counts_9, key=counts_9.get) if counts_9 else 'undefined'
            return Row(**record)

        spotify_extracted = per_user.rdd.map(count_and_pick_top).toDF()

        # Rename columns
        renames = {
            "genres_9": "spotify_genres_9",
            "genres_18": "spotify_genres_18",
            "uid": "spotify_uid",
            "create_time": "spotify_create_time",
            "update_time": "spotify_update_time",
            "read_time": "spotify_read_time",
            "genres": "spotify_genres",
            "followers": "spotify_followers",
            "popularity": "spotify_popularity",
            "genres_1": "spotify_genres_1",
        }
        spotify_renamed = spotify_extracted
        for old_name, new_name in renames.items():
            spotify_renamed = spotify_renamed.withColumnRenamed(old_name, new_name)
        spotify_renamed.printSchema()

        # Save
        spotify_renamed.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).option("mergeSchema", "true").save("file:///home/mlops/project/DeltaLake/platinum_data/spotify_genre_agg_table")

        spotify_renamed.show(5)
        return 'SUCCESS'
    #[END SPOTIFY FLOW]


    #[START TICKETMASTER FLOW]
    @task(task_id='BRONZE_ticketmaster_table')
    def ticketmaster_table():
        """No-op entry point of the Ticketmaster flow.

        Nothing in this DAG file writes ``bronze_data/ticketmaster_table``; the
        SILBER task only reads it. This task therefore just marks the start of
        the flow.

        Returns:
            str: always ``"SUCCESS"``.
        """
        outcome = 'SUCCESS'
        print()  # emits a single blank line to the task log
        return outcome

    @task(task_id='SILBER_ticketmaster_augmented_table')
    def ingest_spotify_data():
        """Augment bronze Ticketmaster events with Spotify artist search results.

        For each event, the artist name is the part of the event name before the
        first ``&``, ``-``, ``:`` or ``|``. The top Spotify artist match (or an
        empty record when there is none) is concatenated column-wise onto the
        event row, and the result overwrites
        ``silver_data/ticketmaster_augmented_table``.

        Spotify credentials are taken from the ``SPOTIPY_CLIENT_ID`` and
        ``SPOTIPY_CLIENT_SECRET`` environment variables.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.

        Raises:
            KeyError: if either Spotify credential variable is not set.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'SILBER_ticketmaster_augmented_table'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        #read deltalake table
        ticketmaster = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/bronze_data/ticketmaster_table')
        ticketmaster_df = ps.DataFrame(ticketmaster)

        # SECURITY: credentials must not live in source control. The client secret
        # previously hard-coded here is compromised and should be rotated.
        spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials(
            client_id=os.environ['SPOTIPY_CLIENT_ID'],
            client_secret=os.environ['SPOTIPY_CLIENT_SECRET'],
        ))

        artist_separator = re.compile(r'[&\-:|]')
        spotify_list = []
        for _, event in ticketmaster_df.iterrows():
            artist_name = artist_separator.split(event['name'])[0].strip()
            if not artist_name:
                # Nothing to search for; keep row alignment with an empty record.
                spotify_list.append({})
                continue
            items = spotify.search(artist_name, type='artist', limit=1)['artists']['items']
            log.info("Spotify search for %r returned %d artist(s)", artist_name, len(items))
            spotify_list.append(items[0] if items else {})

        spotify_df = ps.DataFrame(spotify_list).rename(columns={
            'name': 'spotify_name',
            'id': 'spotify_id',
            'external_urls': 'spotify_external_urls',
            'followers': 'spotify_followers',
            'genres': 'spotify_genres',
            'href': 'spotify_href',
            'images': 'spotify_images',
            'popularity': 'spotify_popularity',
            'type': 'spotify_type',
            'uri': 'spotify_uri'
        })

        # Column-wise concat aligns the two frames on their row index. pandas-on-Spark
        # only allows combining different frames with this option; enable it locally
        # instead of flipping it globally for the whole process.
        with ps.option_context('compute.ops_on_diff_frames', True):
            complete_df = ps.concat([spotify_df, ticketmaster_df], axis=1, join="inner")
            spark_df = complete_df.to_spark()

        spark_df.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save(
            "file:///home/mlops/project/DeltaLake/silver_data/ticketmaster_augmented_table")
        spark_df.show(5)

        return 'SUCCESS'

    @task(task_id='GOLD_ticketmaster_flattened_table')
    def ticketmaster_flat():
        """Flatten nested Spotify and Ticketmaster fields into top-level columns.

        Reads ``silver_data/ticketmaster_augmented_table``, extracts the Spotify
        url/followers/image and the event date, first venue and first attraction
        fields, drops the nested source columns, prefixes the remaining
        Ticketmaster columns with ``ticketmaster_`` and overwrites
        ``gold_data/ticketmaster_flattened_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'GOLD_ticketmaster_flattened_table'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        augmented = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/silver_data/ticketmaster_augmented_table')

        def strip(column, *patterns):
            """Remove every match of each regex in *patterns* from *column*, in order."""
            for pattern in patterns:
                column = regexp_replace(column, pattern, "")
            return column

        # Spotify artist columns (spotify_followers is replaced in place).
        flattened = augmented\
            .withColumn("spotify_url", augmented.spotify_external_urls.spotify)\
            .withColumn("spotify_followers", augmented.spotify_followers.total)\
            .withColumn("spotify_image_url", augmented.spotify_images[0].url)\
            .drop('spotify_external_urls', 'spotify_images')

        # Ticketmaster columns: only the first venue and first attraction are kept.
        # The regexes strip the "{name=...}" / "{line1=...}" wrappers that these
        # values appear to carry; latitude/longitude are parsed out of the venue
        # location string, assuming "latitude=..., longitude=..." order (TODO confirm).
        venue = F.col('_embedded')['venues'][0]
        attraction = F.col('_embedded')['attractions'][0]
        location = venue['location']
        flattened = flattened\
            .withColumn("ticketmaster_date", F.col('dates')['start']['localDate'])\
            .withColumn("ticketmaster_venue_name", venue['name'])\
            .withColumn("ticketmaster_venue_id", venue['id'])\
            .withColumn("ticketmaster_venue_url", venue['url'])\
            .withColumn("ticketmaster_venue_postal_code", venue['postalCode'])\
            .withColumn("ticketmaster_venue_timezone", venue['timezone'])\
            .withColumn("ticketmaster_venue_city", strip(venue['city'], r'\{name=', '}'))\
            .withColumn("ticketmaster_venue_country", strip(venue['country'], r'\{name=', ',.+'))\
            .withColumn("ticketmaster_venue_address", strip(venue['address'], r'\{line1=', '}'))\
            .withColumn("ticketmaster_url", attraction['url'])\
            .withColumn("ticketmaster_artist_locale", attraction['locale'])\
            .withColumn("ticketmaster_venue_latitude", strip(location, '.+latitude=', ', longitude=.+').cast(DoubleType()))\
            .withColumn("ticketmaster_venue_longitude", strip(location, '.+longitude=', '}').cast(DoubleType()))\
            .drop('dates', 'classifications', '_links', '_embedded')

        #rename cols
        ticketmaster_renamed = flattened
        for column in ('name', 'id', 'locale', 'distance', 'units', 'price_min', 'price_max'):
            ticketmaster_renamed = ticketmaster_renamed.withColumnRenamed(column, f'ticketmaster_{column}')

        #save data
        ticketmaster_renamed.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/gold_data/ticketmaster_flattened_table")

        ticketmaster_renamed.show(5)
        return 'SUCCESS'

    @task(task_id='GOLD_ticketmaster_cleansed_table')
    def ticketmaster_cleanse():
        """Keep only events with a usable Spotify match and write the gold cleansed table.

        Drops rows without ``spotify_id`` or ``spotify_image_url`` and rows whose
        ``spotify_genres`` array is empty, then overwrites
        ``gold_data/ticketmaster_cleansed_table``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'GOLD_ticketmaster_cleansed_table'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        ticketmaster = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/gold_data/ticketmaster_flattened_table')

        ticketmaster_cleansed = ticketmaster\
            .na.drop(subset=["spotify_id", "spotify_image_url"])\
            .filter(F.size("spotify_genres") > 0)

        ticketmaster_cleansed.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/gold_data/ticketmaster_cleansed_table")

        ticketmaster_cleansed.show(5)

        return 'SUCCESS'

    @task(task_id='PLATINUM_ticketmaster_genre_agg_table')
    def ticketmaster_genre_aggregation():
        """Map each event's Spotify genres onto the coarse 9/18-category genres.

        Adds ``spotify_genres_9`` and ``spotify_genres_18`` (de-duplicated, in order
        of first appearance in ``spotify_genres``) and overwrites
        ``platinum_data/ticketmaster_genre_agg_table``. The order matters: the
        label task downstream uses the first element of ``spotify_genres_9``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'PLATINUM_ticketmaster_genre_agg_table'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        ticketmaster = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/gold_data/ticketmaster_cleansed_table')
        genre_map = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/bronze_data/genre_map_table')

        # genre -> (main_genre_9, main_genre_18), built once on the driver; the first
        # occurrence wins, matching the previous list.index() lookup.
        genre_columns = genre_map.to_pandas_on_spark().to_dict('list')
        genre_lookup = {}
        for genre, main_9, main_18 in zip(genre_columns['genres'], genre_columns['main_genre_9'], genre_columns['main_genre_18']):
            genre_lookup.setdefault(genre, (main_9, main_18))

        def add_main_genres(row):
            """Attach de-duplicated main genres to one event row.

            An insertion-ordered dict is used instead of a set: list(set(...)) has
            hash-dependent order, which made the downstream "first genre" arbitrary.
            """
            genres_9 = {}
            genres_18 = {}
            for genre in row.spotify_genres:
                if genre in genre_lookup:
                    main_9, main_18 = genre_lookup[genre]
                    genres_9.setdefault(main_9, None)
                    genres_18.setdefault(main_18, None)
            record = row.asDict()
            record['spotify_genres_9'] = list(genres_9)
            record['spotify_genres_18'] = list(genres_18)
            return Row(**record)

        ticketmaster_genre_agg = ticketmaster.rdd.map(add_main_genres).toDF()

        #save data
        ticketmaster_genre_agg.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save("file:///home/mlops/project/DeltaLake/platinum_data/ticketmaster_genre_agg_table")

        ticketmaster_genre_agg.show(5)

        return 'SUCCESS'

    @task(task_id='PLATINUM_tfx_genre_as_label')
    def ticketmaster_genre_as_label():
        """Add the first 9-category genre of each event as the ``spotify_genres_1`` label.

        Reads ``platinum_data/ticketmaster_genre_agg_table`` and overwrites
        ``platinum_data/tfx_genre_as_label``. Events with an empty
        ``spotify_genres_9`` array get a null label (Spark returns null for an
        out-of-range array index unless ANSI mode is enabled).

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'PLATINUM_tfx_genre_as_label'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        ticketmaster = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/platinum_data/ticketmaster_genre_agg_table')

        #take first genre and set label
        genre_as_label = ticketmaster.withColumn('spotify_genres_1', ticketmaster.spotify_genres_9[0])

        #save
        genre_as_label.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save(
            "file:///home/mlops/project/DeltaLake/platinum_data/tfx_genre_as_label")

        genre_as_label.show(5)

        return 'SUCCESS'

    @task(task_id='PLATINUM_ticketmaster_rdy_to_serve')
    def ticketmaster_ready_to_serve():
        """Build the model-ready feature table from the labelled events.

        Selects the label and the followers/popularity features, casts followers to
        float, replaces the string label by its ``main_genre_9_index`` code from
        ``platinum_data/genre_map_indexed`` and overwrites
        ``platinum_data/ticketmaster_rdy_to_serve``.

        Returns:
            str: ``"SUCCESS"`` once the table has been written.
        """
        # Copy the shared baseline so the module-level dict is not mutated.
        user_metadata = dict(user_metadata_baseline)
        user_metadata['task_id'] = 'PLATINUM_ticketmaster_rdy_to_serve'
        user_metadata['data_description'] = 'Ticketmaster Concert Data'
        user_metadata['data_origin'] = 'ticketmaster_api: /discovery/v2/events?latlong=48.7758459%2C9.1829321&segmentId=KZFzniwnSyZfZ7v7nJ&page=40&radius=100'
        # This flow starts at the Ticketmaster bronze table (was copy-pasted from the Spotify flow).
        user_metadata['data_origin_task'] = 'BRONZE_ticketmaster_table'
        user_metadata['data_augmentation_description'] = 'Spotify Artist Data'
        user_metadata['data_augmentation_origin'] = 'Spotify API: https://api.spotify.com/v1/search?q=[ARTIST_NAME]&type=artist&limit=1'

        ticketmaster = spark.read.format('delta').load(
            'file:///home/mlops/project/DeltaLake/platinum_data/tfx_genre_as_label')
        genre_map = spark.read.format('delta').load('file:///home/mlops/project/DeltaLake/platinum_data/genre_map_indexed')

        # Select the ML columns and cast followers to float.
        features = ticketmaster\
            .select('spotify_genres_1', 'spotify_followers', 'spotify_popularity')\
            .withColumn('spotify_followers', F.col('spotify_followers').cast('float'))

        # One (label, index) pair per 9-category genre.
        label_index = genre_map.select('main_genre_9', 'main_genre_9_index')\
            .withColumnRenamed('main_genre_9', 'spotify_genres_1')\
            .withColumnRenamed('main_genre_9_index', 'spotify_genres_1_index').distinct()

        # Inner join: rows whose label is null or missing from the map are dropped.
        rdy_to_serve = features.join(label_index, 'spotify_genres_1').drop('spotify_genres_1')

        #save
        rdy_to_serve.write.format("delta").mode("overwrite")\
            .option("userMetadata", user_metadata).save(
            "file:///home/mlops/project/DeltaLake/platinum_data/ticketmaster_rdy_to_serve")

        rdy_to_serve.show(5)

        return 'SUCCESS'

    #[END TICKETMASTER FLOW]



    @task(task_id='save_metadata')
    def save_metadata():
        """Append the latest Delta commit of every pipeline table to the metadata history.

        For each table written by this DAG (the Ticketmaster bronze table is not
        written here and is skipped), the most recent ``history(1)`` entry is
        appended to ``metadata/metadata_history``. Each append carries the DAG's
        id, description, dates and schedule as ``userMetadata``. The history
        table is then read back and its first rows are shown.

        Returns:
            str: ``"SUCCESS"`` once all entries have been appended.
        """
        dag_info = {
            'dag_id': dag.dag_id,
            'description': dag.description,
            'start_date': dag.start_date,
            'end_date': dag.end_date,
            'schedule_interval': dag.schedule_interval,
        }

        history_path = "file:///home/mlops/project/DeltaLake/metadata/metadata_history"
        tracked_tables = (
            '/home/mlops/project/DeltaLake/bronze_data/genre_map_table',                   # BRONZE_genre_map_table
            '/home/mlops/project/DeltaLake/platinum_data/genre_map_indexed',               # PLATINUM_genre_map_indexed
            '/home/mlops/project/DeltaLake/bronze_data/spotify_user_data_table',           # BRONZE_spotify_user_data_table
            '/home/mlops/project/DeltaLake/gold_data/spotify_cleansed_table',              # GOLD_spotify_cleansed_table
            '/home/mlops/project/DeltaLake/platinum_data/spotify_genre_agg_table',         # PLATINUM_spotify_genre_agg_table
            '/home/mlops/project/DeltaLake/silver_data/ticketmaster_augmented_table',      # SILBER_ticketmaster_augmented_table
            '/home/mlops/project/DeltaLake/gold_data/ticketmaster_flattened_table',        # GOLD_ticketmaster_flattened_table
            '/home/mlops/project/DeltaLake/gold_data/ticketmaster_cleansed_table',         # GOLD_ticketmaster_cleansed_table
            '/home/mlops/project/DeltaLake/platinum_data/ticketmaster_genre_agg_table',    # PLATINUM_ticketmaster_genre_agg_table
            '/home/mlops/project/DeltaLake/platinum_data/tfx_genre_as_label',              # PLATINUM_tfx_genre_as_label
            '/home/mlops/project/DeltaLake/platinum_data/ticketmaster_rdy_to_serve',       # PLATINUM_ticketmaster_rdy_to_serve
        )

        for table_path in tracked_tables:
            latest_commit = DeltaTable.forPath(spark, table_path).history(1)
            latest_commit.write.format("delta").mode("append")\
                .option("userMetadata", dag_info).save(history_path)

        spark.read.format('delta').load(history_path).show(11)
        return 'SUCCESS'

    # --- Task instances --------------------------------------------------------
    metadata_history = save_metadata()

    # Genre flow
    genre_map_table = import_genre_map()
    genre_map_indexed = genre_map_with_index()

    # Spotify user-data flow
    spotify_user_data_table = fetch_spotify_user_data()
    spotify_cleansed_table = user_data_cleanse()
    spotify_genre_agg_table = user_data_aggregation()

    # Ticketmaster flow
    BRONZE_ticketmaster_table = ticketmaster_table()
    ticketmaster_augmented_table = ingest_spotify_data()
    ticketmaster_flattened_table = ticketmaster_flat()
    ticketmaster_cleansed_table = ticketmaster_cleanse()
    ticketmaster_genre_agg_table = ticketmaster_genre_aggregation()
    tfx_genre_as_label = ticketmaster_genre_as_label()
    ticketmaster_rdy_to_serve = ticketmaster_ready_to_serve()

    # --- Dependencies ----------------------------------------------------------
    # The bronze genre map feeds the indexer and both genre aggregations.
    genre_map_table >> [genre_map_indexed, spotify_genre_agg_table, ticketmaster_genre_agg_table]
    genre_map_indexed >> ticketmaster_rdy_to_serve

    spotify_user_data_table >> spotify_cleansed_table >> spotify_genre_agg_table

    BRONZE_ticketmaster_table >> ticketmaster_augmented_table >> ticketmaster_flattened_table
    ticketmaster_flattened_table >> ticketmaster_cleansed_table >> ticketmaster_genre_agg_table
    ticketmaster_genre_agg_table >> tfx_genre_as_label >> ticketmaster_rdy_to_serve

    # Metadata collection runs last, after both flow leaves (and so, transitively,
    # after every table it reads the history of).
    ticketmaster_rdy_to_serve >> metadata_history
    spotify_genre_agg_table >> metadata_history


Overwriting /home/mlops/airflow/dags/deltalake_pipeline.py
