##### Entry point of PySpark :  Spark Context
    
    

In [None]:
# Every later cell refers to the SparkContext as `sc`, so bind that name here.
sc = spark.sparkContext
sparksc = sc  # alias kept from the original cell

##### Inspect SparkContext

In [None]:
# In a notebook cell only the value of the LAST bare expression is rendered,
# so each property is printed explicitly to show all of them.

# Retrieve SparkContext version
print("version:", sc.version)

# Retrieve Python version
print("pythonVer:", sc.pythonVer)

# Master URL to connect to
print("master:", sc.master)

# Path where Spark is installed on worker nodes
print("sparkHome:", str(sc.sparkHome))

# Retrieve name of the Spark user running SparkContext
print("sparkUser:", str(sc.sparkUser()))

# Return application name
print("appName:", sc.appName)

# Retrieve application ID
print("applicationId:", sc.applicationId)

# Return default level of parallelism
print("defaultParallelism:", sc.defaultParallelism)

# Default minimum number of partitions for RDDs
print("defaultMinPartitions:", sc.defaultMinPartitions)


# Loading Data

#### Load data from external file

In [None]:
# Reading data from a text file: first upload the file, then copy its path here
rdd3 = sc.textFile("/FileStore/tables/001_Wordcount.txt")

In [None]:
# Reading data from a CSV file

# Path to the CSV file
csv_file_path = "/FileStore/tables/avocado.csv"

# Read the CSV file into a DataFrame
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

df.show()

### Parallelize Collections

#### Use the parallelize method to convert a local collection to an RDD

In [None]:
rdd = sc.parallelize([('a',7),('b',2),('c',2)])

rdd2 = sc.parallelize([('a',2),('d' ,1),('b',1)])

rdd3 = sc.parallelize(range(100))

rdd4 = sc.parallelize([('a',['x' ,'y' ,'z' ]),('b' ,['p' ,'r' ])])

#### Basic Information

#### Summary

In [None]:
# Only the last bare expression of a notebook cell is displayed, so each
# statistic is printed explicitly.

# Maximum value of RDD elements
print("max:", rdd3.max())

# Minimum value of RDD elements
print("min:", rdd3.min())

# Mean value of RDD elements
print("mean:", rdd3.mean())

# Standard deviation of RDD elements
print("stdev:", rdd3.stdev())

# Compute variance of RDD elements
print("variance:", rdd3.variance())

# Compute histogram with 3 evenly spaced bins -> (bucket boundaries, counts)
print("histogram:", rdd3.histogram(3))

# Summary statistics (count, mean, stdev, max & min)
print("stats:", rdd3.stats())




#### Applying Functions

In [None]:
# Use case: Use map when you need to transform each element of the RDD independently.
#Apply a function to each RDD element 
rdd.map(lambda x: x+(x[1],x[0])).collect()

# python 
al = [('a',7),('b',2),('c',2)]
list(map(lambda x: x+(x[1],x[0]),al))


# Use case: Use flatMap when each input element should map to zero or more output elements.
#Apply a function to each RDD element and flatten the result
rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0]))

# Use case: Use flatMapValues when working with key-value RDDs and you want to transform the values while keeping the keys intact, potentially expanding each value into multiple values.
#Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys
rdd4.flatMapValues(lambda x: x).collect()


## Function of Dataframe

In [None]:
# show()
data = [(1,'Ram'),(2,'Sam'),(3,'Emily'),(4,'Andy')]
schema = ['id', 'name']

df = spark.createDataFrame(data=data, schema=schema)

df.show()

# df.show(2, truncate=3)
# df.show(4, truncate=3, vertical=True)

In [None]:
# printSchema(): shows the data type of each column of a DataFrame.
# `df` is the DataFrame created in the previous cell (`df1` is not defined yet here).
df.printSchema()

#### withColumn

In [None]:
data = [(1,'Ram','3000'),(2,'Sam','4000'),(3,'Emily','3500'),(4,'Andy','6000')]
schema = ['id', 'name','salary']

df1 = spark.createDataFrame(data=data, schema=schema)


# Casting the datatype of a column
from pyspark.sql.functions import col
df2 = df1.withColumn(colName='salary',col=col('salary').cast('Integer'))
df2.show()
df2.printSchema()


In [None]:
# Modifying column data
df3 = df2.withColumn('salary', col('salary')*3)


In [None]:
# Creating new column with value
from pyspark.sql.functions import lit
df4 = df2.withColumn('country', lit('India'))
df4.show()


In [None]:
# copy a column
df5 = df4.withColumn('copysalary', col('salary'))
df5.show()


In [None]:
# Rename column name
df6 =df5.withColumnRenamed('copysalary', 'kopySalary')
df6.show()

#### structType()  and  structField() 

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

data = [(1,'Ram',4000),(2,'Sam',6000),(3,'Emily',10000),(4,'Andy',6200)]

# Define the schema using StructType and StructField
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("salary", IntegerType())
])

df8 = spark.createDataFrame(data, schema)
df8.show()

In [None]:
# using struct type
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

data = [(1,('Ram','Smith'),4000),(2,('Sam','Smith'),6000),(3,('Tim','Smith'),10000),(4,('Paul','Smith'),6200)]

structName = StructType([
                     StructField("firstname", StringType()),
                      StructField("lastname", StringType()),
])

# Define the schema using StructType and StructField
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", structName),
    StructField("salary", IntegerType())
])

df9 = spark.createDataFrame(data, schema)
df9.show()

#### ArrayType Column 

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType

data = [('abc',[1,2,3]),('xyz',[15,23,23]),('dac',[11,22,31])]


# Define the schema using StructType and StructField
schema = StructType([
    StructField("id", StringType()),
    StructField("numbers", ArrayType(IntegerType()))
])

df10 = spark.createDataFrame(data, schema)
df10.show()

# we can use Array index to fetch value
df10.withColumn('firstNumber', df10.numbers[0]).show()

In [None]:
from pyspark.sql.functions import col, array

data = [(1,'Ram','3000'),(2,'Sam','4000'),(3,'Emily','3500'),(4,'Andy','6000')]
schema = ['id', 'name','salary']

df1 = spark.createDataFrame(data=data, schema=schema)

# Another use: create a new ArrayType column from existing column values.
# array() needs one element type; 'id' is inferred as a number while 'salary'
# holds strings, so cast 'id' to string explicitly.
df12 = df1.withColumn('numbers', array(col('id').cast('string'), col('salary')))
df12.show()

#### understanding explode(), split(), array(), array_contains()

In [None]:
data = [(1,'ram',['python','pyspark']),(2,'sam',['sql','aws'])]
schema = ['id', 'name', 'skills']

df12 = spark.createDataFrame(data,schema)
display(df12)

# explode()  : it will create a row for each element in Array
from pyspark.sql.functions import col,explode

df13 = df12.withColumn('oneSkill',explode(col('skills')))
df13.show()

In [None]:
data = [(1,'ram','python,pyspark'),(2,'sam','sql,aws')]
schema = ['id', 'name', 'skills']

df12 = spark.createDataFrame(data,schema)
display(df12)

# split() : it will split string to an Array
from pyspark.sql.functions import col,split
df13 = df12.withColumn('oneSkillArray',split(col('skills'),','))
df13.show() 

In [None]:
data = [(1,'ram','python','pyspark'),(2,'sam','sql','aws')]
schema = ['id', 'name', 'prim_skills','sec_skills']


df13 = spark.createDataFrame(data,schema)
display(df13)

# array()  : to create a column for holding array datatype using column values
from pyspark.sql.functions import col,array 
df14 = df13.withColumn('oneSkillArray',array(col('prim_skills'),col('sec_skills')))
df14.show()

In [None]:
data = [(1,'ram',['python','pyspark']),(2,'sam',['sql','aws'])]
schema = ['id', 'name', 'skills']

df14 = spark.createDataFrame(data,schema)
display(df14)

# array_contains  : return True if element is present : N.B: It is case sensitive
from pyspark.sql.functions import split,col,array, array_contains 
df15 = df14.withColumn('HasPythonSkill',array_contains(col('skills'),'python'))
display(df15)

### MapType

In [None]:
# example of MapType 
data = [('ram',{'hair':'black','eyes':'black','skill':'java'}),('sam',{'hair':'red','eyes':'blue','skill':'python'})]
schema = ['name','properties']

df15 = spark.createDataFrame(data,schema)
display(df15)

In [None]:
# Using structType() defining schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType,MapType
data = [('ram',{'hair':'black','eyes':'black','skill':'java'}),('sam',{'hair':'red','eyes':'blue','skill':'python'})]

# Define the schema using StructType and StructField
schema = StructType([
    StructField("name", StringType()),
    StructField("properties", MapType(StringType(),StringType()))
])


df16 = spark.createDataFrame(data,schema)
display(df16)

In [None]:
# scenario 2
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType,MapType
data = [('ram',{'hair':'black','eyes':'black','skill':'java','age':32}),('sam',{'hair':'red','eyes':'blue','skill':'python','age':23})]

# Define the inner schema for the dictionary
inner_schema = StructType([
    StructField("hair", StringType(), True),
    StructField("eyes", StringType(), True),
    StructField("skill", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Define the outer schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("attributes", inner_schema, True)
])

df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show(truncate=False)

# Print the schema of the DataFrame
df.printSchema()

In [None]:
# Add a new column 'age' by extracting it from the 'attributes' struct column.
# `df` is the DataFrame built in the previous cell (columns: name, attributes).
from pyspark.sql.functions import col

df17 = df
df18 = df17.withColumn("age", col("attributes.age"))
df18.show()

# Access a struct field with [] ...
df19 = df18.withColumn("hair", df18.attributes['hair'])
df19.show()

# ... or with getItem()
df19 = df18.withColumn("hair", df18.attributes.getItem('hair'))
df19.show()

In [None]:
# explode() on a map column: creates a separate row for each key/value pair (output columns `key` and `value`)
from pyspark.sql.functions import explode

df1 = df16.select('name','properties',explode(df16.properties))
df1.show(truncate=False)



In [None]:
# map_keys() : fetches all keys and store in array
from pyspark.sql.functions import map_keys
df2 = df16.withColumn('keys',map_keys(df16.properties))
df2.show()


# map_values() : fetches all values and stores them in an array
from pyspark.sql.functions import map_values
df2 = df16.withColumn('values',map_values(df16.properties))
df2.show()

## Row()

In [None]:
# We can create a Row object using named arguments, or create a custom Row-like class
# A Row represents a single record (row) in a DataFrame

In [None]:
from pyspark.sql import Row

row = Row('ram', 2000)
row1 = Row(name = 'sam', salary =3000)
print(row[0] +' ' + str(row[1]))
print(row1.name + '  ' + str(row1.salary))

In [None]:
# Create dataframe using row
from pyspark.sql import Row

row1 = Row(name = 'sam', salary =3000)
row2 = Row(name = 'ram', salary =4000)

data = [row1,row2]

df = spark.createDataFrame(data)
df.show()
df.printSchema()


In [None]:
# Create Row like class

Person = Row('name','salary')
p1 = Person('ram',3000)
p2 = Person('sam',2000)

print(p1.name + ' '+ str(p1.salary))


In [None]:
# Data in dataframe represent as Row
data = [p1,p2]
df1 = spark.createDataFrame(data)

df1.show()

In [None]:
 # Creating nested struct using Row()
from pyspark.sql import Row

data = [Row(name='ram', prop=Row(hair='black',eyes='blue',skill='java')),
        Row(name='sam', prop=Row(hair='blue',eyes='black',skill='python'))]

df3 = spark.createDataFrame(data)
df3.show()
df3.printSchema()

## Column class 

In [None]:
# lit() function to fill value to a column
from pyspark.sql.functions import lit
df9 = df8.withColumn('newCol', lit('Hello'))
df9.show()

In [None]:
# fetch column
df8.select(df8.name).show()
df8.select(df8['name']).show()
from pyspark.sql.functions import col
df8.select(col('name')).show()

In [None]:
# fetch a nested field value from a struct-type column

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

data = [(1,'Ram',4000,('black','blue')),(2,'Sam',6000,('blue','red'))]

propertyType = StructType([
    StructField("hair", StringType()),
    StructField("eyes", StringType()),

])

# Define the schema using StructType and StructField
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("salary", IntegerType()),
    StructField("properties", propertyType)
])

df8 = spark.createDataFrame(data, schema)
df8.show()

In [None]:
df8.select(df8.properties.hair).show()
df8.select(df8['properties.hair']).show()

from pyspark.sql.functions import col
df8.select(col('properties.hair')).show()


## when  AND otherwise

In [None]:
data = [(1,'Ram','M',3000),(2,'Sam','M',6000),(3,'Emily','F',7000),(4,'Andy','',6500)]
schema = ['id', 'name','gender','salary']

df1 = spark.createDataFrame(data=data, schema=schema)

df1.show()

In [None]:
from pyspark.sql.functions import when
df3 =df1.select(df1.id,df1.name,
                when(df1.gender=='M','male')
                .when(df1.gender=='F','female')
                .otherwise('unknown').alias('genderCol')
                )
df3.show()

## Function


In [None]:
# alias() : to rename column 
df1.select(df1.id.alias('emp_id'), df1.name,df1.gender,df1.salary).show()

In [None]:
# asc()
df1.sort(df1.name.asc()).show()

In [None]:
#desc()
df1.sort(df1.name.desc()).show()

In [None]:
# cast()
df3 = df1.select(df1.id,df1.name,df1.gender,df1.salary.cast('int'))
df3.printSchema()

# Other available cast types (reference only). These are kept as comments:
# written as code, a line like `Integer: cast("int")` is a Python annotated
# statement whose annotation is evaluated, so the cell would raise NameError.
#   Integer:   cast("int")
#   String:    cast("string")
#   Float:     cast("float")
#   Double:    cast("double")
#   Date:      to_date(column, "format")
#   Timestamp: to_timestamp(column, "format")
#   Boolean:   cast("boolean")

In [None]:
# to_date / to_timestamp are otherwise only imported in a later cell.
from pyspark.sql.functions import col, to_date, to_timestamp

# All String data
data = [("1", "2024-06-26", "true", "123.45")]
schema = ["integer_col", "date_col", "boolean_col", "float_col"]
df = spark.createDataFrame(data, schema)
df.show()
df.printSchema()

# String -> date
df_date = df.withColumn("date_col", to_date(col("date_col"), "yyyy-MM-dd"))
df_date.printSchema()

# String -> timestamp (applied to the original string column)
df_ts = df.withColumn("date_col", to_timestamp(col("date_col"), "yyyy-MM-dd"))
df_ts.printSchema()


# Multiple casts in one chain, starting from the all-string DataFrame.
# (Re-parsing an already converted timestamp with "yyyy-MM-dd" would not match
# its string form, which is why the examples above use separate variables.)
df = df.withColumn("integer_col", col("integer_col").cast("int")) \
       .withColumn("date_col", to_date(col("date_col"), "yyyy-MM-dd")) \
       .withColumn("boolean_col", col("boolean_col").cast("boolean")) \
       .withColumn("float_col", col("float_col").cast("float"))
df.show()
df.printSchema()


In [None]:
# like operator : Same as SQL : Case sensitive

df1.filter(df1.name.like('E%')).show()


### filter()  where() 

In [None]:
df1.filter(df1.gender== 'M').show()

df1.filter("gender =='F' ").show()

df1.where("gender =='M' ").show()


# Multiple condition
df1.filter((df1.gender == 'F') & (df1.salary == 7000)).show()

# Other way

# Correct usage with filter and col functions
from pyspark.sql.functions import col
df1.filter((col("gender") == 'F') & (col("salary") == 7000)).show()

# Using a SQL expression directly in filter
df1.filter("gender = 'F' AND salary = 7000").show()

## distinct() and dropDuplicates()

In [None]:
# Sample data with an exact duplicate row (Sam appears twice). It is built here
# because df2 from the previous cells has no gender/salary columns.
data = [(1,'Ram','M',3000),(2,'Sam','M',6000),(2,'Sam','M',6000),(3,'Emily','F',7000),(4,'Andy','M',6000)]
schema = ['id', 'name','gender','salary']
df2 = spark.createDataFrame(data=data, schema=schema)

# distinct(): keeps only unique rows (all columns compared)
df2.distinct().show()

# dropDuplicates(): with no columns passed it behaves like distinct()
df2.dropDuplicates().show()

# passing columns: keeps one row per distinct 'gender' value
df2.dropDuplicates(['gender']).show()

# a row is dropped only if it matches an earlier row in BOTH columns
df2.dropDuplicates(['gender','salary']).show()

## orderBy()  and sort()

In [None]:
data = [(1,'Ram','M',3000,'IT'),(2,'Sam','M',6000,'HR'),(2,'Sam','M',6000,'TE'),(3,'Emily','F',7000,'IT'),(4,'Andy','M',6000,'HR')]
schema = ['id', 'name','gender','salary','department']

df2 = spark.createDataFrame(data=data, schema=schema)

df2.show()

In [None]:
# orderBy() and sort() are interchangeable

df2.sort('department').show()
df2.sort(df2.department).show()

df2.orderBy('department').show()
df2.orderBy(df2.department).show()

df2.orderBy(df2.department,df2.id).show()
df2.orderBy(df2.department,df2.id.desc()).show()



### Union and unionAll

In [None]:
data1 = [(1,'Ram','M',3000),(2,'Sam','M',6000),(3,'Emily','F',7000),(4,'Andy','M',6000)]
schema = ['id', 'name','gender','salary']

data2 = [(5,'Ramesh','M',4000),(6,'Samy','M',6000),(7,'Emma','F',7000),(8,'SAndy','M',6000)]
schema = ['id', 'name','gender','salary']


df2 = spark.createDataFrame(data=data1, schema=schema)
df3 = spark.createDataFrame(data=data2, schema=schema)

df2.show()
df3.show()

In [None]:
# N.B: duplicate rows are not removed by union() or unionAll()
newdf = df2.union(df3)
newdf = df2.unionAll(df3)
newdf.show()

# to remove duplicate use distinct()
newdf.distinct().show()

## unionByName()

In [None]:
# unionByName(): matches columns by name. With allowMissingColumns=True, a column
# missing on one side is filled with null. Drop 'gender' from df3 so the schemas
# actually differ and the null filling is visible.
df4 = df2.unionByName(df3.drop('gender'), allowMissingColumns=True)
df4.show()

### groupBy()

In [None]:
# groupBy() needs a 'department' column. df2 was last rebound (Union cell) to a
# DataFrame without one, so rebuild the department data here.
data = [(1,'Ram','M',3000,'IT'),(2,'Sam','M',6000,'HR'),(2,'Sam','M',6000,'TE'),(3,'Emily','F',7000,'IT'),(4,'Andy','M',6000,'HR')]
schema = ['id', 'name','gender','salary','department']
df2 = spark.createDataFrame(data=data, schema=schema)

# Group by a column name or a Column object
df2.groupBy('department').count().show()
df2.groupBy(df2.department).count().show()

# Min / max per group
df2.groupBy('department').min('salary').show()
df2.groupBy('department').max('salary').show()

# Group by several columns
df3 = df2.groupBy('department','gender').count()
df3.show()


### agg()


In [None]:
from pyspark.sql.functions import count,max,min
df2.groupBy('department').agg(count('*').alias('EmpCount'),min('salary').alias('minSal'),max('salary').alias('maxSal')).show()


## select

In [None]:
# fetch column
df4.select(df4.id,df4.name,df4.gender).show()
df4.select('id','name','gender').show()
df4.select(df4['id'],df4['name']).show()
df4.select(['id','name']).show()
df4.select('*').show()
df4.select([col for col in df4.columns]).show()
from pyspark.sql.functions import col
df8.select(col('name')).show()

### Join in Pyspark

In [None]:
# Sample employee data
employee_data = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Cathy", 101),
    (4, "David", 103),
    (5, "Eva", 102),
    (6, "Emily", 106)
]

# Sample department data
department_data = [
    (101, "HR"),
    (102, "Engineering"),
    (103, "Marketing"),
    (104, "Sales"),
    (105, "Payroll")
]

# Create DataFrames
employee_df = spark.createDataFrame(employee_data, ["EmployeeID", "Name", "DepartmentID"])
department_df = spark.createDataFrame(department_data, ["DepartmentID", "DepartmentName"])

# Show DataFrames
employee_df.show()
department_df.show()

In [None]:
# inner join: returns only rows that have a match in both tables
inner_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "inner")
inner_join_df.show()

In [None]:
# left join : all matched row plus unmatched from left table
left_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "left")
left_join_df.show()

In [None]:
# right join : all matched row plus unmatched from right table
right_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "right")
right_join_df.show()

In [None]:
# full outer join : all matched row plus unmatched from left and right table
outer_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "outer")
outer_join_df.show()



In [None]:
# cross join : Returns the Cartesian product of the two DataFrames.
cross_join_df = employee_df.crossJoin(department_df)
cross_join_df.show()


In [None]:
# left semi: keeps left-table rows that have a match (like an inner join), but returns only the left table's columns
left_semi_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "left_semi")
left_semi_join_df.show()


In [None]:
# left anti: opposite of left semi, i.e. returns the left-table rows that have no match in the right table
left_anti_join_df = employee_df.join(department_df, employee_df.DepartmentID == department_df.DepartmentID, "left_anti")
left_anti_join_df.show()


In [None]:
# self join : join with same data frame
employee_data_with_manager = [
    (1, "Alice", 101, None),
    (2, "Bob", 102, 1),
    (3, "Cathy", 101, 1),
    (4, "David", 103, 2),
    (5, "Eva", 102, 2)
]

# Create DataFrame
employee_with_manager_df = spark.createDataFrame(employee_data_with_manager, ["EmployeeID", "Name", "DepartmentID", "ManagerID"])
employee_with_manager_df.show()


from pyspark.sql.functions import col
self_join_df = employee_with_manager_df.alias("emp").join(
    employee_with_manager_df.alias("mgr"),
    col("emp.ManagerID") == col("mgr.EmployeeID"),
    "left"
).select(
    col("emp.EmployeeID").alias("EmployeeID"),
    col("emp.Name").alias("EmployeeName"),
    col("mgr.Name").alias("ManagerName")
)

self_join_df.show()

## pivot()

In [None]:
# Sample employee data
employee_data = [
    (1, "Alice", "HR", 5000,"male"),
    (2, "Bob", "Engineering", 6000,"male"),
    (3, "Cathy", "HR", 5500,"female"),
    (4, "David", "Engineering", 7000,"male"),
    (5, "Eva", "Marketing", 6000,"female")
]

# Create DataFrame
employee_df = spark.createDataFrame(employee_data, ["EmployeeID", "Name", "Department", "Salary","Gender"])

# Show DataFrame
employee_df.show()


In [None]:
from pyspark.sql.functions import sum

# Pivot the DataFrame
pivot_df = employee_df.groupBy("Department") \
    .pivot("Name") \
    .sum("Salary")

pivot_df.show()


In [None]:
from pyspark.sql.functions import sum

# Group without pivoting (one row per Department/Gender pair)
nopivot_df = employee_df.groupBy("Department","Gender").count()

# Pivot the DataFrame
pivot_df = employee_df.groupBy("Department").pivot("Gender").count()
nopivot_df.show()
pivot_df.show()


### Unpivot dataframe

In [None]:
data = [("HR", 10, 15), ("IT", 20, 5), ("Sales", 30, 25)]
columns = ["department", "male", "female"]
df = spark.createDataFrame(data, columns)
df.show()

In [None]:
from pyspark.sql.functions import expr

# Unpivot DataFrame with stack(): column names are unquoted, string literals use single quotes
unpivoted_df = df.selectExpr("department", "stack(2, 'M', male, 'F', female) as (gender, count)")

# Same unpivot using select() with expr()
unpivoted_df = df.select("department", expr("stack(2, 'M', male, 'F', female) as (gender, count)"))

# Show the result
unpivoted_df.show()

#### fill() and fillna()

In [None]:
# Sample DataFrame
data = [
    (1, "Alice", "F", 5000, "HR"),
    (2, "Bob", "M", None, "IT"),
    (3, "Carol", "F", 7000, None),
    (4, "Dave", None, 8000, "IT"),
    (5, None, "M", 9000, "Sales")
]
columns = ["id", "name", "gender", "salary", "department"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
df.show()

In [None]:
# Fill null values in specific columns
filled_df = df.fillna({"name": "Unknown", "salary": 0, "department": "Unknown"})

# Show the result
filled_df.show()

In [None]:
# Fill nulls with a single value: a string value only fills string-typed columns (the numeric salary stays null)
filled_df_all = df.fillna("Unknown")

# Show the result
filled_df_all.show()

In [None]:
# Fill null values only in the 'department' column (the second argument is the column subset)
filled_df_all = df.na.fill("Unknown","department")

# Show the result
filled_df_all.show()


### sample()

In [None]:
# Path to the CSV file
csv_file_path = "/FileStore/tables/avocado.csv"

# Read the CSV file into a DataFrame
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

df.show()

df1 = df.sample(fraction=0.1)
display(df1)

# pass a seed to get the same sample on every run
df1 = df.sample(fraction=0.1, seed =123)
display(df1)

### collect()

In [None]:
# Sample DataFrame
data = [
    (1, "Alice", "F", 5000, "HR"),
    (2, "Bob", "M", 6000, "IT"),
    (3, "Carol", "F", 7000, "IT"),
    (4, "Dave", "M", 8000, "HR"),
    (5, "Eve", "F", 9000, "Sales")
]
columns = ["id", "name", "gender", "salary", "department"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
df.show()


In [None]:
# Collect the DataFrame to the driver as a list of Row objects
collected_data = df.collect()

# Print the collected data
for row in collected_data:
    print(row)

    
print(collected_data)
print(collected_data[0])
print(collected_data[0][1])

#### transform()

In [None]:
from pyspark.sql.functions import col, when

# Sample DataFrame
data = [
    (1, "Alice", "F", 5000, "HR"),
    (2, "Bob", "M", 6000, "IT"),
    (3, "Carol", "F", 7000, "IT"),
    (4, "Dave", "M", 8000, "HR"),
    (5, "Eve", "F", 9000, "Sales")
]
columns = ["id", "name", "gender", "salary", "department"]
df = spark.createDataFrame(data, columns)

df.show()

In [None]:
# Define a custom transformation function
def add_bonus(df):
    """Return *df* with an added ``bonus`` column computed from ``salary``.

    Salaries below 7000 get 10% of salary; all others get 5%.
    """
    salary = col("salary")
    bonus = when(salary < 7000, salary * 0.10).otherwise(salary * 0.05)
    return df.withColumn("bonus", bonus)


In [None]:
# Define a custom transformation function
def convertToUpper(df):
    """Return *df* with its ``name`` column converted to upper case."""
    # Imported locally: `upper` is otherwise only imported in a later cell.
    from pyspark.sql.functions import upper

    return df.withColumn("name", upper("name"))


In [None]:
# Apply the transformation using the transform method
transformed_df = df.transform(add_bonus)

# Show the result
transformed_df.show()

In [None]:
# transform()   : applied on column of array type


In [None]:
from pyspark.sql.functions import col, transform, upper

# Define a custom transformation function
def convertArrayToUpper(x):
    """Upper-case a single array element; passed as the function to ``transform()``."""
    upper_cased = upper(x)
    return upper_cased

# Sample DataFrame
data = [
    ("Alice", ["Python", "Java", "Scala"]),
    ("Bob", ["C++", "Python", "JavaScript"]),
    ("Carol", ["Java", "C#", "Python"]),
    ("Dave", ["JavaScript", "HTML", "CSS"]),
    ("Eve", ["Scala", "Python", "Go"])
]

columns = ["name", "programmingskills"]
df = spark.createDataFrame(data, columns)

# Define the transformation to convert each programming skill to uppercase
transformed_df = df.withColumn("programmingskills", transform("programmingskills", convertArrayToUpper))

# Show the result
transformed_df.show(truncate=False)


### createOrReplaceTempView()

In [None]:
# Sample DataFrame
data = [
    (1, "Alice", 5000),
    (2, "Bob", 6000),
    (3, "Carol", 7000),
    (4, "Dave", 8000),
    (5, "Eve", 9000)
]
columns = ["id", "name", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

In [None]:
# Create a temporary view
df.createOrReplaceTempView("employee")

# Perform an SQL query on the temporary view
result_df = spark.sql("SELECT * FROM employee WHERE salary > 6000")

# Show the result
result_df.show()


# OR: query the same view from a separate `%sql` cell (a magic command only
# works as the first line of its own cell, not inside Python code):
#   %sql
#   select id,upper(name) as Name from employee
spark.sql("select id,upper(name) as Name from employee").show()

### createOrReplaceGlobalTempView

In [None]:
# Create a global temporary view
df.createOrReplaceGlobalTempView("employee")

# Global temp views are queried through the `global_temp` database.
# Equivalent `%sql` cell (the magic must be the first line of its own cell):
#   %sql
#   SELECT * from global_temp.employee
spark.sql("SELECT * from global_temp.employee").show()

In [None]:
# to view tables from current session
spark.catalog.listTables(spark.catalog.currentDatabase())


# to view global temp tables
spark.catalog.listTables('global_temp')

### UDF

In [None]:
from pyspark.sql.functions import udf , col
from pyspark.sql.types import IntegerType

# Sample DataFrame
data = [
    (1, "Alice", 5000, 500),
    (2, "Bob", 6000, 600),
    (3, "Carol", 7000, 700),
    (4, "Dave", 8000, 800),
    (5, "Eve", 9000, 900)
]
columns = ["id", "name", "salary", "bonus"]
df = spark.createDataFrame(data, columns)

# Define a UDF to calculate total compensation
def calculate_total_compensation(salary, bonus):
    """Return ``salary + bonus``, or None when either input is None.

    SQL NULLs reach a Python UDF as None. Returning None (instead of raising
    TypeError on ``None + int``) produces NULL in the result column.
    """
    if salary is None or bonus is None:
        return None
    return salary + bonus

# Register the UDF
calculate_total_compensation_udf = udf(calculate_total_compensation, IntegerType())

# Apply the UDF to the DataFrame
df_with_total_compensation = df.withColumn("total_compensation", calculate_total_compensation_udf(df.salary, df.bonus))

# Show the result
df_with_total_compensation.show()



In [None]:
# Define a UDF to calculate total compensation using the @udf decorator
@udf(returnType=IntegerType())
def calculate_total_compensation(salary, bonus):
    """UDF: ``salary + bonus``; NULL (None) if either input is NULL."""
    # SQL NULLs arrive as None; avoid TypeError on None + int.
    if salary is None or bonus is None:
        return None
    return salary + bonus

# Apply the UDF to the DataFrame
df_with_total_compensation = df.withColumn("total_compensation", calculate_total_compensation(col("salary"), col("bonus")))

# Show the result
df_with_total_compensation.show()

#### udf.register

In [None]:
def calculate_total_compensation(salary, bonus):
    """Return ``salary + bonus``, or None when either input is None.

    Registered below as a SQL function; SQL NULLs arrive as None, and returning
    None yields NULL instead of failing with TypeError.
    """
    if salary is None or bonus is None:
        return None
    return salary + bonus

spark.udf.register(name='TotalPaySQL', f=calculate_total_compensation, returnType=IntegerType())

# The %sql cell below reads from a view named `emps`, which was never created.
# Register the salary/bonus DataFrame (`df` from the UDF cells) under that name.
df.createOrReplaceTempView("emps")

In [None]:
%sql
Select id, TotalPaySQL(salary,bonus) as totPay from emps

## Understanding RDD and some functions

In [None]:
# Sample data
data = [
    (1, "Alice", 5000, 500),
    (2, "Bob", 6000, 600),
    (3, "Carol", 7000, 700),
    (4, "Dave", 8000, 800),
    (5, "Eve", 9000, 900)
]

columns = ["id", "name", "salary", "bonus"]

# create dataframe
df = spark.createDataFrame(data, columns)
df.show()

In [None]:
# Create an RDD from Python list
rdd = sc.parallelize(data)

In [None]:
# to show RDD
rdd.collect()

## map() example: applies a function to each element and returns the results as a new RDD

In [None]:
# Increase salary by 10%
def increase_salary(record):
    """Return an (id, name, salary, bonus) record with the salary raised by 10%."""
    emp_id, emp_name, emp_salary, emp_bonus = record
    raised = emp_salary * 1.10
    return (emp_id, emp_name, raised, emp_bonus)


rdd_with_increased_salary = rdd.map(increase_salary)

print("map() result:", rdd_with_increased_salary.collect())

## flatMap()


In [None]:
# flatMap() example: Split names into characters
def split_name_into_chars(record):
    """Return the characters of the name field of an (id, name, salary, bonus) record."""
    _, name, _, _ = record
    return [ch for ch in name]

rdd_with_split_names = rdd.flatMap(split_name_into_chars)
print("flatMap() result:", rdd_with_split_names.collect())

### JSON

In [None]:
from pyspark.sql.functions import col, from_json, to_json, json_tuple, schema_of_json, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define JSON schema
json_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Sample DataFrame with one raw JSON string per row in a `value` column.
# (To load such strings from a file, spark.read.text(path) yields a `value`
#  column; spark.read.json(path) would instead parse the JSON into columns.)
df = spark.createDataFrame(
    [('{"name": "Alice", "age": 30}',), ('{"name": "Bob", "age": 25}',)],
    ["value"],
)

# Parse JSON string column into StructType
df_parsed = df.withColumn("data", from_json(col("value"), json_schema))

# Extract JSON object based on path
df_extracted = df_parsed.withColumn("name", get_json_object(col("value"), "$.name"))

# Convert StructType column back to JSON string
df_json = df_extracted.withColumn("json_string", to_json(col("data")))

# Write DataFrame to JSON files -- set a real output path before enabling:
# df_json.write.mode("overwrite").json("path/to/output/json")

# Show the result
df_json.show(truncate=False)


In [None]:
from pyspark.sql.functions import col, to_json, from_json, get_json_object, json_tuple
from pyspark.sql.types import StructType, StructField, StringType

# Sample data
data = [
    ('ram', {'hair': 'black', 'eyes': 'black', 'skill': 'java'}),
    ('sam', {'hair': 'red', 'eyes': 'blue', 'skill': 'python'})
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "attributes"])

df.show(truncate=False)
df.printSchema()

In [None]:
# Convert the dictionary to JSON format
df = df.withColumn("attributes_json", to_json(col("attributes")))
df.show(truncate=False)
df.printSchema()

### from_json

In [None]:
# Define schema for parsing the JSON column
schema = StructType([
    StructField("hair", StringType(), True),
    StructField("eyes", StringType(), True),
    StructField("skill", StringType(), True)
])

# Parse JSON column
df_parsed = df.withColumn("parsed_attributes", from_json(col("attributes_json"), schema))

df_parsed.show(truncate=False)
df_parsed.printSchema()

#### get_json_object

In [None]:
# Extract specific fields using get_json_object
df_with_fields = df_parsed.withColumn("hair", get_json_object(col("attributes_json"), "$.hair")) \
                          .withColumn("eyes", get_json_object(col("attributes_json"), "$.eyes")) \
                          .withColumn("skill", get_json_object(col("attributes_json"), "$.skill"))

df_with_fields.show()

## json_tuple()

In [None]:
# Extract fields using json_tuple
df_with_tuple = df_with_fields.withColumn("hair_tuple", json_tuple(col("attributes_json"), "hair")) \
                              .withColumn("eyes_tuple", json_tuple(col("attributes_json"), "eyes")) \
                              .withColumn("skill_tuple", json_tuple(col("attributes_json"), "skill"))

# Show the result
df_with_tuple.select("name", "attributes_json", "hair", "eyes", "skill", "hair_tuple", "eyes_tuple", "skill_tuple").show(truncate=False)

#### Date time

In [None]:
from pyspark.sql.functions import col, current_date, current_timestamp, date_format, year, month, dayofmonth, hour, minute, second, datediff, add_months, date_add, date_sub, to_date, to_timestamp

# Sample data
data = [("2024-07-01", "2024-01-01 12:34:56")]
df = spark.createDataFrame(data, ["date_str", "timestamp_str"])

# Convert string columns to date and timestamp
df = df.withColumn("date", to_date(col("date_str"), "yyyy-MM-dd")) \
       .withColumn("timestamp", to_timestamp(col("timestamp_str"), "yyyy-MM-dd HH:mm:ss"))

# Apply date functions
df = df.withColumn("current_date", current_date()) \
       .withColumn("current_timestamp", current_timestamp()) \
       .withColumn("formatted_date", date_format(col("date"), "MM/dd/yyyy")) \
       .withColumn("year", year(col("date"))) \
       .withColumn("month", month(col("date"))) \
       .withColumn("day_of_month", dayofmonth(col("date"))) \
       .withColumn("hour", hour(col("timestamp"))) \
       .withColumn("minute", minute(col("timestamp"))) \
       .withColumn("second", second(col("timestamp"))) \
       .withColumn("days_diff", datediff(col("current_date"), col("date"))) \
       .withColumn("add_months", add_months(col("date"), 2)) \
       .withColumn("date_add", date_add(col("date"), 10)) \
       .withColumn("date_sub", date_sub(col("date"), 10))

# Show the result
df.select("date_str", "timestamp_str", "date", "timestamp", "current_date", "current_timestamp", "formatted_date", "year", "month", "day_of_month", "hour", "minute", "second", "days_diff", "add_months", "date_add", "date_sub").show(truncate=False)


#### approx_count_distinct(),avg(),collect_list()

In [None]:
from pyspark.sql.functions import approx_count_distinct, avg, collect_list


# Sample data
data = [
    ("Alice", 1, 50),
    ("Bob", 2, 60),
    ("Alice", 3, 50),
    ("Bob", 4, 70),
    ("Alice", 5, 80),
    ("Bob", 6, 80)
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "id", "score"])

# Group by name and apply aggregation functions
result_df = df.groupBy("name").agg(
    approx_count_distinct("id").alias("approx_distinct_id"),
    avg("score").alias("avg_score"),
    collect_list("score").alias("scores_list")
)

# Show the result
result_df.show(truncate=False)



### row_number(), rank(),dense_rank()

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank


# Sample data
data = [
    ("Alice", "Math", 85),
    ("Alice", "English", 78),
    ("Alice", "Science", 92),
    ("Bob", "Math", 95),
    ("Bob", "English", 89),
    ("Bob", "Science", 72),
    ("Charlie", "Math", 70),
    ("Charlie", "English", 65),
    ("Charlie", "Science", 80)
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "subject", "score"])

# Define a window specification
windowSpec = Window.partitionBy("name").orderBy(col("score").desc())

# Apply row_number, rank, and dense_rank
df_with_ranks = df.withColumn("row_number", row_number().over(windowSpec)) \
                  .withColumn("rank", rank().over(windowSpec)) \
                  .withColumn("dense_rank", dense_rank().over(windowSpec))

# Show the result
df_with_ranks.show(truncate=False)
