In [1]:
pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.49.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m36.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4.0 (from apache-beam)
  Downloading orjson-3.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (140 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m140.3/140.3 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?2

In [2]:
import apache_beam as beam

# CoGroupByKey
CoGroupByKey, Apache Beam içinde iki veya daha fazla PCollection'ı anahtarlarına göre gruplamak ve bu grupları işlemek için kullanılan bir işlemdir. Bu işlem, bir anahtara sahip her PCollection öğesini bir araya getirir ve bu öğeleri anahtarlarına göre gruplar.

Bir CoGroupByKey işlemi, birden fazla PCollection'ın her birinin aynı anahtara sahip öğelerini birleştirir ve sonuç olarak her anahtar için bir grup oluşturur. Her grup, farklı PCollection'lardan gelen öğeleri içerebilir. Bu işlem, özellikle birden fazla veri kaynağını birleştirip analiz etmeniz gereken senaryolarda kullanışlıdır.

In [3]:
""" Bu sadece 2 argüman alan yerlerde işe yaramaktadır."""
with beam.Pipeline() as pipeline:
    orders = pipeline | 'Create orders' >> beam.Create([
        ('order-1', 'product-1'),
        ('order-1', 'product-2'),
        ('order-2', 'product-1'),
        ('order-2', 'product-3'),
    ])

    products = pipeline | 'Create products' >> beam.Create([
        ('product-1', 'Apple'),
        ('product-2', 'Banana'),
        ('product-3', 'Orange'),
    ])

    grouped_data = (({
        'orders': orders, 'products': products
    } )
    | 'Merge' >> beam.CoGroupByKey()
    | beam.Map(print))



('order-1', {'orders': ['product-1', 'product-2'], 'products': []})
('order-2', {'orders': ['product-1', 'product-3'], 'products': []})
('product-1', {'orders': [], 'products': ['Apple']})
('product-2', {'orders': [], 'products': ['Banana']})
('product-3', {'orders': [], 'products': ['Orange']})


# CombineGlobally

CombineGlobally, Apache Beam içinde bir PCollection'ın tüm öğelerini birleştirip bir tek sonuç üretmek için kullanılan bir dönüşüm işlemidir. Bu işlem, veri akışındaki tüm öğeleri toplayarak, ortalama hesaplayarak, en büyük veya en küçük değeri bulup, özel bir işlem yaparak veya özel bir işlevi kullanarak sonuç üretmek için kullanılabilir.

CombineGlobally işlemi, birleştirme işlemi sırasında tüm öğelerin global bir şekilde işlenmesini sağlar. Yani, her öğe ayrı ayrı işlenmez, tamamı işlenir ve sonuç üretilir.

In [4]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    numbers = pipeline | 'Create numbers' >> beam.Create([1, 2, 3, 4, 5])

    total_sum = numbers | 'Calculate total sum' >> beam.CombineGlobally(sum)
    average = numbers | 'Calculate average' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())

    total_sum | 'Print total sum' >> beam.Map(print)
    average | 'Print average' >> beam.Map(print)

15
3.0


In [5]:
"""
Apache-beam dökümantasyonunda bulunan örnekte ise  farklı set'leri içeren PCollectionlarda birleştirerek
bu set'lerin kesişimini bulan bir Apache Beam işlemini gerçekleştiriyor.
"""
def get_common_items(sets):
    return set.intersection(*(sets or [set()]))

with beam.Pipeline() as pipeline:
    common_items = (
        pipeline
        | 'Create produce' >> beam.Create([
            {'🍓', '🥕', '🍌', '🍅', '🌶️'},
            {'🍇', '🥕', '🥝', '🍅', '🥔'},
            {'🍉', '🥕', '🍆', '🍅', '🍍'},
            {'🥑', '🥕', '🌽', '🍅', '🥥'},
        ])
        | 'Get common items' >> beam.CombineGlobally(get_common_items)
        | beam.Map(print))

{'🍅', '🥕'}


In [6]:
"""
Bu kod, beam.CombineGlobally() işlemi içinde bir işlev kullanarak farklı set'leri birleştirerek
bu set'lerin kesişimini bulan bir Apache Beam işlemini gerçekleştiriyor.
Ancak bu sefer, belirli öğeleri kesişimden çıkararak istisnaları işliyor.
"""
with beam.Pipeline() as pipeline:
    common_items_with_exceptions = (
        pipeline
        | 'Create produce' >> beam.Create([
            {'🍓', '🥕', '🍌', '🍅', '🌶️'},
            {'🍇', '🥕', '🥝', '🍅', '🥔'},
            {'🍉', '🥕', '🍆', '🍅', '🍍'},
            {'🥑', '🥕', '🌽', '🍅', '🥥'},
        ])
        | 'Get common items with exceptions' >> beam.CombineGlobally(
            lambda sets, exclude: \
                set.intersection(*(sets or [set()])) - exclude,
            exclude={'🥕'})
        | beam.Map(print)
    )


{'🍅'}


# CombinePerKey

CombinePerKey, Apache Beam içinde her bir anahtara sahip öğeleri birleştirmek ve bu birleştirme işlemi sonucunda bir PCollection oluşturmak için kullanılan bir dönüşüm işlemidir. Bu işlem, belirli bir anahtara sahip öğeleri gruplayarak birleştirir ve sonuç olarak her bir anahtar için bir çıktı üretir.

CombinePerKey işlemi, GroupByKey işlemine benzer. Ancak, GroupByKey işlemi her bir anahtar için bir grup oluştururken, CombinePerKey işlemi bu grupları birleştirerek sonuçları üretir. Bu işlem sayesinde her anahtar için bir grup oluşturmak yerine, her anahtar için bir sonuç üretilir.

In [7]:
def combine_fn(values):
    return sum(values)

with beam.Pipeline() as pipeline:
    orders = pipeline | 'Create orders' >> beam.Create([
        ('apple', 3),
        ('banana', 2),
        ('apple', 5),
        ('banana', 1),
        ('orange', 4),
    ])

    combined_orders = orders | 'Combine orders' >> beam.CombinePerKey(combine_fn)

    combined_orders | beam.Map(print)

('apple', 8)
('banana', 3)
('orange', 4)


# Count.Globally()

Count.Globally() Apache Beam'de kullanılan bir işlevdir ve bir PCollection içindeki öğelerin toplam sayısını hesaplamak için kullanılır. Bu işlev, tüm veri kümesinin genel sayısını döndürür.

In [8]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    numbers = pipeline | 'Create numbers' >> beam.Create([1, 2, 3, 4, 5])

    total_count = numbers | 'Calculate total count' >> beam.combiners.Count.Globally()

    total_count | 'Print total count' >> beam.Map(print)

5


In [9]:
""" apache beam dökümantasyonu içinde yer alan örnekte Count.PerKey() kullanımı Key değerleri ile birlikte Pcollection içinde kaç adet olduğunu döndürür."""
with beam.Pipeline() as pipeline:
    total_elements_per_keys = (
        pipeline
        | 'Create plants' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('summer', '🥕'),
            ('fall', '🥕'),
            ('spring', '🍆'),
            ('winter', '🍆'),
            ('spring', '🍅'),
            ('summer', '🍅'),
            ('fall', '🍅'),
            ('summer', '🌽'),
        ])
        | 'Count elements per key' >> beam.combiners.Count.PerKey()
        | beam.Map(print))

('spring', 4)
('summer', 3)
('fall', 2)
('winter', 1)


In [10]:
"""
Yine aynı şekilde yer alan diğer bir örnekte Pcollection içide yer alan her bir elementten kaç adet olduğunu döndüren bir PerElement() kullanım örneği
"""
with beam.Pipeline() as pipeline:
    total_unique_elements = (
        pipeline
        | 'Create produce' >> beam.Create(
            ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
        | 'Count unique elements' >> beam.combiners.Count.PerElement()
        | beam.Map(print))

('🍓', 1)
('🥕', 3)
('🍆', 2)
('🍅', 3)
('🌽', 1)


# GroupBy(), GroupByKey() , GroupIntoBatches()
# GroupBy():
GroupBy işlemi, belirli bir özelliğe göre verileri gruplamak ve bu grupları işlemek için kullanılır. Örneğin, belirli bir anahtar sütunu kullanarak verileri gruplandırmak ve bu gruplarda istatistiksel hesaplamalar yapmak gibi durumlar için kullanılabilir.

# GroupByKey():
GroupByKey işlemi, anahtar-değer çiftleri içeren verileri, anahtarlara göre gruplandırmak için kullanılır. Bu operasyon, Apache Beam'in anahtar-tabanlı işlem modeline uygun olarak tasarlanmıştır. Bu sayede aynı anahtara sahip veriler bir araya getirilir ve daha sonra bu anahtarın değeri üzerinde işlem yapılabilir.

# GroupIntoBatches():
GroupIntoBatches işlemi, belirli bir boyutta veri grupları (batch'ler) oluşturmak için kullanılır. Bu işlem, büyük veri kümelerini daha küçük gruplara bölmek ve bu gruplar üzerinde işlem yapmak için kullanılabilir. Batch işleme senaryolarında kullanışlıdır.

In [11]:
# Veri akışı oluşturalım
veri = [
    ('anahtar1', 10),
    ('anahtar2', 20),
    ('anahtar1', 30),
    ('anahtar2', 25),
    ('anahtar3', 15)
]

with beam.Pipeline() as pipeline:
    veri_akisi = pipeline | beam.Create(veri)

    # GroupByKey ile anahtara göre gruplandırma yapalım
    gruplar = veri_akisi | beam.GroupByKey()

    # Grupları yazdıralım
    gruplar | beam.Map(print)

('anahtar1', [10, 30])
('anahtar2', [20, 25])
('anahtar3', [15])


In [14]:
import apache_beam as beam

# Veri akışı oluşturalım
veri = [
    ('anahtar1', 10),
    ('anahtar2', 20),
    ('anahtar1', 30),
    ('anahtar2', 25),
    ('anahtar3', 15)
]

with beam.Pipeline() as pipeline:
    veri_akisi = pipeline | beam.Create(veri)

    # GroupByKey ile anahtara göre gruplandıralım
    gruplar = veri_akisi | beam.GroupByKey()

    # Her grup için toplamı hesaplayan bir ParDo işlevi
    def hesapla_toplam(element):
        anahtar, degerler = element
        toplam = sum(degerler)
        return [(anahtar, toplam)]

    # Grupları toplamı hesaplayarak işle
    toplamlar = gruplar | beam.ParDo(hesapla_toplam)

    # Toplam sonuçları yazdıralım
    toplamlar | beam.Map(print)

('anahtar1', 40)
('anahtar2', 45)
('anahtar3', 15)


In [23]:
# Veriyi anahtar-değer çiftlerine dönüştürelim
veri = [1,1,1,2,2,2,2,2,5,4,6,4,7,89,78]
veri_ciftleri = [(i, i) for i in veri]

with beam.Pipeline() as pipeline:
    veri_akisi = pipeline | beam.Create(veri_ciftleri)

    # Veriyi belirli boyuttaki batch'lere bölelim
    batchler = veri_akisi | beam.GroupIntoBatches(3) #1 keyine karşılık gelen 3 adet value var , 2 keyine karşılık gelen 5 adet value değeri var fakat 3 erli gruplandığı için başka bir batche geçerek gruplamaya devam ediyor.

    # Batch'leri yazdıralım
    batchler | beam.Map(print)

(1, [1, 1, 1])
(2, [2, 2, 2])
(2, [2, 2])
(5, [5])
(4, [4, 4])
(6, [6])
(7, [7])
(89, [89])
(78, [78])


# Lates

Apache Beam içinde "Latest" işlemi, gelen verileri belirli bir anahtar ile gruplayarak, her grup için en son gelen veriyi seçmenizi sağlar. Bu işlem, her anahtarın en son durumunu veya güncel değerini bulmak için kullanışlıdır.

Örnek olarak, "Latest" işlemi kullanarak anahtarlarla ilişkilendirilmiş en son değeri seçmek istediğinizi varsayalım. Aşağıda bu işlemi gösteren bir Apache Beam örneği bulabilirsiniz:

beam.combiners.Latest.Globally() Apache Beam içinde kullanılan bir CombineGlobally işlemidir. Bu işlem, tüm gelen veriler arasında en son gelen değeri seçmenizi sağlar. Yani, tüm verileri birleştirip en son gelen veriyi seçer.

In [30]:
import time
def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
    return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
    latest_element = (
        pipeline
        | 'Create crops' >> beam.Create([
            {
                'item': '🥬', 'harvest': '2020-02-24 00:00:00'
            },
            {
                'item': '🍓', 'harvest': '2020-06-16 00:00:00'
            },
            {
                'item': '🥕', 'harvest': '2020-07-17 00:00:00'
            },
            {
                'item': '🍆', 'harvest': '2020-10-26 00:00:00'
            },
            {
                'item': '🍅', 'harvest': '2020-10-01 00:00:00'
            },
        ])
        | 'With timestamps' >> beam.Map(
            lambda crop: beam.window.TimestampedValue(
                crop['item'], to_unix_time(crop['harvest']))) #to_unix_time(crop['harvest'])) ifadesi, bir crop verisini alır ve crop verisindeki item değerini, to_unix_time(crop['harvest']) zaman damgası ile işaretler.
        | 'Get latest element' >> beam.combiners.Latest.Globally()
        | beam.Map(print))

🍆


beam.combiners.Latest.PerKey() Apache Beam içinde kullanılan bir CombinePerKey işlemidir. Bu işlem, verileri belirli bir anahtar ile gruplayarak her anahtar grubunda en son gelen değeri seçmenizi sağlar.

Özellikle zaman serisi verilerini işlerken veya güncel durumları takip ederken kullanışlıdır. Her anahtar grubundaki en son veriyi seçerek grupları güncel bir şekilde yönetebilirsiniz.

In [24]:
veri = [
    ('anahtar1', 10),
    ('anahtar2', 20),
    ('anahtar1', 30),
    ('anahtar2', 25),
    ('anahtar3', 15)
]

with beam.Pipeline() as pipeline:
    veri_akisi = pipeline | beam.Create(veri)

    # Veriyi anahtarlarına göre gruplandıralım ve en son veriyi seçelim
    latest_veri = (
        veri_akisi
        | beam.GroupByKey()
        | beam.Map(lambda element: (element[0], max(element[1])))
    )

    # En son verileri yazdıralım
    latest_veri | beam.Map(print)

('anahtar1', 30)
('anahtar2', 25)
('anahtar3', 15)


In [32]:
def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
    return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
    latest_elements_per_key = (
        pipeline
        | 'Create crops' >> beam.Create([
            ('spring', {
                'item': '🥕', 'harvest': '2020-06-28 00:00:00'
            }),
            ('spring', {
                'item': '🍓', 'harvest': '2020-06-16 00:00:00'
            }),
            ('summer', {
                'item': '🥕', 'harvest': '2020-07-17 00:00:00'
            }),
            ('summer', {
                'item': '🍅', 'harvest': '2020-09-22 00:00:00'
            }),
            ('autumn', {
                'item': '🍅', 'harvest': '2020-10-01 00:00:00'
            }),
            ('autumn', {
                'item': '🥬', 'harvest': '2020-10-20 00:00:00'
            }),
            ('autumn', {
                'item': '🍆', 'harvest': '2020-10-26 00:00:00'
            }),
            ('winter', {
                'item': '🥬', 'harvest': '2020-02-24 00:00:00'
            }),
        ])
        | 'With timestamps' >> beam.Map(
            lambda pair: beam.window.TimestampedValue(
                (pair[0], pair[1]['item']), to_unix_time(pair[1]['harvest'])))
        | 'Get latest elements per key' >> beam.combiners.Latest.PerKey()
        | beam.Map(print))

('spring', '🥕')
('summer', '🍅')
('autumn', '🍆')
('winter', '🥬')


WindowInto()

beam.WindowInto işlemi, Apache Beam içinde kullanılan bir PTransform'dir ve verileri belirli bir pencereleme stratejisi ile pencerelemek için kullanılır. Pencereleme stratejileri, zaman aralıklarına veya veri miktarına dayalı olarak verileri gruplara bölmeyi sağlar. Bu sayede belirli bir zaman dilimi veya veri miktarı içinde işlemler yapabilirsiniz.

In [35]:
with beam.Pipeline() as pipeline:
    produce = (pipeline
                 | 'Garden plants' >> beam.Create([
                    {'name': 'Strawberry', 'season': 1585699200},  # April, 2020
                    {'name': 'Strawberry', 'season': 1588291200},  # May, 2020
                    {'name': 'Carrot', 'season': 1590969600},  # June, 2020
                    {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
                    {'name': 'Artichoke', 'season': 1585699200},  # April, 2020
                    {'name': 'Tomato', 'season': 1588291200},  # May, 2020
                    {'name': 'Potato', 'season': 1598918400},  # September, 2020
                  ])
                 | 'With timestamps' >> beam.Map(lambda plant: beam.window.TimestampedValue(plant['name'], plant['season']))
                 | 'Window into fixed 2-month windows' >> beam.WindowInto(
                              beam.window.FixedWindows(2 * 30 * 24 * 60 * 60))
                 | 'Count per window' >> beam.combiners.Count.PerElement()
                 | 'Print results' >> beam.Map(print)
                 )

('Strawberry', 1)
('Strawberry', 1)
('Carrot', 1)
('Artichoke', 2)
('Tomato', 1)
('Potato', 1)


'Window into fixed 2-month windows': Bu yorum, hangi pencereleme stratejisinin kullanıldığını belirtir. Bu örnekte 2 aylık sabit pencereleme stratejisi kullanılıyor.

beam.WindowInto(beam.window.FixedWindows(2 * 30 * 24 * 60 * 60)): Bu ifade, pencerelemeyi ayarlayan bölümdür.

beam.window.FixedWindows(2 * 30 * 24 * 60 * 60) ifadesi, 2 ay (2 * 30 gün) boyunca sürekli olarak sabit büyüklükte (saniye cinsinden) pencereleme yapılacağını belirtir.

beam.WindowInto(...) işlemi, verileri belirli pencerelere bölen bir işlemi temsil eder. İşte bu pencereleme stratejisi ile belirli bir zaman aralığı boyunca gelen verileri gruplayabilirsiniz.

Bu tür pencerelemeler, özellikle zaman serisi verileri işlerken veya belirli zaman dilimlerine göre analiz yaparken kullanışlıdır. Pencereleme stratejileri, veri analizini belirli zaman dilimlerine uygun bir şekilde sınırlamak veya daha iyi paralel işlem yapmak için kullanılır.