# Setup

In [24]:
!pip install findspark
!pip install pyarrow==0.14

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2
Collecting pyarrow==0.14
  Downloading pyarrow-0.14.0-cp37-cp37m-manylinux1_x86_64.whl (31.6 MB)
Collecting six>=1.0.0
  Downloading six-1.16.0-py2.py3-none-any.whl (11 kB)
Collecting numpy>=1.14
  Downloading numpy-1.21.4-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.7 MB)
ERROR: tensorflow 2.6.0 has requirement numpy~=1.19.2, but you'll have numpy 1.21.4 which is incompatible.
ERROR: tensorflow 2.6.0 has requirement six~=1.15.0, but you'll have six 1.16.0 which is incompatible.
ERROR: sparktspy-nojars 2.0.5.0 has requirement pyspark==3.0.1, but you'll have pyspark 3.0.2 which is incompatible.
ERROR: itc-utils 0.1.8 has requiremen

In [25]:
import findspark
findspark.init()

In [26]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [27]:
!python --version

Python 3.7.10


In [28]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/
                        
Using Scala version 2.12.10, Eclipse OpenJ9 VM, 1.8.0_252
Branch HEAD
Compiled by user centos on 2021-02-16T04:53:13Z
Revision 648457905c4ea7d00e3d88048c63f360045f0714
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


# Spark Context and Spark Session

### Creating the spark session and context

In [29]:
sc = SparkContext()

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Inspect the Spark session

In [30]:
spark

# RDDs

### Create an RDD

For demonstration purposes, we create an RDD by calling `sc.parallelize()` on a list of the integers from 0 to 9.

We then get the number of partitions with `getNumPartitions()` and view the contents of each partition with `glom()`, which groups the elements of each partition into a list.

In [31]:
nums = list(range(10))

rdd = sc.parallelize(nums)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 2
Partitioner: None
Partitions structure: [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]


The cell above shows the default partitioning of the RDD. We can change the number of partitions by passing an optional second argument to the `parallelize` function.
Let us now try 2 and 15 partitions and see how the elements are distributed.

In [32]:
rdd = sc.parallelize(nums, 2)

print("Default parallelism: {}".format(sc.defaultParallelism))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Default parallelism: 2
Number of partitions: 2
Partitioner: None
Partitions structure: [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]


In [33]:
rdd.count()

10

In [34]:
rdd = sc.parallelize(nums, 15)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Number of partitions: 15
Partitioner: None
Partitions structure: [[], [0], [1], [], [2], [3], [], [4], [5], [], [6], [7], [], [8], [9]]
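
The partition layouts printed above are consistent with a simple even-slicing rule: for `n` elements and `k` partitions, partition `i` receives the elements from index `i * n // k` up to (but not including) `(i + 1) * n // k`. The pure-Python sketch below is only a model of that observed behaviour, not Spark's actual code, and `slice_evenly` is a hypothetical helper name.

```python
def slice_evenly(items, num_slices):
    """Model how a list is split into contiguous partitions (illustrative only)."""
    n = len(items)
    return [
        items[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


nums = list(range(10))
two = slice_evenly(nums, 2)
fifteen = slice_evenly(nums, 15)

print(two)
print(fifteen)
```

With more partitions than elements, some slices have identical start and end indices, which is why several of the 15 partitions are empty.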


Another way to partition data is the `partitionBy()` function. In this case, each element of the dataset must be a key/value tuple, because the default partitioner hashes the key to assign the element to a partition.


In [35]:
rdd = sc.parallelize(nums).map(lambda el: (el, el)).partitionBy(2).persist()

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

# Print each partition with a 1-based index
for j, partition in enumerate(rdd.glom().collect(), start=1):
    print("Partition: " + str(j) + " " + str(partition))

Number of partitions: 2
Partitioner: <pyspark.rdd.Partitioner object at 0x7effc875da10>
Partitions structure: [[(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)], [(5, 5), (7, 7), (9, 9), (1, 1), (3, 3)]]
Partition: 1 [(0, 0), (2, 2), (4, 4), (6, 6), (8, 8)]
Partition: 2 [(5, 5), (7, 7), (9, 9), (1, 1), (3, 3)]


You can see that now the elements are distributed differently. A few interesting things happened:

```
parallelize(nums) - we are transforming a Python list into an RDD with no partitioning scheme,
map(lambda el: (el, el)) - transforming data into the form of a tuple,
partitionBy(2) - splitting data into 2 chunks using default hash partitioner
```

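Looking at which partition each key lands in makes the hashing strategy apparent: the partitioner hashes the key and takes the result modulo the number of partitions (the `%` operator). For small non-negative integers the hash is the integer itself, so with two partitions the even keys land in the first partition and the odd keys in the second.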
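
To make the modulo step concrete, here is a minimal pure-Python sketch of hash partitioning for the integer keys used above. It assumes the partition index is `hash(key) % num_partitions`, which is consistent with the printed output; `hash_partition` is a hypothetical helper, and the order of elements inside a partition may differ from what Spark prints.

```python
def hash_partition(pairs, num_partitions):
    """Assign each (key, value) pair to bucket hash(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[hash(key) % num_partitions].append((key, value))
    return buckets


pairs = [(n, n) for n in range(10)]
partitions = hash_partition(pairs, 2)

for index, bucket in enumerate(partitions):
    print("Partition {}: {}".format(index, bucket))
```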

Let us now create a more practical dataset of transactions. We have 8 transactions from 4 different geographies as shown below.

In [36]:
transactions = [
    {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
    {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
    {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
    {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
    {'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
    {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
    {'name': 'Pierre', 'amount': 120, 'country': 'France'},
    {'name': 'Frank', 'amount': 180, 'country': 'France'}
]

We know that further analysis will process many similar records within the same country. To reduce network traffic, it is a good idea to keep all records from one country in the same partition. To meet this requirement we need a custom partitioner: a function that returns an integer for a given key (here, the first element of each tuple).


In [37]:
# Dummy implementation ensuring that data for each country is in one partition
def country_partitioner(country):
    return hash(country) % (10**7 + 1)
    # Alternative: return portable_hash(country)


# Validate results
print(country_partitioner("Poland"))
print(country_partitioner("Germany"))
print(country_partitioner("United Kingdom"))
print(country_partitioner("France"))

3121592
9635877
1694088
2050123


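We can see that our custom partitioner returns a different integer for each of the four country names, so it can be passed to `partitionBy` for our dataset. Spark takes this integer modulo the number of partitions to pick the partition, so records with the same country always end up together, although two different countries can still share a partition.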
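
One caveat: Python salts the built-in `hash()` of strings per interpreter process unless `PYTHONHASHSEED` is fixed, so the integers printed above can change between runs. As a hedged alternative, the sketch below uses `zlib.crc32` from the standard library to get a stable integer per country. The names `stable_country_partitioner` and `assign_partitions` are hypothetical, and the grouping is simulated in plain Python rather than run through Spark.

```python
import zlib
from collections import defaultdict


def stable_country_partitioner(country):
    """Return the same integer for a country in every Python process."""
    return zlib.crc32(country.encode("utf-8"))


def assign_partitions(records, num_partitions, partition_func):
    """Group record names by partition_func(country) % num_partitions."""
    partitions = defaultdict(list)
    for record in records:
        index = partition_func(record["country"]) % num_partitions
        partitions[index].append(record["name"])
    return dict(partitions)


sample = [
    {"name": "Bob", "country": "United Kingdom"},
    {"name": "James", "country": "United Kingdom"},
    {"name": "Marek", "country": "Poland"},
    {"name": "Paul", "country": "Poland"},
]
layout = assign_partitions(sample, 5, stable_country_partitioner)
print(layout)
```

Because a partition function only has to return an integer for a key, a function like this could be passed as the second argument to `partitionBy` in the same way as `country_partitioner`.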


In [38]:
rdd = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(5, country_partitioner)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

print("\n--\n")
for i, j in enumerate(rdd.glom().collect()):
    print("\npartition: " + str(i+1) + "\n"+ str(j))

Number of partitions: 5
Partitioner: <pyspark.rdd.Partitioner object at 0x7effc8768590>
Partitions structure: [[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})], [('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})], [('Germany', {'name': 'Johannes', 'amount': 200, 'country': 'Germany'}), ('Germany', {'name': 'Thomas', 'amount': 30, 'country': 'Germany'})], [('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Poland'}), ('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'})], []]

--


partition: 1
[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})]

partition: 2
[('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 18

Using this partitioning scheme, we can now carry out per-partition calculations, such as the total sales in each partition, as shown below.

In [39]:
def sum_sales(iterator):
    yield sum(transaction[1]['amount'] for transaction in iterator)

In [40]:
by_country = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(5, country_partitioner)
    
print("Partitions structure: {}".format(by_country.glom().collect()))

# Sum sales in each partition
sum_amounts = by_country \
    .mapPartitions(sum_sales) \
    .collect()

print("Total sales for each partition: {}".format(sum_amounts))

Partitions structure: [[('United Kingdom', {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'}), ('United Kingdom', {'name': 'James', 'amount': 15, 'country': 'United Kingdom'})], [('France', {'name': 'Pierre', 'amount': 120, 'country': 'France'}), ('France', {'name': 'Frank', 'amount': 180, 'country': 'France'})], [('Germany', {'name': 'Johannes', 'amount': 200, 'country': 'Germany'}), ('Germany', {'name': 'Thomas', 'amount': 30, 'country': 'Germany'})], [('Poland', {'name': 'Paul', 'amount': 75, 'country': 'Poland'}), ('Poland', {'name': 'Marek', 'amount': 51, 'country': 'Poland'})], []]
Total sales for each partition: [115, 300, 230, 126, 0]
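
`mapPartitions` calls the supplied function once per partition, passing an iterator over that partition's elements, and concatenates whatever each call yields. The plain-Python model below illustrates this with a shortened copy of the partition layout printed above; the `partitions` list and the `map_partitions` helper are illustrative and not part of PySpark.

```python
from itertools import chain


def sum_sales(iterator):
    """Yield one total for all (country, transaction) pairs in a partition."""
    yield sum(transaction[1]["amount"] for transaction in iterator)


def map_partitions(partitions, func):
    """Apply func to each partition's iterator and flatten the results."""
    return list(chain.from_iterable(func(iter(p)) for p in partitions))


partitions = [
    [("United Kingdom", {"amount": 100}), ("United Kingdom", {"amount": 15})],
    [("France", {"amount": 120}), ("France", {"amount": 180})],
    [("Germany", {"amount": 200}), ("Germany", {"amount": 30})],
    [("Poland", {"amount": 75}), ("Poland", {"amount": 51})],
    [],
]
totals = map_partitions(partitions, sum_sales)
print(totals)
```

The empty fifth partition still contributes a value because `sum_sales` yields once per partition, and the sum of an empty iterator is 0.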


# DataFrames

### Create the DataFrame

In [41]:
df = spark.createDataFrame(transactions)



In [42]:
df.show()

+------+--------------+--------+
|amount|       country|    name|
+------+--------------+--------+
|   100|United Kingdom|     Bob|
|    15|United Kingdom|   James|
|    51|        Poland|   Marek|
|   200|       Germany|Johannes|
|    30|       Germany|  Thomas|
|    75|        Poland|    Paul|
|   120|        France|  Pierre|
|   180|        France|   Frank|
+------+--------------+--------+



In [43]:
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
print("Partitioner: {}".format(df.rdd.partitioner))
print("Partitions structure: {}".format(df.rdd.glom().collect()))

Number of partitions: 2
Partitioner: None
Partitions structure: [[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James'), Row(amount=51, country='Poland', name='Marek'), Row(amount=200, country='Germany', name='Johannes')], [Row(amount=30, country='Germany', name='Thomas'), Row(amount=75, country='Poland', name='Paul'), Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]]


In [44]:
for i, j in enumerate(df.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

partition: 1
[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James'), Row(amount=51, country='Poland', name='Marek'), Row(amount=200, country='Germany', name='Johannes')]
partition: 2
[Row(amount=30, country='Germany', name='Thomas'), Row(amount=75, country='Poland', name='Paul'), Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]


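For DataFrames, we can repartition the dataset by column using the `repartition()` function. The cell below repartitions the dataset by country into 10 partitions. Because rows with the same country always go to the same partition and there are only four distinct countries, at most four of the ten partitions can contain rows.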

In [45]:
# Repartition by column
df2 = df.repartition(10, "country")

print("\nAfter 'repartition()'")
print("Number of partitions: {}".format(df2.rdd.getNumPartitions()))
print("Partitioner: {}".format(df2.rdd.partitioner))
print("Partitions structure: {}".format(df2.rdd.glom().collect()))


After 'repartition()'
Number of partitions: 10
Partitioner: None
Partitions structure: [[Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')], [], [Row(amount=200, country='Germany', name='Johannes'), Row(amount=30, country='Germany', name='Thomas')], [], [Row(amount=51, country='Poland', name='Marek'), Row(amount=75, country='Poland', name='Paul')], [Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James')], [], [], [], []]


In [46]:
for i, j in enumerate(df2.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

partition: 1
[Row(amount=120, country='France', name='Pierre'), Row(amount=180, country='France', name='Frank')]
partition: 2
[]
partition: 3
[Row(amount=200, country='Germany', name='Johannes'), Row(amount=30, country='Germany', name='Thomas')]
partition: 4
[]
partition: 5
[Row(amount=51, country='Poland', name='Marek'), Row(amount=75, country='Poland', name='Paul')]
partition: 6
[Row(amount=100, country='United Kingdom', name='Bob'), Row(amount=15, country='United Kingdom', name='James')]
partition: 7
[]
partition: 8
[]
partition: 9
[]
partition: 10
[]


# End spark session

In [47]:
spark.stop()