# Process data & Store into Redis

In [557]:
import json
import os
import pandas as pd

## process data
Select the attributes to be stored.

In [558]:
# selected attributes in tweets
twt_attributes = ['created_at',
                       'id_str',
                       'text',
                       'in_reply_to_status_id_str',
                       'in_reply_to_user_id_str',
                       'in_reply_to_screen_name',
                       'quoted_status_id_str',
                       # 'quoted_status',
                       'quote_count',
                       'reply_count',
                       'retweet_count',
                       'favorite_count',
                       'entities.hashtags',
                       'extended_entities',
                       'user.id_str', # for matching
                       'user.name',
                       'user.screen_name',
                       'user.url',
                       'user.description',
                       'user.protected',
                       'user.verified',
                       'user.followers_count',
                       'user.friends_count',
                       'user.listed_count',
                       'user.favourites_count',
                       'user.statuses_count',
                       'user.created_at'
                      ]
len(twt_attributes) # 26

26

In [559]:
# so the selected retweets attributes has an extra "retweeted_status.id_str"
retwt_attributes = ['created_at',
                    'id_str',
                    'text',
                    'in_reply_to_status_id_str',
                    'in_reply_to_user_id_str',
                    'in_reply_to_screen_name',
                    'quoted_status_id_str',
                    # 'quoted_status',
                    'quote_count',
                    'reply_count',
                    'retweet_count',
                    'favorite_count',
                    'entities.hashtags',
                    'extended_entities',
                    'retweeted_status.id_str',
                    'user.id_str', # for matching
                    'user.name',
                    'user.screen_name',
                    'user.url',
                    'user.description',
                    'user.protected',
                    'user.verified',
                    'user.followers_count',
                    'user.friends_count',
                    'user.listed_count',
                    'user.favourites_count',
                    'user.statuses_count',
                    'user.created_at'
                   ]
len(retwt_attributes) # 27

27

In [560]:
# selected attributes in user
user_attr_tracking = ['id_str',
                      'name',
                      'screen_name',
                      'url',
                      'description',
                      'protected',
                      'verified',
                      'followers_count',
                      'friends_count',
                      'listed_count',
                      'favourites_count',
                      'statuses_count',
                      'created_at']

len(user_attr_tracking) #13

13

### user_store function

In [561]:
def user_store(user, userid_table):
    
    # if user has not been seen before, add id to userid_table
    if user['id_str'] not in userid_table:
        userid_table.append(user['id_str'])
        
        # update datastores with defined tracking attributes
        user_attr_selected = {key: user[key] for key in user_attr_tracking if key in user}
        user_list.append(user_attr_selected)

In [562]:
# test test

twt_id_record = []
userid_table = [] # store userid
user_list = [] # store every attributes of user needed tracking

with open("./data/corona-out-2", "r") as f1:
    for line in f1:
        try:
            data = json.loads(line)
            if data["id_str"] not in twt_id_record:
                twt_id_record.append(data["id_str"])
            # if tweet has been seen before 
            else:
                # ignore this tweet, go to next iteration of loop
                continue
            
            user = data['user']
            user_store(user, userid_table)
        except:
            continue

In [563]:
len(user_list)

15505

### tweets / retweet store

In [564]:
# this is a partial template for processing the dataset to store into the datastores.
# you will need to write functions for the psuedocode sections and invoke them here

#insert path and replace name of the file below as needed

twt_id_record = []
userid_table = [] # store userid
user_list = [] # store tracking attributes of user
retwt_list = [] # list to store retweets
retwt_lines = 0 # count # of retweets
twt_list = []
twt_lines = 0

with open("./data/corona-out-2", "r") as f1:
    for line in f1:
        try:
            data = json.loads(line)
            if data["id_str"] not in twt_id_record:
                twt_id_record.append(data["id_str"])
            # if tweet has been seen before 
            else:
                # ignore this tweet, go to next iteration of loop
                continue
            
            user = data['user']
            user_store(user, userid_table)
            
            
            if ( data['text'].startswith('RT') ):
                # try:
                    # update retweet information
                    selected_dict = {}
                    retwt_lines = retwt_lines + 1
                    for attribute in retwt_attributes:
                        try:
                            # assigns the reference: any changes made to one will affect the other
                            current_dict = data
                            current_selected_dict = selected_dict
                            # parent-child attributes
                            for sub_attribute in attribute.split('.'):
                                # parent attributes
                                current_dict = current_dict[sub_attribute]
                                # child attributes
                                if sub_attribute == attribute.split('.')[-1]:
                                    current_selected_dict[sub_attribute] = current_dict
                                else:
                                    if sub_attribute not in current_selected_dict:
                                        current_selected_dict[sub_attribute] = {}
                                    current_selected_dict = current_selected_dict[sub_attribute]
                        except:
                            continue
                    retwt_list.append(selected_dict)
                # except:
                    # continue

                # pass  # does nothing right now
                
            else:
                # add the new tweet to datastore
                    selected_dict = {}
                    twt_lines = twt_lines + 1
                    for attribute in twt_attributes:
                        try:
                            # assigns the reference: any changes made to one will affect the other
                            current_dict = data
                            current_selected_dict = selected_dict
                            # parent-child attributes
                            for sub_attribute in attribute.split('.'):
                                # parent attributes
                                current_dict = current_dict[sub_attribute]
                                # child attributes
                                if sub_attribute == attribute.split('.')[-1]:
                                    current_selected_dict[sub_attribute] = current_dict
                                else:
                                    if sub_attribute not in current_selected_dict:
                                        current_selected_dict[sub_attribute] = {}
                                    current_selected_dict = current_selected_dict[sub_attribute]
                        except:
                            continue
                    twt_list.append(selected_dict)
        except:
            continue

In [701]:
def parse_date(date_string):
    # parse the created_at attributes to datetime formart
    
    date_format = "%a %b %d %H:%M:%S %z %Y"
    date_object = time.strptime(date_string, date_format)
    
    year = date_object.tm_year
    month = date_object.tm_mon
    day = date_object.tm_mday
    
    new_date_object = datetime(year=year, month=month, day=day)
    formatted_date = new_date_object.strftime("%Y-%m-%d")
    
    return(formatted_date)

In [703]:
for i in range(len(twt_list)):
    twt_list[i]['created_at'] = parse_date(twt_list[i]['created_at'])
    twt_list[i]['user']['created_at'] = parse_date(twt_list[i]['user']['created_at'])

In [709]:
for i in range(len(retwt_list)):
    retwt_list[i]['created_at'] = parse_date(retwt_list[i]['created_at'])
    retwt_list[i]['user']['created_at'] = parse_date(retwt_list[i]['user']['created_at'])

In [565]:
len(userid_table)

15505

In [566]:
len(user_list)

15505

In [697]:
len(twt_list)

7349

In [705]:
user_list[1]

{'id_str': '1225145123920588805',
 'name': 'efe09',
 'screen_name': 'efe0927183508',
 'url': None,
 'description': "Allah'ın en değerli eseri insandır.\nCanı yanan sabretsin.\nCan yakan, canının yanacağı günü beklesin..\n677 khk\nRT düşüncem olduğu anlamına gelmez bilgi amaçlıdır",
 'protected': False,
 'verified': False,
 'followers_count': 653,
 'friends_count': 983,
 'listed_count': 0,
 'favourites_count': 1255,
 'statuses_count': 4177,
 'created_at': 'Wed Feb 05 19:52:38 +0000 2020'}

In [706]:
twt_lines

7349

In [707]:
len(twt_list)

7349

In [708]:
twt_list[1]

{'created_at': '2020-04-12',
 'id_str': '1249403771261722624',
 'text': '#Bo der Osterhase hat nicht Corona, er ist gekommen.',
 'in_reply_to_status_id_str': None,
 'in_reply_to_user_id_str': None,
 'in_reply_to_screen_name': None,
 'quote_count': 0,
 'reply_count': 0,
 'retweet_count': 0,
 'favorite_count': 0,
 'entities': {'hashtags': [{'text': 'Bo', 'indices': [0, 3]}]},
 'user': {'id_str': '1171484224244744198',
  'name': 'Elfpunkt',
  'screen_name': 'meElfpunkt',
  'url': None,
  'description': 'Mal 1 mal 0 mal Tilt',
  'protected': False,
  'verified': False,
  'followers_count': 3,
  'friends_count': 53,
  'listed_count': 0,
  'favourites_count': 72,
  'statuses_count': 60,
  'created_at': '2019-09-10'}}

In [710]:
retwt_lines

11157

In [711]:
len(retwt_list)

11157

In [712]:
retwt_list[1]

{'created_at': '2020-04-12',
 'id_str': '1249403768023678982',
 'text': 'RT @lale_karanfil: In Turkey, there are 300 thousand prisoners and 150 thousand prison employees in prisons. \nPrisons are the most risky pl…',
 'in_reply_to_status_id_str': None,
 'in_reply_to_user_id_str': None,
 'in_reply_to_screen_name': None,
 'quote_count': 0,
 'reply_count': 0,
 'retweet_count': 0,
 'favorite_count': 0,
 'entities': {'hashtags': []},
 'retweeted_status': {'id_str': '1249397541596286979'},
 'user': {'id_str': '1225145123920588805',
  'name': 'efe09',
  'screen_name': 'efe0927183508',
  'url': None,
  'description': "Allah'ın en değerli eseri insandır.\nCanı yanan sabretsin.\nCan yakan, canının yanacağı günü beklesin..\n677 khk\nRT düşüncem olduğu anlamına gelmez bilgi amaçlıdır",
  'protected': False,
  'verified': False,
  'followers_count': 653,
  'friends_count': 983,
  'listed_count': 0,
  'favourites_count': 1255,
  'statuses_count': 4177,
  'created_at': '2020-02-05'}}

In [574]:
# # # encapsulation test: don't know why it didn't work as a function

# # store tweets/retweets data & count nums
# def retwt_store(data):
#     # list for store tweets/retweets data
#     selected_dict = {}
#     retwt_lines = retwt_lines + 1
#     for attribute in retwt_attributes:
#         try:
#             # assigns the reference: any changes made to one will affect the other
#             current_dict = data
#             current_selected_dict = selected_dict
#             # parent-child attributes
#             for sub_attribute in attribute.split('.'):
#                 # parent attributes
#                 current_dict = current_dict[sub_attribute]
#                 # child attributes
#                 if sub_attribute == attribute.split('.')[-1]:
#                     current_selected_dict[sub_attribute] = current_dict
#                 else:
#                     if sub_attribute not in current_selected_dict:
#                         current_selected_dict[sub_attribute] = {}
#                     current_selected_dict = current_selected_dict[sub_attribute]
#         except:
#             continue
#     retwt_list.append(selected_dict)

## Store in Redis (test: user_list; tweet)

In [713]:
import redis # for cache
import pymongo # for NoSQL data store

from redis.commands.json.path import Path
import redis.commands.search.aggregation as aggregations
import redis.commands.search.reducers as reducers
from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import NumericFilter, Query

# connect to a Redis cluster
from redis.cluster import RedisCluster
from string import ascii_letters
import random

In [714]:
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
redis_pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)

In [774]:
# initiate: remove all the data exists.
for key in redis_client.keys():
    redis_client.delete(key)

In [775]:
# check keys in the Redis database
len(redis_client.keys())

0

In [748]:
import time
import datetime
from datetime import datetime, date
from typing import List, Optional
# Redis OM uses Pydantic to validate data based on the type annotations assigned to fields in a model class.
from pydantic import BaseModel, NonNegativeInt, HttpUrl, AnyUrl, ValidationError # every Redis OM model is also a Pydantic model
from redis_om import (EmbeddedJsonModel, Field, JsonModel, HashModel, Migrator, NotFoundError, get_redis_connection)

[Field Types supported by Pydantic](https://docs.pydantic.dev/usage/types/)

In [718]:
del twt_user_model

In [750]:
date.today()

datetime.date(2023, 4, 22)

In [776]:
# build an embedded sub-model twt_user with Redis OM

class twt_user_model(EmbeddedJsonModel):
    id_str: str = Field(index=True)
    name: str = Field(index=True, full_text_search=True)
    screen_name: str = Field(index=True, full_text_search=True)
    url: Optional[AnyUrl]
    description: Optional[str] = Field(index=False, full_text_search=True) # perform full text searches on the values
    protected: Optional[bool]
    verified: Optional[bool]
    followers_count: Optional[NonNegativeInt] = Field(index=True)
    friends_count: Optional[NonNegativeInt] = Field(index=True)
    listed_count: Optional[NonNegativeInt]
    favourites_count: Optional[NonNegativeInt]
    statuses_count: Optional[NonNegativeInt] = Field(index=True)
    created_at: date = Field(index=True)
    
    class Meta:
        # A redis.asyncio.Redis or redis.Redis client instance that the model will use to communicate with Redis.
        database = get_redis_connection()

In [777]:
# build embedded sub-models for entitlies (set hashtags as index)
class HashtagModel(EmbeddedJsonModel):
    text: Optional[str] = Field(index=True, full_text_search=True, default="noHashTag") # added a default here
    indices: Optional[list[int]]
    
    class Meta:
        # A redis.asyncio.Redis or redis.Redis client instance that the model will use to communicate with Redis.
        database = get_redis_connection()

class EntitiesModel(EmbeddedJsonModel):
    hashtags: HashtagModel
    
    class Meta:
        # A redis.asyncio.Redis or redis.Redis client instance that the model will use to communicate with Redis.
        database = get_redis_connection()

In [778]:
# # use Migrator to create the indexes for any models that have indexed fields
# Migrator().run()

In [779]:
# # check storage status: should be 15505
# len(redis_client.keys())

### twt_model for original tweets

In [780]:
# build a twt_model (JsonModel) for original tweet data
class twt_model(JsonModel):
    created_at: date = Field(index=True) # so many outdated docs...finally find this...orz
    id_str: str = Field(index=True)
    text: str = Field(index=True, full_text_search=True) # perform full text searches on the values
    in_reply_to_status_id_str: Optional[str] = Field(index=False, full_text_search=True)
    in_reply_to_user_id_str: Optional[str] = Field(index=False, full_text_search=True)
    in_reply_to_screen_name: Optional[str] = Field(index=False, full_text_search=True)
    quoted_status_id_str: Optional[str] = Field(index=False, full_text_search=True)
    # quoted_status: Optional[str]
    quote_count: Optional[NonNegativeInt] = Field(index=True)
    reply_count: Optional[NonNegativeInt] = Field(index=True)
    retweet_count: Optional[NonNegativeInt] = Field(index=True)
    favorite_count: Optional[NonNegativeInt] = Field(index=True)
    entities: Optional[dict] = Field(index=True)
    entities: EntitiesModel
    # extended_entities: Optional[dict]
    user: twt_user_model
    
    class Meta:
        # A redis.asyncio.Redis or redis.Redis client instance that the model will use to communicate with Redis.
        database = get_redis_connection()

In [781]:
# store twt_list data into Redis

for i in range(len(twt_list)):
    new_twt = twt_model(**twt_list[i])
    new_twt.save()
    new_twt.expire(24 * 3600) # time to live: 1 day

In [782]:
len(twt_list) # 7349

7349

In [783]:
# use Migrator to create the indexes for any models that have indexed fields
Migrator().run()

In [784]:
# check storage status: should be 7359
len(redis_client.keys())

7355

### retwt_model for retweets

## Querying: by & select

### by pk

In [594]:
# twt_model query by key

twt_model.get("01GYK29XNVBMVRBCYRMAJYGQ4V").dict() # Models generate a globally unique primary key automatically without needing to talk to Redis.

{'pk': '01GYK29XNVBMVRBCYRMAJYGQ4V',
 'created_at': 'Sun Apr 12 18:28:56 +0000 2020',
 'id_str': '1249404147167686657',
 'text': '@muhaust Negatif corona positif digigit buaya https://t.co/7hlRgSPUL9',
 'in_reply_to_status_id_str': '1247388438875127809',
 'in_reply_to_user_id_str': '111868855',
 'in_reply_to_screen_name': 'muhaust',
 'quoted_status_id_str': None,
 'quote_count': 0,
 'reply_count': 0,
 'retweet_count': 0,
 'favorite_count': 0,
 'entities': {'pk': '01GYK29XNV8BJ2D954PYAZXY4X',
  'hashtags': {'pk': '01GYK29XNV0S31V0BZW2BGZXXQ',
   'text': None,
   'indices': None}},
 'user': {'pk': '01GYK29XNVVTHQ3TM5ZSQP1H0H',
  'id_str': '984014159967870976',
  'name': '🌙',
  'screen_name': 'itsmoonchild_d',
  'url': None,
  'description': 'who iam?👆',
  'protected': False,
  'verified': False,
  'followers_count': 105,
  'friends_count': 105,
  'listed_count': 0,
  'favourites_count': 4262,
  'statuses_count': 769,
  'created_at': 'Wed Apr 11 10:23:52 +0000 2018'},
 'extended_entities'

### by user

In [758]:
twt_model.find(twt_model.user.screen_name == "JWalters777").all()

[twt_model(pk='01GYKJQMJ4HFRKX1S8K5KCQXJW', created_at=datetime.date(2020, 4, 12), id_str='1249407640553349123', text='Oh fuck this government. Seriously. Fuck these Tory cunts. That is outrageous', in_reply_to_status_id_str=None, in_reply_to_user_id_str=None, in_reply_to_screen_name=None, quoted_status_id_str='1249318984538886145', quote_count=0, reply_count=0, retweet_count=0, favorite_count=0, entities=EntitiesModel(pk='01GYKJQMJ401XVS46FY4XFCNEY', hashtags=HashtagModel(pk='01GYKJQMJ467SFZCKZHEESGBTQ', text='noHashTag', indices=None)), user=twt_user_model(pk='01GYKJQMJ4G88HQCC0Y9BCYJ1K', id_str='373455776', name='Jacob Walters', screen_name='JWalters777', url=None, description='Brentford FC. 23. London Town', protected=False, verified=False, followers_count=590, friends_count=318, listed_count=9, favourites_count=8312, statuses_count=33107, created_at=datetime.date(2011, 9, 14)))]

In [621]:
# test = twt_model.find(twt_model.user.screen_name == "Sumaj_Warmi").all()
# test[0].entities["hashtags"][0]['text'] # extract text from entities: dict -> list -> dict

In [759]:
twt_model.find(twt_model.user.id_str == "1070312218070179840").all()

[twt_model(pk='01GYKJQTNZW93D0FTTV9B402RA', created_at=datetime.date(2020, 4, 12), id_str='1249408805734993920', text="@DrakonmanTheFox Don't let it mutated to Corona extra", in_reply_to_status_id_str='1249405700369010690', in_reply_to_user_id_str='830979703750160384', in_reply_to_screen_name='DrakonmanTheFox', quoted_status_id_str=None, quote_count=0, reply_count=0, retweet_count=0, favorite_count=0, entities=EntitiesModel(pk='01GYKJQTNZ5EE5K1485XMCNQCJ', hashtags=HashtagModel(pk='01GYKJQTNZWTQ38TY8DAMKS6Z8', text='noHashTag', indices=None)), user=twt_user_model(pk='01GYKJQTNZVBX0SVCG26SAB6HP', id_str='1070312218070179840', name='\u200b\u200bColib〘 CEO of ... 〙', screen_name='colibfox', url=None, description='１5/ mele♂️ / ➡️he/him⬅ ️/ ♈ / non-religious / bi / 🐾single👣/ DMs are open / fox / memes /⚠️ may post or RT NSFW, 98% SFW ⚠️', protected=False, verified=False, followers_count=196, friends_count=95, listed_count=3, favourites_count=3348, statuses_count=2520, created_at=datetime.da

In [760]:
# by screen name

twt_model.find(twt_model.user.screen_name == "Sumaj_Warmi").all()

[twt_model(pk='01GYKJQ9AEAP6MEH30QCHVS1RM', created_at=datetime.date(2020, 4, 12), id_str='1249404894362165248', text='Bien! Tarija presenta su bunker para evitará dispersar a los pacientes sospechosos de #Coronavirus, y así prevenga… https://t.co/69NYO2L46L', in_reply_to_status_id_str=None, in_reply_to_user_id_str=None, in_reply_to_screen_name=None, quoted_status_id_str='1249000777504489478', quote_count=0, reply_count=0, retweet_count=0, favorite_count=0, entities=EntitiesModel(pk='01GYKJQ9AE2PZD0QSKM6DYV44A', hashtags=HashtagModel(pk='01GYKJQ9AEZT447TA9W9XH2JGN', text='noHashTag', indices=None)), user=twt_user_model(pk='01GYKJQ9AEZACH5K3HSAXJAW1Z', id_str='328260861', name='👑🗼𝐏𝐀𝐓𝐑𝐈𝐂𝐈𝐀 🗼👑', screen_name='Sumaj_Warmi', url=AnyUrl('https://www.facebook.com/SumajWarmi21F/', scheme='https', host='www.facebook.com', tld='com', host_type='domain', path='/SumajWarmi21F/'), description='La solidaridad como valor de vida', protected=False, verified=False, followers_count=13323, friends_count=2

### by str

In [790]:
# search by str

len(twt_model.find(twt_model.text % "happy").all())

56

### by hashtag

In [762]:
len(twt_model.find(twt_model.entities.hashtags.text == "noHashTag").all())

7349

In [763]:
twt_model.find(twt_model.entities.hashtags.text == "COVID19InTurkeysPrisons").all()

[]

### by some number

In [764]:
top_tweeter = twt_model.find(
    twt_model.user.followers_count >= 2000000
).all()

In [765]:
schema = {'user_screen_name': [], 'followers_count': []}
top_tweeter_df = pd.DataFrame(schema)

for i in range(len(top_tweeter)):
    top_tweeter_df.at[i, 'user_screen_name'] = top_tweeter[i].user.screen_name
    top_tweeter_df.at[i, 'followers_count'] = top_tweeter[i].user.followers_count
    
print(top_tweeter_df)

  user_screen_name  followers_count
0    skynewsarabia        5652410.0
1    skynewsarabia        5652459.0
2       la_patilla        7121282.0
3   casspernyovest        2702055.0
4         detikcom       15884929.0


In [657]:
# # search by some number: tweeter with the mamixmum number of followers

# top_tweeter = twt_model.find(
#     twt_model.user.followers_count >= 2000000
# ).sort_by("-followers_count").all()

In [268]:
# for i in range(len(top_tweeter)):
#     print(top_tweeter[i].screen_name + "\t" + str(top_tweeter[i].followers_count) + "\n")

detikcom	15884929

la_patilla	7121282

skynewsarabia	5652410

virsanghvi	4329132

NTelevisa_com	3047122

repubblica	2992272

casspernyovest	2702055



### by time range

In [785]:
len(twt_model.find((twt_model.created_at == "2020-04-12")).all()) # all data are from 2020-04-12???

7349

In [786]:
len(twt_model.find((twt_model.user.created_at == "2017-11-29")).all())

2