In [0]:
# Evaluate the bare name `spark` so the notebook displays it as the cell output.
# NOTE(review): presumably this is the SparkSession that Databricks pre-defines
# in every notebook; it is not created anywhere in this file — confirm.
spark

In [0]:
# Import packages

import pymongo
from pymongo import MongoClient
import json
import bson
from bson import ObjectId
from bson import json_util as jsonb
import datetime
import pandas as pd
import dns
import numpy as np
#import dnspython

In [0]:
# Read the five raw CSV files from the DBFS file store into pandas DataFrames.
# The header row of each file is inferred by pandas.

# Weather observations (train / test)
weather_train_pd = pd.read_csv(
    "/dbfs/FileStore/tables/weather_train.csv",
    header='infer',
)
weather_test_pd = pd.read_csv(
    "/dbfs/FileStore/tables/weather_test.csv",
    header='infer',
)

# Main train / test tables
train_pd = pd.read_csv(
    "/dbfs/FileStore/tables/train.csv",
    header='infer',
)
test_pd = pd.read_csv(
    "/dbfs/FileStore/tables/test.csv",
    header='infer',
)

# Building metadata
building_pd = pd.read_csv(
    "/dbfs/FileStore/tables/building_metadata.csv",
    header='infer',
)

print('Data Loaded')

In [0]:
# Report the (rows, columns) shape of every loaded DataFrame, one line each.
for _caption, _frame in (
    ('the size of train dataset:', train_pd),
    ('the size of test dataset:', test_pd),
    ('the size of weather_train dataset:', weather_train_pd),
    ('the size of weather_test dataset:', weather_test_pd),
    ('the size of building_metadata dataset:', building_pd),
):
    print(_caption, _frame.shape)

In [0]:
# Core class in this notebook
class mongoDB():

    """
    Class Description:
    - Designed to run in Azure Databricks
    - Original version (run locally) in `data_connector.ipynb`
    - Serves as the Data Connector layer in the project.
    - raw_data -> database -> pandas data

    Initialization:
      - username / password (raw, NOT percent-escaped; escaping is done here)
      - cluster_name
      - database_name
      - create the Mongo client

    Functions:
      - Create a MongoDB service for the user
      - Write raw data (pd.DataFrame) / huge raw data into mongodb collections
      - Load mongodb collections into a pandas DataFrame
      - Drop existing collections
      - Count the number of rows in a collection
      - Close MongoDB connection

    Notices:
      - Use close_connection after finishing using mongo service, or use the
        instance as a context manager (`with mongoDB() as db: ...`) so that
        the client is closed automatically.

    """

    def __init__(self, username='YanzheYUAN', password='YanzheYUAN', cluster_name='cluster0', database_name='ashrae_db'):
        """
        Store the connection settings and build the MongoClient.

        Parameters:
          - username, password: raw credentials; they are percent-escaped
            before being placed in the URI, as PyMongo requires for
            characters such as '@', ':' or '/'.
          - cluster_name: first label of the Atlas host name.
          - database_name: database named in the connection URI.

        NOTE(review): real-looking credentials are hard-coded as defaults.
        They are kept only so existing `mongoDB()` calls keep working; they
        should be moved to an environment variable or a secret store.
        """
        # Local import: urllib is only needed to build the URI.
        from urllib.parse import quote_plus

        self.username = username
        self.password = password
        self.cluster_name = cluster_name
        self.database_name = database_name

        # Build the SRV connection string with escaped credentials.
        uri = (
            'mongodb+srv://' + quote_plus(username) + ':' + quote_plus(password)
            + '@' + cluster_name + '.3khuz.mongodb.net/' + database_name
            + '?retryWrites=true&w=majority'
        )
        self.mongo_client = MongoClient(uri)

        # An alternative way to link to mongodb (Spark connector):
        #   from pyspark.sql import SparkSession
        #   my_spark = SparkSession \
        #       .builder \
        #       .appName("Databricks Shell") \
        #       .getOrCreate()
        #   df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        #     .option("uri", 'mongodb+srv://<username>:<password>@cluster0.3khuz.mongodb.net/sample_airbnb.listingsAndReviews?retryWrites=true&w=majority') \
        #     .load()

    def __enter__(self):
        """Allow `with mongoDB(...) as db:` usage; returns the instance itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client when the `with` block ends; exceptions propagate."""
        self.close_connection()
        return False

    def data_to_db(self, database_name='ashrae_db', collection_name='collection_test', raw_data=pd.DataFrame(np.arange(100).reshape(25,4), columns=list('wxyz'))):
        '''
        Write raw data (pd.DataFrame form) into a mongodb collection.

        Each DataFrame row becomes one document whose keys are the column
        names. Documents are appended: calling this twice with the same data
        stores the rows twice. A frame that yields no documents is skipped
        (insert_many rejects an empty list).

        Parameters:
          - database_name: target database (created on first insert)
          - collection_name: target collection (created on first insert)
          - raw_data: pd.DataFrame to store
        '''
        # A collection is a group of documents stored in MongoDB, roughly the
        # equivalent of a table in a relational database.
        collection = self.mongo_client[database_name][collection_name]

        print('----------------------------------------------')
        print('Start Inserting......')

        # Round-trip through JSON so every value is a plain Python type.
        # orient='records' yields one dict per row directly, avoiding a
        # transpose of the whole frame and handing insert_many a real list.
        documents = json.loads(raw_data.to_json(orient='records'))
        if not documents:
            print('- No rows to insert into Database:', database_name, 'Collection:', collection_name)
            return
        collection.insert_many(documents)

        # print related information
        print('- Data inserted to Database:', database_name, 'Collection:', collection_name)
        print('- The number of rows in', collection_name, 'is', collection.count_documents({}))
        # Dump the whole document (not just its keys) so the row is visible.
        print('- Fetch the first row in the collection', jsonb.dumps(collection.find_one()))
        print('Data Loaded')

        # If there is redundant info in the collection, drop it and simply
        # rerun this function:
        #   self.drop_collection(database_name, collection_name)
        # Alternatively, use the following to remove selected documents:
        #   collection.delete_many({'x': 1})
        # Use the following to find all documents in one collection:
        #   list(collection.find())

    @staticmethod
    def _split_df(data, split_num):
        '''
        Split a DataFrame row-wise into at most `split_num` consecutive,
        non-empty pieces; the last piece absorbs the remainder.
        '''
        if split_num < 1:
            raise ValueError('split_num must be at least 1')
        total_rows = data.shape[0]
        rows_per_piece = total_rows // split_num
        pieces = []
        start = 0
        for i in range(1, split_num + 1):
            end = total_rows if i == split_num else i * rows_per_piece
            # Skip empty slices (split_num larger than the row count).
            if end > start:
                pieces.append(data.iloc[start:end])
            start = end
        return pieces

    def huge_data_to_db(self, database_name='ashrae_db', collection_name='collection_test', data=pd.DataFrame(np.arange(100).reshape(25,4), columns=list('wxyz')),split_num=10):
        '''
        Split big data into pieces and feed them into mongoDB piece by piece
        in order to show the progress.

        Parameters:
          - database_name, collection_name: insertion target
          - data: pd.DataFrame to store
          - split_num: number of pieces to split `data` into (>= 1). When it
            exceeds the number of rows, fewer pieces are produced.

        Raises:
          - ValueError if split_num < 1
        '''
        # Validate/split first so nothing is printed or written on bad input.
        pieces = self._split_df(data, split_num)

        self.get_row_num(database_name, collection_name)
        print('--------------Start At:', datetime.datetime.now(),'----------------')

        for i, piece in enumerate(pieces):
            self.data_to_db(database_name, collection_name, piece)
            print("--------------Data split",i+1,"has been loaded----------------")

        print('--------------Finish At:', datetime.datetime.now(),'----------------')
        self.get_row_num(database_name, collection_name)

    def db_to_data(self, database_name='ashrae_db', collection_name='collection_test', query={}, id_exist=False):
        '''
        Load documents from a collection into a pandas DataFrame.

        Parameters:
          - database_name, collection_name: collection to read
          - query: filter passed to collection.find (default: all documents;
            the default dict is never modified here)
          - id_exist: keep the '_id' column when True, drop it otherwise

        Returns:
          - pd.DataFrame with one row per matching document (empty when
            nothing matches)
        '''
        collection = self.mongo_client[database_name][collection_name]
        df = pd.DataFrame(list(collection.find(query)))
        if not id_exist:
            # errors='ignore': an empty result has no '_id' column to drop.
            df = df.drop(columns='_id', errors='ignore')
        print('Data Loaded')
        return df

    def drop_collection(self, database_name='ashrae_db', collection_name='collection_test'):
        '''
        Drop a collection(table) in a database, all specified.
        '''
        collection = self.mongo_client[database_name][collection_name]
        collection.drop()
        print('Collection Dropped')

    def get_row_num(self, database_name='ashrae_db', collection_name='collection_test'):
        '''
        Print the number of rows (documents) in the specified collection.
        '''
        collection = self.mongo_client[database_name][collection_name]
        print('There are', collection.count_documents({}),'rows of data in the collection.')

    def close_connection(self):
        '''
        Close the underlying MongoClient.
        '''
        self.mongo_client.close()
        print('Client Closed')

In [0]:
# Create an instance of class mongoDB and load data into the MongoDB database called ashrae_db.
instance = mongoDB()

# data_to_db appends to the target collection, so re-running this cell would
# store every row again. Drop each collection first so the cell is idempotent
# (dropping a collection that does not exist yet is harmless).
for _collection_name, _frame in (
    ('weather_train', weather_train_pd),
    ('weather_test', weather_test_pd),
    ('building_metadata', building_pd),
):
    instance.drop_collection('ashrae_db', _collection_name)
    instance.data_to_db('ashrae_db', _collection_name, _frame)

In [0]:
# Verify the load: print the document count of each collection in ashrae_db.
for _collection_name in ('weather_train', 'weather_test', 'building_metadata'):
    instance.get_row_num('ashrae_db', _collection_name)


In [0]:
# Load train data (relatively large data)

# instance.huge_data_to_db('ashrae_db','train',train_pd,100)

# The train/test data exceed the storage limit of the MongoDB Atlas free tier, so train.csv and test.csv are uploaded as files and read directly, while building_metadata.csv, weather_train.csv and weather_test.csv are stored in MongoDB.


In [0]:
# Load test data (relatively large data)

#instance.huge_data_to_db('ashrae_db','test',test_pd,200)


In [0]:
#import pymongo
#client = pymongo.MongoClient("mongodb+srv://YanzheYUAN:YanzheYUAN@cluster0.3khuz.mongodb.net/sample_weatherdata?retryWrites=true&w=majority")
#db = client.sample_weatherdata
#collection = db.data
#collection.find_one()
