##### PySpark transform() Function with Example
pyspark.sql.DataFrame.transform() – Available since Spark 3.0
pyspark.sql.functions.transform()

The pyspark.sql.DataFrame.transform() is used to chain the custom transformations and this function returns the new DataFrame after applying the specified transformations.

In [2]:

# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()

# Prepare Data
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

# Create DataFrame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)


root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



1.1 Syntax
Following is the syntax of the pyspark.sql.DataFrame.transform() function



##### Syntax
DataFrame.transform(func: Callable[[…], DataFrame], *args: Any, **kwargs: Any) → pyspark.sql.dataframe.DataFrame


In [3]:
# 1. PySpark DataFrame.transform()
"""The pyspark.sql.DataFrame.transform() is used to chain the custom transformations and this function returns the new DataFrame after applying the specified transformations"""

# 1.2 Create Custom Functions

# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df, reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.fee - (df.fee * df.discount) / 100)
             
# 1.3 PySpark Apply DataFrame.transform()
# PySpark transform() Usage
from functools import partial

df2 = df.transform(to_upper_str_columns) \
        .transform(partial(reduce_price,reduceBy=1000)) \
        .transform(apply_discount).show()

+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        3800.0|
|    PYTHON|4600|      10|   3600|        4140.0|
|     SCALA|4100|      15|   3100|        3485.0|
|     SCALA|4500|      15|   3500|        3825.0|
|       PHP|3000|      20|   2000|        2400.0|
+----------+----+--------+-------+--------------+



In [4]:
# custom function
df.show()
def select_columns(df):
    return df.select("CourseName","discounted_fee")

# Chain transformations
df2 = df.transform(to_upper_str_columns) \
        .transform(partial(reduce_price,reduceBy=1000)) \
        .transform(apply_discount) \
        .transform(select_columns)

+----------+----+--------+
|CourseName| fee|discount|
+----------+----+--------+
|      Java|4000|       5|
|    Python|4600|      10|
|     Scala|4100|      15|
|     Scala|4500|      15|
|       PHP|3000|      20|
+----------+----+--------+



#### 2. PySpark sql.functions.transform()

#### Syntax
pyspark.sql.functions.transform(col, f)



In [5]:

# Create DataFrame with Array
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
df.show()

# using transform() function
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")) \
  .show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+



##### PySpark apply Function to Column
How to apply a function to a column in PySpark? By using withColumn(), sql(), select() you can apply a built-in function or custom function to a column. In order to apply a custom function, first you need to create a function and register the function as a UDF. Recent versions of PySpark provide a way to use Pandas API hence, you can also use pyspark.pandas.DataFrame.apply().

In [6]:
# 1. PySpark apply Function using withColumn()

# Apply function using withColumn
from pyspark.sql.functions import upper
df.withColumn("Upper_Name", upper(df.Name)) \
  .show()


+----------------+------------------+---------------+----------------+
|            Name|        Languages1|     Languages2|      Upper_Name|
+----------------+------------------+---------------+----------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|    JAMES,,SMITH|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|   MICHAEL,ROSE,|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|ROBERT,,WILLIAMS|
+----------------+------------------+---------------+----------------+



In [7]:
# 2. Apply Function using select()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df1 = spark.createDataFrame(data=data,schema=columns)
df1.show()
# Apply function using select  
df1.select("Seqno","Name", upper(df1.Name)) \
  .show()
# 3. Apply Function using sql()
# Apply function using sql()
df1.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB") \
     .show() 

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+

+-----+------------+------------+
|Seqno|        Name| upper(Name)|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+

+-----+------------+------------+
|Seqno|        Name| upper(Name)|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



In [11]:

# 4. PySpark apply Custom UDF Function

# Create custom function
def upperCase(str):
    return str.upper()


# Convert function to udf
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
upperCaseUDF = udf(lambda x:upperCase(x),StringType()) 


# 4.3 Apply Custom UDF to Column

# Custom UDF with withColumn()
df1.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

# Custom UDF with select()  
df1.select(col("Seqno"), \
    upperCaseUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

# Custom UDF with sql()
spark.udf.register("upperCaseUDF", upperCaseUDF)
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB") \
     .show()  


+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |JOHN JONES  |
|2    |TRACEY SMITH|
|3    |AMY SANDERS |
+-----+------------+



23/03/06 08:33:08 WARN SimpleFunctionRegistry: The function uppercaseudf replaced a previously registered function.


AnalysisException: cannot resolve 'Seqno' given input columns: [tab.Languages1, tab.Languages2, tab.Name]; line 1 pos 7;
'Project ['Seqno, upperCaseUDF(Name#104) AS upperCaseUDF(Name)#337]
+- SubqueryAlias tab
   +- View (`TAB`, [Name#104,Languages1#105,Languages2#106])
      +- LogicalRDD [Name#104, Languages1#105, Languages2#106], false


In [12]:
# 5. PySpark Pandas apply()

# Imports
import pyspark.pandas as ps
import numpy as np

technologies = ({
    'Fee' :[20000,25000,30000,22000,np.NaN],
    'Discount':[1000,2500,1500,1200,3000]
               })
# Create a DataFrame
psdf = ps.DataFrame(technologies)
print(psdf)

def add(data):
   return data[0] + data[1]
   
addDF = psdf.apply(add,axis=1)
print(addDF)


  fields = [


       Fee  Discount
0  20000.0      1000
1  25000.0      2500
2  30000.0      1500
3  22000.0      1200
4      NaN      3000


  fields = [


0    21000.0
1    27500.0
2    31500.0
3    23200.0
4        NaN
dtype: float64


#### 1. PySpark apply Function using withColumn()

In [13]:
# Apply function using withColumn
from pyspark.sql.functions import upper
df.withColumn("Upper_Name", upper(df.Name)) \
  .show()

+----------------+------------------+---------------+----------------+
|            Name|        Languages1|     Languages2|      Upper_Name|
+----------------+------------------+---------------+----------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|    JAMES,,SMITH|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|   MICHAEL,ROSE,|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|ROBERT,,WILLIAMS|
+----------------+------------------+---------------+----------------+



#### PySpark map() Transformation
PySpark map (map()) is an RDD transformation that is used to apply the transformation function (lambda) on every element of RDD/DataFrame and returns a new RDD

RDD map() transformation is used to apply any complex operations like adding a column, updating a column, transforming the data e.t.c, the output of map transformations would always have the same number of records as input.

Note1: DataFrame doesn’t have map() transformation to use with DataFrame hence you need to DataFrame to RDD first.
Note2: If you have a heavy initialization use PySpark mapPartitions() transformation instead of map(), as with mapPartitions() heavy initialization executes only once for each partition instead of every record.

map() Syntax

map(f, preservesPartitioning=False)

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com").getOrCreate()

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)

In [15]:
# PySpark map() Example with RDD

rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [16]:
# PySpark map() Example with DataFrame
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
# Refering columns by index.
rdd2=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )  
df2=rdd2.toDF(["name","gender","new_salary"]   )
df2.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [17]:
# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 
# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

# Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

In [18]:
rdd2.collect()

                                                                                

[('James,Smith', 'M', 60),
 ('Anna,Rose', 'F', 82),
 ('Robert,Williams', 'M', 124)]

#### PySpark flatMap() Transformation
PySpark flatMap() is a transformation operation that flattens the RDD/DataFrame (array/map DataFrame columns) after applying the function on every element and returns a new PySpark RDD/DataFrame

flatMap() Syntax

flatMap(f, preservesPartitioning=False)

In [22]:

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)
print("\n")

rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s


Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [26]:
# Using flatMap() transformation on DataFrame
# Unfortunately, PySpark DataFame doesn’t have flatMap() transformation however, DataFrame has explode() SQL function that is used to flatten the column. Below is a complete example.


import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])

from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df3 = df.select(df.name,explode(df.properties))
df2.printSchema()
df2.show()
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+

root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
|   name| key|value|
+-------+----+-----+
|  James| eye|brown|
|  James|hair|black|
|Michael| eye| null|
|Michael|hair|brown|
| Robert| eye|     |
| Robert|hair|  red|
+-------+----+-----+



#### PySpark foreach() Usage with Examples
1.1 foreach() Syntax
Following is the syntax of the foreach() function


# Syntax
DataFrame.foreach(f)

In [29]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()

# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

# foreach() Example
def f(df):
    print(df.Seqno)
df.foreach(f)

# foreach() with accumulator Example
accum=spark.sparkContext.accumulator(0)
df.foreach(lambda x:accum.add(int(x.Seqno)))
print(accum.value) #Accessed by driver

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



1
3
2


6


#####  2. PySpark RDD foreach() Usage

2.1 Syntax

### Syntax
RDD.foreach(f: Callable[[T], None]) → None

In [31]:
# <!-- 2.2 RDD foreach() Example -->

# <!-- # foreach() with RDD example -->
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value) #Accessed by driver

15


#####  PySpark Random Sample 
##### 1. PySpark SQL sample() Usage & Examples
Below is the syntax of the sample() function.

sample(withReplacement, fraction, seed=None)

In [33]:
# 1.1 Using fraction to get a random sample in PySpark

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

df=spark.range(100)
print(df.sample(0.06).collect())

[Row(id=8), Row(id=21), Row(id=41), Row(id=54), Row(id=58), Row(id=62), Row(id=64), Row(id=70), Row(id=73), Row(id=79), Row(id=96), Row(id=99)]


In [34]:
# 1.2 Using seed to reproduce the same Samples in PySpark


print(df.sample(0.1,123).collect())

print(df.sample(0.1,123).collect())

print(df.sample(0.1,456).collect())


[Row(id=5), Row(id=10), Row(id=17), Row(id=36), Row(id=42), Row(id=52), Row(id=56), Row(id=59), Row(id=62), Row(id=65), Row(id=88), Row(id=92), Row(id=96), Row(id=97)]
[Row(id=5), Row(id=10), Row(id=17), Row(id=36), Row(id=42), Row(id=52), Row(id=56), Row(id=59), Row(id=62), Row(id=65), Row(id=88), Row(id=92), Row(id=96), Row(id=97)]
[Row(id=10), Row(id=27), Row(id=30), Row(id=54), Row(id=67), Row(id=81), Row(id=86)]


In [38]:
# 1.3 Sample withReplacement (May contain duplicates)
print(df.sample(True,0.3,123).collect()) # with Duplicates
print("--------------------")
print(df.sample(0.3,123).collect()) # No duplicates

[Row(id=0), Row(id=2), Row(id=4), Row(id=6), Row(id=6), Row(id=11), Row(id=14), Row(id=15), Row(id=16), Row(id=18), Row(id=18), Row(id=25), Row(id=27), Row(id=33), Row(id=39), Row(id=41), Row(id=45), Row(id=47), Row(id=47), Row(id=49), Row(id=51), Row(id=52), Row(id=52), Row(id=57), Row(id=57), Row(id=58), Row(id=58), Row(id=66), Row(id=73), Row(id=73), Row(id=74), Row(id=78), Row(id=78), Row(id=80), Row(id=82), Row(id=84), Row(id=87), Row(id=87), Row(id=94), Row(id=95), Row(id=96), Row(id=99)]
--------------------
[Row(id=0), Row(id=1), Row(id=3), Row(id=5), Row(id=6), Row(id=8), Row(id=10), Row(id=16), Row(id=17), Row(id=22), Row(id=23), Row(id=29), Row(id=36), Row(id=40), Row(id=42), Row(id=49), Row(id=52), Row(id=54), Row(id=56), Row(id=59), Row(id=61), Row(id=62), Row(id=65), Row(id=69), Row(id=73), Row(id=75), Row(id=81), Row(id=82), Row(id=88), Row(id=90), Row(id=92), Row(id=96), Row(id=97), Row(id=98)]


In [41]:
# 1.4 Stratified sampling in PySpark

# sampleBy() Syntax
# sampleBy(col, fractions, seed=None)
# df.show()
df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

[Row(key=1), Row(key=0), Row(key=0), Row(key=0), Row(key=1), Row(key=0), Row(key=1), Row(key=0), Row(key=0), Row(key=1), Row(key=0)]


In [42]:
# 2. PySpark RDD Sample
# sample() of RDD returns a new RDD by selecting random sampling. Below is a syntax.
# sample(self, withReplacement, fraction, seed=None)
rdd = spark.sparkContext.range(0,100)
print(rdd.sample(False,0.1,0).collect())
print(rdd.sample(True,0.3,123).collect())


[10, 13, 32, 39, 46, 53, 65, 70, 95]
[0, 5, 23, 29, 32, 32, 34, 36, 46, 54, 63, 67, 71, 74, 85, 94, 95, 96, 99]


In [43]:
# Syntax of RDD takeSample() 
# takeSample(self, withReplacement, num, seed=None) 

print(rdd.takeSample(False,10,0))

print(rdd.takeSample(True,30,123))

[16, 36, 59, 56, 23, 9, 28, 98, 75, 51]
[96, 32, 44, 72, 94, 49, 22, 57, 33, 14, 17, 87, 56, 36, 46, 29, 4, 38, 1, 64, 25, 54, 53, 95, 23, 32, 85, 79, 25, 48]


PySpark fillna() & fill() – Replace NULL/None Values]
In PySpark, DataFrame.fillna() or DataFrameNaFunctions.fill() is used to replace NULL/None values on all or selected multiple DataFrame columns with either zero(0), empty string, space, or any constant literal values.

PySpark fillna() & fill() Syntax
PySpark provides DataFrame.fillna() and DataFrameNaFunctions.fill() to replace NULL/None values. These two are aliases of each other and returns the same results.


fillna(value, subset=None)
fill(value, subset=None)

In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

filePath="small_zipcode.csv"
df = spark.read.options(header='true', inferSchema='true') \
          .csv(filePath)

df.printSchema()
df.show(truncate=False)

root
 |-- id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)

+---+-------+--------+-------------------+-----+----------+
|id |zipcode|type    |city               |state|population|
+---+-------+--------+-------------------+-----+----------+
|1  |704    |STANDARD|null               |PR   |30100     |
|2  |704    |null    |PASEO COSTA DEL SUR|PR   |null      |
|3  |709    |null    |BDA SAN LUIS       |PR   |3700      |
|4  |76166  |UNIQUE  |CINGULAR WIRELESS  |TX   |84000     |
|5  |76177  |STANDARD|null               |TX   |null      |
+---+-------+--------+-------------------+-----+----------+



In [48]:
# PySpark Replace NULL/None Values with Zero (0)
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()

#Replace 0 for null on only population column 
df.na.fill(value=0,subset=["population"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               nu

In [51]:
# PySpark Replace Null/None Value with Empty String
df.na.fill("").show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|                   |   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|                   |   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [52]:
df.na.fill("unknown",["city"]) \
    .na.fill("",["type"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [53]:
# Alternatively you can also write the above statement as


df.na.fill({"city": "unknown", "type": ""}) \
    .show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



PySpark Pivot and Unpivot DataFrame

In [54]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



+-------+------+-------+
Pivot PySpark DataFrame
PySpark SQL provides pivot() function to rotate the data from one column into multiple columns. It is an aggregation where one of the grouping columns values is transposed into individual columns with distinct data. To get the total amount exported to each country of each product, will do group by Product, pivot by Country, and the sum of Amount.

In [55]:
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



In [57]:
# version 2.0 on-wards performance has been improved on Pivot, however, if you are using the lower version; note that pivot is a very expensive operation hence, it is recommended to provide column data (if known) as an argument to function as shown below.

countries = ["USA","China","Canada","Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate=False)

# Another approach is to do two-phase aggregation. PySpark 2.0 uses this implementation in order to improve the performance Spark-13749

pivotDF = df.groupBy("Product","Country") \
      .sum("Amount") \
      .groupBy("Product") \
      .pivot("Country") \
      .sum("sum(Amount)") 
pivotDF.show(truncate=False)

+-------+----+-----+------+------+
|Product|USA |China|Canada|Mexico|
+-------+----+-----+------+------+
|Orange |4000|4000 |null  |null  |
|Beans  |1600|1500 |null  |2000  |
|Banana |1000|400  |2000  |null  |
|Carrots|1500|1200 |2000  |null  |
+-------+----+-----+------+------+

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



Unpivot PySpark DataFrame

Unpivot is a reverse operation, we can achieve by rotating column values into rows values. PySpark SQL doesn’t have unpivot function hence will use the stack() function. Below code converts column countries to row

In [60]:
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)

+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+



In [64]:
expr(unpivotExpr)

Column<'multialias(stack(3, Canada, Canada, China, China, Mexico, Mexico))'>

PySpark partitionBy() – Write to Disk Example
PySpark partitionBy() is a function of pyspark.sql.DataFrameWriter class which is used to partition the large dataset (DataFrame) into smaller files based on one or multiple columns while writing to disk,

Partition in memory:     You can partition or repartition the DataFrame by calling repartition() or coalesce() transformations.

Partition on disk:   While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns using partitionBy() of pyspark.sql.DataFrameWriter. This is similar to Hives partitions scheme.

Syntax: 

Syntax: partitionBy(self, *cols)


In [72]:
df=spark.read.option("header",True) \
        .csv("simple-zipcodes.csv")
df.printSchema()


root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- State: string (nullable = true)



In [74]:
df.write.option("header",True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("/tmp/zipcodes-state")

In [75]:
# 5. PySpark partitionBy() Multiple Columns
#partitionBy() multiple columns
df.write.option("header",True) \
        .partitionBy("state","city") \
        .mode("overwrite") \
        .csv("/tmp/zipcodes-state")

In [83]:
# 6. Using repartition() and partitionBy() together
# For each partition column, if you wanted to further divide into several partitions, use repartition() and partitionBy() together as explained in the below example.

#Use repartition() and partitionBy() together

dfRepart.repartition(2) \
.write.option("header",True) \
.partitionBy("state") \
.mode("overwrite") \
.csv("/tmp/zipcodes-state-more")


In [87]:
# 7. Data Skew – Control Number of Records per Partition File
#partitionBy() control number of partitions
df.write.option("header",True) \
        .option("maxRecordsPerFile", 2) \
        .partitionBy("state","city") \
        .mode("overwrite") \
        .csv("/tmp/zipcodes-state")

In [88]:
# 8. Read a Specific Partition
dfSinglePart=spark.read.option("header",True) \
            .csv("/tmp/zipcodes-state/state=AL/city=SPRINGVILLE")
dfSinglePart.printSchema()
dfSinglePart.show()


root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Zipcode: string (nullable = true)

+------------+-------+-------+
|RecordNumber|Country|Zipcode|
+------------+-------+-------+
|       54355|     US|  35146|
+------------+-------+-------+



In [89]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
windowDept = Window.partitionBy("Country").orderBy(col("Zipcode").desc())
df.withColumn("row",row_number().over(windowDept)) \
  .filter(col("row") == 1).drop("row") \
  .show()

+------------+-------+----+-------+-----+
|RecordNumber|Country|City|Zipcode|State|
+------------+-------+----+-------+-----+
|       39828|     US|MESA|  85210|   AZ|
+------------+-------+----+-------+-----+



#### PySpark MapType (Dict) Usage with Examples

PySpark MapType is used to represent map key-value pair similar to python Dictionary (Dict), it extends DataType class which is a superclass of all types in PySpark and takes two mandatory arguments keyType and valueType of type DataType and one optional boolean argument valueContainsNull. keyType and valueType can be any type that extends the DataType class. for e.g StringType, IntegerType, ArrayType, MapType, StructType (struct) e.t.c.

In [90]:
# <!-- 1. Create PySpark MapType  -->
from pyspark.sql.types import StringType, MapType
mapCol = MapType(StringType(),StringType(),False)

# 2. Create MapType From StructType

from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])

In [91]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



In [104]:
# 3. Access PySpark MapType Elements
df3=df.rdd.map(lambda x: \
    (x.name,x.properties["hair"],x.properties["eye"])) \
    .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- hair: string (nullable = true)
 |-- eye: string (nullable = true)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [105]:

df.withColumn("hair",df.properties.getItem("hair")) \
  .withColumn("eye",df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

df.withColumn("hair",df.properties["hair"]) \
  .withColumn("eye",df.properties["eye"]) \
  .drop("properties") \
  .show()

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [106]:
# 4. Functions
# 4.1 – explode
from pyspark.sql.functions import explode
df.select(df.name,explode(df.properties)).show()

# 4.2 map_keys() – Get All Map Keys
from pyspark.sql.functions import map_keys
df.select(df.name,map_keys(df.properties)).show()

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|black|
|    Robert|hair|  red|
|Washington| eye| grey|
|Washington|hair| grey|
| Jefferson| eye|     |
| Jefferson|hair|brown|
+----------+----+-----+

+----------+--------------------+
|      name|map_keys(properties)|
+----------+--------------------+
|     James|         [eye, hair]|
|   Michael|         [eye, hair]|
|    Robert|         [eye, hair]|
|Washington|         [eye, hair]|
| Jefferson|         [eye, hair]|
+----------+--------------------+



In [117]:
# In case if you wanted to get all map keys as Python List. WARNING: This runs very slow.

from pyspark.sql.functions import explode,map_keys
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keysList = keysDF.rdd.map(lambda x:x[0]).collect()
print(keysList)

# 4.3 map_values() – Get All map Values

from pyspark.sql.functions import map_values
df.select(df.name,map_values(df.properties)).show()

['eye', 'hair']
+----------+----------------------+
|      name|map_values(properties)|
+----------+----------------------+
|     James|        [brown, black]|
|   Michael|         [null, brown]|
|    Robert|          [black, red]|
|Washington|          [grey, grey]|
| Jefferson|             [, brown]|
+----------+----------------------+

