# PRACTISE

# PySpark Practice Notebook for Data Engineering & ETL on Databricks

This notebook is designed for hands-on practice with PySpark, focusing on ETL pipelines and data engineering tasks commonly performed in Databricks environments.

## Sections:

1. Environment Setup & Installation
2. SparkSession Initialization
3. Basic DataFrame Operations
4. Data Ingestion (CSV, Parquet, JSON)
5. Transformations & Actions
6. ETL Pipeline Example
7. DataFrame Joins & Aggregations
8. Writing Data (Parquet, Delta, etc.)
9. Useful Tips & Resources


## 1. Environment Setup & Installation

Make sure PySpark is installed. If it is not, uncomment the `pip install` line in the cell below and run it.


In [8]:
# # Install PySpark if not already installed
# !pip install pyspark

## 2. SparkSession Initialization

Create a SparkSession, which is the entry point to using PySpark.


In [2]:
# from pyspark.sql import SparkSession

# # Initialize SparkSession
# spark = SparkSession.builder.appName("PySparkPractice").getOrCreate()

# # Check Spark version
# print("Spark Version:", spark.version)

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkPractice").getOrCreate()
print("Spark Version:", spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/11 20:34:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/11 20:35:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark Version: 4.0.0


## 3. Basic DataFrame Operations

Create a DataFrame, view schema, and perform simple operations.


In [4]:
# # Create a sample DataFrame
# data = [(1, "Alice", 29), (2, "Bob", 31), (3, "Cathy", 25)]
# columns = ["id", "name", "age"]
# df = spark.createDataFrame(data, columns)

# # Show DataFrame
# df.show()

# # Print schema
# df.printSchema()

# # Select columns
# df.select("name", "age").show()

In [5]:
data = [(1, "Alice", 29), (2, "Bob", 31), (3, "Cathy", 25)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
df.show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
|  3|Cathy| 25|
+---+-----+---+



## 4. Data Ingestion (CSV, Parquet, JSON)

Read data from different file formats into DataFrames.


In [6]:
# # Example: Reading CSV, Parquet, and JSON files
# csv_path = '../data/sample.csv'  # Update with your file path
# parquet_path = '../data/sample.parquet'
# json_path = '../data/sample.json'

# # Read CSV
# df_csv = spark.read.option('header', True).csv(csv_path)
# df_csv.show()

# # Read Parquet
# df_parquet = spark.read.parquet(parquet_path)
# df_parquet.show()

# # Read JSON
# df_json = spark.read.json(json_path)
# df_json.show()

In [None]:
csv_path = "../data/sample.csv"
parquet_path = "../data/sample.parquet"
json_path = "../data/sample.json"

df_csv = spark.read.option("header", True).csv(csv_path)
df_parquet = spark.read.parquet(parquet_path)
df_json = spark.read.json(json_path)

# Display the JSON DataFrame; inferred JSON columns are sorted alphabetically
df_json.show()

+---+---+-----+
|age| id| name|
+---+---+-----+
| 29|  1|Alice|
| 31|  2|  Bob|
| 25|  3|Cathy|
+---+---+-----+



## 5. Transformations & Actions

Transformations such as `filter`, `select`, and `withColumn` are lazy: they only describe a new DataFrame. Actions such as `show`, `count`, and `collect` trigger execution and return results.


In [None]:
# Transformations: filter, select, withColumn
df_filtered = df.filter(df.age > 25)
df_selected = df.select('name', 'age')
df_newcol = df.withColumn('age_plus_10', df.age + 10)

# Actions: show, count, collect
df_filtered.show()
print('Count:', df.count())
print('Rows:', df_selected.collect())

In [61]:
df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
|  3|Cathy| 25|
+---+-----+---+



In [64]:
df.filter(df.age>25).show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
+---+-----+---+



In [65]:
df.select("name", "age").show()

+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|  Bob| 31|
|Cathy| 25|
+-----+---+



In [67]:
df_selected = df.select("name", "age")
df_selected.collect()

[Row(name='Alice', age=29), Row(name='Bob', age=31), Row(name='Cathy', age=25)]

In [69]:
df.withColumn('age_plus_10', df.age + 10).show()

+---+-----+---+-----------+
| id| name|age|age_plus_10|
+---+-----+---+-----------+
|  1|Alice| 29|         39|
|  2|  Bob| 31|         41|
|  3|Cathy| 25|         35|
+---+-----+---+-----------+



## 6. ETL Pipeline Example

A simple ETL pipeline: Extract data, Transform it, and Load it to storage.


In [None]:
# Simple ETL Pipeline Example
## Extract
input_path = '../data/input.csv'  # Update with your file path
df_etl = spark.read.option('header', True).csv(input_path)

## Transform
df_etl_clean = df_etl.dropna().withColumnRenamed('old_column', 'new_column')

## Load
output_path = '../data/output.parquet'
df_etl_clean.write.mode('overwrite').parquet(output_path)
print('ETL pipeline completed!')

In [71]:
input_path = "../data/input.csv"
df_etl = spark.read.option("header", True).csv(input_path)

df_etl_clean = df_etl.dropna().withColumnRenamed("old_column", "new_column")
df_etl_clean.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
|  3|Cathy| 25|
+---+-----+---+



In [72]:
output_path = "../data/output.parquet"
df_etl_clean.write.mode('overwrite').parquet(output_path)
print('ETL pipeline completed')

ETL pipeline completed


## 7. DataFrame Joins & Aggregations

Practice joining DataFrames and performing aggregations.


In [None]:
# DataFrame Joins Example
df1 = spark.createDataFrame([(1, 'A'), (2, 'B')], ['id', 'val1'])
df2 = spark.createDataFrame([(1, 'X'), (2, 'Y')], ['id', 'val2'])

df_joined = df1.join(df2, on='id', how='inner')
df_joined.show()

# Aggregation Example
from pyspark.sql import functions as F
df_agg = df.groupBy('age').agg(F.count('id').alias('count'))
df_agg.show()

In [76]:
df1 = spark.createDataFrame([(1,'A'), (2, 'B')], ['id', 'val1'])
df2 = spark.createDataFrame([(1,'X'), (2, 'Y')], ['id', 'val2'])

df1.show()
df2.show()

+---+----+
| id|val1|
+---+----+
|  1|   A|
|  2|   B|
+---+----+

+---+----+
| id|val2|
+---+----+
|  1|   X|
|  2|   Y|
+---+----+

In [None]:
from pyspark.sql import functions as F

(
    df
    .groupBy('age')
    .agg(F.count('id').alias('count'))
).show()

+---+-----+
|age|count|
+---+-----+
| 29|    1|
| 31|    1|
| 25|    1|
+---+-----+



## 8. Writing Data (Parquet, Delta, etc.)

Save DataFrames to different formats.


In [None]:
# Write DataFrame to Parquet
df.write.mode('overwrite').parquet('../data/output_df.parquet')

# If using Delta Lake (Databricks), you can write as Delta format:
# df.write.format('delta').mode('overwrite').save('data/output_df_delta')

In [None]:
df.write.mode('overwrite').parquet('/Users/vamsi_mbmax/Developer/VAM_Documents/01_vam_PROJECTS/LEARNING/proj_Databases/dev_proj_Databases/practise_db_book_medallion_architecture/data/output_df.parquet')

## 9. Useful Tips & Resources

- [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/)
- [Databricks Documentation](https://docs.databricks.com/)
- [Spark SQL, DataFrames, and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- Use `.explain()` to print the query plan Spark will run for a DataFrame.
- Use `.cache()` when the same DataFrame feeds several actions, and `.unpersist()` once it is no longer needed.

Happy Practising! 🚀
