In [1]:
# env : pixlake
# we focuing on pyspark dataframe processing
# documentation https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
%load_ext autoreload
%autoreload 2

In [2]:
# make you auto compeletion faster
# https://stackoverflow.com/questions/40536560/ipython-and-jupyter-autocomplete-not-working
%config Completer.use_jedi = False

In [3]:
import os
import sys

def get_workstation_spark_path(where_are_you : str) -> str:
    if where_are_you == 'titan':
        return '/home/data/ryanchao2012/lib'
    elif where_are_you == 'thor':
        return '/opt/spark/versions'
    else:
        raise ValueError("wrong work station name")

spark_path = get_workstation_spark_path('thor')

print('You have pyspark version : ', os.listdir(spark_path))
os.environ['PYSPARK_PYTHON'] = sys.executable
# spark-2.3, spark-2.4
# os.environ['SPARK_HOME'] = os.path.join(spark_path,'spark-2.3')

# use spakr 3.1

os.environ['SPARK_HOME'] = '/opt/spark/versions/spark-3.1'

You have pyspark version :  ['spark-2.3', 'spark-3.1.2-bin-hadoop2.7', 'spark-3.0', 'spark-3.0.1-bin-hadoop2.7', 'spark-2.3.4-bin-hadoop2.7', 'spark-2.4.7-bin-hadoop2.7', 'spark-2.4', 'spark-3.1']


In [4]:
os.environ['SPARK_HOME']

'/opt/spark/versions/spark-3.1'

In [5]:
from os.path import join
import pandas as pd
from pyspark.sql import SparkSession as Session
from pyspark.sql import DataFrame
from pyspark import SparkConf as Conf
from pyspark.sql import functions as F, Window as W, types as T
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
C = F.col

In [6]:
conf = (Conf()
    .set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    .set('spark.driver.memory', '10g')
    .set('spark.driver.maxResultSize', '8g')
   )

In [7]:
spark = (Session
     .builder
     .appName('pyspark-challenge')
     .master('local[10]')
     .config(conf=conf)
     .getOrCreate())

22/03/03 09:44:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
spark

# 1. Cross join within a partition (PN2)

In [9]:
# Cross join within a partition (or create pairs in partition)
# Aka Permutation N 2 in partition

# perform cross join in a window?
# just N vs N inner join =)
# https://stackoverflow.com/questions/53630342/sparksql-pyspark-crossjoin-over-dimension-for-a-specific-window

# Permutation (3 2) = 6

data = [
    ('台北市信義區信義路20號','肉多多火鍋'),
    ('台北市信義區信義路20號','文章牛肉湯台北信義店'),
    ('台北市信義區信義路20號','7-11'),
    ('台南市安平區安平路590號','文章牛肉湯總店'),
]

poi_sdf = spark.createDataFrame(data, ['addr','poi_name'])

(
     poi_sdf
    .withColumnRenamed('poi_name','poi_name_A')
    .join(
        poi_sdf
        .select(
            'addr',
            C('poi_name').alias('poi_name_B'),
        ),
        on=['addr']
    )
    .where(C("poi_name_A") != C("poi_name_B"))
).show(truncate=False, vertical=True)

                                                                                

-RECORD 0----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 肉多多火鍋             
 poi_name_B | 文章牛肉湯台北信義店   
-RECORD 1----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 肉多多火鍋             
 poi_name_B | 7-11                   
-RECORD 2----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 文章牛肉湯台北信義店   
 poi_name_B | 肉多多火鍋             
-RECORD 3----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 文章牛肉湯台北信義店   
 poi_name_B | 7-11                   
-RECORD 4----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 7-11                   
 poi_name_B | 肉多多火鍋             
-RECORD 5----------------------------
 addr       | 台北市信義區信義路20號 
 poi_name_A | 7-11                   
 poi_name_B | 文章牛肉湯台北信義店   



# Join at least match 2 tags

* Need to avoid `CartesianProduct` - If you have a big / small sdf, you can use broadcast join to get `BroadcastNestedLoopJoin`

* According to your join expression, spark will detect your join types(equal join, non-equal join)

## `F.size(F.array_intersect(C("a_tags"),C("b_tags"))) >= 2`

In [10]:
# Cross join within a partition (or create pairs in partition)
# Aka Permutation N 2 in partition

# perform cross join in a window?
# just N vs N inner join =)
# https://stackoverflow.com/questions/53630342/sparksql-pyspark-crossjoin-over-dimension-for-a-specific-window

# Permutation (3 2) = 6

data = [
    ('a1',['火鍋','台北','麻辣鍋']),
    ('a2',['火鍋','台北','火烤兩吃']),
    ('a3',['火烤兩吃','台北','鮮魚湯']),
    ('a4',['台北','網美咖啡廳'])
]

feature_sdf = spark.createDataFrame(data, ['article_id','tags'])

at_least_2_matches : C = (
    F.size(F.array_intersect(C("a_tags"),C("b_tags"))) >= 2
)
    
joined_sdf = (
    feature_sdf
    .select(
        C("article_id").alias('a_article_id'),
        C("tags").alias('a_tags')
    )
    .join(
        (
            feature_sdf
            .select(
            C("article_id").alias('b_article_id'),
            C("tags").alias('b_tags')
            )
        ),
        on=at_least_2_matches
    )
)

joined_sdf.explain()

== Physical Plan ==
CartesianProduct (size(array_intersect(a_tags#33, b_tags#37), true) >= 2)
:- *(1) Project [article_id#28 AS a_article_id#32, tags#29 AS a_tags#33]
:  +- *(1) Scan ExistingRDD[article_id#28,tags#29]
+- *(2) Project [article_id#28 AS b_article_id#36, tags#29 AS b_tags#37]
   +- *(2) Scan ExistingRDD[article_id#28,tags#29]




In [11]:

joined_sdf = (
    feature_sdf
    .select(
        C("article_id").alias('a_article_id'),
        C("tags").alias('a_tags')
    )
    .join(
        F.broadcast(
            feature_sdf
            .select(
            C("article_id").alias('b_article_id'),
            C("tags").alias('b_tags')
            )
        ),
        on=at_least_2_matches
    )
)

joined_sdf.explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, (size(array_intersect(a_tags#49, b_tags#53), true) >= 2)
:- *(1) Project [article_id#28 AS a_article_id#48, tags#29 AS a_tags#49]
:  +- *(1) Scan ExistingRDD[article_id#28,tags#29]
+- BroadcastExchange IdentityBroadcastMode, [id=#103]
   +- *(2) Project [article_id#28 AS b_article_id#52, tags#29 AS b_tags#53]
      +- *(2) Scan ExistingRDD[article_id#28,tags#29]




## Create hashes, apply join

* in the same stage, no shuffle needed

In [28]:
def fingerprint(text:str,joiner:str='') -> str:
    return joiner.join(sorted(text))

In [44]:

data = [
    ('a1',['火鍋','台北','麻辣鍋']),
    ('a2',['火鍋','台北','火烤兩吃']),
    ('a3',['火烤兩吃','台北','鮮魚湯']),
    ('a4',['台北','網美咖啡廳'])
]

feature_sdf = spark.createDataFrame(data, ['article_id','tags'])

# approach 1



cn2_sdf = (
    feature_sdf
    .withColumn('t1',F.explode('tags'))
    .select(
        'article_id',
        't1',
        F.explode('tags').alias('t2'),
        
    )
    .where(C("t1") != C("t2"))
    .withColumn('hashcode',F.concat_ws('',C("t1"),C("t2")))
    .withColumn('hashcode',F.udf(fingerprint,'string')(C("hashcode")))
    .drop_duplicates(['hashcode'])
    
)

print(cn2_sdf.count())
cn2_sdf.explain()
cn2_sdf.show()

# apply self join

8
== Physical Plan ==
SortAggregate(key=[hashcode#994], functions=[first(article_id#974, false), first(t1#979, false), first(t2#984, false)])
+- *(5) Sort [hashcode#994 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(hashcode#994, 200), ENSURE_REQUIREMENTS, [id=#2153]
      +- SortAggregate(key=[hashcode#994], functions=[partial_first(article_id#974, false), partial_first(t1#979, false), partial_first(t2#984, false)])
         +- *(4) Sort [hashcode#994 ASC NULLS FIRST], false, 0
            +- *(4) Project [article_id#974, t1#979, t2#984, pythonUDF0#1021 AS hashcode#994]
               +- BatchEvalPython [fingerprint(concat_ws(, t1#979, t2#984))], [pythonUDF0#1021]
                  +- *(3) Filter (isnotnull(t2#984) AND NOT (t1#979 = t2#984))
                     +- Generate explode(tags#975), [article_id#974, t1#979], false, [t2#984]
                        +- *(2) Filter isnotnull(t1#979)
                           +- Generate explode(tags#975), [article_id#974, tags#975]

# Join at least match 3 tags

if we wanna explode it to N, we need itertools maybe

In [49]:

data = [
    ('a1',['火鍋','台北','麻辣鍋']),
    ('a2',['火鍋','台北','火烤兩吃','麻辣鍋']),
    ('a3',['火烤兩吃','台北','鮮魚湯']),
    ('a4',['台北','網美咖啡廳'])
]

feature_sdf = spark.createDataFrame(data, ['article_id','tags'])

# approach 1


distinct_tag_combination = (
      (C("t1") != C("t2"))
    & (C("t1") != C("t3"))
    & (C("t2") != C("t3"))
)

cn3_sdf = (
    feature_sdf
    .withColumn('t1',F.explode('tags'))
    .withColumn('t2',F.explode('tags'))
    .withColumn('t3',F.explode('tags'))
    .select(
        'article_id',
        *['t1','t2','t3'],
    )
    .where(distinct_tag_combination)
    .withColumn('hashcode',F.concat_ws('',*[C(col) for col in ['t1','t2','t3']]))
    .withColumn('hashcode',F.udf(fingerprint,'string')(C("hashcode")))
    .drop_duplicates(['hashcode'])
)

print(cn3_sdf.count())
cn3_sdf.explain()
cn3_sdf.show()

5
== Physical Plan ==
SortAggregate(key=[hashcode#1224], functions=[first(article_id#1191, false), first(t1#1196, false), first(t2#1201, false), first(t3#1207, false)])
+- *(6) Sort [hashcode#1224 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(hashcode#1224, 200), ENSURE_REQUIREMENTS, [id=#2623]
      +- SortAggregate(key=[hashcode#1224], functions=[partial_first(article_id#1191, false), partial_first(t1#1196, false), partial_first(t2#1201, false), partial_first(t3#1207, false)])
         +- *(5) Sort [hashcode#1224 ASC NULLS FIRST], false, 0
            +- *(5) Project [article_id#1191, t1#1196, t2#1201, t3#1207, pythonUDF0#1257 AS hashcode#1224]
               +- BatchEvalPython [fingerprint(concat_ws(, t1#1196, t2#1201, t3#1207))], [pythonUDF0#1257]
                  +- *(4) Filter ((isnotnull(t3#1207) AND NOT (t1#1196 = t3#1207)) AND NOT (t2#1201 = t3#1207))
                     +- Generate explode(tags#1192), [article_id#1191, t1#1196, t2#1201], false, [t3#1207]
      

# Using broadcast

## analyze your data, then broadcast 

* daily updated poi (small) <---> historical articles (big)
* popular tags articles(small) <---> long tail articles (big)

## Compress data size, then broadcast


* numerical

Numeric types

ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.

ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.

IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.

LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.

FloatType: Represents 4-byte single-precision floating point numbers.

DoubleType: Represents 8-byte double-precision floating point numbers.

DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A 
BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.

1. default `Double` --> `Float` size : 1/2
2. if you using signed integer, you could make it smaller


* string

String type

StringType: Represents character string values.

VarcharType(length): A variant of StringType which has a length limitation. Data writing will fail if the input string exceeds the length limitation. Note: this type can only be used in table schema, not functions/operators.

CharType(length): A variant of VarcharType(length) which is fixed length. Reading column of type CharType(n) always returns string values of length n. Char type column comparison will pad the short one to the longer length.

1. hashing string to unsigned integer (smaller!)

    * using `F.monotonically_increasing_id()` to get complete hashed result(with float64 data type) -
    * using zipWithIndex from RDD to make hashes smaller
    https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram

In [15]:
# suppose 200M urls, with double by default
import numpy as np
import random

size= 2*1e6

data = np.random.random(size=int(size))


indexed_data_gen = (
    (i, float(number)) 
    for i, number 
    in enumerate(data)
)

# print(list(indexed_data_gen))

sdf = spark.createDataFrame(
    list(indexed_data_gen),
    schema=['vec_component_id','tfidf']
)

print(sdf.count())
sdf.printSchema()

22/02/26 18:29:59 WARN TaskSetManager: Stage 17 contains a task of very large size (3001 KiB). The maximum recommended task size is 1000 KiB.


2000000
root
 |-- vec_component_id: long (nullable = true)
 |-- tfidf: double (nullable = true)



In [16]:
import string


def get_link(n=30):
    return 'http://' + ''.join(random.choices(string.ascii_uppercase + string.digits, k=n))

save = True
size= int(2*1e6)

data = [get_link() for _ in range(size)]


indexed_data_gen = (
    (i, url) 
    for i, url 
    in enumerate(data)
)

sdf = spark.createDataFrame(
    list(indexed_data_gen),
    schema=['idx','url']
)

complete_hashed_sdf = (
    sdf
    .withColumn('hashes',F.monotonically_increasing_id())
    .select('idx','hashes')
)


smaller_complete_hashed_sdf = (
    sdf
    .rdd
    .zipWithIndex()
    .toDF()
    .select(
    # T.IntegerType() - -20億 ~ 20億
    C('_1.idx').cast(T.IntegerType()),
    C('_2').cast(T.IntegerType()).alias('hashes'),    
    )
)

if save:
    sdf.write.json('2m_url.json',mode='overwrite')
    complete_hashed_sdf.write.json('2m_url_hashed.json',mode='overwrite')
    smaller_complete_hashed_sdf.write.json('2m_url_hashed_smaller.json',mode='overwrite')

22/02/26 18:30:36 WARN TaskSetManager: Stage 19 contains a task of very large size (9046 KiB). The maximum recommended task size is 1000 KiB.
22/02/26 18:30:38 WARN TaskSetManager: Stage 20 contains a task of very large size (9046 KiB). The maximum recommended task size is 1000 KiB.
22/02/26 18:30:38 WARN TaskSetManager: Stage 21 contains a task of very large size (9046 KiB). The maximum recommended task size is 1000 KiB.
22/02/26 18:30:39 WARN TaskSetManager: Stage 22 contains a task of very large size (9046 KiB). The maximum recommended task size is 1000 KiB.
22/02/26 18:30:39 WARN TaskSetManager: Stage 23 contains a task of very large size (9046 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [17]:
! du -sh 2m_url.json

119M	2m_url.json


In [18]:
! du -sh 2m_url_hashed.json

69M	2m_url_hashed.json


In [19]:
# 調整資料型態確實是有一些幫助
! du -sh 2m_url_hashed_smaller.json

62M	2m_url_hashed_smaller.json


# Using shuffle hash join


https://www.hadoopinrealworld.com/how-does-shuffle-hash-join-work-in-spark/

https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3-1-join-evolutions/read

1. estimate the hash size (so you know what many memory needed)
2. check the execution plan
3. remember to deal the data skew

In [9]:
data = [
    ('a1',"火鍋"),
    ('a2','火鍋'),
    ('a2','台北'),
    ('a3','火烤兩吃'),
    ('a3','台北'),
    ('a3','鮮魚湯')
]

feature_sdf = (
    spark.createDataFrame(data, ['article_id','tag'])
)

feature_sdf.show(n=1)

joined_sdf = (
    feature_sdf
    .join(feature_sdf.hint("shuffle_hash"),
          on=['tag']
         )
)

joined_sdf.explain()

joined_sdf.show()

                                                                                

+----------+----+
|article_id| tag|
+----------+----+
|        a1|火鍋|
+----------+----+
only showing top 1 row

== Physical Plan ==
*(3) Project [tag#1, article_id#0, article_id#13]
+- *(3) ShuffledHashJoin [tag#1], [tag#14], Inner, BuildRight
   :- Exchange hashpartitioning(tag#1, 200), ENSURE_REQUIREMENTS, [id=#37]
   :  +- *(1) Filter isnotnull(tag#1)
   :     +- *(1) Scan ExistingRDD[article_id#0,tag#1]
   +- ReusedExchange [article_id#13, tag#14], Exchange hashpartitioning(tag#1, 200), ENSURE_REQUIREMENTS, [id=#37]


+--------+----------+----------+
|     tag|article_id|article_id|
+--------+----------+----------+
|火烤兩吃|        a3|        a3|
|    台北|        a2|        a3|
|    台北|        a2|        a2|
|    台北|        a3|        a3|
|    台北|        a3|        a2|
|  鮮魚湯|        a3|        a3|
|    火鍋|        a1|        a2|
|    火鍋|        a1|        a1|
|    火鍋|        a2|        a2|
|    火鍋|        a2|        a1|
+--------+----------+----------+

