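This notebook contains the data processing components. It reads raw JSON files from MinIO with Spark, merges them into a single JSON file, removes duplicate records and writes the results back to MinIO as JSON and CSV.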

In [782]:
# Install the libraries used by this notebook: boto3 (MinIO uploads) and pyspark (processing).
# NOTE(review): versions are unpinned, so a fresh environment may resolve a pyspark whose
# bundled Hadoop version differs from the hadoop-aws jar requested in the SparkSession cell;
# consider pinning exact versions.
# NOTE(review): delta-spark and python-dotenv are not imported in the cells visible here;
# confirm they are actually needed.
%pip install boto3 pyspark delta-spark python-dotenv

22625.11s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [783]:
import os
from pyspark.sql import SparkSession

In [784]:
# Define S3 (MinIO) storage connection settings.
# Credentials are read only from the environment. Secrets must never be committed
# as fallback defaults, because everything in this notebook ends up in version control.
# NOTE(review): keys were previously hardcoded in this cell as defaults; treat them as
# leaked and rotate them on the MinIO server.
obj_storage_access_key = os.getenv('OBJ_STORAGE_ACCESS_KEY')
obj_storage_secret_key = os.getenv('OBJ_STORAGE_SECRET_KEY')
# The endpoint is not secret, so a local MinIO default is fine.
obj_storage_endpoint = os.getenv('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')

# Fail fast with a clear message instead of letting Spark/boto3 fail later with
# an opaque authentication error.
_missing_storage_vars = [
    var_name
    for var_name, var_value in (
        ('OBJ_STORAGE_ACCESS_KEY', obj_storage_access_key),
        ('OBJ_STORAGE_SECRET_KEY', obj_storage_secret_key),
    )
    if not var_value
]
if _missing_storage_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_storage_vars)
    )

In [785]:
# Local working folder where result files are written before being uploaded.
data_folder_path = "data/data-result"

# Raw input documents in the MinIO "data" bucket, addressed with the s3a:// scheme.
path_1, path_2, path_3 = (
    "s3a://data/data-raw/data.json",
    "s3a://data/data-raw/data2.json",
    "s3a://data/data-raw/data3.json",
)

In [786]:
# 6. Use Spark to connect to MinIO and read raw files
#
# Spark reaches MinIO through Hadoop's S3A filesystem:
#   * spark.jars.packages pulls the hadoop-aws connector and the AWS SDK bundle;
#   * the spark.hadoop.fs.s3a.* options point the connector at the MinIO endpoint
#     and authenticate with the static access/secret key pair;
#   * path-style access is enabled so buckets are addressed as <endpoint>/<bucket>.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .master("local")
    .config(
        "spark.jars.packages",
        "com.amazonaws:aws-java-sdk-bundle:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0",
    )
    .config("spark.hadoop.fs.s3a.endpoint", obj_storage_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", obj_storage_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", obj_storage_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .getOrCreate()
)

In [787]:
# Read the three raw multi-line JSON documents from MinIO.
# Any failure (missing object, bad credentials, malformed JSON) propagates with its
# original traceback. The former `try/except Exception as e: raise e` only re-raised.
file_1, file_2, file_3 = (
    spark.read.json(raw_path, multiLine=True)
    for raw_path in (path_1, path_2, path_3)
)

In [788]:
import os
import boto3

# Function to upload file to minio
def upload_file_to_minio(file_path, minio_bucket, minio_object_name, verify=False):
    """Upload a local file to a MinIO bucket through boto3's S3 resource API.

    Connection settings come from the notebook-level ``obj_storage_endpoint``,
    ``obj_storage_access_key`` and ``obj_storage_secret_key`` variables.

    Args:
        file_path: Path of the local file to upload.
        minio_bucket: Name of the destination bucket.
        minio_object_name: Object key to write inside the bucket.
        verify: Forwarded to ``boto3.resource(verify=...)``. The default ``False``
            keeps the previous behaviour (TLS certificates are not verified);
            pass ``True`` or a CA-bundle path when the endpoint uses HTTPS.

    Raises:
        Any exception raised by boto3 propagates unchanged.
    """
    s3 = boto3.resource(
        's3',
        endpoint_url=obj_storage_endpoint,
        aws_access_key_id=obj_storage_access_key,
        aws_secret_access_key=obj_storage_secret_key,
        # Sign requests with Signature Version 4.
        config=boto3.session.Config(signature_version='s3v4'),
        verify=verify,
    )
    # No try/except: the previous handler only re-raised, so errors surface as-is.
    s3.Bucket(minio_bucket).upload_file(file_path, minio_object_name)

# PySpark writes its output as a directory of `part-xxxxx` files plus auxiliary
# files such as `_SUCCESS` and `.crc`. After a single-partition write we rename
# the data file to a friendly name and delete the auxiliary files.
def rename_file(path, extension, name):
    """Rename the single Spark output file in ``path`` to ``name`` and clean up.

    Args:
        path: Local directory produced by a Spark ``DataFrameWriter``.
        extension: Suffix identifying the data file (e.g. ``".csv"``).
        name: New file name, relative to ``path``, for the data file.

    Files ending with ``extension`` are treated as data files. Files starting
    with ``.part`` or ``_SUCCESS``, or ending with ``.crc``, are deleted.
    A pre-existing file called ``name`` is overwritten by the new data file.
    Does nothing if ``path`` does not exist.

    Raises:
        ValueError: if more than one data file (other than ``name`` itself)
            matches ``extension``. Renaming them all to the same name would
            silently keep only one, so coalesce the DataFrame to a single
            partition first. Nothing is modified in that case.
    """
    if not os.path.exists(path):
        return

    entries = sorted(os.listdir(path))
    data_files = [entry for entry in entries if entry.endswith(extension) and entry != name]
    if len(data_files) > 1:
        raise ValueError(
            f"Expected at most one '{extension}' file in {path!r}, "
            f"found {len(data_files)}: {data_files}"
        )

    for entry in entries:
        if entry.endswith(extension):
            continue  # data files are handled below and never deleted
        if entry.startswith((".part", "_SUCCESS")) or entry.endswith(".crc"):
            os.remove(os.path.join(path, entry))

    if data_files:
        # os.replace overwrites an existing target on every platform;
        # os.rename raises on Windows when the target already exists.
        os.replace(os.path.join(path, data_files[0]), os.path.join(path, name))

In [789]:
# 7. Merge JSON files into a single JSON file and upload to MinIO
import json

data_result_json_filename = "result.json"

# Merge the 3 raw DataFrames. unionByName matches columns by name (plain `union`
# is positional), and allowMissingColumns fills a column absent from one file
# (e.g. the optional `filling` field) with nulls instead of failing.
df_result = (
    file_1
    .unionByName(file_2, allowMissingColumns=True)
    .unionByName(file_3, allowMissingColumns=True)
)
# Bring the rows to the driver as Python dicts so they can be re-serialised with indentation.
result_json = [json.loads(row) for row in df_result.toJSON().collect()]

# makedirs also creates a missing parent folder ("data/") and tolerates an existing one.
os.makedirs(data_folder_path, exist_ok=True)

local_result_json_path = f'{data_folder_path}/{data_result_json_filename}'
# PySpark doesn't support indentation, so the standard json module writes the file.
# ensure_ascii=False emits non-ASCII characters verbatim, so the encoding must be
# explicit rather than the platform default.
with open(local_result_json_path, "w", encoding="utf-8") as f:
    json.dump(result_json, f, ensure_ascii=False, indent=4)

upload_file_to_minio(local_result_json_path, "data", f"data-result/{data_result_json_filename}")

In [790]:
# 8. Read result.json file, display data, remove duplicates

result_json_path = "s3a://data/data-result/result.json"
df_result_raw = spark.read.json(result_json_path, multiLine=True)

# show() prints the table itself and returns None. Wrapping it in print() only
# added a stray "None" line. Passing the row count displays every record.
df_result_raw.show(df_result_raw.count())

# Drop duplicated records by ID, keeping the raw frame under its own name.
# NOTE(review): dropDuplicates keeps an arbitrary row per id; confirm duplicates
# are identical, or add an explicit ordering if they can differ.
df_result_json = df_result_raw.dropDuplicates(["id"])
print(df_result_json.count())

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5.

In [791]:
from pyspark.sql.functions import col, concat_ws, when, array
import shutil
# 9. Save to CSV file, upload to MinIO and read it.

data_result_csv_filename = "result.csv"

# Cast every column to string so the CSV writer can handle the nested columns.
# NOTE(review): casting struct/array columns produces Spark's display form
# (e.g. `{[{1001, Regular}...`), which cannot be parsed back into nested data.
df_result_as_string = df_result_json.select([col(c).cast("string") for c in df_result_json.columns])

# mode("overwrite") replaces the entire target directory. Writing straight into
# data_folder_path would delete the local result.json produced in step 7, so
# write into a dedicated staging folder and move the CSV out afterwards.
csv_staging_path = f'{data_folder_path}/_csv_staging'
df_result_as_string.coalesce(1).write.mode("overwrite").csv(csv_staging_path, header=True)
rename_file(csv_staging_path, ".csv", data_result_csv_filename)

local_result_csv_path = f'{data_folder_path}/{data_result_csv_filename}'
os.replace(f'{csv_staging_path}/{data_result_csv_filename}', local_result_csv_path)
shutil.rmtree(csv_staging_path)

upload_file_to_minio(local_result_csv_path, "data", f"data-result/{data_result_csv_filename}")

# Read the freshly uploaded CSV back from MinIO to verify the round trip.
result_csv_path = "s3a://data/data-result/result.csv"
file_result_csv = spark.read.option("delimiter", ",").option("header", "true").csv(result_csv_path)

# show() prints the table itself and returns None, so it is not wrapped in print().
file_result_csv.show()

spark.stop()

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5.