# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


In [14]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session b2b6a65c-3ada-4cfa-ba92-1e1ddbbb85fa.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session b2b6a65c-3ada-4cfa-ba92-1e1ddbbb85fa.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session b2b6a65c-3ada-4cfa-ba92-1e1ddbbb85fa.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session b2b6a65c-3ada-4cfa-ba92-1e1ddbbb85fa.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql.functions import col
import boto3




In [16]:
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options = {
        "path":["s3://scd-etl-bucket/raw_data/"]
    },
    format = "json"    
)




In [17]:
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("street_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("country", StringType(), True),
    StructField("job", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone_number", StringType(), True),
])




In [18]:
df = spark.read.schema(schema).json("s3://scd-etl-bucket/raw_data/")




In [19]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- job: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)


In [20]:
customer_df = df.select(
    col('id'),
    col('first_name'),
    col('last_name'),
    col('street_name'),
    col('city'),
    col('state'),
    col('zip_code'),
    col('country'),
    col('job'),
    col('email'),
    col('phone_number')
)

customer_df.show()

+----+-----------+---------+-------------------+--------------------+-------------+--------+--------------------+--------------------+--------------------+--------------------+
|  id| first_name|last_name|        street_name|                city|        state|zip_code|             country|                 job|               email|        phone_number|
+----+-----------+---------+-------------------+--------------------+-------------+--------+--------------------+--------------------+--------------------+--------------------+
|3627|   Caroline|Cervantes|   Samantha Highway|            Carrport|     New York|   27705|             Liberia|      Water engineer| uwalker@example.net|  976.325.0884x14984|
| 786|Christopher|   Garcia|   Stephen Junction|  East Christinefurt|     Maryland|   74362|   Equatorial Guinea|       Buyer, retail|hudsonangela@exam...|  598.366.6645x31009|
|3040|      Shane|    Curry|      Brooks Fields|  North Benjaminstad|      Wyoming|   10431|             Somalia|Ac

In [21]:
customer_df=customer_df.drop_duplicates(['id'])




In [22]:
customer_df.count()

3135


In [23]:
dyf = DynamicFrame.fromDF(customer_df, glueContext, "dyf")
folder_datetime = datetime.now().strftime("%m-%d-%Y_%H:%M:%S")
glueContext.write_dynamic_frame.from_options(
    frame = dyf,
    connection_type = "s3",
    connection_options = {
        "path":f"s3://scd-etl-bucket/transformed_data/customer_{folder_datetime}"
    },
    format = "csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f0ae0db9590>


In [26]:
#archiving the source data after transformation
bucket = 'scd-etl-bucket'
source_folder = 'raw_data/'
archive_folder = 'archive_raw_data/'

s3_client = boto3.client('s3')
object_list = s3_client.list_objects(Bucket=bucket).get('Contents')

for obj in object_list:
    obj_name = obj['Key']
    
    if obj_name.startswith(source_folder):
        
        # to get the filename
        file_name = obj_name.split('/')[-1]
        
        #filepath for the destination
        destination_path = archive_folder + file_name
        
        copy_source = {
            'Bucket':bucket,
            'Key':obj_name 
        }
        
        #copy the file to archive
        s3_client.copy_object(
            Bucket = bucket,
            CopySource = copy_source,
            Key = destination_path
        )
        
        #delete the file from source path
        s3_client.delete_object(
            Bucket=bucket,
            Key=obj_name
        )
    


{'ResponseMetadata': {'RequestId': 'QDP1HZ56VDY6Y50Q', 'HostId': 'pr8TdPSSF+9g7hjfTD1kiAd/48CaimVuSR8ybHuPPe1YusKStwOWVcJRvqSSKNUlPJqKnG5woJ5Imbv6ov5PltwikSGHW/aK', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'pr8TdPSSF+9g7hjfTD1kiAd/48CaimVuSR8ybHuPPe1YusKStwOWVcJRvqSSKNUlPJqKnG5woJ5Imbv6ov5PltwikSGHW/aK', 'x-amz-request-id': 'QDP1HZ56VDY6Y50Q', 'date': 'Sat, 14 Dec 2024 17:42:52 GMT', 'x-amz-server-side-encryption': 'AES256', 'content-type': 'application/xml', 'content-length': '224', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ServerSideEncryption': 'AES256', 'CopyObjectResult': {'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'LastModified': datetime.datetime(2024, 12, 14, 17, 42, 52, tzinfo=tzlocal())}}
{'ResponseMetadata': {'RequestId': 'QDP8G16JHEPK32YM', 'HostId': 'X5O11++rCCwoVqthy6L1wlSWfTY3oSi+1gJI/xfA5WU0RM4/CFxNGj0X739QdOUATFIFf932tEVJd08yd95BYl8DGKumnvom', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'X5O11++rCCwoVqthy6L1wlSWfTY3oSi+1gJI/xfA5WU0RM4/CFxNGj0