Name: Soham Mukherjee, 
Year: 3rd, 
Section: A, 
Registration no. : 2301292114

Self note: 
'There's one thing to keep in mind, click over the spark_env for the python notebook that is available when the conda environment is set up.'

In [1]:
# Utility: simple timestamped logger for clean outputs
import datetime
def log(msg):
    now = datetime.datetime.now().strftime('%H:%M:%S')
    print(f"[LOG {now}] {msg}")
log('Logger ready.')

[LOG 21:52:22] Logger ready.


In [2]:
# Check Python version and critical env vars
import sys, os
log(f'Python version: {sys.version.split()[0]}')
log(f'JAVA_HOME={os.environ.get("JAVA_HOME")}')
log(f'SPARK_HOME={os.environ.get("SPARK_HOME")}')

[LOG 21:52:31] Python version: 3.11.8
[LOG 21:52:31] JAVA_HOME=C:\Program Files\Java\jdk-17
[LOG 21:52:31] SPARK_HOME=C:\Spark\spark-3.5.6-bin-hadoop3


In [3]:
# If Spark isn't auto-detected, set SPARK_HOME explicitly before findspark.init()
# Example (uncomment and edit):
# import os
# os.environ['SPARK_HOME'] = r'C:\\spark'  # or '/usr/local/opt/spark'
import findspark
findspark.init()
log('findspark initialized.')

[LOG 21:52:39] findspark initialized.


In [4]:
# Create a Spark session with a small shuffle partition count for local runs
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName('Spark-Practical-Notebook')
         .config('spark.sql.shuffle.partitions', '4')
         .getOrCreate())
log(f'Spark started. Version: {spark.version}')
spark

[LOG 23:43:34] Spark started. Version: 3.5.6


In [5]:
# Define schema explicitly for clarity and type safety
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('attendance', IntegerType(), True),
    StructField('score', IntegerType(), True),
    StructField('events', IntegerType(), True),
    StructField('projects', IntegerType(), True)
])

data = [
    (1, 'Amit',   92, 88, 3, 1),
    (2, 'Priya',  85, 91, 2, 2),
    (3, 'Rahul',  70, 76, 1, 0),
    (4, 'Neha',   88, 82, 2, 1),
    (5, 'Ravi',   90, 95, 3, 3),
    (6, 'Simran', 78, 89, 1, 1)
]

df = spark.createDataFrame(data, schema)
log('Student DataFrame created.')
df.show()
df.printSchema()

# Create temp view for Spark SQL queries
df.createOrReplaceTempView('students')
log('Temp view `students` is ready.')

[LOG 23:44:03] Student DataFrame created.
+---+------+----------+-----+------+--------+
| id|  name|attendance|score|events|projects|
+---+------+----------+-----+------+--------+
|  1|  Amit|        92|   88|     3|       1|
|  2| Priya|        85|   91|     2|       2|
|  3| Rahul|        70|   76|     1|       0|
|  4|  Neha|        88|   82|     2|       1|
|  5|  Ravi|        90|   95|     3|       3|
|  6|Simran|        78|   89|     1|       1|
+---+------+----------+-----+------+--------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- attendance: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- events: integer (nullable = true)
 |-- projects: integer (nullable = true)

[LOG 23:44:14] Temp view `students` is ready.


In [6]:
from pyspark.sql import functions as F
# 5.1 Simple select
df.select('name', 'score', 'attendance').show()

# 5.2 Filter rows
df.filter(F.col('attendance') > 85).show()

# 5.3 New derived column
df2 = df.withColumn('score_boosted', F.col('score') + 2)
df2.orderBy(F.desc('score_boosted')).show()
log('Core data operations complete.')

+------+-----+----------+
|  name|score|attendance|
+------+-----+----------+
|  Amit|   88|        92|
| Priya|   91|        85|
| Rahul|   76|        70|
|  Neha|   82|        88|
|  Ravi|   95|        90|
|Simran|   89|        78|
+------+-----+----------+

+---+----+----------+-----+------+--------+
| id|name|attendance|score|events|projects|
+---+----+----------+-----+------+--------+
|  1|Amit|        92|   88|     3|       1|
|  4|Neha|        88|   82|     2|       1|
|  5|Ravi|        90|   95|     3|       3|
+---+----+----------+-----+------+--------+

+---+------+----------+-----+------+--------+-------------+
| id|  name|attendance|score|events|projects|score_boosted|
+---+------+----------+-----+------+--------+-------------+
|  5|  Ravi|        90|   95|     3|       3|           97|
|  2| Priya|        85|   91|     2|       2|           93|
|  6|Simran|        78|   89|     1|       1|           91|
|  1|  Amit|        92|   88|     3|       1|           90|
|  4|  Neh

In [7]:
# Highest attendance
spark.sql('SELECT * FROM students ORDER BY attendance DESC LIMIT 1').show()
# Names with score > 90
spark.sql('SELECT name FROM students WHERE score > 90').show()
log('SQL queries executed.')

+---+----+----------+-----+------+--------+
| id|name|attendance|score|events|projects|
+---+----+----------+-----+------+--------+
|  1|Amit|        92|   88|     3|       1|
+---+----+----------+-----+------+--------+

+-----+
| name|
+-----+
|Priya|
| Ravi|
+-----+

[LOG 23:45:25] SQL queries executed.


In [8]:
out_path = '/mnt/data/students_parquet'
df.write.mode('overwrite').parquet(out_path)
log(f'Wrote DataFrame to {out_path}')

back = spark.read.parquet(out_path)
log('Read back Parquet file:')
back.show()

[LOG 23:45:40] Wrote DataFrame to /mnt/data/students_parquet
[LOG 23:45:40] Read back Parquet file:
+---+------+----------+-----+------+--------+
| id|  name|attendance|score|events|projects|
+---+------+----------+-----+------+--------+
|  6|Simran|        78|   89|     1|       1|
|  3| Rahul|        70|   76|     1|       0|
|  2| Priya|        85|   91|     2|       2|
|  1|  Amit|        92|   88|     3|       1|
|  4|  Neha|        88|   82|     2|       1|
|  5|  Ravi|        90|   95|     3|       3|
+---+------+----------+-----+------+--------+



In [9]:
# Overall aggregations
(df.groupBy()
   .agg(F.avg('score').alias('avg_score'), F.max('attendance').alias('max_attendance'))
   .show())

# Group by projects
(df.groupBy('projects')
   .agg(F.count('*').alias('n'), F.avg('score').alias('avg_score'))
   .orderBy('projects')
   .show())
log('Grouping & aggregations done.')

+-----------------+--------------+
|        avg_score|max_attendance|
+-----------------+--------------+
|86.83333333333333|            92|
+-----------------+--------------+

+--------+---+-----------------+
|projects|  n|        avg_score|
+--------+---+-----------------+
|       0|  1|             76.0|
|       1|  3|86.33333333333333|
|       2|  1|             91.0|
|       3|  1|             95.0|
+--------+---+-----------------+

[LOG 23:46:04] Grouping & aggregations done.


In [10]:
# Create a small lookup to join with students
sections = spark.createDataFrame([
    (1, 'A'), (2, 'B'), (3, 'A'), (5, 'C')
], ['id', 'section'])

joined = df.join(sections, on='id', how='left')
joined.show()
log('Left join with sections completed.')

+---+------+----------+-----+------+--------+-------+
| id|  name|attendance|score|events|projects|section|
+---+------+----------+-----+------+--------+-------+
|  1|  Amit|        92|   88|     3|       1|      A|
|  2| Priya|        85|   91|     2|       2|      B|
|  3| Rahul|        70|   76|     1|       0|      A|
|  4|  Neha|        88|   82|     2|       1|   NULL|
|  5|  Ravi|        90|   95|     3|       3|      C|
|  6|Simran|        78|   89|     1|       1|   NULL|
+---+------+----------+-----+------+--------+-------+

[LOG 23:46:26] Left join with sections completed.


In [11]:
complex_df = (df
    .withColumn('scores', F.array('score', 'attendance', 'events'))
    .withColumn('profile', F.struct('name', 'projects'))
)
complex_df.select('id', 'scores', 'profile').show(truncate=False)
log('Complex type demo done.')

+---+-----------+-----------+
|id |scores     |profile    |
+---+-----------+-----------+
|1  |[88, 92, 3]|{Amit, 1}  |
|2  |[91, 85, 2]|{Priya, 2} |
|3  |[76, 70, 1]|{Rahul, 0} |
|4  |[82, 88, 2]|{Neha, 1}  |
|5  |[95, 90, 3]|{Ravi, 3}  |
|6  |[89, 78, 1]|{Simran, 1}|
+---+-----------+-----------+

[LOG 23:46:49] Complex type demo done.


In [12]:
# Example path; change to your dataset path
path = '/mnt/data/sample_transactions.csv'  # e.g., 'C:/data/transactions.csv'
try:
    trans = (spark.read
             .option('header', True)
             .option('inferSchema', True)
             .csv(path))
    log(f'Loaded transactions: {trans.count()} rows')
    trans.printSchema()

    # Casting and simple KPIs
    trans2 = (trans
              .withColumn('amount', F.col('amount').cast('double'))
              .withColumn('transaction_date', F.to_date('transaction_date')))

    daily = (trans2.groupBy('transaction_date')
                    .agg(F.sum('amount').alias('total_amount')))
    daily.show(10)

    top_products = (trans2.groupBy('product')
                           .agg(F.sum('amount').alias('revenue'))
                           .orderBy(F.desc('revenue')))
    top_products.show(10)

    # Save outputs
    out_kpi = '/mnt/data/transaction_kpi_parquet'
    daily.write.mode('overwrite').parquet(out_kpi)
    log(f'Saved daily KPI to {out_kpi}')
except Exception as e:
    log(f'[Mini-Lab] Skipped or failed: {e}')

[LOG 23:46:55] [Mini-Lab] Skipped or failed: [PATH_NOT_FOUND] Path does not exist: file:/mnt/data/sample_transactions.csv.


In [13]:
spark.stop()
log('Spark session stopped. ✅')

[LOG 23:47:05] Spark session stopped. ✅
