In [14]:
import pandas as pd
import numpy as np
import os
import glob
import time
from concurrent.futures import wait as futures_wait
from concurrent.futures.process import ProcessPoolExecutor

In [4]:
# Root of the COVID-19 tweet dump, with one sub-directory per month ("2020-01", ...).
# Set the COVID_TWEETS_DATA environment variable to point at a copy elsewhere
# instead of editing this absolute path.
data_path = os.environ.get("COVID_TWEETS_DATA", "/mainstorage/Data/COVID-19-Tweets/")
january_data_path = os.path.join(data_path, "2020-01")


In [9]:
# List the month directories under the data root. glob's result order depends on
# the file system, so sort it to get a stable, chronological listing.
sorted(glob.glob(os.path.join(data_path, "2*")))

['/mainstorage/Data/COVID-19-Tweets/2020-01',
 '/mainstorage/Data/COVID-19-Tweets/2020-02',
 '/mainstorage/Data/COVID-19-Tweets/2020-03',
 '/mainstorage/Data/COVID-19-Tweets/2020-04',
 '/mainstorage/Data/COVID-19-Tweets/2020-05',
 '/mainstorage/Data/COVID-19-Tweets/2020-06',
 '/mainstorage/Data/COVID-19-Tweets/2020-07']

In [None]:
def process_data(df: pd.DataFrame):
    """Placeholder worker: pretend to work for 5 seconds, then report the row count.

    Parameters
    ----------
    df : pandas.DataFrame
        The slice of rows handed to this worker.

    Returns
    -------
    int
        Number of rows in ``df``.
    """
    time.sleep(5)  # stand-in for real per-slice work
    n_rows = df.shape[0]
    return n_rows

def process_all_data(filename, chunksize=int(1e6), workers=7):
    """Read a CSV in chunks and run ``process_data`` on each piece in a process pool.

    Each chunk of ``chunksize`` rows is split into ``workers`` roughly equal row
    slices, and every slice is submitted to the pool. The next chunk is read from
    disk while the current slices are being processed, so I/O overlaps with work.

    Parameters
    ----------
    filename : str
        Path to the CSV file.
    chunksize : int
        Number of rows read from the CSV per chunk.
    workers : int
        Number of worker processes, which is also the number of slices per chunk.

    Returns
    -------
    list
        One ``process_data`` result per slice (``workers`` per chunk), in file order.
    """
    results = []
    reader = pd.read_csv(filename, lineterminator='\n', chunksize=chunksize)
    try:
        # The context manager shuts the pool down (joining its workers) on exit,
        # including when a worker raises.
        with ProcessPoolExecutor(max_workers=workers) as executor:
            chunk = next(reader, None)
            while chunk is not None:
                # Split by position: np.array_split applied directly to a DataFrame
                # goes through DataFrame.swapaxes, which recent pandas deprecates.
                # A fresh list per chunk, so each result is collected exactly once.
                futures = [
                    executor.submit(process_data, chunk.iloc[positions])
                    for positions in np.array_split(np.arange(len(chunk)), workers)
                ]
                # Prefetch the next chunk while the workers are busy.
                chunk = next(reader, None)
                # result() blocks until each slice is done.
                results.extend(fut.result() for fut in futures)
    finally:
        reader.close()
    return results


In [36]:
def process_data(df: pd.DataFrame):
    """Placeholder worker: pretend to work for 5 seconds, then summarize ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        The slice of rows handed to this worker; must have a
        ``user_screen_name`` column.

    Returns
    -------
    dict
        ``'count'``: number of rows; ``'unique_users'``: number of distinct
        ``user_screen_name`` values (a missing value counts as one value).
    """
    time.sleep(5)  # stand-in for real per-slice work
    screen_names = df['user_screen_name']
    summary = {
        'count': df.shape[0],
        'unique_users': screen_names.nunique(dropna=False),
    }
    return summary

def process_all_data(filename, chunksize=int(1e6), workers=7):
    """Count rows and users in a CSV, processing chunks in a process pool.

    Each chunk of ``chunksize`` rows is split into ``workers`` roughly equal row
    slices. Every slice goes to ``process_data``, whose ``'count'`` and
    ``'unique_users'`` results are summed. The next chunk is read from disk while
    the current slices are being processed.

    Parameters
    ----------
    filename : str
        Path to the CSV file; each row must have a ``user_screen_name`` column.
    chunksize : int
        Number of rows read from the CSV per chunk.
    workers : int
        Number of worker processes, which is also the number of slices per chunk.

    Returns
    -------
    dict
        ``{'count': total rows, 'unique_users': sum of per-slice unique counts}``.
    """
    total_count = 0
    total_unique_users = 0

    reader = pd.read_csv(filename, lineterminator='\n', chunksize=chunksize)
    try:
        # The context manager shuts the pool down (joining its workers) on exit,
        # including when a worker raises.
        with ProcessPoolExecutor(max_workers=workers) as executor:
            chunk = next(reader, None)
            while chunk is not None:
                # Split by position: np.array_split applied directly to a DataFrame
                # goes through DataFrame.swapaxes, which recent pandas deprecates.
                # A fresh list per chunk, so each slice is tallied exactly once.
                futures = [
                    executor.submit(process_data, chunk.iloc[positions])
                    for positions in np.array_split(np.arange(len(chunk)), workers)
                ]
                # Prefetch the next chunk while the workers are busy.
                chunk = next(reader, None)
                for fut in futures:
                    res = fut.result()  # blocks until this slice is done
                    total_count += res['count']
                    # NOTE(review): summing per-slice unique counts counts a user once
                    # per slice they appear in, so this is an upper bound on distinct
                    # users. An exact figure needs process_data to return the set of
                    # screen names so they can be unioned here.
                    total_unique_users += res['unique_users']
    finally:
        reader.close()

    return {'count': total_count, 'unique_users': total_unique_users}


In [27]:
# Load only the account-creation timestamps and show the rows where it is missing.
# NOTE(review): this is a hard-coded path on an external drive, unlike `data_path`
# in the config cell; consider deriving it from the data root instead.
users = pd.read_csv(
    '/media/felipe/Extreme SSD/data/COVID-19-Tweets/2020-06/users.csv',
    usecols=['created_at'],
    lineterminator='\n',
    low_memory=False,
)
users.loc[users['created_at'].isna()]

Unnamed: 0,created_at


In [28]:
# Load a 1000-row sample of May tweets for inspection. `nrows` reads just those
# rows and releases the file, whereas next(read_csv(..., chunksize=...)) leaves
# an iterator-backed reader that is never explicitly closed.
tweets = pd.read_csv(
    '/media/felipe/Extreme SSD/data/COVID-19-Tweets/2020-05/tweets.csv',
    nrows=1000,
    lineterminator='\n',
)

In [29]:
# Preview the first five sample rows; wide frames have middle columns elided ("...").
tweets.head(n=5)

Unnamed: 0,id,id_str,created_at,user_id,user_screen_name,user_verified,user_created_at,user_tweets_count,user_friends_count,user_followers_count,...,quoted_status_text,quoted_status_created_at,quoted_user_id,quoted_user_created_at,hashtags,symbols,user_mentions,urls,polls,media
0,1258352962289537024,1258352962289537024,Thu May 07 11:08:19 +0000 2020,1190945888174129161,mimichanmama211,False,Sun Nov 03 10:57:24 +0000 2019,3678,1530,360,...,,,,,"[{'text': 'StayHome', 'indices': [22, 31]}, {'...",[],"[{'screen_name': 'Kraso_felissimo', 'name': 'フ...",[],[],[]
1,1258352959399878656,1258352959399878656,Thu May 07 11:08:19 +0000 2020,1327297680,FlorBallarino,False,Thu Apr 04 16:32:34 +0000 2013,8015,2230,18202,...,,,,,[],[],"[{'screen_name': 'ANMATsalud', 'name': 'ANMAT'...","[{'url': 'https://t.co/4vt6VebJ5j', 'expanded_...",[],[]
2,1258352959794143233,1258352959794143233,Thu May 07 11:08:19 +0000 2020,781075960468832256,KewalSi72649359,False,Wed Sep 28 10:19:55 +0000 2016,308,7,0,...,,,,,"[{'text': 'UtiliseLockdownWisely', 'indices': ...",[],"[{'screen_name': 'Kirti83081269', 'name': 'Kir...",[],[],[]
3,1258352959320186881,1258352959320186881,Thu May 07 11:08:19 +0000 2020,2261320772,ArjPascal,False,Wed Dec 25 10:16:37 +0000 2013,399727,4963,2712,...,,,,,"[{'text': 'Pologne', 'indices': [63, 71]}, {'t...",[],"[{'screen_name': 'Conflits_FR', 'name': 'Confl...",[],[],[]
4,1258352962843394048,1258352962843394048,Thu May 07 11:08:20 +0000 2020,2787131872,merlinslaugh,False,Sat Sep 27 13:16:27 +0000 2014,113272,565,695,...,"Government confirms 400,000 Turkish gowns are ...",Thu May 07 09:17:19 +0000 2020,87818409.0,Thu Nov 05 23:49:19 +0000 2009,[],[],[],"[{'url': 'https://t.co/YQBuxZtNNK', 'expanded_...",[],[]


In [32]:
def pippo(x):
    """Toy benchmark target: return ``x`` cubed."""
    return x**3
def pippo2(x):
    """Toy benchmark target: return ``x`` squared."""
    return x**2

In [31]:
# Micro-benchmark a single call of pippo(10) (cube) with IPython's %timeit.
%timeit pippo(10)

182 ns ± 1.68 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [33]:
# Micro-benchmark a single call of pippo2(10) (square) for comparison with pippo(10).
%timeit pippo2(10)

174 ns ± 0.42 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [35]:
# Profile one call of pippo(10) with cProfile via %prun; `-l 2` limits the printed report to two entries.
%prun -l 2 pippo(10)

 