This notebook loads the example data from the LeetCode SQL 50 study plan into Spark DataFrames and solves each question with PySpark.

In [75]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/20 22:02:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Q1) 1757. Recyclable and Low Fat Products

### Products Table

| Column Name | Type    |
|-------------|---------|
| product_id  | int     |
| low_fats    | enum    |
| recyclable  | enum    |

- `product_id` is the primary key (column with unique values) for this table.
- `low_fats` is an ENUM (category) of type ('Y', 'N') where 'Y' means this product is low fat and 'N' means it is not.
- `recyclable` is an ENUM (category) of types ('Y', 'N') where 'Y' means this product is recyclable and 'N' means it is not.

### Solution Requirement

Write a solution to find the ids of products that are both low fat and recyclable. Return the result table in any order.

### Example 1

#### Input

**Products Table:**

| product_id | low_fats | recyclable |
|------------|----------|------------|
| 0          | Y        | N          |
| 1          | Y        | Y          |
| 2          | N        | Y          |
| 3          | Y        | Y          |
| 4          | N        | N          |

#### Output

| product_id |
|------------|
| 1          |
| 3          |

### Explanation

Only products 1 and 3 are both low fat and recyclable.


In [18]:
data = [['0', 'Y', 'N'], ['1', 'Y', 'Y'], ['2', 'N', 'Y'], ['3', 'Y', 'Y'], ['4', 'N', 'N']]
products = pd.DataFrame(data, columns=['product_id', 'low_fats', 'recyclable']).astype({'product_id':'int64', 'low_fats':'category', 'recyclable':'category'})

In [95]:
q1_df = spark.createDataFrame(products)

In [20]:
q1_df.show()

+----------+--------+----------+
|product_id|low_fats|recyclable|
+----------+--------+----------+
|         0|       Y|         N|
|         1|       Y|         Y|
|         2|       N|         Y|
|         3|       Y|         Y|
|         4|       N|         N|
+----------+--------+----------+



In [21]:
res = q1_df \
    .filter((q1_df['low_fats'] == 'Y') & (q1_df['recyclable'] == 'Y')) \
    .select('product_id')

res.show()

+----------+
|product_id|
+----------+
|         1|
|         3|
+----------+



## Q2) 584. Find Customer Referee

### Customer Table

| Column Name | Type    |
|-------------|---------|
| id          | int     |
| name        | varchar |
| referee_id  | int     |

- In SQL, `id` is the primary key column for this table.
- Each row of this table indicates the id of a customer, their name, and the id of the customer who referred them.

### Solution Requirement

Find the names of the customers that are not referred by the customer with id = 2. Return the result table in any order.

### Example 1

#### Input

**Customer Table:**

| id | name | referee_id |
|----|------|------------|
| 1  | Will | null       |
| 2  | Jane | null       |
| 3  | Alex | 2          |
| 4  | Bill | null       |
| 5  | Zack | 1          |
| 6  | Mark | 2          |

#### Output

| name |
|------|
| Will |
| Jane |
| Bill |
| Zack |

### Explanation

Customers Will, Jane, Bill, and Zack are not referred by the customer with id = 2.


In [22]:
data = [[1, 'Will', None], [2, 'Jane', None], [3, 'Alex', 2], [4, 'Bill', None], [5, 'Zack', 1], [6, 'Mark', 2]]
customer = pd.DataFrame(data, columns=['id', 'name', 'referee_id']).astype({'id':'Int64', 'name':'object', 'referee_id':'Int64'})

In [23]:
q2_df = spark.createDataFrame(customer)

In [25]:
res = q2_df \
    .filter((q2_df['referee_id'] != 2) | (q2_df['referee_id'].isNull())) \
    .select('name')

res.show()

+----+
|name|
+----+
|Will|
|Jane|
|Bill|
|Zack|
+----+



## Q3) 595. Big Countries

### World Table

| Column Name | Type    |
|-------------|---------|
| name        | varchar |
| continent   | varchar |
| area        | int     |
| population  | int     |
| gdp         | bigint  |

- `name` is the primary key (column with unique values) for this table.
- Each row of this table gives information about the name of a country, the continent to which it belongs, its area, the population, and its GDP value.

### Solution Requirement

A country is big if:
- It has an area of at least three million (i.e., 3000000 km²), or
- It has a population of at least twenty-five million (i.e., 25000000).

Write a solution to find the name, population, and area of the big countries. Return the result table in any order.

### Example 1

#### Input

**World Table:**

| name        | continent | area    | population | gdp          |
|-------------|-----------|---------|------------|--------------|
| Afghanistan | Asia      | 652230  | 25500100   | 20343000000  |
| Albania     | Europe    | 28748   | 2831741    | 12960000000  |
| Algeria     | Africa    | 2381741 | 37100000   | 188681000000 |
| Andorra     | Europe    | 468     | 78115      | 3712000000   |
| Angola      | Africa    | 1246700 | 20609294   | 100990000000 |

#### Output

| name        | population | area    |
|-------------|------------|---------|
| Afghanistan | 25500100   | 652230  |
| Algeria     | 37100000   | 2381741 |

### Explanation

Afghanistan has a population of 25,500,100, making it a big country. Algeria has a population of 37,100,000, also qualifying as a big country.


In [26]:
data = [['Afghanistan', 'Asia', 652230, 25500100, 20343000000], ['Albania', 'Europe', 28748, 2831741, 12960000000], ['Algeria', 'Africa', 2381741, 37100000, 188681000000], ['Andorra', 'Europe', 468, 78115, 3712000000], ['Angola', 'Africa', 1246700, 20609294, 100990000000]]
world = pd.DataFrame(data, columns=['name', 'continent', 'area', 'population', 'gdp']).astype({'name':'object', 'continent':'object', 'area':'Int64', 'population':'Int64', 'gdp':'Int64'})

In [27]:
q3_df = spark.createDataFrame(world)

In [29]:
res = q3_df \
    .filter((q3_df['area'] >= 3000000) | (q3_df['population'] >= 25000000)) \
    .select('name', 'population', 'area')

res.show()

+-----------+----------+-------+
|       name|population|   area|
+-----------+----------+-------+
|Afghanistan|  25500100| 652230|
|    Algeria|  37100000|2381741|
+-----------+----------+-------+



## Q4) 1148. Article Views I

### Views Table

| Column Name   | Type    |
|---------------|---------|
| article_id    | int     |
| author_id     | int     |
| viewer_id     | int     |
| view_date     | date    |

- There is no primary key (column with unique values) for this table; the table may have duplicate rows.
- Each row of this table indicates that some viewer viewed an article (written by some author) on some date. 
- Note that equal `author_id` and `viewer_id` indicate the same person.

### Solution Requirement

Write a solution to find all the authors that viewed at least one of their own articles. Return the result table sorted by id in ascending order.

### Example 1

#### Input

**Views Table:**

| article_id | author_id | viewer_id | view_date  |
|------------|-----------|-----------|------------|
| 1          | 3         | 5         | 2019-08-01 |
| 1          | 3         | 6         | 2019-08-02 |
| 2          | 7         | 7         | 2019-08-01 |
| 2          | 7         | 6         | 2019-08-02 |
| 4          | 7         | 1         | 2019-07-22 |
| 3          | 4         | 4         | 2019-07-21 |
| 3          | 4         | 4         | 2019-07-21 |

#### Output

| id   |
|------|
| 4    |
| 7    |

### Explanation

Author 4 viewed their own article, and author 7 also viewed at least one of their own articles.


In [30]:
data = [[1, 3, 5, '2019-08-01'], [1, 3, 6, '2019-08-02'], [2, 7, 7, '2019-08-01'], [2, 7, 6, '2019-08-02'], [4, 7, 1, '2019-07-22'], [3, 4, 4, '2019-07-21'], [3, 4, 4, '2019-07-21']]
views = pd.DataFrame(data, columns=['article_id', 'author_id', 'viewer_id', 'view_date']).astype({'article_id':'Int64', 'author_id':'Int64', 'viewer_id':'Int64', 'view_date':'datetime64[ns]'})

In [31]:
q4_df = spark.createDataFrame(views)

In [38]:
res = q4_df \
    .filter(q4_df['author_id'] == q4_df['viewer_id']) \
    .selectExpr('author_id as id') \
    .distinct() \
    .orderBy('id')

res.show()

+---+
| id|
+---+
|  4|
|  7|
+---+



## Q5) 1683. Invalid Tweets

### Tweets Table

| Column Name | Type    |
|-------------|---------|
| tweet_id    | int     |
| content     | varchar |

- `tweet_id` is the primary key (column with unique values) for this table.
- This table contains all the tweets in a social media app.

### Solution Requirement

Write a solution to find the IDs of the invalid tweets. The tweet is invalid if the number of characters used in the content of the tweet is strictly greater than 15. Return the result table in any order.

### Example 1

#### Input

**Tweets Table:**

| tweet_id | content                           |
|----------|-----------------------------------|
| 1        | Let us Code                       |
| 2        | More than fifteen chars are here! |

#### Output

| tweet_id |
|----------|
| 2        |

### Explanation

- Tweet 1 has length = 11. It is a valid tweet.
- Tweet 2 has length = 33. It is an invalid tweet.


In [39]:
data = [[1, 'Let us Code'], [2, 'More than fifteen chars are here!']]
tweets = pd.DataFrame(data, columns=['tweet_id', 'content']).astype({'tweet_id':'Int64', 'content':'object'})

In [40]:
q5_df = spark.createDataFrame(tweets)

In [44]:
res = q5_df.filter(F.length(q5_df['content']) > 15).select('tweet_id')

res.show()

+--------+
|tweet_id|
+--------+
|       2|
+--------+



## Q6) 1378. Replace Employee ID With The Unique Identifier

### Employees Table

| Column Name | Type    |
|-------------|---------|
| id          | int     |
| name        | varchar |

- `id` is the primary key (column with unique values) for this table.
- Each row of this table contains the `id` and the name of an employee in a company.

### EmployeeUNI Table

| Column Name | Type    |
|-------------|---------|
| id          | int     |
| unique_id   | int     |

- `(id, unique_id)` is the primary key (combination of columns with unique values) for this table.
- Each row of this table contains the `id` and the corresponding unique id of an employee in the company.

### Solution Requirement

Write a solution to show the unique ID of each user. If a user does not have a unique ID, replace it with `null`. Return the result table in any order.

### Example 1

#### Input

**Employees Table:**

| id | name     |
|----|----------|
| 1  | Alice    |
| 7  | Bob      |
| 11 | Meir     |
| 90 | Winston  |
| 3  | Jonathan |

**EmployeeUNI Table:**

| id | unique_id |
|----|-----------|
| 3  | 1         |
| 11 | 2         |
| 90 | 3         |

#### Output

| unique_id | name     |
|-----------|----------|
| null      | Alice    |
| null      | Bob      |
| 2         | Meir     |
| 3         | Winston  |
| 1         | Jonathan |

### Explanation

- Alice and Bob do not have a unique ID; we will show `null` instead.
- The unique ID of Meir is 2.
- The unique ID of Winston is 3.
- The unique ID of Jonathan is 1.


In [46]:
data = [[1, 'Alice'], [7, 'Bob'], [11, 'Meir'], [90, 'Winston'], [3, 'Jonathan']]
employees = pd.DataFrame(data, columns=['id', 'name']).astype({'id':'int64', 'name':'object'})
data = [[3, 1], [11, 2], [90, 3]]
employee_uni = pd.DataFrame(data, columns=['id', 'unique_id']).astype({'id':'int64', 'unique_id':'int64'})

In [47]:
emp_df = spark.createDataFrame(employees)
emp_uni = spark.createDataFrame(employee_uni)

In [51]:
res = emp_df \
    .join(emp_uni, on='id', how='left') \
    .select('unique_id', 'name')

res.show()

+---------+--------+
|unique_id|    name|
+---------+--------+
|     NULL|   Alice|
|     NULL|     Bob|
|        2|    Meir|
|        3| Winston|
|        1|Jonathan|
+---------+--------+



## Q7) 1068. Product Sales Analysis I

### Sales Table

| Column Name | Type  |
|-------------|-------|
| sale_id     | int   |
| product_id  | int   |
| year        | int   |
| quantity    | int   |
| price       | int   |

- `(sale_id, year)` is the primary key (combination of columns with unique values) of this table.
- `product_id` is a foreign key (reference column) to the Product table.
- Each row of this table shows a sale on the product `product_id` in a certain year. Note that the price is per unit.

### Product Table

| Column Name  | Type    |
|--------------|---------|
| product_id   | int     |
| product_name | varchar |

- `product_id` is the primary key (column with unique values) of this table.
- Each row of this table indicates the product name of each product.

### Solution Requirement

Write a solution to report the `product_name`, `year`, and `price` for each `sale_id` in the Sales table. Return the resulting table in any order.

### Example 1

#### Input

**Sales Table:**

| sale_id | product_id | year | quantity | price |
|---------|------------|------|----------|-------|
| 1       | 100        | 2008 | 10       | 5000  |
| 2       | 100        | 2009 | 12       | 5000  |
| 7       | 200        | 2011 | 15       | 9000  |

**Product Table:**

| product_id | product_name |
|------------|--------------|
| 100        | Nokia        |
| 200        | Apple        |
| 300        | Samsung      |

#### Output

| product_name | year  | price |
|--------------|-------|-------|
| Nokia        | 2008  | 5000  |
| Nokia        | 2009  | 5000  |
| Apple        | 2011  | 9000  |

### Explanation

- From `sale_id = 1`, we can conclude that Nokia was sold for 5000 in the year 2008.
- From `sale_id = 2`, we can conclude that Nokia was sold for 5000 in the year 2009.
- From `sale_id = 7`, we can conclude that Apple was sold for 9000 in the year 2011.


In [53]:
data = [[1, 100, 2008, 10, 5000], [2, 100, 2009, 12, 5000], [7, 200, 2011, 15, 9000]]
sales = pd.DataFrame(data, columns=['sale_id', 'product_id', 'year', 'quantity', 'price']).astype({'sale_id':'Int64', 'product_id':'Int64', 'year':'Int64', 'quantity':'Int64', 'price':'Int64'})
data = [[100, 'Nokia'], [200, 'Apple'], [300, 'Samsung']]
product = pd.DataFrame(data, columns=['product_id', 'product_name']).astype({'product_id':'Int64', 'product_name':'object'})

In [54]:
sales_df = spark.createDataFrame(sales)
product_df = spark.createDataFrame(product)

In [56]:
res = sales_df \
    .join(product_df, on='product_id') \
    .select('product_name', 'year', 'price')

res.show()

+------------+----+-----+
|product_name|year|price|
+------------+----+-----+
|       Nokia|2008| 5000|
|       Nokia|2009| 5000|
|       Apple|2011| 9000|
+------------+----+-----+



## Q8) 1581. Customer Who Visited but Did Not Make Any Transactions

### Visits Table

| Column Name | Type    |
|-------------|---------|
| visit_id    | int     |
| customer_id | int     |

- `visit_id` is the column with unique values for this table.
- This table contains information about the customers who visited the mall.

### Transactions Table

| Column Name    | Type    |
|----------------|---------|
| transaction_id | int     |
| visit_id       | int     |
| amount         | int     |

- `transaction_id` is the column with unique values for this table.
- This table contains information about the transactions made during the `visit_id`.

### Solution Requirement

Write a solution to find the IDs of the users who visited without making any transactions and the number of times they made these types of visits. Return the result table sorted in any order.

### Example 1

#### Input

**Visits Table:**

| visit_id | customer_id |
|----------|-------------|
| 1        | 23          |
| 2        | 9           |
| 4        | 30          |
| 5        | 54          |
| 6        | 96          |
| 7        | 54          |
| 8        | 54          |

**Transactions Table:**

| transaction_id | visit_id | amount |
|----------------|----------|--------|
| 2              | 5        | 310    |
| 3              | 5        | 300    |
| 9              | 5        | 200    |
| 12             | 1        | 910    |
| 13             | 2        | 970    |

#### Output

| customer_id | count_no_trans |
|-------------|----------------|
| 54          | 2              |
| 30          | 1              |
| 96          | 1              |

### Explanation

- Customer with id = 23 visited the mall once and made one transaction during the visit with id = 12.
- Customer with id = 9 visited the mall once and made one transaction during the visit with id = 13.
- Customer with id = 30 visited the mall once and did not make any transactions.
- Customer with id = 54 visited the mall three times. During 2 visits they did not make any transactions, and during one visit they made 3 transactions.
- Customer with id = 96 visited the mall once and did not make any transactions.

As we can see, users with IDs 30 and 96 visited the mall one time without making any transactions. Also, user 54 visited the mall twice and did not make any transactions.


In [57]:
data = [[1, 23], [2, 9], [4, 30], [5, 54], [6, 96], [7, 54], [8, 54]]
visits = pd.DataFrame(data, columns=['visit_id', 'customer_id']).astype({'visit_id':'Int64', 'customer_id':'Int64'})
data = [[2, 5, 310], [3, 5, 300], [9, 5, 200], [12, 1, 910], [13, 2, 970]]
transactions = pd.DataFrame(data, columns=['transaction_id', 'visit_id', 'amount']).astype({'transaction_id':'Int64', 'visit_id':'Int64', 'amount':'Int64'})


In [58]:
visits_df = spark.createDataFrame(visits)
trans_df = spark.createDataFrame(transactions)

In [65]:
res = visits_df \
    .join(trans_df, on='visit_id', how='left')

res = res \
    .filter(res['amount'].isNull()) \
    .groupby('customer_id').count() \
    .withColumnRenamed('count', 'count_no_trans')

res.show()

+-----------+--------------+
|customer_id|count_no_trans|
+-----------+--------------+
|         54|             2|
|         96|             1|
|         30|             1|
+-----------+--------------+



## Q9) 197. Rising Temperature

### Weather Table

| Column Name   | Type    |
|---------------|---------|
| id            | int     |
| recordDate    | date    |
| temperature   | int     |

- `id` is the column with unique values for this table.
- There are no different rows with the same `recordDate`.
- This table contains information about the temperature on a certain day.

### Solution Requirement

Write a solution to find the `id` of every date whose temperature is higher than that of the previous date (yesterday). Return the result table in any order.

### Example 1

#### Input

**Weather Table:**

| id | recordDate | temperature |
|----|------------|-------------|
| 1  | 2015-01-01 | 10          |
| 2  | 2015-01-02 | 25          |
| 3  | 2015-01-03 | 20          |
| 4  | 2015-01-04 | 30          |

#### Output

| id |
|----|
| 2  |
| 4  |

### Explanation

- On 2015-01-02, the temperature was higher than the previous day (10 -> 25).
- On 2015-01-04, the temperature was higher than the previous day (20 -> 30).


In [77]:
data = [[1, '2015-01-01', 10], [2, '2015-01-02', 25], [3, '2015-01-03', 20], [4, '2015-01-04', 30]]
weather = pd.DataFrame(data, columns=['id', 'recordDate', 'temperature']).astype({'id':'Int64', 'recordDate':'datetime64[ns]', 'temperature':'Int64'})

In [78]:
weather_df = spark.createDataFrame(weather)

In [90]:
# No partition key: the whole table forms one ordered window, which is why Spark warns about a single partition.
window_spec = Window.orderBy('recordDate')

res = weather_df.withColumns({'previous_date': F.lag(weather_df['recordDate']).over(window_spec),
                              'previous_temp': F.lag(weather_df['temperature']).over(window_spec)})

res = res \
    .filter((res['previous_date'].isNotNull()) & \
            (F.datediff(res['recordDate'], res['previous_date']) == 1) & \
            (res['temperature'] > res['previous_temp'])) \
    .select('id')

res.show()

+---+
| id|
+---+
|  2|
|  4|
+---+



24/09/20 23:16:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


## Q10) 1661. Average Time of Process per Machine

### Table: Activity

| Column Name    | Type    |
|----------------|---------|
| machine_id     | int     |
| process_id     | int     |
| activity_type  | enum    |
| timestamp      | float   |

- `(machine_id, process_id, activity_type)` is the primary key (combination of columns with unique values) of this table.
- `machine_id` is the ID of a machine.
- `process_id` is the ID of a process running on the machine with ID `machine_id`.
- `activity_type` is an ENUM (category) of type ('start', 'end').
- `timestamp` is a float representing the current time in seconds.

The 'start' timestamp will always be before the 'end' timestamp for every `(machine_id, process_id)` pair. It is guaranteed that each `(machine_id, process_id)` pair has a 'start' and 'end' timestamp.

### Solution Requirement

Write a solution to find the average time each machine takes to complete a process. The time to complete a process is the 'end' timestamp minus the 'start' timestamp. The average time is calculated by the total time to complete every process on the machine divided by the number of processes that were run. The resulting table should have the `machine_id` along with the average time as `processing_time`, rounded to 3 decimal places.

### Example 1

#### Input

**Activity Table:**

| machine_id | process_id | activity_type | timestamp |
|------------|------------|---------------|-----------|
| 0          | 0          | start         | 0.712     |
| 0          | 0          | end           | 1.520     |
| 0          | 1          | start         | 3.140     |
| 0          | 1          | end           | 4.120     |
| 1          | 0          | start         | 0.550     |
| 1          | 0          | end           | 1.550     |
| 1          | 1          | start         | 0.430     |
| 1          | 1          | end           | 1.420     |
| 2          | 0          | start         | 4.100     |
| 2          | 0          | end           | 4.512     |
| 2          | 1          | start         | 2.500     |
| 2          | 1          | end           | 5.000     |

#### Output

| machine_id | processing_time |
|------------|-----------------|
| 0          | 0.894           |
| 1          | 0.995           |
| 2          | 1.456           |

### Explanation

- Machine 0's average time is ((1.520 - 0.712) + (4.120 - 3.140)) / 2 = 0.894
- Machine 1's average time is ((1.550 - 0.550) + (1.420 - 0.430)) / 2 = 0.995
- Machine 2's average time is ((4.512 - 4.100) + (5.000 - 2.500)) / 2 = 1.456


In [91]:
data = [[0, 0, 'start', 0.712], [0, 0, 'end', 1.52], [0, 1, 'start', 3.14], [0, 1, 'end', 4.12], [1, 0, 'start', 0.55], [1, 0, 'end', 1.55], [1, 1, 'start', 0.43], [1, 1, 'end', 1.42], [2, 0, 'start', 4.1], [2, 0, 'end', 4.512], [2, 1, 'start', 2.5], [2, 1, 'end', 5]]
activity = pd.DataFrame(data, columns=['machine_id', 'process_id', 'activity_type', 'timestamp']).astype({'machine_id':'Int64', 'process_id':'Int64', 'activity_type':'object', 'timestamp':'Float64'})

In [92]:
activity_df = spark.createDataFrame(activity)

## Q11) 577. Employee Bonus

### Table: Employee

| Column Name | Type    |
|-------------|---------|
| empId       | int     |
| name        | varchar |
| supervisor  | int     |
| salary      | int     |

- `empId` is the column with unique values for this table.
- Each row of this table indicates the name and the ID of an employee in addition to their salary and the ID of their manager.

### Table: Bonus

| Column Name | Type |
|-------------|------|
| empId       | int  |
| bonus       | int  |

- `empId` is the column of unique values for this table.
- `empId` is a foreign key (reference column) to `empId` from the Employee table.
- Each row of this table contains the ID of an employee and their respective bonus.

### Solution Requirement

Write a solution to report the name and bonus amount of each employee with a bonus less than 1000. Return the result table in any order.

### Example 1

#### Input

**Employee Table:**

| empId | name   | supervisor | salary |
|-------|--------|------------|--------|
| 3     | Brad   | null       | 4000   |
| 1     | John   | 3          | 1000   |
| 2     | Dan    | 3          | 2000   |
| 4     | Thomas | 3          | 4000   |

**Bonus Table:**

| empId | bonus |
|-------|-------|
| 2     | 500   |
| 4     | 2000  |

#### Output

| name | bonus |
|------|-------|
| Brad | null  |
| John | null  |
| Dan  | 500   |

### Explanation

- Brad has no bonus recorded, so it shows as null.
- John has no bonus recorded, so it shows as null.
- Dan has a bonus of 500, which is less than 1000.


In [93]:
data = [[3, 'Brad', None, 4000], [1, 'John', 3, 1000], [2, 'Dan', 3, 2000], [4, 'Thomas', 3, 4000]]
employee = pd.DataFrame(data, columns=['empId', 'name', 'supervisor', 'salary']).astype({'empId':'Int64', 'name':'object', 'supervisor':'Int64', 'salary':'Int64'})
data = [[2, 500], [4, 2000]]
bonus = pd.DataFrame(data, columns=['empId', 'bonus']).astype({'empId':'Int64', 'bonus':'Int64'})

In [94]:
employee_df = spark.createDataFrame(employee)
bonus_df = spark.createDataFrame(bonus)

In [105]:
res = employee_df \
    .join(bonus_df, how='left', on='empId')

res = res \
    .filter((res['bonus'].isNull()) | (res['bonus'] < 1000)) \
    .select('name', 'bonus')

res.show()

+----+-----+
|name|bonus|
+----+-----+
|Brad| NULL|
|John| NULL|
| Dan|  500|
+----+-----+



## Q12) 1280. Students and Examinations

### Table: Students

| Column Name   | Type    |
|---------------|---------|
| student_id    | int     |
| student_name  | varchar |

- `student_id` is the primary key (column with unique values) for this table.
- Each row of this table contains the ID and the name of one student in the school.

### Table: Subjects

| Column Name  | Type    |
|--------------|---------|
| subject_name | varchar |

- `subject_name` is the primary key (column with unique values) for this table.
- Each row of this table contains the name of one subject in the school.

### Table: Examinations

| Column Name  | Type    |
|--------------|---------|
| student_id   | int     |
| subject_name | varchar |

- There is no primary key (column with unique values) for this table. It may contain duplicates.
- Each student from the Students table takes every course from the Subjects table.
- Each row of this table indicates that a student with ID `student_id` attended the exam of `subject_name`.

### Solution Requirement

Write a solution to find the number of times each student attended each exam. Return the result table ordered by `student_id` and `subject_name`.

### Example 1

#### Input

**Students Table:**

| student_id | student_name |
|------------|--------------|
| 1          | Alice        |
| 2          | Bob          |
| 13         | John         |
| 6          | Alex         |

**Subjects Table:**

| subject_name |
|--------------|
| Math         |
| Physics      |
| Programming  |

**Examinations Table:**

| student_id | subject_name |
|------------|--------------|
| 1          | Math         |
| 1          | Physics      |
| 1          | Programming  |
| 2          | Programming  |
| 1          | Physics      |
| 1          | Math         |
| 13         | Math         |
| 13         | Programming  |
| 13         | Physics      |
| 2          | Math         |
| 1          | Math         |

#### Output

| student_id | student_name | subject_name | attended_exams |
|------------|--------------|--------------|----------------|
| 1          | Alice        | Math         | 3              |
| 1          | Alice        | Physics      | 2              |
| 1          | Alice        | Programming  | 1              |
| 2          | Bob          | Math         | 1              |
| 2          | Bob          | Physics      | 0              |
| 2          | Bob          | Programming  | 1              |
| 6          | Alex         | Math         | 0              |
| 6          | Alex         | Physics      | 0              |
| 6          | Alex         | Programming  | 0              |
| 13         | John         | Math         | 1              |
| 13         | John         | Physics      | 1              |
| 13         | John         | Programming  | 1              |

### Explanation

- The result table contains all students and all subjects.
- Alice attended the Math exam 3 times, the Physics exam 2 times, and the Programming exam 1 time.
- Bob attended the Math exam 1 time, the Programming exam 1 time, and did not attend the Physics exam.
- Alex did not attend any exams.
- John attended the Math exam 1 time, the Physics exam 1 time, and the Programming exam 1 time.


In [96]:
data = [[1, 'Alice'], [2, 'Bob'], [13, 'John'], [6, 'Alex']]
students = pd.DataFrame(data, columns=['student_id', 'student_name']).astype({'student_id':'Int64', 'student_name':'object'})
data = [['Math'], ['Physics'], ['Programming']]
subjects = pd.DataFrame(data, columns=['subject_name']).astype({'subject_name':'object'})
data = [[1, 'Math'], [1, 'Physics'], [1, 'Programming'], [2, 'Programming'], [1, 'Physics'], [1, 'Math'], [13, 'Math'], [13, 'Programming'], [13, 'Physics'], [2, 'Math'], [1, 'Math']]
examinations = pd.DataFrame(data, columns=['student_id', 'subject_name']).astype({'student_id':'Int64', 'subject_name':'object'})

In [97]:
students_df = spark.createDataFrame(students)
subjects_df = spark.createDataFrame(subjects)
exam_df = spark.createDataFrame(examinations)

## Q13) 570. Managers with at Least 5 Direct Reports

### Table: Employee

| Column Name | Type    |
|-------------|---------|
| id          | int     |
| name        | varchar |
| department  | varchar |
| managerId   | int     |

- `id` is the primary key (column with unique values) for this table.
- Each row of this table indicates the name of an employee, their department, and the id of their manager.
- If `managerId` is null, then the employee does not have a manager.
- No employee will be the manager of themselves.

### Solution Requirement

Write a solution to find managers with at least five direct reports. Return the result table in any order.

### Example 1

#### Input

**Employee Table:**

| id  | name  | department | managerId |
|-----|-------|------------|-----------|
| 101 | John  | A          | null      |
| 102 | Dan   | A          | 101       |
| 103 | James | A          | 101       |
| 104 | Amy   | A          | 101       |
| 105 | Anne  | A          | 101       |
| 106 | Ron   | B          | 101       |

#### Output

| name |
|------|
| John |

### Explanation

- In this example, John is the manager of Dan, James, Amy, Anne, and Ron, totaling five direct reports.
- Therefore, the output is John, who meets the criteria of having at least five direct reports.


In [98]:
data = [[101, 'John', 'A', None], [102, 'Dan', 'A', 101], [103, 'James', 'A', 101], [104, 'Amy', 'A', 101], [105, 'Anne', 'A', 101], [106, 'Ron', 'B', 101]]
employee = pd.DataFrame(data, columns=['id', 'name', 'department', 'managerId']).astype({'id':'Int64', 'name':'object', 'department':'object', 'managerId':'Int64'})

In [99]:
e_df = spark.createDataFrame(employee)

In [111]:
m_df = e_df \
    .groupby('managerId') \
    .count() \
    .dropna() \
    .filter(F.col('count') >= 5)

res = e_df \
    .join(m_df, m_df['managerId'] == e_df['id']) \
    .select('name')

res.show()

+----+
|name|
+----+
|John|
+----+



## Q14) 1934. Confirmation Rate

### Table: Signups

| Column Name | Type     |
|-------------|----------|
| user_id     | int      |
| time_stamp  | datetime |

- `user_id` is the column of unique values for this table.
- Each row contains information about the signup time for the user with ID `user_id`.

### Table: Confirmations

| Column Name | Type     |
|-------------|----------|
| user_id     | int      |
| time_stamp  | datetime |
| action      | ENUM     |

- `(user_id, time_stamp)` is the primary key (combination of columns with unique values) for this table.
- `user_id` is a foreign key (reference column) to the Signups table.
- `action` is an ENUM (category) of the type ('confirmed', 'timeout').
- Each row indicates that the user with ID `user_id` requested a confirmation message at `time_stamp` and that confirmation message was either confirmed ('confirmed') or expired without confirming ('timeout').

### Solution Requirement

The confirmation rate of a user is defined as the number of 'confirmed' messages divided by the total number of requested confirmation messages. If a user did not request any confirmation messages, the confirmation rate should be 0. Round the confirmation rate to two decimal places.

### Example 1

#### Input

**Signups Table:**

| user_id | time_stamp          |
|---------|---------------------|
| 3       | 2020-03-21 10:16:13 |
| 7       | 2020-01-04 13:57:59 |
| 2       | 2020-07-29 23:09:44 |
| 6       | 2020-12-09 10:39:37 |

**Confirmations Table:**

| user_id | time_stamp          | action    |
|---------|---------------------|-----------|
| 3       | 2021-01-06 03:30:46 | timeout   |
| 3       | 2021-07-14 14:00:00 | timeout   |
| 7       | 2021-06-12 11:57:29 | confirmed |
| 7       | 2021-06-13 12:58:28 | confirmed |
| 7       | 2021-06-14 13:59:27 | confirmed |
| 2       | 2021-01-22 00:00:00 | confirmed |
| 2       | 2021-02-28 23:59:59 | timeout   |

#### Output

| user_id | confirmation_rate |
|---------|-------------------|
| 6       | 0.00              |
| 3       | 0.00              |
| 7       | 1.00              |
| 2       | 0.50              |

### Explanation

- User 6 did not request any confirmation messages, so the confirmation rate is 0.
- User 3 made 2 requests, both of which timed out, resulting in a confirmation rate of 0.
- User 7 made 3 requests, all confirmed, giving a confirmation rate of 1.
- User 2 made 2 requests (1 confirmed, 1 timed out), resulting in a confirmation rate of 0.5.


In [100]:
data = [[3, '2020-03-21 10:16:13'], [7, '2020-01-04 13:57:59'], [2, '2020-07-29 23:09:44'], [6, '2020-12-09 10:39:37']]
signups = pd.DataFrame(data, columns=['user_id', 'time_stamp']).astype({'user_id':'Int64', 'time_stamp':'datetime64[ns]'})
data = [[3, '2021-01-06 03:30:46', 'timeout'], [3, '2021-07-14 14:00:00', 'timeout'], [7, '2021-06-12 11:57:29', 'confirmed'], [7, '2021-06-13 12:58:28', 'confirmed'], [7, '2021-06-14 13:59:27', 'confirmed'], [2, '2021-01-22 00:00:00', 'confirmed'], [2, '2021-02-28 23:59:59', 'timeout']]
confirmations = pd.DataFrame(data, columns=['user_id', 'time_stamp', 'action']).astype({'user_id':'Int64', 'time_stamp':'datetime64[ns]', 'action':'object'})

In [101]:
signup_df = spark.createDataFrame(signups)
confirm_df = spark.createDataFrame(confirmations)

TODO: Q10, Q12, and Q14 do not yet have executed solution cells.