# Azure Synapse Analytics & Spark - Complete Portfolio

**Consolidated Notebook for Azure DP-203 Certification**

This notebook consolidates four Azure Synapse and Spark notebooks:
1. Serverless SQL Pool Analysis
2. Spark Data Analysis with Schema
3. Spark Data Transformations & Partitioning
4. Delta Lake Implementation with Time Travel

---

## Section 1: Using a Serverless SQL Pool to Analyze Data

```python
# Load product data from Azure Data Lake Storage Gen2
df = spark.read.load(
    'abfss://files@datalaketx24ncr.dfs.core.windows.net/product_data/products.csv',
    format='csv',
    header=True,
)
display(df.limit(10))
```

```python
# Count the products in each category
df_counts = df.groupBy('Category').count()
display(df_counts)
```

```python
# The same aggregation with pandas, sorted by product count (largest first)
import pandas as pd

df_pd = pd.read_csv('abfss://files@datalaketx24ncr.dfs.core.windows.net/product_data/products.csv')
(
    df_pd
    .groupby('Category')
    .agg(productCount=('ProductID', 'count'))
    .reset_index()
    .sort_values(by='productCount', ascending=False)
)
```
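Outside a notebook, the two aggregation cells above reduce to counting rows per category and sorting the counts. A minimal plain-Python sketch, assuming each row is a dict with a `Category` key; the `count_by_category` helper and the sample rows are hypothetical, not part of the lab:

```python
from collections import Counter


def count_by_category(rows):
    """Return (category, count) pairs, largest count first.

    Mirrors df.groupBy('Category').count() followed by a descending sort on
    the count; ties are broken alphabetically so the output order is stable.
    """
    counts = Counter(row["Category"] for row in rows)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))


# Hypothetical sample rows
rows = [
    {"ProductID": 771, "Category": "Mountain Bikes"},
    {"ProductID": 772, "Category": "Mountain Bikes"},
    {"ProductID": 680, "Category": "Road Frames"},
]
print(count_by_category(rows))
```

One subtlety: pandas' `('ProductID', 'count')` skips rows whose `ProductID` is null, whereas Spark's `groupBy().count()` counts every row; the sketch follows the Spark behavior.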

---
## Section 2: Spark Schema Analysis & Data Quality

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DateType, FloatType
)
from pyspark.sql.functions import year, desc, expr

# Define an explicit schema instead of relying on inferSchema
OrderSchema = StructType([
    StructField('SalesOrderNumber', StringType()),
    StructField('SalesOrderLineNumber', IntegerType()),
    StructField('OrderDate', DateType()),
    StructField('CustomerName', StringType()),
    StructField('Email', StringType()),
    StructField('Item', StringType()),
    StructField('Quantity', IntegerType()),
    StructField('UnitPrice', FloatType()),
    StructField('Tax', FloatType())
])

# Load all order files with the schema; header defaults to False,
# so CSV columns are mapped to the schema fields by position
df = spark.read.load(
    'abfss://files@datalakey8hs4l2.dfs.core.windows.net/sales/orders/*.csv',
    format='csv',
    schema=OrderSchema,
)
display(df.limit(5))
```
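To make the effect of an explicit schema concrete, here is a plain-Python sketch of parsing one header-less CSV record positionally against a subset of `OrderSchema`. The `SCHEMA` list, the `apply_schema` helper, and the sample values are hypothetical; nulling out values that fail to parse is only a rough analogy to Spark's default PERMISSIVE mode, not its exact implementation:

```python
from datetime import date

# Hypothetical subset of OrderSchema: (column name, parser) in file order.
SCHEMA = [
    ("SalesOrderNumber", str),
    ("SalesOrderLineNumber", int),
    ("OrderDate", date.fromisoformat),
    ("Quantity", int),
    ("UnitPrice", float),
]


def apply_schema(raw_fields, schema=SCHEMA):
    """Parse one header-less CSV record by position into typed values."""
    record = {}
    for i, (name, parse) in enumerate(schema):
        raw = raw_fields[i] if i < len(raw_fields) else None
        if raw is None or raw == "":
            record[name] = None  # missing or empty field -> null
            continue
        try:
            record[name] = parse(raw)
        except ValueError:
            record[name] = None  # malformed value -> null
    return record


print(apply_schema(["SO43701", "1", "2019-07-01", "1", "3399.99"]))
print(apply_schema(["SO43702", "1", "not-a-date", "x"]))
```

The second record shows why an explicit schema matters: bad values surface as typed nulls you can count and filter, rather than silently turning the whole column into strings as type inference might.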

```python
# Customer analysis: total rows vs. distinct (CustomerName, Email) pairs
customers = df.select('CustomerName', 'Email')
print(f'Total records: {customers.count()}')
print(f'Distinct customers: {customers.distinct().count()}')
display(customers.distinct().limit(5))
```
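`distinct()` on a two-column selection deduplicates on the pair of values, so the same name with two different emails counts as two customers. A plain-Python sketch of that rule; the `customer_counts` helper and the sample rows are hypothetical:

```python
def customer_counts(rows):
    """Return (total_rows, distinct_pairs) over (CustomerName, Email).

    Like select('CustomerName', 'Email').distinct(), two rows are the same
    customer only when both columns match.
    """
    pairs = [(r["CustomerName"], r["Email"]) for r in rows]
    return len(pairs), len(set(pairs))


# Hypothetical sample rows: one repeat, one same-name/different-email row
rows = [
    {"CustomerName": "Jane Doe", "Email": "jane@example.com"},
    {"CustomerName": "Jane Doe", "Email": "jane@example.com"},
    {"CustomerName": "Jane Doe", "Email": "jane.doe@example.com"},
]
print(customer_counts(rows))
```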

```python
# Gross revenue (sales plus tax) by year, newest year first
(
    df
    .select(year('OrderDate').alias('year'), 'Quantity', 'UnitPrice', 'Tax')
    .groupBy('year')
    .agg(expr('round(sum(Quantity * UnitPrice) + sum(Tax), 2)').alias('GrossRevenue'))
    .orderBy(desc('year'))
    .show()
)
```
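The aggregation computes, per year, `round(sum(Quantity * UnitPrice) + sum(Tax), 2)`. A plain-Python sketch of the same arithmetic, assuming each order is a dict with a `datetime.date` in `OrderDate`; the helper name and the sample orders are hypothetical:

```python
from collections import defaultdict
from datetime import date


def gross_revenue_by_year(orders):
    """Per year: round(sum(Quantity * UnitPrice) + sum(Tax), 2), newest year first."""
    sales = defaultdict(float)
    tax = defaultdict(float)
    for o in orders:
        y = o["OrderDate"].year
        sales[y] += o["Quantity"] * o["UnitPrice"]
        tax[y] += o["Tax"]
    return [(y, round(sales[y] + tax[y], 2)) for y in sorted(sales, reverse=True)]


# Hypothetical sample orders
orders = [
    {"OrderDate": date(2019, 7, 1), "Quantity": 1, "UnitPrice": 100.0, "Tax": 8.0},
    {"OrderDate": date(2019, 8, 1), "Quantity": 2, "UnitPrice": 50.0, "Tax": 8.0},
    {"OrderDate": date(2020, 1, 5), "Quantity": 1, "UnitPrice": 10.5, "Tax": 0.84},
]
print(gross_revenue_by_year(orders))
```

Note that the `%%sql` version of this query does not apply `ROUND`, so its `GrossRevenue` values can show extra floating-point digits compared with this cell.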

```python
# Register a temporary view so the DataFrame can be queried with Spark SQL
df.createOrReplaceTempView('salesorders')
```

```sql
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
       SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;
```

---
## Section 3: Spark Data Transformations & Partitioning

```python
import pyspark.sql.functions as f

# Load order data, inferring column types from the files
order_details = spark.read.csv('/data/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
```

```python
# Transform: split CustomerName on spaces into FirstName and LastName
name_parts = f.split(f.col('CustomerName'), ' ')
transformed_df = (
    order_details
    .withColumn('FirstName', name_parts.getItem(0))
    .withColumn('LastName', name_parts.getItem(1))
    .drop('CustomerName')
)
display(transformed_df.limit(5))
```
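`getItem(0)` and `getItem(1)` take only the first two space-separated tokens, which matters for single-word or three-part names. A plain-Python sketch of that behavior, assuming Spark's non-ANSI default where an out-of-range index yields null; the `split_name` helper and the sample names are hypothetical:

```python
def split_name(customer_name):
    """Mimic split(CustomerName, ' ').getItem(0) and .getItem(1).

    A missing second token becomes None (null); any tokens after the
    second are silently dropped.
    """
    parts = customer_name.split(" ")
    first = parts[0] if len(parts) > 0 else None
    last = parts[1] if len(parts) > 1 else None
    return first, last


for name in ["Jane Doe", "Cher", "Mary Ann Smith"]:
    print(name, "->", split_name(name))
```

If the data may contain middle names, `f.element_at` or a regex-based split that keeps the last token could be more appropriate; the lab's two-token approach assumes "First Last" names.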

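```python
# Save as Parquet; coalesce(1) produces a single part file inside the
# orders.parquet folder (handy for small data, but it removes write parallelism)
transformed_df.coalesce(1).write.mode('overwrite').parquet('/transformed_data/orders.parquet')
```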

```python
# Add Year and Month columns, then partition the output folders by them
dated_df = (
    transformed_df
    .withColumn('Year', f.year(f.col('OrderDate')))
    .withColumn('Month', f.month(f.col('OrderDate')))
)

dated_df.write.partitionBy('Year', 'Month').mode('overwrite').parquet(
    'abfss://files@datalakebtaj9gv.dfs.core.windows.net/partitioned_data/'
)
```
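`partitionBy('Year', 'Month')` writes each row into a `Year=<value>/Month=<value>` folder, and a query that filters on `Year` only needs to read the matching folders (partition pruning). A plain-Python sketch of both ideas; `partition_path`, `prune`, and the sample dates are hypothetical helpers for illustration:

```python
from datetime import date


def partition_path(base, order_date):
    """Folder a row lands in under partitionBy('Year', 'Month').

    Each partition column becomes a name=value folder; integer month
    values are not zero-padded (Month=3, not Month=03).
    """
    return f"{base.rstrip('/')}/Year={order_date.year}/Month={order_date.month}"


def prune(paths, year):
    """Keep only the partition folders a filter such as Year = 2020 must scan."""
    return [p for p in paths if f"/Year={year}/" in p + "/"]


# Hypothetical order dates; two fall in the same partition
dates = [date(2019, 12, 30), date(2020, 3, 15), date(2020, 3, 20)]
paths = sorted({partition_path("/partitioned_data/", d) for d in dates})
print(paths)
print(prune(paths, 2020))
```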

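```python
# Create an external (unmanaged) table: because a path is supplied, the data lives
# in /sales_orders_table and DROP TABLE removes only the catalog entry, not the files
spark.sql('CREATE DATABASE IF NOT EXISTS sales')
order_details.coalesce(1).write.saveAsTable(
    'sales.sales_orders',
    format='parquet',
    mode='overwrite',
    path='/sales_orders_table',
)
```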

```sql
%%sql
SELECT * FROM sales.sales_orders LIMIT 5;
```

---
## Section 4: Delta Lake Implementation

```python
# Load product data
df = spark.read.load(
    'abfss://files@datalakemxp43de.dfs.core.windows.net/products/products.csv',
    format='csv',
    header=True,
)
display(df.limit(5))
```

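```python
# Write the products as a Delta table; this first commit becomes version 0.
# The default save mode is 'errorifexists', so rerunning this cell fails once the path exists.
delta_table_path = '/delta/products-delta'
df.write.format('delta').save(delta_table_path)
print('Delta table created!')
```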

```python
from delta.tables import DeltaTable

# Open the table as a DeltaTable and reduce product 771's list price by 10%
deltaTable = DeltaTable.forPath(spark, delta_table_path)
deltaTable.update(
    condition='ProductID == 771',
    set={'ListPrice': 'ListPrice * 0.9'}
)
display(deltaTable.toDF().limit(5))
```

```python
# Time travel: read version 0, i.e. the data as first written, before the update
previous_df = spark.read.format('delta').option('versionAsOf', 0).load(delta_table_path)
display(previous_df.limit(5))
```
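The update and time-travel cells rely on Delta Lake keeping every committed version readable. The toy class below is a hypothetical in-memory stand-in that stores a full snapshot per commit, purely to show the version/history semantics; real Delta Lake instead records add/remove file actions in its `_delta_log` and typically rewrites only the data files containing matching rows:

```python
import copy


class ToyVersionedTable:
    """Hypothetical stand-in for a Delta table's version log (not the Delta API)."""

    def __init__(self, rows):
        self._snapshots = [copy.deepcopy(rows)]  # version 0 = initial write
        self._history = [{"version": 0, "operation": "WRITE"}]

    def update(self, condition, set_fn):
        """Apply set_fn to rows matching condition, committing a new version."""
        current = copy.deepcopy(self._snapshots[-1])
        for row in current:
            if condition(row):
                set_fn(row)
        self._snapshots.append(current)
        self._history.append({"version": len(self._snapshots) - 1, "operation": "UPDATE"})

    def as_of(self, version):
        """Analogous to .option('versionAsOf', version)."""
        return copy.deepcopy(self._snapshots[version])

    def latest(self):
        return copy.deepcopy(self._snapshots[-1])

    def history(self, limit=None):
        """Newest commit first, as DeltaTable.history(limit) returns them."""
        entries = list(reversed(self._history))
        return entries if limit is None else entries[:limit]


# Hypothetical prices; ProductID 771 mirrors the update cell
table = ToyVersionedTable([{"ProductID": 771, "ListPrice": 100.0},
                           {"ProductID": 772, "ListPrice": 50.0}])
table.update(lambda r: r["ProductID"] == 771,
             lambda r: r.update(ListPrice=r["ListPrice"] * 0.9))
print(table.as_of(0)[0], table.latest()[0])
print(table.history())
```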

```python
# View the transaction history (up to 10 most recent commits), one record per block
deltaTable.history(10).show(20, truncate=False, vertical=True)
```

```python
# Register the existing Delta files as an external catalog table
spark.sql('CREATE DATABASE IF NOT EXISTS AdventureWorks')
spark.sql(f"CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{delta_table_path}'")
```

```sql
%%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal LIMIT 5;
```

---
## Summary

This consolidated notebook demonstrates:

1. **Serverless SQL Pool**: Loading and analyzing data lake files
2. **Schema Enforcement**: Defining explicit schemas instead of inferring column types
3. **Data Transformations**: Column derivation, aggregation, and Parquet output
4. **Partitioning**: Year/Month folders that let queries filtering on those columns skip irrelevant files
5. **Delta Lake**: ACID updates, transaction history, and time travel
6. **SQL Integration**: Spark SQL over temporary views and external catalog tables

**Technologies**: Azure Synapse | Apache Spark | Delta Lake | PySpark | SQL