# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [28]:
# Session configuration magics. If the kernel is already attached to a session,
# these do not alter it: the settings only apply to newly created sessions.
# Idle timeout, in minutes.
%idle_timeout 2880
# AWS Glue version used for the session.
%glue_version 4.0
# Worker type used for the session.
%worker_type G.1X
# Number of workers allocated to the session.
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Bootstrap the Glue/Spark handles used by the rest of the notebook.
# Each statement depends on the one before it, so the order matters.
# Reuse the SparkContext if one exists; otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext built on top of that SparkContext.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Job object constructed from the GlueContext; it is not used again in the visible cells.
job = Job(glueContext)

You are already connected to a glueetl session dda01b26-dea3-477d-8e94-24b339ebb7e9.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session dda01b26-dea3-477d-8e94-24b339ebb7e9.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session dda01b26-dea3-477d-8e94-24b339ebb7e9.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session dda01b26-dea3-477d-8e94-24b339ebb7e9.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [41]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import col




In [42]:
# Obtain the SparkSession for this notebook under the "SchemaValidation" app name.
# NOTE(review): this rebinds `spark`, which the session-setup cell already
# assigned from glueContext.spark_session; getOrCreate() presumably hands back
# that same active session - confirm before relying on the app name.
spark = (
    SparkSession.builder
    .appName("SchemaValidation")
    .getOrCreate()
)




In [43]:
# Expected schema for capital_rideshare_combined.csv.
# Columns are listed as (name, type) pairs in the order the validator compares
# them; every column is declared nullable.
expected_schema_rideshare = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("ride_id", StringType()),
        ("rideable_type", StringType()),
        ("started_at", TimestampType()),
        ("ended_at", TimestampType()),
        ("start_station_name", StringType()),
        ("start_station_id", IntegerType()),
        ("end_station_name", StringType()),
        ("end_station_id", IntegerType()),
        ("start_lat", DoubleType()),
        ("start_lng", DoubleType()),
        ("end_lat", DoubleType()),
        ("end_lng", DoubleType()),
        ("member_casual", StringType()),
        ("year", IntegerType()),
        ("month", IntegerType()),
    )
])




In [44]:
# Read the rideshare CSV from S3: the first row is treated as the header and
# Spark infers the column types from the data.
s3_path_rideshare = "s3://capitalbikesharedata/bikeshare/capital_rideshare_combined.csv"
data_rideshare = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(s3_path_rideshare)
)

# Show the inferred schema so it can be compared against the expected one.
print("Actual schema for capital_rideshare_combined.csv:", data_rideshare.schema, sep="\n")

# Function to validate schema field by field
def validate_schema(expected_schema, actual_schema, dataset_name="capital_rideshare_combined.csv"):
    """Compare two schemas field by field and print a validation report.

    Fields are compared positionally. For each position the field names must
    match; when they do, the classes of the two ``dataType`` objects must match
    (only the class is compared, not any parameters the type may carry).
    Fields present in one schema but not the other are reported as well.

    Args:
        expected_schema: Iterable of fields exposing ``.name`` and
            ``.dataType`` (for example a ``StructType``).
        actual_schema: Iterable of fields with the same attributes, typically
            the schema of the loaded DataFrame.
        dataset_name: Label used in the printed report. Defaults to the
            rideshare file name so existing two-argument calls print the same
            messages as before.

    Returns:
        None. The outcome is written to stdout.
    """
    expected_fields = list(expected_schema)
    actual_fields = list(actual_schema)
    failed_fields = []

    for expected, actual in zip(expected_fields, actual_fields):
        if expected.name != actual.name:
            failed_fields.append(f"Field name mismatch: Expected '{expected.name}', Found '{actual.name}'")
        elif type(expected.dataType) != type(actual.dataType):
            failed_fields.append(f"Field type mismatch: Field '{expected.name}' - Expected '{expected.dataType}', Found '{actual.dataType}'")

    # zip() stops at the shorter schema, so columns beyond the common prefix
    # must be reported explicitly or a missing/extra column would pass silently.
    shared_count = min(len(expected_fields), len(actual_fields))
    for missing in expected_fields[shared_count:]:
        failed_fields.append(f"Missing field: Expected '{missing.name}' not found in actual schema")
    for extra in actual_fields[shared_count:]:
        failed_fields.append(f"Unexpected field: Found '{extra.name}' not present in expected schema")

    if failed_fields:
        print(f"Schema validation failed for {dataset_name}")
        for error in failed_fields:
            print(error)
    else:
        print(f"Schema validation successful for {dataset_name}")

# Check the inferred rideshare schema against the expected definition.
validate_schema(
    expected_schema=expected_schema_rideshare,
    actual_schema=data_rideshare.schema,
)

Actual schema for capital_rideshare_combined.csv:
StructType([StructField('ride_id', StringType(), True), StructField('rideable_type', StringType(), True), StructField('started_at', StringType(), True), StructField('ended_at', StringType(), True), StructField('start_station_name', StringType(), True), StructField('start_station_id', IntegerType(), True), StructField('end_station_name', StringType(), True), StructField('end_station_id', IntegerType(), True), StructField('start_lat', DoubleType(), True), StructField('start_lng', DoubleType(), True), StructField('end_lat', DoubleType(), True), StructField('end_lng', DoubleType(), True), StructField('member_casual', StringType(), True), StructField('year', IntegerType(), True), StructField('month', IntegerType(), True)])
Schema validation failed for capital_rideshare_combined.csv
Field type mismatch: Field 'started_at' - Expected 'TimestampType()', Found 'StringType()'
Field type mismatch: Field 'ended_at' - Expected 'TimestampType()', Found 'StringType()'

In [45]:
# Expected schema for placemarks_data.csv.
# Columns are listed as (name, type) pairs in the order the validator compares
# them; every column is declared nullable.
expected_schema_placemarks = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", StringType()),
        ("networkName", StringType()),
        ("networkId", IntegerType()),
        ("color", StringType()),
        ("textColor", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("type", StringType()),
        ("location", StringType()),
    )
])




In [46]:
# Read the placemarks CSV from S3: the first row is treated as the header and
# Spark infers the column types from the data.
s3_path_placemarks = "s3://capitalbikesharedata/transit_placemarkers/placemarks_data.csv"
data_placemarks = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(s3_path_placemarks)
)





In [47]:
# Show the inferred schema so it can be compared against the expected one.
print("Actual schema for placemarks_data.csv:", data_placemarks.schema, sep="\n")

# Function to validate schema field by field
def validate_schema(expected_schema, actual_schema, dataset_name="placemarks_data.csv"):
    """Compare two schemas field by field and print a validation report.

    Fields are compared positionally. For each position the field names must
    match; when they do, the classes of the two ``dataType`` objects must match
    (only the class is compared, not any parameters the type may carry).
    Fields present in one schema but not the other are reported as well.

    Args:
        expected_schema: Iterable of fields exposing ``.name`` and
            ``.dataType`` (for example a ``StructType``).
        actual_schema: Iterable of fields with the same attributes, typically
            the schema of the loaded DataFrame.
        dataset_name: Label used in the printed report. Defaults to the
            placemarks file name so existing two-argument calls print the same
            messages as before.

    Returns:
        None. The outcome is written to stdout.
    """
    expected_fields = list(expected_schema)
    actual_fields = list(actual_schema)
    failed_fields = []

    for expected, actual in zip(expected_fields, actual_fields):
        if expected.name != actual.name:
            failed_fields.append(f"Field name mismatch: Expected '{expected.name}', Found '{actual.name}'")
        elif type(expected.dataType) != type(actual.dataType):
            failed_fields.append(f"Field type mismatch: Field '{expected.name}' - Expected '{expected.dataType}', Found '{actual.dataType}'")

    # zip() stops at the shorter schema, so columns beyond the common prefix
    # must be reported explicitly or a missing/extra column would pass silently.
    shared_count = min(len(expected_fields), len(actual_fields))
    for missing in expected_fields[shared_count:]:
        failed_fields.append(f"Missing field: Expected '{missing.name}' not found in actual schema")
    for extra in actual_fields[shared_count:]:
        failed_fields.append(f"Unexpected field: Found '{extra.name}' not present in expected schema")

    if failed_fields:
        print(f"Schema validation failed for {dataset_name}")
        for error in failed_fields:
            print(error)
    else:
        print(f"Schema validation successful for {dataset_name}")

# Check the inferred placemarks schema against the expected definition.
validate_schema(
    expected_schema=expected_schema_placemarks,
    actual_schema=data_placemarks.schema,
)

Actual schema for placemarks_data.csv:
StructType([StructField('id', StringType(), True), StructField('networkName', StringType(), True), StructField('networkId', IntegerType(), True), StructField('color', StringType(), True), StructField('textColor', StringType(), True), StructField('latitude', DoubleType(), True), StructField('longitude', DoubleType(), True), StructField('type', StringType(), True), StructField('location', StringType(), True)])
Schema validation successful for placemarks_data.csv
