<a href="https://colab.research.google.com/github/Dipanjana-Saha/PysparkCode/blob/main/Salthing_technique.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:

from pyspark.sql import SparkSession

# Obtain the notebook's SparkSession: getOrCreate() returns the already-active
# session when one exists, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName('diapanjana_pyspark')
    .getOrCreate()
)

In [21]:
from pyspark.sql import functions as F
from pyspark.sql.functions import rand, floor

In [22]:
def load_csv(path):
    """Read a CSV file that has a header row into a Spark DataFrame.

    The reader options are kept in one place so both source tables are
    loaded identically.

    Parameters
    ----------
    path : str
        Location of the CSV file to read.

    Returns
    -------
    pyspark.sql.DataFrame
        The file contents, with column names taken from the header row and
        column types inferred by Spark (``inferSchema`` is enabled).
    """
    return (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(path)
    )


# Fact-side table: one row per (customer_id, billing_date) with an amount.
billing_df = load_csv("/content/billing.csv")

# Dimension-side table: one row per customer with name and category.
customer_df = load_csv("/content/customer.csv")

billing_df.show()
customer_df.show()

+-----------+------------+------+
|customer_id|billing_date|amount|
+-----------+------------+------+
|       C001|  2024-01-01|   100|
|       C001|  2024-01-02|   120|
|       C002|  2024-01-01|   150|
|       C002|  2024-01-02|   180|
|     C99999|  2024-01-01|   201|
|     C99999|  2024-01-02|   202|
|     C99999|  2024-01-03|   203|
|     C99999|  2024-01-04|   204|
|     C99999|  2024-01-05|   205|
|     C99999|  2024-01-06|   206|
|     C99999|  2024-01-07|   207|
|     C99999|  2024-01-08|   208|
|     C99999|  2024-01-09|   209|
|     C99999|  2024-01-10|   210|
|     C99999|  2024-01-11|   211|
|     C99999|  2024-01-12|   212|
|     C99999|  2024-01-13|   213|
|     C99999|  2024-01-14|   214|
|     C99999|  2024-01-15|   215|
|     C99999|  2024-01-16|   216|
+-----------+------------+------+
only showing top 20 rows

+-----------+-------------+----------+
|customer_id|customer_name|  category|
+-----------+-------------+----------+
|       C001|         Amit|  Domestic|
| 

In [23]:
# Count billing rows per customer_id and sort with the largest counts first,
# so heavily over-represented (skewed) join keys surface at the top.
skew_check = (
    billing_df.groupBy("customer_id")
    .count()
    .orderBy(F.desc("count"))
)

print("ðŸ”¥ SKEWED KEYS (TOP 10)")
# show() prints 20 rows by default; pass 10 so the output matches the heading.
skew_check.show(10)

ðŸ”¥ SKEWED KEYS (TOP 10)
+-----------+-----+
|customer_id|count|
+-----------+-----+
|     C99999|   30|
|       C001|    2|
|       C002|    2|
+-----------+-----+



In [32]:
# Number of salt buckets. The SAME value must be used for the random salt on
# the billing side and for the salt range on the customer side: if the two
# disagree, billing rows whose salt has no customer counterpart are silently
# dropped by the inner join on (customer_id, salt).
NUM_SALT_BUCKETS = 10

# Fixed seed so a fresh run over the same input produces the same salts.
# (Spark documents rand() as non-deterministic in general, because results
# still depend on how the data is partitioned.)
SALT_SEED = 42

# Tag every billing row with a random integer salt in [0, NUM_SALT_BUCKETS).
billing_salted = billing_df.withColumn(
    "salt",
    floor(rand(seed=SALT_SEED) * NUM_SALT_BUCKETS),
)

# Create salt dataframe: 0 to NUM_SALT_BUCKETS - 1
salt_df = spark.range(NUM_SALT_BUCKETS).withColumnRenamed("id", "salt")
salt_df.show()

# Duplicate customer table once per salt value so every (customer_id, salt)
# pair on the billing side has a matching customer row.
customer_salted = customer_df.crossJoin(salt_df)
customer_salted.show()

+----+
|salt|
+----+
|   0|
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
+----+

+-----------+-------------+--------+----+
|customer_id|customer_name|category|salt|
+-----------+-------------+--------+----+
|       C001|         Amit|Domestic|   0|
|       C001|         Amit|Domestic|   1|
|       C001|         Amit|Domestic|   2|
|       C001|         Amit|Domestic|   3|
|       C001|         Amit|Domestic|   4|
|       C001|         Amit|Domestic|   5|
|       C001|         Amit|Domestic|   6|
|       C001|         Amit|Domestic|   7|
|       C001|         Amit|Domestic|   8|
|       C001|         Amit|Domestic|   9|
|       C002|         John|Domestic|   0|
|       C002|         John|Domestic|   1|
|       C002|         John|Domestic|   2|
|       C002|         John|Domestic|   3|
|       C002|         John|Domestic|   4|
|       C002|         John|Domestic|   5|
|       C002|         John|Domestic|   6|
|       C002|         John|Domestic|   7|
|       C002|      

In [33]:
# Display both join inputs before combining them.
billing_salted.show()
customer_salted.show()

# Inner join on the composite key: a billing row only matches the customer
# copy that carries the same salt value.
df_join_salted = billing_salted.join(
    customer_salted,
    on=["customer_id", "salt"],
    how="inner",
)

print("ðŸ”¥ JOIN RESULT (SALTED)")
df_join_salted.show()

+-----------+------------+------+----+
|customer_id|billing_date|amount|salt|
+-----------+------------+------+----+
|       C001|  2024-01-01|   100|   3|
|       C001|  2024-01-02|   120|   8|
|       C002|  2024-01-01|   150|   2|
|       C002|  2024-01-02|   180|   8|
|     C99999|  2024-01-01|   201|   1|
|     C99999|  2024-01-02|   202|   4|
|     C99999|  2024-01-03|   203|   3|
|     C99999|  2024-01-04|   204|   9|
|     C99999|  2024-01-05|   205|   5|
|     C99999|  2024-01-06|   206|   5|
|     C99999|  2024-01-07|   207|   2|
|     C99999|  2024-01-08|   208|   4|
|     C99999|  2024-01-09|   209|   5|
|     C99999|  2024-01-10|   210|   2|
|     C99999|  2024-01-11|   211|   3|
|     C99999|  2024-01-12|   212|   3|
|     C99999|  2024-01-13|   213|   2|
|     C99999|  2024-01-14|   214|   8|
|     C99999|  2024-01-15|   215|   6|
|     C99999|  2024-01-16|   216|   9|
+-----------+------------+------+----+
only showing top 20 rows

+-----------+-------------+--------+--

In [34]:
# Alternative to salting: mark the customer table with a broadcast hint and
# join it to the unsalted billing rows on customer_id.
df_join_broadcast = billing_df.join(
    F.broadcast(customer_df), on="customer_id", how="inner"
)

print("ðŸ”¥ JOIN RESULT (BROADCAST)")
df_join_broadcast.show()

ðŸ”¥ JOIN RESULT (BROADCAST)
+-----------+------------+------+-------------+----------+
|customer_id|billing_date|amount|customer_name|  category|
+-----------+------------+------+-------------+----------+
|       C001|  2024-01-01|   100|         Amit|  Domestic|
|       C001|  2024-01-02|   120|         Amit|  Domestic|
|       C002|  2024-01-01|   150|         John|  Domestic|
|       C002|  2024-01-02|   180|         John|  Domestic|
|     C99999|  2024-01-01|   201|   Power Corp|Commercial|
|     C99999|  2024-01-02|   202|   Power Corp|Commercial|
|     C99999|  2024-01-03|   203|   Power Corp|Commercial|
|     C99999|  2024-01-04|   204|   Power Corp|Commercial|
|     C99999|  2024-01-05|   205|   Power Corp|Commercial|
|     C99999|  2024-01-06|   206|   Power Corp|Commercial|
|     C99999|  2024-01-07|   207|   Power Corp|Commercial|
|     C99999|  2024-01-08|   208|   Power Corp|Commercial|
|     C99999|  2024-01-09|   209|   Power Corp|Commercial|
|     C99999|  2024-01-10| 

In [26]:
# Preview the salted billing rows using show()'s defaults, spelled out
# explicitly: first 20 rows, long cell values truncated.
# NOTE(review): the AttributeError recorded under this cell comes from
# execution count 26, i.e. before the In [32] cell that assigns
# billing_salted; presumably the name was bound to None at that time.
# Re-run the notebook top to bottom to refresh this output.
billing_salted.show(n=20, truncate=True)

AttributeError: 'NoneType' object has no attribute 'show'