# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::835769464848:role/IAM-Role-etl-project
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: cb879fe5-556a-47cc-907a-49f544026ee1
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session cb879fe5-5

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='etl-project-for-medium-database', table_name='raw_data')
dyf.printSchema()

root
|-- id: long
|-- year_birth: long
|-- education: string
|-- marital_status: string
|-- income: long
|-- kidhome: long
|-- teenhome: long
|-- dt_customer: string
|-- recency: long
|-- mntwines: long
|-- mntfruits: long
|-- mntmeatproducts: long
|-- mntfishproducts: long
|-- mntsweetproducts: long
|-- mntgoldprods: long
|-- numdealspurchases: long
|-- numwebpurchases: long
|-- numcatalogpurchases: long
|-- numstorepurchases: long
|-- numwebvisitsmonth: long
|-- acceptedcmp3: long
|-- acceptedcmp4: long
|-- acceptedcmp5: long
|-- acceptedcmp1: long
|-- acceptedcmp2: long
|-- complain: long
|-- z_costcontact: long
|-- z_revenue: long
|-- response: long


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show()

+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
|  id|year_birth| education|marital_status|income|kidhome|teenhome|dt_customer|recency|mntwines|mntfruits|mntmeatproducts|mntfishproducts|mntsweetproducts|mntgoldprods|numdealspurchases|numwebpurchases|numcatalogpurchases|numstorepurchases|numwebvisitsmonth|acceptedcmp3|acceptedcmp4|acceptedcmp5|acceptedcmp1|acceptedcmp2|complain|z_costcontact|z_revenue|response|
+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+----------

#### Drop the columns we don't need

In [6]:
df = df.select("id", "year_birth", "education", "marital_status", "income", "dt_customer")
df.show()

+----+----------+----------+--------------+------+-----------+
|  id|year_birth| education|marital_status|income|dt_customer|
+----+----------+----------+--------------+------+-----------+
|5524|      1957|Graduation|        Single| 58138| 2012-09-04|
|2174|      1954|Graduation|        Single| 46344| 2014-03-08|
|4141|      1965|Graduation|      Together| 71613| 2013-08-21|
|6182|      1984|Graduation|      Together| 26646| 2014-02-10|
|5324|      1981|       PhD|       Married| 58293| 2014-01-19|
|7446|      1967|    Master|      Together| 62513| 2013-09-09|
| 965|      1971|Graduation|      Divorced| 55635| 2012-11-13|
|6177|      1985|       PhD|       Married| 33454| 2013-05-08|
|4855|      1974|       PhD|      Together| 30351| 2013-06-06|
|5899|      1950|       PhD|      Together|  5648| 2014-03-13|
|1994|      1983|Graduation|       Married|  null| 2013-11-15|
| 387|      1976|     Basic|       Married|  7500| 2012-11-13|
|2125|      1959|Graduation|      Divorced| 63033| 2013

#### Check for null values in each column

In [9]:
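# Import only what is needed; a wildcard import shadows built-ins such as sum and max
from pyspark.sql.functions import col, count, mean, when

# Count the null entries in every column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()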

+---+----------+---------+--------------+------+-----------+
| id|year_birth|education|marital_status|income|dt_customer|
+---+----------+---------+--------------+------+-----------+
|  0|         0|        0|             0|    24|          0|
+---+----------+---------+--------------+------+-----------+
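
The check above counts nulls only: `isNull()` does not match floating-point NaN, which Spark treats as a separate value. As a minimal plain-Python sketch of the same per-column count, the following tallies `None` and NaN separately. The `rows` list and the `count_missing` helper are hypothetical; three of the rows reuse values from the sample output above, and the NaN row is made up for illustration.

```python
import math

# Hypothetical sample rows; the first three reuse values from the output above.
rows = [
    {"id": 5524, "income": 58138},
    {"id": 1994, "income": None},        # missing value (null in Spark)
    {"id": 387, "income": 7500},
    {"id": 0, "income": float("nan")},   # made-up row to illustrate NaN
]

def count_missing(rows, column):
    """Return (null_count, nan_count) for one column."""
    nulls = sum(1 for r in rows if r[column] is None)
    nans = sum(
        1 for r in rows
        if isinstance(r[column], float) and math.isnan(r[column])
    )
    return nulls, nans

print(count_missing(rows, "income"))
```

In PySpark, the NaN case is handled by `isnan()`, which is meant for float and double columns. The six columns kept here are `long` and `string`, so the null check alone should be sufficient.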


#### The "income" column has 24 null values. Let's fill them with the column mean.

In [11]:
# Calculate the mean value of the column
mean_value = df.select(mean(col('income'))).collect()[0][0]

# Fill missing values with the mean value
df = df.fillna(mean_value, subset=['income'])

# Verify that no null values remain
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+---+----------+---------+--------------+------+-----------+
| id|year_birth|education|marital_status|income|dt_customer|
+---+----------+---------+--------------+------+-----------+
|  0|         0|        0|             0|     0|          0|
+---+----------+---------+--------------+------+-----------+
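
To make the fill step concrete, here is a minimal plain-Python sketch of mean imputation on the first twelve `income` values from the earlier sample output. The `fill_with_mean` helper is hypothetical. It truncates the mean to an integer on the assumption that Spark casts the fill value to the type of the target column (`long` here), so the filled value loses its fractional part; that behavior is worth verifying on your Glue version.

```python
# First twelve income values from the sample output; None marks the null.
incomes = [58138, 46344, 71613, 26646, 58293, 62513,
           55635, 33454, 30351, 5648, None, 7500]

def fill_with_mean(values):
    """Replace None with the mean of the present values, truncated to int."""
    present = [v for v in values if v is not None]
    mean_value = sum(present) / len(present)
    # Assumption: mimic a cast to an integer column by dropping the fraction.
    return [int(mean_value) if v is None else v for v in values]

filled = fill_with_mean(incomes)
print(filled[10])  # the previously missing entry
```

The mean here covers only these twelve rows, so it differs from the full-dataset mean the notebook computes. If fractional precision matters, one option is to cast `income` to `double` before filling.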


#### Write the data as CSV to the "transformed_data" folder in our S3 bucket.

In [18]:
df.write \
    .format("csv") \
    .mode("append") \
    .option("header", "true") \
    .save("s3://etl-project-for-medium/etl-project-for-medium-database/transformed_data/")




#### Write the data as JSON to the "transformed_data" folder in our S3 bucket.

In [19]:
df.write \
    .format("json") \
    .mode("append") \
    .save("s3://etl-project-for-medium/etl-project-for-medium-database/transformed_data/")
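
Both writes above target the same prefix, so CSV and JSON part files end up side by side, and `append` mode adds a new set of files on every run. One possible way to keep the outputs apart is a per-format subfolder. This is only a sketch: the `output_path` helper and the `csv/` and `json/` subfolder names are hypothetical and not part of the original project.

```python
# Base prefix taken from the write cells above.
BASE = "s3://etl-project-for-medium/etl-project-for-medium-database/transformed_data/"

def output_path(base, fmt):
    """Build a per-format output prefix such as <base>/csv/ (hypothetical layout)."""
    return base.rstrip("/") + "/" + fmt + "/"

print(output_path(BASE, "csv"))
print(output_path(BASE, "json"))

# In the notebook this could be used as, for example:
# df.write.format("csv").mode("overwrite").option("header", "true") \
#     .save(output_path(BASE, "csv"))
```

Whether `overwrite` or `append` is the right mode depends on whether each run should replace or extend the previous output.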


