# In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import col
from pyspark.sql.functions import cast
from pyspark.sql.types import LongType
import boto3

## @params: [JOB_NAME]
# Resolve the arguments Glue passes on the command line; only JOB_NAME is read.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# getOrCreate() reuses a SparkContext that is already running (Glue
# interactive sessions / notebooks, or re-running this cell) instead of
# failing with "Cannot run multiple SparkContexts at once"; in a fresh job
# run it simply creates one.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Collect the keys of all objects in the bucket that have not been archived yet.
#
# list_objects returns at most 1000 keys per call and omits "Contents"
# entirely when there is nothing to list. Because keys are listed in ascending
# order, once Archive/ holds more than 1000 objects the first page alone could
# contain nothing but archived keys. A list_objects_v2 paginator walks every
# page and tolerates pages without "Contents".
session = boto3.Session()
s3 = session.resource('s3')  # NOTE(review): not used anywhere in this script
client = boto3.client('s3')

filenames = []
paginator = client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='practiseons3'):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if "Archive/" in key:
            continue  # already processed and moved by a previous run
        filenames.append(key)

# Only the first pending key (S3 lists keys in ascending UTF-8 binary order)
# is processed per run. It is moved to Archive/ at the end of the job, so the
# next run picks up the following one.
if not filenames:
    raise RuntimeError(
        "No unarchived objects found in s3://practiseons3/ - nothing to process"
    )

# Define the S3 path
s3_path = f"s3://practiseons3/{filenames[0]}"

spark_df = spark.read.json(s3_path)

# Load the current contents of the Financial_Data table as a Glue DynamicFrame.
# Its complaint_id values are used further down to skip incoming records that
# are already stored. Option meanings: see the AWS Glue DynamoDB connection
# options reference.
dynamodb_source_options = {
    "dynamodb.input.tableName": "Financial_Data",
    # share of the table's read capacity the scan may use (per Glue docs)
    "dynamodb.throughput.read.percent": "1.0",
    # number of parallel scan segments (per Glue docs)
    "dynamodb.splits": "100",
}
dynamodb_df = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options=dynamodb_source_options,
)

logger = glueContext.get_logger()

# Cast complaint_id to a 64-bit integer (LongType), presumably so it compares
# cleanly with the complaint_id values read back from DynamoDB in the join.
# NOTE(review): this uses the Column.cast method. pyspark.sql.functions does
# not appear to export a `cast` function, so the
# `from pyspark.sql.functions import cast` line at the top of the file may
# raise ImportError - confirm and remove that import if so.
logger.info("Converting complaint_id to LongType")
spark_df = spark_df.withColumn("complaint_id", col("complaint_id").cast(LongType()))
spark_df.show(5)
spark_df.printSchema()

logger.info("Changing the dynamoDB glue data frame to spark")
spark_dynamodb_df = dynamodb_df.toDF()
spark_dynamodb_df.show(5)
spark_dynamodb_df.printSchema()

logger.info("Joining the two dataframes on complaint ID")
if spark_dynamodb_df.isEmpty():
    # Nothing stored yet, so every incoming record is new. (An empty DynamoDB
    # read presumably has no complaint_id column to join on - verify.)
    new_spark_df = spark_df.coalesce(10)
else:
    # Keep only incoming rows whose complaint_id is not already in DynamoDB.
    # A left_anti join returns just spark_df's columns, so no right-hand
    # helper column is carried into the DynamoDB write. DataFrames are
    # immutable; a bare `df.drop(...)` whose result is discarded removes
    # nothing.
    existing_ids = spark_dynamodb_df.select('complaint_id')
    new_spark_df = spark_df.join(
        existing_ids,
        on='complaint_id',
        how='left_anti',
    ).coalesce(10)

new_spark_df.show()

# Wrap the filtered Spark DataFrame back into a Glue DynamicFrame so that the
# Glue DynamoDB sink can write it into Financial_Data.
logger.info("Converting the SparkDataframe into GlueDataframe")
glue_df = DynamicFrame.fromDF(new_spark_df, glueContext, "new_spark_df")

logger.info("loading the data into the DynamoDB")
dynamodb_sink_options = {
    "dynamodb.output.tableName": "Financial_Data",
    "dynamodb.throughput.write.percent": 1.0,
}
glueContext.write_dynamic_frame.from_options(
    frame=glue_df,
    connection_type="dynamodb",
    connection_options=dynamodb_sink_options,
)
logger.info("Get the file name from S3 object")

# Archive the processed object. S3 has no move/rename, so copy it under the
# Archive/ prefix, then delete the original. boto3 raises a ClientError if the
# copy fails, in which case the delete below never runs. The same `client`
# that listed the bucket is reused; no second S3 client is needed.
# NOTE(review): a single CopyObject call only supports objects up to 5 GB;
# use client.copy(...) (managed transfer) if source files can be larger.
logger.info("Move the file from S3 to folder named archive")
source_key = filenames[0]
client.copy_object(
    Bucket='practiseons3',
    CopySource={'Bucket': 'practiseons3', 'Key': source_key},
    Key=f"Archive/{source_key}",
)
client.delete_object(Bucket='practiseons3', Key=source_key)

print("File moved successfully.")
job.commit()