### Importing Necessary Libraries

In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql import *

### Creating Spark Session

In [2]:
# Start (or reuse) a single-threaded local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkPractice.com")
    .getOrCreate()
)
spark

### Reading JSON File

In [3]:
# Read the JSON file into a DataFrame.
data = spark.read.json('zipcodes.json')
# Register the DataFrame as a temporary view so spark.sql can query it as ZIPCODES.
data.createOrReplaceTempView("ZIPCODES")
data.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR

### About the data

In [4]:
# Print each column's name, inferred type and nullability.
data.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: long (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



In [5]:
# Number of rows in the DataFrame.
data.count()

21

### Selecting Columns

In [6]:
# Project four columns and preview the first five rows.
(
    data
    .select("country", "city", "zipcode", "state")
    .show(5)
)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|  CINGULAR WIRELESS|  76166|   TX|
|     US|         FORT WORTH|  76177|   TX|
+-------+-------------------+-------+-----+
only showing top 5 rows



spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES").show(5)

### Filtering Rows

In [7]:
# Same projection, restricted to Arizona rows via a SQL-style predicate string.
(
    data
    .select("country", "city", "zipcode", "state")
    .where("state == 'AZ'")
    .show(5)
)

+-------+----+-------+-----+
|country|city|zipcode|state|
+-------+----+-------+-----+
|     US|MESA|  85209|   AZ|
|     US|MESA|  85210|   AZ|
+-------+----+-------+-----+



spark.sql(" SELECT  country, city, zipcode, state FROM ZIPCODES WHERE state = 'AZ' ").show(5)

### Sorting/Ordering

In [8]:
# Keep rows for three states, sort them by state, and preview ten rows.
(
    data
    .select("country", "city", "zipcode", "state")
    .where("state in ('PR','AZ','FL')")
    .orderBy("state")
    .show(10)
)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|               MESA|  85209|   AZ|
|     US|               MESA|  85210|   AZ|
|     US|           HILLIARD|  32046|   FL|
|     US|          HOMOSASSA|  34487|   FL|
|     US|             HOLDER|  34445|   FL|
|     US|               HOLT|  32564|   FL|
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|    URB EUGENE RICE|    704|   PR|
+-------+-------------------+-------+-----+
only showing top 10 rows



spark.sql(" SELECT  country, city, zipcode, state FROM ZIPCODES WHERE state in ('PR','AZ','FL') order by state ").show(10)

### Grouping

In [9]:
# Count the rows belonging to each state.
(
    data
    .groupBy("state")
    .count()
    .show()
)

+-----+-----+
|state|count|
+-----+-----+
|   AZ|    2|
|   NC|    3|
|   AL|    3|
|   TX|    3|
|   FL|    4|
|   PR|    6|
+-----+-----+



spark.sql(" SELECT state, count(*) as count FROM ZIPCODES GROUP BY state").show()

### Creating new data frame from existing dataframe

In [10]:
# Build a smaller two-column DataFrame from at most 18 rows of the source data.
csdata = (
    data
    .select("country", "state")
    .limit(18)
)
csdata.show()

+-------+-----+
|country|state|
+-------+-----+
|     US|   PR|
|     US|   PR|
|     US|   PR|
|     US|   TX|
|     US|   TX|
|     US|   TX|
|     US|   PR|
|     US|   AZ|
|     US|   AZ|
|     US|   FL|
|     US|   FL|
|     US|   FL|
|     US|   FL|
|     US|   PR|
|     US|   PR|
|     US|   AL|
|     US|   AL|
|     US|   AL|
+-------+-----+



### Dropping duplicates using **dropDuplicates**

In [11]:
# Keep a single row per distinct value of "state"; the other column is not compared.
csdata1 = csdata.dropDuplicates(["state"])
csdata1.show()

+-------+-----+
|country|state|
+-------+-----+
|     US|   AL|
|     US|   AZ|
|     US|   FL|
|     US|   PR|
|     US|   TX|
+-------+-----+



### Dropping duplicates using **distinct**

In [12]:
# Remove rows that are duplicated across all columns (country and state here).
csdata2 = csdata.distinct()
csdata2.show()

+-------+-----+
|country|state|
+-------+-----+
|     US|   PR|
|     US|   TX|
|     US|   AZ|
|     US|   FL|
|     US|   AL|
+-------+-----+



### Dropping rows using **filter**

In [13]:
# Keep every state except "AL". The predicate refers to the column by name
# (F.col), so it resolves against csdata1 itself instead of borrowing a column
# handle from a different DataFrame (csdata).
csdata3 = csdata1.filter(F.col("state") != "AL")
csdata3.show()

+-------+-----+
|country|state|
+-------+-----+
|     US|   AZ|
|     US|   FL|
|     US|   PR|
|     US|   TX|
+-------+-----+



### Dropping rows using **where**

In [14]:
# Exclude the "TX" rows, expressing the predicate as a SQL expression string.
csdata4 = csdata2.where("state != 'TX'")
csdata4.show()

+-------+-----+
|country|state|
+-------+-----+
|     US|   PR|
|     US|   AZ|
|     US|   FL|
|     US|   AL|
+-------+-----+



### Dropping rows using **drop**

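Drop a row with index 0 (first row). Note that this is the pandas idiom: in PySpark, `DataFrame.drop` removes columns rather than rows, and DataFrames have no positional row index, so rows are removed with `filter`/`where` instead.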

csdata4 = csdata1.drop(0)

### Adding a row by using **union**

new_row = spark.createDataFrame([("UK", "NI")], ["country", "state"])

csdata = csdata.union(new_row)

### Adding a row by using **collect** and **append**

In [15]:
#Collect the entire existing DataFrame as a list.
# Bring every row of csdata back to the driver as a Python list of Row objects.
data_list = csdata.collect()
# Append the new row to that local list (a plain tuple alongside the collected Rows).
data_list.append(("UK","NI"))
# Rebuilding a DataFrame from the updated list is left disabled here:
# newdata = spark.createDataFrame(data_list, csdata.schema)

### Adding a row by using **Spark SQL**

spark.sql("INSERT INTO csdata VALUES ('UK','NI')")  # NOTE(review): assumes a table named csdata exists in the catalog; only the ZIPCODES view is registered in this notebook - confirm

### Joins

In [16]:
# Left join on the shared "state" key: data is the left side, csdata3 the right.
# Joining by column name yields a single State column plus csdata3's country column.
left_join = data.join(csdata3, on="state", how="left")
left_join.show()

+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|State|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|country|
+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|   PR|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    7

**Left join** keeps all rows from the left DataFrame and matching rows from the right DataFrame based on the join condition.

In [17]:
# Same left join written with an explicit equality condition. Because the keys
# are compared as expressions, both State (from data) and state (from csdata3)
# remain in the result.
ljoin = data.join(csdata3, data.State == csdata3.state, 'left')
ljoin.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+-----+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|country|state|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+-----+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3

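The join above matches on differently named key columns in the left and right DataFrames (`State` and `state`), so both key columns appear in the result.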

In [18]:
# Left join on two key columns at once: rows must agree on both country and state to match.
mjoin = data.join(csdata4, ['country', 'state'], 'left')
mjoin.show()

+-------+-----+-------------------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|Country|State|               City|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------+-----+-------------------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|     US|   PR|        PARC PARQUE|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|     US|   PR|PASEO

The join above matches on multiple columns (`country` and `state`).

In [19]:
# Right join on "state": csdata4 is the preserved (right) side, data the left.
right_join = data.join(csdata4, on="state", how="right")
right_join.show()

+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|state|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|country|
+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|   PR|      SECT LANAUSSE|     US|        false|               null|17.96|NA-US-PR-SECT LAN...|   Sect Lanausse, PR|NOT ACCEPTABLE| -66.22|         null|           3|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    7

**Right join** keeps all rows from the right DataFrame and matching rows from the left DataFrame based on the join condition.

In [20]:
# Full outer join on "state" between data and csdata3.
fullouter_join = data.join(csdata3, on="state", how="full")
fullouter_join.show()

+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|State|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|country|
+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+-------+
|   AL|      SPRING GARDEN|     US|        false|               null|33.97|NA-US-AL-SPRING G...|   Spring Garden, AL|       PRIMARY| -85.55|         null|       54354|           null|      null|         NA| 0.06|-0.82| 0.55|     PO BOX|  362

**Full Outer join** includes all rows from both DataFrames, regardless of whether there's a match in the other DataFrame.

In [21]:
# Left semi join on "state": the result carries only data's columns (no country column from csdata4).
leftsemi_join = data.join(csdata4, on="state", how="leftsemi")
leftsemi_join.show()

+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|State|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-----+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|   PR|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|   PR|PASEO COSTA D

**Left Semi Join** returns only rows from the left DataFrame that have a matching value in the right DataFrame.

In [22]:
# Left anti join on "state": the result carries only data's columns; csdata3 acts purely as an exclusion list.
leftanti_join = data.join(csdata3, on="state", how="leftanti")
leftanti_join.show()

+-----+-------------+-------+-------------+-------------------+-----+--------------------+-----------------+--------------+------+-----+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|State|         City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|     LocationText|  LocationType|  Long|Notes|RecordNumber|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-----+-------------+-------+-------------+-------------------+-----+--------------------+-----------------+--------------+------+-----+------------+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|   AL|SPRING GARDEN|     US|        false|               null|33.97|NA-US-AL-SPRING G...|Spring Garden, AL|       PRIMARY|-85.55| null|       54354|           null|      null|         NA| 0.06|-0.82| 0.55|     PO BOX|  36275|
|   AL|  SPRINGVILLE|     US|        false|               7845|33.77|NA-US-AL-SPRINGVILLE|  

In [23]:
#leftanti_join = data.join(csdata3,"state","leftanti")

**Left Anti Join** returns only rows from the left DataFrame that have no matching value in the right DataFrame.

### Adding a new static column

In [24]:
# Add a constant-valued column; F.lit wraps the Python string as a Column literal.
data2 = csdata4.withColumn('WorldRegion', F.lit('America'))
data2.show()

+-------+-----+-----------+
|country|state|WorldRegion|
+-------+-----+-----------+
|     US|   PR|    America|
|     US|   AZ|    America|
|     US|   FL|    America|
|     US|   AL|    America|
+-------+-----+-----------+



### Dropping a column

In [25]:
# Remove the state column. data2 is rebound to the result, so it no longer has a state column after this cell.
data2 = data2.drop('state')
data2.show()

+-------+-----------+
|country|WorldRegion|
+-------+-----------+
|     US|    America|
|     US|    America|
|     US|    America|
|     US|    America|
+-------+-----------+



### Renaming a Column

In [26]:
# Rename WorldRegion to "World Region"; data2 is rebound to the renamed DataFrame.
data2 = data2.withColumnRenamed('WorldRegion', 'World Region')
data2.show()

+-------+------------+
|country|World Region|
+-------+------------+
|     US|     America|
|     US|     America|
|     US|     America|
|     US|     America|
+-------+------------+



### Cleaning Columns

In [27]:
# Normalise every column name: lower-case, with spaces and hyphens replaced by
# underscores. Start from data2 and rename cumulatively so each iteration
# builds on the previous one; renaming from data2 every time would keep only
# the last column's rename (and leave data3 undefined for a column-less frame).
data3 = data2
for column_name in data2.columns:
    cleaned_name = column_name.lower().replace(' ', '_').replace('-', '_')
    data3 = data3.withColumnRenamed(column_name, cleaned_name)
data3.show(5)

+-------+------------+
|country|world_region|
+-------+------------+
|     US|     America|
|     US|     America|
|     US|     America|
|     US|     America|
+-------+------------+



### Casting a column to a different data type

In [28]:
# Convert TotalWages to a double column, leaving every other column untouched.
data4 = data.withColumn(
    'TotalWages',
    data['TotalWages'].cast(T.DoubleType()),
)
data4.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



### Coalescing Null Values with a default value

data5 = data.withColumn("Notes", F.coalesce(data.Notes, F.lit("not null")))  # F.coalesce picks the first non-null value; Column objects have no .coalesce() method

In [29]:
# Replace nulls in the Notes column with the literal string 'not null'; only the column named in the dict is filled.
data5 = data.fillna({'Notes': 'not null'})
data5.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|     not null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR