### Step 1: Set up the connection

Assign the storage account access key and set the Spark configuration so that the cluster can read from and write to Blob Storage.

In [None]:
storage_account_name = "adfcookbookv2sa"
storage_account_access_key = "xxxx"
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)
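
Keeping the access key in a notebook cell makes it easy to leak. As a minimal sketch, the key could be read from the environment instead; the variable name `STORAGE_ACCOUNT_ACCESS_KEY` and both helper functions are hypothetical, and on Databricks a secret scope would be the more usual place for such a value.

```python
import os


def account_key_conf(storage_account_name: str) -> str:
    """Build the Spark config key that holds the Blob Storage account key."""
    return f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"


def load_access_key(env_var: str = "STORAGE_ACCOUNT_ACCESS_KEY") -> str:
    """Read the access key from an environment variable (assumed name)."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"Set {env_var} before running the notebook")
    return key


# Usage inside the notebook, where `spark` is already defined:
# spark.conf.set(account_key_conf(storage_account_name), load_access_key())
```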

### Step 2: Read the data

With the connection configured, we can point Spark at the source files and create DataFrames from them. Notice that we use the `inferSchema` *option* so that Spark infers the schema from each file. If we already have a schema, we can set it explicitly instead.

First, let's define the file locations, then create the DataFrames in Python.

In [None]:
raw_container = 'raw'
ratings_location = f'wasbs://{raw_container}@{storage_account_name}.blob.core.windows.net/ratings.csv'
movies_location = f'wasbs://{raw_container}@{storage_account_name}.blob.core.windows.net/movies.csv'
links_location = f'wasbs://{raw_container}@{storage_account_name}.blob.core.windows.net/links.csv'
tags_location = f'wasbs://{raw_container}@{storage_account_name}.blob.core.windows.net/tags.csv'
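
The four locations differ only in the file name, so the URL pattern can be factored out. A small sketch, assuming a hypothetical helper named `wasbs_path`:

```python
def wasbs_path(container: str, storage_account_name: str, blob: str) -> str:
    """Build a wasbs:// URL for a blob in the given container and account."""
    return (
        f"wasbs://{container}@{storage_account_name}"
        f".blob.core.windows.net/{blob.lstrip('/')}"
    )


# One entry per source file, keyed by dataset name.
locations = {
    name: wasbs_path("raw", "adfcookbookv2sa", f"{name}.csv")
    for name in ("ratings", "movies", "links", "tags")
}
```

`locations["ratings"]` then holds the same string as `ratings_location` in the cell above.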

In [None]:
ratingsDF = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(ratings_location)
linksDF = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(links_location)
moviesDF = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(movies_location)
tagsDF = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(tags_location)
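
Schema inference requires an extra pass over the file, so an explicit schema can be worth supplying when the layout is known. The sketch below passes a DDL string to `DataFrameReader.schema`. The helper name is hypothetical, and the column list is an assumption: only `movieId`, `rating`, and `timestamp` appear in this notebook, while `userId` and all the types are guesses about the file.

```python
# Assumed layout of ratings.csv; adjust names and types to the real file.
RATINGS_SCHEMA = "userId INT, movieId INT, rating DOUBLE, timestamp BIGINT"


def read_csv_with_schema(spark, path: str, schema: str):
    """Read a headered CSV with an explicit schema instead of inferSchema."""
    return (
        spark.read.format("csv")
        .schema(schema)            # accepts a DDL string or a StructType
        .option("header", "true")
        .load(path)
    )


# Usage inside the notebook:
# ratingsDF = read_csv_with_schema(spark, ratings_location, RATINGS_SCHEMA)
```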

### Step 3: Transform the data

Use Spark SQL functions to clean, aggregate, and transform the DataFrames.

In [None]:
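# Note: `round` and `max` imported here are Spark SQL functions and shadow
# Python's built-ins of the same name for the rest of the notebook.
from pyspark.sql.functions import avg, round, current_timestamp, concat_ws, collect_set, from_unixtime, max, date_format, to_date

# Average rating (2 decimals) and most recent rating timestamp per movie
agg_ratingsDF = ratingsDF.groupBy('movieId').agg(round(avg('rating'), 2).alias("avg_rating"), max('timestamp').alias("unix_timestamp"))
# Distinct tags per movie, joined into a single ' | '-separated string
agg_tagsDF = tagsDF.groupBy('movieId').agg(concat_ws(' | ', collect_set('tag')).alias('agg_tags'))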

In [None]:
# Left joins keep every movie, even those without ratings, links, or tags
resultDF = (moviesDF.join(agg_ratingsDF, ['movieId'], 'left')
                  .join(linksDF, ['movieId'], 'left')
                  .join(agg_tagsDF, ['movieId'], 'left')
                  # Convert Unix seconds to a date; to_date parses the default from_unixtime output directly
                  .withColumn("rating_datetime", to_date(from_unixtime("unix_timestamp")))
                  .withColumn('load_timestamp', current_timestamp())
                  .drop('unix_timestamp')
)
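
To make the `rating_datetime` conversion concrete, here is the same step in plain Python: Unix seconds become a calendar date. This is only an illustration with a made-up sample value. It assumes UTC, whereas Spark's `from_unixtime` uses the session time zone, so results near midnight can differ.

```python
from datetime import date, datetime, timezone


def unix_to_date(unix_seconds: int) -> date:
    """Convert Unix seconds to a calendar date, assuming UTC."""
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).date()


sample = 964982703  # illustrative value, i.e. 2000-07-30 18:45:03 UTC
print(unix_to_date(sample))  # 2000-07-30
```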

### Step 4: Write the resulting DataFrame as a Delta table

- For the first run, use the `write` method to create the table.
- On later runs, use the `merge` method to upsert changes into the existing table.

In [None]:
target_container = 'curated'
target_path = f'wasbs://{target_container}@{storage_account_name}.blob.core.windows.net/movie_lens'

# Initial load only: comment this line out after the first run
resultDF.write.format("delta").mode("overwrite").partitionBy('rating_datetime').saveAsTable(path=target_path, name='movie_lens')

In [None]:
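from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# getOrCreate() returns the notebook's existing session if there is one
spark = SparkSession.builder.appName("app").getOrCreate()
# Load the Delta table written during the first run
target_table = DeltaTable.forPath(spark, target_path)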

key_column = "movieId"

# Upsert: update rows whose key matches, insert the rest
(target_table.alias("target")
    .merge(resultDF.alias("source"), f"source.{key_column} = target.{key_column}")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
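
The merge condition above is a plain SQL string, so it extends naturally to more than one key column. A minimal sketch, assuming a hypothetical helper `merge_condition`; the `source` and `target` aliases match the ones used in the cell above.

```python
def merge_condition(key_columns, source: str = "source", target: str = "target") -> str:
    """Build an equality join condition over one or more key columns."""
    if not key_columns:
        raise ValueError("At least one key column is required")
    return " AND ".join(f"{source}.{col} = {target}.{col}" for col in key_columns)


print(merge_condition(["movieId"]))
# source.movieId = target.movieId
```

The returned string can be passed as the second argument to `.merge(...)` in place of the inline f-string.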

### Step 5: Query the data

Now that we have created our Delta table, we can query it. For instance, you can select and display only particular columns.

In [None]:
%sql
select * from movie_lens
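
To select particular columns rather than `*`, the statement can name them directly. The sketch below builds such a statement in Python using columns created in Step 3 (`movieId`, `avg_rating`, `agg_tags`); the helper `select_query` is hypothetical, and the result is meant to be passed to `spark.sql`.

```python
def select_query(table: str, columns, order_by: str = None, limit: int = None) -> str:
    """Assemble a simple SELECT statement for the given columns."""
    query = f"SELECT {', '.join(columns)} FROM {table}"
    if order_by:
        query += f" ORDER BY {order_by}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    return query


query = select_query(
    "movie_lens",
    ["movieId", "avg_rating", "agg_tags"],
    order_by="avg_rating DESC",
    limit=10,
)
print(query)
# SELECT movieId, avg_rating, agg_tags FROM movie_lens ORDER BY avg_rating DESC LIMIT 10

# Usage inside the notebook:
# spark.sql(query).show()
```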

