In [None]:
# Pull the two parameters that the Azure Data Factory pipeline hands to this
# notebook through Databricks widgets: `fileName` and `filePath`.
file_name, file_path = (
    dbutils.widgets.get(widget_name) for widget_name in ('fileName', 'filePath')
)

# Echo the received values so they show up in the run output.
print(f'{file_name} {file_path}')

In [None]:
# Mount the `movies` ADLS Gen2 container at /mnt/movies/.
#
# Authentication uses a custom access-token provider; the provider class name
# is read from the cluster's Spark configuration rather than hard-coded.
MOVIES_SOURCE = "abfss://movies@armieldatalake.dfs.core.windows.net/"
MOVIES_MOUNT_POINT = "/mnt/movies/"

# Define mount configs
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  # The key must be a quoted string; the original unquoted `<...>` template
  # placeholder was a SyntaxError. This is the key Databricks documents for
  # ADLS Gen2 credential passthrough.
  # NOTE(review): confirm the cluster has credential passthrough enabled.
  "fs.azure.account.custom.token.provider.class": spark.conf.get(
      "spark.databricks.passthrough.adls.gen2.tokenProviderClassName"
  ),
}

# `dbutils.fs.mount` fails when the mount point already exists (for example if
# a previous run stopped before the final unmount cell), so only mount when
# needed. Trailing slashes are ignored when comparing mount points.
already_mounted = any(
    mount.mountPoint.rstrip("/") == MOVIES_MOUNT_POINT.rstrip("/")
    for mount in dbutils.fs.mounts()
)

# Mount the ADLS Container
if not already_mounted:
    dbutils.fs.mount(
      source = MOVIES_SOURCE,
      mount_point = MOVIES_MOUNT_POINT,
      extra_configs = configs)

In [None]:
# Load the CSV file named by the pipeline parameters from under /mnt.
# The first row is treated as the header and column types are inferred.
csv_location = f'/mnt/{file_path}/{file_name}'
df = spark.read.options(header=True, inferSchema=True).csv(csv_location)

display(df)

In [None]:
from pyspark.sql.functions import col, mean, first, max, format_number, count, struct

SALES_COL = 'World Wide Sales (in $)'

# Keep only rows whose `Budget (in $)` is all digits and whose `Distributor`
# contains no digits, then rename the `_c0` column to `movie_id`.
df_cleanup = (
    df.filter(
        col('Budget (in $)').rlike('^[0-9]+$')
        & col('Distributor').rlike('^[^0-9]*$')
    )
    .withColumnRenamed('_c0', 'movie_id')
)

# Worldwide sales as a 64-bit integer. A 32-bit `int` overflows above
# 2,147,483,647, so the maximum is taken on a `long` instead.
sales_as_long = col(SALES_COL).cast('long')

# Aggregate per distributor.
# * `_avg_sale` keeps the numeric average so the ranking below can sort on a
#   number; `format_number` returns a string, and sorting that string orders
#   rows lexicographically (e.g. "99,000" after "1,000,000").
# * `_top` is the largest (sales, title) struct in the group. Structs compare
#   field by field, so this picks the title that belongs to the highest sales
#   figure, where `first('Title')` returned an arbitrary title.
grouped_df = (
    df_cleanup.groupby('distributor')
    .agg(
        mean(SALES_COL).alias('_avg_sale'),
        count('movie_id').alias('distributor_n_movies'),
        max(struct(sales_as_long.alias('sales'), col('Title').alias('title'))).alias('_top'),
    )
    .select(
        'distributor',
        format_number(col('_avg_sale'), 0).alias('avg_sale_per_distributor'),
        'distributor_n_movies',
        col('_top.title').alias('top_movie'),
        format_number(col('_top.sales'), 0).alias('top_movie_sales'),
        '_avg_sale',  # numeric helper used only for ranking; dropped from `result`
    )
)

# Get the top 10 distributors by average worldwide sales per movie. The sort
# uses the numeric helper column, which is then removed so `result` keeps the
# same five columns as before.
result = (
    grouped_df
    .orderBy(col('_avg_sale').desc())
    .limit(10)
    .drop('_avg_sale')
)
result.show(truncate=False)

In [None]:
# Destination folder inside the mounted container
output_path = '/mnt/movies/transformed/'

# Write the result as CSV with a header row, replacing any existing output
csv_writer = result.write.mode('overwrite').option('header', True)
csv_writer.csv(output_path)

In [None]:
# Azure SQL Database connection details, read from the `azure-sql-secret`
# secret scope so that nothing sensitive is hard-coded in the notebook.
jdbc_url = dbutils.secrets.get(scope='azure-sql-secret', key='jdbc-url')
user = dbutils.secrets.get(scope='azure-sql-secret', key='jdbc-user')
password = dbutils.secrets.get(scope='azure-sql-secret', key='jdbc-password')

# Target table for the DataFrame
table_name = "top_grossing_movies"

# Write the DataFrame over JDBC, overwriting the table contents
(
    result.write
    .format("jdbc")
    .options(url=jdbc_url, dbtable=table_name, user=user, password=password)
    .mode("overwrite")
    .save()
)

In [None]:
# Define the Azure Synapse SQL Database connection parameters
jdbc_url = dbutils.secrets.get(scope='azure-sql-secret', key='dw-jdbc-url')
user = dbutils.secrets.get(scope='azure-sql-secret', key='dw-jdbc-user')
password = dbutils.secrets.get(scope='azure-sql-secret', key='dw-jdbc-password')

# Define the table name to save the DataFrame
table_name = "top_grossing_movies"

# The Azure Synapse connector stages data in cloud storage before loading it,
# and its `tempDir` option is documented as required; the original write
# omitted it.
# NOTE(review): this reuses the `movies` container that the notebook mounts
# earlier. Point it at a dedicated staging container if one exists, and confirm
# the Spark session holds credentials for this storage account, since
# `forwardSparkAzureStorageCredentials` forwards them to Synapse.
SYNAPSE_TEMP_DIR = "abfss://movies@armieldatalake.dfs.core.windows.net/tempDir/synapse"

# Save the DataFrame to Azure Synapse SQL pool
result.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", jdbc_url) \
  .option("tempDir", SYNAPSE_TEMP_DIR) \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbtable", table_name) \
  .option("user", user) \
  .option("password", password) \
  .mode('overwrite') \
  .save()

In [None]:
# Release the ADLS mount now that all outputs have been written
mount_to_release = '/mnt/movies/'
dbutils.fs.unmount(mount_to_release)