In [79]:
# Create (or reuse, via getOrCreate) a Spark session running locally on all available cores,
# then display it as the cell output.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Basic Transformation - I") \
    .master("local[*]") \
    .getOrCreate()

spark

In [80]:
# Show the SparkContext's default parallelism (default partition count for RDD operations).
# With master "local[*]" this normally equals the number of local cores.
spark.sparkContext.defaultParallelism

16

In [81]:
# Read the raw CloudFront log files, stored as Parquet, into a DataFrame.
# NOTE(review): this is an absolute, machine-specific path; keep it in this one constant
# so it is easy to change when the notebook runs elsewhere.
RAW_LOGS_PATH = "/home/jupyter/raw_data/AWSLogs/713881793976/CloudFront/*"

logs_df = spark.read.format("parquet").load(RAW_LOGS_PATH)

In [82]:
# Inspect the raw schema: every column is read as a string, so numeric and
# date/time fields need explicit conversion further down.
logs_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- x_edge_location: string (nullable = true)
 |-- sc_bytes: string (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_Host: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- sc_status: string (nullable = true)
 |-- cs_Referer: string (nullable = true)
 |-- cs_User_Agent: string (nullable = true)
 |-- cs_uri_query: string (nullable = true)
 |-- cs_Cookie: string (nullable = true)
 |-- x_edge_result_type: string (nullable = true)
 |-- x_edge_request_id: string (nullable = true)
 |-- x_host_header: string (nullable = true)
 |-- cs_protocol: string (nullable = true)
 |-- cs_bytes: string (nullable = true)
 |-- time_taken: string (nullable = true)
 |-- x_forwarded_for: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- x_edge_response_result_type: string (nullable = true)
 |-

In [83]:
# Number of raw log records before any cleaning (triggers a full scan of the input).
logs_df.count()

58931

In [84]:
# Build a single 'timestamp' column (TimestampType) by joining the string columns
# 'date' and 'time' with a space and parsing the result as "yyyy-MM-dd HH:mm:ss".
# Note: concat_ws skips NULL inputs rather than propagating them.
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, concat_ws, to_timestamp

logs_df_cleaned = logs_df.withColumn(
    "timestamp",
    to_timestamp(
        concat_ws(" ", "date", "time"),
        "yyyy-MM-dd HH:mm:ss",
    ),
)

In [85]:
# Preview the first 5 rows, including the new 'timestamp' column
# (show() truncates long cell values by default).
logs_df_cleaned.show(5)

+----------+--------+---------------+--------+-------------+---------+--------------------+--------------------+---------+--------------------+--------------------+------------+---------+------------------+--------------------+--------------------+-----------+--------+----------+---------------+------------+--------------------+---------------------------+-------------------+----------+--------------------+------+------------------+---------------------------+--------------------+--------------+--------------+------------+-------------------+
|      date|    time|x_edge_location|sc_bytes|         c_ip|cs_method|             cs_Host|         cs_uri_stem|sc_status|          cs_Referer|       cs_User_Agent|cs_uri_query|cs_Cookie|x_edge_result_type|   x_edge_request_id|       x_host_header|cs_protocol|cs_bytes|time_taken|x_forwarded_for|ssl_protocol|          ssl_cipher|x_edge_response_result_type|cs_protocol_version|fle_status|fle_encrypted_fields|c_port|time_to_first_byte|x_edge_detailed

In [86]:
# Re-inspect the schema after adding 'timestamp'; the original columns are still strings here.
logs_df_cleaned.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- x_edge_location: string (nullable = true)
 |-- sc_bytes: string (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_Host: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- sc_status: string (nullable = true)
 |-- cs_Referer: string (nullable = true)
 |-- cs_User_Agent: string (nullable = true)
 |-- cs_uri_query: string (nullable = true)
 |-- cs_Cookie: string (nullable = true)
 |-- x_edge_result_type: string (nullable = true)
 |-- x_edge_request_id: string (nullable = true)
 |-- x_host_header: string (nullable = true)
 |-- cs_protocol: string (nullable = true)
 |-- cs_bytes: string (nullable = true)
 |-- time_taken: string (nullable = true)
 |-- x_forwarded_for: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- x_edge_response_result_type: string (nullable = true)
 |-

In [87]:
# Cast relevant string columns to appropriate numerical types:
# LongType for counts/sizes, IntegerType for the client port,
# DoubleType for time_taken / time_to_first_byte.
from pyspark.sql.functions import col, when
from pyspark.sql.types import LongType, DoubleType, IntegerType

NUMERIC_COLUMN_TYPES = {
    "sc_bytes": LongType(),
    "cs_bytes": LongType(),
    "time_taken": DoubleType(),
    "c_port": IntegerType(),
    "time_to_first_byte": DoubleType(),
    "sc_content_len": LongType(),
}

for column_name, target_type in NUMERIC_COLUMN_TYPES.items():
    # Map the '-' placeholder to NULL explicitly before casting, the same way the
    # sc_range_* columns are treated. Without ANSI mode a cast of '-' already yields
    # NULL, but with spark.sql.ansi.enabled=true an unparseable string makes the cast
    # fail. when() without otherwise() returns NULL for '-' and for NULL inputs.
    logs_df_cleaned = logs_df_cleaned.withColumn(
        column_name,
        when(col(column_name) != "-", col(column_name)).cast(target_type),
    )



In [88]:
# Handle 'sc_range_start' and 'sc_range_end', which can be '-':
# convert '-' to NULL before casting to LongType.
# Both columns go through the same expression so the placeholder literal cannot drift
# between them (previously sc_range_end was compared against '_' by mistake).
from pyspark.sql.functions import expr
from pyspark.sql.types import LongType

for range_column in ("sc_range_start", "sc_range_end"):
    logs_df_cleaned = logs_df_cleaned.withColumn(
        range_column,
        expr(f"CASE WHEN {range_column} = '-' THEN NULL ELSE {range_column} END").cast(LongType()),
    )


In [89]:
# Drop rows whose 'timestamp' is NULL (e.g. date/time strings that did not parse)
# and report how many rows were removed.
initial_row_count = logs_df_cleaned.count()
logs_df_cleaned = logs_df_cleaned.na.drop(subset=["timestamp"])
# Count once after the drop: each count() re-executes the lazy plan.
final_row_count = logs_df_cleaned.count()
dropped_row_count = initial_row_count - final_row_count
if dropped_row_count > 0:
    # The previous code subtracted the DataFrame itself from an int (TypeError).
    print(f"Number of rows dropped: {dropped_row_count}.")

In [90]:
# Derive the output partition keys (year, month, day-of-month, hour) from 'timestamp'.
from pyspark.sql.functions import year, month, dayofmonth, hour

logs_df_cleaned = (
    logs_df_cleaned
    .withColumn("year", year("timestamp"))
    .withColumn("month", month("timestamp"))
    .withColumn("day", dayofmonth("timestamp"))
    .withColumn("hour", hour("timestamp"))
)

In [91]:
# The raw 'date' and 'time' strings are superseded by the parsed 'timestamp' column.
logs_df_cleaned = logs_df_cleaned.drop("date").drop("time")

In [92]:
# Verify the final schema: numeric casts applied, partition columns added, 'date'/'time' gone.
logs_df_cleaned.printSchema()

root
 |-- x_edge_location: string (nullable = true)
 |-- sc_bytes: long (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_Host: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- sc_status: string (nullable = true)
 |-- cs_Referer: string (nullable = true)
 |-- cs_User_Agent: string (nullable = true)
 |-- cs_uri_query: string (nullable = true)
 |-- cs_Cookie: string (nullable = true)
 |-- x_edge_result_type: string (nullable = true)
 |-- x_edge_request_id: string (nullable = true)
 |-- x_host_header: string (nullable = true)
 |-- cs_protocol: string (nullable = true)
 |-- cs_bytes: long (nullable = true)
 |-- time_taken: double (nullable = true)
 |-- x_forwarded_for: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- x_edge_response_result_type: string (nullable = true)
 |-- cs_protocol_version: string (nullable = true)
 |-- fle_status: string (nul

In [93]:
# Save the cleaned data as Parquet, partitioned by year, month, day, hour,
# so queries filtering on time can prune partitions.
# NOTE: with mode("overwrite") and the default spark.sql.sources.partitionOverwriteMode=static,
# Spark replaces everything under CLEAN_LOGS_PATH, not only the partitions present here.
CLEAN_LOGS_PATH = "/home/jupyter/clean_data/AWSLogs/"
PARTITION_COLUMNS = ["year", "month", "day", "hour"]

logs_df_cleaned.write.mode("overwrite").partitionBy(*PARTITION_COLUMNS).parquet(CLEAN_LOGS_PATH)