In [4]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists, otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
# Low-level SparkContext handle, used below for sc.broadcast(...).
sc = spark.sparkContext
# Last expression: render the session summary in the notebook.
spark

### 1.Broadcast variable

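In PySpark, a broadcast variable is a read-only variable that Spark sends once to each executor node and caches there, rather than shipping a copy with every task. Broadcast variables are used to efficiently share read-only lookup data (for example, a dictionary of codes) with all nodes in a cluster for use in parallel processing tasks. Because every executor keeps a full copy in memory, broadcasting suits small to moderately large datasets.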

In [6]:
# Lookup table that will be broadcast: one-letter status code -> full label.
status_map = dict(M="Married", S="Single")

# Sample rows: (id, name, status code, age).
data = [
    (1, "Kim", "M", 74),
    (2, "Lee", "S", 30),
    (3, "Choi", "M", 45),
    (4, "Park", "S", 28),
    (5, "Han", "M", 55),
    (6, "Cho", "S", 32),
]

In [7]:
# People DataFrame; its "status" column holds the one-letter codes to translate.
df = spark.createDataFrame(data, schema=["id", "name", "status", "age"])

# Ship status_map to the executors once; tasks read it through `.value`.
broadcast_status = sc.broadcast(status_map)

df.printSchema()
df.show()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- status: string (nullable = true)
 |-- age: long (nullable = true)

+---+----+------+---+
| id|name|status|age|
+---+----+------+---+
|  1| Kim|     M| 74|
|  2| Lee|     S| 30|
|  3|Choi|     M| 45|
|  4|Park|     S| 28|
|  5| Han|     M| 55|
|  6| Cho|     S| 32|
+---+----+------+---+



In [8]:
# Driver-side sanity check: `.value` returns the broadcast dict, so "M" -> "Married".
broadcast_status.value["M"]

'Married'

In [9]:
from pyspark.sql.functions import col, udf

def convert_status(char):
    """Translate a one-letter status code into its full label.

    Reads the broadcast copy of ``status_map`` (e.g. "M" -> "Married").

    Parameters
    ----------
    char : str or None
        Value of the ``status`` column for one row.

    Returns
    -------
    str or None
        The mapped label, or ``None`` (NULL in the DataFrame) when the code is
        NULL or not present in the map. Indexing with ``[...]`` would raise
        ``KeyError`` inside the Python worker and abort the Spark job instead.
    """
    return broadcast_status.value.get(char)


# Explicit return type (the udf default is also string) documents the schema.
# NOTE(review): a built-in map expression would avoid Python-UDF overhead;
# the UDF is kept here because this cell demonstrates broadcast variables.
convert_status_udf = udf(convert_status, returnType="string")

df_2 = df.withColumn("status_info", convert_status_udf(col("status")))
df_2.show()

+---+----+------+---+-----------+
| id|name|status|age|status_info|
+---+----+------+---+-----------+
|  1| Kim|     M| 74|    Married|
|  2| Lee|     S| 30|     Single|
|  3|Choi|     M| 45|    Married|
|  4|Park|     S| 28|     Single|
|  5| Han|     M| 55|    Married|
|  6| Cho|     S| 32|     Single|
+---+----+------+---+-----------+



### 2.Partition

In PySpark, a partition is a logical division of data that is used to distribute data across nodes in a cluster for parallel processing. Partitioning is a key concept in PySpark and is used to optimize the performance of data processing tasks.

- **Data Shuffle** refers to the process of redistributing data across nodes in a cluster during parallel processing tasks. Shuffling is typically required when performing operations like joins, aggregations, and sorting, which require data to be reorganized based on specific keys.

- **Data skew** refers to the phenomenon where certain keys in a dataset have a disproportionately large number of values associated with them. When data is skewed, some nodes in the cluster may receive significantly more data than others, leading to an imbalance in processing workload.

In [13]:
# Employee rows: (Name, Age, Department, Salary).
dataset = [
    ("Alice", 35, "HR", 3000),
    ("Charlie", 40, "Sales", 4000),
    ("David", 28, "Marketing", 2500),
    ("Eve", 32, "Finance", 5000),
    ("Frank", 45, "IT", 6000),
    ("Grace", 29, "HR", 3500),
    ("Heidi", 38, "Sales", 4500),
    ("Ivan", 49, "Marketing", 5500),
    ("Judy", 31, "Finance", 6500),
    ("Karen", 50, "IT", 7000),
    ("Leo", 37, "HR", 3700),
    ("Mike", 44, "Sales", 4400),
]

# Column names for the employee DataFrame. Deliberately NOT called `col`:
# that name is pyspark.sql.functions.col (imported earlier and used in later
# cells as col("Salary")); rebinding it to a list breaks those calls.
employee_columns = ["Name", "Age", "Department", "Salary"]

employeedf = spark.createDataFrame(data=dataset, schema=employee_columns)

In [14]:
employeedf.show(n=20, truncate=True)  # both arguments spelled out at their defaults

+-------+---+----------+------+
|   Name|Age|Department|Salary|
+-------+---+----------+------+
|  Alice| 35|        HR|  3000|
|Charlie| 40|     Sales|  4000|
|  David| 28| Marketing|  2500|
|    Eve| 32|   Finance|  5000|
|  Frank| 45|        IT|  6000|
|  Grace| 29|        HR|  3500|
|  Heidi| 38|     Sales|  4500|
|   Ivan| 49| Marketing|  5500|
|   Judy| 31|   Finance|  6500|
|  Karen| 50|        IT|  7000|
|    Leo| 37|        HR|  3700|
|   Mike| 44|     Sales|  4400|
+-------+---+----------+------+



RDD stands for **Resilient Distributed Datasets**. It is a fundamental data structure in Apache Spark that represents an immutable distributed collection of objects, which can be processed in parallel across a cluster of machines.

##### partitionBy
It partitions the output by the distinct values of the specified column and writes each value's rows into its own sub-directory (e.g. `Department=HR/`).

In [16]:
# Write CSV (with header row) split by Department value, replacing any
# previous contents of the "employee" output path.
(
    employeedf.write
    .mode("overwrite")
    .partitionBy("Department")
    .option("header", True)
    .csv("employee")
)

##### coalesce
It reduces the number of partitions of a DataFrame (or RDD) by merging existing partitions, avoiding a full shuffle of the data.

In [21]:
# Merge down to at most 2 partitions (no full shuffle), then write CSV with a
# header row to "emp", replacing previous output.
(
    employeedf
    .coalesce(2)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("emp")
)

##### repartition
It redistributes the data into the requested number of partitions using a full shuffle. This can be useful when the existing partitions are not balanced, or when you want to increase or decrease the number of partitions.

In [22]:
# Shuffle into 2 partitions, then write CSV with a header row to "empl",
# replacing previous output.
(
    employeedf
    .repartition(2)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("empl")
)

### 3.withColumn function

The withColumn() function in PySpark is used to add a new column to an existing DataFrame or replace an existing column with a new one. The function takes two arguments: the name of the new column and the column expression that defines the values for the new column.

In [23]:
employeedf.show(20)  # first 20 rows (the default), truncated cells

+-------+---+----------+------+
|   Name|Age|Department|Salary|
+-------+---+----------+------+
|  Alice| 35|        HR|  3000|
|Charlie| 40|     Sales|  4000|
|  David| 28| Marketing|  2500|
|    Eve| 32|   Finance|  5000|
|  Frank| 45|        IT|  6000|
|  Grace| 29|        HR|  3500|
|  Heidi| 38|     Sales|  4500|
|   Ivan| 49| Marketing|  5500|
|   Judy| 31|   Finance|  6500|
|  Karen| 50|        IT|  7000|
|    Leo| 37|        HR|  3700|
|   Mike| 44|     Sales|  4400|
+-------+---+----------+------+



##### Add new columns using a constant value and the current timestamp

In [25]:
from pyspark.sql.functions import lit, current_timestamp

# Add a constant column (10, used as a percentage below) and a timestamp column.
employeedf = (
    employeedf
    .withColumn("percentage_increase", lit(10))
    .withColumn("current_time", current_timestamp())
)
employeedf.show()
employeedf.printSchema()


+-------+---+----------+------+-------------------+--------------------+
|   Name|Age|Department|Salary|percentage_increase|        current_time|
+-------+---+----------+------+-------------------+--------------------+
|  Alice| 35|        HR|  3000|                 10|2024-03-22 14:25:...|
|Charlie| 40|     Sales|  4000|                 10|2024-03-22 14:25:...|
|  David| 28| Marketing|  2500|                 10|2024-03-22 14:25:...|
|    Eve| 32|   Finance|  5000|                 10|2024-03-22 14:25:...|
|  Frank| 45|        IT|  6000|                 10|2024-03-22 14:25:...|
|  Grace| 29|        HR|  3500|                 10|2024-03-22 14:25:...|
|  Heidi| 38|     Sales|  4500|                 10|2024-03-22 14:25:...|
|   Ivan| 49| Marketing|  5500|                 10|2024-03-22 14:25:...|
|   Judy| 31|   Finance|  6500|                 10|2024-03-22 14:25:...|
|  Karen| 50|        IT|  7000|                 10|2024-03-22 14:25:...|
|    Leo| 37|        HR|  3700|                 10|

##### Update existing column based on another column

In [28]:
from pyspark.sql.functions import col

# Salary <- Salary + Salary * (percentage_increase * 0.01); the result is a double.
# NOTE: this rebinds employeedf, so re-running the cell applies the raise a
# second time (compounding it). Re-run from the top to get the shown values.
employeedf = employeedf.withColumn(
    "Salary",
    col("Salary")
    + col("Salary") * (col("percentage_increase") * 0.01),
)
employeedf.show()
employeedf.printSchema()


+-------+---+----------+------+-------------------+--------------------+
|   Name|Age|Department|Salary|percentage_increase|        current_time|
+-------+---+----------+------+-------------------+--------------------+
|  Alice| 35|        HR|3300.0|                 10|2024-03-22 14:28:...|
|Charlie| 40|     Sales|4400.0|                 10|2024-03-22 14:28:...|
|  David| 28| Marketing|2750.0|                 10|2024-03-22 14:28:...|
|    Eve| 32|   Finance|5500.0|                 10|2024-03-22 14:28:...|
|  Frank| 45|        IT|6600.0|                 10|2024-03-22 14:28:...|
|  Grace| 29|        HR|3850.0|                 10|2024-03-22 14:28:...|
|  Heidi| 38|     Sales|4950.0|                 10|2024-03-22 14:28:...|
|   Ivan| 49| Marketing|6050.0|                 10|2024-03-22 14:28:...|
|   Judy| 31|   Finance|7150.0|                 10|2024-03-22 14:28:...|
|  Karen| 50|        IT|7700.0|                 10|2024-03-22 14:28:...|
|    Leo| 37|        HR|4070.0|                 10|

##### Changing datatype a column using withColumn transformation

In [30]:
# Cast the double Salary column produced above back to an integer type.
# Use the column's real casing: the previous "salary" only resolved because
# Spark's analyzer is case-insensitive by default, and it silently renamed the
# column to lowercase in the result.
# NOTE: cast truncates any fractional part; it does not round.
employeedf = employeedf.withColumn("Salary", col("Salary").cast("int"))
employeedf.show()
employeedf.printSchema()

+-------+---+----------+------+-------------------+--------------------+
|   Name|Age|Department|salary|percentage_increase|        current_time|
+-------+---+----------+------+-------------------+--------------------+
|  Alice| 35|        HR|  3300|                 10|2024-03-22 14:34:...|
|Charlie| 40|     Sales|  4400|                 10|2024-03-22 14:34:...|
|  David| 28| Marketing|  2750|                 10|2024-03-22 14:34:...|
|    Eve| 32|   Finance|  5500|                 10|2024-03-22 14:34:...|
|  Frank| 45|        IT|  6600|                 10|2024-03-22 14:34:...|
|  Grace| 29|        HR|  3850|                 10|2024-03-22 14:34:...|
|  Heidi| 38|     Sales|  4950|                 10|2024-03-22 14:34:...|
|   Ivan| 49| Marketing|  6050|                 10|2024-03-22 14:34:...|
|   Judy| 31|   Finance|  7150|                 10|2024-03-22 14:34:...|
|  Karen| 50|        IT|  7700|                 10|2024-03-22 14:34:...|
|    Leo| 37|        HR|  4070|                 10|

##### Conditional update using withColumn

In [40]:
from pyspark.sql.functions import when

# Expand "IT" to "Information Technology"; every other department is kept unchanged.
employeedf = employeedf.withColumn(
    "Department",
    when(employeedf.Department == "IT", "Information Technology")
    .otherwise(employeedf.Department),
)
employeedf.show()

+-------+---+--------------------+------+-------------------+--------------------+
|   Name|Age|          Department|salary|percentage_increase|        current_time|
+-------+---+--------------------+------+-------------------+--------------------+
|  Alice| 35|                  HR|  3300|                 10|2024-03-22 14:41:...|
|Charlie| 40|               Sales|  4400|                 10|2024-03-22 14:41:...|
|  David| 28|           Marketing|  2750|                 10|2024-03-22 14:41:...|
|    Eve| 32|             Finance|  5500|                 10|2024-03-22 14:41:...|
|  Frank| 45|Information Techn...|  6600|                 10|2024-03-22 14:41:...|
|  Grace| 29|                  HR|  3850|                 10|2024-03-22 14:41:...|
|  Heidi| 38|               Sales|  4950|                 10|2024-03-22 14:41:...|
|   Ivan| 49|           Marketing|  6050|                 10|2024-03-22 14:41:...|
|   Judy| 31|             Finance|  7150|                 10|2024-03-22 14:41:...|
|  K

##### Renaming a column using withColumnRenamed

In [42]:
# Rename percentage_increase -> bonus_percentage; values are unchanged.
employeedf = employeedf.withColumnRenamed(
    existing="percentage_increase",
    new="bonus_percentage",
)

employeedf.show()

+-------+---+--------------------+------+----------------+--------------------+
|   Name|Age|          Department|salary|bonus_percentage|        current_time|
+-------+---+--------------------+------+----------------+--------------------+
|  Alice| 35|                  HR|  3300|              10|2024-03-22 14:43:...|
|Charlie| 40|               Sales|  4400|              10|2024-03-22 14:43:...|
|  David| 28|           Marketing|  2750|              10|2024-03-22 14:43:...|
|    Eve| 32|             Finance|  5500|              10|2024-03-22 14:43:...|
|  Frank| 45|Information Techn...|  6600|              10|2024-03-22 14:43:...|
|  Grace| 29|                  HR|  3850|              10|2024-03-22 14:43:...|
|  Heidi| 38|               Sales|  4950|              10|2024-03-22 14:43:...|
|   Ivan| 49|           Marketing|  6050|              10|2024-03-22 14:43:...|
|   Judy| 31|             Finance|  7150|              10|2024-03-22 14:43:...|
|  Karen| 50|Information Techn...|  7700

### 4.Pivot & Unpivot

In [43]:
# Medal table in "long" form: one (Country, medal, count) row per medal type.
olympics = [
    ("United States", "Gold", 39),
    ("United States", "Silver", 41),
    ("United States", "Bronze", 33),
    ("China", "Gold", 38),
    ("China", "Silver", 32),
    ("China", "Bronze", 19),
    ("Japan", "Gold", 27),
    ("Japan", "Silver", 14),
    ("Japan", "Bronze", 17),
    ("Great Britain", "Gold", 22),
    ("Great Britain", "Silver", 20),
    ("Great Britain", "Bronze", 22),
]


# Column names for the medal DataFrame. Not called `col`, so that
# pyspark.sql.functions.col stays usable in this and any later cell.
olympics_columns = ["Country", "medal", "count"]

olympicsdf = spark.createDataFrame(data=olympics, schema=olympics_columns)

olympicsdf.printSchema()
olympicsdf.show()

root
 |-- Country: string (nullable = true)
 |-- medal: string (nullable = true)
 |-- count: long (nullable = true)

+-------------+------+-----+
|      Country| medal|count|
+-------------+------+-----+
|United States|  Gold|   39|
|United States|Silver|   41|
|United States|Bronze|   33|
|        China|  Gold|   38|
|        China|Silver|   32|
|        China|Bronze|   19|
|        Japan|  Gold|   27|
|        Japan|Silver|   14|
|        Japan|Bronze|   17|
|Great Britain|  Gold|   22|
|Great Britain|Silver|   20|
|Great Britain|Bronze|   22|
+-------------+------+-----+



##### groupBy

In [44]:
# Total medals per country. The dict form of agg names the result "sum(count)",
# the same name GroupedData.sum("count") produces.
pivotDF = olympicsdf.groupBy("Country").agg({"count": "sum"})
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Country: string (nullable = true)
 |-- sum(count): long (nullable = true)

+-------------+----------+
|Country      |sum(count)|
+-------------+----------+
|United States|113       |
|China        |89        |
|Japan        |58        |
|Great Britain|64        |
+-------------+----------+



##### groupBy + pivot

In [45]:
# Pivot without a value list: Spark first collects the distinct "medal"
# values and creates one column per value (sorted: Bronze, Gold, Silver).
pivotDF = (
    olympicsdf
    .groupBy("Country")
    .pivot("medal")
    .sum("count")
)
pivotDF.printSchema()
pivotDF.show(truncate=False)


# Pivot with an explicit value list: only these columns, in this order,
# and no extra pass to discover the distinct values.
pivotDF1 = (
    olympicsdf
    .groupBy("Country")
    .pivot("medal", values=["Gold", "Silver"])
    .sum("count")
)
pivotDF1.show(truncate=False)

root
 |-- Country: string (nullable = true)
 |-- Bronze: long (nullable = true)
 |-- Gold: long (nullable = true)
 |-- Silver: long (nullable = true)

+-------------+------+----+------+
|Country      |Bronze|Gold|Silver|
+-------------+------+----+------+
|Great Britain|22    |22  |20    |
|United States|33    |39  |41    |
|China        |19    |38  |32    |
|Japan        |17    |27  |14    |
+-------------+------+----+------+

+-------------+----+------+
|Country      |Gold|Silver|
+-------------+----+------+
|Great Britain|22  |20    |
|United States|39  |41    |
|China        |38  |32    |
|Japan        |27  |14    |
+-------------+----+------+



##### unpivot + stack

In [46]:
from pyspark.sql.functions import expr
# stack(n, label1, col1, ...) emits n rows per input row, folding the
# Gold/Silver/Bronze columns back into (medal, Total) pairs.
unpivotExpr = "stack(3, 'G', Gold, 'S', Silver, 'B', Bronze) as (medal,Total)"
unPivotDF = pivotDF.selectExpr("Country", unpivotExpr)
unPivotDF.show()

+-------------+-----+-----+
|      Country|medal|Total|
+-------------+-----+-----+
|Great Britain|    G|   22|
|Great Britain|    S|   20|
|Great Britain|    B|   22|
|United States|    G|   39|
|United States|    S|   41|
|United States|    B|   33|
|        China|    G|   38|
|        China|    S|   32|
|        China|    B|   19|
|        Japan|    G|   27|
|        Japan|    S|   14|
|        Japan|    B|   17|
+-------------+-----+-----+

