In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=d870c764324d770c53ec80b8d0b239657514d0813849f95ee5a14f050260ce3d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


# Khai báo thư viện



In [24]:
import pandas as pd
import numpy as np
import datetime
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, round, sum

# Đọc dữ liệu

In [6]:
spark = SparkSession.builder.appName('BigData').getOrCreate()

In [7]:
path = '/content/drive/MyDrive/Code/Hoctap/K1N3/BigData/BTL2/'
city_df = spark.read.csv(path+"country_UN_update.csv", header=True, inferSchema=True)
df = spark.read.csv(path+"data.csv", header=True, inferSchema=True)

In [8]:
city_df.limit(5).show()

+-----------+--------+---------+------------+------------+-------+----------------+---------+
|    Country|Latitude|Longitude|Commencement|Population_y|   Area|         Capital|elevation|
+-----------+--------+---------+------------+------------+-------+----------------+---------+
|Afghanistan| 34.5289|  69.1725|        1946|       41.13| 653000|           Kabul|   1800.0|
|    Albania| 41.3275|  19.8189|        1955|        2.78|  29000|          Tirane|    110.0|
|    Algeria| 36.7525|    3.042|        1962|        44.9|2382000|         Algiers|    186.0|
|    Andorra| 42.5078|   1.5211|        1993|        0.08|    470|Andorra la Vella|   1045.0|
|     Angola| -8.8368|  13.2343|        1976|       35.59|1247000|          Luanda|     75.0|
+-----------+--------+---------+------------+------------+-------+----------------+---------+



In [9]:
df.limit(5).show()

+-------------------+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+-------------+-----------------+-------------------------+----------------------+------+-----+
|               time|temperature_2m|relativehumidity_2m|dewpoint_2m|apparent_temperature|precipitation|rain|snowfall|weathercode|pressure_msl|surface_pressure|cloudcover|cloudcover_low|cloudcover_mid|cloudcover_high|et0_fao_evapotranspiration|vapor_pressure_deficit|windspeed_10m|winddirection_10m|soil_temperature_0_to_7cm|soil_moisture_0_to_7cm|is_day| City|
+-------------------+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+------------

# Mô tả dữ liệu
## Kích thước dữ liệu

In [10]:
print("Rows:", df.count())
print("Columns:", len(df.columns))

Rows: 7753968
Columns: 23


## Thông tin các thuộc tính

In [11]:
df.columns

['time',
 'temperature_2m',
 'relativehumidity_2m',
 'dewpoint_2m',
 'apparent_temperature',
 'precipitation',
 'rain',
 'snowfall',
 'weathercode',
 'pressure_msl',
 'surface_pressure',
 'cloudcover',
 'cloudcover_low',
 'cloudcover_mid',
 'cloudcover_high',
 'et0_fao_evapotranspiration',
 'vapor_pressure_deficit',
 'windspeed_10m',
 'winddirection_10m',
 'soil_temperature_0_to_7cm',
 'soil_moisture_0_to_7cm',
 'is_day',
 'City']

- time: Thời gian tại thời điểm đo đạc
- temperature_2m: Nhiệt độ không khí ở độ cao 2 mét so với mặt đất (°C)
- relativehumidity_2m: Độ ẩm tương đối ở độ cao 2 mét so với mặt đất (%)
- dewpoint_2m: Nhiệt độ điểm sương ở độ cao 2 mét so với mặt đất (°C)
- apparent_temperature: Nhiệt độ cảm nhận là nhiệt độ cảm nhận kết hợp yếu tố gió, độ ẩm tương đối và bức xạ mặt trời (°C)
- precipitation: Lượng kết tủa tổng (mưa, tuyết) của giờ trước. Dữ liệu được lưu trữ với độ chính xác 0,1 mm. Nếu dữ liệu lượng mưa được tính tổng thành tổng số tháng, có thể có những sai lệch nhỏ với tổng lượng mưa (mm)
- rain: Lượng mưa của giờ trước (mm)
- snowfall: Lượng tuyết rơi của giờ trước tính bằng centimet. Đối với lượng nước tương đương tính bằng milimét, chia cho 0.7 Ví dụ: 7 cm tuyết = 10 mm lượng nước tương đương lượng mưa (cm)
- weathercode: Điều kiện thời tiết dưới dạng mã thông dịch thời tiết WMO:
    + Mã 0: Thời tiết trong lành
    + Mã 1: Mây mỏng hoặc ít mây
    + Mã 2: Mây xám hoặc có mây
    + Mã 3: Mây dày
    + Mã 51: Có mưa
    + Mã 53: Mưa rào
    + Mã 55: Mưa đá
    + Mã 61: Có tuyết
    + Mã 63: Tuyết rơi
    + Mã 71: Có sấm sét
    + Mã 73: Sấm sét và mưa
    + Mã 65: Sấm sét và tuyết
    + Mã 75: Sấm sét và mưa đá
- pressure_msl: Áp suất không khí tại mực nước biển trung bình. Được sử dụng trong khí tượng học (hPa)
- surface_pressure: Áp suất tại bề mặt. Áp suất bề mặt giảm khi độ cao tăng (hPa)
- cloudcover: Tổng phần trăm diện tích độ che phủ của mây (%)
- cloudcover_low: Mây cấp thấp và sương mù từ mặt đất đến độ cao 2 km (%)
- cloudcover_mid: Mây tầm trung từ 2 đến 6 km (%)
- cloudcover_high: Mây tầm cao từ độ cao 6 km trở lên (%)
- et0_fao_evapotranspiration: ETo Tham chiếu Sự bay hơi của cỏ được tưới nước tốt. Dựa trên các phương trình Penman-Monteith FAO-56 ET₀ được tính từ nhiệt độ, tốc độ gió, độ ẩm và bức xạ mặt trời. Giả sử nước trong lòng đất không giới hạn. ET₀ thường được sử dụng để ước tính lượng tưới tiêu cần thiết cho cây trồng. (mm)
- vapor_pressure_deficit: Chênh lệch Áp suất Hơi nước (VPD) Đối với VPD cao (> 1,6), sự thải hơi nước của cây tăng lên. Đối với VPD thấp (<0,4), sự thải hơi giảm (kPa)
- windspeed_10m: Tốc độ gió ở độ cao 10m so với mặt đất (km/h)
- winddirection_10m: Hướng gió ở độ cao 10m so với mặt đất (độ) (0 độ bắt đầu từ N, chạy theo chiều kim đồng hồ)
- soil_temperature_0_to_7cm: Nhiệt độ trung bình của các lớp đất tại độ sâu 0-7m. (°C)
- soil_moisture_0_to_7cm: Hàm lượng nước trong lòng đất trung bình trong 1 m³ đất tại độ sâu 0-7m (m³)
- is_day: Là ban ngày hay không
- City: Thành phố, nơi đo đạc

## Kiểu dữ liệu

In [14]:
df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- relativehumidity_2m: integer (nullable = true)
 |-- dewpoint_2m: double (nullable = true)
 |-- apparent_temperature: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- snowfall: double (nullable = true)
 |-- weathercode: integer (nullable = true)
 |-- pressure_msl: double (nullable = true)
 |-- surface_pressure: double (nullable = true)
 |-- cloudcover: integer (nullable = true)
 |-- cloudcover_low: integer (nullable = true)
 |-- cloudcover_mid: integer (nullable = true)
 |-- cloudcover_high: integer (nullable = true)
 |-- et0_fao_evapotranspiration: double (nullable = true)
 |-- vapor_pressure_deficit: double (nullable = true)
 |-- windspeed_10m: double (nullable = true)
 |-- winddirection_10m: integer (nullable = true)
 |-- soil_temperature_0_to_7cm: double (nullable = true)
 |-- soil_moisture_0_to_7cm: double (nullable = true)
 |-

- **is_day**, **weathercode** là dữ liệu định tính được mã hoá thành số.
- **City** là dữ liệu định tính
- Các cột còn lại đều là dữ liệu định lượng.

## Thống kê cơ bản

In [16]:
df.describe().show()

+-------+------------------+-------------------+-----------------+--------------------+-------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+--------------------------+----------------------+------------------+------------------+-------------------------+----------------------+-------------------+--------------------+
|summary|    temperature_2m|relativehumidity_2m|      dewpoint_2m|apparent_temperature|      precipitation|               rain|            snowfall|       weathercode|      pressure_msl| surface_pressure|        cloudcover|    cloudcover_low|    cloudcover_mid|  cloudcover_high|et0_fao_evapotranspiration|vapor_pressure_deficit|     windspeed_10m| winddirection_10m|soil_temperature_0_to_7cm|soil_moisture_0_to_7cm|             is_day|                City|
+-------+------------------+-------------------+-----------------+------------------

# Các vấn đề về dữ liệu
## Kiểu dữ liệu

In [15]:
df.dtypes

[('time', 'timestamp'),
 ('temperature_2m', 'double'),
 ('relativehumidity_2m', 'int'),
 ('dewpoint_2m', 'double'),
 ('apparent_temperature', 'double'),
 ('precipitation', 'double'),
 ('rain', 'double'),
 ('snowfall', 'double'),
 ('weathercode', 'int'),
 ('pressure_msl', 'double'),
 ('surface_pressure', 'double'),
 ('cloudcover', 'int'),
 ('cloudcover_low', 'int'),
 ('cloudcover_mid', 'int'),
 ('cloudcover_high', 'int'),
 ('et0_fao_evapotranspiration', 'double'),
 ('vapor_pressure_deficit', 'double'),
 ('windspeed_10m', 'double'),
 ('winddirection_10m', 'int'),
 ('soil_temperature_0_to_7cm', 'double'),
 ('soil_moisture_0_to_7cm', 'double'),
 ('is_day', 'int'),
 ('City', 'string')]

Không có vấn đề gì về kiểu dữ liệu

## Giá trị bị thiếu (Missing Value)

In [26]:
df_missing_value = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

In [27]:
df_missing_value.show()

+----+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+-------------+-----------------+-------------------------+----------------------+------+----+
|time|temperature_2m|relativehumidity_2m|dewpoint_2m|apparent_temperature|precipitation|rain|snowfall|weathercode|pressure_msl|surface_pressure|cloudcover|cloudcover_low|cloudcover_mid|cloudcover_high|et0_fao_evapotranspiration|vapor_pressure_deficit|windspeed_10m|winddirection_10m|soil_temperature_0_to_7cm|soil_moisture_0_to_7cm|is_day|City|
+----+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+-------------+-----------------+-------------------------+-

- Không có giá trị nào bị null.
- Vì kiểu dữ liệu đa phần là kiểu số nên có thể dữ liệu null được thay thế bằng giá trị thay thế hoặc 0. Tuy nhiên khả năng gán 0 cho giá trị null khó có thể xảy ra do 0 cũng là giá trị có ý nghĩa, vì vậy người hoặc máy nhập liệu sẽ không gán 0 thay cho giá trị null. Còn lại thông thường sẽ chọn 1 giá trị ngoài khoảng giá trị của các cột (chẳng hạn cột về % từ 0-100, null sẽ thay thế bằng -1), tuy nhiên khi describe không thấy xuất hiện giá trị min hay max nào bất thường.
- Cột **City** không thể null do trong quá trình nối bảng đều có thêm giá trị của các thành phố

## Trùng lặp (Duplicate)
Cần kiểm tra xem cùng 1 thành phố, liệu có thời gian nào bi trùng nhau hay không

In [32]:
df.count() - df.dropDuplicates(subset=['City', 'time']).count()

0

Vậy không có thành phố nào bị lặp thời gian.

Kiểm tra khi bỏ cột **time**

In [34]:
duplicates = df.drop('time').groupBy(df.drop('time').columns).count()\
    .where(pyspark.sql.functions.col('count') > 1)

# Tính tổng số dòng trùng lặp
num_duplicates = duplicates.select(pyspark.sql.functions.sum('count')).collect()[0][0]

In [36]:
duplicates.show()

+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+-------------+-----------------+-------------------------+----------------------+------+--------------------+-----+
|temperature_2m|relativehumidity_2m|dewpoint_2m|apparent_temperature|precipitation|rain|snowfall|weathercode|pressure_msl|surface_pressure|cloudcover|cloudcover_low|cloudcover_mid|cloudcover_high|et0_fao_evapotranspiration|vapor_pressure_deficit|windspeed_10m|winddirection_10m|soil_temperature_0_to_7cm|soil_moisture_0_to_7cm|is_day|                City|count|
+--------------+-------------------+-----------+--------------------+-------------+----+--------+-----------+------------+----------------+----------+--------------+--------------+---------------+--------------------------+----------------------+-------------+----------------

Có 5 cặp có thời gian gần nhau (cách nhau 1h) và cùng thành phố. Vì vậy số liệu có thể giống nhau.

**Vậy không có giá trị nào bị trùng lặp**

## Giá trị ngoại lai (Outlier)

In [43]:
def IQR_Outlier(col_name):
    q1 = df.approxQuantile(col_name, [0.25], 0.0)[0]
    q3 = df.approxQuantile(col_name, [0.75], 0.0)[0]
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    return (lower_bound, upper_bound)

### apparent_temperature Nhiệt độ cảm nhận

In [38]:
min_atemp = df.selectExpr("min(apparent_temperature)").first()[0]
max_atemp = df.selectExpr("max(apparent_temperature)").first()[0]
min_atemp, max_atemp

(-44.2, 51.4)

In [39]:
df.select('City', 'time').filter(col('apparent_temperature') == min_atemp).show()

+-----------+-------------------+
|       City|               time|
+-----------+-------------------+
|Ulaanbaatar|2023-01-24 06:00:00|
+-----------+-------------------+



Vào thời gian trên ở Ulaanbaata (Mông Cổ) đúng là đã ghi nhận mức nhiệt thấp dưới -44 độ

In [40]:
df.select('City', 'time').filter(col('apparent_temperature') == max_atemp).show()

+-------+-------------------+
|   City|               time|
+-------+-------------------+
|Baghdad|2020-07-29 12:00:00|
+-------+-------------------+



Baghdad ở Iraq thuộc khu vực Trung Đông là 1 trong nhưng nơi có khí hậu nóng nhất.


In [46]:
q1 = df.approxQuantile("apparent_temperature", [0.25], 0.0)[0]
q3 = df.approxQuantile("apparent_temperature", [0.75], 0.0)[0]
iqr = q3 - q1
out_atemp = (q1 - 1.5 * iqr, q3 + 1.5 * iqr)
print(out_atemp)

Py4JJavaError: ignored

In [None]:
city_out_low_atemp = df.filter(col('apparent_temperature') < out_atemp[0]).select('City').distinct()

In [None]:
filtered_city_df = city_df.filter(col('Capital').isin(city_out_low_atemp))

lat_out_low_atemp = filtered_city_df.select('Latitude').rdd.flatMap(lambda x: x).collect()
long_out_low_atemp = filtered_city_df.select('Longitude').rdd.flatMap(lambda x: x).collect()

plt.scatter(x=long_out_low_atemp, y=lat_out_low_atemp)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.show()