# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.2X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the running SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the DynamicFrame read and write calls
# in later cells go through it.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle bound to this context (not referenced again in the visible cells).
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.2X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.2X
Number of Workers: 5
Session ID: 7421d826-4dbd-4da9-9123-1532c7381994
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 7421d826-4dbd-4da9-9123-1532c7381994 to get into ready status...
Session 7421d826-4dbd-4da9-9123-1532c7381994 has been created.



In [2]:
# Example: Read Parquet from S3

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Re-bind the session handles; getOrCreate() hands back the context that is
# already running instead of starting a second one.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Input location: a single Parquet object on S3.
s3_read_options = {"paths": ["s3://noturs/merged_file.parquet"]}

# Load the Parquet data as a Glue DynamicFrame.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=s3_read_options,
    format="parquet",
)

# Uncomment to inspect the inferred schema.
#dyf.printSchema()




#### Example: Convert the DynamicFrame to a Spark DataFrame (uncomment `df.show()` to display a sample of the data)


In [8]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the pyspark.sql
# column and window functions used in later cells can be applied to it.
df = dyf.toDF()
# Uncomment to preview the first rows (this triggers a Spark job).
#df.show()



### Prepare Data

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.types import StructType, StructField, StringType

# 'coordinate' is split on commas; the first two fields become the integer
# columns 'x' and 'y'.
coordinate_fields = F.split(F.col("coordinate"), ",")

df2 = (
    df
    .withColumn("x", coordinate_fields.getItem(0).cast("int"))
    .withColumn("y", coordinate_fields.getItem(1).cast("int"))
    # Cast 'timestamp' to the timestamp data type so later cells can order by
    # it and compute differences between consecutive actions.
    .withColumn("timestamp", F.col("timestamp").cast(TimestampType()))
)

# Uncomment to verify the new columns.
#df2.show()





### Bot Detection Code

In [11]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Per-user window in chronological order; the coordinate check in the next
# cell reuses it.
windowSpec = Window.partitionBy("user").orderBy("timestamp")

# Allowed deviation from the 300-second (5-minute) interval.
# Adjust to tighten or loosen what counts as "exact".
tolerance = 2  # seconds

# Column expressions, named for readability. A user's first action has no
# predecessor, so lag() yields null there and the derived columns are null too.
previous_action_time = F.lag("timestamp").over(windowSpec)
seconds_since_previous = (
    F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp")
)

df2 = (
    df2
    .withColumn("prev_timestamp", previous_action_time)
    .withColumn("time_diff", seconds_since_previous)
    # True when the gap to the previous action is within `tolerance` of 300 s.
    .withColumn("is_exact_timing", F.abs(F.col("time_diff") - 300) <= tolerance)
)





In [12]:
# Distance (in coordinate units) at or below which two consecutive actions by
# the same user count as the same spot. Adjust to your definition of "nearby".
coord_threshold = 3  # units

# Euclidean distance between an action and the user's previous action.
delta_x = F.col("x") - F.col("prev_x")
delta_y = F.col("y") - F.col("prev_y")
distance_from_previous = F.sqrt(F.pow(delta_x, 2) + F.pow(delta_y, 2))

df2 = (
    df2
    # Previous coordinates within each user's chronological window.
    .withColumn("prev_x", F.lag("x").over(windowSpec))
    .withColumn("prev_y", F.lag("y").over(windowSpec))
    .withColumn("coord_diff", distance_from_previous)
    .withColumn("is_same_coord", F.col("coord_diff") <= coord_threshold)
)





In [13]:
# A user is suspected only with MORE than this many bot-like actions.
# Tune this based on your analysis.
threshold = 5  # Example threshold

# An action is bot-like only when BOTH flags are set: regular timing AND
# (nearly) unchanged coordinates. Rows where either flag is null are dropped.
bot_like_action = F.col("is_exact_timing") & F.col("is_same_coord")
potential_bots = df2.where(bot_like_action)

# Number of bot-like actions per user.
bot_counts = potential_bots.groupBy("user").agg(
    F.count("*").alias("num_bot_actions")
)

# Keep only the users above the threshold.
suspected_bots = bot_counts.where(F.col("num_bot_actions") > threshold)

# Uncomment to list the suspected users.
#suspected_bots.show()





In [14]:
# Keep every action made by a suspected bot user: inner join on 'user', then
# narrow to the columns of interest.
bot_actions = (
    df2
    .join(suspected_bots, on="user")
    .select("user", "pixel_color", "x", "y")
)

# Drop the user id; only the pixel colour and coordinates are written out.
bot_actions_selected = bot_actions.drop("user")




#### Convert back to a DynamicFrame and then use Glue to write Parquet


In [17]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

from awsglue.dynamicframe import DynamicFrame

# Wrap the Spark DataFrame of bot actions in a Glue DynamicFrame so it can be
# handed to the Glue writer. No coalesce or repartition is applied first.
dynamic_frame = DynamicFrame.fromDF(bot_actions_selected, glueContext, "dynamic_frame")

# Destination prefix on S3 for the output files.
s3_write_options = {"path": "s3://bpkbbucket/last_minute/"}

# Write the DynamicFrame to S3. Kept as the cell's final expression so the
# notebook still echoes the value returned by the call.
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options=s3_write_options,
    format="parquet",  # Or your desired format
)


<awsglue.dynamicframe.DynamicFrame object at 0x7f16fd340580>
