### **Apply ETL Processes**:
  1. **Extract**: Crawl data from databases (data sources) and save as '.csv' files using Pandas and pyodbc for database connection.
  2. **Load**: Load these files into the data lake using Sqoop.
  3. **Extract**: Retrieve data from the data lake using Sqoop.
  4. **Transform**: Preprocess the data based on requirements and the data warehouse structure using Pandas and potentially Spark.
  5. **Load**: Load the ready-to-use data into the data warehouse.


In [1]:
import pandas as pd
import pyodbc # to connect databases
# import numpy as np # maybe needed for data transforming/cleaning 
from pysqoop.SqoopImport import Sqoop
import os # create directory to store data for each database
import subprocess # to run sqoop
from pyspark.sql import SparkSession # csv to Hdfs

## 1. **Extract**: Crawl data from databases (data sources) and save as '.csv' files using Pandas and pyodbc for database connection.

In [2]:
# connect to the (original) database
driver = '{ODBC Driver 17 for SQL Server}'
host = 'HOAIBAO'
database = 'Wisdom_rubberProject_Practice_3'
trusted_connection = 'yes'
# Create the connection string
connection_string = f"DRIVER={driver};SERVER={host};DATABASE={database};Trusted_Connection={trusted_connection}"
# because this is a local connection, so I chose to use 'trusted_connection'
# in many cases, consider using 'username' and 'password' to connect to the database
connection = pyodbc.connect(connection_string)

# get the table names in the database
sql_retrieve_table_names = "select name from sys.tables where name <> 'sysdiagrams';"
df_tableNames = pd.read_sql(sql_retrieve_table_names, connection)

data_dict = {}
for table in df_tableNames['name']:
    sql_retrieve_data = f"select * from [{table}]" # e.g: [plan]
    data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
connection.close()

  df_tableNames = pd.read_sql(sql_retrieve_table_names, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql_retrieve_data, connection)
  data_dict[table] = pd.read_sql(sql

In [3]:
for tableName, data in data_dict.items():
    print("Table: " + tableName)

Table: Environment
Table: Drone
Table: DroneInformation
Table: DroneImage
Table: ChargingStation
Table: ChargingStatus
Table: Task
Table: Country
Table: Region
Table: Address
Table: Account
Table: UserInfo
Table: Field
Table: RubberTree
Table: RubberTreeInformation
Table: Plan
Table: PlanDetail
Table: Lidar
Table: Camera
Table: Radar
Table: SensorControlSystem
Table: Robot
Table: Energy
Table: RobotTapping
Table: Blade


In [4]:
# use 'os' module to create a new directory to store data from the database
try:
    # Create the directory
    os.mkdir(f"{database}")
    print(f"Directory '{database}' created successfully.")
    for tableName, data in data_dict.items():
        # print(type(data))
        data.to_csv(f"{database}/{tableName}.csv", index=False)
except FileExistsError:
    print(f"Directory '{database}' already exists.")

# after successfully run this code, a new directory '{database}/' should be appear.
# This directory contains all the data from the database as '.csv' format, can be useful for backup data

Directory 'Wisdom_rubberProject_Practice_3' already exists.


## 2. **Load**: Load these files into the data lake using Spark.

In [6]:
# NOTICE:

# Make sure that the HDFS is running
# First, open Command Prompt at the directory "\hadoop-3.3.0\sbin"
# Run as admminstrator
# Then use the command line "start-all.cmd" to start HDFS

# Create a 'user' directory in the Hadoop folder if it doesn't exist
# Run the command "hdfs dfs -mkdir -p /user/<your_username>/<data_contained_folder>"
# In this case, using directory "/user/hoaibao/data"
# In this code, I already run a 'os' function to create that directory


# Define local and HDFS directories
spark_local = SparkSession.builder.master('local[1]').appName("local_CSV_to_HDFS").getOrCreate()
local_directory = f'{database}/'
hdfs_directory  = f'hdfs://localhost:9001/user/hoaibao/data/{database}' # chech the 'yarn-site.xml' file to see the port, in this case, I setup to 9001, default usually is 9000

# Create the target directory in HDFS if it doesn't exist
os.system(f"hdfs dfs -mkdir -p {hdfs_directory}")

csv_files = [f for f in os.listdir(local_directory) if f.endswith('.csv')]

for csv_file in csv_files:
    local_path = os.path.join(local_directory, csv_file)
    print(local_path)
    # read csv file to dataframe using Spark
    df = spark_local.read.option('header', True).format('csv').load(local_path) #  use the inferSchema option to automatically infer the data types of the columns in your CSV file
    # Define HDFS path
    # hdfs_path = os.path.join(hdfs_directory, os.path.splitext(csv_file)[0])
    # print(hdfs_path)
    hdfs_path = f"{hdfs_directory}/{os.path.splitext(csv_file)[0]}"
    print(f"Writing to HDFS path: {hdfs_path}")
    
    # write dataframe to HDFS
    # df.coalesce(1).write.mode('overwrite').option('header','true').csv('hdfs://path/df.csv')
    df.coalesce(1).write.mode('overwrite').option('header', True).csv(hdfs_path) # "overwrite" option: always replace existed data in the hdfs (if there is) with the latest data
    # alternative modes: 'append', 'ignore', 'error'
    
    print(f'uploaded {csv_file} to {hdfs_path}')

spark_local.stop()
        
print('Upload completed.')
print('Successfully uploaded files to HDFS.')

Wisdom_rubberProject_Practice_3/Account.csv
Writing to HDFS path: hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account
uploaded Account.csv to hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account
Wisdom_rubberProject_Practice_3/Address.csv
Writing to HDFS path: hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Address
uploaded Address.csv to hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Address
Wisdom_rubberProject_Practice_3/Blade.csv
Writing to HDFS path: hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Blade
uploaded Blade.csv to hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Blade
Wisdom_rubberProject_Practice_3/Camera.csv
Writing to HDFS path: hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Camera
uploaded Camera.csv to hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Camera
Wisdom_rubbe

In [73]:
# spark_local.stop()
# sometime the spark session is not stopped due to the malfunction of the code, check it carefully

# After successfully write data into the HDFS, we can verify the data by using this command : "hdfs dfs -ls hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3"
# It should print out a list of contents inside the directory

## 3. **Extract**: Retrieve data from the data lake using Sqoop.

In [23]:
hdfs_files = subprocess.check_output(['hdfs', 'dfs', '-ls', hdfs_directory], shell=True).decode('utf-8').split('\n')

In [24]:
hdfs_files

['Found 25 items\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Address\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Blade\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Camera\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStation\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStatus\r',
 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/ho

In [27]:
hdfs_files[0]

'Found 25 items\r'

In [28]:
# Extract directory paths
directories = [line.split()[-1] for line in hdfs_files if line and line.startswith('drwx')]
print("Directories found:", directories)

Directories found: ['hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Address', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Blade', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Camera', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStation', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStatus', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Country', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Drone', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/DroneImage', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/DroneInformation', 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Energy', 'hdfs://localhost:9001/user/hoaibao/data/Wisdo

In [29]:
directories

['hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Address',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Blade',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Camera',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStation',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/ChargingStatus',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Country',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Drone',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/DroneImage',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/DroneInformation',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Energy',
 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubber

In [59]:
spark_local = SparkSession.builder.master('local[1]').appName('readHDFS').getOrCreate()

df = spark_local.read.csv('hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account/part-00000-4d5ca4a3-29ac-4c15-a5f9-64629d8a0697-c000.csv')
df.show()
spark_local.stop()

+---------+--------+---------+-----+
|      _c0|     _c1|      _c2|  _c3|
+---------+--------+---------+-----+
|AccountID|Username| Password| Role|
|        1|   user1|password1|Admin|
|        2|   user2|password2| User|
|        3|   user3|password3| User|
|        4|   user4|password4| User|
|        5|   user5|password5| User|
+---------+--------+---------+-----+



In [79]:
hdfs_data = subprocess.check_output(['hdfs', 'dfs', '-ls', hdfs_directory], shell=True).decode('utf-8').split('\n')
# e.g -> 'drwxr-xr-x   - tranh supergroup          0 2024-07-23 12:56 hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account\r'
hdfs_directories = [line.split()[-1] for line in hdfs_data if line.startswith('drwxr')]
# e.g -> 'hdfs://localhost:9001/user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account'
spark_local = SparkSession.builder.master('local[1]').appName('readHDFS').getOrCreate()
print('start spark session')

data_dict = {}
for directory in hdfs_directories:
    contentFile = subprocess.check_output(f'hdfs dfs -ls {directory}', shell=True)
    # e.g: b'Found 2 items\r\n-rw-r--r--   3 tranh supergroup          0 2024-07-23 12:56 /user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account/_SUCCESS\r\n-rw-r--r--   3 tranh supergroup        155 2024-07-23 12:56 /user/hoaibao/data/Wisdom_rubberProject_Practice_3/Account/part-00000-4d5ca4a3-29ac-4c15-a5f9-64629d8a0697-c000.csv\r\n'
    table_name = contentFile.split()[-1].split(b'/')[-2].decode('utf-8') # e.g: "Account"
    file_name  = contentFile.split()[-1].split(b'/')[-1].decode('utf-8') # e.g: "part-00000-4d5ca4a3-29ac-4c15-a5f9-64629d8a0697-c000.csv"
    print(table_name, file_name)
    data_dict[table_name] = spark_local.read.option('header', True).csv(f'{hdfs_directory}/{table_name}/{file_name}')
    
    # Next:
    # Option (1): Direct Import from Data Lake to SQL Data Warehouse
    # Perform data TRANSFORMATIONS AND LOAD DIRECTLY into the data warehouse WITHIN THIS LOOP (SPARK SESSION)
    #    - Advantages:
    #        * Immediate processing and loading without intermediate steps
    #        * Efficient for straightforward transformations
    #        * Saves storage space as intermediate files are not created
    #    - Disadvantages:
    #        * Less flexible for complex transformations
    #        * Can be challenging to debug if transformations fail
    #        * Limited to data sizes that can be handled in memory during a single session
    
    # Option (2): Staging Data Locally
    # SAVE DATA LOCALLY FIRST and perform transformations/loads later
    #    - Advantages:
    #        * Allows for more complex transformations to be applied
    #        * Easier to debug transformation steps individually
    #        * Suitable for very large datasets that may not fit into memory at once
    #        * Provides the ability to re-process data without re-reading from HDFS
    #    - Disadvantages:
    #        * Requires additional storage space for intermediate files
    #        * Involves additional I/O operations, which can slow down the process
    #        * May involve more steps in the workflow, making it more complex to manage
    # ----------------------------------------------------------------------------------
    # Choose one of the options based on the above considerations
    # Example for Option 1:
    # transformed_df = data_dict[table_name].withColumnRenamed("_c0", "Column1").withColumnRenamed("_c1", "Column2")
    # write_to_data_warehouse(transformed_df, table_name)
    
    # Example for Option 2:
    # local_path = f'/local/path/{table_name}.csv'
    # data_dict[table_name].write.option("header", True).csv(local_path)

    # 
    
spark_local.stop()
print('stop spark session')


start spark session
Account part-00000-4d5ca4a3-29ac-4c15-a5f9-64629d8a0697-c000.csv
Address part-00000-c4d2362c-a356-4ee2-8321-57b949aabe3d-c000.csv
Blade part-00000-2f6e9574-3d5c-468c-ae20-a27179088604-c000.csv
Camera part-00000-2663e7c3-ad22-4f2a-9218-b9181e279b38-c000.csv
ChargingStation part-00000-210704ea-0600-4fc8-9187-45e457032179-c000.csv
ChargingStatus part-00000-56644563-ab44-43d0-81d8-65fc06882898-c000.csv
Country part-00000-ea0b38e5-f487-40e2-9778-a815d46fe58b-c000.csv
Drone part-00000-7e1c8286-31ef-4af8-b62d-2c14e2873733-c000.csv
DroneImage part-00000-4a3d47ba-72b7-41dc-8eae-e2182c002f5a-c000.csv
DroneInformation part-00000-d0c0e460-d9e0-4346-8e40-d5f849e93518-c000.csv
Energy part-00000-d53a0855-bada-4d0f-9691-5f78922095fb-c000.csv
Environment part-00000-17f87e2b-76f1-4f79-8cb8-ee56464d64e5-c000.csv
Field part-00000-8514c4d3-f9c8-4cc9-9d62-721c028c1afa-c000.csv
Lidar part-00000-1921fb6c-4d7d-4469-a7ca-09f0ddaabb77-c000.csv
Plan part-00000-00f60b46-86ac-40b1-a65a-741d56f91

In [81]:
data_dict['Account']

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

## 4. **Transform**: Preprocess the data based on requirements and the data warehouse structure using Pandas and potentially Spark.

## 5. **Load**: Load the ready-to-use data into the data warehouse.