In [None]:
pip install faker

Collecting faker
  Downloading Faker-26.0.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-26.0.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m18.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-26.0.0


In [None]:
pip install pyspark



In [None]:
import random
from datetime import datetime
from faker import Faker
from pyspark.sql import SparkSession

# Initialize Faker and SparkSession
fake = Faker()
spark = SparkSession.builder \
    .appName("DataGenerationAndIntegration") \
    .getOrCreate()

# List of provinces and other constants
provinces = ["Hà Nội", "Hồ Chí Minh", "Hải Phòng", "Đà Nẵng", "Cần Thơ", "An Giang", "Bà Rịa - Vũng Tàu", "Bắc Giang", "Bắc Kạn",
             "Bạc Liêu", "Bắc Ninh", "Bến Tre", "Bình Định", "Bình Dương", "Bình Phước", "Bình Thuận", "Cà Mau", "Cao Bằng",
             "Đắk Lắk", "Đắk Nông", "Điện Biên", "Đồng Nai", "Đồng Tháp", "Gia Lai", "Hà Giang", "Hà Nam", "Hà Tĩnh", "Hải Dương",
             "Hậu Giang", "Hòa Bình", "Hưng Yên", "Khánh Hòa", "Kiên Giang", "Kon Tum", "Lai Châu", "Lào Cai", "Lạng Sơn", "Lâm Đồng",
             "Long An", "Nam Định", "Nghệ An", "Ninh Bình", "Ninh Thuận", "Phú Thọ", "Phú Yên", "Quảng Bình", "Quảng Nam", "Quảng Ngãi",
             "Quảng Ninh", "Quảng Trị", "Sóc Trăng", "Sơn La", "Tây Ninh", "Thái Bình", "Thái Nguyên", "Thanh Hóa", "Thừa Thiên - Huế",
             "Tiền Giang", "Trà Vinh", "Tuyên Quang", "Vĩnh Long", "Vĩnh Phúc", "Yên Bái"]  # Add more provinces as needed
vehicle_types = ["Car", "Motorcycle", "Truck"]
plate_colors = ["White", "Blue", "Red"]
violations = ["Speeding", "Running a red light", "Illegal parking"]

# Function to generate citizen data
def generate_citizens(num_records):
    citizens = []
    for _ in range(num_records):
        personal_id = random.randint(10**11, 10**12 - 1)  # 12-digit ID
        citizen_name = fake.name()
        birth_date = fake.date_of_birth(minimum_age=18, maximum_age=80)
        province = random.choice(provinces)
        points = random.randint(0, 12)
        citizens.append((personal_id, citizen_name, birth_date, province, points))
    return citizens

# Generate and create DataFrame for citizen data
citizen_data = generate_citizens(20000)
citizen_df = spark.createDataFrame(citizen_data, schema=["personal_id", "citizen_name", "birth_date", "province", "points"])
citizen_df.createOrReplaceTempView("citizen_data_view")

# Function to generate driver license data
def generate_driver_licenses(citizen_ids, ratio=0.6):
    licenses = []
    for citizen_id in citizen_ids:
        if random.random() < ratio:
            license_number = fake.bothify(text="??######")
            issue_date = fake.date_between(start_date="-10y", end_date="today")
            expiration_date = fake.date_between(start_date="today", end_date="+10y")
            licensing_authority = fake.company()
            licenses.append((citizen_id, license_number, issue_date, expiration_date, licensing_authority))
    return licenses

# Generate and create DataFrame for driver license data
citizen_ids = [row.personal_id for row in citizen_df.collect()]
driver_license_data = generate_driver_licenses(citizen_ids)
driver_license_df = spark.createDataFrame(driver_license_data, schema=["personal_id", "license_number", "issue_date", "expiration_date", "licensing_authority"])
driver_license_df.createOrReplaceTempView("driver_license_view")

# Function to generate vehicle registration data
def generate_vehicle_registrations(citizen_ids, ratio=0.5):
    registrations = []
    for citizen_id in citizen_ids:
        if random.random() < ratio:
            vehicleTypeName = random.choice(vehicle_types)
            brand = fake.company()
            modelCode = fake.bothify(text="??-####")
            licensePlates = fake.bothify(text="??-######")
            engine = fake.bothify(text="E######")
            registrationDate = fake.date_between(start_date="-10y", end_date="today")
            ownerName = fake.name()
            issueAgency = fake.company()
            chassis = fake.bothify(text="C######")
            registrationCertificateNumber = fake.bothify(text="RCN######")
            expireDate = fake.date_between(start_date="today", end_date="+10y")
            vehicleColor = fake.color_name()
            range_usage = random.choice(["Short-range", "Long-range"])
            colorPlatesCode = random.randint(0, 2)
            province = random.choice(provinces)
            district = fake.city()
            village = fake.street_name()
            address = fake.address()
            registrations.append((citizen_id, vehicleTypeName, brand, modelCode, licensePlates, engine, registrationDate, ownerName, issueAgency, chassis, registrationCertificateNumber, expireDate, vehicleColor, range_usage, citizen_id, colorPlatesCode, province, district, village, address))
    return registrations

# Generate and create DataFrame for vehicle registration data
vehicle_registration_data = generate_vehicle_registrations(citizen_ids)
vehicle_registration_df = spark.createDataFrame(vehicle_registration_data, schema=["owner_id", "vehicleTypeName", "brand", "modelCode", "licensePlates", "engine", "registrationDate", "ownerName", "issueAgency", "chassis", "registrationCertificateNumber", "expireDate", "vehicleColor", "range", "citizenPid", "colorPlatesCode", "province", "district", "village", "address"])
vehicle_registration_df.createOrReplaceTempView("vehicle_registration_view")

# Function to generate traffic violation data
def generate_traffic_violations(citizen_ids, driver_license_data, vehicle_registration_data):
    violations_data = []
    for citizen_id in citizen_ids:
        has_license = any(license[0] == citizen_id for license in driver_license_data)
        has_vehicle = any(vehicle[0] == citizen_id for vehicle in vehicle_registration_data)

        violation_chance = 0.3 if has_license else 0.6

        if random.random() < violation_chance:
            name_violation = random.choice(violations)
            vehicle_type_violation = random.choice(vehicle_types)
            registration_number = next((v[3] for v in vehicle_registration_data if v[0] == citizen_id), None)
            license_number = next((l[1] for l in driver_license_data if l[0] == citizen_id), None)
            impound_DL_unit = random.choice(provinces)
            impound_DL_start_date = fake.date_between(start_date="-2y", end_date="today")
            impound_DL_end_date = fake.date_between(start_date=impound_DL_start_date, end_date="+30d")
            revocation_DL_start_date = fake.date_between(start_date="-2y", end_date="today")
            revocation_DL_end_date = fake.date_between(start_date=revocation_DL_start_date, end_date="+30d")
            impound_VR_start_date = fake.date_between(start_date="-2y", end_date="today")
            impound_VR_end_date = fake.date_between(start_date=impound_VR_start_date, end_date="+30d")
            revocation_VR_start_date = fake.date_between(start_date="-2y", end_date="today")
            revocation_VR_end_date = fake.date_between(start_date=revocation_VR_start_date, end_date="+30d")
            impound_VR_unit = random.choice(provinces)
            points = random.randint(0, 12)
            violations_data.append((citizen_id, name_violation, vehicle_type_violation, registration_number, license_number, impound_DL_unit, impound_DL_start_date, impound_DL_end_date, revocation_DL_start_date, revocation_DL_end_date, impound_VR_start_date, impound_VR_end_date, revocation_VR_start_date, revocation_VR_end_date, impound_VR_unit, points))
    return violations_data

# Generate and create DataFrame for traffic violation data
traffic_violation_data = generate_traffic_violations(citizen_ids, driver_license_data, vehicle_registration_data)
traffic_violation_df = spark.createDataFrame(traffic_violation_data, schema=["owner_id", "name_violation", "vehicle_type_violation", "registration_number", "license_number", "impound_DL_unit", "impound_DL_start_date", "impound_DL_end_date", "revocation_DL_start_date", "revocation_DL_end_date", "impound_VR_start_date", "impound_VR_end_date", "revocation_VR_start_date", "revocation_VR_end_date", "impound_VR_unit", "points"])
traffic_violation_df.createOrReplaceTempView("traffic_violation_view")

# Save data to CSV files
citizen_df.write.option("header", "true").csv("citizen_data_large.csv")
driver_license_df.write.option("header", "true").csv("driver_license_large.csv")
vehicle_registration_df.write.option("header", "true").csv("vehicle_registration_large.csv")
traffic_violation_df.write.option("header", "true").csv("traffic_violation_large.csv")

print("Data has been generated and saved to the corresponding CSV files.")


Data has been generated and saved to the corresponding CSV files.


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=c7a8600c96034095932dd6edf5ee2c2adaa987249b92f1a07e84759262358402
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [1]:
from pyspark.sql import SparkSession

# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("GlobalAsViewExample") \
    .getOrCreate()


ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Đọc dữ liệu từ các tệp CSV
driver_license_df = spark.read.option("header", "true").csv("driver_license_large.csv")
vehicle_registration_df = spark.read.option("header", "true").csv("vehicle_registration_large.csv")
citizen_data_df = spark.read.option("header", "true").csv("citizen_data_large.csv")
traffic_violation_df = spark.read.option("header", "true").csv("traffic_violation_large.csv")


In [None]:
# Tạo các view cục bộ cho từng bảng dữ liệu
driver_license_df.createOrReplaceTempView("driver_license_view")
vehicle_registration_df.createOrReplaceTempView("vehicle_registration_view")
citizen_data_df.createOrReplaceTempView("citizen_data_view")
traffic_violation_df.createOrReplaceTempView("traffic_violation_view")


In [None]:
#GAV

In [None]:
# Đọc dữ liệu từ các tệp CSV
driver_license_df = spark.read.option("header", "true").csv("driver_license_large.csv")
vehicle_registration_df = spark.read.option("header", "true").csv("vehicle_registration_large.csv")
citizen_data_df = spark.read.option("header", "true").csv("citizen_data_large.csv")
traffic_violation_df = spark.read.option("header", "true").csv("traffic_violation_large.csv")

# Tạo các GLOBAL temporary view cho từng bảng dữ liệu
# These views will be accessible to the CREATE VIEW statement
driver_license_df.write.mode("overwrite").saveAsTable("driver_licenses")
vehicle_registration_df.write.mode("overwrite").saveAsTable("vehicle_registrations")
citizen_data_df.write.mode("overwrite").saveAsTable("citizen_data")
traffic_violation_df.write.mode("overwrite").saveAsTable("traffic_violations")

# Định nghĩa view toàn cục
CitizenLicenseVehicleInfo = """
   CREATE OR REPLACE VIEW CitizenLicenseVehicleInfo AS
SELECT
    C.personal_id,
    C.citizen_name,
    C.birth_date,
    C.points,
    L.license_number,
    L.issue_date,
    L.expiration_date AS license_expiration_date,
    L.licensing_authority,
    V.licensePlates,
    V.vehicleTypeName AS vehicle_type,
    V.brand,
    V.modelCode AS model_code,
    V.engine AS engine_number,
    V.chassis AS chassis_number,
    V.registrationCertificateNumber AS registration_certificate_number,
    V.expireDate AS registration_expiration_date,
    V.range AS usage_range,
    V.colorPlatesCode AS color_plates_code
FROM
    citizen_data C
LEFT JOIN driver_licenses L ON C.personal_id = L.personal_id
LEFT JOIN vehicle_registrations V ON C.personal_id = V.owner_id;

"""
TrafficViolationInfo = """
    CREATE OR REPLACE VIEW TrafficViolationInfo AS
SELECT
    T.owner_id AS personal_id,
    T.registration_number,
    T.name_violation,
    T.vehicle_type_violation,
    T.impound_DL_start_date,
    T.impound_DL_end_date,
    T.revocation_DL_start_date,
    T.revocation_VR_start_date,
    T.revocation_DL_end_date,
    T.impound_VR_start_date,
    T.impound_VR_end_date,
    T.impound_DL_unit,
    T.impound_VR_unit,
    T.points,
    T.revocation_VR_end_date
FROM
    traffic_violations T;
"""

# Tạo view toàn cục
spark.sql(CitizenLicenseVehicleInfo)  # Execute CREATE VIEW statements directly
spark.sql(TrafficViolationInfo)

# Truy vấn dữ liệu từ view toàn cục
result1 = spark.sql("SELECT * FROM CitizenLicenseVehicleInfo")
result1.show()
result2 = spark.sql("SELECT * FROM TrafficViolationInfo")
result2.show()

# Lưu dữ liệu tích hợp vào tệp CSV
result1.toPandas().to_csv("global_schema_data_citizen.csv", index=False)
result2.toPandas().to_csv("global_schema_data_traffic.csv", index=False)

print("View toàn cục đã được tạo.")

In [None]:
import time;
GPLX="""SELECT
    C.citizen_name,
    C.license_number,
    C.issue_date,
    C.license_expiration_date,
    C.licensing_authority,

    T.impound_DL_start_date,
    T.impound_DL_end_date,
    T.impound_DL_start_date,
    T.revocation_DL_end_date,
    C.points
FROM
    CitizenLicenseVehicleInfo C
LEFT JOIN
    TrafficViolationInfo T ON C.personal_id = T.personal_id
WHERE
    C.personal_id = '173568146427';
"""
st_time=time.time();
result1 = spark.sql(GPLX)
print(time.time()-st_time)
result1.show()


0.041634559631347656
+--------------+--------------+----------+-----------------------+-------------------+---------------------+-------------------+---------------------+----------------------+------+
|  citizen_name|license_number|issue_date|license_expiration_date|licensing_authority|impound_DL_start_date|impound_DL_end_date|impound_DL_start_date|revocation_DL_end_date|points|
+--------------+--------------+----------+-----------------------+-------------------+---------------------+-------------------+---------------------+----------------------+------+
|Carolyn Howell|      DZ934501|2017-08-05|             2026-12-18|      Bradley-Cross|                 NULL|               NULL|                 NULL|                  NULL|     7|
+--------------+--------------+----------+-----------------------+-------------------+---------------------+-------------------+---------------------+----------------------+------+



In [None]:
DKX="""
SELECT
    C.citizen_name,
    C.registration_certificate_number,
    C.licensePlates,
    C.vehicle_type,
    C.color_plates_code,
    C.brand,
    C.model_code,
    C.engine_number,
    C.chassis_number,
    C.registration_certificate_number,
    C.registration_expiration_date,
    C.usage_range,
    C.color_plates_code,
    T.impound_VR_start_date,
    T.impound_VR_end_date,
    T.revocation_VR_start_date,
    T.revocation_VR_end_date,
    C.Points

FROM
    CitizenLicenseVehicleInfo C
LEFT JOIN
    TrafficViolationInfo T ON C.personal_id = T.personal_id
WHERE
    C.personal_id = '173568146427';
"""

st_time=time.time();
result2 = spark.sql(DKX)
print(time.time()-st_time)
result2.show()


0.061463117599487305
+--------------+-------------------------------+-------------+------------+-----------------+---------------+----------+-------------+--------------+-------------------------------+----------------------------+-----------+-----------------+---------------------+-------------------+------------------------+----------------------+------+
|  citizen_name|registration_certificate_number|licensePlates|vehicle_type|color_plates_code|          brand|model_code|engine_number|chassis_number|registration_certificate_number|registration_expiration_date|usage_range|color_plates_code|impound_VR_start_date|impound_VR_end_date|revocation_VR_start_date|revocation_VR_end_date|Points|
+--------------+-------------------------------+-------------+------------+-----------------+---------------+----------+-------------+--------------+-------------------------------+----------------------------+-----------+-----------------+---------------------+-------------------+---------------------

In [None]:
#LAV

In [None]:
driver_licenses_view="""
CREATE OR REPLACE VIEW driver_licenses_view AS
SELECT
    d.personal_id,
    d.license_number,
    d.issue_date,
    d.expiration_date AS license_expiration_date,
    d.licensing_authority
FROM driver_licenses d;
"""
vehicle_registrations_view="""
CREATE OR REPLACE VIEW vehicle_registrations_view AS
SELECT
    v.owner_id AS owner_id,
    v.registrationCertificateNumber,
    v.licensePlates,
    v.vehicleTypeName AS vehicle_type,
    v.colorPlatesCode,
    v.brand,
    v.modelCode AS model_code,
    v.engine AS engine_number,
    v.chassis AS chassis_number,
    v.registrationCertificateNumber AS registration_certificate_number,
    v.expireDate AS registration_expiration_date,
    v.range AS usage_range,
    v.colorPlatesCode AS color_plates_code
FROM vehicle_registrations v;
"""
TrafficViolationInfo_view = """
    CREATE OR REPLACE VIEW TrafficViolationInfo_view AS
SELECT
    t.owner_id AS personal_id,
    t.registration_number,
    t.impound_DL_end_date,
    t.revocation_DL_end_date,
    t.impound_VR_end_date,
    t.impound_DL_unit,
    t.revocation_VR_end_date,
    t.impound_VR_unit,
    t.points
FROM
    traffic_violations t;
"""
citizen_data_view="""
CREATE OR REPLACE VIEW citizen_data_view AS
SELECT
    c.personal_id,
    c.citizen_name,
    c.birth_date,
    c.points
FROM citizen_data c;
"""
spark.sql(driver_licenses_view);
spark.sql(vehicle_registrations_view);
spark.sql(TrafficViolationInfo_view);
spark.sql(citizen_data_view);

In [None]:
GPLX="""
SELECT
    C.citizen_name,
    D.license_number,
    D.issue_date AS license_issue_date,
    D.expiration_date AS license_expiration_date,
    D.licensing_authority,
    T.impound_DL_end_date,
    T.revocation_DL_end_date,
    T.points
FROM
    citizen_data C
LEFT JOIN driver_licenses D ON C.personal_id = D.personal_id
LEFT JOIN traffic_violations T ON C.personal_id = t.owner_id
WHERE
    C.personal_id = '173568146427';
"""

st_time=time.time();
result2 = spark.sql(GPLX)
print(time.time()-st_time)
result2.show()


0.015216588973999023
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+
|  citizen_name|license_number|license_issue_date|license_expiration_date|licensing_authority|impound_DL_end_date|revocation_DL_end_date|points|
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+
|Carolyn Howell|      DZ934501|        2017-08-05|             2026-12-18|      Bradley-Cross|               NULL|                  NULL|  NULL|
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+



In [None]:
DKX="""
SELECT
    C.citizen_name,
    D.license_number,
    D.issue_date AS license_issue_date,
    D.expiration_date AS license_expiration_date,
    D.licensing_authority,
    T.impound_DL_end_date,
    T.revocation_DL_end_date,
    T.points
FROM
    citizen_data_view C
LEFT JOIN driver_licenses_view D ON C.personal_id = D.personal_id
LEFT JOIN TrafficViolationInfo_view T ON C.personal_id = T.owner_id
WHERE
    C.personal_id = '173568146427';
"""

st_time=time.time();
result2 = spark.sql(GPLX)
print(time.time()-st_time)
result2.show()


0.019505739212036133
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+
|  citizen_name|license_number|license_issue_date|license_expiration_date|licensing_authority|impound_DL_end_date|revocation_DL_end_date|points|
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+
|Carolyn Howell|      DZ934501|        2017-08-05|             2026-12-18|      Bradley-Cross|               NULL|                  NULL|  NULL|
+--------------+--------------+------------------+-----------------------+-------------------+-------------------+----------------------+------+



In [None]:
DKX="""
SELECT
    C.citizen_name,
    D.license_number,
    D.issue_date AS license_issue_date,
    D.expiration_date AS license_expiration_date,
    D.licensing_authority,
    T.impound_DL_end_date,
    T.revocation_DL_end_date,
    T.points
FROM
    citizen_data_view C
LEFT JOIN driver_licenses_view D ON C.personal_id = D.personal_id
LEFT JOIN traffic_violations_view T ON C.personal_id = T.owner_id
WHERE
    C.personal_id = '173568146427';
"""

st_time=time.time();
result2 = spark.sql(GPLX)
print(time.time()-st_time)
result2.show()
