<a href="https://colab.research.google.com/github/baoleee21/Parquet-Demo/blob/master/Task_1_Ebird.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**TASK 1**

In [None]:
# Bước 1: Thiết lập môi trường
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark geopandas shapely matplotlib pandas

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("EbirdDataAnalysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Kiểm tra tệp
print("\Kiểm tra tệp trong /content/drive/My Drive/DataFinal")
!ls "/content/drive/My Drive/DataFinal"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
\Kiểm tra tệp trong /content/drive/My Drive/DataFinal
eBird_100k.csv	    eBird_1k.csv.bz2	    output
eBird_100k.csv.bz2  ebird_ZIP		    ProjectB.md
eBird_10k.csv	    eBird_ZIP_100k.parquet  tl_2018_us_zcta510
eBird_10k.csv.bz2   eBird_ZIP_10k.parquet   tl_2018_us_zcta510.zip
eBird_1k.csv	    eBird_ZIP_1k.parquet    zcta510_shapefile


In [None]:
# Kiểm tra phiên bản
import sys
import pandas as pd
print("Python:", sys.version)
print("Pandas:", pd.__version__)
print("Spark:", spark.version)

Python: 3.11.12 (main, Apr  9 2025, 08:55:54) [GCC 11.4.0]
Pandas: 2.2.2
Spark: 3.1.2


In [None]:
from pyspark.sql.functions import col, when, lit, regexp_replace, to_date
import geopandas as gpd
from shapely.geometry import Point
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import col


In [None]:
# Bước 2: Định nghĩa hàm hỗ trợ
def clean_ebird_data(ebird_df):
    """Làm sạch và chuẩn hóa dữ liệu eBird"""
    # Giữ các cột cần thiết
    ebird_df = ebird_df.select(
        "x", "y", "GLOBAL UNIQUE IDENTIFIER", "CATEGORY",
        "COMMON NAME", "SCIENTIFIC NAME", "OBSERVATION COUNT", "OBSERVATION DATE"
    )

    # Đổi tên các cột có khoảng trắng
    ebird_df = ebird_df \
        .withColumnRenamed("GLOBAL UNIQUE IDENTIFIER", "global_id") \
        .withColumnRenamed("COMMON NAME", "common_name") \
        .withColumnRenamed("SCIENTIFIC NAME", "scientific_name") \
        .withColumnRenamed("OBSERVATION COUNT", "observation_count") \
        .withColumnRenamed("OBSERVATION DATE", "observation_date")

    # Ép kiểu các cột
    ebird_df = ebird_df.withColumn("x", col("x").cast("double")) \
        .withColumn("y", col("y").cast("double")) \
        .withColumn("observation_count", col("observation_count").cast("int"))

    # Lọc các hàng có toạ độ không hợp lệ
    ebird_df = ebird_df.filter(col("x").isNotNull() & col("y").isNotNull())

    ebird_df.printSchema()
    ebird_df.show(5)
    return ebird_df
def check_pandas_data(ebird_df, csv_path):
    """Chuyển sang Pandas và kiểm tra giá trị không hợp lệ."""
    # Chọn tất cả cột để giữ nguyên dữ liệu cho spatial join
    ebird_df = ebird_df.withColumn("observation_count", col("observation_count").cast("string"))
    try:
        ebird_pd = ebird_df.toPandas()
    except Exception as e:
        print(f"Lỗi khi gọi toPandas(): {e}")
        temp_csv_path = "/content/temp_ebird_data.csv"
        ebird_df.write.csv(temp_csv_path, header=True, mode="overwrite")
        ebird_pd = pd.read_csv(temp_csv_path)
        if os.path.exists(temp_csv_path):
            os.remove(temp_csv_path)

    print(f"\nKiểu dữ liệu cho {csv_path}:")
    print(ebird_pd[['x', 'y', 'observation_count', 'observation_date']].dtypes)

    print(f"Kiểm tra giá trị không phải số trong observation_count cho {csv_path}:")
    try:
        ebird_pd['observation_count'].astype(float)
        print("Không có giá trị không phải số trong observation_count.")
    except ValueError as e:
        non_numeric_y = ebird_pd[~ebird_pd['observation_count'].apply(lambda x: pd.isna(x) or str(x).replace('.', '').replace('-', '').isdigit())][['observation_count']]
        print("Giá trị không phải số:", non_numeric_y['observation_count'].unique())

    ebird_pd['x'] = pd.to_numeric(ebird_pd['x'], errors='coerce')
    ebird_pd['y'] = pd.to_numeric(ebird_pd['y'], errors='coerce')
    ebird_pd = ebird_pd.dropna(subset=['x', 'y'])

    print(f"Số giá trị NaN trong x: {ebird_pd['x'].isna().sum()}")
    print(f"Số giá trị NaN trong y: {ebird_pd['y'].isna().sum()}")
    print("Kiểu dữ liệu sau làm sạch:")
    print(ebird_pd[['x', 'y']].dtypes)

    return ebird_pd

def create_geodataframe(ebird_pd, csv_path):
    """Tạo GeoDataFrame và thực hiện spatial join."""
    ebird_gdf = gpd.GeoDataFrame(ebird_pd, geometry=gpd.points_from_xy(ebird_pd['x'], ebird_pd['y']))
    zip_gdf = gpd.read_file("/content/drive/MyDrive/DataFinal/tl_2018_us_zcta510/tl_2018_us_zcta510.shp")
    if ebird_gdf.crs != zip_gdf.crs and ebird_gdf.crs is not None:
        zip_gdf = zip_gdf.to_crs(ebird_gdf.crs)
    else:
        ebird_gdf.crs = zip_gdf.crs
    joined_gdf = gpd.sjoin(ebird_gdf, zip_gdf, how="left", predicate="within")
    joined_gdf = joined_gdf.dropna(subset=['ZCTA5CE10'])
    joined_gdf['ZIPCode'] = joined_gdf['ZCTA5CE10']
    joined_gdf = joined_gdf.drop(columns=['ZCTA5CE10'])
    print(f"\nKết quả spatial join cho {csv_path}:")
    print("5 hàng đầu tiên:")
    print(joined_gdf[['x', 'y', 'ZIPCode']].head())
    print("Danh sách cột:", joined_gdf.columns.tolist())
    return joined_gdf

def save_to_parquet(joined_gdf, parquet_path, csv_path):
    """Lưu GeoDataFrame thành tệp Parquet."""
    keep_columns = ["x", "y", "global_id", "CATEGORY", "common_name",
                                "scientific_name", "observation_count",
                                "observation_date", "ZIPCode"]
    pandas_df = pd.DataFrame(joined_gdf[keep_columns])
    for col in pandas_df.columns:
        if pandas_df[col].dtype == 'object':
            pandas_df[col] = pandas_df[col].astype(str)
    temp_csv_path = "/content/temp_ebird_data.csv"
    pandas_df.to_csv(temp_csv_path, index=False)
    result_df = spark.read.csv(temp_csv_path, header=True, inferSchema=True)
    result_df.write.parquet(parquet_path, mode="overwrite")
    print(f"\nKết quả Parquet cho {csv_path}:")
    print("Cấu trúc (Schema):")
    result_df.printSchema()
    print("5 hàng đầu tiên:")
    result_df.select("x", "y", "ZIPCode").show(5, truncate=False)
    if os.path.exists(temp_csv_path):
        os.remove(temp_csv_path)
    return result_df
    #Hàm tính kích thước file
def calculate_file_sizes(csv_path, parquet_path):
    """Tính kích thước tệp."""
    try:
        csv_size = os.path.getsize(csv_path) / (1024 * 1024)  # MB
        parquet_size = sum(os.path.getsize(os.path.join(parquet_path, f)) for f in os.listdir(parquet_path) if os.path.isfile(os.path.join(parquet_path, f))) / (1024 * 1024)  # MB
        print(f"Kích thước CSV: {csv_size:.2f} MB")
        print(f"Kích thước Parquet: {parquet_size:.2f} MB")
        return csv_size, parquet_size
    except Exception as e:
        print(f"Lỗi khi tính kích thước tệp: {e}")
        return 0, 0

In [None]:
# Bước 3: Xử lý từng tệp
data_files = [
    "/content/drive/MyDrive/DataFinal/eBird_1k.csv",
    "/content/drive/MyDrive/DataFinal/eBird_10k.csv",
    "/content/drive/MyDrive/DataFinal/eBird_100k.csv"
]

parquet_paths = {
    data_files[0]: "/content/drive/MyDrive/DataFinal/eBird_ZIP_1k.parquet",
    data_files[1]: "/content/drive/MyDrive/DataFinal/eBird_ZIP_10k.parquet",
    data_files[2]: "/content/drive/MyDrive/DataFinal/eBird_ZIP_100k.parquet"
}

results = []
pandas_dfs = {}
geodataframes = {}
parquet_dfs = {}

for csv_path in data_files:
    print(f"\n=== Xử lý: {csv_path} ===")

    # Đọc và kiểm tra dữ liệu
    try:
        ebird_df = spark.read.csv(csv_path, header=True, inferSchema=True)
        print("Cấu trúc ban đầu:")
        ebird_df.printSchema()
        print("5 hàng đầu tiên:")
        ebird_df.show(5, truncate=False)
    except Exception as e:
        print(f"Lỗi khi đọc tệp {csv_path}: {e}")
        continue

    # Làm sạch dữ liệu
    ebird_df = clean_ebird_data(ebird_df)

    # Chuyển sang Pandas
    ebird_pd = check_pandas_data(ebird_df, csv_path)
    pandas_dfs[csv_path] = ebird_pd

    # Tạo GeoDataFrame
    try:
        joined_gdf = create_geodataframe(ebird_pd, csv_path)
        geodataframes[csv_path] = joined_gdf
    except Exception as e:
        print(f"Lỗi khi tạo GeoDataFrame cho {csv_path}: {e}")
        continue

    # Lưu Parquet
    parquet_path = parquet_paths[csv_path]
    try:
        result_df = save_to_parquet(joined_gdf, parquet_path, csv_path)
        parquet_dfs[csv_path] = result_df
    except Exception as e:
        print(f"Lỗi khi lưu Parquet cho {csv_path}: {e}")
        continue

    # Tính kích thước
    print(f"\nKích thước tệp cho {csv_path}:")
    csv_size, parquet_size = calculate_file_sizes(csv_path, parquet_path)
    dataset_size = os.path.basename(csv_path).split("_")[1].split(".")[0]
    results.append((dataset_size, csv_size, parquet_size))


=== Xử lý: /content/drive/MyDrive/DataFinal/eBird_1k.csv ===
Cấu trúc ban đầu:
root
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- GLOBAL UNIQUE IDENTIFIER: string (nullable = true)
 |-- LAST EDITED DATE: string (nullable = true)
 |-- TAXONOMIC ORDER: integer (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- COMMON NAME: string (nullable = true)
 |-- SCIENTIFIC NAME: string (nullable = true)
 |-- SUBSPECIES COMMON NAME: string (nullable = true)
 |-- SUBSPECIES SCIENTIFIC NAME: string (nullable = true)
 |-- OBSERVATION COUNT: string (nullable = true)
 |-- BREEDING BIRD ATLAS CODE: string (nullable = true)
 |-- BREEDING BIRD ATLAS CATEGORY: string (nullable = true)
 |-- AGE/SEX: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- COUNTRY CODE: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- STATE CODE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- COUNTY CODE: string (nullable = true)
 |-- IBA

In [None]:
# Bước 4: Hiển thị bảng so sánh kích thước
print("\nKết quả so sánh kích thước:")
print(f"{'Tập dữ liệu':<15} | {'Kích thước CSV (MB)':<20} | {'Kích thước Parquet (MB)':<20}")
print("-" * 60)
for dataset_size, csv_size, parquet_size in results:
    print(f"{dataset_size:<15} | {csv_size:<20.2f} | {parquet_size:<20.2f}")


Kết quả so sánh kích thước:
Tập dữ liệu     | Kích thước CSV (MB)  | Kích thước Parquet (MB)
------------------------------------------------------------
1k              | 0.51                 | 0.02                
10k             | 3.84                 | 0.14                
100k            | 38.92                | 1.12                
