In [0]:
# What is PySpark Accumulator?
# Accumulators are write-only and initialize once variables where only tasks that are running on workers are allowed to update and updates from the #workers get propagated automatically to the driver program. But, only the driver program is allowed to access the Accumulator variable using the #value property.

# an accumulator is a shared variable that can be used for aggregating values across multiple tasks in a parallel and fault-tolerant manner.

In [0]:
# Create an accumulator with an initial value of 0
accum = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
rdd.foreach(lambda x: accum.add(x))
print(accum.value)


21


In [0]:
#When foreach() applied on PySpark DataFrame, it executes a function specified in for each element of DataFrame.
#In PySpark, the foreach action is used to apply a specified function to each element of an RDD (Resilient Distributed Dataset) or DataFrame. It is often used for performing side-effect operations, such as saving data to an external system, updating external variables, or any other operation that doesn't return a new RDD or DataFrame.

In [0]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]
df = spark.createDataFrame(data,columns)
df.show()


+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [0]:
def f(x):
    print(x*2)


In [0]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Define a function to be applied using foreach
def print_square(x):
    print(x * x)

# Use foreach to apply the function to each element in the RDD
rdd.foreach(print_square)

In [0]:
data = [("2022-01-01 12:30:00",), ("2022-02-15 18:45:30",), ("2022-03-20 09:15:00",)]
columns = ["timestampCOL"]
df = spark.createDataFrame(data, columns)

# Use to_timestamp to convert the timestamp string to a timestamp type
df_transformed = df.withColumn("timestamp", to_timestamp("timestampCOL", "yyyy-MM-dd HH:mm:ss"))

# Show the original and transformed DataFrames
df.show(truncate=False)
df_transformed.show(truncate=False)


+-------------------+
|timestampCOL       |
+-------------------+
|2022-01-01 12:30:00|
|2022-02-15 18:45:30|
|2022-03-20 09:15:00|
+-------------------+

+-------------------+-------------------+
|timestampCOL       |timestamp          |
+-------------------+-------------------+
|2022-01-01 12:30:00|2022-01-01 12:30:00|
|2022-02-15 18:45:30|2022-02-15 18:45:30|
|2022-03-20 09:15:00|2022-03-20 09:15:00|
+-------------------+-------------------+



In [0]:
#PySpark fillna() & fill() – Replace NULL/None Values
#fillna(value, subset=None)
#fill(value, subset=None)
df = spark.read.csv('dbfs:/FileStore/small_zipcode.csv',header=True,inferSchema=True)
df.show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [0]:
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+



In [0]:
#Replace 0 for null on only population column 
df.na.fill(value=0,["population"]).show()

[0;36m  File [0;32m<command-2259084633578734>:2[0;36m[0m
[0;31m    df.na.fill(value=0,["population"]).show()[0m
[0m                                     ^[0m
[0;31mSyntaxError[0m[0;31m:[0m positional argument follows keyword argument


In [0]:

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)



In [0]:
#PySpark Replace Null/None Value with Empty String
df.na.fill("").show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|                   |   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|                   |   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [0]:
#let’s replace NULLs on specific columns
df.na.fill("unkown",['city']).show()


+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|             unkown|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|             unkown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [0]:
df.na.fill({"city":"unkown","type":"NoType"}).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|             unkown|   PR|     30100|
|  2|    704|  NoType|PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|  NoType|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|             unkown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [0]:
df.fillna(value=0).show()
df.fillna(value=0,subset=["population"]).show()

df.fillna(value="").show()
df.fillna("unknown",["city"]).fillna("",["type"]).show()

df.fillna({"city": "unknown", "type": ""}).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               nu

In [0]:
print(df)
df.display()

DataFrame[id: int, zipcode: int, type: string, city: string, state: string, population: int]


id,zipcode,type,city,state,population
1,704,STANDARD,,PR,30100.0
2,704,,PASEO COSTA DEL SUR,PR,
3,709,,BDA SAN LUIS,PR,3700.0
4,76166,UNIQUE,CINGULAR WIRELESS,TX,84000.0
5,76177,STANDARD,,TX,


In [0]:
df.describe()

Out[62]: DataFrame[summary: string, id: string, zipcode: string, type: string, city: string, state: string, population: string]

In [0]:
#PySpark Pivot and Unpivot DataFrame
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data,columns)
df.show()
df.printSchema()

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
| Banana|  1000|    USA|
|Carrots|  1500|    USA|
|  Beans|  1600|    USA|
| Orange|  2000|    USA|
| Orange|  2000|    USA|
| Banana|   400|  China|
|Carrots|  1200|  China|
|  Beans|  1500|  China|
| Orange|  4000|  China|
| Banana|  2000| Canada|
|Carrots|  2000| Canada|
|  Beans|  2000| Mexico|
+-------+------+-------+

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
df.groupBy('Country').pivot('Product').sum('Amount').show()

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|  China|   400| 1500|   1200|  4000|
|    USA|  1000| 1600|   1500|  4000|
| Mexico|  null| 2000|   null|  null|
| Canada|  2000| null|   2000|  null|
+-------+------+-----+-------+------+

