# 0. connect to Data lake

In [0]:
# Fetch the service principal's client secret from the Databricks secret scope.
service_credential = dbutils.secrets.get(scope="scope-keyVault",key="databricksServicePrincipal")

# OAuth settings for the museumsdatalakeraw storage account, as (key, value)
# pairs; they are applied to the Spark session in the order listed.
oauth_settings = [
    ("fs.azure.account.auth.type.museumsdatalakeraw.dfs.core.windows.net", "OAuth"),
    ("fs.azure.account.oauth.provider.type.museumsdatalakeraw.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id.museumsdatalakeraw.dfs.core.windows.net", "ee577ff8-4507-4b0a-87ae-06ca001ef8cc"),
    ("fs.azure.account.oauth2.client.secret.museumsdatalakeraw.dfs.core.windows.net", service_credential),
    ("fs.azure.account.oauth2.client.endpoint.museumsdatalakeraw.dfs.core.windows.net", "https://login.microsoftonline.com/4f7d484c-d3d2-428a-9840-6d5d56dd7249/oauth2/token"),
]
for setting_name, setting_value in oauth_settings:
    spark.conf.set(setting_name, setting_value)

In [0]:
# List the contents of the "ateneum" folder in the "standardized" container.
# NOTE(review): presumably a smoke test that the OAuth configuration works — confirm.
dbutils.fs.ls("abfss://standardized@museumsdatalakeraw.dfs.core.windows.net/ateneum")

# 1. Load standardized tables into a list of data frames

In [0]:
# Folder names under the "standardized" container, one per source museum.
data_paths = ['ateneum', 'kiasma', 'met']

# One DataFrame per museum, in the same order as data_paths.
# (For a quick development run, append .limit(100) to the read.)
dfArr = [
    spark.read.parquet("abfss://standardized@museumsdatalakeraw.dfs.core.windows.net/" + path)
    for path in data_paths
]

# 2. create cleaning functions

In [0]:
#cleaning functions
from pyspark.sql import functions as F
import re
from pyspark.sql.types import *
def extract_year(text_column, max_year=2025):
    """
    Extract a year (a standalone 4-digit number) from a string column.

    If the text holds several 4-digit numbers, the last one that does not
    exceed ``max_year`` is returned; numbers above ``max_year`` are skipped.
    Digit runs that are longer or shorter than four digits are ignored.

    Args:
        text_column: Name of the column containing text to extract the year from
        max_year: Largest value accepted as a year (inclusive). Defaults to
            2025, the cutoff that was previously hard-coded.

    Returns:
        PySpark column with the extracted year as integer, or null when the
        input is null or contains no acceptable 4-digit number
    """
    @F.udf(returnType=IntegerType())
    def extract_year_udf(text):
        if text is None:
            return None

        # str() lets the same logic run on non-string columns as well.
        candidates = re.findall(r'\b\d{4}\b', str(text))

        # Walk from the end so the first acceptable value is the last one
        # that appears in the text.
        for candidate in reversed(candidates):
            year = int(candidate)
            if year <= max_year:
                return year
        return None

    return extract_year_udf(F.col(text_column))

# 4. Use functions for cleaning

In [0]:
# Data for future tables
artworks = []
artists = []
acquisition = []
dimensions = []
medium = []
museum = []

# Raw text column -> derived year column filled in by extract_year.
year_sources = {
    'production_raw': 'production_year',
    'acquisition_raw': 'acquisition_year',
    'artist_birth_raw': 'artist_birth_year',
    'artist_death_raw': 'artist_death_year',
}

# Column layout of each output table.
artwork_columns = ["object_id", "inventory_n", "title",
                   "production_year", "acquisition_year", "artist_id", "museum_id"]
artist_columns = ["artist_id", "artist_name", "artist_birth_year",
                  "artist_death_year", "artist_group", "artist_gender", "artist_country_raw", "museum_id"]
acquisition_columns = ["object_id", "museum_id", "acquisition_method_raw", "acquisition_year"]
dimensions_columns = ["object_id", "museum_id", "dimension_unit", "measurement_value1",
                      "measurement_value2", "measurement_value3", "measurement_value4"]
medium_columns = ["object_id", "museum_id", "medium_classification_raw", "medium1",
                  "medium2", "medium3", "medium4", "medium5"]
museum_columns = ["museum_id", "museum_name"]


def _select_with_null_padding(frame, columns):
    """Select `columns` from `frame`, adding any column it lacks as an all-null literal.

    Keeps the output layout identical across museums even when a source
    does not provide every column.
    """
    for name in columns:
        if name not in frame.columns:
            frame = frame.withColumn(name, F.lit(None))
    return frame.select(*columns)


for df in dfArr:
    # Transformations return new DataFrames, so `df` itself is never modified;
    # the alias just gives the working frame a readable name.
    museum_data = df.alias("museum_data")

    # Drop all rows with nulls in the object_id or inventory_n columns
    museum_data = museum_data.na.drop(subset=['object_id', 'inventory_n'])

    # Derive the year columns - only for raw columns this museum provides
    for raw_column, year_column in year_sources.items():
        if raw_column in museum_data.columns:
            museum_data = museum_data.withColumn(year_column, extract_year(raw_column))

    # Artwork dataframe. Padded like the other tables: the year columns exist
    # only when their raw source column does, so a plain select would raise
    # for a museum that lacks one of them.
    artworks.append(_select_with_null_padding(museum_data, artwork_columns))

    # Artist dataframe - one row per (artist_name, artist_id), without null keys
    artists.append(
        _select_with_null_padding(museum_data, artist_columns)
        .dropDuplicates(['artist_name', 'artist_id'])
        .dropna(subset=['artist_name', 'artist_id'])
    )

    # Acquisition, dimensions and medium dataframes
    acquisition.append(_select_with_null_padding(museum_data, acquisition_columns))
    dimensions.append(_select_with_null_padding(museum_data, dimensions_columns))
    medium.append(_select_with_null_padding(museum_data, medium_columns))

    # Museum dataframe - keeping only distinct museum records. Unlike the
    # other tables, missing columns are left out here instead of being padded.
    existing_museum_cols = [name for name in museum_columns if name in museum_data.columns]
    museum.append(museum_data.select(*existing_museum_cols).dropDuplicates(['museum_id']))


# 5. concatenate dataframes from each museum into one

In [0]:
# Combine all dataframes of each type
from functools import reduce
from pyspark.sql import DataFrame

def _union_all(frames):
    """Fold a list of DataFrames into one by matching columns on name."""
    return reduce(DataFrame.unionByName, frames)


# One combined frame per table type, built from the per-museum lists.
combined_artworks = _union_all(artworks)
combined_artists = _union_all(artists)
combined_acquisition = _union_all(acquisition)
combined_dimensions = _union_all(dimensions)
combined_medium = _union_all(medium)
combined_museum = _union_all(museum)

# 6. save final dataframes into Clean Datalake layer using Delta format

In [0]:
# Overwrite the artworks fact table in the "clean" container, stored as Delta.
(
    combined_artworks.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://clean@museumsdatalakeraw.dfs.core.windows.net//artworks_fact")
)

In [0]:
# (frame, destination) pairs for the dimension tables, written in this order.
# NOTE(review): the destinations contain "//" after the host, while the
# read-back of artworks_fact uses a single "/" — presumably normalized by the
# filesystem layer; confirm before changing.
clean_layer_targets = [
    (combined_artists, "abfss://clean@museumsdatalakeraw.dfs.core.windows.net//artists_dim"),
    (combined_acquisition, "abfss://clean@museumsdatalakeraw.dfs.core.windows.net//acquisition_dim"),
    (combined_dimensions, "abfss://clean@museumsdatalakeraw.dfs.core.windows.net//dimensions_dim"),
    (combined_medium, "abfss://clean@museumsdatalakeraw.dfs.core.windows.net//medium_dim"),
    (combined_museum, "abfss://clean@museumsdatalakeraw.dfs.core.windows.net//museum_dim"),
]

# Overwrite each table in the "clean" container, stored as Delta.
for frame, target_path in clean_layer_targets:
    frame.write.format("delta").mode("overwrite").save(target_path)

In [0]:
# Read the artworks fact table back from the "clean" container (Delta format)
# and render it in the notebook.
display(spark.read.format("delta").load("abfss://clean@museumsdatalakeraw.dfs.core.windows.net/artworks_fact"))

object_id,inventory_n,title,production_year,acquisition_year,artist_id,museum_id
378978,A I 457:234,Vänrikin markkinamuisto. Kuvitusta Vänrikki Stålin tarinoihin,1869.0,1873.0,69483.0,1
381328,RAMSAY 585,Marion,,1919.0,65664.0,1
381635,A I 457:250,Kuoleva soturi. Kuvitusta Vänrikki Stålin tarinoihin,1869.0,1873.0,69483.0,1
382297,A I 457:189,"Näkymä italialaisesta merenpoukamassa olevasta kaupungista, taustalla tulivuori",,1873.0,69483.0,1
383944,A-2006-111,Vihreä Pariisi,2006.0,2006.0,63062.0,1
385746,A I 472:54,Seitsemän rokokoo-ornamenttia,1860.0,1890.0,60711.0,1
385804,A-2008-163,Harjalintu,1979.0,2008.0,70182.0,1
387171,A I 471:52,"Pilvinen taivas, harjoitelma",1855.0,1890.0,60711.0,1
389036,A I 51,Rekolankoski Jämsässä,1869.0,1872.0,60025.0,1
392019,A I 606:6,lehtimetsäympäristö,,1896.0,68727.0,1


# 7. Create a Hive metastore database with the data to use internally in Databricks

In [0]:
%sql
-- Create the target database only when it is missing, so that re-running the
-- notebook does not fail on an already existing database.
create database if not exists museums_data;

In [0]:
# Register the combined artworks frame as the temp view "tempArtworks" so that
# it can be queried by name from a %sql cell.
combined_artworks.createOrReplaceTempView("tempArtworks") 

In [0]:
%sql
-- Materialize the tempArtworks view as a managed Delta table.
-- "create or replace" keeps the cell re-runnable: a plain "create table"
-- fails once the table exists from an earlier run.
create or replace table museums_data.artworks_fact
using delta
as
select * from tempArtworks

num_affected_rows,num_inserted_rows
