En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

## Let's start with a handmade approach

Let's suppose we are on a truly limited computer, but with unlimited execution time.

But first, let's check an assumption about the dates: the tweets should be stored from newest to oldest.

In [None]:
import json
from datetime import datetime

FILE_PATH = './farmers-protest-tweets-2021-2-4.json'
FILE_PATH_TEST = './farmers-protest-tweets-2021-2-4-test.json'  # A smaller dataset :P
READ_MODE = 'r'
DUMMY_DATE = datetime.fromisoformat('2050-01-01T00:00:00+00:00').date()
TOP_N = 10

# Sanity check for the rest of the notebook: the file (one JSON object per line)
# must be sorted by date, newest first.
# Start from a fictional date in the future so the first tweet always passes.
prev_date = DUMMY_DATE

with open(FILE_PATH, READ_MODE, encoding='utf-8', buffering=1) as json_file:
    for line_number, line in enumerate(json_file, start=1):
        _line = json.loads(line)

        date = datetime.fromisoformat(_line['date']).date()

        # A newer date after an older one breaks the descending order.
        if prev_date < date:
            raise ValueError(
                f'Dates are not sorted in descending order: line {line_number} '
                f'has {date}, which is newer than the previous date {prev_date}'
            )

        prev_date = date

Since this ran without errors, we can confirm that the dates are always in descending order.

We'll need a function that inserts elements in sorted order, so we never have to sort the whole list afterwards

... and some basic tests for it

In [None]:
def insert_sorted(iterable: list, element: tuple, el_idx: int) -> list:
    """Insert ``element`` into ``iterable``, keeping it sorted in descending order.

    ``iterable`` must already be sorted in descending order by the value at
    position ``el_idx`` of each tuple. The list is modified in place and also
    returned for convenience. When ``element`` ties with existing entries, it
    is placed before them.

    Args:
        iterable: list of tuples sorted descending by ``el_idx``; mutated.
        element: tuple to insert.
        el_idx: index inside each tuple used as the sort key.

    Returns:
        The same ``iterable`` object, after the insertion.

    Raises:
        IndexError: if ``element`` has no position ``el_idx`` (also raised
            for an empty ``iterable``).
    """
    key = element[el_idx]  # fail fast with IndexError, even when iterable is empty

    # Fast path: empty list, or the new element is smaller than everything so far.
    if not iterable or key < iterable[-1][el_idx]:
        iterable.append(element)
        return iterable

    # Skip every strictly larger entry; check the bound before indexing.
    i = 0
    while i < len(iterable) and key < iterable[i][el_idx]:
        i += 1
    iterable.insert(i, element)
    return iterable


In [None]:
import unittest
from unittest import TestCase

class TestInsertSorted(TestCase):
    """Unit tests for ``insert_sorted`` (descending insertion keyed by a tuple index)."""

    def _check_insertion(self, initial, new_data, expected):
        """Insert ``new_data`` into fresh copies of ``initial`` keyed by index 0
        and by index 1, compare each result with ``expected``, then check that
        keying by the non-existent index 2 raises IndexError."""
        for key_index in (0, 1):
            working_copy = list(initial)
            insert_sorted(working_copy, new_data, key_index)
            self.assertEqual(working_copy, expected)

        self.assertRaises(IndexError, insert_sorted, list(initial), new_data, 2)

    def test_insert_sorted_initial(self):
        # Inserting into an empty list
        self._check_insertion([], ('a', 1), [('a', 1)])

    def test_insert_sorted_begin(self):
        # New element is the largest one
        self._check_insertion(
            [('c', 3), ('b', 2), ('a', 1)],
            ('d', 4),
            [('d', 4), ('c', 3), ('b', 2), ('a', 1)],
        )

    def test_insert_sorted_normal(self):
        # New element lands in the middle
        self._check_insertion(
            [('d', 4), ('b', 2), ('a', 1)],
            ('c', 3),
            [('d', 4), ('c', 3), ('b', 2), ('a', 1)],
        )

    def test_insert_sorted_end(self):
        # New element is the smallest one
        self._check_insertion(
            [('d', 4), ('c', 3), ('b', 2)],
            ('a', 1),
            [('d', 4), ('c', 3), ('b', 2), ('a', 1)],
        )


unittest.main(argv=[''], verbosity=2, exit=False)
    

Well, everything looks fine...
We'll also need to get the key with the maximum value in a dict (e.g. the most active user from a username -> tweet count dict).

In [None]:
def get_max_value(tweets_counter_by_user: dict) -> tuple:
    """Return the ``(key, value)`` pair with the largest value in a dictionary.

    When several keys share the maximum value, the first one in the dict's
    iteration (insertion) order wins. Works for any mutually comparable values,
    including negative numbers.

    Args:
        tweets_counter_by_user: mapping of key -> comparable value
            (e.g. username -> tweet count).

    Returns:
        ``(key, value)`` for the maximum value, or ``None`` if the dict is empty.
    """
    if not tweets_counter_by_user:
        return None

    # max() returns the first maximal item it meets, so ties keep the
    # "first found max wins" rule.
    return max(tweets_counter_by_user.items(), key=lambda item: item[1])

In [None]:
import unittest
from unittest import TestCase

class TestGetMaxValue(TestCase):
    """Unit tests for ``get_max_value``."""

    def test_get_max(self):
        TEST_OBJECT = {
            'min': 1,
            'dummy': 2,
            'max': 3
        }
        EXPECTED_RESULT = ('max', 3)
        result = get_max_value(TEST_OBJECT)

        self.assertEqual(result, EXPECTED_RESULT)

    def test_get_duplicated(self):
        # With a tie, the first key holding the maximum wins
        TEST_OBJECT = {
            'min': 1,
            'dummy': 2,
            'max': 2
        }
        EXPECTED_RESULT = ('dummy', 2)
        result = get_max_value(TEST_OBJECT)

        self.assertEqual(result, EXPECTED_RESULT)

    def test_get_empty(self):
        # An empty dict has no maximum; the function signals it with None
        self.assertIsNone(get_max_value({}))


unittest.main(argv=[''], verbosity=2, exit=False)

## Limited memory, a lot of execution time

For this section, let's suppose we only have a small computer, but with unlimited execution time.

Let's start with the q1 task.

In [None]:
COUNTER_INDEX = 1
DATE_INDEX = 0
TOP_N = 10
TOP_USERNAME_INDEX = 2
USERNAME_INDEX = 0

# Streaming state. The file is sorted by date (newest first), so each day's
# tweets are contiguous and only one day has to be kept in memory at a time.
tweets_counter_by_user = {}  # username -> tweets on the day being read
max_user_by_date = []  # (date, tweets that day, top user), desc by tweet count, <= TOP_N items
tweets_by_day = 0  # tweets read so far for the day being read
prev_date = DUMMY_DATE


def _flush_day(day, day_total, counter_by_user):
    """Record a finished ``day`` in ``max_user_by_date``, keeping only the TOP_N busiest days."""
    top_user = get_max_value(counter_by_user)
    if top_user is None:  # nothing was read (e.g. an empty file)
        return
    insert_sorted(
        max_user_by_date,
        (day, day_total, top_user[USERNAME_INDEX]),
        COUNTER_INDEX
    )
    if len(max_user_by_date) > TOP_N:  # drop the least active day
        max_user_by_date.pop()


with open(FILE_PATH, 'r', encoding='utf-8', buffering=1) as json_file:
    for line in json_file:
        _line = json.loads(line)

        date = datetime.fromisoformat(_line['date']).date()
        user = _line['user']['username']

        # Dates are descending, so a change of date means the previous day is complete.
        if date != prev_date and prev_date != DUMMY_DATE:
            _flush_day(prev_date, tweets_by_day, tweets_counter_by_user)
            tweets_counter_by_user = {}
            tweets_by_day = 0

        prev_date = date
        tweets_by_day += 1
        tweets_counter_by_user[user] = tweets_counter_by_user.get(user, 0) + 1

# The last day is never followed by a date change, so flush it explicitly,
# with the full count of its tweets.
_flush_day(prev_date, tweets_by_day, tweets_counter_by_user)

result = [(el[DATE_INDEX], el[TOP_USERNAME_INDEX]) for el in max_user_by_date]

In [None]:
# Top-N (date, most active user) pairs from the q1 cell, busiest day first
result

Now q3, because I don't know (yet) how to handle emojis.

In [None]:
import re
from collections import Counter

COUNTER_INDEX = 1
# A mention looks like "@handle"; the capture group keeps only the handle.
MENTION_PATTERN = re.compile(r'@(\w+)\b')

mentions_counter = Counter()  # username -> number of times it was mentioned

with open(FILE_PATH, 'r', encoding='utf-8', buffering=1) as json_file:
    for line in json_file:
        _line = json.loads(line)
        tweet_content = _line['renderedContent']
        # Every match counts, including repeated mentions inside the same tweet.
        mentions_counter.update(MENTION_PATTERN.findall(tweet_content))

# most_common() picks the TOP_N with a heap instead of sorting every user;
# users with equal counts are returned in first-seen order.
top_n_mentions = mentions_counter.most_common(TOP_N)

In [None]:
# Top-N (username, mention count) pairs, most mentioned first
top_n_mentions

Let's reuse the last solution, but searching for emojis instead of mentions.

In [None]:
import emoji
from collections import Counter


emoji_counter = Counter()  # emoji -> number of occurrences across all tweets

with open(FILE_PATH, 'r', encoding='utf-8', buffering=1) as json_file:
    for line in json_file:
        _line = json.loads(line)
        tweet_content = _line['renderedContent']

        # This is truly slow, but since Indic scripts are used I couldn't
        # find a better solution so far :(
        # I'd like to use a regex to find just emojis.
        emoji_counter.update(el['emoji'] for el in emoji.emoji_list(tweet_content))

# most_common() keeps only the TOP_N with a heap instead of sorting every
# emoji; emojis with equal counts are returned in first-seen order.
top_n_emojies = emoji_counter.most_common(TOP_N)

In [None]:
# Top-N (emoji, count) pairs, most used first
top_n_emojies

## Now let's suppose we have GCP with unlimited memory and processors.

Please note that I don't believe using GCP would improve anything in this test, since we are working with a small dataset and limited computational resources. Still, this will be coded as if I wanted to use Dataflow with as many workers as I wish. Instead of reading the file from a bucket, this time it will be read from a local path and the output will just be printed, but the output could easily be stored in a GCP service such as BigQuery.

Also remember that the execution time will depend on the available computational resources. I believe the previous solutions are the optimal ones for time and memory with limited resources, but let's use the power that [Apache Beam](https://beam.apache.org/about/) offers.

For more information about the execution model, take a look at the [Apache Beam documentation](https://beam.apache.org/documentation/runtime/model/).

### Let's start with the easy one.

Since multiple aggregations are not required and we have a library to extract emojis...
Let's use a "word count" but with some extra steps.

In [None]:
from apache_beam import CombinePerKey, Pipeline, FlatMap, Map
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

import apache_beam as beam

import emoji

def parse_element(element):
    """Decode one JSON-lines tweet record and return its ``renderedContent`` text."""
    return json.loads(element)['renderedContent']


def run(pipeline_args=None):
    """Print the TOP_N most used emojis in FILE_PATH using an Apache Beam pipeline.

    Args:
        pipeline_args: list of command-line style pipeline flags
            (e.g. ``['--flexrs_goal=SPEED_OPTIMIZED']``) forwarded to
            ``PipelineOptions``.
    """
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read file' >> ReadFromText(FILE_PATH)  # This is faster than ReadFromJson
            | 'Use the desired field' >> Map(parse_element)
            # One output element per emoji found in each tweet
            # (emoji.emoji_list is the slow step of this pipeline)
            | 'Get emojis' >> FlatMap(
                lambda text: [el['emoji'] for el in emoji.emoji_list(text)]
            )
            | 'Count per emoji' >> beam.combiners.Count.PerElement()
            | "Select top N" >> beam.combiners.Top.Largest(TOP_N, key=lambda x: x[1])
            | Map(print)
        )


# PipelineOptions expects a list of flags, not a dict. argparse iterates a dict
# as its keys, so 'flexrs_goal' was discarded as an unparseable argument. An
# explicit list also keeps Beam from parsing the Jupyter kernel's own sys.argv.
pipeline_args = ['--flexrs_goal=SPEED_OPTIMIZED']
run(pipeline_args)

In [None]:
import apache_beam as beam

from apache_beam import CombinePerKey, Pipeline, FlatMap, Map, Regex
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions


def run(pipeline_args=None):
    """Print the TOP_N most mentioned users in FILE_PATH using Apache Beam.

    Args:
        pipeline_args: list of command-line style pipeline flags
            (e.g. ``['--flexrs_goal=SPEED_OPTIMIZED']``) forwarded to
            ``PipelineOptions``.
    """
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True,
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read file' >> ReadFromText(FILE_PATH)
            | 'Use the desired field' >> Map(parse_element)
            # Emits one list of mentioned usernames per tweet...
            | 'Get mentions' >> Regex.find_all(r'(?<=@)(\w+)\b')
            # ...which is flattened into one element per mention
            | 'Flat elements' >> FlatMap(lambda x: x)
            | 'Count per user' >> beam.combiners.Count.PerElement()
            | "Select top N" >> beam.combiners.Top.Largest(TOP_N, key=lambda x: x[1])
            | Map(print)
            # ToDo: you could write it easily into BQ instead print it
            # | WriteToBigQuery(
            # project=project_id,
            # table=table,
            # schema=TABLE_SCHEMA,
            # method=WriteToBigQuery.Method.STREAMING_INSERTS,  # To avoid use cloud storage
            # insert_retry_strategy=RetryStrategy.RETRY_NEVER
            # )
        )


# PipelineOptions expects a list of flags, not a dict. argparse iterates a dict
# as its keys, so 'flexrs_goal' was discarded as an unparseable argument.
pipeline_args = ['--flexrs_goal=SPEED_OPTIMIZED']
run(pipeline_args)

But to get the days with the most activity and the top user for each of them, some extra tricks are needed, because we need a custom CombineFn. We took the solution from the [Apache Beam documentation](https://beam.apache.org/documentation/transforms/python/aggregation/combinevalues/) (looking in the [git repository](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/aggregation/combinevalues_combinefn.py)) and adapted it to get what we want.

In [None]:
import json
from datetime import datetime

import apache_beam as beam
from apache_beam import Pipeline, Map, GroupBy, CombineValues, CombineFn
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def parse_element(element):
    """Decode one JSON-lines tweet into a ``('YYYY-MM-DD', username)`` pair."""
    record = json.loads(element)
    tweet_day = datetime.fromisoformat(record['date']).date()
    author = record['user']['username']
    return str(tweet_day), author


def format_output(element):
    """Convert the Top-N list of the q1 pipeline into ``[(date, most_active_user), ...]``.

    Args:
        element: list of ``(date_str, (tweets_that_day, most_active_user))``
            tuples, with ``date_str`` formatted as ``YYYY-MM-DD``.

    Returns:
        List of ``(datetime.date, most_active_user)`` tuples, in input order.
    """
    DATE_INDEX = 0
    DAY_STATS_INDEX = 1  # position of (tweets_that_day, most_active_user) in each entry
    USERNAME_INDEX = 1  # position of the username inside that stats tuple

    return [
        (
            datetime.strptime(el[DATE_INDEX], '%Y-%m-%d').date(),
            el[DAY_STATS_INDEX][USERNAME_INDEX]
        ) for el in element
    ]


class MaxFn(CombineFn):
    """Combine one day's ``(date, username)`` items into ``(total tweets, top user)``.

    The accumulator is a dict mapping each distinct input item to the number
    of times it was seen.
    """

    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, input):
        accumulator[input] = accumulator.get(input, 0) + 1
        return accumulator

    def merge_accumulators(self, accumulators):
        combined = {}
        for partial in accumulators:
            for key, seen in partial.items():
                combined[key] = combined.get(key, 0) + seen
        return combined

    def extract_output(self, accumulator):
        """Customed solution to get the most active user by date.

        Returns ``(total tweets, username of the most frequent item)``. Ties go
        to the first item in the accumulator's iteration order, and an empty
        accumulator yields ``(0, None)``.
        """
        total_tweets = sum(accumulator.values())
        if not accumulator:
            return total_tweets, None
        top_item, _ = max(accumulator.items(), key=lambda pair: pair[1])
        return total_tweets, top_item[1]


def run(pipeline_args=None):
    """Print the TOP_N dates with most tweets, together with each date's most active user.

    Each printed entry has the shape ``(date_str, (tweets_that_day, most_active_user))``,
    as produced by ``MaxFn.extract_output``.

    Args:
        pipeline_args: list of command-line style pipeline flags
            (e.g. ``['--flexrs_goal=SPEED_OPTIMIZED']``) forwarded to
            ``PipelineOptions``.
    """
    # save_main_session=True, as in the other pipelines of this notebook, pickles
    # the notebook globals (json, datetime, parse_element, MaxFn, ...) so remote
    # workers (e.g. on Dataflow) can resolve them.
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # ToDo: You can replace read from local storage and read it from Cloud Storage using a gs://{path}
            | 'Read file' >> ReadFromText(FILE_PATH)
            | 'Use the desired field' >> Map(parse_element)
            | 'Group all users by date' >> GroupBy(lambda x: x[0])
            | 'Get most active user per date' >> CombineValues(MaxFn())
            # Rank days by their total number of tweets
            | 'Select Top 10' >> beam.combiners.Top.Largest(TOP_N, key=lambda x: x[1][0])
            | Map(print)
            # | 'Format to expected output' >> Map(format_output)
            # # Since return a value is a nonsense in Apache Beam, let's persist the results:
            # | 'Persist results' >> WriteToText('./results/q1/results.txt')

            # ToDo: you could write it easily into BQ instead a file with this:
            # 'Write to BQ' | WriteToBigQuery(
            # project=project_id,
            # table=table,
            # schema=TABLE_SCHEMA,
            # method=WriteToBigQuery.Method.STREAMING_INSERTS,  # To avoid use cloud storage
            # insert_retry_strategy=RetryStrategy.RETRY_NEVER
            # )
        )


# PipelineOptions expects a list of flags, not a dict. argparse iterates a dict
# as its keys, so 'flexrs_goal' was discarded as an unparseable argument.
pipeline_args = ['--flexrs_goal=SPEED_OPTIMIZED']
run(pipeline_args)

In [None]:
# Check we can read the file

import datetime as _datetime_module
import os

# sorted() makes the choice deterministic; os.listdir order is arbitrary.
files_paths = sorted(os.listdir('./results/q1/'))
file_path = os.path.join('./results/q1/', files_paths[0])

# Presumably the file holds the repr of [(datetime.date(...), 'user'), ...]
# written by the (currently commented-out) 'Persist results' step. That repr
# needs `datetime.date(...)` calls, so ast.literal_eval can't parse it. Instead,
# eval runs with a namespace exposing only the datetime module. The previous
# `import datetime` rebound the notebook-global `datetime` (the class used by
# `datetime.fromisoformat` in other cells) to the module.
# NOTE(review): eval is only acceptable because this file is produced by our own
# pipeline; never point this at untrusted input.
result = None  # stays None if the file has no results, instead of showing a stale value
with open(file_path, 'r', encoding='utf-8') as file:
    for line in file:
        if line.strip():
            result = eval(line, {'__builtins__': {}, 'datetime': _datetime_module})

print(result)

### Important consideration:

Since an Apache Beam pipeline doesn't return its elements, we'll write the results into a file and then read them back :(

## GCP deployment instructions

The Apache Beam solutions can be deployed using Dataflow Flex Templates:

 ```bash
export PROJECT=my-project-id
export BUCKET_NAME=dataflow-utilities
export BUCKET_PATH=dataflow/templates
export TEMPLATE_NAME=latam-challenge
export REGION=us-east1
```

Create a new bucket to save the template
```bash
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
```

Then, build a container image:
```bash
export TEMPLATE_IMAGE="gcr.io/${PROJECT}/${TEMPLATE_NAME}:latest"
gcloud builds submit --tag "${TEMPLATE_IMAGE}" .
```

Create a metadata.json with the correct values and then create the Flex Template
```bash
export TEMPLATE_PATH="gs://${BUCKET_NAME}/${BUCKET_PATH}/${TEMPLATE_NAME}/${TEMPLATE_NAME}.json"

gcloud dataflow flex-template build ${TEMPLATE_PATH} \
  --image "${TEMPLATE_IMAGE}" \
  --sdk-language "PYTHON" \
  --metadata-file "metadata.json"
```

Finally, run the template on Dataflow and set the desired number of workers:
```bash
gcloud dataflow flex-template run "${TEMPLATE_NAME}" \
    --template-file-gcs-location "${TEMPLATE_PATH}" \
    --region "${REGION}" \
    --parameters num_workers=10,max_num_workers=70,worker_region=${REGION}
```
