In [13]:
# Install PySpark package so we can use Apache Spark in Colab
!pip install pyspark

#  Import necessary libraries

# PySpark for distributed data processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count

# Pandas is used to convert Spark DataFrame to Pandas for saving or visualization
import pandas as pd

# Matplotlib for creating basic visualizations
import matplotlib.pyplot as plt

# This helps upload files from local machine into Colab
from google.colab import files

# The Age Dataset CSV is expected at /content/AgeDataset-V1.csv. Upload it to
# the Colab filesystem before running this section (Files side panel, or
# files.upload() in a separate cell).
#
# Spark is the only reader of this file: loading it with pandas as well would
# parse the CSV a second time and keep a full extra copy in memory, only for
# `df` to be reassigned to the Spark DataFrame below.

# Create (or reuse) the Spark session that every PySpark call below needs.
spark = SparkSession.builder.appName("AgeDatasetBatchPipeline").getOrCreate()

# Load the CSV into a Spark DataFrame.
#   header=True      -> take column names from the first row
#   inferSchema=True -> let Spark derive each column's type from the data
# NOTE(review): the schema printed in the recorded output lists every column
# as string, including the year and age columns. That may mean some rows are
# being misparsed; Spark's CSV reader escapes quotes with a backslash by
# default, so a file that doubles its quotes ("") might need escape='"'.
# Confirm against the actual file before changing the reader options.
df = spark.read.csv("/content/AgeDataset-V1.csv", header=True, inferSchema=True)

# Print the inferred schema (column names and types).
df.printSchema()

# Preview the first 5 rows.
df.show(5)

#  Function to clean and standardize the dataset
def preprocess_data(df):
    """Return a cleaned copy of *df* with standardized column names.

    Two steps are applied, in this order:

    1. Every row that contains at least one null value is dropped.
    2. Every column is renamed: surrounding whitespace is stripped, the name
       is lowercased, and each remaining space becomes an underscore
       (e.g. ``"Age of death"`` -> ``"age_of_death"``).

    Args:
        df: A DataFrame exposing ``dropna()``, ``columns`` and
            ``withColumnRenamed(old, new)``.

    Returns:
        The DataFrame produced by the steps above.
    """

    def _standardize(name):
        # "  Birth year " -> "birth_year"
        return name.strip().lower().replace(" ", "_")

    without_nulls = df.dropna()

    # Work out every (old, new) pair up front from the original column list,
    # then apply the renames one after another.
    rename_plan = [(old, _standardize(old)) for old in without_nulls.columns]

    result = without_nulls
    for old_name, new_name in rename_plan:
        result = result.withColumnRenamed(old_name, new_name)
    return result

# Run the raw Spark DataFrame through the cleaning step and preview 5 rows
# of the result.
df_clean = preprocess_data(df)
df_clean.show(5)

# `col` is re-imported here; it is also imported with the other
# pyspark.sql.functions names in the import header.
from pyspark.sql.functions import col

# Cast 'age_of_death' to int so it can be averaged numerically.
age_as_int = col("age_of_death").cast("int")
df_clean = df_clean.withColumn("age_of_death", age_as_int)

# One output row per (country, gender) pair, sorted by those same keys.
# Adjust the grouping columns to match the focus of the report.
batch_result = (
    df_clean.groupBy("country", "gender")
    .agg(
        count("*").alias("record_count"),  # rows in the group
        avg("age_of_death").alias("average_age_of_death"),  # mean age at death
    )
    .orderBy("country", "gender")
)

# Preview the first 10 aggregated rows.
batch_result.show(10)

# Collect the aggregated result into a pandas DataFrame so it can be written
# out as a single CSV file.
batch_result_pd = batch_result.toPandas()

# Write the CSV (without the pandas index column) to the working directory.
batch_result_pd.to_csv("processed_batch_output.csv", index=False)

# Hand the written file to Colab's download helper.
files.download("processed_batch_output.csv")



root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Short description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Birth year: string (nullable = true)
 |-- Death year: string (nullable = true)
 |-- Manner of death: string (nullable = true)
 |-- Age of death: string (nullable = true)

+----+--------------------+--------------------+------+--------------------+----------+----------+----------+---------------+------------+
|  Id|                Name|   Short description|Gender|             Country|Occupation|Birth year|Death year|Manner of death|Age of death|
+----+--------------------+--------------------+------+--------------------+----------+----------+----------+---------------+------------+
| Q23|   George Washington|1st president of ...|  Male|United States of ...|Politician|      1732|      1799| natural causes|          67|
| Q42|       Douglas Adam

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>