In [None]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("RDD Operations").setMaster("local[*]")
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd1 =sc.parallelize([1, 2, 3])
rdd2 =sc.parallelize([4, 5, 6])
union_rdd = rdd1.union(rdd2)
print("Original Rdd", rdd.collect())
print("Union Rdd", union_rdd.collect())
count = rdd.count()
print("Number of elements in Rdd", count)

Original Rdd [1, 2, 3, 4, 5]
Union Rdd [1, 2, 3, 4, 5, 6]
Number of elements in Rdd 5


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=88c8b3c7a40f9e5e9efd9f3e960d858e367cca3146907ef431ad753d4ca3d1c1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
sum = rdd.reduce(lambda x, y: x + y)
print("Sum of elements in Rdd", sum)

Sum of elements in Rdd 15


In [None]:
first3 = rdd.take(3)
print("First 3 elements in Rdd", first3)

First 3 elements in Rdd [1, 2, 3]


In [None]:
first_element = rdd.first()
print("First element in Rdd", first_element)

First element in Rdd 1


In [None]:
from pyspark import SparkContext, SparkConf
data = [("b", 2), ("a",1), ("a", 3), ("c", 4), ("b", 5)]
pair_rdd = sc.parallelize(data)
sorted_rdd_asc = pair_rdd.sortByKey()
sorted_rdd_desc = pair_rdd.sortByKey(ascending=False)
print("Sorted Rdd in ascending order", sorted_rdd_asc.collect())
print("Sorted Rdd in descending order", sorted_rdd_desc.collect())
for key, value in sorted_rdd_asc.collect():
    print(key, value)


Sorted Rdd in ascending order [('a', 1), ('a', 3), ('b', 2), ('b', 5), ('c', 4)]
Sorted Rdd in descending order [('c', 4), ('b', 2), ('b', 5), ('a', 1), ('a', 3)]
a 1
a 3
b 2
b 5
c 4


In [None]:
data = [("b", 2), ("a",1), ("a", 3), ("c", 4), ("b", 5)]
pair_rdd = sc.parallelize(data)
#mapping
mapped_rdd = pair_rdd.mapValues(lambda x: x*10)
print("Mapped Rdd", mapped_rdd.collect())
for key, value in mapped_rdd.collect():
    print(key, value)

Mapped Rdd [('b', 20), ('a', 10), ('a', 30), ('c', 40), ('b', 50)]
b 20
a 10
a 30
c 40
b 50


In [None]:
data = [("b", 2), ("a",1), ("a", 3), ("c", 4), ("b", 5)]
pair_rdd = sc.parallelize(data)
#get keys
keys_rdd= pair_rdd.keys()
print("Keys:")
for key in keys_rdd.collect():
    print(key)

values_rdd = pair_rdd.values()
print("Values:")
for value in values_rdd.collect():
    print(value)

Keys:
b
a
a
c
b
Values:
2
1
3
4
5


In [None]:
data = [1, 2, 3,4, 5]
rdd = sc.parallelize(data)
rdd_repartitioned = rdd.repartition(3)
print("Rdd after repartitioning", rdd_repartitioned.getNumPartitions())

Rdd after repartitioning 3


In [None]:
data = [1, 2, 3,4, 5]
rdd = sc.parallelize(data, 5)
print("Number of partitions before coalesce", rdd.getNumPartitions())
rdd_coalesced = rdd.coalesce(3)
print("Number of partitions after coalesce", rdd_coalesced.getNumPartitions())


Number of partitions before coalesce 5
Number of partitions after coalesce 3


In [None]:
sc.stop()

In [None]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("RDD Operations").setMaster("local[*]")
sc = SparkContext(conf=conf)

transactions = [
    (1, 101),
    (1, 102),
    (3, 101),
    (4, 103),
    (5, 102),
]

In [None]:

transactions_rdd = sc.parallelize(transactions)

In [None]:
products_mapping = {
    101: "electronics",
    102: "clothing",
    103: "groceries",
}

In [None]:
broadcast_products_mapping = sc.broadcast(products_mapping)

In [None]:
def map_product(transactions):
    product_id, amount = transactions
    product_category = broadcast_products_mapping.value.get(product_id, products_mapping)
    return (product_category, amount)

In [None]:
mapped_transactions_rdd = transactions_rdd.map(map_product)

In [None]:
print(mapped_transactions_rdd.collect())
sc.stop()

AttributeError: 'NoneType' object has no attribute 'setCallSite'

In [None]:
numbers= [1, 2, 3, 4, 5, 6, 7,8, 9, 10, 12]

In [None]:
numbers_rdd = sc.parallelize(numbers)
even_count = sc.accumulator(0)
odd_count = sc.accumulator(0)
def count_even_odd(number):
    if number % 2 == 0:
        even_count.add(1)
    else:
        odd_count.add(1)
    return number
numbers_rdd.foreach(count_even_odd)
print("Even count:", even_count.value)
print("Odd count:", odd_count.value)
sc.stop()

Even count: 6
Odd count: 5


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=62278015075791cfccc57ba9a80607c5ce3a4612acb05bd5546241fe6e1b0112
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

# Read CSV File
df = spark.read.csv("/content/Book1.xlsx")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [None]:
df = spark.read.format("csv").option("header", "true").load("/content/Book1.xlsx")
df.printSchema()

root
 |-- PK  \b   ! b�h^  �   \b[Content_Types].xml �(�                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 ���N�0E�H�C�-Jܲ@\b5��*Q>�ēƪc[�ii����B�j7���{2��h�nm���ƻR\f����U^7/���%��rZY�@1__�f� �q��R4D�AJ�h>����V�ƹ\f�Z�9����NV�8ʩ����ji){^��-I�"{�v^�P!XS)bR�r��K�s(�3�`c�0��������7M4�����ZƐk+�|\|z�(���P��6h_-[�@�!��� Pk���2n�}�?�L��� ��%���d����dN"m: string (nullable = true)
 |-- �ǞDO97*�~��ɸ8�O�c|n��\a�E������B��!$}�����;{���[����2�  �� PK  \b   ! �U0#�   L  \v \b_rels/.rels �(�

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7ef22a725ed3139aa8cb3a4d94c672a8a3004c7e38a17a549a4d3ba8ca55c3cb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession


In [None]:
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

df = spark.read.csv("/content/Electric_Vehicle_Population_Data.csv")
df1 = spark.read.csv("/content/iris.data.csv")

In [None]:
df.printSchema()
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True)


df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/content/Electric_Vehicle_Population_Data.csv")
df_with_schema.printSchema()

df2 = spark.read.csv("/content/iris.data.csv")
df2.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [None]:
df.write.option("header",True) \
 .csv("/tmp/spark_output/zipcodes")

In [None]:
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)
rdd2 = spark.sparkContext.parallelize([])
print(rdd2)


EmptyRDD[33] at emptyRDD at NativeMethodAccessorImpl.java:0
ParallelCollectionRDD[34] at readRDDFromFile at PythonRDD.scala:289


In [None]:
#creating empty dataframe with schema
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

In [None]:
#now create empty Rdd  created above and pass it
df = spark.createDataFrame(data=emptyRDD,schema=schema)
df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+



In [None]:
df1 = emptyRDD.toDF(schema)
df1.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



In [None]:
#here we create it manually with schema and without RDD
df2 = spark.createDataFrame([], schema)
df2.printSchema()
df2.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+



In [None]:
df3 = spark.createDataFrame([], StructType([]))
df3.printSchema()
df3.show()

root

++
||
++
++



In [None]:
dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M', 4000),
  (('Maria','Anne','Jones'),'1967-12-01','F', 4000),
  (('Jen','Mary','Brown'),'1980-02-17','F', -1)]

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.createDataFrame(data=dataDF, schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+----------+------+------+
|name                |dob       |gender|salary|
+--------------------+----------+------+------+
|{James, , Smith}    |1991-04-01|M     |3000  |
|{Michael, Rose, }   |2000-05-19|M     |4000  |
|{Robert, , Williams}|1978-09-05|M     |4000  |
|{Maria, Anne, Jones}|1967-12-01|F     |4000  |
|{Jen, Mary, Brown}  |1980-02-17|F     |-1    |
+--------------------+----------+------+------+



In [None]:
#direct printschema
df.withColumnRenamed("dob","DateOfBirth").printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
#to change multiple column name
df2 = df.withColumnRenamed("dob","DateOfBirth").withColumnRenamed("salary","salary_amount")
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary_amount: integer (nullable = true)

+--------------------+-----------+------+-------------+
|name                |DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|{James, , Smith}    |1991-04-01 |M     |3000         |
|{Michael, Rose, }   |2000-05-19 |M     |4000         |
|{Robert, , Williams}|1978-09-05 |M     |4000         |
|{Maria, Anne, Jones}|1967-12-01 |F     |4000         |
|{Jen, Mary, Brown}  |1980-02-17 |F     |-1           |
+--------------------+-----------+------+-------------+



In [None]:
from pyspark.sql.functions import *
df.select(col("name.firstname").alias("fname"), \
          col("name.middlename").alias("midname"), \
          col("name.lastname").alias("lname"), \
          col("dob").alias("date_of_birth"), \
          col("gender").alias("gender"), \
          col("salary").alias("salary")).printSchema()

root
 |-- fname: string (nullable = true)
 |-- midname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6c8fe76474f833b42c79ab92debde607b30f9174d4d1de6b68d1a80f717a1cde
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
#rewritten whole code again
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M', 4000),
  (('Maria','Anne','Jones'),'1967-12-01','F', 4000),
  (('Jen','Mary','Brown'),'1980-02-17','F', -1)]
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df = spark.createDataFrame(data=dataDF, schema=schema)
df.printSchema()
df.show(truncate=False)
#new code
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*newColumns).printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+----------+------+------+
|name                |dob       |gender|salary|
+--------------------+----------+------+------+
|{James, , Smith}    |1991-04-01|M     |3000  |
|{Michael, Rose, }   |2000-05-19|M     |4000  |
|{Robert, , Williams}|1978-09-05|M     |4000  |
|{Maria, Anne, Jones}|1967-12-01|F     |4000  |
|{Jen, Mary, Brown}  |1980-02-17|F     |-1    |
+--------------------+----------+------+------+

root
 |-- newCol1: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- newCol2: string (nullable = true)
 |-- newCol3: string (nullable = true)
 |-- newCol4: i

In [None]:
dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M', 4000),
  (('Maria','Anne','Jones'),'1967-12-01','F', 4000),
  (('Jen','Mary','Brown'),'1980-02-17','F', 5000)]
columns = ["name", "dob", "gender", "salary"]
df = spark.createDataFrame(data=dataDF, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+--------------------+----------+------+------+
|name                |dob       |gender|salary|
+--------------------+----------+------+------+
|{James, , Smith}    |1991-04-01|M     |3000  |
|{Michael, Rose, }   |2000-05-19|M     |4000  |
|{Robert, , Williams}|1978-09-05|M     |4000  |
|{Maria, Anne, Jones}|1967-12-01|F     |4000  |
|{Jen, Mary, Brown}  |1980-02-17|F     |5000  |
+--------------------+----------+------+------+



In [None]:
from pyspark.sql.functions import col
df.withColumn("new_salary", col("salary").cast("integer")).show()

+--------------------+----------+------+------+----------+
|                name|       dob|gender|salary|new_salary|
+--------------------+----------+------+------+----------+
|    {James, , Smith}|1991-04-01|     M|  3000|      3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|      4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|      4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|      4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|  5000|      5000|
+--------------------+----------+------+------+----------+



In [None]:
df.withColumn("new_salary", col("salary")*100).show()

+--------------------+----------+------+------+----------+
|                name|       dob|gender|salary|new_salary|
+--------------------+----------+------+------+----------+
|    {James, , Smith}|1991-04-01|     M|  3000|    300000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|    400000|
|{Robert, , Williams}|1978-09-05|     M|  4000|    400000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|    400000|
|  {Jen, Mary, Brown}|1980-02-17|     F|  5000|    500000|
+--------------------+----------+------+------+----------+



In [None]:
df.printSchema()


root
 |-- name: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
df.withColumn("Copied Column", col("salary")*-1).show()

+--------------------+----------+------+------+-------------+
|                name|       dob|gender|salary|Copied Column|
+--------------------+----------+------+------+-------------+
|    {James, , Smith}|1991-04-01|     M|  3000|        -3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|        -4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|        -4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|        -4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|  5000|        -5000|
+--------------------+----------+------+------+-------------+



In [None]:
from pyspark.sql.functions import col, lit
df.withColumn("Country", lit("USA")).show()
df.withColumn("Country", lit("USA")) \
  .withColumn("Years of working", lit("10yrs")) \
  .show()

+--------------------+----------+------+------+-------+
|                name|       dob|gender|salary|Country|
+--------------------+----------+------+------+-------+
|    {James, , Smith}|1991-04-01|     M|  3000|    USA|
|   {Michael, Rose, }|2000-05-19|     M|  4000|    USA|
|{Robert, , Williams}|1978-09-05|     M|  4000|    USA|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|    USA|
|  {Jen, Mary, Brown}|1980-02-17|     F|  5000|    USA|
+--------------------+----------+------+------+-------+

+--------------------+----------+------+------+-------+----------------+
|                name|       dob|gender|salary|Country|Years of working|
+--------------------+----------+------+------+-------+----------------+
|    {James, , Smith}|1991-04-01|     M|  3000|    USA|           10yrs|
|   {Michael, Rose, }|2000-05-19|     M|  4000|    USA|           10yrs|
|{Robert, , Williams}|1978-09-05|     M|  4000|    USA|           10yrs|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|    USA|  

In [None]:
df.withColumnRenamed("gender","sex").show()

+--------------------+----------+---+------+
|                name|       dob|sex|salary|
+--------------------+----------+---+------+
|    {James, , Smith}|1991-04-01|  M|  3000|
|   {Michael, Rose, }|2000-05-19|  M|  4000|
|{Robert, , Williams}|1978-09-05|  M|  4000|
|{Maria, Anne, Jones}|1967-12-01|  F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|  F|  5000|
+--------------------+----------+---+------+



In [None]:
df.drop("dob").show()

+--------------------+------+------+
|                name|gender|salary|
+--------------------+------+------+
|    {James, , Smith}|     M|  3000|
|   {Michael, Rose, }|     M|  4000|
|{Robert, , Williams}|     M|  4000|
|{Maria, Anne, Jones}|     F|  4000|
|  {Jen, Mary, Brown}|     F|  5000|
+--------------------+------+------+



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
data=[("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000, "USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [None]:
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |NULL  |4000 |NULL  |4000|
|Beans  |NULL  |1500 |2000  |1600|
|Banana |2000  |400  |NULL  |1000|
|Carrots|2000  |1200 |NULL  |1500|
+-------+------+-----+------+----+



In [None]:
countries = ["USA","China","Canada","Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate=False)



+-------+----+-----+------+------+
|Product|USA |China|Canada|Mexico|
+-------+----+-----+------+------+
|Orange |4000|4000 |NULL  |NULL  |
|Beans  |1600|1500 |NULL  |2000  |
|Banana |1000|400  |2000  |NULL  |
|Carrots|1500|1200 |2000  |NULL  |
+-------+----+-----+------+------+



In [None]:
pivotDF = df.groupBy("Product","Country") \
      .sum("Amount") \
      .groupBy("Product") \
      .pivot("Country") \
      .sum("sum(Amount)")
pivotDF.show(truncate=False)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |NULL  |4000 |NULL  |4000|
|Beans  |NULL  |1500 |2000  |1600|
|Banana |2000  |400  |NULL  |1000|
|Carrots|2000  |1200 |NULL  |1500|
+-------+------+-----+------+----+



In [None]:
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
unPivotDF.show()

+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+

+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
| Orange|  China| 4000|
|  Beans|  China| 1500|
|  Beans| Mexico| 2000|
| Banana| Canada| 2000|
| Banana|  China|  400|
|Carrots| Canada| 2000|
|Carrots|  China| 1200|
+-------+-------+-----+



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
# Check if 'spark' is already defined and rename if necessary
if 'spark' in locals() and isinstance(spark, SparkSession):
    spark_session = spark  # Rename existing SparkSession
else:
    spark_session = SparkSession.builder.master("local[1]") \
        .appName("SparkByExamples.com") \
        .getOrCreate()  # Create new SparkSession

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
                      StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])

df = spark_session.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=328b00014a6ed681743a11c8a470ed4d8396d400ba3dd9ba042746446230409b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


In [None]:
if 'spark' in locals() and isinstance(spark, SparkSession):
    spark_session = spark  # Rename existing SparkSession
else:
    spark_session = SparkSession.builder.master("local[1]") \
        .appName("SparkByExamples.com") \
        .getOrCreate()  # Create new SparkSession
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
                      StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df2 = spark_session.createDataFrame(data=data,schema=schema) # Use 'spark_session' instead of 'spark'
df2.printSchema()
df2.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [None]:
from pyspark.sql.functions import col,struct, when
updatedDF = df2.withColumn("OtherInfo",
   struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+---------+----------+--------+------------------------+
|firstname|middlename|lastname|OtherInfo               |
+---------+----------+--------+------------------------+
|James    |          |Smith   |{36636, M, 3000, Medium}|
|Michael  |Rose      |        |{40288, M, 4000, High}  |
|Robert   |          |Williams|{42114, M, 4000, High}  |
|Maria    |Anne      |Jones   |{39192, F, 4000, High}  |
|Jen      |Mary      |Brown   |{, F, -1, Low}          |
+---------+----------+--------+------------------------+



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data=[("James","Sales",3000),("Michael","Sales",4600),
      ("Robert","Sales",4100),("Maria","Finance",3000),
      ("James","Sales",3000),("Scott","Finance",3300),
      ("Jen","Finance",3900),("Jeff","Marketing",3000),
      ("Kumar","Marketing",2000),("Saif","Sales",4100)]

df = spark.createDataFrame(data=data, schema = ["employee_name","department","salary"])

df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w2  = Window.partitionBy("department").orderBy(col("salary"))
df.withColumn("row",row_number().over(w2))\
    .filter(col("row")==1).drop("row") \
    .show()



+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        Maria|   Finance|  3000|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
+-------------+----------+------+



In [None]:
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name, Department, Salary from" +
          "(select * ,row_number() OVER (PARTITION BY department ORDER BY salary)as rn "+
          "From EMP)tmp where rn = 1").show()

+-------------+----------+------+
|employee_name|Department|Salary|
+-------------+----------+------+
|        Maria|   Finance|  3000|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
+-------------+----------+------+



In [None]:
w3 = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("row", row_number().over(w3)) \
    .filter(col("row") == 1).drop("row") \
    .show()


+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|      Michael|     Sales|  4600|
+-------------+----------+------+



In [None]:
from pyspark.sql.functions import col, row_number,avg,sum,min,max,dense_rank

w4 = Window.partitionBy("department")
df.withColumn("row",row_number().over(w3)) \
  .withColumn("avg", avg(col("salary")).over(w4)) \
  .withColumn("sum", sum(col("salary")).over(w4)) \
  .withColumn("min", min(col("salary")).over(w4)) \
  .withColumn("max", max(col("salary")).over(w4)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



In [None]:
import pyspark
from pyspark.sql import SparkSession
simpleData = [("James","Sales","NY",90000,34,100000), \
    ("Michael","Sales","NY",86000,56,200000), \
    ("Robert","Sales","CA",81000,30, 230000), \
    ("Maria","Finance","CA",90000,24,230000), \
    ("Raman","Finance","CA",99000,24,240000)]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
+-------------+----------+-----+------+---+------+



In [None]:
df.sort("department","state").show(truncate=False)
df.sort(col("department"),col("state")).show(truncate=False)

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+



In [None]:
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|sta

In [None]:
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)


+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|sta

In [None]:
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df.sort(col("department").asc(),col("state").desc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").desc()).show(truncate=False)

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
|Robert       |Sales     |CA   |81000 |30 |230000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
|Robert       |Sales     |CA   |81000 |30 |230000|
+-------------+----------+-----+------+---+------+

+-------------+----------+-----+------+---+------+
|employee_name|department|sta

In [None]:
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP").show(truncate=False)

+-------------+----------+-----+------+---+------+
|employee_name|department|state|salary|age|bonus |
+-------------+----------+-----+------+---+------+
|James        |Sales     |NY   |90000 |34 |100000|
|Michael      |Sales     |NY   |86000 |56 |200000|
|Robert       |Sales     |CA   |81000 |30 |230000|
|Maria        |Finance   |CA   |90000 |24 |230000|
|Raman        |Finance   |CA   |99000 |24 |240000|
+-------------+----------+-----+------+---+------+



In [None]:
from pyspark.sql import SparkSession
spark : SparkSession = SparkSession.builder.master("local[1]") \
        .appName("SparkByExamples.com") \
        .getOrCreate()

filePath = "/content/electric vehicle.csv"
df = spark.read.options(header='True', inferSchema='True') \
          .csv(filePath)
df.printSchema()
df.show()

root
 |-- Date: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Vehicle Primary Use: string (nullable = true)
 |-- Battery Electric Vehicles (BEVs): string (nullable = true)
 |-- Plug-In Hybrid Electric Vehicles (PHEVs): string (nullable = true)
 |-- Electric Vehicle (EV) Total: string (nullable = true)
 |-- Non-Electric Vehicle Total: string (nullable = true)
 |-- Total Vehicles: string (nullable = true)
 |-- Percent Electric Vehicles: double (nullable = true)

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-------------

In [None]:
df.na.drop().show()

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|September 30 2022|     Riverside|   CA|          Passenger|                               7|                                       0|                          7|                       460|           467|                      1.5|
| December 31 2022|Prince William|   VA|          Passenger|                

In [None]:
df.na.drop("any").show()

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|September 30 2022|     Riverside|   CA|          Passenger|                               7|                                       0|                          7|                       460|           467|                      1.5|
| December 31 2022|Prince William|   VA|          Passenger|                

In [None]:
df.na.drop("all").show()

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|September 30 2022|     Riverside|   CA|          Passenger|                               7|                                       0|                          7|                       460|           467|                      1.5|
| December 31 2022|Prince William|   VA|          Passenger|                

In [None]:
df.na.drop("any", subset=["state"]).show()

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|September 30 2022|     Riverside|   CA|          Passenger|                               7|                                       0|                          7|                       460|           467|                      1.5|
| December 31 2022|Prince William|   VA|          Passenger|                

In [None]:
df.dropna().show()

+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|             Date|        County|State|Vehicle Primary Use|Battery Electric Vehicles (BEVs)|Plug-In Hybrid Electric Vehicles (PHEVs)|Electric Vehicle (EV) Total|Non-Electric Vehicle Total|Total Vehicles|Percent Electric Vehicles|
+-----------------+--------------+-----+-------------------+--------------------------------+----------------------------------------+---------------------------+--------------------------+--------------+-------------------------+
|September 30 2022|     Riverside|   CA|          Passenger|                               7|                                       0|                          7|                       460|           467|                      1.5|
| December 31 2022|Prince William|   VA|          Passenger|                

In [None]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=data = [(('James','','Smith'),'1991-04-01'),
  (('Michael','Rose',''),'2000-05-19'),
  (('Robert','','Williams'),'1978-09-05'),
  (('Maria','Anne','Jones'),'1967-12-01'),
  (('Jen','Mary','Brown'),'1980-02-17')]

columns= ["name", "dob"]
df = spark.createDataFrame(data, columns)
df.printSchema()
df.show()

root
 |-- name: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)
 |-- dob: string (nullable = true)

+--------------------+----------+
|                name|       dob|
+--------------------+----------+
|    {James, , Smith}|1991-04-01|
|   {Michael, Rose, }|2000-05-19|
|{Robert, , Williams}|1978-09-05|
|{Maria, Anne, Jones}|1967-12-01|
|  {Jen, Mary, Brown}|1980-02-17|
+--------------------+----------+



In [None]:
from pyspark.sql.functions import split
df1 = df.withColumn('year', split(df['dob'], '-').getItem(0)) \
    .withColumn('month', split(df['dob'], '-').getItem(1)) \
    .withColumn('day', split(df['dob'], '-').getItem(2))
df1.show(truncate=False)

+--------------------+----------+----+-----+---+
|name                |dob       |year|month|day|
+--------------------+----------+----+-----+---+
|{James, , Smith}    |1991-04-01|1991|04   |01 |
|{Michael, Rose, }   |2000-05-19|2000|05   |19 |
|{Robert, , Williams}|1978-09-05|1978|09   |05 |
|{Maria, Anne, Jones}|1967-12-01|1967|12   |01 |
|{Jen, Mary, Brown}  |1980-02-17|1980|02   |17 |
+--------------------+----------+----+-----+---+



In [None]:
split_col = pyspark.sql.functions.split(df['dob'], ' -')
df2 = df.withColumn('year', split_col.getItem(0)) \
    .withColumn('month', split_col.getItem(1)) \
    .withColumn('day', split_col.getItem(2))
df2.show(truncate=False)

+--------------------+----------+----------+-----+----+
|name                |dob       |year      |month|day |
+--------------------+----------+----------+-----+----+
|{James, , Smith}    |1991-04-01|1991-04-01|NULL |NULL|
|{Michael, Rose, }   |2000-05-19|2000-05-19|NULL |NULL|
|{Robert, , Williams}|1978-09-05|1978-09-05|NULL |NULL|
|{Maria, Anne, Jones}|1967-12-01|1967-12-01|NULL |NULL|
|{Jen, Mary, Brown}  |1980-02-17|1980-02-17|NULL |NULL|
+--------------------+----------+----------+-----+----+

