# Xây dựng hệ thống lưu trữ và phân tích dữ liệu lớn bằng Hadoop và Hive

## Bài toán : Phân tích dữ liệu sức khoẻ tâm thần

## Mục tiêu
- Lưu trữ dữ liệu sức khỏe trên hệ thống Hadoop.
- Sử dụng Hive để phân tích dữ liệu và trả lời các câu hỏi liên quan.
- Trình bày kết quả phân tích qua các công cụ trực quan.

## 1. Tạo dữ liệu giả lập
- Sử dụng thư viện random của python để tạo dữ liệu giả lập ngẫu nhiên từ list
- Sử dụng thư viện csv để ghi dữ liệu file csv
- Sử dụng thư viện tqdm hiển thị progress bar


In [None]:
import csv
import random
from tqdm import tqdm

def generate_random_name():
    first_names = ["John", "Jane", "Alex", "Emily", "Chris", "Katie", "Michael", "Sarah", "David", "Laura"]
    last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Martinez", "Hernandez"]
    return f"{random.choice(first_names)} {random.choice(last_names)}"

def generate_random_address():
    streets = ["Main St", "High St", "Park Ave", "Oak St", "Pine St", "Maple Ave", "Cedar St", "Elm St", "Washington Ave", "Lake St"]
    street_number = random.randint(1, 9999)
    return f"{street_number} {random.choice(streets)}"

def generate_random_city():
    cities = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix", "Philadelphia", "San Antonio", "San Diego", "Dallas", "San Jose"]
    return random.choice(cities)

def generate_contact_row(row_id):
    name = generate_random_name()
    countries = [
        "United States", "Australia", "United Kingdom", "Canada", "France", "South Korea", "Japan", "Brazil", "India", "China"
    ]
    return [
        row_id,
        f"+{random.randint(1, 99)} {random.randint(1000000, 9999999)}",
        f"{name.lower().replace(' ', '.')}@gmail.com",
        generate_random_address(),
        generate_random_city(),
        random.choice(countries)
    ]

def generate_contact_data(file_name, a, b):
    with open(file_name, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["ContactID", "PhoneNumber", "Email", "Address", "City", "Country"])
        for i in tqdm(range(a, b + 1), desc="Generating contact data", unit="row"):
            writer.writerow(generate_contact_row(i))

if __name__ == "__main__":
    output_file = "contacts_data.csv"
    generate_contact_data(output_file, 1, 6)

- Dữ liệu mẫu với định dạng CSV

In [None]:
# ContactID,PhoneNumber,Email,Address,City,Country
# 1,+16 3053523,katie.davis@gmail.com,2434 Oak St,Phoenix,India
# 2,+92 1070191,chris.smith@gmail.com,3583 Maple Ave,Los Angeles,India
# 3,+71 2089826,laura.smith@gmail.com,6091 Park Ave,New York,United States
# 4,+76 2124015,alex.davis@gmail.com,9346 Elm St,Chicago,India
# 5,+94 6816887,sarah.miller@gmail.com,346 Elm St,Dallas,United Kingdom
# 6,+57 2178972,chris.johnson@gmail.com,7216 Main St,Dallas,Brazil

## 2. Chuyển từ định dạng CSV sang định dạng Parquet
- Dữ liệu được lưu trữ dưới dạng CSV được chuyển đổi sang định dạng Parquet để tối ưu hoá lưu trữ và truy vấn dữ liệu.

In [None]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

input_dir = r'D:/SharedFolderVMware/Data'
output_dir = r'D:/SharedFolderVMware/Parquet'

os.makedirs(output_dir, exist_ok=True)

for file_name in os.listdir(input_dir):
    if file_name.endswith('.csv'):
        csv_file_path = os.path.join(input_dir, file_name)
        parquet_file_path = os.path.join(output_dir, file_name.replace('.csv', '.parquet'))
        
        df = pd.read_csv(csv_file_path)
        # match type
        if file_name == 'patients_data.csv':
            df['DateOfBirth'] = pd.to_datetime(df['DateOfBirth']).dt.date
        elif file_name == 'medical_history_data.csv':
            df['FamilyHistoryOfDepression'] = df['FamilyHistoryOfDepression'].astype('bool')
            df['HistoryOfSubstanceAbuse'] = df['HistoryOfSubstanceAbuse'].astype('bool')
        elif "hospital_visit_records" in file_name:
            df['VisitDate'] = pd.to_datetime(df['VisitDate']).dt.date

        table = pa.Table.from_pandas(df)
        pq.write_table(table, parquet_file_path, use_dictionary=False)
        
        print(f'Converted {csv_file_path} to {parquet_file_path}')

- Kiểm tra dữ liệu sau khi tạo đảm bảm khớp với cơ sở dữ liệu

In [None]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

input_dir = r"D:/SharedFolderVMware/Parquet"

os.makedirs(input_dir, exist_ok=True)
for file_name in os.listdir(input_dir):
    parquet_file_path = os.path.join(input_dir, file_name)
    df = pq.read_table(parquet_file_path)
    print(df)

## Bước 3: Lưu trữ dữ liệu trên HDFS và Hive
- Sử dụng lệnh Hadoop để tải dữ liệu Parquet lên HDFS.

In [None]:
hdfs dfs -put *.parquet /user/fatcat/mental_health_data/

- Tạo bảng Hive từ dữ liệu Parquet.

In [None]:
CREATE TABLE patients (
    PatientID INT,
    FullName STRING,
    DateOfBirth DATE,
    MaritalStatus STRING,
    EducationLevel STRING,
    NumberOfChildren INT,
    Occupation STRING,
    Income STRING,
    ContactID INT
)
PARTITIONED BY (Gender STRING)
CLUSTERED BY (PatientID) INTO 20 BUCKETS
STORED AS PARQUET;

- Thêm dữ liệu vào table từ file Parquet

In [None]:
LOAD DATA INPATH '/user/fatcat/mental_health_data/patients_data.parquet' INTO TABLE patients;

- Thực hiện các truy vấn phân tích như tổng hợp, nhóm, và phân tích xu hướng.

In [None]:
-- Thống kê số bệnh nhân theo tuổi, giới tính, tình trạng hôn nhân và thu nhập
INSERT OVERWRITE DIRECTORY '/user/fatcat/export_data/query1'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT 
    Age,
    Gender,
    MaritalStatus,
    AVG(IncomeFloat) AS AvgIncome,
    COUNT(*) AS TotalRecords
FROM temp_p_vr_data
GROUP BY Age, Gender, MaritalStatus
ORDER BY Age, Gender, MaritalStatus;

## Bước 4: Trình bày kết quả
- Báo cáo chi tiết
- Video trình bày
- Powerpoint
- PowerBI