In [1]:
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

!pip install -q pyspark


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
df = spark.read.csv("/content/sample_data/california_housing_test.csv", header=True, inferSchema=True)
df.show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [3]:
# Install Java 17
!apt-get install openjdk-17-jdk-headless -qq > /dev/null

# Download Spark 3.5.0
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Extract Spark
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Install PySpark
!pip install -q pyspark


Create SparkSession & Read CSV

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Week7_PySpark_Practice") \
    .getOrCreate()

print("Spark Session Created Successfully!")


Spark Session Created Successfully!


Uploading CSV file

In [5]:
from google.colab import files
uploaded = files.upload()


Saving data.csv to data.csv


Read CSV

In [6]:
df = spark.read.csv(list(uploaded.keys())[0], header=True, inferSchema=True)
df.show(5)
df.printSchema()


+---+-------+-----------+------+-----+----------+-------+
| id|   name| department|salary|bonus| join_date|country|
+---+-------+-----------+------+-----+----------+-------+
|  1|  Alice|Engineering|120000| 8000|2018-03-12|    USA|
|  2|    Bob|  Marketing| 90000| 5000|2019-07-19|     UK|
|  3|Charlie|    Finance|110000| 7000|2020-02-10|  India|
|  4|  David|Engineering|130000|10000|2017-11-03| Canada|
|  5|    Eva|         HR| 85000| 4000|2021-01-15|    USA|
+---+-------+-----------+------+-----+----------+-------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- bonus: integer (nullable = true)
 |-- join_date: date (nullable = true)
 |-- country: string (nullable = true)



DataFrame Operations

In [10]:
df.select("name").show(5)



+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
|  David|
|    Eva|
+-------+
only showing top 5 rows



In [12]:
df.filter(df.salary > 100).show(5)

+---+-------+-----------+------+-----+----------+-------+
| id|   name| department|salary|bonus| join_date|country|
+---+-------+-----------+------+-----+----------+-------+
|  1|  Alice|Engineering|120000| 8000|2018-03-12|    USA|
|  2|    Bob|  Marketing| 90000| 5000|2019-07-19|     UK|
|  3|Charlie|    Finance|110000| 7000|2020-02-10|  India|
|  4|  David|Engineering|130000|10000|2017-11-03| Canada|
|  5|    Eva|         HR| 85000| 4000|2021-01-15|    USA|
+---+-------+-----------+------+-----+----------+-------+
only showing top 5 rows



In [13]:
df.printSchema()


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- bonus: integer (nullable = true)
 |-- join_date: date (nullable = true)
 |-- country: string (nullable = true)



Aggregations

In [15]:
from pyspark.sql import functions as F

df_grouped = df.groupBy("department") \
               .agg(
                   F.avg("salary").alias("avg_salary"),
                   F.sum("salary").alias("sum_salary")
               )

df_grouped.show()

+-----------+------------------+----------+
| department|        avg_salary|sum_salary|
+-----------+------------------+----------+
|      Sales|           93750.0|    750000|
|Engineering|          138125.0|   2210000|
|         HR|           87600.0|    438000|
|    Finance|110714.28571428571|    775000|
|  Marketing| 96571.42857142857|    676000|
|    Support| 72857.14285714286|    510000|
+-----------+------------------+----------+



Save Processed Data (CSV/JSON)

In [16]:
df.write.mode("overwrite").csv("/content/output_csv")
print("Saved to /content/output_csv")


Saved to /content/output_csv


In [17]:
df.write.mode("overwrite").json("/content/output_json")
print("Saved to /content/output_json")


Saved to /content/output_json


In [18]:
!zip -r output_csv.zip /content/output_csv
files.download("output_csv.zip")


  adding: content/output_csv/ (stored 0%)
  adding: content/output_csv/_SUCCESS (stored 0%)
  adding: content/output_csv/.part-00000-39f7d9b9-58fa-4db5-b092-df0d272abfeb-c000.csv.crc (stored 0%)
  adding: content/output_csv/._SUCCESS.crc (stored 0%)
  adding: content/output_csv/part-00000-39f7d9b9-58fa-4db5-b092-df0d272abfeb-c000.csv (deflated 59%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Mini Project: Spark ETL Pipeline

In [27]:
# Read Raw Data
raw_df = df

# Clean Data
clean_df = raw_df.dropna() \
                 .withColumnRenamed("old_name", "new_name") # This line might need adjustment if there's no 'old_name' column

# Transform Data (Filtering + Aggregation)
transformed_df = clean_df.filter(clean_df.salary > 0) \
                         .groupBy("department") \
                         .agg(F.count("*").alias("count_records"))

# Save Final Output
transformed_df.write.mode("overwrite").json("/content/final_output")
print("Final ETL output saved!")

# Download ETL Output
!zip -r final_output.zip /content/final_output
files.download("final_output.zip")


Final ETL output saved!
  adding: content/final_output/ (stored 0%)
  adding: content/final_output/part-00000-683e51f3-2b3e-4b7b-b444-4dd341054020-c000.json (deflated 62%)
  adding: content/final_output/_SUCCESS (stored 0%)
  adding: content/final_output/._SUCCESS.crc (stored 0%)
  adding: content/final_output/.part-00000-683e51f3-2b3e-4b7b-b444-4dd341054020-c000.json.crc (stored 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>