## 1. Finding the minimum pledge

In [1]:
def find_min_pledge(pledge_list):
    """
    Return the smallest positive integer that does not occur in pledge_list.

    param:
        pledge_list: iterable of numbers; entries that are not greater than
            zero are ignored.

    return:
        int: the first value of 1, 2, 3, ... that is absent from pledge_list
            (1 when the list holds no positive entries).
    """
    # Only positive entries can influence the answer; a set drops duplicates.
    seen = {amount for amount in pledge_list if amount > 0}

    # Count upwards from 1 until reaching a value that was never pledged.
    candidate = 1
    while candidate in seen:
        candidate += 1
    return candidate

In [2]:
# Sanity checks for find_min_pledge.
# Unsorted input with a duplicate: 1-4 and 6 are present, so 5 is the first gap.
assert find_min_pledge([1, 3, 6, 4, 1, 2]) == 5
# No gap in 1..3, so the answer is the next integer after the largest entry.
assert find_min_pledge([1, 2, 3]) == 4
# Only negative entries: nothing positive is taken, so the answer is 1.
assert find_min_pledge([-1, -3]) == 1

## 2. Getting headlines

In [3]:
import requests
import xml.etree.ElementTree as ET

def get_headlines(rss_url, timeout=10):
    """
    Fetches and parses the RSS feed from the provided URL and returns a list of article titles.

    param:
        rss_url (str): The URL of the RSS feed to fetch and parse.
        timeout (float): Seconds to wait for the server before giving up.

    return:
        list of str: The text of every <title> that is a direct child of an
        <item> element, in document order. Titles without text are skipped.
        An empty list is returned when the request fails (network error,
        timeout, non-200 status) or the body is not well-formed XML.
    """
    # Fetch the RSS feed content. requests has no default timeout, so one is
    # passed explicitly to avoid blocking forever on an unresponsive server.
    try:
        response = requests.get(rss_url, timeout=timeout)
    except requests.RequestException:
        return []  # Connection problems count as a failed request
    if response.status_code != 200:
        return []  # Return an empty list if the request fails

    # Parse the XML content.
    # NOTE(security): xml.etree.ElementTree is not hardened against
    # maliciously constructed documents; only use this with trusted feeds.
    try:
        root = ET.fromstring(response.content)
    except ET.ParseError:
        return []  # A body that is not valid XML carries no headlines

    # Extract the titles; an empty <title/> has text None and is left out so
    # the result really is a list of str.
    return [
        title.text
        for title in root.findall(".//item/title")
        if title.text is not None
    ]

In [4]:
# Example usage: fetch the Google News RSS feed and print its item titles.
# NOTE: this performs a live HTTP request, so the printed list depends on the
# feed's content at the time of the call.
google_news_url = "https://news.google.com/news/rss"
print(get_headlines(google_news_url))

["'Significant' earthquake rocks Los Angeles - ABC News", "Top Ukrainian commander says his forces now control almost 390 square miles of Russia's Kursk region - The Associated Press", 'Defense secretary orders submarine to Middle East, accelerates arrival of strike group ahead of anticipated Iran attack - CNN', 'EU warns Elon Musk ahead of Trump interview to keep hate speech off X - CNBC', 'Iran gives few clues as region awaits military response to Israel - Financial Times', 'Walz Slammed by Former Battalion Leader: ‘He Did Not Earn the Rank’ - The Daily Beast', 'Joe Biden would sign a bill eliminating a tax on tips, White House says - The Hill', '‘We gotta be somewhere’: Homeless Californians react to Newsom’s crackdown - CalMatters', 'Trump falsely claims Harris campaign used AI to fake crowd in Detroit - CBS News', 'Crews begin demolishing Texas church where gunman killed more than two dozen in 2017 - The Associated Press', 'Hamas Refuses to Attend Upcoming Cease-Fire, Hostage Talk

## Streaming Payments Processor

In [5]:
def get_payments_storage():
    """
    Opens the storage that payments are written to.

    @returns an instance of
    https://docs.python.org/3/library/io.html#io.BufferedWriter
    The caller is responsible for closing it.
    """
    # Sample implementation to make the code run in coderpad.
    # Do not rely on this exact implementation.
    import os

    # os.devnull is the platform's null device ('/dev/null' on POSIX), so the
    # sample also runs where the literal path '/dev/null' does not exist.
    return open(os.devnull, 'wb')

# This is a library function, you can't modify it.
def stream_payments_to_storage(storage):
    """
    Loads payments and writes them to the `storage`.
    Returns when all payments have been written.

    @parameter `storage`: is an instance of
    https://docs.python.org/3/library/io.html#io.BufferedWriter
    """
    # Sample implementation to make the code run in coderpad.
    # Do not rely on this exact implementation.
    # The sample writes the five bytes 1..5 ten times via storage.write().
    for i in range(10):
        storage.write(bytes([1, 2, 3, 4, 5]))

In [6]:
def process_payments():
    """
    Processes payments by streaming them to storage and prints the checksum of all bytes written.

    The checksum is the plain arithmetic sum of the byte values that
    `stream_payments_to_storage()` writes. The storage returned by
    `get_payments_storage()` is wrapped in a write-through proxy, so every
    byte still reaches the real storage while being added to the total.
    The storage is closed even if streaming raises.

    Example:
        If the bytes [1, 2, 3, 4, 5] were written in each iteration, and there were 10 iterations,
        the checksum would be 150.
    """

    class ChecksumWriter:
        """Write-through proxy that totals the byte values passed through it."""

        def __init__(self, wrapped_storage):
            self.wrapped_storage = wrapped_storage
            self.checksum = 0

        def write(self, buffer):
            self.checksum += sum(buffer)
            # Hand back the byte count like io.BufferedWriter.write does, in
            # case the streaming library inspects it.
            return self.wrapped_storage.write(buffer)

        def writelines(self, lines):
            # Route through write() so these bytes are counted as well.
            for line in lines:
                self.write(line)

        def close(self):
            self.wrapped_storage.close()

        def __getattr__(self, name):
            # Only called for attributes not defined here (flush, closed,
            # ...): forward them so the proxy can stand in for the writer.
            return getattr(self.wrapped_storage, name)

    # The context manager closes the storage even when streaming fails.
    with get_payments_storage() as storage:
        # Wrap the storage to calculate checksum
        checksum_writer = ChecksumWriter(storage)

        # Stream payments to storage
        stream_payments_to_storage(checksum_writer)

    # Print the checksum of all bytes written
    print(checksum_writer.checksum)

In [7]:
# Example usage: with the sample stream above (bytes 1..5 written ten times)
# this prints 150.
process_payments()

150


## Streaming Payments Processor, two vendors edition.

In [8]:
import io

# This is a library function, you can't modify it.
def stream_payments(callback_fn):
    """
    Reads payments from a payment processor and calls `callback_fn(amount)`
    for each payment.
    Returns when there are no more payments.
    """
    # The sample body below invokes the callback with the amounts 0..9 in
    # order; the callback's return value is not used.
    for i in range(10):
        callback_fn(i)

# This is a library function, you can't modify it.
def store_payments(amount_iterator):
    """
    Iterates over the payment amounts from amount_iterator
    and stores them to a remote system.
    """
    # Sample implementation to make the code run in coderpad.
    # Do not rely on this exact implementation.
    # The sample only prints each amount instead of storing it remotely.
    for i in amount_iterator:
        print(i)

def callback_example(amount):
    """Example callback for `stream_payments`: prints the amount and returns True."""
    print(amount)
    return True

In [9]:

def process_payments_2():
    """
    Bridges the gap between `stream_payments()` and `store_payments()` by converting the
    callback-based API of `stream_payments()` into an iterator that `store_payments()` can consume.

    `stream_payments()` runs in a helper thread and hands each amount to the
    consumer through a queue that holds a single item, so at most one payment
    is buffered at any time regardless of how many payments there are. Each
    vendor function is called exactly once.

    An exception raised by `stream_payments()` is re-raised inside the
    iterator, i.e. it surfaces through `store_payments()`. If
    `store_payments()` stops early, the remaining payments are read and
    discarded so the helper thread can finish.
    """
    import queue
    import threading

    handoff = queue.Queue(maxsize=1)  # bounded: the producer waits for the consumer
    end_of_stream = object()          # unique marker, cannot collide with an amount
    failures = []                     # exception raised by stream_payments, if any

    def produce():
        # Runs in the helper thread; Queue.put blocks while the slot is taken.
        try:
            stream_payments(handoff.put)
        except Exception as exc:
            failures.append(exc)
        finally:
            handoff.put(end_of_stream)

    def payment_generator():
        """
        A generator that yields payment amounts one by one.
        This function acts as a bridge, converting the callback pattern into an iterator.
        """
        while True:
            amount = handoff.get()
            if amount is end_of_stream:
                break
            yield amount
        if failures:
            raise failures[0]

    producer = threading.Thread(target=produce, daemon=True)
    producer.start()
    try:
        # Pass the generator to store_payments
        store_payments(payment_generator())
    finally:
        # If the consumer stopped before the end of the stream, the producer
        # is blocked on a full queue; drain it so the thread can terminate.
        while producer.is_alive():
            try:
                handoff.get_nowait()
            except queue.Empty:
                producer.join(timeout=0.01)
        producer.join()

In [10]:
# Example usage: with the sample vendor functions above this prints the
# amounts 0..9, one per line.
process_payments_2()

0
1
2
3
4
5
6
7
8
9


# Code review 

In [11]:
def get_value(data, key, default, lookup=None, mapper=None):
    """
    Finds the value from data associated with key, or default if the
    key isn't present.
    A stored value of None or the empty string is also replaced by default;
    other falsy values such as 0 or False are kept.
    If a lookup enum is provided, this value is then transformed to its
    enum value (via lookup[value]).
    If a mapper function is provided, this value is then transformed
    by applying mapper to it.
    Both transformations are also applied to the default.
    """
    # A missing key must fall back to the default instead of raising.
    try:
        return_value = data[key]
    except KeyError:
        return_value = None
    # Deliberately not `if not return_value`: that would discard 0 and False.
    if return_value is None or return_value == "":
        return_value = default
    if lookup:
        return_value = lookup[return_value]
    if mapper:
        return_value = mapper(return_value)
    return return_value

In [12]:
def ftp_file_prefix(namespace):
    """
    Given a namespace string with dot-separated tokens, returns the
    string with
    the final token replaced by 'ftp'.
    Example: a.b.c => a.b.ftp
    A namespace without any dot is a single token, so the result is
    just 'ftp' (no leading dot).
    """
    # rpartition splits at the last dot; when there is none, both the head
    # and the separator are empty strings, which yields plain 'ftp'.
    head, separator, _ = namespace.rpartition(".")
    return head + separator + 'ftp'

In [13]:
def string_to_bool(string):
    """
    Returns True if the given string is 'true' case-insensitive,
    False if it is
    'false' case-insensitive.
    Raises ValueError for any other input, including values that are
    not strings at all.
    """
    outcomes = {'true': True, 'false': False}

    # Non-string input has no .lower(); treat it as unrecognised so the
    # documented ValueError is raised rather than an AttributeError.
    normalized = string.lower() if isinstance(string, str) else None
    if normalized in outcomes:
        return outcomes[normalized]
    raise ValueError(f'String {string} is neither true nor false')

In [14]:
def config_from_dict(dict):
    """
    Given a dict representing a row from a namespaces csv file,
    returns a DAG configuration as a pair whose first element is the
    DAG name
    and whose second element is a dict describing the DAG's properties.

    The 'Namespace' and 'Airflow DAG' entries are read by direct
    subscription. The remaining columns are read through get_value() with
    the defaults listed below.
    """
    # NOTE(review): the parameter name shadows the builtin `dict`. It is kept
    # so that keyword callers keep working; a local alias is used instead.
    row = dict

    namespace = row['Namespace']
    dag_name = row['Airflow DAG']

    # NOTE(review): DeltaDays is not defined in the visible part of this
    # file; confirm it is defined or imported elsewhere.
    dag_properties = {
        "earliest_available_delta_days": 0,
        "lif_encoding": 'json',
        "earliest_available_time": get_value(row, 'Available Start Time', '07:00'),
        "latest_available_time": get_value(row, 'Available End Time', '08:00'),
        "require_schema_match": get_value(
            row, 'Requires Schema Match', 'True', mapper=string_to_bool
        ),
        "schedule_interval": get_value(row, 'Schedule', '1 7 * * * '),
        "delta_days": get_value(row, 'Delta Days', 'DAY_BEFORE', lookup=DeltaDays),
        "ftp_file_wildcard": get_value(row, 'File Naming Pattern', None),
        "ftp_file_prefix": get_value(
            row, 'FTP File Prefix', ftp_file_prefix(namespace)
        ),
        "namespace": namespace,
    }
    return (dag_name, dag_properties)