## Discovering Hidden PySpark Treasures: Unique Tips and Hacks
---

### 1.  Accurate Approximate Distinct Count
---
- Computing an exact distinct count can be slow on large datasets because every distinct value has to be tracked. When an estimate is good enough, `approx_count_distinct()` returns an approximate distinct count at much lower cost. Its optional `rsd` argument sets the maximum allowed relative standard deviation of the estimate (default 0.05); a smaller value gives a tighter estimate at the cost of more memory.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct

In [0]:
spark = SparkSession.builder.appName("approx-distinct-count").getOrCreate()
df = spark.createDataFrame([(1,), (2,), (1,), (3,), (2,)], ["value"])
df.show()

+-----+
|value|
+-----+
|    1|
|    2|
|    1|
|    3|
|    2|
+-----+



In [0]:
distinct_count = df.select(approx_count_distinct('value')).first()[0]
print("Approximate Distinct Count :", distinct_count)

Approximate Distinct Count : 3


### 2.  Optimizing UDF Performance
---

- User-Defined Functions (UDFs) are powerful, but their performance can be a concern because a regular Python UDF is called once per row. Pandas UDFs can significantly speed up your computations by letting you apply vectorized pandas operations to whole batches of values from your PySpark DataFrames.

In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd



In [0]:
# @pandas_udf(IntegerType())
# def slen(s: pd.Series) -> pd.Series:
#     return s.str.len()

In [0]:
spark = SparkSession.builder.appName("Pandas-udf").getOrCreate()

@pandas_udf('double')
def custom_udf(series: pd.Series) -> pd.Series:
    return series * 2

df = spark.createDataFrame([(1,), (2,), (3,)], ['value'])
df.withColumn('result', custom_udf(df['value'])).show()

+-----+------+
|value|result|
+-----+------+
|    1|   2.0|
|    2|   4.0|
|    3|   6.0|
+-----+------+



### 3.  Custom Aggregation with Pandas UDAF
---
Taking aggregation a step further, we can create custom aggregation functions using pandas User-Defined Aggregate Functions (UDAFs): pandas UDFs that take a `pd.Series` and return a single scalar. This lets you perform intricate aggregations with the power of pandas.

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd



In [0]:
spark = SparkSession.builder.appName('pandas-udaf').getOrCreate()

@pandas_udf(IntegerType())
def custom_udaf(series: pd.Series) -> int:
    return series.sum()

df = spark.createDataFrame([(1,), (2,), (3,), (4,)], ['value'])
df.show()
result = df.agg(custom_udaf(df['value'])).first()[0]
print('Custom UDAF Result:', result)

Custom UDAF Result: 10


### 4.  Dynamic Column Selection in a DataFrame
---
Often, you may need to select columns from a DataFrame based on a specific condition. The `select()` function can be combined with `colRegex()` to achieve dynamic column selection based on column names. For instance, suppose you want to select the columns whose names start with `feature_` followed by a digit.

In [0]:
from pyspark.sql.functions import expr

In [0]:
spark = SparkSession.builder.appName('columnSelection').getOrCreate()
df = spark.createDataFrame([(1,2,3), (4,5,6)], ['feature_1', 'feature_2','Other'])
df.show()

+---------+---------+-----+
|feature_1|feature_2|Other|
+---------+---------+-----+
|        1|        2|    3|
|        4|        5|    6|
+---------+---------+-----+



In [0]:
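# The backtick-quoted string is treated as a regex over column names;
# [0-9] matches any single digit after the 'feature_' prefix.
selected_cols = df.select(df.colRegex('`feature_[0-9]`'))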
selected_cols.show()

+---------+---------+
|feature_1|feature_2|
+---------+---------+
|        1|        2|
|        4|        5|
+---------+---------+
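
When the selection rule is easier to express in Python than inside a backtick-quoted pattern, one alternative is to filter the column names yourself and pass the result to `select()`. The sketch below is plain Python and works only on a list of names: the `columns` list stands in for `df.columns` from the example above, and `pick_columns` is a hypothetical helper, not a PySpark function.

```python
import re

# Stand-in for df.columns from the example above.
columns = ['feature_1', 'feature_2', 'Other']

def pick_columns(names, pattern):
    """Return the names that match `pattern` in full (hypothetical helper)."""
    compiled = re.compile(pattern)
    return [name for name in names if compiled.fullmatch(name)]

feature_cols = pick_columns(columns, r'feature_[0-9]')
print(feature_cols)  # ['feature_1', 'feature_2']

# With a live DataFrame, the list can be passed straight to select():
# df.select(*feature_cols).show()
```

Because the helper uses a full match, a name such as `feature_10` would not match `feature_[0-9]`; use `feature_[0-9]+` if multi-digit suffixes should be included.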



### 5.  Efficient Data Sampling (Stratified Sampling)
---

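Sampling is a common technique for exploratory analysis. Where `sample()` applies a single fraction to the whole DataFrame, `sampleBy()` performs stratified sampling: you pass the column that defines the strata and a dictionary with a sampling fraction for each value, so you control how heavily each class is represented. Each fraction is the probability of keeping a row rather than an exact quota, so on small inputs the result can deviate noticeably from the requested proportions, as the output below shows.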


In [0]:
spark = SparkSession.builder.appName('Stratified_sample').getOrCreate()
df = spark.createDataFrame([(1, 'A'), (2, 'B'), (3, 'A'), (4, 'B'), (5, 'A'), (6, 'B')], ['Value', 'Class'])
df.show()

+-----+-----+
|Value|Class|
+-----+-----+
|    1|    A|
|    2|    B|
|    3|    A|
|    4|    B|
|    5|    A|
|    6|    B|
+-----+-----+



In [0]:
df.sampleBy('Class', fractions={'A':0.3, 'B':0.7}, seed=42).show()

+-----+-----+
|Value|Class|
+-----+-----+
|    4|    B|
|    6|    B|
+-----+-----+
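
The sample above contains no class `A` rows at all, which follows from how the fractions are applied: each row is kept independently with the probability given for its class. The sketch below is a rough plain-Python model of that behaviour. It uses Python's `random` module rather than Spark's generator, so it will not reproduce the exact rows Spark returns for `seed=42`, and `sample_by` is a hypothetical helper name.

```python
import random

rows = [(1, 'A'), (2, 'B'), (3, 'A'), (4, 'B'), (5, 'A'), (6, 'B')]

def sample_by(rows, fractions, seed):
    """Keep each row independently with its class's probability.

    Classes missing from `fractions` are treated as fraction 0.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions.get(row[1], 0.0)]

sample = sample_by(rows, {'A': 0.3, 'B': 0.7}, seed=42)
print(sample)

# Chance that none of the three class-A rows is kept: (1 - 0.3) ** 3
print(round(0.7 ** 3, 3))  # 0.343
```

With only three rows per class, an empty stratum is therefore not unusual; the realised proportions approach the requested fractions only as the number of rows per class grows.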



### 6. Advanced Window Functions
---
Window functions are a staple of data analysis, and PySpark offers more than the basic ranking functions. `percent_rank()`, for instance, calculates the relative rank of each row within its window partition as (rank - 1) / (rows in partition - 1), giving a value between 0.0 and 1.0.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank

In [0]:
spark = SparkSession.builder.appName('Window-Function').getOrCreate()
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "A"), (4, "A"), (5, "A"), (6, "B"), (7, "B")], ["value", "class"])
df.show()

+-----+-----+
|value|class|
+-----+-----+
|    1|    A|
|    2|    B|
|    3|    A|
|    4|    A|
|    5|    A|
|    6|    B|
|    7|    B|
+-----+-----+



In [0]:
# Rank rows by value within each class.
window_spec = Window.partitionBy('class').orderBy('value')
result_df = df.withColumn('percent_rank', percent_rank().over(window_spec))
result_df.show()


+-----+-----+------------------+
|value|class|      percent_rank|
+-----+-----+------------------+
|    1|    A|               0.0|
|    3|    A|0.3333333333333333|
|    4|    A|0.6666666666666666|
|    5|    A|               1.0|
|    2|    B|               0.0|
|    6|    B|               0.5|
|    7|    B|               1.0|
+-----+-----+------------------+
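
The values in the `percent_rank` column can be checked by hand with (rank - 1) / (n - 1), where n is the number of rows in the partition. The sketch below recomputes them in plain Python for the same data. It assumes there are no ties (true here), so a row's rank is simply its position after sorting, and it returns 0.0 for a single-row partition.

```python
from collections import defaultdict

rows = [(1, "A"), (2, "B"), (3, "A"), (4, "A"), (5, "A"), (6, "B"), (7, "B")]

# Group values by class, mirroring partitionBy('class').
partitions = defaultdict(list)
for value, cls in rows:
    partitions[cls].append(value)

percent_ranks = {}
for cls, values in partitions.items():
    ordered = sorted(values)  # mirrors orderBy('value')
    n = len(ordered)
    for position, value in enumerate(ordered, start=1):
        # No ties in this data, so rank equals the sorted position.
        percent_ranks[(value, cls)] = (position - 1) / (n - 1) if n > 1 else 0.0

for key in sorted(percent_ranks, key=lambda k: (k[1], k[0])):
    print(key, percent_ranks[key])
```

Class `A` has four rows, so its steps are thirds (0, 1/3, 2/3, 1); class `B` has three rows, so its steps are halves (0, 0.5, 1).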



### 7. Broadcasting Small DataFrames
---
When joining a large DataFrame with a small one, PySpark's `broadcast()` function can significantly speed up the join by sending a full copy of the small DataFrame to every worker node, so the large DataFrame does not have to be shuffled across the cluster.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

In [0]:
spark = SparkSession.builder.appName('BroadcastJoin').getOrCreate()

In [0]:
large_df = spark.createDataFrame([(1, "CA"), (2, "FL"), (3, "NY")], ["id", "state_code"])
large_df.show()
small_df = spark.createDataFrame([(1, "California"), (2, "Florida")], ["id", "states"])
small_df.show()

+---+----------+
| id|state_code|
+---+----------+
|  1|        CA|
|  2|        FL|
|  3|        NY|
+---+----------+

+---+----------+
| id|    states|
+---+----------+
|  1|California|
|  2|   Florida|
+---+----------+



In [0]:

result_df = large_df.join(broadcast(small_df), on="id", how="left")
result_df.show()

+---+----------+----------+
| id|state_code|    states|
+---+----------+----------+
|  1|        CA|California|
|  2|        FL|   Florida|
|  3|        NY|      null|
+---+----------+----------+
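
Conceptually, a broadcast join turns the small side into a lookup table that every worker holds locally, and each row of the large side is matched against it in place. The plain-Python sketch below models that idea with a dictionary for the same data; it is a simplification for intuition, not Spark's actual implementation.

```python
large_rows = [(1, "CA"), (2, "FL"), (3, "NY")]
small_rows = [(1, "California"), (2, "Florida")]

# "Broadcast" step: build one lookup table from the small side.
lookup = dict(small_rows)

# Probe step: each large-side row finds its match locally.
# Missing keys yield None, as in a left join.
joined = [(id_, code, lookup.get(id_)) for id_, code in large_rows]
print(joined)
# [(1, 'CA', 'California'), (2, 'FL', 'Florida'), (3, 'NY', None)]
```

Spark also chooses a broadcast join on its own when it estimates one side to be smaller than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so the explicit `broadcast()` hint matters most when that estimate is missing or too high.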

