In [None]:
# Standard library imports
import gc
import heapq
import json
import re
from collections import defaultdict, Counter
from datetime import date, datetime
from typing import List, Tuple

# Third-party imports
import emoji
import memray
import orjson
import pandas as pd
import snakeviz
import ujson

In [1]:
# Path to the raw tweets file; it holds one JSON object per line (see the repair cells below).
file_path = "farmers-protest-tweets-2021-2-4.json"

### First, I tried to load the file into a pandas DataFrame to check whether it was well-formed.

In [None]:
# Expected to fail: the raw file is not a single JSON document but one JSON object per line.
# NOTE(review): pd.read_json(file_path, lines=True) should parse this format directly — confirm.
df = pd.read_json(file_path)

### Since the load failed, I inspected the file. I used an online tool to validate some random objects and saw that each object was valid on its own. So I concluded that the problem was that the file contained many JSON objects, one per line, that were not wrapped in a list. In the next cells I read the file, add each object to a list, serialize the list as JSON, and save it to a new file.

In [None]:
# Collect every JSON object of the line-delimited source file into one list.
json_list = []

# Read the file and add each JSON object to the list.
# utf-8 is explicit because tweet text contains emoji and the platform default may not be utf-8.
with open(file_path, 'r', encoding='utf-8') as source_file:
    for line in source_file:
        # Blank lines are not JSON objects; skip them instead of reporting them as decode errors.
        if not line.strip():
            continue
        try:
            # Load each line as a JSON object and append it to the list
            object_ = json.loads(line)
            json_list.append(object_)
        except json.JSONDecodeError as e:
            print(f'Error decoding the line: {line.strip()} - {e}')

In [None]:
# Convert the list to a single JSON array (pretty-printed with a 4-space indent).
valid_json = json.dumps(json_list, indent=4)

In [None]:
# Write the repaired JSON array to a new file, leaving the original file untouched.
with open('fixed_data.json', 'w') as end_file:
    end_file.write(valid_json)

In [None]:
# Path of the repaired file written by the previous cell.
file_path_test = "fixed_data.json"

In [None]:
# Reload with pandas; this time the input is a single JSON array.
df = pd.read_json(file_path_test)

In [None]:
# Preview the first rows to inspect the fields and their types.
df.head()

In [None]:
# Earliest and latest values of the "date" column, i.e. the time span covered by the file.
print(min(df["date"]), max(df["date"]))

### After processing the file as explained above, I loaded it into pandas again and this time it worked. This let me inspect the file structure, fields and types, and check the date range of the file as well as the number of rows.

### Having understood the problem with the file, I solved the first exercise knowing that each function has to handle that format itself while reading the file.

### I defined a function that processes the file with execution time in mind. I tried three JSON packages in order to use the one with the best performance.

### I tried json, ujson and orjson.

### Finally, after comparing the execution times of all of them, I decided to use orjson.

In [None]:
def process_json(file_path):
    """Count tweets per calendar date and per username.

    Reads ``file_path`` line by line with ``orjson``. Each non-empty line is
    expected to hold one JSON object. Lines that are not shaped like an object
    are skipped. Lines that fail to decode are printed and then skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        defaultdict mapping ``datetime.date`` -> defaultdict(username -> count).
    """
    tweets_per_date = defaultdict(lambda: defaultdict(int))
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Ignore blank lines and anything that is not a single JSON object.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = orjson.loads(line)
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip the line. Falling through would hit an unbound `tweet`
                # or count the previous line's tweet a second time.
                continue

            # 'date' is an ISO-8601 timestamp (YYYY-MM-DDTHH:MM:SS+00:00).
            # Its first 10 characters are the calendar date, so slicing avoids
            # a full strptime per tweet.
            tweet_date = date.fromisoformat(tweet['date'][:10])
            username = tweet['user']['username']

            tweets_per_date[tweet_date][username] += 1

    return tweets_per_date

## Exercise 1

### Now I create the function that returns the top 10 dates with the most tweets, together with the user who posted the most on each of those dates. First, a loop extracts the username and date of each tweet and accumulates the counts in a defaultdict. I added a parameter called "version" to compare two implementations. The first one loops over the items of the defaultdict and, for each date, computes the total number of tweets and the user with the most tweets in a single manual pass. The second one computes each date's total with sum and its top user with max, then sorts the dates by total tweets and keeps the top 10.

In [None]:
def q1_time(file_path: str, version: int = 2) -> List[Tuple[date, str]]:
    """Return the 10 dates with the most tweets and the top poster of each.

    Per-date/per-user counts come from ``process_json``. Two equivalent
    strategies are kept for benchmarking:

    * ``version == 1``: one manual pass over each date's users computes the
      total and the top user together.
    * any other value (default ``2``): ``sum`` gives the total and ``max``
      gives the top user.

    On a tie, both strategies pick the first user counted for that date.
    Dates with equal totals keep their first-appearance order, because
    ``sorted`` is stable.

    Args:
        file_path: Path to the line-delimited tweets file.
        version: Strategy selector (see above). It defaults to ``2`` so the
            function can also be called as ``q1_time(file_path)``.

    Returns:
        Up to 10 ``(date, username)`` tuples, most tweets first.
    """
    tweets_per_date = process_json(file_path)

    if version == 1:
        top_dates = []
        for day, users in tweets_per_date.items():
            total_tweets = 0
            top_user = None
            max_tweets = 0
            for user, count in users.items():
                total_tweets += count
                # Strict '>' keeps the first user that reaches the maximum.
                if count > max_tweets:
                    max_tweets = count
                    top_user = user
            top_dates.append((day, total_tweets, top_user))

        # Sort by total tweets and keep only the top 10.
        top_dates_sorted = sorted(top_dates, key=lambda x: x[1], reverse=True)[:10]
        return [(day, top_user) for day, _, top_user in top_dates_sorted]

    date_tweet_sums = []
    top_users_by_date = {}
    for day, users in tweets_per_date.items():
        date_tweet_sums.append((day, sum(users.values())))
        # max() returns the first item with the largest count.
        top_users_by_date[day] = max(users.items(), key=lambda item: item[1])[0]

    # Sort by total tweets and keep only the top 10.
    top_10_dates = sorted(date_tweet_sums, key=lambda x: x[1], reverse=True)[:10]
    return [(day, top_users_by_date[day]) for day, _ in top_10_dates]

### Now, for the memory-optimized function, I read the file line by line with a generator. The "tweet_generator" function processes the file one line at a time and uses yield to return each tweet's date and username. Because of yield, the parsed tweets are never all held in memory at once, and they are processed only when needed; only the aggregated per-date counts are kept. In addition, I used a heap to implement a priority queue, in which the element with the minimum or maximum value can be accessed efficiently. This makes it efficient to obtain the k largest or smallest elements of a list.

In [None]:
def tweet_generator(file_path):
    """Yield ``(date, username)`` for every valid tweet in ``file_path``.

    The file is read line by line, so only the current line's parsed object
    is alive at any time. Blank lines and lines not shaped like a JSON object
    are skipped. Lines that fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        ``(datetime.date, str)`` tuples of tweet date and author username.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Remove line breaks and surrounding spaces.
            line = line.strip()
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip the line. Falling through would hit an unbound `tweet`
                # or yield the previous line's tweet a second time.
                continue

            # 'date' is ISO-8601; its first 10 characters are YYYY-MM-DD.
            yield date.fromisoformat(tweet['date'][:10]), tweet['user']['username']

def process_tweets(file_path):
    """Aggregate tweet counts per date and per username.

    Consumes ``tweet_generator`` one ``(date, username)`` pair at a time, so
    only the nested counters are retained.

    Returns:
        defaultdict: ``date -> defaultdict(username -> int)``.
    """
    counts = defaultdict(lambda: defaultdict(int))
    for day, author in tweet_generator(file_path):
        per_user = counts[day]
        per_user[author] = per_user[author] + 1
    return counts

def q1_memory(file_path):
    """Return the 10 dates with the most tweets and the top poster of each.

    Counts come from ``process_tweets``, which streams the file through
    ``tweet_generator``. Per-date summaries are produced lazily and fed to
    ``heapq.nlargest``, which keeps at most 10 of them at once.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        Up to 10 ``(date, username)`` tuples, most tweets first. Dates with
        equal totals keep their first-appearance order. The top user is the
        first user with the maximum count for that date.
    """
    tweets = process_tweets(file_path)

    # Lazy (total, date, top_user) summaries.
    # There is deliberately no per-date `del` / `gc.collect()`: the per-date
    # dicts are still referenced by `tweets`, so nothing could be freed, and
    # each collect call forces a full garbage-collection pass.
    summaries = (
        (sum(user_data.values()), day, max(user_data, key=user_data.get))
        for day, user_data in tweets.items()
    )

    # With a key, nlargest is equivalent to sorted(..., reverse=True)[:10],
    # so ties on the total keep first-appearance order.
    top_10 = heapq.nlargest(10, summaries, key=lambda summary: summary[0])

    return [(day, top_user) for _, day, top_user in top_10]

In [None]:
# Run the time-oriented implementation, version 2 (sum/max per date).
q1_time(file_path=file_path, version=2)

In [None]:
# Run the memory-oriented implementation.
q1_memory(file_path=file_path)

### Use of snakeviz. SnakeViz is a browser-based graphical viewer for the output of Python's cProfile module. With this package we can see each function and its execution time.

In [None]:
# Enable the %snakeviz magic (cProfile results viewer).
%load_ext snakeviz

### Here I use snakeviz to see the time profile of each function. I run the time function twice because I was comparing its two versions.

In [None]:
# Profile q1_time, version 1 (manual single pass per date).
%snakeviz q1_time(file_path=file_path, version=1)

In [None]:
# Profile q1_time, version 2 (sum/max per date).
%snakeviz q1_time(file_path=file_path, version=2)

In [None]:
# Profile q1_memory.
%snakeviz q1_memory(file_path=file_path)

### Use of memray to profile memory usage.

In [None]:
# Enable the %%memray_flamegraph cell magic used by the memory-profiling cells.
%load_ext memray

In [None]:
%%memray_flamegraph
def q1_time(file_path: str, version: int = 2) -> List[Tuple[date, str]]:
    """Return the 10 dates with the most tweets and the top poster of each.

    Per-date/per-user counts come from ``process_json``. Two equivalent
    strategies are kept for benchmarking:

    * ``version == 1``: one manual pass over each date's users computes the
      total and the top user together.
    * any other value (default ``2``): ``sum`` gives the total and ``max``
      gives the top user.

    On a tie, both strategies pick the first user counted for that date.
    Dates with equal totals keep their first-appearance order, because
    ``sorted`` is stable.

    Args:
        file_path: Path to the line-delimited tweets file.
        version: Strategy selector (see above). It defaults to ``2`` so the
            function can also be called as ``q1_time(file_path)``.

    Returns:
        Up to 10 ``(date, username)`` tuples, most tweets first.
    """
    tweets_per_date = process_json(file_path)

    if version == 1:
        top_dates = []
        for day, users in tweets_per_date.items():
            total_tweets = 0
            top_user = None
            max_tweets = 0
            for user, count in users.items():
                total_tweets += count
                # Strict '>' keeps the first user that reaches the maximum.
                if count > max_tweets:
                    max_tweets = count
                    top_user = user
            top_dates.append((day, total_tweets, top_user))

        # Sort by total tweets and keep only the top 10.
        top_dates_sorted = sorted(top_dates, key=lambda x: x[1], reverse=True)[:10]
        return [(day, top_user) for day, _, top_user in top_dates_sorted]

    date_tweet_sums = []
    top_users_by_date = {}
    for day, users in tweets_per_date.items():
        date_tweet_sums.append((day, sum(users.values())))
        # max() returns the first item with the largest count.
        top_users_by_date[day] = max(users.items(), key=lambda item: item[1])[0]

    # Sort by total tweets and keep only the top 10.
    top_10_dates = sorted(date_tweet_sums, key=lambda x: x[1], reverse=True)[:10]
    return [(day, top_users_by_date[day]) for day, _ in top_10_dates]

q1_time(file_path=file_path, version=2)

In [None]:
%%memray_flamegraph
def tweet_generator(file_path):
    """Yield ``(date, username)`` for every valid tweet in ``file_path``.

    The file is read line by line, so only the current line's parsed object
    is alive at any time. Blank lines and lines not shaped like a JSON object
    are skipped. Lines that fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        ``(datetime.date, str)`` tuples of tweet date and author username.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Remove line breaks and surrounding spaces.
            line = line.strip()
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip the line. Falling through would hit an unbound `tweet`
                # or yield the previous line's tweet a second time.
                continue

            # 'date' is ISO-8601; its first 10 characters are YYYY-MM-DD.
            yield date.fromisoformat(tweet['date'][:10]), tweet['user']['username']

def process_tweets(file_path):
    """Aggregate tweet counts per date and per username.

    Consumes ``tweet_generator`` one ``(date, username)`` pair at a time, so
    only the nested counters are retained.

    Returns:
        defaultdict: ``date -> defaultdict(username -> int)``.
    """
    counts = defaultdict(lambda: defaultdict(int))
    for day, author in tweet_generator(file_path):
        per_user = counts[day]
        per_user[author] = per_user[author] + 1
    return counts

def q1_memory(file_path):
    """Return the 10 dates with the most tweets and the top poster of each.

    Counts come from ``process_tweets``, which streams the file through
    ``tweet_generator``. Per-date summaries are produced lazily and fed to
    ``heapq.nlargest``, which keeps at most 10 of them at once.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        Up to 10 ``(date, username)`` tuples, most tweets first. Dates with
        equal totals keep their first-appearance order. The top user is the
        first user with the maximum count for that date.
    """
    tweets = process_tweets(file_path)

    # Lazy (total, date, top_user) summaries.
    # There is deliberately no per-date `del` / `gc.collect()`: the per-date
    # dicts are still referenced by `tweets`, so nothing could be freed, and
    # each collect call forces a full garbage-collection pass.
    summaries = (
        (sum(user_data.values()), day, max(user_data, key=user_data.get))
        for day, user_data in tweets.items()
    )

    # With a key, nlargest is equivalent to sorted(..., reverse=True)[:10],
    # so ties on the total keep first-appearance order.
    top_10 = heapq.nlargest(10, summaries, key=lambda summary: summary[0])

    return [(day, top_user) for _, day, top_user in top_10]

q1_memory(file_path=file_path)

### After using these packages, we can see that, in terms of time, q1_time is more efficient than q1_memory. In terms of memory, however, the profiling shows that both functions use the same amount of memory. This is because both read the file line by line, so only the object currently being processed (plus the aggregated counts) is kept in memory. I also tried loading the whole file into memory in q1_time, but orjson.loads took a long time to convert the entire file into an object.

## Exercise 2

### For the second exercise, I implemented a function to extract emojis from a text. Then I process the file line by line, extract the emojis from each tweet's text, and count them with a Counter.

### For this exercise I assumed that I should find the most used emojis regardless of whether they appear in the main tweet or in a quoted tweet.

### I used the emoji package because, even though regular expressions could be more efficient, they make it harder to keep the list of patterns up to date. The emoji package is updated with each new Unicode version.

In [None]:
def extract_emojis(text):
    """Return every character of ``text`` that is a key of ``emoji.EMOJI_DATA``.

    Matches are returned in order of appearance, duplicates included.

    NOTE(review): the scan is one code point at a time. Multi-code-point emoji
    (skin-tone variants, ZWJ sequences, flags) are therefore split into their
    parts, and only the parts that are themselves EMOJI_DATA keys are kept.
    Consider emoji.emoji_list if whole sequences should be counted.
    """
    known = emoji.EMOJI_DATA
    return list(filter(known.__contains__, text))

def q2_time(file_path):
    """Return the 10 most frequent emojis in tweets and their quoted tweets.

    Reads ``file_path`` line by line with ``orjson``. Emojis are taken from the
    ``content`` of each tweet and, when present, of its ``quotedTweet``.
    Blank lines and lines not shaped like a JSON object are skipped. Lines that
    fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        list of ``(emoji, count)`` tuples from ``Counter.most_common(10)``.
    """
    emoji_counter = Counter()
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # counting ran even for rejected lines, reusing the previous tweet.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = orjson.loads(line)
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would recount the previous tweet.
                continue

            # `content` may be missing or null; treat both as empty text.
            emoji_counter.update(extract_emojis(tweet.get('content') or ''))

            # Also count emojis in the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                emoji_counter.update(extract_emojis(quoted_tweet.get('content') or ''))

    # Obtain the top 10 emojis.
    return emoji_counter.most_common(10)

In [None]:
# Top 10 emojis (time-oriented version).
q2_time(file_path)

### For the memory function I used the same approach: I extract emojis with the emoji package from the content of the main tweet and of the quoted tweet, and I use a generator to process the file incrementally.

In [None]:
def extract_emojis(text):
    """Return every character of ``text`` that is a key of ``emoji.EMOJI_DATA``.

    Matches are returned in order of appearance, duplicates included.

    NOTE(review): the scan is one code point at a time. Multi-code-point emoji
    (skin-tone variants, ZWJ sequences, flags) are therefore split into their
    parts, and only the parts that are themselves EMOJI_DATA keys are kept.
    """
    known = emoji.EMOJI_DATA
    return list(filter(known.__contains__, text))

def tweet_generator(file_path):
    """Yield the emoji list of each tweet's content and of its quoted tweet.

    For every valid line of ``file_path``, this yields ``extract_emojis`` of the
    tweet's ``content``. When a ``quotedTweet`` is present, it then also yields
    ``extract_emojis`` of the quoted tweet's ``content``. Blank lines and lines
    not shaped like a JSON object are skipped. Lines that fail to decode are
    printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        list of emoji characters, one list per tweet body.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would re-yield the previous tweet.
                continue

            # `content` may be missing or null; treat both as empty text.
            yield extract_emojis(tweet.get('content') or '')

            # Emojis from the quoted tweet's content, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                yield extract_emojis(quoted_tweet.get('content') or '')

def q2_memory(file_path):
    """Return the 10 most frequent emojis, streamed from ``tweet_generator``.

    Each item yielded by the generator is the emoji list of one tweet body
    (main or quoted). The counts are accumulated in a ``Counter``.

    Returns:
        list of ``(emoji, count)`` tuples from ``Counter.most_common(10)``.
    """
    counts = Counter()
    for found in tweet_generator(file_path):
        counts.update(found)
    return counts.most_common(10)

In [None]:
# Top 10 emojis (memory-oriented version).
q2_memory(file_path)

In [None]:
# Profile q2_time.
%snakeviz q2_time(file_path)

In [None]:
# Profile q2_memory.
%snakeviz q2_memory(file_path)

In [None]:
%%memray_flamegraph
def extract_emojis(text):
    """Return every character of ``text`` that is a key of ``emoji.EMOJI_DATA``.

    Matches are returned in order of appearance, duplicates included.

    NOTE(review): the scan is one code point at a time. Multi-code-point emoji
    (skin-tone variants, ZWJ sequences, flags) are therefore split into their
    parts, and only the parts that are themselves EMOJI_DATA keys are kept.
    """
    known = emoji.EMOJI_DATA
    return list(filter(known.__contains__, text))

def q2_time(file_path):
    """Return the 10 most frequent emojis in tweets and their quoted tweets.

    Reads ``file_path`` line by line with ``orjson``. Emojis are taken from the
    ``content`` of each tweet and, when present, of its ``quotedTweet``.
    Blank lines and lines not shaped like a JSON object are skipped. Lines that
    fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        list of ``(emoji, count)`` tuples from ``Counter.most_common(10)``.
    """
    emoji_counter = Counter()
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # counting ran even for rejected lines, reusing the previous tweet.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = orjson.loads(line)
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would recount the previous tweet.
                continue

            # `content` may be missing or null; treat both as empty text.
            emoji_counter.update(extract_emojis(tweet.get('content') or ''))

            # Also count emojis in the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                emoji_counter.update(extract_emojis(quoted_tweet.get('content') or ''))

    # Obtain the top 10 emojis.
    return emoji_counter.most_common(10)

q2_time(file_path)

In [None]:
%%memray_flamegraph
def extract_emojis(text):
    """Return every character of ``text`` that is a key of ``emoji.EMOJI_DATA``.

    Matches are returned in order of appearance, duplicates included.

    NOTE(review): the scan is one code point at a time. Multi-code-point emoji
    (skin-tone variants, ZWJ sequences, flags) are therefore split into their
    parts, and only the parts that are themselves EMOJI_DATA keys are kept.
    """
    known = emoji.EMOJI_DATA
    return list(filter(known.__contains__, text))

def tweet_generator(file_path):
    """Yield the emoji list of each tweet's content and of its quoted tweet.

    For every valid line of ``file_path``, this yields ``extract_emojis`` of the
    tweet's ``content``. When a ``quotedTweet`` is present, it then also yields
    ``extract_emojis`` of the quoted tweet's ``content``. Blank lines and lines
    not shaped like a JSON object are skipped. Lines that fail to decode are
    printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        list of emoji characters, one list per tweet body.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would re-yield the previous tweet.
                continue

            # `content` may be missing or null; treat both as empty text.
            yield extract_emojis(tweet.get('content') or '')

            # Emojis from the quoted tweet's content, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                yield extract_emojis(quoted_tweet.get('content') or '')

def q2_memory(file_path):
    """Return the 10 most frequent emojis, streamed from ``tweet_generator``.

    Each item yielded by the generator is the emoji list of one tweet body
    (main or quoted). Those emojis are counted in a ``Counter``.

    Returns:
        list of ``(emoji, count)`` tuples from ``Counter.most_common(10)``.
    """
    counts = Counter()
    for found in tweet_generator(file_path):
        counts.update(found)
    return counts.most_common(10)

q2_memory(file_path)

### In the second exercise, snakeviz shows that q2_time has better performance in terms of execution time. For memory, the profiling shows that q2_memory uses memory more efficiently, but the difference is only 2 KiB; with larger datasets the difference could be bigger.

## Exercise 3

### For exercise 3, as in exercise 2, I assumed that I should find the most influential (most mentioned) users regardless of whether they are mentioned in the main tweet or in a quoted tweet.

In [None]:
def q3_time(file_path):
    """Return the 10 most mentioned usernames across tweets and quoted tweets.

    Reads ``file_path`` line by line with ``orjson``. It counts every entry of
    ``mentionedUsers`` in the tweet and, when present, in its ``quotedTweet``.
    Blank lines and lines not shaped like a JSON object are skipped. Lines that
    fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        list of ``(username, count)`` tuples from ``Counter.most_common(10)``.
    """
    mention_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # counting ran even for rejected lines, reusing the previous tweet.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = orjson.loads(line)
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would recount the previous tweet.
                continue

            # `mentionedUsers` may be missing or null; both mean "no mentions".
            mention_counter.update(
                user['username'] for user in tweet.get('mentionedUsers') or []
            )

            # Mentions inside the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                mention_counter.update(
                    user['username'] for user in quoted_tweet.get('mentionedUsers') or []
                )

    # Obtain the top 10 most mentioned users.
    return mention_counter.most_common(10)

In [None]:
# Top 10 most mentioned users (time-oriented version).
q3_time(file_path)

### As in the other exercises, for the memory implementation I used generators to keep memory usage low.

In [None]:
def mention_generator(file_path):
    """Yield every mentioned username of each tweet and of its quoted tweet.

    Reads ``file_path`` line by line. For every valid JSON object it yields the
    ``username`` of each entry in ``mentionedUsers``, followed by those of the
    ``quotedTweet`` when one is present. Blank lines and lines not shaped like
    a JSON object are skipped. Lines that fail to decode are printed and
    skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        str usernames, one per mention.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # rejected lines still re-yielded the previous tweet's mentions.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would re-yield the previous tweet.
                continue

            # `mentionedUsers` may be missing or null; both mean "no mentions".
            for user in tweet.get('mentionedUsers') or []:
                yield user['username']

            # Mentions inside the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                for user in quoted_tweet.get('mentionedUsers') or []:
                    yield user['username']

def q3_memory(file_path):
    """Return the 10 most mentioned usernames, streamed from ``mention_generator``.

    The generator yields one username per mention. ``Counter`` consumes it
    lazily, so only the per-username tallies are kept in memory.

    Returns:
        list of ``(username, count)`` tuples from ``Counter.most_common(10)``.
    """
    tally = Counter(mention_generator(file_path))
    return tally.most_common(10)

In [None]:
# Top 10 most mentioned users (memory-oriented version).
q3_memory(file_path)

In [None]:
# Profile q3_time.
%snakeviz q3_time(file_path)

In [None]:
# Profile q3_memory.
%snakeviz q3_memory(file_path)

In [None]:
%%memray_flamegraph
def q3_time(file_path):
    """Return the 10 most mentioned usernames across tweets and quoted tweets.

    Reads ``file_path`` line by line with ``orjson``. It counts every entry of
    ``mentionedUsers`` in the tweet and, when present, in its ``quotedTweet``.
    Blank lines and lines not shaped like a JSON object are skipped. Lines that
    fail to decode are printed and skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Returns:
        list of ``(username, count)`` tuples from ``Counter.most_common(10)``.
    """
    mention_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # counting ran even for rejected lines, reusing the previous tweet.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = orjson.loads(line)
            # Catch orjson's own error type to match the parser in use.
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would recount the previous tweet.
                continue

            # `mentionedUsers` may be missing or null; both mean "no mentions".
            mention_counter.update(
                user['username'] for user in tweet.get('mentionedUsers') or []
            )

            # Mentions inside the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                mention_counter.update(
                    user['username'] for user in quoted_tweet.get('mentionedUsers') or []
                )

    # Obtain the top 10 most mentioned users.
    return mention_counter.most_common(10)

q3_time(file_path)

In [None]:
%%memray_flamegraph
def mention_generator(file_path):
    """Yield every mentioned username of each tweet and of its quoted tweet.

    Reads ``file_path`` line by line. For every valid JSON object it yields the
    ``username`` of each entry in ``mentionedUsers``, followed by those of the
    ``quotedTweet`` when one is present. Blank lines and lines not shaped like
    a JSON object are skipped. Lines that fail to decode are printed and
    skipped.

    Args:
        file_path: Path to the line-delimited tweets file.

    Yields:
        str usernames, one per mention.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Only non-empty lines shaped like a JSON object are parsed. Before,
            # rejected lines still re-yielded the previous tweet's mentions.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                tweet = json.loads(line)
            except json.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')
                # Skip it; falling through would re-yield the previous tweet.
                continue

            # `mentionedUsers` may be missing or null; both mean "no mentions".
            for user in tweet.get('mentionedUsers') or []:
                yield user['username']

            # Mentions inside the quoted tweet, if there is one.
            quoted_tweet = tweet.get('quotedTweet')
            if quoted_tweet:
                for user in quoted_tweet.get('mentionedUsers') or []:
                    yield user['username']

def q3_memory(file_path):
    """Return the 10 most mentioned usernames, streamed from ``mention_generator``.

    The generator yields one username per mention. ``Counter`` consumes it
    lazily, so only the per-username tallies are kept in memory.

    Returns:
        list of ``(username, count)`` tuples from ``Counter.most_common(10)``.
    """
    tally = Counter(mention_generator(file_path))
    return tally.most_common(10)

q3_memory(file_path)

### For the third exercise, we see that q3_time has the lower execution time. In terms of memory, both use approximately 883 KiB. As in the previous case, this may be because both implementations process the file line by line and store the results in structures that are already memory-efficient. Since neither function loads the whole file, their memory usage is very similar: each iteration only accesses the elements it needs.

## Test functions from files

In [2]:
from q1_time import q1_time
from q1_memory import q1_memory
from q2_time import q2_time
from q2_memory import q2_memory
from q3_time import q3_time
from q3_memory import q3_memory

In [None]:
# Same raw input file, re-declared for the tests of the module-based functions.
file_path = "farmers-protest-tweets-2021-2-4.json"

In [3]:
# Calls the q1_time imported from the q1_time module; no version argument is passed.
q1_time(file_path=file_path)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [None]:
# Calls the q1_memory imported from the q1_memory module.
q1_memory(file_path=file_path)

In [None]:
# Calls the q2_time imported from the q2_time module.
q2_time(file_path=file_path)

In [None]:
# Calls the q2_memory imported from the q2_memory module.
q2_memory(file_path=file_path)

In [None]:
# Calls the q3_time imported from the q3_time module.
q3_time(file_path=file_path)

In [None]:
# Calls the q3_memory imported from the q3_memory module.
q3_memory(file_path=file_path)

## Solution with gcp

### As an extra exercise, I implemented a small solution that stores the data in Cloud Storage, a function that loads the data from Cloud Storage into BigQuery, and some test queries against BigQuery.

### The credentials file is not included. To reproduce this, create a service account, generate a key for it, and save the path to the key file in an environment variable named GOOGLE_APPLICATION_CREDENTIALS.

In [None]:
import os
import orjson

from google.cloud import storage, bigquery

### Here you can change the file names and the project_id to reproduce the setup.

In [None]:
# Local raw input file (one JSON object per line).
file_path = "farmers-protest-tweets-2021-2-4.json"
# GCS bucket that receives the corrected file.
bucket_name = "challenge-latam"
# Local corrected file; fix_source_file writes a file with this name by default.
source_file_name = "corrected_data.json"
# Object name of the uploaded file inside the bucket.
destination_blob_name = "data_farmers_corrected.json"
# Object read back from the bucket; same name as destination_blob_name.
file_name = "data_farmers_corrected.json"
# GCP project id; change it to reproduce the setup in another project.
project_id = "latam-challenge-438317"

In [None]:
def fix_source_file(file_path: str, output_path: str = 'corrected_data.json') -> str:
    """Rewrite a line-delimited JSON file as a single JSON array.

    Every non-empty line of ``file_path`` that is shaped like a JSON object is
    parsed with ``orjson``. Lines that fail to decode are printed and skipped.
    The collected objects are written to ``output_path`` as one compact JSON
    array.

    Args:
        file_path: Path to the raw line-delimited tweets file.
        output_path: Destination file. It defaults to ``'corrected_data.json'``,
            the name previously hard-coded here.

    Returns:
        The path of the written file.
    """
    objects = []

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Ignore blank lines and anything that is not a single JSON object.
            if not line or not (line.startswith('{') and line.endswith('}')):
                continue
            try:
                objects.append(orjson.loads(line))
            except orjson.JSONDecodeError as e:
                print(f'Error decoding the line: {line} - {e}')

    # orjson.dumps returns bytes, hence the binary write mode.
    with open(output_path, 'wb') as json_file:
        json_file.write(orjson.dumps(objects))

    return output_path

In [None]:
def create_bucket(bucket_name, storage_client):
    """Create a bucket in Google Cloud Storage.

    Args:
        bucket_name: Name of the bucket to create.
        storage_client: Authenticated ``storage.Client`` used to issue the request.

    Returns:
        The newly created bucket.
    """
    # bucket() only builds a local handle; create_bucket() performs the API request.
    bucket = storage_client.bucket(bucket_name)
    new_bucket = storage_client.create_bucket(bucket)

    print(f'Bucket {new_bucket.name} created.')
    return new_bucket

In [None]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket, creating the bucket if it is missing.

    Args:
        bucket_name: Target bucket name.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to create in the bucket.

    Raises:
        Any upload error that is not caused by a missing bucket, for example
        a missing local file or insufficient permissions.
    """
    storage_client = storage.Client.from_service_account_json(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

    # bucket()/blob() only build local handles; no request is sent yet.
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    try:
        blob.upload_from_filename(source_file_name)
    except Exception:
        # Only a missing bucket is recoverable here. For any other failure
        # (bad local path, permissions, network), re-raise. Otherwise we would
        # try to create a bucket that already exists and hide the real error.
        if bucket.exists():
            raise
        create_bucket(bucket_name, storage_client)
        blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name} in bucket {bucket_name}.")

In [None]:
def upload_to_bigquery(dataset_id, table_id, rows_to_insert):
    """Stream rows into an existing BigQuery table.

    Args:
        dataset_id: Dataset containing the table, in the client's default project.
        table_id: Name of the destination table.
        rows_to_insert: JSON-serialisable dicts whose keys match the table schema.

    Returns:
        A human-readable summary of how many rows were inserted.

    Raises:
        RuntimeError: If BigQuery reports per-row insert errors.
    """
    rows_to_insert = list(rows_to_insert)
    if not rows_to_insert:
        # Nothing to send: skip the client creation and the API call for an empty batch.
        return f"Inserted 0 rows into {table_id}."

    bq_client = bigquery.Client.from_service_account_json(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
    # "dataset.table" is resolved against the client's project (Client.dataset() is deprecated).
    table = bq_client.get_table(f"{dataset_id}.{table_id}")
    errors = bq_client.insert_rows_json(table, rows_to_insert)

    if errors:
        raise RuntimeError(f"Error inserting rows: {errors}")
    return f"Inserted {len(rows_to_insert)} rows into {table_id}."

In [None]:
def process_json(bucket_name: str, file_name: str):
    """Download a JSON file from Cloud Storage and return its parsed content.

    Args:
        bucket_name: Bucket holding the file.
        file_name: Object name of the JSON file in the bucket.

    Returns:
        The parsed JSON document. For the file written by fix_source_file,
        this is a list of tweet dicts.

    Raises:
        ValueError: If the downloaded content is not valid JSON.
    """
    client = storage.Client.from_service_account_json(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_name)
    # orjson parses bytes directly, so skip the bytes -> str decode step.
    json_data = blob.download_as_bytes()
    try:
        return orjson.loads(json_data)
    except orjson.JSONDecodeError as e:
        # Fail loudly with the original cause. Printing and returning nothing would
        # only surface later as an unrelated error.
        raise ValueError(f"Could not parse {file_name} from bucket {bucket_name}: {e}") from e


In [None]:
def build_data_structures(tweets_data):
    """Flatten raw tweet dicts into row dicts for the BigQuery tweets and users tables.

    Args:
        tweets_data: Iterable of tweet dicts (e.g. the list returned by process_json).

    Returns:
        A ``(tweets, users)`` tuple of lists of row dicts.

        ``tweets`` has one row per input tweet. ``users`` has one row per
        tweet that has a ``user`` object, so the same user can appear several
        times; deduplicate before loading.
    """
    tweets = []
    users = []

    for tweet in tweets_data:
        # A missing or null "user" would crash on .get(); treat it as an empty object.
        user = tweet.get("user") or {}
        user_id = user.get("id")  # Get the user's ID

        # Only emit a user row when the tweet actually carries a user object.
        if user:
            users.append({
                "username": user.get("username"),
                "displayname": user.get("displayname"),
                "id": user_id,
                "followersCount": user.get("followersCount"),
                "friendsCount": user.get("friendsCount"),
                "statusesCount": user.get("statusesCount"),
                "created": user.get("created"),
            })

        # "mentionedUsers" and "quotedTweet" may be missing or null. Normalise
        # them so the lookups below yield [] / None instead of raising.
        mentioned = tweet.get("mentionedUsers") or []
        quoted = tweet.get("quotedTweet") or {}
        quoted_mentioned = quoted.get("mentionedUsers") or []

        # Process and add tweet
        tweets.append({
            "url": tweet.get("url"),
            "date": tweet.get("date"),
            "content": tweet.get("content"),
            "id": tweet.get("id"),
            "replyCount": tweet.get("replyCount"),
            "retweetCount": tweet.get("retweetCount"),
            "likeCount": tweet.get("likeCount"),
            "quoteCount": tweet.get("quoteCount"),
            "lang": tweet.get("lang"),
            "source": tweet.get("source"),
            "user_id": user_id,
            "mentioned_users": [mentioned_user["id"] for mentioned_user in mentioned],
            "quoted_tweet_id": quoted.get("id"),
            "quoted_tweet_content": quoted.get("content"),
            "quoted_tweet_mentioned_users": [mentioned_user["id"] for mentioned_user in quoted_mentioned],
        })

    return tweets, users


In [None]:
def create_bigquery_tables(project_id, dataset_id, table_id, schema):
    """Ensure a BigQuery dataset and a table inside it exist.

    Both resources are created with ``exists_ok=True``, so calling this once per
    table for the same dataset is harmless. Other failures are printed rather
    than raised, so the notebook keeps running.

    Args:
        project_id: GCP project that owns the dataset and table.
        dataset_id: Dataset to create or reuse.
        table_id: Table to create or reuse inside the dataset.
        schema: List of ``bigquery.SchemaField`` describing the table.
    """
    bq_client = bigquery.Client.from_service_account_json(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

    # Fully-qualified ID of the dataset
    dataset_name = f'{project_id}.{dataset_id}'

    dataset = bigquery.Dataset(dataset_name)
    dataset.description = "Dataset created to Latam Challenge."

    try:
        dataset = bq_client.create_dataset(dataset, exists_ok=True)
        print(f"Dataset {dataset.dataset_id} is ready.")
    except Exception as e:
        print(f"Error creating dataset: {e}")

    # Qualify the table with project_id too, so it lands in the same project as the
    # dataset even when the client's default project differs (Client.dataset() is deprecated).
    table = bigquery.Table(f'{dataset_name}.{table_id}', schema=schema)

    try:
        table = bq_client.create_table(table, exists_ok=True)
        print(f"Table {table.table_id} is ready in dataset {dataset_id}.")
    except Exception as e:
        print(f"Error creating table: {e}")

In [None]:
def unique_data(tweets, users):
    """Remove rows whose ``id`` was already seen, keeping the first occurrence.

    The two lists are deduplicated independently, and the original order is kept.

    Returns:
        ``(unique_tweets, unique_users)``.
    """
    def first_by_id(rows):
        # Dicts keep insertion order; setdefault keeps only the first row per id.
        by_id = {}
        for row in rows:
            by_id.setdefault(row["id"], row)
        return list(by_id.values())

    return first_by_id(tweets), first_by_id(users)

### This is the main process. First, the file is corrected and uploaded to Cloud Storage. Then the uploaded file is processed: the function downloads it and returns its parsed content. Next, the data structures to be sent to BigQuery are built, and the dataset and tables are created if they do not exist. After that, the data is cleaned by removing duplicate tweets and users, and finally it is inserted into the tables.

In [None]:
# Use the configurable path from the config cell instead of a duplicated literal.
# fix_source_file writes 'corrected_data.json', which is uploaded below as source_file_name.
fix_source_file(file_path=file_path)
upload_to_gcs(bucket_name, source_file_name, destination_blob_name)
tweets_data = process_json(bucket_name=bucket_name, file_name=file_name)
tweets, users = build_data_structures(tweets_data=tweets_data)

tweets_schema = [
    bigquery.SchemaField("url", "STRING"),
    bigquery.SchemaField("date", "TIMESTAMP"),
    bigquery.SchemaField("content", "STRING"),
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("replyCount", "INTEGER"),
    bigquery.SchemaField("retweetCount", "INTEGER"),
    bigquery.SchemaField("likeCount", "INTEGER"),
    bigquery.SchemaField("quoteCount", "INTEGER"),
    bigquery.SchemaField("lang", "STRING"),
    bigquery.SchemaField("source", "STRING"),
    bigquery.SchemaField("user_id", "INTEGER"),
    bigquery.SchemaField("mentioned_users", "INTEGER", mode="REPEATED"),
    bigquery.SchemaField("quoted_tweet_id", "INTEGER"),
    bigquery.SchemaField("quoted_tweet_content", "STRING"),
    bigquery.SchemaField("quoted_tweet_mentioned_users", "INTEGER", mode="REPEATED")
]

users_schema = [
    bigquery.SchemaField("username", "STRING"),
    bigquery.SchemaField("displayname", "STRING"),
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("followersCount", "INTEGER"),
    bigquery.SchemaField("friendsCount", "INTEGER"),
    bigquery.SchemaField("statusesCount", "INTEGER"),
    bigquery.SchemaField("created", "TIMESTAMP"),
]
create_bigquery_tables(project_id, "challenge_data", "tweets", tweets_schema)
create_bigquery_tables(project_id, "challenge_data", "users", users_schema)

# Drop rows with a repeated id (first occurrence wins) before inserting.
tweets, users = unique_data(tweets, users)

# Insert in BigQuery.
# Each streaming-insert request is size-limited, so rows are sent in batches.
# The batch loops follow the real row counts, so no rows are dropped and no
# empty batch is sent, whatever the size of the dataset.
batch_size = 10000

try:
    for start in range(0, len(tweets), batch_size):
        upload_to_bigquery("challenge_data", "tweets", tweets[start:start + batch_size])
    for start in range(0, len(users), batch_size):
        upload_to_bigquery("challenge_data", "users", users[start:start + batch_size])
except Exception as e:
    # Batches inserted before the failure remain in BigQuery.
    print(f"Error uploading data to BigQuery: {str(e)}")
else:
    # Only reached when every batch was inserted.
    print(f"Upload successful: inserted {len(tweets)} rows into tweets and {len(users)} rows into users.")

## Api Implementation

### An API was created to showcase the kind of applications that become possible once the data is in BigQuery. The API reads directly from BigQuery and exposes an endpoint that solves the first exercise.

## Below are images of the test run from Postman, showing the response, and images of the Swagger page generated from the API documentation.

## In the file 'test from Postman' there is an image of the test conducted on the API that solves exercise 1.

## In the file 'Api Documentation' there is an image of the Swagger page generated from the API documentation.

## Finally, as potential improvements, GCP Cloud Logging could be added to the error handling that is already in place. More extensive error handling could also be added, along with unit tests for the functions and API endpoints.