
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::393747608406:role/gluelearning-access-jogesh
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 0412df71-24fc-4a27-b02e-44a9953c36b6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 0412df71-24fc-4a27-b02e-44a9953c36b6 to get into ready status...
Session 0412df71-24fc-4a27-b02e-44a9953c36b6 has been created.



## Sample Code to Extract Multiple Tables and Merge Them

In [62]:
# Define the source tables in a dictionary keyed by database name.
"""
Example:
config = {
    "database_name": {
        "table_name": []
    }
}
"""
config = {
    "pyspark_123_db": {
        "customers": [],
        "employees": [],
        "orders": []
    }
}

# Create one Glue DynamicFrame per table and store it under the table name.
dict_dyf = {}
for db_name in config:
    for tbl_name in config[db_name]:
        dict_dyf[tbl_name] = glueContext.create_dynamic_frame.from_catalog(
            database = db_name,
            table_name = tbl_name,
            # Use a distinct context per table so state is not shared between reads.
            transformation_ctx = f"extract_{tbl_name}"
        )
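
The nested loop above visits every table under every database in `config`. As a minimal sketch that runs without a Glue session, the same traversal can be written as a plain function that returns what would be read. The helper name `plan_extractions` and the `extract_<table>` naming scheme for the context string are illustrative assumptions, not part of the Glue API.

```python
from typing import Dict, List, Tuple


def plan_extractions(config: Dict[str, Dict[str, list]]) -> List[Tuple[str, str, str]]:
    """Flatten {database: {table: [...]}} into (database, table, ctx) triples."""
    plan = []
    for db_name, tables in config.items():
        for tbl_name in tables:
            # Hypothetical naming scheme: one context string per table.
            plan.append((db_name, tbl_name, f"extract_{tbl_name}"))
    return plan


config = {
    "pyspark_123_db": {
        "customers": [],
        "employees": [],
        "orders": [],
    }
}

for db_name, tbl_name, ctx in plan_extractions(config):
    print(db_name, tbl_name, ctx)
```

Each triple could then be passed to `glueContext.create_dynamic_frame.from_catalog` as `database`, `table_name`, and `transformation_ctx`.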
            
        




## Checking the Schema and Correcting It

In [58]:
# Inspect the raw schema first if needed:
# dict_dyf['customers'].printSchema()

# First pass: the mapping treats customer_id as a choice type and casts it to bigint.
mappings = [
    ("customer_id", "choice", "customer_id", "bigint"),
    ("first_name", "string", "first_name", "string"),
    ("last_name", "string", "last_name", "string"),
    ("full_name", "string", "full_name", "string"),
]
dyf_output = ApplyMapping.apply(
    frame = dict_dyf['customers'],
    mappings = mappings,
    transformation_ctx = "ctx"
)

# dyf_output.printSchema()

# Second pass: map customer_id from long to bigint.
mappings = [
    ("customer_id", "long", "customer_id", "bigint"),
    ("first_name", "string", "first_name", "string"),
    ("last_name", "string", "last_name", "string"),
    ("full_name", "string", "full_name", "string"),
]
dyf = ApplyMapping.apply(
    frame = dyf_output,
    mappings = mappings,
    transformation_ctx = "ctx"
)

# dyf.printSchema()

# Store the corrected frame and print the schema of every table.
dict_dyf['customers'] = dyf
dict_dyf['customers'].printSchema()
dict_dyf['employees'].printSchema()
dict_dyf['orders'].printSchema()

root
|-- customer_id: long
|-- first_name: string
|-- last_name: string
|-- full_name: string

root
|-- employee_id: long
|-- manager_id: choice
|    |-- long
|    |-- string
|-- first_name: string
|-- 	last_name: string
|-- 	full_name: string
|-- 	jobtitle: string
|-- 	organizationlevel: long
|-- maritalstatus: string
|-- gender	: string
|-- territory: string
|-- country: string
|-- group: string

root
|-- salesorder_id: long
|-- 	salesorderdetail_id: long
|-- 	orderdate: string
|-- duedate: string
|-- shipdate: string
|-- employee_id: long
|-- customer_id: long
|-- 	subtotal: double
|-- taxamt: double
|-- freight: double
|-- totaldue: double
|-- product_id: long
|-- 	orderqty: long
|-- unitprice: double
|-- unitpricediscount: double
|-- 	linetotal: double


## Viewing the DynamicFrames Stored in the Dictionary

In [49]:
print(dict_dyf)

{'customers': <awsglue.dynamicframe.DynamicFrame object at 0x7f420b7d2f50>, 'employees': <awsglue.dynamicframe.DynamicFrame object at 0x7f420b732b50>, 'orders': <awsglue.dynamicframe.DynamicFrame object at 0x7f420b7325d0>}


## Creating Views

In [60]:
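# Define the master table in a dictionary.
"""
Example:
master_table = {
    "table": "master_table_name"
}
"""
master_table = {
    "table": "orders"
}

# Register the master table as the "master" view and every other table
# as a view named after the table. The master_table dictionary is left
# untouched, so the result does not depend on the order of dict_dyf.
master_name = master_table["table"]
for tbl_name, frame in dict_dyf.items():
    if tbl_name == master_name:
        master_df = frame.toDF()
        master_df.createOrReplaceTempView("master")
    else:
        dim_df = frame.toDF()
        dim_df.createOrReplaceTempView(tbl_name)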
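
The loop above registers the master table under the fixed view name `master` and every other table under its own name. A minimal sketch of that decision, separated from the Spark calls so it can be checked on its own, might look like the following. `plan_views` is a hypothetical helper, and the table names are the ones used in this notebook.

```python
from typing import Dict, List


def plan_views(table_names: List[str], master_name: str) -> Dict[str, str]:
    """Map each table name to the temp view name it would be registered under."""
    if master_name not in table_names:
        raise ValueError(f"master table {master_name!r} is not among the loaded tables")
    return {
        name: ("master" if name == master_name else name)
        for name in table_names
    }


views = plan_views(["orders", "customers", "employees"], "orders")
print(views)
```

Because the master name is looked up once and never reassigned, the outcome is the same whatever order the tables arrive in.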




## Joining the Dimension Tables to the Master Table

In [61]:
# Map each dimension table's unique key to the matching key in the master dataset.
"""
Example:
dim_table_mapping = {
    "table_name": {"dim_table_unique_key": "master_table_unique_key"}
}
"""

dim_table_mapping = {
    "customers": {"customer_id": "customer_id"},
    "employees": {"employee_id": "employee_id"}
}

for dim_name, key_map in dim_table_mapping.items():
    for dim_key, master_key in key_map.items():
        # Rename the dimension key to "extra" so it can be dropped after the
        # join without touching the master table's key column.
        dim_psdf = spark.sql(f"SELECT * FROM {dim_name}")
        dim_psdf = dim_psdf.withColumnRenamed(dim_key, "extra")
        dim_psdf.createOrReplaceTempView(dim_name)
        # Note: each pass joins from "master", so the "data" view is replaced
        # every time and ends up holding the join with the last table above.
        spark.sql(f"""
                SELECT * FROM master
                INNER JOIN {dim_name}
                ON master.{master_key} = {dim_name}.extra
                """).createOrReplaceTempView("data")
        psdf_out = spark.sql("SELECT * FROM data")
        psdf_out = psdf_out.drop("extra")
        psdf_out.createOrReplaceTempView("data")
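
Each pass of the loop above joins from `master` and replaces the `data` view, so only the last dimension table in `dim_table_mapping` is reflected in the result. If the goal is a single result that includes every dimension table, one option is to build one SQL statement that chains the joins. The sketch below only builds the SQL string and does not run it. `build_join_sql` is a hypothetical helper, and it assumes each dimension key still has its original name in the view (no rename to `extra`).

```python
from typing import Dict


def build_join_sql(master_view: str, dim_table_mapping: Dict[str, Dict[str, str]]) -> str:
    """Build one SELECT that inner-joins every dimension view to the master view."""
    lines = [f"SELECT * FROM {master_view}"]
    for dim_name, key_map in dim_table_mapping.items():
        # Join on every (dimension key, master key) pair listed for this table.
        conditions = " AND ".join(
            f"{master_view}.{master_key} = {dim_name}.{dim_key}"
            for dim_key, master_key in key_map.items()
        )
        lines.append(f"INNER JOIN {dim_name} ON {conditions}")
    return "\n".join(lines)


dim_table_mapping = {
    "customers": {"customer_id": "customer_id"},
    "employees": {"employee_id": "employee_id"},
}

print(build_join_sql("master", dim_table_mapping))
```

With `SELECT *`, tables that share column names (here both `customers` and `employees` have `first_name`, and the join keys appear on both sides) produce duplicate columns, so an explicit select list or aliases would likely be needed before writing the result out.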




## Viewing the Schema for Final Merged Data

In [63]:
# Convert the view to a PySpark DataFrame
psdf = spark.sql("SELECT * FROM data")
psdf.printSchema()

root
 |-- salesorder_id: long (nullable = true)
 |-- 	salesorderdetail_id: long (nullable = true)
 |-- 	orderdate: string (nullable = true)
 |-- duedate: string (nullable = true)
 |-- shipdate: string (nullable = true)
 |-- employee_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- 	subtotal: double (nullable = true)
 |-- taxamt: double (nullable = true)
 |-- freight: double (nullable = true)
 |-- totaldue: double (nullable = true)
 |-- product_id: long (nullable = true)
 |-- 	orderqty: long (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- unitpricediscount: double (nullable = true)
 |-- 	linetotal: double (nullable = true)
 |-- manager_id: struct (nullable = true)
 |    |-- long: long (nullable = true)
 |    |-- string: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- 	last_name: string (nullable = true)
 |-- 	full_name: string (nullable = true)
 |-- 	jobtitle: string (nullable = true)
 |-- 	organizationlevel: long (nullable

## Writing the Data to S3 with Partitions and Registering It in the Glue Catalog

In [56]:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_timestamp

# Add a load timestamp and convert back to a Glue DynamicFrame.
psdf = psdf.withColumn("load_date", current_timestamp())
gdyf = DynamicFrame.fromDF(psdf, glueContext, "convert")
gdyf.printSchema()

# Configure an S3 sink partitioned by load_date.
# getSink documents the options as updateBehavior and enableUpdateCatalog,
# and partitionKeys takes a list of column names.
write_parquet = glueContext.getSink(
    path = "s3://gluepractice-jogesh-bucket/temp-dir/",
    connection_type = "s3",
    updateBehavior = "LOG",
    partitionKeys = ["load_date"],
    compression = "snappy",
    enableUpdateCatalog = True,
    transformation_ctx = 'load_data'
)
write_parquet.setCatalogInfo(
    catalogDatabase = 'retail_opsreadiness_datalake',
    catalogTableName = 'processed_customer_data'
)
write_parquet.setFormat("glueparquet")
write_parquet.writeFrame(gdyf)

root
|-- salesorder_id: long
|-- 	salesorderdetail_id: long
|-- 	orderdate: string
|-- duedate: string
|-- shipdate: string
|-- employee_id: long
|-- customer_id: long
|-- 	subtotal: double
|-- taxamt: double
|-- freight: double
|-- totaldue: double
|-- product_id: long
|-- 	orderqty: long
|-- unitprice: double
|-- unitpricediscount: double
|-- 	linetotal: double
|-- manager_id: struct
|    |-- long: long
|    |-- string: string
|-- first_name: string
|-- 	last_name: string
|-- 	full_name: string
|-- 	jobtitle: string
|-- 	organizationlevel: long
|-- maritalstatus: string
|-- gender	: string
|-- territory: string
|-- country: string
|-- group: string
|-- load_date: timestamp

<awsglue.dynamicframe.DynamicFrame object at 0x7f420b7f5410>
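
Partitioned S3 output is laid out in Hive-style `key=value` prefixes under the sink path. Because `load_date` above is a full timestamp, every run gets its own partition value. If day-level partitions are preferred (an assumption, since the notebook does not say which granularity is wanted), the value can be reduced to a date string before writing. The sketch below only illustrates the resulting prefix; `partition_prefix` is a hypothetical helper and the run time is made up.

```python
from datetime import datetime, timezone


def partition_prefix(base_path: str, key: str, value: str) -> str:
    """Return the Hive-style prefix base_path/key=value/ for one partition."""
    return f"{base_path.rstrip('/')}/{key}={value}/"


# Hypothetical run time; a date-only value groups all runs of one day together.
run_time = datetime(2023, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
load_date = run_time.strftime("%Y-%m-%d")

print(partition_prefix("s3://gluepractice-jogesh-bucket/temp-dir/", "load_date", load_date))
```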
