# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
    %session_type       String        Specify a session_type to be used. Supported values: streaming, etl and glue_ray. 
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Session

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray session. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: edb5f650-cd81-4883-b4fc-d72d58788f7e
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session edb5f650-cd81-4883-b4fc-d72d58788f7e to get into ready status...
Session edb5f650-cd81-4883-b4fc-d72d58788f7e has been created.



## Data Transformations

In [2]:
s3_path = "s3://western-shield/batch/"

data_frame = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': [s3_path]},
    'csv',
    {'withHeader': True}
)




In [3]:
data_frame.show(5)

{"@timestamp": "May 9, 2024 @ 12:00:00.000", "type": "P0f", "dest_port": "28015", "geoip.country_code3": "US", "geoip.city_name": "-", "geoip.country_name": "United States", "geoip.ip": "199.45.155.25", "geoip.latitude": "37.75", "geoip.longitude": "-97.812"}
{"@timestamp": "May 9, 2024 @ 12:00:00.000", "type": "P0f", "dest_port": "28015", "geoip.country_code3": "US", "geoip.city_name": "-", "geoip.country_name": "United States", "geoip.ip": "199.45.155.25", "geoip.latitude": "37.75", "geoip.longitude": "-97.812"}
{"@timestamp": "May 9, 2024 @ 12:00:00.000", "type": "P0f", "dest_port": "28015", "geoip.country_code3": "US", "geoip.city_name": "-", "geoip.country_name": "United States", "geoip.ip": "199.45.155.25", "geoip.latitude": "37.75", "geoip.longitude": "-97.812"}
{"@timestamp": "May 9, 2024 @ 12:00:00.000", "type": "P0f", "dest_port": "28015", "geoip.country_code3": "US", "geoip.city_name": "-", "geoip.country_name": "United States", "geoip.ip": "199.45.155.25", "geoip.latitude":

In [4]:
data_frame.printSchema()

root
|-- @timestamp: string
|-- type: string
|-- geoip.country_code2: string
|-- dest_port: string
|-- geoip.city_name: string
|-- geoip.country_name: string
|-- geoip.ip: string
|-- geoip.latitude: string
|-- geoip.longitude: string
|-- geoip.country_code3: string


In [5]:
#convert from dynamic frame to pyspark dataframe
data_frame = data_frame.toDF()



In [6]:
data_frame.printSchema()

root
 |-- @timestamp: string (nullable = true)
 |-- type: string (nullable = true)
 |-- geoip.country_code2: string (nullable = true)
 |-- dest_port: string (nullable = true)
 |-- geoip.city_name: string (nullable = true)
 |-- geoip.country_name: string (nullable = true)
 |-- geoip.ip: string (nullable = true)
 |-- geoip.latitude: string (nullable = true)
 |-- geoip.longitude: string (nullable = true)
 |-- geoip.country_code3: string (nullable = true)


In [7]:
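#get rows where geoip.country_name is not null
#backticks make Spark read the dotted name as one column; in single quotes it would be a string literal, which is never null
data_frame.where("`geoip.country_name` IS NOT NULL").show(5)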

+--------------------+----+-------------------+---------+---------------+------------------+-------------+--------------+---------------+-------------------+
|          @timestamp|type|geoip.country_code2|dest_port|geoip.city_name|geoip.country_name|     geoip.ip|geoip.latitude|geoip.longitude|geoip.country_code3|
+--------------------+----+-------------------+---------+---------------+------------------+-------------+--------------+---------------+-------------------+
|May 9, 2024 @ 12:...| P0f|               null|    28015|              -|     United States|199.45.155.25|         37.75|        -97.812|                 US|
|May 9, 2024 @ 12:...| P0f|               null|    28015|              -|     United States|199.45.155.25|         37.75|        -97.812|                 US|
|May 9, 2024 @ 12:...| P0f|               null|    28015|              -|     United States|199.45.155.25|         37.75|        -97.812|                 US|
|May 9, 2024 @ 12:...| P0f|               null|    2

In [8]:
from pyspark.sql.utils import AnalysisException

try:
    data_frame.select("geoip.city_name").show(5)
except AnalysisException:
    print("Having trouble reading columns with . in them!!")

Having trouble reading columns with . in them!!


Selecting a column whose name contains a `.` fails because Spark interprets the dot as access to a nested field. We therefore rename the columns, stripping the `geoip.` prefix, so that we can reference them directly.
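As an alternative to renaming, a dotted column name can be wrapped in backticks, which tells Spark to treat the whole string as a single identifier. The sketch below illustrates this; `quote_col` and `select_dotted` are hypothetical helper names introduced here, not part of PySpark.

```python
def quote_col(name):
    """Wrap a column name in backticks so Spark reads it as one identifier.

    A literal backtick inside the name is escaped by doubling it.
    """
    return "`" + name.replace("`", "``") + "`"


def select_dotted(df, *names):
    """Select columns by their exact names, even when the names contain dots."""
    return df.select(*[quote_col(n) for n in names])


# Hypothetical usage with the DataFrame from this notebook (before any rename):
# select_dotted(data_frame, "geoip.city_name", "geoip.country_name").show(5)
```

Backticks have to be repeated in every expression that touches such a column, so renaming once is often the more convenient choice for a longer analysis.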

In [9]:
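#build the new column names by stripping the "geoip." prefix
#(the loop variable is not called col, so it cannot clash with pyspark.sql.functions.col imported later)
old_cols = data_frame.schema.names
new_cols = []
for old_name in old_cols:
    new_cols.append(old_name.replace("geoip.", ""))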




In [10]:
#rename columns
from functools import reduce

data_frame = reduce(lambda data_frame, idx: data_frame.withColumnRenamed(old_cols[idx], new_cols[idx]), range(len(old_cols)), data_frame)
data_frame.printSchema()

root
 |-- @timestamp: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country_code2: string (nullable = true)
 |-- dest_port: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- country_code3: string (nullable = true)
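The same rename can be sketched without `reduce`, because `DataFrame.toDF` accepts a complete list of new column names and applies them by position. The helper names `strip_prefix` and `rename_without_prefix` are hypothetical. Unlike the `replace` call above, this version removes `geoip.` only when it appears at the start of a name.

```python
def strip_prefix(names, prefix="geoip."):
    """Return the names with a leading prefix removed; other names are unchanged."""
    return [n[len(prefix):] if n.startswith(prefix) else n for n in names]


def rename_without_prefix(df, prefix="geoip."):
    """Rename every column in one call; toDF assigns the new names by position."""
    return df.toDF(*strip_prefix(df.columns, prefix))


# Hypothetical usage:
# data_frame = rename_without_prefix(data_frame)
# data_frame.printSchema()
```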


In [11]:
#now we can select columns
data_frame.select("country_code3").show(5)

+-------------+
|country_code3|
+-------------+
|           US|
|           US|
|           US|
|           US|
|           US|
+-------------+
only showing top 5 rows


In [12]:
data_frame.where("country_name IS NOT NULL").select(["country_name", "country_code2", "country_code3"]).show()

+--------------+-------------+-------------+
|  country_name|country_code2|country_code3|
+--------------+-------------+-------------+
| United States|         null|           US|
| United States|         null|           US|
| United States|         null|           US|
| United States|         null|           US|
| United States|         null|           US|
| United States|         null|           US|
| United States|         null|           US|
|             -|         null|            -|
|             -|         null|            -|
|             -|         null|            -|
|             -|         null|            -|
|             -|         null|            -|
|United Kingdom|         null|           GB|
|             -|         null|            -|
|             -|         null|            -|
|             -|         null|            -|
|United Kingdom|         null|           GB|
|United Kingdom|         null|           GB|
|             -|         null|            -|
|         

In [13]:
from pyspark.sql.functions import col




In [14]:
#select rows where country_name is null
data_frame.where(col('country_name').isNull()).show()

+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+
|@timestamp|type|country_code2|dest_port|city_name|country_name| ip|latitude|longitude|country_code3|
+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+
+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+


As shown above, filtering for rows where country_name is null returns nothing, even though we know that some rows have no country. The missing values are stored as the string '-' rather than as null, so we must first convert '-' to null.

In [15]:
#helper: keep the column value unless it equals the given placeholder, in which case return null
from pyspark.sql.functions import when, lit

def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

#replace dash - with null
data_frame = data_frame.withColumn("country_name", replace(col("country_name"), "-"))




In [16]:
#the '-' placeholders in country_name are now null
data_frame.where(col('country_name').isNull()).show()

+--------------------+--------+-------------+---------+---------+------------+---+--------+---------+-------------+
|          @timestamp|    type|country_code2|dest_port|city_name|country_name| ip|latitude|longitude|country_code3|
+--------------------+--------+-------------+---------+---------+------------+---+--------+---------+-------------+
|May 9, 2024 @ 11:...|Suricata|         null|      443|        -|        null|  -|       -|        -|            -|
|May 9, 2024 @ 11:...|Suricata|         null|      443|        -|        null|  -|       -|        -|            -|
|May 9, 2024 @ 11:...|Suricata|         null|      443|        -|        null|  -|       -|        -|            -|
|May 9, 2024 @ 11:...|Suricata|         null|      443|        -|        null|  -|       -|        -|            -|
|May 9, 2024 @ 11:...|Suricata|         null|      547|        -|        null|  -|       -|        -|            -|
|May 9, 2024 @ 11:...|Suricata|         null|      443|        -|       

In [17]:
data_frame.groupBy('city_name').count().orderBy(col('count').desc()).show()

+-----------------+-------+
|        city_name|  count|
+-----------------+-------+
|                -|3307743|
|           London| 512968|
| North Charleston| 472719|
|     Cedar Knolls| 143898|
|        Amsterdam| 104848|
|         Hangzhou|  99665|
|           Moroni|  97943|
|            Paris|  83301|
|          Fremont|  79028|
|            Cairo|  73472|
|           Grozny|  64791|
|      La Libertad|  57400|
| Ho Chi Minh City|  56652|
|         Shanghai|  54883|
|         New York|  50380|
|         Shenzhen|  50205|
|           Lahore|  47379|
|    San Francisco|  43513|
|     North Bergen|  37462|
|Frankfurt am Main|  37340|
+-----------------+-------+
only showing top 20 rows


The output above shows that 'city_name' is another column in which a large number of values (3,307,743) are '-'. These should be null as well.

In [18]:
#collect the names of all columns that contain the placeholder '-'
null_cols = []
for col_name in data_frame.schema.names:
    temp_df = data_frame.groupBy(col_name).count()
    temp_list = temp_df.select(col_name).rdd.flatMap(lambda x: x).collect()
    if '-' in temp_list:
        null_cols.append(col_name)




In [19]:
null_cols

['country_code2', 'dest_port', 'city_name', 'ip', 'latitude', 'longitude', 'country_code3']
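The loop above runs one Spark job per column and collects every distinct value to the driver, which can be slow for high-cardinality columns such as `ip`. A possible alternative, sketched below, computes one flag per column in a single aggregation. The helper names are hypothetical, and the sketch assumes that the placeholder contains no single quote and that column names contain no backticks.

```python
def placeholder_flag_exprs(names, placeholder="-"):
    """Build one SQL expression per column: 1 if any row equals the placeholder, else 0."""
    return [
        "max(CASE WHEN `{c}` = '{p}' THEN 1 ELSE 0 END) AS `{c}`".format(c=c, p=placeholder)
        for c in names
    ]


def columns_with_placeholder(df, placeholder="-"):
    """Return the names of the columns that contain the placeholder, using one Spark job."""
    flags = df.selectExpr(*placeholder_flag_exprs(df.columns, placeholder)).first()
    return [c for c in df.columns if flags[c] == 1]


# Hypothetical usage:
# null_cols = columns_with_placeholder(data_frame)
```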


In [20]:
#for each of those columns we are going to fill with null values
for col_name in null_cols:
    data_frame = data_frame.withColumn(col_name, replace(col(col_name), "-"))
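The per-column `withColumn` loop can also be written as a single projection using Spark SQL's `nullif` function, which returns null when its two arguments are equal. This is only a sketch of that idea: the helper names are hypothetical, and it assumes that the placeholder contains no single quote and that column names contain no backticks.

```python
def nullif_exprs(all_names, target_names, placeholder="-"):
    """Build a projection that nulls the placeholder in target columns and keeps the rest."""
    targets = set(target_names)
    return [
        "nullif(`{c}`, '{p}') AS `{c}`".format(c=c, p=placeholder)
        if c in targets
        else "`{c}`".format(c=c)
        for c in all_names
    ]


def placeholders_to_null(df, target_names, placeholder="-"):
    """Return a DataFrame in which the placeholder is replaced by null in the target columns."""
    return df.selectExpr(*nullif_exprs(df.columns, target_names, placeholder))


# Hypothetical usage:
# data_frame = placeholders_to_null(data_frame, null_cols)
```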




In [21]:
#confirm that no '-' values remain in latitude
data_frame.where(col('latitude') == '-').show()

+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+
|@timestamp|type|country_code2|dest_port|city_name|country_name| ip|latitude|longitude|country_code3|
+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+
+----------+----+-------------+---------+---------+------------+---+--------+---------+-------------+


In [22]:
#the '-' placeholders in latitude are now null
data_frame.where(col('latitude').isNull()).show()

+--------------------+--------+-------------+---------+---------+------------+----+--------+---------+-------------+
|          @timestamp|    type|country_code2|dest_port|city_name|country_name|  ip|latitude|longitude|country_code3|
+--------------------+--------+-------------+---------+---------+------------+----+--------+---------+-------------+
|May 9, 2024 @ 11:...|Suricata|         null|      443|     null|        null|null|    null|     null|         null|
|May 9, 2024 @ 11:...|Suricata|         null|      443|     null|        null|null|    null|     null|         null|
|May 9, 2024 @ 11:...|Suricata|         null|      443|     null|        null|null|    null|     null|         null|
|May 9, 2024 @ 11:...|Suricata|         null|      443|     null|        null|null|    null|     null|         null|
|May 9, 2024 @ 11:...|Suricata|         null|      547|     null|        null|null|    null|     null|         null|
|May 9, 2024 @ 11:...|Suricata|         null|      443|     null

In [23]:
null_count_dic = { col_name: data_frame.where(col(col_name).isNull()).count() for col_name in data_frame.schema.names }




In [24]:
null_count_dic

{'@timestamp': 0, 'type': 0, 'country_code2': 1831198, 'dest_port': 78421, 'city_name': 3307743, 'country_name': 1402550, 'ip': 1402111, 'latitude': 1402550, 'longitude': 1402550, 'country_code3': 6054922}
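The dictionary comprehension above launches a separate `count()` job for every column. A single-pass version might look like the sketch below, which sums a null indicator per column in one aggregation. The helper names are hypothetical, and the sketch assumes column names contain no backticks. On an empty DataFrame the sums would come back as `None` rather than 0.

```python
def null_count_exprs(names):
    """Build one SQL expression per column that counts its null values."""
    return [
        "sum(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS `{c}`".format(c=c)
        for c in names
    ]


def null_counts(df):
    """Return {column name: number of nulls}, computed in a single Spark job."""
    row = df.selectExpr(*null_count_exprs(df.columns)).first()
    return row.asDict()


# Hypothetical usage:
# null_count_dic = null_counts(data_frame)
```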


In [25]:
#let's look at the columns with zero nulls to confirm that they really contain no missing values
data_frame.groupBy('@timestamp').count().orderBy(col('count').desc()).show()

+--------------------+-----+
|          @timestamp|count|
+--------------------+-----+
|May 18, 2024 @ 14...| 1571|
|May 17, 2024 @ 17...| 1376|
|May 17, 2024 @ 17...|  937|
|May 18, 2024 @ 14...|  748|
|May 9, 2024 @ 22:...|  730|
|May 9, 2024 @ 22:...|  452|
|May 14, 2024 @ 03...|  390|
|May 9, 2024 @ 22:...|  338|
|May 13, 2024 @ 13...|  206|
|May 19, 2024 @ 03...|  192|
|May 19, 2024 @ 03...|  172|
|May 19, 2024 @ 03...|  164|
|May 17, 2024 @ 16...|  162|
|May 9, 2024 @ 14:...|  161|
|May 9, 2024 @ 14:...|  157|
|May 17, 2024 @ 16...|  153|
|May 19, 2024 @ 18...|  152|
|May 18, 2024 @ 03...|  148|
|May 18, 2024 @ 03...|  146|
|May 18, 2024 @ 03...|  142|
+--------------------+-----+
only showing top 20 rows


In [26]:
data_frame.groupBy('type').count().orderBy(col('count').desc()).show()

+--------------+-------+
|          type|  count|
+--------------+-------+
|           P0f|3113646|
|      Suricata|2878781|
|     Honeytrap| 143528|
|          Fatt| 136085|
|       Dionaea|  83896|
|        Cowrie|  57804|
|         NGINX|  28726|
|     Heralding|  23554|
|      Ciscoasa|   4582|
|        Tanner|   4182|
|CitrixHoneypot|   2939|
|      Mailoney|   1800|
| Redishoneypot|   1653|
|        ConPot|    957|
|      Adbhoney|    700|
|    Sentrypeer|    338|
|    ElasticPot|    206|
|      Dicompot|    137|
|      Ipphoney|     49|
|       ssh-rsa|      7|
+--------------+-------+


After manually checking the two columns with zero null values, the result appears to hold: the values displayed look genuine rather than like placeholders. The type column has a large number of 'P0f' values (3,113,646); I would need to understand this value better to decide whether it should be treated as null.

The next task in our data transformations is to tackle the country_name, country_code2 and country_code3 columns, since they appear to capture the same information. Of the three, country_name has the fewest null values (1,402,550), so we may be able to use it to fill in the country_code2 and country_code3 columns.

In [27]:
data_frame.where((col('country_name').isNotNull()) & (col('country_code2').isNull())).count()

428648


In [28]:
data_frame.where((col('country_name').isNotNull()) & (col('country_code2').isNull())).show(10)

+--------------------+---------+-------------+---------+---------+--------------+-------------+--------+---------+-------------+
|          @timestamp|     type|country_code2|dest_port|city_name|  country_name|           ip|latitude|longitude|country_code3|
+--------------------+---------+-------------+---------+---------+--------------+-------------+--------+---------+-------------+
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|  

In [29]:
data_frame.where((col('country_name').isNotNull()) & (col('country_code3').isNull())).count()

4652372


In [30]:
data_frame.where((col('country_name').isNotNull()) & (col('country_code3').isNull())).show(10)

+--------------------+---------+-------------+---------+----------------+--------------+---------------+--------+---------+-------------+
|          @timestamp|     type|country_code2|dest_port|       city_name|  country_name|             ip|latitude|longitude|country_code3|
+--------------------+---------+-------------+---------+----------------+--------------+---------------+--------+---------+-------------+
|May 14, 2024 @ 05...| Suricata|           US|    20358|North Charleston| United States|162.216.150.213|  32.875|      -80|         null|
|May 14, 2024 @ 05...| Suricata|           US|    20358|North Charleston| United States|162.216.150.213|  32.875|      -80|         null|
|May 14, 2024 @ 05...|      P0f|           GB|    49691|            null|United Kingdom| 87.236.176.161|    51.5|   -0.122|         null|
|May 14, 2024 @ 05...|      P0f|           GB|    49691|            null|United Kingdom| 87.236.176.161|    51.5|   -0.122|         null|
|May 14, 2024 @ 05...|Honeytrap|  

In [31]:
data_frame.where((col('country_code2').isNotNull()) & (col('country_code3').isNotNull())).count()

0


In [32]:
#attempt at combining country_code columns
from pyspark.sql.functions import udf

def combine(col_one, col_two):
    if col_one is None and col_two is None:
        return None
    elif col_one is not None:
        return col_one
    else:
        return col_two
    
combine_udf = udf(combine)

columns = [c for c in data_frame.schema.names] + [combine_udf(col("country_code2"), col("country_code3")).alias("country_code_combined")]
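The `combine` UDF returns the first of its two arguments that is not null, which is the behaviour of Spark's built-in `coalesce` function. The sketch below shows how the same column could be produced without a Python UDF, which avoids sending every row through the Python worker. `first_non_null` is a plain-Python model included only to illustrate the semantics, and `add_combined_code` is a hypothetical helper name.

```python
def first_non_null(*values):
    """Plain-Python model of SQL COALESCE: the first value that is not None, else None."""
    return next((v for v in values if v is not None), None)


def add_combined_code(df):
    """Add country_code_combined using the built-in coalesce instead of a UDF."""
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql.functions import coalesce, col

    return df.withColumn(
        "country_code_combined",
        coalesce(col("country_code2"), col("country_code3")),
    )


# Hypothetical usage:
# data_frame = add_combined_code(data_frame)
```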




In [33]:
data_frame = data_frame.select(*columns)




In [34]:
data_frame.show()

+--------------------+---------+-------------+---------+---------+--------------+-------------+--------+---------+-------------+---------------------+
|          @timestamp|     type|country_code2|dest_port|city_name|  country_name|           ip|latitude|longitude|country_code3|country_code_combined|
+--------------------+---------+-------------+---------+---------+--------------+-------------+--------+---------+-------------+---------------------+
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|                   US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|                   US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25|   37.75|  -97.812|           US|                   US|
|May 9, 2024 @ 12:...|      P0f|         null|    28015|     null| United States|199.45.155.25

In [35]:
data_frame.where(col('country_code_combined').isNull()).count()

1402550


Notice that the number of nulls here (1,402,550) is exactly the number of nulls in the country_name column. Let's check that these are the same rows, which would show that the country code columns are redundant.

In [36]:
data_frame.where((col('country_name').isNotNull()) & (col('country_code_combined').isNull())).count()

0


In [38]:
data_frame.where((col('country_name').isNull()) & (col('country_code_combined').isNotNull())).count()

0


Both counts are zero, so country_name is null in exactly the rows where the combined country code is null. The country code columns therefore add no coverage beyond country_name: whenever we have a country name we also have a country code, and vice versa. We can simply keep the country_name column.

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [10]:
# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)