Stage 2: Data preprocessing. This stage produces structured data in CSV format, which is also stored in Cloud Object Storage. The suggested columns for the CSV file are date, geographic location, URL, and sentiment analysis.

In [None]:
import lithops

analysis_folder_name = ''

# Retrieve analysis_folder from Stage 1 notebook
%store -r analysis_folder_name

if len(analysis_folder_name) == 0:
    print('You have not defined any folder yet (use Stage 1 notebook or the \'analysis_folder\' variable directly)')
else:
    # Prepend bucket name to the hashtags
    bucket_name = lithops.Storage().bucket
    data_location = ['cos://'+bucket_name+'/'+analysis_folder_name+'/']
    print('Paths to preprocess:', data_location)

In [None]:
from transformers import pipeline
from custom_snscrape.twitter import Tweet

def map_preprocess(obj) -> list:
    """Turn one chunk of raw tweets into a list of flat, CSV-ready rows.

    The chunk is read from ``obj.data_stream``, decoded as UTF-8 and split
    into lines; every non-blank line is parsed with ``Tweet.from_json``.
    Tweets whose language is not supported are dropped, the rest are run
    through the sentiment-analysis pipeline.

    Args:
        obj: Object whose ``data_stream.read()`` returns the bytes of the
            chunk to process (one JSON-serialized tweet per line).

    Returns:
        A list with one dict per kept tweet. The list is empty when no
        tweet of the chunk is in a supported language.
    """
    # Languages for which tweets are kept; built once instead of on every iteration.
    supported_langs = ('es', 'en', 'it', 'fr', 'de', 'nl')

    # The pipeline is created inside the function so that each invocation
    # builds its own classifier instance.
    classifier = pipeline('sentiment-analysis', model="nlptown/bert-base-multilingual-uncased-sentiment")

    to_reduction = []

    for data_body_str in obj.data_stream.read().decode('utf-8').splitlines():
        # Blank lines carry no tweet; skip them instead of handing them to the parser.
        if not data_body_str.strip():
            continue

        tweet = Tweet.from_json(data_body_str)

        # Skip tweets with non-supported langs
        if tweet.lang not in supported_langs:
            continue

        sentiment = classifier(tweet.content)

        to_reduction.append({
            'id': tweet.id,
            'user': tweet.user.username,
            'content': tweet.content,
            'lang': tweet.lang,
            'coordinates': tweet.coordinates,
            'retweetCount': tweet.retweetCount,
            'likeCount': tweet.likeCount,
            'quoteCount': tweet.quoteCount,
            'replyCount': tweet.replyCount,
            'date': tweet.date.isoformat(),
            'mentionedUsers': tweet.mentionedUsers,
            'outlinks': tweet.outlinks,
            # Only the label of the first prediction is stored.
            'sentiment': sentiment[0]['label'],
        })

    return to_reduction


In [None]:
import csv
from io import StringIO


def reduce_to_csv(results, storage):
    """Merge the rows produced by every map call into one CSV object.

    The CSV is written to an in-memory buffer and uploaded to
    ``storage.bucket`` under the key ``<analysis_folder_name>.csv``.

    Args:
        results: Sequence with one list of row dicts per map call. Individual
            lists may be empty (a chunk where every tweet was filtered out).
        storage: Storage client exposing ``bucket`` and ``put_object``.

    Returns:
        A message string with the ``cos://`` location of the stored CSV.
    """
    # NOTE: the map() results contain dataclasses from a pickled module, so
    # something from the library that defines them must be imported here or
    # lithops won't find it. Do not remove this seemingly unused import.
    from custom_snscrape.twitter import Tweet  # noqa: F401

    key = analysis_folder_name + '.csv'

    # Take the header from the first row available in ANY map result: the
    # first result may legitimately be empty, so results[0][0] is not safe.
    # All rows are expected to share the same keys.
    first_row = next(
        (single_map_results[0] for single_map_results in results if single_map_results),
        None,
    )

    with StringIO() as csv_buffer:
        if first_row is not None:
            dict_writer = csv.DictWriter(csv_buffer, first_row.keys())
            dict_writer.writeheader()

            for single_map_results in results:
                dict_writer.writerows(single_map_results)

        # With no rows at all an empty object is stored, so the key still exists.
        storage.put_object(bucket=storage.bucket,
                           key=key,
                           body=csv_buffer.getvalue())

    return 'reduced result and stored it at cos://{}/{}'.format(storage.bucket, key)

In [None]:
# Chunk size trade-off:
#   - bigger chunks  -> longer functions (bigger chances of timeout)
#   - smaller chunks -> faster execution, more parallelism, more resources consumption
object_chunksize = int(0.3 * 1024**2)  # 0.3 MB

with lithops.FunctionExecutor(
    runtime='gilarasa/lithops-cloudbutton-challenge-py3.9:0.5',
    runtime_memory=2048,
    log_level='debug',
) as fexec:
    # Run the preprocessing map over the data location and reduce everything to one CSV.
    fexec.map_reduce(
        map_preprocess,
        data_location,
        reduce_to_csv,
        obj_chunk_size=object_chunksize,
    )
    print(fexec.get_result())