In [None]:
# Start from a clean slate: remove any Derby metastore and local Spark warehouse
# from a previous run (the Spark session cell uses derby.system.home=/tmp/derby/xyz
# and spark.sql.warehouse.dir=/tmp/spark-warehouse).
# NOTE(review): /private/tmp is presumably the macOS resolved path of /tmp — confirm.
!rm -rf /private/tmp/derby/metastore_db /tmp/derby /tmp/spark-warehouse/
# Install and enable the %%sparksql cell magic used by the SQL cells below.
# NOTE(review): consider pinning a version (sparksql-magic==X.Y.Z) so Restart & Run All is reproducible.
%pip install sparksql-magic
%load_ext sparksql_magic
# Show which AWS profile is set; presumably the one boto3 and the S3A
# ProfileCredentialsProvider will use — confirm it is the intended account.
!echo $AWS_PROFILE

In [None]:
# Initialize a local Spark session with S3A filesystem access and Hive support.
import os
from pyspark.sql import SparkSession

# Version pins. Only hadoop_version is consumed below (for the hadoop-aws
# artifact); presumably it was chosen to match the Hadoop client bundled with
# this Spark build — confirm when upgrading. The others are kept for reference.
spark_version = "3.2.3"
hadoop_version = "3.3.1"
scala_version = "2.12"

# Extra JVM packages resolved from Maven when the session starts.
packages = ",".join([
    f"org.apache.hadoop:hadoop-aws:{hadoop_version}",
])

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("AnalysisTool")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
    # Route both s3:// and s3a:// URIs through the S3A connector.
    .config("spark.hadoop.fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A")
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Credentials come from a local AWS profile rather than hard-coded keys.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")  # 100 MiB
    # derby.system.home keeps the local metastore under /tmp/derby (wiped by the setup cell).
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true -Dderby.system.home=/tmp/derby/xyz")
    .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.speculation", "false")
    .config("spark.jars.packages", packages)
    # The builder translates spark.hive.* entries into hive.* settings.
    .config("spark.hive.serialization.extend.nesting.levels", "true")
    # Spark conf values are strings; pass "true" explicitly (a Python bool
    # would be stringified to "True").
    .config("spark.sql.caseSensitive", "true")
    .enableHiveSupport()
    .getOrCreate()
)

# Hadoop-level settings applied directly to the live Hadoop configuration.
hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_config.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
hadoop_config.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
hadoop_config.set("parquet.enable.summary-metadata", "false")
hadoop_config.set("dfs.client.read.shortcircuit.skip.checksum", "true")
# In the raw Hadoop configuration the key must NOT carry the "spark." prefix:
# that prefix is only stripped when the setting goes through the builder.
hadoop_config.set("hive.serialization.extend.nesting.levels", "true")

sc = spark.sparkContext
spark

## Read from DynamoDB

In [None]:
import boto3


def _flatten_item(item):
    """Convert one low-level DynamoDB item into a plain ``{attribute: str}`` dict.

    Only string (``S``) and number (``N``) attributes are kept, each as its raw
    string payload; attributes of any other DynamoDB type are dropped.
    """
    flat = {}
    for key, value in item.items():
        if not isinstance(value, dict):
            continue
        if "S" in value:
            flat[key] = value["S"]
        elif "N" in value:
            # Numbers stay as strings; the SQL cells CAST them explicitly.
            flat[key] = value["N"]
    return flat


# Not the ideal method, but the easiest: every item is pulled onto the driver
# before building the DataFrame, so very large tables might cause an OOM.
def deserialize(table_name, region_name="eu-west-1"):
    """Scan a whole DynamoDB table and return its items as a Spark DataFrame.

    The table is read page by page through the ``scan`` paginator. A single
    ``Scan`` call returns at most 1 MB of data, so reading only the first
    response would silently truncate larger tables.

    Args:
        table_name: Name of the DynamoDB table to scan.
        region_name: AWS region hosting the table (defaults to "eu-west-1").

    Returns:
        A Spark DataFrame with one row per item. Only S and N attributes are
        included, both as strings.

    Raises:
        ValueError: if the table contains no items (Spark cannot infer a
        schema from an empty dataset).
    """
    client = boto3.client(service_name="dynamodb", region_name=region_name)
    paginator = client.get_paginator("scan")

    list_data = []
    for page in paginator.paginate(TableName=table_name):
        list_data.extend(_flatten_item(item) for item in page["Items"])

    if not list_data:
        raise ValueError(
            f"DynamoDB table {table_name!r} returned no items; "
            "cannot infer a Spark schema from an empty dataset"
        )
    return spark.createDataFrame(list_data)


In [None]:
# Register every DynamoDB table as a Spark temp view with the same name, so the
# %%sparksql cells below can query it directly (one full scan per table).
tables = ["customer", "product", "order"]
for table in tables:
    table_df = deserialize(table)
    table_df.createOrReplaceTempView(table)

In [None]:
%%sparksql
-- Preview the customer view registered from DynamoDB.
SELECT *
FROM customer

In [None]:
%%sparksql
-- Preview the product view registered from DynamoDB.
SELECT *
FROM product

In [None]:
%%sparksql
-- Preview the order view registered from DynamoDB. ORDER is an SQL keyword
-- (reserved in Spark's ANSI mode), so the identifier is backtick-quoted.
SELECT *
FROM `order`

## Sample Report: top 10 products by number of orders

In [None]:
%%sparksql
-- Top 10 products by number of distinct orders, with total units and revenue.
-- `order` is backtick-quoted because ORDER is an SQL keyword (reserved in
-- Spark's ANSI mode); quoting keeps the query valid in either mode.
WITH dataset AS (
    SELECT
        o.id AS order_id,
        c.id AS customer_id,
        c.name AS customer_name,
        p.id AS product_id,
        p.name AS product_name,
        -- DynamoDB numbers are loaded as strings, so cast before aggregating.
        CAST(o.quantity AS BIGINT) AS quantity,
        CAST(o.total AS DECIMAL(10,2)) AS total
    FROM
        `order` o
    -- NOTE(review): customer columns are unused by the report below; if
    -- customer.id is not unique this join would inflate SUM(quantity)/SUM(total).
    LEFT JOIN
        customer c ON c.id = o.customer_id
    LEFT JOIN
        product p ON p.id = o.product_id
)

SELECT
    product_id,
    product_name,
    SUM(quantity) AS quantity,
    SUM(total) AS total,
    COUNT(DISTINCT order_id) AS num_orders
FROM
    dataset
GROUP BY
    product_id,
    product_name
ORDER BY
    num_orders DESC,
    -- Tie-breaker so the top-10 cut is deterministic across re-runs.
    product_id
LIMIT
    10
