In [1]:
from pyspark.sql import SparkSession
import getpass
# Login name of the current user; the Hive warehouse below is rooted at
# /user/<username>/warehouse so each user's tables get their own directory.
username = getpass.getuser()

# YARN-backed session with Hive catalog support, so tables written later with
# saveAsTable() are registered in the metastore.
# NOTE(review): spark.ui.port=0 presumably asks for any free UI port on a
# shared gateway host — confirm.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# DDL-style schema for the orders CSV: four columns, customer_id is the join key.
orders_schema = (
    "order_id long , order_date string, "
    "customer_id long,order_status string"
)

In [3]:
# Read the orders CSV with the explicit schema above. No "header" option is
# set, so every line of the file is parsed as a data row.
orders_df = (
    spark.read
    .format("csv")
    .schema(orders_schema)
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [4]:
# DDL-style schema for the customers data. The key column is spelled
# "customerid" (no underscore) while orders uses "customer_id"; the join
# condition later equates the two explicitly.
# NOTE(review): this includes a password column, which `select *` joins carry along.
customer_schema = (
    " customerid long , customer_fname string , "
    "customer_lname string , user_name string,"
    "password string , address string, "
    "city string, state string, pincode long "
)

In [5]:
# Read the customers dataset as CSV using the explicit schema above
# (no header option, so all lines are treated as data).
customers_df = (
    spark.read
    .format("csv")
    .schema(customer_schema)
    .load("/public/trendytech/retail_db/customers")
)

In [6]:
# A threshold of -1 disables automatic broadcast joins, so the join below is
# planned as a shuffle-based (sort-merge) join and the effect of bucketing on
# that plan becomes visible.
spark.conf.set(
    'spark.sql.autoBroadcastJoinThreshold',
    '-1',
)

In [7]:
# Working database for this exercise. IF NOT EXISTS keeps the cell re-runnable:
# a plain CREATE DATABASE raises an error once the database already exists,
# which breaks Restart Kernel -> Run All.
spark.sql("create database if not exists itv009538_optimizeJoin")

In [8]:
# Make the exercise database current, so the unqualified table names used in
# the following cells (orderstab201, customerstab201) are created/resolved there.
spark.sql(sqlQuery="use itv009538_optimizeJoin")

In [9]:
# Persist orders as a catalog table split into 8 buckets on the join key
# customer_id, sorted on that key within each bucket. The customers table is
# bucketed the same way on its key, which lets the join run without an
# Exchange (shuffle) step (see the explain() output further down).
(
    orders_df.write
    .mode("overwrite")
    .bucketBy(8, "customer_id")
    .sortBy("customer_id")
    .option("path", "Week10PySparkOptimizations/orders201")
    .saveAsTable("orderstab201")
)

In [10]:
# Show the table's columns plus detailed table information, up to 50 rows,
# without truncating cell contents.
spark.sql("describe formatted orderstab201").show(n=50, truncate=False)

+----------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                            |comment|
+----------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|order_id                    |bigint                                                                                                               |null   |
|order_date                  |string                                                                                                               |null   |
|customer_id                 |bigint                                                                                                               |null   |
|order_status                |string                      

In [11]:
# Persist customers with the same layout as orders: 8 buckets on the join key
# (named customerid here), sorted within each bucket. Matching bucket counts
# on both join sides is what allows the shuffle-free join.
(
    customers_df.write
    .mode("overwrite")
    .bucketBy(8, "customerid")
    .sortBy("customerid")
    .option("path", "Week10PySparkOptimizations/customers201")
    .saveAsTable("customerstab201")
)

In [12]:
# Execute the bucketed join end to end. The "noop" sink discards the rows, so
# the full join runs (e.g. for timing / Spark UI inspection) without writing output.
(
    spark.sql(
        "select * from orderstab201 inner join customerstab201 "
        "on orderstab201.customer_id == customerstab201.customerid"
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

In [13]:
# Print the physical plan for the same join. With both tables bucketed on the
# key, the plan shows a SortMergeJoin directly over bucketed FileScans, with no
# Exchange node.
(
    spark.sql(
        "select * from orderstab201 inner join customerstab201 "
        "on orderstab201.customer_id == customerstab201.customerid"
    )
    .explain()
)

== Physical Plan ==
*(3) SortMergeJoin [customer_id#89L], [customerid#91L], Inner
:- *(1) Sort [customer_id#89L ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(customer_id#89L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet itv009538_optimizejoin.orderstab201[order_id#87L,order_date#88,customer_id#89L,order_status#90] Batched: true, DataFilters: [isnotnull(customer_id#89L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://m01.itversity.com:9000/user/itv009538/warehouse/itv009538_optimizejoin.d..., PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: struct<order_id:bigint,order_date:string,customer_id:bigint,order_status:string>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [customerid#91L ASC NULLS FIRST], false, 0
   +- *(2) Filter isnotnull(customerid#91L)
      +- *(2) ColumnarToRow
         +- FileScan parquet itv009538_optimizejoin.customerstab201[customerid#91L,customer_fname#92,customer_lname#93,user_name#94,password#95,address#96,cit

In [14]:
# List the tables registered in the current database (set earlier with USE).
spark.sql(sqlQuery="show tables")

database,tableName,isTemporary
itv009538_optimiz...,customerstab201,False
itv009538_optimiz...,orderstab201,False


In [15]:
# Clean up the customers table. It was created in itv009538_optimizeJoin (the
# database selected with USE), so qualify the name with that database rather
# than another user's. IF EXISTS keeps the cell re-runnable.
# NOTE: the table was written with an explicit "path" option, so Spark treats
# it as external. DROP TABLE removes only the catalog entry, not the data files.
spark.sql("drop table if exists itv009538_optimizeJoin.customerstab201")

In [16]:
# Clean up the orders table. It was created in itv009538_optimizeJoin (the
# database selected with USE), so qualify the name with that database rather
# than another user's. IF EXISTS keeps the cell re-runnable.
# NOTE: the table was written with an explicit "path" option, so Spark treats
# it as external. DROP TABLE removes only the catalog entry, not the data files.
spark.sql("drop table if exists itv009538_optimizeJoin.orderstab201")