# ANT345 - Multicloud Analytics - Glue Notebook
##### We are now running an AWS Glue Studio notebook, and can use Glue Interactive Sessions to initiate a Spark shell


#### Use the `%help` command to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
    %session_type       String        Specify a session_type to be used. Supported values: streaming, etl and glue_ray. 
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Session

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray session. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



#### Use magics to configure our Interactive Session (idle timeout, Glue version, worker type and count, etc.).
#### We can also specify our multi-cloud connections in this section. 


In [8]:
%idle_timeout 80
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%connections bigquery,snowflake,ADLSgen2

Current idle_timeout is None minutes.
idle_timeout has been set to 80 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Connections to be included:
bigquery
snowflake
ADLSgen2


#### Import relevant libraries for our job, and initiate the Spark/Glue context and Spark session

In [1]:
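import sys
from awsglue.transforms import *  # Glue transforms, including Join (used below)
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import format_number
import pyspark.sql.functions as sf

# Reuse the running SparkContext, or create one if none exists
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and provides the DynamicFrame readers and writers
glueContext = GlueContext(sc)
# Spark session for plain DataFrame operations
spark = glueContext.spark_session
# Job object; job.init() and job.commit() are not called in this notebook
job = Job(glueContext)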

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: ce03f012-54cf-4c8b-856e-00a8caa234d6
Applying the following default arguments:
--glue_kernel_version 1.0.2
--enable-glue-datacatalog true
Waiting for session ce03f012-54cf-4c8b-856e-00a8caa234d6 to get into ready status...
Session ce03f012-54cf-4c8b-856e-00a8caa234d6 has been created.



#### Create a Glue DynamicFrame using the **bigquery** connection
When running a query with this connector, include the following options:
- materializationDataset: This specifies a BigQuery dataset that can be used to store a temporary table containing the result of the query we run
- parentProject: This specifies the name of the GCP parent project that contains the BigQuery resource we want to query
- viewsEnabled: This option must be set to true in order to run a SQL query using the connector
- query: This specifies the SQL query that we want to run
- connectionName: The name of the AWS Glue connection we want to use

Note that the SQL query we specify here is executed by BigQuery, and only the results of the query are transferred over the network from GCP to AWS. The query we specify aggregates the raw daily data we have in BigQuery (covering each day over the last 2.5 years) and calculates the net sales by year and month. This is much more efficient than loading all the data from the table in GCP, and then filtering or aggregating the data in Spark.
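The aggregation that BigQuery performs can be mirrored in a few lines of plain Python, which may help when checking the query's logic. This is a minimal sketch, assuming each daily record is a dict with a `date` string and `web_sales`/`web_returns` amounts; the `monthly_net_sales` helper and the sample rows are hypothetical and not part of the notebook. In the notebook itself this work stays in BigQuery.

```python
from collections import defaultdict
from datetime import date


def monthly_net_sales(daily_rows):
    """Mirror the BigQuery query: per-day net sales, summed by (year, month)."""
    totals = defaultdict(float)
    for row in daily_rows:
        d = date.fromisoformat(row["date"])
        # round(round(web_sales, 2) - round(web_returns, 2), 2) as net_sales
        net = round(round(row["web_sales"], 2) - round(row["web_returns"], 2), 2)
        totals[(d.year, d.month)] += net
    # round(SUM(net_sales), 2) ... order by date_year, date_month
    return [
        {"date_year": y, "date_month": m, "web_net_sales": round(total, 2)}
        for (y, m), total in sorted(totals.items())
    ]


# Hypothetical sample rows, for illustration only
sample = [
    {"date": "2021-01-01", "web_sales": 100.004, "web_returns": 10.0},
    {"date": "2021-01-02", "web_sales": 50.5, "web_returns": 0.25},
    {"date": "2021-02-01", "web_sales": 20.0, "web_returns": 5.0},
]
for r in monthly_net_sales(sample):
    print(r)
```

One caveat: Python's `round()` rounds exact halves to the nearest even digit, while BigQuery's `ROUND` rounds halfway cases away from zero, so a value that lands exactly on a half cent could differ by one cent between the two.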

In [2]:
bq_web_net_sales = glueContext.create_dynamic_frame.from_options(
    connection_type="bigquery",
    connection_options={
        "materializationDataset": "TempViews",
        "parentProject": "multicloud-analytics",
        "viewsEnabled": "true",
        "query": """
        WITH web_sales_date AS (
        select *,
            round(round(web_sales, 2) - round(web_returns, 2), 2) as net_sales,
            extract(month from cast(date as timestamp)) date_month,
            extract(year from cast(date as timestamp)) date_year
        from `multicloud-analytics.AnyCompany.daily_web_sales` 
        )

        select
            date_year, date_month, round(SUM(net_sales),2) as web_net_sales
        from web_sales_date
        group by date_year, date_month
        order by date_year, date_month asc
        """,
        "connectionName": "bigquery",
    },
)
        




In [3]:
# Print bq_web_net_sales schema
bq_web_net_sales.printSchema()

# Display contents of bq_web_net_sales, sorted by date_year then date_month
bq_web_net_sales.toDF().sort("date_year", "date_month").show()

# Count the rows in bq_web_net_sales
bq_web_net_sales.count()

root
|-- date_year: long
|-- date_month: long
|-- web_net_sales: double

+---------+----------+-------------+
|date_year|date_month|web_net_sales|
+---------+----------+-------------+
|     2021|         1|    796957.39|
|     2021|         2|    705379.77|
|     2021|         3|     778931.1|
|     2021|         4|    732405.26|
|     2021|         5|    768200.52|
|     2021|         6|    744789.88|
|     2021|         7|    777226.65|
|     2021|         8|     755394.4|
|     2021|         9|    729603.49|
|     2021|        10|    758422.85|
|     2021|        11|    708773.45|
|     2021|        12|    759453.98|
|     2022|         1|    780733.38|
|     2022|         2|    704074.24|
|     2022|         3|    785959.02|
|     2022|         4|    719178.31|
|     2022|         5|    748272.72|
|     2022|         6|    778954.65|
|     2022|         7|    754346.96|
|     2022|         8|    710306.32|
+---------+----------+-------------+
only showing top 20 rows

33


#### Create a Glue DynamicFrame using the **snowflake** connection
When running a query with this connector, include the following options:
- autopushdown: This option must be set to *on* in order to specify a query that you want Snowflake to run
- connectionName: The name of the AWS Glue connection we want to use
- sfDatabase: The name of the Snowflake database containing the table you want to query
- query: This specifies the SQL query that you want to run

The query we specify is run by Snowflake and aggregates the raw daily sales data for our stores to calculate net sales by year and month. Only the results of the query are passed over the network from Snowflake to Glue.
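Snowflake stores unquoted identifiers in uppercase, which is why the aliases `date_year`, `date_month`, and `store_net_sales` in this query come back as `DATE_YEAR`, `DATE_MONTH`, and `STORE_NET_SALES` in the schema printed below. Here is a minimal sketch of normalizing those names on the Python side, assuming rows are plain dicts; the `lowercase_columns` helper is hypothetical. In Spark, a comparable step would be `df.toDF(*[c.lower() for c in df.columns])`. Another option is to double-quote the aliases in the Snowflake query, since quoted identifiers keep their case.

```python
from decimal import Decimal


def lowercase_columns(rows):
    """Return copies of the rows with every column name lowercased."""
    return [{name.lower(): value for name, value in row.items()} for row in rows]


# One row shaped like the Snowflake result below (all three columns are DECIMAL)
snowflake_rows = [
    {"DATE_YEAR": Decimal("2021"), "DATE_MONTH": Decimal("1"),
     "STORE_NET_SALES": Decimal("995270.22")}
]
normalized = lowercase_columns(snowflake_rows)
print(sorted(normalized[0]))  # ['date_month', 'date_year', 'store_net_sales']
```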

In [4]:
sf_store_net_sales = glueContext.create_dynamic_frame.from_options(
    connection_type="snowflake",
    connection_options={
        "autopushdown": "on",
        "connectionName": "snowflake",
        "sfDatabase": "ANYCOMPANY",
        "query": """
        WITH store_sales_date AS (
                select *,
                    round(round(store_sales, 2) - round(store_returns, 2), 2) as net_sales,
                    extract(month from sales_date) date_month,
                    extract(year from sales_date) date_year
                from store_sales
            )

            select date_year, date_month, round(SUM(net_sales),2) as store_net_sales
            from store_sales_date
            group by date_year, date_month
            order by date_year, date_month asc
            """,
    }
)




In [5]:
# Print sf_store_net_sales schema
sf_store_net_sales.printSchema()

# Display contents of sf_store_net_sales, sorted by date_year then date_month
sf_store_net_sales.toDF().sort("date_year", "date_month").show(20)

# Count the rows in sf_store_net_sales
sf_store_net_sales.count()

root
|-- DATE_YEAR: decimal
|-- DATE_MONTH: decimal
|-- STORE_NET_SALES: decimal

+---------+----------+---------------+
|DATE_YEAR|DATE_MONTH|STORE_NET_SALES|
+---------+----------+---------------+
|     2021|         1|      995270.22|
|     2021|         2|      883355.84|
|     2021|         3|      975316.06|
|     2021|         4|      951041.91|
|     2021|         5|      964885.17|
|     2021|         6|      917237.75|
|     2021|         7|     1012048.65|
|     2021|         8|      972043.88|
|     2021|         9|      934952.16|
|     2021|        10|      996833.78|
|     2021|        11|      934065.69|
|     2021|        12|      961486.12|
|     2022|         1|      986548.55|
|     2022|         2|      852694.85|
|     2022|         3|      986495.57|
|     2022|         4|      957231.21|
|     2022|         5|      974230.58|
|     2022|         6|      957339.40|
|     2022|         7|      980734.73|
|     2022|         8|      961008.91|
+---------+----------+---------------+

#### Join the BigQuery and Snowflake aggregated table data to create a new *total_net_sales* DynamicFrame
In this code block we do the following:
- We join the two DynamicFrames on the year and month columns. Note that the Snowflake column names are in uppercase
- We drop the duplicate DATE_YEAR and DATE_MONTH columns that came from Snowflake
- We print the DynamicFrame schema
- We convert the DynamicFrame to a Spark DataFrame so we can sort it, then show a sample of its contents
- We count the rows in the DynamicFrame
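`Join.apply` matches rows whose `date_year`/`date_month` values equal the `DATE_YEAR`/`DATE_MONTH` values on the other side, and the joined frame keeps the key columns from both inputs, which is why the Snowflake copies are dropped afterwards. The following is a minimal pure-Python sketch of that equality join plus the drop, assuming both inputs are lists of dicts. The `equi_join` helper is hypothetical, keeps only matching pairs, and does not attempt to reproduce every detail of the Glue transform. The sample rows are taken from the outputs above.

```python
from decimal import Decimal


def equi_join(left, right, left_keys, right_keys, drop=()):
    """Join rows where left[left_keys] == right[right_keys]; drop listed columns."""
    # Index the right side by its key tuple
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in right_keys), []).append(row)
    joined = []
    for lrow in left:
        # int and Decimal keys compare and hash equal, so 2021 matches Decimal("2021")
        for rrow in index.get(tuple(lrow[k] for k in left_keys), []):
            merged = {**lrow, **rrow}
            # Remove the duplicate key columns, as drop_fields does in the notebook
            joined.append({k: v for k, v in merged.items() if k not in drop})
    return joined


web = [{"date_year": 2021, "date_month": 1, "web_net_sales": 796957.39}]
store = [{"DATE_YEAR": Decimal("2021"), "DATE_MONTH": Decimal("1"),
          "STORE_NET_SALES": Decimal("995270.22")}]
total = equi_join(web, store, ["date_year", "date_month"], ["DATE_YEAR", "DATE_MONTH"],
                  drop={"DATE_YEAR", "DATE_MONTH"})
print(total)
```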

In [6]:
total_net_sales = Join.apply(bq_web_net_sales, sf_store_net_sales, ["date_year","date_month"], ["DATE_YEAR","DATE_MONTH"])
# Delete unneeded columns
total_net_sales = total_net_sales.drop_fields(["DATE_YEAR","DATE_MONTH"])
# Print Schema, then show contents of total_net_sales, and count rows
total_net_sales.printSchema()
total_net_sales.toDF().sort("date_year", "date_month").show(20)
total_net_sales.count()

root
|-- date_month: long
|-- STORE_NET_SALES: decimal
|-- date_year: long
|-- web_net_sales: double

+----------+---------------+---------+-------------+
|date_month|STORE_NET_SALES|date_year|web_net_sales|
+----------+---------------+---------+-------------+
|         1|      995270.22|     2021|    796957.39|
|         2|      883355.84|     2021|    705379.77|
|         3|      975316.06|     2021|     778931.1|
|         4|      951041.91|     2021|    732405.26|
|         5|      964885.17|     2021|    768200.52|
|         6|      917237.75|     2021|    744789.88|
|         7|     1012048.65|     2021|    777226.65|
|         8|      972043.88|     2021|     755394.4|
|         9|      934952.16|     2021|    729603.49|
|        10|      996833.78|     2021|    758422.85|
|        11|      934065.69|     2021|    708773.45|
|        12|      961486.12|     2021|    759453.98|
|         1|      986548.55|     2022|    780733.38|
|         2|      852694.85|     2022|    704074.24|

#### Create a Glue DynamicFrame using the **Azure Data Lake Storage (ADLS)** connection
When reading data with this connector, include the following options:
- fileFormat: Specifies the format of the file stored in ADLS (in this case, a CSV file)
- path: The path in ADLS for the file we want to load data from
- header: For CSV files, specifies whether the first line is a header containing the column names
- connectionName: The name of the AWS Glue connection

Note that the ADLS connector reads all data from the specified file; you cannot specify a query to run. In our case, the file is small: it contains the monthly Consumer Confidence Index (CCI) score for the past 2.5 years.
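With `header` set to `"true"`, the first line of the CSV supplies the column names instead of being read as data. Here is a minimal standard-library sketch of the same idea. It assumes the file layout matches the output shown below, and the sample lines are copied from that output. CSV itself carries no type information, so this sketch converts `year`, `month`, and `cci_value` explicitly; whether the Glue reader infers types for you depends on the connector and its options, so it is worth confirming with `printSchema()` before joining on these columns.

```python
import csv
import io

# A few lines in the assumed layout of consumer_confidence.csv (header row first)
raw = """year,month,cci_value
2023,7,117
2022,12,108.3
2021,1,88.9
"""


def read_cci(text):
    """Parse a headered CSV and convert each field to a numeric type."""
    reader = csv.DictReader(io.StringIO(text))  # header line becomes the dict keys
    return [
        {"year": int(r["year"]), "month": int(r["month"]), "cci_value": float(r["cci_value"])}
        for r in reader
    ]


rows = read_cci(raw)
print(rows[0])  # {'year': 2023, 'month': 7, 'cci_value': 117.0}
```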

In [7]:
ADLS_consumer_confidence = glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "fileFormat": "csv",
            "path": "/consumer_confidence.csv",
            "header": "true",
            "connectionName": "ADLSgen2",
        },
        transformation_ctx="ADLS_consumer_confidence",
    )

ADLS_consumer_confidence.toDF().show()

+----+-----+---------+
|year|month|cci_value|
+----+-----+---------+
|2023|    7|      117|
|2022|   12|    108.3|
|2023|    4|    101.3|
|2021|   11|    111.9|
|2023|    2|    103.4|
|2021|    9|    109.8|
|2021|    4|    117.5|
|2021|    1|     88.9|
|2022|    9|    107.8|
|2022|   11|    101.4|
|2021|   12|    115.2|
|2023|    9|      103|
|2021|    6|    128.9|
|2022|    2|    105.7|
|2021|    3|      109|
|2022|    5|    103.2|
|2022|    4|    108.6|
|2022|   10|    102.5|
|2022|    7|     95.3|
|2022|    3|    107.6|
+----+-----+---------+
only showing top 20 rows


#### Join the *total_net_sales* and *ADLS_consumer_confidence* DynamicFrames
In this code block we do the following:
- Create a new DynamicFrame called total_net_sales_with_confidence by joining the previously created total_net_sales DynamicFrame with the consumer confidence index data retrieved from Azure Data Lake Storage
- Delete the unneeded columns (year and month)
- Display a sample of the data from the new DynamicFrame (converted to a DataFrame for better display formatting)

In [8]:
# Join total_net_sales and ADLS_consumer_confidence
total_net_sales_with_confidence = Join.apply(total_net_sales, ADLS_consumer_confidence, ["date_year","date_month"], ["year","month"])
# Delete unneeded columns
total_net_sales_with_confidence = total_net_sales_with_confidence.drop_fields(["year","month"])
# Display contents of total_net_sales_with_confidence
total_net_sales_with_confidence.toDF().show()

+----------+---------------+---------+-------------+---------+
|date_month|STORE_NET_SALES|cci_value|web_net_sales|date_year|
+----------+---------------+---------+-------------+---------+
|         2|      883355.84|     90.4|    705379.77|     2021|
|         2|      852694.85|    105.7|    704074.24|     2022|
|         1|      995270.22|     88.9|    796957.39|     2021|
|        12|      961486.12|    115.2|    759453.98|     2021|
|         3|      975316.06|      109|     778931.1|     2021|
|         3|      986495.57|    107.6|    785959.02|     2022|
|         4|      951041.91|    117.5|    732405.26|     2021|
|         2|      870137.75|    103.4|    704881.79|     2023|
|        12|      980373.87|    108.3|    767855.13|     2022|
|        11|      934065.69|    111.9|    708773.45|     2021|
|         1|      986548.55|    111.1|    780733.38|     2022|
|        10|      996833.78|    111.6|    758422.85|     2021|
|         4|      957231.21|    108.6|    719178.31|     2022|

#### Final formatting
In this code block, we apply some final formatting:
- We rename the STORE_NET_SALES column (which came from Snowflake) to have a lowercase name
- We change the order of the columns using the DynamicFrame *select_fields* function
- We display some sample data from the DynamicFrame (but again convert it to a DataFrame for improved formatting)
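`rename_field` and `select_fields` change only column names and column order, not values. Here is a minimal pure-Python sketch of the same two steps on dict rows, with the column list copied from the code cell below. The `rename_and_order` helper is hypothetical, and the sample row uses values from the earlier join output.

```python
from decimal import Decimal

COLUMNS = ["date_year", "date_month", "store_net_sales", "web_net_sales", "cci_value"]


def rename_and_order(rows, old, new, columns):
    """Rename one column, then emit each row with its keys in the given order."""
    out = []
    for row in rows:
        renamed = {(new if k == old else k): v for k, v in row.items()}
        # Raises KeyError if a listed column is absent (a choice made by this sketch)
        out.append({c: renamed[c] for c in columns})
    return out


row = {"date_month": 1, "STORE_NET_SALES": Decimal("995270.22"), "cci_value": 88.9,
       "web_net_sales": 796957.39, "date_year": 2021}
result = rename_and_order([row], "STORE_NET_SALES", "store_net_sales", COLUMNS)
print(list(result[0]))
# ['date_year', 'date_month', 'store_net_sales', 'web_net_sales', 'cci_value']
```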

In [9]:
# Rename STORE_NET_SALES to store_net_sales
total_net_sales_with_confidence = total_net_sales_with_confidence.rename_field("STORE_NET_SALES", "store_net_sales")

# Change column order
total_net_sales_with_confidence = total_net_sales_with_confidence.select_fields(["date_year","date_month","store_net_sales","web_net_sales","cci_value"])
total_net_sales_with_confidence.toDF().sort("date_year", "date_month").show()

+---------+----------+---------------+-------------+---------+
|date_year|date_month|store_net_sales|web_net_sales|cci_value|
+---------+----------+---------------+-------------+---------+
|     2021|         1|      995270.22|    796957.39|     88.9|
|     2021|         2|      883355.84|    705379.77|     90.4|
|     2021|         3|      975316.06|     778931.1|      109|
|     2021|         4|      951041.91|    732405.26|    117.5|
|     2021|         5|      964885.17|    768200.52|      120|
|     2021|         6|      917237.75|    744789.88|    128.9|
|     2021|         7|     1012048.65|    777226.65|    125.1|
|     2021|         8|      972043.88|     755394.4|    115.2|
|     2021|         9|      934952.16|    729603.49|    109.8|
|     2021|        10|      996833.78|    758422.85|    111.6|
|     2021|        11|      934065.69|    708773.45|    111.9|
|     2021|        12|      961486.12|    759453.98|    115.2|
|     2022|         1|      986548.55|    780733.38|    111.1|

#### Write the new table to Amazon S3
In this final code block we write the data from our final DynamicFrame to Amazon S3. 

The data we write to S3 combines data from three non-AWS data sources: *Google BigQuery*, *Snowflake*, and *Azure Data Lake Storage*.

In [10]:
# Write total_net_sales_with_confidence to S3 as CSV files under the given path
# (this call writes to S3 only; it does not create a Glue Data Catalog table)
glueContext.write_dynamic_frame.from_options(
    frame=total_net_sales_with_confidence,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://multicloud-analytics-output/total_net_sales_with_confidence",
    },
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f8bdd309870>
