### 1. Libraries

In [1]:
#python_version = 3.12.9  # Python version used for this script
# REQUIREMENTS INSTALLATION (uncomment the next line to install)
# %pip install -r requirements.txt

from pymongo import MongoClient, ASCENDING # MongoDB driver
import pandas as pd                        # Data manipulation library
from bson import ObjectId, json_util       # BSON ObjectId for MongoDB and json handling
from datetime import datetime              # Date and time handling

EXPLICIT_STATEMENTS = False  # To disable explicit statements that are not needed

### 2. Activity tasks

##### 1.- Crear una base de datos MongoDB

In [None]:
# Connect to MongoDB Atlas database
dbStringConnection = "mongodb+srv://jfmateo:Portfolio@cfdatabase.itrw9ge.mongodb.net/"
client = MongoClient(dbStringConnection)
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Database name 
dbName = 'Fundamentos_Actividad'

if EXPLICIT_STATEMENTS:
    # Explicit statement to use a database, but not necessary
    client.use(dbName)
else:
    # Use the database
    db = client[dbName]




Pinged your deployment. You successfully connected to MongoDB!


##### 2.- Crear las colecciones e ingestar los datos:

2.1.- Cargar los datasets proporcionados por el profesor en formato JSON: 
1) json de cuentas de twitter

In [7]:
# RUN ONLY ONCE TO INSERT THE DATA INTO THE COLLECTIONS

# Collection name for accounts
dbCollectionA = 'twitter_Actividad_Cuentas_R'

if EXPLICIT_STATEMENTS:
    # Explicit collection creation is not necessary, 
    # as MongoDB creates it automatically when the first document is inserted.
    db.create_collection(dbCollectionA)
else:
    # Collection creation for Twitter accounts
    accounts = db[dbCollectionA]

# Account JSON file path
accounts_file_path = 'twitter_Actividad_Cuentas_R.json'
# Load JSON file into pandas DataFrame
accounts_df = pd.read_json(accounts_file_path)
print("Accounts DataFrame:")
print(accounts_df.head(5).to_markdown()) # Pretty print of the first 5 rows of the DataFrame

# It is needed to transform the '_id' field from a dictionary with '$oid' to an ObjectId
for index, row in accounts_df.iterrows():
    accounts_df.at[index, '_id'] = ObjectId(accounts_df.at[index, '_id']["$oid"])

# Insert the data and create an index on the 'Twitter_handle' 
# field only if the insertion is successful
if accounts.insert_many(accounts_df.to_dict('records')).acknowledged:
    print("Accounts data inserted successfully.")   
    print(f"Number of documents in {dbCollectionA}: {accounts.count_documents({})}")
    # Create an index to ensure uniqueness
    accounts.create_index([('Twitter_handle', ASCENDING)], unique=True)
else:
    print("Failed to insert accounts data.")


Accounts DataFrame:
|    | _id                                  |   Unique_ID | org_name        |   org_url | Twitter_URL                         | Twitter_handle   |   earliest_tweet_in_db |   number_of_tweets_in_db |
|---:|:-------------------------------------|------------:|:----------------|----------:|:------------------------------------|:-----------------|-----------------------:|-------------------------:|
|  0 | {'$oid': '644ce86ffc978618d25518b2'} |           1 | 'El Mundo'      |       nan | https://twitter.com/elmundoes       | elmundoes        |                    nan |                      nan |
|  1 | {'$oid': '644ce86ffc978618d25518b3'} |           2 | 'El Pais'       |       nan | https://twitter.com/el_pais         | el_pais          |                    nan |                      nan |
|  2 | {'$oid': '644ce86ffc978618d25518b4'} |           3 | 'La Vanguardia' |       nan | https://twitter.com/LaVanguardia    | LaVanguardia     |                    nan |             

2) json de tweets. 

In [8]:
# RUN ONLY ONCE TO INSERT THE DATA INTO THE COLLECTIONS

# Collection name for tweets
dbCollectionT = 'tweets_Actividad_R'

if EXPLICIT_STATEMENTS:
    # Explicit collection creation is not necessary, 
    # as MongoDB creates it automatically when the first document is inserted.
    db.create_collection(dbCollectionT)
else:
    # Collection creation for tweets
    tweets = db[dbCollectionT]

# Tweets JSON file path
tweets_file_path = 'tweets_Actividad_R.json'
# Load JSON file into pandas DataFrame
tweets_df = pd.read_json(tweets_file_path)
print("Tweets DataFrame:")
print(tweets_df.head(5).to_markdown()) # Pretty print of the first 5 rows of the DataFrame

# It is needed to transform the '_id' field from a dictionary with '$oid' to an ObjectId
for index, row in tweets_df.iterrows():
    tweets_df.at[index, '_id'] = ObjectId(tweets_df.at[index, '_id']["$oid"])

# Insert the data and create an index on the 'id_str' 
# field only if the insertion is successful
if tweets.insert_many(tweets_df.to_dict('records')).acknowledged:
    print("Tweets inserted successfully.")
    print(f"Number of documents in {dbCollectionT}: {tweets.count_documents({})}")
    # Create an index to ensure uniqueness
    tweets.create_index([('id_str', ASCENDING)], unique=True)
else:
    print("Failed to insert tweets.")

Tweets DataFrame:
|    | _id                                  | created_at                | id                                     |              id_str | text                                                                                                        | truncated   | entities                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

2.2.- Cargar la colección de ejemplo  de MongoDB Atlas con datos de 
geolocalización (sample_geospatial >> shipwrecks).

In [9]:
# THIS CODE IS USE TO CREATE A LOCAL FILE OF THE SHIPWRECKS COLLECTION
# IN GEOSPATIAL DATASET OF MONGODB ATLAS
# REQUIRES THE MONGODB ATLAS EXAMPLE DATABASE LOADED

# These parameters are defined in the MongoDB Atlas example
dbExample = 'sample_geospatial'
shipwrecks_og = client[dbExample]['shipwrecks']

# Fetch all shipwrecks data from the collection
shipwrecks_df = pd.DataFrame(list(shipwrecks_og.find())) # List then to DataFrame
# Print the first 5 rows of the DataFrame
print("Shipwrecks DataFrame:")
print(shipwrecks_df.head(5).to_markdown())


# Check the length of the DataFrame is the same as the number of documents in the collection
if shipwrecks_og.count_documents({}) == shipwrecks_df.shape[0]:
    print("Shipwrecks data fetched successfully. Writing to JSON file...")
    # Write the DataFrame to a JSON file as a local copy
    shipwrecks_df.to_json('geospatial_shipwrecks.json', 
                        orient='records', lines=False, indent=4,    # Standard JSON format
                        default_handler=json_util.default)          # Needed to handle ObjectId formatting
    print("Shipwrecks data written to 'geospatial_shipwrecks.json'.")
else:
    print("Failed to fetch shipwrecks data.")
    print(f"Expected: {shipwrecks_og.count_documents({})}, Found: {shipwrecks_df.shape[0]}")

Shipwrecks DataFrame:
|    | _id                      | recrd   | vesslterms   | feature_type                  | chart                    |   latdec |   londec | gp_quality   |   depth | sounding_type   | history   | quasou        | watlev                       | coordinates              |
|---:|:-------------------------|:--------|:-------------|:------------------------------|:-------------------------|---------:|---------:|:-------------|--------:|:----------------|:----------|:--------------|:-----------------------------|:-------------------------|
|  0 | 578f6fa2df35c7fbdbaed8c4 |         |              | Wrecks - Visible              | US,U1,graph,DNC H1409860 |  9.35478 | -79.9081 |              |       0 |                 |           |               | always dry                   | [-79.9081268, 9.3547792] |
|  1 | 578f6fa2df35c7fbdbaed8c5 |         |              | Wrecks - Visible              | US,U1,graph,DNC H1409860 |  9.33403 | -79.9357 |              |       0 |       

In [9]:
# RUN ONLY ONCE TO INSERT THE DATA INTO THE COLLECTIONS

# Collection name for accounts
dbCollectionS = 'shipwrecks'

if EXPLICIT_STATEMENTS:
    # Explicit collection creation is not necessary, 
    # as MongoDB creates it automatically when the first document is inserted.
    db.create_collection(dbCollectionS)
else:
    # Collection creation for shipwrecks
    shipwrecks = db[dbCollectionS]

# Shipwrecks JSON file path
shipwrecks_file_path = 'geospatial_shipwrecks.json' # Local copy of the shipwrecks collection (created above)
# Load JSON file into pandas DataFrame
shipwrecks_df = pd.read_json(shipwrecks_file_path)
print("Shipwrecks DataFrame:")
print(shipwrecks_df.head(5).to_markdown()) # Pretty print of the first 5 rows of the DataFrame

# It is needed to transform the '_id' field from a dictionary with '$oid' to an ObjectId
for index, row in shipwrecks_df.iterrows():
    shipwrecks_df.at[index, '_id'] = ObjectId(shipwrecks_df.at[index, '_id']["$oid"])

# Insert the data into Database
if shipwrecks.insert_many(shipwrecks_df.to_dict('records')).acknowledged:
    print("Shipwrecks data inserted successfully.")
    print(f"Number of documents in {dbCollectionS}: {shipwrecks.count_documents({})}")
else:
    print("Failed to insert shipwrecks data.")

Shipwrecks DataFrame:
|    | _id                                  | recrd   | vesslterms   | feature_type                  | chart                    |   latdec |   londec | gp_quality   |   depth | sounding_type   | history   | quasou        | watlev                       | coordinates              |
|---:|:-------------------------------------|:--------|:-------------|:------------------------------|:-------------------------|---------:|---------:|:-------------|--------:|:----------------|:----------|:--------------|:-----------------------------|:-------------------------|
|  0 | {'$oid': '578f6fa2df35c7fbdbaed8c4'} |         |              | Wrecks - Visible              | US,U1,graph,DNC H1409860 |  9.35478 | -79.9081 |              |       0 |                 |           |               | always dry                   | [-79.9081268, 9.3547792] |
|  1 | {'$oid': '578f6fa2df35c7fbdbaed8c5'} |         |              | Wrecks - Visible              | US,U1,graph,DNC H1409860 |  9.33

##### 4.1.-  En la colección de cuentas de twitter, tener los campos con el máximo de amigos y con el máximo de tweets enviados, cargar los datos correspondientes mediante consulta mongodb + código python

In [16]:
# Flag to check if the execution was successful
exec_OK = True

# Iterate over each account
for user in accounts.find():
    # Get the account name
    account_name = user['Twitter_handle']
    # Each tweet contains the field with the number of friends as "friends_count" (is also the maximum)
    num_friends = tweets.find_one({'user.screen_name': account_name})['user']['friends_count']
    # For the number of tweets sent, the field is "statuses_count" in the user object (is also the maximum)
    num_statuses = tweets.find_one({'user.screen_name': account_name})['user']['statuses_count']

    # Print to verify the data
    print(f"User: {account_name}".ljust(22), 
          f"Friends: {num_friends}".ljust(15), 
          f"Tweets: {num_statuses}".ljust(10))
    
    # Update the database and stop if it fails
    if not accounts.update_one(
            {'Twitter_handle': account_name},
            {'$set': {'friends_count'   :  num_friends, # friends_count = amigos
                      'statuses_count'  :  num_statuses # tweets_count = tweets enviados
            }}).acknowledged:
        print(f"Failed to update account: {account_name}. Interrupting the process.")
        exec_OK = False
        break

if exec_OK:
    print("All accounts updated successfully.")

User: elmundoes        Friends: 1403   Tweets: 407295
User: el_pais          Friends: 849    Tweets: 741943
User: LaVanguardia     Friends: 706    Tweets: 650271
User: RafaelNadal      Friends: 149    Tweets: 3194
User: sanchezcastejon  Friends: 5977   Tweets: 32718
User: unicomplutense   Friends: 5105   Tweets: 15293
User: uc3m             Friends: 679    Tweets: 11536
User: La_UPM           Friends: 271    Tweets: 25344
User: valenciacf       Friends: 169    Tweets: 104333
User: AlejandroSanz    Friends: 4707   Tweets: 31209
User: JuanLuisGuerra   Friends: 66     Tweets: 1653
User: LuisFonsi        Friends: 320    Tweets: 12317
All accounts updated successfully.


##### 4.2.- En la colección de tweets, calcular  para cada tweet la antigüedad del tweet relativa a la fecha de creación de la cuenta. Considerando antigüedad 0 si fue enviado el mismo día de creación de la cuenta y sumando +1 por cada día transcurrido desde entonces en función de la fecha del tweet.  Este nuevo campo se llamará Madurez

In [None]:
#TODO: NOT OK. Does not calculate the maturity of the tweets correctly.
dbCollectionT = 'tweets_Actividad_R'
tweets = db[dbCollectionT]

# Flag to check if the execution was successful
exec_OK = True

# Dictionary to store account creation dates
accountcreation_date_dict = {}  
# Find any tweet date in the collection per account
for user in tweets.distinct('user.screen_name'):
    # Find any tweet from the user
    any_tweet= tweets.find_one({'user.screen_name': user})
    accountcreation_date = any_tweet['user']['created_at'] # Get the date field of user creation
    # It is necessary to convert the date in str to the same format as the one in the tweets collection
    # The date is in the format: 'Wed Oct 04 12:34:56 +0000 2023'
    # The target format is: '2023-10-04 00:00:00'. 
    # It is important that the time is set to 00:00:00 to avoid not counting a day because the difference is less than 24 hours.
    accountcreation_date = datetime.strptime(accountcreation_date, '%a %b %d %H:%M:%S %z %Y').strftime('%Y-%m-%d') # From str to Time obj to str with correct format
    accountcreation_date = datetime.fromisoformat(accountcreation_date) # Convert to datetime object again
    # Store the creation date associated to the user
    accountcreation_date_dict[user] = accountcreation_date

# To track the number of tweets processedsed
total_tweets = tweets.count_documents({})
tweet_counter = 0
print(f"Total tweets to process: {total_tweets}")
print(f"Updating tweets maturity...{tweet_counter/total_tweets*100:.2f}%")

checked = 0
# Iterate over each tweet
for tweet in tweets.find():
    # Subtract the tweets creation date from the account creation date to get the maturity
    maturity = (tweet['created_at'] - accountcreation_date_dict[tweet['user']['screen_name']]).days # Use .days to get the difference in days
    # Update the database and stop if it fails
    if not tweets.update_one(
            {'_id': tweet['_id']},
            {'$set': {'maturity': maturity}}).acknowledged: # maturity = madurez
        exec_OK = False
        print(f"Failed to update tweets maturity in tweet {tweet['_id']}. Interrupting the process.")
        break
    tweet_counter += 1
    # This action takes a long time (~9min), so we print the progress every 1000 tweets
    if tweet_counter % 1000 == 0:
        print(f"Updating tweets maturity...{tweet_counter} done({tweet_counter/total_tweets*100:.2f}%)")

if exec_OK:
    print("All tweets maturity updated successfully.")

Total tweets to process: 11991
Updating tweets maturity...0.00%
Updating tweets maturity...1000 done(8.34%)
Updating tweets maturity...2000 done(16.68%)
Updating tweets maturity...3000 done(25.02%)
Updating tweets maturity...4000 done(33.36%)
Updating tweets maturity...5000 done(41.70%)
Updating tweets maturity...6000 done(50.04%)
Updating tweets maturity...7000 done(58.38%)
Updating tweets maturity...8000 done(66.72%)
Updating tweets maturity...9000 done(75.06%)
Updating tweets maturity...10000 done(83.40%)
Updating tweets maturity...11000 done(91.74%)
All tweets maturity updated successfully.


##### 4.3.- En la colección de tweets, calcular para cada tweet la diferencia en días entre la fecha de ese tweet y la fecha del tweet más reciente de la misma cuenta (user.screen_name). Para el tweet más reciente de cada cuenta esa diferencia será 0. Los demás tweets de la misma cuenta tendrán valores positivos según cuántos días han pasado desde ese tweet hasta el más reciente de la cuenta. Este nuevo campo se llamará Desfase. 

In [51]:
# Flag to check if the execution was successful
exec_OK = True

latest_tweet_dict = {}  # Dictionary to store the latest tweet date per account
# Find the latest tweet date in the collection per account
for user in tweets.distinct('user.screen_name'):
    # Find the latest tweet for the user
    latest_tweet= tweets.find_one({'user.screen_name': user}, # Filter by user
                                    sort=[('created_at', -1)]) # Sort by created_at in descending order (later first)
    latest_tweet_date = latest_tweet['created_at'] # Get the date field
    # Store the latest tweet date in the dictionary with the user as key
    latest_tweet_dict[user] = latest_tweet_date
    
# To track the number of tweets processed
total_tweets = tweets.count_documents({})
tweet_counter = 0
print(f"Total tweets to process: {total_tweets}")
print(f"Updating tweets offset...{tweet_counter/total_tweets*100:.2f}%")

# Iterate over each tweet
for tweet in tweets.find():
    # Subtract creation date of the tweet from the latest's to calculate maturity
    offset = (latest_tweet_dict[tweet['user']['screen_name']] - tweet['created_at']).days # Use .days to get the difference in days
    # Update the database and stop if it fails
    if not tweets.update_one(
            {'_id': tweet['_id']},
            {'$set': {'offset': offset}}).acknowledged: # offset = desfase
        exec_OK = False
        print(f"Failed to update tweets offset in tweet {tweet['_id']}. Interrupting the process.")
        break
    tweet_counter += 1 # Keep track of the number of tweets processed
    # This action takes a long time(~11min), so we print the progress every 1000 tweets
    if tweet_counter % 1000 == 0:
        print(f"Updating tweets offset...{tweet_counter} done({tweet_counter/total_tweets*100:.2f}%)")

if exec_OK:
    print("All tweets offset updated successfully.")

Total tweets to process: 11991
Updating tweets offset...0.00%
Updating tweets offset...1000 done(8.34%)
Updating tweets offset...2000 done(16.68%)
Updating tweets offset...3000 done(25.02%)
Updating tweets offset...4000 done(33.36%)
Updating tweets offset...5000 done(41.70%)
Updating tweets offset...6000 done(50.04%)
Updating tweets offset...7000 done(58.38%)
Updating tweets offset...8000 done(66.72%)
Updating tweets offset...9000 done(75.06%)
Updating tweets offset...10000 done(83.40%)
Updating tweets offset...11000 done(91.74%)
All tweets offset updated successfully.


### Clean-up and closing

In [5]:
# Delete the database to be able to run the script again
client.drop_database(dbName)

In [None]:
# Close the connection
client.close()
print("Database connection closed.")