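This case study analyzes 2020 US COVID-19 data with Spark, using Python as the programming language. The experiment computes the following 8 metrics:

1) Compute the cumulative number of confirmed cases and deaths in the US as of each day. Group by the date field and sum the cases and deaths fields.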

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import UserDefinedFunction
from datetime import datetime

In [2]:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

In [3]:
data = spark.read.csv("/data/shixunfiles/297b8e9689496aed3d0e9145de9b6884_1606802130422.csv",inferSchema="true",header="true")
data.show()

+---------+-----------+----------+-----+------+
|     date|     county|     state|cases|deaths|
+---------+-----------+----------+-----+------+
|2020/1/21|  Snohomish|Washington|    1|     0|
|2020/1/22|  Snohomish|Washington|    1|     0|
|2020/1/23|  Snohomish|Washington|    1|     0|
|2020/1/24|       Cook|  Illinois|    1|     0|
|2020/1/24|  Snohomish|Washington|    1|     0|
|2020/1/25|     Orange|California|    1|     0|
|2020/1/25|       Cook|  Illinois|    1|     0|
|2020/1/25|  Snohomish|Washington|    1|     0|
|2020/1/26|   Maricopa|   Arizona|    1|     0|
|2020/1/26|Los Angeles|California|    1|     0|
|2020/1/26|     Orange|California|    1|     0|
|2020/1/26|       Cook|  Illinois|    1|     0|
|2020/1/26|  Snohomish|Washington|    1|     0|
|2020/1/27|   Maricopa|   Arizona|    1|     0|
|2020/1/27|Los Angeles|California|    1|     0|
|2020/1/27|     Orange|California|    1|     0|
|2020/1/27|       Cook|  Illinois|    1|     0|
|2020/1/27|  Snohomish|Washington|    1|     0|

In [4]:
data.createOrReplaceTempView("date")

In [5]:
data1 = spark.sql("select to_date(date.date,'yyyy/MM/dd') as date, sum(cases) as cases, sum(deaths) as deaths from date group by date order by date desc")

In [6]:
data1.show()

+----------+-------+------+
|      date|  cases|deaths|
+----------+-------+------+
|2020-05-19|1536471| 91936|
|2020-05-18|1515373| 90293|
|2020-05-17|1493597| 89504|
|2020-05-16|1474612| 88660|
|2020-05-15|1450964| 87434|
|2020-05-14|1424847| 85842|
|2020-05-13|1397833| 84104|
|2020-05-12|1376649| 82336|
|2020-05-11|1354356| 80681|
|2020-05-10|1336598| 79692|
|2020-05-09|1316439| 78761|
|2020-05-08|1291529| 77306|
|2020-05-07|1263875| 75733|
|2020-05-06|1235132| 74010|
|2020-05-05|1210627| 71066|
|2020-05-04|1186913| 68832|
|2020-05-03|1164994| 67772|
|2020-05-02|1138961| 66444|
|2020-05-01|1109434| 64859|
|2020-04-30|1075486| 63099|
+----------+-------+------+
only showing top 20 rows
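The grouping step above can be cross-checked by hand. The following is a minimal sketch in plain Python (not Spark), using a few rows copied from the `data.show()` output; the helper name `totals_by_date` is made up for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Rows copied from the data.show() output: (date, county, state, cases, deaths).
rows = [
    ("2020/1/24", "Cook", "Illinois", 1, 0),
    ("2020/1/24", "Snohomish", "Washington", 1, 0),
    ("2020/1/25", "Orange", "California", 1, 0),
    ("2020/1/25", "Cook", "Illinois", 1, 0),
    ("2020/1/25", "Snohomish", "Washington", 1, 0),
]


def totals_by_date(rows):
    """Sum cases and deaths per calendar date, newest date first."""
    totals = defaultdict(lambda: [0, 0])
    for date_str, _county, _state, cases, deaths in rows:
        # Parse "2020/1/24" into a real date so sorting is chronological.
        day = datetime.strptime(date_str, "%Y/%m/%d").date()
        totals[day][0] += cases
        totals[day][1] += deaths
    ordered = sorted(totals.items(), key=lambda kv: kv[0], reverse=True)
    return [(day.isoformat(), c, d) for day, (c, d) in ordered]


print(totals_by_date(rows))
```

Parsing the date before sorting matters: sorted as raw strings, "2020/1/9" would come after "2020/1/21".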



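2) Compute the daily new confirmed cases and new deaths in the US. Because new count = today's count - yesterday's count, use a self-join on the result of 1) with the join condition t1.date = t2.date + 1, then compute t1.cases - t2.cases (and likewise t1.deaths - t2.deaths) as that day's increase.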

In [7]:
data1.createOrReplaceTempView("data1")

In [8]:
data2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from data1 t1,data1 t2 where t1.date = date_add(t2.date,1)")
data2.show()

+----------+------------+-------------+
|      date|caseIncrease|deathIncrease|
+----------+------------+-------------+
|2020-05-19|       21098|         1643|
|2020-05-18|       21776|          789|
|2020-05-17|       18985|          844|
|2020-05-16|       23648|         1226|
|2020-05-15|       26117|         1592|
|2020-05-14|       27014|         1738|
|2020-05-13|       21184|         1768|
|2020-05-12|       22293|         1655|
|2020-05-11|       17758|          989|
|2020-05-10|       20159|          931|
|2020-05-09|       24910|         1455|
|2020-05-08|       27654|         1573|
|2020-05-07|       28743|         1723|
|2020-05-06|       24505|         2944|
|2020-05-05|       23714|         2234|
|2020-05-04|       21919|         1060|
|2020-05-03|       26033|         1328|
|2020-05-02|       29527|         1585|
|2020-05-01|       33948|         1760|
|2020-04-30|       30425|         2211|
+----------+------------+-------------+
only showing top 20 rows
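The self-join arithmetic can be checked against the cumulative totals shown earlier. The following is a minimal sketch in plain Python (not Spark); the helper name `daily_increase` is made up for illustration, and the numbers are copied from the `data1.show()` output.

```python
from datetime import date, timedelta

# Cumulative totals copied from the data1.show() output: date -> (cases, deaths).
cumulative = {
    date(2020, 5, 19): (1536471, 91936),
    date(2020, 5, 18): (1515373, 90293),
    date(2020, 5, 17): (1493597, 89504),
}


def daily_increase(cumulative):
    """Pair each day with the previous day and subtract the totals."""
    result = []
    for day, (cases, deaths) in sorted(cumulative.items(), reverse=True):
        previous = cumulative.get(day - timedelta(days=1))
        if previous is None:
            # Like the inner join: a day with no previous-day row is dropped.
            continue
        result.append((day.isoformat(), cases - previous[0], deaths - previous[1]))
    return result


print(daily_increase(cumulative))
```

The two rows produced match the first two rows of the `data2.show()` output. The earliest day in the input has no predecessor and yields no row, which is also how the join condition behaves for the first date in the dataset.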



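3) Compute the cumulative confirmed cases and deaths for each US state as of May 19. First filter the rows for May 19, then group by the state field and sum the cases and deaths fields. Note that the cells below select and group by county rather than state, so the output lists counties.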

In [9]:
allData = spark.sql("select to_date(date.date,'yyyy/MM/dd') as date, county, cases, deaths from date")
allData.show()

+----------+-----------+-----+------+
|      date|     county|cases|deaths|
+----------+-----------+-----+------+
|2020-01-21|  Snohomish|    1|     0|
|2020-01-22|  Snohomish|    1|     0|
|2020-01-23|  Snohomish|    1|     0|
|2020-01-24|       Cook|    1|     0|
|2020-01-24|  Snohomish|    1|     0|
|2020-01-25|     Orange|    1|     0|
|2020-01-25|       Cook|    1|     0|
|2020-01-25|  Snohomish|    1|     0|
|2020-01-26|   Maricopa|    1|     0|
|2020-01-26|Los Angeles|    1|     0|
|2020-01-26|     Orange|    1|     0|
|2020-01-26|       Cook|    1|     0|
|2020-01-26|  Snohomish|    1|     0|
|2020-01-27|   Maricopa|    1|     0|
|2020-01-27|Los Angeles|    1|     0|
|2020-01-27|     Orange|    1|     0|
|2020-01-27|       Cook|    1|     0|
|2020-01-27|  Snohomish|    1|     0|
|2020-01-28|   Maricopa|    1|     0|
|2020-01-28|Los Angeles|    1|     0|
+----------+-----------+-----+------+
only showing top 20 rows



In [10]:
allData.createOrReplaceTempView("allData")

In [11]:
data3 = spark.sql("select date, county, sum(cases) as totalCases, sum(deaths) as totalDeaths from allData where date = '2020-05-19' group by date,county order by totalCases desc")
data3.show()

+----------+-------------+----------+-----------+
|      date|       county|totalCases|totalDeaths|
+----------+-------------+----------+-----------+
|2020-05-19|New York City|    198710|      20376|
|2020-05-19|         Cook|     64722|       2982|
|2020-05-19|      Suffolk|     55152|       2619|
|2020-05-19|  Los Angeles|     39573|       1913|
|2020-05-19|       Nassau|     39361|       2542|
|2020-05-19|    Middlesex|     35297|       2470|
|2020-05-19|  Westchester|     32401|       1424|
|2020-05-19|        Essex|     29513|       2362|
|2020-05-19|        Wayne|     21013|       2363|
|2020-05-19| Philadelphia|     20129|       1109|
|2020-05-19|   Montgomery|     18117|       1158|
|2020-05-19|       Hudson|     17621|       1082|
|2020-05-19|       Bergen|     17522|       1474|
|2020-05-19|       Orange|     16828|        537|
|2020-05-19|        Union|     16233|       1023|
|2020-05-19|   Miami-Dade|     15941|        578|
|2020-05-19|      Passaic|     15371|        842|


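4) Find the ten states with the most confirmed cases as of May 19. Register the result DataFrame from 3) as a temporary table, sort by confirmed cases in descending order, and take the first 10 rows. Because the table from 3) is grouped by county, the rows returned below are counties.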

In [12]:
data3.createOrReplaceTempView("mostCases")

In [13]:
Top10 = spark.sql("select * from mostCases order by totalCases desc limit 10 ")
Top10.show()

+----------+-------------+----------+-----------+
|      date|       county|totalCases|totalDeaths|
+----------+-------------+----------+-----------+
|2020-05-19|New York City|    198710|      20376|
|2020-05-19|         Cook|     64722|       2982|
|2020-05-19|      Suffolk|     55152|       2619|
|2020-05-19|  Los Angeles|     39573|       1913|
|2020-05-19|       Nassau|     39361|       2542|
|2020-05-19|    Middlesex|     35297|       2470|
|2020-05-19|  Westchester|     32401|       1424|
|2020-05-19|        Essex|     29513|       2362|
|2020-05-19|        Wayne|     21013|       2363|
|2020-05-19| Philadelphia|     20129|       1109|
+----------+-------------+----------+-----------+



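5) Find the ten states with the most deaths as of May 19. Using the temporary table registered from the result of 3), sort by deaths in descending order and take the first 10 rows.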

In [14]:
Top10Deaths = spark.sql("select * from mostCases order by totalDeaths desc limit 10 ")
Top10Deaths.show()

+----------+-------------+----------+-----------+
|      date|       county|totalCases|totalDeaths|
+----------+-------------+----------+-----------+
|2020-05-19|New York City|    198710|      20376|
|2020-05-19|         Cook|     64722|       2982|
|2020-05-19|      Suffolk|     55152|       2619|
|2020-05-19|       Nassau|     39361|       2542|
|2020-05-19|    Middlesex|     35297|       2470|
|2020-05-19|        Wayne|     21013|       2363|
|2020-05-19|        Essex|     29513|       2362|
|2020-05-19|  Los Angeles|     39573|       1913|
|2020-05-19|       Bergen|     17522|       1474|
|2020-05-19|  Westchester|     32401|       1424|
+----------+-------------+----------+-----------+
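
The "sort, then take the first 10" pattern used in 4) and 5) can be reproduced on the rows already printed. The following is a minimal sketch in plain Python (not Spark), using the first 13 rows of the `data3.show()` output; the helper name `top_n` is made up for illustration.

```python
# Rows copied from the data3.show() output: (county, totalCases, totalDeaths).
county_totals = [
    ("New York City", 198710, 20376),
    ("Cook", 64722, 2982),
    ("Suffolk", 55152, 2619),
    ("Los Angeles", 39573, 1913),
    ("Nassau", 39361, 2542),
    ("Middlesex", 35297, 2470),
    ("Westchester", 32401, 1424),
    ("Essex", 29513, 2362),
    ("Wayne", 21013, 2363),
    ("Philadelphia", 20129, 1109),
    ("Montgomery", 18117, 1158),
    ("Hudson", 17621, 1082),
    ("Bergen", 17522, 1474),
]


def top_n(rows, column, n, descending=True):
    """Sort rows by one column and keep the first n (ORDER BY ... LIMIT n)."""
    return sorted(rows, key=lambda r: r[column], reverse=descending)[:n]


top10_deaths = top_n(county_totals, column=2, n=10)
print([name for name, _cases, _deaths in top10_deaths])
```

On these rows the ranking by deaths gives the same ten names, in the same order, as the `Top10Deaths.show()` output. Passing `descending=False` gives the ascending variant, which corresponds to `order by` without `desc`.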



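6) Find the ten states with the fewest confirmed cases as of May 19. Using the temporary table registered from the result of 3), sort by confirmed cases in ascending order and take the first 10 rows.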

In [15]:
Lowest10Cases = spark.sql("select * from mostCases order by totalCases  limit 10 ")
Lowest10Cases.show()

+----------+-----------+----------+-----------+
|      date|     county|totalCases|totalDeaths|
+----------+-----------+----------+-----------+
|2020-05-19|      Wolfe|         1|          0|
|2020-05-19|  Wheatland|         1|          0|
|2020-05-19|   Hemphill|         1|          0|
|2020-05-19|       Coal|         1|          0|
|2020-05-19|   Magoffin|         1|          0|
|2020-05-19|      Avery|         1|          0|
|2020-05-19|     Harney|         1|          0|
|2020-05-19|Piscataquis|         1|          0|
|2020-05-19|    Ziebach|         1|          0|
|2020-05-19|  Glasscock|         1|          0|
+----------+-----------+----------+-----------+



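7) Find the ten states with the fewest deaths as of May 19. Using the temporary table registered from the result of 3), sort by deaths in ascending order and take the first 10 rows.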

In [16]:
Lowest10Deaths = spark.sql("select * from mostCases order by totalDeaths  limit 10 ")
Lowest10Deaths.show()

+----------+-----------+----------+-----------+
|      date|     county|totalCases|totalDeaths|
+----------+-----------+----------+-----------+
|2020-05-19|    Trimble|         3|          0|
|2020-05-19|     Benzie|         4|          0|
|2020-05-19|    Roberts|        22|          0|
|2020-05-19|   Glascock|         1|          0|
|2020-05-19|      Cooke|        13|          0|
|2020-05-19|     Geneva|        23|          0|
|2020-05-19|    Prowers|        11|          0|
|2020-05-19|   Sheridan|        18|          0|
|2020-05-19|   Maverick|        89|          0|
|2020-05-19|Buena Vista|       117|          0|
+----------+-----------+----------+-----------+



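8) Compute the case fatality rate for the US as a whole and for each state as of May 19. Fatality rate = deaths / confirmed cases. Using the temporary table registered from the result of 3), apply the formula to each row. The cell below computes the rate per row of that table (per county); it does not produce a nationwide figure.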

In [17]:
roundCases = spark.sql("select county, round(totalDeaths/totalCases,3) as Deathrate from mostCases")
roundCases.show()

+-------------+---------+
|       county|Deathrate|
+-------------+---------+
|New York City|    0.103|
|         Cook|    0.046|
|      Suffolk|    0.047|
|  Los Angeles|    0.048|
|       Nassau|    0.065|
|    Middlesex|     0.07|
|  Westchester|    0.044|
|        Essex|     0.08|
|        Wayne|    0.112|
| Philadelphia|    0.055|
|   Montgomery|    0.064|
|       Hudson|    0.061|
|       Bergen|    0.084|
|       Orange|    0.032|
|        Union|    0.063|
|   Miami-Dade|    0.036|
|      Passaic|    0.055|
|    Fairfield|    0.079|
|    Jefferson|    0.059|
|      Unknown|     0.04|
+-------------+---------+
only showing top 20 rows
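
The formula can be checked by hand against the output above. The following is a minimal sketch in plain Python (not Spark), with numbers copied from the `data3.show()` and `data1.show()` outputs; the helper name `fatality_rate` is made up for illustration. Python's `round` and Spark SQL's `round` can differ on exact ties, which do not occur in these values.

```python
def fatality_rate(deaths, cases, ndigits=3):
    """Deaths divided by confirmed cases, rounded; None when cases is 0."""
    if cases == 0:
        return None
    return round(deaths / cases, ndigits)


# Per-row rates, using totals from the data3.show() output.
print(fatality_rate(20376, 198710))  # New York City
print(fatality_rate(2363, 21013))    # Wayne

# Nationwide rate, using the 2020-05-19 row of the data1.show() output.
print(fatality_rate(91936, 1536471))
```

The first two values agree with the Deathrate column above (0.103 and 0.112). The third, about 0.06, is a nationwide figure derived from the totals in 1).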

