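- DataFrame.to(schema) returns a new DataFrame in which each row is reconciled to match the given schema (available from Spark 3.4).
- It can:
    - Reorder columns by name to match the schema.
    - Project away (remove) columns that the schema does not need.
    - Cast column types when they are compatible (for example, numeric to numeric, but not string to int).
    - Fail if the schema names a column the DataFrame lacks, or if types or nullability are incompatible.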
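The rules above can be sketched in plain Python, without Spark. This is a toy model under stated assumptions, not Spark's implementation: schemas are plain dicts of column name to type name, and `can_cast` and `reconcile` are hypothetical helpers with a deliberately simplified compatibility rule.

```python
# Toy model of schema reconciliation; NOT Spark's implementation.
# A schema is a dict mapping column name -> type name (insertion order matters).

NUMERIC = {"byte", "short", "int", "long", "float", "double"}


def can_cast(src: str, dst: str) -> bool:
    """Simplified compatibility check (an assumption, not Spark's full rule set)."""
    if src == dst:
        return True
    if src in NUMERIC and dst in NUMERIC:
        return True   # numeric -> numeric
    if dst == "string":
        return True   # simple types can be rendered as strings
    return False      # e.g. string -> int is rejected


def reconcile(source: dict, target: dict) -> list:
    """Return (column, source_type, target_type) triples in target order."""
    plan = []
    for name, dst in target.items():
        if name not in source:
            raise ValueError(f"missing column: {name}")
        src = source[name]
        if not can_cast(src, dst):
            raise ValueError(f"cannot cast {name}: {src} -> {dst}")
        plan.append((name, src, dst))
    return plan  # source columns absent from target are simply left out


source = {"name": "string", "score": "long", "subject": "string", "passed": "boolean"}
target = {"passed": "boolean", "score": "string", "name": "string"}

plan = reconcile(source, target)
for step in plan:
    print(step)
```

The returned plan follows the target's column order, leaves out `subject`, and records the `long` to `string` cast for `score`; asking for a column that the source lacks raises an error instead.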

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

spark = SparkSession.builder.appName("PySpark DataFrame.to() Example with Schema").getOrCreate()



In [4]:
data = [
    ("Souvik", 72, "Math", True),
    ("Soukarjya", 32, "Chemistry", True),
    ("Sandip", 74,"Math", False),
    ("Prodipta", 76, "Data Analyst", True),
    ("RamaSai", 69, "System Engineer", False),
    ("Riya", 78, "Oracle Developer", True),
    ("Padma", 46, "Data Analyst", True)
]

columns = ["name", "score", "subject", "passed"]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)
df.printSchema()




+---------+-----+----------------+------+
|name     |score|subject         |passed|
+---------+-----+----------------+------+
|Souvik   |72   |Math            |true  |
|Soukarjya|32   |Chemistry       |true  |
|Sandip   |74   |Math            |false |
|Prodipta |76   |Data Analyst    |true  |
|RamaSai  |69   |System Engineer |false |
|Riya     |78   |Oracle Developer|true  |
|Padma    |46   |Data Analyst    |true  |
+---------+-----+----------------+------+

root
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)
 |-- subject: string (nullable = true)
 |-- passed: boolean (nullable = true)



                                                                                

Define a new schema (reordered columns, different types):
  - Reorders the columns to: passed, score, name
  - Casts score from LongType (inferred from Python int) to StringType
  - Omits the subject column, so .to() will drop it


In [5]:
schema = StructType(
    [
        StructField("passed", BooleanType(), True),     # keep as boolean
        StructField("score", StringType(), True),       # cast long to string
        StructField("name", StringType(), True)         # keep as string
    ]
)


In [None]:
# apply the schema with .to()
# Reconcile the DataFrame to match the specified schema

df2 = df.to(schema)

print("\n New DataFrame After Applying Schema with .to(): ")
df2.show(truncate=False)
df2.printSchema()



 New DataFrame After Applying Schema with .to(): 



+------+-----+---------+
|passed|score|name     |
+------+-----+---------+
|true  |72   |Souvik   |
|true  |32   |Soukarjya|
|false |74   |Sandip   |
|true  |76   |Prodipta |
|false |69   |RamaSai  |
|true  |78   |Riya     |
|true  |46   |Padma    |
+------+-----+---------+

root
 |-- passed: boolean (nullable = true)
 |-- score: string (nullable = true)
 |-- name: string (nullable = true)



                                                                                



- What happened?
    - The 'subject' column was dropped because it isn't in the schema.
    - The columns were reordered to: passed, score, name.
    - The 'score' column was cast from long to string.
    - Types and nullability were checked against the schema automatically.