In [0]:
# Sanity check: show which secret scopes this notebook can see before reading from one.
available_scopes = dbutils.secrets.listScopes()
available_scopes

In [0]:
pip install databricks-cli

In [0]:
storage_account_name = "aprdemoadls"
scope_name = "keyvaultscopeap"

# Service-principal credentials come from a Databricks secret scope,
# so nothing sensitive is hard-coded in the notebook.
client_id, tenant_id, client_secret = (
    dbutils.secrets.get(scope=scope_name, key=secret_key)
    for secret_key in ("clientid", "tenantid", "secret")
)

# Every ABFS OAuth setting is scoped to this storage account via the key suffix.
account_suffix = f"{storage_account_name}.dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for setting_key, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_key}.{account_suffix}", setting_value)

In [0]:
# Raw (bronze) extract. The space in the file name is part of the blob name.
container_name, file_name = "bronze", "StockMovement 2"
storage_account_name = "aprdemoadls"
file_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_name}.csv"

# Only the header option is set (no inferSchema), so every column is read as a string.
df = spark.read.option("header", "true").csv(file_path)
display(df)

In [0]:
from pyspark.sql.functions import col

# Remove exact duplicate rows; with no column subset this is the same as dropDuplicates().
df = df.distinct()

In [0]:
%python
# Handle missing values
  
df = df.dropna()  # Drop rows with any remaining missing values
display(df)

In [0]:
%python
from pyspark.sql.functions import col

# Standardize data types
df = df.withColumn('MovementDate', col('MovementDate').cast('date'))
df = df.withColumn('Quantity', col('Quantity').cast('double'))

display(df)

In [0]:
%python
from pyspark.sql.functions import col, date_format

# Standardize data types
df = df.withColumn('MovementDate', col('MovementDate').cast('date'))
df = df.withColumn('Quantity', col('Quantity').cast('double'))

# Extract time from MovementDate and create a new column
df = df.withColumn('MovementTime', date_format(col('MovementDate'), 'HH:mm:ss'))

display(df)

In [0]:
%python
from pyspark.sql.functions import avg

# Group by MovementType and calculate the average Quantity
df_grouped = df.groupBy('MovementType').agg(avg('Quantity').alias('AverageQuantity'))

display(df_grouped)

In [0]:
# Superseded draft of the Parquet export, kept commented out for reference only; nothing here runs.
# The active implementation is the next cell (single file via a temporary directory).
# NOTE(review): consider deleting this cell once the approach below is settled.
# # %python
# # # Save the cleaned DataFrame to Parquet files
# # output_path2 = f"abfss://silver@aprdemoadls.dfs.core.windows.net/StockMovement.parquet"
# # df.write.mode("overwrite").parquet(output_path2)

# # Save the cleaned DataFrame to a single Parquet file
# output_path2 = "abfss://silver@aprdemoadls.dfs.core.windows.net/StockMovement.parquet"
# df.coalesce(1).write.mode("overwrite").parquet(output_path2)

In [0]:
%python
# Save the cleaned DataFrame to a single Parquet file
output_path2 = "abfss://silver@aprdemoadls.dfs.core.windows.net/StockMovement.parquet"
temp_output_path = "abfss://silver@aprdemoadls.dfs.core.windows.net/temp_StockMovement.parquet"

# Write the DataFrame to a temporary path
df.coalesce(1).write.mode("overwrite").parquet(temp_output_path)

# List the files in the temporary directory
files = dbutils.fs.ls(temp_output_path)

# Find the part file
part_file = [file.path for file in files if file.path.endswith(".parquet")][0]

# Rename the part file to the desired output path
dbutils.fs.mv(part_file, output_path2 + "/StockMovement.parquet", True)

# Remove the temporary directory
dbutils.fs.rm(temp_output_path, True)

In [0]:
%python
# Read the Parquet file to verify its contents
df_parquet = spark.read.parquet(output_path2 + "/StockMovement.parquet")
display(df_parquet)

In [0]:
# Read the whole StockMovement.parquet folder (every Parquet file under it), not just the single file.
df_parquet = spark.read.format("parquet").load(output_path2)
display(df_parquet)

In [0]:
%python
# Save the cleaned DataFrame to a single CSV file
output_path_csv = "abfss://silver@aprdemoadls.dfs.core.windows.net/StockMovement.csv"
temp_output_path_csv = "abfss://silver@aprdemoadls.dfs.core.windows.net/temp_StockMovement"

# Write the DataFrame to a temporary path in CSV format
df.coalesce(1).write.mode("overwrite").csv(temp_output_path_csv, header=True)

# List the files in the temporary directory
files = dbutils.fs.ls(temp_output_path_csv)

# Find the part file
part_file = [file.path for file in files if file.path.endswith(".csv")][0]

# Rename the part file to the desired output path
dbutils.fs.mv(part_file, output_path_csv, True)

# Remove the temporary directory
dbutils.fs.rm(temp_output_path_csv, True)

In [0]:
%python
# Read the CSV file to verify its contents
df_csv = spark.read.csv(output_path_csv, header=True, inferSchema=True)
display(df_csv)