In [2]:
from pyspark.sql import SparkSession
import getpass
username=getpass.getuser()
spark=SparkSession.\
    builder.\
    config('spark.shuffle.useOldFetchProtocol','true'). \
    config("spark.sql.warehouse.dir",f"/user/{username}/warehouse").\
    enableHiveSupport().\
    master('yarn').\
    getOrCreate()

In [22]:
order_schema = 'order_id long, order_date string, customer_id long, order_status string'

In [3]:
order_df = spark.read \
.format("csv") \
.schema(order_schema) \
.load("source_file_path")

In [4]:
order_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [29]:
order_df.rdd.getNumPartitions()

9

In [30]:
order_df.groupBy("order_status").count()

order_status,count
PENDING_PAYMENT,5636250
COMPLETE,8587125
ON_HOLD,1424250
PAYMENT_REVIEW,273375
PROCESSING,3103125
CLOSED,2833500
SUSPECTED_FRAUD,584250
PENDING,2853750
CANCELED,535500


In [54]:
customer_schema = "customer_id long, customer_fname string, customer_lname string, username string, password string, address string, city string, state string,pincode long"

In [32]:
customer_df = spark.read \
.format("csv") \
.schema(customer_schema) \
.load("source_file_path")

In [33]:
customer_df.show(5)

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|    725|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common| San Marcos|   CA|  92069|
|          5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|     Caguas|   PR|    725|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
o

In [34]:
spark.conf.get('spark.sql.autoBroadcastJoinThreshold')

'10485760b'

In [35]:
spark.conf.set('spark.sql.autoBroadcastJoinThreshold','-1')

In [36]:
order_df.join(customer_df, order_df.customer_id == customer_df.customer_id, "inner").write.format("noop").mode("overwrite").save()

##broadcast Join

In [37]:
spark.conf.set('spark.sql.autoBroadcastJoinThreshold','10485760b')

In [38]:
order_df.join(customer_df, order_df.customer_id == customer_df.customer_id, "inner").write.format("noop").mode("overwrite").save()

In [39]:
order_df.join(customer_df, order_df.customer_id == customer_df.customer_id, "left").write.format("noop").mode("overwrite").save()

## partition skew

In [40]:
order_new_df = spark.read \
.format("csv") \
.schema(order_schema) \
.load("source_file_path")

In [41]:
order_new_df.groupBy("order_status").count().collect()

[Row(order_status='PENDING_PAYMENT', count=5636250),
 Row(order_status='COMPLETE', count=46008801),
 Row(order_status='ON_HOLD', count=1424250),
 Row(order_status='PAYMENT_REVIEW', count=273375),
 Row(order_status='PROCESSING', count=3103125),
 Row(order_status='CLOSED', count=2833500),
 Row(order_status='SUSPECTED_FRAUD', count=584250),
 Row(order_status='PENDING', count=2853750),
 Row(order_status='CANCELED', count=535500)]

In [55]:
customer_new_df = spark.read \
.format("csv") \
.schema(customer_schema) \
.load("source_file_path")

In [44]:
customer_new_df.show(6)

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|    725|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common| San Marcos|   CA|  92069|
|          5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|     Caguas|   PR|    725|
|          6|          Mary|         Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|    Passaic|   NJ|   7055|
+

In [56]:
order_df.join(customer_new_df.distinct(), order_df.customer_id == customer_new_df.distinct().customer_id, "inner").write.format("noop").mode("overwrite").save()

In [6]:
spark.stop()

## solving for 3 problem  is AQE

In [5]:
spark

1 Coalesing

In [8]:
order_schema = 'order_id long, order_date string, customer_id long, order_status string'

In [9]:
order_df = spark.read \
.format("csv") \
.schema(order_schema) \
.load("source_file_path")

In [19]:
order_df.groupBy("order_status").count().write.format("noop").mode("overwrite").save()

In [13]:
spark.conf.get("spark.sql.adaptive.enabled")

'false'

In [47]:
spark.conf.set("spark.sql.adaptive.enabled", True)

In [15]:
spark.conf.get("spark.sql.adaptive.enabled")

'true'

2 handling partition skew

In [20]:
spark.stop()

In [38]:
order_schema = 'order_id long, order_date string, customer_id long, order_status string'

In [39]:
order_new_df = spark.read \
.format("csv") \
.schema(order_schema) \
.load("source_file_path")

In [44]:
order_new_df.show(3)

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|    2480|2013-08-07 00:00:...|       3807|    COMPLETE|
|   30479|2014-01-30 00:00:...|       9265|    COMPLETE|
|    2481|2013-08-07 00:00:...|       2476|    COMPLETE|
+--------+--------------------+-----------+------------+
only showing top 3 rows



In [40]:
mapping_schema = "status string, code int"

In [41]:
mapping_df = spark.read \
.format("csv") \
.option("delimiter","|") \
.schema(mapping_schema) \
.load("source_file_path")

In [45]:
mapping_df.show(2)

+---------------+----+
|         status|code|
+---------------+----+
|PENDING_PAYMENT|   1|
|       COMPLETE|   2|
+---------------+----+
only showing top 2 rows



In [42]:
spark.conf.set('spark.sql.autoBroadcastJoinThreshold','-1')

In [48]:
order_new_df.join(mapping_df, order_new_df.order_status == mapping_df.status, "inner").write.format("noop").mode("overwrite").save()

In [3]:
spark.stop()