In [1]:
# Importing all the necessary packages

import luigi # luigi class for our 
import pandas as pd
import json
import os
from pymongo import MongoClient
import psycopg2
from psycopg2 import sql
from pymongo.errors import ConnectionFailure
import warnings
warnings.filterwarnings("ignore")
import requests
import io

# Below class is for reading the JSON file then saving it to our MongoDB Atlas Database
# Operation 1 & 2
class fromJsonToMongo(luigi.Task):

# Our necessary parmeters which are required for the function
    myJsonFileloc = luigi.Parameter()
    myconnString = luigi.Parameter()
    dataBsName = luigi.Parameter()
    collnName = luigi.Parameter()
    taskno = luigi.Parameter()
     
# Defining a new function that we want to perform for insertion of json data into mongodb
    def run(self):
        # Connect to MongoDB Atlas
        
        print("\n")
        print(f"Now Running Task {self.taskno} of Reading the json & saving it to MongoDB")
           
        client = MongoClient(self.myconnString)
        SAA_db = client.get_database(self.dataBsName)
        SAA_collection = SAA_db.get_collection(self.collnName)
        
        # before inserting into the Databse we will first Check if collection is empty or no
        # if not empty thenn we will delete it.
        if SAA_collection.count_documents({}) > 0:
            print(f'Deleting existing data from {self.dataBsName} in collection...')
            SAA_collection.delete_many({})

        # open json file to read the contents that we want to save to mongo DB
        with open(self.myJsonFileloc, 'r') as jfile:
            # Load the JSON data
            myJSONData = json.load(jfile)
            # Insert the data into MongoDB collection
            print(f'Now Inserting data in {self.collnName} collection...')
            SAA_collection.insert_many(myJSONData)
            
        print(f"Records in {self.collnName} collection: {SAA_collection.count_documents({})} ")

        

# Below Class is for reading From the API's then performing Data cleaning and saving it in the csv File
# Operation 3
class readDatafrmAPI(luigi.Task):
    def run(self):
        try:
    
            # URL of the raw CSV file in the GitHub repository
            csv_url = "https://raw.githubusercontent.com/MariaMoor/Investment-and-Trading-Capstone-Project/master/Capstone%20data"

            # we will send a GET request to the URL
            response = requests.get(csv_url)
            
            # let us Check if the request was successful (status code 200)
            if response.status_code == 200:
                # Decode the content if needed (CSV files are typically encoded as UTF-8).
                decoded_content = response.content.decode('utf-8')
                
                
                # Parse the CSV data using Pandas
                appleDataframe = pd.read_csv(io.StringIO(decoded_content), delimiter=',')
                print("Now Running Task 3 i.e Reading Apple Stock Dataset from an API...")
                print("Apple Dataset: ")
                print(appleDataframe.head())  # Print the first few rows of the DataFrame
                print('\n')
                
                # Now we will work with the appleDataframe
                print('Checking the format of the columns before datatypes Changes: ')
                appleDataframe.info()
                print('\n')
                # Keeping only required columns
                appleDataframe = appleDataframe[['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']]
               
                appleDataframe['Open'] = appleDataframe['Open'].astype(float)
                appleDataframe['High'] = appleDataframe['High'].astype(float)
                appleDataframe['Low'] = appleDataframe['Low'].astype(float)
                appleDataframe['Close'] = appleDataframe['Close'].astype(float)
                appleDataframe['Adj Close'] = appleDataframe['Adj Close'].astype(float)
                appleDataframe['Volume'] = appleDataframe['Volume'].astype(float)
                appleDataframe['Date'] = pd.to_datetime(appleDataframe['Date'], format='%Y-%m-%d')  
                
                 # Check the format of 'Date' column
                print('Checking the format of the columns after datatypes Changes: ')
                
                appleDataframe.info()
                print('\n')
                
                print(appleDataframe.head(3))
                
                
                # now we will save the cleaned DataFrame to a CSV file
                # Create the folder if it does not exist
                folder_path = 'outputDataframes'
                if not os.path.exists(folder_path):
                    os.makedirs(folder_path)

                # Name of the CSV file path within the folder
                csv_file_path = os.path.join(folder_path, 'apple_cleaned_data.csv')

                # Save the DataFrame to CSV file
                appleDataframe.to_csv(csv_file_path, index=False)

                print("we have now completed Data cleaning ")
                print(f"And the cleaned has been data saved to '{folder_path}/apple_cleaned_data.csv'")  
                print('\n')
                
            else:
                print("There was some error while reading CSV from GIT API ")  
                
        except Exception as excep:
            # Handle other types of exceptions
            print(f"Oops! Some Error Occured: {excep}")
 


# The below class is used to retireve Data from superstore Mongo Database and save to a Dataframe and procceding for datacleaning
# Operation 4

class retrieveDataAndCreateDataFramesuperstore(luigi.Task):

    myconnString = luigi.Parameter()
    dataBsName = luigi.Parameter()
    collnName = luigi.Parameter()
        
    def run(self):
        try:
            # Connect to MongoDB Atlas
            client = MongoClient(self.myconnString)
            dabse = client[self.dataBsName] # example superstore_sls_db or  is our database name
            supSls_recrds = dabse[self.collnName] #this is our collection in which we will perform the Data analysis & EDA
            print("\n")
            print("Now Running Task 4 i.e Reading Superstore Sales Dataset from an MongoDB and performing the Data Cleaning & transformation...")
            
            storeSlsDF = pd.DataFrame(list(supSls_recrds.find({})))  # Doing a find operation and storing the data from database in a dataframe
            print('We have retirevied data from mongoDB database and is saved in a Dataframe storeSlsDF \n')
            storeSlsDF.head(2)
            
            # getting some information on column Type
            storeSlsDF.info()
            
            # Lets convert the Sales column datatype to Numeric for our sales analysis as its datatype is in Object
            storeSlsDF['Sales'] = pd.to_numeric(storeSlsDF['Sales'], errors='coerce')  #here 'coerce' will result in NaN conversion of non-convertible values.
            print('Columns type information after convertion of Sales Column datatype to Numeric :\n')
            storeSlsDF.info()
            print('\n')
            
            # Since the "Order Date" and "Ship Date" columns' datatypes are both objects, let's convert them to a datetime format.
            storeSlsDF['Order Date'] = pd.to_datetime(storeSlsDF['Order Date'], format='%d/%m/%Y')  
            storeSlsDF['Ship Date']= pd.to_datetime(storeSlsDF['Ship Date'], format='%d/%m/%Y')
            
            
            # Check the format of 'Date' column
            print('Checking the format of date columns after datatype changes: ')
            storeSlsDF.info()
            print('\n')
            
            #Let's now arrange the data according to the Order Date column.
            storeSlsDF = storeSlsDF.sort_values(by="Order Date")
            print('Arranging the dataframe according to the Order Date column')
            print(storeSlsDF.head(3))
            print('\n')
            
            #remove rows in which the order date is later than the ship date
            storeSlsDF = storeSlsDF[storeSlsDF['Ship Date'] >= storeSlsDF['Order Date']]
            print("Results after removing rows in which the order date is later than the ship date")
            print(storeSlsDF.head(2))
            print('\n')
            
            #lets have a look at columns that shows us how many values are missing.
            print("Checking for all the Null values")
            print(storeSlsDF.isna().sum())
            print('\n')
            
            #lets check if we have any duplicates
            print("Results for dublicates in the datasets")
            print(storeSlsDF.duplicated().sum())
            print('\n')
            
            # now we will save the cleaned DataFrame to a CSV file
            # Create the folder if it does not exist
            folder_path = 'outputDataframes'
            if not os.path.exists(folder_path):
                os.makedirs(folder_path)
            
            # Define the CSV file path within the folder
            csv_file_path = os.path.join(folder_path, 'superStore_cleaned_data.csv')

            # Save the DataFrame to CSV file
            storeSlsDF.to_csv(csv_file_path, index=False)
            
            print("we have now completed Data cleaning ")
            print(f"And the cleaned has been data saved to '{folder_path}/superStore_cleaned_data.csv'")  
            print('\n')
                
                
            
        except ConnectionFailure as ConFailure:  # here we will Handle connection failure exceptions
            print(f"MongoDB Atlas connection failed: {ConFailure}")
        except Exception as excep:
            # Handle other types of exceptions
            print(f"Oops! Some Error Occured: {excep}")

            


    
    
# The below class is used to retireve Data from BlackFriday Mongo Database and save to a Dataframe and procceding for datacleaning
# Operation 5
class retrieveDataAndCreateDataFrameBlackFri(luigi.Task):
 
    myconnString = luigi.Parameter()
    dataBsName = luigi.Parameter()
    collnName = luigi.Parameter()
    def run(self):
        try:
            
#             with open('DATASETS/black_friday.json', 'r') as jfile:
#                 # Load the JSON data
#                 myJSONData = json.load(jfile)
            
#             myJSONData = pd.DataFabsrame(myJSONData)
            print("\n")
            print("Now Running Task 5 i.e Reading Blackfriday Dataset from an MongoDB and performing the Data Cleaning & transformation...")

            # Connect to MongoDB Atlas
            client = MongoClient(self.myconnString)
            dabse = client[self.dataBsName] # example superstore_sls_db or  is our database name
            bf_recrds = dabse[self.collnName] #this is our collection in which we will perform the Data analysis & EDA
            bf_DF = pd.DataFrame(list(bf_recrds.find({})))  # Doing a find operation and storing the data from database in a dataframe
#             bf_DF = myJSONData
            print('We have retirevied data from mongoDB database and is saved in a Dataframe storeSlsDF \n')
            print(bf_DF.head(2))
            # getting some information on column Type
            bf_DF.info()
            
            bf_DF['Product_ID'] = bf_DF['Product_ID'].astype(str)
            bf_DF['Gender'] = bf_DF['Gender'].astype(str)
            bf_DF['Age'] = bf_DF['Age'].astype(str)
            bf_DF['City_Category'] = bf_DF['City_Category'].astype(str) 
            bf_DF['Stay_In_Current_City_Years'] = bf_DF['Stay_In_Current_City_Years'].astype(str)
            print('Columns type information after convertion of Sales Column datatype to Numeric :\n')
            bf_DF.info()
            print('\n')
            
            
            #lets have a look at columns that shows us how many values are missing.
            print("Results after removing rows in which the order date is later than the ship date")
            print(bf_DF.isna().sum())
            print('\n')
            
            #lets check if we have any duplicates
            print("Results for dublicates in the datasets")
            print(bf_DF.duplicated().sum())
            print('\n')
            
           
            #Dropping two columns
            #If a column has more than 30% missing values, we drop it. In this case, as can be seen in the plot, product category 2 and product category 3 have high levels of missing values.
            bf_DF = bf_DF.drop(['Product_Category_2', 'Product_Category_3','_id'], axis=1)
            print("Columns After dropping  product category 2 and product category 3 ")
            print(bf_DF.head(2))
            
#              bf_DF = bf_DF.reset_index(inplace=True)
                
            # Save cleaned DataFrame to CSV file
            # Create the folder if it does not exist
            folder_path = 'outputDataframes'
            if not os.path.exists(folder_path):
                os.makedirs(folder_path)
            # Define the CSV file path within the folder
            csv_file_path = os.path.join(folder_path, 'black_friday_cleaned_data.csv')
 
            # Save the DataFrame to CSV file
            bf_DF.to_csv(csv_file_path, index=False)
            print("we have now completed Data cleaning ")
            print(f"And the cleaned has been data saved to '{folder_path}/black_friday_cleaned_data.csv'")  
            print('\n')
        except ConnectionFailure as CF:   # here we will Handle connection failure exceptions
            print(f"MongoDB Atlas connection failed: {CF}")
        except Exception as excep:
            # Handle other types of exceptions
            print(f"Oops! Some Error Occured: {excep}")

# below class is for creating a New Database in the Postgres SQL Database if not created already.
# If database already exist then delete the database and create the database
# Operation 6
class createDatabaseinPostgres(luigi.Task):
    user = luigi.Parameter()
    password = luigi.Parameter()
    host = luigi.Parameter()
    port = luigi.Parameter()
    database = luigi.Parameter()
    
    def run(self):
      
        # The following code will establish a connection to Postgres and then create a database:
        try:
            # Load CSV data into a DataFrame
            dbConnstr = psycopg2.connect(
                user = self.user,
                password = self.password,
                host = self.host,
                port =  self.port,
                database =self.database
            )

            print("\n")
            print("Now Running Task 6 i.e Creating a Database in Postgres...")

            
            # This will do a autocommit    
            dbConnstr.set_isolation_level(0)
                    # The following code will create a cursor:
            dbCursor = dbConnstr.cursor() 
            # Cursor will drop the database if it exists:
            dbCursor.execute('DROP DATABASE IF EXISTS daapproj;')
            # Cursor will create a database we want:
            dbCursor.execute('CREATE DATABASE daapproj;')    
            print("Database is successfully Created")
                # Closing the cursor:
            dbCursor.close()
            
        except (Exception , psycopg2.Error) as dbError :
            print ("An error has occured while connecting to the PostgreSQL database: ", dbError)
        finally:
            # Closing the postgres connection:
            if dbConnstr in locals(): 
                dbConnstr.close()
                        
            
            
## Now we will read the CSV File and then Save data to postgresSQL for SuperStore Sales analysis Dataset 
# Operation 7
class saveSuperSalesDatatoPostgres(luigi.Task):

    dataSetPath = luigi.Parameter()
    PostGres_table_name = luigi.Parameter()
    postgres_uri = luigi.Parameter()
    user = luigi.Parameter()
    password = luigi.Parameter()
    host = luigi.Parameter()
    port = luigi.Parameter()
    database = luigi.Parameter()
    
    def run(self):
        
        # The following code will establish a connection to Postgres and then create a database:
        try:

            print("\n")
            print("Now Running Task 7 i.e Creating Tables for Superstore and storing data in the table..")

            dbConnstr = psycopg2.connect(
                user = self.user,
                password = self.password,
                host = self.host,
                port =  self.port,
                database =self.database
            )
            
#             Query for our tabel 
            table_create_string = """ 
            DROP TABLE IF EXISTS superstore_sls_tbl;
            CREATE TABLE IF NOT EXISTS superstore_sls_tbl (
            row_id INTEGER,
            order_id VARCHAR(100),
            order_date DATE,
            ship_date DATE,
            ship_mode VARCHAR(100),
            customer_id VARCHAR(100),
            customer_name VARCHAR(100),
            segment VARCHAR(100),
            country VARCHAR(100),
            city VARCHAR(100),
            state VARCHAR(100),
            postal_code VARCHAR(100),
            region VARCHAR(100),
            product_id VARCHAR(100),
            category VARCHAR(100),
            sub_category VARCHAR(100),
            product_name VARCHAR(500),
            sales numeric(20,8)
            ); """
            
            
            # This will establish a connection to the postgres database:
    
            print("Now Creating a Table for SuperStore Sales Dataset in postgres")
            # This will do a autocommit    
            dbConnstr.set_isolation_level(0)
            
            # The following code will create a cursor:
            dbCursor = dbConnstr.cursor() 
            
            # The following code will create the table in the respective database:
            dbCursor.execute(table_create_string)  
            
            # Print the success message of table creation:
            print("Table has been created successfully")

            # Now  Lets read the cleaned csv data and convert it to dataframe
            # Path to the CSV file 
#             superStore_cleaned_data = "outputDataframes/superStore_cleaned_data.csv"
            superStore_cleaned_data =  self.dataSetPath
            # Read the CSV file into a 
            storeSlsDF = pd.read_csv(superStore_cleaned_data) 
            # Display the DataFrame
            print("Read the CSV Dataset successfully")

            print(storeSlsDF.head(5))
            
            storeSlsDF.drop('_id', axis=1, inplace=True)
            storeSlsDF.reset_index(drop=True, inplace=True)

            storeSlsDF.rename(columns={'Row ID': 'row_id', 'Order ID': 'order_id','Order Date': 'order_date','Ship Date': 'ship_date',
                      'Ship Mode': 'ship_mode','Customer ID': 'customer_id','Customer Name': 'customer_name','Postal Code': 'postal_code',
                      'Product ID': 'product_id','Sub-Category': 'sub_category',
                      'Product Name': 'product_name','Segment': 'segment','Country': 'country','City': 'city',
                      'State': 'state','Region': 'region','Category': 'category','Sales': 'sales'}, inplace=True)
            print("\n")
            print("After Renaming the columns in the storeSlsDF:")
            print(storeSlsDF.head(3))
            
            # Type casting of the Data frame :
            storeSlsDF['row_id'] = storeSlsDF['row_id'].astype(int)
            storeSlsDF['order_id'] = storeSlsDF['order_id'].astype(str)
            storeSlsDF['ship_mode'] = storeSlsDF['ship_mode'].astype(str)
            storeSlsDF['customer_id'] = storeSlsDF['customer_id'].astype(str)
            storeSlsDF['customer_name'] = storeSlsDF['customer_name'].astype(str)
            storeSlsDF['segment'] = storeSlsDF['segment'].astype(str)
            storeSlsDF['country'] = storeSlsDF['country'].astype(str)
            storeSlsDF['city'] = storeSlsDF['city'].astype(str)
            storeSlsDF['state'] = storeSlsDF['state'].astype(str)
            storeSlsDF['postal_code'] = storeSlsDF['postal_code'].astype(str)
            storeSlsDF['region'] = storeSlsDF['region'].astype(str)
            storeSlsDF['product_id'] = storeSlsDF['product_id'].astype(str)
            storeSlsDF['category'] = storeSlsDF['category'].astype(str)
            storeSlsDF['sub_category'] = storeSlsDF['sub_category'].astype(str)
            storeSlsDF['product_name'] = storeSlsDF['product_name'].astype(str)

            # Mention the name of the target table:
            table_name = 'superstore_sls_tbl'
            # Prepare the insert query:
            insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                sql.Identifier(table_name),
                sql.SQL(', ').join(map(sql.Identifier, storeSlsDF.columns)),
                sql.SQL(', ').join(sql.Placeholder() * len(storeSlsDF.columns))
            )
            # Converting the data from dataframe into list:
            values = [tuple(x) for x in storeSlsDF.to_numpy()]
            # Insert the values into the table:
            dbCursor.executemany(insert_query, values)
            # This will do a autocommit    
            dbConnstr.set_isolation_level(0) 
            print("All the data has been saved to the superstore_sls_tbl tables successfully")
            
        except (Exception , psycopg2.Error) as dbError :
            print ("An error has occured while connecting to the PostgreSQL database: ", dbError)
        finally:
            # Closing the postgres connection:
            if dbConnstr in locals(): 
                dbConnstr.close()
                


                
# Now we will save the cleaned dataset BlackFriday to Postgres
# Operation 8
class saveBlackFridayDatatoPostgres(luigi.Task):  
                
           
        dataSetPath = luigi.Parameter()
        PostGres_table_name = luigi.Parameter()
        postgres_uri = luigi.Parameter()
        user = luigi.Parameter()
        password = luigi.Parameter()
        host = luigi.Parameter()
        port = luigi.Parameter()
        database = luigi.Parameter()

        def run(self):

            
            # The following code will establish a connection to Postgres and then create a database:
            try:
                print("\n")
                print("Now Running Task 8 i.e Creating Tables for BlackFriday and storing data in the postgres table..")
                table_create_string = """ 
                DROP TABLE IF EXISTS blackfri_tbl;
                CREATE TABLE IF NOT EXISTS blackfri_tbl (
                user_id INTEGER, 
                product_id VARCHAR(100),
                gender VARCHAR(100),
                age VARCHAR(100),
                occupation INTEGER,
                city_category VARCHAR(100),
                stay_in_current_city_years VARCHAR(100),
                marital_status INTEGER,
                product_category_1 INTEGER,
                purchase numeric(20,8)
                ); """


                dbConnstr = psycopg2.connect(
                    user = self.user,
                    password = self.password,
                    host = self.host,
                    port = self.port,
                    database = self.database
                )

                # This will do a autocommit    
                dbConnstr.set_isolation_level(0)
                # The following code will create a cursor:
                dbCursor = dbConnstr.cursor() 
                # The following code will create the table in the respective database:
                dbCursor.execute(table_create_string)  
                # Print the success message of table creation:
                print("Table has been created successfully")
    #             # Closing the cursor:
    #             dbCursor.close()


                # Now  Lets read the cleaned csv data and convert it to dataframe
                # Path to the CSV file 
                black_friday_cleaned_data = self.dataSetPath
                # Read the CSV file into a 
                blkFriDF = pd.read_csv(black_friday_cleaned_data) 
                # Display the DataFrame
                print("Read the CSV Dataset successfully")

                blkFriDF.reset_index(drop=True, inplace=True)
                print("Black Friday Cleaned CSV retreived :")
                print(blkFriDF.head(3))


                blkFriDF.rename(columns={'User_ID': 'user_id','Product_ID': 'product_id','Gender': 'gender','Age': 'age','Occupation': 'occupation',
                        'City_Category': 'city_category','Stay_In_Current_City_Years': 'stay_in_current_city_years',
                       'Marital_Status': 'marital_status','Product_Category_1': 'product_category_1',
                       'Purchase': 'purchase'}, inplace=True)

                print("Column Names After Renaming the column from the dataset")
                print(blkFriDF.columns)
                
                
                
                table_name = 'blackfri_tbl'
                
                # Prepare the insert query:
                insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(table_name),
                    sql.SQL(', ').join(map(sql.Identifier, blkFriDF.columns)),
                    sql.SQL(', ').join(sql.Placeholder() * len(blkFriDF.columns))
                )
                
                # Converting the data from dataframe into list:
                values = [tuple(x) for x in blkFriDF.to_numpy()]
                # Insert the values into the table:
                dbCursor.executemany(insert_query, values)
                # This will do a autocommit    
                dbConnstr.set_isolation_level(0) 
                
                print("All the data has been saved to the blackfri_tbl tables successfully")

            except (Exception , psycopg2.Error) as dbError :
                print ("An error has occured while connecting to the PostgreSQL database: ", dbError)
            finally:
                # Closing the postgres connection:
                if dbConnstr in locals():
                    dbConnstr.close()



        
# Now we will save the cleaned dataset AppleStock price to Postgres
class saveAppleStockDatatoPostgres(luigi.Task):
    dataSetPath = luigi.Parameter()
    PostGres_table_name = luigi.Parameter()
    postgres_uri = luigi.Parameter()
    user = luigi.Parameter()
    password = luigi.Parameter()
    host = luigi.Parameter()
    port = luigi.Parameter()
    database = luigi.Parameter()
            
    def run(self):

    # The following code will establish a connection to Postgres and then create a database:
        try:

            print("\n")
            print("Now Running Task 9 i.e Creating Tables for BlackFriday and Storing data in the postgres table..")

            table_create_string = """ 
            DROP TABLE IF EXISTS aapl_tbl;
            CREATE TABLE IF NOT EXISTS aapl_tbl (
            datecol DATE, 
            opencol numeric(20,8),
            highcol numeric(20,8),
            lowcol numeric(20,8),
            closecol numeric(20,8),
            adjclosecol numeric(20,8),
            volume numeric(20,8)
            ); """
            
            dbConnstr = psycopg2.connect(
                    user = self.user,
                    password = self.password,
                    host = self.host,
                    port = self.port,
                    database = self.database
            
            )
            
            # This will do a autocommit    
            dbConnstr.set_isolation_level(0)
            # The following code will create a cursor:
            dbCursor = dbConnstr.cursor() 
            # The following code will create the table in the respective database:
            dbCursor.execute(table_create_string)  
            # Print the success message of table creation:
            print("Table has been created successfully")
            
            
            # Now  Lets read the cleaned csv data and convert it to dataframe
            # Path to the CSV file 
            apple_cleaned_data = self.dataSetPath
            # Read the CSV file into a 
            appleDF = pd.read_csv(apple_cleaned_data) 
            print("\n")
            print("apple DF Cleaned CSV retreived and read the CSV Dataset successfully")
            
            print(appleDF.head(3)) 
            appleDF.reset_index(drop=True, inplace=True)

            appleDF.rename(columns={'Date': 'datecol','Open': 'opencol', 'High': 'highcol','Low': 'lowcol','Close': 'closecol',
                      'Adj Close': 'adjclosecol','Volume': 'volume'}, inplace=True)
            
    

            
            # My target table name :
            table_name = 'aapl_tbl'
            # here below we are creating the insert query:
            insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                sql.Identifier(table_name),
                sql.SQL(', ').join(map(sql.Identifier, appleDF.columns)),
                sql.SQL(', ').join(sql.Placeholder() * len(appleDF.columns))
            )
            # Converting the data from dataframe into list:
            values = [tuple(x) for x in appleDF.to_numpy()]
            # Insert the values into the table:
            dbCursor.executemany(insert_query, values)
            # This will do a autocommit    
            dbConnstr.set_isolation_level(0) 
            
            print("All the data has been saved to the aapl_tbl tables successfully")
            
        except (Exception , psycopg2.Error) as dbError :
            print ("An error has occured while connecting to the PostgreSQL database: ", dbError)
        finally:
            # Closing the postgres connection:
            if dbConnstr in locals():
                dbConnstr.close()            

    
    
    
if __name__ == '__main__':
    # Define the MongoDB Atlas URI
#     uri = 'mongodb+srv://tvoneplus513:hzH5o78u2i5GxoYd@cluster0.7i7ifgj.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0'
#     replace above with below connection string
    uri = "mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster"
    
    postgres_uri = "postgresql://dap:dap@localhost:5432/dap"
    
    # Create an instance of the Luigi daap_task to save JSON data to 'black_fri' collection
    daap_task1 = fromJsonToMongo(
        myJsonFileloc='DATASETS/black_friday.json', 
        myconnString=uri,
        dataBsName='black_fri_db',
        collnName = 'black_fri',
        taskno = '1'
    )
  
    # Create an instance of the Luigi daap_task to save another JSON data to 'super_sls' collection
    daap_task2 = fromJsonToMongo(
        myJsonFileloc='DATASETS/superStore_analysis.json',
        myconnString=uri,
        dataBsName='superstore_sls_db',
        collnName='super_sls',
        taskno = '2'
    )
    
    daap_task3 = readDatafrmAPI()
    
    daap_task4 = retrieveDataAndCreateDataFramesuperstore(
        myconnString=uri,
        dataBsName='superstore_sls_db',
        collnName='super_sls',
    )
    
    daap_task5 = retrieveDataAndCreateDataFrameBlackFri(
        myconnString=uri,
        dataBsName='black_fri_db',
        collnName='black_fri',
    )
    
    daap_task6 = createDatabaseinPostgres(
            user = "dap",
            password = "dap", 
            host = "127.0.0.1",
            port = "5432",
            database = "postgres")
    

    daap_task7 = saveSuperSalesDatatoPostgres(
        dataSetPath='outputDataframes/superStore_cleaned_data.csv',
        PostGres_table_name='superStoreSales',
        postgres_uri=postgres_uri,
            user = "dap",
                    password = "dap",
                    host = "127.0.0.1",
                    port = "5432",
                    database = "daapproj"
        
    )
    
    daap_task8 = saveBlackFridayDatatoPostgres(
        dataSetPath= 'outputDataframes/black_friday_cleaned_data.csv',
        PostGres_table_name='superStoreSales',
        postgres_uri=postgres_uri,
        user = 'dap',
                    password = 'dap',
                    host = "127.0.0.1",
                    port = "5432",
                    database = "daapproj"
    )
    
    daap_task9 = saveAppleStockDatatoPostgres(
        dataSetPath="outputDataframes/apple_cleaned_data.csv",
        PostGres_table_name='superStoreSales',
        postgres_uri=postgres_uri,
        user = "dap",
                    password = "dap",
                    host = "127.0.0.1",
                    port = "5432",
                    database = "daapproj"
    
    )
    
    # Run the Luigi daap_tasks using the local scheduler
#     luigi.build([daap_task5 ,daap_task6,daap_task7,daap_task8,daap_task9], local_scheduler=True)
#     luigi.build([daap_task1], local_scheduler=True)
    
    luigi.build([daap_task1, daap_task2, daap_task3, daap_task4,daap_task5 ,daap_task6, daap_task7,daap_task8,daap_task9], local_scheduler=True)

            
        
            

DEBUG: Checking if fromJsonToMongo(myJsonFileloc=DATASETS/black_friday.json, myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=black_fri_db, collnName=black_fri, taskno=1) is complete
INFO: Informed scheduler that task   fromJsonToMongo_black_fri_black_fri_db_DATASETS_black_f_8e9f89a7e2   has status   PENDING
DEBUG: Checking if fromJsonToMongo(myJsonFileloc=DATASETS/superStore_analysis.json, myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=superstore_sls_db, collnName=super_sls, taskno=2) is complete
INFO: Informed scheduler that task   fromJsonToMongo_super_sls_superstore_sls_d_DATASETS_superSt_c19d969d44   has status   PENDING
DEBUG: Checking if readDatafrmAPI() is complete
INFO: Informed scheduler that task   readDatafrmAPI__99914b932b   has status   PENDING
DEBUG: Checking if retrieveDataAndCreateDataFramesupersto



Now Running Task 1 of Reading the json & saving it to MongoDB
Now Inserting data in black_fri collection...


INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      fromJsonToMongo(myJsonFileloc=DATASETS/black_friday.json, myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=black_fri_db, collnName=black_fri, taskno=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   fromJsonToMongo_black_fri_black_fri_db_DATASETS_black_f_8e9f89a7e2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 8
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   fromJsonToMongo(myJsonFileloc=DATASETS/superStore_analysis.json, myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=superstore_sls_db, collnName=super_sls, taskno=2)


Records in black_fri collection: 550068 


Now Running Task 2 of Reading the json & saving it to MongoDB
Deleting existing data from superstore_sls_db in collection...
Now Inserting data in super_sls collection...


INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      fromJsonToMongo(myJsonFileloc=DATASETS/superStore_analysis.json, myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=superstore_sls_db, collnName=super_sls, taskno=2)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   fromJsonToMongo_super_sls_superstore_sls_d_DATASETS_superSt_c19d969d44   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   readDatafrmAPI()


Records in super_sls collection: 9800 


INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      readDatafrmAPI()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   readDatafrmAPI__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 6
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   retrieveDataAndCreateDataFramesuperstore(myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=superstore_sls_db, collnName=super_sls)


Now Running Task 3 i.e Reading Apple Stock Dataset from an API...
Apple Dataset: 
         Date  Adj Close     Close      High       Low      Open       Volume  \
0  1996-05-28   0.203497  0.235491  0.243304  0.235491  0.238839  101852800.0   
1  1996-05-29   0.191924  0.222098  0.234375  0.220982  0.234375  219520000.0   
2  1996-05-30   0.196746  0.227679  0.229911  0.220982  0.222098  103465600.0   
3  1996-05-31   0.201568  0.233259  0.237723  0.227679  0.228795  162646400.0   
4  1996-06-03   0.190959  0.220982  0.232143  0.220982  0.231027  125462400.0   

       MACD    Signal  Rolling mean Adj Close 20  ...  lag_41  lag_42  lag_43  \
0  0.002717  0.003506                   0.203907  ...    -1.0     1.0    -1.0   
1  0.001364  0.003077                   0.204100  ...     1.0    -1.0     1.0   
2  0.000673  0.002596                   0.204534  ...     1.0     1.0    -1.0   
3  0.000509  0.002179                   0.205450  ...    -1.0     1.0     1.0   
4 -0.000472  0.001649     

INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      retrieveDataAndCreateDataFramesuperstore(myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=superstore_sls_db, collnName=super_sls)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   retrieveDataAndCreateDataFramesuperstore_super_sls_superstore_sls_d_mongodb_srv___DA_0d3e3949b3   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 5
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   retrieveDataAndCreateDataFrameBlackFri(myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=black_fri_db, collnName=black_fri)


We have retirevied data from mongoDB database and is saved in a Dataframe storeSlsDF 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9800 entries, 0 to 9799
Data columns (total 19 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   _id            9800 non-null   object
 1   Row ID         9800 non-null   object
 2   Order ID       9800 non-null   object
 3   Order Date     9800 non-null   object
 4   Ship Date      9800 non-null   object
 5   Ship Mode      9800 non-null   object
 6   Customer ID    9800 non-null   object
 7   Customer Name  9800 non-null   object
 8   Segment        9800 non-null   object
 9   Country        9800 non-null   object
 10  City           9800 non-null   object
 11  State          9800 non-null   object
 12  Postal Code    9800 non-null   object
 13  Region         9800 non-null   object
 14  Product ID     9800 non-null   object
 15  Category       9800 non-null   object
 16  Sub-Category   9800 non-nul

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 550068 entries, 0 to 550067
Data columns (total 13 columns):
 #   Column                      Non-Null Count   Dtype 
---  ------                      --------------   ----- 
 0   _id                         550068 non-null  object
 1   User_ID                     550068 non-null  object
 2   Product_ID                  550068 non-null  object
 3   Gender                      550068 non-null  object
 4   Age                         550068 non-null  object
 5   Occupation                  550068 non-null  object
 6   City_Category               550068 non-null  object
 7   Stay_In_Current_City_Years  550068 non-null  object
 8   Marital_Status              550068 non-null  object
 9   Product_Category_1          550068 non-null  object
 10  Product_Category_2          550068 non-null  object
 11  Product_Category_3          550068 non-null  object
 12  Purchase                    550068 non-null  object
dtypes: object(13)
memory usage: 5

INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      retrieveDataAndCreateDataFrameBlackFri(myconnString=mongodb+srv://DAAP:ncidaap@atlascluster.tlybx6o.mongodb.net/?retryWrites=true&w=majority&appName=AtlasCluster, dataBsName=black_fri_db, collnName=black_fri)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   retrieveDataAndCreateDataFrameBlackFri_black_fri_black_fri_db_mongodb_srv___DA_138dda3970   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   createDatabaseinPostgres(user=dap, password=dap, host=127.0.0.1, port=5432, database=postgres)


we have now completed Data cleaning 
And the cleaned has been data saved to 'outputDataframes/black_friday_cleaned_data.csv'




Now Running Task 6 i.e Creating a Database in Postgres...


INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      createDatabaseinPostgres(user=dap, password=dap, host=127.0.0.1, port=5432, database=postgres)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   createDatabaseinPostgres_postgres_127_0_0_1_dap_3dd8ae9895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   saveSuperSalesDatatoPostgres(dataSetPath=outputDataframes/superStore_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port=5432, database=daapproj)


Database is successfully Created


Now Running Task 7 i.e Creating Tables for Superstore and storing data in the table..
Now Creating a Table for SuperStore Sales Dataset in postgres
Table has been created successfully
Read the CSV Dataset successfully
                        _id  Row ID        Order ID  Order Date   Ship Date  \
0  66313ac0d7d0d08e9e260043    7981  CA-2015-103800  2015-01-03  2015-01-07   
1  66313ac0d7d0d08e9e25e3fc     742  CA-2015-112326  2015-01-04  2015-01-08   
2  66313ac0d7d0d08e9e25e3fb     741  CA-2015-112326  2015-01-04  2015-01-08   
3  66313ac0d7d0d08e9e25e3fa     740  CA-2015-112326  2015-01-04  2015-01-08   
4  66313ac0d7d0d08e9e25e7f6    1760  CA-2015-141817  2015-01-05  2015-01-12   

        Ship Mode Customer ID  Customer Name      Segment        Country  \
0  Standard Class    DP-13000  Darren Powers     Consumer  United States   
1  Standard Class    PO-19195  Phillina Ober  Home Office  United States   
2  Standard Class    PO-19195  Phillina Ober

INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      saveSuperSalesDatatoPostgres(dataSetPath=outputDataframes/superStore_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port=5432, database=daapproj)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   saveSuperSalesDatatoPostgres_superStoreSales_outputDataframes_daapproj_508bd73833   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   saveBlackFridayDatatoPostgres(dataSetPath=outputDataframes/black_friday_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port

All the data has been saved to the superstore_sls_tbl tables successfully


Now Running Task 8 i.e Creating Tables for BlackFriday and storing data in the postgres table..
Table has been created successfully
Read the CSV Dataset successfully
Black Friday Cleaned CSV retreived :
   User_ID Product_ID Gender   Age  Occupation City_Category  \
0  1000001  P00069042      F  0-17          10             A   
1  1000001  P00248942      F  0-17          10             A   
2  1000001  P00087842      F  0-17          10             A   

  Stay_In_Current_City_Years  Marital_Status  Product_Category_1  Purchase  
0                          2               0                   3      8370  
1                          2               0                   1     15200  
2                          2               0                  12      1422  
Column Names After Renaming the column from the dataset
Index(['user_id', 'product_id', 'gender', 'age', 'occupation', 'city_category',
       'stay_in_curr

INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      saveBlackFridayDatatoPostgres(dataSetPath=outputDataframes/black_friday_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port=5432, database=daapproj)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   saveBlackFridayDatatoPostgres_superStoreSales_outputDataframes_daapproj_ba6d7395d5   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) running   saveAppleStockDatatoPostgres(dataSetPath=outputDataframes/apple_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port=543

All the data has been saved to the blackfri_tbl tables successfully


Now Running Task 9 i.e Creating Tables for BlackFriday and Storing data in the postgres table..
Table has been created successfully


apple DF Cleaned CSV retreived and read the CSV Dataset successfully
         Date      Open      High       Low     Close  Adj Close       Volume
0  1996-05-28  0.238839  0.243304  0.235491  0.235491   0.203497  101852800.0
1  1996-05-29  0.234375  0.234375  0.220982  0.222098   0.191924  219520000.0
2  1996-05-30  0.222098  0.229911  0.220982  0.227679   0.196746  103465600.0


INFO: [pid 75041] Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) done      saveAppleStockDatatoPostgres(dataSetPath=outputDataframes/apple_cleaned_data.csv, PostGres_table_name=superStoreSales, postgres_uri=postgresql://dap:dap@localhost:5432/dap, user=dap, password=dap, host=127.0.0.1, port=5432, database=daapproj)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   saveAppleStockDatatoPostgres_superStoreSales_outputDataframes_daapproj_23d7f91d0f   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=9769424971, workers=1, host=Anikets-MacBook-Air.local, username=aniketshetty, pid=75041) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 9 tasks of which:
* 9 ran successfully:
    - 1 createDatabaseinPostgres(user=dap, password=dap, host=127.0.0.1, p

All the data has been saved to the aapl_tbl tables successfully
