In [0]:
############
# Answer to question 
# for the TPC_H_Dataset part
#-------------
#Author: AdrianJ
#V1.0 Created(2023-09-01)
############

In [0]:
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F

#### **Question #1**: Joins in Core Spark
Pick any two datasets and join them using Spark's API. Feel free to pick any two datasets. For example: `PART` and `PARTSUPP`. The goal of this exercise is not to derive anything meaningful out of this data but to demonstrate how to use Spark to join two datasets. For this problem, you're **NOT allowed to use SparkSQL**. You can only use RDD API. You can use either Python or Scala to solve this problem.

In [0]:
#For this join I will be enriching PARTSUPP with PART table.
#function to separete columns 

def parser(line):
    """
    Parse the rdd as expression with a separetor
    have the ability to select the number
    @line: row from text file

    """
    r = line.split('|')
    
    return (r[0], tuple(r[n] for n in range(len(r)) if n != 0))

#read part(P_)
p_file_path = '/databricks-datasets/tpch/data-001/part/'
p_rdd = (
    sc.textFile(p_file_path, 2)
)

p_rdd = p_rdd.map(lambda line : parser(line))
p_rdd = p_rdd.cache()

#read partsupp(ps_)
ps_file_path = '/databricks-datasets/tpch/data-001/partsupp/'
ps_rdd = (
    sc.textFile(ps_file_path, 2)
)

ps_rdd = ps_rdd.map(lambda line : parser(line))

#join p and ps table
join = ps_rdd.leftOuterJoin(p_rdd)

print("INFO:The PARTSUPP object enriched with PART object")
print(f" Example of one row {join.take(1)}")
ps_rdd.unpersist

INFO:The PARTSUPP object enriched with PART object
 Example of one row [('24', (('25', '5180', '905.41', 'heodolites above the ironic requests poach fluffily carefully unusual pinto beans. even packages acc', ''), ('seashell coral metallic midnight floral', 'Manufacturer#5', 'Brand#52', 'MEDIUM PLATED STEEL', '20', 'MED CASE', '924.02', ' final the', '')))]


<bound method RDD.unpersist of PythonRDD[791] at RDD at PythonRDD.scala:58>

#### **Question #2**: Joins With Spark SQL
Pick any two datasets and join them using SparkSQL API. Feel free to pick any two datasets. For example, PART and PARTSUPP. The goal of this exercise is not to derive anything meaningful out of this data but to demonstrate how to use Spark to join two datasets. For this problem, you're **NOT allowed to use the RDD API**. You can only use SparkSQL API. You can use either Python or Scala to solve this problem. 

In [0]:

#For this join I will be enriching PARTSUPP with the part name from the PART table.
#read part(P_)
p_schema = StructType([
    StructField("PARTKEY",IntegerType(),False),
    StructField("NAME",StringType(),True),
    StructField("MFGR",StringType(),True),
    StructField("BRAND",StringType(),True),
    StructField("TYPE",StringType(),True),
    StructField("SIZE",StringType(),True),
    StructField("CONTAINER",StringType(),True),
    StructField("RETAILPRICE",StringType(),True),
    StructField("COMMENT",StringType(),True)
])

p_file_path = '/databricks-datasets/tpch/data-001/part/'
p_df = (
    spark.read.format("csv")
    .options(sep="|", header="false", inferSchema="false")
    .schema(p_schema)
    .load(p_file_path)
    .select(
        F.col("PARTKEY"),
        F.col("NAME")
    )
)

#read partsupp(ps_)
ps_schema = StructType([
    StructField("PARTKEY",IntegerType(),False),
    StructField("SUPPKEY",StringType(),False),
    StructField("AVAILQTY",StringType(),True),
    StructField("SUPPLYCOST",StringType(),True),
    StructField("COMMENT",StringType(),True)
])

ps_file_path = '/databricks-datasets/tpch/data-001/partsupp/'
ps_df = (
    spark.read.format("csv")
    .options(sep="|", header="false", inferSchema="false")
    .schema(ps_schema)
    .load(ps_file_path)
)

#enrich PARTSUPP with part name from PART
ps_df = ps_df.join(
    F.broadcast(p_df),
    on=["PARTKEY"], 
    how="left"
)

print("INFO:The PARTSUPP object enriched with part name from PART on PARTKEY")
print(f" List of columns from PARTSUPP {ps_df.columns}")
print(f" Example of one row")
ps_df.show(1)

INFO:The PARTSUPP object enriched with part name from PART on PARTKEY
 List of columns from PARTSUPP ['PARTKEY', 'SUPPKEY', 'AVAILQTY', 'SUPPLYCOST', 'COMMENT', 'NAME']
 Example of one row
+-------+-------+--------+----------+--------------------+--------------------+
|PARTKEY|SUPPKEY|AVAILQTY|SUPPLYCOST|             COMMENT|                NAME|
+-------+-------+--------+----------+--------------------+--------------------+
|      1|      2|    3325|    771.64|, even theodolite...|goldenrod lavende...|
+-------+-------+--------+----------+--------------------+--------------------+
only showing top 1 row



#### **Question #3**: Alternate Data Formats
The given dataset above is in raw text storage format. What other data storage format can you suggest optimizing the performance of our Spark workload if we were to frequently scan and read this dataset. Please come up with a code example and explain why you decide to go with this approach. Please note that there's no completely correct answer here. We're interested to hear your thoughts and see the implementation details.

In [0]:
# The format I would choose to optimize the would be any parquet format file. 
# In my experience Delta lake format is a very good format due to the multiple compatibility that 
# it has with a platform like databricks. For example: ACID Transactions, Schema Enforcement and Evolution and 'time travel'
# NB! Databricks create delta tables by default. However, for sake of the exercise I'm declaring the format

#Example code below:
def create_delta_table(df: SparkDataFrame, schema_name: str, mode: str, table_name: str):
    """
    This function takes a SparkDataFrame as an input and creates a managed delta table
    Then checks if table was succefully created otherwise raise error
    """
    spark.sql(f"""CREATE SCHEMA IF NOT EXISTS de_coding_challenge""")

    (df.write.mode(mode).format("delta").saveAsTable(f"{schema_name}.{table_name}"))
    #check if table was created successfully
    table_exists = spark.catalog.tableExists(f"{schema_name}.{table_name}")
    if table_exists:
        print(f"INFO: Successfully created delta table -> {schema_name}.{table_name}")
    else:
        e = f"ERROR: Failed to create table -> {schema_name}.{table_name}"
        raise Exception(e)

#schema variables
schema_name = "de_coding_challenge"
mode = "overwrite"

#create PART table from question 2
create_delta_table(p_df, schema_name, mode, "PART")

#Create PARTSUPP delta table from question 2
create_delta_table(ps_df, schema_name, mode, "PARTSUPP")

INFO: Successfully created delta table -> de_coding_challenge.PART
INFO: Successfully created delta table -> de_coding_challenge.PARTSUPP
