In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging 


In [0]:
from datetime import date, timedelta, datetime

In [0]:
dbutils.secrets.help()

In [0]:
#dbutils.secrets.listScopes()
dbutils.secrets.list(scope= 'capitals_scope')

[SecretMetadata(key='clientid'),
 SecretMetadata(key='clientsecret'),
 SecretMetadata(key='password1'),
 SecretMetadata(key='tenantid'),
 SecretMetadata(key='username1')]

In [0]:
application_id = dbutils.secrets.get(scope='capitals_scope', key='clientid')
authenticationKey = dbutils.secrets.get(scope='capitals_scope', key='clientsecret')
tenandId = dbutils.secrets.get(scope='capitals_scope', key='tenantid')

In [0]:
adlsAccountName = "capitaldev"
adlsContainerName = "stock-agg-silver"
mountPoint = "/mnt/stock-agg-silver"

In [0]:
adlsAccountName = "capitaldev"
adlsContainerName = "stock-agg-gold"
mountPoint = "/mnt/stock-agg-gold"

In [0]:
endpoint = "https://login.microsoftonline.com/" + tenandId + "/oauth2/token"
source = "abfss://" + adlsContainerName + "@" + adlsAccountName + ".dfs.core.windows.net/" 
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": application_id,
"fs.azure.account.oauth2.client.secret": authenticationKey,
"fs.azure.account.oauth2.client.endpoint": endpoint}

In [0]:
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(source=source,mount_point=mountPoint,extra_configs =configs)

In [0]:
dbutils.fs.mounts()

[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/gold', source='wasbs://gold@capitaldev.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/shiba-inshightl-ayer', source='wasbs://shiba-inshightl-ayer@capitaldev.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/gold-edwh', source='abfss://raw-edwh@capitaldev.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/mnt/silver', source='wasbs://silver@capitaldev.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/silver-edwh', source='abfss://raw-edwh@capitaldev.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/bronze1', source='wasbs://bronze@capitaldev.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking',

In [0]:
# now = datetime.now()
# print("now =", now)
# dt_string = now.strftime("%Y-%m-%d")
# print(dt_string)

# Get today's date
today = date.today()
print("Today is: ", today)
 
# # Yesterday date
# yesterday = today - timedelta(days = 1)
# print("Yesterday was: ", yesterday)

dt_string = today.strftime("%Y/%m/%d")
print(dt_string)

Today is:  2023-10-24
2023/10/24


In [0]:
folder = 'london'
a = f"/mnt/stock-agg-silver/{folder}/{dt_string}".format(folder,dt_string)

In [0]:
schema_def = StructType([StructField('id', IntegerType(), True),
                         StructField('StockDate', DateType(), True),
                         StructField('WarehouseID', StringType(), True),
                         StructField('ItemName', StringType(), True),
                         StructField('OpeningStock', IntegerType(), True),
                         StructField('Receipts', IntegerType(), True),
                         StructField('Issues', IntegerType(), True),
                         StructField('UnitValue', FloatType(), True)])

In [0]:
london_df = spark.read.schema(schema_def).option("header", True).option('inferSchema', True).format('csv').load(a)

In [0]:
london_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- StockDate: date (nullable = true)
 |-- WarehouseID: string (nullable = true)
 |-- ItemName: string (nullable = true)
 |-- OpeningStock: integer (nullable = true)
 |-- Receipts: integer (nullable = true)
 |-- Issues: integer (nullable = true)
 |-- UnitValue: float (nullable = true)



In [0]:
london_df.show()

+---+----------+-----------+----------------+------------+--------+------+---------+
| id| StockDate|WarehouseID|        ItemName|OpeningStock|Receipts|Issues|UnitValue|
+---+----------+-----------+----------------+------------+--------+------+---------+
|  1|2023-10-24|     London|  Tape Dispenser|           0|      25|    42|     5.63|
|  2|2023-10-24|     London|Pencil Sharpener|          40|      14|    98|     0.15|
|  3|2023-10-24|     London|Labeling Machine|          84|      41|    49|    11.35|
|  4|2023-10-24|     London|      Calculator|          51|      12|    44|     8.97|
|  5|2023-10-24|     London|        Scissors|          26|      44|    78|    18.89|
|  6|2023-10-24|     London|    Sticky Notes|          89|      36|    30|    16.53|
|  7|2023-10-24|     London|        Notebook|           5|      44|    76|     3.75|
|  8|2023-10-24|     London|       Clipboard|          44|      26|    84|     6.75|
|  9|2023-10-24|     London|         Folders|          46|       

In [0]:
 # Create a temporary view
london_df.createOrReplaceTempView("GLOBAL_STOCK")

print("Total Records available : ")
spark.sql("SELECT count(*) FROM GLOBAL_STOCK").show()

# Perform the aggregation using DataFrame API
london_stockSummary = london_df.groupBy("StockDate", "ItemName").agg(
    count("*").alias("TOTAL_REC"),
    sum("OpeningStock").alias("OPENING_STOCK"),
    sum("Receipts").alias("RECEIPTS"),
    sum("Issues").alias("ISSUES"),
    sum((col("OpeningStock") + col("Receipts") - col("Issues"))).alias("CLOSING_STOCK"),
    sum((col("OpeningStock") + col("Receipts") - col("Issues")) * col("UnitValue")).alias("CLOSING_VALUE")
)

print("Global Stock Summary: ")
daily_london_agg = london_stockSummary.withColumn('CLOSING_VALUE', round(col('CLOSING_VALUE'),2))

daily_london_agg.show()

Total Records available : 
+--------+
|count(1)|
+--------+
|      17|
+--------+

Global Stock Summary: 
+----------+----------------+---------+-------------+--------+------+-------------+-------------+
| StockDate|        ItemName|TOTAL_REC|OPENING_STOCK|RECEIPTS|ISSUES|CLOSING_STOCK|CLOSING_VALUE|
+----------+----------------+---------+-------------+--------+------+-------------+-------------+
|2023-10-24|Labeling Machine|        2|          145|      55|    73|          127|      1556.71|
|2023-10-24|      Calculator|        2|           78|      20|    85|           13|       140.55|
|2023-10-24|        Scissors|        2|           88|      51|   138|            1|       -18.91|
|2023-10-24|        Notebook|        2|            7|      70|   140|          -63|      -143.37|
|2023-10-24|    Sticky Notes|        2|          183|      67|   117|          133|      1634.19|
|2023-10-24|  Tape Dispenser|        2|           64|      57|   140|          -19|       -97.25|
|2023-10-24|

In [0]:
#writing to Gold

year_month = today.strftime("%Y-%m")
print(year_month)
day = today.strftime("%d")
print(day)
output_path = '/mnt/stock-agg-gold/London/{0}/{1}/london_data_.csv'.format(year_month, day)
print(output_path)

2023-10
24
/mnt/stock-agg-gold/London/2023-10/24/london_data_.csv


In [0]:
daily_london_agg.write.mode('overwrite').format('csv').save(output_path)

+--------+---+
|    name|age|
+--------+---+
|lokeswar| 24|
|  Harsha| 24|
|     Ram| 28|
+--------+---+

