In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j


import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.functions import concat


spark= SparkSession \
       .builder \
       .appName("PySpark Challenge") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [66.7 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,311 kB]
Hit:6 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:10 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,606 kB]
Get:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Pack

# New section

In [2]:
data = [
    ("Alice", "Sales", 1000),
    ("Bob", "Sales", 1500),
    ("Charlie", "HR", 1200),
    ("David", "HR", 800),
    ("Eve", "IT", 2000),
    ("Frank", "IT", 2200),
    ("Jim", "ATA", 1300),
    ("Millie", "Finance", 2300),
    ("Samantha", "Sales", 1000)
]

#define schema
columns = ["Name", "Department", "Salary"]

employee_df = spark.createDataFrame(data = data, schema= columns)
employee_df.show()

+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
|   Alice|     Sales|  1000|
|     Bob|     Sales|  1500|
| Charlie|        HR|  1200|
|   David|        HR|   800|
|     Eve|        IT|  2000|
|   Frank|        IT|  2200|
|     Jim|       ATA|  1300|
|  Millie|   Finance|  2300|
|Samantha|     Sales|  1000|
+--------+----------+------+



In [4]:
# select name and dep
name_dep_df = employee_df.select('Name', 'Department')
name_dep_df.show()

+--------+----------+
|    Name|Department|
+--------+----------+
|   Alice|     Sales|
|     Bob|     Sales|
| Charlie|        HR|
|   David|        HR|
|     Eve|        IT|
|   Frank|        IT|
|     Jim|       ATA|
|  Millie|   Finance|
|Samantha|     Sales|
+--------+----------+



In [5]:
finance_dep_df = name_dep_df.filter(name_dep_df["Department"]=='Finance')
finance_dep_df.show()

+------+----------+
|  Name|Department|
+------+----------+
|Millie|   Finance|
+------+----------+



In [6]:
high_salary_df = employee_df.filter(employee_df['Salary']>1100)
high_salary_df.show()

+-------+----------+------+
|   Name|Department|Salary|
+-------+----------+------+
|    Bob|     Sales|  1500|
|Charlie|        HR|  1200|
|    Eve|        IT|  2000|
|  Frank|        IT|  2200|
|    Jim|       ATA|  1300|
| Millie|   Finance|  2300|
+-------+----------+------+



In [8]:
non_it_df = employee_df.filter(~(employee_df["Department"]=="IT"))
non_it_df.show()


+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
|   Alice|     Sales|  1000|
|     Bob|     Sales|  1500|
| Charlie|        HR|  1200|
|   David|        HR|   800|
|     Jim|       ATA|  1300|
|  Millie|   Finance|  2300|
|Samantha|     Sales|  1000|
+--------+----------+------+



In [10]:
total_salary_per_department = employee_df.groupBy('Department').sum("Salary").withColumnRenamed("sum(Salary)", "Total Salary")
total_salary_per_department.show()

+----------+------------+
|Department|Total Salary|
+----------+------------+
|     Sales|        3500|
|        HR|        2000|
|   Finance|        2300|
|        IT|        4200|
|       ATA|        1300|
+----------+------------+



In [12]:
num_of_employees_per_department = employee_df.groupBy("Department").count().withColumnRenamed("count", "Number of Employees")
num_of_employees_per_department.show()


+----------+-------------------+
|Department|Number of Employees|
+----------+-------------------+
|     Sales|                  3|
|        HR|                  2|
|   Finance|                  1|
|        IT|                  2|
|       ATA|                  1|
+----------+-------------------+



In [13]:
employees_with_bonus_df = employee_df.withColumn("Salary with Bonus", employee_df['Salary']*1.3)
employees_with_bonus_df.show()

+--------+----------+------+-----------------+
|    Name|Department|Salary|Salary with Bonus|
+--------+----------+------+-----------------+
|   Alice|     Sales|  1000|           1300.0|
|     Bob|     Sales|  1500|           1950.0|
| Charlie|        HR|  1200|           1560.0|
|   David|        HR|   800|           1040.0|
|     Eve|        IT|  2000|           2600.0|
|   Frank|        IT|  2200|           2860.0|
|     Jim|       ATA|  1300|           1690.0|
|  Millie|   Finance|  2300|           2990.0|
|Samantha|     Sales|  1000|           1300.0|
+--------+----------+------+-----------------+



In [18]:
employees_with_bonus_df.write.mode("overwrite").format("csv").save("sample_data/employees.csv", header=True)

In [21]:
employees_with_bonus_df_r = spark.read.option("header", "true").csv("sample_data/employees.csv")
employees_with_bonus_df_r.show()

+--------+----------+------+-----------------+
|    Name|Department|Salary|Salary with Bonus|
+--------+----------+------+-----------------+
|     Eve|        IT|  2000|           2600.0|
|   Frank|        IT|  2200|           2860.0|
|     Jim|       ATA|  1300|           1690.0|
|  Millie|   Finance|  2300|           2990.0|
|Samantha|     Sales|  1000|           1300.0|
|   Alice|     Sales|  1000|           1300.0|
|     Bob|     Sales|  1500|           1950.0|
| Charlie|        HR|  1200|           1560.0|
|   David|        HR|   800|           1040.0|
+--------+----------+------+-----------------+



In [23]:
dept_data = [
    ("Sales", "New York"),
    ("HR", "San Francisco"),
    ("IT", "Seattle"),
    ("Entertainment", "Boston"),
    ("Schmoozing", "London")
]

In [24]:
dept_columns = ['Department', 'City']

dept_df = spark.createDataFrame(data=dept_data, schema=dept_columns)
dept_df.show(2)

+----------+-------------+
|Department|         City|
+----------+-------------+
|     Sales|     New York|
|        HR|San Francisco|
+----------+-------------+
only showing top 2 rows



In [26]:
inner_join_df = employees_with_bonus_df_r.join(dept_df, on='Department', how='inner')
inner_join_df.show()

+----------+--------+------+-----------------+-------------+
|Department|    Name|Salary|Salary with Bonus|         City|
+----------+--------+------+-----------------+-------------+
|     Sales|     Bob|  1500|           1950.0|     New York|
|     Sales|   Alice|  1000|           1300.0|     New York|
|     Sales|Samantha|  1000|           1300.0|     New York|
|        HR|   David|   800|           1040.0|San Francisco|
|        HR| Charlie|  1200|           1560.0|San Francisco|
|        IT|   Frank|  2200|           2860.0|      Seattle|
|        IT|     Eve|  2000|           2600.0|      Seattle|
+----------+--------+------+-----------------+-------------+



"""
[
    ("1", "Apple", "Large", "green"),
    ("2", "Apple", "Large", "yellow"),
    ("3", "Apple", "Large", "red"),
    ("4", "Apple", "small", "red"),
    ("5", "Apple", "small", "green"),
    ("6", "Apple", "Large", "red"),
    ("7", "Apple", "small", "yellow")
]


Using pyspark write a simple application that reads above data into a dataframe (structure above).

Columns after loading should be (id, fruit, size, colour)

- Add an extra column "test_apple" which is a boolean flag. If apple is small and red set it to 1, otherwise to 0
- remove any duplicates from the data
- filter out all rows where test_apple is set to 0
- save data locally to a json format

"""


In [30]:
fruit_dat = [
    ("1", "Apple", "Large", "green"),
    ("2", "Apple", "Large", "yellow"),
    ("3", "Apple", "Large", "red"),
    ("4", "Apple", "small", "red"),
    ("5", "Apple", "small", "green"),
    ("6", "Apple", "Large", "red"),
    ("7", "Apple", "small", "yellow")
]
fruit_columns = ['id', 'fruit', 'size', 'colour']

In [34]:
fruit_df = spark.createDataFrame(data=fruit_dat, schema=fruit_columns)
fruit_df.show()

+---+-----+-----+------+
| id|fruit| size|colour|
+---+-----+-----+------+
|  1|Apple|Large| green|
|  2|Apple|Large|yellow|
|  3|Apple|Large|   red|
|  4|Apple|small|   red|
|  5|Apple|small| green|
|  6|Apple|Large|   red|
|  7|Apple|small|yellow|
+---+-----+-----+------+



In [47]:
red_col = fruit_df['colour']=='red'
small_col = fruit_df['size']=='small'
red_col

Column<'(colour = red)'>

In [41]:
fruit_bool_df = fruit_df.withColumn("test_apple", when(red_col & small_col, 1).otherwise(0))
# TODO: standardize `fruit`, and add extra condition
fruit_bool_df.show()

+---+-----+-----+------+----------+
| id|fruit| size|colour|test_apple|
+---+-----+-----+------+----------+
|  1|Apple|Large| green|     false|
|  2|Apple|Large|yellow|     false|
|  3|Apple|Large|   red|     false|
|  4|Apple|small|   red|      true|
|  5|Apple|small| green|     false|
|  6|Apple|Large|   red|     false|
|  7|Apple|small|yellow|     false|
+---+-----+-----+------+----------+



In [42]:
fruit_drop_na = fruit_bool_df.drop_duplicates()
fruit_drop_na.show()
# |  6|Apple|Large|   red|     false| dropped

+---+-----+-----+------+----------+
| id|fruit| size|colour|test_apple|
+---+-----+-----+------+----------+
|  1|Apple|Large| green|     false|
|  3|Apple|Large|   red|     false|
|  2|Apple|Large|yellow|     false|
|  5|Apple|small| green|     false|
|  7|Apple|small|yellow|     false|
|  4|Apple|small|   red|      true|
|  6|Apple|Large|   red|     false|
+---+-----+-----+------+----------+



In [43]:
fruit_filtered = fruit_drop_na.filter(~(fruit_drop_na['test_apple']==0))
fruit_filtered.show()

+---+-----+-----+------+----------+
| id|fruit| size|colour|test_apple|
+---+-----+-----+------+----------+
|  4|Apple|small|   red|      true|
+---+-----+-----+------+----------+



In [44]:
fruit_filtered.write.json("sample_data/fruits_cleaned.json")

In [None]:
class PySparkWrapperForDummies:

  def __init__(self, spark_instance) -> None:
    self.spark=spark_instance


  def create_df(self, data:list[tuple], schema:list):
    self.spark.createDataFrame(data=data, schema = schema)
