### Create Spark session

In [1]:
!pip install pyspark
! pip install azure.storage.blob
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Analyst Pyspark Tutorial").getOrCreate()

from pyspark.sql.types import *

Collecting azure.storage.blob
  Downloading azure_storage_blob-12.25.1-py3-none-any.whl.metadata (26 kB)
Collecting azure-core>=1.30.0 (from azure.storage.blob)
  Downloading azure_core-1.35.0-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting isodate>=0.6.1 (from azure.storage.blob)
  Downloading isodate-0.7.2-py3-none-any.whl.metadata (11 kB)
Downloading azure_storage_blob-12.25.1-py3-none-any.whl (406 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m407.0/407.0 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading azure_core-1.35.0-py3-none-any.whl (210 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m210.7/210.7 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading isodate-0.7.2-py3-none-any.whl (22 kB)
Installing collected packages: isodate, azure-core, azure.storage.blob
Successfully installed azure-core-1.35.0 az

### Create Data Frame

##### *Read CSV with header*

##### *Read file without header*

In [2]:
# import file raw

dfcsv = spark.read.options(Header=True).csv("postalcodes.csv")

dfcsv.show(10)

+------------+-----+-----+---------+---------+--------+---+
|        City|State|  Zip| Latitude|Longitude|Timezone|DST|
+------------+-----+-----+---------+---------+--------+---+
|        Cove|   AR|90085|34.398483|-94.39398|      -6|  1|
|    Edgemont|   AR|72044|35.624351|-92.16056|      -6|  1|
|    Sherburn|   MN|90100|43.660847|-94.74357|      -6|  1|
|      Lamont|   MI|90107|43.010337|-85.89754|      -5|  1|
|    Richland|   IA|52585|41.194129|-91.98027|      -6|  1|
|   Cannelton|   IN|90108|37.934311|-86.67821|      -5|  0|
|     Zeeland|   MI|90110|42.829252|-85.99621|      -5|  1|
|   Covington|   MI|49919|46.559834|-88.52201|      -5|  1|
|Cedar Rapids|   IA|90113| 41.97545|-91.65912|      -6|  1|
|      Higdon|   AL|90119|34.831242|-85.61564|      -6|  1|
+------------+-----+-----+---------+---------+--------+---+
only showing top 10 rows



In [3]:
dfcsv = spark.read.csv("postalcodes.csv")

dfcsv.show(10)

+------------+-----+-----+---------+---------+--------+---+
|         _c0|  _c1|  _c2|      _c3|      _c4|     _c5|_c6|
+------------+-----+-----+---------+---------+--------+---+
|        City|State|  Zip| Latitude|Longitude|Timezone|DST|
|        Cove|   AR|90085|34.398483|-94.39398|      -6|  1|
|    Edgemont|   AR|72044|35.624351|-92.16056|      -6|  1|
|    Sherburn|   MN|90100|43.660847|-94.74357|      -6|  1|
|      Lamont|   MI|90107|43.010337|-85.89754|      -5|  1|
|    Richland|   IA|52585|41.194129|-91.98027|      -6|  1|
|   Cannelton|   IN|90108|37.934311|-86.67821|      -5|  0|
|     Zeeland|   MI|90110|42.829252|-85.99621|      -5|  1|
|   Covington|   MI|49919|46.559834|-88.52201|      -5|  1|
|Cedar Rapids|   IA|90113| 41.97545|-91.65912|      -6|  1|
+------------+-----+-----+---------+---------+--------+---+
only showing top 10 rows



In [4]:
dfcsv.show(3,0)

+--------+-----+-----+---------+---------+--------+---+
|_c0     |_c1  |_c2  |_c3      |_c4      |_c5     |_c6|
+--------+-----+-----+---------+---------+--------+---+
|City    |State|Zip  |Latitude |Longitude|Timezone|DST|
|Cove    |AR   |90085|34.398483|-94.39398|-6      |1  |
|Edgemont|AR   |72044|35.624351|-92.16056|-6      |1  |
+--------+-----+-----+---------+---------+--------+---+
only showing top 3 rows



In [5]:
# prompt: schema for dfcsv

dfcsv.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [6]:
orderSchema = StructType([
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Zip", StringType()),
    StructField("Latitude", StringType()),
    StructField("Longitude", StringType()),
    StructField("Timezone", StringType()),
    StructField("DST", StringType())
    ])

##### *Read file poviding schema*

In [8]:
type(orderSchema)

In [9]:
# Import file raw providing Schema

df = spark.read.format("csv").options(Header=True).load("postalcodes.csv")
df.show(30)

+-------------+-----+-----+---------+----------+--------+---+
|         City|State|  Zip| Latitude| Longitude|Timezone|DST|
+-------------+-----+-----+---------+----------+--------+---+
|         Cove|   AR|90085|34.398483| -94.39398|      -6|  1|
|     Edgemont|   AR|72044|35.624351| -92.16056|      -6|  1|
|     Sherburn|   MN|90100|43.660847| -94.74357|      -6|  1|
|       Lamont|   MI|90107|43.010337| -85.89754|      -5|  1|
|     Richland|   IA|52585|41.194129| -91.98027|      -6|  1|
|    Cannelton|   IN|90108|37.934311| -86.67821|      -5|  0|
|      Zeeland|   MI|90110|42.829252| -85.99621|      -5|  1|
|    Covington|   MI|49919|46.559834| -88.52201|      -5|  1|
| Cedar Rapids|   IA|90113| 41.97545| -91.65912|      -6|  1|
|       Higdon|   AL|90119|34.831242| -85.61564|      -6|  1|
| Midland City|   AL|36350|31.319083| -85.48718|      -6|  1|
|     Columbia|   TN|90124|35.619784| -87.03565|      -6|  1|
|  Muses Mills|   KY|90126|  38.3481|-83.718626|      -5|  1|
|    Ann

### Exploring Data Frame

#### Getting info

In [10]:
df.count()

43191

In [None]:
dfstate = df.select('State')


dfstate.show(5)

df.show(5)

+-----+
|State|
+-----+
|   AR|
|   AR|
|   MN|
|   MI|
|   IA|
+-----+
only showing top 5 rows

+--------+-----+-----+---------+---------+--------+---+
|    City|State|  Zip| Latitude|Longitude|Timezone|DST|
+--------+-----+-----+---------+---------+--------+---+
|    Cove|   AR|90085|34.398483|-94.39398|      -6|  1|
|Edgemont|   AR|72044|35.624351|-92.16056|      -6|  1|
|Sherburn|   MN|90100|43.660847|-94.74357|      -6|  1|
|  Lamont|   MI|90107|43.010337|-85.89754|      -5|  1|
|Richland|   IA|52585|41.194129|-91.98027|      -6|  1|
+--------+-----+-----+---------+---------+--------+---+
only showing top 5 rows



In [None]:
dfdistinctstate= df.select('state').distinct().count()
dfdistinctstate





54

In [None]:
df.createOrReplaceTempView("postalcodes")

result_df=spark.sql("select State, count(*) as cnt from postalcodes group by State")

result_df.show(5)

+-----+----+
|State| cnt|
+-----+----+
|   SC| 554|
|   AZ| 538|
|   LA| 755|
|   MN|1054|
|   NJ| 762|
+-----+----+
only showing top 5 rows



In [11]:
# prompt: select all columns from postalcodes table

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

# Create Spark session
!pip install pyspark
spark = SparkSession.builder.appName("Data Analyst Pyspark Tutorial").getOrCreate()

# Read the CSV file into a DataFrame
dfcsv = spark.read.csv("postalcodes.csv", header=True, inferSchema=True)

# Create a temporary view
dfcsv.createOrReplaceTempView("postalcodes")

# Select all columns from the postalcodes table
all_columns_df = spark.sql("SELECT * FROM postalcodes")

# Show the result
all_columns_df.show()


+------------+-----+-----+---------+----------+--------+---+
|        City|State|  Zip| Latitude| Longitude|Timezone|DST|
+------------+-----+-----+---------+----------+--------+---+
|        Cove|   AR|90085|34.398483| -94.39398|      -6|  1|
|    Edgemont|   AR|72044|35.624351| -92.16056|      -6|  1|
|    Sherburn|   MN|90100|43.660847| -94.74357|      -6|  1|
|      Lamont|   MI|90107|43.010337| -85.89754|      -5|  1|
|    Richland|   IA|52585|41.194129| -91.98027|      -6|  1|
|   Cannelton|   IN|90108|37.934311| -86.67821|      -5|  0|
|     Zeeland|   MI|90110|42.829252| -85.99621|      -5|  1|
|   Covington|   MI|49919|46.559834| -88.52201|      -5|  1|
|Cedar Rapids|   IA|90113| 41.97545| -91.65912|      -6|  1|
|      Higdon|   AL|90119|34.831242| -85.61564|      -6|  1|
|Midland City|   AL|36350|31.319083| -85.48718|      -6|  1|
|    Columbia|   TN|90124|35.619784| -87.03565|      -6|  1|
| Muses Mills|   KY|90126|  38.3481|-83.718626|      -5|  1|
|   Annandale|   MN|5530

#### Selecting columns

In [None]:
df.select(df[1],df[2]).show(5)

+-----+-----+
|State|  Zip|
+-----+-----+
|   AR|90085|
|   AR|72044|
|   MN|90100|
|   MI|90107|
|   IA|52585|
+-----+-----+
only showing top 5 rows



#### Describe Data Frame

In [None]:
df.describe().show()

+-------+----------+-----+-----------------+-----------------+------------------+------------------+-------------------+
|summary|      City|State|              Zip|         Latitude|         Longitude|          Timezone|                DST|
+-------+----------+-----+-----------------+-----------------+------------------+------------------+-------------------+
|  count|     43191|43191|            43191|            43191|             43191|             43191|              43191|
|   mean|      NULL| NULL|49605.28133175893|38.50296645893744|-90.93793709960308|-5.842143039059064| 0.9594822995531477|
| stddev|      NULL| NULL|28046.55767852084|5.395779013062673|15.356758118054104|1.0164852258682984|0.19717230158797316|
|    min|Aaronsburg|   AK|            10001|        -7.209975|        -100.00991|               -10|                  0|
|    max|    Zwolle|   WY|            99950|        71.299525|         -99.99968|                -9|                  1|
+-------+----------+-----+------

### Using functions

[Pyspark Function](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

#### Generating columns


**Introduction to withColumn link to this section**

The withColumn function in Spark allows you to add a new column or replace an existing column in a DataFrame. It provides a flexible and expressive way to modify or derive new columns based on existing ones. With withColumn , you can apply transformations, perform computations, or create complex expressions to augment your data.

**Adding a New Column link to this section**

To add a new column using withColumn , you need to specify the name of the new column and the transformation or computation you want to apply.

In [24]:
# Create the schema for the table
orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
    ])

In [26]:
# prompt: read file 2019.csv using orderSchema

df2019 = spark.read.format("csv").schema(orderSchema).options(Header=True).load("2019.csv")
#df2019.show()

df=df2019

In [28]:
df.show(3,0)

+----------------+--------------------+----------+------------+---------------------------+-----------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber|OrderDate |CustomerName|Email                      |Item                   |Quantity|UnitPrice|Tax     |
+----------------+--------------------+----------+------------+---------------------------+-----------------------+--------+---------+--------+
|SO43704         |1                   |2019-07-01|Julio Ruiz  |julio1@adventure-works.com |Mountain-100 Black, 48 |1       |3374.99  |269.9992|
|SO43705         |1                   |2019-07-01|Curtis Lu   |curtis9@adventure-works.com|Mountain-100 Silver, 38|1       |3399.99  |271.9992|
|SO43700         |1                   |2019-07-01|Ruben Prasad|ruben10@adventure-works.com|Road-650 Black, 62     |1       |699.0982 |55.9279 |
+----------------+--------------------+----------+------------+---------------------------+-----------------------+--------+---------+--

In [29]:
from pyspark.sql.functions import when,lit, col, current_timestamp, input_file_name

dfWithNewColumn = df.withColumn("lot", when(col("Quantity") >= 100, "a lot").otherwise("not a lot"))

dfWithNewColumn.show(5,0)

+----------------+--------------------+----------+--------------+---------------------------+-----------------------+--------+---------+--------+---------+
|SalesOrderNumber|SalesOrderLineNumber|OrderDate |CustomerName  |Email                      |Item                   |Quantity|UnitPrice|Tax     |lot      |
+----------------+--------------------+----------+--------------+---------------------------+-----------------------+--------+---------+--------+---------+
|SO43704         |1                   |2019-07-01|Julio Ruiz    |julio1@adventure-works.com |Mountain-100 Black, 48 |1       |3374.99  |269.9992|not a lot|
|SO43705         |1                   |2019-07-01|Curtis Lu     |curtis9@adventure-works.com|Mountain-100 Silver, 38|1       |3399.99  |271.9992|not a lot|
|SO43700         |1                   |2019-07-01|Ruben Prasad  |ruben10@adventure-works.com|Road-650 Black, 62     |1       |699.0982 |55.9279 |not a lot|
|SO43703         |1                   |2019-07-01|Albert Alvarez

**Replacing an Existing Column link to this section**

withColumn can also be used to replace an existing column in a DataFrame

In [30]:
from pyspark.sql.functions import upper

dfWithUppercaseName = df.withColumn("CustomerName", upper(col("CustomerName")))
dfWithUppercaseName.show(3)


+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|         SO43704|                   1|2019-07-01|  JULIO RUIZ|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|
|         SO43705|                   1|2019-07-01|   CURTIS LU|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|
|         SO43700|                   1|2019-07-01|RUBEN PRASAD|ruben10@adventure...|  Road-650 Black, 62|       1| 699.0982| 55.9279|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
only showing top 3 rows



In [31]:
from pyspark.sql.functions import lower

dfWithLowercaseName = df.withColumn("CustomerName", lower(col("CustomerName")))
dfWithLowercaseName.show(3)


+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|         SO43704|                   1|2019-07-01|  julio ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|
|         SO43705|                   1|2019-07-01|   curtis lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|
|         SO43700|                   1|2019-07-01|ruben prasad|ruben10@adventure...|  Road-650 Black, 62|       1| 699.0982| 55.9279|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
only showing top 3 rows



In [32]:
from pyspark.sql.functions import initcap

dfWithInitcapcaseName = df.withColumn("CustomerName", initcap(col("CustomerName")))
dfWithInitcapcaseName.show(3)

+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
|         SO43704|                   1|2019-07-01|  Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|
|         SO43705|                   1|2019-07-01|   Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|
|         SO43700|                   1|2019-07-01|Ruben Prasad|ruben10@adventure...|  Road-650 Black, 62|       1| 699.0982| 55.9279|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+
only showing top 3 rows



#### Dropping columns

In [33]:
df_pyspark = df
df_pyspark = df_pyspark.drop("Email")
df_pyspark.show(5)

+----------------+--------------------+----------+--------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+--------------+--------------------+--------+---------+--------+
|         SO43704|                   1|2019-07-01|    Julio Ruiz|Mountain-100 Blac...|       1|  3374.99|269.9992|
|         SO43705|                   1|2019-07-01|     Curtis Lu|Mountain-100 Silv...|       1|  3399.99|271.9992|
|         SO43700|                   1|2019-07-01|  Ruben Prasad|  Road-650 Black, 62|       1| 699.0982| 55.9279|
|         SO43703|                   1|2019-07-01|Albert Alvarez|    Road-150 Red, 62|       1|  3578.27|286.2616|
|         SO43697|                   1|2019-07-01|   Cole Watson|    Road-150 Red, 62|       1|  3578.27|286.2616|
+----------------+--------------------+----------+--------------+---------------

#### Using functions

[Pyspark Function](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

In [34]:
from pyspark.sql.functions import when,lit, col, current_timestamp, input_file_name

 # Add columns IsFlagged, CreatedTS and ModifiedTS
df = df.withColumn("FileName", input_file_name())

df.show(5,0)

+----------------+--------------------+----------+--------------+---------------------------+-----------------------+--------+---------+--------+------------------------+
|SalesOrderNumber|SalesOrderLineNumber|OrderDate |CustomerName  |Email                      |Item                   |Quantity|UnitPrice|Tax     |FileName                |
+----------------+--------------------+----------+--------------+---------------------------+-----------------------+--------+---------+--------+------------------------+
|SO43704         |1                   |2019-07-01|Julio Ruiz    |julio1@adventure-works.com |Mountain-100 Black, 48 |1       |3374.99  |269.9992|file:///content/2019.csv|
|SO43705         |1                   |2019-07-01|Curtis Lu     |curtis9@adventure-works.com|Mountain-100 Silver, 38|1       |3399.99  |271.9992|file:///content/2019.csv|
|SO43700         |1                   |2019-07-01|Ruben Prasad  |ruben10@adventure-works.com|Road-650 Black, 62     |1       |699.0982 |55.9279 |

In [None]:
#df =df.withColumn("CreatedTS", current_timestamp()).withColumn("ModifiedTS", current_timestamp())

In [35]:
# Add specific columns

df =df.withColumn("CreatedTS", current_timestamp())
df =df.withColumn("ModifiedTS", current_timestamp())

df.show(3,0)

+----------------+--------------------+----------+------------+---------------------------+-----------------------+--------+---------+--------+------------------------+-------------------------+-------------------------+
|SalesOrderNumber|SalesOrderLineNumber|OrderDate |CustomerName|Email                      |Item                   |Quantity|UnitPrice|Tax     |FileName                |CreatedTS                |ModifiedTS               |
+----------------+--------------------+----------+------------+---------------------------+-----------------------+--------+---------+--------+------------------------+-------------------------+-------------------------+
|SO43704         |1                   |2019-07-01|Julio Ruiz  |julio1@adventure-works.com |Mountain-100 Black, 48 |1       |3374.99  |269.9992|file:///content/2019.csv|2025-07-11 20:35:18.05826|2025-07-11 20:35:18.05826|
|SO43705         |1                   |2019-07-01|Curtis Lu   |curtis9@adventure-works.com|Mountain-100 Silver, 38|1

In [17]:
desired_column=df.columns[-5:]

In [18]:
type(desired_column)

list

In [19]:
print(desired_column)

['Timezone', 'DST', 'FileName', 'CreatedTS', 'ModifiedTS']


In [20]:
df.select(df.columns[-5:]).show(3,0)

+--------+---+-------------------------------+--------------------------+--------------------------+
|Timezone|DST|FileName                       |CreatedTS                 |ModifiedTS                |
+--------+---+-------------------------------+--------------------------+--------------------------+
|-6      |1  |file:///content/postalcodes.csv|2025-07-11 20:28:27.196045|2025-07-11 20:28:27.196045|
|-6      |1  |file:///content/postalcodes.csv|2025-07-11 20:28:27.196045|2025-07-11 20:28:27.196045|
|-6      |1  |file:///content/postalcodes.csv|2025-07-11 20:28:27.196045|2025-07-11 20:28:27.196045|
+--------+---+-------------------------------+--------------------------+--------------------------+
only showing top 3 rows



In [36]:
df.select("CustomerName").show(5,0)

+--------------+
|CustomerName  |
+--------------+
|Julio Ruiz    |
|Curtis Lu     |
|Ruben Prasad  |
|Albert Alvarez|
|Cole Watson   |
+--------------+
only showing top 5 rows



In [None]:
#df.where(df.CustomerName.isNotNull()).show(10,0)
#df.where(df.CustomerName.isNotNull()).count()

In [None]:
df.where(df.CustomerName.isNull()).show(10,0)


In [None]:
df.where(df.CustomerName.isNull()).count()

In [None]:
df.filter(df.CustomerName == "").show(1)
df.filter(df.CustomerName == "").count()


In [None]:
df.filter(df.CustomerName != "").show(1)
df.filter(df.CustomerName != "").count()


In [None]:
from pyspark.sql.functions import when,lit, col, current_timestamp, input_file_name

# Update CustomerName to "Unknown" if CustomerName null or empty
dfnew2 = df.withColumn("CustomerName", when(col("CustomerName").isNull(),lit("Unknown")).otherwise(col("CustomerName")))

dfnew2.show(3)

### CLEANING DATA

#### Handle null values

In [None]:
#Load corrupted file


In [48]:

df.show(5)

df.count()

+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///content/2...|2025-07-11 20:43:...|2025-07-11 20:43:...|
|         SO43705|                   1|2019-07-01|     Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///content/2...|2025-07-11 20:43:...|2025-07-11 20:43:...|
|    

1200

In [46]:
# Drop rows with null values in the "CustomerName" column
df_filtered_nn = df.filter(col("CustomerName").isNotNull())
# Show the resulting DataFrame
df_filtered_nn.show(5)

df_filtered_nn.count()


+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///content/2...|2025-07-11 20:43:...|2025-07-11 20:43:...|
|         SO43705|                   1|2019-07-01|     Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///content/2...|2025-07-11 20:43:...|2025-07-11 20:43:...|
|    

1199

In [49]:
# Drop rows with null values in the "CustomerName" column
df_filtered_null = df.filter(col("CustomerName").isNull())
# Show the resulting DataFrame
df_filtered_null.show(5)

df_filtered_null.count()


+----------------+--------------------+----------+------------+--------------------+----------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerName|               Email|            Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+------------+--------------------+----------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43710|                   1|2019-07-02|        NULL|katrina20@adventu...|Road-150 Red, 56|       1|  3578.27|286.2616|file:///content/2...|2025-07-11 20:44:...|2025-07-11 20:44:...|
+----------------+--------------------+----------+------------+--------------------+----------------+--------+---------+--------+--------------------+--------------------+--------------------+



1

#### Handle missing values

In [50]:
df_pyspark = df
df_pyspark.show(5)

df_pyspark.count()

+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///content/2...|2025-07-11 20:44:...|2025-07-11 20:44:...|
|         SO43705|                   1|2019-07-01|     Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///content/2...|2025-07-11 20:44:...|2025-07-11 20:44:...|
|    

1200

#### Remove null value

In [52]:
df_pyspark.na.drop().count()

1199

#### Filling null values

In [53]:
df_pyspark.na.fill("Missing").show()

+----------------+--------------------+----------+------------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|      CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+------------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43704|                   1|2019-07-01|        Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///content/2...|2025-07-11 20:45:...|2025-07-11 20:45:...|
|         SO43705|                   1|2019-07-01|         Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///content/2...|2025-07-11 20:45:...|2025-07

### USING SQL

In [54]:
df.show(2)

+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43704|                   1|2019-07-01|  Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///content/2...|2025-07-11 20:45:...|2025-07-11 20:45:...|
|         SO43705|                   1|2019-07-01|   Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///content/2...|2025-07-11 20:45:...|2025-07-11 20:45:...|
+--------------

In [None]:
## Assuming you have a PySpark DataFrame named "df"
df.createOrReplaceTempView("customerdata")

In [None]:
# SQL Select query
spark.sql("SELECT *  FROM customerdata").show(5)

+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|            FileName|           CreatedTS|          ModifiedTS|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+--------------------+--------------------+--------------------+
|         SO43701|                   1|2019-07-01|          null|christy12@adventu...|Mountain-100 Silv...|       1|  3399.99|271.9992|file:///C:/Users/...|2024-04-03 18:17:...|2024-04-03 18:17:...|
|         SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99|269.9992|file:///C:/Users/...|2024-04-03 18:17:...|2024-04-03 18:17:...|
|    

##### FULL COMPARISON

In [None]:
# Import
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()


In [57]:

# Create DataFrame
df = spark.read \
          .option("header",True) \
          .csv("./postalcodes.csv")
df.printSchema()
df.show(3)


root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- DST: string (nullable = true)

+--------+-----+-----+---------+---------+--------+---+
|    City|State|  Zip| Latitude|Longitude|Timezone|DST|
+--------+-----+-----+---------+---------+--------+---+
|    Cove|   AR|90085|34.398483|-94.39398|      -6|  1|
|Edgemont|   AR|72044|35.624351|-92.16056|      -6|  1|
|Sherburn|   MN|90100|43.660847|-94.74357|      -6|  1|
+--------+-----+-----+---------+---------+--------+---+
only showing top 3 rows



In [58]:
# Create SQL table
spark.read \
          .option("header",True) \
          .csv("./postalcodes.csv") \
          .createOrReplaceTempView("Zipcodes")


In [63]:
# Select query
df.select("city","zip","state").show(5)

spark.sql("SELECT  city, zip, state FROM ZIPCODES").show(5)


+--------+-----+-----+
|    city|  zip|state|
+--------+-----+-----+
|    Cove|90085|   AR|
|Edgemont|72044|   AR|
|Sherburn|90100|   MN|
|  Lamont|90107|   MI|
|Richland|52585|   IA|
+--------+-----+-----+
only showing top 5 rows

+--------+-----+-----+
|    city|  zip|state|
+--------+-----+-----+
|    Cove|90085|   AR|
|Edgemont|72044|   AR|
|Sherburn|90100|   MN|
|  Lamont|90107|   MI|
|Richland|52585|   IA|
+--------+-----+-----+
only showing top 5 rows



In [65]:
# where
df.select("city","zip","state") \
  .where("state == 'AZ'") \
  .show(5)

spark.sql(""" SELECT  city, zip, state FROM ZIPCODES
          WHERE state = 'AZ' """) \
     .show(5)

+--------------+-----+-----+
|          city|  zip|state|
+--------------+-----+-----+
|Fountain Hills|90548|   AZ|
|     Winkelman|90586|   AZ|
|        Peoria|90739|   AZ|
|      Chandler|85225|   AZ|
|        Lupton|86508|   AZ|
+--------------+-----+-----+
only showing top 5 rows

+--------------+-----+-----+
|          city|  zip|state|
+--------------+-----+-----+
|Fountain Hills|90548|   AZ|
|     Winkelman|90586|   AZ|
|        Peoria|90739|   AZ|
|      Chandler|85225|   AZ|
|        Lupton|86508|   AZ|
+--------------+-----+-----+
only showing top 5 rows



In [68]:
# sorting
df.select("city","zip","state") \
  .where("state in ('PR','AZ','FL')") \
  .orderBy("state") \
  .show(10)

spark.sql(""" SELECT  city, zip, state FROM ZIPCODES
          WHERE state in ('PR','AZ','FL') order by state """) \
     .show(10)

+----------------+-----+-----+
|            city|  zip|state|
+----------------+-----+-----+
|        Chandler|85225|   AZ|
|Chandler Heights|85227|   AZ|
|          Lupton|86508|   AZ|
|  Fountain Hills|90548|   AZ|
|      Scottsdale|85262|   AZ|
|   Golden Valley|86413|   AZ|
|      Cave Creek|85327|   AZ|
|          Peoria|90739|   AZ|
|            Pima|85543|   AZ|
|   Fort Huachuca|85670|   AZ|
+----------------+-----+-----+
only showing top 10 rows

+----------------+-----+-----+
|            city|  zip|state|
+----------------+-----+-----+
|        Chandler|85225|   AZ|
|Chandler Heights|85227|   AZ|
|          Lupton|86508|   AZ|
|  Fountain Hills|90548|   AZ|
|      Scottsdale|85262|   AZ|
|   Golden Valley|86413|   AZ|
|      Cave Creek|85327|   AZ|
|          Peoria|90739|   AZ|
|            Pima|85543|   AZ|
|   Fort Huachuca|85670|   AZ|
+----------------+-----+-----+
only showing top 10 rows



In [70]:
# grouping
df.groupBy("state").count() \
  .show(3)

spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES
          GROUP BY state""") \
     .show(3)

+-----+-----+
|state|count|
+-----+-----+
|   SC|  554|
|   AZ|  538|
|   LA|  755|
+-----+-----+
only showing top 3 rows

+-----+-----+
|state|count|
+-----+-----+
|   SC|  554|
|   AZ|  538|
|   LA|  755|
+-----+-----+
only showing top 3 rows



### Query catalog

In [71]:
spark.catalog.currentCatalog()

'spark_catalog'

In [72]:
spark.catalog.currentDatabase()

'default'

In [73]:
spark.catalog.listCatalogs()

[CatalogMetadata(name='spark_catalog', description=None)]

In [74]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

In [75]:
spark.catalog.listTables("default")

[Table(name='postalcodes', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='Zipcodes', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]