In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql import Row
from datetime import datetime

# Explicit schema for the sales orders DataFrame.
# Only Region and Price are declared nullable; every other field is required.
sales_schema = (
    StructType()
    .add("OrderID", StringType(), nullable=False)
    .add("OrderDate", DateType(), nullable=False)
    .add("Region", StringType(), nullable=True)
    .add("Product", StringType(), nullable=False)
    .add("Category", StringType(), nullable=False)
    .add("Price", DoubleType(), nullable=True)
    .add("Quantity", IntegerType(), nullable=False)
)

# Sample Data
# Each tuple follows the field order of sales_schema:
#   (OrderID, OrderDate, Region, Product, Category, Price, Quantity)
# OrderDate values are datetime objects with no time component given, while the
# schema declares the column as DateType.
# Rows are not sorted by OrderID: ORD009-ORD016 are listed before ORD001-ORD008.
sales_data = [
    # ORD009-ORD016: one order per month, January through August 2022.
    ("ORD009", datetime(2022, 1, 10), "West", "iPhone 15", "Electronics", 1099.99, 3),
    ("ORD010", datetime(2022, 2, 18), "East", "MacBook Pro", "Electronics", 1999.99, 1),
    ("ORD011", datetime(2022, 3, 6), "South", "Ergonomic Chair", "Furniture", 499.99, 2),
    ("ORD012", datetime(2022, 4, 20), "North", "4K Monitor", "Electronics", 329.99, 2),
    ("ORD013", datetime(2022, 5, 1), "West", "Echo Dot", "Electronics", 99.99, 1),
    ("ORD014", datetime(2022, 6, 15), "East", "iPad Pro", "Electronics", 899.99, 2),
    ("ORD015", datetime(2022, 7, 28), "South", "Standing Desk", "Furniture", 499.99, 1),
    ("ORD016", datetime(2022, 8, 12), "North", "Echo Dot", "Electronics", 99.99, 2),
    # ORD001-ORD008: all dated 2022-01-10.
    ("ORD001", datetime(2022, 1, 10), "West", "Macbook Air", "Electronics", 1199.49, 1),
    ("ORD002", datetime(2022, 1, 10), "East", "Desk Chair", "Furniture", 149.99, 4),
    ("ORD003", datetime(2022, 1, 10), "South", "Monitor", "Electronics", 249.99, 2),
    ("ORD004", datetime(2022, 1, 10), "North", "Office Desk", "Furniture", 399.99, 1),
    ("ORD005", datetime(2022, 1, 10), "West", "Printer", "Electronics", 149.49, 1),
    ("ORD006", datetime(2022, 1, 10), "East", "iPad", "Electronics", 499.99, 3),
    ("ORD007", datetime(2022, 1, 10), "South", "Bookshelf", "Furniture", 199.99, 2),
    ("ORD008", datetime(2022, 1, 10), "North", "Printer", "Electronics", 149.49, 1),
]

# Materialize the sample rows as a Spark DataFrame using the explicit schema.
# NOTE(review): `spark` is not defined in this notebook; presumably it is the
# session object injected by the notebook runtime - confirm.
sales_df = spark.createDataFrame(sales_data, schema=sales_schema)

In [0]:
# Data Transformation
from pyspark.sql.functions import year, month, col, expr

# Keep every source column and append three derived ones in a single projection:
#   Year / Month  - calendar parts extracted from OrderDate
#   SalesAmount   - line total, Price multiplied by Quantity
sales_transformed = sales_df.select(
    "*",
    year("OrderDate").alias("Year"),
    month("OrderDate").alias("Month"),
    expr("Price * Quantity").alias("SalesAmount"),
)

In [0]:
# Aggregation: total SalesAmount per (Region, Category), ordered by both keys.
#
# The aggregate is aliased where it is computed instead of renaming Spark's
# auto-generated "sum(SalesAmount)" column afterwards. withColumnRenamed is a
# silent no-op when the source column name does not match, so a rename-based
# version can lose the TotalSales column without raising any error.
# `expr` is already imported in the transformation cell above.
sales_summary = (
    sales_transformed.groupBy("Region", "Category")
                     .agg(expr("sum(SalesAmount)").alias("TotalSales"))
                     .orderBy("Region", "Category")
)

# NOTE(review): DataFrame.display() is presumably the Databricks notebook
# renderer; it is not part of open-source PySpark - confirm the target runtime.
sales_summary.display()

In [0]:
# Persist the transformed orders as a Delta table.
# mode("overwrite") is used, so each run replaces the data already in the table.
(
    sales_transformed.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("one_data_uc.sales_schema.sales_git_table")
)