In [None]:
# Install the opensearch-py package with pip via a shell escape.
# NOTE(review): the version is not pinned, so the installed release may differ
# between runs; presumably required by the glue_connectors OpenSearch
# connector used further down — confirm.
!pip install opensearch-py

In [None]:
import os

import boto3
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

In [None]:
import sys

# Make the workspace directory importable so the `glue_connectors` import below
# can be resolved from it (assumes the package lives there — confirm).
root_directory = '/home/glue_user/workspace/jupyter_workspace'

# Guard the append so re-running this cell does not keep adding duplicate
# entries to sys.path.
if root_directory not in sys.path:
    sys.path.append(root_directory)

from glue_connectors.creators import get_connector

In [None]:
# Create (or reuse) the Spark context and build the Glue context, Spark session
# and Glue job objects on top of it.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Hadoop S3A credential properties and the environment variables that feed them.
s3a_credential_env = {
    "fs.s3a.access.key": "MINIO_ACCESS_KEY",
    "fs.s3a.secret.key": "MINIO_SECRET_KEY",
}

# Fail early with a readable message when a credential is missing: os.getenv
# would return None, and handing None to the Hadoop configuration is expected
# to be rejected inside the JVM with a much less obvious Java-side error.
missing_env = [name for name in s3a_credential_env.values() if os.getenv(name) is None]
if missing_env:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(missing_env)
    )

hadoop_conf = sc._jsc.hadoopConfiguration()
for conf_key, env_name in s3a_credential_env.items():
    hadoop_conf.set(conf_key, os.environ[env_name])

# Use path-style addressing and point the S3A client at the MinIO endpoint.
hadoop_conf.set("fs.s3a.path.style.access", "true")
# Route the plain s3:// scheme through the S3A filesystem implementation.
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")

In [None]:
# Connection settings for the boto3 S3 client that talks to MinIO
minio_client_options = {
    'endpoint_url': 'http://minio:9000',
    'aws_access_key_id': os.getenv('MINIO_ACCESS_KEY'),
    'aws_secret_access_key': os.getenv('MINIO_SECRET_KEY'),
}
s3 = boto3.client('s3', **minio_client_options)

# Look up the "opensearch" connector, then obtain the connection object from it
opensearch_connector = get_connector("opensearch")
connector = opensearch_connector.get_connection()

In [None]:
def extract_file_name_without_extension(s3_object_key):
    """
    Return the final path component of an S3 object key, minus its extension.

    Args:
        s3_object_key (str): Full object key, e.g. "raw/2024/sales.csv".

    Returns:
        str: The base name with its last extension removed, e.g. "sales".
    """
    # Drop any leading "folder" segments first, then strip the extension.
    base_name = os.path.basename(s3_object_key)
    return os.path.splitext(base_name)[0]

# Bucket and key prefix to ingest, both taken from the environment
minio_bucket = os.getenv('MINIO_BUCKET')
minio_prefix = os.getenv('MINIO_PREFIX')

# A single list_objects_v2 call returns at most 1,000 keys, so walk every page
# of the listing instead of only the first one. Each `response` is one page.
paginator = s3.get_paginator('list_objects_v2')
for response in paginator.paginate(Bucket=minio_bucket, Prefix=minio_prefix):
    for obj in response.get('Contents', []):
        object_key = obj['Key']

        # Keys ending in "/" are folder placeholders rather than files; they
        # would also yield an empty index name, so skip them.
        if object_key.endswith('/'):
            continue

        # Read the object as a CSV with a header row and inferred column types,
        # then write it to an index named after the file (without extension).
        path_s3 = "s3://{}/{}".format(minio_bucket, object_key)
        df = spark.read.csv(path_s3, header=True, inferSchema=True)
        file_name = extract_file_name_without_extension(object_key)
        connector.write_dataframe(df, index=file_name)