In [1]:
# Import Libraries
from pyspark.sql import SparkSession

## Spark Session

In [2]:
spark = SparkSession.builder \
        .appName("SQLClass1App") \
        .getOrCreate()

In [3]:
spark.version

'3.5.4'

In [4]:
# mysql connector property for java com.mysql:mysql-connector-j:9.0.0

In [5]:
# Define MySQL credentials
mysql_url = "jdbc:mysql://localhost:3307/class_1"
mysql_user = "root"
mysql_password = "00000"
mysql_driver = "com.mysql.cj.jdbc.Driver"

# MySQL connection with spark

In [6]:
# create table

In [7]:
query = "select * from prices"

In [8]:
df = spark.read \
    .format("jdbc") \
    .option("url", mysql_url) \
    .option("user", mysql_user) \
    .option("query", query) \
    .option("password", mysql_password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

In [9]:
def mysql_read_action(query):
    """
    Perform MySQL actions like read, write, update and delete
    """
    return spark.read \
    .format("jdbc") \
    .option("url", mysql_url) \
    .option("user", mysql_user) \
    .option("query", query) \
    .option("password", mysql_password) \
    .option("driver", mysql_driver) \
    .load()

In [10]:
df = mysql_read_action(query)

In [11]:
df.show()

+----------+----------+----------+-----+
|product_id|start_date|  end_date|price|
+----------+----------+----------+-----+
|         1|2019-02-17|2019-02-28|    5|
|         1|2019-03-01|2019-03-22|   20|
|         2|2019-02-01|2019-02-20|   15|
|         2|2019-02-21|2019-03-31|   30|
+----------+----------+----------+-----+



# SQL and Pyspark Practice NoteBook

#### Q1. Query all columns for all American cities in the CITY table with populations larger than 100000.The CountryCode for America is USA. The CITY table is described as follows:

## pyspark solution

In [None]:
from pyspark.sql.functions import col

In [None]:
df = mysql_read_action(query)

In [None]:
df_filtered = df.filter(col('POPULATION')>100000).where(col("COUNTRYCODE")=="USA")

In [None]:
df_filtered.show()

In [None]:
# another way in pyspark
df_filtered_1 = df.filter((col('POPULATION')>100000) & (col("COUNTRYCODE")=="USA"))

In [None]:
df_filtered_1.show()

In [None]:
df_filtered.describe()

1. Both filter() and where() do the same job in the DataFrame API.
2. where() is just an alias for filter(), provided for SQL-like syntax consistency.
3. You can use column expressions or SQL-like string conditions in both.

## SQL solution

In [None]:
sql_query = "select * from CITY where COUNTRYCODE='USA' and population>100000"

In [None]:
df_sql = mysql_read_action(sql_query)

In [None]:
df_sql.show()

###  Q2. Query the NAME field for all American cities in the CITY table with populations larger than 120000.The CountryCode for America is USA. The CITY table is described as follows:

In [None]:
# pyspark solution

In [None]:
df.show()

In [None]:
df_filtered_2 = df.select(col("NAME"), col("POPULATION")).filter(col("POPULATION")>120000)

In [None]:
df_filtered_2.show()

In [None]:
# SQL solution

In [None]:
sql_query_2 = "select name, population from city where population>120000"

In [None]:
df_sql_2 = mysql_read_action(sql_query_2)

In [None]:
df_sql_2.show()

### Q3. Query all columns (attributes) for every row in the CITY table.

In [None]:
# spark solution

In [None]:
df.show()

In [None]:
df_filtered_3 = df.select(col("*"))

In [None]:
df_filtered_3.show()

In [None]:
# SQL solution

In [None]:
sql_query_3 = "select * from city"

df_sql_3 = mysql_read_action(sql_query_3)

In [None]:
df_sql_3.show()

### Q4. Query all columns for a city in CITY with the ID 1661.

In [None]:
 # spark solution

In [None]:
df.show()

In [None]:
df_filtered_4 = df.where(col("ID")==1661)

In [None]:
df_filtered_4.show()

In [None]:
# sql solution

In [None]:
sql_query_4 = "select * from city where id=1661"

In [None]:
df_sql4 = mysql_read_action(sql_query_4)

In [None]:
df_sql4.show()

### Q5. Query all attributes of every Japanese city in the CITY table. The COUNTRYCODE for Japan is  JPN.

In [None]:
# spark solution

In [None]:
df.show()

In [None]:
df_filtered_5 = df.select(col("*")).filter(col("COUNTRYCODE")=="JPN")

In [None]:
df_filtered_5.show()

In [None]:
# sql solution

In [None]:
sql_query_5 = "select * from city where countrycode='JPN'"

In [None]:
df_sql_5 = mysql_read_action(sql_query_5)

In [None]:
df_sql_5.show()

###  Q6. Query the names of all the Japanese cities in the CITY table. The COUNTRYCODE for Japan is JPN.

In [None]:
# sql solution

In [None]:
sql_query_6 = "select name from CITY where COUNTRYCODE='JPN'"

In [None]:
df_sql_6 = mysql_read_action(sql_query_6)

In [None]:
df_sql_6.show()

In [None]:
# pyspark solution

In [None]:
df.show()

In [None]:
df_filtered_6 = df.select(col('name')).where(col("countrycode")=="JPN")

In [None]:
df_filtered_6.show()

###  Q7. Query a list of CITY and STATE from the STATION table

In [None]:
station_table_query = "select * from station"

In [None]:
station_df = mysql_read_action(station_table_query)

In [None]:
station_df.show()

In [None]:
station_df.columns

In [None]:
sql_query_7 = " select city, state from station"

In [None]:
df_sql_7 = mysql_read_action(sql_query_7)

In [None]:
df_sql_7.show()

In [None]:
# spark solution

In [None]:
df_filtered_7 = station_df.select(col("city"), col("state"))

In [None]:
df_filtered_7.show()

In [None]:
df_filtered_7 = station_df.selectExpr("city", "state")

In [None]:
df_filtered_7.show()

 ### Q8. Query a list of CITY names from STATION for cities that have an even ID number. Print the results in any order, but exclude duplicates from the answer.

In [None]:
sql_query_8 = " select distinct city from station where ID%2=0"

In [None]:
df_sql_8 = mysql_read_action(sql_query_8)

In [None]:
df_sql_8.show()

In [None]:
# spark solution

In [None]:
station_df.show()

In [None]:
df_filtered_8 = station_df.selectExpr("city").distinct().filter(col("ID")%2==0)

In [None]:
df_filtered_8.show()

### Q9. Find the difference between the total number of CITY entries in the table and the number of distinct CITY entries in the table.

In [None]:
sql_query_9 = "select count(city) as total_number_of_cities, count(distinct city) as unique_cities, (count(city) - count(distinct city)) as difference from station"

In [None]:
df_sql_9 = mysql_read_action(sql_query_9)

In [None]:
df_sql_9.show()

In [None]:
# pyspark solution

In [None]:
df_filtered_9 = station_df.selectExpr("count(city) as total_cities", 
                                     "count(distinct city) as unique_cities",
                                     "count(city) - count(distinct city) as difference")

In [None]:
df_filtered_9.show()

###  Q10. Query the two cities in STATION with the shortest and longest CITY names, as well as their
### respective lengths (i.e.: number of characters in the name). If there is more than one smallest or
### largest city, choose the one that comes first when ordered alphabetically.

In [None]:
# sql query 

In [None]:
sql_query_9_1 = "select city, length(city) as city_length from station order by length(city) desc limit 1"

In [None]:
df_sql_9_1 = mysql_read_action(sql_query_9_1)

In [None]:
df_sql_9_1.show()

In [None]:
sql_query_9_2 = "select city, length(city) as city_length from station order by length(city), city asc limit 1"

In [None]:
df_sql_9_2 = mysql_read_action(sql_query_9_2)

In [None]:
df_sql_9_2.show()

In [None]:
# pyspark solution

In [None]:
from pyspark.sql.functions import length
df_filtered_9_1 = station_df.select(col("city"), length(col("city")).alias("city_length")).orderBy(col("city_length").desc()).limit(1)

In [None]:
df_filtered_9_1.show()

In [None]:
df_filtered_9_2 = station_df.select(col("city"), length(col("city")).alias("city_length")).orderBy(col("city")).limit(1)

In [None]:
df_filtered_9_2.show()

###  Q11. Query the list of CITY names starting with vowels (i.e., a, e, i, o, or u) from STATION. Your result cannot contain duplicates.

In [None]:
# sql_solution

In [None]:
sql_query_10 = "SELECT DISTINCT(CITY) AS DISTINCT_CITY_NAME FROM STATION WHERE lower(SUBSTR(city,1,1)) in ('a','e','i','o','u')"

In [None]:
df_sql_10 = mysql_read_action(sql_query_10)

In [None]:
df_sql_10.show()

In [None]:
# pyspark solution

In [None]:
from pyspark.sql.functions import lower
df_filtered_11 = station_df.select(col("city")).distinct().filter(lower(col('city')).substr(1, 1).isin("a", "e", "i", "o", "u"))

In [None]:
df_filtered_11.show()

### Q12. Query the list of CITY names ending with vowels (a, e, i, o, u) from STATION. Your result cannot contain duplicates.

In [None]:
sql_query_11 = "SELECT DISTINCT(CITY) AS DISTINCT_CITY_NAME FROM STATION WHERE lower(SUBSTR(city,-1,1)) in ('a','e','i','o','u')"

In [None]:
df_sql_11 = mysql_read_action(sql_query_11)

In [None]:
df_sql_11.show()

In [None]:
# pyspark solution
df_filtered_12 = station_df.select(col("city")).distinct().filter(lower(col('city')).substr(-1, 1).isin("a", "e", "i", "o", "u"))

In [None]:
df_filtered_12.show()

### Q13. Query the list of CITY names from STATION that do not start with vowels. Your result cannot contain duplicates.

In [None]:
sql_query_13 = "select distinct city from station where lower(substr(city, 1, 1)) not in ('a', 'b', 'c', 'd', 'e')"

In [None]:
df_sql_13 = mysql_read_action(sql_query_13)

In [None]:
df_sql_13.show()

In [None]:
from pyspark.sql.functions import col, lower, substr

df_filtered_13 = station_df \
    .select("city") \
    .distinct() \
    .filter(~lower(col("city")).substr(1, 1).isin("a", "e", "i", "o", "u"))

In [None]:
df_filtered_13.show()

 ### Q14. Query the list of CITY names from STATION that do not end with vowels. Your result cannot contain duplicates.

In [None]:
# sql_solution

In [None]:
sql_query_14 = "select distinct city from station where lower(substr(city, -1, 1)) not in ('a', 'e', 'i', 'o', 'u')"

In [None]:
df_query_14 = mysql_read_action(sql_query_14)

In [None]:
df_query_14.show()

In [None]:
# spark solution

In [None]:
df_filtered_14 = station_df.select(col('city'))\
                           .distinct()\
                           .filter(~lower(col('city')).substr(-1,1).isin('a', 'e', 'i', 'o', 'u'))

In [None]:
df_filtered_14.show()

### Q15. Query the list of CITY names from STATION that either do not start with vowels and do not end with vowels. Your result cannot contain duplicates.

In [None]:
sql_query_15 = "select city from station where substr(lower(city), 1, 1) not in ('a', 'e', 'i', 'o', 'u') and substr(lower(city), -1, 1) not in  ('a', 'e', 'i', 'o', 'u')"

In [None]:
df_sql_15 = mysql_read_action(sql_query_15)

In [None]:
df_sql_15.show()

In [None]:
# pyspsark solution

In [None]:
df_filtered_15 = station_df.select(col('city'))\
                           .distinct()\
                           .filter(~lower(col('city')).substr(1,1).isin('a', 'e', 'i', 'o', 'u') & ~lower(col("city")).substr(-1, 1).isin('a', 'e', 'i', 'o', 'u'))

In [None]:
df_filtered_15.show()

### Q16. Query the list of CITY names from STATION that do not start with vowels or do not end with vowels. Your result cannot contain duplicates.

In [None]:
sql_query_16 = "select city from station where substr(lower(city), 1, 1) not in ('a', 'e', 'i', 'o', 'u') or substr(lower(city), -1, 1) not in  ('a', 'e', 'i', 'o', 'u')"

In [None]:
df_sql_16 = mysql_read_action(sql_query_16)

In [None]:
df_sql_16.show()

In [None]:
# pyspark solution

In [None]:
df_filtered_16 = station_df.select(col('city'))\
                           .distinct()\
                           .filter(~lower(col('city')).substr(1,1).isin('a', 'e', 'i', 'o', 'u') | ~lower(col("city")).substr(-1, 1).isin('a', 'e', 'i', 'o', 'u'))

In [None]:
df_filtered_16.show()

### Q17.
 Table: Product
 Column Name Type
 product_id
 int
 product_name varchar
 unit_price int
 product_id is the primary key of this table.
 Each row of this table indicates the name and the price of each product.
 Table: Sales
 Column Name Type
 seller_id
 int
 product_id
 buyer_id
 int
 int
 sale_date
 quantity
 price
 date
 int
 int
 This table has no primary key, it can have repeated rows.
 product_id is a foreign key to the Product table.
 Each row of this table contains some information about one sale.
 Write an SQL query that reports the products that were only sold in the first quarter of 2019. That is,
 between 2019-01-01 and 2019-03-31 inclusive.
 Return the result table in any order.
 The query result format is in the following example.
 Input:
 Product table:
 product_id
 1
 product_name unit_price
 S8
 1000
 2
 3
 Sales table:
 seller_id
 G4
 iPhone
 product_id
 800
 1400
 buyer_id
 sale_date
 quantity
 1
 1
 1
 2019-01-21
 2
 price
 2000
 1
 2
 3
 Output:
 product_id
 1
 2
 2
 3
 product_name
 S8
 2
 3
 4
 2019-02-17
 2019-06-02
 2019-05-13
 1
 1
 2
 800
 800
 2800
 Explanation:
 The product with id 1 was only sold in the spring of 2019.
 The product with id 2 was sold in the spring of 2019 but was also sold after the spring of 2019.
 The product with id 3 was sold after spring 2019.
 We return only product 1 as it is the product that was only sold in the spring of 2019

### 17. Write an SQL query that reports the products that were only sold in the first quarter of 2019. That is, between 2019-01-01 and 2019-03-31 inclusive.

In [None]:
# sql query

In [None]:
sql_query_17 = """SELECT p.product_id, p.product_name
FROM product p
LEFT JOIN (
    SELECT DISTINCT product_id
    FROM sales
    WHERE sale_date NOT BETWEEN '2019-01-01' AND '2019-03-31'
) s ON p.product_id = s.product_id
WHERE s.product_id IS NULL"""

# select product_id, product_name from product where product_id not in (select product_id from sales where sale_date not between "2019-01-01" and "2019-03-31")

In [None]:
df_sql_17 = mysql_read_action(sql_query_17)

In [None]:
df_sql_17.show()

In [None]:
product_query = "select * from product"

In [None]:
sales_query = "select * from sales"

In [None]:
product_df = mysql_read_action(product_query)

In [None]:
product_df.show()

In [None]:
sales_df = mysql_read_action(sales_query)

In [None]:
sales_df.show()

In [None]:
# Step 1: Filter sales that occurred OUTSIDE the date range
sales_outside = sales_df.filter(~col("sale_date").between("2019-01-01", "2019-03-31")) \
                        .select("product_id") \
                        .distinct()

# Step 2: Join product_df with the "sales_outside" using left anti join
df_filtered_17 = product_df.join(sales_outside, on="product_id", how="left_anti") \
                      .select("product_id", "product_name")


In [None]:
df_filtered_17.show()

### 18. Write an SQL query to find all the authors that viewed at least one of their own articles. Return the result table sorted by id in ascending order.

In [None]:
sql_query_18 = "select distinct author_id from views where author_id=viewer_id order by author_id asc"

In [None]:
df_sql_18 = mysql_read_action(sql_query_18)

In [None]:
df_sql_18.show()

In [None]:
# pyspark solution

In [None]:
views_query = "select * from views"
views_df = mysql_read_action(views_query)
views_df.show()

In [None]:
df_filtered_18 = views_df.select(col("author_id")) \
                         .filter(col('author_id')==col("viewer_id")) \
                         .distinct()\
                         .orderBy(col('author_id').asc())

In [None]:
df_filtered_18.show()

### 19. delivery_id is the primary key of this table. The table holds information about food delivery to customers that make orders at some date and specify a preferred delivery date (on the same order date or after it). If the customer's preferred delivery date is the same as the order date, then the order is called immediately; otherwise, it is called scheduled.
### Write an SQL query to find the percentage of immediate orders in the table, rounded to 2 decimal places.

In [None]:
sql_query_19 = """
SELECT 
    ROUND(
        100.0 * SUM(CASE WHEN order_date = customer_pref_delivery_date THEN 1 ELSE 0 END) 
        / COUNT(*), 
        2
    ) AS immediate_percentage
FROM Delivery
"""

In [None]:
df_sql_19 = mysql_read_action(sql_query_19)

In [None]:
df_sql_19.show()

In [None]:
# spark solution

In [None]:
delivery_query = "select * from delivery"

In [None]:
delivery_df = mysql_read_action(delivery_query)

In [None]:
delivery_df.show()

In [None]:
from pyspark.sql.functions import when, sum, count, round
delivery_df1 = delivery_df.withColumn("is_immediate",
                                     when(col('order_date') == col('customer_pref_delivery_date'), 1).otherwise(0))

In [None]:
delivery_df1.show()

In [None]:
df_filtered_19 = delivery_df1.select(round(100*(sum(col("is_immediate"))/count('delivery_id')), 2).alias("delivery_percentage"))

In [None]:
df_filtered_19.show()

### 20. Write an SQL query to find the ctr of each Ad. Round ctr to two decimal points.Return the result table ordered by ctr in descending order and by ad_id in ascending order in case of a tie.

In [None]:
sql_query_20 = """select 
b.ad_id,
round(case when (b.total_clicks+b.total_views) = 0 then 0 else 100*(b.total_clicks/(b.total_clicks+b.total_views)) end,2) as ctr
from
(select 
ad_id,
sum(case when action="Clicked" then 1 else 0 end) as total_clicks,
sum(case when action="Viewed" then 1 else 0 end) as total_views
from ads group by ad_id) as b"""

In [None]:
df_sql_20 = mysql_read_action(sql_query_20)

In [None]:
df_sql_20.show()

In [None]:
# spark solution

In [None]:
ads_query = "select * from ads"

In [None]:
ads_df = mysql_read_action(ads_query)

In [None]:
ads_df.show()

In [None]:
# spark solution

In [None]:
from pyspark.sql.functions import when, sum as _sum
df_filtered_20_1 = ads_df.groupby(col("ad_id")).agg(_sum(when(col("action") =="Clicked", 1).otherwise(0)).alias("total_clicks"))
df_filtered_20_2 = ads_df.groupby(col("ad_id")).agg(_sum(when(col("action") =="Viewed", 1).otherwise(0)).alias("total_views"))

In [None]:
df_filtered_20_1.show()

In [None]:
df_filtered_20_2.show()

In [None]:
df_filtered_20_3 = df_filtered_20_1.join(df_filtered_20_2, on="ad_id", how="left").select(col("ad_id"), col("total_clicks"), col("total_views"))

In [None]:
df_filtered_20_3.show()

In [None]:
df_filtered_20 = df_filtered_20_3.withColumn("ctr",
                                            round(when(col("total_clicks")+col("total_views") == 0, 0).otherwise(100 * col('total_clicks')/(col('total_clicks')+col('total_views'))), 2))

In [None]:
df_filtered_20.show()

### 21.  Write an SQL query to find the team size of each of the employees.

In [12]:
sql_query_21 = """
select 
employee_id,
count(employee_id) over (partition by team_id) as team_size
from employee order by team_size desc
"""

In [13]:
df_query_21 = mysql_read_action(sql_query_21)

In [14]:
df_query_21.show()

+-----------+---------+
|employee_id|team_size|
+-----------+---------+
|          1|        3|
|          2|        3|
|          3|        3|
|          5|        2|
|          6|        2|
|          4|        1|
+-----------+---------+



In [15]:
# spark solution

In [16]:
employee_query = "select * from employee"

In [17]:
employee_df = mysql_read_action(employee_query)

In [18]:
employee_df.show()

+-----------+-------+
|employee_id|team_id|
+-----------+-------+
|          1|      8|
|          2|      8|
|          3|      8|
|          4|      7|
|          5|      9|
|          6|      9|
+-----------+-------+



In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, sum
window_fun_21 = Window.partitionBy("team_id")
df_filtered_21 = employee_df.withColumn("team_size", count("team_id").over(window_fun_21)).orderBy("employee_id")

In [23]:
df_filtered_21.show()

+-----------+-------+---------+
|employee_id|team_id|team_size|
+-----------+-------+---------+
|          1|      8|        3|
|          2|      8|        3|
|          3|      8|        3|
|          4|      7|        1|
|          5|      9|        2|
|          6|      9|        2|
+-----------+-------+---------+



### 22 --Write an SQL query to find the type of weather in each country for November 2019.

### 23. Write an SQL query to find the average selling price for each product. average_price should be rounded to 2 decimal places.

In [13]:
sql_query_23 = """
select 
u.product_id,
round(sum(p.price * u.units)/sum(u.units), 2) as avg_prices
from UnitsSold u left join prices p on (u.product_id=p.product_id 
										and u.purchase_date >= p.start_date
                                        and u.purchase_date <= p.end_date) group by u.product_id
"""

In [15]:
df_filtered_23 = mysql_read_action(sql_query_23)

In [16]:
df_filtered_23.show()

+----------+----------+
|product_id|avg_prices|
+----------+----------+
|         1|      6.96|
|         2|     16.96|
+----------+----------+



In [17]:
# pyspark solution

In [18]:
prices_query = "select * from prices"

In [19]:
prices_df = mysql_read_action(prices_query)

In [20]:
prices_df.show()

+----------+----------+----------+-----+
|product_id|start_date|  end_date|price|
+----------+----------+----------+-----+
|         1|2019-02-17|2019-02-28|    5|
|         1|2019-03-01|2019-03-22|   20|
|         2|2019-02-01|2019-02-20|   15|
|         2|2019-02-21|2019-03-31|   30|
+----------+----------+----------+-----+



In [21]:
unitsSold_query = "select * from unitssold"

In [23]:
unitsSold_df = mysql_read_action(unitsSold_query)

In [24]:
unitsSold_df.show()

+----------+-------------+-----+
|product_id|purchase_date|units|
+----------+-------------+-----+
|         1|   2019-02-25|  100|
|         1|   2019-03-01|   15|
|         2|   2019-02-10|  200|
|         2|   2019-03-22|   30|
+----------+-------------+-----+



In [53]:
from pyspark.sql.functions import col, sum as _sum, round
df_filtered_23_1 = unitsSold_df.join(prices_df, ((unitsSold_df["product_id"]==prices_df["product_id"]) & \
                                                (unitsSold_df.purchase_date>=prices_df.start_date) & \
                                                (unitsSold_df.purchase_date <= prices_df.end_date)), how="left") \
                                                .groupBy(unitsSold_df.product_id) \
                                                .agg(round(_sum(unitsSold_df.units*prices_df.price)/_sum(unitsSold_df.units), 2).alias("avg_price"))

In [54]:
df_filtered_23_1.show()

+----------+---------+
|product_id|avg_price|
+----------+---------+
|         1|     6.96|
|         2|    16.96|
+----------+---------+



### 24.Write an SQL query to report the first login date for each player.

In [55]:
sql_query_24 = """
select * from (
select 
a.player_id,
a.event_date as first_login,
row_number() over(partition by a.player_id) as row_num
from activity a) b where b.row_num=1
"""

In [56]:
df_sql_24 = mysql_read_action(sql_query_24)

In [57]:
df_sql_24.show()

+---------+-----------+-------+
|player_id|first_login|row_num|
+---------+-----------+-------+
|        1| 2016-03-01|      1|
|        2| 2017-06-25|      1|
|        3| 2016-03-02|      1|
+---------+-----------+-------+



In [58]:
# spark solution

In [60]:
activities_query = "select * from activity"
activity_df = mysql_read_action(activities_query)
activity_df.show()

+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-05-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [71]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
window_s = Window.partitionBy("player_id")
df_filtered_24 = activity_df.withColumn("row_number", row_number().over(window_s.orderBy("event_date"))).where(col("row_number")==1).select("player_id", "event_date")

In [72]:
df_filtered_24.show()

+---------+----------+
|player_id|event_date|
+---------+----------+
|        1|2016-03-01|
|        2|2017-06-25|
|        3|2016-03-02|
+---------+----------+



### 25. Write an SQL query to report the device that is first logged in for each player.

In [73]:
df_filtered_25 = activity_df.withColumn("row_number", row_number().over(window_s.orderBy("event_date"))).where(col("row_number")==1).select("player_id", "device_id")

In [74]:
df_filtered_25.show()

+---------+---------+
|player_id|device_id|
+---------+---------+
|        1|        2|
|        2|        3|
|        3|        1|
+---------+---------+



### 26. Write an SQL query to get the names of products that have at least 100 units ordered in February 2020 and their amount.

In [76]:
sql_query_26 = """
select 
p.product_id,
p.product_name
from products p inner join
(select
a.product_id,
sum(a.unit) as total_units
from(
select 
o.product_id,
extract(year from o.order_date) as _year,
extract(month from o.order_date) as _month,
o.unit
from orders o where extract(year from o.order_date)=2020 and extract(month from o.order_date)=2) a group by a.product_id having total_units>=100) b on p.product_id=b.product_id
"""

In [77]:
df_sql_26 = mysql_read_action(sql_query_26)

In [78]:
df_sql_26.show()

+----------+------------------+
|product_id|      product_name|
+----------+------------------+
|         1|Leetcode Solutions|
|         5|     Leetco de Kit|
+----------+------------------+



In [79]:
# pyspark solution

In [80]:
orders_query = "select * from orders"

In [81]:
orders_df = mysql_read_action(orders_query)

In [82]:
orders_df.show()

+----------+----------+----+
|product_id|order_date|unit|
+----------+----------+----+
|         1|2020-02-05|  60|
|         1|2020-02-10|  70|
|         2|2020-01-18|  30|
|         2|2020-02-11|  80|
|         3|2020-02-17|   2|
|         3|2020-02-24|   3|
|         4|2020-03-01|  20|
|         4|2020-03-04|  30|
|         4|2020-03-04|  60|
|         5|2020-02-25|  50|
|         5|2020-02-27|  50|
|         5|2020-03-01|  50|
+----------+----------+----+



In [83]:
products_query = "select * from products"

In [84]:
products_df = mysql_read_action(products_query)

In [85]:
products_df.show()

+----------+--------------------+----------------+
|product_id|        product_name|product_category|
+----------+--------------------+----------------+
|         1|  Leetcode Solutions|            Book|
|         2|Jewels of Stringo...|            Book|
|         3|                  HP|          Laptop|
|         4|              Lenovo|          Laptop|
|         5|       Leetco de Kit|         T-shirt|
+----------+--------------------+----------------+



In [106]:
from pyspark.sql.functions import extract, month, year, lit, sum as _sum
df_filtered_26_1 = orders_df.select(orders_df.product_id, col('unit'),extract(lit("month"), orders_df.order_date).alias("month"), extract(lit("year"), orders_df.order_date).alias("year"))\
                            .filter((col("month")==2) & (col("year")==2020))\
                            .groupby("product_id")\
                            .agg(_sum(col('unit')).alias("total_units"))\
                            .where(col("total_units")>=100)\
                            .join(products_df, orders_df.product_id==products_df.product_id, how="inner")\
                            .select(products_df.product_id, products_df.product_name)

In [107]:
df_filtered_26_1.show()

+----------+------------------+
|product_id|      product_name|
+----------+------------------+
|         1|Leetcode Solutions|
|         5|     Leetco de Kit|
+----------+------------------+



### Write an SQL query to find the users who have valid emails.

In [108]:
sql_query_27 = """select * from users where mail rlike '^[a-zA-Z][a-zA-Z0-9_.-]*@leetcode.com$'"""

In [109]:
df_sql_27 = mysql_read_action(sql_query_27)

In [110]:
df_sql_27.show()

+-------+---------+--------------------+
|user_id|     name|                mail|
+-------+---------+--------------------+
|      1|  Winston|winston@leetcode.com|
|      3|Annabelle| bella-@leetcode.com|
|      4|    Sally|sally.come@leetco...|
+-------+---------+--------------------+



In [111]:
# spark solution

In [114]:
users_query = "select * from users"
users_df = mysql_read_action(users_query)
users_df.show()

+-------+---------+--------------------+
|user_id|     name|                mail|
+-------+---------+--------------------+
|      1|  Winston|winston@leetcode.com|
|      2| Jonathan|     jonathanisgreat|
|      3|Annabelle| bella-@leetcode.com|
|      4|    Sally|sally.come@leetco...|
|      5|   Marwan|quarz#2020@leetco...|
|      6|    David|   david69@gmail.com|
|      7|  Shapiro| .shapo@leetcode.com|
+-------+---------+--------------------+



In [116]:
df_filtered_27 = users_df.filter(col("mail").rlike("^[a-zA-Z][a-zA-Z0-9_.-]*@leetcode.com$"))

In [117]:
df_filtered_27.show()

+-------+---------+--------------------+
|user_id|     name|                mail|
+-------+---------+--------------------+
|      1|  Winston|winston@leetcode.com|
|      3|Annabelle| bella-@leetcode.com|
|      4|    Sally|sally.come@leetco...|
+-------+---------+--------------------+



### 28. Write an SQL query to report the customer_id and customer_name of customers who have spent at least $100 in each month of June and July 2020.

In [118]:
sql_query_28 = """
select o.customer_id, c.name
from Customers c, Product p, Orders1 o
where c.customer_id = o.customer_id and p.product_id = o.product_id
group by o.customer_id
having
(
sum(case when o.order_date like '2020-06%' then o.quantity*p.price
else 0 end) >= 100
and
sum(case when o.order_date like '2020-07%' then o.quantity*p.price
else 0 end) >= 100
)
"""

In [119]:
df_sql_28 = mysql_read_action(sql_query_28)

In [120]:
df_sql_28.show()

+-----------+-------+
|customer_id|   name|
+-----------+-------+
|          1|Winston|
+-----------+-------+



In [121]:
# spark sulution

In [123]:
customers_query = "select *from customers"
customers_df = mysql_read_action(customers_query)
customers_df.show()

+-----------+--------+-------+
|customer_id|    name|country|
+-----------+--------+-------+
|          1| Winston|    USA|
|          2|Jonathan|   Peru|
|          3|Moustafa|  Egypt|
+-----------+--------+-------+



In [124]:
orders1_query = "select *from orders1"
orders1_df = mysql_read_action(orders1_query)
orders1_df.show()

+--------+-----------+----------+----------+--------+
|order_id|customer_id|product_id|order_date|quantity|
+--------+-----------+----------+----------+--------+
|       1|          1|        10|2020-06-10|       1|
|       2|          1|        20|2020-07-01|       1|
|       3|          1|        30|2020-07-08|       2|
|       4|          2|        10|2020-06-15|       2|
|       5|          2|        40|2020-07-01|      10|
|       6|          3|        20|2020-06-24|       2|
|       7|          3|        30|2020-06-25|       2|
|       9|          3|        30|2020-05-08|       3|
+--------+-----------+----------+----------+--------+



In [125]:
product_query = "select *from product"
product_df = mysql_read_action(product_query)
product_df.show()

+----------+-----------+-----+
|product_id|description|price|
+----------+-----------+-----+
|        10|   LC Phone|  300|
|        20|  LCT-Shirt|   10|
|        30|    LC Book|   45|
|        40|LC Keychain|    2|
+----------+-----------+-----+



In [138]:
df_filtered_28_1 = orders1_df.join(customers_df, customers_df.customer_id==orders1_df.customer_id, how="left")\
                             .select(orders1_df.order_id, customers_df.customer_id, orders1_df.product_id, orders1_df.order_date, orders1_df.quantity)

In [139]:
df_filtered_28_1.show()

+--------+-----------+----------+----------+--------+
|order_id|customer_id|product_id|order_date|quantity|
+--------+-----------+----------+----------+--------+
|       1|          1|        10|2020-06-10|       1|
|       2|          1|        20|2020-07-01|       1|
|       3|          1|        30|2020-07-08|       2|
|       6|          3|        20|2020-06-24|       2|
|       7|          3|        30|2020-06-25|       2|
|       9|          3|        30|2020-05-08|       3|
|       4|          2|        10|2020-06-15|       2|
|       5|          2|        40|2020-07-01|      10|
+--------+-----------+----------+----------+--------+



In [141]:
df_filtered_28_2 = df_filtered_28_1.join(product_df, product_df.product_id==df_filtered_28_1.product_id, how="left")

In [142]:
df_filtered_28_2.show()

+--------+-----------+----------+----------+--------+----------+-----------+-----+
|order_id|customer_id|product_id|order_date|quantity|product_id|description|price|
+--------+-----------+----------+----------+--------+----------+-----------+-----+
|       1|          1|        10|2020-06-10|       1|        10|   LC Phone|  300|
|       2|          1|        20|2020-07-01|       1|        20|  LCT-Shirt|   10|
|       3|          1|        30|2020-07-08|       2|        30|    LC Book|   45|
|       6|          3|        20|2020-06-24|       2|        20|  LCT-Shirt|   10|
|       7|          3|        30|2020-06-25|       2|        30|    LC Book|   45|
|       9|          3|        30|2020-05-08|       3|        30|    LC Book|   45|
|       4|          2|        10|2020-06-15|       2|        10|   LC Phone|  300|
|       5|          2|        40|2020-07-01|      10|        40|LC Keychain|    2|
+--------+-----------+----------+----------+--------+----------+-----------+-----+



In [152]:
from pyspark.sql.functions import col, when, sum as _sum, expr

# Step 1: Join Customers, Orders1, and Product
joined_df = customers_df.alias("c") \
    .join(orders1_df.alias("o"), col("c.customer_id") == col("o.customer_id")) \
    .join(product_df.alias("p"), col("o.product_id") == col("p.product_id"))

# Step 2: Add total price per order
with_price = joined_df.withColumn("total_price", col("o.quantity") * col("p.price"))

# Step 3: Conditional aggregation
df_filtered_28 = with_price.groupBy("o.customer_id", "c.name").agg(
    _sum(when(col("o.order_date").like("2020-06%"), col("total_price")).otherwise(0)).alias("june_total"),
    _sum(when(col("o.order_date").like("2020-07%"), col("total_price")).otherwise(0)).alias("july_total")
).filter(
    (col("june_total") >= 100) & (col("july_total") >= 100)
).select("customer_id", "name")

In [153]:
df_filtered_28.show()

+-----------+-------+
|customer_id|   name|
+-----------+-------+
|          1|Winston|
+-----------+-------+



### 29. Write an SQL query to report the distinct titles of the kid-friendly movies streamed in June 2020. Return the result table in any order.

In [154]:
sql_query_29 = """
select c.title from content c where c.Kids_content="Y" and c.content_type = 'Movies' and c.content_id in(
select 
tv.content_id
from tvprogram tv where tv.program_date like "2020-06%")
"""

In [155]:
df_sql_29 = mysql_read_action(sql_query_29)

In [156]:
df_sql_29.show()

+-------+
|  title|
+-------+
|Aladdin|
+-------+



In [157]:
# spark solution

In [158]:
tvProgram_query = "select * from tvprogram"
tvProgram_df = mysql_read_action(tvProgram_query)
tvProgram_df.show()

+------------+----------+----------+
|program_date|content_id|   channel|
+------------+----------+----------+
|  2020-05-11|         2|LC-Channel|
|  2020-05-12|         3|LC-Channel|
|  2020-05-13|         4| Disney Ch|
|  2020-06-10|         1|LC-Channel|
|  2020-06-18|         4| Disney Ch|
|  2020-07-15|         5| Disney Ch|
+------------+----------+----------+



In [159]:
content_query = "select * from content"
content_df = mysql_read_action(content_query)
content_df.show()

+----------+--------------+------------+------------+
|content_id|         title|Kids_content|content_type|
+----------+--------------+------------+------------+
|         1|Leetcode Movie|           N|      Movies|
|         2|  Alg.for Kids|           Y|      Series|
|         3|  DatabaseSols|           N|      Series|
|         4|       Aladdin|           Y|      Movies|
|         5|    Cinderella|           Y|      Movies|
+----------+--------------+------------+------------+



In [174]:
df_filtered_29 = content_df.filter((content_df.Kids_content=="Y") & (content_df.content_type=="Movies"))\
                           .join(tvProgram_df, content_df.content_id==tvProgram_df.content_id)\
                           .filter(tvProgram_df.program_date.rlike("^2020-06")).select("title")

In [175]:
df_filtered_29.show()

+-------+
|  title|
+-------+
|Aladdin|
+-------+



### 30.Write an SQL query to find the npv of each query of the Queries table.

In [176]:
sql_query_30 = """
select q.id, q.year, COALESCE(n.npv, 0) as npv 
from queries q 
left join npv n 
on q.id=n.id and q.year=n.year
"""

In [177]:
df_sql_30 = mysql_read_action(sql_query_30)

In [178]:
df_sql_30.show()

+---+----+---+
| id|year|npv|
+---+----+---+
|  1|2019|113|
|  2|2008|121|
|  3|2009| 12|
|  7|2018|  0|
|  7|2019|  0|
|  7|2020| 30|
| 13|2019| 40|
+---+----+---+



In [179]:
# pyspark solution

In [181]:
npv_query = "select * from npv"
npv_df = mysql_read_action(npv_query)
npv_df.show()

+---+----+---+
| id|year|npv|
+---+----+---+
|  1|2018|100|
|  1|2019|113|
|  2|2008|121|
|  3|2009| 12|
|  7|2019|  0|
|  7|2020| 30|
| 11|2020| 99|
| 13|2019| 40|
+---+----+---+



In [183]:
queries_query = "select * from queries"
queries_df = mysql_read_action(queries_query)
queries_df.show()

+---+----+
| id|year|
+---+----+
|  1|2019|
|  2|2008|
|  3|2009|
|  7|2018|
|  7|2019|
|  7|2020|
| 13|2019|
+---+----+



In [195]:
from pyspark.sql.functions import coalesce, lit

# Perform the left join
joined_df = queries_df.join(
    npv_df,
    on=(queries_df.id == npv_df.id) & (queries_df.year == npv_df.year),
    how="left"
)

# Replace nulls in 'npv' with 0 using coalesce
df_filtered_30 = joined_df.select(
    queries_df.id,
    queries_df.year,
    coalesce(npv_df.npv, lit(0)).alias("npv")
)


In [197]:
df_filtered_30.show()

+---+----+---+
| id|year|npv|
+---+----+---+
|  7|2018|  0|
|  7|2020| 30|
|  2|2008|121|
|  1|2019|113|
|  7|2019|  0|
| 13|2019| 40|
|  3|2009| 12|
+---+----+---+



In [198]:
# 31 duplicate quation of 30

### 32. Write an SQL query to show the unique ID of each user, If a user does not have a unique ID replace just show null.

In [200]:
sql_query_32 = "select e.name, u.unique_id from Employees e left join EmployeeUNI u on u.id=e.id"
df_sql_32 = mysql_read_action(sql_query_32)
df_sql_32.show()

+--------+---------+
|    name|unique_id|
+--------+---------+
|   Alice|     NULL|
|Jonathan|        1|
|     Bob|     NULL|
|    Meir|        2|
| Winston|        3|
+--------+---------+



In [203]:
# spark solution

In [205]:
employees_query = "select * from employees"
employees_df = mysql_read_action(employees_query)
employees_df.show()

+---+--------+
| id|    name|
+---+--------+
|  1|   Alice|
|  3|Jonathan|
|  7|     Bob|
| 11|    Meir|
| 90| Winston|
+---+--------+



In [207]:
employeeuni_query = "select * from employeeuni"
employeeuni_df = mysql_read_action(employeeuni_query)
employeeuni_df.show()

+---+---------+
| id|unique_id|
+---+---------+
|  3|        1|
| 11|        2|
| 90|        3|
+---+---------+



In [210]:
df_filtered_32 = employees_df.join(employeeuni_df, on="id", how="left").select(employees_df.name, employeeuni_df.unique_id)

In [211]:
df_filtered_32.show()

+--------+---------+
|    name|unique_id|
+--------+---------+
|   Alice|     NULL|
|Jonathan|        1|
|     Bob|     NULL|
| Winston|        3|
|    Meir|        2|
+--------+---------+



### 33. Write an SQL query to report the distance travelled by each user. Return the result table ordered by travelled_distance in descending order, if two or more users travelled the same distance, order them by their name in ascending order.


In [84]:
sql_query_33 = "select u.name, coalesce(sum(r.distance), 0) as distance_travelled from users1 u left join rides r on u.id=r.user_id group by u.name order by distance_travelled desc, u.name asc"
df_sql_33 = mysql_read_action(sql_query_33)
df_sql_33.show()

+--------+------------------+
|    name|distance_travelled|
+--------+------------------+
|   Elvis|               450|
|     Lee|               450|
|     Bob|               317|
|Jonathan|               312|
|    Alex|               222|
|   Alice|               120|
|  Donald|                 0|
+--------+------------------+



In [85]:
# pyspark solution

In [86]:
users1_query = "select * from users1"
users1_df = mysql_read_action(users1_query)
users1_df.show()

+---+--------+
| id|    name|
+---+--------+
|  1|   Alice|
|  2|     Bob|
|  3|    Alex|
|  4|  Donald|
|  7|     Lee|
| 13|Jonathan|
| 19|   Elvis|
+---+--------+



In [87]:
rides_query = "select * from rides"
rides_df = mysql_read_action(rides_query)
rides_df.show()

+---+-------+--------+
| id|user_id|distance|
+---+-------+--------+
|  1|      1|     120|
|  2|      2|     317|
|  3|      3|     222|
|  4|      7|     100|
|  5|     13|     312|
|  6|     19|      50|
|  7|      7|     120|
|  8|     19|     400|
|  9|      7|     230|
+---+-------+--------+



In [88]:
from pyspark.sql import functions as F

# Assuming users1_df and rides_df are your DataFrames

# Join users and rides
joined_df = users1_df.join(rides_df, users1_df.id == rides_df.user_id, how="left")

# Group by name and aggregate distance
df_filtered_33 = (
    joined_df
    .groupBy("name")
    .agg(F.coalesce(F.sum("distance"), F.lit(0)).alias("distance_travelled"))
    .orderBy(F.desc("distance_travelled"), F.asc("name"))
)

# To show result
df_filtered_33.show()


+--------+------------------+
|    name|distance_travelled|
+--------+------------------+
|   Elvis|               450|
|     Lee|               450|
|     Bob|               317|
|Jonathan|               312|
|    Alex|               222|
|   Alice|               120|
|  Donald|                 0|
+--------+------------------+



### 34.Write an SQL query to get the names of products that have at least 100 units ordered in February 2020 and their amount.

In [12]:
sql_query_34 = """
SELECT a.product_name, SUM(unit) AS unit 
FROM Products1 a 
LEFT JOIN Orders2 b 
  ON a.product_id = b.product_id 
WHERE b.order_date BETWEEN '2020-02-01' AND '2020-02-29' 
GROUP BY a.product_id 
HAVING SUM(unit) >= 100
"""

In [13]:
df_sql_34 = mysql_read_action(sql_query_34)

In [14]:
df_sql_34.show()

+--------------------+----+
|        product_name|unit|
+--------------------+----+
|  Leetcode Solutions| 520|
|Jewels of Stringo...| 320|
|        Leetcode Kit| 400|
+--------------------+----+



In [15]:
# pyspark solution

In [16]:
products_1_query = "select * from products1"
product_1_df = mysql_read_action(products_1_query)
product_1_df.show()

+----------+--------------------+----------------+
|product_id|        product_name|product_category|
+----------+--------------------+----------------+
|         1|  Leetcode Solutions|            Book|
|         2|Jewels of Stringo...|            Book|
|         3|                  HP|          Laptop|
|         4|              Lenovo|          Laptop|
|         5|        Leetcode Kit|         T-shirt|
+----------+--------------------+----------------+



In [17]:
orders_2_query = "select * from orders2"
orders_2_df = mysql_read_action(orders_2_query)
orders_2_df.show()

+----------+----------+----+
|product_id|order_date|unit|
+----------+----------+----+
|         1|2020-02-05|  60|
|         1|2020-02-10|  70|
|         2|2020-01-18|  30|
|         2|2020-02-11|  80|
|         3|2020-02-17|   2|
|         3|2020-02-24|   3|
|         4|2020-03-01|  20|
|         4|2020-03-04|  30|
|         4|2020-03-04|  60|
|         5|2020-02-25|  50|
|         5|2020-02-27|  50|
|         5|2020-03-01|  50|
|         1|2020-02-05|  60|
|         1|2020-02-10|  70|
|         2|2020-01-18|  30|
|         2|2020-02-11|  80|
|         3|2020-02-17|   2|
|         3|2020-02-24|   3|
|         4|2020-03-01|  20|
|         4|2020-03-04|  30|
+----------+----------+----+
only showing top 20 rows



In [18]:
from pyspark.sql.functions import sum as _sum, col
df_filtered_34 = product_1_df.join(orders_2_df, product_1_df.product_id==orders_2_df.product_id)\
                             .where(orders_2_df.order_date.between("2020-02-01", "2020-02-29"))\
                             .groupBy("product_name").agg(_sum(col("unit")).alias("units")).filter(col("units")>100)

In [19]:
df_filtered_34.show()

+--------------------+-----+
|        product_name|units|
+--------------------+-----+
|Jewels of Stringo...|  320|
|        Leetcode Kit|  400|
|  Leetcode Solutions|  520|
+--------------------+-----+



### 35. Write an SQL query to:
### ● Find the name of the user who has rated the greatest number of movies. In case of a tie,
### return the lexicographically smaller user name.
### ● Find the movie name with the highest average rating in February 2020. In case of a tie, return
### the lexicographically smaller movie name.

In [21]:
sql_query_35 = """
(select 
u.name
from MovieRating m left join users2 u on m.user_id=u.user_id group by u.name order by count(*) desc, u.name asc limit 1)
union 
(select 
title
from movierating mr right join Movies mv on mr.movie_id=mv.movie_id where mr.created_at like "2020-02%" group by mv.movie_id order by avg(rating) desc, mv.title ASC limit 1)
"""
df_sql_35 = mysql_read_action(sql_query_35)
df_sql_35.show()

+--------+
|    name|
+--------+
|  Daniel|
|Frozen 2|
+--------+



In [22]:
# pyspark solution

In [23]:
movies_query = "select * from movies"
movies_df = mysql_read_action(movies_query)
movies_df.show()

+--------+--------+
|movie_id|   title|
+--------+--------+
|       1|Avengers|
|       2|Frozen 2|
|       3|   Joker|
+--------+--------+



In [24]:
users2_query = "select * from users2"
users2_df = mysql_read_action(users2_query)
users2_df.show()

+-------+------+
|user_id|  name|
+-------+------+
|      1|Daniel|
|      2|Monica|
|      3| Maria|
|      4| James|
+-------+------+



In [25]:
movierating_query = "select * from movierating"
movierating_df = mysql_read_action(movierating_query)
movierating_df.show()

+--------+-------+------+----------+
|movie_id|user_id|rating|created_at|
+--------+-------+------+----------+
|       1|      1|     3|2020-01-12|
|       1|      2|     4|2020-02-11|
|       1|      3|     2|2020-02-12|
|       1|      4|     1|2020-01-01|
|       2|      1|     5|2020-02-17|
|       2|      2|     2|2020-02-01|
|       2|      3|     2|2020-03-01|
|       3|      1|     3|2020-02-22|
|       3|      2|     4|2020-02-25|
+--------+-------+------+----------+



In [54]:
from pyspark.sql.functions import col, avg, sum as _sum, count, avg

df_filtered_35_1 = movierating_df.join(
                        users2_df, 
                        movierating_df.user_id == users2_df.user_id, 
                        how="left"
                    )\
                    .groupBy(users2_df.name)\
                    .agg(
                        count(movierating_df.movie_id).alias("movie_count")
                    ).orderBy(count(movierating_df.movie_id).desc(), users2_df.name.asc()).select("name").limit(1)

In [55]:
df_filtered_35_1.show()

+------+
|  name|
+------+
|Daniel|
+------+



In [80]:
df_filtered_35_2 = movierating_df.join(movies_df, movierating_df.movie_id==movies_df.movie_id, how="left")\
                                 .filter(movierating_df.created_at.like("2020-02%"))\
                                 .groupBy(movies_df.title)\
                                 .agg(avg(movierating_df.rating).alias("movie_rating")).orderBy(col("movie_rating").desc(),movies_df.title.asc()).select("title").limit(1)

In [81]:
df_filtered_35_2.show()

+--------+
|   title|
+--------+
|Frozen 2|
+--------+



In [82]:
df_filtered_35 = df_filtered_35_1.union(df_filtered_35_2)

In [83]:
df_filtered_35.show()

+--------+
|    name|
+--------+
|  Daniel|
|Frozen 2|
+--------+



### 36 Write an SQL query to report the distance travelled by each user. Return the result table ordered by travelled_distance in descending order, if two or more users travelled the same distance, order them by their name in ascending orde

In [91]:
# 33 quation answer

## 37. Write an SQL query to show the unique ID of each user, If a user does not have a unique ID replace just show null

In [92]:
# 32 quation answer

### 38.Write an SQL query to find the id and the name of all students who are enrolled in departments that no longer exist

In [93]:
sql_query_38 = "select id, name from Students where department_id not in (select id from Departments)"
df_sql_38 = mysql_read_action(sql_query_38)
df_sql_38.show()

+---+-------+
| id|   name|
+---+-------+
|  2|   John|
|  3|  Steve|
|  4|Jasmine|
|  7| Daiana|
+---+-------+



In [94]:
# pyspark solution

In [95]:
departments_query = "select * from departments"
departments_df = mysql_read_action(departments_query)
departments_df.show()

+---+--------------------+
| id|                name|
+---+--------------------+
|  1|Electrical Engine...|
|  7|Computer Engineering|
| 13|Business Administ...|
+---+--------------------+



In [96]:
students_query = "select * from students"
students_df = mysql_read_action(students_query)
students_df.show()

+---+--------+-------------+
| id|    name|department_id|
+---+--------+-------------+
|  1|     Bob|            7|
|  2|    John|           14|
|  3|   Steve|           74|
|  4| Jasmine|           77|
|  5|Jennifer|           13|
|  6|    Luis|            1|
|  7|  Daiana|           33|
|  8|Jonathan|            7|
| 11|Madelynn|            1|
| 23|   Alice|            1|
+---+--------+-------------+



In [107]:
df_filtered_38 = students_df.join(departments_df, students_df.department_id==departments_df.id, how="left_anti").select(students_df.id, students_df.name)

In [108]:
df_filtered_38.show()

+---+-------+
| id|   name|
+---+-------+
|  4|Jasmine|
|  7| Daiana|
|  2|   John|
|  3|  Steve|
+---+-------+



### 39 .Write an SQL query to report the number of calls and the total call duration between each pair of distinct persons (person1, person2) where person1 < person2.

In [109]:
sql_query_39 = """
SELECT LEAST(from_id,to_id) as person1,
GREATEST(from_id,to_id) as person2,
COUNT(*) as call_count,
SUM(duration) as total_duration
FROM Calls
GROUP BY person1,person2
"""
df_sql_39 = mysql_read_action(sql_query_39)
df_sql_39.show()

+-------+-------+----------+--------------+
|person1|person2|call_count|total_duration|
+-------+-------+----------+--------------+
|      1|      2|         2|            70|
|      1|      3|         1|            20|
|      3|      4|         4|           999|
+-------+-------+----------+--------------+



In [110]:
calls_query = "select * from calls"
calls_df = mysql_read_action(calls_query)
calls_df.show()

+-------+-----+--------+
|from_id|to_id|duration|
+-------+-----+--------+
|      1|    2|      59|
|      2|    1|      11|
|      1|    3|      20|
|      3|    4|     100|
|      3|    4|     200|
|      3|    4|     200|
|      4|    3|     499|
+-------+-----+--------+



In [114]:
from pyspark.sql.functions import least, greatest, count, sum as _sum
df_filtered_39 = calls_df.select(least(calls_df.from_id, calls_df.to_id).alias("person1"), greatest(calls_df.from_id, calls_df.to_id).alias("person2"), calls_df.duration)\
                         .groupBy("person1", "person2")\
                         .agg(count(col("person1")).alias("count"), _sum(calls_df.duration).alias("total_duration"))\
                         .select("person1", "person2", "count", "total_duration")

In [115]:
df_filtered_39.show()

+-------+-------+-----+--------------+
|person1|person2|count|total_duration|
+-------+-------+-----+--------------+
|      1|      2|    2|            70|
|      1|      3|    1|            20|
|      3|      4|    4|           999|
+-------+-------+-----+--------------+



In [116]:
#40 is duplicate quation

### 41.Write an SQL query to report the number of cubic feet of volume the inventory occupies in each warehouse.

In [117]:
sql_query_41 = """
select name as warehouse_name, sum(units * vol) as volume
from Warehouse w
join (select product_id, Width*Length*Height as vol
from Products2) p
on w.product_id = p.product_id
group by name
"""
df_sql_41 = mysql_read_action(sql_query_41)
df_sql_41.show()

+--------------+------+
|warehouse_name|volume|
+--------------+------+
|      LCHouse1| 12250|
|      LCHouse2| 20250|
|      LCHouse3|   800|
+--------------+------+



In [118]:
# spark solution

In [119]:
warehouse_query = "select * from warehouse"
warehouse_df = mysql_read_action(warehouse_query)
warehouse_df.show()

+--------+----------+-----+
|    name|product_id|units|
+--------+----------+-----+
|LCHouse1|         1|    1|
|LCHouse1|         2|   10|
|LCHouse1|         3|    5|
|LCHouse2|         1|    2|
|LCHouse2|         2|    2|
|LCHouse3|         4|    1|
+--------+----------+-----+



In [121]:
product2_query = "select * from products2"
products2_df = mysql_read_action(product2_query)
products2_df.show()

+----------+------------+-----+------+------+
|product_id|product_name|Width|Length|Height|
+----------+------------+-----+------+------+
|         1|       LC-TV|    5|    50|    40|
|         2| LC-KeyChain|    5|     5|     5|
|         3|    LC-Phone|    2|    10|    10|
|         4|  LC-T-Shirt|    4|    10|    20|
+----------+------------+-----+------+------+



In [125]:
df_filtered_41 = warehouse_df.join(products2_df, warehouse_df.product_id==products2_df.product_id, how="left")\
                        .select(warehouse_df.product_id, warehouse_df.name, 
                                (warehouse_df.units*(products2_df.Width*products2_df.Length*products2_df.Height)).alias("volume"))\
                        .groupBy(warehouse_df.name)\
                        .agg(_sum(col("volume")))

In [126]:
df_filtered_41.show()

+--------+-----------+
|    name|sum(volume)|
+--------+-----------+
|LCHouse2|      20250|
|LCHouse3|        800|
|LCHouse1|      12250|
+--------+-----------+



### 42.Write an SQL query to report the difference between the number of apples and oranges sold each day.Return the result table ordered by sale_date.

In [12]:
sql_query_42 = """
SELECT 
  sale_date,
  SUM(CASE WHEN fruit = 'apples' THEN sold_num ELSE 0 END) - 
  SUM(CASE WHEN fruit = 'oranges' THEN sold_num ELSE 0 END) AS diff
FROM sales
GROUP BY sale_date
ORDER BY sale_date
"""

In [13]:
df_sql_42 = mysql_read_action(sql_query_42)
df_sql_42.show()

+----------+----+
| sale_date|diff|
+----------+----+
|2020-05-01|   2|
|2020-05-02|   0|
|2020-05-03|  20|
|2020-05-04|  -1|
+----------+----+



In [14]:
# pyspark solution
sales_query = "select * from sales"
sales_df = mysql_read_action(sales_query)
sales_df.show()

+----------+-------+--------+
| sale_date|  fruit|sold_num|
+----------+-------+--------+
|2020-05-01|apples |      10|
|2020-05-01|oranges|       8|
|2020-05-02|apples |      15|
|2020-05-02|oranges|      15|
|2020-05-03|apples |      20|
|2020-05-03|oranges|       0|
|2020-05-04|apples |      15|
|2020-05-04|oranges|      16|
+----------+-------+--------+



In [18]:
from pyspark.sql import functions as F

# Compute apples and oranges sold per date
result = (
    sales_df.groupBy("sale_date")
    .agg(
        F.sum(F.when(F.col("fruit") == "apples", F.col("sold_num")).otherwise(0)).alias("apples_sold"),
        F.sum(F.when(F.col("fruit") == "oranges", F.col("sold_num")).otherwise(0)).alias("oranges_sold")
    )
    .withColumn("diff", F.col("apples_sold") - F.col("oranges_sold"))
    .orderBy("sale_date")
)

result.show()


+----------+-----------+------------+----+
| sale_date|apples_sold|oranges_sold|diff|
+----------+-----------+------------+----+
|2020-05-01|         10|           8|   2|
|2020-05-02|         15|          15|   0|
|2020-05-03|         20|           0|  20|
|2020-05-04|         15|          16|  -1|
+----------+-----------+------------+----+



### 43.Write an SQL query to report the fraction of players that logged in again on the day after the day they first logged in, rounded to 2 decimal places. In other words, you need to count the number of players that logged in for at least two consecutive days starting from their first login date, then divide that number by the total number of players.


In [19]:
sql_query_43 = """
select 
round(count(distinct a.player_id)/(select count(distinct player_id) from activity1), 2) as fraction
from(
SELECT 
  player_id, 
  event_date,
  DATEDIFF(LEAD(event_date) OVER (PARTITION BY player_id ORDER BY event_date), event_date) AS diff
FROM activity1) a where a.diff=1
"""
df_sql_43 = mysql_read_action(sql_query_43)
df_sql_43.show()

+--------+
|fraction|
+--------+
|    0.33|
+--------+



In [20]:
# spark solution

In [24]:
activity1_query = "select * from activity1"
activity1_df = mysql_read_action(activity1_query)
activity1_df.show()

+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-03-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [52]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
total_players = activity1_df.select(col("player_id")).distinct().count()
window = Window.partitionBy("player_id").orderBy("event_date")
filtered_df_43_1 = (activity1_df.select(activity1_df.player_id, activity1_df.event_date)\
                               .withColumn("diff",
                               F.datediff(F.lead(col("event_date")).over(window), col("event_date"))).filter(col("diff")==1).select("player_id").count())/total_players

In [55]:
filtered_df_43_1

0.3333333333333333

### 44.Write an SQL query to report the managers with at least five direct reports.

In [56]:
sql_query_44 = """select 
e.name
from employee e join employee a on e.id=a.managerid group by e.name having count(distinct a.id)>=5
"""
df_sql_44 = mysql_read_action(sql_query_44)
df_sql_44.show()

+----+
|name|
+----+
|John|
+----+



In [57]:
# pyspark solution

In [58]:
employee_query = "select * from employee"
employee_df = mysql_read_action(employee_query)
employee_df.show()

+---+-----+----------+---------+
| id| name|department|managerId|
+---+-----+----------+---------+
|101| John|         A|     NULL|
|102|  Dan|         A|      101|
|103|James|         A|      101|
|104|  Amy|         A|      101|
|105| Anne|         A|      101|
|106|  Ron|         B|      101|
+---+-----+----------+---------+



In [71]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Aliasing
a = employee_df.alias("a")
b = employee_df.alias("b")

# Self join: employee's managerId matches manager's id
joined_df = a.join(b, col("a.managerId") == col("b.id"))

# Select needed columns before grouping
result_df = joined_df.select(
    col("a.id").alias("employee_id"),
    col("b.id").alias("manager_id"),
    col("b.name").alias("manager_name")
)

# Group by manager_id and manager_name, count reports
grouped_df = result_df.groupBy("manager_name").agg(
    F.count("employee_id").alias("report_count")
)

grouped_df.show()


+------------+------------+
|manager_name|report_count|
+------------+------------+
|        John|           5|
+------------+------------+



### 45. Write an SQL query to report the respective department name and number of students majoring in each department for all departments in the Department table (even ones with no current students). Return the result table ordered by student_number in descending order. In case of a tie, order them by dept_name alphabetically.


In [72]:
sql_query_45 = "select d.dept_name, coalesce(count(distinct s.student_id), 0) as count_vs from student s right join department d on s.dept_id=d.dept_id group by d.dept_name order by d.dept_name asc"
df_sql_45 = mysql_read_action(sql_query_45)
df_sql_45.show()

+-----------+--------+
|  dept_name|count_vs|
+-----------+--------+
|Engineering|       2|
|        Law|       0|
|    Science|       1|
+-----------+--------+



In [73]:
# pyspark solution

In [75]:
student_query = "select * from student"
student_df = mysql_read_action(student_query)
student_df.show()

+----------+------------+------+-------+
|student_id|student_name|gender|dept_id|
+----------+------------+------+-------+
|         1|        Jack|     M|      1|
|         2|        Jane|     F|      1|
|         3|        Mark|     M|      2|
+----------+------------+------+-------+



In [76]:
department_query = "select * from department"
department_df = mysql_read_action(department_query)
department_df.show()

+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|      1|Engineering|
|      2|    Science|
|      3|        Law|
+-------+-----------+



In [85]:
filtered_df_45 = department_df.join(student_df, department_df.dept_id==student_df.dept_id, how="left")\
                              .groupBy(department_df.dept_name).agg(F.count(student_df.student_id).alias("count"))\
                              .orderBy(col("count").desc(),department_df.dept_name.asc())

In [86]:
filtered_df_45.show()

+-----------+-----+
|  dept_name|count|
+-----------+-----+
|Engineering|    2|
|    Science|    1|
|        Law|    0|
+-----------+-----+



### 46. Write an SQL query to report the customer ids from the Customer table that bought all the products in the Product table.


In [88]:
sql_query_46 = """
select 
customer_id
from customer group by customer_id
having count(distinct product_key) = (select count(*) from product1)
"""
df_sql_46 = mysql_read_action(sql_query_46)
df_sql_46.show()

+-----------+
|customer_id|
+-----------+
|          1|
|          3|
+-----------+



In [89]:
# spark solution

In [91]:
customer_query = "select * from customer"
customer_df = mysql_read_action(customer_query)
customer_df.show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          2|          6|
|          3|          5|
|          3|          6|
|          1|          6|
+-----------+-----------+



In [92]:
product_query = "select * from product1"
product_df = mysql_read_action(product_query)
product_df.show()

+-----------+
|product_key|
+-----------+
|          5|
|          6|
+-----------+



In [101]:
product_key_count = product_df.count()
df_filtered_46 = customer_df.groupby(col("customer_id")).agg(F.count(col("product_key")).alias("count"))\
                            .filter(col("count")==product_key_count).select("customer_id")

In [102]:
df_filtered_46.show()

+-----------+
|customer_id|
+-----------+
|          1|
|          3|
+-----------+



### 47. Write an SQL query that reports the most experienced employees in each project. In case of a tie, report all employees with the maximum number of experience years.

In [13]:
sql_query_47 = """
select 
project_id, employee_id from (
select 
p.project_id, e.employee_id, e.experience_years, rank() over (order by e.experience_years desc) as r
from EMPLOYEE1 E 
left join project p 
on e.employee_id = p.employee_id) o where o.r=1 order by o.project_id asc
"""
df_sql_47 = mysql_read_action(sql_query_47)
df_sql_47.show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          3|
|         2|          1|
+----------+-----------+



In [14]:
# pyspark solution

In [15]:
project_query = "select * from project"
project_df = mysql_read_action(project_query)
project_df.show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         2|          1|
|         1|          2|
|         1|          3|
|         2|          4|
+----------+-----------+



In [16]:
employee1_query = "select * from employee1"
employee1_df =  mysql_read_action(employee1_query)
employee1_df.show()

+-----------+------+----------------+
|employee_id|  name|experience_years|
+-----------+------+----------------+
|          1|Khaled|               3|
|          2|   Ali|               2|
|          3|  John|               3|
|          4|   Doe|               2|
+-----------+------+----------------+



In [26]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.orderBy(F.col("experience_years").desc())
filtered_df_47 = employee1_df.join(project_df, employee1_df.employee_id==project_df.employee_id, how="left")\
                             .withColumn("rank",F.rank().over(window))\
                             .filter(F.col("rank")==1)\
                             .select(project_df.project_id, employee1_df.employee_id)\
                             .orderBy(F.col("project_id").asc())
filtered_df_47.show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          3|
|         2|          1|
+----------+-----------+



### 48. Write an SQL query that reports the books that have sold less than 10 copies in the last year, excluding books that have been available for less than one month from today. Assume today is 2019-06-23.

In [29]:
sql_query_48 = """
SELECT 
  b.book_id,
  b.name,
  COALESCE(SUM(o.quantity), 0) AS total_quantity
FROM books b
LEFT JOIN orders3 o 
  ON b.book_id = o.book_id 
  AND o.dispatch_date BETWEEN DATE_SUB(DATE '2019-06-23', INTERVAL 1 YEAR)
                          AND DATE('2019-06-23')
WHERE b.available_from < DATE_SUB(DATE '2019-06-23', INTERVAL 1 MONTH)
GROUP BY b.book_id, b.name
HAVING total_quantity < 10
"""
df_sql_48 = mysql_read_action(sql_query_48)
df_sql_48.show()

+-------+------------------+--------------+
|book_id|              name|total_quantity|
+-------+------------------+--------------+
|      1|"Kalila And Demna"|             3|
|      2|      "28 Letters"|             0|
|      5|"The Hunger Games"|             0|
+-------+------------------+--------------+



In [30]:
# spark solution

In [13]:
orders3_query = "select * from orders3"
orders3_df = mysql_read_action(orders3_query)
orders3_df.show()

+--------+-------+--------+-------------+
|order_id|book_id|quantity|dispatch_date|
+--------+-------+--------+-------------+
|       1|      1|       2|   2018-07-26|
|       2|      1|       1|   2018-11-05|
|       3|      3|       8|   2019-06-11|
|       4|      4|       6|   2019-06-05|
|       5|      4|       5|   2019-06-20|
|       6|      5|       9|   2009-02-02|
|       7|      5|       8|   2010-04-13|
+--------+-------+--------+-------------+



In [14]:
books_query = "select * from books"
books_df = mysql_read_action(books_query)
books_df.show()

+-------+------------------+--------------+
|book_id|              name|available_from|
+-------+------------------+--------------+
|      1|"Kalila And Demna"|    2010-01-01|
|      2|      "28 Letters"|    2012-05-12|
|      3|      "The Hobbit"|    2019-06-10|
|      4|  "13 Reasons Why"|    2010-01-01|
|      5|"The Hunger Games"|    2008-09-21|
+-------+------------------+--------------+



In [15]:
from pyspark.sql.functions import col, sum as _sum, coalesce, lit, to_date, date_sub

# Assume books_df and orders3_df are already created DataFrames

# Set base analysis date
today = to_date(lit('2019-06-23'))

# Filter books available for more than 1 month from "today"
filtered_books = books_df.filter(
    col("available_from") < date_sub(today, 30)
)

# Filter orders within the last year
filtered_orders = orders3_df.filter(
    (col("dispatch_date") >= date_sub(today, 365)) &
    (col("dispatch_date") <= today)
)

# Left join books with filtered orders
joined_df = filtered_books.join(
    filtered_orders,
    on="book_id",
    how="left"
)

# Group by book and compute total quantity sold
result_df = joined_df.groupBy("book_id", "name").agg(
    coalesce(_sum("quantity"), lit(0)).alias("total_quantity")
).filter(col("total_quantity") < 10)

# Display result
result_df.show()

+-------+------------------+--------------+
|book_id|              name|total_quantity|
+-------+------------------+--------------+
|      1|"Kalila And Demna"|             3|
|      5|"The Hunger Games"|             0|
|      2|      "28 Letters"|             0|
+-------+------------------+--------------+



### 49. Write a SQL query to find the highest grade with its corresponding course for each student. In case of a tie, you should find the course with the smallest course_id.


In [17]:
sql_query_49 = """
select * from (
select 
student_id,
course_id,
grade,
rank() over(partition by student_id order by grade desc,course_id asc) as r
from Enrollments) o where o.r=1
"""
df_sql_49 = mysql_read_action(sql_query_49)
df_sql_49.show()

+----------+---------+-----+---+
|student_id|course_id|grade|  r|
+----------+---------+-----+---+
|         1|        2|   99|  1|
|         2|        2|   95|  1|
|         3|        3|   82|  1|
+----------+---------+-----+---+



In [18]:
# spark solution

In [19]:
enrollments_query = "select * from enrollments"
enrollments_df = mysql_read_action(enrollments_query)
enrollments_df.show()

+----------+---------+-----+
|student_id|course_id|grade|
+----------+---------+-----+
|         1|        1|   90|
|         1|        2|   99|
|         2|        2|   95|
|         2|        3|   95|
|         3|        1|   80|
|         3|        2|   75|
|         3|        3|   82|
+----------+---------+-----+



In [21]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
window_spec = Window.partitionBy(enrollments_df.student_id).orderBy(enrollments_df.grade.desc(), enrollments_df.course_id.asc())
filtered_df_49 = enrollments_df.withColumn("rank", F.rank().over(window_spec)).filter(F.col("rank")==1)
filtered_df_49.show()

+----------+---------+-----+----+
|student_id|course_id|grade|rank|
+----------+---------+-----+----+
|         1|        2|   99|   1|
|         2|        2|   95|   1|
|         3|        3|   82|   1|
+----------+---------+-----+----+



#### 50.

In [14]:
sql_query_50 = """
SELECT group_id, player_id
FROM (
    SELECT 
        p.group_id,
        p.player_id,
        SUM(
            CASE 
                WHEN p.player_id = m.first_player THEN m.first_score
                WHEN p.player_id = m.second_player THEN m.second_score
                ELSE 0
            END
        ) AS total_score,
        ROW_NUMBER() OVER (
            PARTITION BY p.group_id 
            ORDER BY 
                SUM(
                    CASE 
                        WHEN p.player_id = m.first_player THEN m.first_score
                        WHEN p.player_id = m.second_player THEN m.second_score
                        ELSE 0
                    END
                ) DESC,
                p.player_id
        ) AS rn
    FROM Players p
    JOIN Matches m ON p.player_id = m.first_player OR p.player_id = m.second_player
    GROUP BY p.group_id, p.player_id
) ranked
WHERE rn = 1
ORDER BY group_id
"""
df_sql_50 = mysql_read_action(sql_query_50)
df_sql_50.show()

+--------+---------+
|group_id|player_id|
+--------+---------+
|       1|       15|
|       2|       35|
|       3|       40|
+--------+---------+



In [12]:
# spark solution

In [15]:
matches_query = "select * from Matches"
matches_df = mysql_read_action(matches_query)
matches_df.show()

+--------+------------+-------------+-----------+------------+
|match_id|first_player|second_player|first_score|second_score|
+--------+------------+-------------+-----------+------------+
|       1|          15|           45|          3|           0|
|       2|          30|           25|          1|           2|
|       3|          30|           15|          2|           0|
|       4|          40|           20|          5|           2|
|       5|          35|           50|          1|           1|
+--------+------------+-------------+-----------+------------+



In [16]:
players_query = "select * from players"
players_df = mysql_read_action(players_query)
players_df.show()

+---------+--------+
|player_id|group_id|
+---------+--------+
|       10|       2|
|       15|       1|
|       20|       3|
|       25|       1|
|       30|       1|
|       35|       2|
|       40|       3|
|       45|       1|
|       50|       2|
+---------+--------+



In [18]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Join Players and Matches where the player is involved
joined_df = players_df.alias("p") \
    .join(matches_df.alias("m"),
          (F.col("p.player_id") == F.col("m.first_player")) |
          (F.col("p.player_id") == F.col("m.second_player")))

# Add a column for the score for that player
scored_df = joined_df.withColumn(
    "score",
    F.when(F.col("p.player_id") == F.col("m.first_player"), F.col("m.first_score"))
     .when(F.col("p.player_id") == F.col("m.second_player"), F.col("m.second_score"))
     .otherwise(0)
)

# Sum scores per group_id and player_id
agg_df = scored_df.groupBy("p.group_id", "p.player_id") \
    .agg(F.sum("score").alias("total_score"))

# Use window to get the top scorer per group
window = Window.partitionBy("group_id").orderBy(F.desc("total_score"), F.asc("player_id"))

top_players = agg_df.withColumn("rank", F.row_number().over(window)) \
    .filter(F.col("rank") == 1) \
    .select("group_id", "player_id")

top_players.show()


+--------+---------+
|group_id|player_id|
+--------+---------+
|       1|       15|
|       2|       35|
|       3|       40|
+--------+---------+

