In [ ]:
# Load the raw sales table, print its schema, and show the first 10 rows.
raw_reader = spark.read
df_raw = raw_reader.table("sales_raw")
df_raw.printSchema()
raw_preview = df_raw.limit(10)
display(raw_preview)

StatementMeta(, e70f30ff-bc9a-42eb-b1bd-f383b079d70a, 13, Finished, Available, Finished)

root
 |-- Description: string (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- Stockcodenew: string (nullable = true)
 |-- new_quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- inv_number: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 119e4da9-35d5-47a7-a18c-1b2f96c4c560)

**Clean and enrich**

In [ ]:
from pyspark.sql.functions import col, to_timestamp, when

# Build df_clean from df_raw: keep positive-quantity rows, deduplicate invoice
# lines, make sure InvoiceDate is a timestamp, add TotalPrice, and shorten
# 'United Kingdom' to 'UK'.

# Only parse InvoiceDate when it is a string. The 'M/d/yyyy H:mm' pattern
# describes text input; a column that is already a timestamp is left as-is.
# NOTE(review): to_timestamp is understood to pass timestamp input through
# unchanged, so this should match the previous behaviour -- confirm on the
# cluster's Spark version.
invoice_date_is_string = dict(df_raw.dtypes)['InvoiceDate'] == 'string'
invoice_date = (to_timestamp(col('InvoiceDate'), 'M/d/yyyy H:mm')
                if invoice_date_is_string
                else col('InvoiceDate'))

df_clean = (df_raw
            # Filter BEFORE deduplicating: dropDuplicates keeps an arbitrary row per
            # key, so deduplicating first could keep a zero/negative-quantity row and
            # then discard it, silently losing the valid sale for that invoice line.
            # Rows with a null new_quantity are dropped by this comparison as well.
            .filter(col('new_quantity') > 0)
            # One row per (invoice, stock code). Which duplicate survives is not
            # deterministic; add an explicit ordering if that ever matters.
            .dropDuplicates(['inv_number', 'Stockcodenew'])
            .withColumn('InvoiceDate', invoice_date)
            # Line total = quantity * unit price (double arithmetic, as in the source data).
            .withColumn('TotalPrice', col('new_quantity') * col('UnitPrice'))
            # Normalize the country label; every other value is kept unchanged.
            .withColumn('Country', when(col('Country') == 'United Kingdom', 'UK').otherwise(col('Country')))
           )

StatementMeta(, e70f30ff-bc9a-42eb-b1bd-f383b079d70a, 14, Finished, Available, Finished)

**Create customers_dim (unique customers)**

In [ ]:
from pyspark.sql.functions import col, max as max_, struct

# Build the customer dimension with exactly one row per customer_id.
#
# A plain distinct() over (CustomerID, Country) returns several rows for a
# customer that appears under more than one country and also keeps a row with
# a null key; both break joins from sales_fact on customer_id. Here rows with
# a null CustomerID are excluded and each customer gets the Country of their
# most recent invoice (max over a struct compares InvoiceDate first, then
# Country, so ties are resolved deterministically).
customers_dim = (df_clean
                 .filter(col('CustomerID').isNotNull())
                 .groupBy('CustomerID')
                 .agg(max_(struct('InvoiceDate', 'Country')).alias('latest_invoice'))
                 .select(col('CustomerID').alias('customer_id'),
                         col('latest_invoice.Country').alias('Country')))

# Replace any existing customers_dim table with the fresh result.
customers_dim.write.mode('overwrite').saveAsTable('customers_dim')

StatementMeta(, e70f30ff-bc9a-42eb-b1bd-f383b079d70a, 15, Finished, Available, Finished)

**Create sales_fact (transactions)**

In [ ]:
# Build the transaction fact table: rename CustomerID to customer_id, then keep
# only the fact columns in a fixed order.
fact_columns = [
    'inv_number',
    'customer_id',
    'InvoiceDate',
    'Stockcodenew',
    'Description',
    'new_quantity',
    'UnitPrice',
    'TotalPrice',
]
sales_fact = (df_clean
              .withColumnRenamed('CustomerID', 'customer_id')
              .select(*fact_columns))

# Replace any existing sales_fact table with the fresh result.
fact_writer = sales_fact.write.mode('overwrite')
fact_writer.saveAsTable('sales_fact')

StatementMeta(, e70f30ff-bc9a-42eb-b1bd-f383b079d70a, 17, Finished, Available, Finished)

**Preview results**

In [ ]:
# Show the first 10 rows of each table written above, fact table first.
for table_name in ('sales_fact', 'customers_dim'):
    table_preview = spark.read.table(table_name).limit(10)
    display(table_preview)

StatementMeta(, e70f30ff-bc9a-42eb-b1bd-f383b079d70a, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6347dfb0-ec74-462d-850e-2fbc8728af47)

SynapseWidget(Synapse.DataFrame, 85438183-a320-47c1-bafa-9c0c462cdeb0)