# PySpark DataFrames, Schemas, and Data Types

## Import Modules

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, FloatType, DateType, BooleanType
# Note: importing round here shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import col, expr, concat_ws, round

from datetime import datetime

## Initiate Spark Session

In [2]:
spark = SparkSession.builder.appName("PySparkDataFrames").getOrCreate()

In [3]:
# Create a list of tuples
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

In [4]:
type(data)

list

## StructType

A `StructType()` is a collection of `StructField()` objects, each of which defines a column's name, its data type, and a boolean flag that specifies whether the field can contain NULL values.

In [5]:
schema = StructType([
    StructField("firstName", StringType(), True),
    StructField("middleName", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
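
The sample `data` above marks missing values with empty strings, so the `nullable` flag never comes into play. As a minimal sketch in plain Python, assuming one would rather store those gaps as NULL, the empty strings can be mapped to `None` before the DataFrame is created. The name `data_with_nulls` is hypothetical, and the `-1` salary is left untouched because the notebook does not say what it stands for.

```python
# Same sample rows as in the notebook.
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Replace every empty string with None; all other values are kept as they are.
data_with_nulls = [
    tuple(None if value == "" else value for value in row)
    for row in data
]

print(data_with_nulls[0])  # ('James', None, 'Smith', '36636', 'M', 3000)
```

A list like this could be passed to `spark.createDataFrame()` with the same schema, since every field is declared with `nullable=True`.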

## Create DataFrame

In [6]:
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [7]:
df.show(truncate=False)

+---------+----------+--------+-----+------+------+
|firstName|middleName|lastName|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



## DataFrame Reader and Writer

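The DataFrame Reader, accessed through `spark.read`, is a built-in API for loading data from common source files (CSV, JSON) and from big data file formats (Parquet, ORC, and Avro). Its counterpart, the DataFrame Writer, is accessed through a DataFrame's `write` attribute.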

In [8]:
file_path = "../../data/input/fire-incidents.csv"

In [9]:
fire_incidents_df = (spark.read.format("csv")
                     .option("header", True)
                     .option("inferSchema", True)
                     .load(file_path))

In [10]:
fire_incidents_df.select("IncidentNumber", "IncidentDate", "City").show(10)

+--------------+-------------------+-------------+
|IncidentNumber|       IncidentDate|         City|
+--------------+-------------------+-------------+
|      20104668|2020-09-11 00:00:00|San Francisco|
|      20104708|2020-09-11 00:00:00|San Francisco|
|      20104648|2020-09-10 00:00:00|San Francisco|
|      20104598|2020-09-10 00:00:00|San Francisco|
|      20104575|2020-09-10 00:00:00|San Francisco|
|      20104477|2020-09-10 00:00:00|San Francisco|
|      20104443|2020-09-10 00:00:00|San Francisco|
|      20104605|2020-09-10 00:00:00|San Francisco|
|      20104474|2020-09-10 00:00:00|San Francisco|
|      20104652|2020-09-10 00:00:00|San Francisco|
+--------------+-------------------+-------------+
only showing top 10 rows



***Note***: The `select()` statement performs a *projection*: it keeps only the columns one requires. As a transformation it is lazy, so Spark reads and processes the data only once an action such as `show()` is called.

In [11]:
fire_incidents_df.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- CallNumber: integer (nullable = true)
 |-- AlarmDtTm: timestamp (nullable = true)
 |-- ArrivalDtTm: timestamp (nullable = true)
 |-- CloseDtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: d

Add a datetime stamp to the name of the directory that will contain the collection of Parquet files, so that each export is written to its own directory.

In [12]:
date_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
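
To make the format string concrete, here is a small sketch that applies the same `strftime` pattern to a fixed moment instead of `datetime.now()`. The value of `fixed_moment` is a made-up example chosen only so that the result is reproducible.

```python
from datetime import datetime

# Hypothetical fixed moment, used instead of datetime.now() for a repeatable result.
fixed_moment = datetime(2025, 2, 27, 9, 27, 20)

# %Y%m%d gives the date as YYYYMMDD, %H%M%S gives the time as HHMMSS.
suffix = fixed_moment.strftime("%Y%m%d_%H%M%S")
example_path = f"../../data/output/fire_incidents_{suffix}"

print(suffix)        # 20250227_092720
print(example_path)  # ../../data/output/fire_incidents_20250227_092720
```

Because every field is zero-padded and ordered from year to second, directory names built this way also sort chronologically.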

In [13]:
export_path = f"../../data/output/fire_incidents_{date_timestamp}"

fire_incidents_df.write.format("parquet").mode("overwrite").save(export_path)

## Structured Operations

### Reading JSON Files

In [17]:
schema_json_persons = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("favorite_movies", ArrayType(StringType()), True), # Note the StringType inside ArrayType
    StructField("salary", FloatType(), True), 
    StructField("image_url", StringType(), True), 
    StructField("date_of_birth", DateType(), True), 
    StructField("active", BooleanType(), True),  
])

In [18]:
json_file_path = "../../data/input/persons.json"

In [19]:
persons_df = spark.read.json(json_file_path, schema=schema_json_persons, multiLine=True)
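
By default Spark's JSON reader expects one JSON object per line (JSON Lines), and `multiLine=True` tells it that a record may span several lines. The sketch below illustrates the difference in plain Python with the standard `json` module. The two sample records and the helper `parse_line_by_line` are hypothetical and are not taken from `persons.json`, whose layout is not shown in this notebook.

```python
import json

records = [{"id": 1, "first_name": "Ana"}, {"id": 2, "first_name": "Ben"}]

# JSON Lines: one complete object per line.
json_lines_text = "\n".join(json.dumps(record) for record in records)

# Pretty-printed array: a single document spread over many lines.
multi_line_text = json.dumps(records, indent=2)


def parse_line_by_line(text):
    """Parse every non-empty line as its own JSON document; return None if any line fails."""
    try:
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    except json.JSONDecodeError:
        return None


print(parse_line_by_line(json_lines_text))  # both records are parsed
print(parse_line_by_line(multi_line_text))  # None, no single line is valid JSON on its own
print(len(json.loads(multi_line_text)))     # 2, the document parses when read as a whole
```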

In [20]:
persons_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- favorite_movies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- salary: float (nullable = true)
 |-- image_url: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- active: boolean (nullable = true)



In [23]:
persons_df.show(10, truncate=False)

+---+----------+---------+---------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|favorite_movies|salary |image_url                                      |date_of_birth|active|
+---+----------+---------+---------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |null           |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza    |null           |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie   |null           |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|4  |Ilario    |Kean     |null           |3561.36|http://dummyimage.com/207x121.jpg/cc0000/ffffff|1987-06-09   |true  |
|5  |Toddy     |Drexel   |null           |4934.87|http://dummyimage.com/116x202.png/cc0000/ffffff|1992-10-28   |true  |
|6  |Oswald    |Petrolli |null          

### Columns and Expressions

In [30]:
persons_df.select("first_name", "last_name", "date_of_birth").show(5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



In [31]:
persons_df.select(col("first_name"), col("last_name"), col("date_of_birth")).show(5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



***Note***: The two outputs are identical, but the approaches differ:
- In the first example, one passes the column names as strings. Spark converts these strings into column objects, which is concise and works well for simple column selections.
- In the second example, the `col()` function explicitly creates a column object for each specified column name, which is more flexible when one wants to perform further transformations on the column.

In [43]:
(persons_df.select(concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
                   col("salary"),
                   (col("salary") * 0.10 + col("salary")).alias("salary_increase"))).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



In [44]:
(persons_df.select(concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
                   col("salary"),
                   expr("salary * 0.10 + salary").alias("salary_increase"))).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



***Note***: The outputs are identical; the two examples differ only in how `salary_increase` is computed.
- Both examples use `concat_ws()` to join two columns into a new `full_name` column, and `alias()` to name the derived columns.
- The first example builds the calculation from column objects, `col("salary") * 0.10 + col("salary")`. The second passes the same calculation to `expr()` as a SQL expression string, which can be cleaner and easier to read.

In [None]:
(persons_df.select(concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
                   col("salary"),
                   round(expr("salary * 0.10 + salary"), 2).alias("salary_increase"))).show(10)

+----------------+-------+---------------+
|       full_name| salary|salary_increase|
+----------------+-------+---------------+
|     Drucy Poppy|1463.36|         1609.7|
|   Emelyne Blaza|3006.04|        3306.64|
|      Max Rettie|1422.88|        1565.17|
|     Ilario Kean|3561.36|         3917.5|
|    Toddy Drexel|4934.87|        5428.36|
| Oswald Petrolli|1153.23|        1268.55|
|   Adrian Clarey|1044.73|         1149.2|
|Dominica Goodnow|1147.76|        1262.54|
|   Emory Slocomb|1082.11|        1190.32|
|   Jeremias Bode|3472.63|        3819.89|
+----------------+-------+---------------+
only showing top 10 rows



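***Note***: `round()` (here the function imported from `pyspark.sql.functions`, not Python's built-in) rounds the `salary_increase` column to two decimal places. This is not necessary for data engineering purposes, but it is useful for data analysis.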
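
The long fractional tails in the unrounded `salary_increase` values are most likely a side effect of declaring `salary` as `FloatType()`, a 32-bit float that is widened to 64 bits for the arithmetic. The sketch below reproduces the effect for the first row in plain Python with the standard `struct` module. `to_float32` is a hypothetical helper and not part of PySpark.

```python
import struct


def to_float32(value: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float, then widen it back."""
    return struct.unpack("f", struct.pack("f", value))[0]


# What a 32-bit float column actually stores for the value 1463.36.
salary = to_float32(1463.36)

# Same calculation as in the notebook, carried out in 64-bit arithmetic.
salary_increase = salary * 0.10 + salary

print(salary)                     # 1463.3599853515625
print(salary_increase)            # close to 1609.696, with a long fractional tail
print(round(salary_increase, 2))  # 1609.7
```

If exact cents matter, declaring the column as `DecimalType` or `DoubleType` instead of `FloatType` is one option to consider.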

### Filter and Where Condition