In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.window import *

In [0]:
import pandas as pd 

### 175. Combine Two Tables

Write a solution to report the first name, last name, city, and state of each person in the Person table. If the address of a personId is not present in the Address table, report null instead.

Return the result table in any order.

In [0]:
# Define schema for Person table
person_schema = StructType([
    StructField("personId", IntegerType(), True),
    StructField("lastName", StringType(), True),
    StructField("firstName", StringType(), True)
])

# Create Person DataFrame
data_person = [
    (1, "Wang", "Allen"),
    (2, "Alice", "Bob")
]

df_person = spark.createDataFrame(data_person, schema=person_schema)
df_person.show()

# Define schema for Address table
address_schema = StructType([
    StructField("addressId", IntegerType(), True),
    StructField("personId", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True)
])

# Create Address DataFrame
data_address = [
    (1, 2, "New York City", "New York"),
    (2, 3, "Leetcode", "California")
]

df_address = spark.createDataFrame(data_address, schema=address_schema)
df_address.show()

In [0]:
df_person.join(
    df_address, df_person['personId'] == df_address['personId'], 'left')\
        .select(["lastName", "firstName", "city", "state"]).show()

In [0]:
# Pandas
pdf_address = df_address.toPandas()
pdf_person = df_person.toPandas()
pdf_address.display()
pdf_person.display()

In [0]:
pdf_person.merge(pdf_address, on='personId', how = 'left')[['lastName', 'firstName', 'city', 'state']].display()

### 181. Employees Earning More Than Their Managers
id is the primary key (column with unique values) for this table.
Each row of this table indicates the ID of an employee, their name, salary, and the ID of their manager.

In [0]:
# Define schema for Employee table
employee_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("managerId", IntegerType(), True)
])

# Create Employee DataFrame
data_employee = [
    (1, "Joe", 70000, 3),
    (2, "Henry", 80000, 4),
    (3, "Sam", 60000, None),
    (4, "Max", 90000, None)
]

df_employee1 = spark.createDataFrame(data_employee, schema=employee_schema)
df_employee2 = spark.createDataFrame(data_employee, schema=employee_schema)


In [0]:
df_employee1.show()
df_employee2.show()

In [0]:
df_employee1.join(df_employee2, df_employee1.id == df_employee2.managerId, 'inner')\
    .select(df_employee2["name"]).filter(df_employee2["salary"] > df_employee1["salary"]).show()

In [0]:
#pandas

pdf_employee = df_employee1.toPandas()

In [0]:
pdf_merge = pdf_employee.merge(pdf_employee, left_on = 'managerId', right_on = 'id', how = 'left' ).iloc[:,[1,2,5,6]]

In [0]:
pdf_merge.loc[pdf_merge['salary_x'] > pdf_merge['salary_y']].iloc[:,[0]].display()

### 182. Duplicate Emails
Write a solution to report all the duplicate emails. Note that it's guaranteed that the email field is not NULL.

Return the result table in any order.

In [0]:
# Define schema for Email table
email_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("email", StringType(), True)
])

# Create Email DataFrame
data_email = [
    (1, "a@b.com"),
    (2, "c@d.com"),
    (3, "a@b.com")
]

df_email = spark.createDataFrame(data_email, schema=email_schema)
df_email.show()

In [0]:
df_email.groupby("email").count().filter("count > 1").select("email").show()

In [0]:
pdf_email = df_email.toPandas()

In [0]:
pdf_email = pdf_email.groupby("email").count().reset_index()

In [0]:
pdf_email.loc[pdf_email['id'] > 1].iloc[:,[0]].display()

### 183. Customers Who Never Order
Write a solution to find all customers who never order anything.

Return the result table in any order.

The result format is in the following example.

In [0]:
# Define schema for Customers table
customers_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create Customers DataFrame
data_customers = [
    (1, "Joe"),
    (2, "Henry"),
    (3, "Sam"),
    (4, "Max")
]

df_customers = spark.createDataFrame(data_customers, schema=customers_schema)
df_customers.show()

# Define schema for Orders table
orders_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("customerId", IntegerType(), True)
])

# Create Orders DataFrame
data_orders = [
    (1, 3),
    (2, 1)
]

df_orders = spark.createDataFrame(data_orders, schema=orders_schema)
df_orders.show() 

In [0]:
df_customers.join(df_orders, df_customers.id == df_orders.customerId, 'anti')\
    .select('name').alias('Customers').show()

In [0]:
pdf_customers = df_customers.toPandas()
pdf_orders = df_orders.toPandas()

In [0]:
pdf_customers.display()
pdf_orders.display()

In [0]:
pdf_customers = pdf_customers.merge(pdf_orders, left_on = 'id', right_on = 'customerId', how = 'left')
pdf_customers.display()

In [0]:
pdf_customers.loc[pd.isnull(pdf_customers['id_y']), ['name']].display()

### 176. Second Highest Salary
Write a solution to find the second highest distinct salary from the Employee table. If there is no second highest salary, return null (return None in Pandas).

The result format is in the following example.

In [0]:
# Define schema for simple Employee salary table
simple_employee_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("salary", IntegerType(), True)
])

# Create simple Employee DataFrame
data_simple_employee = [
    (1, 100),
    (2, 200),
    (3, 300)
]

df_simple_employee = spark.createDataFrame(data_simple_employee, schema=simple_employee_schema)
df_simple_employee.show()

In [0]:
window_spec = Window.orderBy(col("salary").desc())
df_win = df_simple_employee.withColumn('rank', dense_rank().over(window_spec))
df_filtered = df_win.select(df_win['salary']).filter(df_win["rank"] == 2)
df_filtered.show()

In [0]:
pdf_sim_emp = df_simple_employee.toPandas()
pdf_sim_emp.display()

In [0]:
pdf_sim_emp['rank'] = pdf_sim_emp['salary'].rank(method = 'dense', ascending = 'True')
pdf_sim_emp.display()

In [0]:
pdf_sim_emp.loc[pdf_sim_emp['rank'] == 2, ['salary']].display()

### 184. Department Highest Salary
Write a solution to find employees who have the highest salary in each of the departments.

Return the result table in any order.

The result format is in the following example.

In [0]:
# Define schema for Employee table
employee_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("departmentId", IntegerType(), True)
])

# Create Employee DataFrame
employee = [
    (1, "Joe", 70000, 1),
    (2, "Jim", 90000, 1),
    (3, "Henry", 80000, 2),
    (4, "Sam", 60000, 2),
    (5, "Max", 90000, 1)
]

df_employee = spark.createDataFrame(employee, schema=employee_schema)
df_employee.show()

# Define schema for Department table
department_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create Department DataFrame
data_department = [
    (1, "IT"),
    (2, "Sales")
]

df_department = spark.createDataFrame(data_department, schema=department_schema)
df_department.show()


In [0]:
%python
df_joined = df_employee.join(
    df_department,
    df_employee.departmentId == df_department.id,
    'inner'
)

df_selected = df_joined.select(
    df_employee['name'].alias('employee_name'),
    df_employee['salary'],
    df_employee['departmentId'],
    df_department['name'].alias('departmentname')
)

display(df_selected)

In [0]:
df_group = df_selected.groupBy('departmentname').agg(max('salary').alias('salary'))

df_selected.select(['employee_name', 'salary', 'departmentname']).join(df_group, df_selected.salary == df_group.salary, 'inner').show()

In [0]:
pdf_employee = df_employee.toPandas()
pdf_department = df_department.toPandas()
pdf_merge = pdf_employee.merge(pdf_department, left_on='departmentId', right_on = 'id', how = 'left').iloc[:, [1,2,5]]
print(pdf_merge)

In [0]:
maxsal = pdf_merge.groupby('name_y')['salary'].idxmax()
display(maxsal)
result = pdf_merge.loc[maxsal, ['name_y', 'salary']]
result = result.reset_index(drop=True)
display(result)



In [0]:
pdf_employee.display()

In [0]:
pdf_employee.join(result, left_on = 'salary', right_on = 'salary', how= 'left' ).display()

### 196. Delete Duplicate Emails
Write a solution to delete all duplicate emails, keeping only one unique email with the smallest id.

For SQL users, please note that you are supposed to write a DELETE statement and not a SELECT one.

For Pandas users, please note that you are supposed to modify Person in place.

After running your script, the answer shown is the Person table. The driver will first compile and run your piece of code and then show the Person table. The final order of the Person table does not matter.

In [0]:
# Define schema
person_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("email", StringType(), True)
])

# Sample data
data_person = [
    (1, "john@example.com"),
    (2, "bob@example.com"),
    (3, "john@example.com")
]

# Create DataFrame
df_person = spark.createDataFrame(data_person, schema=person_schema)

# Show DataFrame
df_person.show()

In [0]:
df_person.drop_duplicates(['email']).show()

### 197. Rising Temperature
Write a solution to find all dates' id with higher temperatures compared to its previous dates (yesterday).

Return the result table in any order.

In [0]:
from datetime import *

# Define schema
weather_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("recordDate", DateType(), True),
    StructField("temperature", IntegerType(), True)
])

# Sample data with proper datetime conversion
data_weather = [
    (1, datetime.strptime("2015-01-01", "%Y-%m-%d").date(), 10),
    (2, datetime.strptime("2015-01-02", "%Y-%m-%d").date(), 25),
    (3, datetime.strptime("2015-01-03", "%Y-%m-%d").date(), 20),
    (4, datetime.strptime("2015-01-04", "%Y-%m-%d").date(), 30)
]

# Create DataFrame
df_weather = spark.createDataFrame(data_weather, schema=weather_schema)

# Show DataFrame
df_weather.show()

In [0]:
window_spec = Window.orderBy('recordDate')
df_result = df_weather.withColumn('prevtemp', lag('temperature').over(window_spec))
df_result.show()


In [0]:
df_result.select('id').filter(df_result['temperature'] > df_result['prevtemp']).show()

### 511. Game Play Analysis I

Input: 
Activity table:
+-----------+-----------+------------+--------------+
| player_id | device_id | event_date | games_played |
+-----------+-----------+------------+--------------+
| 1         | 2         | 2016-03-01 | 5            |
| 1         | 2         | 2016-05-02 | 6            |
| 2         | 3         | 2017-06-25 | 1            |
| 3         | 1         | 2016-03-02 | 0            |
| 3         | 4         | 2018-07-03 | 5            |
+-----------+-----------+------------+--------------+




Output: 
+-----------+-------------+
| player_id | first_login |
+-----------+-------------+
| 1         | 2016-03-01  |
| 2         | 2017-06-25  |
| 3         | 2016-03-02  |
+-----------+-------------+



Write a solution to find the first login date for each player.

Return the result table in any order.

The result format is in the following example.

In [0]:
from datetime import datetime

In [0]:
# Define schema
schema = StructType([
    StructField("player_id", IntegerType(), False),
    StructField("device_id", IntegerType(), False),
    StructField("event_date", DateType(), False),
    StructField("games_played", IntegerType(), False)
])

# Create data
data = [
    (1, 2, datetime.strptime("2016-03-01", "%Y-%m-%d").date(), 5),
    (1, 2, datetime.strptime("2016-05-02", "%Y-%m-%d").date(), 6),
    (2, 3, datetime.strptime("2017-06-25", "%Y-%m-%d").date(), 1),
    (3, 1, datetime.strptime("2016-03-02", "%Y-%m-%d").date(), 0),
    (3, 4, datetime.strptime("2018-07-03", "%Y-%m-%d").date(), 5)
]



In [0]:
# Create DataFrame
df_game = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df_game.show()

In [0]:
df_game.select('player_id', 'event_date').groupBy('player_id').agg(min('event_date').alias('first_login')).display()

In [0]:
#pandas
pdf_game = df_game.toPandas()
display(pdf_game)

In [0]:
pdf_game.groupby('player_id')['event_date'].min().reset_index().display()


### 577. Employee Bonus

Write a solution to report the name and bonus amount of each employee with a bonus less than 1000.

Return the result table in any order.

The result format is in the following example.

Input: 
Employee table:
| empId | name   | supervisor | salary |
|-------|--------|------------|--------|
| 3     | Brad   | null       | 4000   |
| 1     | John   | 3          | 1000   |
| 2     | Dan    | 3          | 2000   |
| 4     | Thomas | 3          | 4000   |

### Bonus Table:
| empId | bonus |
|-------|-------|
| 2     | 500   |
| 4     | 2000  |

### Output Table:
| name  | bonus |
|-------|-------|
| Brad  | null  |
| John  | null  |
| Dan   | 500   |


In [0]:
# 1. Employee Table Schema
employee_schema = StructType([
    StructField("empId", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("supervisor", IntegerType(), True),  # Nullable
    StructField("salary", IntegerType(), False)
])

# Employee Table Data
employee_data = [
    (3, "Brad", None, 4000),
    (1, "John", 3, 1000),
    (2, "Dan", 3, 2000),
    (4, "Thomas", 3, 4000)
]

# Create Employee DataFrame
df_employee = spark.createDataFrame(employee_data, schema=employee_schema)
df_employee.show()

# 2. Bonus Table Schema
bonus_schema = StructType([
    StructField("empId", IntegerType(), False),
    StructField("bonus", IntegerType(), True)  # Nullable
])

# Bonus Table Data
bonus_data = [
    (2, 500),
    (4, 2000)
]

# Create Bonus DataFrame
df_bonus = spark.createDataFrame(bonus_data, schema=bonus_schema)
df_bonus.show()

In [0]:
df_group = df_employee.join(df_bonus, df_employee.empId == df_bonus.empId, 'left')\
    .select('name', 'bonus')
df_group.filter((df_group['bonus'] < 1000) | (df_group['bonus'].isNull())).display()

In [0]:
#Pandas

pdf_employee = df_employee.toPandas()
pdf_employee.display()
pdf_bonus = df_bonus.toPandas()
pdf_bonus.display()

In [0]:
pdf_merge = pdf_employee.merge(pdf_bonus, on = 'empId', how = 'left').iloc[:, [1,4]]
pdf_merge.loc[(pdf_merge['bonus'] < 1000) | (pdf_merge['bonus'].isnull())].display()

### 584. Find Customer Referee

Find the names of the customer that are not referred by the customer with id = 2.

Return the result table in any order.

The result format is in the following example.

### Input: Customer Table
| id | name | referee_id |
|----|------|------------|
| 1  | Will | null       |
| 2  | Jane | null       |
| 3  | Alex | 2          |
| 4  | Bill | null       |
| 5  | Zack | 1          |
| 6  | Mark | 2          |

### Output:
| name |
|------|
| Will |
| Jane |
| Bill |
| Zack |


In [0]:
# Define Schema for Customer Table
customer_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("referee_id", IntegerType(), True)  # Nullable
])

# Data for Customer Table
customer_data = [
    (1, "Will", None),
    (2, "Jane", None),
    (3, "Alex", 2),
    (4, "Bill", None),
    (5, "Zack", 1),
    (6, "Mark", 2)
]

# Create DataFrame
df_customer = spark.createDataFrame(customer_data, schema=customer_schema)

# Show DataFrame
df_customer.show()

In [0]:
df_customer.select('name').filter((df_customer['referee_id'] != 2) | (df_customer['referee_id'].isNull())).display()

In [0]:
# Pandas
pdf_customer = df_customer.toPandas()
pdf_customer['name'][(pdf_customer['referee_id'] != 2) | (pdf_customer['referee_id'].isnull())]

### 620. Not Boring Movies

Write a solution to report the movies with an odd-numbered ID and a description that is not "boring".

Return the result table ordered by rating in descending order.

The result format is in the following example.

##### **Input** -
Cinema table

| id | movie      | description | rating |
|----|-----------|-------------|--------|
| 1  | War       | great 3D    | 8.9    |
| 2  | Science   | fiction     | 8.5    |
| 3  | irish     | boring      | 6.2    |
| 4  | Ice song  | Fantacy     | 8.6    |
| 5  | House card| Interesting | 9.1    |


##### **Output Table** -

| id | movie      | description | rating |
|----|-----------|-------------|--------|
| 5  | House card| Interesting | 9.1    |
| 1  | War       | great 3D    | 8.9    |

**Explanation**: 
We have three movies with odd-numbered IDs: 1, 3, and 5. The movie with ID = 3 is boring so we do not include it in the answer.

In [0]:
from decimal import Decimal

In [0]:
# Define Schema with DecimalType (Precision 3, Scale 1)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("movie", StringType(), True),
    StructField("description", StringType(), True),
    StructField("rating", DecimalType(4, 1), True)  # Precision 4, Scale 1 (e.g., 9.1)
])

# Ensure Float Values are Converted to Decimal
data = [
    (1, "War", "great 3D", Decimal("8.9")),
    (2, "Science", "fiction", Decimal("8.5")),
    (3, "irish", "boring", Decimal("6.2")),
    (4, "Ice song", "Fantacy", Decimal("8.6")),
    (5, "House card", "Interesting", Decimal("9.1"))
]

# Create DataFrame
df_cinema = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df_cinema.show()

In [0]:
df_cinema.filter((col('description') != 'boring') & (col('id') % 2 == 1)) \
    .orderBy(col('rating'), ascending=False)\
         .display()

In [0]:
pdf_cinema = df_cinema.toPandas()

In [0]:
pdf_cinema.loc[(pdf_cinema['description'] != 'boring') & (pdf_cinema['id'] % 2 == 1)].sort_values(by = 'rating', ascending = False)

### 627 Swap Salary
Write a solution to swap all 'f' and 'm' values (i.e., change all 'f' values to 'm' and vice versa) with a single update statement and no intermediate temporary tables.
Note that you must write a single update statement, do not write any select statement for this problem.
The result format is in the following example.

### Input: Salary Table
| id | name | sex | salary |
|----|------|-----|--------|
| 1  | A    | m   | 2500   |
| 2  | B    | f   | 1500   |
| 3  | C    | m   | 5500   |
| 4  | D    | f   | 500    |

### Output: Salary Table (Swapped Sex Column)
| id | name | sex | salary |
|----|------|-----|--------|
| 1  | A    | f   | 2500   |
| 2  | B    | m   | 1500   |
| 3  | C    | f   | 5500   |
| 4  | D    | m   | 500    |


Explanation: 
(1, A) and (3, C) were changed from 'm' to 'f'.
(2, B) and (4, D) were changed from 'f' to 'm'.
 

In [0]:
# Define Schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# Create Data
data = [
    (1, "A", "m", 2500),
    (2, "B", "f", 1500),
    (3, "C", "m", 5500),
    (4, "D", "f", 500)
]

# Create DataFrame
df_salary = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df_salary.show()

In [0]:
df_salary.withColumn('sex',
                     when(col('sex') == 'f', 'm')\
                         .when(col('sex') == 'm', 'f')).display()

### 1050. Actors and Directors Who Cooperated At Least Three Times

Write a solution to find all the pairs (actor_id, director_id) where the actor has cooperated with the director at least three times.
Return the result table in any order.
The result format is in the following example.

### Input: ActorDirector Table
| actor_id  | director_id | timestamp |
|-----------|------------|-----------|
| 1         | 1          | 0         |
| 1         | 1          | 1         |
| 1         | 1          | 2         |
| 1         | 2          | 3         |
| 1         | 2          | 4         |
| 2         | 1          | 5         |
| 2         | 1          | 6         |

### Output: ActorDirector Table (Most Frequent Pair)
| actor_id  | director_id |
|-----------|------------|
| 1         | 1          |

**Explanation**: The only pair is (1, 1) where they cooperated exactly 3 times.

In [0]:
# Define Schema
schema = StructType([
    StructField("actor_id", IntegerType(), True),
    StructField("director_id", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])

# Create Data
data = [
    (1, 1, 0),
    (1, 1, 1),
    (1, 1, 2),
    (1, 2, 3),
    (1, 2, 4),
    (2, 1, 5),
    (2, 1, 6)
]

# Create DataFrame
df_actor_director = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df_actor_director.show()

In [0]:
df_actor_director.groupBy('actor_id', 'director_id').agg(count('*').alias('count')).select('actor_id', 'director_id').filter(col('count') == 3).display()