### <span style=color:blue> Loading Listings & Reviews data from postgresql into local MongoDB    </span>

In [1]:
import sys
import json
import csv
import yaml

import importlib

import pandas as pd
import numpy as np

import matplotlib as mpl
import matplotlib.pyplot as plt
import os
from dotenv import load_dotenv

from datetime import time
from datetime import date
from datetime import datetime
# with the above choices, the imported datetime.time(2023,07,01) is recognized
# from datetime import date
# from datetime import datetime

import pprint

import psycopg2
from sqlalchemy import create_engine, text as sql_text

# Create an utilities file util.py in a folder benchmarking and import it
sys.path.append('helper_functions/')
# import util as util
import util

In [2]:
# test that utils.py has been imported well
util.hello_world()

hello world


<span style=color:blue>Getting PostgreSQL connection set up</span>

In [3]:
# Load the env file 

dotenv_path = 'env_variables.env'
load_dotenv(dotenv_path=dotenv_path)

# Import the env variables

load_dotenv()

schema = os.getenv('DISC_6_SCHEMA')
port = os.getenv('DISC_6_PORT')
host = os.getenv('DISC_6_HOST')
database = os.getenv('DISC_6_DB')
password = os.getenv('DISC_6_PASSWORD')
connection = os.getenv('DISC_6_CONNECTION')

# Create the db engine 

db_eng = create_engine(f"postgresql+psycopg2://{connection}:{password}@{host}:{port}/{database}",
                       connect_args={'options': '-csearch_path={}'.format(schema)},
                       isolation_level = 'SERIALIZABLE')

print("Successfully created db engine.")

Successfully created db engine.


<span style=color:blue>Getting mongodb connection set up</span>

In [4]:
from pymongo import MongoClient

client = MongoClient()
# could have written client = MongoClient("localhost", 27017)
#                 or client = MongoClient("mongodb://localhost:27017/")

<span style=color:blue>Setting up collection "listings" in mongodb</span>

In [5]:
# I have (or will have) a database "airbnb"
db = client.airbnb

# inside the "airbnb" database, I have (or will have) a collection "listings"
listings = db.listings
print(db.list_collection_names())

['testing']


<span style=color:blue>Function to convert dates into datetimes.</span>

<span style=color:blue>It also has conditions that turn various kinds of null values into None.  (Curiously this works on small dataframes, but left some values of "NaT" when applied on very large dataframes.  So, I had to apply the function on a dictionary built from the dataframes.)</span>

In [29]:
import math
n1 = math.nan
print(n1)
print(math.isnan(n1))

n2 = pd.NaT
print()
print(n2)
print(pd.isnull(n2))

testing = db.testing

dict = {'date': datetime(2023, 1, 1,0,0,0)}

ts = dict['date'].timestamp()
print()
print(ts)
output = db.testing.insert_one({'date': datetime.fromtimestamp(ts)})
print(db.testing.find_one({'_id': output.inserted_id}))

# also converts NaT to None, because MongoDB does not recognize NaT
def convert_date_to_datetime(dt):
    if dt is None:
        return None
    elif pd.isnull(dt):  # tests whether dt is the pandas value NaT ("not a time")
        # print('\nEntered the NaT case\n')
        return None
    elif dt != dt:
        return None        # could also use math.nan, I think
    else:
        temp = datetime(dt.year, dt.month, dt.day)
        ts = temp.timestamp()
        new_dt = datetime.fromtimestamp(ts)
        return new_dt

print('\nNow trying with the function')
print('Here are four dictionaries to test with.')
dict1 = {'foo':1, 'date': date(2023,1,2)}
dict2 = {'goo':2, 'date': math.nan}
dict3 = {'hoo':3, 'date': None}
dict4 = {'koo':4, 'date': pd.NaT}

if pd.isnull(dict4['date']):
    print("dict4['date'] tested positive as NaT")
else:
    print("dict4['date'] did not test positive as NaT")
    

print(dict1)
print(dict2)
print(dict3)
print(dict4)
dict1['date'] = convert_date_to_datetime(dict1['date'])
dict2['date'] = convert_date_to_datetime(dict2['date'])
dict3['date'] = convert_date_to_datetime(dict3['date'])
dict4['date'] = convert_date_to_datetime(dict4['date'])

# print(dict1)
# db.testing.insert_one(dict1)
print('\nAfter changing the dictionaries')
print(dict1)
print(dict2)
print(dict3)
print(dict4)

print('\nNow testing that the third and fourth dictionary can be uploaded into MongoDB successfully.')
output = db.testing.insert_one(dict3)
print(type(output))
print(db.testing.find_one({'_id': output.inserted_id}))
print()
output = db.testing.insert_one(dict4)
print(type(output))
print(db.testing.find_one({'_id': output.inserted_id}))

nan
True

NaT
True

1672560000.0
{'_id': ObjectId('664bbc4938b2bd10b476e159'), 'date': datetime.datetime(2023, 1, 1, 0, 0)}

Now trying with the function
Here are three dictionaries to test with.
dict4['date'] tested positive as NaT
{'foo': 1, 'date': datetime.date(2023, 1, 2)}
{'goo': 2, 'date': nan}
{'hoo': 3, 'date': None}
{'koo': 4, 'date': NaT}

After changing the dictionaries
{'foo': 1, 'date': datetime.datetime(2023, 1, 2, 0, 0)}
{'goo': 2, 'date': None}
{'hoo': 3, 'date': None}
{'koo': 4, 'date': None}

Now testing that the third and fourth dictionary can be uploaded into MongoDB successfully.
<class 'pymongo.results.InsertOneResult'>
{'_id': ObjectId('664bbc4938b2bd10b476e15a'), 'hoo': 3, 'date': None}

<class 'pymongo.results.InsertOneResult'>
{'_id': ObjectId('664bbc4938b2bd10b476e15b'), 'koo': 4, 'date': None}


In [7]:
doc = db.testing.find_one()
obj_pointer = doc['_id']
print(type(obj_pointer))
print(obj_pointer)
print(doc)
print(type(doc['date']))
print(db.testing.find_one({'_id': obj_pointer}))

<class 'bson.objectid.ObjectId'>
664987068b80b91420d2b513
{'_id': ObjectId('664987068b80b91420d2b513'), 'date': datetime.datetime(2023, 1, 1, 0, 0)}
<class 'datetime.datetime'>
{'_id': ObjectId('664987068b80b91420d2b513'), 'date': datetime.datetime(2023, 1, 1, 0, 0)}


<span style=color:blue>Dropping the collection "listings" to get a fresh start.    </span>

In [8]:
# recall that db is holding access to the airbnb database within the MongoDB client
db.listings.drop()
print(db.list_collection_names())

['testing']


### <span style=color:blue>As preparation for this, I have a table reviewm (for review_mongodb) in which I dropped the comments_tsv column (because not needed) and renamed column "id" to "review_id" (so that it is not repeating the "id" column of the listings table</span>

In [57]:
import importlib
import util
importlib.reload(util)

# q = util.build_query_full_join_listings_reviewsm()
# q = util.build_query_left_join_listings_reviewsm_null_right()
q = util.build_query_left_join_listings_reviewsm()

print('We will be using the following query:')
print(q)

We will be using the following query:
select *
from listings l left join reviewsm r 
        on l.id = r.listing_id


In [None]:
with db_eng.connect() as conn:
    # df_ljr100 = pd.read_sql(q, con=conn)
    df_ljr = pd.read_sql(q, con=conn)
    

In [56]:
# print(df_ljr100.head())
print(df_ljr.head())
# print(df_ljr.head())
# print(df_ljr_new.iloc[13870])

         id                                               name   host_id  \
0  51944693  Home in Queens · ★4.82 · 1 bedroom · 5 beds · ...  91646104   
1  51944693  Home in Queens · ★4.82 · 1 bedroom · 5 beds · ...  91646104   
2  51944693  Home in Queens · ★4.82 · 1 bedroom · 5 beds · ...  91646104   
3  51944693  Home in Queens · ★4.82 · 1 bedroom · 5 beds · ...  91646104   
4  51944693  Home in Queens · ★4.82 · 1 bedroom · 5 beds · ...  91646104   

  host_name neighbourhood_group neighbourhood  latitude  longitude  \
0       Pao              Queens      Woodside  40.74395  -73.90858   
1       Pao              Queens      Woodside  40.74395  -73.90858   
2       Pao              Queens      Woodside  40.74395  -73.90858   
3       Pao              Queens      Woodside  40.74395  -73.90858   
4       Pao              Queens      Woodside  40.74395  -73.90858   

         room_type  price  ...  calculated_host_listings_count  \
0  Entire home/apt  294.0  ...                          

In [10]:
# should be 998,310 rows.  This is
#     number of records in listings whose id do not show up in reviews['listing_id'] =  11,500
#   + number of reviews                                                              = 986,810
print(df_ljr.shape)

(998310, 24)


In [11]:
# cols = df_ljr100.columns.tolist()
cols = df_ljr.columns.tolist()
print(cols)

['id', 'name', 'host_id', 'host_name', 'neighbourhood_group', 'neighbourhood', 'latitude', 'longitude', 'room_type', 'price', 'minimum_nights', 'number_of_reviews', 'last_review', 'reviews_per_month', 'calculated_host_listings_count', 'availability_365', 'number_of_reviews_ltm', 'license', 'listing_id', 'review_id', 'date', 'reviewer_id', 'reviewer_name', 'comments']


In [37]:
# to do a projection and remove duplicates
cols_of_listings = ['id', 'name', 'host_id', 'host_name', 'neighbourhood_group', 
                    'neighbourhood', 'latitude', 'longitude', 'room_type', 'price', 
                    'minimum_nights', 'number_of_reviews', 'last_review', 
                    'reviews_per_month', 'calculated_host_listings_count', 
                    'availability_365', 'number_of_reviews_ltm', 'license']
cols_of_reviews = ['listing_id', 'review_id', 'date', 'reviewer_id', 
                   'reviewer_name', 'comments']

df_ljr_new = df_ljr.drop(cols_of_reviews, axis=1).drop_duplicates()

# df_ljr_new['last_review'] = df_ljr_new['last_review'].apply(lambda x: convert_date_to_datetime(x))
df_ljr_new['last_review'] = df_ljr_new['last_review'].apply(convert_date_to_datetime)

print(type(df_ljr_new.at[0,'last_review']))
print(str(df_ljr_new.at[0,'last_review']))

print(df_ljr_new.shape)
# print(df_ljr_new.head(10))

print(df_ljr_new.iloc[13870])

<class 'pandas._libs.tslibs.timestamps.Timestamp'>
2023-09-24 00:00:00
(39202, 18)
id                                                              1009467369984606523
name                              Rental unit in New York · 2 bedrooms · 1 bed ·...
host_id                                                                   305240193
host_name                                                                      June
neighbourhood_group                                                       Manhattan
neighbourhood                                                              Gramercy
latitude                                                                  40.736534
longitude                                                                 -73.98062
room_type                                                              Private room
price                                                                         113.0
minimum_nights                                                               

In [13]:
dict_ljr_new = df_ljr_new.to_dict('records')
print(len(dict_ljr_new))
pprint.pp(dict_ljr_new[0])

39202
{'id': '51944693',
 'name': 'Home in Queens · ★4.82 · 1 bedroom · 5 beds · 1 bath',
 'host_id': '91646104',
 'host_name': 'Pao',
 'neighbourhood_group': 'Queens',
 'neighbourhood': 'Woodside',
 'latitude': 40.74395,
 'longitude': -73.90858,
 'room_type': 'Entire home/apt',
 'price': 294.0,
 'minimum_nights': 30,
 'number_of_reviews': 57,
 'last_review': Timestamp('2023-09-24 00:00:00'),
 'reviews_per_month': 1.98,
 'calculated_host_listings_count': 4,
 'availability_365': 89,
 'number_of_reviews_ltm': 23,
 'license': ''}


In [14]:
i = 0

for d in dict_ljr_new:
    i += 1

    df_reviews_one_listing = df_ljr.loc[df_ljr['id'] == d['id']].drop(cols_of_listings, axis=1)

    df_reviews_one_listing['date'] = df_reviews_one_listing['date'].apply(lambda x: convert_date_to_datetime(x))

    dicts_reviews_one_listing = df_reviews_one_listing.to_dict('records')

    if len(dicts_reviews_one_listing) == 1 and dicts_reviews_one_listing[0]['review_id'] is None:
        d['reviews'] = {}
    else:
        d['reviews'] = dicts_reviews_one_listing

    if i % 1000 == 0:
        print('Have now completed step number:', str(i))

print(len(dict_ljr_new))
pprint.pp(dict_ljr_new[-1:])


    

Have now completed step number: 1000
Have now completed step number: 2000
Have now completed step number: 3000
Have now completed step number: 4000
Have now completed step number: 5000
Have now completed step number: 6000
Have now completed step number: 7000
Have now completed step number: 8000
Have now completed step number: 9000
Have now completed step number: 10000
Have now completed step number: 11000
Have now completed step number: 12000
Have now completed step number: 13000
Have now completed step number: 14000
Have now completed step number: 15000
Have now completed step number: 16000
Have now completed step number: 17000
Have now completed step number: 18000
Have now completed step number: 19000
Have now completed step number: 20000
Have now completed step number: 21000
Have now completed step number: 22000
Have now completed step number: 23000
Have now completed step number: 24000
Have now completed step number: 25000
Have now completed step number: 26000
Have now completed st

<span style=color:blue> Doing a bulk insert    </span>

In [16]:
print(len(dict_ljr_new))

39202


<span style=color:blue>Now loading dict_ljr_new into mongodb.  The NaT values occurring in the 'last_review' field of listings need special treatment, unfortunately.     </span>

<span style=color:blue>The loading is done 100 documents at a time, with a last small loat </span>

In [52]:
print(len(dict_ljr_new) % 100)
print(len(dict_ljr_new))

2
39202


In [61]:
# CAUTION: the first step here erases db.listing
# db.listings.drop()
# print(db.list_collection_names())

listings = db.listings

"""
time1 = datetime.now()
for i in range(0,392):
# for j in range(0,10):
# for k in range(0,10):
    # testing for whether the 'last_review' value is NaT, and if so, changing it to None
    #    normally the "apply" function using the "change_date_to_datetime" should take care
    #    of this substitution, but it does not work on my mac for large df's.  
    #    (Oddly, it does work on small df's.)
    for j in range(0,100):
        if dict_ljr_new[100*i + j]['last_review'] != dict_ljr_new[100*i + j]['last_review']:
            dict_ljr_new[100*i + j]['last_review'] = None
    result = listings.insert_many(dict_ljr_new[100*i:100*(i+1)])
    # result = listings.insert_many(dict_ljr_new[13800 + 10*j:13800 + 10*(j+1)])
    # result = listings.insert_many(dict_ljr_new[13870 + k:13870 + k + 1])
    # print('Last element of result for run number', str(i), 'was:')
    # print(result.inserted_ids[-1:])

time2 = datetime.now()

print('\nThe last ObjectID in the collection is:')
print(result.inserted_ids[-1:])

print('\nThe time to do the load of 39K documents into local mongodb, with a total of about 300MB was:')
print(util.time_diff(time1,time2))

"""

# this is for the last 2 records in dict_ljr_new, but built for some number of records over 2
# again, testing for whether the 'last_review' value is NaT, and if so, changing it to None
for j in range(0,len(dict_ljr_new) % 100):
    if dict_ljr_new[39100 + j]['last_review'] != dict_ljr_new[39200 + j]['last_review']:
        dict_ljr_new[39200 + j]['last_review'] = None
result = listings.insert_many(dict_ljr_new[39200:])
print('\nLast element of result for the last run was:')
print(result.inserted_ids[-1:])


print('\nThe total number of documents in the collection db.listings is now:')
print(listings.count_documents({}))



Last element of result for the last run was:
[ObjectId('664be01338b2bd10b4774431')]

The total number of documents in the collection db.listings is now:
39202


In [60]:
print(df_ljr_new.shape)

(39202, 18)


In [None]:


pprint.pp(dict_ljr_new[13800 + 69:13800 + 72])

print(df_ljr_new.iloc[13870])

In [None]:
"""
val_last_review = df_ljr_new.iloc[13870,df_ljr_new.columns.get_loc('last_review')]
print(val_last_review)
print(convert_date_to_datetime(val_last_review))

val_reviews_per_month = df_ljr_new.iloc[13870,df_ljr_new.columns.get_loc('reviews_per_month')]
print(val_reviews_per_month)
print(convert_date_to_datetime(val_last_review))



row = df_ljr_new.iloc[13870]
print(row)
    
doc = dict_ljr_new[13870]
pprint.pp(doc)
print()
row = df_ljr_new.iloc[13870]
print(row)
"""

df_test = df_ljr_new.copy()
df_test = df_test.iloc[13870:13872]
print(df_test.head())
df_test['last_review'] = df_test['last_review'].apply(lambda x: convert_date_to_datetime(x))
# df_test['last_review'] = df_test['last_review'].apply(convert_date_to_datetime)
print(df_test.head())



if dict_ljr_new[13870]['last_review'] != dict_ljr_new[13870]['last_review']:
    dict_ljr_new[13870]['last_review'] = None
pprint.pp(dict_ljr_new[13870])




In [84]:
# result = listings.insert_many(dict_ljr_new[0:1])
print('Last element of result for run number', str(i), 'was:')
print(result.inserted_ids[-1:])
print(type(result.inserted_ids[-1:][0]))
print(db.listings.find_one({'_id': result.inserted_ids[-1:][0]}))

Last element of result for run number 1 was:
[ObjectId('664adfc48550e1de8d8d1dc7')]
<class 'bson.objectid.ObjectId'>
{'_id': ObjectId('664adfc48550e1de8d8d1dc7'), 'id': '47305871', 'name': 'Rental unit in New York · 1 bedroom · 1 bed · 0 shared baths', 'host_id': '305240193', 'host_name': 'June', 'neighbourhood_group': 'Manhattan', 'neighbourhood': 'Harlem', 'latitude': 40.821926, 'longitude': -73.956245, 'room_type': 'Private room', 'price': 46, 'minimum_nights': 30, 'number_of_reviews': 0, 'last_review': nan, 'reviews_per_month': None, 'calculated_host_listings_count': 333, 'availability_365': 160, 'number_of_reviews_ltm': 0, 'license': '', 'reviews': {}}


In [38]:
doc = db.testing.find_one()
obj_pointer = doc['_id']
print(type(obj_pointer))
print(obj_pointer)

<class 'bson.objectid.ObjectId'>
664987068b80b91420d2b513


In [40]:
print(db.list_collection_names())

['testing', 'listings']


In [62]:
def write_dict_to_json(dict, filename):
    with open(filename, 'w') as fp:
        json.dump(dict, fp)

# filename = 'listings_with_reviews_embedded__v01.json'
# write_dict_to_json(dict_ljr_new, filename)
# hmm -- timestamp is not json serializable !!
# some suggestions at
#   https://stackoverflow.com/questions/50404559/python-error-typeerror-object-of-type-timestamp-is-not-json-serializable


TypeError: Object of type Timestamp is not JSON serializable