In [None]:
#установка
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
# Point PySpark at the JDK and at the unpacked Spark distribution.
import os
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
})

# Make the Spark distribution importable, then create (or reuse) a local
# session that uses every available core.
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Kaggle API credentials.
# Never hard-code the key in the notebook: take it from the KAGGLE_USERNAME /
# KAGGLE_KEY environment variables, or prompt for it (the key without echo).
import getpass
import json
import os
from pathlib import Path

kaggleDir = Path('/root/.kaggle')
kaggleDir.mkdir(parents=True, exist_ok=True)
kaggleJson = kaggleDir / 'kaggle.json'

kaggleCredentials = {
    'username': os.environ.get('KAGGLE_USERNAME') or input('Kaggle username: '),
    'key': os.environ.get('KAGGLE_KEY') or getpass.getpass('Kaggle API key: '),
}
# Create the file owner-only from the start and overwrite any previous
# contents (instead of appending a second JSON object to them).
fd = os.open(kaggleJson, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, 'w') as fh:
    json.dump(kaggleCredentials, fh)
os.chmod(kaggleJson, 0o600)  # also tightens a file that already existed
del kaggleCredentials        # don't keep the key around in the kernel namespace


In [None]:
#скачиваем датасет
!kaggle datasets download -d yasirabdaali/corona-virus-covid19-us-counties
!unzip -q corona-virus-covid19-us-counties.zip

Downloading corona-virus-covid19-us-counties.zip to /content
 97% 73.0M/75.3M [00:04<00:00, 24.2MB/s]
100% 75.3M/75.3M [00:04<00:00, 16.8MB/s]


In [None]:
# Load the semicolon-separated CSV; the first line holds the column names and
# Spark infers the column types.
data = spark.read.csv(
    'coronavirus-covid-19-pandemic-usa-counties.csv',
    sep=';',
    header=True,
    inferSchema=True,
)


In [None]:
data.show(n=5)  # preview the first five rows

+-----------------+--------------+------------------------------------------+-------------------+-----------+---------------+--------------------+
|Admin 2 FIPS Code|Province/State|Admin 2 Level (City/County/Borough/Region)|               Date|Total Death|Total Confirmed|            location|
+-----------------+--------------+------------------------------------------+-------------------+-----------+---------------+--------------------+
|            12119|       Florida|                                    Sumter|2020-01-31 00:00:00|          0|              0|28.70181754,-82.0...|
|            13153|       Georgia|                                   Houston|2020-01-31 00:00:00|          0|              0|32.45802497,-83.6...|
|            13227|       Georgia|                                   Pickens|2020-01-31 00:00:00|          0|              0|34.46589159,-84.4...|
|            13303|       Georgia|                                Washington|2020-01-31 00:00:00|          0|         

In [None]:
def groupedMax (data, group, maxcol, asc = True):
  """Group `data` by `group` and order the groups by the maximum of `maxcol`.

  Args:
    data: DataFrame to aggregate.
    group: name of the column to group by.
    maxcol: column whose per-group maximum is taken.
    asc: True to sort the maxima ascending, False for descending.

  Returns:
    DataFrame with the group column and 'max(<maxcol>)', sorted by the latter.
  """
  aggregatedName = 'max({})'.format(maxcol)
  perGroup = data.groupBy(group).max(maxcol)
  return perGroup.orderBy(aggregatedName, ascending = asc)


In [None]:
# Deaths by state: highest first, then lowest first.
# NOTE(review): rows of `data` appear to be per county (Admin 2 level), so the
# per-state max is the largest single county's cumulative count, not the state
# total. TODO: confirm, and if state totals are intended, sum the counties per
# date before taking the max (deltaInDay already sums per state and date).
deathGroupedStateDesc = groupedMax(data, 'Province/State', 'Total Death', asc=False)
deathGroupedStateDesc.show()

deathGroupedState = groupedMax(data, 'Province/State', 'Total Death', asc=True)
deathGroupedState.show()


+--------------+----------------+
|Province/State|max(Total Death)|
+--------------+----------------+
|    California|           32991|
|       Arizona|           17726|
|      Illinois|           14678|
|      New York|           13217|
|       Florida|           11438|
|         Texas|           11203|
|        Nevada|            8878|
|      Michigan|            8267|
|  Pennsylvania|            5235|
| Massachusetts|            4220|
|          Ohio|            3888|
|    New Jersey|            3662|
|     Tennessee|            3445|
|      Missouri|            3434|
|   Connecticut|            3232|
|    Washington|            3065|
|       Indiana|            3063|
|     Minnesota|            2671|
|  Rhode Island|            2590|
|      Maryland|            2587|
+--------------+----------------+
only showing top 20 rows

+--------------------+----------------+
|      Province/State|max(Total Death)|
+--------------------+----------------+
|    Diamond Princess|               0

In [None]:
# Confirmed cases by state: highest first, then lowest first.
# NOTE(review): rows of `data` appear to be per county (Admin 2 level), so the
# per-state max is the largest single county's cumulative count, not the state
# total. TODO: confirm, and if state totals are intended, sum the counties per
# date before taking the max (deltaInDay already sums per state and date).
confirmedGroupedStateDesc = groupedMax(data, 'Province/State', 'Total Confirmed', asc=False)
confirmedGroupedStateDesc.show()

confirmedGroupedState = groupedMax(data, 'Province/State', 'Total Confirmed', asc=True)
confirmedGroupedState.show()

+--------------+--------------------+
|Province/State|max(Total Confirmed)|
+--------------+--------------------+
|    California|             3371673|
|       Florida|             1423397|
|       Arizona|             1406462|
|      Illinois|             1365920|
|         Texas|             1183040|
|      New York|              832986|
|        Nevada|              634733|
|    Washington|              502247|
|      Michigan|              467424|
| Massachusetts|              395054|
|          Utah|              384005|
|North Carolina|              354785|
|  Pennsylvania|              354168|
|     Minnesota|              350741|
|          Ohio|              330677|
|     Wisconsin|              308249|
|    New Jersey|              279188|
|     Tennessee|              265511|
|      Kentucky|              259472|
|      Missouri|              258895|
+--------------+--------------------+
only showing top 20 rows

+--------------------+--------------------+
|      Province/St

In [None]:
def createColRDD (data, colName):
  """Sum column `colName` over all rows of `data` using the RDD API.

  Null values are skipped, and an empty DataFrame (or one whose values are all
  null) yields 0 instead of raising.

  Args:
    data: DataFrame containing `colName`.
    colName: name of the numeric column to add up.

  Returns:
    The sum of the non-null values of `colName` (0 when there are none).
  """
  values = data.rdd.map(lambda row: row[colName]).filter(lambda v: v is not None)
  # fold, unlike reduce, accepts an empty RDD and then returns its zero value.
  return values.fold(0, lambda acc, v: acc + v)

In [None]:
# Nationwide number of deaths and confirmed cases, and the overall mortality rate.
# NOTE(review): these sums are only as meaningful as the per-state values in
# deathGroupedState / confirmedGroupedState — confirm those are state totals.
totalDeath = createColRDD(deathGroupedState, 'max(Total Death)')
print('Total Death: ', totalDeath)

totalConfirmed = createColRDD(confirmedGroupedState, 'max(Total Confirmed)')
print('Total Confirmed Cases: ', totalConfirmed)

# Deaths per confirmed case; NaN instead of ZeroDivisionError when there are no cases.
mortalityRate = totalDeath / totalConfirmed if totalConfirmed else float('nan')
print('Mortality Rate: %.3f' % mortalityRate)

Total Death:  201821
Total Comfirmed Cases:  19590737
Mortality Rate: 0.010


In [None]:
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql import functions as F

In [None]:
# Each state's share of the nationwide deaths and confirmed cases
# (fractions of totalDeath / totalConfirmed, not multiplied by 100).
mortalityPercentState = deathGroupedStateDesc.withColumn(
    'Mortality Percent', F.col('max(Total Death)') / totalDeath)
mortalityPercentState.show()

# Identifier and column label are spelled in plain ASCII so they can be typed
# when referenced later (a look-alike Cyrillic letter would not match).
confirmedCasesPercent = confirmedGroupedStateDesc.withColumn(
    'Confirmed Cases Percent', F.col('max(Total Confirmed)') / totalConfirmed)
confirmedCasesPercent.show()

+--------------+----------------+--------------------+
|Province/State|max(Total Death)|   Mortality Percent|
+--------------+----------------+--------------------+
|    California|           32991| 0.16346663627670063|
|       Arizona|           17726| 0.08783030507231655|
|      Illinois|           14678| 0.07272781326026528|
|      New York|           13217|  0.0654887251574415|
|       Florida|           11438|0.056673983381313145|
|         Texas|           11203|  0.0555095852265126|
|        Nevada|            8878| 0.04398947582263491|
|      Michigan|            8267|  0.0409620406201535|
|  Pennsylvania|            5235|0.025938826980343967|
| Massachusetts|            4220| 0.02090961792875865|
|          Ohio|            3888|0.019264595854742567|
|    New Jersey|            3662|0.018144791671828007|
|     Tennessee|            3445| 0.01706958146079942|
|      Missouri|            3434|0.017015077717383226|
|   Connecticut|            3232| 0.01601419079283127|
|    Washi

In [None]:
# Mortality rate per state: deaths divided by confirmed cases, highest first.
mortalityRateState = (
    deathGroupedStateDesc
    .join(confirmedGroupedStateDesc, 'Province/State')
    .withColumn('Mortality Rate',
                F.col('max(Total Death)') / F.col('max(Total Confirmed)'))
)
mortalityRateState.orderBy('Mortality Rate', ascending=False).show()


+--------------+----------------+--------------------+--------------------+
|Province/State|max(Total Death)|max(Total Confirmed)|      Mortality Rate|
+--------------+----------------+--------------------+--------------------+
|Grand Princess|               3|                 103| 0.02912621359223301|
|      Michigan|            8267|              467424| 0.01768629766550284|
|      New York|           13217|              832986|0.015867013371173105|
|  Pennsylvania|            5235|              354168|0.014781120824015722|
| West Virginia|             771|               53925|0.014297635605006954|
|        Nevada|            8878|              634733|0.013986983503299813|
|   Connecticut|            3232|              237555|0.013605270358443307|
|   Mississippi|             812|               61010|0.013309293558433044|
|      Missouri|            3434|              258895|0.013264064582166515|
|    New Jersey|            3662|              279188| 0.01311660959640099|
|     Tennes

In [None]:
def deltaInDay (data, colName):
  """Per-state daily change of the cumulative column `colName`.

  Adds `colName` up over every row sharing the same ('Province/State', 'Date')
  pair, then, within each state ordered by date, subtracts the previous date's
  sum.

  Args:
    data: DataFrame with 'Province/State', 'Date' and `colName` columns.
    colName: name of the numeric column to aggregate.

  Returns:
    DataFrame with columns
      StateDate  struct<State: string, Date: date>
      colName    per-state sum for that date
      prev       previous date's sum for the same state (null on the first date)
      delta      colName - prev (null on the first date)
  """
  def _addSkippingNulls(a, b):
    # Like SQL SUM: a null contributes nothing; the result is null only when
    # every value is null (plain a + b would raise TypeError on None).
    if a is None:
      return b
    if b is None:
      return a
    return a + b

  # Key: (state, date); value: the metric of a single row.
  stateDatePairs = data.rdd.map(
      lambda row: ((row['Province/State'], row['Date']), row[colName]))
  stateDateTotals = stateDatePairs.reduceByKey(_addSkippingNulls)

  dfSchema = StructType([
    StructField('StateDate', StructType([
        StructField('State', StringType(), True),
        StructField('Date', DateType(), True)])),
    StructField(colName, LongType(), True)
  ])
  # No RDD sortBy beforehand: the window below already orders the rows by date
  # within each state, and an eager sort would only cost an extra job/shuffle.
  df = stateDateTotals.toDF(dfSchema)

  # Partitioning by state guarantees the lagged row belongs to the same state.
  w = Window.partitionBy(F.col('StateDate.State')).orderBy(F.col('StateDate.Date'))
  return (df
          .withColumn('prev', F.lag(F.col(colName), 1).over(w))
          .withColumn('delta', F.col(colName) - F.col('prev')))

In [None]:
def totalInDay (data, colName):
  """For each state, the date(s) with the largest single-day increase of `colName`.

  Args:
    data: DataFrame accepted by deltaInDay ('Province/State', 'Date' and
      `colName` columns).
    colName: cumulative numeric column to analyse, e.g. 'Total Death'.

  Returns:
    DataFrame with columns State, Date, max(delta): one row per state and per
    date reaching that state's maximum daily delta (ties keep every such date).
    Rows with a null state, and states whose maximum is 0 or null, are
    excluded. Sorted by max(delta), descending.
  """
  deltas = deltaInDay(data, colName)

  # Collapse to one row per (state, date), keeping the largest delta.
  daily = deltas.groupBy(
      F.col('StateDate.State').alias('State'),
      F.col('StateDate.Date').alias('Date'),
  ).agg(F.max('delta').alias('max(delta)'))

  # Attach each state's maximum to its rows with a window instead of a
  # separate aggregation + self-join.
  stateMax = F.max(F.col('max(delta)')).over(Window.partitionBy('State'))

  return (daily
          .withColumn('stateMax', stateMax)
          .filter(F.col('State').isNotNull()
                  & (F.col('max(delta)') == F.col('stateMax'))
                  & (F.col('max(delta)') != 0))
          .select('State', 'Date', 'max(delta)')
          .sort('max(delta)', ascending=False))


In [None]:
# Largest one-day increase per state: deaths first, then confirmed cases.
resDeath = totalInDay(data, colName='Total Death')
resDeath.show()

resConfirmed = totalInDay(data, colName='Total Confirmed')
resConfirmed.show()

+--------------+----------+----------+
|         State|      Date|max(delta)|
+--------------+----------+----------+
|       Florida|2022-01-12|     25531|
|      New York|2020-05-18|      4448|
|      Missouri|2021-12-02|      2441|
|     Tennessee|2021-12-22|      2079|
|    New Jersey|2020-06-25|      1796|
|   Puerto Rico|2021-07-23|      1786|
|      Oklahoma|2021-04-07|      1716|
|         Texas|2021-10-13|      1175|
|North Carolina|2022-05-04|      1172|
|    California|2021-02-24|      1165|
|      Nebraska|2022-04-01|      1150|
|       Indiana|2022-04-01|       979|
| Massachusetts|2020-04-25|       764|
|      Michigan|2022-01-19|       566|
|      Maryland|2021-05-27|       549|
|       Georgia|2020-11-03|       479|
|          Iowa|2020-12-11|       469|
|       Arizona|2022-03-16|       457|
|      Kentucky|2021-03-18|       448|
|      Virginia|2022-02-07|       404|
+--------------+----------+----------+
only showing top 20 rows

+--------------+----------+----------+

In [None]:
import pyspark.sql.functions as sqlF

def totalInYear (dataYear, colName):
  """Add 'Count': the increase of cumulative `colName` relative to the previous year.

  Rows are ordered by 'year'. Count is `colName` minus the previous row's
  value, or `colName` itself when there is no previous value (earliest year)
  or that value is null.

  Args:
    dataYear: DataFrame with a 'year' column and `colName`.
    colName: cumulative numeric column.

  Returns:
    `dataYear` with the extra 'Count' column.
  """
  byYear = Window.orderBy('year')
  previous = F.lag(F.col(colName)).over(byYear)
  # A missing previous value counts as 0, so Count falls back to colName.
  return dataYear.withColumn('Count', F.col(colName) - F.coalesce(previous, F.lit(0)))


In [None]:
# Nationwide cumulative totals at the end of each year, and the increase within each year.
dataYear = data.withColumn('year', F.year(F.col('Date')))

# Rows of `data` appear to be per county (Admin 2 columns), so first add the
# counties up to a state total for every date...
stateDaily = dataYear.groupBy('year', 'Province/State', 'Date').agg(
    F.sum('Total Death').alias('Total Death'),
    F.sum('Total Confirmed').alias('Total Confirmed'))
# ...then take each state's largest cumulative total within the year (taking
# the max of raw rows would return only the largest county's figure).
stateYear = stateDaily.groupBy('year', 'Province/State')

# Deaths
dataYearDeath = stateYear.agg(F.max('Total Death').alias('Total Death'))
dataYearDeath1 = (dataYearDeath.groupBy('year').sum('Total Death')
                  .withColumnRenamed('sum(Total Death)', 'year Total Death'))
dataYearDeath1 = totalInYear(dataYearDeath1, 'year Total Death')

dataYearDeath1.show()

# Confirmed cases
dataYearConfirmed = stateYear.agg(F.max('Total Confirmed').alias('Total Confirmed'))
dataYearConfirmed1 = (dataYearConfirmed.groupBy('year').sum('Total Confirmed')
                      .withColumnRenamed('sum(Total Confirmed)', 'year Total Confirmed'))
dataYearConfirmed1 = totalInYear(dataYearConfirmed1, 'year Total Confirmed')

dataYearConfirmed1.show()


+----+----------------+-----+
|year|year Total Death|Count|
+----+----------------+-----+
|2020|           77407|77407|
|2021|          159255|81848|
|2022|          201230|41975|
+----+----------------+-----+

+----+--------------------+-------+
|year|year Total Confirmed|  Count|
+----+--------------------+-------+
|2020|             4444999|4444999|
|2021|            11113602|6668603|
|2022|            19590737|8477135|
+----+--------------------+-------+



In [None]:
def totalInYearColumn (dataYear, colName):
  """Add 'Count': the per-state increase of cumulative `colName` over the previous year.

  Rows are ordered by ('Province/State', 'year'). The previous row's value is
  subtracted only when that row belongs to the same state; otherwise (first
  year of a state, null state, or null previous value) Count is `colName`.

  Args:
    dataYear: DataFrame with 'Province/State', 'year' and `colName` columns.
    colName: cumulative numeric column.

  Returns:
    `dataYear` with the extra 'Count' column.
  """
  byStateYear = Window.orderBy(F.col('Province/State'), F.col('year'))
  prevState = F.lag(F.col('Province/State'), 1).over(byStateYear)
  prevValue = F.lag(F.col(colName)).over(byStateYear)
  # Previous value only when it comes from the same state, else null.
  sameStatePrev = F.when(F.col('Province/State') == prevState, prevValue)
  return dataYear.withColumn('Count', F.col(colName) - F.coalesce(sameStatePrev, F.lit(0)))


In [None]:
# Deaths per state for every year (cumulative value plus the increase within the year).
dataYearDeathState = totalInYearColumn(dataYearDeath, colName='Total Death')
dataYearDeathState.show()

# Confirmed cases per state for every year.
dataYearConfirmedState = totalInYearColumn(dataYearConfirmed, colName='Total Confirmed')
dataYearConfirmedState.show()

+----+--------------+-----------+-----+
|year|Province/State|Total Death|Count|
+----+--------------+-----------+-----+
|2020|       Alabama|        697|  697|
|2021|       Alabama|       2024| 1327|
|2022|       Alabama|       2409|  385|
|2020|        Alaska|        120|  120|
|2021|        Alaska|        427|  307|
|2022|        Alaska|        561|  134|
|2020|American Samoa|          0|    0|
|2021|American Samoa|          0|    0|
|2022|American Samoa|         33|   33|
|2020|       Arizona|       5110| 5110|
|2021|       Arizona|      13675| 8565|
|2022|       Arizona|      17726| 4051|
|2020|      Arkansas|        390|  390|
|2021|      Arkansas|        966|  576|
|2022|      Arkansas|       1221|  255|
|2020|    California|      10359|10359|
|2021|    California|      27637|17278|
|2022|    California|      32991| 5354|
|2020|      Colorado|        667|  667|
|2021|      Colorado|       1404|  737|
+----+--------------+-----------+-----+
only showing top 20 rows

+----+--------