RDD

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
ls

[0m[01;34msample_data[0m/  [01;34mspark-3.1.1-bin-hadoop3.2[0m/  spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [6]:
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2023-12-10 09:55:15--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2023-12-10 09:55:15--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv’


2023-12-10 09:55:16 (78.6 MB/s) - ‘cars.csv’ saved [22608/22608]



In [7]:
!ls

cars.csv  sample_data  spark-3.1.1-bin-hadoop3.2  spark-3.1.1-bin-hadoop3.2.tgz


In [8]:
df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [9]:
df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504. |12.0        |70   |US    |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693. |11.5        |70   |US    |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436. |11.0        |70   |US    |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433. |12.0        |70   |US    |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449. |10.5        |70   |US    |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [10]:
df.limit(5)

Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
Chevrolet Chevell...,18.0,8,307.0,130.0,3504.0,12.0,70,US
Buick Skylark 320,15.0,8,350.0,165.0,3693.0,11.5,70,US
Plymouth Satellite,18.0,8,318.0,150.0,3436.0,11.0,70,US
AMC Rebel SST,16.0,8,304.0,150.0,3433.0,12.0,70,US
Ford Torino,17.0,8,302.0,140.0,3449.0,10.5,70,US


In [11]:
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [12]:
df.dtypes

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

In [13]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



In [14]:
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [15]:
from pyspark.sql.types import *
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [16]:
# Creating a list of the schema in the format column_name, data_type
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [17]:
schema = StructType([StructField (x[0], x[1], True) for x in labels])
schema

StructType(List(StructField(Car,StringType,true),StructField(MPG,DoubleType,true),StructField(Cylinders,IntegerType,true),StructField(Displacement,DoubleType,true),StructField(Horsepower,DoubleType,true),StructField(Weight,DoubleType,true),StructField(Acceleration,DoubleType,true),StructField(Model,IntegerType,true),StructField(Origin,StringType,true)))

In [18]:
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [19]:
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |
|Plymouth Satellite              |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |
|AMC Rebel SST                   |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |
|Ford Torino                     |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US    |
|Ford Galaxie 500                |15.0|8        |429.0       |198.0     |4341.0|10.0        |70   |US    |
|Chevrolet Impala                |14.

In [20]:
print(df.Car)
print("*"*20)
df.select(df.Car).show(truncate=False)

Column<'Car'>
********************
+--------------------------------+
|Car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



In [21]:
print(df['car'])
print("*"*20)
df.select(df['car']).show(truncate=False)

Column<'car'>
********************
+--------------------------------+
|car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



In [22]:
from pyspark.sql.functions import col
df.select(col("car")).show(truncate=False)

+--------------------------------+
|car                             |
+--------------------------------+
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
|Buick Estate Wagon (sw)         |
+--------------------------------+
only showing top 20 rows



In [23]:
# M1
print(df.Car, df.Cylinders)
print("*"*40)
df.select(df.Car, df.Cylinders).show(truncate=False)

Column<'Car'> Column<'Cylinders'>
****************************************
+--------------------------------+---------+
|Car                             |Cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302   

In [24]:
# M2
print(df['car'], df['cylinders'])
print("*"*40)
df.select(df['car'], df['cylinders']).show(truncate=False)

Column<'car'> Column<'cylinders'>
****************************************
+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302   

In [25]:
# M3
from pyspark.sql.functions import col
df.select(col('car'), col('cylinders')).show(truncate=False)

+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet Monte Carlo           |8        |
|Buick Est

In [26]:
# C1: Adding a new column
from pyspark.sql.functions import lit
df = df.withColumn('first_column', lit(1))

df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |1           |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US    |1           |
+-------------------------+----+---------+------------+----------+------+------------+-----+----

In [27]:
# C2
df = df.withColumn('second_column', lit(2)) \
      .withColumn('third_column', lit('Third Column'))
df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|second_column|third_column|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |2            |Third Column|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |2            |Third Column|
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |2            |Third Column|
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |1           |2            |Third Column|
|Ford Torino        

In [28]:
# C3
from pyspark.sql.functions import concat
df = df.withColumn('car_model', concat(col("Car"), lit(" "), col("model")))
df.show(5,truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|first_column|second_column|third_column|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+------------+-------------+------------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1           |2            |Third Column|Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1           |2            |Third Column|Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |1           |2            |Third Column|Plymouth Satelli

In [29]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

+--------------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+--------------+----------------+-----------------------------------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_one|new_column_two|new_column_three|car_model                          |
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+--------------+----------------+-----------------------------------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |1             |2             |Third Column    |Chevrolet Chevelle Malibu 70       |
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |1             |2             |Third Column    |Buick Skylark 320 70               |
|Plymouth Satellite              |18.0|8        |3

In [30]:
df.groupBy('Origin').count().show()

+------+-----+
|Origin|count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [31]:
df.groupBy('Origin', 'Model').count().show(5)

+------+-----+-----+
|Origin|Model|count|
+------+-----+-----+
|Europe|   71|    5|
|Europe|   80|    9|
|Europe|   79|    4|
| Japan|   75|    4|
|    US|   72|   18|
+------+-----+-----+
only showing top 5 rows



In [32]:
df = df.drop('new_column_one')
df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+----------------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_two|new_column_three|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+--------------+----------------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |2             |Third Column    |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |2             |Third Column    |Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |2             |Third Column    |Plymouth Satellite 70       |
|AMC Rebel SST            |16.0|8 

In [33]:
# remove multiple column
df = df.drop('new_column_two') \
.drop('new_column_three')
df.show(5, truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Chevrolet Chevelle Malibu|18.0|8        |307.0       |130.0     |3504.0|12.0        |70   |US    |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |Buick Skylark 320 70        |
|Plymouth Satellite       |18.0|8        |318.0       |150.0     |3436.0|11.0        |70   |US    |Plymouth Satellite 70       |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433.0|12.0        |70   |US    |AMC Rebel SST 70            |
|Ford Torino              |17.0|8        |302.0       |140.0     |3449.0|10.5        |70   |US   

## **DataFrame Operations on Row**

Filtering Rows

In [34]:
# Filtering eows in pyspark
total_count = df.count()
print("Total Record Count: " + str(total_count))
europe_filttered_count = df.filter(col('Origin') == 'Europe').count()
print("Europe Filtered Record Count: " + str(europe_filttered_count))
df.filter(col('origin')=='Europe').show(truncate=False)

Total Record Count: 406
Europe Filtered Record Count: 73
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Citroen DS-21 Pallas        |0.0 |4        |133.0       |115.0     |3090.0|17.5        |70   |Europe|Citroen DS-21 Pallas 70        |
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.0        |46.0      |1835.0|20.5        |70   |Europe|Volkswagen 1131 Deluxe Sedan 70|
|Peugeot 504                 |25.0|4        |110.0       |87.0      |2672.0|17.5        |70   |Europe|Peugeot 504 70                 |
|Audi 100 LS                 |24.0|4        |107.0       |90.0      |2430.0|14.5        |70   |Europe|Audi 100 LS 70                 

In [39]:
# Filtering rows in Pyspark based on multiple conditions
total_count = df.count()
print('Total Record Count: '+ str(total_count))
europe_filttered_count = df.filter((col('Origin')=='Europe') &
                                    (col('Cylinders')==4)).count()
print("Europe Filtered Record Count: " + str(europe_filttered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

Total Record Count: 406
Europe Filtered Record Count: 66
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Citroen DS-21 Pallas        |0.0 |4        |133.0       |115.0     |3090.0|17.5        |70   |Europe|Citroen DS-21 Pallas 70        |
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.0        |46.0      |1835.0|20.5        |70   |Europe|Volkswagen 1131 Deluxe Sedan 70|
|Peugeot 504                 |25.0|4        |110.0       |87.0      |2672.0|17.5        |70   |Europe|Peugeot 504 70                 |
|Audi 100 LS                 |24.0|4        |107.0       |90.0      |2430.0|14.5        |70   |Europe|Audi 100 LS 70                 

### Get Distinct Rows

In [40]:
df.select('Origin').distinct().show()

+------+
|Origin|
+------+
|Europe|
|    US|
| Japan|
+------+



In [41]:
df.select('Origin', 'model').distinct().show()

+------+-----+
|Origin|model|
+------+-----+
|Europe|   71|
|Europe|   80|
|Europe|   79|
| Japan|   75|
|    US|   72|
|    US|   80|
|Europe|   74|
| Japan|   79|
|Europe|   76|
|    US|   75|
| Japan|   77|
|    US|   82|
| Japan|   80|
| Japan|   78|
|    US|   78|
|Europe|   75|
|    US|   71|
|    US|   77|
| Japan|   70|
| Japan|   71|
+------+-----+
only showing top 20 rows



In [44]:
# Sort Rows in Pyspark
df.orderBy('Cylinders').show(truncate=False)

+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                      |
+----------------------------+----+---------+------------+----------+------+------------+-----+------+-------------------------------+
|Mazda RX-4                  |21.5|3        |80.0        |110.0     |2720.0|13.5        |77   |Japan |Mazda RX-4 77                  |
|Mazda RX-7 GS               |23.7|3        |70.0        |100.0     |2420.0|12.5        |80   |Japan |Mazda RX-7 GS 80               |
|Mazda RX2 Coupe             |19.0|3        |70.0        |97.0      |2330.0|13.5        |72   |Japan |Mazda RX2 Coupe 72             |
|Mazda RX3                   |18.0|3        |70.0        |90.0      |2124.0|13.5        |73   |Japan |Mazda RX3 73                   |
|Datsun 510 (sw)             |28.0|4        |97.0      

In [45]:
df.orderBy('Cylinders', ascending=False).show(truncate=False)

+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|car_model                   |
+-------------------------+----+---------+------------+----------+------+------------+-----+------+----------------------------+
|Plymouth 'Cuda 340       |14.0|8        |340.0       |160.0     |3609.0|8.0         |70   |US    |Plymouth 'Cuda 340 70       |
|Pontiac Safari (sw)      |13.0|8        |400.0       |175.0     |5140.0|12.0        |71   |US    |Pontiac Safari (sw) 71      |
|Ford Mustang Boss 302    |0.0 |8        |302.0       |140.0     |3353.0|8.0         |70   |US    |Ford Mustang Boss 302 70    |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693.0|11.5        |70   |US    |Buick Skylark 320 70        |
|Chevrolet Monte Carlo    |15.0|8        |400.0       |150.0     |3761.0|9.5         |70   |US   

In [46]:
df.groupBy('Origin').count().orderBy('count', ascending=False).show(10)

+------+-----+
|Origin|count|
+------+-----+
|    US|  254|
| Japan|   79|
|Europe|   73|
+------+-----+



### Union Dataframes

You will see three main methods for performing union of dataframes. It is important to know the difference between them and which one is preferred:

* union() – It is used to merge two DataFrames of the same structure/schema. If schemas are not the same, it returns an error
* unionAll() – This function is deprecated since Spark 2.0.0, and replaced with union()
* unionByName() - This function is used to merge two dataframes based on column name.

Since unionAll() is deprecated, union() is the preferred method for merging dataframes.
The difference between unionByName() and union() is that unionByName() resolves columns by name, not by position.

In other SQLs, Union eliminates the duplicates but UnionAll merges two datasets, thereby including duplicate records. But, in PySpark, both behave the same and includes duplicate records. The recommendation is to use distinct() or dropDuplicates() to remove duplicate records.

In [48]:
# case 1: Union when columns are in order
df = spark.read.csv('cars.csv', header=True, sep=';', inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))

EUROPE CARS: 3
JAPAN CARS: 4
AFTER UNION: 7


In [50]:
# CASE 1: Union when columns are not in order
# Creating two Dataframes with jumbled columns
df1 = spark.createDataFrame([[1,2,3]], ["col0","col1","col2"])
df2 = spark.createDataFrame([[4,5,6]], ["col1","col2","col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



## Common Data Manipulation Functions

In [51]:
# Functions available in Pyspark
from  pyspark.sql import functions
# Similar to python, we can use the dir function to view available functiions
print(dir(functions))



### String Functions

In [53]:
# Load the data
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)

In [54]:
from pyspark.sql.functions import col,lower,upper,substring
help(substring)
# alias is used to rename the column name in the output
df.select(col('Car'),lower(col('Car')),upper(col('Car')),substring(col('Car'),1,4).alias("concated value")).show(5,False)

Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]

+-------------------------+-------------------------+-------------------------+--------------+
|Car                      |lower(Car)               |upper(Car)               |concated value|
+-------------------------+-------------------------+-------------------------+--------------+
|Chevrolet Chevelle Malibu|chevrolet chevelle malibu|CHEVROLET CHEVELLE MALIBU|Chev          |
|Buick Skylark 320        |buick skylark 320        |BUI

In [56]:
from pyspark.sql.functions import concat
df.select(col("Car"),col("model"),concat(col("Car"), lit(" "), col("model"))).show(5, False)

+-------------------------+-----+----------------------------+
|Car                      |model|concat(Car,  , model)       |
+-------------------------+-----+----------------------------+
|Chevrolet Chevelle Malibu|70   |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |70   |Buick Skylark 320 70        |
|Plymouth Satellite       |70   |Plymouth Satellite 70       |
|AMC Rebel SST            |70   |AMC Rebel SST 70            |
|Ford Torino              |70   |Ford Torino 70              |
+-------------------------+-----+----------------------------+
only showing top 5 rows



### Numeric functions

**Show the oldest date and the most recent date**

In [57]:
from pyspark.sql.functions import min, max
df.select(min(col("Weight")), max(col('Weight'))).show()

+-----------+-----------+
|min(Weight)|max(Weight)|
+-----------+-----------+
|       1613|       5140|
+-----------+-----------+



**Add 10 to the minimum and maximum weight**

In [59]:
from pyspark.sql.functions import min, max, lit
df.select(min(col("Weight"))+lit(10), max(col("Weight")+lit(10))).show()

+------------------+------------------+
|(min(Weight) + 10)|max((Weight + 10))|
+------------------+------------------+
|              1623|              5150|
+------------------+------------------+



### Operations on Date

In [61]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()

+-------------------+
|                DOB|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+

root
 |-- DOB: string (nullable = true)



In [62]:
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'), 'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy-MM-dd HH:mm:ss)|to_timestamp(DOB, yyyy-MM-dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy-MM-dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



In [63]:
df = spark.createDataFrame([('2019/12/25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'), 'yyyy/MM/dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy/MM/dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy/MM/dd HH:mm:ss)|to_timestamp(DOB, yyyy/MM/dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy/MM/dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy/MM/dd HH:mm:ss): timestamp (nullable = true)



**What is 3 days earlier that the oldest date and 3 days later than the most recent date**

In [64]:
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')), 3)).show()

+----------------------+----------------------+
|date_add(max(Date), 3)|date_sub(min(Date), 3)|
+----------------------+----------------------+
|            2021-04-02|            1989-12-29|
+----------------------+----------------------+



In [65]:
# Create two Dataframe
cars_df = spark.createDataFrame([[1, 'Car A'], [2, 'Car B'], [3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

+---+--------+
| id|car_name|
+---+--------+
|  1|   Car A|
|  2|   Car B|
|  3|   Car C|
+---+--------+

+---+---------+
| id|car_price|
+---+---------+
|  1|     1000|
|  2|     2000|
|  3|     3000|
+---+---------+



In [66]:
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'], cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

+---+--------+---------+
|id |car_name|car_price|
+---+--------+---------+
|1  |Car A   |1000     |
|3  |Car C   |3000     |
|2  |Car B   |2000     |
+---+--------+---------+



As you can see, we have done an inner join between two dataframes. The following joins are supported by PySpark:

1.inner (default)

2.cross

3.outer

4.full

5.full_outer

6.left

7.left_outer

8.right

9.right_outer

10.left_semi

11.left_anti

## Spark SQL
SQL has been around since the 1970s, and so one can imagine the number of people who made it their bread and butter. As big data came into popularity, the number of professionals with the technical knowledge to deal with it was in shortage. This led to the creation of Spark SQL. To quote the docs:

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

Basically, what you need to know is that Spark SQL is used to execute SQL queries on big data. Spark SQL can also be used to read data from Hive tables and views. Let me explain Spark SQL with an example.

In [68]:
# Load data
df = spark.read.csv('cars.csv',header=True,sep=';')
# Register temporary Table
df.createOrReplaceTempView("temp")
# select all the data from temp data
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+

+-----------+
|total_count|
+-----------+
|        406|
+-----------+



As you can see, we registered the dataframe as temporary table and then ran basic SQL queries on it. How amazing is that?!
If you are a person who is more comfortable with SQL, then this feature is truly a blessing for you! But this raises a question:

Should I just keep using Spark SQL all the time?

And the answer is, it depends.
So basically, the different functions acts in differnet ways, and depending upon the type of action you are trying to do, the speed at which it completes execution also differs. But as time progress, this feature is getting better and better, so hopefully the difference should be a small margin. There are plenty of analysis done on this, but nothing has a definite answer yet. You can read this comparative study done by horton works or the answer to this stackoverflow question if you are still curious about it.


##RDD
With map, you define a function and then apply it record by record. Flatmap returns a new RDD by first applying a function to all of the elements in RDDs and then flattening the result. Filter, returns a new RDD. Meaning only the elements that satisfy a condition. With reduce, we are taking neighboring elements and producing a single combined result. For example, let's say you have a set of numbers. You can reduce this to its sum by providing a function that takes as input two values and reduces them to one.

Some of the reasons you would use a dataframe over RDD are:

* It's ability to represnt data as rows and columns. But this also means it can only hold structred and semi-structured data.
* It allows processing data in different formats (AVRO, CSV, JSON, and storage system HDFS, HIVE tables, MySQL).
* It's superior job Optimization capability.
* DataFrame API is very easy to use

In [69]:
cars = spark.sparkContext.textFile('cars.csv')
print(cars.first())
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())

Car;MPG;Cylinders;Displacement;Horsepower;Weight;Acceleration;Model;Origin
Chevrolet Chevelle Malibu;18.0;8;307.0;130.0;3504.;12.0;70;US


**How many cars are in our csv data**

In [70]:
cars_rest.map(lambda line: line.split(";")).count()

406

**Display the car name, MPG, Cylinders, Weight and Origin for the cars Originating in Europe**

In [72]:
# car name is column 0
(cars_rest.filter(lambda line: line.split(";")[8]=='Europe').
 map(lambda line: (line.split(";")[0],
     line.split(';')[1],
     line.split(";")[2],
     line.split(";")[5],
     line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Volkswagen Type 3', '23.0', '4', '2254.', 'Europe'),
 ('Volvo 145e (sw)', '18.0', '4', '2933.', 'Europe'),
 ('Volkswagen 411 (sw)', '22.0', '4', '2511.', 'Europe'),
 ('Peugeot 504 (sw)', '21.0', '4', '2979.', 'Europe'),
 ('Renault 12 (sw)', '26.0', '4', '2189.', 'Europe'),
 ('Volkswagen Super Beetle', '26.0', '4', '1950.', 'Europe'),
 ('Fiat 124 Sport Coupe', '26.0', '4', '2265.', 'Europe'),
 ('Fiat 128', '29

**Display the Car name, MPG, Cylinders, Weight and Origin for the cars Originating in either Europe or Japan**

In [73]:
# car name is column 0
(cars_rest.filter(lambda line: line.split(";")[8] in ['Europe','Japan']).
 map(lambda line: (line.split(";")[0],
     line.split(';')[1],
     line.split(";")[2],
     line.split(";")[5],
     line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Toyota Corolla Mark ii', '24.0', '4', '2372.', 'Japan'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Toyota Corolla', '25.0', '4', '2228.', 'Japan'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Toyota Corolla 1200', '31.0', '4', '1773.', 'Japan'),
 ('Datsun 1200', '35.0', '4', '1613.', 'Japan'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Toyota Corolla Hardtop', '24.0', '4', '2278.', 'Japan'),
 ('Volkswagen Type 3', '23.0', '4', '

Certainly! Let's briefly go through each of the main modules and submodules in the PySpark package, along with a simple example for each where applicable.

1. **pyspark.sql module:**
    - **Module Context:**
        - `pyspark.sql.SparkSession` is the entry point to interact with DataFrame and SQL functionality.
        ```python
        from pyspark.sql import SparkSession

        # Create a Spark session
        spark = SparkSession.builder.appName("example").getOrCreate()
        ```

    - **pyspark.sql.types module:**
        - Provides data types used in PySpark SQL.
        ```python
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType

        # Define a schema
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)
        ])
        ```

    - **pyspark.sql.functions module:**
        - Contains built-in functions for DataFrame manipulation.
        ```python
        from pyspark.sql.functions import col, count

        # Use functions to manipulate DataFrame
        df.select("column1", "column2").filter(col("column3") > 5).groupBy("column1").agg(count("*"))
        ```

    - **pyspark.sql.streaming module:**
        - Enables streaming capabilities in PySpark.
        ```python
        from pyspark.sql import SparkSession

        # Create a Spark session
        spark = SparkSession.builder.appName("streaming_example").getOrCreate()

        # Read data from a streaming source
        streaming_df = spark.readStream.format("source").load()

        # Perform streaming transformations
        result = streaming_df.groupBy("column").count()

        # Write the result to a streaming sink
        query = result.writeStream.outputMode("complete").format("console").start()
        query.awaitTermination()
        ```

2. **pyspark.streaming module:**
    - **pyspark.streaming.kafka module:**
        - Provides functionality for consuming and producing data from/to Kafka topics.
        ```python
        from pyspark.streaming.kafka import KafkaUtils

        # Create a Kafka Direct Stream
        kafkaStream = KafkaUtils.createDirectStream(ssc, ["topic"], {"metadata.broker.list": "broker"})

        # Process the stream
        kafkaStream.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")).countByValue().pprint()
        ```

    - **pyspark.streaming.kinesis module:**
        - Offers support for processing data from Amazon Kinesis.
        ```python
        from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

        # Create a Kinesis Direct Stream
        kinesisStream = KinesisUtils.createStream(ssc, "appName", "streamName", "endpointURL", "regionName",
                                                 "initialPositionInStream", 2, StorageLevel.MEMORY_AND_DISK_2)

        # Process the Kinesis stream
        kinesisStream.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")).countByValue().pprint()
        ```

3. **pyspark.ml package:**
    - **ML Pipeline APIs:**
        - PySpark ML provides a high-level API for building machine learning pipelines.
        ```python
        from pyspark.ml import Pipeline
        from pyspark.ml.feature import VectorAssembler
        from pyspark.ml.regression import LinearRegression

        # Define feature columns
        feature_columns = ["feature1", "feature2"]

        # Create a feature vector assembler
        assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

        # Create a linear regression model
        lr = LinearRegression(featuresCol="features", labelCol="label")

        # Create a pipeline
        pipeline = Pipeline(stages=[assembler, lr])

        # Fit the pipeline to training data
        model = pipeline.fit(training_data)
        ```

    - **pyspark.ml.param module:**
        - Handles parameters for PySpark ML algorithms.
        ```python
        from pyspark.ml.classification import LogisticRegression

        # Create a Logistic Regression model with custom parameters
        lr = LogisticRegression(maxIter=10, regParam=0.01)

        # Fit the model
        model = lr.fit(training_data)
        ```

    - **pyspark.ml.feature module:**
        - Provides feature extraction and transformation tools.
        ```python
        from pyspark.ml.feature import Tokenizer

        # Tokenize a text column
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        words_df = tokenizer.transform(dataframe)
        ```

    - **pyspark.ml.classification module:**
        - Implements classification algorithms.
        ```python
        from pyspark.ml.classification import RandomForestClassifier

        # Create a Random Forest Classifier
        rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)

        # Train the model
        model = rf.fit(training_data)
        ```

    - **pyspark.ml.clustering module:**
        - Includes clustering algorithms.
        ```python
        from pyspark.ml.clustering import KMeans

        # Create a KMeans model
        kmeans = KMeans(featuresCol="features", k=3)

        # Fit the model
        model = kmeans.fit(training_data)
        ```

    - **pyspark.ml.linalg module:**
        - Provides linear algebra utilities.
        ```python
        from pyspark.ml.linalg import Vectors

        # Create a dense vector
        dense_vector = Vectors.dense([1.0, 2.0, 3.0])

        # Create a sparse vector
        sparse_vector = Vectors.sparse(3, [0, 2], [1.0, 3.0])
        ```

    - **pyspark.ml.recommendation module:**
        - Includes collaborative filtering for recommendation.
        ```python
        from pyspark.ml.recommendation import ALS

        # Create an ALS model
        als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

        # Fit the model
        model = als.fit(training_data)
        ```

    - **pyspark.ml.regression module:**
        - Implements regression algorithms.
        ```python
        from pyspark.ml.regression import LinearRegression

        # Create a Linear Regression model
        lr = LinearRegression(featuresCol="features", labelCol="label")

        # Train the model
        model = lr.fit(training_data)
        ```

    - **pyspark.ml.tuning module:**
        - Supports hyperparameter tuning.
        ```python
        from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

        # Define a parameter grid
        param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 0.5]).build()

        # Create a cross-validator
        crossval = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

        # Run cross-validation to find the best model
        cv_model = crossval.fit(training_data)
        ```

    - **pyspark.ml.evaluation module:**
        -

 Provides evaluation metrics for models.
        ```python
        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        # Create a Binary Classification Evaluator
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

        # Evaluate the model
        auc = evaluator.evaluate(predictions)
        ```

4. **pyspark.mllib package:**
    - **pyspark.mllib.classification module:**
        - Implements classic machine learning classification algorithms.
        ```python
        from pyspark.mllib.classification import SVMWithSGD
        from pyspark.mllib.regression import LabeledPoint

        # Create labeled points
        labeled_points = [LabeledPoint(1.0, [1.0, 0.0, 3.0]), LabeledPoint(0.0, [2.0, 1.0, 0.0])]

        # Train an SVM model
        model = SVMWithSGD.train(sc.parallelize(labeled_points))
        ```

    - **pyspark.mllib.clustering module:**
        - Includes classic machine learning clustering algorithms.
        ```python
        from pyspark.mllib.clustering import KMeans
        from pyspark.mllib.linalg import Vectors

        # Create RDD of vectors
        data = [Vectors.dense([1.0, 2.0]), Vectors.dense([2.0, 3.0]), Vectors.dense([8.0, 7.0])]

        # Train a KMeans model
        model = KMeans.train(sc.parallelize(data), k=2, maxIterations=10)
        ```

    - **pyspark.mllib.evaluation module:**
        - Provides evaluation metrics for models.
        ```python
        from pyspark.mllib.evaluation import BinaryClassificationMetrics

        # Evaluate a binary classification model
        metrics = BinaryClassificationMetrics(predictions_and_labels)
        auc = metrics.areaUnderROC
        ```

    - **pyspark.mllib.feature module:**
        - Includes feature extraction and transformation tools.
        ```python
        from pyspark.mllib.feature import HashingTF
        from pyspark.mllib.feature import IDF

        # Create RDD of documents
        documents = ["This is a sample document", "Another document for testing", "And one more document"]

        # Tokenize and apply TF-IDF transformation
        hashingTF = HashingTF()
        tf = hashingTF.transform(documents)
        idf = IDF().fit(tf)
        tfidf = idf.transform(tf)
        ```

    - **pyspark.mllib.fpm module:**
        - Implements frequent pattern mining algorithms.
        ```python
        from pyspark.mllib.fpm import FPGrowth

        # Create an RDD of transactions
        data = [["item1", "item2", "item3"], ["item1", "item2"], ["item1", "item3"], ["item2", "item3"]]

        # Train an FP-growth model
        model = FPGrowth.train(sc.parallelize(data), minSupport=0.2, numPartitions=10)
        ```

    - **pyspark.mllib.linalg module:**
        - Provides linear algebra utilities.
        ```python
        from pyspark.mllib.linalg import Vectors

        # Create a dense vector
        dense_vector = Vectors.dense([1.0, 2.0, 3.0])

        # Create a sparse vector
        sparse_vector = Vectors.sparse(3, [0, 2], [1.0, 3.0])
        ```

    - **pyspark.mllib.linalg.distributed module:**
        - Provides distributed linear algebra utilities.
        ```python
        from pyspark.mllib.linalg.distributed import RowMatrix

        # Create an RDD of vectors
        rows = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.dense([2.0, 3.0]), Vectors.dense([8.0, 7.0])])

        # Create a distributed RowMatrix
        mat = RowMatrix(rows)
        ```

    - **pyspark.mllib.random module:**
        - Implements random data generation.
        ```python
        from pyspark.mllib.random import RandomRDDs

        # Generate a random RDD of Gaussian numbers
        random_rdd = RandomRDDs.normalRDD(sc, 1000, 2)
        ```

    - **pyspark.mllib.recommendation module:**
        - Implements collaborative filtering for recommendation.
        ```python
        from pyspark.mllib.recommendation import ALS

        # Create an ALS model
        model = ALS.train(training_data, rank=10, iterations=5, lambda_=0.01)

        # Make recommendations
        user_product = (1, 2)
        prediction = model.predict(user_product[0], user_product[1])
        ```

    - **pyspark.mllib.regression module:**
        - Implements classic machine learning regression algorithms.
        ```python
        from pyspark.mllib.regression import LinearRegressionWithSGD
        from pyspark.mllib.regression import LabeledPoint

        # Create labeled points
        labeled_points = [LabeledPoint(1.0, [1.0, 0.0, 3.0]), LabeledPoint(0.0, [2.0, 1.0, 0.0])]

        # Train a linear regression model
        model = LinearRegressionWithSGD.train(sc.parallelize(labeled_points))
        ```

    - **pyspark.mllib.stat module:**
        - Provides statistical tools.
        ```python
        from pyspark.mllib.stat import Statistics
        from pyspark.mllib.linalg import Vectors

        # Create RDD of vectors
        data = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.dense([2.0, 3.0]), Vectors.dense([8.0, 7.0])])

        # Calculate correlation matrix
        correlation_matrix = Statistics.corr(data)
        ```

    - **pyspark.mllib.tree module:**
        - Implements decision trees.
        ```python
        from pyspark.mllib.tree import DecisionTree
        from pyspark.mllib.regression import LabeledPoint

        # Create labeled points
        labeled_points = [LabeledPoint(1.0, [1.0, 0.0, 3.0]), LabeledPoint(0.0, [2.0, 1.0, 0.0])]

        # Train a decision tree model
        model = DecisionTree.trainClassifier(sc.parallelize(labeled_points), numClasses=2, categoricalFeaturesInfo={})
        ```

    - **pyspark.mllib.util module:**
        - Provides utility functions.
        ```python
        from pyspark.mllib.util import MLUtils

        # Load a dataset as an RDD of LabeledPoints
        data = MLUtils.loadLibSVMFile(sc, "data/libsvm.txt")
        ```

This provides a high-level overview and examples for each major module and submodule in PySpark. Keep in mind that the examples are simplified for illustration purposes, and the actual usage may involve more detailed configurations and considerations depending on