# Import Required Libraries
Import the necessary libraries, including PySpark, and initialize the Spark session.

In [None]:
# Import Required Libraries
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, col, lit

# Create the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("DataAggregation")
    .getOrCreate()
)

# Create Widget for Dataset Name
Create a Databricks widget to input the dataset name, then read the matching CSV file from the raw folder and preview its first rows.

In [None]:
# Create Widget for Dataset Name
dbutils.widgets.text("dataset_name", "", "Enter Dataset Name")

# Get the dataset name from the widget. Surrounding whitespace is dropped so a
# stray space typed into the widget does not end up inside the file path.
dataset_name = dbutils.widgets.get("dataset_name").strip()

# The widget defaults to an empty string, which would otherwise build the path
# "/dbfs/raw/.csv" and surface as an opaque path error from the CSV reader.
if not dataset_name:
    raise ValueError(
        "The 'dataset_name' widget is empty; enter a dataset name and re-run this cell."
    )

# Define the path to the raw folder
# NOTE(review): "/dbfs/..." is the local-file form of a DBFS path; Spark
# readers are normally given "dbfs:/..." or a root-relative path. Confirm this
# prefix matches where the raw files actually live.
raw_folder_path = f"/dbfs/raw/{dataset_name}.csv"

# Read the CSV file into a DataFrame, using the first row as column names and
# letting Spark infer the column types
df = spark.read.csv(raw_folder_path, header=True, inferSchema=True)

# Display the first few rows of the DataFrame
df.show(5)

# Read CSV from Raw Folder
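Read the CSV file from the raw folder using the dataset name provided by the widget. (The read itself happens in the previous cell; the cell below defines the stage folder path, aggregates the loaded data, and writes it to the stage folder.)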

In [None]:
# Define Stage Folder Path
# Build the stage-folder output path from the dataset name provided by the widget.

# Define the path to the stage folder
# NOTE(review): this "/dbfs/..." path is later handed both to os.path and to
# Spark; confirm both resolve it to the same location.
stage_folder_path = f"/dbfs/stage/{dataset_name}.csv"

# Function to aggregate data by year, month, and day
def aggregate_data(df):
    """Group ``df`` by the calendar day of its ``date`` column and sum it.

    Adds ``year``, ``month`` and ``day`` columns derived from ``date``, groups
    by those three columns, and calls ``.sum()`` with no arguments on the
    grouped data.

    Args:
        df: Spark DataFrame that has a ``date`` column.

    Returns:
        The grouped-and-summed Spark DataFrame.
    """
    date_col = col("date")
    # Insertion order fixes both the order the columns are added in and the
    # order of the grouping keys.
    date_parts = {"year": year, "month": month, "day": dayofmonth}
    for part_name, extract in date_parts.items():
        df = df.withColumn(part_name, extract(date_col))
    return df.groupBy(*date_parts).sum()

# Aggregate the data
aggregated_df = aggregate_data(df)

# Add created_ts column with the current date and time. datetime.now() is
# evaluated once here, so every row of this batch carries the same timestamp.
aggregated_df = aggregated_df.withColumn("created_ts", lit(datetime.now()))

# Append the aggregated data to the stage folder.
# mode="append" adds this batch next to whatever is already stored there.
# Reading the existing stage data and then overwriting the same path is unsafe
# because Spark reads lazily: the overwrite can remove the old files before
# they have been read.
aggregated_df.write.csv(stage_folder_path, mode="append", header=True)

# Display the first few rows of the aggregated DataFrame (this batch only)
aggregated_df.show(5)

# Define Aggregation Function
Define a function that aggregates the data by year, month, and day.

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, col, lit
from datetime import datetime

# Function to aggregate data by year, month, and day
def aggregate_data(df):
    """Group ``df`` by the calendar day of its ``date`` column and sum it.

    Adds ``year``, ``month`` and ``day`` columns derived from ``date``, groups
    by those three columns, and calls ``.sum()`` with no arguments on the
    grouped data.

    Args:
        df: Spark DataFrame that has a ``date`` column.

    Returns:
        The grouped-and-summed Spark DataFrame.
    """
    date_col = col("date")
    # Insertion order fixes both the order the columns are added in and the
    # order of the grouping keys.
    date_parts = {"year": year, "month": month, "day": dayofmonth}
    for part_name, extract in date_parts.items():
        df = df.withColumn(part_name, extract(date_col))
    return df.groupBy(*date_parts).sum()

# Aggregate Data and Append to Stage Folder
Aggregate the data, add a created_ts column with the current date and time, and append the result to the stage folder.

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, col, lit
from datetime import datetime

# Function to aggregate data by year, month, and day
def aggregate_data(df):
    """Group ``df`` by the calendar day of its ``date`` column and sum it.

    Adds ``year``, ``month`` and ``day`` columns derived from ``date``, groups
    by those three columns, and calls ``.sum()`` with no arguments on the
    grouped data.

    Args:
        df: Spark DataFrame that has a ``date`` column.

    Returns:
        The grouped-and-summed Spark DataFrame.
    """
    date_col = col("date")
    # Insertion order fixes both the order the columns are added in and the
    # order of the grouping keys.
    date_parts = {"year": year, "month": month, "day": dayofmonth}
    for part_name, extract in date_parts.items():
        df = df.withColumn(part_name, extract(date_col))
    return df.groupBy(*date_parts).sum()

# Aggregate the data
aggregated_df = aggregate_data(df)

# Add created_ts column with the current date and time. datetime.now() is
# evaluated once here, so every row of this batch carries the same timestamp.
aggregated_df = aggregated_df.withColumn("created_ts", lit(datetime.now()))

# Append the aggregated data to the stage folder.
# mode="append" adds this batch next to whatever is already stored there.
# Reading the existing stage data and then overwriting the same path is unsafe
# because Spark reads lazily: the overwrite can remove the old files before
# they have been read.
aggregated_df.write.csv(stage_folder_path, mode="append", header=True)

# Display the first few rows of the aggregated DataFrame (this batch only)
aggregated_df.show(5)