This file will be used to contain data processing components

In [1]:
%pip install boto3 pyspark delta-spark python-dotenv

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.2.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:

import os
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [3]:
# Define S3 storage
obj_storage_access_key = os.getenv('OBJ_STORAGE_ACCESS_KEY')
obj_storage_secret_key = os.getenv('OBJ_STORAGE_SECRET_KEY')
obj_storage_endpoint = os.getenv('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')

In [4]:
path_1 = "s3a://data/data-raw/data.json"
path_2 = "s3a://data/data-raw/data2.json"
path_3 = "s3a://data/data-raw/data3.json"

In [5]:
conf = SparkConf()
conf.set("spark.hadoop.fs.s3a.access.key", obj_storage_access_key)
conf.set("spark.hadoop.fs.s3a.secret.key", obj_storage_secret_key)
conf.set("spark.hadoop.fs.s3a.endpoint", obj_storage_endpoint)
conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
conf.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Below code is to avoid having to create an AWS account
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
conf.set("fs.s3a.requester.pays.enabled", "true")

<pyspark.conf.SparkConf at 0x1af68c8c9b0>

In [6]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config(conf=conf) \
    .getOrCreate()

In [7]:
from functools import reduce
path_list = [path_1, path_2, path_3]
dataframes = map(lambda p: spark.read.json(p, multiLine=True), path_list)
union_df = reduce(lambda df1, df2: df1.union(df2), dataframes)


In [17]:
union_df.show()

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5.

In [48]:
import pandas as pd
pandas_df = union_df.toPandas()
pandas_df.to_json(path_or_buf="result.json", orient='records')

In [45]:
# THIS PART IS A TEST TO FORMAT RESULT.JSON
# import json
# with open(file="result.json", mode="r+") as file:
#     data = json.load(file)
#     for rec in data:
        
#         rec["batters"] = {"batter": item for item in rec["batters"]}
#     file.seek(0)
#     json.dump(data, file, indent=4)

In [50]:
def upload_file_to_minio(file_path, minio_bucket, minio_object_name):
    s3c = boto3.resource('s3',
                        endpoint_url=obj_storage_endpoint,
                        aws_access_key_id=obj_storage_access_key,
                        aws_secret_access_key=obj_storage_secret_key,
                        config=boto3.session.Config(signature_version='s3v4'),
                        verify=False
                        )
    s3c.Bucket(minio_bucket).upload_file(file_path, minio_object_name)

In [52]:
upload_file_to_minio("result.json", "data", "data-result/result.json")

In [54]:
result_path = "s3a://data/data-result/result.json"
result_df = spark.read.json(result_path, multiLine=True)
result_df.show()

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|[[[1001, Regular]...|                NULL|0004|              Jelly|0.65|[[5001, None], [5...|donut|
|[[[1001, Regular]...|                NULL|0005|     Custard-Filled|0.75|[[5001, None], [5...|donut|
|[[[1001, Regular]...|[[6001, None], [6...|0006|     Cinnamon Twist|0.85|[[5001, None], [5...|donut|
|[[[1001, Regular]...|                NULL|0007|    Vanilla Frosted|0.75|[[5001, None], [5...|donut|
|[[[1001, Regular]...|                NULL|0008| Strawberry Frosted|0.85|[[5001, None], [5...|donut|
|[[[1001, Regular]...|                NULL|0009|     Chocolate Cake|0.75|[[5001, None], [5...|donut|
|[[[1001, Regular]...|                NULL|0010|     Blueberry Cake|0.55|[[5001, None], [5.