%md
#### Notebook Name: 01-Ingest-Daily-Pricing-HTTP-Source-Data
##### Source File Details
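Source File URL: "https://retailpricing.blob.core.windows.net/daily-pricing" (one file per day, named `PW_MW_DR_<DDMMYYYY>.csv`)

Source File Ingestion Path: "abfss://bronze@datalakestorageaccountname.dfs.core.windows.net/daily-pricing/" (replace `datalakestorageaccountname` with the target ADLS Gen2 storage account)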

##### Python Core Library Documentation
- <a href="https://pandas.pydata.org/docs/user_guide/index.html#user-guide" target="_blank">pandas</a>
- <a href="https://pypi.org/project/requests/" target="_blank">requests</a>
- <a href="https://docs.python.org/3/library/csv.html" target="_blank">csv</a>

##### Spark Methods
- <a href="https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession" target="_blank">SparkSession</a>

In [0]:
# Process name supplied by the job/widget; it keys this pipeline's rows in the run-log table.
processName = dbutils.widgets.get('prm_processName')
if not processName:
    # A blank name would read (and later write) run-log rows under an empty PROCESS_NAME.
    raise ValueError("Widget 'prm_processName' must be set to a non-empty process name")

# Next file date = the day after the latest 'Completed' run for this process,
# or 2023-01-01 when the process has never completed.
# processName is bound as a named parameter (:processName) instead of being
# interpolated into the SQL text, so quotes in the value cannot break or alter the query.
# NOTE(review): binding Python values through `args` is documented for PySpark 3.5+
# (in PySpark 3.4 the values were parsed as SQL literal strings) — confirm the cluster runtime.
nextSourceFileDateSql = """SELECT NVL(MAX(PROCESSED_FILE_TABLE_DATE)+1,'2023-01-01') AS NEXT_SOURCE_FILE_DATE FROM pricing_analytics.processrunlogs.DELTALAKEHOUSE_PROCESS_RUNS
WHERE PROCESS_NAME = :processName AND PROCESS_STATUS = 'Completed'"""

nextSourceFileDateDF = spark.sql(nextSourceFileDateSql, args={'processName': processName})


In [0]:
from datetime import datetime

In [0]:
# --- Source (HTTP blob) settings ---------------------------------------------
dailyPricingSourceBaseURL = 'https://retailpricing.blob.core.windows.net/'
dailyPricingSourceFolder = 'daily-pricing/'

# The run-log query returns a single row whose date (YYYY-MM-DD) is re-rendered
# as DDMMYYYY, the form used in the daily source file names.
dailyPricingSourceFileDate = datetime.strptime(
    str(
        nextSourceFileDateDF
        .select('NEXT_SOURCE_FILE_DATE')
        .collect()[0]['NEXT_SOURCE_FILE_DATE']
    ),
    '%Y-%m-%d',
).strftime('%d%m%Y')
dailyPricingSourceFileName = "PW_MW_DR_" + dailyPricingSourceFileDate + ".csv"

# --- Sink (bronze layer in ADLS) settings ------------------------------------
dailyPricingSinkLayerName, dailyPricingSinkStorageAccountName, dailyPricingSinkFolderName = (
    'bronze',
    'adlsdatalkehousedev',
    'daily-pricing',
)

In [0]:
import pandas as pds

In [0]:
# Full HTTP URL of the day's source file: <base URL><folder><file name>.
dailyPricingSourceURL = f"{dailyPricingSourceBaseURL}{dailyPricingSourceFolder}{dailyPricingSourceFileName}"

In [0]:
# pandas reads the CSV directly from the HTTPS URL (no separate download step).
dailyPricingPandasDF = pds.read_csv(filepath_or_buffer=dailyPricingSourceURL)

In [0]:
# Hand the pandas frame to Spark so it can be written to the lake with the Spark writer.
dailyPricingSparkDF = spark.createDataFrame(data=dailyPricingPandasDF)

In [0]:
from pyspark.sql.functions import current_timestamp

# Bronze landing folder: abfss://<layer>@<account>.dfs.core.windows.net/<folder>
dailyPricingSinkFolderPath = "abfss://{0}@{1}.dfs.core.windows.net/{2}".format(
    dailyPricingSinkLayerName,
    dailyPricingSinkStorageAccountName,
    dailyPricingSinkFolderName,
)

# Stamp every row with the ingestion time, then append it as CSV with a header row.
# Append mode: re-running before the run-log insert below succeeds writes the rows again.
dailyPricingSparkDF.withColumn("source_file_load_date", current_timestamp()).write.csv(
    dailyPricingSinkFolderPath,
    mode="append",
    header="true",
)

In [0]:

# Record this run as 'Completed' in the run-log table; the next run's lookup
# (MAX(PROCESSED_FILE_TABLE_DATE)+1 over 'Completed' rows) then moves on a day.
processFileDate = nextSourceFileDateDF.select('NEXT_SOURCE_FILE_DATE').collect()[0]['NEXT_SOURCE_FILE_DATE']
processStatus = 'Completed'

# Values are bound as named parameters rather than interpolated into the SQL text,
# so a quote in the widget-supplied process name cannot break or alter the statement.
# NOTE(review): binding Python values through `args` is documented for PySpark 3.5+
# (in PySpark 3.4 the values were parsed as SQL literal strings) — confirm the cluster runtime.
processInsertSQL = """INSERT INTO pricing_analytics.processrunlogs.DELTALAKEHOUSE_PROCESS_RUNS(PROCESS_NAME,PROCESSED_FILE_TABLE_DATE,PROCESS_STATUS) VALUES(:processName, :processFileDate, :processStatus)"""

spark.sql(
    processInsertSQL,
    args={
        'processName': processName,
        'processFileDate': processFileDate,
        'processStatus': processStatus,
    },
)