In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("write_rice").getOrCreate()

In [3]:
#Đọc file newPrices.csv
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("./newPrices.csv")

In [4]:
#Xem cấu trúc các trường trong datafame
csvFile.printSchema()

root
 |-- Date: string (nullable = true)
 |-- US_dollar: string (nullable = true)
 |-- Euro: string (nullable = true)
 |-- Japanese_yen: string (nullable = true)
 |-- Canadian_dollar: string (nullable = true)
 |-- Indian_rupee: string (nullable = true)
 |-- Chinese_renmimbi: string (nullable = true)
 |-- Indonesian_rupiah: string (nullable = true)
 |-- UAE_dirham: string (nullable = true)
 |-- Thai_baht: string (nullable = true)
 |-- Vietnamese_dong: string (nullable = true)
 |-- Egyptian_pound: string (nullable = true)
 |-- Korean_won: string (nullable = true)
 |-- Russian_ruble: string (nullable = true)
 |-- SouthAfrican_rand: string (nullable = true)
 |-- Australian_dollar: string (nullable = true)



datafame có 16 cột bao gồm: 
- Date có kiểu dữ liệu là string, cột này có ý nghĩa ngày/tháng/năm
- Các cột còn lại để là kiểu string với ý nghĩa là giá trị của vàng theo đơn vị tiền tệ của các nước

In [5]:
#số lượng bản ghi là 10941
csvFile.count()

10941

In [6]:
#Quan sát một ý dữ liệu đã đọc được. Một số giá trị là #N/A có ý nghĩa là tại đó không có dữ liệu
csvFile.select("Date", 
               "US_dollar", 
               "Euro", 
               "Japanese_yen", 
               "Vietnamese_dong", 
               "Thai_baht", 
               "Australian_dollar",
              "Chinese_renmimbi", 
              "Australian_dollar").show(20)

+----------+---------+-----+------------+---------------+---------+-----------------+----------------+-----------------+
|      Date|US_dollar| Euro|Japanese_yen|Vietnamese_dong|Thai_baht|Australian_dollar|Chinese_renmimbi|Australian_dollar|
+----------+---------+-----+------------+---------------+---------+-----------------+----------------+-----------------+
|12/29/1978|      226|137.1|        #N/A|           #N/A|     #N/A|             #N/A|            #N/A|             #N/A|
|  1/1/1979|      226|137.1|        #N/A|           #N/A|     #N/A|             #N/A|            #N/A|             #N/A|
|  1/2/1979|    226.8|137.3|   43,164.90|           #N/A| 4,454.60|            193.6|            #N/A|            193.6|
|  1/3/1979|    218.6|  134|   43,717.90|           #N/A| 4,477.70|              193|            #N/A|              193|
|  1/4/1979|    223.2|136.8|   43,674.90|           #N/A| 4,501.50|            194.6|            #N/A|            194.6|
|  1/5/1979|    225.5|138.4|   4

In [7]:
# để thao tác với các giá trị là số trong các cột, chúng em sẽ đổi lại kiểu dữ liệu cho các cột
df = csvFile.selectExpr("cast(Date as string) Date",
                        "cast(US_dollar as double) US_dollar",
                        "cast(Euro as double) Euro",
                       "cast(Japanese_yen as double) Japanese_yen",
                       "cast(Canadian_dollar as double) Canadian_dollar",
                       "cast(Indian_rupee as double) Indian_rupee",
                       "cast(Chinese_renmimbi as double) Chinese_renmimbi",
                       "cast(Indonesian_rupiah as double) Indonesian_rupiah",
                       "cast(UAE_dirham as double) UAE_dirham",
                       "cast(Thai_baht as double) Thai_baht",
                       "cast(Vietnamese_dong as double) Vietnamese_dong",
                       "cast(Egyptian_pound as double) Egyptian_pound",
                       "cast(Korean_won as double) Korean_won",
                       "cast(Russian_ruble as double) Russian_ruble",
                       "cast(SouthAfrican_rand as double) SouthAfrican_rand",
                       "cast(Australian_dollar as double) Australian_dollar")

In [8]:
#Cấu trúc của datafame sau khi đổi lại kiểu dữ liệu từ string sang double
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- US_dollar: double (nullable = true)
 |-- Euro: double (nullable = true)
 |-- Japanese_yen: double (nullable = true)
 |-- Canadian_dollar: double (nullable = true)
 |-- Indian_rupee: double (nullable = true)
 |-- Chinese_renmimbi: double (nullable = true)
 |-- Indonesian_rupiah: double (nullable = true)
 |-- UAE_dirham: double (nullable = true)
 |-- Thai_baht: double (nullable = true)
 |-- Vietnamese_dong: double (nullable = true)
 |-- Egyptian_pound: double (nullable = true)
 |-- Korean_won: double (nullable = true)
 |-- Russian_ruble: double (nullable = true)
 |-- SouthAfrican_rand: double (nullable = true)
 |-- Australian_dollar: double (nullable = true)



In [9]:
#Quan sát một ý dữ liệu đã đọc được. Các giá trị là #N/A đã thành null
df.select("Date", 
           "US_dollar", 
           "Euro", 
           "Japanese_yen", 
           "Vietnamese_dong", 
           "Thai_baht", 
           "Australian_dollar",
          "Chinese_renmimbi", 
          "Australian_dollar").show(20)

+----------+---------+-----+------------+---------------+---------+-----------------+----------------+-----------------+
|      Date|US_dollar| Euro|Japanese_yen|Vietnamese_dong|Thai_baht|Australian_dollar|Chinese_renmimbi|Australian_dollar|
+----------+---------+-----+------------+---------------+---------+-----------------+----------------+-----------------+
|12/29/1978|    226.0|137.1|        null|           null|     null|             null|            null|             null|
|  1/1/1979|    226.0|137.1|        null|           null|     null|             null|            null|             null|
|  1/2/1979|    226.8|137.3|        null|           null|     null|            193.6|            null|            193.6|
|  1/3/1979|    218.6|134.0|        null|           null|     null|            193.0|            null|            193.0|
|  1/4/1979|    223.2|136.8|        null|           null|     null|            194.6|            null|            194.6|
|  1/5/1979|    225.5|138.4|    

In [10]:
#ghi dữ liệu và trong hdfs với địa chỉ là hdfs://namenode:9000/myfile/price_gold.csv
df.write.format("csv").mode("overwrite").option("header", "true")\
.save("hdfs://namenode:9000/myfile/price_gold.csv")