# Подготовка

In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Spark session & context.
# Automatic broadcast joins are disabled so the join strategies demonstrated in
# this notebook are chosen explicitly; -1 is the value Spark documents for
# turning the feature off.
# The remaining settings pull in the sparkMeasure package and register its
# InfluxDBSink listener (presumably it reports metrics to the InfluxDB URL
# configured here -- confirm against the sparkMeasure docs).
spark = (
    SparkSession.builder
    .master("local")
    .config('spark.sql.autoBroadcastJoinThreshold', -1)
    .config('spark.jars.packages', 'ch.cern.sparkmeasure:spark-measure_2.12:0.23')
    .config('spark.sparkmeasure.influxdbURL', 'http://sparkdashboard:8086')
    .config('spark.extraListeners', 'ch.cern.sparkmeasure.InfluxDBSink')
    .getOrCreate()
)

Датасеты можно скачать из репозитория:

https://github.com/databricks/LearningSparkV2/tree/master/databricks-datasets/learning-spark-v2/flights

https://github.com/databricks/LearningSparkV2/tree/master/databricks-datasets/learning-spark-v2/sf-fire

In [15]:
# Read the four source files with a header row and inferred column types.
# Every frame is marked for DISK_ONLY persistence so later cells can reuse it.
airport_codes = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")  # this file is tab-separated
    .option("inferSchema", "true")
    .csv("../datasets/airport-codes-na.txt")
    .persist(pyspark.StorageLevel.DISK_ONLY)
)
airport_delays = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("../datasets/departuredelays.csv")
    .persist(pyspark.StorageLevel.DISK_ONLY)
)

fire_calls = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("../datasets/sf-fire-calls.csv")
    .persist(pyspark.StorageLevel.DISK_ONLY)
)
fire_incidents = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("../datasets/sf-fire-incidents.csv")
    .persist(pyspark.StorageLevel.DISK_ONLY)
)

# Пример работы оптимизатора

In [16]:
# Show the extended plan for "join, then filter on State"; the filter is
# written after the join on purpose, to see what the optimizer does with it.
(
    airport_delays
    .join(airport_codes, airport_codes['IATA'] == airport_delays['origin'])
    .where(col('State') == 'LA')
    .explain(True)
)

== Parsed Logical Plan ==
'Filter ('State = LA)
+- Join Inner, (IATA#1644 = origin#1669)
   :- Relation [date#1666,delay#1667,distance#1668,origin#1669,destination#1670] csv
   +- Relation [City#1641,State#1642,Country#1643,IATA#1644] csv

== Analyzed Logical Plan ==
date: int, delay: int, distance: int, origin: string, destination: string, City: string, State: string, Country: string, IATA: string
Filter (State#1642 = LA)
+- Join Inner, (IATA#1644 = origin#1669)
   :- Relation [date#1666,delay#1667,distance#1668,origin#1669,destination#1670] csv
   +- Relation [City#1641,State#1642,Country#1643,IATA#1644] csv

== Optimized Logical Plan ==
Join Inner, (IATA#1644 = origin#1669)
:- Filter isnotnull(origin#1669)
:  +- InMemoryRelation [date#1666, delay#1667, distance#1668, origin#1669, destination#1670], StorageLevel(disk, 1 replicas)
:        +- FileScan csv [date#62,delay#63,distance#64,origin#65,destination#66] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1

# "Солёный" JOIN

In [17]:
# Deliberately skewed demo table: ten "Apple" rows plus one row each for
# "Orange" and "Pineapple".
fruit_stats_columns = ["fruit", "items_count"]
fruit_stats_data = 10 * [("Apple", "20")] + [("Orange", "10"), ("Pineapple", "30")]

fruit_stats_data_rdd = spark.sparkContext.parallelize(fruit_stats_data)
fruit_stats_data_df = fruit_stats_data_rdd.toDF(schema=fruit_stats_columns)

fruit_stats_data_df.show()

+---------+-----------+
|    fruit|items_count|
+---------+-----------+
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|    Apple|         20|
|   Orange|         10|
|Pineapple|         30|
+---------+-----------+



In [18]:
# Small lookup table: exactly one colour per fruit.
fruit_color_columns = ["fruit", "color"]
fruit_color_data = [
    ("Apple", "red"),
    ("Orange", "orange"),
    ("Pineapple", "yellow"),
]

fruit_color_data_rdd = spark.sparkContext.parallelize(fruit_color_data)
fruit_color_data_df = fruit_color_data_rdd.toDF(schema=fruit_color_columns)

fruit_color_data_df.show()

+---------+------+
|    fruit| color|
+---------+------+
|    Apple|   red|
|   Orange|orange|
|Pineapple|yellow|
+---------+------+



In [19]:
# Rows per join key: how large each key's group is before salting.
(
    fruit_stats_data_df
    .groupBy(col('fruit').alias('partition'))
    .agg(count('fruit').alias('partition_size'))
    .show()
)

+---------+--------------+
|partition|partition_size|
+---------+--------------+
|Pineapple|             1|
|   Orange|             1|
|    Apple|            10|
+---------+--------------+



In [20]:
# Attach a random integer salt to every row: rand() * 3 cast to int.
salted_df = fruit_stats_data_df.withColumn(
    'salt',
    (rand() * 3).cast('int'),
)
salted_df.show()

+---------+-----------+----+
|    fruit|items_count|salt|
+---------+-----------+----+
|    Apple|         20|   1|
|    Apple|         20|   0|
|    Apple|         20|   2|
|    Apple|         20|   0|
|    Apple|         20|   0|
|    Apple|         20|   2|
|    Apple|         20|   0|
|    Apple|         20|   2|
|    Apple|         20|   1|
|    Apple|         20|   1|
|   Orange|         10|   2|
|Pineapple|         30|   1|
+---------+-----------+----+



In [21]:
# Rows per salted key (fruit name concatenated with its salt value).
(
    salted_df
    .groupBy(concat(col('fruit'), col('salt')).alias('partition'))
    .agg(count('fruit').alias('partition_size'))
    .show()
)

+----------+--------------+
| partition|partition_size|
+----------+--------------+
|    Apple1|             3|
|Pineapple1|             1|
|    Apple2|             3|
|    Apple0|             4|
|   Orange2|             1|
+----------+--------------+



In [10]:
# Replicate every lookup row once per salt value (0, 1, 2) so that each
# salted key on the other side of the join has a matching row.
enriched_df = fruit_color_data_df.select(
    'fruit',
    'color',
    explode(array([lit(salt) for salt in (0, 1, 2)])).alias('salt'),
)
enriched_df.show()

+---------+------+----+
|    fruit| color|salt|
+---------+------+----+
|    Apple|   red|   0|
|    Apple|   red|   1|
|    Apple|   red|   2|
|   Orange|orange|   0|
|   Orange|orange|   1|
|   Orange|orange|   2|
|Pineapple|yellow|   0|
|Pineapple|yellow|   1|
|Pineapple|yellow|   2|
+---------+------+----+



In [11]:
# Equi-join on the composite (fruit, salt) key; the salt column is dropped
# only for display.
joined_df = salted_df.join(other=enriched_df, on=['fruit', 'salt'])
(
    joined_df
    .drop('salt')
    .show()
)

+---------+-----------+------+
|    fruit|items_count| color|
+---------+-----------+------+
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|   Orange|         10|orange|
|Pineapple|         30|yellow|
+---------+-----------+------+



In [12]:
# The same join on the unsalted key, for comparison with the salted result.
(
    fruit_stats_data_df
    .join(fruit_color_data_df, on='fruit')
    .show()
)

+---------+-----------+------+
|    fruit|items_count| color|
+---------+-----------+------+
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|    Apple|         20|   red|
|   Orange|         10|orange|
|Pineapple|         30|yellow|
+---------+-----------+------+



# Пример использования broadcast

In [13]:
# Baseline: join the delays to the airport codes with no broadcast hint.
(
    airport_delays
    .join(airport_codes, airport_codes['IATA'] == airport_delays['origin'])
    .where(col('State') == 'LA')
    .show()
)

+-------+-----+--------+------+-----------+----------+-----+-------+----+
|   date|delay|distance|origin|destination|      City|State|Country|IATA|
+-------+-----+--------+------+-----------+----------+-----+-------+----+
|1011030|   63|     247|   AEX|        DFW|Alexandria|   LA|    USA| AEX|
|1011838|  -14|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1011204|   -8|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1011710|   18|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1011115|   -3|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1021030|   -5|     247|   AEX|        DFW|Alexandria|   LA|    USA| AEX|
|1020537|   -7|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1021851|   -3|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1021204|   21|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1020600|   -6|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1021710|  115|     435|   AEX|       

In [14]:
# With an explicit broadcast hint

bc_airport_codes = broadcast(airport_codes)

(
    airport_delays
    .join(bc_airport_codes, airport_codes['IATA'] == airport_delays['origin'])
    .where(col('State') == 'LA')
    .show()
)

+-------+-----+--------+------+-----------+----------+-----+-------+----+
|   date|delay|distance|origin|destination|      City|State|Country|IATA|
+-------+-----+--------+------+-----------+----------+-----+-------+----+
|1011030|   63|     247|   AEX|        DFW|Alexandria|   LA|    USA| AEX|
|1011838|  -14|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1011204|   -8|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1011710|   18|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1011115|   -3|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1021030|   -5|     247|   AEX|        DFW|Alexandria|   LA|    USA| AEX|
|1020537|   -7|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1021851|   -3|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1021204|   21|     165|   AEX|        IAH|Alexandria|   LA|    USA| AEX|
|1020600|   -6|     435|   AEX|        ATL|Alexandria|   LA|    USA| AEX|
|1021710|  115|     435|   AEX|       

# Пример использования фильтра Блума

In [None]:
# Number of distinct incident numbers in the incidents dataset.
(
    fire_incidents
    .select('Incident Number')
    .distinct()
    .count()
)

In [None]:
# Number of distinct incident numbers in the calls dataset.
(
    fire_calls
    .select('IncidentNumber')
    .distinct()
    .count()
)

In [None]:
# Number of distinct incident numbers that appear in both datasets.
(
    fire_calls
    .join(fire_incidents, fire_incidents['Incident Number'] == fire_calls['IncidentNumber'])
    .select('IncidentNumber')
    .distinct()
    .count()
)

In [None]:
# Without the filter

(
    fire_calls
    .join(fire_incidents, fire_incidents['Incident Number'] == fire_calls['IncidentNumber'])
    .count()
)

In [None]:
!pip install --no-cache-dir mmh3 bitarray

In [None]:
import math
import mmh3
from bitarray import bitarray
  
  
class BloomFilter(object):

    '''
    Bloom filter backed by a bitarray, hashing items with murmur3 (mmh3).

    check() answers either "definitely not added" (False) or "possibly
    added" (True): add() only ever sets bits, so an item that was added is
    never reported as absent, while unrelated items may collide.
    '''

    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count

        # Target false positive probability in decimal
        self.fp_prob = fp_prob

        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)

        # Number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)

        # Bit array of given size, with every bit initialised to 0
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

    def add(self, item):
        '''
        Add an item in the filter

        item is passed straight to mmh3.hash (presumably str or bytes --
        confirm against the mmh3 version in use).
        '''
        for seed in range(self.hash_count):
            # Each seed yields a different murmur3 digest of the same item.
            # Python's % always returns a non-negative index here, even when
            # the digest itself is negative.
            self.bit_array[mmh3.hash(item, seed) % self.size] = True

    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object.

        Raises ValueError when the filters were built with different sizes
        or hash counts: OR-ing such bit arrays would not be a valid filter.
        """
        if self.size != other.size or self.hash_count != other.hash_count:
            raise ValueError(
                "cannot union bloom filters with different size or hash count")
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom

    def check(self, item):
        '''
        Check for existence of an item in filter

        Returns False when the item was definitely not added and True when
        it may have been added.
        '''
        for seed in range(self.hash_count):
            if not self.bit_array[mmh3.hash(item, seed) % self.size]:
                # A single unset bit proves the item was never added.
                return False
        return True

    def copy(self):
        """Return a copy of this bloom filter.

        The copy is built from the same items_count / fp_prob and receives
        an independent copy of the bit array.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter

    def set_bit_array(self, bit_array):
        '''
        Replace the underlying bit array.

        No length check is performed; the caller is expected to pass an
        array built with the same size as this filter.
        '''
        self.bit_array = bit_array

    @classmethod
    def get_size(cls, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * ln(p)) / (ln(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal

        The result is truncated to an int and never less than 1, so the
        modulo in add()/check() always has a non-zero divisor.
        '''
        m = int(-(n * math.log(p)) / (math.log(2) ** 2))
        # Written without the max() builtin on purpose: this notebook
        # star-imports pyspark.sql.functions, which rebinds the name max.
        return m if m > 0 else 1

    @classmethod
    def get_hash_count(cls, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * ln(2)

        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter

        The result is truncated to an int and never less than 1: with zero
        hash functions add() would set nothing and check() would accept
        every item.
        '''
        k = int((m / n) * math.log(2))
        return k if k > 0 else 1

In [None]:
from random import shuffle

n = 20 #no of items to add
p = 0.05 #false positive probability

# Two filters with identical parameters, so their bit arrays can be OR-ed.
bloomf1 = BloomFilter(n,p)
bloomf2 = BloomFilter(n,p)

# words added to the first filter
word_present1 = ['abound','abounds','abundance','abundant','accessible',
                'bloom','blossom','bolster','bonny','bonus','bonuses']

# words added to the second filter
word_present2 = ['coherent','cohesive','colorful','comely','comfort',
                'gems','generosity','generous','generously','genial']

# words added to neither filter
word_absent = ['bluff','cheater','hate','war','humanity',
               'racism','hurt','nuke','gloomy','facebook',
               'geeksforgeeks','twitter']

for item in word_present1:
    bloomf1.add(item)

# The second filter must be fed from word_present2 (the second word list).
for item in word_present2:
    bloomf2.add(item)

# The union should report every word from both lists as possibly present.
bloomf = bloomf1.union(bloomf2)

shuffle(word_present1)
shuffle(word_present2)
shuffle(word_absent)

test_words = word_present1 + word_present2 + word_absent
shuffle(test_words)
for word in test_words:
    if bloomf.check(word):
        if word in word_absent:
            print("'{}' is a false positive!".format(word))
        else:
            print("'{}' is probably present!".format(word))
    else:
        print("'{}' is definitely not present!".format(word))

In [None]:
# Build the filter

# Passed to BloomFilter as items_count, i.e. the expected number of stored
# elements rather than the length of the bit array.
# NOTE(review): 31771 is presumably the distinct 'Incident Number' count -- confirm.
filterSize: int = 31771
# Target false-positive probability handed to BloomFilter.
prob: float = 0.05

def fill_bloom_filter(bf, items):
    """Add the first field of every row in items to bf and return bf.

    bf    : object exposing add(), e.g. a BloomFilter
    items : iterable of indexable rows; element 0 of each row is converted
            with str() before being added
    """
    for row in items:
        key = str(row[0])
        bf.add(key)
    return bf

bloom_filter = BloomFilter(filterSize, prob)

# Build one partial filter per partition and emit only its bit array, then OR
# the partial arrays together. mapPartitions already yields raw bit arrays, so
# the reducer combines the arrays themselves -- they have no .bit_array
# attribute to dereference.
general_bit_array = fire_incidents.select(col('Incident Number')).rdd \
    .mapPartitions(lambda p: [fill_bloom_filter(BloomFilter(filterSize, prob), p).bit_array]) \
    .reduce(lambda a, b: a | b)

bloom_filter.set_bit_array(general_bit_array)

# Declare the return type explicitly: without it udf() produces a string
# column. Keys are stringified the same way fill_bloom_filter stores them.
maybe_in_bf = udf(lambda incident_number: bloom_filter.check(str(incident_number)), 'boolean')

In [None]:
# Apply the filter

# Rows whose incident number cannot be in the filter are dropped before the
# join; the explicit comparison turns the UDF result into a boolean condition.
(
    fire_calls
    .where(maybe_in_bf(col('IncidentNumber')) == True)
    .join(fire_incidents, fire_incidents['Incident Number'] == fire_calls['IncidentNumber'])
    .count()
)

# Использование Bucketing

In [None]:
# Without bucketing

(
    fire_calls
    .join(fire_incidents, fire_incidents['Incident Number'] == fire_calls['IncidentNumber'])
    .count()
)

In [None]:
# Save both datasets as CSV tables split into 16 buckets on their join column,
# overwriting any table of the same name.
(
    fire_calls.write
    .bucketBy(16, 'IncidentNumber')
    .format('csv')
    .mode('overwrite')
    .saveAsTable('fire_calls_bucketed')
)

(
    fire_incidents.write
    .bucketBy(16, 'Incident Number')
    .format('csv')
    .mode('overwrite')
    .saveAsTable('fire_incidents_bucketed')
)

In [None]:
# With bucketing

fire_calls_bucketed = spark.table('fire_calls_bucketed')
fire_incidents_bucketed = spark.table('fire_incidents_bucketed')

(
    fire_calls_bucketed
    .join(
        fire_incidents_bucketed,
        fire_incidents_bucketed['Incident Number'] == fire_calls_bucketed['IncidentNumber'],
    )
    .count()
)

In [None]:
# Scratch cell: evaluates to the string '1'.
str(1)

In [None]:
# Identity comparison is the idiomatic None test and does not depend on how
# the object defines ==.
print(bc_airport_codes is None)

In [None]:
# Calls unpersist() on the broadcast-hinted DataFrame.
# NOTE(review): persist() is only called on airport_codes itself in this
# notebook -- confirm this call has the intended effect.
bc_airport_codes.unpersist()

In [None]:
# Distinct State values in the airport codes table.
(
    airport_codes
    .select('State')
    .distinct()
    .show()
)

In [None]:
# Preview the first rows of the departure delays table.
airport_delays.show()

In [None]:
# Preview the join column of the incidents dataset.
(
    fire_incidents
    .select('Incident Number')
    .show()
)

In [None]:
# Preview the join column of the calls dataset.
(
    fire_calls
    .select('IncidentNumber')
    .show()
)

In [None]:
# Preview the incidents-to-calls join on the incident number.
(
    fire_incidents
    .join(fire_calls, fire_incidents['Incident Number'] == fire_calls['IncidentNumber'])
    .show()
)

In [None]:
# Preview the first rows of the airport codes table.
airport_codes.show()