In [None]:
# Libs
from notebookutils import mssparkutils
import requests
from datetime import datetime, timedelta, date
from pathlib import Path
import json
from pyspark.sql.functions import to_timestamp, col, explode, lit
from pyspark.sql.types import StructType, DateType, StructField, StringType, IntegerType, ArrayType

In [None]:
# Parameters
# Load mode:
#   False -> reload the manual date window below; the table is overwritten.
#   True  -> resume from the day after the latest file in loaded_path up to today;
#            rows are appended to the table.
is_incremental = False

# Inclusive date window, only used when is_incremental is False.
start_date_manual = datetime(2025, 4, 20)
end_date_manual = datetime(2025, 4, 24)

# New downloads land in landing_path; after the table write they are moved to loaded_path.
landing_path = "Files/landing/"
loaded_path = "Files/loaded/"

In [None]:
# Make sure both working folders exist before anything is listed, written or moved
for folder in (landing_path, loaded_path):
    mssparkutils.fs.mkdirs(folder)

In [None]:
def extract_max_date_from_files(folder_path):
    """
    Return the most recent date encoded in ``YYYY-MM-DD.json`` filenames in a folder.

    Used in incremental mode to find the last snapshot day that was already loaded.

    Args:
        folder_path: Folder listed with ``mssparkutils.fs.ls``.

    Returns:
        The latest ``datetime.date`` parsed from a matching filename, or ``None``
        when no filename matches the pattern.
    """
    expected_length = len("YYYY-MM-DD.json")
    latest = None

    for entry in mssparkutils.fs.ls(folder_path):
        name = entry.path.rsplit("/", 1)[-1]

        # Only names shaped exactly like "2025-04-20.json" are candidates.
        if not (name.endswith(".json") and len(name) == expected_length):
            continue

        try:
            parsed = datetime.strptime(name[:-5], "%Y-%m-%d").date()
        except ValueError:
            # Right length and extension, but not a valid calendar date.
            print(f"Ignored: {name}")
            continue

        if latest is None or parsed > latest:
            latest = parsed

    return latest


In [None]:
# Define the start and end dates by mode.
# Incremental: from the day after the latest file in loaded_path up to today.
# Manual: the fixed window from the parameters cell.
if is_incremental:
    max_date = extract_max_date_from_files(loaded_path)
    if max_date is None:
        # Nothing has been loaded yet, so there is no date to resume from.
        # Fall back to the manual start date instead of failing on `None + timedelta`.
        # Convert it to a `date` so it can be compared with `date.today()` in date_range.
        print("No loaded files found; starting from start_date_manual.")
        start_date = start_date_manual.date()
    else:
        start_date = max_date + timedelta(days=1)
    end_date = date.today()
else:
    start_date = start_date_manual
    end_date = end_date_manual
    

In [None]:
# Download the daily JSON snapshots for the selected window into landing_path
def date_range(start, end):
    """
    Yield every day from ``start`` to ``end`` (both inclusive) as a "YYYY-MM-DD" string.

    Both bounds must be the same type (``date`` or ``datetime``). Nothing is yielded
    when ``start`` is after ``end``.
    """
    # timedelta.days floors, so a partial trailing day is excluded, as with `start <= end`.
    day_count = (end - start).days + 1
    for offset in range(day_count):
        yield (start + timedelta(days=offset)).strftime("%Y-%m-%d")

# Source: daily snapshot files published in the sample GitHub repository.
base_url = (
    "https://raw.githubusercontent.com/"
    "alisonpezzott/incr-json-spark-ms-fabric-sample/"
    "refs/heads/main/snapshots/completed/"
)
# Without a timeout, requests.get can block forever on a stalled connection.
request_timeout_seconds = 30

for date_str in date_range(start_date, end_date):
    url = f"{base_url}{date_str}.json"
    response = requests.get(url, timeout=request_timeout_seconds)

    if response.status_code == 200:
        # Stage in a local temp file, then copy it into landing_path.
        local_file = f"/tmp/{date_str}.json"
        # Write the raw bytes so the file matches the source exactly (no re-encoding).
        with open(local_file, "wb") as f:
            f.write(response.content)

        mssparkutils.fs.cp(f"file:{local_file}", f"{landing_path}{date_str}.json")
        print(f"{date_str}.json saved successfully.")
    else:
        # Stop at the first day that cannot be fetched (e.g. a snapshot not yet
        # published). Later days are not loaded, so no gap is created.
        print(f"Error downloading {url} (status {response.status_code})")
        break


In [None]:
# JSON to dataframe
# Explicit schema for a snapshot file: a snapshot date plus a list of branches,
# each with a list of {sku, qty} stock entries. Every field is nullable.
stock_entry_type = StructType([
    StructField("sku", StringType(), True),
    StructField("qty", IntegerType(), True),
])

branch_type = StructType([
    StructField("branch", StringType(), True),
    StructField("stocks", ArrayType(stock_entry_type), True),
])

schema = StructType([
    StructField("snapshot", DateType(), True),
    StructField("data", ArrayType(branch_type), True),
])

# Read every file currently in landing_path with the explicit schema.
# "multiline" lets one JSON document span several lines (presumably pretty-printed files).
df = (
    spark.read
    .schema(schema)
    .option("multiline", "true")
    .json(landing_path)
)

# Flatten to one row per (snapshot, branch, sku). First explode the branch array,
# then each branch's stock array. explode() drops rows whose array is null or empty.
df_exploded = (
    df
    .select("snapshot", explode("data").alias("data"))
    .select("snapshot", col("data.branch").alias("branch"), explode("data.stocks").alias("stocks"))
    .select("snapshot", "branch", col("stocks.sku").alias("sku"), col("stocks.qty").alias("qty"))
)

display(df_exploded)


In [None]:
# Save to the fact_stocks Delta table: append new days in incremental mode,
# otherwise replace the table contents.
if is_incremental:
    mode = "append"
else:
    mode = "overwrite"

(
    df_exploded.write
    .mode(mode)
    .saveAsTable("fact_stocks")
)


In [None]:
# Move processed files from landing_path to loaded_path. The next run then reads
# only new downloads, and incremental mode can find the latest loaded date.
files = mssparkutils.fs.ls(landing_path)

for file in files:
    source_path = file.path
    file_name = file.name
    target_path = f"{loaded_path}{file_name}"

    # Third arg: create the parent folder if missing. Fourth arg: overwrite.
    # A full (non-incremental) re-run re-downloads days already in loaded_path.
    # mv does not overwrite by default, so the move would fail on those files.
    mssparkutils.fs.mv(source_path, target_path, True, True)