In [10]:
# AWS Glue Studio Notebook
#### Packages


In [31]:
#!pip install pyspark --upgrade
# Wildcard import of the Spark SQL column functions. Later cells call to_date,
# split, col, year, month and dayofmonth without importing them explicitly, so
# they presumably resolve through this line.
# NOTE(review): this cell shows execution count 31 although it sits above
# cell 1, i.e. it was last run out of order — confirm the notebook still works
# on a fresh "Restart & Run All".
from pyspark.sql.functions import *
# DynamicFrame is imported here but is not referenced by name in the visible cells.
from awsglue.dynamicframe import DynamicFrame




In [1]:
# Glue interactive-session settings, applied through kernel magics:
# idle timeout (in minutes), Glue version, worker type and number of workers.
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Extra imports used by later cells: `date` builds the landing-file name and
# `lit` adds the constant `chat` flag column.
from datetime import date
from pyspark.sql.functions import lit
  
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; later cells use it to read from S3.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle bound to this context (not referenced again in the visible cells).
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::702434652276:role/AWSGlueServiceRole-Studio
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 4efd998b-a8ea-4a75-8f75-f464d72afada
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 4efd9

#### Extract data from S3


In [2]:
import pyspark

# Report the PySpark build available in this session.
pyspark_version = pyspark.__version__
print(pyspark_version)

3.1.1+amzn.0


In [9]:
# Date format used in the landing-zone file names, e.g. "27-04-2023".
LANDING_DATE_FORMAT = "%d-%m-%Y"

# Pins the run to one specific landing file. Set to None to process the file
# named after today's date instead of this fixed one.
RUN_DATE_OVERRIDE = "27-04-2023"

# Today's date as a string, printed for reference even when an override is set.
today_str = date.today().strftime(LANDING_DATE_FORMAT)
print(today_str)

# Date string that the extract cells interpolate into the S3 object keys.
current_date_str = RUN_DATE_OVERRIDE if RUN_DATE_OVERRIDE is not None else today_str
print(current_date_str)

28-04-2023
27-04-2023


In [10]:
# Landing-zone object holding the chat feed for the selected run date.
chat_paths = [f"s3://landing-zone-crc/chat/{current_date_str}.csv"]

# Read the CSV (with a header row) from S3 as a Glue DynamicFrame.
chatDynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": chat_paths},
    format="csv",
    format_options={"withHeader": True},
)

# Convert to a Spark DataFrame for the transformations below.
chatDf = chatDynamicFrame.toDF()





In [11]:
# Landing-zone object holding the p2p feed for the selected run date.
p2p_paths = [f"s3://landing-zone-crc/p2p/{current_date_str}.csv"]

# Read the CSV (with a header row) from S3 as a Glue DynamicFrame.
p2pDynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": p2p_paths},
    format="csv",
    format_options={"withHeader": True},
)

# Convert to a Spark DataFrame for the transformations below.
p2pDf = p2pDynamicFrame.toDF()




In [12]:
# Landing-zone object holding the ips feed for the selected run date.
ips_paths = [f"s3://landing-zone-crc/ips/{current_date_str}.csv"]

# Read the CSV (with a header row) from S3 as a Glue DynamicFrame.
ipsDynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ips_paths},
    format="csv",
    format_options={"withHeader": True},
)

# Convert to a Spark DataFrame for the transformations below.
ipsDf = ipsDynamicFrame.toDF()




In [15]:
#p2pDf.show()

+---------+--------------+----------+----+--------------------+------------+----------+-------+------+----------------+----------------+---------------------+
|IPAddress|     Timestamp|SourceType|Port|                GUID|      Vendor|  UserName|Country|Region|             ISP|             Org|CategorizedAsExplicit|
+---------+--------------+----------+----+--------------------+------------+----------+-------+------+----------------+----------------+---------------------+
|  1.2.3.4|1/01/2023 0:00|         G|6346|12345EC22866504F8...|eMule v0.60d|Superdad38|     AU|     2|Telstra Internet|Telstra Internet|                    1|
|  1.2.3.6|1/01/2023 0:00|         G|6346|12345EC22866504F8...|eMule v0.60d|Superdad38|     AU|     2| AussieBroadband| AussieBroadband|                    1|
+---------+--------------+----------+----+--------------------+------------+----------+-------+------+----------------+----------------+---------------------+


#### Transformations


In [16]:
# Mark every row of the chat feed with a constant chat = 1 column.
chat_flag = lit(1)
chatDf = chatDf.withColumn("chat", chat_flag)




In [17]:
# Stack the chat and p2p rows, matching columns by name. Columns that exist in
# only one of the two frames are kept (allowMissingColumns=True).
merged_df = chatDf.unionByName(
    p2pDf,
    allowMissingColumns=True,
)




In [18]:
# Left-join the ips frame onto the merged rows by IP address, then drop the
# ips-side IPAddress so the result has a single IPAddress column.
ip_match = merged_df.IPAddress == ipsDf.IPAddress
merged_df = merged_df.join(ipsDf, on=ip_match, how="left").drop(ipsDf.IPAddress)




In [19]:
#merged_df.show()

+---------+--------------+---------+----------+-----------+--------+-------+-------+------+----------------+----------------+----+----------+----+----+------+---------------------+-----+
|     Date|     Timestamp|IPAddress|    UserID|   UserName| Network|Channel|Country|Region|             ISP|             Org|chat|SourceType|Port|GUID|Vendor|CategorizedAsExplicit|Label|
+---------+--------------+---------+----------+-----------+--------+-------+-------+------+----------------+----------------+----+----------+----+----+------+---------------------+-----+
|1/01/2023|1/01/2023 0:00|  1.2.3.4|Superdad38|~superdad38|undernet|#incest|     AU|     2|Telstra Internet|Telstra Internet|   1|      null|null|null|  null|                 null|    A|
|4/01/2023|4/01/2023 0:00| 1.2.3.25|Superdad59|~superdad59|undernet|#incest|     AU|    23|Telstra Internet|Telstra Internet|   1|      null|null|null|  null|                 null| null|
|6/01/2023|6/01/2023 0:00| 1.2.3.28|Superdad62|~superdad62|undern

In [23]:
# Take the part of Timestamp before the first space (e.g. "1/01/2023" from
# "1/01/2023 0:00") and parse it as a date.
timestamp_date_part = split(col("Timestamp"), " ").getItem(0)
merged_df = merged_df.withColumn(
    "date_column",
    to_date(timestamp_date_part, "d/MM/yyyy"),
)

# merged_df.printSchema()




In [25]:
# Split date_column into year / month / day columns; the write step partitions on them.
merged_df = (
    merged_df
    .withColumn("year", year("date_column"))
    .withColumn("month", month("date_column"))
    .withColumn("day", dayofmonth("date_column"))
)
# merged_df.show(1)




In [26]:
#merged_df.show(1)

+---------+--------------+---------+----------+-----------+--------+-------+-------+------+----------------+----------------+----+----------+----+----+------+---------------------+-----+-----------+----+-----+---+
|     Date|     Timestamp|IPAddress|    UserID|   UserName| Network|Channel|Country|Region|             ISP|             Org|chat|SourceType|Port|GUID|Vendor|CategorizedAsExplicit|Label|date_column|year|month|day|
+---------+--------------+---------+----------+-----------+--------+-------+-------+------+----------------+----------------+----+----------+----+----+------+---------------------+-----+-----------+----+-----+---+
|1/01/2023|1/01/2023 0:00|  1.2.3.4|Superdad38|~superdad38|undernet|#incest|     AU|     2|Telstra Internet|Telstra Internet|   1|      null|null|null|  null|                 null|    A| 2023-01-01|2023|    1|  1|
+---------+--------------+---------+----------+-----------+--------+-------+-------+------+----------------+----------------+----+----------+---

In [37]:
# Destination prefix in the transformed zone.
output_path = "s3://transformed-zone-crc/test/"

# Write the merged rows as parquet, partitioned by year/month/day.
# NOTE(review): unless spark.sql.sources.partitionOverwriteMode is set to
# "dynamic", mode("overwrite") replaces everything under output_path, not only
# the partitions written by this run — confirm that is intended for a daily load.
(
    merged_df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet(output_path)
)




#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)