# Problems in pyspark 📝🔍

PySpark là một công cụ mạnh mẽ để xử lý các bài toán dữ liệu lớn. Sau đây là một vài bài toán kinh điển:

- WordCount
- ASEAN countries count
- House price predicting

In [44]:
# chuẩn bị môi trường cho việc chạy spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName("Spark Asean count").getOrCreate()
sc = spark.sparkContext

## WordCount 📊 <a id="wordcount"></a>


Bài toán cấp độ "hello word" với dành cho các beginner học big data. Ta sẽ giải quyết bằng phương pháp mapReduce.

Cho một chuỗi, yêu cầu đếm từng ký tự trong chuỗi đó.

- Input: 
```
"Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since."
```

- Output: 
    
    danh sách các cặp `(word, count)`

Solution:

In [45]:
doc = "Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since."

result = sc.parallelize([doc]) \
    .flatMap(lambda x : x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y : x + y) \
    .collect()
result

[('Apache', 2),
 ('implicit', 1),
 ('developed', 1),
 ('of', 1),
 ('codebase', 1),
 ('and', 1),
 ('later', 1),
 ('to', 1),
 ('Software', 1),
 ('has', 1),
 ('open-source', 1),
 ('unified', 1),
 ('clusters', 1),
 ('at', 1),
 ('University', 1),
 ("Berkeley's", 1),
 ('for', 2),
 ('processing.', 1),
 ('with', 1),
 ('California,', 1),
 ('it', 1),
 ('since.', 1),
 ('analytics', 1),
 ('engine', 1),
 ('fault', 1),
 ('large-scale', 1),
 ('parallelism', 1),
 ('tolerance.', 1),
 ('maintained', 1),
 ('Spark', 3),
 ('programming', 1),
 ('data', 2),
 ('Originally', 1),
 ('donated', 1),
 ('Foundation,', 1),
 ('an', 2),
 ('provides', 1),
 ('was', 1),
 ('the', 3),
 ('which', 1),
 ('is', 1),
 ('AMPLab,', 1),
 ('interface', 1)]

Giải thích chi tiết:

- `sc.parallelize` sẽ bao bọc `doc` trong `list`, đảm bảo rằng RDD có một phần tử là toàn bộ chuỗi.
- Áp dụng `flatMap` lên chuỗi đó, đi kèm `split` để tách thành một `list` các từ, sau đó “làm phẳng” `list` này thành một RDD chứa các từ.
  
  Ví dụ, khi chạy thì kết quả sẽ là: 
  ```python
  ["Apache", "Spark", "is", "an", "open-source", "unified", "analytics", "engine", "for", "large-scale", "data", "processing.", "Spark", "provides", "an", "interface", "for", "programming", "clusters", "with", "implicit", "data", "parallelism", "and", "fault", "tolerance."]
  ```


- `map` chuyển đổi mỗi từ trong RDD thành một tuple có định dạng `(word, count)`

    Ví dụ:

    ```python
    [("Apache", 1), ("Spark", 1), ("is", 1), ("an", 1),
    ("open-source", 1), ("unified", 1), ("analytics", 1), 
    ("engine", 1), ("for", 1), ("large-scale", 1), ("data", 1), 
    ("processing.", 1), ("Spark", 1), ("provides", 1), 
    ("an", 1), ("interface", 1), ("for", 1), ("programming", 1),
    ("clusters", 1), ("with", 1), ("implicit", 1), ("data", 1),
    ("parallelism", 1), ("and", 1), ("fault", 1), ("tolerance.", 1)]

    ```
- Sau đó, `reduceByKey` nhóm các tuple theo word và áp dụng hàm lambda để cộng dồn count lại cho mỗi từ

    Quá trình xử lý như sau:

    + Nhóm lại:
    
        ```python
        
        "Spark": [1, 1],
        "is": [1, 1],
        "an": [1, 1],
        "open-source": [1]
        
        ```
    + Áp dụng hàm `lambda x, y: x + y` cho mỗi nhóm:
    
        ```python
        "Spark": 1 + 1 = 2
        "is": 1 + 1 = 2
        "an": 1 + 1 = 2
        "open-source": 1 (không thay đổi vì chỉ có một phần tử)
        \
        ```

Kết quả là chúng ta có tần xuất với từng từ.

Đặt vấn đề: Ở bước reduce, ta thấy một vài word xuất hiện 2 lần (mỗi lần được đếm với count = 1) nên ta hiểu rằng lambda `x + y` tương đương `1 + 1` . Vậy thì nếu chúng xuất hiện nhiều hơn 2 lần thì sao ? Chẳng hạn như:

```python

    "Spark": [1, 1, 1],
    "is": [1, 1],
    "an": [1, 1]

```
Nếu số lượng phần tử cùng key lớn hơn 2, PySpark sẽ tự động áp dụng phép toán nhiều lần để gộp tất cả giá trị về một kết quả duy nhất.

Như vậy, cách xử lý cho từng nhóm sẽ được hình dung như sau:

```python
"Spark":
    1 + 1 = 2
    2 + 1 = 3

"is": 
    1 + 1 = 2
    
"an":
    1 + 1 = 2
```

Kết quả cuối cùng: ```[("Spark", 3), ("is", 2), ("an", 2)]```

## ASEAN countries count 📊

Bài toán truy vấn dữ liệu từ bảng dữ liệu tương tự như các câu truy vấn trong sql. 


Cho một file dữ liệu `WHO-COVID-19-20210601-213841.tsv` chứa các trường sau:

- Name 
- WHO Region 
- Cases - cumulative total 
- Cases - cumulative total per 100000 population 
- Cases - newly reported in last 7 days 
- Cases - newly reported in last 7 days per 100000 population 
- Cases - newly reported in last 24 hours 
- Deaths - cumulative total 
- Deaths - cumulative total per 100000 population 
- Deaths - newly reported in last 7 days 
- Deaths - newly reported in last 7 days per 100000 population 
- Deaths - newly reported in last 24 hours 
- Transmission Classification

Biết rằng trường `Cases - cumulative total` cho ta biết tổng số ca mắc covid của từng quốc gia ở một số khu vực 

Dựa vào cột `Cases - cumulative total` , yêu cầu: 
- Đếm xem có bao nhiêu quốc gia ở khu vực Châu Á bị nhiễm Covid
- Chỉ ra quốc gia có tổng số ca lớn nhất ở khu vực này
- Lấy ra 3 quốc gia có tổng số ca nhỏ nhất

Solutions:

In [46]:
# nhập dữ liệu
sqlc = SQLContext(sc)
df = sqlc.read.csv("WHO-COVID-19-20210601-213841.tsv",
                   header=True,
                   sep="\t")



SQLContext giúp ta khởi tạo dataframe tương tự như khi chúng ta làm việc với pandas. 

Lưu ý nhỏ: Khi khởi tạo, chúng ta luôn chỉ định `header=True` để đảm bảo nhất quán kiểu dữ liệu. Đã có nhiều case mình không làm điều này và mắc lỗi column bị tự động convert thành `object` hoặc `string` mặc dù kiểu của chúng là numerical.

In [47]:
# Đếm xem có bao nhiêu quốc gia ở khu vực Châu Á bị nhiễm Covid-19
asianCount = df.where(df['WHO Region'] == 'South-East Asia') \
    .select("Cases - cumulative total") \
    .rdd.count()
     
asianCount

11

In [48]:
# Chỉ ra quốc gia có tổng số ca lớn nhất ở khu vực này
maxAsian = df.where(df['WHO Region'] == 'South-East Asia') \
    .select("Cases - cumulative total") \
    .rdd \
    .max()
    
maxAsian

Row(Cases - cumulative total='800,540.000')

In [49]:
# Lấy ra 3 quốc gia có tổng số ca nhỏ nhất
three_min = df.where(df['WHO Region'] == 'South-East Asia') \
    .select("Name",'Cases - cumulative total') \
    .rdd \
    .takeOrdered(3, key=lambda x: x[0])
    
three_min 

[Row(Name='Bangladesh', Cases - cumulative total='800,540.000'),
 Row(Name='Bhutan', Cases - cumulative total='1,620.000'),
 Row(Name="Democratic People's Republic of Korea", Cases - cumulative total='0.000')]

## House price predicting 📊

Bài toán dự đoán giá nhà kinh điển trong machine learning. Giờ chúng ta sẽ giải quyết nó với pyspark.

Bộ dữ liệu có các trường:

- longitude
- latitude
- housing_median_age
- total_rooms
- total_bedrooms
- population
- households
- median_income
- median_house_value

Trong đó, `median_house_value` sẽ là target của mô hình

In [50]:
df = sqlc.read.csv('california_housing_test.csv',
                   header=True,
                   inferSchema=True)
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [51]:
# khảo sát thông tin từng feature
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [52]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

- `VectorAssembler`: Chuyển đổi nhiều cột đặc trưng thành một vector đầu vào.
- `LinearRegression`: Mô hình hồi quy tuyến tính để dự đoán giá nhà.
- `RegressionEvaluator`: Đánh giá hiệu suất mô hình

In [53]:
X = ['longitude', 'latitude', 'housing_median_age', 
     'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
y = 'median_house_value'

# Khởi tạo ascsembler
assembler = VectorAssembler(inputCols=X, outputCol='features')

# chọn đặc trưng và label
data = assembler.transform(df)
data = data.select('features', y)

# chia dữ liệu thành tập train và val
train, val = data.randomSplit([0.8, 0.2], seed=42)

# Huấn luyện mô hình     
lr = LinearRegression(featuresCol="features", labelCol=y)
model = lr.fit(train_df)

# Dự đoán
predictions = model.transform(val)
predictions.show()

+--------------------+------------------+------------------+
|            features|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[-124.16,40.77,35...|           85600.0|160170.67677656002|
|[-124.15,40.78,41...|          104200.0|171261.05998632498|
|[-124.14,40.72,18...|          100500.0|133515.70980375214|
|[-124.01,40.97,21...|          102700.0| 141594.6981221405|
|[-123.74,40.66,25...|          136000.0| 136850.7204207722|
|[-123.47,39.8,18....|           79200.0|142440.13013207726|
|[-123.22,39.16,32...|          154600.0|206050.45340655604|
|[-123.08,40.4,10....|           37500.0| 44796.16892597685|
|[-122.84,38.4,15....|          194400.0|217804.47134948615|
|[-122.84,38.98,21...|           75000.0|102171.95075276727|
|[-122.79,38.54,5....|          213800.0|193491.70622318145|
|[-122.78,38.97,11...|           97300.0|163766.73291285057|
|[-122.75,38.48,4....|          197400.0|228880.58287195629|
|[-122.73,38.43,29...|  

In [None]:
# đánh giá mô hình
predictions = model.transform(val)
predictions.select('features', y).show(5)

evaluator = RegressionEvaluator(
    labelCol=y, 
    predictionCol="prediction", 
    metricName="rmse"
    )

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

+--------------------+------------------+
|            features|median_house_value|
+--------------------+------------------+
|[-124.16,40.77,35...|           85600.0|
|[-124.15,40.78,41...|          104200.0|
|[-124.14,40.72,18...|          100500.0|
|[-124.01,40.97,21...|          102700.0|
|[-123.74,40.66,25...|          136000.0|
+--------------------+------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE): 74921.51407182051


In [55]:
new_data = spark.createDataFrame(
    data = [(-122.05, 37.37, 27.0 ,3885.0, 661.0,1537.0, 606.0,6.6085, 344700.0)], 
    schema = X
    )

new_data = assembler.transform(new_data)
new_pred = model.transform(new_data)
new_pred.select("prediction").show()


+------------------+
|        prediction|
+------------------+
|353612.26725589857|
+------------------+

