## Import all the libraries needed for this notebook
PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.

In [8]:
import os  # Operating system interfaces, used here to set environment variables
import sys  # System-specific parameters and functions (https://docs.python.org/3/library/sys.html)
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-17.0.18.8-hotspot"  # Point Spark at the local OpenJDK installation (adjust the path for your machine)
os.environ['PYSPARK_PYTHON'] = sys.executable  # Run Spark workers with the current Python interpreter
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  # Run the Spark driver with the same interpreter

# Import SparkSession, the entry point to programming Spark with the Dataset and DataFrame API
from pyspark.sql import SparkSession
# Import column functions: col references a DataFrame column; the others parse dates and clean strings
from pyspark.sql.functions import col, month, to_date, to_timestamp, regexp_replace

# Import the necessary types as classes
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

## Data manipulation with DataFrames
Handling missing data
- Use **.na.drop()** to remove rows with null values
- Use **.na.fill({"column": value})** to replace nulls with a specific value

In [14]:
# Create a Spark session
spark = SparkSession.builder.appName("MySparkAPP").getOrCreate()
print("Success") # Print a success message to indicate that the Spark session has been created successfully

# Create a DataFrame from a local CSV file with header and inferSchema options enabled
transportation_df = spark.read.csv("data/Monthly_Transportation_Statistics.csv", header=True, inferSchema=True)

# Replace dots with underscores in column names, since Spark interprets a dot in a column name as nested-field access
for col_name in transportation_df.columns:
    if '.' in col_name:
        new_name = col_name.replace('.', '_')
        transportation_df = transportation_df.withColumnRenamed(col_name, new_name)

# Drop rows with any nulls
transportation_df_cleaned = transportation_df.na.drop()
transportation_df_cleaned.show()  # Show the contents of the transportation DataFrame

# Alternatively, we can filter out rows where a specific column has null values
transportation_df_cleaned = transportation_df.where(col("Air Safety - General Aviation Fatalities").isNotNull())
transportation_df_cleaned.show(5)

transportation_filled = transportation_df_cleaned.na.fill({"Highway Fatalities": 0})  # Replace nulls in "Highway Fatalities" with 0
transportation_filled.show(5)  # Show the first 5 rows after filling null values


Success

## Column operations
- Use **.withColumn()** to add a new column based on calculations or existing columns
- Use **.withColumnRenamed()** to rename columns
- Use **.drop()** to remove unnecessary columns

In [15]:
# Add a new column "Date_Cleaned" by stripping the " 12:00:00 AM" time suffix from "Date"
transportation_df_cleaned = transportation_df_cleaned.withColumn(
    "Date_Cleaned", 
    regexp_replace(col("Date"), " 12:00:00 AM", "")
)

# Now convert to date (format: MM/dd/yyyy)
transportation_df_cleaned = transportation_df_cleaned.withColumn(
    "Date_Converted", 
    to_date(col("Date_Cleaned"), "MM/dd/yyyy")
)

# Extract month
transportation_df_cleaned = transportation_df_cleaned.withColumn(
    "Month", 
    month(col("Date_Converted"))
)

transportation_df_cleaned.show(5)  # Show the first 5 rows after adding the "Date_Cleaned", "Date_Converted", and "Month" columns

transportation_df_cleaned = transportation_df_cleaned.withColumnRenamed("Date", "Timestamp")  # Rename the "Date" column to "Timestamp"
transportation_df_cleaned.show(5)  # Show the contents of the transportation DataFrame after renaming a column

transportation_df_cleaned = transportation_df_cleaned.drop("Index")  # Remove the "Index" column
transportation_df_cleaned.show(5)  # Show the contents of the transportation DataFrame after dropping a column


## Row operations
- Use **.filter()** to select rows based on specific conditions
- Use **.groupBy()** and aggregate functions (e.g., **.sum()**, **.avg()**) to summarize data

In [None]:
# Filter rows where "Highway Fatalities" is greater than 1000 (.where() is an alias for .filter())
filtered_transportation_df_cleaned = transportation_df_cleaned.where(transportation_df_cleaned["Highway Fatalities"] > 1000)
filtered_transportation_df_cleaned.show(5)  # Show the first 5 rows after filtering

# Group by "Month" and calculate the average of "Highway Fatalities"
grouped_transportation_df_cleaned = transportation_df_cleaned.groupBy("Month").avg("Highway Fatalities")
grouped_transportation_df_cleaned.sort("Month", ascending=True).show()  # Show the monthly averages sorted by month in ascending order


## Advanced DataFrame operations
Joins in PySpark
- Combine rows from two or more DataFrames based on common columns
- Join types mirror SQL: inner, left, right, and outer
- Syntax: **DataFrame1.join(DataFrame2, on="column", how="join_type")**
