In [0]:
# `spark` is assumed to be the SparkSession provided by the notebook runtime
# (it is not created in this cell); evaluating it just confirms it exists.
spark

# A zero-row DataFrame whose columns are declared with a DDL schema string.
empty_df = spark.createDataFrame(data=[], schema="id INT, name STRING")
empty_df.show()

In [0]:

from pyspark import SparkConf ,SparkContext

# Reuse the already-running SparkContext if there is one: constructing a
# second SparkContext in the same process raises
# "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("name").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)

# parallelize (the original "parallalized" does not exist) distributes a
# local Python list as an RDD.
rdd_data1 = sc.parallelize([1, 2, 3, 4, 5])

# toDF infers a schema from each element, and a bare int has no fields to
# infer from. Wrap each value in a 1-tuple and pass the column names as a list.
rdd_data1.map(lambda x: (x,)).toDF(["id"]).show()


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit schema for (id, name, age, sal) records; every field is nullable.
# `sal` is a fixed-point decimal with precision 10 and scale 2.
# NOTE(review): no visible cell uses data_schema yet.
data_schema = StructType([
    StructField(name="id", dataType=IntegerType(), nullable=True),
    StructField(name="name", dataType=StringType(), nullable=True),
    StructField(name="age", dataType=IntegerType(), nullable=True),
    StructField(name="sal", dataType=DecimalType(precision=10, scale=2), nullable=True),
])


In [0]:
# Sample employee records: (emp_id, name, department, salary, hire_date).
# hire_date is kept as a "yyyy-MM-dd" string, not a date type.
columns = ["emp_id", "name", "department", "salary", "hire_date"]

data = [
    (1, "Alice",   "Engineering", 85000, "2021-01-15"),
    (2, "Bob",     "Engineering", 95000, "2020-03-10"),
    (3, "Charlie", "HR",          65000, "2019-07-23"),
    (4, "David",   "Finance",     72000, "2022-11-01"),
    (5, "Eva",     "Engineering", 99000, "2018-05-19"),
    (6, "Frank",   "Finance",     80000, "2021-09-12"),
    (7, "Grace",   "HR",          60000, "2023-02-14"),
    (8, "Helen",   "Engineering", 87000, "2020-12-05"),
]

df_data = spark.createDataFrame(data=data, schema=columns)
df_data.show()

In [0]:
from pyspark.sql import functions as F 
# Add an upper-cased copy of the department name as a new column.
df_data.withColumn("new_column_upper", F.upper(F.col("department"))).show()

In [0]:
# Four interchangeable ways to reference a column: plain name, col(),
# bracket indexing and attribute access.
projection = ["emp_id", col("name"), df_data["department"], df_data.salary]
df_data.select(*projection).show()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat, lit, when

# 1. Add a new column: salary multiplied by 1.13 (salary plus a 13% bonus).
#    (The original line had a stray leading space -> IndentationError.)
df_data.withColumn("salary_bonus", col("salary") * 1.13).show()

# 2. Modify an existing column: append " K " to every name.
df_data.withColumn("name", concat(col("name"), lit(" K "))).show()

# 3. Add a derived column: the hire year goes into its own column, so the
#    original hire_date is kept rather than overwritten.
df_data.withColumn("hire_year", F.year(col("hire_date"))).show()

# 4. Conditional logic (when / otherwise): bucket salaries into bands.
#    when() clauses are evaluated in order, so anything reaching the second
#    clause is already < 90000; a null salary matches neither and gets "low".
df_data.withColumn(
    "case_clm",
    when(col("salary") >= 90000, "high")
    .when(col("salary") >= 80000, "mid")
    .otherwise("low"),
).show()



In [0]:
# Rename the "name" column to "first_name"; every other column is unchanged.
df_data.withColumnRenamed(existing="name", new="first_name").show()



In [0]:
# Keep only employees earning strictly more than 80000 (where == filter).
df_data.where(col("salary") > 80000).show()

In [0]:
# Names ending in a lowercase "k", written as a SQL predicate string
# (LIKE is case-sensitive; '%' matches any prefix).
df_data.filter("name LIKE '%k'").show()

In [0]:
# Drop the salary column, referenced through the DataFrame itself.
df_data.drop(df_data.salary).show()

In [0]:
# Remove fully identical rows (dropDuplicates() with no subset == distinct()).
df_data.distinct().show()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Sort by name, Z -> A.
df_data.orderBy("name", ascending=False).show()


In [0]:
# Group by department and count the non-null names in each group.
df_data.groupBy(col("department")).agg(count("name")).show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# getOrCreate returns the already-active session when one exists, so the
# app name here may not take effect in a managed notebook.
spark = SparkSession.builder.appName("JoinsUnionsPractice").getOrCreate()

# Employee rows: (emp_id, name, dept_id, salary).
# Eva's dept_id 104 has no matching row in the department data below.
columns_emp = ["emp_id", "name", "dept_id", "salary"]

data_emp = [
    (1, "Alice",   101, 85000),
    (2, "Bob",     102, 95000),
    (3, "Charlie", 103, 65000),
    (4, "David",   101, 72000),
    (5, "Eva",     104, 99000),
]

df_emp = spark.createDataFrame(data=data_emp, schema=columns_emp)
df_emp.show()

In [0]:
# Department rows: (dept_id, dept_name).
# dept 105 (Marketing) has no employees in df_emp.
columns_dept = ["dept_id", "dept_name"]

data_dept = [
    (101, "Engineering"),
    (102, "HR"),
    (103, "Finance"),
    (105, "Marketing"),
]

df_dept = spark.createDataFrame(data=data_dept, schema=columns_dept)
df_dept.show()

In [0]:
# Inner join on the shared dept_id key (a single dept_id column in the output);
# employees in dept 104 and the Marketing department (105) drop out.
df_emp.join(df_dept, on=["dept_id"], how="inner").show()

JOINS


In [0]:
spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import *
from pyspark.sql.functions import *

# Customers: (customer_id, customer_name, address, date_of_joining).
# date_of_joining is a plain string in dd-MM-yyyy order, not a date type.
customer_schema = ['customer_id', 'customer_name', 'address', 'date_of_joining']

customer_data = [
    (1,  'manish',   'patna',   "30-05-2022"),
    (2,  'vikash',   'kolkata', "12-03-2023"),
    (3,  'nikita',   'delhi',   "25-06-2023"),
    (4,  'rahul',    'ranchi',  "24-03-2023"),
    (5,  'mahesh',   'jaipur',  "22-03-2023"),
    (6,  'prantosh', 'kolkata', "18-10-2022"),
    (7,  'raman',    'patna',   "30-12-2022"),
    (8,  'prakash',  'ranchi',  "24-02-2023"),
    (9,  'ragini',   'kolkata', "03-03-2023"),
    (10, 'raushan',  'jaipur',  "05-02-2023"),
]

customer_df = spark.createDataFrame(data=customer_data, schema=customer_schema)
customer_df.show()




In [0]:
# Sales: (customer_id, product_id, quantity, date_of_purchase), dates as
# dd-MM-yyyy strings. customer_id 11 has no row in customer_df.
sales_schema = ['customer_id', 'product_id', 'quantity', 'date_of_purchase']

sales_data = [
    (1,  22, 10, "01-06-2022"),
    (1,  27, 5,  "03-02-2023"),
    (2,  5,  3,  "01-06-2023"),
    (5,  22, 1,  "22-03-2023"),
    (7,  22, 4,  "03-02-2023"),
    (9,  5,  6,  "03-03-2023"),
    (2,  1,  12, "15-06-2023"),
    (1,  56, 2,  "25-06-2023"),
    (5,  12, 5,  "15-04-2023"),
    (11, 12, 76, "12-03-2023"),
]

sales_df = spark.createDataFrame(data=sales_data, schema=sales_schema)
sales_df.show()

In [0]:
# Products: (id, name, price); `id` is the key sales_df.product_id refers to.
product_schema = ['id', 'name', 'price']

product_data = [
    (1,  'fanta',   20),
    (2,  'dew',     22),
    (5,  'sprite',  40),
    (7,  'redbull', 100),
    (12, 'mazza',   45),
    (22, 'coke',    27),
    (25, 'limca',   21),
    (27, 'pepsi',   14),
    (56, 'sting',   10),
]

product_df = spark.createDataFrame(data=product_data, schema=product_schema)
product_df.show()

In [0]:
# Inner join with an explicit equality condition. Unlike join(on="customer_id"),
# this keeps BOTH customer_id columns, so a later bare "customer_id" reference
# would be ambiguous.
customer_df.join(
    sales_df,
    on=customer_df["customer_id"] == sales_df["customer_id"],
    how="inner",
).show()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Rows of (id, name, refer_id); refer_id is None when no referrer is recorded.
leet_code_data = [
    (1, "Will", None),
    (2, "Jane", None),
    (3, "Alex", 2),
    (4, "Bill", None),
    (5, "Zack", 1),
    (6, "Mark", 2),
]

In [0]:
# Evaluate the SparkSession as the cell's last expression so its summary is
# shown as the cell output.
spark

In [0]:
# Schema for leet_code_data; refer_id must be nullable because several rows
# carry None there. (Variable name spelling kept: later cells use it.)
leet_code_shema = StructType([
    StructField(name='id', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
    StructField(name='refer_id', dataType=IntegerType(), nullable=True),
])


In [0]:
# Build the DataFrame with the explicit schema instead of type inference.
leet_code_data_1 = spark.createDataFrame(data=leet_code_data, schema=leet_code_shema)

In [0]:
# Print up to the first 20 rows (show's default row limit, made explicit).
leet_code_data_1.show(n=20)

In [0]:
# Rows with no referrer; `== None` would not work here, isNull() is required.
leet_code_data_1.where(col("refer_id").isNull()).show()

In [0]:
# Re-create the same employee DataFrame as the earlier cell, presumably so the
# exercises below can be run on their own.
columns = ["emp_id", "name", "department", "salary", "hire_date"]

data = [
    (1, "Alice",   "Engineering", 85000, "2021-01-15"),
    (2, "Bob",     "Engineering", 95000, "2020-03-10"),
    (3, "Charlie", "HR",          65000, "2019-07-23"),
    (4, "David",   "Finance",     72000, "2022-11-01"),
    (5, "Eva",     "Engineering", 99000, "2018-05-19"),
    (6, "Frank",   "Finance",     80000, "2021-09-12"),
    (7, "Grace",   "HR",          60000, "2023-02-14"),
    (8, "Helen",   "Engineering", 87000, "2020-12-05"),
]

df_data = spark.createDataFrame(data=data, schema=columns)
df_data.show()

In [0]:
#- Show only employees from the Engineering department

from pyspark.sql.types import *
from pyspark.sql.functions import *

# All columns are kept, so a plain filter is enough (no select("*") needed).
df_data.filter(col("department") == "Engineering").show()


In [0]:
# Project just the name and salary columns.
df_data.select(col("name"), col("salary")).show()

In [0]:
# Employees per department: count("name") counts the non-null names in each group.
df_data.groupBy("department").agg(count("name")).show()

In [0]:
# Mean salary for each department.
df_data.groupBy("department").agg(avg(col("salary"))).show()

In [0]:
# Filter employees hired after 2020-01-01.
from pyspark.sql.functions import col, lit, to_date

# hire_date is a "yyyy-MM-dd" string. Comparing it to a string only works by
# accident of lexicographic order, so compare real dates instead.
# "After" means strictly later, so the cutoff day itself is excluded (>).
(
    df_data
    .select("name", "hire_date")
    .filter(to_date(col("hire_date")) > to_date(lit("2020-01-01")))
    .show()
)


In [0]:
# Highest-paid employees first.
df_data.orderBy("salary", ascending=False).show()

In [0]:
# Add a new column bonus = 10% of salary.
from pyspark.sql import functions as F

# bonus = 10% of salary, rounded to 2 decimal places. The original multiplied
# by 1.13 (113% of salary), which contradicts the stated 10% bonus.
# F.round is used explicitly so the cell does not depend on a star import
# shadowing Python's builtin round().
df_data.withColumn("bonus", F.round(F.col("salary") * 0.10, 2)).show()
