In [1]:
'''
Question 1

Given sample table 1
+------+----+---+
|Emp_Id|Dept|Sal|
+------+----+---+
|    11|SAMA|100|
|    22|  HR|120|
|    33|SAMA|110|
|    44|SAMA| 90|
|    55|  HR| 80|
|    66|SAMA|100|
+------+----+---+

Table 2
+------+--------+
|Emp_Id|Emp_Name|
+------+--------+
|    11|   Mr. A|
|    22|   Mr. B|
|    33|   Mr. C|
|    44|   Mr. D|
|    55|   Mr. E|
|    66|   Mr. F|
+------+--------+

Find the second-highest salary for each department.
'''

from pyspark.sql import SparkSession

# Local Spark session with two worker threads; getOrCreate() reuses an
# existing session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("CreateDataFrames")
    .config("spark.pyspark.python", "python")
    .master("local[2]")
    .getOrCreate()
)

# Column names only; Spark infers the column types from the Python values.
columns1 = ["Emp_Id", "Dept", "Sal"]
columns2 = ["Emp_Id", "Emp_Name"]

# Table 1 rows: (Emp_Id, Dept, Sal)
data1 = [
    (11, "SAMA", 100),
    (22, "HR", 120),
    (33, "SAMA", 110),
    (44, "SAMA", 90),
    (55, "HR", 80),
    (66, "SAMA", 100),
]

# Table 2 rows: (Emp_Id, Emp_Name)
data2 = [
    (11, "Mr. A"),
    (22, "Mr. B"),
    (33, "Mr. C"),
    (44, "Mr. D"),
    (55, "Mr. E"),
    (66, "Mr. F"),
]

df1, df2 = (
    spark.createDataFrame(rows, cols)
    for rows, cols in ((data1, columns1), (data2, columns2))
)

# Display both input tables.
df1.show()
df2.show()

+------+----+---+
|Emp_Id|Dept|Sal|
+------+----+---+
|    11|SAMA|100|
|    22|  HR|120|
|    33|SAMA|110|
|    44|SAMA| 90|
|    55|  HR| 80|
|    66|SAMA|100|
+------+----+---+

+------+--------+
|Emp_Id|Emp_Name|
+------+--------+
|    11|   Mr. A|
|    22|   Mr. B|
|    33|   Mr. C|
|    44|   Mr. D|
|    55|   Mr. E|
|    66|   Mr. F|
+------+--------+



In [2]:
'''
Solution
'''
from pyspark.sql.window import Window
from pyspark.sql import functions as f

# Join salaries (df1) with employee names (df2) on the employee id.
join_condition = (df1.Emp_Id == df2.Emp_Id)

# Rank salaries inside each department, highest first. dense_rank gives tied
# salaries the same rank with no gaps, so rank 2 is the second-highest
# *distinct* salary, and every employee earning it is returned.
# A department with only one distinct salary yields no row.
Window_fun = Window.partitionBy(df1.Dept).orderBy(df1.Sal.desc())

ordered_df = (
    df1.join(df2, join_condition, "inner")
    .withColumn("row_ordering", f.dense_rank().over(Window_fun))
    .drop(df2.Emp_Id)  # keep a single Emp_Id column after the join
)

# Filter on the rank *before* projecting it away, so the predicate never
# refers to a column that the select has already dropped.
(
    ordered_df
    .where(f.col("row_ordering") == 2)
    .select("Dept", "Emp_Name", "Sal")
    .orderBy(f.col("Dept").desc())
    .show()
)

+----+--------+---+
|Dept|Emp_Name|Sal|
+----+--------+---+
|SAMA|   Mr. A|100|
|SAMA|   Mr. F|100|
|  HR|   Mr. E| 80|
+----+--------+---+



In [3]:
from pyspark.sql import types as t


def sal_emp_multiplier(sal, emp):
    """Multiply a salary by an employee id.

    Args:
        sal: salary value, or None when the SQL value is NULL.
        emp: employee id, or None when the SQL value is NULL.

    Returns:
        float(sal) * float(emp), or None if either input is None.
    """
    # Spark passes SQL NULLs to Python UDFs as None. float(None) raises a
    # TypeError and fails the whole job, so NULLs propagate instead.
    if sal is None or emp is None:
        return None
    return float(sal) * float(emp)


# DoubleType matches Python's 64-bit float; FloatType would silently narrow
# the result to 32-bit precision.
# NOTE: for plain arithmetic, ordered_df.Sal * ordered_df.Emp_Id avoids Python
# UDF serialization overhead. The UDF is kept here as a UDF example.
mutiplier_udf = f.udf(sal_emp_multiplier, t.DoubleType())

(
    ordered_df
    .withColumn("multiplied sal with emp id", mutiplier_udf(ordered_df.Sal, ordered_df.Emp_Id))
    .show()
)

+------+----+---+--------+------------+--------------------------+
|Emp_Id|Dept|Sal|Emp_Name|row_ordering|mutiplied sal with emp id |
+------+----+---+--------+------------+--------------------------+
|    22|  HR|120|   Mr. B|           1|                    2640.0|
|    55|  HR| 80|   Mr. E|           2|                    4400.0|
|    33|SAMA|110|   Mr. C|           1|                    3630.0|
|    11|SAMA|100|   Mr. A|           2|                    1100.0|
|    66|SAMA|100|   Mr. F|           2|                    6600.0|
|    44|SAMA| 90|   Mr. D|           3|                    3960.0|
+------+----+---+--------+------------+--------------------------+



In [19]:
'''
Write a solution to find the IDs of the invalid tweets. 
The tweet is invalid if the number of characters used in the content of the tweet is strictly greater than 15.

Input: 
Tweets table:
+----------+----------------------------------+
| tweet_id | content                          |
+----------+----------------------------------+
| 1        | Vote for Biden                   |
| 2        | Let us make America great again! |
+----------+----------------------------------+
Output: 
+----------+
| tweet_id |
+----------+
| 2        |
+----------+
Explanation: 
Tweet 1 has length = 14. It is a valid tweet.
Tweet 2 has length = 32. It is an invalid tweet.

'''

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Local session for the tweet-length exercise (reuses any active session).
spark = (
    SparkSession.builder
    .appName("Tweet Length checker")
    .master("local[2]")
    .getOrCreate()
)

columns = ['tweet_id', 'content']

# (tweet_id, content)
data = [
    (1, 'Vote for Biden'),
    (2, 'Let us make America great again!'),
]

Tweets_df = spark.createDataFrame(data, columns)

# truncate=False prints long tweet content in full instead of cutting it off.
Tweets_df.show(truncate=False)

+--------+--------------------------------+
|tweet_id|content                         |
+--------+--------------------------------+
|1       |Vote for Biden                  |
|2       |Let us make America great again!|
+--------+--------------------------------+



In [16]:
'''
Solution
'''

# A tweet is invalid when its content is strictly longer than 15 characters;
# report the distinct ids of such tweets.
(
    Tweets_df
    .filter(f.length(Tweets_df.content) > 15)
    .select(Tweets_df.tweet_id)
    .distinct()
    .show()
)


+--------+
|tweet_id|
+--------+
|       2|
+--------+



In [21]:
'''
Table: Queries
+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| query_name  | varchar |
| result      | varchar |
| position    | int     |
| rating      | int     |
+-------------+---------+
This table may have duplicate rows.
This table contains information collected from some queries on a database.
The position column has a value from 1 to 500.
The rating column has a value from 1 to 5. Query with rating less than 3 is a poor query.
 

We define query quality as:

The average of the ratio between query rating and its position.

We also define poor query percentage as:

The percentage of all queries with rating less than 3.

Write a solution to find each query_name, the quality and poor_query_percentage.

Both quality and poor_query_percentage should be rounded to 2 decimal places.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Queries table:
+------------+-------------------+----------+--------+
| query_name | result            | position | rating |
+------------+-------------------+----------+--------+
| Dog        | Golden Retriever  | 1        | 5      |
| Dog        | German Shepherd   | 2        | 5      |
| Dog        | Mule              | 200      | 1      |
| Cat        | Shirazi           | 5        | 2      |
| Cat        | Siamese           | 3        | 3      |
| Cat        | Sphynx            | 7        | 4      |
+------------+-------------------+----------+--------+
Output: 
+------------+---------+-----------------------+
| query_name | quality | poor_query_percentage |
+------------+---------+-----------------------+
| Dog        | 2.50    | 33.33                 |
| Cat        | 0.66    | 33.33                 |
+------------+---------+-----------------------+
Explanation: 
Dog queries quality is ((5 / 1) + (5 / 2) + (1 / 200)) / 3 = 2.50
Dog queries poor_query_percentage is (1 / 3) * 100 = 33.33

Cat queries quality equals ((2 / 5) + (3 / 3) + (4 / 7)) / 3 = 0.66
Cat queries poor_query_percentage is (1 / 3) * 100 = 33.33

'''

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Single-threaded local session for the query-quality exercise.
spark = (
    SparkSession.builder
    .appName("Quality checker")
    .master("local[1]")
    .getOrCreate()
)

# Column names; Spark infers the column types from the tuple values.
columns = ["query_name", "result", "position", "rating"]

# (query_name, result, position, rating)
data = [
    ("Dog", "Golden Retriever", 1, 5),
    ("Dog", "German Shepherd", 2, 5),
    ("Dog", "Mule", 200, 1),
    ("Cat", "Shirazi", 5, 2),
    ("Cat", "Siamese", 3, 3),
    ("Cat", "Sphynx", 7, 4),
]

Queries_df = spark.createDataFrame(data, columns)
Queries_df.show()

+----------+----------------+--------+------+
|query_name|          result|position|rating|
+----------+----------------+--------+------+
|       Dog|Golden Retriever|       1|     5|
|       Dog| German Shepherd|       2|     5|
|       Dog|            Mule|     200|     1|
|       Cat|         Shirazi|       5|     2|
|       Cat|         Siamese|       3|     3|
|       Cat|          Sphynx|       7|     4|
+----------+----------------+--------+------+



In [45]:
'''
Solution
'''

# quality = average of rating / position per query_name, rounded to 2 dp.
# Spark's "/" on integer columns yields a double, so no explicit cast is needed.
agg_quality = f.round(f.avg(Queries_df.rating / Queries_df.position), 2).alias('quality')

# poor_query_percentage = share of rows with rating < 3, as a percentage,
# rounded to 2 dp. count("*") counts every row in the group.
agg_poor_query_percentage = f.round(
    (f.sum(f.when(Queries_df.rating < 3, 1).otherwise(0)) / f.count("*")) * 100, 2
).alias('poor_query_percentage')

# Drop rows with a null query_name *before* grouping, so they neither form
# their own group nor cost any aggregation work.
(
    Queries_df
    .where(Queries_df.query_name.isNotNull())
    .groupBy(Queries_df.query_name)
    .agg(agg_quality, agg_poor_query_percentage)
    .show()
)

+----------+-------+---------------------+
|query_name|quality|poor_query_percentage|
+----------+-------+---------------------+
|       Dog|    2.5|                33.33|
|       Cat|   0.66|                33.33|
+----------+-------+---------------------+



In [32]:
'''
Table: Activity

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| player_id    | int     |
| device_id    | int     |
| event_date   | date    |
| games_played | int     |
+--------------+---------+
(player_id, event_date) is the primary key (combination of columns with unique values) of this table.
This table shows the activity of players of some games.
Each row is a record of a player who logged in and played a number of games (possibly 0) before logging out on someday using some device.
 

Write a solution to report the fraction of players that logged in again on the day after the day they first logged in, rounded to 2 decimal places. In other words, you need to count the number of players that logged in for at least two consecutive days starting from their first login date, then divide that number by the total number of players.

The result format is in the following example.

 

Example 1:

Input: 
Activity table:
+-----------+-----------+------------+--------------+
| player_id | device_id | event_date | games_played |
+-----------+-----------+------------+--------------+
| 1         | 2         | 2016-03-01 | 5            |
| 1         | 2         | 2016-03-02 | 6            |
| 2         | 3         | 2017-06-25 | 1            |
| 3         | 1         | 2016-03-02 | 0            |
| 3         | 4         | 2018-07-03 | 5            |
+-----------+-----------+------------+--------------+
Output: 
+-----------+
| fraction  |
+-----------+
| 0.33      |
+-----------+
Explanation: 
Only the player with id 1 logged back in on the day after their first login, so the answer is 1/3 = 0.33
'''

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Local session for the "logged in again next day" exercise.
spark = (
    SparkSession.builder
    .appName("Second Time Visiter")
    .master("local[2]")
    .getOrCreate()
)

# Explicit, all-nullable schema. event_date is loaded as a string and parsed
# into a date in a later cell.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("player_id", IntegerType()),
        ("device_id", IntegerType()),
        ("event_date", StringType()),
        ("games_played", IntegerType()),
    )
])

# (player_id, device_id, event_date, games_played)
data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-03-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5),
]

Activity_df = spark.createDataFrame(data, schema)
Activity_df.show()

+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-03-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [33]:
# Parse the "yyyy-MM-dd" strings into a DateType column.
Activity_df = Activity_df.withColumn(
    "event_date", f.to_date(Activity_df.event_date, "yyyy-MM-dd")
)

# Each player's logins in chronological order. The name says "lag" but the
# window drives lead() and row_number() below.
window_lag_fn = Window.partitionBy(Activity_df.player_id).orderBy(Activity_df.event_date.asc())

# For every login: the date of the player's next login (null for the last
# one) and its 1-based position in the player's history.
Activity_ordered_df = (
    Activity_df
    .withColumn("lead_date", f.lead(Activity_df.event_date).over(window_lag_fn))
    .withColumn("event_number", f.row_number().over(window_lag_fn))
    .select("player_id", "event_date", "lead_date", "event_number")
)

In [36]:
# Total players. After keeping only each player's first login
# (event_number == 1) there is one row per player.
count_distinct_players = f.countDistinct(Activity_ordered_df.player_id).alias("distinct_players")

# Players whose next login is exactly one day after their first login.
# datediff(end, start) states the intent explicitly instead of relying on
# Spark's implicit date-minus-integer coercion in `lead_date - 1`.
# A null lead_date (no second login) gives a null datediff, so when() yields
# null and countDistinct ignores it.
count_distinct_2nd_login_players = f.countDistinct(
    f.when(
        f.datediff(Activity_ordered_df.lead_date, Activity_ordered_df.event_date) == 1,
        Activity_ordered_df.player_id,
    )
).alias("distinct_2nd_login")

agg_data_df = (
    Activity_ordered_df
    .where(Activity_ordered_df.event_number == 1)
    .agg(count_distinct_players, count_distinct_2nd_login_players)
)

# fraction = returning players / all players, rounded to 2 dp.
(
    agg_data_df
    .withColumn("fraction", f.round(agg_data_df.distinct_2nd_login / agg_data_df.distinct_players, 2))
    .show()
)

+----------------+------------------+--------+
|distinct_players|distinct_2nd_login|fraction|
+----------------+------------------+--------+
|               3|                 1|    0.33|
+----------------+------------------+--------+



In [3]:
'''
Table: Sales

+-------------+-------+
| Column Name | Type  |
+-------------+-------+
| sale_id     | int   |
| product_id  | int   |
| year        | int   |
| quantity    | int   |
| price       | int   |
+-------------+-------+
(sale_id, year) is the primary key (combination of columns with unique values) of this table.
product_id is a foreign key (reference column) to Product table.
Each row of this table shows a sale on the product product_id in a certain year.
Note that the price is per unit.
 

Table: Product

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| product_id   | int     |
| product_name | varchar |
+--------------+---------+
product_id is the primary key (column with unique values) of this table.
Each row of this table indicates the product name of each product.

Write a solution to select the product id, year, quantity, and price for the first year of every product sold.
Return the resulting table in any order.
The result format is in the following example.

Example 1:

Input: 
Sales table:
+---------+------------+------+----------+-------+
| sale_id | product_id | year | quantity | price |
+---------+------------+------+----------+-------+ 
| 1       | 100        | 2008 | 10       | 5000  |
| 2       | 100        | 2009 | 12       | 5000  |
| 7       | 200        | 2011 | 15       | 9000  |
+---------+------------+------+----------+-------+
Product table:
+------------+--------------+
| product_id | product_name |
+------------+--------------+
| 100        | Nokia        |
| 200        | Apple        |
| 300        | Samsung      |
+------------+--------------+
Output: 
+------------+------------+----------+-------+
| product_id | first_year | quantity | price |
+------------+------------+----------+-------+ 
| 100        | 2008       | 10       | 5000  |
| 200        | 2011       | 15       | 9000  |
+------------+------------+----------+-------+
'''

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Local session for the first-year-sales exercise.
spark = (
    SparkSession.builder
    .appName("First year data selector")
    .master("local[2]")
    .getOrCreate()
)

# Sales: (sale_id, product_id, year, quantity, price), price is per unit.
sales_columns = ["sale_id", "product_id", "year", "quantity", "price"]
sales_data = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000),
]
sales_df = spark.createDataFrame(sales_data, sales_columns)
sales_df.show()

# Product: (product_id, product_name)
product_columns = ["product_id", "product_name"]
product_data = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung"),
]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()

+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+



In [5]:
'''
Solution
'''
# Rank each product's sales by year, earliest first. dense_rank gives every
# sale in the earliest year rank 1, so all of them are kept.
window_expr = Window.partitionBy(sales_df.product_id).orderBy(sales_df.year.asc())

order_sales_df = (
    sales_df
    .withColumn("ordering", f.dense_rank().over(window_expr))
    .withColumnRenamed("year", "first_year")
)

(
    order_sales_df
    .filter(order_sales_df.ordering == 1)
    .select(
        order_sales_df.product_id,
        order_sales_df.first_year,
        order_sales_df.quantity,
        order_sales_df.price,
    )
    .show()
)

+----------+----------+--------+-----+
|product_id|first_year|quantity|price|
+----------+----------+--------+-----+
|       100|      2008|      10| 5000|
|       200|      2011|      15| 9000|
+----------+----------+--------+-----+



In [6]:
'''
Table: Employees

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| name          | varchar |
+---------------+---------+
id is the primary key (column with unique values) for this table.
Each row of this table contains the id and the name of an employee in a company.
 

Table: EmployeeUNI

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| unique_id     | int     |
+---------------+---------+
(id, unique_id) is the primary key (combination of columns with unique values) for this table.
Each row of this table contains the id and the corresponding unique id of an employee in the company.
 

Write a solution to show the unique ID of each user. If a user does not have a unique ID, just show null.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Employees table:
+----+----------+
| id | name     |
+----+----------+
| 1  | Alice    |
| 7  | Bob      |
| 11 | Meir     |
| 90 | Winston  |
| 3  | Jonathan |
+----+----------+
EmployeeUNI table:
+----+-----------+
| id | unique_id |
+----+-----------+
| 3  | 1         |
| 11 | 2         |
| 90 | 3         |
+----+-----------+
Output: 
+-----------+----------+
| unique_id | name     |
+-----------+----------+
| null      | Alice    |
| null      | Bob      |
| 2         | Meir     |
| 3         | Winston  |
| 1         | Jonathan |
+-----------+----------+
Explanation: 
Alice and Bob do not have a unique ID, We will show null instead.
The unique ID of Meir is 2.
The unique ID of Winston is 3.
The unique ID of Jonathan is 1.

'''

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Local session for the unique-id lookup exercise.
spark = (
    SparkSession.builder
    .appName("Unique ID finder")
    .master("local[2]")
    .getOrCreate()
)

# Employees: (id, name)
columns_1 = ["id", "name"]
data_1 = [
    (1, "Alice"),
    (7, "Bob"),
    (11, "Meir"),
    (90, "Winston"),
    (3, "Jonathan"),
]
Employees_df = spark.createDataFrame(data_1, columns_1)
Employees_df.show()

# EmployeeUNI: (id, unique_id)
columns_2 = ["id", "unique_id"]
data_2 = [(3, 1), (11, 2), (90, 3)]
EmployeeUNI_df = spark.createDataFrame(data_2, columns_2)
EmployeeUNI_df.show()

+---+--------+
| id|    name|
+---+--------+
|  1|   Alice|
|  7|     Bob|
| 11|    Meir|
| 90| Winston|
|  3|Jonathan|
+---+--------+

+---+---------+
| id|unique_id|
+---+---------+
|  3|        1|
| 11|        2|
| 90|        3|
+---+---------+



In [7]:
# A left join keeps every employee. Employees with no EmployeeUNI match get a
# null unique_id.
(
    Employees_df
    .join(EmployeeUNI_df, on=Employees_df.id == EmployeeUNI_df.id, how="left")
    .select(EmployeeUNI_df.unique_id, Employees_df.name)
    .show()
)

+---------+--------+
|unique_id|    name|
+---------+--------+
|     NULL|     Bob|
|     NULL|   Alice|
|        1|Jonathan|
|        2|    Meir|
|        3| Winston|
+---------+--------+



In [8]:
'''
Table: Sales

+-------------+-------+
| Column Name | Type  |
+-------------+-------+
| sale_id     | int   |
| product_id  | int   |
| year        | int   |
| quantity    | int   |
| price       | int   |
+-------------+-------+
(sale_id, year) is the primary key (combination of columns with unique values) of this table.
product_id is a foreign key (reference column) to Product table.
Each row of this table shows a sale on the product product_id in a certain year.
Note that the price is per unit.
 

Table: Product

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| product_id   | int     |
| product_name | varchar |
+--------------+---------+
product_id is the primary key (column with unique values) of this table.
Each row of this table indicates the product name of each product.
 

Write a solution to report the product_name, year, and price for each sale_id in the Sales table.

Return the resulting table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Sales table:
+---------+------------+------+----------+-------+
| sale_id | product_id | year | quantity | price |
+---------+------------+------+----------+-------+ 
| 1       | 100        | 2008 | 10       | 5000  |
| 2       | 100        | 2009 | 12       | 5000  |
| 7       | 200        | 2011 | 15       | 9000  |
+---------+------------+------+----------+-------+
Product table:
+------------+--------------+
| product_id | product_name |
+------------+--------------+
| 100        | Nokia        |
| 200        | Apple        |
| 300        | Samsung      |
+------------+--------------+
Output: 
+--------------+-------+-------+
| product_name | year  | price |
+--------------+-------+-------+
| Nokia        | 2008  | 5000  |
| Nokia        | 2009  | 5000  |
| Apple        | 2011  | 9000  |
+--------------+-------+-------+
Explanation: 
From sale_id = 1, we can conclude that Nokia was sold for 5000 in the year 2008.
From sale_id = 2, we can conclude that Nokia was sold for 5000 in the year 2009.
From sale_id = 7, we can conclude that Apple was sold for 9000 in the year 2011.
'''

from pyspark.sql import SparkSession
from pyspark.sql import Row

# No master is set here; getOrCreate() returns the active session if one
# already exists.
spark = SparkSession.builder.appName("SalesAndProduct").getOrCreate()

# Keyword Rows carry the column names, so no separate column list is needed.
sales_data = [
    Row(sale_id=1, product_id=100, year=2008, quantity=10, price=5000),
    Row(sale_id=2, product_id=100, year=2009, quantity=12, price=5000),
    Row(sale_id=7, product_id=200, year=2011, quantity=15, price=9000),
]
product_data = [
    Row(product_id=100, product_name='Nokia'),
    Row(product_id=200, product_name='Apple'),
    Row(product_id=300, product_name='Samsung'),
]

sales_df = spark.createDataFrame(sales_data)
product_df = spark.createDataFrame(product_data)

# Display both input tables.
sales_df.show()
product_df.show()


+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+



In [11]:
"""
+--------------+-------+-------+
| product_name | year  | price |
+--------------+-------+-------+
| Nokia        | 2008  | 5000  |
| Nokia        | 2009  | 5000  |
| Apple        | 2011  | 9000  |
+--------------+-------+-------+
"""

# Attach product_name to each sale. The left join keeps every sale even if
# its product_id had no Product row.
same_product = sales_df.product_id == product_df.product_id
(
    sales_df
    .join(product_df, same_product, "left")
    .select(product_df.product_name, sales_df.year, sales_df.price)
    .show()
)

+------------+----+-----+
|product_name|year|price|
+------------+----+-----+
|       Nokia|2008| 5000|
|       Nokia|2009| 5000|
|       Apple|2011| 9000|
+------------+----+-----+



In [12]:
'''
Table: Logs

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| id          | int     |
| num         | varchar |
+-------------+---------+
In SQL, id is the primary key for this table.
id is an autoincrement column starting from 1.
 

Find all numbers that appear at least three times consecutively.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Logs table:
+----+-----+
| id | num |
+----+-----+
| 1  | 1   |
| 2  | 1   |
| 3  | 1   |
| 4  | 2   |
| 5  | 1   |
| 6  | 2   |
| 7  | 2   |
+----+-----+
Output: 
+-----------------+
| ConsecutiveNums |
+-----------------+
| 1               |
+-----------------+
Explanation: 1 is the only number that appears consecutively for at least three times.
'''

from pyspark.sql import SparkSession

# Reuse (or create) a Spark session.
spark = SparkSession.builder.appName("SimpleTable").getOrCreate()

# Logs rows as plain dicts (no Row class). ids run 1..7 in insertion order.
data = [
    {'id': log_id, 'num': value}
    for log_id, value in enumerate([1, 1, 1, 2, 1, 2, 2], start=1)
]

df = spark.createDataFrame(data)
df.show()


+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  1|
|  3|  1|
|  4|  2|
|  5|  1|
|  6|  2|
|  7|  2|
+---+---+



In [25]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Order all log rows by id. With no partitionBy, Spark evaluates this window in
# a single partition, which is fine for small tables like this one.
window_spec = Window.orderBy(df.id.asc())

# For each row, pull in num from the previous row and from the row before it.
# Named lagged_logs_df (not ordered_df) so it does not overwrite the
# salary-ranking frame of that name used by earlier cells.
lagged_logs_df = (
    df
    .withColumn("lagged_one", f.lag(df.num, 1).over(window_spec))
    .withColumn("lagged_two", f.lag(df.num, 2).over(window_spec))
)

# A row ends a run of three equal nums when it matches both predecessors.
# A run of four or more, or two separate runs of the same number, matches on
# several rows, so distinct() is needed to report each number once.
(
    lagged_logs_df
    .where(
        (lagged_logs_df.lagged_one == lagged_logs_df.lagged_two)
        & (lagged_logs_df.lagged_one == lagged_logs_df.num)
    )
    .select(lagged_logs_df.num.alias("ConsecutiveNums"))
    .distinct()
    .show()
)

+---------------+
|ConsecutiveNums|
+---------------+
|              1|
+---------------+



In [3]:
'''
Table: Employee

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| id          | int     |
| name        | varchar |
| department  | varchar |
| managerId   | int     |
+-------------+---------+
id is the primary key (column with unique values) for this table.
Each row of this table indicates the name of an employee, their department, and the id of their manager.
If managerId is null, then the employee does not have a manager.
No employee will be the manager of themself.
 

Write a solution to find managers with at least five direct reports.
Return the result table in any order.
The result format is in the following example.

 
Example 1:

Input: 
Employee table:
+-----+-------+------------+-----------+
| id  | name  | department | managerId |
+-----+-------+------------+-----------+
| 101 | John  | A          | null      |
| 102 | Dan   | A          | 101       |
| 103 | James | A          | 101       |
| 104 | Amy   | A          | 101       |
| 105 | Anne  | A          | 101       |
| 106 | Ron   | B          | 101       |
+-----+-------+------------+-----------+
Output: 
+------+
| name |
+------+
| John |
+------+
'''

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Reuse (or create) a Spark session.
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# Explicit, all-nullable schema. managerId must be nullable because the top
# manager has None.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("department", StringType()),
        ("managerId", IntegerType()),
    )
])

# (id, name, department, managerId)
data = [
    (101, 'John', 'A', None),
    (102, 'Dan', 'A', 101),
    (103, 'James', 'A', 101),
    (104, 'Amy', 'A', 101),
    (105, 'Anne', 'A', 101),
    (106, 'Ron', 'B', 101),
]

Employee_df = spark.createDataFrame(data, schema=schema)
Employee_df.show()


+---+-----+----------+---------+
| id| name|department|managerId|
+---+-----+----------+---------+
|101| John|         A|     NULL|
|102|  Dan|         A|      101|
|103|James|         A|      101|
|104|  Amy|         A|      101|
|105| Anne|         A|      101|
|106|  Ron|         B|      101|
+---+-----+----------+---------+



In [12]:
from pyspark.sql import functions as f

# Number of direct reports per managerId. Employees without a manager form a
# NULL group, which the inner join below discards.
agg_Employee_df = (
    Employee_df
    .groupBy(Employee_df.managerId)
    .agg(f.count("*").alias("tcount"))
    .withColumnRenamed("managerId", "mId")
)
agg_Employee_df.show()

# Attach each manager's own employee row (id == mId) and keep managers with
# at least five direct reports.
result_df = (
    agg_Employee_df
    .join(Employee_df, on=Employee_df.id == agg_Employee_df.mId, how="inner")
    .drop(Employee_df.managerId)
    .where(agg_Employee_df.tcount >= 5)
)

result_df.select(result_df.name).show()

+----+------+
| mId|tcount|
+----+------+
|NULL|     1|
| 101|     5|
+----+------+

+----+
|name|
+----+
|John|
+----+

