In [11]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create DynamicFrame from Glue Catalog

In [12]:
sales = glueContext.create_dynamic_frame.from_catalog(
                 database="s3_database",
                 table_name="sales")
print ("Count: ", sales.count())
sales.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  2823
root
|-- ordernumber: long
|-- quantityordered: long
|-- priceeach: double
|-- orderlinenumber: long
|-- sales: double
|-- orderdate: string
|-- status: string
|-- qtr_id: long
|-- month_id: long
|-- year_id: long
|-- productline: string
|-- msrp: long
|-- productcode: string
|-- dealsize: string
|-- customerid: long

### Selecting few columns

In [3]:
data = sales.select_fields('ordernumber')
data.toDF().show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|ordernumber|
+-----------+
|      10107|
|      10121|
|      10134|
|      10145|
|      10159|
+-----------+
only showing top 5 rows

In [4]:
data = sales.select_fields(['ordernumber','dealsize'])
data.toDF().show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------+
|ordernumber|dealsize|
+-----------+--------+
|      10107|   Small|
|      10121|   Small|
+-----------+--------+
only showing top 2 rows

### Drop Columns

In [5]:
data = sales.drop_fields(['msrp','customerid'])
data.toDF().show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+-----------+--------+
|ordernumber|quantityordered|priceeach|orderlinenumber| sales|     orderdate| status|qtr_id|month_id|year_id|productline|productcode|dealsize|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+-----------+--------+
|      10107|             30|     95.7|              2|2871.0|2/24/2003 0:00|Shipped|     1|       2|   2003|Motorcycles|   S10_1678|   Small|
|      10121|             34|    81.35|              5|2765.9| 5/7/2003 0:00|Shipped|     2|       5|   2003|Motorcycles|   S10_1678|   Small|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+-----------+--------+
only showing top 2 rows

In [6]:
sales.toDF().show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+--------+----------+
|ordernumber|quantityordered|priceeach|orderlinenumber| sales|     orderdate| status|qtr_id|month_id|year_id|productline|msrp|productcode|dealsize|customerid|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+--------+----------+
|      10107|             30|     95.7|              2|2871.0|2/24/2003 0:00|Shipped|     1|       2|   2003|Motorcycles|  95|   S10_1678|   Small|         1|
|      10121|             34|    81.35|              5|2765.9| 5/7/2003 0:00|Shipped|     2|       5|   2003|Motorcycles|  95|   S10_1678|   Small|         2|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+--------+----------+
only showing top 2 rows

### Rename Fileds

In [9]:
data = sales.rename_field('customerid','cust-id').rename_field('dealsize','deal-size')
data.toDF().show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+-------+---------+
|ordernumber|quantityordered|priceeach|orderlinenumber| sales|     orderdate| status|qtr_id|month_id|year_id|productline|msrp|productcode|cust-id|deal-size|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+-------+---------+
|      10107|             30|     95.7|              2|2871.0|2/24/2003 0:00|Shipped|     1|       2|   2003|Motorcycles|  95|   S10_1678|      1|    Small|
|      10121|             34|    81.35|              5|2765.9| 5/7/2003 0:00|Shipped|     2|       5|   2003|Motorcycles|  95|   S10_1678|      2|    Small|
+-----------+---------------+---------+---------------+------+--------------+-------+------+--------+-------+-----------+----+-----------+-------+---------+
only showing top 2 rows

## Apply Mapping transformation

##### apply_mapping transform method to drop, rename, cast, and nest the data so that other data programming languages and systems can easily access it:

In [19]:
new_dyf = sales.apply_mapping([('quantityordered','long','quantityordered','double')]) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
new_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- quantityordered: double

In [21]:
new_dyf.schema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

StructType([Field(quantityordered, DoubleType({}), {})], {})

In [16]:
new_dyf.toDF().show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+
|quantityordered|
+---------------+
|           30.0|
|           34.0|
+---------------+
only showing top 2 rows

## Filter

In [38]:
sales.filter('sales' > '100').toDF().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

## Joining dataframes

#### AWS Glue currenty support only inner join, to perform other joins Dynamic Frames to be converted to Spark Dataframes

In [27]:
customers = glueContext.create_dynamic_frame.from_catalog(database='s3_database',table_name='customers')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
customers.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- customerid: long
|-- customername: string
|-- email: string
|-- city: string
|-- country: string
|-- territory: string
|-- contactfirstname: string
|-- contactlastname: string

In [29]:
Join_DF = Join.apply(sales,customers,'customerid','customerid').drop_fields('.customerid')
Join_DF.toDF().show(5)
Join_DF.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------+---------+--------------+--------------------+-------+------------+---------------+---------+-----------+------+----+---------------+-----------+-------+-------------------+----------------+-----------+---------------+----------+--------+--------+---------+
|orderlinenumber|  sales|territory|          city|        customername|year_id| productline|contactlastname|  country|.customerid|qtr_id|msrp|quantityordered|productcode| status|              email|contactfirstname|ordernumber|      orderdate|customerid|dealsize|month_id|priceeach|
+---------------+-------+---------+--------------+--------------------+-------+------------+---------------+---------+-----------+------+----+---------------+-----------+-------+-------------------+----------------+-----------+---------------+----------+--------+--------+---------+
|              8|10993.5|    Japan|     Singapore|Dragon Souveniers...|   2003|Classic Cars|         Corrio|Singapore|         31|     3| 214|         

In [8]:
type(Join_DF)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<class 'awsglue.dynamicframe.DynamicFrame'>

## map

## spigot

## mergeDynamicFrame

# relationalize

## resolve_choice

source: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-samples-medicaid.html

In [5]:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "s3_database",
       table_name = "medicare_hospital_provider_csv")
medicare_dynamicframe.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- drg definition: string
|-- provider id: choice
|    |-- long
|    |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string

In [8]:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
medicare_res.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string

## split_fields

# Split_rows

## unbox

## Unnest

## write

In [9]:
glueContext.write_dynamic_frame.from_options(frame = Join_DF,
                                             connection_type="s3", 
                                             connection_options= {"path":"s3://hivedata-load/output/glue-new"},
                                             format="parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7f7e57f6e438>

In [30]:
glueContext.write_dynamic_frame.from_options(frame = Join_DF,
          connection_type = "s3",
          connection_options = {"path": "s3://hivedata-load/output/out"},
          format = "csv",
        format_options={'withHeader': True})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7f2fe270f9b0>

In [25]:
help(glueContext.write_dynamic_frame.from_options)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Help on method from_options in module awsglue.dynamicframe:

from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx='') method of awsglue.dynamicframe.DynamicFrameWriter instance
    Creates a DynamicFrame with the specified connection and format.

### write Parquet Partition files

##### glueparquet files gave error, need to check later

#### Reasonglueparquet format not supported fordeveloper environment

glueContext.write_dynamic_frame.from_options(frame = Join_DF,
          connection_type = "s3",
          connection_options = {"path": "s3://hivedata-load/output/glueparquet-part",
                               "partitionKeys" : ["year_id"]
                               },
          format = "glueparquet",
          format_options = {"compression": "snappy"})

In [20]:
glueContext.write_dynamic_frame.from_options(frame = Join_DF,
          connection_type = "s3",
          connection_options = {"path": "s3://hivedata-load/output/parquet-part",
                               "partitionKeys" : ["year_id"]
                               },
          format = "parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7f7e57fbcb00>

### Create Dynamic Dataframe from s3

In [10]:
data1 = glueContext.create_dynamic_frame_from_options(connection_type="s3",
                                                      connection_options={"path":["s3://hivedata-load/ouput/glue-new"]},
                                                      format="parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
data3 = glueContext.create_dynamic_frame_from_options(connection_type="s3",
                                                      connection_options={"path":["s3://hivedata-load/input/sales.csv"]},
                                                      format="csv",
                                                      format_options={'withHeader': True})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
data3.toDF().show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

In [27]:
data1.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root

In [29]:
Join_DF.toDF().show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------+---------+---------+--------------------+-------+------------+---------------+---------+-----------+------+----+---------------+-----------+-------+----------------+----------------+-----------+--------------+----------+--------+--------+---------+
|orderlinenumber|  sales|territory|     city|        customername|year_id| productline|contactlastname|  country|.customerid|qtr_id|msrp|quantityordered|productcode| status|           email|contactfirstname|ordernumber|     orderdate|customerid|dealsize|month_id|priceeach|
+---------------+-------+---------+---------+--------------------+-------+------------+---------------+---------+-----------+------+----+---------------+-----------+-------+----------------+----------------+-----------+--------------+----------+--------+--------+---------+
|              8|10993.5|    Japan|Singapore|Dragon Souveniers...|   2003|Classic Cars|         Corrio|Singapore|         31|     3| 214|             45|   S10_1949|Shipped|bmale

In [15]:
Join_DF.toDF().select('year_id').distinct().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+
|year_id|
+-------+
|   2004|
|   2003|
|   2005|
+-------+

### Relationalize

## Write into Redshift

In [6]:
glueContext.write_dynamic_frame.from_jdbc_conf(frame = Join_DF,
                                                   catalog_connection = "redshift-connection",
                                                   connection_options = {"dbtable": "sales_data", "database": "dev"},
                                                   redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o40.getJDBCSink.
: java.lang.ClassNotFoundException: com.amazon.redshift.jdbc41.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at com.amazonaws.services.glue.util.RedshiftUtils$.loadDriver(JDBCUtils.scala:644)
	at com.amazonaws.services.glue.util.JDBCWrapper.getRawConnection(JDBCUtils.scala:678)
	at com.amazonaws.services.glue.RedshiftDataSink.<init>(RedshiftDataSink.scala:40)
	at com.amazonaws.services.glue.GlueContext.getSink(GlueContext.scala:650)
	at com.amazonaws.services.glue.GlueContext.getJDBCSink(GlueContext.scala:463)
	at com.amazonaws.services.glue.GlueContext.getJDBCSink(GlueContext.scala:445)
	at sun.reflect.NativeMethodAccesso