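# Data Analysis Example Using PySpark in AWS SageMaker

Loads one hour of OpenSky Network track points from S3 into a Spark DataFrame on an AWS Glue interactive session, drops rows whose weight-on-wheels flag is null, and normalizes column names to lowercase. The session is configured by the `%` magics in the first code cell (the IAM role ARN there is account-specific — replace it before running in another account) and is released by `%stop_session` in the last cell.

```
   SageMaker Image: SparkAnalytics 2.0
  SageMaker Kernel: Glue Python [PySpark and Ray]
SageMaker Instance: ml.g4dn.xlarge (4 vCPU + 1 GPU + 16 GiB @ $0.50/hr) for storage purposes (HDFS)
```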

In [None]:
%iam_role arn:aws:iam::867344433302:role/endurasoft-GlueJobServiceRole
%profile default
%etl
%number_of_workers 2
%worker_type G.2X
%glue_version 3.0

In [None]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [None]:
%status

In [None]:
# Reuse (or create) the SparkContext of this Glue session, wrap it in a
# GlueContext, and expose that context's SparkSession as `spark` for the
# cells below.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# NOTE(review): `job` is never initialised (job.init) or committed in this
# notebook; it is created only so a Job object exists for the session.
job = Job(glueContext)
spark = glueContext.spark_session

In [None]:
# Session-wide Spark SQL settings, applied before the data is read:
#  - spark.sql.files.ignoreCorruptFiles=true: keep running when a file cannot be
#    read instead of failing the query.
#    NOTE(review): rows from such files are silently missing from `df`.
#  - spark.sql.parquet.enableVectorizedReader=false: use the row-based Parquet
#    reader; presumably a workaround for a type/schema issue in the source
#    files — confirm.
SPARK_SQL_SETTINGS = {
    "spark.sql.files.ignoreCorruptFiles": "true",
    "spark.sql.parquet.enableVectorizedReader": "false",
}
for setting_name, setting_value in SPARK_SQL_SETTINGS.items():
    spark.conf.set(setting_name, setting_value)

In [None]:
# Load 1 hour of OpenSky track points: every Parquet file in a single
# year/month/day/hour partition directory. Change the constants below to load
# a different hour.
# Corrupt/unreadable files are handled by spark.sql.files.ignoreCorruptFiles
# (set in an earlier cell). The reader `mode` option (PERMISSIVE/DROPMALFORMED)
# belongs to the CSV/JSON readers, not Parquet, so it is not used here.
TRACK_POINTS_ROOT = "s3://endurasoft-dev-risk-framework/opensky-network/track-points"
YEAR, MONTH, DAY, HOUR = 2024, 10, 5, 10
input_path = f"{TRACK_POINTS_ROOT}/year={YEAR}/month={MONTH}/day={DAY}/hour={HOUR}/*.parquet"
df = spark.read.parquet(input_path)

In [None]:
# Mark the raw DataFrame for caching at PySpark's default storage level, so the
# count/show/filter cells below reuse it instead of re-reading the files from S3.
# Caching is lazy: the cache is populated by the first action (df.count() below).
# NOTE(review): `df` is rebound to derived frames later, so this cached frame can
# no longer be unpersisted by name — keep a separate reference if that matters.
df.persist()

In [None]:
# Print the schema as a tree: column names, types, and nullability.
df.printSchema()

In [None]:
# Row count of the raw hour of data (before any filtering). As the first action
# after df.persist(), this also materializes the cache.
df.count()

In [None]:
# Show one row vertically (one line per column), which is easier to read for
# wide rows. show() truncates string values longer than 20 characters by default.
df.show(n=1, vertical=True)

In [None]:
# Drop records where weight on wheels is null.
# F.col() is resolved by Spark, which is case-insensitive while
# spark.sql.caseSensitive is false (the Spark default). Attribute access
# (df.airframe_hasWeightOnWheels) is checked in Python against the exact column
# names, so it would raise AttributeError if this cell were re-run after the
# lowercase-rename cell.
df = df.filter(F.col("airframe_hasWeightOnWheels").isNotNull())

In [None]:
# Number of top-level columns in the DataFrame.
len(df.columns)

In [None]:
# Normalise every top-level column name to lowercase. toDF only renames the
# top-level columns; field names inside nested structs keep their original case.
df = df.toDF(*map(str.lower, df.columns))

In [None]:
# TODO Create aggregated analysis dataframe and write results to s3
# analysis_df = ...
# outpath = 's3://endurasoft-dev-risk-framework/analysis/data_analysis_pyspark_example/summary/'
# analysis_df.coalesce(1).write.parquet(outpath)
# NOTE(review): coalesce(1) funnels the write through a single task and produces
# one output file, which is only appropriate for a small aggregated result.
# DataFrameWriter's default save mode is "errorifexists", so re-running this cell
# fails once outpath exists; use .mode("overwrite") if re-runs are intended.

In [None]:
%status

In [None]:
%stop_session