
In [1]:
!pip install pyspark
!pip install findspark


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Spark Context and Session

In [4]:
sc = SparkContext()
spark = SparkSession.builder.getOrCreate()

In [5]:
spark

## RDDs and partitions

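`sc.parallelize()` creates an RDD from a local Python collection.   
`glom()` groups the elements of each partition into a list, so `glom().collect()` shows how the data is spread across partitions.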

In [6]:
nums = [i for i in range(10)]

rdd = sc.parallelize(nums)

print(f"Number of partitions {rdd.getNumPartitions()}")
print(f"Partitioner {rdd.partitioner}")
print(f"Partitions structure {rdd.glom().collect()}")

Number of partitions 2
Partitioner None
Partitions structure [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]


In [7]:
# number of elements in the RDD
rdd.count()

10

Now parallelize into 15 partitions. With only 10 elements, some partitions stay empty.

In [8]:
rdd = sc.parallelize(nums, 15)
print(f"Number of partitions {rdd.getNumPartitions()}")
print(f"Partitioner {rdd.partitioner}")
print(f"Partitions structure {rdd.glom().collect()}")

Number of partitions 15
Partitioner None
Partitions structure [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]
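The layout above, with an empty partition at every third position, can be reproduced with a small plain-Python sketch. It assumes the slice boundaries are computed with integer division as `start = i * n // k` and `end = (i + 1) * n // k`; the helper name `slice_into_partitions` is hypothetical and is not part of the PySpark API.

```python
def slice_into_partitions(items, num_slices):
    """Split a list into num_slices contiguous chunks using integer boundaries."""
    n = len(items)
    partitions = []
    for i in range(num_slices):
        # Assumed boundary rule: integer division keeps the chunks contiguous
        # and lets some chunks be empty when num_slices > n.
        start = (i * n) // num_slices
        end = ((i + 1) * n) // num_slices
        partitions.append(items[start:end])
    return partitions


nums = list(range(10))
two = slice_into_partitions(nums, 2)
fifteen = slice_into_partitions(nums, 15)

print(two)      # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(fifteen)  # [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]
```

Both results match the `glom().collect()` outputs shown for 2 and 15 partitions.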


## Partition using `partitionBy()`

In this case, each element of the dataset needs to be a key/value tuple, because the default partitioner hashes the key to assign the element to a partition.



In [9]:
rdd = sc.parallelize(nums) \
        .map(lambda el: (el, el)) \
        .partitionBy(2) \
        .persist()

print(f"Number of partitions {rdd.getNumPartitions()}")
print(f"Partitioner {rdd.partitioner}")
print(f"Partitions structure {rdd.glom().collect()}")

for i, partition in enumerate(rdd.glom().collect()):
  print(f"partition {i+1}: {partition}")

Number of partitions 2
Partitioner <pyspark.rdd.Partitioner object at 0x7f4075331a90>
Partitions structure [[(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)], [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]]
partition 1: [(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)]
partition 2: [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]


`parallelize(nums)` - transforms a Python list into an RDD with no partitioning scheme,    
`map(lambda el: (el, el))` - turns each element into a key/value tuple,  
`partitionBy(2)` - splits the data into 2 partitions using the default hash partitioner
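Why the even keys end up in one partition and the odd keys in the other can be sketched in plain Python. This assumes the default partitioner places a key in partition `hash(key) % numPartitions`; for small non-negative integers in CPython, `hash(n) == n`. The function name `hash_partition` is hypothetical.

```python
def hash_partition(pairs, num_partitions):
    """Assign (key, value) pairs to buckets by hash(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # Assumed rule for the default hash partitioner.
        buckets[hash(key) % num_partitions].append((key, value))
    return buckets


pairs = [(el, el) for el in range(10)]
buckets = hash_partition(pairs, 2)

for i, bucket in enumerate(buckets):
    print(f"partition {i + 1}: {bucket}")
# partition 1: [(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)]
# partition 2: [(1, 1), (3, 3), (5, 5), (7, 7), (9, 9)]
```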

## More partitioning

In [10]:
transactions = [
    {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
    {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
    {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
    {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
    {'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
    {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
    {'name': 'Pierre', 'amount': 120, 'country': 'France'},
    {'name': 'Frank', 'amount': 180, 'country': 'France'}
]

If later analysis processes many records from the same country together, it makes sense to keep each country's records in one partition, and therefore on one node.   
A custom partitioner gives control over that placement.  

A custom partitioner is a function that returns an integer for a given object (the tuple key).

In [11]:
def country_partitioner(country):
  return hash(country) % (10**7 - 1)

print(country_partitioner("Poland"))
print(country_partitioner("Germany"))
print(country_partitioner("United Kingdom"))
print(country_partitioner("France"))

8909681
5103677
5521867
2619581


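The custom partitioner maps each country name to an integer, so it can be passed to `partitionBy` as the partitioning function. Records with the same key always get the same integer and therefore land in the same partition. The integers are not guaranteed to be unique, so two countries can still share a partition.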
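Python's built-in `hash()` for strings is randomized per interpreter process unless `PYTHONHASHSEED` is fixed, so the integers printed above can differ between runs. A minimal sketch of a partitioner that returns the same integer in every process is shown below. It swaps in `zlib.crc32` as a stable hash; `stable_country_partitioner` and `assign_partition` are hypothetical names, and the modulo step mirrors the assumption that the partition index is the partitioner's result modulo the number of partitions.

```python
import zlib


def stable_country_partitioner(country):
    # Hypothetical alternative to hash(): CRC32 of the UTF-8 bytes is
    # identical in every Python process.
    return zlib.crc32(country.encode("utf-8"))


def assign_partition(key, num_partitions, partition_func):
    # Assumed rule: partition index = partitioner result modulo partition count.
    return partition_func(key) % num_partitions


countries = ["United Kingdom", "Poland", "Germany", "France"]
assignment = {
    country: assign_partition(country, 5, stable_country_partitioner)
    for country in countries
}
print(assignment)
```

Because the function takes a key and returns an integer, it has the same shape as `country_partitioner` and could be passed to `partitionBy` in its place.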

In [12]:
rdd = sc.parallelize(transactions) \
      .map(lambda elem: (elem['country'], elem)) \
      .partitionBy(5, country_partitioner)

print(f"Number of partitions {rdd.getNumPartitions()}")
print(f"Partitioner {rdd.partitioner}")
print(f"Partitions structure {rdd.glom().collect()}")

print("\n--\n")

for i, j in enumerate(rdd.glom().collect()):
  print(f"\npartition {i + 1}:\n{str(j)}")

Number of partitions 5
Partitioner <pyspark.rdd.Partitioner object at 0x7f407429ea00>
Partitions structure [[], [('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})], [('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'}), ('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Poland'})], [('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})], [('Germany', {'name': 'Johannes', 'amount': 200, 'country': 'Germany'}), ('Germany', {'name': 'Thomas', 'amount': 30, 'country': 'Germany'})]]

--


partition 1:
[]

partition 2:
[('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})]

partition 3:
[('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'}), ('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Pol

With each country's records in a single partition, calculations such as total sales can be carried out per partition with `mapPartitions`.

In [13]:
def sum_sales(iterator):
  yield sum(transaction[1]['amount'] for transaction in iterator)

In [14]:
by_country = sc.parallelize(transactions) \
             .map(lambda elem: (elem['country'], elem)) \
             .partitionBy(5, country_partitioner)
          

In [15]:
# Use the by_country RDD defined in the previous cell
sum_amounts = by_country.mapPartitions(sum_sales).collect()

print(f"Total sales for each partition: {sum_amounts}")

Total sales for each partition: [0, 300, 126, 115, 230]
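How `sum_sales` produces one number per partition can be traced in plain Python. The sketch below imitates `mapPartitions` by calling the generator function once per partition iterator and concatenating what it yields. The helper `map_partitions` is hypothetical, and the partition contents are copied from the partition structure printed earlier, reduced to the `amount` field.

```python
from itertools import chain


def sum_sales(iterator):
    # Yields a single value: the total amount for one partition.
    yield sum(transaction[1]['amount'] for transaction in iterator)


def map_partitions(partitions, func):
    # Hypothetical stand-in for mapPartitions: apply func to each
    # partition's iterator and flatten the yielded results.
    return list(chain.from_iterable(func(iter(p)) for p in partitions))


partitions = [
    [],
    [('France', {'amount': 120}), ('France', {'amount': 180})],
    [('Poland', {'amount': 51}), ('Poland', {'amount': 75})],
    [('United Kingdom', {'amount': 100}), ('United Kingdom', {'amount': 15})],
    [('Germany', {'amount': 200}), ('Germany', {'amount': 30})],
]

totals = map_partitions(partitions, sum_sales)
print(totals)  # [0, 300, 126, 115, 230]
```

The empty first partition contributes `0` because `sum()` of an empty iterator is `0`.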


# DataFrames

Create a DataFrame from a Python list of dictionaries.

In [16]:
df = spark.createDataFrame(transactions)

In [17]:
df.show()

+------+--------------+--------+
|amount|       country|    name|
+------+--------------+--------+
|   100|United Kingdom|     Bob|
|    15|United Kingdom|   James|
|    51|        Poland|   Marek|
|   200|       Germany|Johannes|
|    30|       Germany|  Thomas|
|    75|        Poland|    Paul|
|   120|        France|  Pierre|
|   180|        France|   Frank|
+------+--------------+--------+



In [19]:
print(f"Number of partitions {df.rdd.getNumPartitions()}")
print(f"Partitioner {df.rdd.partitioner}")

for i, partition in enumerate(df.rdd.glom().collect()):
  print(f"partition {i+1}: {partition}")

Number of partitions 2
Partitioner None
partition 1: [Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James'), Row(amount=51, country='Poland', name='Marek'), Row(amount=200, country='Germany', name='Johannes')]
partition 2: [Row(amount=30, country='Germany', name='Thomas'), Row(amount=75, country='Poland', name='Paul'), Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]


## Re-partition the DataFrame by country

In [21]:
df2 = df.repartition(10, "country")

print(f"Number of partitions {df2.rdd.getNumPartitions()}")
print(f"Partitioner {df2.rdd.partitioner}")

for i, partition in enumerate(df2.rdd.glom().collect()):
  print(f"partition {i+1}: {partition}")

Number of partitions 10
Partitioner None
partition 1: [Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]
partition 2: []
partition 3: [Row(amount=200, country='Germany', name='Johannes'), Row(amount=30, country='Germany', name='Thomas')]
partition 4: []
partition 5: [Row(amount=51, country='Poland', name='Marek'), Row(amount=75, country='Poland', name='Paul')]
partition 6: [Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James')]
partition 7: []
partition 8: []
partition 9: []
partition 10: []
