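This notebook contains the data-processing steps: read the raw JSON files from MinIO, merge and de-duplicate them with Spark, then upload the result back to MinIO in JSON and CSV format.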

In [1]:
import os
from pyspark.sql import SparkSession

In [2]:
# Mirror the MinIO credentials into the AWS_* variables read by AWS-SDK based clients, so you don't
# have to export them by hand. A variable is copied only when its MINIO_* source is set: assigning
# None to os.environ raises TypeError, and skipping keeps any AWS_* value you exported yourself.
for aws_var, minio_var in (
    ('AWS_ACCESS_KEY_ID', 'MINIO_ACCESS_KEY'),
    ('AWS_SECRET_KEY', 'MINIO_SECRET_KEY'),
    ('AWS_SECRET_ACCESS_KEY', 'MINIO_SECRET_KEY'),
):
    minio_value = os.getenv(minio_var)
    if minio_value is not None:
        os.environ[aws_var] = minio_value

In [3]:
# MinIO (S3-compatible) connection settings, taken from the environment.
# Only the endpoint has a fallback (http://minio:9000); the keys stay None when unset.
obj_storage_endpoint = os.getenv('MINIO_STORAGE_ENDPOINT', 'http://minio:9000')
obj_storage_access_key, obj_storage_secret_key = (
    os.getenv(env_name) for env_name in ('MINIO_ACCESS_KEY', 'MINIO_SECRET_KEY')
)


In [4]:
# s3a:// locations inside the "data" bucket: three raw JSON inputs and the merged output.
path_1, path_2, path_3 = (
    "s3a://data/data-raw/data.json",
    "s3a://data/data-raw/data2.json",
    "s3a://data/data-raw/data3.json",
)
# NOTE(review): result_path is not referenced by the upload cells below, which hard-code the
# destination key; keep them in sync or derive one from the other.
result_path = "s3a://data/data-result/result.json"

In [None]:
from pyspark import SparkContext
# Build (or reuse) the Spark session. The S3A settings needed to reach MinIO are applied to its
# Hadoop configuration by load_config() further down, instead of through builder .config() calls.
# Not handled here: Java must be installed with JAVA_HOME and PATH set, and (presumably provided
# by the environment) the hadoop-aws connector must be on Spark's classpath for s3a:// paths.
APP_NAME = "Python Spark SQL basic example"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

def load_config(spark_context: "SparkContext", endpoint=None, access_key=None, secret_key=None):
    """Point the Hadoop S3A filesystem of ``spark_context`` at the MinIO object store.

    The Hadoop configuration of ``spark_context`` is mutated in place.

    Args:
        spark_context: the active SparkContext.
        endpoint: MinIO URL; defaults to the notebook-level ``obj_storage_endpoint``.
        access_key: defaults to the ``MINIO_ACCESS_KEY`` environment variable.
        secret_key: defaults to the ``MINIO_SECRET_KEY`` environment variable.

    Credentials that resolve to None/empty are not written. Hadoop's ``Configuration.set`` rejects
    null values, and leaving the keys unset lets S3A fall back to its default credential
    providers (which include the AWS_* environment variables).
    """
    if endpoint is None:
        endpoint = obj_storage_endpoint
    if access_key is None:
        access_key = os.getenv("MINIO_ACCESS_KEY")
    if secret_key is None:
        secret_key = os.getenv("MINIO_SECRET_KEY")

    settings = {}
    if access_key:
        settings["fs.s3a.access.key"] = access_key
    if secret_key:
        settings["fs.s3a.secret.key"] = secret_key
    settings.update({
        "fs.s3a.endpoint": endpoint,
        # Keep TLS consistent with the endpoint: an explicit http:// URL (such as the default
        # MinIO endpoint) must not claim SSL; https:// or scheme-less endpoints keep "true".
        "fs.s3a.connection.ssl.enabled": "false" if endpoint.lower().startswith("http://") else "true",
        # MinIO addresses buckets by path (host/bucket/key) rather than by virtual-host subdomain.
        "fs.s3a.path.style.access": "true",
        # Fail fast instead of retrying failed requests.
        "fs.s3a.attempts.maximum": "1",
        # Plain numbers here are interpreted as milliseconds.
        "fs.s3a.connection.establish.timeout": "5000",
        "fs.s3a.connection.timeout": "10000",
    })

    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)
load_config(spark.sparkContext)

# Read all raw JSON files from MinIO in one pass. "multiline" lets a single JSON record (or a
# top-level array) span several lines, which the raw files evidently need.
# Reading the paths together infers ONE merged schema. Fields are therefore aligned by name, and
# fields missing from a file become null. Chaining DataFrame.union would match columns by
# position and could silently misalign files whose field sets differ.
raw_paths = [path_1, path_2, path_3]
data = spark.read.option("multiline", "true").json(raw_paths)
data.show()

In [None]:
# Drop rows that are exact duplicates across every column (e.g. the same record present in more
# than one raw file), then preview. distinct() equals dropDuplicates() with no column subset.
data = data.distinct()
data.show()

### Upload the result to MinIO in JSON format

In [None]:
# Spark writes a directory of part-files (one per partition) so workers can write in parallel.
# To publish a single result file, collect the rows to the driver, write one local JSON file,
# and upload it to MinIO. NOTE: collect() pulls the whole dataset into driver memory, so this
# approach only suits small results.
import json
# NOTE(review): "ultils" may be a typo of "utils"; confirm it matches the helper module's filename.
from ultils import upload_file_to_minio

LOCAL_JSON_RESULT = "result.json"

# toJSON() yields one JSON string per row; parse them back so the file is one JSON array.
json_data = [json.loads(row_json) for row_json in data.toJSON().collect()]

# Mode "w", not "a": re-running this cell must overwrite the file. Appending would leave two
# concatenated arrays in it, which is not valid JSON.
with open(LOCAL_JSON_RESULT, "w", encoding="utf-8") as outfile:
    json.dump(json_data, outfile)

upload_file_to_minio(LOCAL_JSON_RESULT, 'data', 'data-result/result.json', obj_storage_endpoint, obj_storage_access_key, obj_storage_secret_key)

### Upload the result to MinIO in CSV format

In [None]:
import pandas as pd

# The result contains nested values (dicts, lists), which CSV cannot represent natively, so the
# collected records are loaded into a pandas table and written out as text.
# NOTE(review): to_csv renders nested cells with their Python str() form (e.g. {'k': 1}), not as
# JSON, and also writes the RangeIndex as a leading unnamed column; confirm downstream readers
# expect both.
import io

LOCAL_CSV_RESULT = "result.csv"

# Wrap the JSON text in StringIO. Passing a literal JSON string to read_json is deprecated since
# pandas 2.1 and scheduled for removal.
pd_data = pd.read_json(io.StringIO(json.dumps(json_data)))
# to_csv(path) writes the file and returns None, so its return value is not kept.
pd_data.to_csv(LOCAL_CSV_RESULT)
# Upload the CSV next to the JSON result.
upload_file_to_minio(LOCAL_CSV_RESULT, 'data', 'data-result/result.csv', obj_storage_endpoint, obj_storage_access_key, obj_storage_secret_key)
