## Import Libraries

In [1]:
#import related pyspark library
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.window import Window

#import related data navigation / processing library
import os

# Pull the AWS credentials and region from the environment so no secrets are
# stored in the notebook. Each name is None when its variable is not set.
aws_access_key_id, aws_secret_access_key, aws_region = (
    os.getenv(env_var)
    for env_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION')
)

In [2]:
# Create the SparkSession with the S3A (hadoop-aws), Kafka and MongoDB connectors.
# NOTE: every "x.x.x" version and "enable region version here" value below is a
# template placeholder — replace it with values matching your Spark/Hadoop build.

# Fail fast on missing AWS settings: os.getenv returns None for an unset variable,
# which would otherwise reach Spark as an unusable credential and turn the
# endpoint into "s3.None.amazonaws.com".
_missing_aws_vars = [
    env_name
    for env_name, env_value in (
        ("AWS_ACCESS_KEY_ID", aws_access_key_id),
        ("AWS_SECRET_ACCESS_KEY", aws_secret_access_key),
        ("AWS_DEFAULT_REGION", aws_region),
    )
    if not env_value
]
if _missing_aws_vars:
    raise RuntimeError(
        "Missing AWS environment variable(s): " + ", ".join(_missing_aws_vars)
    )

# Hadoop filesystem settings use the "spark.hadoop." prefix: Spark copies such keys
# into the Hadoop Configuration that the S3A filesystem reads its settings from.
# getOrCreate() reuses an already-running session, in which case static options such
# as spark.jars.packages are not re-applied — restart the kernel after changing them.
spark = (
    SparkSession.builder
    .appName("S3ConnectionTest")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:x.x.x,"
        "com.amazonaws:aws-java-sdk:x.x.x,"
        "org.apache.spark:spark-sql-kafka:x.x.x,"
        "org.mongodb.spark:mongo-spark-connector:x.x.x",
    )
    .config("spark.executor.extraJavaOptions", "enable region version here")
    .config("spark.driver.extraJavaOptions", "enable region version here")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
    .config("spark.hadoop.fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com")
    .getOrCreate()
)

## Fetch transaction data from MongoDB

In [3]:
# Read the transaction collection from MongoDB into a Spark DataFrame.
# The connection URI normally embeds a username/password, so — like the AWS
# credentials in the first cell — it is read from the environment rather than
# being pasted into the notebook.
mongodb_input_uri = os.getenv("MONGODB_INPUT_URI")
if not mongodb_input_uri:
    raise RuntimeError(
        "Set the MONGODB_INPUT_URI environment variable to the MongoDB connection URI"
    )

# NOTE(review): "com.mongodb.spark.sql.DefaultSource" and "spark.mongodb.input.uri"
# are the pre-10.x connector names; the 10.x connector uses format "mongodb" and
# "spark.mongodb.read.connection.uri" — confirm against the pinned connector version.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("spark.mongodb.input.uri", mongodb_input_uri)
    .load()
)

## Flatten the DataFrame into one wide table (example)

In [4]:
# Flatten the DataFrame into one wide table: keep the top-level transaction
# fields and lift the nested card_holder.first field into its own column.
# Optional: filter to the first n rows here for testing.

flattened_df = df.select(
    col("trans_num"),
    col("trans_date_trans_time"),
    col("cc_num"),
    col("merchant"),
    col("category"),
    col("amt"),
    col("city_pop"),
    col("unix_time"),
    col("card_holder.first").alias("card_holder_first"),  # nested field -> top-level column
    col("is_fraud"),
)

In [5]:
# Convert trans_date_trans_time into a timestamp column, parsing values with the
# "yyyy-MM-dd HH:mm:ss" pattern. to_timestamp does this in one step instead of
# round-tripping through unix_timestamp -> from_unixtime -> cast('timestamp').
flattened_df = flattened_df.withColumn(
    "trans_date_trans_time",
    F.to_timestamp("trans_date_trans_time", "yyyy-MM-dd HH:mm:ss"),
)

In [None]:
# Restrict the data to a time window for the first run (intended: the first two days).
# A row is kept when the calendar date of trans_date_trans_time (time of day ignored)
# is on or before FILTER_END_DATE.
# NOTE(review): confirm FILTER_END_DATE really covers the first two days of the dataset.
FILTER_END_DATE = "2020-01-02"  # inclusive upper bound, yyyy-MM-dd

filtered_df = flattened_df.filter(
    F.to_date(col("trans_date_trans_time")) <= F.to_date(F.lit(FILTER_END_DATE))
)

# Downstream cells read flattened_df, so point it at the filtered result.
flattened_df = filtered_df

## Write the flattened data to an S3 bucket as Parquet files

In [13]:
def insert_data_to_s3(
    input_df,
    output_path: str = "s3a://your S3 bucket path",
    partition_cols=("your preferred partition style",),
    mode: str = "append",
    compression: str = "gzip",
):
    """Write a Spark DataFrame to S3 as compressed Parquet files.

    Args:
        input_df: The Spark DataFrame to write.
        output_path: Destination URI, e.g. ``"s3a://my-bucket/transactions"``.
            The default is a template placeholder and must be replaced.
        partition_cols: A column name, or a sequence of column names, to
            partition the output by. Pass ``None`` or an empty sequence to write
            without partitioning. The default is a template placeholder.
        mode: Spark save mode; ``"append"`` adds new files alongside any
            existing data at ``output_path``.
        compression: Parquet compression codec.
    """
    writer = input_df.write.mode(mode).option("compression", compression)

    # Accept a single column name as well as a sequence; unpacking a bare string
    # would otherwise split it into one "column" per character.
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)

    writer.parquet(output_path)

    print(f"Data successfully written to {output_path}")

In [None]:
# Persist the flattened (and date-filtered) transactions to S3.
insert_data_to_s3(input_df=flattened_df)