In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .config("spark.default.parallelism", "4") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Create Spark session with limited parallelism
spark = SparkSession.builder \
    .appName("SalesTransactionSample") \
    .config("spark.default.parallelism", 4) \
    .config("spark.sql.shuffle.partitions", 4) \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("customer_name", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("transaction_date", TimestampType(), False),
    StructField("payment_mode", StringType(), False),
    StructField("store_location", StringType(), False),
    StructField("discount_applied", BooleanType(), False),
    StructField("discount_amount", DoubleType(), False),
    StructField("loyalty_points_earned", IntegerType(), False),
    StructField("is_return", BooleanType(), False),
    StructField("sales_rep", StringType(), False)
])

# Sample data
data = [
    ("T001", "C001", "Alice Smith", "P001", "Laptop", "Electronics", 1200.0, 1, datetime(2024, 6, 1, 10, 30), "Credit Card", "New York", True, 100.0, 120, False, "SR01"),
    ("T002", "C002", "Bob Lee", "P002", "Jeans", "Clothing", 60.0, 2, datetime(2024, 6, 2, 14, 15), "Cash", "Los Angeles", False, 0.0, 12, False, "SR02"),
    ("T003", "C003", "Carla Gomez", "P003", "Organic Apples", "Grocery", 15.0, 5, datetime(2024, 6, 3, 9, 45), "PayPal", "Chicago", True, 2.0, 3, False, "SR03"),
    ("T004", "C004", "David Kim", "P004", "Bluetooth Speaker", "Electronics", 85.0, 1, datetime(2024, 6, 4, 16, 0), "Gift Card", "Houston", False, 0.0, 8, True, "SR01"),
    ("T005", "C005", "Eva Brown", "P005", "Dress", "Clothing", 120.0, 1, datetime(2024, 6, 5, 11, 20), "Credit Card", "Miami", True, 20.0, 15, False, "SR02"),
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)


Generate additional 1000 rows of random data with the same schema and append it to the
original DataFrame (keep 4 partitions) use native python

In [0]:
import random
from datetime import datetime, timedelta

first_names = ["Alice", "Bob", "Carol", "Dave", "Eve"]
last_names = ["Smith", "Johnson", "Brown", "Lee", "Garcia"]
cities = ["New York", "San Francisco", "Chicago", "Austin", "Seattle"]

def random_name():
    return f"{random.choice(first_names)} {random.choice(last_names)}"

def random_city():
    return random.choice(cities)

def random_date_within_30_days():
    now = datetime.now()
    delta = timedelta(days=random.randint(0, 30), seconds=random.randint(0, 86400))
    return now - delta

categories = ["Electronics", "Clothing", "Grocery", "Toys", "Books"]
products = {
    "Electronics": [("P100", "Smartphone"), ("P101", "Tablet"), ("P102", "Headphones")],
    "Clothing": [("P200", "T-Shirt"), ("P201", "Jacket"), ("P202", "Sneakers")],
    "Grocery": [("P300", "Bananas"), ("P301", "Milk"), ("P302", "Bread")],
    "Toys": [("P400", "Lego Set"), ("P401", "Puzzle"), ("P402", "Action Figure")],
    "Books": [("P500", "Novel"), ("P501", "Comics"), ("P502", "Cookbook")]
}
payment_modes = ["Credit Card", "Cash", "PayPal", "Gift Card", "Debit Card"]
sales_reps = ["SR01", "SR02", "SR03", "SR04"]

new_data = []
for i in range(1000):
    category = random.choice(categories)
    product_id, product_name = random.choice(products[category])
    amount = round(random.uniform(5, 2000), 2)
    quantity = random.randint(1, 5)
    discount_applied = random.choice([True, False])
    discount_amount = round(random.uniform(0, amount * 0.2), 2) if discount_applied else 0.0
    loyalty_points_earned = int(amount // 10)
    is_return = random.choice([True, False])
    transaction_date = random_date_within_30_days()
    row = (
        f"T{1000 + i:03d}",
        f"C{1000 + i:03d}",
        random_name(),
        product_id,
        product_name,
        category,
        amount,
        quantity,
        transaction_date,
        random.choice(payment_modes),
        random_city(),
        discount_applied,
        discount_amount,
        loyalty_points_earned,
        is_return,
        random.choice(sales_reps)
    )
    new_data.append(row)

df_new = spark.createDataFrame(new_data, schema=schema)
df_combined = df.union(df_new).repartition(4)
df_combined.show(5, truncate=False)
print(f"Total rows: {df_combined.count()}")


Generate the code to write to uc and read from it. I am writing and reading to UC instead of creating the parquet and csv files due to missing cloud storage.

In [6]:
# Unity Catalog only: write managed Delta table and validate via SQL (Spark Connect safe)
catalog = "workspace"   # replace with your UC catalog if different
schema = "default"     # replace with your UC schema if different
table = "sales_transactions_uc"

# Ensure target partitions and write
df_to_save = df_combined.repartition(4)

try:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
    df_to_save.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{table}")
    print("Saved Delta table:", f"{catalog}.{schema}.{table}")
except Exception as e:
    print("UC write failed:", e)

try:
    # Use SQL to validate (avoids DataFrame.rdd which Spark Connect doesn't implement)
    spark.sql(f"SELECT COUNT(*) AS cnt FROM {catalog}.{schema}.{table}").show()
    spark.sql(f"DESCRIBE DETAIL {catalog}.{schema}.{table}").show(truncate=False)
    spark.sql(f"SELECT * FROM {catalog}.{schema}.{table} LIMIT 10").show(truncate=False)
except Exception as e:
    print("UC read/validation failed:", e)

NameError: name 'df_combined' is not defined

In [None]:
# Remove null values and add order_year column
from pyspark.sql.functions import year, col

# Drop rows with any nulls in any column
df_clean = df_combined.dropna()

# Add integer order_year extracted from transaction_date
df_with_year = df_clean.withColumn("order_year", year(col("transaction_date")))

try:
    # show basic validation
    print(f"Rows after dropna: {df_with_year.count()}")
    df_with_year.printSchema()
    df_with_year.show(10, truncate=False)
except Exception as e:
    print("Validation show failed:", e)

# Optionally save cleaned table in UC (uncomment to persist).
# Replace catalog/schema/table names if needed.
try:
    cleaned_table = "sales_transactions_uc_cleaned"
    df_with_year.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{cleaned_table}")
    print("Saved cleaned table:", f"{catalog}.{schema}.{cleaned_table}")
except Exception as e:
    print("Saving cleaned UC table failed (you can ignore if you don't want to persist):", e)

Get total sales per year from the table using SQL

In [None]:
# Get total sales per year from UC table (try cleaned table first, fallback to original)
catalog = "workspace"   # adjust if different
schema = "default"     # adjust if different
cleaned_table = "sales_transactions_uc_cleaned"
orig_table = "sales_transactions_uc"

try:
    print(f"Querying cleaned table: {catalog}.{schema}.{cleaned_table}")
    spark.sql(f"SELECT order_year, SUM(amount * quantity) AS total_sales, COUNT(*) AS transactions FROM {catalog}.{schema}.{cleaned_table} GROUP BY order_year ORDER BY order_year").show(truncate=False)
except Exception as e1:
    print('Could not query cleaned table (fallback):', e1)
    try:
        print(f"Querying original table and extracting year: {catalog}.{schema}.{orig_table}")
        spark.sql(f"SELECT YEAR(transaction_date) AS order_year, SUM(amount * quantity) AS total_sales, COUNT(*) AS transactions FROM {catalog}.{schema}.{orig_table} GROUP BY YEAR(transaction_date) ORDER BY YEAR(transaction_date)").show(truncate=False)
    except Exception as e2:
        print('Failed to compute total sales per year from UC tables:', e2)

Get total transaction amount per payment method

In [None]:
# Total transaction amount per payment method (try cleaned table first, fallback to original)
catalog = "workspace"   # adjust if different
schema = "default"     # adjust if different
cleaned_table = "sales_transactions_uc_cleaned"
orig_table = "sales_transactions_uc"

try:
    print(f"Querying cleaned table: {catalog}.{schema}.{cleaned_table}")
    spark.sql(f"SELECT payment_mode, SUM(amount * quantity) AS total_amount, COUNT(*) AS transactions FROM {catalog}.{schema}.{cleaned_table} GROUP BY payment_mode ORDER BY total_amount DESC").show(truncate=False)
except Exception as e1:
    print('Could not query cleaned table (fallback):', e1)
    try:
        print(f"Querying original table: {catalog}.{schema}.{orig_table}")
        spark.sql(f"SELECT payment_mode, SUM(amount * quantity) AS total_amount, COUNT(*) AS transactions FROM {catalog}.{schema}.{orig_table} GROUP BY payment_mode ORDER BY total_amount DESC").show(truncate=False)
    except Exception as e2:
        print('Failed to compute total transaction amount per payment method from UC tables:', e2)

Optimize previous queries for better performance

In [None]:
# Optimized aggregations (serverless-safe) - NO CACHE TABLE
catalog = "workspace"   # adjust if different
schema = "default"     # adjust if different
cleaned_table = "sales_transactions_uc_cleaned"
orig_table = "sales_transactions_uc"

# Tune shuffle partitions for these aggregations (small number for demo)
spark.conf.set('spark.sql.shuffle.partitions', 4)

try:
    # choose source table (cleaned preferred)
    try:
        spark.sql(f"SELECT 1 FROM {catalog}.{schema}.{cleaned_table} LIMIT 1").show()
        source = f"{catalog}.{schema}.{cleaned_table}"
    except Exception:
        source = f"{catalog}.{schema}.{orig_table}"

    # create narrow temp view with only required columns; compute order_year if needed
    if source.endswith(cleaned_table):
        create_sql = f"CREATE OR REPLACE TEMP VIEW tmp_sales AS SELECT order_year, amount, quantity, payment_mode FROM {source}"
    else:
        create_sql = f"CREATE OR REPLACE TEMP VIEW tmp_sales AS SELECT YEAR(transaction_date) AS order_year, amount, quantity, payment_mode FROM {source}"
    spark.sql(create_sql)

    # 1) total sales per year (amount * quantity) - run on temp view (no cache)
    spark.sql("""
    SELECT order_year,
           SUM(amount * quantity) AS total_sales,
           COUNT(*) AS transactions
    FROM tmp_sales
    GROUP BY order_year
    ORDER BY order_year
    """).show(truncate=False)

    # 2) total transaction amount per payment method
    spark.sql("""
    SELECT payment_mode,
           SUM(amount * quantity) AS total_amount,
           COUNT(*) AS transactions
    FROM tmp_sales
    GROUP BY payment_mode
    ORDER BY total_amount DESC
    """).show(truncate=False)

    # cleanup temp view
    spark.sql("DROP VIEW IF EXISTS tmp_sales")
except Exception as e:
    print('Optimized aggregation failed (serverless-safe):', e)

Identify transactions with negative amounts

In [None]:
# Identify transactions with negative total amount (amount * quantity < 0)
catalog = "workspace"   # adjust if different
schema = "default"     # adjust if different
cleaned_table = f"{catalog}.{schema}.sales_transactions_uc_cleaned"
orig_table = f"{catalog}.{schema}.sales_transactions_uc"

sql_template = "SELECT transaction_id, customer_id, transaction_date, amount, quantity, (amount * quantity) AS total_amount, payment_mode, store_location FROM {table} WHERE (amount * quantity) < 0 ORDER BY transaction_date DESC LIMIT 100"

try:
    print(f"Querying cleaned table: {cleaned_table}")
    spark.sql(sql_template.format(table=cleaned_table)).show(truncate=False)
except Exception as e1:
    print('Cleaned table query failed or table missing, falling back to original:', e1)
    try:
        print(f"Querying original table: {orig_table}")
        spark.sql(sql_template.format(table=orig_table)).show(truncate=False)
    except Exception as e2:
        print('Both cleaned and original table queries failed:', e2)

Validate if Credit transactions are lower than Debit transactions per each customer and not higher
than 10% of the total amount

In [None]:
# Validate per-customer: credit < debit AND credit <= 10% of total_amount; show violations
catalog = "workspace"   # adjust if different
schema = "default"     # adjust if different
cleaned_table = f"{catalog}.{schema}.sales_transactions_uc_cleaned"
orig_table = f"{catalog}.{schema}.sales_transactions_uc"

try:
    # prefer cleaned table if present
    try:
        spark.sql(f"SELECT 1 FROM {cleaned_table} LIMIT 1").show()
        source = cleaned_table
    except Exception:
        source = orig_table

    validation_sql = f'''
    WITH agg AS (
      SELECT customer_id,
             SUM(CASE WHEN lower(payment_mode) LIKE '%credit%' THEN amount * quantity ELSE 0 END) AS total_credit,
             SUM(CASE WHEN lower(payment_mode) LIKE '%debit%' THEN amount * quantity ELSE 0 END) AS total_debit,
             SUM(amount * quantity) AS total_amount
      FROM {source}
      GROUP BY customer_id
    )
    SELECT
      customer_id,
      total_credit,
      total_debit,
      total_amount,
      CASE WHEN total_amount = 0 THEN 0 ELSE total_credit / total_amount END AS credit_pct,
      (total_credit < total_debit) AS credit_lt_debit,
      (total_credit <= 0.1 * total_amount) AS credit_le_10pct
    FROM agg
    WHERE NOT (total_credit < total_debit AND total_credit <= 0.1 * total_amount)
    ORDER BY credit_pct DESC
    LIMIT 100
    '''

    spark.sql(validation_sql).show(truncate=False)
except Exception as e:
    print('Customer-level credit/debit validation failed:', e)

In [None]:
# Generate calendar DataFrame for year 2024
from pyspark.sql.functions import sequence, to_date, explode, col, date_format, dayofweek, weekofyear, quarter, expr

start = '2024-01-01'
end = '2024-12-31'

df_calendar = (
    spark.createDataFrame([(start, end)], ['start', 'end'])
    .select(sequence(to_date(col('start')), to_date(col('end')), expr('interval 1 day')).alias('dates'))
    .select(explode(col('dates')).alias('date'))
    .withColumn('year', date_format(col('date'), 'yyyy').cast('int'))
    .withColumn('month', date_format(col('date'), 'MM').cast('int'))
    .withColumn('day', date_format(col('date'), 'dd').cast('int'))
    .withColumn('day_of_week', date_format(col('date'), 'E'))
    .withColumn('is_weekend', dayofweek(col('date')).isin(1, 7))
    .withColumn('week_of_year', weekofyear(col('date')))
    .withColumn('month_name', date_format(col('date'), 'MMMM'))
    .withColumn('quarter', quarter(col('date')))
)

df_calendar.printSchema()
df_calendar.show(10, truncate=False)