## 1.Loading Data

In [0]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Exercise 1").getOrCreate()
# Load data from CSV into a DataFrame
data_df = spark.read.csv("dbfs:/FileStore/addresses.csv", header=True, inferSchema=True)
# Show the first few rows of the DataFrame
data_df.show(4)

+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
| id|location_id|           address_1|address_2|        city|state_province|postal_code|country|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
|  1|          1|2600 Middlefield ...|     null|Redwood City|            CA|      94063|     US|
|  2|          2|    24 Second Avenue|     null|   San Mateo|            CA|      94401|     US|
|  3|          3|    24 Second Avenue|     null|   San Mateo|            CA|      94403|     US|
|  4|          4|    24 Second Avenue|     null|   San Mateo|            CA|      94401|     US|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
only showing top 4 rows



## 2.Data exploration

In [0]:
# Display the schema of the DataFrame
data_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)



## 3.Data Filtering

In [0]:
from pyspark.sql.functions import col
# Filter the DataFrame
filtered_df = data_df.filter(col("id") > 10)
# Show the filtered DataFrame
filtered_df.show(4)

+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
| id|location_id|           address_1|address_2|        city|state_province|postal_code|country|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
| 11|         11| 2140 Euclid Avenue.|     null|Redwood City|            CA|      94061|     US|
| 12|         12|1044 Middlefield ...|2nd Floor|Redwood City|            CA|      94063|     US|
| 13|         13| 399 Marine Parkway.|     null|Redwood City|            CA|      94065|     US|
| 14|         14|  660 Veterans Blvd.|     null|Redwood City|            CA|      94063|     US|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
only showing top 4 rows



## 4.Data Transformation

In [0]:
from pyspark.sql.functions import expr
# Create the new column
transformed_df = data_df.withColumn("id", expr("id * 2"))
# Show the DataFrame with the new column
transformed_df.show(4)

+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
| id|location_id|           address_1|address_2|        city|state_province|postal_code|country|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
|  2|          1|2600 Middlefield ...|     null|Redwood City|            CA|      94063|     US|
|  4|          2|    24 Second Avenue|     null|   San Mateo|            CA|      94401|     US|
|  6|          3|    24 Second Avenue|     null|   San Mateo|            CA|      94403|     US|
|  8|          4|    24 Second Avenue|     null|   San Mateo|            CA|      94401|     US|
+---+-----------+--------------------+---------+------------+--------------+-----------+-------+
only showing top 4 rows



## 5.Daat Aggregation

In [0]:
from pyspark.sql.functions import avg
# Calculate the average age
avg_id = data_df.select(avg("id")).first()[0]
print("Average id:", avg_id)

Average id: 11.0


## 6.Data Grouping

In [0]:
# Group by "gender" and calculate the average age
grouped_df = data_df.groupBy("gender").agg(avg("age").alias("average_age"))
# Show the grouped DataFrame
grouped_df.show()

## 7.Joining dataframes

In [0]:
# Load employees data from CSV into DataFrame
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True)

# Load departments data from CSV into DataFrame
departments_df = spark.read.csv("departments.csv", header=True, inferSchema=True)

# Join the DataFrames on "department_id"
joined_df = employees_df.join(departments_df, "department_id")

# Show the joined DataFrame
joined_df.show()

## 8.Data Duplication

In [0]:
# Deduplicate the DataFrame
deduplicated_df = joined_df.dropDuplicates()
# Show the deduplicated DataFrame
deduplicated_df.show()

## 9.Data Imputation
* Replace the missing values in the "salary" column with the average salary.

In [0]:
from pyspark.sql.functions import avg

# Calculate the average salary
avg_salary = joined_df.select(avg("salary")).first()[0]

# Replace missing values with average salary
imputed_df = joined_df.na.fill(avg_salary, subset=["salary"])

# Show the DataFrame with imputed values
imputed_df.show()

## 10. Data Aggregation with GroupBy
* Group the DataFrame by the "department_name" column and calculate the total salary for each department.

In [0]:
# Group by "department_name" and calculate the total salary
salary_by_department_df = joined_df.groupBy("department_name").agg(sum("salary").alias("total_salary"))

# Show the DataFrame with total salary for each department
salary_by_department_df.show()

## 11.Data sorting
* sort the DataFrame based on the "age" column in ascending order.

In [0]:
# Sort the DataFrame by "age" in ascending order
sorted_df = joined_df.orderBy("age")

# Show the sorted DataFrame
sorted_df.show()

## 12.Data Joining with Different Column Names
* Load two CSV files "employees.csv" and "departments.csv" into separate DataFrames. Join the DataFrames on "dept_id" from the "employees" DataFrame and "id" from the "departments" DataFrame to get a single DataFrame containing both employee and department information.

In [0]:
# Load employees data from CSV into DataFrame with custom column names
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True).withColumnRenamed("dept_id", "department_id")

# Load departments data from CSV into DataFrame
departments_df = spark.read.csv("departments.csv", header=True, inferSchema=True)

# Join the DataFrames on different column names
joined_df = employees_df.join(departments_df, employees_df["department_id"] == departments_df["id"])

# Show the joined DataFrame
joined_df.show()

## 13.Data Repartitioning
* Repartition the DataFrame into 5 partitions for better parallelism during processing.
* The provided code will repartition the DataFrame into 5 partitions, allowing for better parallelism during processing, and display the repartitioned DataFrame.

In [0]:
# Repartition the DataFrame into 5 partitions
repartitioned_df = joined_df.repartition(5)

# Show the repartitioned DataFrame
repartitioned_df.show()

## 14.Data Window Functions
* Calculate the rank of employees based on their salaries within each department

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import rank

# Define the window specification
window_spec = Window.partitionBy("department_name").orderBy(joined_df["salary"].desc())

# Calculate the rank of employees based on salaries within each department
ranked_df = joined_df.withColumn("salary_rank", rank().over(window_spec))

# Show the DataFrame with salary ranks
ranked_df.show()

## 15.Data Filtering with SQL
* Filter the DataFrame to include only employees whose salary is above 50000 using SQL syntax.

In [0]:
# Register the DataFrame as a temporary SQL table
joined_df.createOrReplaceTempView("employee_data")

# Perform SQL filtering
filtered_df = spark.sql("SELECT * FROM employee_data WHERE salary > 50000")

# Show the filtered DataFrame
filtered_df.show()

## 16.Data Pivot
* Pivot the DataFrame to transform rows into columns based on the "gender" column, and calculate the average salary for each gender.

In [0]:
# Pivot the DataFrame
pivot_df = joined_df.groupBy("department_name").pivot("gender").avg("salary")

# Show the pivoted DataFrame
pivot_df.show()

##17.Data Joins - Left Outer Join
* Perform a left outer join between the "employees" DataFrame and the "departments" DataFrame on the "department_id" column, and display the combined DataFrame.

In [0]:
# Perform the left outer join
left_outer_join_df = employees_df.join(departments_df, on="department_id", how="left")
# Show the combined DataFrame
left_outer_join_df.show()

## 18.Data Coalesce
* Create a new DataFrame with the "first_name" and "last_name" columns coalesced into a single "full_name" column

In [0]:
# Create the new DataFrame with coalesced column
coalesced_df = joined_df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))

# Show the DataFrame with the new "full_name" column
coalesced_df.show()

## 19.Data Aggregation - Count and GroupBy
* Count the number of employees in each department and display the result

In [0]:
# Count the number of employees in each department
employee_count_df = joined_df.groupBy("department_name").count()

# Show the DataFrame with employee counts
employee_count_df.show()

## 20.Data Sampling
* Randomly sample 10% of the data from the DataFrame.

In [0]:
# Randomly sample 10% of the data
sampled_df = joined_df.sample(0.1)

# Show the sampled DataFrame
sampled_df.show()

##21.Data Window Functions - Lag
* Create a new column "lag_salary" that contains the previous salary of each employee based on the "salary" column, within each department.

*Create a new column "lag_salary" that contains the previous salary of each employee based on the "salary" column, within each department.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import lag

# Define the window specification
window_spec = Window.partitionBy("department_name").orderBy("employee_id")

# Create the new column with lagged salary
lagged_df = joined_df.withColumn("lag_salary", lag("salary").over(window_spec))

# Show the DataFrame with the new column
lagged_df.show()

## 22.Data UDF - User-Defined Function
* Define a user-defined function (UDF) to categorize employees based on their salary. The function should return "High" for salaries greater than 75000, "Medium" for salaries between 50000 and 75000, and "Low" for salaries below 50000.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the UDF function
def categorize_salary(salary):
    if salary > 75000:
        return "High"
    elif 50000 <= salary <= 75000:
        return "Medium"
    else:
        return "Low"

# Register the UDF
categorize_salary_udf = udf(categorize_salary, StringType())

# Apply the UDF to create a new column
categorized_df = joined_df.withColumn("salary_category", categorize_salary_udf("salary"))

# Show the DataFrame with the new column
categorized_df.show()

## 23.Data Filtering - Multiple Conditions
* Filter the DataFrame to include only male employees who have a salary above 60000.

In [0]:
# Filter the DataFrame with multiple conditions
filtered_df = joined_df.filter((joined_df["gender"] == "Male") & (joined_df["salary"] > 60000))

# Show the filtered DataFrame
filtered_df.show()

## 24.Data Aggregation - Maximum Salary
* Calculate the maximum salary from the DataFrame.

In [0]:
# Calculate the maximum salary
max_salary = joined_df.agg({"salary": "max"}).collect()[0][0]

# Display the maximum salary
print("Maximum Salary:", max_salary)

## 25.Data Sampling - Stratified Sampling
* Perform stratified sampling on the DataFrame to get a sample of 20% for each department.

In [0]:
# Perform stratified sampling on the DataFrame
stratified_sample_df = joined_df.sampleBy("department_name", fractions={"HR": 0.2, "Engineering": 0.2, "Marketing": 0.2})

# Show the stratified sampled DataFrame
stratified_sample_df.show()