### Import Packages

In [1]:
import apache_beam as beam
import requests

### Class to Extract JSON Data

In [2]:
# Define a custom DoFn to extract the specified fields from nested JSON
class ExtractFields(beam.DoFn):
    def __init__(self, fields_to_extract):
        self.fields_to_extract = fields_to_extract

    def process(self, element):
        import json

        try:
            # creating a list of fields to extract from string
            fields_to_extract = self.fields_to_extract.split(",")

            extracted_data = {}
            for field in fields_to_extract:
                # Split the field name by '.' to navigate nested structures
                field_parts = field.split(".")
                current_data = element
                for part in field_parts:
                    if part in current_data:
                        current_data = current_data[part]
                    else:
                        # Field not found, skip this field
                        current_data = None
                        break

                if current_data is not None:
                    extracted_data[field] = current_data

            if extracted_data:
                yield extracted_data
        except (json.JSONDecodeError, ValueError) as e:
            # Handle JSON decoding errors here
            pass

### Class to Add 2 new columns

In [3]:
class AddDatetimeAndDate(beam.DoFn):
    def process(self, element):
        from datetime import datetime

        new_element = dict(element)  # Create a copy of the original dictionary

        current_time = datetime.now()

        # Add a DATETIME column with the current timestamp
        new_element["load_datetime"] = current_time
        new_element["load_date"] = current_time.date()

        yield new_element  # Emit the new dictionary with the added columns

### Testing Pipeline with FPL API

In [4]:
fields_to_extract = "id,code"

with beam.Pipeline() as p:

    json_data = (
        p | "Read API" >> beam.Create(["https://fantasy.premierleague.com/api/fixtures/?event=4"])
        | "HTTP GET" >> beam.ParDo(lambda url: requests.get(url).json())
        | "Extract Fields" >> beam.ParDo(ExtractFields(fields_to_extract)) 
        | "AddDatetimeAndDate" >> beam.ParDo(AddDatetimeAndDate())         
        | "Print" >> beam.Map(print)
    )

p.run()

{'id': 38, 'code': 2367575, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 935974), 'load_date': datetime.date(2023, 10, 17)}
{'id': 40, 'code': 2367577, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 936979), 'load_date': datetime.date(2023, 10, 17)}
{'id': 32, 'code': 2367569, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 936979), 'load_date': datetime.date(2023, 10, 17)}
{'id': 34, 'code': 2367571, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 936979), 'load_date': datetime.date(2023, 10, 17)}
{'id': 35, 'code': 2367572, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 937986), 'load_date': datetime.date(2023, 10, 17)}
{'id': 39, 'code': 2367576, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 937986), 'load_date': datetime.date(2023, 10, 17)}
{'id': 33, 'code': 2367570, 'load_datetime': datetime.datetime(2023, 10, 17, 14, 28, 34, 938507), 'load_date': datetime.date(2023, 10, 17)}
{'id': 36, 'code': 2

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x19b5e4a78e0>

### Testing Pipeline with 

In [5]:
fields_to_extract = "name"

with beam.Pipeline() as p:

    json_data = (
        p | "Read API" >> beam.Create(["https://api.punkapi.com/v2/beers"])
        | "HTTP GET" >> beam.ParDo(lambda url: requests.get(url).json())
        | "Extract Fields" >> beam.ParDo(ExtractFields(fields_to_extract))
        # | "AddDatetimeAndDate" >> beam.ParDo(AddDatetimeAndDate())        
        | "Print" >> beam.Map(print)
    )

p.run()

{'name': 'Buzz'}
{'name': 'Trashy Blonde'}
{'name': 'Berliner Weisse With Yuzu - B-Sides'}
{'name': 'Pilsen Lager'}
{'name': 'Avery Brown Dredge'}
{'name': 'Electric India'}
{'name': 'AB:12'}
{'name': 'Fake Lager'}
{'name': 'AB:07'}
{'name': 'Bramling X'}
{'name': 'Misspent Youth'}
{'name': 'Arcade Nation'}
{'name': 'Movember'}
{'name': 'Alpha Dog'}
{'name': 'Mixtape 8'}
{'name': 'Libertine Porter'}
{'name': 'AB:06'}
{'name': 'Russian Doll – India Pale Ale'}
{'name': 'Hello My Name Is Mette-Marit'}
{'name': 'Rabiator'}
{'name': 'Vice Bier'}
{'name': 'Devine Rebel (w/ Mikkeller)'}
{'name': 'Storm'}
{'name': 'The End Of History'}
{'name': 'Bad Pixie'}
{'name': 'Buzz'}
{'name': 'Trashy Blonde'}
{'name': 'Berliner Weisse With Yuzu - B-Sides'}
{'name': 'Pilsen Lager'}
{'name': 'Avery Brown Dredge'}
{'name': 'Electric India'}
{'name': 'AB:12'}
{'name': 'Fake Lager'}
{'name': 'AB:07'}
{'name': 'Bramling X'}
{'name': 'Misspent Youth'}
{'name': 'Arcade Nation'}
{'name': 'Movember'}
{'name': 'Al

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x19b596b8cd0>