In [1]:
import os
os.environ['SPARK_HOME'] = 'C:/Users/RubyEllik/spark-3.5.2-bin-hadoop3'
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['SPARK_HOME'], 'bin')

In [2]:
import os
from pyspark.sql import SparkSession

# Configure Spark
os.environ['SPARK_HOME'] = 'C:/Users/RubyEllik/spark-3.5.2-bin-hadoop3'
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['SPARK_HOME'], 'bin')

# Create Spark session
spark = SparkSession.builder \
    .appName("Jupyter Notebook") \
    .getOrCreate()

In [3]:
spark

#### Read data and make the column names as the header

In [4]:
df=spark.read.csv('housing.csv', header=True, inferSchema=True)

In [5]:
df

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string]

In [6]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### Get the number of rows and columns

In [7]:
df.count(),len(df.columns)

(20640, 10)

#### First 10 records, where data is not truncated

In [8]:
df.show(10, truncate=False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|-122.23  |37.88   |41.0              |880.0      |129.0         |322.0     |126.0     |8.3252       |452600.0          |NEAR BAY       |
|-122.22  |37.86   |21.0              |7099.0     |1106.0        |2401.0    |1138.0    |8.3014       |358500.0          |NEAR BAY       |
|-122.24  |37.85   |52.0              |1467.0     |190.0         |496.0     |177.0     |7.2574       |352100.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |1274.0     |235.0         |558.0     |219.0     |5.6431       |341300.0          |NEAR BAY       |
|-122.25  |37.85   |52.0          

In [9]:
df.select('housing_median_age').show()

+------------------+
|housing_median_age|
+------------------+
|              41.0|
|              21.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              42.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              52.0|
|              50.0|
|              52.0|
|              52.0|
|              50.0|
|              52.0|
+------------------+
only showing top 20 rows



#### Select All coulmns

In [10]:
df.select([col for col in df.columns]).show()
#df.select("*").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### To select column by index

In [11]:
df.select(df.columns[:3]).show(3)

+---------+--------+------------------+
|longitude|latitude|housing_median_age|
+---------+--------+------------------+
|  -122.23|   37.88|              41.0|
|  -122.22|   37.86|              21.0|
|  -122.24|   37.85|              52.0|
+---------+--------+------------------+
only showing top 3 rows



In [12]:
df.select(df.columns[2:4]).show(3)

+------------------+-----------+
|housing_median_age|total_rooms|
+------------------+-----------+
|              41.0|      880.0|
|              21.0|     7099.0|
|              52.0|     1467.0|
+------------------+-----------+
only showing top 3 rows



In [13]:
#df.collect()[0]   #0th Row and all columns in 0th row
df.collect()[0][0] # displays 1st element


-122.23

##### select() is a transformation that returns a new DataFrame and holds the columns that are selected whereas collect() is an action that returns the entire data set in an Array to the driver.

In [14]:
######################################################################################################################

#### WithColumn

In [15]:
from pyspark.sql.functions import col
df.withColumn("total_rooms",col("total_rooms")*1).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### Rename A Column

In [16]:
df.withColumnRenamed("total_rooms","Total_rooms").show(truncate=False) 

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|Total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|-122.23  |37.88   |41.0              |880.0      |129.0         |322.0     |126.0     |8.3252       |452600.0          |NEAR BAY       |
|-122.22  |37.86   |21.0              |7099.0     |1106.0        |2401.0    |1138.0    |8.3014       |358500.0          |NEAR BAY       |
|-122.24  |37.85   |52.0              |1467.0     |190.0         |496.0     |177.0     |7.2574       |352100.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |1274.0     |235.0         |558.0     |219.0     |5.6431       |341300.0          |NEAR BAY       |
|-122.25  |37.85   |52.0          

#### Copy One Column

In [17]:
df.withColumn("CopiedColumn",col("Total_rooms")*100).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|CopiedColumn|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     88000.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    709900.0|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    146700.0|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|      

#### Drop a Column

In [18]:
df.drop("copiedColumn")

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string]

In [19]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### Filter

In [20]:
df.filter(df.latitude== 37.85).show(truncate=False)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|-122.24  |37.85   |52.0              |1467.0     |190.0         |496.0     |177.0     |7.2574       |352100.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |1274.0     |235.0         |558.0     |219.0     |5.6431       |341300.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |1627.0     |280.0         |565.0     |259.0     |3.8462       |342200.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |919.0      |213.0         |413.0     |193.0     |4.0368       |269700.0          |NEAR BAY       |
|-122.26  |37.85   |52.0          

In [21]:
df.filter(df.ocean_proximity!="NEAR BAY").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.97|   37.64|              32.0|     1283.0|         194.0|     485.0|     171.0|       6.0574|          431000.0|      <1H OCEAN|
|  -121.99|   37.61|               9.0|     3666.0|         711.0|    2341.0|     703.0|       4.6458|          217000.0|      <1H OCEAN|
|  -121.97|   37.57|              21.0|     4342.0|         783.0|    2172.0|     789.0|       4.6146|          247600.0|      <1H OCEAN|
|  -121.96|   37.58|              15.0|     3575.0|         597.0|    1777.0|     559.0|       5.7192|          283500.0|      <1H OCEAN|
|  -121.98|   37.58|              

In [22]:
df.filter(~(df.ocean_proximity=="NEAR BAY")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.97|   37.64|              32.0|     1283.0|         194.0|     485.0|     171.0|       6.0574|          431000.0|      <1H OCEAN|
|  -121.99|   37.61|               9.0|     3666.0|         711.0|    2341.0|     703.0|       4.6458|          217000.0|      <1H OCEAN|
|  -121.97|   37.57|              21.0|     4342.0|         783.0|    2172.0|     789.0|       4.6146|          247600.0|      <1H OCEAN|
|  -121.96|   37.58|              15.0|     3575.0|         597.0|    1777.0|     559.0|       5.7192|          283500.0|      <1H OCEAN|
|  -121.98|   37.58|              

In [23]:
df.filter(col("ocean_proximity")=="NEAR BAY").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [24]:
df.filter("ocean_proximity=='NEAR BAY'").show()  ### SQL EXPRESSION
df.filter("ocean_proximity!='NEAR BAY'").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [25]:
df.filter("ocean_proximity<>'NEAR BAY'").show()  ##Not equal to

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.97|   37.64|              32.0|     1283.0|         194.0|     485.0|     171.0|       6.0574|          431000.0|      <1H OCEAN|
|  -121.99|   37.61|               9.0|     3666.0|         711.0|    2341.0|     703.0|       4.6458|          217000.0|      <1H OCEAN|
|  -121.97|   37.57|              21.0|     4342.0|         783.0|    2172.0|     789.0|       4.6146|          247600.0|      <1H OCEAN|
|  -121.96|   37.58|              15.0|     3575.0|         597.0|    1777.0|     559.0|       5.7192|          283500.0|      <1H OCEAN|
|  -121.98|   37.58|              

In [26]:
df.filter((df.ocean_proximity=="NEAR BAY")&(df.latitude=="37.85")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|      919.0|         213.0|     413.0|     193.0|       4.0368|          269700.0|       NEAR BAY|
|  -122.26|   37.85|              

In [27]:
df.filter((df.ocean_proximity=="NEAR BAY")|(df.latitude=="37.85")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### Filter isin()

In [28]:
list=[37.84,37.85]
df.filter(df.latitude.isin(list)).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|      919.0|         213.0|     413.0|     193.0|       4.0368|          269700.0|       NEAR BAY|
|  -122.25|   37.84|              

#### Filter startswith(), endswith(), contains()

In [29]:
df.filter(df.ocean_proximity.startswith("N")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [30]:
df.filter(df.ocean_proximity.endswith("Y")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [31]:
df.filter(df.ocean_proximity.contains("H")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.97|   37.64|              32.0|     1283.0|         194.0|     485.0|     171.0|       6.0574|          431000.0|      <1H OCEAN|
|  -121.99|   37.61|               9.0|     3666.0|         711.0|    2341.0|     703.0|       4.6458|          217000.0|      <1H OCEAN|
|  -121.97|   37.57|              21.0|     4342.0|         783.0|    2172.0|     789.0|       4.6146|          247600.0|      <1H OCEAN|
|  -121.96|   37.58|              15.0|     3575.0|         597.0|    1777.0|     559.0|       5.7192|          283500.0|      <1H OCEAN|
|  -121.98|   37.58|              

In [32]:
df.filter(df.ocean_proximity.like("%H%")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.97|   37.64|              32.0|     1283.0|         194.0|     485.0|     171.0|       6.0574|          431000.0|      <1H OCEAN|
|  -121.99|   37.61|               9.0|     3666.0|         711.0|    2341.0|     703.0|       4.6458|          217000.0|      <1H OCEAN|
|  -121.97|   37.57|              21.0|     4342.0|         783.0|    2172.0|     789.0|       4.6146|          247600.0|      <1H OCEAN|
|  -121.96|   37.58|              15.0|     3575.0|         597.0|    1777.0|     559.0|       5.7192|          283500.0|      <1H OCEAN|
|  -121.98|   37.58|              

In [33]:
###########from pyspark.sql.functions import array_contains
###########df.filter(array_contains(df.ocean_proximity, "NEAR BAY")).show(truncate=False)  #### ocean _proximity is string type. If array type then this works

#### What is the difference between where and filter in PySpark?
In PySpark, both filter() and where() functions are used to select out data based on certain conditions.
They are used interchangeably, and both of them essentially perform the same operation.

#### DROP DUPLICATES

In [34]:
DISTINCTDF=df.distinct()
#print("DISTINCT COUNT :" +str(DISTINCTDF.count()))
print(f"DISTINCT COUNT : {DISTINCTDF.count()}")
DISTINCTDF.show()

DISTINCT COUNT : 20640
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.28|   37.81|              52.0|      340.0|          97.0|     200.0|      87.0|       1.5208|          112500.0|       NEAR BAY|
|  -122.13|   37.67|              40.0|     1748.0|         318.0|     914.0|     317.0|       3.8676|          184000.0|       NEAR BAY|
|  -122.07|   37.67|              27.0|     3239.0|         671.0|    1469.0|     616.0|       3.2465|          230600.0|       NEAR BAY|
|  -122.13|   37.66|              19.0|      862.0|         167.0|     407.0|     183.0|       4.3456|          163000.0|       NEAR BAY|
|  -121.85|

In [35]:
df.count()

20640

In [36]:
df2=df.dropDuplicates()
print("DISTINCT VALUE COUNT :", df2.count())
df2.show()

DISTINCT VALUE COUNT : 20640
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.28|   37.81|              52.0|      340.0|          97.0|     200.0|      87.0|       1.5208|          112500.0|       NEAR BAY|
|  -122.13|   37.67|              40.0|     1748.0|         318.0|     914.0|     317.0|       3.8676|          184000.0|       NEAR BAY|
|  -122.07|   37.67|              27.0|     3239.0|         671.0|    1469.0|     616.0|       3.2465|          230600.0|       NEAR BAY|
|  -122.13|   37.66|              19.0|      862.0|         167.0|     407.0|     183.0|       4.3456|          163000.0|       NEAR BAY|
|  -1

#### PySpark Drop duplicates of Selected Multiple Columns

In [37]:
#dropDupDF= df.dropDuplicates(["ocean_proximity","latitude"])

dropDupDF= df.dropDuplicates(["ocean_proximity"])
print(dropDupDF.count())
dropDupDF.show(truncate=False)

5
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|-118.32  |33.35   |27.0              |1675.0     |521.0         |744.0     |331.0     |2.1579       |450000.0          |ISLAND         |
|-124.17  |41.8    |16.0              |2739.0     |480.0         |1259.0    |436.0     |3.7557       |109400.0          |NEAR OCEAN     |
|-122.23  |37.88   |41.0              |880.0      |129.0         |322.0     |126.0     |8.3252       |452600.0          |NEAR BAY       |
|-121.97  |37.64   |32.0              |1283.0     |194.0         |485.0     |171.0     |6.0574       |431000.0          |<1H OCEAN      |
|-121.92  |37.64   |46.0        

##### How is distinct() different from dropDuplicates()
distinct() and dropDuplicates() in PySpark are used to remove duplicate rows, but there is a subtle difference. distinct() considers all columns when identifying duplicates, while dropDuplicates() allowing you to specify a subset of columns to determine uniqueness

##### How does distinct() handle NULL values?
The distinct() function treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them will be retained after applying distinct()..

###############################################################################################################

### Sort() and orderBy()

In [38]:
df.sort("latitude", "longitude", ascending=[True, False]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -116.97|   32.56|              

In [39]:
df.sort("latitude",ascending=True).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -117.05|   32.56|              

In [40]:
df.sort(col("latitude"),ascending=True).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -117.05|   32.56|              

In [41]:
df.orderBy("latitude", "ocean_proximity").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -117.05|   32.56|              

In [42]:
df.orderBy(col("latitude"),col("longitude"),ascending=True).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.12|   32.56|              

In [43]:
df.orderBy(col("latitude").asc(),col("longitude").desc()).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -116.97|   32.56|              

In [44]:
df.sort(col("latitude").asc()).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -117.04|   32.54|               7.0|      938.0|         297.0|    1187.0|     282.0|       1.2667|           67500.0|     NEAR OCEAN|
|  -117.04|   32.55|              15.0|     2206.0|         648.0|    2511.0|     648.0|       1.6348|           93200.0|     NEAR OCEAN|
|  -117.06|   32.55|               5.0|     3223.0|         940.0|    3284.0|     854.0|       1.4384|          108800.0|     NEAR OCEAN|
|  -117.09|   32.55|               8.0|     6533.0|        1217.0|    4797.0|    1177.0|       3.9583|          144400.0|     NEAR OCEAN|
|  -117.05|   32.56|              

#### What is the difference between orderBy() and sort() in PySpark?
In PySpark, both orderBy() and sort() are methods used for sorting rows in a DataFrame, and they serve the same purpose. However, there is no significant difference in terms of functionality or sorting capability between these two methods. Both can be used to sort rows based on one or more columns in ascending or descending order.

#################################################################################################################

## groupBy

#### Functions
count(): Use groupBy() count() to return the number of rows for each group.<br>
mean():	Returns the mean of values for each group.<br> 
max(): Returns the maximum of values for each group.<br>
min(): Returns the minimum of values for each group.<br> 
sum(): Returns the total for values for each group.<br> 
agg(): Using groupBy() agg() function, we can calculate more than one aggregate at a time.<br>
pivot(): This function is used to Pivot the DataFrame

In [45]:
df.groupBy("latitude").sum("population").show()

+--------+---------------+
|latitude|sum(population)|
+--------+---------------+
|   37.81|        82042.0|
|   38.61|        58273.0|
|   40.11|         1942.0|
|   35.17|         2514.0|
|   40.53|         2401.0|
|   39.42|         3820.0|
|   40.94|         1457.0|
|   37.23|        44829.0|
|   38.93|         8767.0|
|    37.1|         5930.0|
|   36.27|         3138.0|
|   38.68|        69765.0|
|   35.15|         5156.0|
|   41.46|          600.0|
|   37.42|        32191.0|
|   39.97|         1236.0|
|   41.78|         4294.0|
|   40.05|         1204.0|
|   35.38|        24104.0|
|   35.34|        16641.0|
+--------+---------------+
only showing top 20 rows



In [46]:
df.groupBy("ocean_proximity").count().show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|         ISLAND|    5|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|      <1H OCEAN| 9136|
|         INLAND| 6551|
+---------------+-----+



In [47]:
df.groupBy("ocean_proximity").mean("latitude").show()  #if just mean(): it will show mean  of all the columns.

+---------------+------------------+
|ocean_proximity|     avg(latitude)|
+---------------+------------------+
|         ISLAND|33.358000000000004|
|     NEAR OCEAN| 34.73843867569602|
|       NEAR BAY|37.801056768558915|
|      <1H OCEAN|34.560576838879264|
|         INLAND| 36.73182872843834|
+---------------+------------------+



In [48]:
#df.groupBy("ocean_proximity").max("latitude").show()
df.groupBy("ocean_proximity").max("median_income").show()


+---------------+------------------+
|ocean_proximity|max(median_income)|
+---------------+------------------+
|         ISLAND|            3.3906|
|     NEAR OCEAN|           15.0001|
|       NEAR BAY|           15.0001|
|      <1H OCEAN|           15.0001|
|         INLAND|           15.0001|
+---------------+------------------+



In [49]:
df.groupBy("housing_median_age").min("total_bedrooms").show()

+------------------+-------------------+
|housing_median_age|min(total_bedrooms)|
+------------------+-------------------+
|               8.0|                9.0|
|               7.0|               12.0|
|              49.0|                7.0|
|              29.0|               22.0|
|              47.0|               23.0|
|              42.0|               14.0|
|              44.0|               29.0|
|              35.0|               31.0|
|              18.0|               17.0|
|              39.0|                8.0|
|               1.0|                2.0|
|              37.0|               17.0|
|              34.0|               11.0|
|              25.0|               32.0|
|              36.0|                5.0|
|              41.0|               20.0|
|               4.0|                2.0|
|              23.0|               32.0|
|              50.0|               33.0|
|              45.0|                5.0|
+------------------+-------------------+
only showing top

In [50]:
df.groupBy("housing_median_age","median_income").max("total_rooms","total_bedrooms").show()

+------------------+-------------+----------------+-------------------+
|housing_median_age|median_income|max(total_rooms)|max(total_bedrooms)|
+------------------+-------------+----------------+-------------------+
|              18.0|      11.6017|          1617.0|              210.0|
|              52.0|       7.9556|          2079.0|              273.0|
|              31.0|       5.2312|          1487.0|              280.0|
|              19.0|       5.0509|          4657.0|              739.0|
|              22.0|       2.5008|          8350.0|             2717.0|
|              37.0|       4.0924|          2066.0|              434.0|
|              30.0|        4.975|           414.0|               54.0|
|              35.0|         4.75|          2277.0|              420.0|
|              25.0|       5.0423|          5405.0|              939.0|
|              14.0|       5.2798|          5332.0|              884.0|
|              15.0|       5.8418|          1216.0|             

In [51]:
from pyspark.sql.functions import max, min
df.groupBy("median_house_value").agg(max("total_rooms").alias("MaxTotalRoomsAvail"), min("total_bedrooms").alias("MinTotalBedroom")).show()

+------------------+------------------+---------------+
|median_house_value|MaxTotalRoomsAvail|MinTotalBedroom|
+------------------+------------------+---------------+
|          270100.0|            3789.0|          162.0|
|          330000.0|            6678.0|           15.0|
|          179300.0|            5031.0|          193.0|
|          217000.0|           15207.0|          338.0|
|          300000.0|            7297.0|          168.0|
|          337200.0|            3047.0|          270.0|
|          147000.0|            2633.0|          284.0|
|          147100.0|            3048.0|          121.0|
|           51300.0|            8502.0|           57.0|
|          181400.0|            6108.0|          312.0|
|          230800.0|            3285.0|          166.0|
|          262800.0|            3085.0|          355.0|
|          169700.0|            3297.0|          534.0|
|          477600.0|            2234.0|          256.0|
|          187200.0|            9963.0|         

In [52]:
df.groupBy("median_house_value").agg(max("total_rooms").alias("MaxTotalRoomsAvail"), min("total_bedrooms").alias("MinTotalBedroom")).where(col("MinTotalBedroom")>3000).show()

+------------------+------------------+---------------+
|median_house_value|MaxTotalRoomsAvail|MinTotalBedroom|
+------------------+------------------+---------------+
|          451100.0|           28258.0|         3864.0|
|          399200.0|           30405.0|         4093.0|
|          322500.0|           18123.0|         3173.0|
|           28300.0|           17738.0|         3114.0|
+------------------+------------------+---------------+



## Join

In [53]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [54]:
from pyspark.sql.functions import monotonically_increasing_id
df_with_index = df.withColumn("index", monotonically_increasing_id())
df_with_index.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|index|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    1|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    2|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY

In [55]:
df2= df.limit(20)
df2=df2.withColumn("example_column",col("population")*10)
df2=df2.select("longitude","latitude","example_column","total_rooms")
df2.show()

+---------+--------+--------------+-----------+
|longitude|latitude|example_column|total_rooms|
+---------+--------+--------------+-----------+
|  -122.23|   37.88|        3220.0|      880.0|
|  -122.22|   37.86|       24010.0|     7099.0|
|  -122.24|   37.85|        4960.0|     1467.0|
|  -122.25|   37.85|        5580.0|     1274.0|
|  -122.25|   37.85|        5650.0|     1627.0|
|  -122.25|   37.85|        4130.0|      919.0|
|  -122.25|   37.84|       10940.0|     2535.0|
|  -122.25|   37.84|       11570.0|     3104.0|
|  -122.26|   37.84|       12060.0|     2555.0|
|  -122.25|   37.84|       15510.0|     3549.0|
|  -122.26|   37.85|        9100.0|     2202.0|
|  -122.26|   37.85|       15040.0|     3503.0|
|  -122.26|   37.85|       10980.0|     2491.0|
|  -122.26|   37.84|        3450.0|      696.0|
|  -122.26|   37.85|       12120.0|     2643.0|
|  -122.26|   37.85|        6970.0|     1120.0|
|  -122.27|   37.85|        7930.0|     1966.0|
|  -122.27|   37.85|        6480.0|     

In [56]:
df3=df.limit(40)
df3=df3.withColumn("example_column",col("population")*10)
df3=df3.select(df3.median_income, df3.example_column,df3.latitude)
df3.show()

+-------------+--------------+--------+
|median_income|example_column|latitude|
+-------------+--------------+--------+
|       8.3252|        3220.0|   37.88|
|       8.3014|       24010.0|   37.86|
|       7.2574|        4960.0|   37.85|
|       5.6431|        5580.0|   37.85|
|       3.8462|        5650.0|   37.85|
|       4.0368|        4130.0|   37.85|
|       3.6591|       10940.0|   37.84|
|         3.12|       11570.0|   37.84|
|       2.0804|       12060.0|   37.84|
|       3.6912|       15510.0|   37.84|
|       3.2031|        9100.0|   37.85|
|       3.2705|       15040.0|   37.85|
|        3.075|       10980.0|   37.85|
|       2.6736|        3450.0|   37.84|
|       1.9167|       12120.0|   37.85|
|        2.125|        6970.0|   37.85|
|        2.775|        7930.0|   37.85|
|       2.1202|        6480.0|   37.85|
|       1.9911|        9900.0|   37.84|
|       2.6033|        6900.0|   37.84|
+-------------+--------------+--------+
only showing top 20 rows



## JOINS AVAILABLE
Inner Join: Returns only the rows with matching keys in both DataFrames.<br>
Left Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame. <br>
Right Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame. <br>
Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows. <br>
Left Semi Join: Returns all rows from the left DataFrame where there is a match in the right DataFrame. <br>
Left Anti Join: Returns all rows from the left DataFrame where there is no match in the right DataFrame. <br>rame.

In [57]:
df2.join(df3,df2.example_column == df3.example_column,"inner").show()

+---------+--------+--------------+-----------+-------------+--------------+--------+
|longitude|latitude|example_column|total_rooms|median_income|example_column|latitude|
+---------+--------+--------------+-----------+-------------+--------------+--------+
|  -122.23|   37.88|        3220.0|      880.0|       8.3252|        3220.0|   37.88|
|  -122.22|   37.86|       24010.0|     7099.0|       8.3014|       24010.0|   37.86|
|  -122.24|   37.85|        4960.0|     1467.0|       7.2574|        4960.0|   37.85|
|  -122.25|   37.85|        5580.0|     1274.0|       5.6431|        5580.0|   37.85|
|  -122.25|   37.85|        5650.0|     1627.0|       3.8462|        5650.0|   37.85|
|  -122.25|   37.85|        4130.0|      919.0|       4.0368|        4130.0|   37.85|
|  -122.25|   37.84|       10940.0|     2535.0|       3.6591|       10940.0|   37.84|
|  -122.25|   37.84|       11570.0|     3104.0|         3.12|       11570.0|   37.84|
|  -122.26|   37.84|       12060.0|     2555.0|       

In [58]:
df2.join(df3,df2.example_column == df3.example_column,"leftouter").show() # can write left or leftouter

+---------+--------+--------------+-----------+-------------+--------------+--------+
|longitude|latitude|example_column|total_rooms|median_income|example_column|latitude|
+---------+--------+--------------+-----------+-------------+--------------+--------+
|  -122.23|   37.88|        3220.0|      880.0|       8.3252|        3220.0|   37.88|
|  -122.22|   37.86|       24010.0|     7099.0|       8.3014|       24010.0|   37.86|
|  -122.24|   37.85|        4960.0|     1467.0|       7.2574|        4960.0|   37.85|
|  -122.25|   37.85|        5580.0|     1274.0|       5.6431|        5580.0|   37.85|
|  -122.25|   37.85|        5650.0|     1627.0|       3.8462|        5650.0|   37.85|
|  -122.25|   37.85|        4130.0|      919.0|       4.0368|        4130.0|   37.85|
|  -122.25|   37.84|       10940.0|     2535.0|       3.6591|       10940.0|   37.84|
|  -122.25|   37.84|       11570.0|     3104.0|         3.12|       11570.0|   37.84|
|  -122.26|   37.84|       12060.0|     2555.0|       

In [59]:
rightouterDF=df2.join(df3,df2.example_column == df3.example_column,"right")
print(rightouterDF.count())
rightouterDF.show()

40
+---------+--------+--------------+-----------+-------------+--------------+--------+
|longitude|latitude|example_column|total_rooms|median_income|example_column|latitude|
+---------+--------+--------------+-----------+-------------+--------------+--------+
|  -122.23|   37.88|        3220.0|      880.0|       8.3252|        3220.0|   37.88|
|  -122.22|   37.86|       24010.0|     7099.0|       8.3014|       24010.0|   37.86|
|  -122.24|   37.85|        4960.0|     1467.0|       7.2574|        4960.0|   37.85|
|  -122.25|   37.85|        5580.0|     1274.0|       5.6431|        5580.0|   37.85|
|  -122.25|   37.85|        5650.0|     1627.0|       3.8462|        5650.0|   37.85|
|  -122.25|   37.85|        4130.0|      919.0|       4.0368|        4130.0|   37.85|
|  -122.25|   37.84|       10940.0|     2535.0|       3.6591|       10940.0|   37.84|
|  -122.25|   37.84|       11570.0|     3104.0|         3.12|       11570.0|   37.84|
|  -122.26|   37.84|       12060.0|     2555.0|    

In [60]:
full_df=df3.join(df2,df3.example_column == df2. example_column, "full") #full,outer,fullouter
full_df.count()
full_df.show()

+-------------+--------------+--------+---------+--------+--------------+-----------+
|median_income|example_column|latitude|longitude|latitude|example_column|total_rooms|
+-------------+--------------+--------+---------+--------+--------------+-----------+
|       2.4038|        3170.0|   37.85|     NULL|    NULL|          NULL|       NULL|
|       8.3252|        3220.0|   37.88|  -122.23|   37.88|        3220.0|      880.0|
|       2.6736|        3450.0|   37.84|  -122.26|   37.84|        3450.0|      696.0|
|       1.6875|        3950.0|   37.84|     NULL|    NULL|          NULL|       NULL|
|       1.3578|        4090.0|   37.85|     NULL|    NULL|          NULL|       NULL|
|       4.0368|        4130.0|   37.85|  -122.25|   37.85|        4130.0|      919.0|
|       7.2574|        4960.0|   37.85|  -122.24|   37.85|        4960.0|     1467.0|
|       5.6431|        5580.0|   37.85|  -122.25|   37.85|        5580.0|     1274.0|
|       3.8462|        5650.0|   37.85|  -122.25|   37

In [61]:
semi_df=df3.join(df2,df3.example_column == df2. example_column, "leftsemi")
semi_df.count()
semi_df.show()

+-------------+--------------+--------+
|median_income|example_column|latitude|
+-------------+--------------+--------+
|       8.3252|        3220.0|   37.88|
|       8.3014|       24010.0|   37.86|
|       7.2574|        4960.0|   37.85|
|       5.6431|        5580.0|   37.85|
|       3.8462|        5650.0|   37.85|
|       4.0368|        4130.0|   37.85|
|       3.6591|       10940.0|   37.84|
|         3.12|       11570.0|   37.84|
|       2.0804|       12060.0|   37.84|
|       3.6912|       15510.0|   37.84|
|       3.2031|        9100.0|   37.85|
|       3.2705|       15040.0|   37.85|
|        3.075|       10980.0|   37.85|
|       2.6736|        3450.0|   37.84|
|       1.9167|       12120.0|   37.85|
|        2.125|        6970.0|   37.85|
|        2.775|        7930.0|   37.85|
|       2.1202|        6480.0|   37.85|
|       1.9911|        9900.0|   37.84|
|       2.6033|        6900.0|   37.84|
+-------------+--------------+--------+



In [62]:
anti_df=df3.join(df2,df3.example_column == df2.example_column, "leftanti")
print(anti_df.count())
anti_df.show()

20
+-------------+--------------+--------+
|median_income|example_column|latitude|
+-------------+--------------+--------+
|       1.3578|        4090.0|   37.85|
|       1.7135|        9290.0|   37.85|
|        1.725|       10150.0|   37.84|
|       2.1806|        8530.0|   37.84|
|          2.6|       10060.0|   37.84|
|       2.4038|        3170.0|   37.85|
|       2.4597|        6070.0|   37.85|
|        1.808|       11020.0|   37.85|
|       1.6424|       11310.0|   37.84|
|       1.6875|        3950.0|   37.84|
|       1.9274|        8630.0|   37.84|
|       1.9615|       11680.0|   37.84|
|       1.7969|       10260.0|   37.84|
|        1.375|        7540.0|   37.83|
|       2.7303|       12580.0|   37.83|
|       1.4861|        5700.0|   37.83|
|       1.0972|        9870.0|   37.83|
|       1.4103|        9010.0|   37.83|
|         3.48|        6890.0|   37.83|
|       2.5898|       13770.0|   37.83|
+-------------+--------------+--------+



#### An example of multiple joins 

df1.join(df2,df1.id1 == df2.id2,"inner") \
   .join(df3,df1.id1 == df3.id3,"inner")


In [63]:
df2=df2.select("example_column","latitude")
df3=df3.select("example_column","latitude")
DFunion=df2.union(df3)
print(DFunion.count())
DFunion.show()

60
+--------------+--------+
|example_column|latitude|
+--------------+--------+
|        3220.0|   37.88|
|       24010.0|   37.86|
|        4960.0|   37.85|
|        5580.0|   37.85|
|        5650.0|   37.85|
|        4130.0|   37.85|
|       10940.0|   37.84|
|       11570.0|   37.84|
|       12060.0|   37.84|
|       15510.0|   37.84|
|        9100.0|   37.85|
|       15040.0|   37.85|
|       10980.0|   37.85|
|        3450.0|   37.84|
|       12120.0|   37.85|
|        6970.0|   37.85|
|        7930.0|   37.85|
|        6480.0|   37.85|
|        9900.0|   37.84|
|        6900.0|   37.84|
+--------------+--------+
only showing top 20 rows



In [64]:
disUnion= df2.union(df3).distinct()
print(disUnion.count())
disUnion.show()

40
+--------------+--------+
|example_column|latitude|
+--------------+--------+
|       10940.0|   37.84|
|       12120.0|   37.85|
|       10980.0|   37.85|
|        6900.0|   37.84|
|        3220.0|   37.88|
|       24010.0|   37.86|
|        7540.0|   37.83|
|        9010.0|   37.83|
|        6480.0|   37.85|
|       11570.0|   37.84|
|        5650.0|   37.85|
|        7930.0|   37.85|
|        4960.0|   37.85|
|        9900.0|   37.84|
|        9100.0|   37.85|
|       15040.0|   37.85|
|        4130.0|   37.85|
|       12060.0|   37.84|
|        3450.0|   37.84|
|        6970.0|   37.85|
+--------------+--------+
only showing top 20 rows



#### Can we union() DataFrames that have different schemas?
The union() can be performed on the DataFrames that have the same schema and structure. If the schemas are different we may need to use unionByName() or make changes to the DataFrames to align to their schemas before performing union() transformation.

#####  UNION BY NAME (unionByName)

In [65]:
df2=df2.drop("latitude")
df2.show()

+--------------+
|example_column|
+--------------+
|        3220.0|
|       24010.0|
|        4960.0|
|        5580.0|
|        5650.0|
|        4130.0|
|       10940.0|
|       11570.0|
|       12060.0|
|       15510.0|
|        9100.0|
|       15040.0|
|       10980.0|
|        3450.0|
|       12120.0|
|        6970.0|
|        7930.0|
|        6480.0|
|        9900.0|
|        6900.0|
+--------------+



In [66]:
DFunionAll= df2.unionByName(df3, allowMissingColumns=True)
print(DFunionAll.count())
DFunionAll.show()

60
+--------------+--------+
|example_column|latitude|
+--------------+--------+
|        3220.0|    NULL|
|       24010.0|    NULL|
|        4960.0|    NULL|
|        5580.0|    NULL|
|        5650.0|    NULL|
|        4130.0|    NULL|
|       10940.0|    NULL|
|       11570.0|    NULL|
|       12060.0|    NULL|
|       15510.0|    NULL|
|        9100.0|    NULL|
|       15040.0|    NULL|
|       10980.0|    NULL|
|        3450.0|    NULL|
|       12120.0|    NULL|
|        6970.0|    NULL|
|        7930.0|    NULL|
|        6480.0|    NULL|
|        9900.0|    NULL|
|        6900.0|    NULL|
+--------------+--------+
only showing top 20 rows



### UDF (User Defined Functions)

In [67]:
# def upperCase(str):
#     return str.upper()

In [68]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType
# upperCaseUDF = udf(upperCase, StringType())

In [69]:
# from pyspark.sql.functions import col
# test_df = df.limit(5)  # Limit to 5 rows for testing
# test_df.show()
# test_df = test_df.withColumn("ocean_proximity", col("ocean_proximity").cast(StringType()))
# # # Apply the UDF
# test_df = test_df.withColumn("ocean_proximity", upperCaseUDF(test_df["ocean_proximity"]))
# test_df.show()

In [70]:
# test_df.printSchema()


#### Transformation

Map, filter are transformation...
show,collect,save as file are actions
Lazy Evaluation: if there are read, map, filter and save as file then during our execution of read map and filter the compiler wont do anything. it will start doing only when an action takes place.This is because if do all the transformation and no action then all the storage go waste. so its better to do transformation after doing an action. 
Two Types Of transformation:
#### Narrow Transformation and Wide Transformation
Narrow: 1 to 1 -No shuffle
Wide: shuffle is needed

### APPLY

#### Apply using withColumn

In [71]:
test_df=df.limit(20)
from pyspark.sql.functions import lower
test_dfLower=df.withColumn("ocean_proximity", lower(df.ocean_proximity))
test_dfLower.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       near bay|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       near bay|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       near bay|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       near bay|
|  -122.25|   37.85|              

#### Apply with select

In [81]:
from pyspark.sql.functions import round
test_dfApply=test_df.select(col("median_income"), round(col("median_income"),2).alias("rounded_column"))
test_dfApply.show()



+-------------+--------------+
|median_income|rounded_column|
+-------------+--------------+
|       8.3252|          8.33|
|       8.3014|           8.3|
|       7.2574|          7.26|
|       5.6431|          5.64|
|       3.8462|          3.85|
|       4.0368|          4.04|
|       3.6591|          3.66|
|         3.12|          3.12|
|       2.0804|          2.08|
|       3.6912|          3.69|
|       3.2031|           3.2|
|       3.2705|          3.27|
|        3.075|          3.08|
|       2.6736|          2.67|
|       1.9167|          1.92|
|        2.125|          2.13|
|        2.775|          2.78|
|       2.1202|          2.12|
|       1.9911|          1.99|
|       2.6033|           2.6|
+-------------+--------------+



#### map()
Transformation function that is used to apply a function/lambda to each element of an RDD and return a new RDD consisting of the result.

In [82]:
#rdd=test_df.rdd

In [83]:
#print(rdd.take(5))
#ERROR

In [84]:
#rdd_transformed = rdd.map(lambda row: (row[0], row[1], row[2], row[3] * 2, row[4], row[5], row[6], row[7], row[8], row[9]))

In [85]:
#df_transformed = rdd_transformed.map(lambda x: Row(longitude=x[0], latitude=x[1], housing_median_age=x[2], total_rooms=x[3], total_bedrooms=x[4],population=x[5], households=x[6], median_income=x[7], median_house_value=x[8], ocean_proximity=x[9])).toDF()


#### Sampling

In [105]:
test_df1=df.limit(30)
test_df1.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [94]:
df_sample=spark.range(30)

In [95]:
print(df_sample.sample(0.6).collect())

[Row(id=0), Row(id=4), Row(id=5), Row(id=7), Row(id=8), Row(id=9), Row(id=10), Row(id=13), Row(id=15), Row(id=16), Row(id=17), Row(id=18), Row(id=19), Row(id=21), Row(id=22), Row(id=25), Row(id=26)]


#### foreach()

In [116]:
# def f(row):
#     print(row['median_income'])
# test_df1.foreach(f)

#### seed

In [110]:
print(test_df1.sample(0.2,123).collect())

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0, population=565.0, households=259.0, median_income=3.8462, median_house_value=342200.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.84, housing_median_age=52.0, total_rooms=1503.0, total_bedrooms=298.0, population=690.0, households=275.0, median_income=2.6033, median_house_value=162900.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.28, latitude=37.85, housing_median_age=49.0, total_rooms=1130.0, total_bedrooms=244.0, population=607.0, households=239.0, median_income=2.4597, median_house_value=93800.0, ocean_proximity='NEAR BAY')]


In [112]:
print(test_df1.sample(0.2,123).collect())

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0, population=565.0, households=259.0, median_income=3.8462, median_house_value=342200.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.84, housing_median_age=52.0, total_rooms=1503.0, total_bedrooms=298.0, population=690.0, households=275.0, median_income=2.6033, median_house_value=162900.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.28, latitude=37.85, housing_median_age=49.0, total_rooms=1130.0, total_bedrooms=244.0, population=607.0, households=239.0, median_income=2.4597, median_house_value=93800.0, ocean_proximity='NEAR BAY')]


In [115]:
print(test_df1.sample(0.2,453).collect())

[Row(longitude=-122.22, latitude=37.86, housing_median_age=21.0, total_rooms=7099.0, total_bedrooms=1106.0, population=2401.0, households=1138.0, median_income=8.3014, median_house_value=358500.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0, population=565.0, households=259.0, median_income=3.8462, median_house_value=342200.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.84, housing_median_age=52.0, total_rooms=2535.0, total_bedrooms=489.0, population=1094.0, households=514.0, median_income=3.6591, median_house_value=299200.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.26, latitude=37.85, housing_median_age=52.0, total_rooms=2491.0, total_bedrooms=474.0, population=1098.0, households=468.0, median_income=3.075, median_house_value=213500.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.84, housing_median_age=52.0, total_rooms=1503.0, total_bedrooms=298.0, p

In [127]:
test_df2=test_df1.sample(True,0.3,123).collect() ##with duplicates
print (len(test_df2))

10


In [129]:
print(test_df2)

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=919.0, total_bedrooms=213.0, population=413.0, households=193.0, median_income=4.0368, median_house_value=269700.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.84, housing_median_age=52.0, total_rooms=3549.0, total_bedrooms=707.0, population=1551.0, households=714.0, median_income=3.6912, median_house_value=261100.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.26, latitude=37.85, housing_median_age=52.0, total_rooms=3503.0, total_bedrooms=752.0, population=1504.0, households=734.0, median_income=3.2705, median_house_value=241800.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.26, latitude=37.85, housing_median_age=52.0, total_rooms=2643.0, total_bedrooms=626.0, popul

In [131]:
test_df3=test_df1.sample(0.3,123).collect() ### without duplicates
print(len(test_df3))

7


In [132]:
print(test_df3)


[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0, population=565.0, households=259.0, median_income=3.8462, median_house_value=342200.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.85, housing_median_age=52.0, total_rooms=1228.0, total_bedrooms=293.0, population=648.0, households=303.0, median_income=2.1202, median_house_value=155500.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.84, housing_median_age=52.0, total_rooms=1503.0, total_bedrooms=298.0, population=690.0, households=275.0, median_income=2.6033, median_house_value=162900.0, ocean_proximity='NEAR BAY'), Row(longitude=-122.27, latitude=37.84, housing_median_age=52.0, total_rooms=2224.0, total_bedrooms=437.0, popula

In [133]:
test_df1.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [143]:
test_df_Nullcount= test_df.filter(col('median_income').isNull()).count()
print(test_df_Nullcount)

0


In [138]:
test_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

### PySpark Replace NULL/None Values with Zero (0)

In [149]:
test_df4=test_df1.limit(10)
updated_df =test_df4.replace(3.12, None, 'median_income')
updated_df =updated_df.replace(8.3252, None, 'median_income')
updated_df.show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|         NULL|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

### to apply on all integer columns

In [152]:
updated_df.na.fill(value=0).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|          0.0|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [153]:
updated_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|         NULL|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

#### To apply the function on a apecified column

In [150]:
updated_df.na.fill(value=0,subset=["median_income"]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|          0.0|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [168]:
updated_df.na.fill({"median_income":""}).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|         NULL|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [171]:
updated_df1=updated_df.groupby("housing_median_age").pivot("latitude").sum("median_house_value")
updated_df1.show()

+------------------+--------+---------+--------+--------+
|housing_median_age|   37.84|    37.85|   37.86|   37.88|
+------------------+--------+---------+--------+--------+
|              41.0|    NULL|     NULL|    NULL|452600.0|
|              21.0|    NULL|     NULL|358500.0|    NULL|
|              52.0|801700.0|1305300.0|    NULL|    NULL|
|              42.0|226700.0|     NULL|    NULL|    NULL|
+------------------+--------+---------+--------+--------+



In [178]:
unpivoted = updated_df1.selectExpr("housing_median_age", "stack(2, 37.84, 37.85, 37.86, 37.88) as (latitude, median_income)")

In [179]:
unpivoted.show()

+------------------+--------+-------------+
|housing_median_age|latitude|median_income|
+------------------+--------+-------------+
|              41.0|   37.84|        37.85|
|              41.0|   37.86|        37.88|
|              21.0|   37.84|        37.85|
|              21.0|   37.86|        37.88|
|              52.0|   37.84|        37.85|
|              52.0|   37.86|        37.88|
|              42.0|   37.84|        37.85|
|              42.0|   37.86|        37.88|
+------------------+--------+-------------+

