In [None]:
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, count, desc, lit, max, rank, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
from pyspark.sql.window import Window


In [None]:
# Start the Spark session used to build the DataFrames below
# (getOrCreate reuses a session that is already running instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)


In [None]:
# Explicit schema for the worker table: one (name, type) pair per column,
# every column declared nullable.
worker_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("worker_id", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("salary", IntegerType()),
        ("joining_date", DateType()),
        ("department", StringType()),
    )
])

# Sample rows, in schema order:
# (worker_id, first_name, last_name, salary, joining_date, department)
worker_data = [
    (1, "Monika", "Arora", 100000, datetime.date(2014, 2, 20), "HR"),
    (2, "Niharika", "Verma", 80000, datetime.date(2014, 6, 11), "Admin"),
    (3, "Vishal", "Singhal", 300000, datetime.date(2014, 2, 20), "HR"),
    (4, "Amitah", "Singh", 500000, datetime.date(2014, 2, 20), "Admin"),
    (5, "Vivek", "Bhati", 500000, datetime.date(2014, 6, 11), "Admin"),
    (6, "Vipul", "Diwan", 200000, datetime.date(2014, 6, 11), "Account"),
    (7, "Satish", "Kumar", 75000, datetime.date(2014, 1, 20), "Account"),
    (8, "Geetika", "Chauhan", 90000, datetime.date(2014, 4, 11), "Admin"),
    (9, "Agepi", "Argon", 90000, datetime.date(2015, 4, 10), "Admin"),
    (10, "Moe", "Acharya", 65000, datetime.date(2015, 4, 11), "HR"),
    (11, "Nayah", "Laghari", 75000, datetime.date(2014, 3, 20), "Account"),
    (12, "Jai", "Patel", 85000, datetime.date(2014, 3, 21), "HR"),
]

# Column names in schema order. createDataFrame below does not need this list
# (the names come from worker_schema); it is kept available for other cells.
worker_columns = [field.name for field in worker_schema.fields]

worker_df = spark.createDataFrame(worker_data, worker_schema)

In [None]:
# 1: Full name and highest salary

# Add a "full_name" column ("<first_name> <last_name>"). worker_df is rebound to
# the extended frame because the per-department query in the next cell selects
# this column as well.
worker_df = worker_df.withColumn(
    "full_name",
    concat(col("first_name"), lit(" "), col("last_name")),
)

# `max` here is pyspark.sql.functions.max (imported at the top), not the Python
# builtin. The aggregate produces a single row, so the value is fetched to the
# driver as a plain Python number.
max_salary = worker_df.agg(max("salary").alias("max_salary")).first()["max_salary"]

# Every worker whose salary equals the maximum is listed, so ties all appear.
worker_df.where(col("salary") == max_salary).select("full_name", "salary").show()

In [None]:
# 2: Highest salary in each department

# Alternative approach, kept for reference: compute the maximum salary per
# department, then join back to the workers to recover the matching rows.
# max_salary_df = worker_df.groupBy("department").agg(max("salary").alias("max_salary"))
# max_salary_df.join(worker_df.alias("w"), (max_salary_df.department == col("w.department")) & (max_salary_df.max_salary == col("w.salary")), "inner").select("w.department", "w.full_name", "w.salary").show()

# Window approach: rank the workers inside each department by salary, highest first.
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
ranked_df = worker_df.withColumn("rank", rank().over(window_spec))

# rank() gives tied salaries the same rank, so every worker sharing a
# department's top salary is kept by the rank == 1 filter.
(
    ranked_df
    .where(col("rank") == 1)
    .select("department", "full_name", "salary")
    .show()
)


In [None]:
# 3: Last five records of the dataset
# tail() brings the final rows back to the driver as a list of Row objects, which
# the notebook displays as this cell's result. (worker_df.show(5) would print the
# first five rows instead.)
worker_df.tail(num=5)

In [None]:
# Marketing campaign purchases, one tuple per row in the order
# (user_id, created_at, product_id, quantity, price).
# Every field, including the numeric-looking ones, is supplied as a string.
marketing_campaign_data = [
    ("10", "2019-01-01", "101", "3", "55"),
    ("10", "2019-01-02", "119", "5", "29"),
    ("10", "2019-03-31", "111", "2", "149"),
    ("11", "2019-01-02", "105", "3", "234"),
    ("11", "2019-03-31", "120", "3", "99"),
    ("12", "2019-01-02", "112", "2", "200"),
    ("12", "2019-03-31", "110", "2", "299"),
    ("13", "2019-01-05", "113", "1", "67"),
    ("13", "2019-03-31", "118", "3", "35"),
    ("14", "2019-01-06", "109", "5", "199"),
    ("14", "2019-01-06", "107", "2", "27"),
    ("14", "2019-03-31", "112", "3", "200"),
    ("15", "2019-01-08", "105", "4", "234"),
    ("15", "2019-01-09", "110", "4", "299"),
    ("15", "2019-03-31", "116", "2", "499"),
    ("16", "2019-01-10", "113", "2", "67"),
    ("16", "2019-03-31", "107", "4", "27"),
    ("17", "2019-01-11", "116", "2", "499"),
    ("17", "2019-03-31", "104", "1", "154"),
    ("18", "2019-01-12", "114", "2", "248"),
    ("18", "2019-01-12", "113", "4", "67"),
    ("19", "2019-01-12", "114", "3", "248"),
    ("20", "2019-01-15", "117", "2", "999"),
    ("21", "2019-01-16", "105", "3", "234"),
    ("21", "2019-01-17", "114", "4", "248"),
    ("22", "2019-01-18", "113", "3", "67"),
    ("22", "2019-01-19", "118", "4", "35"),
    ("23", "2019-01-20", "119", "3", "29"),
    ("24", "2019-01-21", "114", "2", "248"),
    ("25", "2019-01-22", "114", "2", "248"),
    ("25", "2019-01-22", "115", "2", "72"),
    ("25", "2019-01-24", "114", "5", "248"),
    ("25", "2019-01-27", "115", "1", "72"),
    ("26", "2019-01-25", "115", "1", "72"),
    ("27", "2019-01-26", "104", "3", "154"),
    ("28", "2019-01-27", "101", "4", "55"),
    ("29", "2019-01-27", "111", "3", "149"),
    ("30", "2019-01-29", "111", "1", "149"),
    ("31", "2019-01-30", "104", "3", "154"),
    ("32", "2019-01-31", "117", "1", "999"),
    ("33", "2019-01-31", "117", "2", "999"),
    ("34", "2019-01-31", "110", "3", "299"),
    ("35", "2019-02-03", "117", "2", "999"),
    ("36", "2019-02-04", "102", "4", "82"),
    ("37", "2019-02-05", "102", "2", "82"),
    ("38", "2019-02-06", "113", "2", "67"),
    ("39", "2019-02-07", "120", "4", "99"),
]

# Only column names are given, so Spark infers the column types from the data.
marketing_campaign_columns = ["user_id", "created_at", "product_id", "quantity", "price"]
marketing_campaign_df = spark.createDataFrame(
    data=marketing_campaign_data,
    schema=marketing_campaign_columns,
)


In [None]:
# quantity was loaded as a string; cast it to an integer so it can be summed.
marketing_campaign_df = marketing_campaign_df.withColumn("quantity", col("quantity").cast("int"))

# Total units sold per product. The dict-style aggregate names its output column
# "sum(quantity)", hence the rename to something readable.
product_units_sold_df = (
    marketing_campaign_df
    .groupBy("product_id")
    .agg({"quantity": "sum"})
    .withColumnRenamed("sum(quantity)", "total_units_sold")
)

# Label each product by units sold (between() is inclusive at both ends):
#   30 or more -> Outstanding, 20-29 -> Satisfactory,
#   10-19 -> Unsatisfactory, 1-9 -> Poor, anything else -> Unknown.
product_units_sold_df = product_units_sold_df.withColumn(
    "ad_performance",
    when(col("total_units_sold") >= 30, "Outstanding")
    .when(col("total_units_sold").between(20, 29), "Satisfactory")
    .when(col("total_units_sold").between(10, 19), "Unsatisfactory")
    .when(col("total_units_sold").between(1, 9), "Poor")
    .otherwise("Unknown"),
)

# Best-selling products first.
product_units_sold_df = product_units_sold_df.orderBy(desc("total_units_sold"))
