# Spark 練習環境建立

## 套件安裝及環境建置

In [1]:
# 安裝 pyspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=e0b326a7293e473f0cbd70efd676f16b8636424b77aceeffa6675f888f77ee0e
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [2]:
# 啟動 spark 服務
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local").getOrCreate()
sc = SparkContext.getOrCreate()

In [3]:
# 確認 spark 環境
sc

## 資料讀取

In [21]:
# 從外部抓CUST360的測試資料
from pyspark import SparkFiles
url = 'https://raw.githubusercontent.com/chia313339/Spark_practice/master/CUST_360.csv'
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("CUST_360.csv"), header=True, inferSchema= True)

In [22]:
# 將資料集讀取在memory上，DataFrame跟SparkSQL都要下此語法，如果沒下，資料每次都會重新load
df.cache()

DataFrame[APC_ID_SAS: string, GENDER: string, AGE: int, CONTACT_CITY_CD: string, EDUCATION_CD: int, MARRIAGE_CD: int, STAFF_IND: int, LIFE_INSD_CNT: int, HEIGHT: int, WEIGHT: int]

In [6]:
# 執行這句才會真的運行Spark，將資料存在記憶體
df.show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|          0|        0|            1|   167|    72|
|F20481473EEB92E8D8

# 資料篩選

## Spark Dataframe

- 篩選年齡大於25歲的客戶

In [14]:
df.select('APC_ID_SAS','AGE').where(df.AGE > 25).show()

+------------------+---+
|        APC_ID_SAS|AGE|
+------------------+---+
|Q2D129E954AD477523| 50|
|A2AFDA91C172B15A77| 35|
|F1D4EF7FFBE44A51FE| 33|
|T117FCDED3FAE1C1F3| 31|
|L26284A55F56B398BA| 55|
|E1F5DBF55F6B1427DD| 60|
|F20481473EEB92E8D8| 37|
|H20BE9CA7E806E0551| 71|
|F1B81805D82921E3B9| 27|
|N1ED4D2C4D60B78323| 57|
|F12A8F9FE08045F1A6| 42|
|S2FBC9D681C459E262| 46|
|T24571F44004FE8145| 59|
|P2A8A6EE60E936FA24| 53|
|J1E82E68A12B9D234D| 54|
|J1882AEE9E2183B202| 56|
|Q221C6DF46F85DA0D3| 59|
|B29C7859D7C9244C3A| 45|
|A2663738EDFF7839CA| 37|
|R24A96ABFCF4742B5C| 32|
+------------------+---+
only showing top 20 rows



- 計算各學歷客戶數

In [15]:
df.select('EDUCATION_CD').groupby(df.EDUCATION_CD).count().show()

+------------+-----+
|EDUCATION_CD|count|
+------------+-----+
|        null|19138|
|           1|22184|
|           3|30428|
|           4| 3967|
|           2|24283|
+------------+-----+



- 新增欄位以公尺為單位計算身高，並從高到低排序

In [16]:
df.select('APC_ID_SAS','HEIGHT').withColumn('HEIGHT_M',df.HEIGHT/100).orderBy('HEIGHT_M',ascending=False).show()

+------------------+------+--------+
|        APC_ID_SAS|HEIGHT|HEIGHT_M|
+------------------+------+--------+
|F1057CE5E949E6EFC1|   201|    2.01|
|ACE8FCF8C6B46669D3|   198|    1.98|
|H1B5DCDA759441F52C|   196|    1.96|
|F13ACDDC993E9E53D9|   196|    1.96|
|P1C1B81800B3987528|   196|    1.96|
|C1E1615E25FC949658|   195|    1.95|
|H1804B3E772A5FE82E|   195|    1.95|
|R111E1AA083CCA93D9|   195|    1.95|
|F1C81BFF76B64F7029|   195|    1.95|
|F166C8AE8B99A075F0|   195|    1.95|
|E12E48BB533DBC264E|   195|    1.95|
|E17A0620F60388202F|   195|    1.95|
|M1FB9517B8D97BEC30|   193|    1.93|
|F1980A9D354E0DE785|   193|    1.93|
|R157828AF5AE7D5593|   193|    1.93|
|L1252D0A9147B3B57C|   193|    1.93|
|E152DE2A6C1C9D7C52|   193|    1.93|
|R195BE7E6A9A99F9BB|   193|    1.93|
|A1F2427D4A6C93A26B|   193|    1.93|
|B18167CA3279E1551B|   193|    1.93|
+------------------+------+--------+
only showing top 20 rows



## Spark SQL
使用Spark SQL的起手式，要先 register temp table

In [7]:
# register temp table
df.registerTempTable('cust_360')

- 篩選年齡大於25歲的客戶

In [11]:
spark.sql('select APC_ID_SAS, AGE from cust_360 where AGE > 25').show()

+------------------+---+
|        APC_ID_SAS|AGE|
+------------------+---+
|Q2D129E954AD477523| 50|
|A2AFDA91C172B15A77| 35|
|F1D4EF7FFBE44A51FE| 33|
|T117FCDED3FAE1C1F3| 31|
|L26284A55F56B398BA| 55|
|E1F5DBF55F6B1427DD| 60|
|F20481473EEB92E8D8| 37|
|H20BE9CA7E806E0551| 71|
|F1B81805D82921E3B9| 27|
|N1ED4D2C4D60B78323| 57|
|F12A8F9FE08045F1A6| 42|
|S2FBC9D681C459E262| 46|
|T24571F44004FE8145| 59|
|P2A8A6EE60E936FA24| 53|
|J1E82E68A12B9D234D| 54|
|J1882AEE9E2183B202| 56|
|Q221C6DF46F85DA0D3| 59|
|B29C7859D7C9244C3A| 45|
|A2663738EDFF7839CA| 37|
|R24A96ABFCF4742B5C| 32|
+------------------+---+
only showing top 20 rows



- 計算各學歷客戶數

In [12]:
spark.sql('select EDUCATION_CD, count(1) from cust_360 group by EDUCATION_CD').show()

+------------+--------+
|EDUCATION_CD|count(1)|
+------------+--------+
|        null|   19138|
|           1|   22184|
|           3|   30428|
|           4|    3967|
|           2|   24283|
+------------+--------+



- 新增欄位以公尺為單位計算身高，並從高到低排序

In [13]:
spark.sql('select APC_ID_SAS, HEIGHT, HEIGHT/100 as HEIGHT_M from cust_360 order by HEIGHT_M desc').show()

+------------------+------+--------+
|        APC_ID_SAS|HEIGHT|HEIGHT_M|
+------------------+------+--------+
|F1057CE5E949E6EFC1|   201|    2.01|
|ACE8FCF8C6B46669D3|   198|    1.98|
|H1B5DCDA759441F52C|   196|    1.96|
|F13ACDDC993E9E53D9|   196|    1.96|
|P1C1B81800B3987528|   196|    1.96|
|C1E1615E25FC949658|   195|    1.95|
|H1804B3E772A5FE82E|   195|    1.95|
|R111E1AA083CCA93D9|   195|    1.95|
|F1C81BFF76B64F7029|   195|    1.95|
|F166C8AE8B99A075F0|   195|    1.95|
|E12E48BB533DBC264E|   195|    1.95|
|E17A0620F60388202F|   195|    1.95|
|M1FB9517B8D97BEC30|   193|    1.93|
|F1980A9D354E0DE785|   193|    1.93|
|R157828AF5AE7D5593|   193|    1.93|
|L1252D0A9147B3B57C|   193|    1.93|
|E152DE2A6C1C9D7C52|   193|    1.93|
|R195BE7E6A9A99F9BB|   193|    1.93|
|A1F2427D4A6C93A26B|   193|    1.93|
|B18167CA3279E1551B|   193|    1.93|
+------------------+------+--------+
only showing top 20 rows



# 清除記憶體暫存
清除記憶體內的 cache()

## Dataframe

### Load Data

In [17]:
# 公司環境實際操作方法
# df = spark.sql("select * from life_v_dm.cust_360")

In [18]:
# Colab csv練習
df = spark.read.csv(SparkFiles.get("CUST_360.csv"), header=True, inferSchema= True)

### Delete cache

In [19]:
# 跟一般 pandas dataframe 一樣
del df

## Spark SQL

### Load Data

In [25]:
df.registerTempTable('cust_360')

### Delete cache

In [24]:
spark.catalog.dropTempView("cust_360")

# 資料篩選練習

## 第一題
篩選客戶ID、身高、體重，並計算每個客戶 BMI 值，由小排到大顯示。

In [26]:
# Dataframe 方法
df.select('APC_ID_SAS','HEIGHT','WEIGHT').withColumn('BMI',df.WEIGHT/(df.HEIGHT/100)**2).orderBy('BMI',ascending=True).show()

+------------------+------+------+------------------+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|
+------------------+------+------+------------------+
|E105DCAFC305033417|    49|     0|               0.0|
|H29DC5A204152934EE|    43|     0|               0.0|
|A126C93E33B4D4CB31|   172|     0|               0.0|
|A15C24ACBA604E7241|    49|     0|               0.0|
|Q19F6BBF162DFE9D3E|   151|     3|1.3157317661506074|
|A236E76AECC53C9FDC|   130|     3|1.7751479289940826|
|P210C79C71D3C77171|   155|     5|2.0811654526534857|
|A268ED34D77A261522|   165|     6| 2.203856749311295|
|P135C8C1E11AE16244|   172|     7|2.3661438615467825|
|A1110745FF4F29418D|    55|     1| 3.305785123966942|
|A1E7457FDC4C53B285|    53|     1|3.5599857600569593|
|N11FE8FC93D938A2D9|    51|     1|3.8446751249519417|
|H2E3A538DEC2F497A8|    50|     1|               4.0|
|H2FDF7168B2AC5F914|    50|     1|               4.0|
|E1033E78F40656C84A|    48|     1| 4.340277777777778|
|F1083B205D44EC8DE3|    48| 

In [27]:
# Spark SQL 方法
spark.sql('select APC_ID_SAS, HEIGHT, WEIGHT, WEIGHT/sqrt(HEIGHT/100) as BMI from cust_360 order by BMI asc').show()

+------------------+------+------+------------------+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|
+------------------+------+------+------------------+
|E105DCAFC305033417|    49|     0|               0.0|
|H29DC5A204152934EE|    43|     0|               0.0|
|A126C93E33B4D4CB31|   172|     0|               0.0|
|A15C24ACBA604E7241|    49|     0|               0.0|
|A1110745FF4F29418D|    55|     1| 1.348399724926484|
|A1E7457FDC4C53B285|    53|     1|1.3736056394868903|
|N11FE8FC93D938A2D9|    51|     1|1.4002800840280099|
|H2FDF7168B2AC5F914|    50|     1| 1.414213562373095|
|H2E3A538DEC2F497A8|    50|     1| 1.414213562373095|
|E1033E78F40656C84A|    48|     1|1.4433756729740643|
|F1083B205D44EC8DE3|    48|     1|1.4433756729740643|
|E24ED6669B9EC94CD1|    47|     1|1.4586499149789456|
|A1ECCB57B3EBAFFE96|    47|     1|1.4586499149789456|
|S2FCC785A60EE77479|    44|     1|1.5075567228888183|
|Q19F6BBF162DFE9D3E|   151|     3|2.4413653763134784|
|A236E76AECC53C9FDC|   130| 

## 第二題
根據上面的 BMI 欄位，新增欄位 BMI_L 顯示 BMI 等級，規則如下。<br>
過輕：BMI<18.5 <br>
正常：18.5<=BMI<24<br>
過重：24<=BMI

In [28]:
# Spark SQL 方法
# WAY 1
spark.sql('select APC_ID_SAS, HEIGHT, WEIGHT, WEIGHT/sqrt(HEIGHT/100) as BMI, \
case when WEIGHT/sqrt(HEIGHT/100) < 18.5 then "過輕" \
when WEIGHT/sqrt(HEIGHT/100) < 24 then "正常" \
else "過重" end as BMI_L from cust_360').show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 42.96009334548942| 過重|
|A2AFDA91C172B15A77|   162|    48|37.712361663282536| 過重|
|F1D4EF7FFBE44A51FE|   172|    70| 53.37449961641163| 過重|
|T117FCDED3FAE1C1F3|   167|    58| 44.88174748697994| 過重|
|L26284A55F56B398BA|   161|    55| 43.34607234315053| 過重|
|E1F5DBF55F6B1427DD|   167|    72|55.715272742457856| 過重|
|F20481473EEB92E8D8|   160|    54| 42.69074841227312| 過重|
|H20BE9CA7E806E0551|   150|    47| 38.37533930360313| 過重|
|F1B81805D82921E3B9|   182|    64|47.439956266310475| 過重|
|N1ED4D2C4D60B78323|   182|    76| 56.33494806624368| 過重|
|F12A8F9FE08045F1A6|   174|    63|47.760176745470915| 過重|
|S2FBC9D681C459E262|   160|    53| 41.90017899723102| 過重|
|T24571F44004FE8145|   163|    58| 45.42910609930153| 過重|
|P2A8A6EE60E936FA24|   153|    57| 46.08176875690326| 過重|
|J1E82E6

In [30]:
# WAY 2 registerTempTable()
spark.sql('select APC_ID_SAS, HEIGHT, WEIGHT, WEIGHT/sqrt(HEIGHT/100) as BMI from cust_360').registerTempTable('cust_360_tmp')

spark.sql('select *, \
case when BMI < 18.5 then "過輕" \
when BMI < 24 then "正常" \
else "過重" end as BMI_L \
from cust_360_tmp').show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 42.96009334548942| 過重|
|A2AFDA91C172B15A77|   162|    48|37.712361663282536| 過重|
|F1D4EF7FFBE44A51FE|   172|    70| 53.37449961641163| 過重|
|T117FCDED3FAE1C1F3|   167|    58| 44.88174748697994| 過重|
|L26284A55F56B398BA|   161|    55| 43.34607234315053| 過重|
|E1F5DBF55F6B1427DD|   167|    72|55.715272742457856| 過重|
|F20481473EEB92E8D8|   160|    54| 42.69074841227312| 過重|
|H20BE9CA7E806E0551|   150|    47| 38.37533930360313| 過重|
|F1B81805D82921E3B9|   182|    64|47.439956266310475| 過重|
|N1ED4D2C4D60B78323|   182|    76| 56.33494806624368| 過重|
|F12A8F9FE08045F1A6|   174|    63|47.760176745470915| 過重|
|S2FBC9D681C459E262|   160|    53| 41.90017899723102| 過重|
|T24571F44004FE8145|   163|    58| 45.42910609930153| 過重|
|P2A8A6EE60E936FA24|   153|    57| 46.08176875690326| 過重|
|J1E82E6

In [32]:
# Dataframe 方法
df_tmp = df.select('APC_ID_SAS','HEIGHT','WEIGHT').withColumn('BMI',df.WEIGHT/(df.HEIGHT/100)**2)

# WAY 1 withColumn()
from pyspark.sql.functions import when
df_tmp.withColumn("BMI_L",when(df_tmp.BMI < 18.5, '過輕').when(df_tmp.BMI < 24, '正常').otherwise('過重')).show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 21.63114885435026| 正常|
|A2AFDA91C172B15A77|   162|    48|18.289894833104707| 過輕|
|F1D4EF7FFBE44A51FE|   172|    70|23.661438615467823| 正常|
|T117FCDED3FAE1C1F3|   167|    58|20.796729893506402| 正常|
|L26284A55F56B398BA|   161|    55|21.218317194552675| 正常|
|E1F5DBF55F6B1427DD|   167|    72|25.816630212628635| 過重|
|F20481473EEB92E8D8|   160|    54|21.093749999999996| 正常|
|H20BE9CA7E806E0551|   150|    47| 20.88888888888889| 正常|
|F1B81805D82921E3B9|   182|    64| 19.32133800265668| 正常|
|N1ED4D2C4D60B78323|   182|    76|22.944088878154812| 正常|
|F12A8F9FE08045F1A6|   174|    63|20.808561236623067| 正常|
|S2FBC9D681C459E262|   160|    53|20.703124999999996| 正常|
|T24571F44004FE8145|   163|    58|21.829952199932254| 正常|
|P2A8A6EE60E936FA24|   153|    57| 24.34960912469563| 過重|
|J1E82E6

In [33]:
# WAY 2 select()
from pyspark.sql.functions import when
df_tmp.select('*',when(df_tmp.BMI < 18.5, '過輕').when(df_tmp.BMI < 24, '正常').otherwise('過重').alias('BMI_L')).show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 21.63114885435026| 正常|
|A2AFDA91C172B15A77|   162|    48|18.289894833104707| 過輕|
|F1D4EF7FFBE44A51FE|   172|    70|23.661438615467823| 正常|
|T117FCDED3FAE1C1F3|   167|    58|20.796729893506402| 正常|
|L26284A55F56B398BA|   161|    55|21.218317194552675| 正常|
|E1F5DBF55F6B1427DD|   167|    72|25.816630212628635| 過重|
|F20481473EEB92E8D8|   160|    54|21.093749999999996| 正常|
|H20BE9CA7E806E0551|   150|    47| 20.88888888888889| 正常|
|F1B81805D82921E3B9|   182|    64| 19.32133800265668| 正常|
|N1ED4D2C4D60B78323|   182|    76|22.944088878154812| 正常|
|F12A8F9FE08045F1A6|   174|    63|20.808561236623067| 正常|
|S2FBC9D681C459E262|   160|    53|20.703124999999996| 正常|
|T24571F44004FE8145|   163|    58|21.829952199932254| 正常|
|P2A8A6EE60E936FA24|   153|    57| 24.34960912469563| 過重|
|J1E82E6

In [34]:
# WAY 3 selectExpr()
df_tmp.selectExpr('*','case when BMI < 18.5 then "過輕" when BMI < 24 then "正常" else "過重" end as BMI_L').show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 21.63114885435026| 正常|
|A2AFDA91C172B15A77|   162|    48|18.289894833104707| 過輕|
|F1D4EF7FFBE44A51FE|   172|    70|23.661438615467823| 正常|
|T117FCDED3FAE1C1F3|   167|    58|20.796729893506402| 正常|
|L26284A55F56B398BA|   161|    55|21.218317194552675| 正常|
|E1F5DBF55F6B1427DD|   167|    72|25.816630212628635| 過重|
|F20481473EEB92E8D8|   160|    54|21.093749999999996| 正常|
|H20BE9CA7E806E0551|   150|    47| 20.88888888888889| 正常|
|F1B81805D82921E3B9|   182|    64| 19.32133800265668| 正常|
|N1ED4D2C4D60B78323|   182|    76|22.944088878154812| 正常|
|F12A8F9FE08045F1A6|   174|    63|20.808561236623067| 正常|
|S2FBC9D681C459E262|   160|    53|20.703124999999996| 正常|
|T24571F44004FE8145|   163|    58|21.829952199932254| 正常|
|P2A8A6EE60E936FA24|   153|    57| 24.34960912469563| 過重|
|J1E82E6

In [35]:
# WAY 4 select() with expr function
from pyspark.sql.functions import expr
df_tmp.select('*',expr('case when BMI < 18.5 then "過輕" when BMI < 24 then "正常" else "過重" end as BMI_L')).show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 21.63114885435026| 正常|
|A2AFDA91C172B15A77|   162|    48|18.289894833104707| 過輕|
|F1D4EF7FFBE44A51FE|   172|    70|23.661438615467823| 正常|
|T117FCDED3FAE1C1F3|   167|    58|20.796729893506402| 正常|
|L26284A55F56B398BA|   161|    55|21.218317194552675| 正常|
|E1F5DBF55F6B1427DD|   167|    72|25.816630212628635| 過重|
|F20481473EEB92E8D8|   160|    54|21.093749999999996| 正常|
|H20BE9CA7E806E0551|   150|    47| 20.88888888888889| 正常|
|F1B81805D82921E3B9|   182|    64| 19.32133800265668| 正常|
|N1ED4D2C4D60B78323|   182|    76|22.944088878154812| 正常|
|F12A8F9FE08045F1A6|   174|    63|20.808561236623067| 正常|
|S2FBC9D681C459E262|   160|    53|20.703124999999996| 正常|
|T24571F44004FE8145|   163|    58|21.829952199932254| 正常|
|P2A8A6EE60E936FA24|   153|    57| 24.34960912469563| 過重|
|J1E82E6

In [36]:
# WAY 5 withColumn() with expr function
from pyspark.sql.functions import expr
df_tmp.withColumn("BMI_L",expr('case when BMI < 18.5 then "過輕" when BMI < 24 then "正常" else "過重" end as BMI_L')).show()

+------------------+------+------+------------------+-----+
|        APC_ID_SAS|HEIGHT|WEIGHT|               BMI|BMI_L|
+------------------+------+------+------------------+-----+
|Q2D129E954AD477523|   158|    54| 21.63114885435026| 正常|
|A2AFDA91C172B15A77|   162|    48|18.289894833104707| 過輕|
|F1D4EF7FFBE44A51FE|   172|    70|23.661438615467823| 正常|
|T117FCDED3FAE1C1F3|   167|    58|20.796729893506402| 正常|
|L26284A55F56B398BA|   161|    55|21.218317194552675| 正常|
|E1F5DBF55F6B1427DD|   167|    72|25.816630212628635| 過重|
|F20481473EEB92E8D8|   160|    54|21.093749999999996| 正常|
|H20BE9CA7E806E0551|   150|    47| 20.88888888888889| 正常|
|F1B81805D82921E3B9|   182|    64| 19.32133800265668| 正常|
|N1ED4D2C4D60B78323|   182|    76|22.944088878154812| 正常|
|F12A8F9FE08045F1A6|   174|    63|20.808561236623067| 正常|
|S2FBC9D681C459E262|   160|    53|20.703124999999996| 正常|
|T24571F44004FE8145|   163|    58|21.829952199932254| 正常|
|P2A8A6EE60E936FA24|   153|    57| 24.34960912469563| 過重|
|J1E82E6

# Spark 與 Pandas 轉換

## 轉成 Pandas Dataframe

In [37]:
import pandas as pd
df_pd = df.toPandas()
df_pd.head()

Unnamed: 0,APC_ID_SAS,GENDER,AGE,CONTACT_CITY_CD,EDUCATION_CD,MARRIAGE_CD,STAFF_IND,LIFE_INSD_CNT,HEIGHT,WEIGHT
0,Q2D129E954AD477523,F,50.0,台中市,2.0,1.0,0,4,158,54
1,A2AFDA91C172B15A77,F,35.0,高雄市,3.0,0.0,0,2,162,48
2,F1D4EF7FFBE44A51FE,M,33.0,新北市,3.0,0.0,0,2,172,70
3,T117FCDED3FAE1C1F3,M,31.0,屏東縣,3.0,0.0,0,3,167,58
4,L26284A55F56B398BA,F,55.0,高雄市,3.0,1.0,0,2,161,55


## 轉成 Spark Dataframe

In [38]:
df_sd = spark.createDataFrame(df_pd)
df_sd.show(5)

+------------------+------+----+---------------+------------+-----------+---------+-------------+------+------+
|        APC_ID_SAS|GENDER| AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|
+------------------+------+----+---------------+------------+-----------+---------+-------------+------+------+
|Q2D129E954AD477523|     F|50.0|         台中市|         2.0|        1.0|        0|            4|   158|    54|
|A2AFDA91C172B15A77|     F|35.0|         高雄市|         3.0|        0.0|        0|            2|   162|    48|
|F1D4EF7FFBE44A51FE|     M|33.0|         新北市|         3.0|        0.0|        0|            2|   172|    70|
|T117FCDED3FAE1C1F3|     M|31.0|         屏東縣|         3.0|        0.0|        0|            3|   167|    58|
|L26284A55F56B398BA|     F|55.0|         高雄市|         3.0|        1.0|        0|            2|   161|    55|
+------------------+------+----+---------------+------------+-----------+---------+-------------+------+------+
only sh

# 資料表操作

## 基本訊息

In [39]:
# 檢視基本訊息
df.printSchema()

root
 |-- APC_ID_SAS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- CONTACT_CITY_CD: string (nullable = true)
 |-- EDUCATION_CD: integer (nullable = true)
 |-- MARRIAGE_CD: integer (nullable = true)
 |-- STAFF_IND: integer (nullable = true)
 |-- LIFE_INSD_CNT: integer (nullable = true)
 |-- HEIGHT: integer (nullable = true)
 |-- WEIGHT: integer (nullable = true)



In [41]:
# 欄位清單
df.columns

['APC_ID_SAS',
 'GENDER',
 'AGE',
 'CONTACT_CITY_CD',
 'EDUCATION_CD',
 'MARRIAGE_CD',
 'STAFF_IND',
 'LIFE_INSD_CNT',
 'HEIGHT',
 'WEIGHT']

In [42]:
# 查看資料維度
print(df.count(), len(df.columns))

100000 10


In [40]:
# 各欄位基本訊息
df.describe().show()

+-------+------------------+------+------------------+---------------+------------------+------------------+-------------------+-----------------+-----------------+-----------------+
|summary|        APC_ID_SAS|GENDER|               AGE|CONTACT_CITY_CD|      EDUCATION_CD|       MARRIAGE_CD|          STAFF_IND|    LIFE_INSD_CNT|           HEIGHT|           WEIGHT|
+-------+------------------+------+------------------+---------------+------------------+------------------+-------------------+-----------------+-----------------+-----------------+
|  count|            100000| 99571|             99971|          99885|             80862|             99086|             100000|           100000|           100000|           100000|
|   mean|              null|  null|37.788008522471515|           null|2.2000692537904083|0.4749409603778536|            0.06458|          3.29753|        153.37791|         54.49487|
| stddev|              null|  null|18.661418949622714|           null| 0.898252300077

## 新增欄位

In [43]:
from pyspark.sql import functions as F
# 加一欄都是 0 的 LABEL
df.withColumn("LABEL",F.lit(0)).show()
df.withColumn("LABEL",F.lit(0)).printSchema()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-----+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|LABEL|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-----+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|    0|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|    0|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|    0|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|    0|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55|    0|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|          0|        0|

In [44]:
from pyspark.sql import functions as F
import numpy as np
# 加一欄都是 null 的 LABEL
df.withColumn("LABEL",F.lit(None)).show()
df.withColumn("LABEL",F.lit(None)).printSchema()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-----+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|LABEL|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-----+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54| null|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48| null|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70| null|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58| null|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55| null|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|          0|        0|

## 指定資料型態

In [45]:
# 指定欄位String型態
from pyspark.sql.types import StringType
df.withColumn("LABEL",F.lit(0).cast(StringType())).printSchema()

root
 |-- APC_ID_SAS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- CONTACT_CITY_CD: string (nullable = true)
 |-- EDUCATION_CD: integer (nullable = true)
 |-- MARRIAGE_CD: integer (nullable = true)
 |-- STAFF_IND: integer (nullable = true)
 |-- LIFE_INSD_CNT: integer (nullable = true)
 |-- HEIGHT: integer (nullable = true)
 |-- WEIGHT: integer (nullable = true)
 |-- LABEL: string (nullable = false)



In [46]:
# 改變現有欄位型態
df.withColumn("AGE",df.AGE.cast(StringType())).printSchema()

root
 |-- APC_ID_SAS: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- CONTACT_CITY_CD: string (nullable = true)
 |-- EDUCATION_CD: integer (nullable = true)
 |-- MARRIAGE_CD: integer (nullable = true)
 |-- STAFF_IND: integer (nullable = true)
 |-- LIFE_INSD_CNT: integer (nullable = true)
 |-- HEIGHT: integer (nullable = true)
 |-- WEIGHT: integer (nullable = true)



## 資料欄位合併

In [47]:
# 建立df_tmp，將df_tmp.AGE併進原本資料表
from pyspark.sql import functions as F
df_tmp = df[['AGE']]
df.withColumn("NEW_AGE",F.concat(df_tmp.AGE)).show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|NEW_AGE|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|     50|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|     35|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|     33|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|     31|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55|     55|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|      

## 資料欄位移除

In [48]:
# 移除年齡欄位
df.drop(df.AGE).show()

+------------------+------+---------------+------------+-----------+---------+-------------+------+------+
|        APC_ID_SAS|GENDER|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|
+------------------+------+---------------+------------+-----------+---------+-------------+------+------+
|Q2D129E954AD477523|     F|         台中市|           2|          1|        0|            4|   158|    54|
|A2AFDA91C172B15A77|     F|         高雄市|           3|          0|        0|            2|   162|    48|
|F1D4EF7FFBE44A51FE|     M|         新北市|           3|          0|        0|            2|   172|    70|
|T117FCDED3FAE1C1F3|     M|         屏東縣|           3|          0|        0|            3|   167|    58|
|L26284A55F56B398BA|     F|         高雄市|           3|          1|        0|            2|   161|    55|
|E1F5DBF55F6B1427DD|     M|         高雄市|           1|          0|        0|            1|   167|    72|
|F20481473EEB92E8D8|     F|         新北市|           3|  

## 合併資料列

In [49]:
# 將兩個df合併疊在在一起
df_tmp = df.union(df)
print(df_tmp.count())
df_tmp.show()

200000
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|          0|        0|            1|   167|    72|
|F20481473EE

## 資料表串聯
事前準備：先抓取另一張資料表-台灣各縣市衛生所資料 health_center.csv。

In [50]:
# 從外部抓衛生所的測試資料
from pyspark import SparkFiles
url = 'https://raw.githubusercontent.com/chia313339/Spark_practice/master/health_center.csv'
spark.sparkContext.addFile(url)
df2 = spark.read.csv(SparkFiles.get("health_center.csv"), header=True, inferSchema= True)

# 將資料集讀取在memory上，DataFrame跟SparkSQL都要下此語法，如果沒下，資料每次都會重新load
df2.cache()

# 執行這句才會真的運行Spark，將資料存在記憶體
df2.show()

+-------+------+-------+
|CITY_CD|HC_CNT|HCP_CNT|
+-------+------+-------+
| 新北市|    29|    430|
| 台北市|    12|    303|
| 桃園市|    13|    257|
| 台中市|    30|    332|
| 台南市|    37|    342|
| 高雄市|    38|    474|
| 宜蘭縣|    12|    139|
| 新竹縣|    13|    169|
| 苗栗縣|    18|    184|
| 彰化縣|    27|    212|
| 南投縣|    13|    181|
| 雲林縣|    20|    219|
| 嘉義縣|    18|    209|
| 屏東縣|    33|    370|
| 台中縣|    16|    209|
| 花蓮縣|    13|    161|
| 澎湖縣|    11|     87|
| 基隆市|     7|     62|
| 新竹市|     3|     31|
| 嘉義市|     2|     23|
+-------+------+-------+
only showing top 20 rows



In [51]:
# df left join df2
df.join(df2, df.CONTACT_CITY_CD == df2.CITY_CD, how = "left_outer").show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-------+------+-------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|CITY_CD|HC_CNT|HCP_CNT|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+-------+------+-------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54| 台中市|    30|    332|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48| 高雄市|    38|    474|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70| 新北市|    29|    430|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58| 屏東縣|    33|    370|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|       

# 資料串聯練習
建立一張資料表，包含 CUST_360 所有欄位，並使用 CONTACT_CITY_CD 透過 left join 串聯 HEALTH_CENTER 衛生所資料的 CITY_CD 欄位。計算並顯示各縣市衛生所的平均員工數，命名為 HC_MEAN。

In [53]:
# Dataframe
df.join(df2,df.CONTACT_CITY_CD == df2.CITY_CD, how = "left_outer") \
.withColumn("HC_MEAN",df2.HCP_CNT/df2.HC_CNT) \
.drop(*df2.columns).show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+------------------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|           HC_MEAN|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+------------------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|11.066666666666666|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|12.473684210526315|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|14.827586206896552|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|11.212121212121213|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|  

In [57]:
# 先 register table
df2.registerTempTable('health_center')
df2.cache()
# Spark SQL
spark.sql('select APC_ID_SAS, GENDER, AGE, CONTACT_CITY_CD, EDUCATION_CD, MARRIAGE_CD, STAFF_IND, LIFE_INSD_CNT, HEIGHT, WEIGHT, HCP_CNT/HC_CNT as HC_MEAN from cust_360 left join health_center on CONTACT_CITY_CD = CITY_CD').show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+------------------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|           HC_MEAN|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+------------------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|11.066666666666666|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|12.473684210526315|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|14.827586206896552|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|11.212121212121213|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|  

# 寫入Hive Table

In [58]:
# 公司環境
# df.write.mode("append").saveAsTable(“life_user_dm.tmp")

In [60]:
# Colab 練習環境
df.write.mode("append").saveAsTable("tmp")

In [61]:
# 儲存好的 Table 可以用 spark.sql() 直接呼叫
spark.sql('select * from tmp').show()

+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|        APC_ID_SAS|GENDER|AGE|CONTACT_CITY_CD|EDUCATION_CD|MARRIAGE_CD|STAFF_IND|LIFE_INSD_CNT|HEIGHT|WEIGHT|
+------------------+------+---+---------------+------------+-----------+---------+-------------+------+------+
|Q2D129E954AD477523|     F| 50|         台中市|           2|          1|        0|            4|   158|    54|
|A2AFDA91C172B15A77|     F| 35|         高雄市|           3|          0|        0|            2|   162|    48|
|F1D4EF7FFBE44A51FE|     M| 33|         新北市|           3|          0|        0|            2|   172|    70|
|T117FCDED3FAE1C1F3|     M| 31|         屏東縣|           3|          0|        0|            3|   167|    58|
|L26284A55F56B398BA|     F| 55|         高雄市|           3|          1|        0|            2|   161|    55|
|E1F5DBF55F6B1427DD|     M| 60|         高雄市|           1|          0|        0|            1|   167|    72|
|F20481473EEB92E8D8