In [None]:
import pandas as pd
import os
import translators as ts
import re
from datetime import datetime
from langdetect import detect
from json_parsers import *
from sqlalchemy import create_engine
from pymongo import MongoClient

In [2]:
import psycopg2
import pandas as pd
# Connection parameters, yours will be different
params = {
    "host"      : "localhost",
    "database"  : "frenchgp",
    "user"      : "postgres",
    "password"  : ""
}
def connect(params_dic):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) 
    print("Connection successful")
    return conn

def getDrivers(df):
    driver_arr = np.unique([drivers_id[i] for i in drivers if re.search(i, df['translated_text'].lower())])
    return driver_arr

In [7]:
client_db =  'stream'
client_col = 'frenchgp_authors' #args.collection

# source and target connections
client = MongoClient('mongodb://localhost:27017')

# database and collection names
db = client[client_db]
col = db[client_col]

In [5]:
df = pd.DataFrame(list(col.find({})))

In [56]:
def getData(df):
    """
    Convert the nexted json inside the data column to its own dataframe
    """
    df['author_id'] = df.apply(lambda df: parse_json(df, 'data', 'author_id'), 1)
    df['created_at'] = df.apply(lambda df: parse_json(df, 'data', 'created_at'), 1)
    df['geo'] = df.apply(lambda df: parse_json(df, 'data', 'geo'), 1)
    df['tweet_id'] = df.apply(lambda df: parse_json_exact(df, 'data', 'id'), 1)
    df['raw_text'] = df.apply(lambda df: parse_json(df, 'data', 'text'), 1)
    df = df[['tweet_id', 'author_id', 'created_at', 'raw_text']]
    return df


def getTranslation(df):
    clean_text = df['clean_text']
    try:
        translated_text = ts.google(clean_text, if_use_cn_host=True)
    except Exception:
        translated_text = clean_text
    return translated_text


def getCleanText(df):
    clean_text = ''.join(e for e in df['raw_text'] if e.isascii())
    clean_text = ''.join(e for e in clean_text if e not in ["!", "@", "#"])
    return clean_text


def getRawText(df, column):
    raw_text = ''.join(e for e in df[column] if e.isascii())
    return raw_text


def getLanguage(df):
    clean_text = ''.join(e for e in df['raw_text'] if e.isascii())
    clean_text = ''.join(e for e in clean_text if e not in ["!", "@", "#"])
    try:
        language = detect(clean_text)
    except Exception:
        language = ''
    return language


def getUsers(df):
    users = df['includes']['users']
    return users


def getUserDataframe(df):
    df['user_created_at'] = df.apply(
        lambda df: parse_json(df, 'users', 'created_at'), 1)
    df['user_id'] = df.apply(lambda df: parse_json(df, 'users', 'id'), 1)
    df['location'] = df.apply(
        lambda df: parse_json(df, 'users', 'location'), 1)
    df['name'] = df.apply(lambda df: parse_json_exact(df, 'users', 'name'), 1)
    df['username'] = df.apply(
        lambda df: parse_json(df, 'users', 'username'), 1)
    df = df.drop(['users'], 1)
    return df


def escapeArray(df, column):
    if len(df[column]) == 0:
        return ''
    else:
        return df[column]

In [8]:
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ CLEAN TWEET DATA / TRANSLATE ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


data_df = getData(df)

for column in list(data_df):
    data_df[column] = data_df.apply(
        lambda data_df: escapeArray(data_df, column), 1)

for i, j in zip(data_df['tweet_id'], range(len(data_df))):
    if isinstance(i, np.ndarray):
        data_df['tweet_id'][j] = data_df['tweet_id'][j][0]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [None]:
data_df

In [None]:
driver = pd.read_csv("../../../data/driver.csv")
engine = create_engine('postgresql://postgres@localhost:5432/frenchgp')
driver.to_sql('driver', engine, index=False, if_exists='replace')

In [80]:
tweets = pd.read_sql_query(
    """
    SELECT tweet_id, author_id, created_at, language, translated_text FROM tweet;
    """, con = connect(params))

tweets = tweets[tweets['tweet_id'] != ''].reset_index(drop=True)

Connecting to the PostgreSQL database...
Connection successful


In [81]:
drivers = [
    'max', 'verstappen', 'lewis', 'hamilton', 'sergio', 'perez', 'lando',
    'norris', 'charles', 'leclerc', 'valtteri', 'bottas', 'carlos', 'sainz',
    'pierre', 'gasly', 'sebastian', 'vettel', 'daniel', 'ricciardo',
    'fernando', 'alonso', 'esteban', 'ocon', 'lance', 'stroll', 'yuki',
    'tsunoda', 'kimi', 'raikkonen', 'antonio', 'giovinazzi', 'mick',
    'schumacher', 'george', 'russell', 'nikita', 'mazepin', 'nicholas',
    'latifi'
]
drivers_id = {
    'max': 1,
    'verstappen': 1,
    'lewis': 2,
    'hamilton': 2,
    'sergio': 3,
    'perez': 3,
    'lando': 4,
    'norris': 4,
    'charles': 5,
    'leclerc': 5,
    'valtteri': 6,
    'bottas': 6,
    'carlos': 7,
    'sainz': 7,
    'pierre': 8,
    'gasly': 8,
    'sebastian': 9,
    'vettel': 9,
    'daniel': 10,
    'ricciardo': 10,
    'fernando': 11,
    'alonso': 11,
    'esteban': 12,
    'ocon': 12,
    'lance': 13,
    'stroll': 13,
    'yuki': 14,
    'tsunoda': 14,
    'kimi': 15,
    'raikkonen': 15,
    'antonio': 16,
    'giovinazzi': 16,
    'mick': 17,
    'schumacher': 17,
    'george': 18,
    'russell': 18,
    'nikita': 19,
    'mazepin': 19,
    'nicholas': 20,
    'latifi': 20
}

# GET DRIVER ENTITIES

In [82]:
tweets['driver_id'] = tweets.apply(lambda tweets: getDrivers(tweets), 1)
driver_assign = tweets[['tweet_id', 'driver_id']]
driver_assign = driver_assign.explode('driver_id')
driver_assign = driver_assign.dropna(axis=0)
driver_assign['driver_id'] = driver_assign['driver_id'].astype(int)
driver_assign.to_sql('driver_entities', engine, index=False, if_exists='replace')

In [None]:
driver_assign.sort_values('tweet_id')

In [None]:
driver_assign

# GET TWEET META DATA

In [None]:
import requests
import os
import json

# To set your enviornment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'


def auth():
    return os.environ.get("BEARER_TOKEN")


def create_url(tweet_id):
#     tweet_fields = "tweet.fields=lang,author_id"
    tweet_fields = 'tweet.fields=public_metrics,geo'
    # Tweet fields are adjustable.
    # Options include:
    # attachments, author_id, context_annotations,
    # conversation_id, created_at, entities, geo, id,
    # in_reply_to_user_id, lang, non_public_metrics, organic_metrics,
    # possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets,
    # source, text, and withheld
    id = "ids={}".format(tweet_id)
    # You can adjust ids to include a single Tweets.
    # Or you can add to up to 100 comma-separated IDs
    url = "https://api.twitter.com/2/tweets?{}&{}".format(id, tweet_fields)
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )
    return response.json()


def main():
    bearer_token = auth()
    url = create_url(tweets['tweet_id'][0])
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    print(json_response, "\n\n")
    print(json.dumps(json_response, indent=4, sort_keys=True))


if __name__ == "__main__":
    main()

In [None]:
import math
batch = 0
batch_size = 100
batches = math.ceil(len(data_df)/batch_size)
ids = str(list(data_df['tweet_id'][batch*batch_size:batch*batch_size+batch_size]))\
    .replace("'", "").replace("[", "").replace("]", "").replace(" ", "")

In [None]:
batches

In [None]:
batch_size = 100
for batch in range(batches):   
    try:
        ids = str(list(data_df['tweet_id'][batch*batch_size:batch*batch_size+batch_size-1]))\
            .replace("'", "").replace("[", "").replace("]", "").replace(" ", "")
        bearer_token = auth()
        url = create_url(ids)
        headers = create_headers(bearer_token)
        json_response = connect_to_endpoint(url, headers)
        col.insert_many(json_response['data'])
        print(batch)
    except:
        print(batch)
#     print(batch*batch_size, batch*batch_size+batch_size-1)

In [None]:
ids

In [None]:
for batch in range(442, batches)
ids = str(list(data_df['tweet_id'][442*batch_size:]))\
    .replace("'", "").replace("[", "").replace("]", "").replace(" ", "")
bearer_token = auth()
url = create_url(ids)
headers = create_headers(bearer_token)
json_response = connect_to_endpoint(url, headers)
# col.insert_many(json_response['data'])
print(batch)
#     print(batch*batch_size, batch*batch_size+batch_size-1)

In [None]:
len(ids)

In [None]:
json_response['data']

In [None]:
json_response

# ETL META

In [None]:

df = pd.DataFrame(list(col.find({})))

In [None]:
df = df.drop_duplicates('id', keep='first').reset_index(drop=True)

In [None]:
df

In [None]:
df['retweet_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'retweet_count'), 1)
df['reply_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'reply_count'), 1)
df['like_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'like_count'), 1)
df['quote_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'quote_count'), 1)
# df['tweet_id'] = df.apply(lambda df: parse_json(df, 'data', 'id'), 1)

In [None]:
df = df.rename(index=str, columns={'id': 'tweet_id'})

In [None]:
df = df[['tweet_id','retweet_count','reply_count','like_count','quote_count']]

In [None]:
df

In [None]:
def escapeArray(df, column):
    if len(df[column]) == 0:
        return ''
    else:
        return df[column]

for column in list(df):
    for i, j in zip(df[column], range(len(df))):
        if isinstance(i, np.ndarray):
            df[column][j] = 0
            
df = df.reset_index(drop=True)
df['retweet_count'] = df['retweet_count'].astype(int)
df['reply_count'] = df['reply_count'].astype(int)
df['like_count'] = df['like_count'].astype(int)
df['quote_count'] = df['quote_count'].astype(int)

In [None]:
df.to_sql('meta', engine, index=False, if_exists='replace')

# USER LOOKUP

In [38]:
import requests
import os
import json

# To set your enviornment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'


def auth():
    return os.environ.get("BEARER_TOKEN")


def create_url(usernames):
    # Specify the usernames that you want to lookup below
    # You can enter up to 100 comma-separated values.
#     usernames = "69008563"
    user_fields = "user.fields=description,created_at,public_metrics,location,name,username,verified"
    # User fields are adjustable, options include:
    # created_at, description, entities, id, location, name,
    # pinned_tweet_id, profile_image_url, protected,
    # public_metrics, url, username, verified, and withheld
    url = "https://api.twitter.com/2/users?ids={}&{}".format(usernames, user_fields)
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )
    return response.json()


def main():
    bearer_token = auth()
    url = create_url()
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    print(json.dumps(json_response, indent=4, sort_keys=True))


# if __name__ == "__main__":
#     main()

In [11]:
author_df = data_df.drop_duplicates('author_id', keep='first').reset_index(drop=True)
author_df

Unnamed: 0,tweet_id,author_id,created_at,raw_text
0,1406593716555505668,2590166742,2021-06-20T12:44:09.000Z,Currently 24C/75F at Paul Ricard the track sur...
1,1406593717113339905,443677496,2021-06-20T12:44:10.000Z,Hab noch keine vollständige Meinung zum neuen ...
2,1406593718430453766,41781539,2021-06-20T12:44:10.000Z,LEWIS HAMILTON!!! #FrenchGP
3,1406593732183482377,2273878903,2021-06-20T12:44:13.000Z,Watching my first #F1 I think I’m just going t...
4,1406593734444212226,4648175957,2021-06-20T12:44:14.000Z,Hoje de manhã vou revezar entre secar o Hamilt...
...,...,...,...,...
20455,1406624674038128644,2210684323,2021-06-20T14:47:10.000Z,Get rid of the 'Driver of the Day' radio inter...
20456,1406624676156252168,707503315,2021-06-20T14:47:11.000Z,#Formula1 • @SChecoPerez logra subirse al podi...
20457,1406624679671046147,12846,2021-06-20T14:47:12.000Z,Tremenda carrera la de hoy!!! Esta entretenida...
20458,1406624680690081795,2814968690,2021-06-20T14:47:12.000Z,What a race! Amazing drive by Max Verstappen. ...


In [19]:
def auth():
    return os.environ.get("BEARER_TOKEN")

def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers

bearer_token = auth()
headers = create_headers(bearer_token)
ids = '1364879051438002176,1350309702'
user_fields = "user.fields=description,created_at,public_metrics,location,name,username,verified"
r = requests.request("GET", 'https://api.twitter.com/2/users/{}?expansions=pinned_tweet_id&{}'.format(ids, user_fields), headers=headers)

In [34]:
headers = create_headers(bearer_token)
ids = 'ids=1364879051438002176,1350309702'
r = requests.request("GET", 'https://api.twitter.com/2/users?{}&{}'.format(ids, user_fields), headers=headers)
print(json.loads(r.text))

{'data': [{'created_at': '2021-02-25T10:05:11.000Z', 'location': 'India', 'username': 'Khel17934254', 'public_metrics': {'followers_count': 9, 'following_count': 115, 'tweet_count': 27508, 'listed_count': 0}, 'name': 'Khel ख़बर', 'verified': False, 'description': 'Please follow us on Intagram https://t.co/5twmBGY5dF and Facebook khelkhabr', 'id': '1364879051438002176'}, {'created_at': '2013-04-13T21:55:37.000Z', 'location': 'Stoke Ash, England', 'username': 'SFeatley', 'public_metrics': {'followers_count': 397, 'following_count': 987, 'tweet_count': 2568, 'listed_count': 3}, 'name': 'Steven Featley', 'verified': False, 'description': '6x Liver transplants | Content Creator | Owner of SFeatleyTV CHARITY YouTube Channel  | Arsenal & Hamilton Fan | Volunteer For CLDF  | #StillIRise.', 'id': '1350309702'}]}


In [27]:
r

<Response [404]>

In [37]:
ids

'2590166742,443677496,41781539,2273878903,4648175957,1405862551250903040,3298415044,1228365703486545925,934461876309839872,4607264653,2675145414,1081063545461186560,97050667,1406105241657937923,1298261221980827654,1214918155526651904,406326008,1143541105369202688,28039562,2847430133,1854496014,1367203592,2598689675,1128098521830150144,547957320,70131679,2277937115,1234955542533197825,18016521,24276976,836713582079295488,50785047,3239509871,3290301301,111416958,1110350069675503618,872472139466555393,1234946158264373248,1116995788649971713,1234953194398212099,628786129,1100022872922341377,1300844179661238273,60868886,94438597,19914423,1124585346052890624,9290152,173449565,320266494,163518481,1329169727684554753,65691630,2519242866,48593376,748234411422920705,756868705523564546,282447944,2637582405,472705227,260467752,1122153434881282048,22677822,996145596200255488,74797302,709766541710127104,1057621820,271724821,22879111,210501383,2163291222,1040742121647398914,1320387150228148225,231205

In [41]:
batch_size = 100
batches = math.ceil(len(author_df)/batch_size)
for batch in range(batches):   
    try:
        ids = str(list(author_df['author_id'][batch*batch_size:batch*batch_size+batch_size-1]))\
            .replace("'", "").replace("[", "").replace("]", "").replace(" ", "")
        bearer_token = auth()
        url = create_url(ids)
        headers = create_headers(bearer_token)
        json_response = connect_to_endpoint(url, headers)
        col.insert_many(json_response['data'])
        print(batch)
    except Exception:
        print(batch)
#     print(batch*batch_size, batch*batch_size+batch_size-1)

200
0
200
1
200
2
200
3
200
4
200
5
200
6
200
7
200
8
200
9
200
10
200
11
200
12
200
13
200
14
200
15
200
16
200
17
200
18
200
19
200
20
200
21
200
22
200
23
200
24
200
25
200
26
200
27
200
28
200
29
200
30
200
31
200
32
200
33
200
34
200
35
200
36
200
37
200
38
200
39
200
40
200
41
200
42
200
43
200
44
200
45
200
46
200
47
200
48
200
49
200
50
200
51
200
52
200
53
200
54
200
55
200
56
200
57
200
58
200
59
200
60
200
61
200
62
200
63
200
64
200
65
200
66
200
67
200
68
200
69
200
70
200
71
200
72
200
73
200
74
200
75
200
76
200
77
200
78
200
79
200
80
200
81
200
82
200
83
200
84
200
85
200
86
200
87
200
88
200
89
200
90
200
91
200
92
200
93
200
94
200
95
200
96
200
97
200
98
200
99
200
100
200
101
200
102
200
103
200
104
200
105
200
106
200
107
200
108
200
109
200
110
200
111
200
112
200
113
200
114
200
115
200
116
200
117
200
118
200
119
200
120
200
121
200
122
200
123
200
124
200
125
200
126
200
127
200
128
200
129
200
130
200
131
200
132
200
133
200
134
200
135
200
136
200
137
200
13

In [73]:
df = pd.DataFrame(list(col.find({})))

In [74]:
df = df[1074:].reset_index(drop=True)

In [75]:
df['public_metrics'][0]

{'followers_count': 842,
 'following_count': 512,
 'tweet_count': 100986,
 'listed_count': 0}

In [76]:
df['followers_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'followers_count'), 1)
df['following_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'following_count'), 1)
df['tweet_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'tweet_count'), 1)
df['listed_count'] = df.apply(lambda df: parse_json(df, 'public_metrics', 'listed_count'), 1)

In [77]:
df['name'] = df.apply(lambda df: getRawText(df, 'name'), 1)

In [78]:
df = df.rename(index=str, columns={'id': 'author_id'})
df = df[[
    'author_id', 'description', 'verified', 'username', 'created_at', 'name',
    'location', 'followers_count', 'following_count','tweet_count', 'listed_count'
]]

In [79]:
engine = create_engine('postgresql://postgres@localhost:5432/frenchgp')
df.to_sql('author', engine, if_exists='replace', index=False)

In [69]:
for i, idx in zip(list(df), range(len(df))):
    for j in df[i]:
        if isinstance(j, dict):
            print(i, j, idx)

withheld {'country_codes': ['RU']} 7
withheld {'country_codes': ['RU']} 7
