In [1]:
import numpy as np
import pandas as pd
from datetime import datetime
import json
import os
from glob import glob 
import itertools

In [2]:
def fix_datetime(df, timevar='created_at_h'):
    df[timevar] = pd.to_datetime(df[timevar])

def fix_token_counter(df):
    df.token_counter = df.token_counter.apply(lambda x: Counter(x))  

def fix_RT_id(df):
    df.RT_id = df.RT_id.astype(str) 


def convert_floats(df, float_dtype='float32'):
    floats = df.select_dtypes(include=['float64']).columns.tolist()
    df[floats] = df[floats].astype(float_dtype)
    return df

def tw_data_files_to_df_csv(files):
    '''append and concat data files into a pandas.DataFrame'''
    df = []
    [ df.append(pd.read_csv(file)) for file in files ]
    df = pd.concat(df, ignore_index=True)
    return df

def tw_data_files_to_df_csv2(files, frac=0.05, float_dtype=None):
    '''append and concat a sample of data into a pandas.DataFrame'''
    df = []
    [ df.append(pd.read_csv(file, low_memory=True)
        .sample(frac=frac, replace=True)) for file in files ]
    df = pd.concat(df, ignore_index=True)
    if float_dtype is None: return df
    return convert_floats(df, float_dtype)


def tw_data_files_to_df_json(files, lines=False):
    '''append and concat data files into a pandas.DataFrame'''
    df = []
    [ df.append(pd.read_json(file, orient='records', lines=lines)) for file in files ]
    df = pd.concat(df, ignore_index=True)
    return df


def tw_data_files_to_df_json3(files, lines=False, frac=0.05, float_dtype=None, verbose=False):
    '''append and concat a sample of data into a pandas.DataFrame'''
    df = []
    for file in files:
        if verbose: print('loading ' + file)
        df.append(pd.read_json(file, orient='records', lines=lines)
                 .sample(frac=frac, replace=True)) 
    df = pd.concat(df, ignore_index=True)
    if float_dtype is None: return df
    return convert_floats(df, float_dtype)

def keep_recent_files(files, base_timestamp, file_type= '.json', days = 14,
                      prefix = 'created_at_'):
    timestamps = [pd.Timestamp(file.split(prefix,1)[1]
                               .replace(file_type,'').replace('_',' ')) for file in files ]
    keep_idx1 = [(base_timestamp - timestamp) <= pd.Timedelta(days, unit='d') for timestamp in timestamps]
    return(list(itertools.compress(files,keep_idx1)))

In [3]:
data_path = '/Users/kotaminegishi/big_data_training/python/dash_demo1/'
data_dest = '/Users/kotaminegishi/big_data_training/python/dash_demo1/'

process_datatime = pd.to_datetime(datetime(2020,7,20))
process_datatime_d = process_datatime.floor('d')
#_dt = latest_datatime.floor('d').to_pydatetime()


In [4]:
def mark_tokens_contain_keyword(df, keyword):
    # returns an index indicating whether variable 'tokens' contains keyword
    return df.tokens.apply(lambda x: keyword.lower() in x)

def mark_tokens_contain_keywords(df, keywords):
    idx = [mark_tokens_contain_keyword(df, keyword) for keyword in keywords]
    return pd.DataFrame(idx).agg(max).astype(bool)
    
def mark_tokens_contain_keyword_jointly(df, keywords):
    # returns an index indicating whether variable 'tokens' contains keyword
    idx = [mark_tokens_contain_keyword(df, keyword) for keyword in keywords]
    return pd.DataFrame(idx).agg(min).astype(bool) 
    

def get_columns_json(file):
    chunk1 = pd.read_json(file, chunksize=1, orient='records', lines=True)
    for d in chunk1:
        data1 = d.iloc[0]
        break
    return list(data1.keys())

def get_columns_csv(file):
    chunk1 = pd.read_csv(file, chunksize=1)
    return list(chunk1.read(1).keys())

def df_vars_convert_to_str(df, vars):
    for var in vars:
        df[var] = df[var].astype(str)
        

def tw_data_files_to_df_json_filter(files, filter_word, lines=True, float_dtype=None, verbose=False):
    '''append and concat filtered data into a pandas.DataFrame'''
    if type(filter_word) != list: raise ValueError("filter_word must be a list")

    df = []
    for file in files:
        if verbose: print('loading ' + file)  
        if file==files[0]:
            columns = get_columns_json(file)
            df_null = pd.DataFrame(columns=columns)
            
        df_file = pd.read_json(file, orient='records', lines=lines)
        if (len(filter_word) >1): idx = mark_tokens_contain_keywords(df_file, filter_word)
        else: idx = mark_tokens_contain_keyword(df_file, filter_word[0])
        df_file_filtered = df_file[idx]
        if len(df_file_filtered)>0:
            df.append(df_file_filtered)
    
    if len(df)==0: return df_null
    df = pd.concat(df, ignore_index=True)
    if float_dtype is None: return df
    return convert_floats(df, float_dtype)


In [5]:
def mark_var_in_valuelist(df, var, valuelist):
    # returns an index indicating whether variable var is in valuelist
    return df[var].apply(lambda x: x in valuelist)

In [6]:
def tw_data_files_to_df_json_match_id(files, varname_id, list_ids,
                                      lines=True, float_dtype=None, verbose=False):
    '''append and concat filtered data into a pandas.DataFrame'''
    if type(list_ids) != list: raise ValueError("list_ids must be a list")

    df = []
    for file in files:
        if verbose: print('loading ' + file)  
        if file==files[0]:
            columns = get_columns_json(file)
            df_null = pd.DataFrame(columns=columns)
            
        df_file = pd.read_json(file, orient='records', lines=lines)
        idx = mark_var_in_valuelist(df_file, varname_id, list_ids)
        df_file_filtered = df_file[idx]
        if len(df_file_filtered)>0:
            df.append(df_file_filtered)
    
    if len(df)==0: return df_null
    df = pd.concat(df, ignore_index=True)
    if float_dtype is None: return df
    return convert_floats(df, float_dtype)

    
    

In [7]:
cities = ['Minneapolis','LosAngeles','Denver']
city_filterwords = {'Minneapolis': ['Minneapolis', '#Minneapolis','mlps', ['St.', 'Paul']],
                    'LosAngeles':['LosAngeles','LA', 'L.A.', '#LA', ['Los', 'Angeles']],
                    'Denver': ['Denver', '#Denver']}

In [8]:
type(city_filterwords['Minneapolis'][0])
type(city_filterwords['LosAngeles'][3])
idx ={}
idx[str('a')] = [True,False,True]
idx[str(['a','b'])] = [False,False,True]

pd.DataFrame(idx).agg(max, axis=1).astype(bool)

0     True
1    False
2     True
dtype: bool

In [9]:
def mark_var_contain_filterwords(df, varname, filterwords):
    if type(filterwords) != list: raise ValueError("filterwords must be a list")
    idx = {}
    for word in filterwords:
        if type(word)==str:
            idx[str(word)] = df[varname].apply(lambda x: word.lower() in x)
        elif type(word)==list:
            # assess whether all components of 'word' are jointly present 
            loc_idx = [df[varname].apply(lambda x: w.lower() in x) for w in word]
            idx[str(word)] = pd.DataFrame(loc_idx).agg(min).astype(bool)
        else: raise ValueError('each item in filterwords must be str or list')
        # assess whether any component of 'filterwords' are present 
    return pd.DataFrame(idx).agg(max, axis=1).astype(bool)    

In [10]:
def retweet_files_by_city_json(files, cities, city_filterwords, data_path,
                               lines=True, float_dtype='float16', verbose=False):
    city_df = {}

    for file in files:
        if verbose: print('loading ' + file)  
        if file==files[0]:
            columns = get_columns_json(file)
            df_null = pd.DataFrame(columns=columns)
            for city in cities:
                city_df[city] = []
        
        df_file = pd.read_json(file, orient='records', lines=lines)
        df_vars_convert_to_str(df_file, ['RT_id','user_id','created_at','created_at_h'])
        convert_floats(df_file, float_dtype)
        
        for city in cities:
            filter_word = city_filterwords[city]    
            idx = mark_var_contain_filterwords(df_file, 'tokens', filter_word)
            if sum(idx)>0: city_df[city].append(df_file[idx])
    
    for city in cities:
        if len(city_df[city])==0: city_data = df_null
        else: city_data = pd.concat(city_df[city], ignore_index=True)
        filename = 'data_cumulative/city_date/' + city + '/retweet/2020_all_retweets' + '.json'
        city_data.to_json(data_path + filename, 
                          orient='records', lines=lines)
        print('updated: ', filename)


In [11]:
#retweet_files_by_city_json(files_retweet, cities, city_filterwords, data_path)

In [12]:
def get_unique_dates(df, varname):
    tmp = pd.to_datetime(df[varname]).dt.floor('d')
    dates = tmp.unique()
    dates_str = [str(date)[:10] for date in dates]
    return dates, dates_str

def filter_df_by_date(df, varname, date, var_as_string=True):
    tmp_df = df
    varname_d = varname + '_d'
    tmp_df[varname_d] = pd.to_datetime(tmp_df[varname]).dt.floor('d')
    filtered_df = tmp_df[tmp_df[varname_d] == pd.to_datetime(date)].drop(columns = [varname_d])
    if var_as_string: filtered_df[varname] = filtered_df[varname].astype(str)
    return filtered_df

def append_to_json(filename, df, lines=True):
    df0 = pd.read_json(filename, orient='records', lines=lines)
    return df0.append(df)


def original_files_by_city_date_json(files, cities, city_filterwords, data_path,
                               lines=True, float_dtype='float16', verbose=False,
                                    city_type='city', sample_frac = .05):
    
    if city_type not in ['city','all']: raise ValueError('city_type must be "city" or "all".')

    city_df = {}
    city_RT_ids = {}
    
    
    for city in cities:
        # retrieve relevant RT_id to match 
        filename = 'data_cumulative/city_date/' + city + '/retweet/2020_all_retweets' + '.json'
        RT_id = pd.read_json(data_path + filename, 
                             orient='records', lines=True).RT_id.astype(str)
        city_RT_ids[city] = list(RT_id)
    
    for file in files:
        if verbose: print('loading ' + file)  
        if file==files[0]:
            columns = get_columns_json(file)
            df_null = pd.DataFrame(columns=columns)
            for city in cities:
                city_df[city] = []
        
        df_file = pd.read_json(file, orient='records', lines=lines)
        df_vars_convert_to_str(df_file, ['id','RT_id','created_at','created_at_h'])
        convert_floats(df_file, float_dtype)
        
        for city in cities:
            if city_type=='city':
                if verbose: print('processing data for ' + city)  
                filter_word = city_filterwords[city]
                # idx1: 'tokens' containing filter_word
                idx1 = mark_var_contain_filterwords(df_file, 'tokens', filter_word)
                # idx2: relevant retweet's that are matched  
                idx2 = mark_var_in_valuelist(df_file, 'RT_id', city_RT_ids[city])
                # idx: either idx1 or idx2 being True
                idx = pd.DataFrame(data={'idx1':idx1, 'idx2': idx2}).agg(max, axis=1)
                print(sum(idx1),sum(idx2), sum(idx))
                if sum(idx)>0: city_df[city].append(df_file[idx])
            elif city_type=='all':
                city_df[city].append(df_file.sample(frac=sample_frac, replace=False))
            
    for city in cities:
        if len(city_df[city])==0: city_data = df_null
        else: city_data = pd.concat(city_df[city], ignore_index=True)
        dates, dates_str = get_unique_dates(city_data,'created_at_h')
        for date in dates_str:
            if verbose: print('processing date of ' + date)  
            df_date = filter_df_by_date(city_data, 'created_at_h', date)
            filename = 'data_cumulative/city_date/' + city + '/original/records_'+ date + '.json'
            new_file = glob(data_path + filename)==[]
            if new_file:
                df_date.to_json(data_path + filename, 
                              orient='records', lines=lines)
                print('created: ', filename)
            else:
                df_date = append_to_json(data_path + filename, df_date)
                df_date.to_json(data_path + filename, 
                              orient='records', lines=lines)
                print('appended: ', filename)

In [13]:
cities_all = ['all_v1','all_v2','all_v3','all_v4','all_v5']

files_retweet = glob(data_path + 'data_cumulative/retweet/2020_all_retweets.json')
df_retweet = pd.read_json(files_retweet[0], orient='records',lines=True)
for c in cities_all:
    filename = data_path + 'data_cumulative/city_date/' + c + '/retweet/2020_all_retweets.json'
    print(filename)
    df_retweet.to_json(filename, orient='records',lines=True)


/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/retweet/2020_all_retweets.json
/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/retweet/2020_all_retweets.json
/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v3/retweet/2020_all_retweets.json
/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v4/retweet/2020_all_retweets.json
/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v5/retweet/2020_all_retweets.json


In [14]:
files_original = glob(data_path + 'data_cumulative/original/*')
#original_files_by_city_date_json(files_original[:3], cities, city_filterwords, data_path, verbose=True)
files_original[:10]
# original_files_by_city_date_json(files_original, cities_all, [], 
#                                  data_path, verbose=False, 
#                                  city_type='all', sample_frac=.1)



['/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-09_11:00:00.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-06-30_13:00:00.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-01_10:00:00.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-14_16:39:52.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-15_09:49:43.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-19_23:42:33.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-02_15:00:00.json',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/original/created_at_2020-07-02_22:00:00.json',
 '/Users/kotaminegishi/b

In [40]:

def words_files_by_city_date_json(files_words, cities, data_path,
                                  process_datetime, process_days = 14,
                                  lines=True, verbose=False):
    city_df = {}
    city_ids = {}
    for city in cities:
        files_city_original = keep_recent_files(
            glob(data_path + "data_cumulative/city_date/" + city  + "/original/*"),
            prefix = 'records_', base_timestamp = process_datatime, days=7)
        tmp_ids = []
        for file in files_city_original:
            # retrieve relevant id to match
            if verbose: print('reading ids from ' + file)
            ids = pd.read_json(file, orient='records', lines=True).id.astype(str)
            tmp_ids.append(ids)
        city_ids[city] = list(pd.concat(tmp_ids, ignore_index=True))
    
    for file in files_words:
        if verbose: print('loading ' + file)  
        if file==files_words[0]:
            columns = get_columns_json(file)
            df_null = pd.DataFrame(columns=columns)
            for city in cities:
                city_df[city] = []
        
        df_file = pd.read_json(file, orient='records', lines=lines)
        df_vars_convert_to_str(df_file, ['id','created_at_h'])
        
        for city in cities:
            if verbose: print('processing data for ' + city)  
            # idx: relevant original tweet's that are matched  
            idx = mark_var_in_valuelist(df_file, 'id', city_ids[city])
            print(sum(idx))
            if sum(idx)>0: city_df[city].append(df_file[idx])
    
    for city in cities:
        if len(city_df[city])==0: city_data = df_null
        else: city_data = pd.concat(city_df[city], ignore_index=True)
        dates, dates_str = get_unique_dates(city_data, 'created_at_h')
        for date in dates_str:
            if verbose: print('processing date of ' + date)  
            df_date = filter_df_by_date(city_data, 'created_at_h', date)
            filename = 'data_cumulative/city_date/' + city + '/words/records_'+ date + '.json'
            new_file = glob(data_path + filename)==[]
            if new_file:
                df_date.to_json(data_path + filename, 
                              orient='records', lines=lines)
                print('created: ', filename)
            else:
                df_date = append_to_json(data_path + filename, df_date)
                df_date.to_json(data_path + filename, 
                              orient='records', lines=lines)
                print('appended: ', filename)

In [11]:


#words_files_by_city_date_json(files_words[:5], cities, data_path,
#                                  process_datetime, process_days = 14,
#                                  verbose=True)


In [18]:
tmp_df = pd.read_json(files_original[0], orient='records',lines=True)
tmp_ids = tmp_df.id[:1000] 

def keep_by_matched_id(df, list_id, varname='id'):
    return (df.set_index(varname)
            .join(pd.DataFrame(data={varname: list_id}).set_index(varname), how='inner')
            .reset_index()
            )

keep_by_matched_id(tmp_df, list(tmp_ids), varname='id')


Unnamed: 0,id,created_at,is_retweet,RT_id,RT_retweet_count,user_id,user_name,followers_count,following_count,text,quoted_text,RT_text,t_co,tags,urls,lang,created_at_h,tokens
0,1281271899784228864,2020-07-09 11:00:00,False,,0,376307465,ClimateCost,1646,1221,"Peggy Shepard, @WEACT4EJ1, sheds light on how ...",,,[],"[#blacklivesmatter, #BLM]",[],en,2020-07-09 11:00:00,"[peggy, shepard,, shed, light, environmental, ..."
1,1281271900015071232,2020-07-09 11:00:00,True,1281266982927597568,1,1054013731905196032,Denaldo1mill,8,126,,,Black Lives Matter: How can @thecsp members ad...,[],[],[],en,2020-07-09 11:00:00,[]
2,1281271899985793024,2020-07-09 11:00:00,True,1281021507083231232,4527,487769182,suzost,22426,20750,,,,[],[],[],en,2020-07-09 11:00:00,[]
3,1281271900291776512,2020-07-09 11:00:00,True,1281271080116563968,7,1264048424501932032,BlackAfroNinjaX,366,372,,,Here’s an idea! Arrest the cops who killed #Br...,[],[],[],en,2020-07-09 11:00:00,[]
4,1281271900593938432,2020-07-09 11:00:00,False,,0,294596045,JoseEColon,91,202,@NYCMayor And yet countless of black babies ar...,,,[],[#BlackLivesMatter],[],en,2020-07-09 11:00:00,"[yet, countless, black, baby, murder, nyc, day..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,1281281865706545152,2020-07-09 11:39:36,True,1281261692463112193,17,1220730760690372608,Badpanda122,1908,2229,,,1.\nThis is convicted #terrorist #SusanRosenbu...,[],[],[],en,2020-07-09 11:00:00,[]
996,1281281865668796416,2020-07-09 11:39:36,False,,0,2999967663,ASRomaEN,575725,15795,#ASRoma is auctioning off a @ChrisSmalling mat...,,,"[https://t.co/NFWT5Vl2cH, https://t.co/LESOsEE...","[#ASRoma, #BlackLivesMatter]",[],en,2020-07-09 11:00:00,"[#asroma, auction, match-worn, shirt, sleeve, ..."
997,1281281867090722816,2020-07-09 11:39:36,True,1281278590810808322,424,2981671296,JenDSchneider,520,2785,,Our city isn’t just painting the words on Fift...,,[https://t.co/VE6MT80qDI],[#BlackLivesMatter],[],en,2020-07-09 11:00:00,"[city, isn’t, paint, word, fifth, avenue., we’..."
998,1281281868625821696,2020-07-09 11:39:37,True,1281249485214146562,170,791445473424998400,ga_undrdawg,4823,4558,,,,[],[],[],en,2020-07-09 11:00:00,[]


In [17]:
def keep_by_matched_id(df, list_id, varname='id'):
    return (df.set_index(varname)
            .join(pd.DataFrame(data={varname: list_id}).set_index(varname), how='inner')
            .reset_index()
            )


def files_id_matched_by_city_date_json(
    files, cities, data_path, folder, process_datetime, process_days = 14,
    file_type ='.json', float_dtype='float16', lines=True, verbose=False):

    '''
    Looks for recent files in /city_date/[city]/original/*, extract relevant ids,
    generate data matched with those ids by city, and create data files  
    '''
    if file_type not in ['.json', '.csv'] :
        raise ValueError('file_type must be either json or csv')
            
    city_df = {}
    city_ids = {}
    for city in cities:
        files_city_original = keep_recent_files(
            glob(data_path + "data_cumulative/city_date/" + city  + "/original/*"),
            prefix = 'records_', file_type= '.json', 
            base_timestamp = process_datetime, days=process_days)
        tmp_ids = []
        for file in files_city_original:
            # retrieve relevant id to match
            if verbose: print('reading ids from ' + file)
            ids = pd.read_json(file, orient='records', lines=True).id.astype(str)
            tmp_ids.append(ids)
        city_ids[city] = list(pd.concat(tmp_ids, ignore_index=True)) if len(tmp_ids)>0 else []
    
    for file in files:
        if verbose: print('loading ' + file)  
        if file==files[0]:
            columns = get_columns_json(file) if file_type =='.json' else get_columns_csv(file)
            df_null = pd.DataFrame(columns=columns)
            for city in cities:
                city_df[city] = []
        
        if file_type =='.json': 
            df_file = pd.read_json(file, orient='records', lines=lines)
        elif file_type =='.csv': 
            df_file = pd.read_csv(file)

        df_vars_convert_to_str(df_file, ['id','created_at_h'])
        convert_floats(df_file, float_dtype)

        for city in cities:
            # tmp_df: relevant original tweet's that are matched  
            idx = mark_var_in_valuelist(df_file, 'id', city_ids[city])
            tmp_df = keep_by_matched_id(df_file, city_ids[city], varname='id')
            if verbose: print('matched data for ' + city + ': ' + str(len(tmp_df)) + ' records')  
            if len(tmp_df)>0: city_df[city].append(tmp_df)
    
    for city in cities:
        if len(city_df[city])==0: city_data = df_null
        else: city_data = pd.concat(city_df[city], ignore_index=True)
        dates, dates_str = get_unique_dates(city_data, 'created_at_h')
        for date in dates_str:
            if verbose: print('processing date of ' + date)  
            df_date = filter_df_by_date(city_data, 'created_at_h', date)
            filename = 'data_cumulative/city_date/' + city + '/' + folder + '/records_'+ date + file_type
            new_file = glob(data_path + filename)==[]
            if file_type =='.json': 
                if not new_file:
                    df_date = append_to_json(data_path + filename, df_date)
                df_date.to_json(data_path + filename, 
                              orient='records', lines=lines)
            if file_type =='.csv':
                mode = 'a' if new_file else 'w'
                df_date.to_csv(data_path + filename, index=False, mode=mode)
            if new_file: print('created: ', filename)
            else: print('appended: ', filename)

In [20]:
files_sentiments = glob(data_path + 'data_cumulative/sentiments/*')
files_sentiments[:5]
#files_id_matched_by_city_date_json(
#    files_sentiments[:10], cities, data_path, 'sentiments', 
#    process_datetime, process_days = 14,
#    file_type='.csv', verbose=True)


files_id_matched_by_city_date_json(
    files_sentiments[:5], cities_all, data_path, 'sentiments', 
    process_datetime= pd.to_datetime(datetime(2020,7,20)), 
    process_days = 5,
    file_type='.csv', verbose=True)

reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-17.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-20.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-16.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-13.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-15.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-19.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-18.json
reading ids from /Us

KeyboardInterrupt: 

In [48]:
files_emotions = glob(data_path + 'data_cumulative/emotions/*')
files_emotions[:5]

#files_id_matched_by_city_date_json(
#    files_emotions[:10], cities, data_path, 'emotions', 
#    process_datetime, process_days = 14,
#    file_type='.csv', verbose=True)

files_id_matched_by_city_date_json(
    files_emotions, cities_all, data_path, 'emotions', 
    process_datatime, process_days = 30,
    file_type='.csv', verbose=True)

reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-15.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-19.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-14.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-15.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-19.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-14.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v3/original/records_2020-07-15.json
reading ids from /Us

  if (await self.run_code(code, result,  async_=asy)):


processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/emotions/created_at_2020-07-13_12:38:18.csv
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/emotions/created_at_2020-07-19_02:19:37.csv
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/emotions/created_at_2020-07-15_22:37:55.csv
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_trainin

In [49]:
files_words = glob(data_path + 'data_cumulative/words/*')
files_words[:5]

#files_id_matched_by_city_date_json(
#    files_words[:3], cities, data_path, 'words', 
#    process_datetime, process_days = 14,
#    file_type='.json', verbose=True)

files_id_matched_by_city_date_json(
    files_words, cities_all, data_path, 'words', 
    process_datatime, process_days = 30,
    file_type='.json', verbose=True)



reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-15.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-19.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v1/original/records_2020-07-14.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-15.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-19.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v2/original/records_2020-07-14.json
reading ids from /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/city_date/all_v3/original/records_2020-07-15.json
reading ids from /Us

0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/words/created_at_2020-07-11_15:00:00.json
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/words/created_at_2020-06-29_23:00:00.json
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/words/created_at_2020-06-29_14:00:00.json
processing data for all_v1
0
processing data for all_v2
0
processing data for all_v3
0
processing data for all_v4
0
processing data for all_v5
0
loading /Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/words/created_at_2020-07-1

In [27]:
def keep_recent_files(files, base_timestamp, file_type= '.json', days = 14, no_newer=False,
                      prefix = 'created_at_'):
    timestamps = [pd.Timestamp(file.split(prefix,1)[1]
                               .replace(file_type,'').replace('_',' ')) for file in files ]
    if no_newer: 
        keep_idx1 = [(base_timestamp - timestamp <= pd.Timedelta(days, unit='d')) & 
                     (base_timestamp - timestamp >= pd.Timedelta(0, unit='d')) for timestamp in timestamps]
    else: 
        keep_idx1 = [base_timestamp - timestamp <= pd.Timedelta(days, unit='d') for timestamp in timestamps]
    return(list(itertools.compress(files,keep_idx1)))



In [54]:
files_all_sentiments2 = keep_recent_files(
    glob(data_dest + 'data_cumulative/sentiments/*'), 
    base_timestamp = pd.to_datetime(datetime(2020, 7, 15, 23, 59)), 
    file_type= '.csv', days = 3, no_newer=True)



In [55]:
files_all_sentiments2 
#pd.to_datetime(datetime(2020, 7, 15, 1)) - pd.to_datetime(datetime(2020, 7, 15, 2)) >= pd.Timedelta(0, unit='d')

['/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-13_22:55:13.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-13_12:38:18.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-15_22:37:55.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-14_15:37:12.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-15_13:12:47.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-15_15:18:20.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-15_23:40:54.csv',
 '/Users/kotaminegishi/big_data_training/python/dash_demo1/data_cumulative/sentiments/created_at_2020-07-14_17:42:30.csv',
 '/Users/kotamin