# PySpark Transformations Tutorial (Complete)

This notebook is a complete, self-contained walkthrough of core PySpark transformations.
It is written markdown-first, with illustrative code examples, so it renders without a
PySpark runtime. To run any example, copy the code block into a Spark-capable
environment.

What you will learn:
- How to create a SparkSession
- How to build small DataFrames from in-memory data
- Core transformations: select, withColumn, filter, groupBy/agg
- Joins and broadcast joins
- Window functions (row_number, rank, lag)
- Practical optimization tips: cache/persist, repartition/coalesce, explain plans
- Simple I/O patterns and temp views for SQL


## 0) Environment setup

If you are running this in a plain Python environment, install PySpark first:




```bash
pip install pyspark
```

Then start your SparkSession:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pyspark-transformations-demo")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)
```

## 1) Create sample DataFrames

We'll use tiny, in-memory datasets for transactions and customers.

```python
transactions = [
    (1, '2025-11-01', 101, 120.50, 'SUCCESS'),
    (2, '2025-11-01', 102,  75.00, 'FAILED'),
    (3, '2025-11-02', 101,  15.99, 'SUCCESS'),
    (4, '2025-11-02', 103, 220.00, 'PENDING'),
    (5, '2025-11-03', 104,  50.00, 'SUCCESS'),
    (6, '2025-11-03', 101,  80.00, 'SUCCESS'),
    (7, '2025-11-04', 105,  20.00, 'SUCCESS'),
]
customers = [
    (101, 'Ana',   'MOZ'),
    (102, 'Carlos','MOZ'),
    (103, 'Asha',  'TZA'),
    (104, 'Musa',  'KEN'),
    # 105 intentionally missing to show join behavior
]

from pyspark.sql import functions as F

tx_df = spark.createDataFrame(transactions, ['tx_id','tx_date','customer_id','amount','status'])
cust_df = spark.createDataFrame(customers, ['customer_id','customer_name','country'])

tx_df.show()
cust_df.show()
```


## 2) Select, withColumn, filter

Add derived columns and filter rows.

```python
enriched_df = (
    tx_df
    .select('tx_id','tx_date','customer_id','amount','status')
    .withColumn('tx_dt', F.to_date('tx_date'))
    .withColumn('is_success', F.col('status') == F.lit('SUCCESS'))
    .withColumn('fee', F.round(F.col('amount') * F.lit(0.02), 2))
)

enriched_df.filter('is_success').show()
enriched_df.printSchema()
```
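To check the derived columns without a Spark runtime, the expected values can be worked out in plain Python. This is only a sketch: `expected_fee` is a hypothetical helper, and it uses `Decimal` with half-up rounding as a stand-in for `F.round`, which also rounds half-up. Spark multiplies in double precision, so a value sitting exactly on a rounding boundary could in principle differ.

```python
from decimal import Decimal, ROUND_HALF_UP

# (tx_id, amount, status) copied from the transactions list in section 1.
rows = [
    (1, 120.50, 'SUCCESS'),
    (2, 75.00, 'FAILED'),
    (3, 15.99, 'SUCCESS'),
    (4, 220.00, 'PENDING'),
    (5, 50.00, 'SUCCESS'),
    (6, 80.00, 'SUCCESS'),
    (7, 20.00, 'SUCCESS'),
]


def expected_fee(amount, rate='0.02'):
    """Hypothetical helper: fee at the given rate, rounded half-up to 2 decimals."""
    fee = Decimal(str(amount)) * Decimal(rate)
    return float(fee.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP))


# One dict per transaction with the two derived columns.
expected = [
    {'tx_id': tx_id, 'is_success': status == 'SUCCESS', 'fee': expected_fee(amount)}
    for tx_id, amount, status in rows
]

for row in expected:
    print(row)
```

The rows where `is_success` is true should match what `enriched_df.filter('is_success').show()` prints.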


## 3) GroupBy and Aggregations

Compute total successful amounts per customer and per day.

```python
success_df = enriched_df.filter(F.col('is_success'))

by_customer = success_df.groupBy('customer_id').agg(
    F.count('*').alias('success_count'),
    F.round(F.sum('amount'), 2).alias('total_amount'),
)

by_day = success_df.groupBy('tx_dt').agg(
    F.round(F.sum('amount'), 2).alias('total_amount'),
)

by_customer.show()
by_day.orderBy('tx_dt').show()
```
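As a cross-check on the two aggregations, the same totals can be computed in plain Python from the sample data. This sketch reproduces the arithmetic only, not the way Spark executes a `groupBy`. The names `expected_by_customer` and `expected_by_day` are illustrative.

```python
from collections import defaultdict

# (tx_date, customer_id, amount, status) copied from section 1.
rows = [
    ('2025-11-01', 101, 120.50, 'SUCCESS'),
    ('2025-11-01', 102, 75.00, 'FAILED'),
    ('2025-11-02', 101, 15.99, 'SUCCESS'),
    ('2025-11-02', 103, 220.00, 'PENDING'),
    ('2025-11-03', 104, 50.00, 'SUCCESS'),
    ('2025-11-03', 101, 80.00, 'SUCCESS'),
    ('2025-11-04', 105, 20.00, 'SUCCESS'),
]

# Keep only successful transactions, as success_df does.
success = [r for r in rows if r[3] == 'SUCCESS']

count_by_customer = defaultdict(int)
total_by_customer = defaultdict(float)
total_by_day = defaultdict(float)
for tx_date, customer_id, amount, _status in success:
    count_by_customer[customer_id] += 1
    total_by_customer[customer_id] += amount
    total_by_day[tx_date] += amount

# customer_id -> (success_count, total_amount)
expected_by_customer = {
    cid: (count_by_customer[cid], round(total_by_customer[cid], 2))
    for cid in sorted(count_by_customer)
}
# tx_date -> total_amount
expected_by_day = {day: round(total, 2) for day, total in sorted(total_by_day.items())}

print(expected_by_customer)
print(expected_by_day)
```

Customers 102 and 103 are absent from the per-customer result because they have no successful transactions.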


## 4) Joins and Broadcast Joins

Join transactions with customers. When one side is small enough to fit in memory on every executor, broadcasting it lets Spark skip shuffling the larger side.

```python
# Regular left join
joined = tx_df.join(cust_df, on='customer_id', how='left')
joined.select('tx_id','customer_id','customer_name','country','amount','status').show()

# Broadcast join (hint): in a left join only the right side can be broadcast,
# so the hint goes on the small customers table
bjoined = tx_df.join(F.broadcast(cust_df), 'customer_id', 'left')
bjoined.explain()
bjoined.show()
```
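The idea behind a broadcast hash join can be sketched in plain Python: build a hash map from the small table, then probe it once per row of the large table. This is a conceptual illustration of left-join semantics, not Spark's implementation. The `lookup` dictionary stands in for the broadcast copy of the customers table.

```python
# (tx_id, customer_id, amount) and (customer_id, customer_name, country)
# copied from section 1.
transactions = [
    (1, 101, 120.50),
    (2, 102, 75.00),
    (3, 101, 15.99),
    (4, 103, 220.00),
    (5, 104, 50.00),
    (6, 101, 80.00),
    (7, 105, 20.00),
]
customers = [
    (101, 'Ana', 'MOZ'),
    (102, 'Carlos', 'MOZ'),
    (103, 'Asha', 'TZA'),
    (104, 'Musa', 'KEN'),
]

# Build side: a hash map over the small table (the part that would be broadcast).
lookup = {cid: (name, country) for cid, name, country in customers}

# Probe side: every transaction is kept; unmatched keys get None (NULL in Spark).
joined = []
for tx_id, cid, amount in transactions:
    name, country = lookup.get(cid, (None, None))
    joined.append((tx_id, cid, name, country, amount))

for row in joined:
    print(row)
```

Transaction 7 belongs to customer 105, who is missing from the customers table, so its `customer_name` and `country` come back empty rather than the row being dropped.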


## 5) Window Functions

Compute a row number and the previous amount (lag) per customer, ordered by date, plus a daily rank by amount.

```python
from pyspark.sql.window import Window

# tx_id breaks ties so row_number and lag are deterministic for same-day transactions
w_by_customer_date = Window.partitionBy('customer_id').orderBy('tx_dt', 'tx_id')
w_daily = Window.partitionBy('tx_dt').orderBy(F.col('amount').desc())

win_df = (
    enriched_df
    .withColumn('rn_per_customer', F.row_number().over(w_by_customer_date))
    .withColumn('rank_daily_amount', F.rank().over(w_daily))
    .withColumn('prev_amount', F.lag('amount', 1).over(w_by_customer_date))
)

win_df.orderBy('customer_id','tx_dt','rn_per_customer').show()
```
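What the three window functions compute can be reproduced in plain Python to predict the output. This is a sketch of the semantics only: rows are grouped into partitions, ordered within each partition, and each row receives a value that depends on its position. The `result` dictionary is an illustrative structure keyed by `tx_id`.

```python
from collections import defaultdict

# (tx_id, tx_date, customer_id, amount) copied from section 1.
rows = [
    (1, '2025-11-01', 101, 120.50),
    (2, '2025-11-01', 102, 75.00),
    (3, '2025-11-02', 101, 15.99),
    (4, '2025-11-02', 103, 220.00),
    (5, '2025-11-03', 104, 50.00),
    (6, '2025-11-03', 101, 80.00),
    (7, '2025-11-04', 105, 20.00),
]

# Partition the rows the same way the two window specs do.
by_customer = defaultdict(list)
by_day = defaultdict(list)
for row in rows:
    by_customer[row[2]].append(row)
    by_day[row[1]].append(row)

result = {}  # tx_id -> computed window columns

for cust_rows in by_customer.values():
    # ISO-formatted dates sort correctly as strings.
    ordered = sorted(cust_rows, key=lambda r: r[1])
    for i, r in enumerate(ordered):
        result[r[0]] = {
            'rn_per_customer': i + 1,  # row_number(): position in the partition
            'prev_amount': ordered[i - 1][3] if i > 0 else None,  # lag(amount, 1)
        }

for day_rows in by_day.values():
    for r in day_rows:
        # rank(): 1 + number of rows in the partition with a strictly larger amount.
        result[r[0]]['rank_daily_amount'] = 1 + sum(o[3] > r[3] for o in day_rows)

for tx_id in sorted(result):
    print(tx_id, result[tx_id])
```

The first transaction of every customer has no predecessor, so its `prev_amount` is empty, just as `lag` returns NULL there.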


## 6) Partitioning and Caching

Control partition counts with `repartition` and `coalesce`, and cache DataFrames that are reused.

```python
# Repartition by customer for potential join/group-by locality
rp = success_df.repartition('customer_id')
print('Partitions (repartitioned):', rp.rdd.getNumPartitions())

# Coalesce to reduce partitions without full shuffle
co = rp.coalesce(2)
print('Partitions (coalesced):', co.rdd.getNumPartitions())

# Cache if reused multiple times
co_cached = co.cache()
co_cached.count()  # materialize
co_cached.groupBy('customer_id').agg(F.sum('amount')).show()
```
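The difference between the two operations can be illustrated with a toy model in plain Python. The assumptions here are loud ones: `hash_partition` and `coalesce` are hypothetical helpers, the modulo is a stand-in for Spark's own hash function, and the way partitions are merged is simplified. Only the general shape carries over: repartitioning by key routes every row through a hash of its key, while coalescing merges whole existing partitions without re-hashing rows.

```python
# customer_id of each SUCCESS row, in tx_id order (from section 1).
customer_ids = [101, 101, 104, 101, 105]


def hash_partition(keys, num_partitions):
    """Toy repartition: place each key by key % num_partitions (not Spark's hash)."""
    partitions = [[] for _ in range(num_partitions)]
    for key in keys:
        partitions[key % num_partitions].append(key)
    return partitions


def coalesce(partitions, target):
    """Toy coalesce: merge whole partitions together; rows are never re-hashed."""
    merged = [[] for _ in range(target)]
    for i, part in enumerate(partitions):
        merged[i % target].extend(part)
    return merged


repartitioned = hash_partition(customer_ids, 4)
coalesced = coalesce(repartitioned, 2)

print('repartitioned:', repartitioned)
print('coalesced:    ', coalesced)
```

In both results, all rows for a given customer sit in a single partition, which is what makes a later per-customer aggregation cheap. The toy output also shows empty and uneven partitions, a reminder that hashing a handful of distinct keys does not balance the data.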


## 7) I/O and Explain Plans

Write a small result to a local temp folder, display the query plan, and register a temp view for SQL.

```python
out_path = '/tmp/pyspark_transformations_demo'
# Use overwrite mode to keep runs idempotent in local tests
by_customer.write.mode('overwrite').parquet(out_path)
print('Wrote output to:', out_path)

# Explain a representative query
by_customer.explain()

# Register a temp view for SQL examples
win_df.createOrReplaceTempView('tx_enriched')
spark.sql("SELECT customer_id, COUNT(*) AS cnt FROM tx_enriched GROUP BY customer_id").show()
```
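When no Spark runtime is at hand, the SQL query itself can be tried against an in-memory SQLite table from the Python standard library. SQLite is only a stand-in here, and the table holds just the two columns the query touches. An `ORDER BY` is added so the output order is stable.

```python
import sqlite3

# (tx_id, customer_id) for all seven transactions in section 1.
rows = [(1, 101), (2, 102), (3, 101), (4, 103), (5, 104), (6, 101), (7, 105)]

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE tx_enriched (tx_id INTEGER, customer_id INTEGER)')
conn.executemany('INSERT INTO tx_enriched VALUES (?, ?)', rows)

# Same aggregation as the Spark SQL example, with a deterministic ordering.
counts = conn.execute(
    'SELECT customer_id, COUNT(*) AS cnt '
    'FROM tx_enriched GROUP BY customer_id ORDER BY customer_id'
).fetchall()
conn.close()

print(counts)
```

Because the temp view is built from `win_df`, which keeps every status, the counts cover all transactions and not only the successful ones.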


## 8) Best Practices Recap

- Prefer DataFrames over RDDs for optimization via Catalyst.
- Use predicate pushdown by filtering early when reading from columnar formats.
- Broadcast small dimension tables in skewed or small-large joins.
- Manage partitions: align with join/aggregation keys, avoid too many tiny partitions.
- Cache only when reusing data; unpersist if no longer needed.
- Validate plans with `explain()` and monitor shuffles and stages in the UI.

See also in this folder:
- spark-architecture.md
- optimization-techniques.md


## 9) Cleanup

If running interactively, stop your session when done:

```python
spark.stop()
print('Spark session stopped.')
```