# Data Analysis Example Using Local PySpark
**NOTE:** Install and configure PySpark locally by following these Medium articles:
* https://medium.com/@agusmahari/pyspark-step-by-step-guide-to-installing-pyspark-on-linux-bb8af96ea5e8
* https://medium.com/attest-product-and-technology/how-to-install-pyspark-locally-connecting-to-aws-s3-redshift-55488e87d4cd

**NOTE:** The `hadoop-aws` jar must have the same version as the Hadoop jars bundled with Spark (for example `hadoop-client-api` and `hadoop-client-runtime`). Otherwise, you will get `ClassNotFoundException` errors.

See: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.3.1

Example:
```log
/opt/spark/jars/hadoop-aws-3.3.1.jar
/opt/spark/jars/hadoop-client-runtime-3.3.1.jar
/opt/spark/jars/hadoop-yarn-server-web-proxy-3.3.1.jar
/opt/spark/jars/hadoop-client-api-3.3.1.jar
```
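The check below is a minimal sketch that does this from Python. It makes three assumptions:

* The jars live in `/opt/spark/jars`, as in the example above.
* The version is the trailing `-X.Y.Z` part of each `hadoop-*.jar` file name.
* The helper names are hypothetical.

It compares every `hadoop-*` jar against `hadoop-aws`. If your distribution ships a `hadoop-*` jar that is versioned independently of Hadoop itself, drop it from the list before comparing.

```python
import re
from pathlib import Path

# Captures the artifact name and version from names like "hadoop-aws-3.3.1.jar"
HADOOP_JAR = re.compile(r"^(hadoop-[a-z0-9-]+?)-(\d+\.\d+\.\d+)\.jar$")


def list_jar_names(jar_dir="/opt/spark/jars"):
    """Return the file names of all hadoop-*.jar files in jar_dir (assumed location)."""
    return sorted(p.name for p in Path(jar_dir).glob("hadoop-*.jar"))


def hadoop_jar_versions(jar_names):
    """Map each hadoop-* artifact name to the version found in its file name."""
    versions = {}
    for name in jar_names:
        match = HADOOP_JAR.match(name)
        if match:
            versions[match.group(1)] = match.group(2)
    return versions


def mismatched_hadoop_jars(jar_names, reference="hadoop-aws"):
    """Return {artifact: version} for hadoop-* jars whose version differs from `reference`."""
    versions = hadoop_jar_versions(jar_names)
    if reference not in versions:
        raise ValueError(f"{reference} jar not found")
    expected = versions[reference]
    return {name: v for name, v in versions.items() if v != expected}
```

For a consistent set of jars like the one above, `mismatched_hadoop_jars(list_jar_names())` should return an empty dict.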

**NOTE:** Ensure the Spark version matches the PySpark version.

Example (both the shell banner and `pip show` report 3.2.0):
```log
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
```
```shell
python -m pip show pyspark
# Name: pyspark
# Version: 3.2.0
# Summary: Apache Spark Python API
# Home-page: https://github.com/apache/spark/tree/master/python
# Author: Spark Developers
# Author-email: dev@spark.apache.org
# License: http://www.apache.org/licenses/LICENSE-2.0
# Location: /home/ballen/miniconda3/envs/pyspark/lib/python3.10/site-packages
# Requires: py4j
```
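The same comparison can be made from a running notebook. `spark.version` reports the version of the Spark runtime behind the session, and `pyspark.__version__` reports the version of the installed Python package. The helper below is a hypothetical sketch that compares the `major.minor.patch` parts of the two strings.

```python
def versions_match(spark_version: str, pyspark_version: str) -> bool:
    """Return True if both version strings share the same major.minor.patch parts."""
    return spark_version.split(".")[:3] == pyspark_version.split(".")[:3]


# Usage in a notebook with an active session:
#   import pyspark
#   assert versions_match(spark.version, pyspark.__version__), (spark.version, pyspark.__version__)
```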

**NOTE:** Ensure the Python version is compatible with the Spark and PySpark versions.
```shell
conda create -n pyspark python==3.10.9
conda activate pyspark
python -m pip install pyspark==3.2.0
```
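When several conda environments exist, it helps to confirm which interpreter and PySpark package the notebook kernel is actually using. The sketch below does that; `environment_report` is a hypothetical helper name. It only reports versions. Which Python versions a given Spark release supports should be taken from that release's documentation.

```python
import sys
from importlib.metadata import PackageNotFoundError, version


def environment_report() -> dict:
    """Report the running Python version, its executable, and the installed pyspark version."""
    try:
        pyspark_version = version("pyspark")
    except PackageNotFoundError:
        # pyspark is not installed in this interpreter's environment
        pyspark_version = None
    return {
        "python": ".".join(str(part) for part in sys.version_info[:3]),
        "executable": sys.executable,
        "pyspark": pyspark_version,
    }


print(environment_report())
```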

## Setup

### Imports

```python
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
```

### Configurations

```python
spark = SparkSession \
    .builder \
    .appName("Data Analysis Local Pyspark Example") \
    .config("spark.pyspark.python", "python") \
    .getOrCreate()
# spark.pyspark.python: Python executable used by PySpark (driver and executors)
```
```log
24/11/17 19:09:27 WARN Utils: Your hostname, Brett-Surface resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/17 19:09:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/17 19:09:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```


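```python
# Skip corrupt or unreadable files instead of failing the query
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
# Use the non-vectorized (row-based) Parquet reader instead of the default vectorized one
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
```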

## Load the Data
Load one hour of track-point data sampled from the full dataset.

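```python
# Load 1 hour of data
# Note: the "mode" read option (default PERMISSIVE, e.g. .option("mode", "DROPMALFORMED"))
# applies to CSV/JSON sources, not Parquet; unreadable Parquet files are skipped
# through spark.sql.files.ignoreCorruptFiles instead
df = spark.read.parquet("s3a://endurasoft-dev-risk-framework/opensky-network/track-points/year=2024/month=10/day=5/hour=10/*.parquet")
```
```log
24/11/17 19:09:39 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
```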

```python
# Cache the data; persist() is lazy, so the cache is filled by the first action (e.g. count())
df.persist()
```
```text
DataFrame[time: timestamp, icao24: string, lat: double, lon: double, velocity: double, heading: double, vertrate: double, callsign: string, onground: boolean, alert: boolean, spi: boolean, squawk: string, baroaltitude: double, geoaltitude: double, lastposupdate: double, lastcontact: double, serials: array<bigint>]
```

## Analyze the Data

```python
df.printSchema()
```
```text
root
 |-- time: timestamp (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)
 |-- serials: array (nullable = true)
 |    |-- element: long (containsNull = true)
```



```python
df.count()
```
```text
166928
```

We can see that one hour of track point data contains `166,928` track points.

```python
df.show(n=1, vertical=True)
```
```text
-RECORD 0-----------------------------
 time          | 2024-10-05 06:00:01  
 icao24        | ab5afe               
 lat           | 41.29389953613281    
 lon           | -73.83734130859375   
 velocity      | 206.49976965976984   
 heading       | 246.8201519716215    
 vertrate      | 0.0                  
 callsign      | SWA3124              
 onground      | false                
 alert         | false                
 spi           | false                
 squawk        | 3447                 
 baroaltitude  | 12192.0              
 geoaltitude   | 12451.08             
 lastposupdate | 1.728122400721E9     
 lastcontact   | 1.728122400998E9     
 serials       | [-1408230990, -14... 
only showing top 1 row
```
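The `time` value above (`06:00:01`) looks inconsistent with the `hour=10` partition it was read from. The epoch-seconds columns suggest that `show()` renders timestamps in the session's local time zone: `lastposupdate` converts to 10:00 UTC, which is 06:00 at UTC-4 (EDT). The UTC-4 offset is inferred from this output and is an assumption, not something the notebook states. Setting the Spark config `spark.sql.session.timeZone` to `"UTC"` would make Spark display timestamps in UTC.

```python
from datetime import datetime, timedelta, timezone

# lastposupdate from the record above (seconds since the Unix epoch)
last_pos_update = 1.728122400721e9

utc_time = datetime.fromtimestamp(last_pos_update, tz=timezone.utc)
# Assumed local offset of the session: UTC-4 (EDT)
local_time = utc_time.astimezone(timezone(timedelta(hours=-4)))

print(utc_time.isoformat(timespec="seconds"))    # 2024-10-05T10:00:00+00:00
print(local_time.isoformat(timespec="seconds"))  # 2024-10-05T06:00:00-04:00
```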



### Inspect Missing Values

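```python
# Get missing values per column: F.when() without otherwise() yields null for
# non-null rows, and F.count() only counts non-null values
missing_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
```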

```python
missing_counts.show(vertical=True)
```
```text
-RECORD 0--------------
 time          | 0     
 icao24        | 0     
 lat           | 1641  
 lon           | 1641  
 velocity      | 9254  
 heading       | 9254  
 vertrate      | 9254  
 callsign      | 1338  
 onground      | 0     
 alert         | 0     
 spi           | 0     
 squawk        | 64280 
 baroaltitude  | 20135 
 geoaltitude   | 20312 
 lastposupdate | 1641  
 lastcontact   | 0     
 serials       | 0     
```



```python
# Create function to show more information on missing values
def get_missing_values(df: DataFrame) -> DataFrame:
    """
    Create a dataframe to represent the missing values in the original dataframe.

    Args:
        df (DataFrame): Original dataframe to identify missing values.

    Returns:
        DataFrame: New dataframe representing a report of missing values in original dataframe.
    """
    total_records = df.count()

    # Count the nulls in every column with a single aggregation (one Spark job)
    # instead of running a separate job per column
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).collect()[0].asDict()

    # For each column, calculate the total missing, available, ratio, and percentage of missing values
    missing_stats = []

    for column in df.columns:
        total_missing = null_counts[column]
        total_available = total_records - total_missing
        # Guard against division by zero on an empty dataframe
        ratio_missing = total_missing / total_records if total_records else 0.0
        percent_missing = round(ratio_missing * 100, 1)

        missing_stats.append({
            'column': column,
            'total_missing': total_missing,
            'total_available': total_available,
            'ratio_missing': round(ratio_missing, 4),
            'percent_missing': f'{percent_missing}%'
        })

    # Create a DataFrame from the list of dictionaries
    result_df = df.sql_ctx.createDataFrame(missing_stats)

    # Ensure correct column order
    return result_df.select(['column', 'total_missing', 'total_available', 'ratio_missing', 'percent_missing'])
```

```python
missing_values_report = get_missing_values(df)
```

```python
missing_values_report.show()
```
```text
+-------------+-------------+---------------+-------------+---------------+
|       column|total_missing|total_available|ratio_missing|percent_missing|
+-------------+-------------+---------------+-------------+---------------+
|         time|            0|         166928|          0.0|           0.0%|
|       icao24|            0|         166928|          0.0|           0.0%|
|          lat|         1641|         165287|       0.0098|           1.0%|
|          lon|         1641|         165287|       0.0098|           1.0%|
|     velocity|         9254|         157674|       0.0554|           5.5%|
|      heading|         9254|         157674|       0.0554|           5.5%|
|     vertrate|         9254|         157674|       0.0554|           5.5%|
|     callsign|         1338|         165590|        0.008|           0.8%|
|     onground|            0|         166928|          0.0|           0.0%|
|        alert|            0|         166928|          0.0|           0.0%|
...
```
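The rows cut off in this output can be recomputed from the null counts in the `missing_counts` output. The sketch below uses the same arithmetic and rounding as `get_missing_values`: the ratio is rounded to 4 decimals and the percentage to 1.

```python
# Null counts copied from the missing_counts output, for the columns cut off in the report
total_records = 166928
null_counts = {
    "spi": 0,
    "squawk": 64280,
    "baroaltitude": 20135,
    "geoaltitude": 20312,
    "lastposupdate": 1641,
    "lastcontact": 0,
    "serials": 0,
}

rows = []
for column, total_missing in null_counts.items():
    ratio_missing = total_missing / total_records
    rows.append((
        column,
        total_missing,
        total_records - total_missing,        # total_available
        round(ratio_missing, 4),              # ratio_missing
        f"{round(ratio_missing * 100, 1)}%",  # percent_missing
    ))

for row in rows:
    print(row)
```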

The core columns required for the risk analysis have relatively few missing values:
* `time` - 0.0%
* `icao24` - 0.0%
* `lat` - 1.0%
* `lon` - 1.0%
* `baroaltitude` - 12.1%
* `geoaltitude` - 12.2%

Other useful columns also have high availability: `velocity`, `heading`, and `vertrate` are each missing in 5.5% of rows, and `onground` is never missing.

Missing `lat`, `lon`, and altitude values will need to be imputed. One option is to carry forward the previous non-null value (e.g., per `icao24`, ordered by `time`). The other is to use a nearest-neighbors algorithm (e.g., KNN).

## Save Analysis Results

```python
outpath = 's3a://endurasoft-dev-risk-framework/analysis/data_analysis_pyspark_example/missing_values_local_analysis_202410510/'
```

```python
# Save missing values analysis dataframe to s3
# coalesce(1) writes the small report as a single CSV part file
missing_values_report.coalesce(1)\
                     .write\
                     .mode('overwrite')\
                     .option("header", True)\
                     .csv(outpath)
```
```log
24/10/27 13:27:52 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/10/27 13:27:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
```

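```python
# IPython shell escape: list the output with the AWS CLI, which uses the s3:// scheme
!aws s3 ls {outpath.replace('s3a://', 's3://', 1)} --profile endurasoft-dev
```
```text
2024-10-27 13:27:57          0 _SUCCESS
2024-10-27 13:27:56        578 part-00000-babdfe83-5f8e-4d60-b0b6-363187bdc0d3-c000.csv
```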
