In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import boto3
from botocore.exceptions import ClientError

# Glue

In [2]:
sys.argv +=["--JOB_NAME", "TEST_JOB"]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
args

{'job_bookmark_option': 'job-bookmark-disable',
 'job_bookmark_from': None,
 'job_bookmark_to': None,
 'JOB_ID': None,
 'JOB_RUN_ID': None,
 'SECURITY_CONFIGURATION': None,
 'encryption_type': None,
 'enable_data_lineage': None,
 'RedshiftTempDir': None,
 'TempDir': None,
 'JOB_NAME': 'TEST_JOB'}

In [3]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [8]:
data = glueContext.create_dynamic_frame.from_catalog(
             database="test-s3",
             table_name="test-s3-table")

KeyboardInterrupt: 

In [6]:
data = DropFields.apply(data, paths=['col6'])

NameError: name 'data' is not defined

In [6]:
data.printSchema()

root
|-- col0: string
|-- col1: string
|-- col2: string
|-- col3: string
|-- col4: string
|-- col5: string



In [7]:
data = ApplyMapping.apply( frame = data, mappings = [ 
  ("col0","string","id","integer"), 
  ("col1","string","sepal_length","double"),
  ("col2","string","sepal_width","double"),
  ("col3","string","petal_length","double"),
  ("col4","string","petal_width","double"),
  ("col5","string","variety","string"),
])

In [8]:
data.printSchema()

root
|-- id: int
|-- sepal_length: double
|-- sepal_width: double
|-- petal_length: double
|-- petal_width: double
|-- variety: string



In [9]:
data = Filter.apply(frame = data, f=lambda x: x['variety'] != ' variety ')

In [10]:
glueContext.write_dynamic_frame.from_options(data, 
                                             connection_type = "s3", 
                                             connection_options = {"path": "s3://test-353006047186/final"}, 
                                             format = 'json')

<awsglue.dynamicframe.DynamicFrame at 0x7fe7bc8750f0>

In [11]:
data_dynamodb = Filter.apply(frame = data, f=lambda x: x['variety'] == 'Setosa')

In [12]:
#glueContext.write_dynamic_frame.from_options(data_dynamodb, connection_type = "dynamodb", 
#                                             connection_options = {"dynamodb.output.tableName": "test-table"}, 
#                                             )

# PySpark - DataFrames

In [13]:
#spark = SparkSession.builder.appName("test").getOrCreate()

In [14]:
df = data.toDF()

In [15]:
df.show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|
| Setosa |         4.9|         1.4|        0.2|        3.0|  2|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



In [16]:
df.withColumn('sepal_sum',df['sepal_length']+df['sepal_width']).show(2)

+--------+------------+------------+-----------+-----------+---+---------+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|sepal_sum|
+--------+------------+------------+-----------+-----------+---+---------+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|      8.6|
| Setosa |         4.9|         1.4|        0.2|        3.0|  2|      7.9|
+--------+------------+------------+-----------+-----------+---+---------+
only showing top 2 rows



In [17]:
df.filter( (df["sepal_length"] > 5) & (df['petal_width'] < 0.2) ).show()

+-------+------------+------------+-----------+-----------+---+
|variety|sepal_length|petal_length|petal_width|sepal_width| id|
+-------+------------+------------+-----------+-----------+---+
| Setosa|         5.2|         1.5|        0.1|        4.1| 33|
+-------+------------+------------+-----------+-----------+---+



In [18]:
df.collect()[0].asDict()

{'variety': ' Setosa ',
 'sepal_length': 5.1,
 'petal_length': 1.4,
 'petal_width': 0.2,
 'sepal_width': 3.5,
 'id': 1}

# PySpark - SQL

In [19]:
df.createOrReplaceTempView("iris")

In [20]:
spark.sql("SELECT * FROM iris WHERE sepal_length>5").show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|
| Setosa |         5.4|         1.7|        0.4|        3.9|  6|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



# PySpark - GroupBy and Aggregate Functions

In [21]:
df.groupBy("variety").mean().show()

+----------+-----------------+------------------+-------------------+------------------+-------+
|   variety|avg(sepal_length)| avg(petal_length)|   avg(petal_width)|  avg(sepal_width)|avg(id)|
+----------+-----------------+------------------+-------------------+------------------+-------+
|   Setosa |5.031578947368421| 1.431578947368421|0.23157894736842108|3.4631578947368418|   10.0|
| Virginica|6.587999999999998|             5.552|              2.026|2.9739999999999998|  125.5|
|    Setosa|4.990322580645163|1.4806451612903222| 0.2548387096774194| 3.406451612903226|   35.0|
|Versicolor|            5.936|              4.26| 1.3259999999999998|2.7700000000000005|   75.5|
+----------+-----------------+------------------+-------------------+------------------+-------+



In [22]:
df.groupBy("variety").sum().show()

+----------+------------------+------------------+------------------+------------------+-------+
|   variety| sum(sepal_length)| sum(petal_length)|  sum(petal_width)|  sum(sepal_width)|sum(id)|
+----------+------------------+------------------+------------------+------------------+-------+
|   Setosa |              95.6|              27.2|               4.4|              65.8|    190|
| Virginica| 329.3999999999999|277.59999999999997|101.29999999999998|             148.7|   6275|
|    Setosa|154.70000000000005| 45.89999999999999| 7.900000000000001|105.60000000000001|   1085|
|Versicolor|             296.8|212.99999999999997|              66.3|138.50000000000003|   3775|
+----------+------------------+------------------+------------------+------------------+-------+



In [23]:
df.agg({'sepal_length':'max'}).show()

+-----------------+
|max(sepal_length)|
+-----------------+
|              7.9|
+-----------------+



In [24]:
df.select(countDistinct("variety")).show()

+-----------------------+
|count(DISTINCT variety)|
+-----------------------+
|                      4|
+-----------------------+



In [25]:
df.select(avg('sepal_length')).show()

+-----------------+
|avg(sepal_length)|
+-----------------+
|5.843333333333335|
+-----------------+



In [26]:
df.select(stddev("sepal_length")).show()

+-------------------------+
|stddev_samp(sepal_length)|
+-------------------------+
|       0.8280661279778637|
+-------------------------+



In [27]:
df.orderBy("sepal_length").show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         4.3|         1.1|        0.1|        3.0| 14|
|  Setosa|         4.4|         1.3|        0.2|        3.2| 43|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



# PySpark - DataFrame

In [28]:
df.na.drop().show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|
| Setosa |         4.9|         1.4|        0.2|        3.0|  2|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



In [148]:
df.na.fill('NEW VALUE',subset=['variety']).show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|
| Setosa |         4.9|         1.4|        0.2|        3.0|  2|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



In [149]:
mean_val = df.select(mean(df['sepal_length'])).collect()[0][0]

In [150]:
df.na.fill(mean_val, subset=['sepal_length']).show(2)

+--------+------------+------------+-----------+-----------+---+
| variety|sepal_length|petal_length|petal_width|sepal_width| id|
+--------+------------+------------+-----------+-----------+---+
| Setosa |         5.1|         1.4|        0.2|        3.5|  1|
| Setosa |         4.9|         1.4|        0.2|        3.0|  2|
+--------+------------+------------+-----------+-----------+---+
only showing top 2 rows



# PySpark - Dates and Timestamps

In [151]:
df1 = df.withColumn("date", date_format(current_timestamp(), 'yyyy-MM-dd : hh-mm-ss'))

In [152]:
df1.select(dayofmonth(df1['date'])).show(2)

+----------------+
|dayofmonth(date)|
+----------------+
|               7|
|               7|
+----------------+
only showing top 2 rows



In [153]:
df1.select(year(df1['date'])).show(2)

+----------+
|year(date)|
+----------+
|      2022|
|      2022|
+----------+
only showing top 2 rows



In [154]:
df1.withColumn("year",year(df1['date'])).groupBy("Year").mean().show(2)

+----+-----------------+------------------+-----------------+-----------------+-------+---------+
|Year|avg(sepal_length)| avg(petal_length)| avg(petal_width)| avg(sepal_width)|avg(id)|avg(year)|
+----+-----------------+------------------+-----------------+-----------------+-------+---------+
|2022|5.843333333333335|3.7580000000000027|1.199333333333334|3.057333333333334|   75.5|   2022.0|
+----+-----------------+------------------+-----------------+-----------------+-------+---------+



In [155]:
spark.getActiveSession()

AttributeError: 'SparkSession' object has no attribute 'getActiveSession'

In [104]:
#spark.stop()

In [117]:
job.commit()