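##### Task 1: Data processing with PySpark
* Step 1: Connect to a Spark environment from Python
* Step 2: Create a DataFrame
* Step 3: Use Spark to find the number of rows and columns in the data
* Step 4: Use Spark to select the samples whose class is 1
* Step 5: Use Spark to select the samples with language > 90 or math > 90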

In [3]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('pyspark_test') \
    .getOrCreate()

In [2]:
test = spark.createDataFrame(
    [('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
     ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
     ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],
    ['number','class','language','math','english','physic','chemical'])
test.show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   002|    2|      87|  81|     90|    83|      83|
|   003|    3|      86|  91|     83|    89|      63|
|   004|    2|      65|  87|     94|    73|      88|
|   005|    1|      76|  62|     89|    81|      98|
|   006|    3|      84|  82|     85|    73|      99|
|   007|    3|      56|  76|     63|    72|      87|
|   008|    1|      55|  62|     46|    78|      71|
|   009|    2|      63|  72|     87|    98|      64|
+------+-----+--------+----+-------+------+--------+



In [7]:
print(test.count())
print(len(test.columns))

9
7


In [9]:
test.filter(test['class']==1).show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   005|    1|      76|  62|     89|    81|      98|
|   008|    1|      55|  62|     46|    78|      71|
+------+-----+--------+----+-------+------+--------+



In [11]:
test.where('class==1').show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   005|    1|      76|  62|     89|    81|      98|
|   008|    1|      55|  62|     46|    78|      71|
+------+-----+--------+----+-------+------+--------+



In [12]:
test.where('language>90 or math >90').show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   003|    3|      86|  91|     83|    89|      63|
+------+-----+--------+----+-------+------+--------+



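##### Task 2: Descriptive statistics with PySpark

* Step 1: Read the file https://cdn.coggle.club/Pokemon.csv
* Step 2: Save the data that was read, including the header row
* Step 3: Examine each column's type and number of distinct values
* Step 4: Check whether each column contains missing values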

In [14]:
from pyspark import SparkFiles
spark.sparkContext.addFile("https://cdn.coggle.club/Pokemon.csv")
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')
df.show(5)

+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|                Name|Type 1|Type 2|Total| HP|Attack|Defense|Sp Atk|Sp Def|Speed|Generation|Legendary|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|           Bulbasaur| Grass|Poison|  318| 45|    49|     49|    65|    65|   45|         1|    false|
|             Ivysaur| Grass|Poison|  405| 60|    62|     63|    80|    80|   60|         1|    false|
|            Venusaur| Grass|Poison|  525| 80|    82|     83|   100|   100|   80|         1|    false|
|VenusaurMega Venu...| Grass|Poison|  625| 80|   100|    123|   122|   120|   80|         1|    false|
|          Charmander|  Fire|  null|  309| 39|    52|     43|    60|    50|   65|         1|    false|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
only showing top 5 rows



In [15]:
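# Save as CSV, keeping the header row. Spark writes a directory of part files
# at this path; a forward slash avoids the invalid '\P' escape sequence.
df.write.options(header='True', delimiter=',')\
    .mode('overwrite').csv('./Pokemon.csv')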

In [23]:
df.dtypes

[('Name', 'string'),
 ('Type 1', 'string'),
 ('Type 2', 'string'),
 ('Total', 'int'),
 ('HP', 'int'),
 ('Attack', 'int'),
 ('Defense', 'int'),
 ('Sp Atk', 'int'),
 ('Sp Def', 'int'),
 ('Speed', 'int'),
 ('Generation', 'int'),
 ('Legendary', 'boolean')]

In [24]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800 entries, 0 to 799
Data columns (total 12 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   Name        800 non-null    object
 1   Type 1      800 non-null    object
 2   Type 2      414 non-null    object
 3   Total       800 non-null    int32 
 4   HP          800 non-null    int32 
 5   Attack      800 non-null    int32 
 6   Defense     800 non-null    int32 
 7   Sp Atk      800 non-null    int32 
 8   Sp Def      800 non-null    int32 
 9   Speed       800 non-null    int32 
 10  Generation  800 non-null    int32 
 11  Legendary   800 non-null    bool  
dtypes: bool(1), int32(8), object(3)
memory usage: 44.7+ KB


In [25]:
df.toPandas().isnull().any()

Name          False
Type 1        False
Type 2         True
Total         False
HP            False
Attack        False
Defense       False
Sp Atk        False
Sp Def        False
Speed         False
Generation    False
Legendary     False
dtype: bool

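##### Task 3: Grouping and aggregation with PySpark

* Step 1: Read the file https://cdn.coggle.club/Pokemon.csv
* Step 2: Learn how to use groupby for grouped aggregation
* Step 3: Learn how to use agg for grouped aggregation
* Step 4: Learn how to use transform
* Step 5: Using groupby, agg, and transform, compute the mean HP within each Type 1 group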

In [27]:
from pyspark import SparkFiles
spark.sparkContext.addFile("https://cdn.coggle.club/Pokemon.csv")
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df.show(1)

+---------+------+------+-----+---+------+-------+-------+-------+-----+----------+---------+
|     Name|Type 1|Type 2|Total| HP|Attack|Defense|Sp. Atk|Sp. Def|Speed|Generation|Legendary|
+---------+------+------+-----+---+------+-------+-------+-------+-----+----------+---------+
|Bulbasaur| Grass|Poison|  318| 45|    49|     49|     65|     65|   45|         1|    false|
+---------+------+------+-----+---+------+-------+-------+-------+-----+----------+---------+
only showing top 1 row



In [45]:
df.groupby('Type 1').agg({"HP":'avg'}).show()

+--------+-----------------+
|  Type 1|          avg(HP)|
+--------+-----------------+
|   Water|          72.0625|
|  Poison|            67.25|
|   Steel|65.22222222222223|
|    Rock|65.36363636363636|
|     Ice|             72.0|
|   Ghost|          64.4375|
|   Fairy|74.11764705882354|
| Psychic|70.63157894736842|
|  Dragon|          83.3125|
|  Flying|            70.75|
|     Bug|56.88405797101449|
|Electric|59.79545454545455|
|    Fire|69.90384615384616|
|  Ground|         73.78125|
|    Dark|66.80645161290323|
|Fighting|69.85185185185185|
|   Grass|67.27142857142857|
|  Normal|77.27551020408163|
+--------+-----------------+



In [62]:
df.toPandas().groupby('Type 1')['HP'].transform('mean')

0      67.271429
1      67.271429
2      67.271429
3      67.271429
4      69.903846
         ...    
795    65.363636
796    65.363636
797    70.631579
798    70.631579
799    69.903846
Name: HP, Length: 800, dtype: float64

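##### Task 4: Spark SQL basics

* Step 1: Use Spark SQL to reproduce the filtering from Task 1
* Step 2: Use Spark SQL to reproduce the statistics from Task 2 (the per-column statistics may be skipped)
* Step 3: Use Spark SQL to reproduce the grouped statistics from Task 3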

In [63]:
test.createOrReplaceTempView("test")

spark.sql("select * from test where class=1").show()
spark.sql("select * from test where language>90 or math>90").show()

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   005|    1|      76|  62|     89|    81|      98|
|   008|    1|      55|  62|     46|    78|      71|
+------+-----+--------+----+-------+------+--------+

+------+-----+--------+----+-------+------+--------+
|number|class|language|math|english|physic|chemical|
+------+-----+--------+----+-------+------+--------+
|   001|    1|     100|  87|     67|    83|      98|
|   003|    3|      86|  91|     83|    89|      63|
+------+-----+--------+----+-------+------+--------+



In [65]:
df.createOrReplaceTempView("df")

spark.sql("DESCRIBE df").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|      Name|   string|   null|
|    Type 1|   string|   null|
|    Type 2|   string|   null|
|     Total|      int|   null|
|        HP|      int|   null|
|    Attack|      int|   null|
|   Defense|      int|   null|
|   Sp. Atk|      int|   null|
|   Sp. Def|      int|   null|
|     Speed|      int|   null|
|Generation|      int|   null|
| Legendary|  boolean|   null|
+----------+---------+-------+



In [75]:
df.createOrReplaceTempView("df_1")

spark.sql("select `Type 1`, avg(HP) from df_1 group by `Type 1`").show()

+--------+-----------------+
|  Type 1|          avg(HP)|
+--------+-----------------+
|   Water|          72.0625|
|  Poison|            67.25|
|   Steel|65.22222222222223|
|    Rock|65.36363636363636|
|     Ice|             72.0|
|   Ghost|          64.4375|
|   Fairy|74.11764705882354|
| Psychic|70.63157894736842|
|  Dragon|          83.3125|
|  Flying|            70.75|
|     Bug|56.88405797101449|
|Electric|59.79545454545455|
|    Fire|69.90384615384616|
|  Ground|         73.78125|
|    Dark|66.80645161290323|
|Fighting|69.85185185185185|
|   Grass|67.27142857142857|
|  Normal|77.27551020408163|
+--------+-----------------+



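##### Task 5: Spark ML basics: feature encoding

* Step 1: Study the feature-encoding modules in Spark ML
>* https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#feature
>* https://spark.apache.org/docs/latest/ml-features.html
* Step 2: Read the file Pokemon.csv and understand what each field means
* Step 3: Apply OneHotEncoder to the categorical attributes
* Step 4: Apply MinMaxScaler to the numeric attributes
* Step 5: Apply PCA to the encoded attributes to reduce dimensionality (choose the number of dimensions yourself)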