In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
# One SparkSession shared by every cell below (builder.getOrCreate returns the active session if there is one).
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [0]:

# Employee data: (id, name, department, salary, start_date, age)
employees = [
    (1, 'Alice Johnson', 'Engineering', 85000, '2020-01-15', 28),
    (2, 'Bob Smith', 'Engineering', 92000, '2019-03-10', 35),
    (3, 'Charlie Brown', 'Sales', 65000, '2021-06-01', 26),
    (4, 'Diana Prince', 'Engineering', 78000, '2022-02-20', 29),
    (5, 'Eve Wilson', 'Marketing', 70000, '2020-08-15', 31),
    (6, 'Frank Miller', 'Sales', 72000, '2019-11-30', 42),
    (7, 'Grace Lee', 'Engineering', 95000, '2018-05-12', 33),
    (8, 'Henry Davis', 'Marketing', 68000, '2021-09-05', 27),
    (9, 'Iris Taylor', 'Sales', 58000, '2022-01-10', 24),
    (10, 'Jack Wilson', 'Engineering', 88000, '2020-12-01', 30)
]

# `e` (DataFrame API) and the `employees` temp view (SQL) are used by every later cell,
# so both must exist before anything queries them.
e = spark.createDataFrame(employees, schema=['id', 'name', 'department', 'salary', 'start_date', 'age'])
e.createOrReplaceTempView("employees")

# Engineers earning more than 80k, highest salary first -- DataFrame API.
# Keep the DataFrame itself in `r`: DataFrame.show() returns None, so it must not be
# part of the assigned expression.
r = (
    e.filter(e.salary > 80000)
     .filter(e.department == 'Engineering')
     .orderBy(F.desc(e.salary))
)
r.show()

# The same query in Spark SQL.
result = spark.sql("""
    SELECT *
    FROM employees
    WHERE salary > 80000
      AND department = 'Engineering'
    ORDER BY salary DESC
""")
result.show()

# Both formulations must return the same rows in the same order.
assert r.collect() == result.collect()

In [0]:
# Per-department salary statistics, keeping only departments with more than 2 employees.
# agg() emits the grouping column first, then the aggregates in the order listed,
# so no trailing select() is needed to fix the column order.
t = (
    e.groupBy("department")
     .agg(
         F.avg('salary').alias('avg_salary'),
         F.count('*').alias('emp_count'),     # count('*') counts rows, not non-null salaries
         F.min('salary').alias('min_salary'),
         F.max('salary').alias('max_salary'),
     )
     .filter(F.col('emp_count') > 2)          # filter after agg == SQL HAVING
)
t.show()

# The same aggregation in Spark SQL.
t2 = spark.sql("""
    SELECT department,
           AVG(salary) AS avg_salary,
           COUNT(*)    AS emp_count,
           MIN(salary) AS min_salary,
           MAX(salary) AS max_salary
    FROM employees
    GROUP BY department
    HAVING COUNT(*) > 2
""")
t2.show()

# groupBy output order is not guaranteed, so compare the two results order-insensitively.
assert sorted(t.collect()) == sorted(t2.collect())

In [0]:
# Department data: (dept_name, category, manager, team_size)
departments = [
    ('Engineering', 'Tech', 'John Manager', 50),
    ('Sales', 'Business', 'Sarah Director', 25),
    ('Marketing', 'Business', 'Mike Lead', 15)
]
# `d` (DataFrame API) and the `departments` temp view (SQL) are both queried below.
d = spark.createDataFrame(departments, schema=['dept_name', 'category', 'manager', 'team_size'])
d.createOrReplaceTempView("departments")

# Employees in Business-category departments, with a salary band and a team-size band -- DataFrame API.
t3 = (
    e.join(d, e.department == d.dept_name)
     .filter(d.category == 'Business')
     .withColumn('salary_level',
                 F.when(F.col('salary') > 80000, 'High')
                  .when(F.col('salary') >= 60000, 'Medium')
                  .otherwise('Low'))
     .withColumn('team_size_cat', F.when(F.col('team_size') > 30, 'Large').otherwise('Small'))
     .select("name", "salary", "manager", "salary_level", "team_size_cat")
)
t3.show()

# The same query in Spark SQL. String literals use single quotes: with ANSI mode and
# spark.sql.ansi.doubleQuotedIdentifiers enabled, "..." is parsed as an identifier.
# No comma may follow a THEN value or the last select-list item.
s3 = """
SELECT e.name    AS name,
       e.salary  AS salary,
       d.manager AS manager,
       CASE WHEN e.salary > 80000  THEN 'High'
            WHEN e.salary >= 60000 THEN 'Medium'
            ELSE 'Low' END AS salary_level,
       CASE WHEN d.team_size > 30 THEN 'Large'
            ELSE 'Small' END AS team_size_cat
FROM employees e
JOIN departments d ON e.department = d.dept_name
WHERE d.category = 'Business'
"""
ts3 = spark.sql(s3)
ts3.show()

# Join output order is not guaranteed; compare order-insensitively.
assert sorted(t3.collect()) == sorted(ts3.collect())


In [0]:
# Top two earners per department.
# Both versions use RANK() so they agree: tied salaries share a rank (and may yield more
# than two rows). ROW_NUMBER() would instead break ties arbitrarily, and the DataFrame and
# SQL results could then differ.
w4 = Window.partitionBy('department').orderBy(F.desc('salary'))
t4 = (
    e.withColumn('rank', F.rank().over(w4))
     .filter(F.col('rank') <= 2)
     .select('name', 'department', 'salary', 'rank')
     .orderBy('department', 'rank')
)
t4.show()

# No trailing ';' in the query text: spark.sql takes a single statement, and older Spark
# parsers reject the terminator.
s4 = """
WITH ranked_employees AS (
    SELECT name,
           department,
           salary,
           RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
    FROM employees
)
SELECT name, department, salary, rank
FROM ranked_employees
WHERE rank <= 2
ORDER BY department, rank
"""
ts4 = spark.sql(s4)
ts4.show()

assert sorted(t4.collect()) == sorted(ts4.collect())

In [0]:
# Sales data: (sale_id, salesperson, date, amount, product_type)
sales_data = [
    (1, 'Alice Johnson', '2024-01-15', 15000, 'Software'),
    (2, 'Bob Smith', '2024-01-16', 22000, 'Consulting'),
    (3, 'Charlie Brown', '2024-01-15', 8000, 'Software'),
    (4, 'Alice Johnson', '2024-01-20', 18000, 'Consulting'),
    (5, 'Diana Prince', '2024-01-18', 12000, 'Software'),
    (6, 'Charlie Brown', '2024-01-22', 25000, 'Consulting')
]

# `s` (DataFrame API) and the `sales` temp view (SQL) are both queried below.
s = spark.createDataFrame(sales_data, schema=['sale_id', 'salesperson', 'date', 'amount', 'product_type'])
s.createOrReplaceTempView('sales')

# Best product type per salesperson (highest summed amount).
# product_type is a secondary sort key so a tie on s_total picks the same row on every run;
# ROW_NUMBER() alone would choose arbitrarily among ties.
# salesperson is renamed to `sperson` so the join below has no ambiguous column.
tw5 = (
    s.groupBy('salesperson', 'product_type')
     .agg(F.sum('amount').alias('s_total'))
     .withColumn('rank', F.row_number().over(
         Window.partitionBy('salesperson').orderBy(F.desc('s_total'), F.asc('product_type'))))
     .filter(F.col('rank') == 1)
     .select(F.col('salesperson').alias('sperson'), 'product_type', 's_total', 'rank')
)

# Per-salesperson totals, keeping only salespeople whose total exceeds 20k.
tw55 = (
    s.groupBy('salesperson')
     .agg(F.sum('amount').alias('total'),
          F.count('*').alias('sale_num'),
          F.avg('amount').alias('avg_sale'))
     .filter(F.col('total') > 20000)
)

# tw55 is already restricted to total > 20000, so the join needs no second filter.
t5 = (
    tw55.join(tw5, tw55.salesperson == tw5.sperson)
        .select('salesperson', 'total', 'sale_num', 'avg_sale',
                F.col('product_type').alias('best_product'))
)
t5.show()

# The same report in Spark SQL, mirroring the DataFrame steps CTE by CTE.
s5 = """
WITH tw5 AS (
    SELECT salesperson AS sperson,
           product_type,
           SUM(amount) AS s_total,
           ROW_NUMBER() OVER (PARTITION BY salesperson
                              ORDER BY SUM(amount) DESC, product_type ASC) AS rank
    FROM sales
    GROUP BY salesperson, product_type
),
tw55 AS (
    SELECT salesperson,
           SUM(amount) AS total,
           COUNT(*)    AS sale_num,
           AVG(amount) AS avg_sale
    FROM sales
    GROUP BY salesperson
    HAVING SUM(amount) > 20000
)
SELECT salesperson, total, sale_num, avg_sale, product_type AS best_product
FROM tw55
JOIN tw5 ON tw55.salesperson = tw5.sperson
WHERE rank = 1
"""
ts5 = spark.sql(s5)
ts5.show()

# Join output order is not guaranteed; compare order-insensitively.
assert sorted(t5.collect()) == sorted(ts5.collect())