<a href="https://colab.research.google.com/github/Clirmoo/Big-Data-Engineering/blob/main/Copy_of_Hands_On_Activity_1_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hands-On-Activity-1.1

## Objective(s)
The objective of this activity is to familiarize students with PySpark and Google Colab by setting up a foundational big data environment suitable for distributed data processing.

## Intended Learning Outcomes

At the end of this activity, the students should be able to:


*   Analyze how PySpark handles data processing compared to traditional tools.
*   Evaluate the benefits of distributed computing using simulated large-scale operations.
*   Create a basic data processing workflow using PySpark DataFrames on Google Colab.



## Discussion

**What is PySpark?**

Apache Spark is an open-source, distributed computing system designed for fast processing of large-scale data. PySpark is the Python interface for Apache Spark. It handles large datasets efficiently with parallel computation in Python workflows, ideal for batch processing, real-time streaming, machine learning, data analytics, and SQL querying. PySpark supports industries like finance, healthcare, and e-commerce with speed and scalability.


**Spark Cluster**


Clusters are a key part of working with PySpark. A Spark cluster is a group of computers (nodes) that collaboratively process large datasets using Apache Spark, with a master node coordinating multiple worker nodes. The master node manages resources and schedules tasks, while the worker nodes execute the compute tasks assigned to them. This division of labor is what enables distributed processing.
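
The master/worker split can be pictured with a small plain-Python analogy. This is only a sketch and not how Spark is implemented: the names `split_into_partitions` and `worker_task` are hypothetical, and a thread pool stands in for the worker nodes.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    """Coordinator step: cut the data into roughly equal chunks."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def worker_task(partition):
    """Worker step: compute a partial result for one chunk."""
    return sum(partition)

data = list(range(1, 101))  # small stand-in for a large dataset
partitions = split_into_partitions(data, 4)

# The thread pool plays the role of the worker nodes (an assumption of this sketch).
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(worker_task, partitions))

# The coordinator combines the partial results into the final answer.
total = sum(partial_sums)
print(partial_sums, total)
```

Each chunk is summed independently, so the chunks could live on different machines; only the small partial results need to travel back to the coordinator.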


**Spark Session**


A SparkSession is the entry point into PySpark, enabling interaction with Apache Spark's core capabilities. It allows us to execute queries, process data, and manage resources in the Spark cluster.

## Materials and Equipment


*   Internet connection
*   Google Colab
*   Personal Computer



## Procedure

Importing PySpark and Initializing the SparkSession

In [None]:
from pyspark.sql import SparkSession
my_spark = SparkSession.builder.appName("my_spark").getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x78e4ac18a110>


### Data Loading

In [None]:
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
username_df = my_spark.read.csv("/content/username.csv", header=True, inferSchema=True) ## TODO: Change path according to your folder setup
username_df.show()

+---------------+----------+----------+---------+
|       Username|Identifier|First Name|Last Name|
+---------------+----------+----------+---------+
|  graylooker123|      1002|    Robert|  Jenkins|
|    ironlord982|      1003|     Linux| Cromwell|
|  skyrimlord023|      1004| Cassandra|    Ramos|
|wannabefortnite|      1005|    Joseph|   Pandas|
|    smithSummer|      1006|    Tensor|     Flow|
+---------------+----------+----------+---------+



Follow-up question: what happens when you run this code?




**With a correct path, the code reads username.csv into a DataFrame and displays its five rows. If the file path is incorrect or the file does not exist, the read raises an error instead.**
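
One way to make a wrong path fail early with a readable message is to check it before handing it to Spark. The helper below is a minimal sketch: `require_file` is a hypothetical name, and the check only makes sense for local paths such as those under `/content` in Colab.

```python
from pathlib import Path

def require_file(path):
    """Return the path unchanged if the file exists, else raise a clear error."""
    if not Path(path).is_file():
        raise FileNotFoundError(
            f"CSV not found: {path}. Upload the file or fix the path."
        )
    return path

# Intended use in the notebook (assumes my_spark from the earlier cell):
# username_df = my_spark.read.csv(require_file("/content/username.csv"),
#                                 header=True, inferSchema=True)
```

The commented lines show where the check would sit; they are left as comments so the sketch runs without a Spark installation.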

Count the number of rows

In [None]:
username_df.count()

5

### Sample Filtering of a DataFrame

In [None]:
df_filtered = username_df.filter(username_df["Identifier"] > 1002)

In [None]:
df_filtered.show()

+---------------+----------+----------+---------+
|       Username|Identifier|First Name|Last Name|
+---------------+----------+----------+---------+
|    ironlord982|      1003|     Linux| Cromwell|
|  skyrimlord023|      1004| Cassandra|    Ramos|
|wannabefortnite|      1005|    Joseph|   Pandas|
|    smithSummer|      1006|    Tensor|     Flow|
+---------------+----------+----------+---------+



What’s the output?

**The output is the four rows whose Identifier is greater than 1002. The row for graylooker123 (Identifier 1002) is excluded.**
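
The filter cell prints nothing while the `show()` cell prints the table, because `filter` is a transformation that Spark evaluates lazily and `show` is an action that triggers the work. A rough plain-Python analogy follows, using the rows from the output above and a generator in place of Spark's execution plan. The `rows` list, `keep` function, and `checked` list are illustrative only.

```python
# Rows copied from the username_df output above: (Username, Identifier)
rows = [
    ("graylooker123", 1002),
    ("ironlord982", 1003),
    ("skyrimlord023", 1004),
    ("wannabefortnite", 1005),
    ("smithSummer", 1006),
]

checked = []  # records which rows have actually been examined

def keep(row):
    """Predicate that also logs each row it is asked about."""
    checked.append(row[0])
    return row[1] > 1002

# Like df.filter(...): this only describes the work; no row is examined yet.
filtered = (row for row in rows if keep(row))
examined_before = len(checked)

# Like df.show(): consuming the result forces the predicate to run.
result = list(filtered)
examined_after = len(checked)

print(examined_before, examined_after, [name for name, _ in result])
```

No row is examined until the result is consumed, which mirrors why an error in a Spark filter often surfaces only at the later action.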

### Aggregating and Grouping

In [None]:
from pyspark.sql.functions import avg
username_df.groupBy("Username").agg(avg("Identifier")).show()

+---------------+---------------+
|       Username|avg(Identifier)|
+---------------+---------------+
|wannabefortnite|         1005.0|
|  skyrimlord023|         1004.0|
|    ironlord982|         1003.0|
|  graylooker123|         1002.0|
|    smithSummer|         1006.0|
+---------------+---------------+
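
What `groupBy("Username").agg(avg("Identifier"))` computes can be sketched in plain Python with the five rows shown earlier. This illustrates the result only, not how Spark executes the query; `rows` is copied from the output above.

```python
from collections import defaultdict

# Rows copied from the username_df output: (Username, Identifier)
rows = [
    ("graylooker123", 1002),
    ("ironlord982", 1003),
    ("skyrimlord023", 1004),
    ("wannabefortnite", 1005),
    ("smithSummer", 1006),
]

# Group step: collect the Identifier values that share a Username.
groups = defaultdict(list)
for username, identifier in rows:
    groups[username].append(identifier)

# Aggregate step: average the values inside each group.
averages = {name: sum(values) / len(values) for name, values in groups.items()}

for name, value in averages.items():
    print(name, value)
```

Because every username occurs exactly once, each group holds a single row and its average equals that row's Identifier expressed as a float. Grouping on a column with repeated values gives more informative results. The row order in Spark's table differs from the input order because grouped output carries no ordering guarantee.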



Summarize the procedure conducted

**The first step was initializing a SparkSession, the gateway to all Spark capabilities. Next, the CSV file username.csv was loaded into a Spark DataFrame, and the DataFrame's row count was determined. The DataFrame was then filtered to keep only rows whose "Identifier" value is greater than 1002. Lastly, the data was grouped by "Username" and the average "Identifier" for each username was computed.**

# Supplementary Activity

1. Create a dataframe
2. Use the old SparkSession instead of creating a new one
3. Load the Employee_salaries.csv
4. Filter the Employees based on their gender
5. Group the Employees based on their Gender and Average their Salaries
6. Compute annual salary for each employee.
7. Sort the result and display the highest average.

*Note that for each number in the supplementary activity (1-7), provide an explanation of the procedure, output, and/or analysis.*

In [None]:
#1 Create a dataframe
employee_df = my_spark.read.csv("/content/Employee_Salaries.csv", header=True, inferSchema=True)

+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|Department|     Department_Name|            Division|Gender|Base_Salary|Overtime_Pay|Longevity_Pay|Grade|
+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   175873.0|         0.0|          0.0|   M2|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|  145613.36|         0.0|          0.0|   M3|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|   136970.0|         0.0|          0.0|   M3|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|  89432.694|         0.0|       2490.0|   21|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|    78947.0|      456.68|       6257.7|   16|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|    98228.0|       518.8|       998.28|   21|
|       ABS|Alcohol Beverage ...|ABS 

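**Observation:** I created a DataFrame by reading Employee_Salaries.csv from its file path with the existing SparkSession, using the first row as the header and letting Spark infer the column types.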

In [None]:
#2 Use the old SparkSession instead of creating a new one
from pyspark.sql import SparkSession
# getOrCreate() returns the already running session if one exists
my_spark = SparkSession.builder.appName("my_spark").getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x78e4ac18a110>


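**Observation:** I reused the SparkSession from the procedure. The printed object address (0x78e4ac18a110) matches the one printed earlier, which confirms that no new session was created.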

In [None]:
#3 Load the Employee_salaries.csv
employee_df.show()

+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|Department|     Department_Name|            Division|Gender|Base_Salary|Overtime_Pay|Longevity_Pay|Grade|
+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   175873.0|         0.0|          0.0|   M2|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|  145613.36|         0.0|          0.0|   M3|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|   136970.0|         0.0|          0.0|   M3|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|  89432.694|         0.0|       2490.0|   21|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|    78947.0|      456.68|       6257.7|   16|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|    98228.0|       518.8|       998.28|   21|
|       ABS|Alcohol Beverage ...|ABS 

**Observation:** I displayed the DataFrame loaded from Employee_Salaries.csv using employee_df.show().

In [None]:
#4 Filter the Employees based on their gender
employee_df.filter(employee_df["Gender"] == "M").show()
employee_df.filter(employee_df["Gender"] == "F").show()

+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|Department|     Department_Name|            Division|Gender|Base_Salary|Overtime_Pay|Longevity_Pay|Grade|
+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   175873.0|         0.0|          0.0|   M2|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|  145613.36|         0.0|          0.0|   M3|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|    93986.0|     1187.06|      2452.94|  N20|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   117424.0|         0.0|          0.0|  N25|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M| 65961.8438|      2092.7|          0.0|   13|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   59288.86|     1013.01|          0.0|   13|
|       ABS|Alcohol Beverage ...|ABS 

**Observation:** I applied filter() twice, once for male ("M") and once for female ("F") employees, and displayed each result.

In [None]:
#5 Group the Employees based on their Gender and Average their Salaries
from pyspark.sql.functions import avg
employee_df.groupBy("Gender").agg(avg("Base_Salary")).show()

+------+-----------------+
|Gender| avg(Base_Salary)|
+------+-----------------+
|     F|87497.50279041701|
|     M|92382.92975236966|
+------+-----------------+



**Observation:** I grouped the employees by Gender and averaged Base_Salary within each group. The average base salary is about 92,382.93 for male employees and about 87,497.50 for female employees.

In [None]:
#6 Compute annual salary for each employee.
employee_df = employee_df.withColumn("Annual_Salary", employee_df["Base_Salary"] * 12)
employee_df.show()

+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+------------------+
|Department|     Department_Name|            Division|Gender|Base_Salary|Overtime_Pay|Longevity_Pay|Grade|     Annual_Salary|
+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+------------------+
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|   175873.0|         0.0|          0.0|   M2|         2110476.0|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     M|  145613.36|         0.0|          0.0|   M3|1747360.3199999998|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|   136970.0|         0.0|          0.0|   M3|         1643640.0|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|  89432.694|         0.0|       2490.0|   21|       1073192.328|
|       ABS|Alcohol Beverage ...|ABS 85 Administra...|     F|    78947.0|      456.68|       6257.7|   16|          94

**Observation:** I multiplied each employee's Base_Salary by 12 to get the annual salary and stored it in a new Annual_Salary column. This treats Base_Salary as a monthly amount.
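
The long tail in `1747360.3199999998` in the output above is a binary floating-point artifact rather than a data problem. The sketch below reproduces the arithmetic for that one row in plain Python and compares it with exact decimal arithmetic. The value is copied from the table, and the multiplication by 12 follows the step above.

```python
from decimal import Decimal

base_salary = "145613.36"  # Base_Salary of the second row in the table above

as_float = float(base_salary) * 12      # binary floating point, like a double column
as_decimal = Decimal(base_salary) * 12  # exact decimal arithmetic

print(as_float)            # may show a long tail of digits
print(as_decimal)          # 1747360.32
print(round(as_float, 2))  # 1747360.32
```

For display purposes, rounding the new column to two decimals is usually enough; in PySpark, `pyspark.sql.functions.round` should serve that purpose.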

In [None]:
#7 Sort the result and display the highest average.
employee_df.orderBy("Annual_Salary", ascending=False).show()

+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+-------------+
|Department|     Department_Name|            Division|Gender|Base_Salary|Overtime_Pay|Longevity_Pay|Grade|Annual_Salary|
+----------+--------------------+--------------------+------+-----------+------------+-------------+-----+-------------+
|       CEX|Offices of the Co...|CEX 15 Chief Admi...|     M|   292000.0|         0.0|          0.0|  EX0|    3504000.0|
|       CAT|County Attorney's...|CAT 30 County Att...|     M|   258000.0|         0.0|          0.0|  EX1|    3096000.0|
|       POL|Department of Police|POL 47 HQ Police ...|     M|   258000.0|         0.0|          0.0|  EX1|    3096000.0|
|       CCL|      County Council|CCL 01 Council Ce...|     F|  246162.47|         0.0|          0.0| NULL|   2953949.64|
|       DGS|Department of Gen...|     DGS 36 Director|     M|   246000.0|         0.0|          0.0|  EX1|    2952000.0|
|       DOT|Department of Tra...

**Observation:** I sorted the employees by Annual_Salary in descending order, so the employee with the highest annual salary appears first.
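
Task 7 can also be read as asking for the highest average from step 5 rather than the highest individual salary. Under that reading, a plain-Python sketch with the two group averages copied from the step 5 output might look like this (the `avg_by_gender` dictionary is illustrative):

```python
# Averages copied from the step 5 output: Gender -> avg(Base_Salary)
avg_by_gender = {"F": 87497.50279041701, "M": 92382.92975236966}

# Sort the groups by their average, highest first.
ranked = sorted(avg_by_gender.items(), key=lambda item: item[1], reverse=True)

# The first entry is the group with the highest average.
top_gender, top_avg = ranked[0]

print(ranked)
print(top_gender, top_avg)
```

In PySpark the same idea would mean applying `orderBy` with `ascending=False` to the grouped DataFrame from step 5 instead of to the full employee table.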

# HOA Conclusion

**I have effectively set up a PySpark environment in Google Colab based on the practical exercises, exhibiting a fundamental comprehension of distributed data processing. I have examined how PySpark manages data processing by building and modifying Spark DataFrames, observing its effectiveness in comparison to conventional techniques. By executing procedures on the supplied datasets, I have additionally assessed the advantages of distributed computing. I've successfully loaded data, filtered it, and carried out aggregations to establish a simple data processing workflow. I have thereby achieved all of the activity's goals.**