# Join RDDs

## Browse Databricks datasets 

In [0]:
# Reference: https://docs.databricks.com/en/discover/databricks-datasets.html
# List the top-level entries under the /databricks-datasets folder.
display(dbutils.fs.ls('/databricks-datasets'))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,0


## TPC-H Data Analysis

In [0]:
# List the contents of the TPC-H data-001 folder.
display(dbutils.fs.ls("/databricks-datasets/tpch/data-001/"))

path,name,size,modificationTime
dbfs:/databricks-datasets/tpch/data-001/README.md,README.md,236,1419281876000
dbfs:/databricks-datasets/tpch/data-001/customer/,customer/,0,0
dbfs:/databricks-datasets/tpch/data-001/lineitem/,lineitem/,0,0
dbfs:/databricks-datasets/tpch/data-001/nation/,nation/,0,0
dbfs:/databricks-datasets/tpch/data-001/orders/,orders/,0,0
dbfs:/databricks-datasets/tpch/data-001/part/,part/,0,0
dbfs:/databricks-datasets/tpch/data-001/partsupp/,partsupp/,0,0
dbfs:/databricks-datasets/tpch/data-001/region/,region/,0,0
dbfs:/databricks-datasets/tpch/data-001/supplier/,supplier/,0,0


### Analyze `Parts` dataset

In [0]:
# Read the part dataset as an RDD of raw text lines (RDD[String]);
# each line is one '|'-delimited record.
part_input = sc.textFile("/databricks-datasets/tpch/data-001/part/")
# Peek at the first 10 raw lines to inspect the record layout.
part_input.take(10)

Out[3]: ['1|goldenrod lavender spring chocolate lace|Manufacturer#1|Brand#13|PROMO BURNISHED COPPER|7|JUMBO PKG|901.00|ly. slyly ironi|',
 '2|blush thistle blue yellow saddle|Manufacturer#1|Brand#13|LARGE BRUSHED BRASS|1|LG CASE|902.00|lar accounts amo|',
 '3|spring green yellow purple cornsilk|Manufacturer#4|Brand#42|STANDARD POLISHED BRASS|21|WRAP CASE|903.00|egular deposits hag|',
 '4|cornflower chocolate smoke green pink|Manufacturer#3|Brand#34|SMALL PLATED BRASS|14|MED DRUM|904.00|p furiously r|',
 '5|forest brown coral puff cream|Manufacturer#3|Brand#32|STANDARD POLISHED TIN|15|SM PKG|905.00| wake carefully |',
 '6|bisque cornflower lawn forest magenta|Manufacturer#2|Brand#24|PROMO PLATED STEEL|4|MED BAG|906.00|sual a|',
 '7|moccasin green thistle khaki floral|Manufacturer#1|Brand#11|SMALL PLATED COPPER|45|SM BAG|907.00|lyly. ex|',
 '8|misty lace thistle snow royal|Manufacturer#4|Brand#44|PROMO BURNISHED TIN|41|LG DRUM|908.00|eposi|',
 '9|thistle dim navajo dark gainsboro|Manufac

In [0]:
# Tokenise every raw line on the '|' delimiter -> RDD of string lists.
# Lines end with a trailing '|', so the last token of each list is ''.
part_input_splitted = part_input.map(lambda line: line.split("|"))
part_input_splitted.take(10)

Out[4]: [['1',
  'goldenrod lavender spring chocolate lace',
  'Manufacturer#1',
  'Brand#13',
  'PROMO BURNISHED COPPER',
  '7',
  'JUMBO PKG',
  '901.00',
  'ly. slyly ironi',
  ''],
 ['2',
  'blush thistle blue yellow saddle',
  'Manufacturer#1',
  'Brand#13',
  'LARGE BRUSHED BRASS',
  '1',
  'LG CASE',
  '902.00',
  'lar accounts amo',
  ''],
 ['3',
  'spring green yellow purple cornsilk',
  'Manufacturer#4',
  'Brand#42',
  'STANDARD POLISHED BRASS',
  '21',
  'WRAP CASE',
  '903.00',
  'egular deposits hag',
  ''],
 ['4',
  'cornflower chocolate smoke green pink',
  'Manufacturer#3',
  'Brand#34',
  'SMALL PLATED BRASS',
  '14',
  'MED DRUM',
  '904.00',
  'p furiously r',
  ''],
 ['5',
  'forest brown coral puff cream',
  'Manufacturer#3',
  'Brand#32',
  'STANDARD POLISHED TIN',
  '15',
  'SM PKG',
  '905.00',
  ' wake carefully ',
  ''],
 ['6',
  'bisque cornflower lawn forest magenta',
  'Manufacturer#2',
  'Brand#24',
  'PROMO PLATED STEEL',
  '4',
  'MED BAG',
  '906.00',


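### Apply simple filtering for malformed records
Assume any record that does not have exactly 9 columns is malformed. Each line ends with a trailing `|`, so splitting on `|` yields 10 fields for a well-formed `part` record (9 columns plus one empty string).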
https://github.com/oracle/heatwave-tpch/blob/main/TPCH/create_tables.sql
```sql
CREATE TABLE PART  ( P_PARTKEY     INTEGER NOT NULL,
                     P_NAME        VARCHAR(55) NOT NULL,
                     P_MFGR        CHAR(25) NOT NULL,
                     P_BRAND       CHAR(10) NOT NULL,
                     P_TYPE        VARCHAR(25) NOT NULL,
                     P_SIZE        INTEGER NOT NULL,
                     P_CONTAINER   CHAR(10) NOT NULL,
                     P_RETAILPRICE DECIMAL(15,2) NOT NULL,
                     P_COMMENT     VARCHAR(23) NOT NULL,
                     PRIMARY KEY (P_PARTKEY));
CREATE TABLE PARTSUPP ( PS_PARTKEY     INTEGER NOT NULL,
                        PS_SUPPKEY     INTEGER NOT NULL,
                        PS_AVAILQTY    INTEGER NOT NULL,
                        PS_SUPPLYCOST  DECIMAL(15,2)  NOT NULL,
                        PS_COMMENT     VARCHAR(199) NOT NULL,
                        PRIMARY KEY (PS_PARTKEY, PS_SUPPKEY));
```

In [0]:
# A well-formed part line splits into 10 tokens: the 9 PART columns plus one
# empty string produced by the trailing '|' delimiter.
PART_SIZE = 10

# Keep only the records with the expected token count. No typed parsing
# happens here; the result is still an RDD of string lists.
part_mapped = part_input_splitted.filter(lambda fields: len(fields) == PART_SIZE)
part_mapped.take(10)

Out[5]: [['1',
  'goldenrod lavender spring chocolate lace',
  'Manufacturer#1',
  'Brand#13',
  'PROMO BURNISHED COPPER',
  '7',
  'JUMBO PKG',
  '901.00',
  'ly. slyly ironi',
  ''],
 ['2',
  'blush thistle blue yellow saddle',
  'Manufacturer#1',
  'Brand#13',
  'LARGE BRUSHED BRASS',
  '1',
  'LG CASE',
  '902.00',
  'lar accounts amo',
  ''],
 ['3',
  'spring green yellow purple cornsilk',
  'Manufacturer#4',
  'Brand#42',
  'STANDARD POLISHED BRASS',
  '21',
  'WRAP CASE',
  '903.00',
  'egular deposits hag',
  ''],
 ['4',
  'cornflower chocolate smoke green pink',
  'Manufacturer#3',
  'Brand#34',
  'SMALL PLATED BRASS',
  '14',
  'MED DRUM',
  '904.00',
  'p furiously r',
  ''],
 ['5',
  'forest brown coral puff cream',
  'Manufacturer#3',
  'Brand#32',
  'STANDARD POLISHED TIN',
  '15',
  'SM PKG',
  '905.00',
  ' wake carefully ',
  ''],
 ['6',
  'bisque cornflower lawn forest magenta',
  'Manufacturer#2',
  'Brand#24',
  'PROMO PLATED STEEL',
  '4',
  'MED BAG',
  '906.00',


In [0]:
# Collect the records whose token count differs from PART_SIZE, i.e. the
# complement of the filter used to build part_mapped.
part_rejected = part_input_splitted.filter(
    lambda fields: len(fields) != PART_SIZE
)

# Open question: is there a better way to filter?
part_rejected.take(10)


Out[6]: []

In [0]:
# Report how many part records fell on each side of the token-count filter.
part_rejected_count = part_rejected.count()
print(f"Number of rejected records = {part_rejected_count}")
part_parsed_count = part_mapped.count()
print(f"Number of parsed records = {part_parsed_count}")

Number of rejected records = 0
Number of parsed records = 1000000


In [0]:
class Part:
    """Value holder for one record of the TPC-H PART table.

    The constructor stores every argument unchanged on the attribute of the
    same name; no validation or type conversion is performed here.
    """

    def __init__(self, p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment):
        # Key and descriptive columns.
        self.p_partkey, self.p_name = p_partkey, p_name
        self.p_mfgr, self.p_brand, self.p_type = p_mfgr, p_brand, p_type
        # Size, packaging, price and free-text columns.
        self.p_size, self.p_container = p_size, p_container
        self.p_retailprice, self.p_comment = p_retailprice, p_comment

    def __repr__(self):
        """Return ``Part(<v1>, <v2>, ...)`` with the values in column order."""
        values = (
            self.p_partkey,
            self.p_name,
            self.p_mfgr,
            self.p_brand,
            self.p_type,
            self.p_size,
            self.p_container,
            self.p_retailprice,
            self.p_comment,
        )
        # format(v, "") is the same conversion an f-string placeholder applies.
        return "Part(" + ", ".join(format(value, "") for value in values) + ")"


In [0]:
def _to_keyed_part(fields):
    """Turn one split part record into a ``(part key, Part)`` pair."""
    part_key = int(fields[0])
    part = Part(
        part_key,
        fields[1],
        fields[2],
        fields[3],
        fields[4],
        int(fields[5]),
        fields[6],
        float(fields[7]),
        fields[8],
    )
    return (part_key, part)


# Resulting type: RDD[(int, Part)], keyed by the part key for the later join.
part_transformed = part_mapped.map(_to_keyed_part)
part_transformed.take(10)

Out[9]: [(1,
  Part(1, goldenrod lavender spring chocolate lace, Manufacturer#1, Brand#13, PROMO BURNISHED COPPER, 7, JUMBO PKG, 901.0, ly. slyly ironi)),
 (2,
  Part(2, blush thistle blue yellow saddle, Manufacturer#1, Brand#13, LARGE BRUSHED BRASS, 1, LG CASE, 902.0, lar accounts amo)),
 (3,
  Part(3, spring green yellow purple cornsilk, Manufacturer#4, Brand#42, STANDARD POLISHED BRASS, 21, WRAP CASE, 903.0, egular deposits hag)),
 (4,
  Part(4, cornflower chocolate smoke green pink, Manufacturer#3, Brand#34, SMALL PLATED BRASS, 14, MED DRUM, 904.0, p furiously r)),
 (5,
  Part(5, forest brown coral puff cream, Manufacturer#3, Brand#32, STANDARD POLISHED TIN, 15, SM PKG, 905.0,  wake carefully )),
 (6,
  Part(6, bisque cornflower lawn forest magenta, Manufacturer#2, Brand#24, PROMO PLATED STEEL, 4, MED BAG, 906.0, sual a)),
 (7,
  Part(7, moccasin green thistle khaki floral, Manufacturer#1, Brand#11, SMALL PLATED COPPER, 45, SM BAG, 907.0, lyly. ex)),
 (8,
  Part(8, misty lace thist

### Read and Parse the `partsupp` Dataset

In [0]:
class PartSupp:
    """Value holder for one record of the TPC-H PARTSUPP table.

    The constructor stores every argument unchanged on the attribute of the
    same name; no validation or type conversion is performed here.
    """

    def __init__(self, ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment):
        # Key columns.
        self.ps_partkey, self.ps_suppkey = ps_partkey, ps_suppkey
        # Quantity, cost and free-text columns.
        self.ps_availqty, self.ps_supplycost = ps_availqty, ps_supplycost
        self.ps_comment = ps_comment

    def __repr__(self):
        """Return ``PartSupp(<v1>, <v2>, ...)`` with the values in column order."""
        values = (
            self.ps_partkey,
            self.ps_suppkey,
            self.ps_availqty,
            self.ps_supplycost,
            self.ps_comment,
        )
        # format(v, "") is the same conversion an f-string placeholder applies.
        return "PartSupp(" + ", ".join(format(value, "") for value in values) + ")"


In [0]:
# Read the partsupp dataset as an RDD of raw text lines (RDD[String]);
# each line is one '|'-delimited record.
partsupp_input = sc.textFile("/databricks-datasets/tpch/data-001/partsupp/")

# Peek at the first 10 raw lines to inspect the record layout.
partsupp_input.take(10)

Out[11]: ['1|2|3325|771.64|, even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful|',
 '1|12502|8076|993.49|ven ideas. quickly even packages print. pending multipliers must have to are fluff|',
 '1|25002|3956|337.09|after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu|',
 '1|37502|4069|357.84|al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg|',
 '2|3|8895|378.49|nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts|',
 '2|12503|4969|915.27|ptotes. quickly pending dependencies integrate furiously. fluffily ironic ideas impress blithely above the express accounts. furiously ev

In [0]:

# Tokenise every raw line on the '|' delimiter -> RDD of string lists.
# Lines end with a trailing '|', so the last token of each list is ''.
partsupp_input_splitted = partsupp_input.map(lambda line: line.split("|"))
partsupp_input_splitted.take(10)


Out[12]: [['1',
  '2',
  '3325',
  '771.64',
  ', even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful',
  ''],
 ['1',
  '12502',
  '8076',
  '993.49',
  'ven ideas. quickly even packages print. pending multipliers must have to are fluff',
  ''],
 ['1',
  '25002',
  '3956',
  '337.09',
  'after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu',
  ''],
 ['1',
  '37502',
  '4069',
  '357.84',
  'al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg',
  ''],
 ['2',
  '3',
  '8895',
  '378.49',
  'nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts',
  ''],
 ['2',
  '12503',
  '496

In [0]:
# A well-formed partsupp line splits into 6 tokens: the 5 PARTSUPP columns
# plus one empty string produced by the trailing '|' delimiter.
PARTSUPP_SIZE = 6


def _to_keyed_partsupp(fields):
    """Turn one split partsupp record into a ``(part key, PartSupp)`` pair."""
    part_key = int(fields[0])
    supply = PartSupp(
        part_key,
        int(fields[1]),
        int(fields[2]),
        float(fields[3]),
        fields[4],
    )
    return (part_key, supply)


# Drop records with an unexpected token count, then parse the rest.
# Resulting type: RDD[(int, PartSupp)], keyed by the part key.
partsupp_mapped = (
    partsupp_input_splitted
    .filter(lambda fields: len(fields) == PARTSUPP_SIZE)
    .map(_to_keyed_partsupp)
)
partsupp_mapped.take(10)

Out[13]: [(1,
  PartSupp(1, 2, 3325, 771.64, , even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful)),
 (1,
  PartSupp(1, 12502, 8076, 993.49, ven ideas. quickly even packages print. pending multipliers must have to are fluff)),
 (1,
  PartSupp(1, 25002, 3956, 337.09, after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu)),
 (1,
  PartSupp(1, 37502, 4069, 357.84, al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg)),
 (2,
  PartSupp(2, 3, 8895, 378.49, nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts)),
 (2,
  PartSupp(2, 12503, 4969, 915.27, ptotes. quickly pending depen

In [0]:
    

# Collect the records whose token count differs from PARTSUPP_SIZE, i.e. the
# ones excluded when building partsupp_mapped.
partsupp_rejected = partsupp_input_splitted.filter(
    lambda fields: len(fields) != PARTSUPP_SIZE
)

# Report how many partsupp records fell on each side of the filter.
partsupp_rejected_count = partsupp_rejected.count()
print(f"Number of rejected records = {partsupp_rejected_count}")
partsupp_parsed_count = partsupp_mapped.count()
print(f"Number of parsed records = {partsupp_parsed_count}")

Number of rejected records = 0
Number of parsed records = 4000000


# Join the part and partsupp Datasets

In [0]:
# Inner join of the two keyed RDDs on the part key:
#   part_transformed     : RDD[(int, Part)]
#   partsupp_mapped      : RDD[(int, PartSupp)]
#   part_joined_partsupp : RDD[(int, (Part, PartSupp))]
part_joined_partsupp = part_transformed.join(partsupp_mapped)

# To preview a few joined pairs, iterate over part_joined_partsupp.take(10).

# Report the size of the join result.
joined_count = part_joined_partsupp.count()
print(f"Number of joined records = {joined_count}")

Number of joined records = 4000000
