<a href="https://colab.research.google.com/github/SanjayJanardhan-89/ApacheSparkHandsOn/blob/main/ComplexDatatypes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Pyspark


In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession

spark= SparkSession \
       .builder \
       .appName("OurSparkApp") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
            Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Waiting for headers] [1[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Waiting for headers] [W[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Waiting for headers] [W[0m                                                                               Get:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,804 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu 

# Play with Arrays


## Append an item to array(array_append)

In [11]:
from pyspark.sql.functions import array_append

data = [(1, ["apple", "banana"]), (2, ["orange", "grape"])]

df = spark.createDataFrame(data, ["id", "fruits"])

print("Original DataFrame:")
df.show(truncate=False)

df_with_new_fruit = df.withColumn("fruits", array_append(df["fruits"], "kiwi"))

print("Updated DataFrame with new fruit:")
df_with_new_fruit.show(truncate=False)


Original DataFrame:
+---+---------------+
|id |fruits         |
+---+---------------+
|1  |[apple, banana]|
|2  |[orange, grape]|
+---+---------------+

Updated DataFrame with new fruit:
+---+---------------------+
|id |fruits               |
+---+---------------------+
|1  |[apple, banana, kiwi]|
|2  |[orange, grape, kiwi]|
+---+---------------------+



## Length of the array(size)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import size

spark = SparkSession.builder.getOrCreate()

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.show(truncate=False)

from pyspark.sql.functions import size, col

df_with_size = df.withColumn("numbers_size", size("numbers"))
df_with_size = df_with_size.withColumn("numbers_size_2", size(col("numbers")))

df_with_size.show()

+---+---------+
|id |numbers  |
+---+---------+
|1  |[1, 2, 3]|
|2  |[4, 5]   |
|3  |[]       |
+---+---------+

+---+---------+------------+--------------+
| id|  numbers|numbers_size|numbers_size_2|
+---+---------+------------+--------------+
|  1|[1, 2, 3]|           3|             3|
|  2|   [4, 5]|           2|             2|
|  3|       []|           0|             0|
+---+---------+------------+--------------+



## Fist item from array(element_at)

In [8]:
from pyspark.sql.functions import element_at
from pyspark.sql.functions import col

# Example DataFrame with an array column

data = [
(1, [10, 20, 30]),
(2, [40, 50, 60]),
(3, [70, 80, 90])
]

schema = ["id", "numbers"]

df = spark.createDataFrame(data, schema=schema)
df.show()

# Select the first element from the array column 'numbers'
df.select(element_at(col("numbers"), 2).alias("first_element")).show()

+---+------------+
| id|     numbers|
+---+------------+
|  1|[10, 20, 30]|
|  2|[40, 50, 60]|
|  3|[70, 80, 90]|
+---+------------+

+-------------+
|first_element|
+-------------+
|           20|
|           50|
|           80|
+-------------+



## Create array using Spark SQL

In [None]:
df_sql_array = spark.sql("SELECT array('KGF 1', 'KGF 2', 'Autograph', 'Kicha','Hucha') as movies")
df_sql_array.printSchema()
df_sql_array.show(truncate=False)

root
 |-- movies: array (nullable = false)
 |    |-- element: string (containsNull = false)

+---------------------------------------+
|movies                                 |
+---------------------------------------+
|[KGF 1, KGF 2, Autograph, Kicha, Hucha]|
+---------------------------------------+



In [None]:
from pyspark.sql.types import StructType,StructField, StringType, ArrayType,MapType

data = [
          [
            ["KGF 1", "KGF 2", "Autograph", "Kicha","Hucha"],
            # ["Hello", "Hi"]
          ]
        ]

# Schema
schema = StructType([
             StructField('movies', ArrayType(StringType()), True),
     ])

# Create DataFrame
df_array = spark.createDataFrame(data = data, schema = schema)
df_array.printSchema()
df_array.show(truncate=False) # shows all columns

root
 |-- movies: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---------------------------------------+
|movies                                 |
+---------------------------------------+
|[KGF 1, KGF 2, Autograph, Kicha, Hucha]|
+---------------------------------------+



## Merge 2 arrays(concat)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, array

spark = SparkSession.builder.getOrCreate()

data = [
    (1, [1, 2], [1,3, 4]),
    (2, [5, 6], [7, 8]),
]
df = spark.createDataFrame(data, ["id", "arr1", "arr2"])

# concat arrays
df2 = df.withColumn("merged", concat("arr1", "arr2"))

df2.show(truncate=False)


+---+------+---------+---------------+
|id |arr1  |arr2     |merged         |
+---+------+---------+---------------+
|1  |[1, 2]|[1, 3, 4]|[1, 2, 1, 3, 4]|
|2  |[5, 6]|[7, 8]   |[5, 6, 7, 8]   |
+---+------+---------+---------------+



In [None]:
##Concat cant not handle nested array

from pyspark.sql.functions import flatten, concat

df = spark.createDataFrame([
    (1, [[1, 2], [3, 4]]),
    (2, [[5, 6]])
], ["id", "nested_array"])

df.select("id","nested_array",concat("nested_array")).show(truncate=False)


+---+----------------+--------------------+
|id |nested_array    |concat(nested_array)|
+---+----------------+--------------------+
|1  |[[1, 2], [3, 4]]|[[1, 2], [3, 4]]    |
|2  |[[5, 6]]        |[[5, 6]]            |
+---+----------------+--------------------+



## Flatten Nested Array(flatten)


In [None]:
from pyspark.sql.functions import flatten

df = spark.createDataFrame([
    (1, [[1, 2], [3, 4]]),
    (2, [[5, 6]])
], ["id", "nested_array"])

df.select(flatten("nested_array")).show(truncate=False)


+---------------------+
|flatten(nested_array)|
+---------------------+
|[1, 2, 3, 4]         |
|[5, 6]               |
+---------------------+



In [None]:
from pyspark.sql.functions import flatten

# Sample data: each row includes a product ID and nested arrays of review key phrases
data = [
    (1, [["great battery life", "sleek design"], ["heavy", "expensive"], ["sleek design"]]),
    (2, [["easy to install", "value for money"], ["requires maintenance"]])
]

# Create DataFrame
df = spark.createDataFrame(data, ["product_id", "reviews"])

# Show the original DataFrame
df.show(truncate=False)

# Flatten the nested array of reviews into a single array per product
flattened_df = df.withColumn("flattened_reviews", flatten(df["reviews"]))

# Show the DataFrame with the flattened reviews
flattened_df.show(truncate=False)

+----------+------------------------------------------------------------------------+
|product_id|reviews                                                                 |
+----------+------------------------------------------------------------------------+
|1         |[[great battery life, sleek design], [heavy, expensive], [sleek design]]|
|2         |[[easy to install, value for money], [requires maintenance]]            |
+----------+------------------------------------------------------------------------+

+----------+------------------------------------------------------------------------+------------------------------------------------------------------+
|product_id|reviews                                                                 |flattened_reviews                                                 |
+----------+------------------------------------------------------------------------+------------------------------------------------------------------+
|1         |[[great batt

In [None]:
from pyspark.sql.functions import flatten
# Sample DataFrame creation
recs_df = spark.createDataFrame([
    (1, [["prodA", "prodB"], ["prodC"]]),
    (2, [["prodD"], ["prodE", "prodF"]])
], ["user_id", "recommendations"])

# without flattening
recs_df.show(truncate=False)

flattened_recs = recs_df.select("user_id", flatten("recommendations").alias("all_recs"))
flattened_recs.show(truncate=False)

+-------+-------------------------+
|user_id|recommendations          |
+-------+-------------------------+
|1      |[[prodA, prodB], [prodC]]|
|2      |[[prodD], [prodE, prodF]]|
+-------+-------------------------+

+-------+---------------------+
|user_id|all_recs             |
+-------+---------------------+
|1      |[prodA, prodB, prodC]|
|2      |[prodD, prodE, prodF]|
+-------+---------------------+



## Explode-values in array to rows(explode)


In [None]:
from pyspark.sql.functions import explode
df_exploded = recs_df.withColumn("all_recs", explode(flatten("recommendations")))
df_exploded.show(truncate=False)

+-------+-------------------------+--------+
|user_id|recommendations          |all_recs|
+-------+-------------------------+--------+
|1      |[[prodA, prodB], [prodC]]|prodA   |
|1      |[[prodA, prodB], [prodC]]|prodB   |
|1      |[[prodA, prodB], [prodC]]|prodC   |
|2      |[[prodD], [prodE, prodF]]|prodD   |
|2      |[[prodD], [prodE, prodF]]|prodE   |
|2      |[[prodD], [prodE, prodF]]|prodF   |
+-------+-------------------------+--------+



In [None]:
from pyspark.sql.functions import explode
df_exploded = flattened_recs.withColumn("all_recs", explode("all_recs"))
df_exploded.show()

+-------+--------+
|user_id|all_recs|
+-------+--------+
|      1|   prodA|
|      1|   prodB|
|      1|   prodC|
|      2|   prodD|
|      2|   prodE|
|      2|   prodF|
+-------+--------+



In [None]:
from pyspark.sql.functions import explode_outer

# Handling nulls and empties
nullable_df = spark.createDataFrame([
    (1, ["apple", "banana"]),
    (2, []),
    (3, None)
], ["id", "fruits"])

# Applying explode_outer
nullable_exploded = nullable_df.select("id", explode("fruits").alias("fruit"))
nullable_exploded.show()


nullable_df.select("id", explode_outer("fruits").alias("fruit")).show()

+---+------+
| id| fruit|
+---+------+
|  1| apple|
|  1|banana|
+---+------+

+---+------+
| id| fruit|
+---+------+
|  1| apple|
|  1|banana|
|  2|  NULL|
|  3|  NULL|
+---+------+



## Merge arrays - Using array_union(Variation of Concat)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_union

spark = SparkSession.builder.getOrCreate()

data = [
    (1, [1, 2, 3], [3, 4, 5]),
    (2, [5, 6], [6, 7, 8])
]
df = spark.createDataFrame(data, ["id", "arr1", "arr2"])

df2 = df.withColumn("union_array", array_union("arr1", "arr2"))

df2.show(truncate=False)

+---+---------+---------+---------------+
|id |arr1     |arr2     |union_array    |
+---+---------+---------+---------------+
|1  |[1, 2, 3]|[3, 4, 5]|[1, 2, 3, 4, 5]|
|2  |[5, 6]   |[6, 7, 8]|[5, 6, 7, 8]   |
+---+---------+---------+---------------+



## From Certificaiton

### Sample Data

In [12]:
# Sample DataFrame
data = [
    (1, "apple banana cherry"),
    (2, "grape orange mango"),
    (3, "kiwi"),
    (4, None)
]
columns = ["id", "desc"]

df = spark.createDataFrame(data, columns)

# Show original DataFrame
print("Original DataFrame:")
df.show()

Original DataFrame:
+---+-------------------+
| id|               desc|
+---+-------------------+
|  1|apple banana cherry|
|  2| grape orange mango|
|  3|               kiwi|
|  4|               NULL|
+---+-------------------+



In [13]:
from pyspark.sql.functions import split, col

# Split 'desc' column into an array of words
df_with_array = df.withColumn("array_col", split(col("desc"), " "))

# Show DataFrame with the new array column
print("DataFrame with 'array_col' (split into an array):")
df_with_array.show(truncate=False)

DataFrame with 'array_col' (split into an array):
+---+-------------------+-----------------------+
|id |desc               |array_col              |
+---+-------------------+-----------------------+
|1  |apple banana cherry|[apple, banana, cherry]|
|2  |grape orange mango |[grape, orange, mango] |
|3  |kiwi               |[kiwi]                 |
|4  |NULL               |NULL                   |
+---+-------------------+-----------------------+



In [16]:
# Access the first element of the array column
df_first_element = df_with_array.select(col("id"), col("array_col")[1].alias("first_element"))

# Show DataFrame with the first element of the array
print("DataFrame with first element of 'array_col':")
df_first_element.show()

DataFrame with first element of 'array_col':
+---+-------------+
| id|first_element|
+---+-------------+
|  1|       banana|
|  2|       orange|
|  3|         NULL|
|  4|         NULL|
+---+-------------+



In [17]:
from pyspark.sql.functions import size

# Calculate the length of the array column
df_array_size = df_with_array.select(col("id"), \
    size(col("array_col")).alias("array_length"))

# Show DataFrame with the array length
print("DataFrame with array length:")
df_array_size.show()

DataFrame with array length:
+---+------------+
| id|array_length|
+---+------------+
|  1|           3|
|  2|           3|
|  3|           1|
|  4|          -1|
+---+------------+



### array_contains

In [18]:
from pyspark.sql.functions import array_contains

# Check if the array contains the word 'banana'
df_array_contains = df_with_array.select(
    col("id"),
    array_contains(col("array_col"), "banana").alias("contains_banana")
)

# Show DataFrame with the result of array_contains
print("DataFrame with array contains 'banana':")
df_array_contains.show()

DataFrame with array contains 'banana':
+---+---------------+
| id|contains_banana|
+---+---------------+
|  1|           true|
|  2|          false|
|  3|          false|
|  4|           NULL|
+---+---------------+



# Play with Map


**Creating Map type**

In [None]:
df_sql_map = spark.sql("SELECT map('Building','500 CR', 'Commercal',100) as income")
df_sql_map.printSchema()
df_sql_map.show(truncate=False)

root
 |-- income: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = false)

+--------------------------------------+
|income                                |
+--------------------------------------+
|{Building -> 500 CR, Commercal -> 100}|
+--------------------------------------+



In [None]:
from pyspark.sql import Row

df = spark.createDataFrame([
                              Row({"Bank":"100 CR", "Business":"50 CR", "Land":"150 CR"}),
                              Row({"Others":"300 CR"}),
                              Row({"Building":"500 CR", "Commercal":100}),
                              Row({"Building":100 , "Commercal":100}),
                            ]
                            , ["Assests"])
df.show(truncate=False)

+---------------------------------------------------+
|Assests                                            |
+---------------------------------------------------+
|{Bank -> 100 CR, Land -> 150 CR, Business -> 50 CR}|
|{Others -> 300 CR}                                 |
|{Building -> 500 CR, Commercal -> 100}             |
|{Building -> 100, Commercal -> 100}                |
+---------------------------------------------------+



In [None]:
from pyspark.sql.types import StructType,StructField, StringType, ArrayType,MapType


data = [

          ({"Bank":"100 CR", "Business":"50 CR", "Land":"150 CR"}),
          ({"Others":"300 CR"}),
          # ({"Building":"500 CR", "Commercal":100}),
          # ({"Building":"500 CR", "Commercal":100}),

      ]


# Schema
schema = StructType([
               StructField('properties', MapType(StringType(),StringType()), True)
        ])

# Create DataFrame
df_map = spark.createDataFrame(data = data, schema = schema)
df_map.printSchema()
df_map.show(truncate=False) # shows all columns

root
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+
|properties|
+----------+
|NULL      |
|NULL      |
+----------+



In [None]:
|spark.sql("SELECT struct(1, 2, 3) as ex_struct")

DataFrame[ex_struct: struct<col1:int,col2:int,col3:int>]

## Retrieve all keys from a map column(map_keys)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import col, map_keys

# Define MapType

schema = StructType([
StructField("id", IntegerType(), True),
StructField("attributes", MapType(StringType(), StringType()), True)
])

# Sample Data

data = [(1, {"Key1": "Value1", "Key2": "Value2"}), (2, {"Key3": "Value3", "Key4": "Value4"})]

df = spark.createDataFrame(data, schema)
df.show(truncate=False)
df.select(map_keys(col("attributes")).alias("keys")).show()

+---+--------------------------------+
|id |attributes                      |
+---+--------------------------------+
|1  |{Key2 -> Value2, Key1 -> Value1}|
|2  |{Key4 -> Value4, Key3 -> Value3}|
+---+--------------------------------+

+------------+
|        keys|
+------------+
|[Key2, Key1]|
|[Key4, Key3]|
+------------+



## Create map(create_map)

In [10]:
import pyspark.sql.functions as F

df.withColumn(
    "my_map",
    F.create_map(
        F.lit("A"), F.lit(1),
        F.lit("B"), F.lit(2)
    )
)

DataFrame[product: string, sales: double, my_map: map<string,int>]

## Merge map(map_concat)


In [None]:
from pyspark.sql.functions import map_concat

# Sample Data
data = [
    (1, {"a": 1, "b": 2}, {"c": 3, "d": 4}),
    (2, {"x": 10}, {"y": 20, "z": 30}),
    (3, {}, {"p": 100, "q": 200}),
    (4, {"k": 5}, {})
]

df = spark.createDataFrame(data, ["id", "map1", "map2"])

print("before the merge")
df.show(truncate=False)

# Merge the two map columns
df_merged = df.withColumn("merged_map", map_concat("map1", "map2"))

print("after the merge")
df_merged.show(truncate=False)


before the merge
+---+----------------+--------------------+
|id |map1            |map2                |
+---+----------------+--------------------+
|1  |{a -> 1, b -> 2}|{d -> 4, c -> 3}    |
|2  |{x -> 10}       |{y -> 20, z -> 30}  |
|3  |{}              |{p -> 100, q -> 200}|
|4  |{k -> 5}        |{}                  |
+---+----------------+--------------------+

after the merge
+---+----------------+--------------------+--------------------------------+
|id |map1            |map2                |merged_map                      |
+---+----------------+--------------------+--------------------------------+
|1  |{a -> 1, b -> 2}|{d -> 4, c -> 3}    |{a -> 1, b -> 2, d -> 4, c -> 3}|
|2  |{x -> 10}       |{y -> 20, z -> 30}  |{x -> 10, y -> 20, z -> 30}     |
|3  |{}              |{p -> 100, q -> 200}|{p -> 100, q -> 200}            |
|4  |{k -> 5}        |{}                  |{k -> 5}                        |
+---+----------------+--------------------+--------------------------------

In [21]:
from pyspark.sql.types import StructType, StructField, StringType, \
IntegerType
from pyspark.sql.types import MapType

# Define MapType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("metadata", MapType(StringType(), StringType()), True)
])

# Sample Data
data = [(1, {"Key1": "Value1", "Key2": "Value2"}),
        (2, {"Key1": "Value3", "Key2": "Value4"})]

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

df.select(df.metadata.getItem("Key1").alias("value_of_key1")).show()


# Explode the metadata column while retaining the original metadata column
df_exploded = df.selectExpr("id", "metadata", "explode(metadata) as \
(key, value)")

# Show the resulting DataFrame
df_exploded.show(truncate=False)

+---+--------------------------------+
|id |metadata                        |
+---+--------------------------------+
|1  |{Key2 -> Value2, Key1 -> Value1}|
|2  |{Key2 -> Value4, Key1 -> Value3}|
+---+--------------------------------+

+-------------+
|value_of_key1|
+-------------+
|       Value1|
|       Value3|
+-------------+

+---+--------------------------------+----+------+
|id |metadata                        |key |value |
+---+--------------------------------+----+------+
|1  |{Key2 -> Value2, Key1 -> Value1}|Key2|Value2|
|1  |{Key2 -> Value2, Key1 -> Value1}|Key1|Value1|
|2  |{Key2 -> Value4, Key1 -> Value3}|Key2|Value4|
|2  |{Key2 -> Value4, Key1 -> Value3}|Key1|Value3|
+---+--------------------------------+----+------+



# Play with Struct


In [None]:
df_struct = spark.sql("SELECT struct(1, 2, '3') as ex_struct")
df_struct.show()

+---------+
|ex_struct|
+---------+
|{1, 2, 3}|
+---------+



In [None]:
df_struct.select("ex_struct.col3", "ex_struct").show()

+----+---------+
|col3|ex_struct|
+----+---------+
|   3|{1, 2, 3}|
+----+---------+



In [None]:
df_struct.select("ex_struct.*").show(truncate=False)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |2   |3   |
+----+----+----+



In [None]:
df_map = spark.sql("SELECT map(1.0, '2', 3.0, '4') as ex_map")
df_map.show()

+--------------------+
|              ex_map|
+--------------------+
|{1.0 -> 2, 3.0 -> 4}|
+--------------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, ArrayType,MapType

# https://sparkbyexamples.com/pyspark/pyspark-maptype-dict-examples/

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Data
data = [
        (("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

# Schema
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
     ])

# Create DataFrame
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, NULL, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+



In [None]:
df2.select("name").show()

+--------------------+
|                name|
+--------------------+
|{James, NULL, Smith}|
|      {Anna, Rose, }|
| {Julia, , Williams}|
|{Maria, Anne, Jones}|
|  {Jen, Mary, Brown}|
|{Mike, Mary, Will...|
+--------------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, ArrayType,MapType
from pyspark.sql.functions import element_at

data = [
        ([("Yash","K",None),("Yash2","K2",None)],"BL","M",["KGF 1", "KGF 2"],{"Bank":"100 CR", "Business":"50 CR", "Land":"150 CR"}),
        ([("Sudeep","Kicha","S")],"DL","M",["Autograph", "Kicha","Hucha"],{"Others":"300 CR"}),
        ([("Puneeth",None,"Raj")],"MB","M",[], {"Building":"500 CR", "Commercal":100}),
        ([("Darshan",None,None)],"MB","M",[], {"Building":"500 CR", "Commercal":100}),

        ]

# Schema
schema = StructType([
    StructField(    'name'
                  , ArrayType(
                        StructType([
                            StructField('firstname', StringType(), True),
                            StructField('middlename', StringType(), True),
                            StructField('lastname', StringType(), True)
                        ])
                    ), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True),
     StructField('movies', ArrayType(StringType()), True),
     StructField('properties', MapType(StringType(),StringType()), True)
     ])

# Create DataFrame
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns

root
 |-- name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstname: string (nullable = true)
 |    |    |-- middlename: string (nullable = true)
 |    |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- movies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+------------------------------------+-----+------+-------------------------+---------------------------------------------------+
|name                                |state|gender|movies                   |properties                                         |
+------------------------------------+-----+------+-------------------------+---------------------------------------------------+
|[{Yash, K, NULL}, {Yash2, K2, NULL}]|BL   |M     |[KGF 1, KGF 2]           |{Bank -> 100 CR, La

In [None]:
 (df2
  .select("properties", "name", "name.firstname")
  .withColumn("M-Building", col("properties").getItem("Building"))
  .withColumn("M-Commercal ", col("properties").getItem("Commercal"))
  .withColumn("M-Others ", col("properties").getItem("Others"))
  .withColumn("S-FName", col("name.firstname"))
  ).show()

+--------------------+--------------------+---------+----------+------------+---------+-------+
|          properties|                name|firstname|M-Building|M-Commercal |M-Others |S-FName|
+--------------------+--------------------+---------+----------+------------+---------+-------+
|{Bank -> 100 CR, ...|     {Yash, K, NULL}|     Yash|      NULL|        NULL|     NULL|   Yash|
|  {Others -> 300 CR}|  {Sudeep, Kicha, S}|   Sudeep|      NULL|        NULL|   300 CR| Sudeep|
|{Building -> 500 ...|{Puneeth, NULL, Raj}|  Puneeth|    500 CR|         100|     NULL|Puneeth|
|{Building -> 500 ...|{Darshan, NULL, N...|  Darshan|    500 CR|         100|     NULL|Darshan|
+--------------------+--------------------+---------+----------+------------+---------+-------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, ArrayType,MapType
from pyspark.sql.functions import element_at

data = [
        (("Yash","K",None),"BL","M",["KGF 1", "KGF 2"],{"Bank":"100 CR", "Business":"50 CR", "Land":"150 CR"}),
        (("Sudeep","Kicha","S"),"DL","M",["Autograph", "Kicha","Hucha"],{"Others":"300 CR"}),
        (("Puneeth",None,"Raj"),"MB","M",[], {"Building":"500 CR", "Commercal":100}),
        (("Darshan",None,None),"MB","M",[], {"Building":"500 CR", "Commercal":100}),

        ]

# Schema
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True),
     StructField('movies', ArrayType(StringType()), True),
     StructField('properties', MapType(StringType(),StringType()), True)
     ])

# Create DataFrame
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- movies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---------------------+-----+------+-------------------------+---------------------------------------------------+
|name                 |state|gender|movies                   |properties                                         |
+---------------------+-----+------+-------------------------+---------------------------------------------------+
|{Yash, K, NULL}      |BL   |M     |[KGF 1, KGF 2]           |{Bank -> 100 CR, Land -> 150 CR, Business -> 50 CR}|
|{Sudeep, Kicha, S}   |DL   |M     |[Autograph, Kicha, Hucha]|{Others -> 300 CR}        

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
  .load("sample_data/2015-summary.json")

df.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/sample_data/2015-summary.json.

In [None]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [None]:
spark.read.format("json").load("sample_data/2015-summary.json").schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

## Merge struct(no built in method)


In [None]:
from pyspark.sql import Row

data = [
    Row(id=1, s1=Row(a=10, b=20), s2=Row(c=30, d=40)),
    Row(id=2, s1=Row(a=100, b=200), s2=Row(c=300, d=400))
]

df = spark.createDataFrame(data)

df.show(truncate=False)
df.printSchema()


from pyspark.sql.functions import col, struct

merged_df = df.withColumn(
    "merged_struct",
    struct(
        col("s1.a").alias("a"),
        col("s1.b").alias("b"),
        col("s2.c").alias("c"),
        col("s2.d").alias("d")
    )
)

merged_df.select("id", "merged_struct").show(truncate=False)
merged_df.printSchema()


+---+----------+----------+
|id |s1        |s2        |
+---+----------+----------+
|1  |{10, 20}  |{30, 40}  |
|2  |{100, 200}|{300, 400}|
+---+----------+----------+

root
 |-- id: long (nullable = true)
 |-- s1: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = true)
 |-- s2: struct (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: long (nullable = true)

+---+--------------------+
|id |merged_struct       |
+---+--------------------+
|1  |{10, 20, 30, 40}    |
|2  |{100, 200, 300, 400}|
+---+--------------------+

root
 |-- id: long (nullable = true)
 |-- s1: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = true)
 |-- s2: struct (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: long (nullable = true)
 |-- merged_struct: struct (nullable = false)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: lo

In [12]:
from pyspark.sql.functions import col, struct

# Sample DataFrame with struct column "address" (city, zip)
data = [
    (1, ("New York", "10001")),
    (2, ("San Francisco", "94105")),
    (3, ("Chicago", "60601"))
]

# Define schema explicitly
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zip", StringType(), True)
    ]), True)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)
print("Original Schema:")
df.printSchema()

# Rename the "zip" field to "zipcode"
df_transformed = df.withColumn("address", struct(
    col("address.city"),
    col("address.zip").alias("zipcode")  # Renaming zip to zipcode
))
print("Updated Schema:")
df_transformed.printSchema()

Original Schema:
root
 |-- id: integer (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zip: string (nullable = true)

Updated Schema:
root
 |-- id: integer (nullable = true)
 |-- address: struct (nullable = false)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: string (nullable = true)



# Other Examples



## Using expressions

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col, expr

df = spark.createDataFrame([
  Row(name="san", salary=1500),
  Row(name="ana", salary=2000),
  Row(name="shu", salary=1000)
])

print("approach 1:")
df.withColumn("bonus", df.salary * 0.1).show()

print("approach 2:")
df.withColumn("bonus", expr("salary * 0.1")).show()

print("approach 3:")
df.select(col("name"), col("salary"), (col("salary") * 0.1).alias("bonus")).show()

approach 1:
+----+------+-----+
|name|salary|bonus|
+----+------+-----+
| san|  1500|150.0|
| ana|  2000|200.0|
| shu|  1000|100.0|
+----+------+-----+

approach 2:
+----+------+-----+
|name|salary|bonus|
+----+------+-----+
| san|  1500|150.0|
| ana|  2000|200.0|
| shu|  1000|100.0|
+----+------+-----+

approach 3:
+----+------+-----+
|name|salary|bonus|
+----+------+-----+
| san|  1500|150.0|
| ana|  2000|200.0|
| shu|  1000|100.0|
+----+------+-----+



## Nested fields fetching(struct)

In [None]:
data = [
(1, {"city": "New York", "state": "NY"}),
(2, {"city": "San Francisco", "state": "CA"})
]

schema = ["id", "address"]

df = spark.createDataFrame(data, schema=schema)

print("approach 1")
df.select(col("address.city")).show() #correct syntax

print("approach 2")
df.select("address.city").show() #correct syntax

print("approach 3")
df.select(col("address")["city"]).show() #correct syntax

print("approach 4")
(df.select("address")
   .select(col("address.city")).show()) #correct syntax

approach 1
+-------------+
|         city|
+-------------+
|     New York|
|San Francisco|
+-------------+

approach 2
+-------------+
|         city|
+-------------+
|     New York|
|San Francisco|
+-------------+

approach 3
+-------------+
|address[city]|
+-------------+
|     New York|
|San Francisco|
+-------------+

approach 4
+-------------+
|         city|
+-------------+
|     New York|
|San Francisco|
+-------------+



## SELECT mappings

In [None]:
from pyspark.sql.types import Row
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Define Schema
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("salary", LongType(), True)
])

# Create Dummy Data
data = [
    Row(id=1, name="San", age=30, salary=70000),
    Row(id=2, name="Ana", age=35, salary=80000),
    Row(id=3, name="Shuchi", age=25, salary=50000),
    Row(id=4, name="Krishna", age=40, salary=90000)
]

# Create DataFrame
df = spark.createDataFrame(data, schema)
df.show()

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|    San| 30| 70000|
|  2|    Ana| 35| 80000|
|  3| Shuchi| 25| 50000|
|  4|Krishna| 40| 90000|
+---+-------+---+------+



In [None]:
from pyspark.sql.functions import expr

# Select 'name' and 'age', and create 'age_category' based on 'age'
df_expr = df.select(
    "name",
    "age",
    expr("CASE WHEN age >= 35 THEN 'Senior' \
ELSE 'Junior' END as age_category")
)
df_expr.show()


from pyspark.sql.functions import expr

# Select 'name' and 'age', and create 'age_category' based on 'age'
df_select_expr = df.selectExpr(
    "name",
    "age",
    "CASE WHEN age >= 35 THEN 'Senior' ELSE 'Junior' END as age_category"
)
df_select_expr.show()

+-------+---+------------+
|   name|age|age_category|
+-------+---+------------+
|    San| 30|      Junior|
|    Ana| 35|      Senior|
| Shuchi| 25|      Junior|
|Krishna| 40|      Senior|
+-------+---+------------+

+-------+---+------------+
|   name|age|age_category|
+-------+---+------------+
|    San| 30|      Junior|
|    Ana| 35|      Senior|
| Shuchi| 25|      Junior|
|Krishna| 40|      Senior|
+-------+---+------------+



In [None]:
from pyspark.sql.functions import col

# Add a new column 'age_plus_5'
df_with_age = df.withColumn("age_plus_5", col("age") + 5)
df_with_age.show()

+---+-------+---+------+----------+
| id|   name|age|salary|age_plus_5|
+---+-------+---+------+----------+
|  1|    San| 30| 70000|        35|
|  2|    Ana| 35| 80000|        40|
|  3| Shuchi| 25| 50000|        30|
|  4|Krishna| 40| 90000|        45|
+---+-------+---+------+----------+



In [None]:
# Cast 'salary' from LongType to DoubleType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

df_casted = df.withColumn("salary_double", col("salary").cast(DoubleType()))
df_casted.show()

+---+-------+---+------+-------------+
| id|   name|age|salary|salary_double|
+---+-------+---+------+-------------+
|  1|    San| 30| 70000|      70000.0|
|  2|    Ana| 35| 80000|      80000.0|
|  3| Shuchi| 25| 50000|      50000.0|
|  4|Krishna| 40| 90000|      90000.0|
+---+-------+---+------+-------------+



## UDF

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

@pandas_udf(IntegerType())
def string_length(s: pd.Series) -> pd.Series:
    return s.str.len()

In [2]:
from pyspark.sql.functions import lit

df = spark.createDataFrame([(1, "Ram"), (2, "Santosh")], ["id", "name"])

df = df.withColumns({"age1": lit(10), "age2": lit(20)})

df.show()

+---+-------+----+----+
| id|   name|age1|age2|
+---+-------+----+----+
|  1|    Ram|  10|  20|
|  2|Santosh|  10|  20|
+---+-------+----+----+



In [7]:
from pyspark.sql.functions import col
from pyspark.sql import Row


df = spark.createDataFrame([
    Row(id=1, name="santosh", age=30),
    Row(id=1, name="santosh", age=30),
    Row(id=2, name="ram", age=7),
    Row(id=3, name="abdul", age=5),
    Row(id=4, name="john", age=5),
    Row(id=5, name="john", age=5),
    Row(id=6, name="john", age=5),
    Row(id=7, name="john", age=5)
])

df.distinct().show()

df.dropDuplicates().show()

df.sort(col("id").desc()).dropDuplicates(subset=["name", "age"]).show()

df.sort(col("id").desc()).dropDuplicates(subset=["all"]).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    ram|  7|
|  3|  abdul|  5|
|  1|santosh| 30|
|  5|   john|  5|
|  4|   john|  5|
|  7|   john|  5|
|  6|   john|  5|
+---+-------+---+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    ram|  7|
|  3|  abdul|  5|
|  1|santosh| 30|
|  5|   john|  5|
|  4|   john|  5|
|  7|   john|  5|
|  6|   john|  5|
+---+-------+---+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    ram|  7|
|  1|santosh| 30|
|  7|   john|  5|
|  3|  abdul|  5|
+---+-------+---+



AnalysisException: Cannot resolve column name "all" among (id, name, age).

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define the UDF using a lambda function
split_comma_udf = udf(lambda x: x.split(","), ArrayType(StringType()))

# Create some sample data
data = [("san,ana,roger,ram",), ("apple,nvidia,tesla",)]
df = spark.createDataFrame(data, ["names"])

# Apply the UDF to the DataFrame
df_with_split_names = df.withColumn("split_names", split_comma_udf(df["names"]))

# Show the results
df_with_split_names.show(truncate=False)

+------------------+----------------------+
|names             |split_names           |
+------------------+----------------------+
|san,ana,roger,ram |[san, ana, roger, ram]|
|apple,nvidia,tesla|[apple, nvidia, tesla]|
+------------------+----------------------+



In [4]:
from pyspark.sql import functions as F

# Sample data
data = [
    (1, None, "B"),
    (2, "A", "A"),
    (3, "C", "D"),
]

# Creating DataFrame
df = spark.createDataFrame(data, ["ID", "col1", "col2"])

# Applying NULL-handling functions
df.select(
    "ID",
    "col1",
    "col2",
    F.expr("ifnull(col1, 'default')").alias("IFNULL"),
    F.expr("nvl(col1, 'default')").alias("NVL"),
    F.expr("nullif(col1, col2)").alias("NULLIF"),
    F.expr("nvl2(col1, 'Not Null', 'Null')").alias("NVL2")
).show()

+---+----+----+-------+-------+------+--------+
| ID|col1|col2| IFNULL|    NVL|NULLIF|    NVL2|
+---+----+----+-------+-------+------+--------+
|  1|NULL|   B|default|default|  NULL|    Null|
|  2|   A|   A|      A|      A|  NULL|Not Null|
|  3|   C|   D|      C|      C|     C|Not Null|
+---+----+----+-------+-------+------+--------+



In [7]:
from pyspark.sql.functions import col

# Sample data for stores DataFrame
stores_data = [
    (1, "Store A", "NY"),
    (2, "Store B", "CA"),
    (3, "Store C", "NY"),
    (4, "Store D", "TX")
]
# Sample data for sales DataFrame
sales_data = [
    (1, 1500, "2024-01-01"),
    (2, 800, "2024-01-02"),
    (3, 1200, "2024-01-03"),
    (4, 500, "2024-01-04"),
    (1, 950, "2024-01-05"),
    (3, 1100, "2024-01-06")
]
# Define schema
stores_columns = ["store_id", "store_name", "location"]
sales_columns = ["store_id", "sales_amount", "sales_date"]

# Create DataFrames
stores = spark.createDataFrame(stores_data, stores_columns)
stores.show()
sales = spark.createDataFrame(sales_data, sales_columns)
sales.show()

# Perform the join and filter conditions
# stores.join(sales, "store_id").filter((stores.location == "NY") & (sales.sales_amount > 1000)).show()
# stores.join(sales, stores.store_id == sales.store_id).filter((stores.location == "NY") & (sales.sales_amount > 1000)).show()
stores.join(sales, "store_id").filter((stores.location == "NY") & (sales.sales_amount > 1000)).show()

+--------+----------+--------+
|store_id|store_name|location|
+--------+----------+--------+
|       1|   Store A|      NY|
|       2|   Store B|      CA|
|       3|   Store C|      NY|
|       4|   Store D|      TX|
+--------+----------+--------+

+--------+------------+----------+
|store_id|sales_amount|sales_date|
+--------+------------+----------+
|       1|        1500|2024-01-01|
|       2|         800|2024-01-02|
|       3|        1200|2024-01-03|
|       4|         500|2024-01-04|
|       1|         950|2024-01-05|
|       3|        1100|2024-01-06|
+--------+------------+----------+

+--------+----------+--------+------------+----------+
|store_id|store_name|location|sales_amount|sales_date|
+--------+----------+--------+------------+----------+
|       1|   Store A|      NY|        1500|2024-01-01|
|       3|   Store C|      NY|        1200|2024-01-03|
|       3|   Store C|      NY|        1100|2024-01-06|
+--------+----------+--------+------------+----------+



In [8]:
# Sample Data
data = [
    (1, "A", 50),
    (2, "B", 120),
    (3, "C", 200),
    (4, "D", 90),
    (5, "E", 150),
    (6, "F", 300),
    (7, "G", 400),
    (8, "H", 75),
    (9, "I", 110),
    (10, "J", 500)
]

# Define schema
columns = ["id", "category", "value"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Split DataFrame into training (70%) and testing (30%)
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

# Show results
print("Training Data:")
train_df.show()

print("Testing Data:")
test_df.show()

Training Data:
+---+--------+-----+
| id|category|value|
+---+--------+-----+
|  1|       A|   50|
|  2|       B|  120|
|  4|       D|   90|
|  5|       E|  150|
|  7|       G|  400|
|  8|       H|   75|
|  9|       I|  110|
| 10|       J|  500|
+---+--------+-----+

Testing Data:
+---+--------+-----+
| id|category|value|
+---+--------+-----+
|  3|       C|  200|
|  6|       F|  300|
+---+--------+-----+



In [9]:
from pyspark.sql import Row
from pyspark.sql.functions import col, avg, when

df = spark.createDataFrame([
  Row(name="san", age = 30),
  Row(name="bob", age = 20),
  Row(name="alice", age = 10),
  Row(name="charlie", age = None)
])

df = df.na.fill({"age": df.agg({"age": "avg"}).collect()[0][0]})

df.show()


+-------+---+
|   name|age|
+-------+---+
|    san| 30|
|    bob| 20|
|  alice| 10|
|charlie| 20|
+-------+---+



In [19]:
from pyspark.sql.functions import col

# Create Left DataFrame
data1 = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")]
df1 = spark.createDataFrame(data1, ["id", "name"])

# Create Right DataFrame
data2 = [(1, "Alice"), (3, "Charlie")]
df2 = spark.createDataFrame(data2, ["id", "name"])

print("data type of the id column:")
print(df2.schema["id"].dataType)

print("\ndata type of the name column:")
print(df2.schema["name"].dataType)

data type of the id column:
LongType()

data type of the name column:
StringType()


# Partition

In [3]:
from pyspark.sql import functions as F

# Create a DataFrame with 10 million records
df = spark.range(10_000_000).select(
       F.concat(F.lit("Product_"), F.floor(F.rand() * 4)\
.cast("int")).alias("product"),
       F.floor(F.rand() * 1000).cast("double").alias("sales")
)

In [4]:
from pyspark.sql.functions import spark_partition_id

# Assume 'df' is your DataFrame
num_partitions = df.select(spark_partition_id().alias("partition_id")) \
                   .distinct() \
                   .count()

print(f"Number of partitions: {num_partitions}")

Number of partitions: 2


In [5]:
from pyspark.sql.functions  import spark_partition_id
df.withColumn("partitionId", spark_partition_id()).\
groupBy("partitionId").count().show()

+-----------+-------+
|partitionId|  count|
+-----------+-------+
|          0|5000000|
|          1|5000000|
+-----------+-------+



In [6]:
df_re_partition = df.repartition(4)

In [7]:
from pyspark.sql.functions  import spark_partition_id
df_re_partition.withColumn("partitionId", spark_partition_id()).\
groupBy("partitionId").count().show()

+-----------+-------+
|partitionId|  count|
+-----------+-------+
|          0|2500000|
|          1|2500000|
|          2|2500000|
|          3|2500000|
+-----------+-------+



In [8]:
df_coalesce = df.coalesce(1)

In [9]:
from pyspark.sql.functions  import spark_partition_id
df_coalesce.withColumn("partitionId", spark_partition_id()).\
groupBy("partitionId").count().show()

+-----------+--------+
|partitionId|   count|
+-----------+--------+
|          0|10000000|
+-----------+--------+

