<a href="https://colab.research.google.com/github/Rohith-Dasari/Datathon/blob/main/etl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AWS Glue Studio Notebook: Big Data Analysis (ETL) of BobbleAI Logs

#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help
# Optional: prints the notebook magics available in this kernel; nothing later depends on it.

#### Basic AWS Glue setup with the PySpark API for Apache Spark on a 10-worker G.2X cluster (20 DPUs)


In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.2X
%number_of_workers 10

# Session bootstrap. The magics above configure the Glue interactive session
# (idle timeout in minutes, Glue version, worker type and count). As the saved
# output shows, when a session is already connected they only apply to newly
# created sessions; the running session keeps its existing configuration.
# NOTE(review): `sys`, `getResolvedOptions` and the star import from
# awsglue.transforms appear unused in the visible notebook (likely leftover Glue
# job boilerplate) — confirm before removing.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()  # reuse the session's SparkContext if one already exists
glueContext = GlueContext(sc)  # Glue wrapper around the SparkContext
spark = glueContext.spark_session  # SparkSession used for all DataFrame work in this notebook
job = Job(glueContext)  # job handle; initialised with job.init(...) in a later cell

You are already connected to a glueetl session 55738361-23c6-4592-a283-283c4fa54455.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 55738361-23c6-4592-a283-283c4fa54455.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 55738361-23c6-4592-a283-283c4fa54455.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.2X
Setting new worker type to: G.2X


You are already connected to a glueetl session 55738361-23c6-4592-a283-283c4fa54455.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 10
Setting new number of workers to: 10



#### ETL Glue Job

In [None]:
# Register this run under a fixed job name; no job arguments are passed.
JOB_NAME = "BobbleAI-ETL"
job.init(JOB_NAME)




#### S3 input and output paths

In [None]:
# Raw JSON logs are read from raw-data/; the Parquet output is written to processed-data/.
S3_BUCKET_ROOT = "s3://test-buckets165"
input_path = f"{S3_BUCKET_ROOT}/raw-data/"
output_path = f"{S3_BUCKET_ROOT}/processed-data/"




#### Preprocessing: load and inspect the raw data

In [None]:
# Load every JSON record under input_path. Spark infers the schema, including
# the nested `data` struct, from the records themselves.
df = spark.read.format("json").load(input_path)
df.printSchema()

root
 |-- advertisingId: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- action_taken: string (nullable = true)
 |    |-- action_title: string (nullable = true)
 |    |-- ad_category: string (nullable = true)
 |    |-- ad_id: string (nullable = true)
 |    |-- ad_list_size: long (nullable = true)
 |    |-- ad_package_name: string (nullable = true)
 |    |-- ad_partner: string (nullable = true)
 |    |-- ad_type: string (nullable = true)
 |    |-- ads_size: long (nullable = true)
 |    |-- animated_emoji_id: string (nullable = true)
 |    |-- api_request_identifier: string (nullable = true)
 |    |-- audio: long (nullable = true)
 |    |-- auto_corrected: long (nullable = true)
 |    |-- auto_corrected_undo: long (nullable = true)
 |    |-- autocorrect_char_saved: long (nullable = true)
 |    |-- avg_

In [None]:
# Basic summary statistics (count/mean/stddev/min/max) for the non-struct
# top-level columns; the nested `data` struct is not included.
# NOTE(review): the saved output reports count=741850 for every column, while
# df.count() further down reports 2961704 rows — the outputs may come from
# different runs / input snapshots. Re-run top-to-bottom to confirm.
summary_stats = df.describe()
summary_stats.show()

+-------+--------------------+-----------------+--------------------+-------------------+--------------------+--------------------+----------+------------+--------------------+---------+------------------+-------------------+
|summary|       advertisingId|           app_id|         app_version|         created_at|           device_id|        event_action|event_type|          ip|          request_id|screen_at|       sdk_version|  server_created_at|
+-------+--------------------+-----------------+--------------------+-------------------+--------------------+--------------------+----------+------------+--------------------+---------+------------------+-------------------+
|  count|              741850|           741850|              741850|             741850|              741850|              741850|    741850|      741850|              741850|   741850|            741850|             741850|
|   mean|                null|             null|1.3558152861361057E8|               null|       

In [None]:
# Top-level column names of the raw DataFrame.
column_names = df.columns
column_names

['advertisingId', 'app_id', 'app_version', 'created_at', 'data', 'device_id', 'event_action', 'event_type', 'ip', 'request_id', 'screen_at', 'sdk_version', 'server_created_at']


In [None]:
num_columns = len(df.columns)
print(f"The number of columns in the dataframe: {num_columns}")

The number of columns in the dataframe: 13


In [None]:
# count() is an action: it triggers a full pass over the input data.
num_rows = df.count()
print(f"The number of rows in the dataframe: {num_rows}")

The number of rows in the dataframe: 2961704


In [None]:
# Peek at a single raw record.
df.show(n=1)

+--------------------+-----------------+-----------+-------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+---------+-----------+-------------------+
|       advertisingId|           app_id|app_version|         created_at|                data|           device_id|  event_action|event_type|                  ip|          request_id|screen_at|sdk_version|  server_created_at|
+--------------------+-----------------+-----------+-------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+---------+-----------+-------------------+
|bc3f88ee-2006-466...|com.mint.keyboard|  112000202|2024-06-30 23:29:39|{null, null, null...|b75b85db-4710-431...|typing_summary|   feature|2001:448a:6040:72...|84d5eafb-e2ef-44c...|  kb_home|         11|2024-07-01 00:00:11|
+--------------------+-----------------+-----------+-------------------+--------------------+-------

#### Transformation

In [None]:
import pyspark.sql.functions as F

# request_id is not carried into the processed output. event_timestamp holds the
# epoch seconds parsed from created_at; unix_timestamp interprets the string in
# the Spark session time zone.
df = (
    df
    .drop("request_id")
    .withColumn("event_timestamp", F.unix_timestamp("created_at", "yyyy-MM-dd HH:mm:ss"))
)

df.select("event_timestamp").show(5)

+---------------+
|event_timestamp|
+---------------+
|     1719790179|
|     1719790190|
|     1719790208|
|     1719765059|
|     1719765059|
+---------------+
only showing top 5 rows


In [None]:
from datetime import datetime, timedelta, timezone

# Reporting window, written as the wall-clock bounds it corresponds to instead
# of bare epoch literals: 2024-06-26 00:00:00 through 2024-07-01 23:59:59 at
# UTC+05:30 (IST). The assert pins the exact epoch values used previously.
IST = timezone(timedelta(hours=5, minutes=30))
WINDOW_START_TS = int(datetime(2024, 6, 26, 0, 0, 0, tzinfo=IST).timestamp())
WINDOW_END_TS = int(datetime(2024, 7, 1, 23, 59, 59, tzinfo=IST).timestamp())
assert (WINDOW_START_TS, WINDOW_END_TS) == (1719340200, 1719858599)

# NOTE(review): event_timestamp was parsed from created_at in the Spark session
# time zone, which appears to be UTC (the sample row 2024-06-30 23:29:39 maps to
# 1719790179). If created_at is actually IST wall-clock time, these IST bounds
# shift the window by 5h30m relative to event_date — confirm created_at's zone.
df_filtered = (
    df
    .filter(F.col("event_timestamp").between(WINDOW_START_TS, WINDOW_END_TS))  # inclusive on both ends
    .withColumn("event_date", F.to_date("created_at", "yyyy-MM-dd HH:mm:ss"))
    .drop("created_at")  # superseded by event_timestamp / event_date
)
df_filtered.count()

2935443


In [None]:
# process_date: calendar date parsed from server_created_at.
# event_name: event_action under its output name (cast to string).
df_filtered = (
    df_filtered
    .withColumn("process_date", F.to_date("server_created_at", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("event_name", F.col("event_action").cast("string"))
)

# The source columns are superseded by the derived ones above.
df_final = df_filtered.drop("server_created_at", "event_action")
df_final.columns

['advertisingId', 'app_id', 'app_version', 'data', 'device_id', 'event_type', 'ip', 'screen_at', 'sdk_version', 'event_timestamp', 'event_date', 'process_date', 'event_name']


In [None]:
# Peek at a single transformed record.
df_final.show(n=1)

+--------------------+-----------------+-----------+--------------------+--------------------+----------+--------------------+---------+-----------+---------------+----------+------------+--------------+
|       advertisingId|           app_id|app_version|                data|           device_id|event_type|                  ip|screen_at|sdk_version|event_timestamp|event_date|process_date|    event_name|
+--------------------+-----------------+-----------+--------------------+--------------------+----------+--------------------+---------+-----------+---------------+----------+------------+--------------+
|bc3f88ee-2006-466...|com.mint.keyboard|  112000202|{null, null, null...|b75b85db-4710-431...|   feature|2001:448a:6040:72...|  kb_home|         11|     1719790179|2024-06-30|  2024-07-01|typing_summary|
+--------------------+-----------------+-----------+--------------------+--------------------+----------+--------------------+---------+-----------+---------------+----------+---------

In [None]:
final_column_count = len(df_final.columns)
final_column_count

13


In [None]:
# Top-level columns carried through unchanged, in output order.
base_columns = [
    'device_id', 'advertisingId', 'app_id', 'app_version', 'sdk_version', 'ip',
    'event_timestamp', 'event_name', 'process_date', 'event_date', 'event_type',
    'screen_at',
]

# Keep the nested `data` struct alongside them, renamed to data_event_params.
df_expanded = df_final.select(
    *(F.col(name) for name in base_columns),
    F.col('data').alias('data_event_params'),
)

# Field names of the struct, used to flatten it in the next step.
data_fields = df_expanded.select("data_event_params.*").columns
data_fields

['action', 'action_taken', 'action_title', 'ad_category', 'ad_id', 'ad_list_size', 'ad_package_name', 'ad_partner', 'ad_type', 'ads_size', 'animated_emoji_id', 'api_request_identifier', 'audio', 'auto_corrected', 'auto_corrected_undo', 'autocorrect_char_saved', 'avg_key_stroke_time', 'background_story_id', 'banner_url', 'bigmoji_viewed', 'bigmojis', 'brand_campaign_id', 'cache_age', 'card_type', 'category', 'category_id', 'category_name', 'character_count', 'content_category_id', 'content_count', 'content_id', 'content_type', 'counter', 'customized_quick_reply', 'deeplink_id', 'default_font', 'deleted_count', 'dict', 'dictionary', 'dictionary_used', 'dictionary_version', 'direct_share', 'dismiss_reason', 'domain_name', 'emoji', 'emoji_id', 'error', 'error_message', 'error_type', 'flow', 'font', 'font_id', 'font_viewed', 'from', 'from_icon', 'gif_id', 'gif_viewed', 'gifs', 'grant', 'head_age', 'head_age_range', 'head_creation_unique_identifier', 'head_creation_user_time_taken', 'head_ge

In [None]:
# Flatten every field of the data_event_params struct into its own top-level
# column (prefixed data_event_params_exploded_) and drop the struct itself.
# A single select() replaces one withColumn() call per field: the PySpark docs
# warn that withColumn in a loop builds a large plan that can cause performance
# problems or even StackOverflowException, and there are ~235 fields here.
# Resulting column order matches the old loop: base columns, then flattened fields.
df_expanded = df_expanded.select(
    *[name for name in df_expanded.columns if name != 'data_event_params'],
    *[
        F.col(f"data_event_params.{field}").alias(f"data_event_params_exploded_{field}")
        for field in data_fields
    ],
)
df_expanded.columns

['device_id', 'advertisingId', 'app_id', 'app_version', 'sdk_version', 'ip', 'event_timestamp', 'event_name', 'process_date', 'event_date', 'event_type', 'screen_at', 'data_event_params_exploded_action', 'data_event_params_exploded_action_taken', 'data_event_params_exploded_action_title', 'data_event_params_exploded_ad_category', 'data_event_params_exploded_ad_id', 'data_event_params_exploded_ad_list_size', 'data_event_params_exploded_ad_package_name', 'data_event_params_exploded_ad_partner', 'data_event_params_exploded_ad_type', 'data_event_params_exploded_ads_size', 'data_event_params_exploded_animated_emoji_id', 'data_event_params_exploded_api_request_identifier', 'data_event_params_exploded_audio', 'data_event_params_exploded_auto_corrected', 'data_event_params_exploded_auto_corrected_undo', 'data_event_params_exploded_autocorrect_char_saved', 'data_event_params_exploded_avg_key_stroke_time', 'data_event_params_exploded_background_story_id', 'data_event_params_exploded_banner_url',

In [None]:
expanded_column_count = len(df_expanded.columns)
expanded_column_count

247


#### Load

In [None]:
# Write the flattened dataset to S3 as Parquet, replacing any previous output.
# repartition(1) still produces a single output file. Per the Spark docs, a
# drastic coalesce(1) would instead run the whole upstream read/transform in a
# single task; repartition adds a shuffle so that work stays parallel.
try:
    df_expanded.repartition(1).write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Report, then re-raise so a failed write fails the cell/job instead of
    # being reported as a successful run.
    print(f"Error writing data to S3: {e}")
    raise
print(f"Data successfully written to {output_path}")

# Finalise the Glue job started with job.init(...) earlier in the notebook.
job.commit()

Data successfully written to s3://test-buckets165/processed-data/
